xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 75c2135de5d2e752b8a851e3fa55395b02a47bed)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16from common import *
17
18
class Server:
    """Base for every machine taking part in the test (target and initiators).

    Stores the credentials, test mode, NVMe-oF transport type and the list
    of NIC IP addresses used by the data plane.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # Server names end up embedded in result file names, so only
        # alphanumeric names are accepted.
        if re.match("^[A-Za-z0-9]*$", name) is None:
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        # Prefix each message with the server name so interleaved output
        # from several hosts stays attributable.
        print("[{}] {}".format(self.name, msg), flush=True)
34
35
class Target(Server):
    """Test target machine.

    Extends Server with SPDK source packaging, FIO result aggregation and
    optional system statistics collection (SAR, PCM) run while tests execute.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False

        # sar_settings: [enable, delay, interval, count]
        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        # pcm_settings: [dir, enable_cpu, enable_memory, delay, interval, count]
        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree so it can be shipped to the initiators."""
        self.log_print("Zipping SPDK source directory")
        # Context manager guarantees the archive is closed (and its central
        # directory flushed) even if os.walk or fh.write raises mid-way;
        # the original left the file handle open on error.
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse an FIO JSON output file and return aggregated statistics.

        Returns a 12-element list:
        [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
         read_p99_lat, write_iops, write_bw, write_avg_lat, write_min_lat,
         write_max_lat, write_p99_lat], all latencies in microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])

            # Normalize nanosecond buckets to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, read_p99_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, write_p99_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate per-initiator JSON results into nvmf_results.csv.

        For each FIO config found in results_dir, per-run results of every
        initiator are averaged, then summed across initiators into one row.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            header_line = ",".join(["Name",
                                    "read_iops", "read_bw", "read_avg_lat_us",
                                    "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                    "write_iops", "write_bw", "write_avg_lat_us",
                                    "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us"])
            fh.write(header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise average over this initiator's runs.
                z = [sum(c) for c in zip(*separate_stats)]
                z = [c/len(separate_stats) for c in z]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Save results to file. Open the file once for all rows instead of
        # re-opening it in append mode for every row as the original did.
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Run sar with the configured interval/count and save its output."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            # Echo the summary ("Average") lines to the log as a quick
            # glance; the full output goes to the result file.
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in the background for pcm_count seconds."""
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        # pcm-memory.x runs until killed; pcm_count is used as the duration.
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x for pcm_count samples, writing CSV output to results_dir."""
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
196
197
class Initiator(Server):
    """Remote FIO traffic generator driven over SSH (paramiko).

    Commands run on the remote machine through exec_command; files move
    back and forth over SFTP.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 nvmecli_bin="nvme", workspace="/tmp/spdk", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        # Management address used for the SSH control connection.
        self.ip = ip
        self.spdk_dir = workspace
        self.fio_bin = fio_bin
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        # Auto-accept unknown host keys - these are assumed to be lab machines.
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote workspace: drop stale results, recreate dir.
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)

    def __del__(self):
        # Close the SSH session when the object is garbage collected.
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a single file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a single file from the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Run a shell command on the remote host.

        Returns (stdout, stderr) decoded as UTF-8. Blocks until the
        command's output streams are fully read.
        """
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Fetch all files from the remote nvmf_perf directory into dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run nvme discover against every (address, port) combination.

        Ports 4420..4420+subsys_no-1 are probed on each address. Returns a
        sorted, de-duplicated list of (trsvcid, subnqn, traddr) tuples whose
        traddr is in address_list.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # NOTE(review): the loop variable shadows the subsys_no parameter;
        # harmless here because the parameter is not used after this point.
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_no),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        # Keep only subsystems reachable through the addresses under test.
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate an FIO config file on the remote host.

        In "spdk" mode a bdev.conf is generated from discovered subsystems
        and the SPDK fio_plugin is used; otherwise plain libaio. Returns the
        remote path of the created config file.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
            filename_section = self.gen_fio_filename_conf(subsystems)
        else:
            ioengine = "libaio"
            spdk_conf = ""
            filename_section = self.gen_fio_filename_conf()

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s" % num_jobs
        fio_config = fio_config + filename_section

        # File name encodes the workload parameters; parse_results on the
        # target relies on this naming scheme (including the _<n>CPU part).
        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def run_fio(self, fio_config_file, run_num=None):
        """Run FIO remotely, once or run_num times, writing JSON results.

        Result file names embed the run number (when run_num is set) and the
        initiator name, which the target's result parser depends on.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)
        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        # NOTE(review): with run_num set this reports only the last run's file.
        self.log_print("FIO run finished. Results in: %s" % output_filename)
344
345
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet driver.

    Generates a JSON configuration ("kernel.conf") and loads it with
    nvmetcli.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        # Tear down the kernel target configuration on object destruction.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing a single null block device namespace
        on the given address, port 4420."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing one subsystem per NVMe drive, with the
        drives split evenly between the provided NIC addresses."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's.
        # NOTE(review): integer division - when len(nvme_list) is not an
        # exact multiple of len(address_list), the remainder drives are
        # not exported. Confirm this is intended.
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                # Each subsystem gets its own port, starting at 4420; the
                # initiators probe the same port range during discovery.
                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet JSON configuration and load it via nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            # Use self.log_print / sys.exit for consistency with the rest
            # of the class (original used bare print / exit).
            self.log_print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                self.log_print("Testing with null block limited to single RDMA NIC.")
                self.log_print("Please specify only 1 IP address.")
                sys.exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            self.log_print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
