xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 0ed85362c8132a2d1927757fbcade66b6660d26a)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16import pandas as pd
17from collections import OrderedDict
18from common import *
19
20
class Server:
    """Common base for all hosts (target and initiators) taking part in the test.

    Stores connection credentials, the run mode, NIC IP addresses and the
    NVMe-oF transport type (normalized to lower case).
    """

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        # Keep the transport lower-case so later string comparisons are uniform.
        self.transport = transport.lower()

        # Server names end up in result file names and log prefixes,
        # so only alphanumeric characters are accepted.
        if re.fullmatch(r"[A-Za-z0-9]*", name) is None:
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name, flushing immediately."""
        print(f"[{self.name}] {msg}", flush=True)
36
37
class Target(Server):
    """Target-side host of the NVMe-oF benchmark.

    Carries the optional measurement settings (SAR, PCM, bandwidth, DPDK
    memory) and implements parsing and aggregation of the fio JSON result
    files that initiators copy back into the results directory.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        # All measurements default to disabled; each *_settings tuple below
        # enables one of them and supplies its parameters.
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False
        self.enable_bandwidth = False
        self.enable_dpdk_memory = False

        # sar_settings: (enable, delay, interval, count). delay is slept in
        # seconds before sampling; interval/count are passed to `sar`.
        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        # pcm_settings: (pcm_dir, enable_pcm, enable_pcm_memory, delay, interval, count)
        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        # bandwidth_settings: (enable, count) -- count is handed to `bwm-ng -c`.
        if bandwidth_settings:
            self.enable_bandwidth, self.bandwidth_count = bandwidth_settings

        # dpdk_settings: (enable, wait_time) -- wait_time slept in seconds.
        if dpdk_settings:
            self.enable_dpdk_memory, self.dpdk_wait_time = dpdk_settings

        # SPDK repository root is assumed three levels above this script.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree at spdk_dir into dest_file.

        Symlinks are followed; archive member paths are taken relative to the
        current working directory (os.path.relpath with no start argument).
        """
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse a single fio JSON result file.

        Returns an 18-element list:
        [read_iops, read_bw, read avg/min/max latency,
         read p99/p99.9/p99.99/p99.999 latency,
         write_iops, write_bw, write avg/min/max latency,
         write p99/p99.9/p99.99/p99.999 latency]
        with all latencies converted to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                # (fio names the bucket e.g. "lat_ns" or "lat_us").
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

            # Normalize nanosecond buckets to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

            # Normalize nanosecond buckets to microseconds.
            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate fio JSON result files into per-initiator CSVs and a summary.

        Writes one CSV per initiator/job and a combined nvmf_results.csv in
        results_dir. initiator_count and run_num are currently unused -- both
        counts are recovered from result file names instead.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                # Mixed workload file names carry an "m_<percent>" token.
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                # Per-initiator CSV name: drop the "run_<n>_" token from the
                # first matching JSON file name.
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise mean over all runs of this initiator.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                # Exactly two matching columns per label: the read_ and write_ variant.
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Sample CPU utilization with `sar -P ALL` and store the raw output.

        Sleeps sar_delay seconds first; logs the "Average" summary lines.
        """
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in the background, writing CSV into results_dir.

        pcm-memory runs until killed, so pcm_count here acts as its total
        runtime in seconds rather than a sample count.
        """
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x for pcm_count iterations and extract socket UPI columns.

        The full CSV is kept; a second file prefixed "skt_" holds only the
        UPI0/UPI1/UPI2 columns.
        """
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        # pcm.x emits a two-row header; strip the auto-generated "Unnamed:*"
        # labels pandas creates for blank header cells.
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_bandwidth(self, results_dir, bandwidth_file_name):
        """Record NIC bandwidth with bwm-ng into a CSV file in results_dir."""
        # NOTE(review): the CompletedProcess return value is assigned but unused.
        bwm = subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name,
                             self.bandwidth_count), shell=True, check=True)

    def measure_dpdk_memory(self, results_dir):
        """Collect the DPDK memory usage dump and move it into results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # NOTE(review): this is a bare attribute reference -- the RPC is never
        # actually invoked (no call, no client argument). Presumably it should
        # be called with an RPC client to produce /tmp/spdk_mem_dump.txt;
        # confirm against the SPDK rpc.env API.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
272
273
class Initiator(Server):
    """Initiator-side host, driven remotely over SSH (paramiko).

    The constructor opens the SSH connection, prepares a clean remote
    workspace and optionally pins the CPU frequency; all other operations
    (file transfer, fio config generation, fio runs) go through that
    connection.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        # The SPDK_WORKSPACE environment variable overrides the workspace argument.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        # Auto-accept unknown host keys; acceptable for a trusted lab setup.
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote workspace.
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        # Close the SSH session when the object is garbage-collected.
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a local file to remote_dest over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a remote file to local_dest over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Execute cmd on the initiator; return (stdout, stderr) as text.

        Blocks until the command's output streams are fully read.
        """
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy every file from the remote nvmf_perf directory into dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Probe target ports with `nvme discover` and collect subsystems.

        Ports 4420 .. 4420+subsys_no-1 are tried on every address; returns a
        sorted, de-duplicated list of (trsvcid, nqn, traddr) tuples whose
        address appears in address_list.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # NOTE: the loop variable below rebinds the subsys_no parameter; the
        # original value is no longer needed at this point.
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_no),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a fio job file on the remote initiator.

        Builds the [global] section from the template, picks the ioengine
        (spdk bdev plugin vs libaio) based on self.mode, derives the thread
        count from cpus_allowed / num_cores / subsystem count, and appends the
        per-filename job sections. Returns the remote path of the config file.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            # SPDK mode: discover target subsystems and generate a bdev config
            # consumed by the fio spdk_bdev plugin.
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            # Kernel mode: use libaio against /dev/nvme* devices exposed by the
            # kernel initiator.
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # NOTE(review): range(a, b) excludes b, so an "a-b" range
                    # counts one CPU fewer than an inclusive interpretation
                    # would -- confirm intended.
                    cpus_num += len(range(a, b))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        # gen_fio_filename_conf is expected to be provided by a subclass;
        # its signature differs between spdk and kernel modes.
        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        # File name encodes the workload parameters; parse_results() later
        # recovers rwmixread from the "m_<percent>" token.
        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the remote CPUs to self.cpu_frequency via cpupower, if requested.

        Exits the script on failure (typically when intel_pstate is enabled).
        """
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio remotely against fio_config_file.

        With run_num, executes run_num passes, writing one JSON result per
        pass ("<job>_run_<i>_<name>.json"); otherwise a single pass writing
        "<job>_<name>.json".
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)
470
471
class KernelTarget(Target):
    """Kernel (nvmet) implementation of the NVMe-oF target.

    Generates a JSON config file ("kernel.conf") describing ports and
    subsystems and applies it with the nvmetcli tool.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None, nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings, bandwidth_settings,
                                           dpdk_settings)
        # Path/name of the nvmetcli binary used to apply the generated config.
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        # Best-effort teardown: wipe the kernel target configuration when the
        # object is garbage-collected.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing /dev/nullb0 as a single subsystem.

        One subsystem (cnode1) with one namespace, listening on
        address:4420 using self.transport.
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "serial": "SPDK0001",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing the given NVMe devices.

        Disks are split evenly between the NIC addresses; each disk gets its
        own subsystem (cnode<N>) and its own port (trsvcid 4420+N-1).
        """

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        # Chunk i covers nvme_list[i*disks_per_ip : (i+1)*disks_per_ip].
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))
        pass

    def tgt_start(self):
        """Generate the nvmet config and apply it with nvmetcli.

        Null-block mode supports a single NIC IP only; otherwise one
        subsystem per local NVMe drive is exported. Sets self.subsys_no.
        """
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        # Clear any stale configuration before restoring the fresh one.
        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
