#!/usr/bin/env python3

import os
import re
import sys
import json
import paramiko
import zipfile
import threading
import subprocess
import itertools
import time
import uuid
import rpc
import rpc.client
import pandas as pd
from common import *


class Server:
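    """Base class shared by target and initiator hosts.

    Stores the SSH credentials, NIC IPs and NVMe-oF transport type used by the
    subclasses, and provides a small name-prefixed logging helper.
    """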
    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        print("[%s] %s" % (self.name, msg), flush=True)


class Target(Server):
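    """Target-side helper.

    Packages the SPDK sources for the initiators, parses fio JSON results into
    a CSV summary and collects SAR/PCM statistics while the workload runs.
    """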
    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False

        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
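        """Extract aggregated fio statistics from a JSON results file.

        Returns a flat list: read IOPS, bandwidth, average/min/max latency and
        p99/p99.9/p99.99/p99.999 percentiles, followed by the same metrics for
        writes. Latencies reported in nanoseconds are converted to microseconds.
        """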
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because group_reporting aggregates all jobs into a single entry

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
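        """Aggregate fio JSON results from all initiators into nvmf_results.csv.

        For each generated fio config, per-initiator results are averaged over
        the repeated runs and then summed across initiators.
        """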
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            header_line = ",".join(["Name",
                                    "read_iops", "read_bw", "read_avg_lat_us",
                                    "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                    "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                                    "write_iops", "write_bw", "write_avg_lat_us",
                                    "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us",
                                    "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"])
            fh.write(header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in the name, strip it; initiators running the
            # same job may use a different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # More than one initiator may have been used in the test.
            # Result files are named so that the string after the last "_" separator is the initiator name.
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than one test run for this job; average the results per initiator
                i_results = [x for x in job_result_files if i in x]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                z = [sum(c) for c in zip(*separate_stats)]
                z = [c/len(separate_stats) for c in z]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Save results to file
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        self.log_print("Waiting %d seconds before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)


class Initiator(Server):
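    """Initiator-side helper.

    Opens an SSH connection to the initiator host and provides helpers for
    copying files, discovering remote subsystems, generating fio configuration
    files and running fio.
    """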
    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_no),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("lsblk -o NAME -nlp")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads)
        else:
            filename_section = self.gen_fio_filename_conf(threads)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and are using the default CPU frequency governor.")

    def run_fio(self, fio_config_file, run_num=None):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)


class KernelTarget(Target):
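    """Target implementation based on the Linux kernel NVMe-oF target.

    Generates a kernel.conf file and applies it with nvmetcli, exposing either
    a null block device or the locally attached NVMe drives.
    """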
    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
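    """Target implementation based on the SPDK nvmf_tgt application.

    Starts nvmf_tgt on the given core mask and configures the transport,
    bdevs, subsystems and listeners over the JSON-RPC socket.
    """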

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            nvme_section = self.spdk_tgt_add_nullblock()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            nvme_section = self.spdk_tgt_add_nvme_conf()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than the %s available" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(1, req_num_disks + 1)
        disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()


class KernelInitiator(Initiator):
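    """Initiator that connects to the target with the kernel NVMe-oF host driver.

    Uses nvme-cli to connect/disconnect the discovered subsystems and runs fio
    against the resulting /dev/nvme* block devices with the libaio ioengine.
    """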
    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        self.extra_params = ""
        if kwargs.get("extra_params"):
            self.extra_params = kwargs["extra_params"]

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("The connection attempts below may produce error messages; this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads):
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        filenames = ["nvme%sn1" % x for x in range(0, len(nvme_list))]
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=/dev/%s" % x for x in r])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section


class SPDKInitiator(Initiator):
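    """Initiator that runs fio through the SPDK bdev fio_plugin.

    Builds SPDK from the sources copied over from the target and generates a
    bdev configuration pointing at the remote NVMe-oF subsystems.
    """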
    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=%s;"
                         "make clean; make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        header = "[Nvme]"
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        bdev_rows = [row_template.format(transport=self.transport,
                                         svc=x[0],
                                         nqn=x[1],
                                         ip=x[2],
                                         i=i) for i, x in enumerate(remote_subsystem_list)]
        bdev_rows = "\n".join(bdev_rows)
        bdev_section = "\n".join([header, bdev_rows])
        return bdev_section

    def gen_fio_filename_conf(self, subsystems, threads):
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section


if __name__ == "__main__":
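    # Illustrative sketch of the expected config.json layout, inferred from how
    # the keys are consumed below (section names and exact fields may differ in
    # a real deployment):
    #
    # {
    #   "general": {"username": "...", "password": "...", "transport": "rdma", "nic_ips": ["..."]},
    #   "target": {"mode": "spdk", ...},
    #   "initiator1": {"mode": "spdk", "ip": "...", ...},
    #   "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 100,
    #           "ramp_time": 30, "run_time": 600, "run_num": 3, "num_jobs": 1}
    # }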
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for i in initiators:
        if i.mode == "spdk":
            t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Poor man's threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)