xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision ce1501218bda70b4fdb3132506ec5c0759cb312d)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16import pandas as pd
17from collections import OrderedDict
18from common import *
19
20
class Server:
    """Common base for all test hosts (target and initiator machines)."""

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.username = username
        self.password = password
        self.mode = mode
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # Result file names embed the server name with "_" separators, so
        # only plain alphanumeric names are accepted.
        if re.match("^[A-Za-z0-9]*$", name) is None:
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name, flushing immediately."""
        print(f"[{self.name}] {msg}", flush=True)
37
class Target(Server):
    """Base class for NVMe-oF target hosts.

    Provides fio result parsing/aggregation and host-side measurement
    helpers (SAR, Intel PCM, bwm-ng). Subclasses implement the actual
    target bring-up (kernel nvmet or SPDK nvmf_tgt).
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None):
        # sar_settings: (enable, delay, interval, count) tuple - see measure_sar.
        # pcm_settings: (pcm_dir, enable_pcm, enable_pcm_memory, delay, interval, count).
        # bandwidth_settings: (enable, count) tuple - count is number of bwm-ng samples.

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        # Measurements default to off; the settings tuples below overwrite the flags.
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False
        self.enable_bandwidth = False

        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        if bandwidth_settings:
            self.enable_bandwidth, self.bandwidth_count = bandwidth_settings

        # This script lives in scripts/perf/nvmf, so the SPDK root is three levels up.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree (following symlinks) into dest_file.

        NOTE(review): archive member names come from os.path.relpath with no
        start directory, i.e. they are relative to the current working
        directory - presumably the SPDK root; confirm with callers.
        """
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Extract IOPS/BW/latency stats from one fio JSON result file.

        Returns an 18-element list: 9 read stats followed by 9 write stats
        (iops, bw, avg/min/max latency, p99/p99.9/p99.99/p99.999 latency).
        Latencies reported by fio in nanoseconds are converted to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

            # Normalize nanosecond buckets to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

            # Normalize nanosecond buckets to microseconds.
            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate per-initiator fio JSON results into CSV files.

        Writes one CSV per initiator/job plus a summary "nvmf_results.csv" in
        results_dir. Job parameters (notably rwmixread) are recovered by
        pattern-matching the result file names; initiator_count and run_num
        are currently unused.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                # Per-initiator CSV name: strip the "run_N_" marker, swap extension.
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise mean across all runs of this job on this initiator.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                # Each aggregate header matches exactly one read_* and one write_* column.
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    # Latencies: weighted average by read/write mix.
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    # Throughput-like stats: plain sum.
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect per-CPU utilization with sar and save the raw output.

        Waits sar_delay seconds first, then samples sar_count times every
        sar_interval seconds; prints the "Average" summary lines as it goes.
        """
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run Intel pcm-memory.x in the background for pcm_count seconds,
        writing its CSV output into results_dir."""
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run Intel pcm.x for pcm_count iterations, then extract the UPI
        columns into a separate "skt_"-prefixed CSV."""
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        # pcm.x emits a two-row CSV header; read it as a MultiIndex and strip
        # the auto-generated "Unnamed:" labels pandas inserts for blank cells.
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample NIC bandwidth with bwm-ng (1s interval, bandwidth_count
        samples) into a CSV file in results_dir."""
        bwm = subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name,
                             self.bandwidth_count), shell=True, check=True)
261
262
class Initiator(Server):
    """Remote initiator host driving fio workloads against the NVMe-oF target.

    All commands run over a persistent paramiko SSH session; result files
    are fetched back over SFTP.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        # SPDK_WORKSPACE environment variable overrides the configured workspace.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote workspace.
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a single file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a single file from the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Execute cmd on the initiator; return (stdout, stderr) decoded as UTF-8."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy all fio result files from the initiator into dest_dir (created if missing)."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        # Filter out empty entries: "".split("\n") yields [""], which would
        # previously cause an SFTP get of a bogus empty filename.
        file_list = [f for f in stdout.strip().split("\n") if f]

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run "nvme discover" against every (address, port) pair and return
        sorted, deduplicated (trsvcid, nqn, traddr) tuples whose traddr is in
        address_list. Ports probed are 4420 .. 4420 + subsys_no - 1.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # Loop variable renamed so it no longer shadows the subsys_no parameter.
        for ip, subsys_offset in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_offset))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_offset),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a fio config file on the initiator and return its remote path.

        rw/rwmixread/block_size/io_depth/ramp_time/run_time map directly to
        fio options; subsys_no is the number of target subsystems to use.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("sudo nvme list | grep 'SPDK' | awk '{print $1}'")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Bug fix: CPU ranges like "0-3" are inclusive of both
                    # ends and contain b - a + 1 CPUs; the previous
                    # len(range(a, b)) undercounted by one.
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        # The filename encodes workload parameters; Target.parse_results
        # later regexes "m_<pct>" and "_<n>CPU" back out of it.
        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency with cpupower if cpu_frequency was configured."""
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run the given fio config on the initiator, optionally run_num times.

        Each run writes a JSON result file next to the config; the name embeds
        the run number and initiator name for later parsing by parse_results.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        # Note: when run_num is used, this reports the last run's file only.
        self.log_print("FIO run finished. Results in: %s" % output_filename)
459
460
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet subsystem.

    Generates an nvmetcli-compatible JSON config ("kernel.conf" in the
    current directory) and applies it with the nvmetcli binary.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings, bandwidth_settings)
        # Path to the nvmetcli binary used to apply/clear the configuration.
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        # Best-effort teardown of the kernel target configuration.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing a single /dev/nullb0 namespace on
        *address*, port 4420."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                # Bug fix: a missing comma here made Python concatenate the
                # adjacent string literals into one bogus key
                # ("SPDK0001version"), producing a broken nvmet config.
                "serial": "SPDK0001",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing each NVMe device as its own subsystem.

        Devices are split evenly between the provided NIC addresses, and each
        subsystem listens on its own port (4420, 4421, ...).
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        # Bug fix: the comma after the serial entry was
                        # missing, which was a syntax error
                        # ('% subsys_no "version"').
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet config for null block or the detected NVMe
        drives, then clear and restore it via nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")
        self.subsys_no = get_nvme_devices_count()

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
589
590
class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt application, configured over
    its JSON-RPC socket (/var/tmp/spdk.sock)."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings, bandwidth_settings)
        # Core mask/count for nvmf_tgt "-m"; may arrive as int or str from config.
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        """Create the transport, bdevs and subsystems on the running nvmf_tgt."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()  # NOTE(review): currently unused; kept for parity

        # Create transport layer (RDMA or TCP, per self.transport)
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # These helpers configure the target via RPC and return None, so
        # their results are no longer bound to unused variables.
        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a single null bdev named Nvme0n1 (102400 x 4096-byte units
        per the RPC arguments; presumably size in MiB - confirm against the
        bdev_null_create RPC documentation)."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as SPDK bdevs (Nvme0, Nvme1, ...).

        req_num_disks limits how many drives are attached; all by default.
        Exits with an error if more disks are requested than available.
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC traddr format uses dots instead of colons in the BDF.
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, distributing them
        between the provided NIC IPs. All listeners use port 4420."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket to appear, then configure it."""
        self.subsys_no = get_nvme_devices_count()
        if self.null_block:
            self.subsys_no = 1
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        # Bug fix: num_cores may be an int (the default is 1) and str.join()
        # accepts only strings, so convert explicitly.
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll for the RPC unix socket; nvmf_tgt creates it once it is up.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        # Terminate nvmf_tgt gracefully; fall back to SIGKILL on failure.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
708
709
class KernelInitiator(Initiator):
    """Initiator host that connects to the target using the kernel NVMe-oF
    driver via nvme-cli (as opposed to the SPDK fio plugin)."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # "extra_params" (extra arguments appended to "nvme connect") is
        # optional; the previous kwargs["extra_params"] raised KeyError when
        # the key was absent from the config file.
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Connect this host to every discovered subsystem using nvme-cli over SSH."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect every subsystem previously connected by kernel_init_connect."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build fio "[filenameX]" job sections mapping kernel /dev/nvme* devices to jobs.

        Devices are spread evenly: every job gets len(devices) // len(jobs)
        devices and the first len(devices) % len(jobs) jobs get one extra.
        """
        out, err = self.remote_call("sudo nvme list | grep 'SPDK' | awk '{print $1}'")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        # Never create more job sections than there are devices; previously
        # threads > devices produced empty sections with iodepth=0. Mirrors
        # the guard in SPDKInitiator.gen_fio_filename_conf.
        if len(threads) >= len(nvme_list):
            threads = range(0, len(nvme_list))

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            # At most ONE leftover device per job. This block used to be
            # inside the inner loop, which piled all leftovers onto the first
            # jobs instead of distributing them (see SPDKInitiator for the
            # intended layout).
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
767
768
class SPDKInitiator(Initiator):
    """Initiator host that runs fio workloads through the SPDK bdev fio plugin."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # Number of cores the fio SPDK plugin may use on this host.
        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Copy the SPDK sources zip to this host, build SPDK with the fio
        plugin and run setup.sh to bind devices."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)
        self.log_print("Sources unpacked")

        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
                         "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))
        self.log_print("SPDK built")

        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Render an [Nvme] config section with one TransportId row per
        remote subsystem tuple (trsvcid, subnqn, traddr)."""
        header = "[Nvme]"
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        rows = []
        for idx, entry in enumerate(remote_subsystem_list):
            rows.append(row_template.format(transport=self.transport,
                                            svc=entry[0],
                                            nqn=entry[1],
                                            ip=entry[2],
                                            i=idx))
        return "\n".join([header, "\n".join(rows)])

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build fio "[filenameX]" job sections mapping NvmeXn1 bdevs to jobs.

        Each job gets len(subsystems) // len(jobs) bdevs; the first
        len(subsystems) % len(jobs) jobs receive one extra bdev.
        """
        # Never create more job sections than there are subsystems.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]

        base, extra = divmod(len(subsystems), len(threads))
        groups = []
        offset = 0
        for i in range(len(threads)):
            size = base + (1 if i < extra else 0)
            groups.append(filenames[offset:offset + size])
            offset += size

        # Seed with "" so the output keeps its historical leading newline.
        sections = [""]
        for i, group in enumerate(groups):
            qd = round((io_depth * len(group)) / num_jobs)
            sections.append("[filename%s]" % i)
            sections.append("\n".join(["filename=%s" % f for f in group]))
            sections.append("iodepth=%s" % qd)

        return "\n".join(sections)
829
830
if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file may be passed as the first CLI argument; default to
    # config.json next to this script.
    if (len(sys.argv) > 1):
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Build the target object, initiator objects and fio workload matrix
    # from the JSON config sections.
    for k, v in data.items():
        if "target" in k:
            if v["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif v["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
            else:
                # Fail fast: previously an unknown mode surfaced later as a
                # confusing NameError on target_obj.
                print("ERROR: target section %s has unsupported mode: %s" % (k, v["mode"]))
                sys.exit(1)
        elif "initiator" in k:
            if v["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif v["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            else:
                print("ERROR: initiator section %s has unsupported mode: %s" % (k, v["mode"]))
                sys.exit(1)
            initiators.append(init_obj)
        elif "fio" in k:
            # Cartesian product of block sizes, queue depths and rw patterns.
            fio_workloads = itertools.product(v["bs"],
                                              v["qd"],
                                              v["rw"])

            fio_run_time = v["run_time"]
            fio_ramp_time = v["ramp_time"]
            fio_rw_mix_read = v["rwmixread"]
            # Optional keys; None when absent.
            fio_run_num = v.get("run_num")
            fio_num_jobs = v.get("num_jobs")
        else:
            continue

    # Copy and install SPDK on remote initiators (one install thread per host)
    if "skip_spdk_install" not in data["general"]:
        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
        threads = []
        for i in initiators:
            if i.mode == "spdk":
                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
                threads.append(t)
                t.start()
        for t in threads:
            t.join()

    target_obj.tgt_start()

    # Run FIO tests. For each workload, fio runs on every initiator in its
    # own thread while optional measurements (sar/pcm/bandwidth) run on the
    # target in parallel threads.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Tear down kernel connections and collect per-initiator results.
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)
938