xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 06b537bfdb4393dea857e204b85d8df46a351d8a)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16import pandas as pd
17from collections import OrderedDict
18from common import *
19
20
21class Server:
22    def __init__(self, name, username, password, mode, nic_ips, transport):
23        self.name = name
24        self.mode = mode
25        self.username = username
26        self.password = password
27        self.nic_ips = nic_ips
28        self.transport = transport.lower()
29
30        if not re.match("^[A-Za-z0-9]*$", name):
31            self.log_print("Please use a name which contains only letters or numbers")
32            sys.exit(1)
33
34    def log_print(self, msg):
35        print("[%s] %s" % (self.name, msg), flush=True)
36
37
38class Target(Server):
39    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
40                 null_block_devices=0, sar_settings=None, pcm_settings=None,
41                 bandwidth_settings=None, dpdk_settings=None):
42
43        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
44        self.null_block = null_block_devices
45        self.enable_sar = False
46        self.enable_pcm_memory = False
47        self.enable_pcm = False
48        self.enable_bandwidth = False
49        self.enable_dpdk_memory = False
50
51        if sar_settings:
52            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings
53
54        if pcm_settings:
55            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings
56
57        if bandwidth_settings:
58            self.enable_bandwidth, self.bandwidth_count = bandwidth_settings
59
60        if dpdk_settings:
61            self.enable_dpdk_memory, self.dpdk_wait_time = dpdk_settings
62
63        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
64        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
65
66    def zip_spdk_sources(self, spdk_dir, dest_file):
67        self.log_print("Zipping SPDK source directory")
68        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
69        for root, directories, files in os.walk(spdk_dir, followlinks=True):
70            for file in files:
71                fh.write(os.path.relpath(os.path.join(root, file)))
72        fh.close()
73        self.log_print("Done zipping")
74
75    def read_json_stats(self, file):
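        # Parse an fio JSON output file (group_reporting aggregate, job index 0) and
        # return a flat list: read iops, bw, avg/min/max latency and the p99/p99.9/
        # p99.99/p99.999 percentiles, followed by the same set for writes. Latencies
        # reported by fio in nanoseconds are converted to microseconds.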
76        with open(file, "r") as json_data:
77            data = json.load(json_data)
78            job_pos = 0  # job_pos = 0 because we use aggregated results
79
80            # Check if latency is reported in nanoseconds or microseconds to choose the correct dict key
81            def get_lat_unit(key_prefix, dict_section):
82                # key prefix - lat, clat or slat.
83                # dict section - portion of json containing latency bucket in question
84                # Return dict key to access the bucket and unit as string
85                for k, v in dict_section.items():
86                    if k.startswith(key_prefix):
87                        return k, k.split("_")[1]
88
89            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
90            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
91            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
92            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
93            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
94            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
95            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
96            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
97            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
98            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
99            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])
100
101            if "ns" in lat_unit:
102                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
103            if "ns" in clat_unit:
104                read_p99_lat = read_p99_lat / 1000
105                read_p99_9_lat = read_p99_9_lat / 1000
106                read_p99_99_lat = read_p99_99_lat / 1000
107                read_p99_999_lat = read_p99_999_lat / 1000
108
109            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
110            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
111            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
112            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
113            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
114            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
115            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
116            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
117            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
118            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
119            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])
120
121            if "ns" in lat_unit:
122                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
123            if "ns" in clat_unit:
124                write_p99_lat = write_p99_lat / 1000
125                write_p99_9_lat = write_p99_9_lat / 1000
126                write_p99_99_lat = write_p99_99_lat / 1000
127                write_p99_999_lat = write_p99_999_lat / 1000
128
129        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
130                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
131                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
132                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
133
134    def parse_results(self, results_dir, initiator_count=None, run_num=None):
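        # Combine per-initiator fio JSON results into CSV files: one per-initiator CSV
        # with stats averaged over runs, plus an aggregated nvmf_results.csv in which
        # IOPS/bandwidth are summed and latencies are weighted by the rwmixread ratio
        # parsed from the fio config file name.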
135        files = os.listdir(results_dir)
136        fio_files = filter(lambda x: ".fio" in x, files)
137        json_files = [x for x in files if ".json" in x]
138
139        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
140                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
141                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
142                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]
143
144        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
145                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]
146
147        header_line = ",".join(["Name", *headers])
148        aggr_header_line = ",".join(["Name", *aggr_headers])
149
150        # Create empty results file
151        csv_file = "nvmf_results.csv"
152        with open(os.path.join(results_dir, csv_file), "w") as fh:
153            fh.write(aggr_header_line + "\n")
154        rows = set()
155
156        for fio_config in fio_files:
157            self.log_print("Getting FIO stats for %s" % fio_config)
158            job_name, _ = os.path.splitext(fio_config)
159
160            # Look in the filename for rwmixread value. Function arguments do
161            # not have that information.
162            # TODO: Improve this function by directly using workload params instead
163            # of regexing through filenames.
164            if "read" in job_name:
165                rw_mixread = 1
166            elif "write" in job_name:
167                rw_mixread = 0
168            else:
169                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100
170
171            # If "_CPU" exists in the name - ignore it
172            # Initiators for the same job could have different num_cores parameters
173            job_name = re.sub(r"_\d+CPU", "", job_name)
174            job_result_files = [x for x in json_files if job_name in x]
175            self.log_print("Matching result files for current fio config:")
176            for j in job_result_files:
177                self.log_print("\t %s" % j)
178
179            # More than one initiator may have been used in the test; check for that.
180            # Result files are named so that the string after the last "_" separator is the server name.
181            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
182            inits_avg_results = []
183            for i in inits_names:
184                self.log_print("\tGetting stats for initiator %s" % i)
185                # There may have been more than one test run for this job; calculate average results for the initiator
186                i_results = [x for x in job_result_files if i in x]
187                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))
188
189                separate_stats = []
190                for r in i_results:
191                    stats = self.read_json_stats(os.path.join(results_dir, r))
192                    separate_stats.append(stats)
193                    self.log_print(stats)
194
195                init_results = [sum(x) for x in zip(*separate_stats)]
196                init_results = [x / len(separate_stats) for x in init_results]
197                inits_avg_results.append(init_results)
198
199                self.log_print("\tAverage results for initiator %s" % i)
200                self.log_print(init_results)
201                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
202                    fh.write(header_line + "\n")
203                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")
204
205            # Sum results of all initiators running this FIO job.
206            # Latency results are an average of latencies across all initiators.
207            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
208            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
209            for key in inits_avg_results:
210                if "lat" in key:
211                    inits_avg_results[key] /= len(inits_names)
212
213            # Aggregate separate read/write values into common labels
214            # Take rw_mixread into consideration for mixed read/write workloads.
215            aggregate_results = OrderedDict()
216            for h in aggr_headers:
217                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
218                if "lat" in h:
219                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
220                else:
221                    _ = read_stat + write_stat
222                aggregate_results[h] = "{0:.3f}".format(_)
223
224            rows.add(",".join([job_name, *aggregate_results.values()]))
225
226        # Save results to file
227        for row in rows:
228            with open(os.path.join(results_dir, csv_file), "a") as fh:
229                fh.write(row + "\n")
230        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
231
232    def measure_sar(self, results_dir, sar_file_name):
233        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
234        time.sleep(self.sar_delay)
235        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
236        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
237            for line in out.split("\n"):
238                if "Average" in line and "CPU" in line:
239                    self.log_print("Summary CPU utilization from SAR:")
240                    self.log_print(line)
241                if "Average" in line and "all" in line:
242                    self.log_print(line)
243            fh.write(out)
244
245    def measure_pcm_memory(self, results_dir, pcm_file_name):
246        time.sleep(self.pcm_delay)
247        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
248                                      results_dir, pcm_file_name), shell=True)
249        time.sleep(self.pcm_count)
250        pcm_memory.kill()
251
252    def measure_pcm(self, results_dir, pcm_file_name):
253        time.sleep(self.pcm_delay)
254        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
255                       results_dir, pcm_file_name), shell=True, check=True)
256        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
257        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
258        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
259        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
260        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
261
262    def measure_bandwidth(self, results_dir, bandwidth_file_name):
263        bwm = subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name,
264                             self.bandwidth_count), shell=True, check=True)
265
266    def measure_dpdk_memory(self, results_dir):
267        self.log_print("INFO: waiting to generate DPDK memory usage")
268        time.sleep(self.dpdk_wait_time)
269        self.log_print("INFO: generating DPDK memory usage")
270        rpc.env.env_dpdk_get_mem_stats(self.client)  # the RPC dumps its stats to /tmp/spdk_mem_dump.txt
271        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
272
273
274class Initiator(Server):
275    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
276                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
277                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):
278
279        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)
280
281        self.ip = ip
282        self.spdk_dir = workspace
283        if os.getenv('SPDK_WORKSPACE'):
284            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
285        self.fio_bin = fio_bin
286        self.cpus_allowed = cpus_allowed
287        self.cpus_allowed_policy = cpus_allowed_policy
288        self.cpu_frequency = cpu_frequency
289        self.nvmecli_bin = nvmecli_bin
290        self.ssh_connection = paramiko.SSHClient()
291        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
292        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
293        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
294        self.remote_call("mkdir -p %s" % self.spdk_dir)
295        self.set_cpu_frequency()
296
297    def __del__(self):
298        self.ssh_connection.close()
299
300    def put_file(self, local, remote_dest):
301        ftp = self.ssh_connection.open_sftp()
302        ftp.put(local, remote_dest)
303        ftp.close()
304
305    def get_file(self, remote, local_dest):
306        ftp = self.ssh_connection.open_sftp()
307        ftp.get(remote, local_dest)
308        ftp.close()
309
310    def remote_call(self, cmd):
311        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
312        out = stdout.read().decode(encoding="utf-8")
313        err = stderr.read().decode(encoding="utf-8")
314        return out, err
315
316    def copy_result_files(self, dest_dir):
317        self.log_print("Copying results")
318
319        if not os.path.exists(dest_dir):
320            os.mkdir(dest_dir)
321
322        # Get list of result files from initiator and copy them back to target
323        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
324        file_list = stdout.strip().split("\n")
325
326        for file in file_list:
327            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
328                          os.path.join(dest_dir, file))
329        self.log_print("Done copying results")
330
331    def discover_subsystems(self, address_list, subsys_no):
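        # Run "nvme discover" against every (address, port) pair, with ports starting at
        # 4420, and return a sorted, de-duplicated list of (trsvcid, subnqn, traddr)
        # tuples whose traddr matches one of the provided addresses.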
332        num_nvmes = range(0, subsys_no)
333        nvme_discover_output = ""
334        for ip, subsys_no in itertools.product(address_list, num_nvmes):
335            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
336            nvme_discover_cmd = ["sudo",
337                                 "%s" % self.nvmecli_bin,
338                                 "discover", "-t %s" % self.transport,
339                                 "-s %s" % (4420 + subsys_no),
340                                 "-a %s" % ip]
341            nvme_discover_cmd = " ".join(nvme_discover_cmd)
342
343            stdout, stderr = self.remote_call(nvme_discover_cmd)
344            if stdout:
345                nvme_discover_output = nvme_discover_output + stdout
346
347        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
348                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
349                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
350                                nvme_discover_output)  # from nvme discovery output
351        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
352        subsystems = list(set(subsystems))
353        subsystems.sort(key=lambda x: x[1])
354        self.log_print("Found matching subsystems on target side:")
355        for s in subsystems:
356            self.log_print(s)
357
358        return subsystems
359
360    def gen_fio_filename_conf(self, *args, **kwargs):
361        # Logic implemented in SPDKInitiator and KernelInitiator classes
362        pass
363
364    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
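        # Build the fio job file. In "spdk" mode the SPDK bdev fio plugin is used as the
        # ioengine together with a generated bdev.conf; otherwise libaio runs against the
        # kernel /dev/nvme* devices that are already connected.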
365        fio_conf_template = """
366[global]
367ioengine={ioengine}
368{spdk_conf}
369thread=1
370group_reporting=1
371direct=1
372percentile_list=50:90:99:99.5:99.9:99.99:99.999
373
374norandommap=1
375rw={rw}
376rwmixread={rwmixread}
377bs={block_size}
378time_based=1
379ramp_time={ramp_time}
380runtime={run_time}
381"""
382        if "spdk" in self.mode:
383            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
384            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
385            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
386            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
387            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
388        else:
389            ioengine = "libaio"
390            spdk_conf = ""
391            out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
392            subsystems = [x for x in out.split("\n") if "nvme" in x]
393
394        if self.cpus_allowed is not None:
395            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
396            cpus_num = 0
397            cpus = self.cpus_allowed.split(",")
398            for cpu in cpus:
399                if "-" in cpu:
400                    a, b = cpu.split("-")
401                    a = int(a)
402                    b = int(b)
403                    cpus_num += len(range(a, b + 1))  # CPU ranges such as "0-3" are inclusive
404                else:
405                    cpus_num += 1
406            threads = range(0, cpus_num)
407        elif hasattr(self, 'num_cores'):
408            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
409            threads = range(0, int(self.num_cores))
410        else:
411            threads = range(0, len(subsystems))
412
413        if "spdk" in self.mode:
414            filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs)
415        else:
416            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
417
418        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
419                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
420                                              ramp_time=ramp_time, run_time=run_time)
421        if num_jobs:
422            fio_config = fio_config + "numjobs=%s \n" % num_jobs
423        if self.cpus_allowed is not None:
424            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
425            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
426        fio_config = fio_config + filename_section
427
428        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
429        if hasattr(self, "num_cores"):
430            fio_config_filename += "_%sCPU" % self.num_cores
431        fio_config_filename += ".fio"
432
433        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
434        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
435        self.log_print("Created FIO Config:")
436        self.log_print(fio_config)
437
438        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
439
440    def set_cpu_frequency(self):
441        if self.cpu_frequency is not None:
442            try:
443                self.remote_call('sudo cpupower frequency-set -g userspace')
444                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
445                cmd = "sudo cpupower frequency-info"
446                output, error = self.remote_call(cmd)
447                self.log_print(output)
448                self.log_print(error)
449            except Exception:
450                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
451                sys.exit()
452        else:
453            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")
454
455    def run_fio(self, fio_config_file, run_num=None):
456        job_name, _ = os.path.splitext(fio_config_file)
457        self.log_print("Starting FIO run for job: %s" % job_name)
458        self.log_print("Using FIO: %s" % self.fio_bin)
459
460        if run_num:
461            for i in range(1, run_num + 1):
462                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
463                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
464                output, error = self.remote_call(cmd)
465                self.log_print(output)
466                self.log_print(error)
467        else:
468            output_filename = job_name + "_" + self.name + ".json"
469            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
470            output, error = self.remote_call(cmd)
471            self.log_print(output)
472            self.log_print(error)
473        self.log_print("FIO run finished. Results in: %s" % output_filename)
474
475
476class KernelTarget(Target):
477    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
478                 null_block_devices=0, sar_settings=None, pcm_settings=None,
479                 bandwidth_settings=None, dpdk_settings=None, nvmet_bin="nvmetcli", **kwargs):
480
481        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
482                                           null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
483                                           dpdk_settings)
484        self.nvmet_bin = nvmet_bin
485
486    def __del__(self):
487        nvmet_command(self.nvmet_bin, "clear")
488
489    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
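        # Build an nvmet JSON configuration with one subsystem and one port per disk,
        # splitting disks evenly between the provided NIC IPs. The result is written to
        # "kernel.conf" and later loaded with the nvmetcli "restore" command.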
490
491        nvmet_cfg = {
492            "ports": [],
493            "hosts": [],
494            "subsystems": [],
495        }
496
497        # Split disks between NIC IPs
498        disks_per_ip = int(len(nvme_list) / len(address_list))
499        disk_chunks = [nvme_list[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(address_list))]
500
501        subsys_no = 1
502        port_no = 0
503        for ip, chunk in zip(address_list, disk_chunks):
504            for disk in chunk:
505                nvmet_cfg["subsystems"].append({
506                    "allowed_hosts": [],
507                    "attr": {
508                        "allow_any_host": "1",
509                        "serial": "SPDK00%s" % subsys_no,
510                        "version": "1.3"
511                    },
512                    "namespaces": [
513                        {
514                            "device": {
515                                "path": disk,
516                                "uuid": "%s" % uuid.uuid4()
517                            },
518                            "enable": 1,
519                            "nsid": subsys_no
520                        }
521                    ],
522                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
523                })
524
525                nvmet_cfg["ports"].append({
526                    "addr": {
527                        "adrfam": "ipv4",
528                        "traddr": ip,
529                        "trsvcid": "%s" % (4420 + port_no),
530                        "trtype": "%s" % self.transport
531                    },
532                    "portid": subsys_no,
533                    "referrals": [],
534                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
535                })
536                subsys_no += 1
537                port_no += 1
538
539        with open("kernel.conf", "w") as fh:
540            fh.write(json.dumps(nvmet_cfg, indent=2))
541        pass
542
543    def tgt_start(self):
544        self.log_print("Configuring kernel NVMeOF Target")
545
546        if self.null_block:
547            print("Configuring with null block device.")
548            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
549            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
550            self.subsys_no = len(null_blk_list)
551        else:
552            print("Configuring with NVMe drives.")
553            nvme_list = get_nvme_devices()
554            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
555            self.subsys_no = len(nvme_list)
556
557        nvmet_command(self.nvmet_bin, "clear")
558        nvmet_command(self.nvmet_bin, "restore kernel.conf")
559        self.log_print("Done configuring kernel NVMeOF Target")
560
561
562class SPDKTarget(Target):
563
564    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
565                 null_block_devices=0, null_block_dif_type=0, sar_settings=None, pcm_settings=None,
566                 bandwidth_settings=None, dpdk_settings=None, num_shared_buffers=4096,
567                 num_cores=1, dif_insert_strip=False, **kwargs):
568
569        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
570                                         null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
571                                         dpdk_settings)
572        self.num_cores = num_cores
573        self.num_shared_buffers = num_shared_buffers
574        self.null_block_dif_type = null_block_dif_type
575        self.dif_insert_strip = dif_insert_strip
576
577    def spdk_tgt_configure(self):
578        self.log_print("Configuring SPDK NVMeOF target via RPC")
579        numa_list = get_used_numa_nodes()
580
581        # Create RDMA transport layer
582        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
583                                       num_shared_buffers=self.num_shared_buffers,
584                                       dif_insert_or_strip=self.dif_insert_strip)
585        self.log_print("SPDK NVMeOF transport layer:")
586        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
587
588        if self.null_block:
589            nvme_section = self.spdk_tgt_add_nullblock(self.null_block)
590            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
591        else:
592            nvme_section = self.spdk_tgt_add_nvme_conf()
593            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips)
594        self.log_print("Done configuring SPDK NVMeOF Target")
595
596    def spdk_tgt_add_nullblock(self, null_block_count):
597        md_size = 0
598        block_size = 4096
599        if self.null_block_dif_type != 0:
600            md_size = 128
601
602        self.log_print("Adding null block bdevices to config via RPC")
603        for i in range(null_block_count):
604            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
605            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
606                                      dif_type=self.null_block_dif_type, md_size=md_size)
607        self.log_print("SPDK Bdevs configuration:")
608        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
609
610    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
611        self.log_print("Adding NVMe bdevs to config via RPC")
612
613        bdfs = get_nvme_devices_bdf()
614        bdfs = [b.replace(":", ".") for b in bdfs]
615
616        if req_num_disks:
617            if req_num_disks > len(bdfs):
618                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
619                sys.exit(1)
620            else:
621                bdfs = bdfs[0:req_num_disks]
622
623        for i, bdf in enumerate(bdfs):
624            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
625
626        self.log_print("SPDK Bdevs configuration:")
627        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
628
629    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
630        self.log_print("Adding subsystems to config")
631        if not req_num_disks:
632            req_num_disks = get_nvme_devices_count()
633
634        # Distribute bdevs between provided NICs
635        num_disks = range(0, req_num_disks)
636        if len(num_disks) == 1:
637            disks_per_ip = 1
638        else:
639            disks_per_ip = int(len(num_disks) / len(ips))
640        disk_chunks = [num_disks[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(ips))]
641
642        # Create subsystems, add bdevs to namespaces, add listeners
643        for ip, chunk in zip(ips, disk_chunks):
644            for c in chunk:
645                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
646                serial = "SPDK00%s" % c
647                bdev_name = "Nvme%sn1" % c
648                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
649                                               allow_any_host=True, max_namespaces=8)
650                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
651
652                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
653                                                     trtype=self.transport,
654                                                     traddr=ip,
655                                                     trsvcid="4420",
656                                                     adrfam="ipv4")
657
658        self.log_print("SPDK NVMeOF subsystem configuration:")
659        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
660
661    def tgt_start(self):
662        if self.null_block:
663            self.subsys_no = 1
664        else:
665            self.subsys_no = get_nvme_devices_count()
666        self.log_print("Starting SPDK NVMeOF Target process")
667        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
668        command = " ".join([nvmf_app_path, "-m", self.num_cores])
669        proc = subprocess.Popen(command, shell=True)
670        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
671
672        with open(self.pid, "w") as fh:
673            fh.write(str(proc.pid))
674        self.nvmf_proc = proc
675        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
676        self.log_print("Waiting for spdk to initilize...")
677        while True:
678            if os.path.exists("/var/tmp/spdk.sock"):
679                break
680            time.sleep(1)
681        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")
682
683        self.spdk_tgt_configure()
684
685    def __del__(self):
686        if hasattr(self, "nvmf_proc"):
687            try:
688                self.nvmf_proc.terminate()
689                self.nvmf_proc.wait()
690            except Exception as e:
691                self.log_print(e)
692                self.nvmf_proc.kill()
693                self.nvmf_proc.communicate()
694
695
696class KernelInitiator(Initiator):
697    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
698                 cpus_allowed=None, cpus_allowed_policy="shared",
699                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
700
701        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
702                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
703                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)
704
705        self.extra_params = ""
706        if kwargs["extra_params"]:
707            self.extra_params = kwargs["extra_params"]
708
709    def __del__(self):
710        self.ssh_connection.close()
711
712    def kernel_init_connect(self, address_list, subsys_no):
713        subsystems = self.discover_subsystems(address_list, subsys_no)
714        self.log_print("Below connection attempts may result in error messages, this is expected!")
715        for subsystem in subsystems:
716            self.log_print("Trying to connect %s %s %s" % subsystem)
717            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
718                                                                             self.transport,
719                                                                             *subsystem,
720                                                                             self.extra_params))
721            time.sleep(2)
722
723    def kernel_init_disconnect(self, address_list, subsys_no):
724        subsystems = self.discover_subsystems(address_list, subsys_no)
725        for subsystem in subsystems:
726            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
727            time.sleep(1)
728
729    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
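        # Spread the connected kernel /dev/nvme* devices across [filenameX] job sections,
        # one section per thread, and set each section's iodepth to
        # io_depth * (devices in section) / num_jobs.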
730        out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
731        nvme_list = [x for x in out.split("\n") if "nvme" in x]
732
733        filename_section = ""
734        nvme_per_split = int(len(nvme_list) / len(threads))
735        remainder = len(nvme_list) % len(threads)
736        iterator = iter(nvme_list)
737        result = []
738        for i in range(len(threads)):
739            result.append([])
740            for j in range(nvme_per_split):
741                result[i].append(next(iterator))
742            if remainder:  # assign at most one leftover device per section
743                result[i].append(next(iterator))
744                remainder -= 1
745        for i, r in enumerate(result):
746            header = "[filename%s]" % i
747            disks = "\n".join(["filename=%s" % x for x in r])
748            job_section_qd = round((io_depth * len(r)) / num_jobs)
749            if job_section_qd == 0:
750                job_section_qd = 1
751            iodepth = "iodepth=%s" % job_section_qd
752            filename_section = "\n".join([filename_section, header, disks, iodepth])
753
754        return filename_section
755
756
757class SPDKInitiator(Initiator):
758    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
759                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
760                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
761        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
762                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
763                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)
764
765        self.num_cores = num_cores
766
767    def install_spdk(self, local_spdk_zip):
768        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
769        self.log_print("Copied sources zip from target")
770        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)
771
772        self.log_print("Sources unpacked")
773        self.log_print("Using fio binary %s" % self.fio_bin)
774        self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
775                         "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))
776
777        self.log_print("SPDK built")
778        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)
779
780    def gen_spdk_bdev_conf(self, remote_subsystem_list):
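        # Generate a legacy INI-style [Nvme] bdev configuration section with one
        # "TransportId" row per discovered remote subsystem. The resulting bdev.conf is
        # passed to the SPDK bdev fio plugin through the spdk_conf job option.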
781        header = "[Nvme]"
782        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""
783
784        bdev_rows = [row_template.format(transport=self.transport,
785                                         svc=x[0],
786                                         nqn=x[1],
787                                         ip=x[2],
788                                         i=i) for i, x in enumerate(remote_subsystem_list)]
789        bdev_rows = "\n".join(bdev_rows)
790        bdev_section = "\n".join([header, bdev_rows])
791        return bdev_section
792
793    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
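        # Same distribution scheme as the kernel variant, except the fio filenames are
        # the SPDK bdev names (Nvme0n1, Nvme1n1, ...) defined in the generated bdev
        # configuration.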
794        filename_section = ""
795        if len(threads) >= len(subsystems):
796            threads = range(0, len(subsystems))
797        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
798        nvme_per_split = int(len(subsystems) / len(threads))
799        remainder = len(subsystems) % len(threads)
800        iterator = iter(filenames)
801        result = []
802        for i in range(len(threads)):
803            result.append([])
804            for j in range(nvme_per_split):
805                result[i].append(next(iterator))
806            if remainder:
807                result[i].append(next(iterator))
808                remainder -= 1
809        for i, r in enumerate(result):
810            header = "[filename%s]" % i
811            disks = "\n".join(["filename=%s" % x for x in r])
812            job_section_qd = round((io_depth * len(r)) / num_jobs)
813            if job_section_qd == 0:
814                job_section_qd = 1
815            iodepth = "iodepth=%s" % job_section_qd
816            filename_section = "\n".join([filename_section, header, disks, iodepth])
817
818        return filename_section
819
820
821if __name__ == "__main__":
822    spdk_zip_path = "/tmp/spdk.zip"
823    target_results_dir = "/tmp/results"
824
825    if (len(sys.argv) > 1):
826        config_file_path = sys.argv[1]
827    else:
828        script_full_dir = os.path.dirname(os.path.realpath(__file__))
829        config_file_path = os.path.join(script_full_dir, "config.json")
830
831    print("Using config file: %s" % config_file_path)
832    with open(config_file_path, "r") as config:
833        data = json.load(config)
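    # A minimal sketch of the expected config.json layout (illustrative only, inferred
    # from the keys read below). Section names are matched by substring ("target",
    # "initiator", "fio"); connection keys such as username, password, transport and
    # nic_ips can live in "general" or in the individual server sections:
    # {
    #     "general": {"username": "...", "password": "...", "transport": "rdma", "nic_ips": ["..."]},
    #     "target": {"mode": "spdk" or "kernel", ...},
    #     "initiator1": {"mode": "spdk" or "kernel", "ip": "...", ...},
    #     "fio": {"bs": [...], "qd": [...], "rw": [...], "rwmixread": ..., "run_time": ...,
    #             "ramp_time": ..., "run_num": ..., "num_jobs": ...}
    # }
    # "skip_spdk_install" may be added to "general" to skip copying and building SPDK
    # on the initiators.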
834
835    initiators = []
836    fio_cases = []
837
838    for k, v in data.items():
839        if "target" in k:
840            if data[k]["mode"] == "spdk":
841                target_obj = SPDKTarget(name=k, **data["general"], **v)
842            elif data[k]["mode"] == "kernel":
843                target_obj = KernelTarget(name=k, **data["general"], **v)
844        elif "initiator" in k:
845            if data[k]["mode"] == "spdk":
846                init_obj = SPDKInitiator(name=k, **data["general"], **v)
847            elif data[k]["mode"] == "kernel":
848                init_obj = KernelInitiator(name=k, **data["general"], **v)
849            initiators.append(init_obj)
850        elif "fio" in k:
851            fio_workloads = itertools.product(data[k]["bs"],
852                                              data[k]["qd"],
853                                              data[k]["rw"])
854
855            fio_run_time = data[k]["run_time"]
856            fio_ramp_time = data[k]["ramp_time"]
857            fio_rw_mix_read = data[k]["rwmixread"]
858            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
859            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
860        else:
861            continue
862
863    # Copy and install SPDK on remote initiators
864    if "skip_spdk_install" not in data["general"]:
865        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
866        threads = []
867        for i in initiators:
868            if i.mode == "spdk":
869                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
870                threads.append(t)
871                t.start()
872        for t in threads:
873            t.join()
874
875    target_obj.tgt_start()
876
877    # Poor man's threading
878    # Run FIO tests
879    for block_size, io_depth, rw in fio_workloads:
880        threads = []
881        configs = []
882        for i in initiators:
883            if i.mode == "kernel":
884                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)
885
886            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
887                                   fio_num_jobs, fio_ramp_time, fio_run_time)
888            configs.append(cfg)
889
890        for i, cfg in zip(initiators, configs):
891            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
892            threads.append(t)
893        if target_obj.enable_sar:
894            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
895            sar_file_name = ".".join([sar_file_name, "txt"])
896            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
897            threads.append(t)
898
899        if target_obj.enable_pcm:
900            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
901            pcm_file_name = ".".join([pcm_file_name, "csv"])
902            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
903            threads.append(t)
904
905        if target_obj.enable_pcm_memory:
906            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
907            pcm_file_name = ".".join([pcm_file_name, "csv"])
908            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
909            threads.append(t)
910
911        if target_obj.enable_bandwidth:
912            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
913            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
914            t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,))
915            threads.append(t)
916
917        if target_obj.enable_dpdk_memory:
918            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir,))
919            threads.append(t)
920
921        for t in threads:
922            t.start()
923        for t in threads:
924            t.join()
925
926        for i in initiators:
927            if i.mode == "kernel":
928                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
929            i.copy_result_files(target_results_dir)
930
931    target_obj.parse_results(target_results_dir)
932