xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 367c980b453f48310e52d2574afe7d2774df800c)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16import pandas as pd
17from collections import OrderedDict
18from common import *
19
20
class Server:
    """Common base for hosts taking part in an NVMe-oF perf test.

    Stores SSH credentials, the test mode, NIC IP addresses and the
    (lower-cased) transport type, and offers a name-prefixed logging helper.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.username = username
        self.password = password
        self.mode = mode
        self.nic_ips = nic_ips
        # Transport strings are compared case-insensitively throughout.
        self.transport = transport.lower()

        # Server names end up in log prefixes and result file names,
        # so only alphanumeric names are accepted.
        if re.match("^[A-Za-z0-9]*$", name) is None:
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this host's name, flushing immediately."""
        print("[%s] %s" % (self.name, msg), flush=True)
36
37
class Target(Server):
    """Host running the NVMe-oF target.

    Base class for KernelTarget and SPDKTarget. Besides holding target
    settings it owns result post-processing (fio JSON parsing and CSV
    aggregation) and optional system metric collection (SAR, Intel PCM).
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False

        # sar_settings is unpacked as (enable, delay, interval, count);
        # see measure_sar() for how each field is used.
        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        # pcm_settings is unpacked as
        # (pcm_dir, enable_pcm, enable_pcm_memory, delay, interval, count).
        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        # This script lives in scripts/perf/nvmf, so the SPDK repository
        # root is three directories above the script location.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree rooted at spdk_dir into dest_file.

        Symlinks are followed. NOTE(review): archive member names come from
        os.path.relpath with no start argument, i.e. relative to the current
        working directory rather than spdk_dir — confirm the caller's cwd
        before relying on the archive layout.
        """
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse a single fio JSON output file into a flat list of floats.

        Returns an 18-element list, latencies normalized to microseconds:
        [read_iops, read_bw, read avg/min/max lat,
         read p99/p99.9/p99.99/p99.999 lat,
         write_iops, write_bw, write avg/min/max lat,
         write p99/p99.9/p99.99/p99.999 lat]
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                # (e.g. "lat_ns" -> ("lat_ns", "ns")).
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

            # fio may report latency in nanoseconds; convert to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

            # Same nanosecond-to-microsecond normalization for write stats.
            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate all fio JSON results in results_dir into CSV files.

        Writes one per-initiator CSV per job plus a combined
        "nvmf_results.csv" where read/write stats are merged using the
        rwmixread ratio parsed from each job's file name.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                # Strip the per-run "run_N_" prefix so all runs share one CSV name.
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise average over all runs for this initiator.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                # "_" holds the merged stat: mix-weighted average for
                # latencies, plain sum for throughput-type values.
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect per-CPU SAR statistics into results_dir/sar_file_name.

        Sleeps sar_delay seconds first, then samples sar_count times at
        sar_interval-second intervals and logs the "Average" summary lines.
        """
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in the background, killing it after pcm_count seconds."""
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x for pcm_count samples and extract UPI socket columns.

        The full CSV is saved as pcm_file_name; the UPI0-UPI2 columns are
        written separately to "skt_<pcm_file_name>".
        """
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        # pcm.x emits a two-row header; drop the auto-generated "Unnamed"
        # placeholders pandas creates for merged header cells.
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
252
253
class Initiator(Server):
    """Remote host that generates NVMe-oF traffic with fio over SSH.

    Opens a paramiko SSH session at construction time, prepares a clean
    remote workspace and optionally pins the CPU frequency. Subclasses
    provide the kernel-nvme or SPDK specific connection steps.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        # The SPDK_WORKSPACE environment variable overrides the workspace argument.
        self.spdk_dir = workspace
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote workspace.
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        # Close the SSH session opened in __init__.
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a local file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a file from the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Run cmd on the initiator; return (stdout, stderr) decoded as UTF-8."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy every file from the remote nvmf_perf directory into local dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Discover target subsystems via "nvme discover".

        Probes ports 4420 .. 4420+subsys_no-1 on every address in
        address_list and returns a sorted, de-duplicated list of
        (trsvcid, subnqn, traddr) tuples whose traddr is in address_list.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # NOTE(review): the loop variable shadows the subsys_no parameter;
        # inside the loop it is a 0-based port offset, not a count.
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_no),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Build a fio job file on the initiator and return its remote path.

        Chooses the SPDK bdev ioengine or libaio depending on self.mode,
        sizes the thread count from cpus_allowed / num_cores / subsystem
        count, and writes the config under <spdk_dir>/nvmf_perf.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            # For kernel mode, target disks are the nvme block devices
            # visible on the initiator after connecting.
            out, err = self.remote_call("lsblk -o NAME -nlp")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # NOTE(review): len(range(a, b)) excludes the upper bound,
                    # so "0-3" counts 3 CPUs rather than 4 — confirm intended.
                    cpus_num += len(range(a, b))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        # File name encodes the workload parameters; parse_results() relies
        # on the "m_<rwmixread>" and "_<n>CPU" markers embedded here.
        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the initiator's CPU frequency via cpupower, if configured.

        Requires the userspace governor (i.e. intel_pstate disabled);
        exits the script when the governor cannot be set.
        """
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Execute the given fio job file on the initiator.

        When run_num is given, repeats the job run_num times writing
        "<job>_run_<i>_<name>.json"; otherwise runs once writing
        "<job>_<name>.json". Output files stay on the initiator.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)
450
451
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet subsystem.

    Generates an nvmetcli-compatible JSON configuration ("kernel.conf" in
    the current working directory) and loads it with the nvmetcli binary.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        # Tear down any kernel target configuration left behind by the test.
        nvmet_command(self.nvmet_bin, "clear")

    def _gen_subsystem_entry(self, device_path, nsid, subsys_no):
        """Return one nvmet "subsystems" dict exposing device_path as namespace nsid."""
        return {
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": device_path,
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": nsid
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
        }

    def _gen_port_entry(self, address, trsvcid, portid, subsys_no):
        """Return one nvmet "ports" dict exporting subsystem subsys_no at address:trsvcid."""
        return {
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "%s" % trsvcid,
                "trtype": "%s" % self.transport,
            },
            "portid": portid,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
        }

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing a single /dev/nullb0 namespace at address:4420."""
        nvmet_cfg = {
            "ports": [self._gen_port_entry(address, 4420, 1, 1)],
            "hosts": [],
            "subsystems": [self._gen_subsystem_entry("/dev/nullb0", 1, 1)],
        }
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing each device in nvme_list.

        Disks are split evenly between the NIC addresses; each disk gets
        its own subsystem and its own port (service id 4420 + offset).
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's. Integer division may leave trailing
        # disks unassigned when the counts do not divide evenly.
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append(self._gen_subsystem_entry(disk, subsys_no, subsys_no))
                nvmet_cfg["ports"].append(self._gen_port_entry(ip, 4420 + port_no, subsys_no, subsys_no))
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate kernel.conf for the configured backing devices and load it via nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")
        self.subsys_no = get_nvme_devices_count()

        if self.null_block:
            self.log_print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                self.log_print("Testing with null block limited to single RDMA NIC.")
                self.log_print("Please specify only 1 IP address.")
                sys.exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            self.log_print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        # Drop any stale configuration before restoring the new one.
        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
