xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 8bb0ded3e55c182cea67af1f6790f8de5f38c05f)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16import pandas as pd
17from collections import OrderedDict
18from common import *
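# Expected to come from the common module shipped next to this script (based on
# how they are used below): get_nvme_devices(), get_nvme_devices_count(),
# get_nvme_devices_bdf(), get_used_numa_nodes() and nvmet_command().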
19
20
21class Server:
22    def __init__(self, name, username, password, mode, nic_ips, transport):
23        self.name = name
24        self.mode = mode
25        self.username = username
26        self.password = password
27        self.nic_ips = nic_ips
28        self.transport = transport.lower()
29
30        if not re.match("^[A-Za-z0-9]*$", name):
31            self.log_print("Please use a name which contains only letters or numbers")
32            sys.exit(1)
33
34    def log_print(self, msg):
35        print("[%s] %s" % (self.name, msg), flush=True)
36
37    def get_uncommented_lines(self, lines):
38        return [l for l in lines if l and not l.startswith('#')]
39
40
41class Target(Server):
42    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
43                 null_block_devices=0, sar_settings=None, pcm_settings=None,
44                 bandwidth_settings=None, dpdk_settings=None, zcopy_settings=None,
45                 scheduler_settings="static"):
46
47        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
48        self.null_block = null_block_devices
49        self.enable_sar = False
50        self.enable_pcm_power = False
51        self.enable_pcm_memory = False
52        self.enable_pcm = False
53        self.enable_bandwidth = False
54        self.enable_dpdk_memory = False
55        self.enable_zcopy = False
56        self.scheduler_name = scheduler_settings
57
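        # Settings tuples, as unpacked below:
        #   sar_settings:       (enable, delay, interval, count)
        #   pcm_settings:       (pcm_dir, enable_pcm, enable_pcm_memory, enable_pcm_power, delay, interval, count)
        #   bandwidth_settings: (enable, count)
        #   dpdk_settings:      (enable, wait_time)
        #   zcopy_settings:     bool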
58        if sar_settings:
59            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings
60
61        if pcm_settings:
62            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.enable_pcm_power,\
63                self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings
64
65        if bandwidth_settings:
66            self.enable_bandwidth, self.bandwidth_count = bandwidth_settings
67
68        if dpdk_settings:
69            self.enable_dpdk_memory, self.dpdk_wait_time = dpdk_settings
70
71        if zcopy_settings:
72            self.enable_zcopy = zcopy_settings
73
74        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
75        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
76        self.sys_config()
77
78    def zip_spdk_sources(self, spdk_dir, dest_file):
79        self.log_print("Zipping SPDK source directory")
80        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
81        for root, directories, files in os.walk(spdk_dir, followlinks=True):
82            for file in files:
83                fh.write(os.path.relpath(os.path.join(root, file)))
84        fh.close()
85        self.log_print("Done zipping")
86
87    def read_json_stats(self, file):
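        # Parse a single fio --output-format=json result file and return a flat list:
        # [read_iops, read_bw, read avg/min/max latency, read p99/p99.9/p99.99/p99.999,
        #  write_iops, write_bw, write avg/min/max latency, write p99/p99.9/p99.99/p99.999],
        # with all latencies converted to microseconds.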
88        with open(file, "r") as json_data:
89            data = json.load(json_data)
90            job_pos = 0  # job_pos = 0 because we use aggregated (group_reporting) results
91
92            # Check if latency is reported in nanoseconds or microseconds to choose the correct dict key
93            def get_lat_unit(key_prefix, dict_section):
94                # key prefix - lat, clat or slat.
95                # dict section - portion of json containing latency bucket in question
96                # Return dict key to access the bucket and unit as string
97                for k, v in dict_section.items():
98                    if k.startswith(key_prefix):
99                        return k, k.split("_")[1]
100
101            def get_clat_percentiles(clat_dict_leaf):
102                if "percentile" in clat_dict_leaf:
103                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
104                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
105                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
106                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])
107
108                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
109                else:
110                    # Latest fio versions do not provide "percentile" results if no
111                    # measurements were done, so just return zeroes
112                    return [0, 0, 0, 0]
113
114            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
115            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
116            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
117            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
118            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
119            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
120            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
121            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
122                data["jobs"][job_pos]["read"][clat_key])
123
124            if "ns" in lat_unit:
125                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
126            if "ns" in clat_unit:
127                read_p99_lat = read_p99_lat / 1000
128                read_p99_9_lat = read_p99_9_lat / 1000
129                read_p99_99_lat = read_p99_99_lat / 1000
130                read_p99_999_lat = read_p99_999_lat / 1000
131
132            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
133            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
134            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
135            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
136            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
137            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
138            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
139            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
140                data["jobs"][job_pos]["write"][clat_key])
141
142            if "ns" in lat_unit:
143                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
144            if "ns" in clat_unit:
145                write_p99_lat = write_p99_lat / 1000
146                write_p99_9_lat = write_p99_9_lat / 1000
147                write_p99_99_lat = write_p99_99_lat / 1000
148                write_p99_999_lat = write_p99_999_lat / 1000
149
150        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
151                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
152                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
153                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
154
155    def parse_results(self, results_dir, initiator_count=None, run_num=None):
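        # Result files are expected to follow the naming produced by gen_fio_config()
        # and run_fio(), e.g. "4k_64_randrw_m_70_run_1_init1.json" (illustrative;
        # block size, queue depth, mixread and initiator name vary per test).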
156        files = os.listdir(results_dir)
157        fio_files = filter(lambda x: ".fio" in x, files)
158        json_files = [x for x in files if ".json" in x]
159
160        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
161                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
162                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
163                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]
164
165        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
166                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]
167
168        header_line = ",".join(["Name", *headers])
169        aggr_header_line = ",".join(["Name", *aggr_headers])
170
171        # Create empty results file
172        csv_file = "nvmf_results.csv"
173        with open(os.path.join(results_dir, csv_file), "w") as fh:
174            fh.write(aggr_header_line + "\n")
175        rows = set()
176
177        for fio_config in fio_files:
178            self.log_print("Getting FIO stats for %s" % fio_config)
179            job_name, _ = os.path.splitext(fio_config)
180
181            # Look in the filename for rwmixread value. Function arguments do
182            # not have that information.
183            # TODO: Improve this function by directly using workload params instead
184            # of regexing through filenames.
185            if "read" in job_name:
186                rw_mixread = 1
187            elif "write" in job_name:
188                rw_mixread = 0
189            else:
190                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100
191
192            # If "_CPU" exists in the name, strip it.
193            # Initiators for the same job could have different num_cores parameters.
194            job_name = re.sub(r"_\d+CPU", "", job_name)
195            job_result_files = [x for x in json_files if job_name in x]
196            self.log_print("Matching result files for current fio config:")
197            for j in job_result_files:
198                self.log_print("\t %s" % j)
199
200            # More than one initiator may have been used in the test, so check for that.
201            # Result files are named so that the string after the last "_" separator is the initiator name.
202            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
203            inits_avg_results = []
204            for i in inits_names:
205                self.log_print("\tGetting stats for initiator %s" % i)
206                # There may have been more than 1 test run for this job, calculate average results for initiator
207                i_results = [x for x in job_result_files if i in x]
208                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))
209
210                separate_stats = []
211                for r in i_results:
212                    stats = self.read_json_stats(os.path.join(results_dir, r))
213                    separate_stats.append(stats)
214                    self.log_print(stats)
215
216                init_results = [sum(x) for x in zip(*separate_stats)]
217                init_results = [x / len(separate_stats) for x in init_results]
218                inits_avg_results.append(init_results)
219
220                self.log_print("\tAverage results for initiator %s" % i)
221                self.log_print(init_results)
222                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
223                    fh.write(header_line + "\n")
224                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")
225
226            # Sum results of all initiators running this FIO job.
227            # Latency results are averaged across all initiators.
228            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
229            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
230            for key in inits_avg_results:
231                if "lat" in key:
232                    inits_avg_results[key] /= len(inits_names)
233
234            # Aggregate separate read/write values into common labels
235            # Take rw_mixread into consideration for mixed read/write workloads.
236            aggregate_results = OrderedDict()
237            for h in aggr_headers:
238                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
239                if "lat" in h:
240                    aggr_stat = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
241                else:
242                    aggr_stat = read_stat + write_stat
243                aggregate_results[h] = "{0:.3f}".format(aggr_stat)
244
245            rows.add(",".join([job_name, *aggregate_results.values()]))
246
247        # Save results to file
248        for row in rows:
249            with open(os.path.join(results_dir, csv_file), "a") as fh:
250                fh.write(row + "\n")
251        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
252
253    def measure_sar(self, results_dir, sar_file_name):
254        self.log_print("Waiting %d seconds before measuring SAR stats" % self.sar_delay)
255        time.sleep(self.sar_delay)
256        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
257        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
258            for line in out.split("\n"):
259                if "Average" in line and "CPU" in line:
260                    self.log_print("Summary CPU utilization from SAR:")
261                    self.log_print(line)
262                if "Average" in line and "all" in line:
263                    self.log_print(line)
264            fh.write(out)
265
266    def measure_pcm_memory(self, results_dir, pcm_file_name):
267        time.sleep(self.pcm_delay)
268        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
269                                      results_dir, pcm_file_name), shell=True)
270        time.sleep(self.pcm_count)
271        pcm_memory.kill()
272
273    def measure_pcm(self, results_dir, pcm_file_name):
274        time.sleep(self.pcm_delay)
275        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
276                       results_dir, pcm_file_name), shell=True, check=True)
277        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
278        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
279        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
280        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
281        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
282
283    def measure_pcm_power(self, results_dir, pcm_power_file_name):
284        time.sleep(self.pcm_delay)
285        out = subprocess.check_output("%s/pcm-power.x %s -i=%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count),
286                                      shell=True).decode(encoding="utf-8")
287        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
288            fh.write(out)
289
290    def measure_bandwidth(self, results_dir, bandwidth_file_name):
291        bwm = subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name,
292                             self.bandwidth_count), shell=True, check=True)
293
294    def measure_dpdk_memory(self, results_dir):
295        self.log_print("INFO: waiting before generating DPDK memory usage dump")
296        time.sleep(self.dpdk_wait_time)
297        self.log_print("INFO: generating DPDK memory usage dump")
298        rpc.env.env_dpdk_get_mem_stats(self.client)
299        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
300
301    def sys_config(self):
302        self.log_print("====Kernel release:====")
303        self.log_print(os.uname().release)
304        self.log_print("====Kernel command line:====")
305        with open('/proc/cmdline') as f:
306            cmdline = f.readlines()
307            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
308        self.log_print("====sysctl conf:====")
309        with open('/etc/sysctl.conf') as f:
310            sysctl = f.readlines()
311            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
312        self.log_print("====Cpu power info:====")
313        subprocess.run("cpupower frequency-info", shell=True, check=True)
314        self.log_print("====zcopy settings:====")
315        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
316        self.log_print("====Scheduler settings:====")
317        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
318
319
320class Initiator(Server):
321    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
322                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
323                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):
324
325        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)
326
327        self.ip = ip
328        self.spdk_dir = workspace
329        if os.getenv('SPDK_WORKSPACE'):
330            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
331        self.fio_bin = fio_bin
332        self.cpus_allowed = cpus_allowed
333        self.cpus_allowed_policy = cpus_allowed_policy
334        self.cpu_frequency = cpu_frequency
335        self.nvmecli_bin = nvmecli_bin
336        self.ssh_connection = paramiko.SSHClient()
337        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
338        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
339        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
340        self.remote_call("mkdir -p %s" % self.spdk_dir)
341        self.set_cpu_frequency()
342        self.sys_config()
343
344    def __del__(self):
345        self.ssh_connection.close()
346
347    def put_file(self, local, remote_dest):
348        ftp = self.ssh_connection.open_sftp()
349        ftp.put(local, remote_dest)
350        ftp.close()
351
352    def get_file(self, remote, local_dest):
353        ftp = self.ssh_connection.open_sftp()
354        ftp.get(remote, local_dest)
355        ftp.close()
356
357    def remote_call(self, cmd):
358        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
359        out = stdout.read().decode(encoding="utf-8")
360        err = stderr.read().decode(encoding="utf-8")
361        return out, err
362
363    def copy_result_files(self, dest_dir):
364        self.log_print("Copying results")
365
366        if not os.path.exists(dest_dir):
367            os.mkdir(dest_dir)
368
369        # Get list of result files from initiator and copy them back to target
370        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
371        file_list = stdout.strip().split("\n")
372
373        for file in file_list:
374            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
375                          os.path.join(dest_dir, file))
376        self.log_print("Done copying results")
377
378    def discover_subsystems(self, address_list, subsys_no):
379        num_nvmes = range(0, subsys_no)
380        nvme_discover_output = ""
381        for ip, subsys_no in itertools.product(address_list, num_nvmes):
382            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
383            nvme_discover_cmd = ["sudo",
384                                 "%s" % self.nvmecli_bin,
385                                 "discover", "-t %s" % self.transport,
386                                 "-s %s" % (4420 + subsys_no),
387                                 "-a %s" % ip]
388            nvme_discover_cmd = " ".join(nvme_discover_cmd)
389
390            stdout, stderr = self.remote_call(nvme_discover_cmd)
391            if stdout:
392                nvme_discover_output = nvme_discover_output + stdout
393
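        # The regex below matches blocks of `nvme discover` output of roughly this
        # form (illustrative values):
        #   trsvcid: 4420
        #   subnqn:  nqn.2018-09.io.spdk:cnode1
        #   traddr:  192.168.100.1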
394        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
395                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
396                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
397                                nvme_discover_output)  # from nvme discovery output
398        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
399        subsystems = list(set(subsystems))
400        subsystems.sort(key=lambda x: x[1])
401        self.log_print("Found matching subsystems on target side:")
402        for s in subsystems:
403            self.log_print(s)
404
405        return subsystems
406
407    def gen_fio_filename_conf(self, *args, **kwargs):
408        # Logic implemented in SPDKInitiator and KernelInitiator classes
409        pass
410
411    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
412        fio_conf_template = """
413[global]
414ioengine={ioengine}
415{spdk_conf}
416thread=1
417group_reporting=1
418direct=1
419percentile_list=50:90:99:99.5:99.9:99.99:99.999
420
421norandommap=1
422rw={rw}
423rwmixread={rwmixread}
424bs={block_size}
425time_based=1
426ramp_time={ramp_time}
427runtime={run_time}
428"""
429        if "spdk" in self.mode:
430            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
431            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
432            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
433            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
434            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
435        else:
436            ioengine = "libaio"
437            spdk_conf = ""
438            out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
439            subsystems = [x for x in out.split("\n") if "nvme" in x]
440
441        if self.cpus_allowed is not None:
442            self.log_print("Limiting FIO workload execution to specific cores: %s" % self.cpus_allowed)
443            cpus_num = 0
444            cpus = self.cpus_allowed.split(",")
445            for cpu in cpus:
446                if "-" in cpu:
447                    a, b = cpu.split("-")
448                    a = int(a)
449                    b = int(b)
450                    cpus_num += len(range(a, b + 1))
451                else:
452                    cpus_num += 1
453            threads = range(0, cpus_num)
454        elif hasattr(self, 'num_cores'):
455            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
456            threads = range(0, int(self.num_cores))
457        else:
458            threads = range(0, len(subsystems))
459
460        if "spdk" in self.mode:
461            filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs)
462        else:
463            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
464
465        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
466                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
467                                              ramp_time=ramp_time, run_time=run_time)
468        if num_jobs:
469            fio_config = fio_config + "numjobs=%s \n" % num_jobs
470        if self.cpus_allowed is not None:
471            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
472            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
473        fio_config = fio_config + filename_section
474
475        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
476        if hasattr(self, "num_cores"):
477            fio_config_filename += "_%sCPU" % self.num_cores
478        fio_config_filename += ".fio"
479
480        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
481        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
482        self.log_print("Created FIO Config:")
483        self.log_print(fio_config)
484
485        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
486
487    def set_cpu_frequency(self):
488        if self.cpu_frequency is not None:
489            try:
490                self.remote_call('sudo cpupower frequency-set -g userspace')
491                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
492                cmd = "sudo cpupower frequency-info"
493                output, error = self.remote_call(cmd)
494                self.log_print(output)
495                self.log_print(error)
496            except Exception:
497                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
498                sys.exit()
499        else:
500            self.log_print("WARNING: you have disabled intel_pstate and are using the default CPU frequency governor.")
501
502    def run_fio(self, fio_config_file, run_num=None):
503        job_name, _ = os.path.splitext(fio_config_file)
504        self.log_print("Starting FIO run for job: %s" % job_name)
505        self.log_print("Using FIO: %s" % self.fio_bin)
506
507        if run_num:
508            for i in range(1, run_num + 1):
509                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
510                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
511                output, error = self.remote_call(cmd)
512                self.log_print(output)
513                self.log_print(error)
514        else:
515            output_filename = job_name + "_" + self.name + ".json"
516            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
517            output, error = self.remote_call(cmd)
518            self.log_print(output)
519            self.log_print(error)
520        self.log_print("FIO run finished. Results in: %s" % output_filename)
521
522    def sys_config(self):
523        self.log_print("====Kernel release:====")
524        self.log_print(self.remote_call('uname -r')[0])
525        self.log_print("====Kernel command line:====")
526        cmdline, error = self.remote_call('cat /proc/cmdline')
527        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
528        self.log_print("====sysctl conf:====")
529        sysctl, error = self.remote_call('cat /etc/sysctl.conf')
530        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
531        self.log_print("====Cpu power info:====")
532        self.remote_call("cpupower frequency-info")
533
534
535class KernelTarget(Target):
536    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
537                 null_block_devices=0, sar_settings=None, pcm_settings=None,
538                 bandwidth_settings=None, dpdk_settings=None, nvmet_bin="nvmetcli", **kwargs):
539
540        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
541                                           null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
542                                           dpdk_settings)
543        self.nvmet_bin = nvmet_bin
544
545    def __del__(self):
546        nvmet_command(self.nvmet_bin, "clear")
547
548    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
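        # Build a JSON config restorable with `nvmetcli restore`: one subsystem and
        # one port per disk, with ports numbered from 4420 upwards and NQNs of the
        # form nqn.2018-09.io.spdk:cnodeN.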
549
550        nvmet_cfg = {
551            "ports": [],
552            "hosts": [],
553            "subsystems": [],
554        }
555
556        # Split disks between NIC IPs
557        disks_per_ip = int(len(nvme_list) / len(address_list))
558        disk_chunks = [nvme_list[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(address_list))]
559
560        subsys_no = 1
561        port_no = 0
562        for ip, chunk in zip(address_list, disk_chunks):
563            for disk in chunk:
564                nvmet_cfg["subsystems"].append({
565                    "allowed_hosts": [],
566                    "attr": {
567                        "allow_any_host": "1",
568                        "serial": "SPDK00%s" % subsys_no,
569                        "version": "1.3"
570                    },
571                    "namespaces": [
572                        {
573                            "device": {
574                                "path": disk,
575                                "uuid": "%s" % uuid.uuid4()
576                            },
577                            "enable": 1,
578                            "nsid": subsys_no
579                        }
580                    ],
581                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
582                })
583
584                nvmet_cfg["ports"].append({
585                    "addr": {
586                        "adrfam": "ipv4",
587                        "traddr": ip,
588                        "trsvcid": "%s" % (4420 + port_no),
589                        "trtype": "%s" % self.transport
590                    },
591                    "portid": subsys_no,
592                    "referrals": [],
593                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
594                })
595                subsys_no += 1
596                port_no += 1
597
598        with open("kernel.conf", "w") as fh:
599            fh.write(json.dumps(nvmet_cfg, indent=2))
600        pass
601
602    def tgt_start(self):
603        self.log_print("Configuring kernel NVMeOF Target")
604
605        if self.null_block:
606            print("Configuring with null block device.")
607            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
608            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
609            self.subsys_no = len(null_blk_list)
610        else:
611            print("Configuring with NVMe drives.")
612            nvme_list = get_nvme_devices()
613            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
614            self.subsys_no = len(nvme_list)
615
616        nvmet_command(self.nvmet_bin, "clear")
617        nvmet_command(self.nvmet_bin, "restore kernel.conf")
618        self.log_print("Done configuring kernel NVMeOF Target")
619
620
621class SPDKTarget(Target):
622
623    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
624                 null_block_devices=0, null_block_dif_type=0, sar_settings=None, pcm_settings=None,
625                 bandwidth_settings=None, dpdk_settings=None, zcopy_settings=None,
626                 scheduler_settings="static", num_shared_buffers=4096, num_cores=1,
627                 dif_insert_strip=False, **kwargs):
628
629        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
630                                         null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
631                                         dpdk_settings, zcopy_settings, scheduler_settings)
632        self.num_cores = num_cores
633        self.num_shared_buffers = num_shared_buffers
634        self.null_block_dif_type = null_block_dif_type
635        self.dif_insert_strip = dif_insert_strip
636
637    def spdk_tgt_configure(self):
638        self.log_print("Configuring SPDK NVMeOF target via RPC")
639        numa_list = get_used_numa_nodes()
640
641        # Create RDMA transport layer
642        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
643                                       num_shared_buffers=self.num_shared_buffers,
644                                       dif_insert_or_strip=self.dif_insert_strip)
645        self.log_print("SPDK NVMeOF transport layer:")
646        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
647
648        if self.null_block:
649            nvme_section = self.spdk_tgt_add_nullblock(self.null_block)
650            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
651        else:
652            nvme_section = self.spdk_tgt_add_nvme_conf()
653            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips)
654        self.log_print("Done configuring SPDK NVMeOF Target")
655
656    def spdk_tgt_add_nullblock(self, null_block_count):
657        md_size = 0
658        block_size = 4096
659        if self.null_block_dif_type != 0:
660            md_size = 128
661
662        self.log_print("Adding null block bdevs to config via RPC")
663        for i in range(null_block_count):
664            self.log_print("Setting bdev protection to: %s" % self.null_block_dif_type)
665            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
666                                      dif_type=self.null_block_dif_type, md_size=md_size)
667        self.log_print("SPDK Bdevs configuration:")
668        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
669
670    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
671        self.log_print("Adding NVMe bdevs to config via RPC")
672
673        bdfs = get_nvme_devices_bdf()
674        bdfs = [b.replace(":", ".") for b in bdfs]
675
676        if req_num_disks:
677            if req_num_disks > len(bdfs):
678                self.log_print("ERROR: Requested number of disks exceeds the %s available" % len(bdfs))
679                sys.exit(1)
680            else:
681                bdfs = bdfs[0:req_num_disks]
682
683        for i, bdf in enumerate(bdfs):
684            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
685
686        self.log_print("SPDK Bdevs configuration:")
687        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
688
689    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
690        self.log_print("Adding subsystems to config")
691        if not req_num_disks:
692            req_num_disks = get_nvme_devices_count()
693
694        # Distribute bdevs between provided NICs
695        num_disks = range(0, req_num_disks)
696        if len(num_disks) == 1:
697            disks_per_ip = 1
698        else:
699            disks_per_ip = int(len(num_disks) / len(ips))
700        disk_chunks = [num_disks[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(ips))]
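        # Example (illustrative): 4 disks over 2 IPs -> disks_per_ip=2,
        # chunks [0, 1] and [2, 3]; each chunk is exposed through its own IP.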
701
702        # Create subsystems, add bdevs to namespaces, add listeners
703        for ip, chunk in zip(ips, disk_chunks):
704            for c in chunk:
705                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
706                serial = "SPDK00%s" % c
707                bdev_name = "Nvme%sn1" % c
708                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
709                                               allow_any_host=True, max_namespaces=8)
710                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
711
712                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
713                                                     trtype=self.transport,
714                                                     traddr=ip,
715                                                     trsvcid="4420",
716                                                     adrfam="ipv4")
717
718        self.log_print("SPDK NVMeOF subsystem configuration:")
719        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
720
721    def tgt_start(self):
722        if self.null_block:
723            self.subsys_no = 1
724        else:
725            self.subsys_no = get_nvme_devices_count()
726        self.log_print("Starting SPDK NVMeOF Target process")
727        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt --wait-for-rpc")
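        # --wait-for-rpc keeps the app paused until framework_start_init is sent over
        # RPC, so socket options and the scheduler can be configured first (see below).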
728        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
729        proc = subprocess.Popen(command, shell=True)
730        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
731
732        with open(self.pid, "w") as fh:
733            fh.write(str(proc.pid))
734        self.nvmf_proc = proc
735        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
736        self.log_print("Waiting for SPDK to initialize...")
737        while True:
738            if os.path.exists("/var/tmp/spdk.sock"):
739                break
740            time.sleep(1)
741        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")
742
743        if self.enable_zcopy:
744            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
745                                           enable_zerocopy_send=True)
746            self.log_print("Target socket options:")
747            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
748
749        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
750
751        rpc.framework_start_init(self.client)
752        self.spdk_tgt_configure()
753
754    def __del__(self):
755        if hasattr(self, "nvmf_proc"):
756            try:
757                self.nvmf_proc.terminate()
758                self.nvmf_proc.wait()
759            except Exception as e:
760                self.log_print(e)
761                self.nvmf_proc.kill()
762                self.nvmf_proc.communicate()
763
764
765class KernelInitiator(Initiator):
766    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
767                 cpus_allowed=None, cpus_allowed_policy="shared",
768                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
769
770        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
771                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
772                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)
773
774        self.extra_params = ""
775        if kwargs["extra_params"]:
776            self.extra_params = kwargs["extra_params"]
777
778    def __del__(self):
779        self.ssh_connection.close()
780
781    def kernel_init_connect(self, address_list, subsys_no):
782        subsystems = self.discover_subsystems(address_list, subsys_no)
783        self.log_print("The connection attempts below may result in error messages; this is expected!")
784        for subsystem in subsystems:
785            self.log_print("Trying to connect %s %s %s" % subsystem)
786            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
787                                                                             self.transport,
788                                                                             *subsystem,
789                                                                             self.extra_params))
790            time.sleep(2)
791
792    def kernel_init_disconnect(self, address_list, subsys_no):
793        subsystems = self.discover_subsystems(address_list, subsys_no)
794        for subsystem in subsystems:
795            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
796            time.sleep(1)
797
798    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
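        # Distribute the kernel-attached /dev/nvmeXn1 devices between [filenameN] job
        # sections, one section per thread, and scale each section's iodepth so that
        # roughly io_depth * disks_in_section / num_jobs is kept per section
        # (e.g. io_depth=128, 2 disks, num_jobs=1 -> iodepth=256; illustrative).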
799        out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
800        nvme_list = [x for x in out.split("\n") if "nvme" in x]
801
802        filename_section = ""
803        nvme_per_split = int(len(nvme_list) / len(threads))
804        remainder = len(nvme_list) % len(threads)
805        iterator = iter(nvme_list)
806        result = []
807        for i in range(len(threads)):
808            result.append([])
809            for j in range(nvme_per_split):
810                result[i].append(next(iterator))
811            if remainder:
812                result[i].append(next(iterator))
813                remainder -= 1
814        for i, r in enumerate(result):
815            header = "[filename%s]" % i
816            disks = "\n".join(["filename=%s" % x for x in r])
817            job_section_qd = round((io_depth * len(r)) / num_jobs)
818            if job_section_qd == 0:
819                job_section_qd = 1
820            iodepth = "iodepth=%s" % job_section_qd
821            filename_section = "\n".join([filename_section, header, disks, iodepth])
822
823        return filename_section
824
825
826class SPDKInitiator(Initiator):
827    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
828                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
829                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
830        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
831                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
832                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)
833
834        self.num_cores = num_cores
835
836    def install_spdk(self, local_spdk_zip):
837        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
838        self.log_print("Copied sources zip from target")
839        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)
840
841        self.log_print("Sources unpacked")
842        self.log_print("Using fio binary %s" % self.fio_bin)
843        self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
844                         "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))
845
846        self.log_print("SPDK built")
847        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)
848
849    def gen_spdk_bdev_conf(self, remote_subsystem_list):
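        # remote_subsystem_list entries are (trsvcid, subnqn, traddr) tuples as
        # returned by discover_subsystems(); each one becomes a
        # bdev_nvme_attach_controller entry in the generated JSON config.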
850        bdev_cfg_section = {
851            "subsystems": [
852                {
853                    "subsystem": "bdev",
854                    "config": []
855                }
856            ]
857        }
858
859        for i, subsys in enumerate(remote_subsystem_list):
860            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
861            nvme_ctrl = {
862                "method": "bdev_nvme_attach_controller",
863                "params": {
864                    "name": "Nvme{}".format(i),
865                    "trtype": self.transport,
866                    "traddr": sub_addr,
867                    "trsvcid": sub_port,
868                    "subnqn": sub_nqn,
869                    "adrfam": "IPv4"
870                }
871            }
872            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
873
874        return json.dumps(bdev_cfg_section, indent=2)
875
876    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
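        # Produces per-thread job sections of the form (illustrative):
        #   [filename0]
        #   filename=Nvme0n1
        #   filename=Nvme1n1
        #   iodepth=256
        # where iodepth is round(io_depth * disks_in_section / num_jobs), at least 1.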
877        filename_section = ""
878        if len(threads) >= len(subsystems):
879            threads = range(0, len(subsystems))
880        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
881        nvme_per_split = int(len(subsystems) / len(threads))
882        remainder = len(subsystems) % len(threads)
883        iterator = iter(filenames)
884        result = []
885        for i in range(len(threads)):
886            result.append([])
887            for j in range(nvme_per_split):
888                result[i].append(next(iterator))
889            if remainder:
890                result[i].append(next(iterator))
891                remainder -= 1
892        for i, r in enumerate(result):
893            header = "[filename%s]" % i
894            disks = "\n".join(["filename=%s" % x for x in r])
895            job_section_qd = round((io_depth * len(r)) / num_jobs)
896            if job_section_qd == 0:
897                job_section_qd = 1
898            iodepth = "iodepth=%s" % job_section_qd
899            filename_section = "\n".join([filename_section, header, disks, iodepth])
900
901        return filename_section
902
903
904if __name__ == "__main__":
905    spdk_zip_path = "/tmp/spdk.zip"
906    target_results_dir = "/tmp/results"
907
908    if (len(sys.argv) > 1):
909        config_file_path = sys.argv[1]
910    else:
911        script_full_dir = os.path.dirname(os.path.realpath(__file__))
912        config_file_path = os.path.join(script_full_dir, "config.json")
913
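    # Expected config.json layout (illustrative sketch based on the keys parsed below;
    # the per-section fields map to the Target/Initiator constructor arguments):
    # {
    #   "general":    {"username": ..., "password": ..., "transport": ..., "nic_ips": [...],
    #                  "skip_spdk_install": optional},
    #   "target":     {"mode": "spdk" or "kernel", ...},
    #   "initiator1": {"mode": "spdk" or "kernel", "ip": ..., ...},
    #   "fio":        {"bs": [...], "qd": [...], "rw": [...], "rwmixread": ...,
    #                  "run_time": ..., "ramp_time": ..., "run_num": optional, "num_jobs": optional}
    # }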
914    print("Using config file: %s" % config_file_path)
915    with open(config_file_path, "r") as config:
916        data = json.load(config)
917
918    initiators = []
919    fio_cases = []
920
921    for k, v in data.items():
922        if "target" in k:
923            if data[k]["mode"] == "spdk":
924                target_obj = SPDKTarget(name=k, **data["general"], **v)
925            elif data[k]["mode"] == "kernel":
926                target_obj = KernelTarget(name=k, **data["general"], **v)
927        elif "initiator" in k:
928            if data[k]["mode"] == "spdk":
929                init_obj = SPDKInitiator(name=k, **data["general"], **v)
930            elif data[k]["mode"] == "kernel":
931                init_obj = KernelInitiator(name=k, **data["general"], **v)
932            initiators.append(init_obj)
933        elif "fio" in k:
934            fio_workloads = itertools.product(data[k]["bs"],
935                                              data[k]["qd"],
936                                              data[k]["rw"])
937
938            fio_run_time = data[k]["run_time"]
939            fio_ramp_time = data[k]["ramp_time"]
940            fio_rw_mix_read = data[k]["rwmixread"]
941            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
942            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
943        else:
944            continue
945
946    # Copy and install SPDK on remote initiators
947    if "skip_spdk_install" not in data["general"]:
948        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
949        threads = []
950        for i in initiators:
951            if i.mode == "spdk":
952                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
953                threads.append(t)
954                t.start()
955        for t in threads:
956            t.join()
957
958    target_obj.tgt_start()
959
960    # Poor man's threading
961    # Run FIO tests
962    for block_size, io_depth, rw in fio_workloads:
963        threads = []
964        configs = []
965        for i in initiators:
966            if i.mode == "kernel":
967                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)
968
969            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
970                                   fio_num_jobs, fio_ramp_time, fio_run_time)
971            configs.append(cfg)
972
973        for i, cfg in zip(initiators, configs):
974            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
975            threads.append(t)
976        if target_obj.enable_sar:
977            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
978            sar_file_name = ".".join([sar_file_name, "txt"])
979            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
980            threads.append(t)
981
982        if target_obj.enable_pcm:
983            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
984            pcm_file_name = ".".join([pcm_file_name, "csv"])
985            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
986            threads.append(t)
987
988        if target_obj.enable_pcm_memory:
989            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
990            pcm_file_name = ".".join([pcm_file_name, "csv"])
991            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
992            threads.append(t)
993
994        if target_obj.enable_pcm_power:
995            pcm_file_name = "_".join(["pcm_power", str(block_size), str(rw), str(io_depth)])
996            pcm_file_name = ".".join([pcm_file_name, "csv"])
997            t = threading.Thread(target=target_obj.measure_pcm_power, args=(target_results_dir, pcm_file_name,))
998            threads.append(t)
999
1000        if target_obj.enable_bandwidth:
1001            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
1002            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
1003            t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,))
1004            threads.append(t)
1005
1006        if target_obj.enable_dpdk_memory:
1007            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir,))
1008            threads.append(t)
1009
1010        for t in threads:
1011            t.start()
1012        for t in threads:
1013            t.join()
1014
1015        for i in initiators:
1016            if i.mode == "kernel":
1017                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
1018            i.copy_result_files(target_results_dir)
1019
1020    target_obj.parse_results(target_results_dir)
1021