600
601
602class SPDKTarget(Target):
603
604    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
605                 use_null_block=False, sar_settings=None, pcm_settings=None,
606                 bandwidth_settings=None, dpdk_settings=None, num_shared_buffers=4096,
607                 num_cores=1, **kwargs):
608
609        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
610                                         use_null_block, sar_settings, pcm_settings, bandwidth_settings,
611                                         dpdk_settings)
612        self.num_cores = num_cores
613        self.num_shared_buffers = num_shared_buffers
614
    def spdk_tgt_configure(self):
        """Configure the running SPDK target over JSON-RPC.

        Creates the NVMe-oF transport, then adds bdevs (null block or real
        NVMe drives) and the corresponding subsystems/listeners.
        """
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        # NOTE(review): numa_list is computed but never used here.
        numa_list = get_used_numa_nodes()

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # NOTE(review): the helpers below configure via RPC and return None;
        # the *_section variables are never read.
        if self.null_block:
            nvme_section = self.spdk_tgt_add_nullblock()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            nvme_section = self.spdk_tgt_add_nvme_conf()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")
631
    def spdk_tgt_add_nullblock(self):
        """Create a single 100 MiB (102400 blocks x 4096 B) null bdev named Nvme0n1."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
637
    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives as bdevs Nvme0..NvmeN via RPC.

        req_num_disks limits how many of the discovered PCI devices are used;
        exits with an error if more disks are requested than available.
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC traddr format uses dots instead of colons in the PCI BDF.
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
656
    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create NVMe-oF subsystems for the bdevs and add listeners.

        Bdev indices are split evenly between the given NIC IPs; subsystem N
        gets nqn ...:cnode<N>, namespace Nvme<N>n1 and a listener on port
        4420 of its assigned IP.
        """
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        # Chunk i covers num_disks[i*disks_per_ip : (i+1)*disks_per_ip].
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                # All subsystems listen on the same service id; they differ by NQN.
                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