578
579
class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application.

    Starts nvmf_tgt as a subprocess and configures transports, bdevs and
    subsystems over the JSON-RPC socket (/var/tmp/spdk.sock).
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings)
        # num_cores is passed verbatim to the nvmf_tgt "-m" core mask option.
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        """Create the transport layer and the bdev/subsystem configuration via RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # The add_* helpers configure via RPC and return nothing, so their
        # results are not captured.
        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a null block bdev "Nvme0n1" (size 102400, block size 4096) via RPC."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe controllers (all, or the first req_num_disks) as bdevs via RPC."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        # RPC expects the BDF with "." separators instead of ":".
        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per disk, spreading disks across the given IPs."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, then push the configuration."""
        self.subsys_no = get_nvme_devices_count()
        if self.null_block:
            self.subsys_no = 1
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        # str() guards against num_cores being supplied as an int (the default is 1).
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Log the actual process ID, not the path of the pid file.
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll for the default RPC socket the app creates once it is ready.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        # Stop nvmf_tgt if it was started; escalate to kill when graceful
        # termination fails.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
697
698
class KernelInitiator(Initiator):
    """Initiator host that connects with the Linux kernel NVMe-oF driver
    (nvme-cli) and runs fio against the resulting /dev/nvme* block devices."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # Extra flags appended verbatim to "nvme connect". Use .get() so a
        # config without an "extra_params" key no longer raises KeyError.
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Connect this host to every discovered remote subsystem via nvme-cli."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            # subsystem is a (svc port, nqn, address) tuple.
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect every subsystem previously attached by kernel_init_connect()."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Generate the [filenameX] fio job sections, one per thread, mapping
        the kernel /dev/nvme* block devices to thread groups.

        :param threads: iterable whose length is the requested thread count
        :param io_depth: iodepth per device
        :param num_jobs: fio numjobs value used to scale the section iodepth
        :return: string with the [filenameX] fio config sections
        """
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        # Never use more groups than devices (mirrors
        # SPDKInitiator.gen_fio_filename_conf); otherwise nvme_per_split
        # would be 0 and devices would be dropped.
        if len(threads) >= len(nvme_list):
            threads = range(0, len(nvme_list))

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            # Hand out leftovers one per group, *outside* the inner loop.
            # The old in-loop check front-loaded several extras onto the
            # first group (e.g. 11 disks / 3 threads -> 5/3/3 instead of
            # 4/4/3) and diverged from the SPDK initiator implementation.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale section iodepth so each disk still sees io_depth overall.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
