xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 9889ab2dc80e40dae92dcef361d53dcba722043d)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16from common import *
17
18
class Server:
    """Common base for every machine taking part in the test (targets and initiators)."""

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.username = username
        self.password = password
        self.mode = mode
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # Server names end up embedded in result file names, so restrict
        # them to plain alphanumeric characters.
        if re.match("^[A-Za-z0-9]*$", name) is None:
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        # Prefix each message with the server name and flush immediately so
        # output interleaved from several hosts stays attributable.
        print(f"[{self.name}] {msg}", flush=True)
34
35
class Target(Server):
    """Base class for the NVMe-oF target machine (the local host).

    Holds settings common to both kernel and SPDK targets: optional null
    block device usage, optional SAR CPU measurement parameters and the
    location of the SPDK source tree, plus result-parsing helpers.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma", use_null_block=False, sar_settings=None):
        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        if sar_settings:
            # sar_settings is a 4-item sequence:
            # (enable flag, delay before measuring [s], sample interval [s], sample count)
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Pack the SPDK source tree into dest_file for shipping to initiators."""
        self.log_print("Zipping SPDK source directory")
        # NOTE(review): os.path.relpath() is relative to the current working
        # directory, so archive member paths are only correct when the script
        # runs from within spdk_dir - confirm against callers.
        # Use a context manager so the archive is closed even on error.
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Extract read/write statistics from a fio JSON output file.

        Returns a 12-element list: iops, bw, avg/min/max latency and p99
        latency for reads, followed by the same fields for writes. Latencies
        are converted to microseconds when fio reported nanoseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])

            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, read_p99_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, write_p99_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate fio JSON result files from results_dir into nvmf_results.csv.

        For each fio job config found, results from multiple runs of a single
        initiator are averaged, then the per-initiator averages are summed to
        produce one CSV row per job.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        csv_file = "nvmf_results.csv"
        header_line = ",".join(["Name",
                                "read_iops", "read_bw", "read_avg_lat_us",
                                "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                "write_iops", "write_bw", "write_avg_lat_us",
                                "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us"])
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                z = [sum(c) for c in zip(*separate_stats)]
                z = [c / len(separate_stats) for c in z]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Write the file once at the end; rows are sorted so the output order
        # is deterministic (originally the file was re-opened in append mode
        # for every row and rows came out in unspecified set order).
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(header_line + "\n")
            for row in sorted(rows):
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect CPU utilization with sar and save the raw output in results_dir."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            # Echo the "Average" summary lines to the log for quick inspection.
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)
176
177
class Initiator(Server):
    """Remote initiator machine controlled over SSH.

    Opens a paramiko SSH connection in the constructor and provides file
    transfer and remote command helpers used by the kernel and SPDK
    initiator subclasses.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", nvmecli_dir=None, workspace="/tmp/spdk",
                 fio_dir="/usr/src/fio"):
        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)
        self.ip = ip
        self.spdk_dir = workspace
        self.fio_dir = fio_dir

        if nvmecli_dir:
            self.nvmecli_bin = os.path.join(nvmecli_dir, "nvme")
        else:
            self.nvmecli_bin = "nvme"  # Use system-wide nvme-cli

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start with a clean results workspace on the remote host.
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a local file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a file from the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Run cmd on the initiator; return (stdout, stderr) as decoded strings."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy all fio result files from the initiator into local dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run nvme discover against every (address, port) pair and return
        matching subsystems as (trsvcid, subnqn, traddr) tuples sorted by NQN.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # NOTE: the loop variable shadows the subsys_no parameter below.
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_no),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        # Keep only subsystems reachable via the requested addresses; dedupe.
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a fio config file on the initiator and return its remote path."""
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        # SPDK mode uses the bdev fio plugin; kernel mode uses plain libaio.
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
            filename_section = self.gen_fio_filename_conf(subsystems)
        else:
            ioengine = "libaio"
            spdk_conf = ""
            filename_section = self.gen_fio_filename_conf()

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s" % num_jobs
        fio_config = fio_config + filename_section

        # File name encodes the workload parameters (and CPU count if limited).
        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio on the initiator (run_num times, or once), saving JSON output."""
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        fio_bin = os.path.join(self.fio_dir, "fio")
        self.log_print("Using FIO: %s" % fio_bin)
        if run_num:
            for i in range(1, run_num + 1):
                # Result file name ends with the server name - parse_results
                # on the target relies on this layout.
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)
328
329
class KernelTarget(Target):
    """NVMe-oF target using the Linux kernel nvmet driver, configured via nvmetcli."""

    def __init__(self, name, username, password, mode, nic_ips,
                 use_null_block=False, sar_settings=None, transport="rdma", nvmet_dir=None, **kwargs):
        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips,
                                           transport, use_null_block, sar_settings)

        if nvmet_dir:
            self.nvmet_bin = os.path.join(nvmet_dir, "nvmetcli")
        else:
            self.nvmet_bin = "nvmetcli"  # Use system-wide nvmetcli

    def __del__(self):
        # Tear down the kernel target configuration when the object goes away.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing a single null block device on one IP."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": str(uuid.uuid4())
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing each NVMe drive as its own subsystem,
        distributing the drives evenly between the provided NIC IPs."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        # NOTE(review): integer division - leftover disks (len(nvme_list) not
        # divisible by len(address_list)) are silently dropped; confirm intended.
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": str(uuid.uuid4())
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                # Each subsystem listens on its own port starting at 4420.
                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate kernel.conf and load it into the kernel with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        # Clear any stale configuration before restoring the new one.
        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
457
458
class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application, configured over RPC."""

    def __init__(self, name, username, password, mode, nic_ips, num_cores, num_shared_buffers=4096,
                 use_null_block=False, sar_settings=None, transport="rdma", **kwargs):
        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport, use_null_block, sar_settings)
        self.num_cores = num_cores  # CPU core mask passed to nvmf_tgt via "-m"
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        """Create the transport, bdevs, subsystems and listeners over RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # The add_* helpers configure the target in place and return None;
        # the original assigned their (always None) results to unused
        # variables and made an unused get_used_numa_nodes() call - both
        # removed here.
        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a single null block bdev named Nvme0n1 (size 102400, block size 4096)."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives as bdevs; optionally limit to req_num_disks."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC expects dot-separated BDF addresses.
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem and listener per disk, spread across the given IPs."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        # NOTE(review): integer division - leftover disks are silently dropped
        # when req_num_disks is not divisible by len(ips); confirm intended.
        num_disks = range(1, req_num_disks + 1)
        disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket to appear, then configure it."""
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        command = " ".join([nvmf_app_path, "-m", self.num_cores])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll for the RPC domain socket that nvmf_tgt creates once it is ready.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        # Terminate the nvmf_tgt process we started; escalate to kill if
        # graceful termination fails.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