471
472
class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt application.

    The application is launched as a subprocess and configured at runtime
    through its JSON-RPC socket (/var/tmp/spdk.sock).
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings)
        # Core mask / count forwarded to nvmf_tgt via "-m".
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        """Create the transport layer, bdevs and subsystems over RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()  # NOTE(review): result is currently unused

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # The helpers configure via RPC and return None, so their results
        # are no longer bound to unused locals.
        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a single null bdev (100MiB, 4KiB blocks) named Nvme0n1."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives as bdevs, optionally limited to
        req_num_disks devices."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC traddr format uses dots instead of colons in the BDF.
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, distributing bdevs
        evenly between the provided NIC addresses."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs.
        # NOTE(review): integer division - remainder disks (when not evenly
        # divisible by len(ips)) are not exported; confirm intended.
        num_disks = range(1, req_num_disks + 1)
        disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, then configure it."""
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        # str() guards against num_cores arriving as an int (e.g. the
        # default value 1), which made " ".join() raise TypeError.
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll for the RPC socket; no timeout - assumes nvmf_tgt
        # eventually comes up.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        # Terminate the nvmf_tgt subprocess; fall back to kill if a clean
        # terminate/wait fails.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
585
586
class KernelInitiator(Initiator):
    """Initiator using the kernel NVMe-oF host stack via nvme-cli."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              fio_bin=fio_bin)

        # "extra_params" is optional in the config; the original
        # kwargs["extra_params"] raised KeyError when it was absent.
        # "or ''" also normalizes falsy values to the empty string,
        # matching the original default.
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Discover and `nvme connect` to all matching target subsystems."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            # subsystem unpacks to (trsvcid, subnqn, traddr) -> -s, -n, -a.
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect from all discovered subsystems by NQN."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self):
        """Build FIO [filenameX] sections from the remote nvme block devices."""
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        for i, nvme in enumerate(nvme_list):
            filename_section = "\n".join([filename_section,
                                          "[filename%s]" % i,
                                          "filename=%s" % nvme])

        return filename_section
629
630
class SPDKInitiator(Initiator):
    """Initiator that runs FIO through the SPDK bdev fio_plugin."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Ship the SPDK source zip to the remote host, build it against the
        local fio sources and run setup.sh."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d {}".format(self.spdk_dir))

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary {}".format(self.fio_bin))
        fio_src_dir = os.path.dirname(self.fio_bin)
        build_cmd = ("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=%s;"
                     "make clean; make -j$(($(nproc)*2))" % (self.spdk_dir, fio_src_dir))
        self.remote_call(build_cmd)

        self.log_print("SPDK built")
        self.remote_call("sudo {}/scripts/setup.sh".format(self.spdk_dir))

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Render an [Nvme] config section with one TransportId row per
        discovered remote subsystem tuple (trsvcid, subnqn, traddr)."""
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        rendered_rows = []
        for idx, (svcid, nqn, addr) in enumerate(remote_subsystem_list):
            rendered_rows.append(row_template.format(transport=self.transport,
                                                     svc=svcid,
                                                     nqn=nqn,
                                                     ip=addr,
                                                     i=idx))
        return "\n".join(["[Nvme]", "\n".join(rendered_rows)])

    def gen_fio_filename_conf(self, remote_subsystem_list):
        """Build FIO [filenameX] job sections mapping Nvme bdevs to threads.

        With num_cores set, the subsystems are divided between that many FIO
        threads; otherwise each connected subsystem gets its own thread.
        """
        subsystems = [str(idx) for idx in range(len(remote_subsystem_list))]

        if hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            thread_ids = range(int(self.num_cores))
        else:
            thread_ids = range(len(subsystems))

        per_thread = len(subsystems) // len(thread_ids)

        sections = [""]
        for tid in thread_ids:
            chunk = subsystems[per_thread * tid:per_thread * (tid + 1)]
            sections.append("[filename%s]" % tid)
            sections.append("\n".join("filename=Nvme%sn1" % s for s in chunk))

        return "\n".join(sections)
685
686
if __name__ == "__main__":
    # Test orchestration: build objects from config, ship SPDK to the
    # initiators, start the target, then run every FIO workload combination.
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file: first CLI argument, or config.json next to this script.
    if (len(sys.argv) > 1):
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Instantiate target / initiator / fio-workload settings from the
    # config sections; the "general" section is merged into every object.
    # NOTE(review): later code references target_obj and the fio_* variables
    # unconditionally, so the config must contain a "target" and a "fio"
    # section - confirm against config.json.
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            # All combinations of block size x queue depth x rw pattern.
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for i in initiators:
        if i.mode == "spdk":
            t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Poor mans threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        # Kernel-mode initiators must connect before FIO configs are
        # generated (config generation enumerates connected devices).
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        # One FIO thread per initiator plus optional measurement threads
        # (SAR / PCM CPU / PCM memory) on the target; all started together.
        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Tear down kernel connections and pull result files back to the
        # target before starting the next workload combination.
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    # Aggregate everything into nvmf_results.csv.
    target_obj.parse_results(target_results_dir)