#!/usr/bin/env python3

import os
import re
import sys
import json
import paramiko
import zipfile
import threading
import subprocess
import itertools
import time
import uuid
import rpc
import rpc.client
import pandas as pd
from collections import OrderedDict
from common import *


class Server:
    """Base class for any machine taking part in the NVMe-oF benchmark.

    Keeps SSH credentials, the backend mode ("spdk" or "kernel"), the NIC IP
    addresses used for NVMe-oF traffic and the transport type.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # The name is embedded in generated result file names, so restrict it
        # to alphanumeric characters.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name; flush immediately so
        output from concurrent threads interleaves promptly."""
        print("[%s] %s" % (self.name, msg), flush=True)


class Target(Server):
    """Target-side machine.

    Packages SPDK sources for the initiators, parses FIO json results into
    csv summaries, and runs optional SAR / PCM / bandwidth measurements
    while the workload is running.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False
        self.enable_bandwidth = False

        # Each *_settings argument is an optional sequence coming straight
        # from the json config; unpack only when provided.
        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        if bandwidth_settings:
            self.enable_bandwidth, self.bandwidth_count = bandwidth_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        # The script lives in spdk/scripts/perf/nvmf, so SPDK root is 3 levels up.
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree into dest_file for shipping to initiators.

        The archive is opened via a context manager so it is closed (and its
        central directory flushed) even if walking the tree raises — the old
        code leaked the handle on error.
        """
        self.log_print("Zipping SPDK source directory")
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse a single FIO json result file.

        Returns a flat list of 18 floats: iops, bw, avg/min/max latency and
        p99/p99.9/p99.99/p99.999 latency — first for reads, then for writes.
        All latencies are normalized to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]
                # Fail loudly instead of returning None, which would surface
                # later as a confusing TypeError at the unpacking site.
                raise KeyError("No key with prefix '%s' found in fio result section" % key_prefix)

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate all per-initiator json results found in results_dir.

        Produces one csv per initiator/job plus a single summary csv
        (nvmf_results.csv) with read/write values merged per aggr header.
        initiator_count and run_num are kept for interface compatibility
        but currently unused (the information is derived from file names).
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise average over all runs of this initiator.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    aggr_value = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    aggr_value = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(aggr_value)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file. Open the file once instead of re-opening it
        # for every row; sort so the summary has a deterministic row order.
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in sorted(rows):
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect CPU utilization via sar; store full output, echo summary lines."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            # Echo only the "Average" summary lines to the console.
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in the background for pcm_count seconds, writing csv output."""
        time.sleep(self.pcm_delay)
        # pcm-memory.x runs until killed; pcm_count is used as a wall-clock
        # duration here rather than a sample count.
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x for pcm_count samples, then extract the UPI columns to a
        separate "skt_" prefixed csv."""
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        # Drop the auto-generated "Unnamed:" top-level column labels.
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample NIC bandwidth with bwm-ng into a csv file."""
        subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name,
                       self.bandwidth_count), shell=True, check=True)
class Initiator(Server):
    """Initiator-side machine driven over SSH.

    Generates FIO job files, runs FIO remotely and copies the json results
    back to the target machine.
    """

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        # The SPDK_WORKSPACE environment variable overrides the configured workspace.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a local file to the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a remote file from the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Execute cmd on the remote host and return (stdout, stderr) as strings."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy all files from the remote nvmf_perf directory into dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run `nvme discover` against each (address, port) pair and return a
        sorted, deduplicated list of (trsvcid, subnqn, traddr) tuples whose
        traddr is one of the given addresses."""
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # Use a distinct loop variable; the previous version shadowed the
        # subsys_no argument inside the loop.
        for ip, subsys_offset in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_offset))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_offset),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a FIO config file on the remote initiator and return its path."""
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("sudo nvme list | grep 'SPDK' | awk '{print $1}'")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # A range like "0-3" is inclusive on both ends, so it
                    # describes b - a + 1 CPUs (the old len(range(a, b))
                    # undercounted by one).
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the remote CPUs to a fixed frequency when cpu_frequency is set.

        Requires the userspace cpufreq governor; fails hard if it cannot be
        selected (e.g. when intel_pstate is enabled).
        """
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                # Exit non-zero so wrapper scripts notice the failure
                # (bare sys.exit() reported success).
                sys.exit(1)
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def _run_fio_once(self, fio_config_file, output_filename):
        # Helper: execute one FIO pass remotely and log its stdout/stderr.
        cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
        output, error = self.remote_call(cmd)
        self.log_print(output)
        self.log_print(error)

    def run_fio(self, fio_config_file, run_num=None):
        """Run FIO remotely — once, or run_num times with numbered result files."""
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                self._run_fio_once(fio_config_file, output_filename)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            self._run_fio_once(fio_config_file, output_filename)
        self.log_print("FIO run finished. Results in: %s" % output_filename)
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel target (nvmet), configured
    by writing a kernel.conf json file and restoring it via nvmetcli."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings, bandwidth_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        # Best-effort teardown of the kernel target configuration.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing a single null-block backed subsystem
        listening on address:4420."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "serial": "SPDK0001",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing each NVMe device in nvme_list as its own
        subsystem, with the devices split between the given NIC addresses.
        Each subsystem gets a unique port (4420 + offset)."""

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's.
        # NOTE: integer division means that when len(nvme_list) is not evenly
        # divisible by len(address_list), the remainder disks are not exported.
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate kernel.conf (null-block or real drives) and load it with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")
        self.subsys_no = get_nvme_devices_count()

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt application, configured over
    its JSON-RPC socket (/var/tmp/spdk.sock)."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings, bandwidth_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        """Create the transport, add bdevs (null-block or NVMe) and export them."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        # Currently unused; kept for future NUMA-aware configuration.
        numa_list = get_used_numa_nodes()

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # The add_* helpers configure the target via RPC and return nothing,
        # so there is no need to bind their results to locals.
        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a single 100 MiB (102400 blocks x 4096 B) null bdev named Nvme0n1."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives as bdevs; optionally limit to req_num_disks."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, distributing them across
        the provided NIC addresses. All listeners use port 4420."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs.
        # NOTE: integer division drops the remainder when the bdev count is
        # not evenly divisible by the NIC count.
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Start nvmf_tgt, wait for its RPC socket to appear, then configure it."""
        self.subsys_no = get_nvme_devices_count()
        if self.null_block:
            self.subsys_no = 1
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        # num_cores may come from the json config as an int (and defaults to
        # the int 1); str() keeps " ".join from raising TypeError.
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Log the actual process id (the old code logged the pid-file path).
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initilize...")
        # Blocks indefinitely if the target never creates its socket; relies
        # on the process starting successfully.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        # Terminate the nvmf_tgt process gracefully; fall back to SIGKILL.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
class KernelInitiator(Initiator):
    """Initiator using the kernel NVMe-oF host driver (nvme-cli connect)."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # "extra_params" is optional in the json config; .get avoids the
        # KeyError the old kwargs["extra_params"] lookup raised when the key
        # was absent, while `or ""` keeps falsy values mapped to "".
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Connect this host to every discovered subsystem with nvme-cli."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect this host from every discovered subsystem."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build the [filename*] sections of the FIO config from the SPDK
        kernel nvme devices present on the initiator, distributing devices
        round-robin-ish over the available threads."""
        out, err = self.remote_call("sudo nvme list | grep 'SPDK' | awk '{print $1}'")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            # Spread the remainder devices one per thread.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale queue depth so total outstanding IO per section stays
            # io_depth * number_of_disks regardless of numjobs.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section


class SPDKInitiator(Initiator):
    """Initiator running FIO with the SPDK bdev plugin over NVMe-oF."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Upload the SPDK sources zip, build SPDK with the fio plugin and run setup.sh."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
                         "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Render the [Nvme] bdev config section from (trsvcid, subnqn, traddr) tuples."""
        header = "[Nvme]"
        row_template = """ TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        bdev_rows = [row_template.format(transport=self.transport,
                                         svc=x[0],
                                         nqn=x[1],
                                         ip=x[2],
                                         i=i) for i, x in enumerate(remote_subsystem_list)]
        bdev_rows = "\n".join(bdev_rows)
        bdev_section = "\n".join([header, bdev_rows])
        return bdev_section

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build the [filename*] sections of the FIO config, mapping the
        Nvme<i>n1 bdevs onto the available threads."""
        filename_section = ""
        # Never use more job sections than there are subsystems.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            # Spread the remainder bdevs one per thread.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale queue depth so total outstanding IO per section stays
            # io_depth * number_of_disks regardless of numjobs.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
