xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 94a84ae98590bea46939eb1dcd7a9876bd393b54)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16import pandas as pd
17from common import *
18
19
class Server:
    """Settings common to every host (target or initiator) used in a test run.

    Keeps the host name, credentials, mode, NIC addresses and transport that
    were passed in, and offers a logging helper which tags each message with
    the host name.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport):
        """Store the host settings and validate the host name.

        The transport string is stored lower-cased. If ``name`` does not match
        ``^[A-Za-z0-9]*$`` a message is logged and the process exits with
        status 1.
        """
        self.username, self.password = username, password
        self.name, self.mode = name, mode
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        name_is_valid = re.match("^[A-Za-z0-9]*$", name) is not None
        if name_is_valid:
            return

        self.log_print("Please use a name which contains only letters or numbers")
        sys.exit(1)

    def log_print(self, msg):
        """Print ``msg`` prefixed with ``[<host name>]`` and flush stdout."""
        line = "[%s] %s" % (self.name, msg)
        print(line, flush=True)
35
36
class Target(Server):
    """Host that exports the NVMe-oF subsystems and post-processes results.

    On top of the common Server settings it stores the optional SAR / PCM
    measurement settings and the locations of this script and of the SPDK
    tree three directory levels above it.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None):
        """Store target settings.

        sar_settings, when given, must unpack into
        (enable_sar, sar_delay, sar_interval, sar_count).
        pcm_settings, when given, must unpack into
        (pcm_dir, enable_pcm, enable_pcm_memory, pcm_delay, pcm_interval, pcm_count).
        """
        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False

        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Write every file found under spdk_dir (following links) into dest_file.

        Files are added under their path relative to the current working
        directory.
        """
        self.log_print("Zipping SPDK source directory")
        # The context manager closes the archive even if a file cannot be added
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, _directories, files in os.walk(spdk_dir, followlinks=True):
                for file_name in files:
                    fh.write(os.path.relpath(os.path.join(root, file_name)))
        self.log_print("Done zipping")

    @staticmethod
    def _get_lat_unit(key_prefix, dict_section):
        """Return (key, unit) for the first key of dict_section starting with key_prefix.

        key_prefix is "lat", "clat" or "slat"; the unit is the text between the
        first and second "_" of the key (e.g. "ns" for "lat_ns").
        Raises KeyError when no key matches.
        """
        for key in dict_section:
            if key.startswith(key_prefix):
                return key, key.split("_")[1]
        raise KeyError("No key starting with '%s' in fio results section" % key_prefix)

    @classmethod
    def _read_direction_stats(cls, job, direction):
        """Return the nine statistics of one I/O direction ("read" or "write").

        Order: iops, bw, mean/min/max latency, then the 99, 99.9, 99.99 and
        99.999 completion latency percentiles. Latencies whose key unit
        contains "ns" are divided by 1000.
        """
        section = job[direction]
        iops = float(section["iops"])
        bw = float(section["bw"])

        lat_key, lat_unit = cls._get_lat_unit("lat", section)
        latencies = [float(section[lat_key][field]) for field in ("mean", "min", "max")]

        clat_key, clat_unit = cls._get_lat_unit("clat", section)
        percentile = section[clat_key]["percentile"]
        tail = [float(percentile[p]) for p in ("99.000000", "99.900000", "99.990000", "99.999000")]

        if "ns" in lat_unit:
            latencies = [x / 1000 for x in latencies]
        if "ns" in clat_unit:
            tail = [x / 1000 for x in tail]

        return [iops, bw] + latencies + tail

    def read_json_stats(self, file):
        """Parse a fio JSON result file and return 18 floats.

        The first nine values describe reads and the last nine describe
        writes, in the column order used by parse_results().
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)

        job_pos = 0  # job_pos = 0 because using aggregated results
        job = data["jobs"][job_pos]
        return self._read_direction_stats(job, "read") + self._read_direction_stats(job, "write")

    @staticmethod
    def _result_file_initiator(file_name):
        """Return the server name stored after the last "_" of a result file name."""
        return os.path.splitext(file_name)[0].split("_")[-1]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate fio JSON results found in results_dir into nvmf_results.csv.

        For every ".fio" config the matching ".json" files are grouped by
        initiator, averaged over that initiator's runs, and the per-initiator
        averages are summed into one CSV row. initiator_count and run_num are
        accepted but not used.
        """
        files = os.listdir(results_dir)
        fio_files = [x for x in files if ".fio" in x]
        json_files = [x for x in files if ".json" in x]

        # Create results file containing only the header
        csv_file = "nvmf_results.csv"
        csv_path = os.path.join(results_dir, csv_file)
        header_line = ",".join(["Name",
                                "read_iops", "read_bw", "read_avg_lat_us",
                                "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                                "write_iops", "write_bw", "write_avg_lat_us",
                                "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us",
                                "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"])
        with open(csv_path, "w") as fh:
            fh.write(header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = {self._result_file_initiator(x) for x in job_result_files}
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator.
                # Compare the parsed name, not a substring, so "init1" does not also match "init10".
                i_results = [x for x in job_result_files if self._result_file_initiator(x) == i]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                z = [sum(c) / len(separate_stats) for c in zip(*separate_stats)]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Save results to file; rows are sorted so the output order is reproducible
        with open(csv_path, "a") as fh:
            for row in sorted(rows):
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % csv_path)

    def measure_sar(self, results_dir, sar_file_name):
        """Wait sar_delay seconds, run "sar -P ALL", log the summary and save the output."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Wait pcm_delay seconds, run pcm-memory.x for pcm_count seconds, then stop it."""
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        # NOTE(review): with shell=True kill() signals the shell wrapper;
        # confirm that pcm-memory.x itself terminates on the test systems.
        pcm_memory.kill()
        # Reap the killed process so it does not linger as a zombie
        pcm_memory.wait()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Wait pcm_delay seconds, run pcm.x into a CSV and extract the UPI columns.

        Columns whose second header level is UPI0, UPI1 or UPI2 are written to
        a second CSV named "skt_<pcm_file_name>" in results_dir.
        """
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        # Blank out the placeholder names pandas gives to empty header cells
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
218
219
class Initiator(Server):
    """Remote host that generates fio load against the target over SSH.

    Subclasses are expected to provide gen_fio_filename_conf() (and, for
    "spdk" modes, gen_spdk_bdev_conf()), which gen_fio_config() calls.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):
        """Store settings, open the SSH connection and prepare the remote workspace.

        Removes <workspace>/nvmf_perf on the remote host, creates the
        workspace directory and applies the requested CPU frequency.
        """
        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        # __init__ may have aborted before the connection attribute was created
        connection = getattr(self, "ssh_connection", None)
        if connection is not None:
            connection.close()

    def put_file(self, local, remote_dest):
        """Upload the local file to remote_dest over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        try:
            ftp.put(local, remote_dest)
        finally:
            ftp.close()

    def get_file(self, remote, local_dest):
        """Download the remote file to local_dest over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        try:
            ftp.get(remote, local_dest)
        finally:
            ftp.close()

    def remote_call(self, cmd):
        """Run cmd on the initiator and return its (stdout, stderr) decoded as UTF-8."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy every file listed in <workspace>/nvmf_perf on the initiator into dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        # An empty listing splits into [""]; skip blanks instead of fetching the directory itself
        file_list = [name for name in stdout.strip().split("\n") if name]

        for file_name in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file_name),
                          os.path.join(dest_dir, file_name))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run "nvme discover" against every address/port pair and parse the output.

        Ports 4420 .. 4420 + subsys_no - 1 are probed on each address.
        Returns a list of unique (trsvcid, subnqn, traddr) string tuples whose
        traddr is in address_list, sorted by subnqn.
        """
        nvme_discover_output = ""
        for ip, port_offset in itertools.product(address_list, range(0, subsys_no)):
            port = 4420 + port_offset
            self.log_print("Trying to discover: %s:%s" % (ip, port))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % port,
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        matching = {s for s in subsystems if s[-1] in address_list}
        subsystems = sorted(matching, key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Create a fio job file on the initiator and return its remote path.

        The job file is written to <workspace>/nvmf_perf and named
        "<block_size>_<io_depth>_<rw>_m_<rwmixread>[_<num_cores>CPU].fio".
        The per-job filename sections come from gen_fio_filename_conf(), which
        receives a range object with one element per fio thread.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("lsblk -o NAME -nlp")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            for cpu in self.cpus_allowed.split(","):
                if "-" in cpu:
                    # "a-b" names both ends, so it covers b - a + 1 CPUs;
                    # the bounds arrive as strings and must be converted first
                    first, last = cpu.split("-")
                    cpus_num += int(last) - int(first) + 1
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads)
        else:
            filename_section = self.gen_fio_filename_conf(threads)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Set the userspace governor and the requested CPU frequency, if one was given."""
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio with the given job file, once or run_num times.

        Each run writes JSON output next to the job file, named
        "<job>_run_<i>_<name>.json" when run_num is set and
        "<job>_<name>.json" otherwise.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            output_filenames = [job_name + "_run_" + str(i) + "_" + self.name + ".json"
                                for i in range(1, run_num + 1)]
        else:
            output_filenames = [job_name + "_" + self.name + ".json"]

        for output_filename in output_filenames:
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)
408
409
class KernelTarget(Target):
    """Target that is configured through nvmetcli using a generated kernel.conf."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 nvmet_bin="nvmetcli", **kwargs):
        """Store target settings plus the nvmetcli binary to use; extra kwargs are ignored."""
        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        # __init__ may have aborted before nvmet_bin was stored
        if hasattr(self, "nvmet_bin"):
            nvmet_command(self.nvmet_bin, "clear")

    @staticmethod
    def _subsystem_entry(nqn, device_path, nsid):
        """Build one "subsystems" entry exposing device_path as a single namespace."""
        return {
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": device_path,
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": nsid
                }
            ],
            "nqn": nqn
        }

    def _port_entry(self, address, trsvcid, portid, nqn):
        """Build one "ports" entry listening on address:trsvcid for subsystem nqn."""
        return {
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": trsvcid,
                "trtype": "%s" % self.transport
            },
            "portid": portid,
            "referrals": [],
            "subsystems": [nqn]
        }

    @staticmethod
    def _write_kernel_conf(nvmet_cfg):
        """Dump the configuration to "kernel.conf" in the current working directory."""
        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write a kernel.conf exporting /dev/nullb0 as cnode1 on address:4420."""
        nqn = "nqn.2018-09.io.spdk:cnode1"
        nvmet_cfg = {
            "ports": [self._port_entry(address, "4420", 1, nqn)],
            "hosts": [],
            "subsystems": [self._subsystem_entry(nqn, "/dev/nullb0", 1)],
        }
        self._write_kernel_conf(nvmet_cfg)

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write a kernel.conf with one subsystem and one port per NVMe disk.

        Disks are split evenly between the addresses; disks left over by the
        integer division are not exported. Subsystem N (counting from 1)
        listens on port 4420 + N - 1.
        """
        if not address_list:
            self.log_print("ERROR: No NIC IP addresses provided for kernel target configuration")
            sys.exit(1)

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = len(nvme_list) // len(address_list)
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                nvmet_cfg["subsystems"].append(self._subsystem_entry(nqn, disk, subsys_no))
                nvmet_cfg["ports"].append(self._port_entry(ip, "%s" % (4420 + port_no), subsys_no, nqn))
                subsys_no += 1
                port_no += 1

        self._write_kernel_conf(nvmet_cfg)

    def tgt_start(self):
        """Generate kernel.conf for the selected backing devices and load it with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log_print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                self.log_print("Testing with null block limited to single RDMA NIC.")
                self.log_print("Please specify only 1 IP address.")
                # sys.exit is always available; the bare exit() helper comes from the site module
                sys.exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            self.log_print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
534        self.log_print("Done configuring kernel NVMeOF Target")
535
536
class SPDKTarget(Target):
    """Target that runs the SPDK nvmf_tgt application and configures it over JSON-RPC."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 num_shared_buffers=4096, num_cores=1, **kwargs):
        """Store target settings; num_cores is passed to nvmf_tgt as its "-m" argument."""
        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        """Create the transport, the bdevs and the subsystems on the running target."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        # NOTE(review): the result is unused; the call is kept because the
        # helper is defined outside this file and may have side effects.
        get_used_numa_nodes()

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a null bdev named "Nvme0n1" and print the bdev list."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe controllers as Nvme0, Nvme1, ... and print the bdev list.

        When req_num_disks is given only that many controllers are attached;
        asking for more than are available logs an error and exits with status 1.
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem per bdev and spread them evenly over the given IPs.

        ips defaults to the target's nic_ips; req_num_disks defaults to the
        number of NVMe devices reported by get_nvme_devices_count(). Disks
        left over by the integer division are not exported.
        """
        self.log_print("Adding subsystems to config")
        if ips is None:
            # The declared default used to crash on len(None); fall back to the target's NICs
            ips = self.nic_ips
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(1, req_num_disks + 1)
        disks_per_ip = len(num_disks) // len(ips)
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket and configure the target.

        The process id is written to <spdk_dir>/nvmf.pid (path kept in
        self.pid) and the process handle is kept in self.nvmf_proc.
        """
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        # num_cores defaults to an int, which str.join() would reject
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initilize...")
        while not os.path.exists("/var/tmp/spdk.sock"):
            # Stop waiting if the launched process already exited; the socket would never appear
            if proc.poll() is not None:
                self.log_print("ERROR: SPDK NVMeOF Target exited with code %s before creating its RPC socket"
                               % proc.returncode)
                sys.exit(1)
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        # nvmf_proc only exists once tgt_start() has launched the target
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
649
650
class KernelInitiator(Initiator):
    """Initiator that connects to the target with nvme-cli and runs fio on /dev/nvme* devices."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio", **kwargs):
        """Store initiator settings.

        The optional "extra_params" keyword is appended to every
        "nvme connect" command; other extra keywords are ignored.
        """
        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              fio_bin=fio_bin)

        # A missing or empty option means "no extra parameters" instead of a KeyError
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        # __init__ may have aborted before the connection attribute was created
        connection = getattr(self, "ssh_connection", None)
        if connection is not None:
            connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Discover the target subsystems and "nvme connect" to each of them."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Discover the target subsystems and "nvme disconnect" each of them by NQN."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    @staticmethod
    def _split_between_threads(items, thread_count):
        """Split items into thread_count consecutive groups whose sizes differ by at most one."""
        per_thread, remainder = divmod(len(items), thread_count)
        groups = []
        start = 0
        for i in range(thread_count):
            # The first `remainder` groups take one extra item each
            size = per_thread + (1 if i < remainder else 0)
            groups.append(items[start:start + size])
            start += size
        return groups

    def gen_fio_filename_conf(self, threads):
        """Return the fio "[filenameN]" job sections for the initiator's NVMe devices.

        threads is either the number of fio jobs or a sized iterable with one
        element per job (Initiator.gen_fio_config passes a range). One
        "/dev/nvme<i>n1" name is generated per line of lsblk output containing
        "nvme". Jobs that would receive no device are omitted.
        """
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        thread_count = threads if isinstance(threads, int) else len(threads)
        if thread_count < 1:
            return ""

        filenames = ["nvme%sn1" % x for x in range(0, len(nvme_list))]
        filename_section = ""
        for i, group in enumerate(self._split_between_threads(filenames, thread_count)):
            if not group:
                continue
            header = "[filename%s]" % i
            disks = "\n".join(["filename=/dev/%s" % x for x in group])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section