688
689    def tgt_start(self):
690        if self.null_block:
691            self.subsys_no = 1
692        else:
693            self.subsys_no = get_nvme_devices_count()
694        self.log_print("Starting SPDK NVMeOF Target process")
695        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
696        command = " ".join([nvmf_app_path, "-m", self.num_cores])
697        proc = subprocess.Popen(command, shell=True)
698        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
699
700        with open(self.pid, "w") as fh:
701            fh.write(str(proc.pid))
702        self.nvmf_proc = proc
703        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
704        self.log_print("Waiting for spdk to initilize...")
705        while True:
706            if os.path.exists("/var/tmp/spdk.sock"):
707                break
708            time.sleep(1)
709        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")
710
711        self.spdk_tgt_configure()
712
713    def __del__(self):
714        if hasattr(self, "nvmf_proc"):
715            try:
716                self.nvmf_proc.terminate()
717                self.nvmf_proc.wait()
718            except Exception as e:
719                self.log_print(e)
720                self.nvmf_proc.kill()
721                self.nvmf_proc.communicate()
722
723
class KernelInitiator(Initiator):
    """Initiator host that uses the kernel NVMe-oF driver (nvme-cli) to connect
    to target subsystems and runs fio against the resulting /dev/nvme* devices."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # Extra flags appended verbatim to "nvme connect".
        # BUGFIX: use .get() so a config entry without "extra_params" does not
        # raise KeyError; `or ""` also normalizes an explicit null/empty value.
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Connect this host to every discovered subsystem via nvme-cli."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect every subsystem previously attached by kernel_init_connect."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            # subsystem[1] is presumably the NQN — matches the -n connect flag above.
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build fio "[filenameX]" job sections, splitting the connected kernel
        /dev/nvme* devices across the requested thread groups.

        :param threads: sequence whose length is the number of job sections.
        :param io_depth: per-device queue depth; scaled by devices per section
            and divided by num_jobs (minimum 1).
        """
        out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            # BUGFIX: hand out at most one leftover device per thread group,
            # outside the inner loop (matches SPDKInitiator.gen_fio_filename_conf).
            # Previously this ran once per inner iteration, skewing the split
            # and assigning nothing when there were fewer devices than threads.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
783
784
class SPDKInitiator(Initiator):
    """Initiator host that drives I/O through the SPDK fio bdev plugin."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # Core count/mask handed to fio when running with the SPDK plugin.
        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Copy the SPDK source zip to this host, build it, and run setup.sh."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)
        self.log_print("Sources unpacked")

        self.log_print("Using fio binary %s" % self.fio_bin)
        build_cmd = ("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
                     "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))
        self.remote_call(build_cmd)
        self.log_print("SPDK built")

        # Bind NVMe devices / set up hugepages for the SPDK apps.
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Render an "[Nvme]" config section with one TransportId row per
        remote subsystem tuple (svc, nqn, ip — TODO confirm tuple order
        against discover_subsystems)."""
        header = "[Nvme]"
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        rows = []
        for idx, subsys in enumerate(remote_subsystem_list):
            rows.append(row_template.format(transport=self.transport,
                                            svc=subsys[0],
                                            nqn=subsys[1],
                                            ip=subsys[2],
                                            i=idx))
        return "\n".join([header, "\n".join(rows)])

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build fio "[filenameX]" job sections, splitting the NvmeXn1 bdevs
        evenly across the thread groups (never more groups than bdevs)."""
        num_subsys = len(subsystems)
        if len(threads) >= num_subsys:
            # One group per bdev at most.
            threads = range(0, num_subsys)

        names = iter(["Nvme%sn1" % x for x in range(0, num_subsys)])
        per_group, leftover = divmod(num_subsys, len(threads))
        groups = []
        for _ in range(len(threads)):
            group = [next(names) for _ in range(per_group)]
            # First `leftover` groups absorb one extra bdev each.
            if leftover:
                group.append(next(names))
                leftover -= 1
            groups.append(group)

        # Leading "" reproduces the original's leading newline.
        pieces = [""]
        for idx, group in enumerate(groups):
            qd = round((io_depth * len(group)) / num_jobs)
            if qd == 0:
                qd = 1
            pieces.append("[filename%s]" % idx)
            pieces.append("\n".join(["filename=%s" % n for n in group]))
            pieces.append("iodepth=%s" % qd)

        return "\n".join(pieces)
847
848
if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file: first CLI argument, or config.json next to this script.
    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []

    # Instantiate target / initiator / fio-workload settings from config sections.
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Every (block size, queue depth, rw mode) combination is one run.
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
        else:
            continue

    # Copy and install SPDK on remote initiators
    if "skip_spdk_install" not in data["general"]:
        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
        threads = []
        for i in initiators:
            if i.mode == "spdk":
                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
                threads.append(t)
                t.start()
        for t in threads:
            t.join()

    target_obj.tgt_start()

    # Run FIO tests: one thread per initiator plus one per enabled
    # target-side measurement (sar / pcm / bandwidth / dpdk memory).
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            # BUGFIX: args must be a tuple. "(target_results_dir)" was a bare
            # string, which Thread splats into one argument per character.
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)
960