xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 3aa204fb3138c43e63b868e488277f13b098cef1)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16from common import *
17
18
class Server:
    """Common state shared by every machine (target or initiator) in the test.

    Stores credentials, working mode and the RDMA-capable NIC addresses,
    and provides a name-prefixed logging helper.
    """

    def __init__(self, name, username, password, mode, rdma_ips):
        self.name = name
        self.username = username
        self.password = password
        self.mode = mode
        self.rdma_ips = rdma_ips

        # Server names end up embedded in result file names, so only
        # alphanumeric characters are accepted.
        if re.match("^[A-Za-z0-9]*$", name) is None:
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name, flushing immediately
        so interleaved output from multiple hosts stays readable."""
        print("[%s] %s" % (self.name, msg), flush=True)
33
34
class Target(Server):
    """Base class for the NVMe-oF target machine (the host running this script).

    Holds the functionality shared by kernel and SPDK targets: packaging
    SPDK sources for the initiators, parsing fio JSON result files into a
    CSV summary, and gathering SAR CPU statistics during a run.
    """

    def __init__(self, name, username, password, mode, rdma_ips, use_null_block=False, sar_settings=None):
        super(Target, self).__init__(name, username, password, mode, rdma_ips)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        if sar_settings:
            # sar_settings is a 4-element sequence:
            # (enable flag, delay before sampling [s], sampling interval [s], sample count)
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        # Script lives in scripts/perf/nvmf/, so the repo root is 3 levels up.
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Pack the whole SPDK source tree into dest_file (zip, deflated).

        Paths are stored relative to the current working directory, which is
        expected to be the SPDK repository root.
        """
        self.log_print("Zipping SPDK source directory")
        # Context manager guarantees the archive is closed (and its central
        # directory flushed) even if os.walk or write raises.
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    @staticmethod
    def _parse_fio_job_stats(section):
        """Extract stats from one fio job section ("read" or "write" dict).

        Returns [iops, bw, avg_lat, min_lat, max_lat, p99_lat] with all
        latencies normalized to microseconds.
        """
        def get_lat_unit(key_prefix):
            # fio names its latency buckets e.g. "lat_ns" or "lat_us"
            # depending on version; return the matching dict key and the
            # unit suffix so values can be normalized below.
            for k in section:
                if k.startswith(key_prefix):
                    return k, k.split("_")[1]

        iops = float(section["iops"])
        bw = float(section["bw"])
        lat_key, lat_unit = get_lat_unit("lat")
        avg_lat = float(section[lat_key]["mean"])
        min_lat = float(section[lat_key]["min"])
        max_lat = float(section[lat_key]["max"])
        clat_key, clat_unit = get_lat_unit("clat")
        p99_lat = float(section[clat_key]["percentile"]["99.000000"])

        # Convert nanoseconds to microseconds so the CSV uses one unit.
        if "ns" in lat_unit:
            avg_lat, min_lat, max_lat = [x / 1000 for x in [avg_lat, min_lat, max_lat]]
        if "ns" in clat_unit:
            p99_lat = p99_lat / 1000

        return [iops, bw, avg_lat, min_lat, max_lat, p99_lat]

    def read_json_stats(self, file):
        """Parse a fio JSON result file.

        Returns a 12-element list:
        [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, read_p99_lat,
         write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, write_p99_lat]
        with all latencies in microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
        job_pos = 0  # job 0 holds aggregated results (group_reporting=1)

        return [*self._parse_fio_job_stats(data["jobs"][job_pos]["read"]),
                *self._parse_fio_job_stats(data["jobs"][job_pos]["write"])]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate all fio JSON results in results_dir into nvmf_results.csv.

        For every .fio config file: results from multiple runs of the same
        initiator are averaged, then the per-initiator averages are summed
        into one CSV row per workload.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        # Create empty results file with just the header line.
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            header_line = ",".join(["Name",
                                    "read_iops", "read_bw", "read_avg_lat_us",
                                    "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                    "write_iops", "write_bw", "write_avg_lat_us",
                                    "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us"])
            fh.write(header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job,
                # calculate average results for this initiator.
                i_results = [x for x in job_result_files if i in x]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Column-wise average over all runs of this initiator.
                z = [sum(c) for c in zip(*separate_stats)]
                z = [c / len(separate_stats) for c in z]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Save results to file once, after all configs are processed.
        # (Previously this append loop ran inside the per-config loop and
        # re-wrote the accumulated row set on every iteration, duplicating
        # earlier rows in the CSV.)
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")

    def measure_sar(self, results_dir, sar_file_name):
        """Sample CPU utilization with sar and save the raw output.

        Sleeps self.sar_delay seconds first (to skip fio ramp time), then
        collects self.sar_count samples at self.sar_interval seconds each,
        logging the per-CPU "Average" summary lines as it goes.
        """
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)
174
175
class Initiator(Server):
    """Base class for a remote fio traffic-generator host driven over SSH.

    The constructor opens a paramiko SSH session to the host and prepares a
    clean workspace directory; fio configuration generation, execution and
    result collection all go through that session.
    """

    def __init__(self, name, username, password, mode, rdma_ips, ip, workspace="/tmp/spdk"):
        super(Initiator, self).__init__(name, username, password, mode, rdma_ips)
        self.ip = ip
        self.spdk_dir = workspace

        # AutoAddPolicy: accept unknown host keys (trusted test-lab hosts).
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy)
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Remove stale results from a previous run and make sure the
        # workspace exists; recv_exit_status() blocks until the remote
        # command completes.
        stdin, stdout, stderr = self.ssh_connection.exec_command("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        stdout.channel.recv_exit_status()
        stdin, stdout, stderr = self.ssh_connection.exec_command("mkdir -p %s" % self.spdk_dir)
        stdout.channel.recv_exit_status()

    def __del__(self):
        # Close the SSH session when the object is garbage collected.
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a local file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a remote file from the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_result_files(self, dest_dir):
        """Copy all fio result files from the initiator into local dest_dir,
        creating dest_dir if needed."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdin, stdout, stderr = self.ssh_connection.exec_command("ls %s/nvmf_perf" % self.spdk_dir)

        file_list = stdout.read().decode(encoding="utf-8").strip().split("\n")
        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run `nvme discover` against every (address, port) combination and
        return a sorted, de-duplicated list of (trsvcid, subnqn, traddr)
        tuples found on the target side.

        Ports 4420 .. 4420+subsys_no-1 are probed for every address.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # NOTE: the loop variable deliberately reuses the name subsys_no;
        # inside the loop it is the per-address port offset, shadowing the
        # parameter (the original count is already captured in num_nvmes).
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo", "nvme", "discover", "-t rdma", "-s %s" % (4420 + subsys_no), "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdin, stdout, stderr = self.ssh_connection.exec_command(nvme_discover_cmd)
            out = stdout.read().decode(encoding="utf-8")
            if out:
                nvme_discover_output = nvme_discover_output + out

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        # Keep only subsystems listening on the requested addresses,
        # de-duplicate, and sort by NQN for a stable order.
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a fio config for one workload and upload it to the
        initiator's nvmf_perf directory.

        In "spdk" mode the config uses the SPDK bdev fio_plugin ioengine
        plus a generated bdev.conf; otherwise libaio against kernel block
        devices. Returns the remote path of the uploaded config file.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.rdma_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            stdin, stdout, stderr = self.ssh_connection.exec_command("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
            filename_section = self.gen_fio_filename_conf(subsystems)
        else:
            ioengine = "libaio"
            spdk_conf = ""
            filename_section = self.gen_fio_filename_conf()

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s" % num_jobs
        fio_config = fio_config + filename_section

        # Encode the workload parameters into the file name; a "_xCPU"
        # suffix is added when this initiator limits fio to num_cores CPUs.
        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        stdin, stdout, stderr = self.ssh_connection.exec_command("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        stdin, stdout, stderr = self.ssh_connection.exec_command(
            "echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        stdout.channel.recv_exit_status()
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def run_fio(self, fio_config_file, run_num=None):
        """Execute fio on the initiator with the given remote config file.

        When run_num is given, the job is repeated that many times with
        "_run_<i>" in the JSON output name; otherwise it runs once. Output
        files land next to the config (in nvmf_perf) with this server's
        name as suffix, which parse_results relies on later.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo /usr/src/fio/fio %s --output-format=json --output=%s" % (fio_config_file, output_filename)
                stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
                output = stdout.read().decode(encoding="utf-8")
                error = stderr.read().decode(encoding="utf-8")
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo /usr/src/fio/fio %s --output-format=json --output=%s" % (fio_config_file, output_filename)
            stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
            output = stdout.read().decode(encoding="utf-8")
            error = stderr.read().decode(encoding="utf-8")
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)
316
317
class KernelTarget(Target):
    """NVMe-oF target using the Linux kernel nvmet driver, configured by
    generating a kernel.conf JSON file and loading it with nvmetcli."""

    def __init__(self, name, username, password, mode, rdma_ips, use_null_block=False, sar_settings=None, nvmet_dir=None, **kwargs):
        super(KernelTarget, self).__init__(name, username, password, mode, rdma_ips, use_null_block, sar_settings)

        # Allow pointing at a local nvmetcli checkout; otherwise rely on $PATH.
        if nvmet_dir:
            self.nvmet_bin = os.path.join(nvmet_dir, "nvmetcli")
        else:
            self.nvmet_bin = "nvmetcli"

    def __del__(self):
        # Best-effort teardown of the kernel target configuration.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing a single null block device
        (/dev/nullb0) as one subsystem on the given RDMA address, port 4420."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "rdma"
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing each NVMe drive as its own subsystem,
        with drives distributed across the provided NIC addresses.

        Subsystem N listens on a unique port (4420 + N - 1) with NQN
        nqn.2018-09.io.spdk:cnodeN.
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's. The remainder is spread over the
        # first IPs so no disk is silently dropped when the counts do not
        # divide evenly (the old fixed-size slicing lost the tail disks).
        disks_per_ip, remainder = divmod(len(nvme_list), len(address_list))
        disk_chunks = []
        start = 0
        for i in range(0, len(address_list)):
            chunk_size = disks_per_ip + (1 if i < remainder else 0)
            disk_chunks.append(nvme_list[start:start + chunk_size])
            start += chunk_size

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "rdma"
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet configuration (null block or real NVMe drives)
        and load it into the kernel with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.rdma_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                # sys.exit for consistency with the rest of the module
                # (the interactive-only builtin exit() was used before).
                sys.exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.rdma_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.rdma_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
443
444
class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application, configured
    over its JSON-RPC socket."""

    def __init__(self, name, username, password, mode, rdma_ips, num_cores, num_shared_buffers=4096,
                 use_null_block=False, sar_settings=None, **kwargs):
        super(SPDKTarget, self).__init__(name, username, password, mode, rdma_ips, use_null_block, sar_settings)
        self.num_cores = num_cores  # core mask string passed to nvmf_tgt -m
        self.num_shared_buffers = num_shared_buffers  # RDMA transport shared buffer count

    @staticmethod
    def _chunk_items(items, num_chunks):
        """Split items into num_chunks consecutive chunks, spreading the
        remainder over the first chunks so no item is dropped."""
        base, rem = divmod(len(items), num_chunks)
        chunks = []
        start = 0
        for i in range(num_chunks):
            size = base + (1 if i < rem else 0)
            chunks.append(items[start:start + size])
            start += size
        return chunks

    def spdk_tgt_configure(self):
        """Configure transport, bdevs and subsystems on the running nvmf_tgt."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        # NOTE(review): result currently unused — confirm whether NUMA
        # placement was intended here before removing the call.
        numa_list = get_used_numa_nodes()

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype="RDMA", num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.get_nvmf_transports(self.client))

        if self.null_block:
            nvme_section = self.spdk_tgt_add_nullblock()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.rdma_ips, req_num_disks=1)
        else:
            nvme_section = self.spdk_tgt_add_nvme_conf()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.rdma_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a single 100 GiB null block bdev named Nvme0n1 (4 KiB blocks)."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.construct_null_bdev(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives as bdevs Nvme0, Nvme1, ...

        req_num_disks limits how many drives are attached; exits if more
        drives are requested than present.
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC traddr expects dots instead of colons in the PCI address.
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.construct_nvme_bdev(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, distributed across the
        given NIC addresses. All listeners use port 4420."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs. The remainder is spread
        # over the first IPs so that no bdev is silently dropped when the
        # counts do not divide evenly (the old fixed-size slicing lost the
        # tail entries).
        num_disks = list(range(1, req_num_disks + 1))
        disk_chunks = self._chunk_items(num_disks, len(ips))

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_subsystem_create(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype="RDMA",
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.get_nvmf_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket to appear, then configure it."""
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        command = " ".join([nvmf_app_path, "-m", self.num_cores])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")  # path of the pid file

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Log the actual process id (previously this printed the pid-file
        # path because self.pid holds the path, not the pid).
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initilize...")
        # Poll until the app creates its JSON-RPC unix socket.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        # Terminate the nvmf_tgt process if we started one; escalate to
        # kill if graceful termination fails.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
553
554
class KernelInitiator(Initiator):
    """Initiator that uses the kernel NVMe-oF host driver via nvme-cli."""

    def __init__(self, name, username, password, mode, rdma_ips, ip, **kwargs):
        super(KernelInitiator, self).__init__(name, username, password, mode, rdma_ips, ip)

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Connect this host to every subsystem discovered on the target."""
        remote_subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for svc, nqn, addr in remote_subsystems:
            self.log_print("Trying to connect %s %s %s" % (svc, nqn, addr))
            connect_cmd = "sudo nvme connect -t rdma -s %s -n %s -a %s" % (svc, nqn, addr)
            stdin, stdout, stderr = self.ssh_connection.exec_command(connect_cmd)
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect every previously connected subsystem by its NQN."""
        for remote_subsystem in self.discover_subsystems(address_list, subsys_no):
            disconnect_cmd = "sudo nvme disconnect -n %s" % remote_subsystem[1]
            stdin, stdout, stderr = self.ssh_connection.exec_command(disconnect_cmd)
            time.sleep(1)

    def gen_fio_filename_conf(self):
        """Build fio [filename] sections, one per kernel nvme block device
        visible on this host."""
        stdin, stdout, stderr = self.ssh_connection.exec_command("lsblk -o NAME -nlp")
        lsblk_out = stdout.read().decode(encoding="utf-8")
        nvme_devices = [dev for dev in lsblk_out.split("\n") if "nvme" in dev]

        # Leading "" keeps the output starting with a newline, matching the
        # section format expected by gen_fio_config.
        section_lines = [""]
        for idx, dev in enumerate(nvme_devices):
            section_lines.append("[filename%s]" % idx)
            section_lines.append("filename=%s" % dev)
        return "\n".join(section_lines)
590
591
class SPDKInitiator(Initiator):
    """Initiator that runs fio with the SPDK bdev fio_plugin as its ioengine."""

    def __init__(self, name, username, password, mode, rdma_ips, ip, num_cores=None, **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, rdma_ips, ip)
        if num_cores:
            # Optional cap on the number of fio jobs (CPUs) on this host;
            # its presence also changes the generated config file name.
            self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Copy the zipped SPDK sources to this host, build SPDK with the
        fio plugin and run setup.sh to bind local NVMe devices."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        stdin, stdout, stderr = self.ssh_connection.exec_command("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)
        stdout.channel.recv_exit_status()

        self.log_print("Sources unpacked")
        stdin, stdout, stderr = self.ssh_connection.exec_command(
            "cd %s; git submodule update --init; ./configure --with-rdma --with-fio=/usr/src/fio;"
            "make clean; make -j$(($(nproc)*2))" % self.spdk_dir)
        stdout.channel.recv_exit_status()

        self.log_print("SPDK built")
        stdin, stdout, stderr = self.ssh_connection.exec_command("sudo %s/scripts/setup.sh" % self.spdk_dir)
        stdout.channel.recv_exit_status()

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Build the [Nvme] bdev.conf section with one TransportId line per
        remote subsystem; entries are (trsvcid, subnqn, traddr) tuples as
        returned by discover_subsystems."""
        header = "[Nvme]"
        row_template = """  TransportId "trtype:RDMA adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        bdev_rows = [row_template.format(svc=x[0],
                                         nqn=x[1],
                                         ip=x[2],
                                         i=i) for i, x in enumerate(remote_subsystem_list)]
        bdev_rows = "\n".join(bdev_rows)
        bdev_section = "\n".join([header, bdev_rows])
        return bdev_section

    def gen_fio_filename_conf(self, remote_subsystem_list):
        """Build fio [filename] job sections mapping NvmeXn1 bdevs to jobs.

        One job per CPU when self.num_cores is set, otherwise one job per
        connected subsystem.
        """
        subsystems = [str(x) for x in range(0, len(remote_subsystem_list))]

        # If num_cpus exists then limit FIO to this number of CPUs
        # Otherwise - each connected subsystem gets its own CPU
        if hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        # Distribute subsystems over jobs, spreading the remainder over the
        # first jobs. The old fixed-size "subsystems[n*t:n+n*t]" slicing
        # dropped trailing subsystems when the counts did not divide evenly
        # (and assigned nothing at all when there were more jobs than
        # subsystems).
        n, rem = divmod(len(subsystems), len(threads))

        filename_section = ""
        start = 0
        for t in threads:
            count = n + (1 if t < rem else 0)
            header = "[filename%s]" % t
            disks = "\n".join(["filename=Nvme%sn1" % x for x in subsystems[start:start + count]])
            start += count
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section
646
647
if __name__ == "__main__":
    # Staging path for the zipped SPDK sources and local results directory.
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Relative path: the script is expected to be launched from the SPDK
    # repository root.
    with open("./scripts/perf/nvmf/config.json", "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Read user/pass first as order of objects in the config is undefined
    uname = data['general']["username"]
    passwd = data['general']["password"]

    # Build target/initiator objects and the fio workload matrix from the
    # config sections. Section names are matched by substring ("target",
    # "initiator", "fio"); unknown sections are skipped.
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, username=uname, password=passwd, **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, username=uname, password=passwd, **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, username=uname, password=passwd, **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, username=uname, password=passwd, **v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Cartesian product: every combination of block size, queue
            # depth and rw mode becomes one workload.
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators (kernel-mode initiators do
    # not need an SPDK build); installs run in parallel, one thread each.
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for i in initiators:
        if i.mode == "spdk":
            t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Poor mans threading
    # Run FIO tests: for every workload, generate configs serially, then
    # run fio on all initiators (plus optional SAR sampling) in parallel.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.rdma_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Tear down kernel connections and pull result files back locally.
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.rdma_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    # Aggregate all collected JSON results into a single CSV.
    target_obj.parse_results(target_results_dir)
733