706
707
class SPDKInitiator(Initiator):
    """Initiator that builds SPDK remotely and generates SPDK-flavoured fio configuration."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 fio_bin="/usr/src/fio/fio", **kwargs):
        """Forward the shared settings to Initiator and remember `num_cores`.

        Extra keyword arguments are accepted but not used by this class.
        """
        super().__init__(name, username, password, mode, nic_ips, ip, transport,
                         cpus_allowed=cpus_allowed,
                         cpus_allowed_policy=cpus_allowed_policy,
                         fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Upload the SPDK source archive, unpack it, build it and run setup.sh remotely.

        `local_spdk_zip` is the local path of the archive; it is copied to
        /tmp/spdk_drop.zip on the remote side and extracted into self.spdk_dir.
        The build is configured with RDMA support and with the directory that
        contains self.fio_bin as the fio location.
        """
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")

        unpack_cmd = "unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir
        self.remote_call(unpack_cmd)
        self.log_print("Sources unpacked")

        self.log_print("Using fio binary %s" % self.fio_bin)
        build_template = ("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=%s;"
                          "make clean; make -j$(($(nproc)*2))")
        fio_dir = os.path.dirname(self.fio_bin)
        self.remote_call(build_template % (self.spdk_dir, fio_dir))
        self.log_print("SPDK built")

        setup_cmd = "sudo %s/scripts/setup.sh" % self.spdk_dir
        self.remote_call(setup_cmd)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return an "[Nvme]" configuration section with one TransportId row per subsystem.

        Every entry of `remote_subsystem_list` is indexed as
        (service id, NQN, address); rows are named Nvme0, Nvme1, ... in list
        order. The header line is always followed by a newline, even when the
        list is empty.
        """
        header = "[Nvme]"
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        rows = []
        for index, subsystem in enumerate(remote_subsystem_list):
            rows.append(row_template.format(transport=self.transport,
                                            svc=subsystem[0],
                                            nqn=subsystem[1],
                                            ip=subsystem[2],
                                            i=index))
        return header + "\n" + "\n".join(rows)

    def gen_fio_filename_conf(self, subsystems, threads):
        """Return fio "[filenameN]" sections spreading Nvme<i>n1 bdevs over `threads` jobs.

        `subsystems` is the number of bdevs (Nvme0n1 .. Nvme<subsystems-1>n1).
        Each section gets subsystems // threads bdevs and the first
        subsystems % threads sections get one more. Every section is preceded
        by a newline in the returned string.
        """
        bdev_names = ["Nvme%sn1" % n for n in range(0, subsystems)]
        per_thread, leftover = divmod(subsystems, threads)

        # Leading empty element reproduces the newline in front of the first section.
        pieces = [""]
        cursor = 0
        for thread_no in range(threads):
            share = per_thread + (1 if thread_no < leftover else 0)
            assigned = bdev_names[cursor:cursor + share]
            cursor += share
            pieces.append("[filename%s]" % thread_no)
            pieces.append("\n".join(["filename=%s" % bdev for bdev in assigned]))

        return "\n".join(pieces)
764
765
if __name__ == "__main__":
    # Benchmark driver. Reads a JSON configuration (path given as the first
    # command line argument, otherwise config.json next to this script),
    # creates the target and initiator objects it describes, and runs fio for
    # every (block size, queue depth, rw) combination of the fio section.
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    # Supported values of the per-section "mode" key.
    target_classes = {"spdk": SPDKTarget, "kernel": KernelTarget}
    initiator_classes = {"spdk": SPDKInitiator, "kernel": KernelInitiator}

    target_obj = None
    fio_workloads = None
    initiators = []
    fio_cases = []

    # Sections are recognised by substring match on their key; anything that
    # is not a target, initiator or fio section (e.g. "general") is skipped.
    for k, v in data.items():
        if "target" in k:
            if v["mode"] not in target_classes:
                print("Unsupported mode '%s' in section '%s'" % (v["mode"], k))
                sys.exit(1)
            target_obj = target_classes[v["mode"]](name=k, **data["general"], **v)
        elif "initiator" in k:
            # Reject unknown modes explicitly; otherwise the object created
            # for a previous section would be appended a second time.
            if v["mode"] not in initiator_classes:
                print("Unsupported mode '%s' in section '%s'" % (v["mode"], k))
                sys.exit(1)
            initiators.append(initiator_classes[v["mode"]](name=k, **data["general"], **v))
        elif "fio" in k:
            fio_workloads = itertools.product(v["bs"],
                                              v["qd"],
                                              v["rw"])

            fio_run_time = v["run_time"]
            fio_ramp_time = v["ramp_time"]
            fio_rw_mix_read = v["rwmixread"]
            # Optional settings: None when not present in the configuration.
            fio_run_num = v.get("run_num")
            fio_num_jobs = v.get("num_jobs")

    # Fail with a clear message instead of a NameError further down.
    if target_obj is None:
        print("No target section found in %s" % config_file_path)
        sys.exit(1)
    if fio_workloads is None:
        print("No fio section found in %s" % config_file_path)
        sys.exit(1)

    # Copy and install SPDK on remote initiators
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for initiator in initiators:
        if initiator.mode == "spdk":
            t = threading.Thread(target=initiator.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Run FIO tests: one plain thread per initiator plus one per enabled
    # measurement on the target, all started together and joined before the
    # next workload begins.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for initiator in initiators:
            if initiator.mode == "kernel":
                initiator.kernel_init_connect(initiator.nic_ips, target_obj.subsys_no)

            cfg = initiator.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                           fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for initiator, cfg in zip(initiators, configs):
            t = threading.Thread(target=initiator.run_fio, args=(cfg, fio_run_num))
            threads.append(t)

        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for initiator in initiators:
            if initiator.mode == "kernel":
                initiator.kernel_init_disconnect(initiator.nic_ips, target_obj.subsys_no)
            initiator.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)
866