567
568
class KernelInitiator(Initiator):
    """Initiator that connects to the target using the kernel NVMe-oF host driver (nvme-cli)."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport, **kwargs):
        # Bug fix: the original passed an undefined global "fio_dir" positionally,
        # which raised NameError and - even if defined - would have landed in the
        # nvmecli_dir parameter slot of Initiator.__init__. Forward the optional
        # directories from kwargs by keyword instead.
        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              nvmecli_dir=kwargs.get("nvmecli_dir"),
                                              fio_dir=kwargs.get("fio_dir", "/usr/src/fio"))

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Discover and connect to all matching subsystems with nvme-cli."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            # subsystem is a (trsvcid, subnqn, traddr) tuple.
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s -i 8" % (self.nvmecli_bin, self.transport, *subsystem))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect from all previously connected subsystems."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self):
        """Build fio [filename] sections from nvme block devices visible on the initiator.

        NOTE(review): this lists every device matching "nvme" on the host,
        including local (non-fabric) NVMe drives - confirm against test setup.
        """
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        for i, nvme in enumerate(nvme_list):
            filename_section = "\n".join([filename_section,
                                          "[filename%s]" % i,
                                          "filename=%s" % nvme])

        return filename_section
601
602
class SPDKInitiator(Initiator):
    """Initiator that runs fio through the SPDK bdev fio plugin."""

    def __init__(self, name, username, password, mode, nic_ips, ip, num_cores=None, transport="rdma", **kwargs):
        # Bug fix: the original referenced an undefined global "fio_dir" and
        # passed it positionally into the nvmecli_dir slot of Initiator.__init__.
        # Forward the optional directories from kwargs by keyword instead.
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            nvmecli_dir=kwargs.get("nvmecli_dir"),
                                            fio_dir=kwargs.get("fio_dir", "/usr/src/fio"))
        if num_cores:
            self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Upload, unpack and build SPDK (with the fio plugin) on the initiator."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio directory %s" % self.fio_dir)
        self.remote_call("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=%s;"
                         "make clean; make -j$(($(nproc)*2))" % (self.spdk_dir, self.fio_dir))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Generate an [Nvme] config section with one TransportId row per subsystem."""
        header = "[Nvme]"
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        # Each subsystem tuple is (trsvcid, subnqn, traddr).
        bdev_rows = [row_template.format(transport=self.transport,
                                         svc=x[0],
                                         nqn=x[1],
                                         ip=x[2],
                                         i=i) for i, x in enumerate(remote_subsystem_list)]
        bdev_rows = "\n".join(bdev_rows)
        bdev_section = "\n".join([header, bdev_rows])
        return bdev_section

    def gen_fio_filename_conf(self, remote_subsystem_list):
        """Build fio [filename] sections mapping SPDK bdevs to fio job threads."""
        subsystems = [str(x) for x in range(0, len(remote_subsystem_list))]

        # If num_cores exists then limit FIO to this number of CPUs
        # Otherwise - each connected subsystem gets its own CPU
        if hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        # NOTE(review): integer division - trailing subsystems are dropped when
        # their count is not a multiple of the thread count; confirm intended.
        n = int(len(subsystems) / len(threads))

        filename_section = ""
        for t in threads:
            header = "[filename%s]" % t
            disks = "\n".join(["filename=Nvme%sn1" % x for x in subsystems[n * t:n + n * t]])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section
655
656
if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file may be given as the first CLI argument; default to
    # config.json next to this script.
    if (len(sys.argv) > 1):
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Build target / initiator objects and the fio workload matrix from the
    # config sections. NOTE(review): a "target" and a "fio" section are
    # assumed to exist; without them target_obj / fio_* below raise NameError.
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Cartesian product of block sizes, queue depths and rw modes.
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for i in initiators:
        if i.mode == "spdk":
            t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Poor man's threading
    # Run FIO tests: one thread per initiator per workload, plus an optional
    # SAR measurement thread on the target.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Collect result files from every initiator before the next workload.
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)
745