if __name__ == "__main__":
    # Fixed paths for artifacts produced on the target machine.
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file may be passed as the first CLI argument; otherwise use
    # config.json located next to this script.
    if (len(sys.argv) > 1):
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Build target / initiator objects and FIO workload parameters from the
    # json config sections, matched by substring of the section key.
    # NOTE(review): assumes the config contains a "general" section, exactly
    # one "target" section and one "fio" section; otherwise target_obj /
    # fio_workloads below are never bound and the script fails with
    # NameError — confirm against the expected config.json schema.
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Cartesian product of block sizes, queue depths and rw modes —
            # one FIO run (per initiator) for each combination.
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    if "skip_spdk_install" not in data["general"]:
        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
        threads = []
        # Builds run in parallel on all SPDK-mode initiators.
        for i in initiators:
            if i.mode == "spdk":
                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
                threads.append(t)
                t.start()
        for t in threads:
            t.join()

    target_obj.tgt_start()

    # Poor man's threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        # Kernel-mode initiators must connect to the subsystems first so the
        # /dev/nvme* devices exist when the FIO config is generated.
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        # One FIO thread per initiator, plus optional measurement threads on
        # the target; all are started together and joined below so the
        # measurements overlap the workload.
        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Tear down kernel connections and pull result files back to the target.
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)