xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 91a594ad8b3c0d00f25c9a20dfc8f1f2dfce81ce)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import paramiko
8import zipfile
9import threading
10import subprocess
11import itertools
12import time
13import uuid
14import rpc
15import rpc.client
16import pandas as pd
17from common import *
18
19
class Server:
    """Common base for every host (target or initiator) taking part in the test.

    Stores connection credentials and NIC IP addresses and provides a
    uniform, name-prefixed logging helper used by all subclasses.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.username = username
        self.password = password
        self.mode = mode
        self.nic_ips = nic_ips
        # Transport strings are compared case-insensitively elsewhere, so normalize once here.
        self.transport = transport.lower()

        # The name is embedded in result file names, so only letters and
        # digits are accepted; abort early on anything else.
        if re.match("^[A-Za-z0-9]*$", name) is None:
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name, flushing immediately."""
        print("[{}] {}".format(self.name, msg), flush=True)
35
36
class Target(Server):
    """Base class for NVMe-oF target hosts (kernel- or SPDK-based).

    Provides fio result parsing shared by both target flavors, plus optional
    SAR / PCM performance measurements executed on the target machine.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False

        # sar_settings unpacks as (enable, delay_s, interval_s, count).
        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        # pcm_settings unpacks as (pcm_dir, enable_pcm, enable_pcm_memory, delay_s, interval, count).
        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        # Directory containing this script; the SPDK repository root is
        # assumed to be three levels up (scripts/perf/nvmf/..).
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK tree at spdk_dir into dest_file.

        Entries are stored with paths relative to the current working
        directory (os.path.relpath); symlinks are followed.
        """
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse one fio JSON result file and return aggregated read/write stats.

        Returns a flat 18-element list:
        [read_iops, read_bw, read avg/min/max lat, read p99/p99.9/p99.99/p99.999,
         write_iops, write_bw, write avg/min/max lat, write p99/p99.9/p99.99/p99.999].
        Latencies reported by fio in nanoseconds are converted to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                # (fio names the bucket e.g. "lat_ns" or "lat_us").
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

            # Normalize nanosecond buckets to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

            # Normalize nanosecond buckets to microseconds.
            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate all fio JSON results in results_dir into nvmf_results.csv.

        For each .fio config file, results from multiple runs of the same
        initiator are averaged, then averages from all initiators are summed
        into one CSV row per job.

        Note: initiator_count and run_num are not referenced in this body;
        the initiator set and run count are derived from result file names.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            header_line = ",".join(["Name",
                                    "read_iops", "read_bw", "read_avg_lat_us",
                                    "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                    "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                                    "write_iops", "write_bw", "write_avg_lat_us",
                                    "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us",
                                    "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"])
            fh.write(header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise mean across this initiator's runs.
                z = [sum(c) for c in zip(*separate_stats)]
                z = [c/len(separate_stats) for c in z]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Run sar on the target after sar_delay seconds and save full output.

        Collects sar_count samples at sar_interval seconds for all CPUs and
        logs the "Average" summary lines as they are found.
        """
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in CSV mode after pcm_delay seconds.

        The tool is left running for pcm_count seconds and then killed;
        pcm-memory has no iteration-count option here, hence the kill.
        """
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x for pcm_count iterations at pcm_interval, saving CSV output.

        Afterwards extract the per-socket UPI columns (UPI0/UPI1/UPI2) into a
        separate "skt_"-prefixed CSV file.
        """
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        # pcm.x emits a two-row header; strip pandas' "Unnamed:" placeholders.
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
218
219
class Initiator(Server):
    """Base class for a remote fio initiator host, driven over SSH (paramiko).

    Opens an SSH session at construction time, prepares the remote workspace
    directory and optionally pins the remote CPU frequency. Subclasses supply
    the transport-specific fio "filename" sections.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        # SPDK_WORKSPACE environment variable overrides the workspace argument.
        self.spdk_dir = os.getenv('SPDK_WORKSPACE') or workspace
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote workspace.
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        # __init__ may have raised (e.g. name validation or SSH connect failure)
        # before ssh_connection was assigned; guard to avoid a secondary
        # AttributeError during interpreter teardown.
        if hasattr(self, "ssh_connection"):
            self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a local file to remote_dest via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a remote file to local_dest via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Execute cmd on the initiator over SSH.

        Returns (stdout, stderr) as UTF-8 strings; blocks until the command
        finishes producing output.
        """
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy all files from the remote nvmf_perf directory into local dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run "nvme discover" against every (address, port) combination.

        Probes ports 4420 .. 4420+subsys_no-1 on each address and returns a
        sorted, de-duplicated list of (trsvcid, subnqn, traddr) tuples whose
        traddr is in address_list.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_no),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a fio job file on the initiator and return its remote path.

        The [global] section is filled from the arguments; the filename
        sections come from the subclass's gen_fio_filename_conf(). The number
        of fio jobs (threads) is derived from, in priority order:
        cpus_allowed, self.num_cores, or the number of discovered subsystems.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("lsblk -o NAME -nlp")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # fio's cpus_allowed "a-b" range is inclusive of b, so it
                    # contains b - a + 1 CPUs. The previous len(range(a, b))
                    # undercounted by one CPU per range.
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads)
        else:
            filename_section = self.gen_fio_filename_conf(threads)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the remote CPU governor/frequency when cpu_frequency is set.

        Exits the whole script if cpupower fails (e.g. intel_pstate enabled);
        otherwise only logs a warning when no frequency was requested.
        """
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio on the initiator with the given remote config file.

        When run_num is set, the job is executed that many times and each
        run's JSON output is saved to a separate "_run_<i>_" file; otherwise a
        single run is performed.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)