756
757
class SPDKInitiator(Initiator):
    """Initiator host that builds SPDK locally and runs fio through the
    SPDK bdev fio plugin instead of the kernel NVMe-oF driver."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # Core count/mask forwarded to the fio SPDK plugin run.
        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Copy the SPDK sources zip to this host, build SPDK with the fio
        plugin enabled and run setup.sh to bind the NVMe devices."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        build_cmd = ("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
                     "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))
        self.remote_call(build_cmd)

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Render an [Nvme] config section with one TransportId row per
        remote subsystem; each entry is a (svc port, nqn, address) tuple."""
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        rows = []
        for idx, subsys in enumerate(remote_subsystem_list):
            rows.append(row_template.format(transport=self.transport,
                                            svc=subsys[0],
                                            nqn=subsys[1],
                                            ip=subsys[2],
                                            i=idx))
        return "\n".join(["[Nvme]", "\n".join(rows)])

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build the [filenameX] fio sections assigning NvmeXn1 bdevs to
        thread groups; leftover bdevs go one apiece to the first groups."""
        # Never create more groups than there are subsystems.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        bdev_names = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        per_group, extra = divmod(len(subsystems), len(threads))
        groups = []
        start = 0
        for g in range(len(threads)):
            size = per_group + (1 if g < extra else 0)
            groups.append(bdev_names[start:start + size])
            start += size

        filename_section = ""
        for g, names in enumerate(groups):
            header = "[filename%s]" % g
            disks = "\n".join(["filename=%s" % x for x in names])
            # Scale section iodepth so each bdev still sees io_depth overall.
            qd = round((io_depth * len(names)) / num_jobs)
            iodepth = "iodepth=%s" % qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
818
819
if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file: first CLI argument, else config.json next to this script.
    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Instantiate target/initiator objects and read the fio workload matrix
    # from the json config. "general" settings are shared by every host.
    for k, v in data.items():
        if "target" in k:
            if v["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif v["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if v["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif v["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Every (block size, queue depth, rw mode) combination is run.
            fio_workloads = itertools.product(v["bs"],
                                              v["qd"],
                                              v["rw"])

            fio_run_time = v["run_time"]
            fio_ramp_time = v["ramp_time"]
            fio_rw_mix_read = v["rwmixread"]
            # Optional settings default to None when absent from the config.
            fio_run_num = v.get("run_num")
            fio_num_jobs = v.get("num_jobs")

    # Copy and install SPDK on remote initiators
    if "skip_spdk_install" not in data["general"]:
        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
        threads = []
        for i in initiators:
            if i.mode == "spdk":
                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
                threads.append(t)
                t.start()
        for t in threads:
            t.join()

    target_obj.tgt_start()

    # Poor man's threading
    # Run each fio workload on all initiators in parallel, optionally
    # collecting sar/pcm statistics on the target while fio runs.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Tear down kernel connections and pull result files from each host.
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)
921