xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision b30d57cdad6d2bc75cc1e4e2ebbcebcb0d98dcfa)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16import pandas as pd
17from collections import OrderedDict
18from common import *
19
20
class Server:
    """Base class for the hosts taking part in a test (target or initiator)."""

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # The name is embedded in log prefixes and result file names,
        # so only plain alphanumeric names are accepted.
        if re.match("^[A-Za-z0-9]*$", name) is None:
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print *msg* prefixed with this server's name, flushing immediately."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return only the non-empty lines that do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]
39
40
class Target(Server):
    """Target-side host: exposes NVMe-oF subsystems and collects measurements.

    Also owns the fio JSON result parsing/aggregation logic and the helpers
    that run system measurement tools (SAR, PCM, bwm-ng, DPDK memory stats)
    alongside the workload.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 null_block_devices=0, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None, zcopy_settings=None,
                 scheduler_settings="static"):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = null_block_devices
        # All measurements default to disabled. Each *_settings argument is an
        # iterable that both enables the measurement and carries its parameters.
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False
        self.enable_bandwidth = False
        self.enable_dpdk_memory = False
        self.enable_zcopy = False
        self.scheduler_name = scheduler_settings

        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        if bandwidth_settings:
            self.enable_bandwidth, self.bandwidth_count = bandwidth_settings

        if dpdk_settings:
            self.enable_dpdk_memory, self.dpdk_wait_time = dpdk_settings

        if zcopy_settings:
            self.enable_zcopy = zcopy_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.sys_config()

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree into *dest_file* (for transfer to initiators)."""
        self.log_print("Zipping SPDK source directory")
        # Context manager guarantees the archive is closed even if os.walk
        # or fh.write raises (the original leaked the handle on error).
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse a fio JSON output file and return a flat list of read/write stats.

        Returns [read_iops, read_bw, read avg/min/max lat, read p99/p99.9/p99.99/p99.999,
        then the same eight write values]. Latencies are normalized to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_post = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate per-initiator fio JSON results in *results_dir*.

        Writes per-initiator CSV files and a summary nvmf_results.csv with
        read/write values merged into common columns (latencies weighted by
        the job's rwmixread ratio).
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file. Open the file once instead of reopening
        # it in append mode for every single row.
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Run SAR CPU sampling and store its full output in *sar_file_name*."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            # Echo the "Average" summary lines to the log for quick inspection.
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory in the background for pcm_count seconds, writing CSV output."""
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        # pcm-memory.x has no sample-count option, so let it run for
        # pcm_count seconds and then terminate it.
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x for pcm_count samples and extract socket UPI columns to a side CSV."""
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_bandwidth(self, results_dir, bandwidth_file_name):
        """Run bwm-ng to sample network bandwidth into a CSV file."""
        subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name,
                       self.bandwidth_count), shell=True, check=True)

    def measure_dpdk_memory(self, results_dir):
        """Trigger a DPDK memory stats dump via RPC and move it to *results_dir*."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # BUGFIX: the original statement only referenced the RPC function
        # without calling it, so /tmp/spdk_mem_dump.txt was never produced.
        # NOTE(review): self.client is created by the SPDKTarget subclass -
        # confirm this measurement is only enabled for SPDK-mode targets.
        rpc.env.env_dpdk_get_mem_stats(self.client)
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log the target host's kernel, sysctl, CPU power and SPDK settings."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        subprocess.run("cpupower frequency-info", shell=True, check=True)
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
309
310
class Initiator(Server):
    """Initiator-side test host, driven remotely over SSH (paramiko)."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        # SPDK_WORKSPACE environment variable overrides the workspace argument.
        self.spdk_dir = workspace
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()
        self.sys_config()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Copy a local file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Copy a file from the initiator to the local host over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Execute *cmd* on the initiator; return (stdout, stderr) as strings."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Fetch all files from the initiator's nvmf_perf directory into *dest_dir*."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            # An empty remote directory yields [''] - skip blank entries
            # instead of attempting to fetch a nameless file.
            if not file:
                continue
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run nvme discover against ports 4420..4420+subsys_no-1 on each address.

        Returns a sorted list of (trsvcid, subnqn, traddr) tuples whose
        traddr matches one of the given addresses.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # Renamed loop variable: the original shadowed the subsys_no parameter.
        for ip, nvme_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + nvme_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + nvme_no),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a fio config file on the initiator and return its remote path."""
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # BUGFIX: fio treats "a-b" ranges as inclusive, so the
                    # range covers b - a + 1 CPUs. The previous
                    # len(range(a, b)) undercounted by one per range.
                    cpus_num += b - a + 1
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the initiator's CPU frequency with cpupower, if one was requested."""
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio on the initiator; repeat run_num times if given, once otherwise."""
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log the initiator host's kernel, sysctl and CPU power settings."""
        self.log_print("====Kernel release:====")
        self.log_print(self.remote_call('uname -r')[0])
        self.log_print("====Kernel command line:====")
        cmdline, error = self.remote_call('cat /proc/cmdline')
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl, error = self.remote_call('cat /etc/sysctl.conf')
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.remote_call("cpupower frequency-info")
524
525
class KernelTarget(Target):
    """Target host using the Linux kernel NVMe-oF target, driven via nvmetcli.

    NOTE(review): zcopy_settings and scheduler_settings are not forwarded to
    Target.__init__ here, so kernel targets always use the defaults - confirm
    this is intentional.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 null_block_devices=0, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None, nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
                                           dpdk_settings)
        # Path/name of the nvmetcli binary used to load/clear the kernel config.
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        # Tear down the kernel NVMe-oF configuration when this object goes away.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write an nvmetcli JSON config ("kernel.conf" in the CWD) exposing
        each device in nvme_list as its own subsystem, spread across the
        given NIC addresses with consecutive ports starting at 4420.
        """

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        # NOTE(review): integer division means that when len(nvme_list) is not
        # a multiple of len(address_list), the remainder disks are silently
        # left out of the config - confirm this is acceptable.
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        # subsys_no doubles as serial number, nsid, cnode index and portid.
        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                # One dedicated port (4420 + port_no) per subsystem.
                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))
        pass

    def tgt_start(self):
        """Generate the nvmet config (null-block or real NVMe based) and load it."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
610
611
612class SPDKTarget(Target):
613
614    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
615                 null_block_devices=0, null_block_dif_type=0, sar_settings=None, pcm_settings=None,
616                 bandwidth_settings=None, dpdk_settings=None, zcopy_settings=None,
617                 scheduler_settings="static", num_shared_buffers=4096, num_cores=1,
618                 dif_insert_strip=False, **kwargs):
619
620        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
621                                         null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
622                                         dpdk_settings, zcopy_settings, scheduler_settings)
623        self.num_cores = num_cores
624        self.num_shared_buffers = num_shared_buffers
625        self.null_block_dif_type = null_block_dif_type
626        self.dif_insert_strip = dif_insert_strip
627
628    def spdk_tgt_configure(self):
629        self.log_print("Configuring SPDK NVMeOF target via RPC")
630        numa_list = get_used_numa_nodes()
631
632        # Create RDMA transport layer
633        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
634                                       num_shared_buffers=self.num_shared_buffers,
635                                       dif_insert_or_strip=self.dif_insert_strip)
636        self.log_print("SPDK NVMeOF transport layer:")
637        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
638
639        if self.null_block:
640            nvme_section = self.spdk_tgt_add_nullblock(self.null_block)
641            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
642        else:
643            nvme_section = self.spdk_tgt_add_nvme_conf()
644            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips)
645        self.log_print("Done configuring SPDK NVMeOF Target")
646
647    def spdk_tgt_add_nullblock(self, null_block_count):
648        md_size = 0
649        block_size = 4096
650        if self.null_block_dif_type != 0:
651            md_size = 128
652
653        self.log_print("Adding null block bdevices to config via RPC")
654        for i in range(null_block_count):
655            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
656            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
657                                      dif_type=self.null_block_dif_type, md_size=md_size)
658        self.log_print("SPDK Bdevs configuration:")
659        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
660
661    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
662        self.log_print("Adding NVMe bdevs to config via RPC")
663
664        bdfs = get_nvme_devices_bdf()
665        bdfs = [b.replace(":", ".") for b in bdfs]
666
667        if req_num_disks:
668            if req_num_disks > len(bdfs):
669                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
670                sys.exit(1)
671            else:
672                bdfs = bdfs[0:req_num_disks]
673
674        for i, bdf in enumerate(bdfs):
675            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
676
677        self.log_print("SPDK Bdevs configuration:")
678        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
679
680    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
681        self.log_print("Adding subsystems to config")
682        if not req_num_disks:
683            req_num_disks = get_nvme_devices_count()
684
685        # Distribute bdevs between provided NICs
686        num_disks = range(0, req_num_disks)
687        if len(num_disks) == 1:
688            disks_per_ip = 1
689        else:
690            disks_per_ip = int(len(num_disks) / len(ips))
691        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]
692
693        # Create subsystems, add bdevs to namespaces, add listeners
694        for ip, chunk in zip(ips, disk_chunks):
695            for c in chunk:
696                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
697                serial = "SPDK00%s" % c
698                bdev_name = "Nvme%sn1" % c
699                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
700                                               allow_any_host=True, max_namespaces=8)
701                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
702
703                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
704                                                     trtype=self.transport,
705                                                     traddr=ip,
706                                                     trsvcid="4420",
707                                                     adrfam="ipv4")
708
709        self.log_print("SPDK NVMeOF subsystem configuration:")
710        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
711
712    def tgt_start(self):
713        if self.null_block:
714            self.subsys_no = 1
715        else:
716            self.subsys_no = get_nvme_devices_count()
717        self.log_print("Starting SPDK NVMeOF Target process")
718        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt --wait-for-rpc")
719        command = " ".join([nvmf_app_path, "-m", self.num_cores])
720        proc = subprocess.Popen(command, shell=True)
721        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
722
723        with open(self.pid, "w") as fh:
724            fh.write(str(proc.pid))
725        self.nvmf_proc = proc
726        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
727        self.log_print("Waiting for spdk to initilize...")
728        while True:
729            if os.path.exists("/var/tmp/spdk.sock"):
730                break
731            time.sleep(1)
732        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")
733
734        if self.enable_zcopy:
735            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
736                                           enable_zerocopy_send=True)
737            self.log_print("Target socket options:")
738            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
739
740        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
741
742        rpc.framework_start_init(self.client)
743        self.spdk_tgt_configure()
744
745    def __del__(self):
746        if hasattr(self, "nvmf_proc"):
747            try:
748                self.nvmf_proc.terminate()
749                self.nvmf_proc.wait()
750            except Exception as e:
751                self.log_print(e)
752                self.nvmf_proc.kill()
753                self.nvmf_proc.communicate()
754
755
class KernelInitiator(Initiator):
    """Initiator host that attaches to the target with the kernel NVMe driver
    (nvme-cli connect/disconnect) and runs fio against the /dev/nvmeX devices.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # "extra_params" is optional in the JSON config; kwargs["extra_params"]
        # raised KeyError when it was absent. Falsy values keep "".
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """nvme-cli connect to every subsystem discovered on address_list."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """nvme-cli disconnect from every subsystem discovered on address_list."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build fio "[filenameX]" sections, spreading the host's kernel NVMe
        devices evenly over the fio job sections.

        threads: sequence whose length is the number of job sections.
        io_depth: total iodepth budget per device.
        num_jobs: fio numjobs value used to scale each section's iodepth.
        Returns the concatenated fio config text.
        """
        out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            # Hand out at most ONE leftover device per section. The old code
            # checked the remainder inside the inner loop, which could pile
            # several leftovers onto the first section, skewing the split
            # (this also matches SPDKInitiator.gen_fio_filename_conf).
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale section iodepth by device count; floor of 1 so fio never
            # gets iodepth=0.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
815
816
class SPDKInitiator(Initiator):
    """Initiator host that runs fio through the SPDK bdev plugin against
    remote NVMe-oF subsystems.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Copy the SPDK source zip to this host, build it with the fio
        plugin, and run setup.sh to bind devices.
        """
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
                         "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev subsystem config that attaches one NVMe-oF
        controller per (port, nqn, address) tuple in remote_subsystem_list.
        """
        controllers = []
        for idx, (sub_port, sub_nqn, sub_addr) in enumerate(remote_subsystem_list):
            controllers.append({
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(idx),
                    "trtype": self.transport,
                    "traddr": str(sub_addr),
                    "trsvcid": str(sub_port),
                    "subnqn": str(sub_nqn),
                    "adrfam": "IPv4"
                }
            })

        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": controllers
                }
            ]
        }
        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build fio "[filenameX]" sections spreading Nvme<i>n1 bdev names
        evenly over the fio job sections.

        Returns the concatenated fio config text (leading newline included,
        matching how the sections are later appended to a base config).
        """
        # Never use more job sections than there are subsystems.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        per_section, leftover = divmod(len(subsystems), len(threads))
        names = iter(filenames)

        buckets = []
        for _ in range(len(threads)):
            bucket = [next(names) for _ in range(per_section)]
            # At most one leftover bdev goes to each of the first sections.
            if leftover:
                bucket.append(next(names))
                leftover -= 1
            buckets.append(bucket)

        pieces = [""]
        for idx, bucket in enumerate(buckets):
            # Section iodepth scales with device count; floor of 1.
            qd = round((io_depth * len(bucket)) / num_jobs) or 1
            pieces.append("[filename%s]" % idx)
            pieces.append("\n".join("filename=%s" % name for name in bucket))
            pieces.append("iodepth=%s" % qd)

        return "\n".join(pieces)
893
894
if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file may be passed as the first CLI argument; otherwise use
    # config.json next to this script.
    if (len(sys.argv) > 1):
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Build the target object, initiator objects and fio workload matrix
    # from the JSON config sections (keyed by "target"/"initiator"/"fio").
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Cartesian product of block sizes, queue depths and rw patterns.
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            # Optional keys: dict.get replaces the "key in keys()" dance.
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
        else:
            continue

    # Copy and install SPDK on remote initiators (in parallel), unless the
    # config asks to skip it. NOTE(review): only the key's presence is
    # checked — "skip_spdk_install": false still skips; kept for
    # backward compatibility with existing configs.
    if "skip_spdk_install" not in data["general"]:
        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
        threads = []
        for i in initiators:
            if i.mode == "spdk":
                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
                threads.append(t)
                t.start()
        for t in threads:
            t.join()

    target_obj.tgt_start()

    # Run FIO tests: one pass per (block_size, io_depth, rw) combination,
    # with fio on each initiator and the enabled measurement tools on the
    # target all running as concurrent threads.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)

        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            # Bug fix: args must be a tuple — "(target_results_dir)" was a
            # bare string, so Thread would splat its characters as separate
            # positional arguments and crash at thread start.
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)
1006