417
418
class KernelTarget(Target):
    """NVMe-oF target using the Linux kernel nvmet driver, configured via nvmetcli.

    Generates a "kernel.conf" JSON configuration (one subsystem + one port per
    exported device) and restores it with the nvmetcli binary.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        # Tear down the kernel target configuration on object destruction.
        nvmet_command(self.nvmet_bin, "clear")

    def _nvmet_subsystem_entry(self, nqn, device_path, nsid):
        # One nvmet subsystem exporting a single namespace backed by device_path.
        # Key order matters: it is preserved in the json.dumps output.
        return {
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": device_path,
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": nsid
                }
            ],
            "nqn": nqn
        }

    def _nvmet_port_entry(self, nqn, traddr, trsvcid, portid):
        # One nvmet port exposing the given subsystem NQN on traddr:trsvcid.
        return {
            "addr": {
                "adrfam": "ipv4",
                "traddr": traddr,
                "trsvcid": trsvcid,
                "trtype": "%s" % self.transport,
            },
            "portid": portid,
            "referrals": [],
            "subsystems": [nqn]
        }

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exporting a single /dev/nullb0 namespace on port 4420."""
        nqn = "nqn.2018-09.io.spdk:cnode1"
        nvmet_cfg = {
            "ports": [self._nvmet_port_entry(nqn, address, "4420", 1)],
            "hosts": [],
            "subsystems": [self._nvmet_subsystem_entry(nqn, "/dev/nullb0", 1)],
        }
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exporting nvme_list devices spread across address_list NICs.

        Devices are split evenly between the NIC IPs; each device becomes its
        own subsystem on its own port (4420, 4421, ...).
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = len(nvme_list) // len(address_list)
        disk_chunks = [nvme_list[i * disks_per_ip:(i + 1) * disks_per_ip]
                       for i in range(len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                nvmet_cfg["subsystems"].append(self._nvmet_subsystem_entry(nqn, disk, subsys_no))
                nvmet_cfg["ports"].append(self._nvmet_port_entry(nqn, ip, "%s" % (4420 + port_no), subsys_no))
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the kernel target config and load it with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
543        self.log_print("Done configuring kernel NVMeOF Target")
544
545
class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt application, configured via JSON-RPC."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings)
        self.num_cores = num_cores  # core mask/count passed to nvmf_tgt via -m
        self.num_shared_buffers = num_shared_buffers  # transport-level shared buffer count

    def spdk_tgt_configure(self):
        """Configure transport, bdevs and subsystems on the running target via RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()  # NOTE(review): result unused in this method — confirm whether needed

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # Both helper methods below configure via RPC side effects and return
        # None, so the assignments hold no data.
        if self.null_block:
            nvme_section = self.spdk_tgt_add_nullblock()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            nvme_section = self.spdk_tgt_add_nvme_conf()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a single 400 MiB null bdev named Nvme0n1 (102400 blocks x 4096 B)."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe PCIe controllers as bdevs Nvme0, Nvme1, ...

        req_num_disks limits how many drives are attached; None means all.
        Exits if more disks are requested than present.
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC traddr format uses dots instead of colons in the PCI BDF.
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, distributed across the given IPs.

        Subsystem N gets NQN ...cnode<N>, serial SPDK00<N> and namespace
        Nvme<N-1>n1; every listener uses port 4420.
        """
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(1, req_num_disks + 1)
        disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, then configure it.

        The child PID is written to <spdk_dir>/nvmf.pid and the process handle
        kept in self.nvmf_proc for cleanup in __del__.
        """
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        command = " ".join([nvmf_app_path, "-m", self.num_cores])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initilize...")
        # Poll until the app creates its RPC domain socket.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        # Stop the nvmf_tgt child process if it was started; escalate from
        # terminate to kill if graceful shutdown fails.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
657                self.nvmf_proc.communicate()
658
659
class KernelInitiator(Initiator):
    """Initiator that uses the kernel NVMe-oF host driver (nvme-cli) to connect."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # Extra flags appended to "nvme connect". Use .get() so a missing
        # "extra_params" key falls back to "" instead of raising KeyError
        # (the original indexed kwargs directly after setting the default).
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Discover subsystems on the target and "nvme connect" to each one."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            # subsystem is a (trsvcid, subnqn, traddr) tuple filling -s/-n/-a.
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect every subsystem previously discovered on the target."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads):
        """Build [filenameX] fio sections distributing /dev/nvme*n1 devices over jobs.

        threads is a range with one entry per fio job; devices are split
        evenly, with the remainder handed out one extra per job from the front.
        """
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        filenames = ["nvme%sn1" % x for x in range(0, len(nvme_list))]
        # BUGFIX: the original divided by the range object itself
        # ("int(len(nvme_list) / threads)" and "% threads"), which raises
        # TypeError — the intended divisor is the number of threads.
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
                if remainder:
                    result[i].append(next(iterator))
                    remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=/dev/%s" % x for x in r])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section
716
717
class SPDKInitiator(Initiator):
    """Initiator host that runs fio through the SPDK bdev plugin against
    remote NVMe-oF subsystems."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # Number of cores handed to the fio SPDK plugin.
        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Copy the SPDK source archive to this host, build SPDK with the fio
        plugin enabled, and run setup.sh to bind devices."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=%s;"
                         "make clean; make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Build the INI-style [Nvme] section with one TransportId row per
        remote subsystem tuple (svc, nqn, ip)."""
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        rows = []
        for idx, entry in enumerate(remote_subsystem_list):
            rows.append(row_template.format(transport=self.transport,
                                            svc=entry[0],
                                            nqn=entry[1],
                                            ip=entry[2],
                                            i=idx))
        joined_rows = "\n".join(rows)
        return "\n".join(["[Nvme]", joined_rows])

    def gen_fio_filename_conf(self, subsystems, threads):
        """Generate fio [filename] sections distributing the SPDK Nvme bdevs
        across the given fio jobs, at most one leftover bdev per job."""
        # Never run more jobs than there are subsystems to hand out.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        bdev_names = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        base_count = int(len(subsystems) / len(threads))
        leftover = len(subsystems) % len(threads)
        name_iter = iter(bdev_names)
        buckets = []
        for _ in range(len(threads)):
            bucket = [next(name_iter) for _ in range(base_count)]
            if leftover:
                bucket.append(next(name_iter))
                leftover -= 1
            buckets.append(bucket)

        filename_section = ""
        for idx, bucket in enumerate(buckets):
            disks = "\n".join("filename=%s" % name for name in bucket)
            filename_section = "\n".join([filename_section, "[filename%s]" % idx, disks])

        return filename_section
776
777
if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file: first CLI argument, or config.json next to this script.
    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Instantiate target/initiator objects from the JSON config. Section
    # names are matched by substring; "general" settings are merged into
    # every object's constructor arguments.
    for k, v in data.items():
        if "target" in k:
            if v["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif v["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if v["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif v["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(v["bs"],
                                              v["qd"],
                                              v["rw"])

            fio_run_time = v["run_time"]
            fio_ramp_time = v["ramp_time"]
            fio_rw_mix_read = v["rwmixread"]
            # Optional settings; .get() returns None when absent, matching the
            # previous "if key in dict" conditional expressions.
            fio_run_num = v.get("run_num")
            fio_num_jobs = v.get("num_jobs")
        else:
            continue

    # Copy and install SPDK on remote initiators
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for i in initiators:
        if i.mode == "spdk":
            t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Run each fio workload (block size x queue depth x rw pattern). Fio on
    # every initiator and the optional SAR/PCM measurements on the target run
    # as parallel threads, joined before results are collected.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            # Kernel-mode initiators must attach the subsystems first so the
            # /dev/nvme* block devices exist for fio.
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)
878