1#!/usr/bin/env python3 2 3import os 4import re 5import sys 6import json 7import paramiko 8import zipfile 9import threading 10import subprocess 11import itertools 12import time 13import uuid 14import rpc 15import rpc.client 16import pandas as pd 17from collections import OrderedDict 18from common import * 19 20 21class Server: 22 def __init__(self, name, username, password, mode, nic_ips, transport): 23 self.name = name 24 self.mode = mode 25 self.username = username 26 self.password = password 27 self.nic_ips = nic_ips 28 self.transport = transport.lower() 29 30 if not re.match("^[A-Za-z0-9]*$", name): 31 self.log_print("Please use a name which contains only letters or numbers") 32 sys.exit(1) 33 34 def log_print(self, msg): 35 print("[%s] %s" % (self.name, msg), flush=True) 36 37 38class Target(Server): 39 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", 40 use_null_block=False, sar_settings=None, pcm_settings=None, 41 bandwidth_settings=None, dpdk_settings=None): 42 43 super(Target, self).__init__(name, username, password, mode, nic_ips, transport) 44 self.null_block = bool(use_null_block) 45 self.enable_sar = False 46 self.enable_pcm_memory = False 47 self.enable_pcm = False 48 self.enable_bandwidth = False 49 self.enable_dpdk_memory = False 50 51 if sar_settings: 52 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings 53 54 if pcm_settings: 55 self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings 56 57 if bandwidth_settings: 58 self.enable_bandwidth, self.bandwidth_count = bandwidth_settings 59 60 if dpdk_settings: 61 self.enable_dpdk_memory, self.dpdk_wait_time = dpdk_settings 62 63 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 64 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 65 66 def zip_spdk_sources(self, spdk_dir, dest_file): 67 self.log_print("Zipping SPDK source directory") 68 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 69 for root, directories, files in os.walk(spdk_dir, followlinks=True): 70 for file in files: 71 fh.write(os.path.relpath(os.path.join(root, file))) 72 fh.close() 73 self.log_print("Done zipping") 74 75 def read_json_stats(self, file): 76 with open(file, "r") as json_data: 77 data = json.load(json_data) 78 job_pos = 0 # job_post = 0 because using aggregated results 79 80 # Check if latency is in nano or microseconds to choose correct dict key 81 def get_lat_unit(key_prefix, dict_section): 82 # key prefix - lat, clat or slat. 83 # dict section - portion of json containing latency bucket in question 84 # Return dict key to access the bucket and unit as string 85 for k, v in dict_section.items(): 86 if k.startswith(key_prefix): 87 return k, k.split("_")[1] 88 89 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 90 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 91 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 92 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 93 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 94 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 95 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 96 read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"]) 97 read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"]) 98 read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"]) 99 read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"]) 100 101 if "ns" in lat_unit: 102 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 103 if "ns" in clat_unit: 104 read_p99_lat = read_p99_lat / 1000 105 read_p99_9_lat = read_p99_9_lat / 1000 106 read_p99_99_lat = read_p99_99_lat / 1000 107 read_p99_999_lat = read_p99_999_lat / 1000 108 109 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 110 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 111 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 112 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 113 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 114 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 115 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 116 write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"]) 117 write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"]) 118 write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"]) 119 write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"]) 120 121 if "ns" in lat_unit: 122 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 123 if "ns" in clat_unit: 124 write_p99_lat = write_p99_lat / 1000 125 write_p99_9_lat = write_p99_9_lat / 1000 126 write_p99_99_lat = write_p99_99_lat / 1000 127 write_p99_999_lat = write_p99_999_lat / 1000 128 129 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 130 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 131 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 132 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 133 134 def parse_results(self, results_dir, initiator_count=None, run_num=None): 135 files = os.listdir(results_dir) 136 fio_files = filter(lambda x: ".fio" in x, files) 137 json_files = [x for x in files if ".json" in x] 138 139 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 140 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 141 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 142 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 143 144 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 145 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 146 147 header_line = ",".join(["Name", *headers]) 148 aggr_header_line = ",".join(["Name", *aggr_headers]) 149 150 # Create empty results file 151 csv_file = "nvmf_results.csv" 152 with open(os.path.join(results_dir, csv_file), "w") as fh: 153 fh.write(aggr_header_line + "\n") 154 rows = set() 155 156 for fio_config in fio_files: 157 self.log_print("Getting FIO stats for %s" % fio_config) 158 job_name, _ = os.path.splitext(fio_config) 159 160 # Look in the filename for rwmixread value. Function arguments do 161 # not have that information. 162 # TODO: Improve this function by directly using workload params instead 163 # of regexing through filenames. 164 if "read" in job_name: 165 rw_mixread = 1 166 elif "write" in job_name: 167 rw_mixread = 0 168 else: 169 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 170 171 # If "_CPU" exists in name - ignore it 172 # Initiators for the same job could have diffrent num_cores parameter 173 job_name = re.sub(r"_\d+CPU", "", job_name) 174 job_result_files = [x for x in json_files if job_name in x] 175 self.log_print("Matching result files for current fio config:") 176 for j in job_result_files: 177 self.log_print("\t %s" % j) 178 179 # There may have been more than 1 initiator used in test, need to check that 180 # Result files are created so that string after last "_" separator is server name 181 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 182 inits_avg_results = [] 183 for i in inits_names: 184 self.log_print("\tGetting stats for initiator %s" % i) 185 # There may have been more than 1 test run for this job, calculate average results for initiator 186 i_results = [x for x in job_result_files if i in x] 187 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 188 189 separate_stats = [] 190 for r in i_results: 191 stats = self.read_json_stats(os.path.join(results_dir, r)) 192 separate_stats.append(stats) 193 self.log_print(stats) 194 195 init_results = [sum(x) for x in zip(*separate_stats)] 196 init_results = [x / len(separate_stats) for x in init_results] 197 inits_avg_results.append(init_results) 198 199 self.log_print("\tAverage results for initiator %s" % i) 200 self.log_print(init_results) 201 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 202 fh.write(header_line + "\n") 203 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 204 205 # Sum results of all initiators running this FIO job. 206 # Latency results are an average of latencies from accros all initiators. 207 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 208 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 209 for key in inits_avg_results: 210 if "lat" in key: 211 inits_avg_results[key] /= len(inits_names) 212 213 # Aggregate separate read/write values into common labels 214 # Take rw_mixread into consideration for mixed read/write workloads. 215 aggregate_results = OrderedDict() 216 for h in aggr_headers: 217 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 218 if "lat" in h: 219 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 220 else: 221 _ = read_stat + write_stat 222 aggregate_results[h] = "{0:.3f}".format(_) 223 224 rows.add(",".join([job_name, *aggregate_results.values()])) 225 226 # Save results to file 227 for row in rows: 228 with open(os.path.join(results_dir, csv_file), "a") as fh: 229 fh.write(row + "\n") 230 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 231 232 def measure_sar(self, results_dir, sar_file_name): 233 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 234 time.sleep(self.sar_delay) 235 out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8") 236 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 237 for line in out.split("\n"): 238 if "Average" in line and "CPU" in line: 239 self.log_print("Summary CPU utilization from SAR:") 240 self.log_print(line) 241 if "Average" in line and "all" in line: 242 self.log_print(line) 243 fh.write(out) 244 245 def measure_pcm_memory(self, results_dir, pcm_file_name): 246 time.sleep(self.pcm_delay) 247 pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, 248 results_dir, pcm_file_name), shell=True) 249 time.sleep(self.pcm_count) 250 pcm_memory.kill() 251 252 def measure_pcm(self, results_dir, pcm_file_name): 253 time.sleep(self.pcm_delay) 254 subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count, 255 results_dir, pcm_file_name), shell=True, check=True) 256 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 257 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 258 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 259 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 260 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 261 262 def measure_bandwidth(self, results_dir, bandwidth_file_name): 263 bwm = subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name, 264 self.bandwidth_count), shell=True, check=True) 265 266 def measure_dpdk_memory(self, results_dir): 267 self.log_print("INFO: waiting to generate DPDK memory usage") 268 time.sleep(self.dpdk_wait_time) 269 self.log_print("INFO: generating DPDK memory usage") 270 rpc.env.env_dpdk_get_mem_stats 271 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 272 273 274class Initiator(Server): 275 def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None, 276 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None, 277 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"): 278 279 super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport) 280 281 self.ip = ip 282 self.spdk_dir = workspace 283 if os.getenv('SPDK_WORKSPACE'): 284 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 285 self.fio_bin = fio_bin 286 self.cpus_allowed = cpus_allowed 287 self.cpus_allowed_policy = cpus_allowed_policy 288 self.cpu_frequency = cpu_frequency 289 self.nvmecli_bin = nvmecli_bin 290 self.ssh_connection = paramiko.SSHClient() 291 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 292 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 293 self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir) 294 self.remote_call("mkdir -p %s" % self.spdk_dir) 295 self.set_cpu_frequency() 296 297 def __del__(self): 298 self.ssh_connection.close() 299 300 def put_file(self, local, remote_dest): 301 ftp = self.ssh_connection.open_sftp() 302 ftp.put(local, remote_dest) 303 ftp.close() 304 305 def get_file(self, remote, local_dest): 306 ftp = self.ssh_connection.open_sftp() 307 ftp.get(remote, local_dest) 308 ftp.close() 309 310 def remote_call(self, cmd): 311 stdin, stdout, stderr = self.ssh_connection.exec_command(cmd) 312 out = stdout.read().decode(encoding="utf-8") 313 err = stderr.read().decode(encoding="utf-8") 314 return out, err 315 316 def copy_result_files(self, dest_dir): 317 self.log_print("Copying results") 318 319 if not os.path.exists(dest_dir): 320 os.mkdir(dest_dir) 321 322 # Get list of result files from initiator and copy them back to target 323 stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir) 324 file_list = stdout.strip().split("\n") 325 326 for file in file_list: 327 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 328 os.path.join(dest_dir, file)) 329 self.log_print("Done copying results") 330 331 def discover_subsystems(self, address_list, subsys_no): 332 num_nvmes = range(0, subsys_no) 333 nvme_discover_output = "" 334 for ip, subsys_no in itertools.product(address_list, num_nvmes): 335 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 336 nvme_discover_cmd = ["sudo", 337 "%s" % self.nvmecli_bin, 338 "discover", "-t %s" % self.transport, 339 "-s %s" % (4420 + subsys_no), 340 "-a %s" % ip] 341 nvme_discover_cmd = " ".join(nvme_discover_cmd) 342 343 stdout, stderr = self.remote_call(nvme_discover_cmd) 344 if stdout: 345 nvme_discover_output = nvme_discover_output + stdout 346 347 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 348 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 349 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 350 nvme_discover_output) # from nvme discovery output 351 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 352 subsystems = list(set(subsystems)) 353 subsystems.sort(key=lambda x: x[1]) 354 self.log_print("Found matching subsystems on target side:") 355 for s in subsystems: 356 self.log_print(s) 357 358 return subsystems 359 360 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10): 361 fio_conf_template = """ 362[global] 363ioengine={ioengine} 364{spdk_conf} 365thread=1 366group_reporting=1 367direct=1 368percentile_list=50:90:99:99.5:99.9:99.99:99.999 369 370norandommap=1 371rw={rw} 372rwmixread={rwmixread} 373bs={block_size} 374time_based=1 375ramp_time={ramp_time} 376runtime={run_time} 377""" 378 if "spdk" in self.mode: 379 subsystems = self.discover_subsystems(self.nic_ips, subsys_no) 380 bdev_conf = self.gen_spdk_bdev_conf(subsystems) 381 self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir)) 382 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 383 spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir 384 else: 385 ioengine = "libaio" 386 spdk_conf = "" 387 out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'") 388 subsystems = [x for x in out.split("\n") if "nvme" in x] 389 390 if self.cpus_allowed is not None: 391 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 392 cpus_num = 0 393 cpus = self.cpus_allowed.split(",") 394 for cpu in cpus: 395 if "-" in cpu: 396 a, b = cpu.split("-") 397 a = int(a) 398 b = int(b) 399 cpus_num += len(range(a, b)) 400 else: 401 cpus_num += 1 402 threads = range(0, cpus_num) 403 elif hasattr(self, 'num_cores'): 404 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 405 threads = range(0, int(self.num_cores)) 406 else: 407 threads = range(0, len(subsystems)) 408 409 if "spdk" in self.mode: 410 filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs) 411 else: 412 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 413 414 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 415 rw=rw, rwmixread=rwmixread, block_size=block_size, 416 ramp_time=ramp_time, run_time=run_time) 417 if num_jobs: 418 fio_config = fio_config + "numjobs=%s \n" % num_jobs 419 if self.cpus_allowed is not None: 420 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 421 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 422 fio_config = fio_config + filename_section 423 424 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 425 if hasattr(self, "num_cores"): 426 fio_config_filename += "_%sCPU" % self.num_cores 427 fio_config_filename += ".fio" 428 429 self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir) 430 self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename)) 431 self.log_print("Created FIO Config:") 432 self.log_print(fio_config) 433 434 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 435 436 def set_cpu_frequency(self): 437 if self.cpu_frequency is not None: 438 try: 439 self.remote_call('sudo cpupower frequency-set -g userspace') 440 self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency) 441 cmd = "sudo cpupower frequency-info" 442 output, error = self.remote_call(cmd) 443 self.log_print(output) 444 self.log_print(error) 445 except Exception: 446 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 447 sys.exit() 448 else: 449 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 450 451 def run_fio(self, fio_config_file, run_num=None): 452 job_name, _ = os.path.splitext(fio_config_file) 453 self.log_print("Starting FIO run for job: %s" % job_name) 454 self.log_print("Using FIO: %s" % self.fio_bin) 455 456 if run_num: 457 for i in range(1, run_num + 1): 458 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 459 cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename) 460 output, error = self.remote_call(cmd) 461 self.log_print(output) 462 self.log_print(error) 463 else: 464 output_filename = job_name + "_" + self.name + ".json" 465 cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename) 466 output, error = self.remote_call(cmd) 467 self.log_print(output) 468 self.log_print(error) 469 self.log_print("FIO run finished. Results in: %s" % output_filename) 470 471 472class KernelTarget(Target): 473 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", 474 use_null_block=False, sar_settings=None, pcm_settings=None, 475 bandwidth_settings=None, dpdk_settings=None, nvmet_bin="nvmetcli", **kwargs): 476 477 super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport, 478 use_null_block, sar_settings, pcm_settings, bandwidth_settings, 479 dpdk_settings) 480 self.nvmet_bin = nvmet_bin 481 482 def __del__(self): 483 nvmet_command(self.nvmet_bin, "clear") 484 485 def kernel_tgt_gen_nullblock_conf(self, address): 486 nvmet_cfg = { 487 "ports": [], 488 "hosts": [], 489 "subsystems": [], 490 } 491 492 nvmet_cfg["subsystems"].append({ 493 "allowed_hosts": [], 494 "attr": { 495 "allow_any_host": "1", 496 "serial": "SPDK0001", 497 "version": "1.3" 498 }, 499 "namespaces": [ 500 { 501 "device": { 502 "path": "/dev/nullb0", 503 "uuid": "%s" % uuid.uuid4() 504 }, 505 "enable": 1, 506 "nsid": 1 507 } 508 ], 509 "nqn": "nqn.2018-09.io.spdk:cnode1" 510 }) 511 512 nvmet_cfg["ports"].append({ 513 "addr": { 514 "adrfam": "ipv4", 515 "traddr": address, 516 "trsvcid": "4420", 517 "trtype": "%s" % self.transport, 518 }, 519 "portid": 1, 520 "referrals": [], 521 "subsystems": ["nqn.2018-09.io.spdk:cnode1"] 522 }) 523 with open("kernel.conf", 'w') as fh: 524 fh.write(json.dumps(nvmet_cfg, indent=2)) 525 526 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 527 528 nvmet_cfg = { 529 "ports": [], 530 "hosts": [], 531 "subsystems": [], 532 } 533 534 # Split disks between NIC IP's 535 disks_per_ip = int(len(nvme_list) / len(address_list)) 536 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 537 538 subsys_no = 1 539 port_no = 0 540 for ip, chunk in zip(address_list, disk_chunks): 541 for disk in chunk: 542 nvmet_cfg["subsystems"].append({ 543 "allowed_hosts": [], 544 "attr": { 545 "allow_any_host": "1", 546 "serial": "SPDK00%s" % subsys_no, 547 "version": "1.3" 548 }, 549 "namespaces": [ 550 { 551 "device": { 552 "path": disk, 553 "uuid": "%s" % uuid.uuid4() 554 }, 555 "enable": 1, 556 "nsid": subsys_no 557 } 558 ], 559 "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no 560 }) 561 562 nvmet_cfg["ports"].append({ 563 "addr": { 564 "adrfam": "ipv4", 565 "traddr": ip, 566 "trsvcid": "%s" % (4420 + port_no), 567 "trtype": "%s" % self.transport 568 }, 569 "portid": subsys_no, 570 "referrals": [], 571 "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no] 572 }) 573 subsys_no += 1 574 port_no += 1 575 576 with open("kernel.conf", "w") as fh: 577 fh.write(json.dumps(nvmet_cfg, indent=2)) 578 pass 579 580 def tgt_start(self): 581 self.log_print("Configuring kernel NVMeOF Target") 582 583 if self.null_block: 584 print("Configuring with null block device.") 585 if len(self.nic_ips) > 1: 586 print("Testing with null block limited to single RDMA NIC.") 587 print("Please specify only 1 IP address.") 588 exit(1) 589 self.subsys_no = 1 590 self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0]) 591 else: 592 print("Configuring with NVMe drives.") 593 nvme_list = get_nvme_devices() 594 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 595 self.subsys_no = len(nvme_list) 596 597 nvmet_command(self.nvmet_bin, "clear") 598 nvmet_command(self.nvmet_bin, "restore kernel.conf") 599 self.log_print("Done configuring kernel NVMeOF Target") 600 601 602class SPDKTarget(Target): 603 604 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", 605 use_null_block=False, sar_settings=None, pcm_settings=None, 606 bandwidth_settings=None, dpdk_settings=None, num_shared_buffers=4096, 607 num_cores=1, **kwargs): 608 609 super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport, 610 use_null_block, sar_settings, pcm_settings, bandwidth_settings, 611 dpdk_settings) 612 self.num_cores = num_cores 613 self.num_shared_buffers = num_shared_buffers 614 615 def spdk_tgt_configure(self): 616 self.log_print("Configuring SPDK NVMeOF target via RPC") 617 numa_list = get_used_numa_nodes() 618 619 # Create RDMA transport layer 620 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers) 621 self.log_print("SPDK NVMeOF transport layer:") 622 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 623 624 if self.null_block: 625 nvme_section = self.spdk_tgt_add_nullblock() 626 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1) 627 else: 628 nvme_section = self.spdk_tgt_add_nvme_conf() 629 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips) 630 self.log_print("Done configuring SPDK NVMeOF Target") 631 632 def spdk_tgt_add_nullblock(self): 633 self.log_print("Adding null block bdev to config via RPC") 634 rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1") 635 self.log_print("SPDK Bdevs configuration:") 636 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 637 638 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 639 self.log_print("Adding NVMe bdevs to config via RPC") 640 641 bdfs = get_nvme_devices_bdf() 642 bdfs = [b.replace(":", ".") for b in bdfs] 643 644 if req_num_disks: 645 if req_num_disks > len(bdfs): 646 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 647 sys.exit(1) 648 else: 649 bdfs = bdfs[0:req_num_disks] 650 651 for i, bdf in enumerate(bdfs): 652 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 653 654 self.log_print("SPDK Bdevs configuration:") 655 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 656 657 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 658 self.log_print("Adding subsystems to config") 659 if not req_num_disks: 660 req_num_disks = get_nvme_devices_count() 661 662 # Distribute bdevs between provided NICs 663 num_disks = range(0, req_num_disks) 664 if len(num_disks) == 1: 665 disks_per_ip = 1 666 else: 667 disks_per_ip = int(len(num_disks) / len(ips)) 668 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 669 670 # Create subsystems, add bdevs to namespaces, add listeners 671 for ip, chunk in zip(ips, disk_chunks): 672 for c in chunk: 673 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 674 serial = "SPDK00%s" % c 675 bdev_name = "Nvme%sn1" % c 676 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 677 allow_any_host=True, max_namespaces=8) 678 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 679 680 rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn, 681 trtype=self.transport, 682 traddr=ip, 683 trsvcid="4420", 684 adrfam="ipv4") 685 686 self.log_print("SPDK NVMeOF subsystem configuration:") 687 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 688 689 def tgt_start(self): 690 if self.null_block: 691 self.subsys_no = 1 692 else: 693 self.subsys_no = get_nvme_devices_count() 694 self.log_print("Starting SPDK NVMeOF Target process") 695 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 696 command = " ".join([nvmf_app_path, "-m", self.num_cores]) 697 proc = subprocess.Popen(command, shell=True) 698 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 699 700 with open(self.pid, "w") as fh: 701 fh.write(str(proc.pid)) 702 self.nvmf_proc = proc 703 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 704 self.log_print("Waiting for spdk to initilize...") 705 while True: 706 if os.path.exists("/var/tmp/spdk.sock"): 707 break 708 time.sleep(1) 709 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 710 711 self.spdk_tgt_configure() 712 713 def __del__(self): 714 if hasattr(self, "nvmf_proc"): 715 try: 716 self.nvmf_proc.terminate() 717 self.nvmf_proc.wait() 718 except Exception as e: 719 self.log_print(e) 720 self.nvmf_proc.kill() 721 self.nvmf_proc.communicate() 722 723 724class KernelInitiator(Initiator): 725 def __init__(self, name, username, password, mode, nic_ips, ip, transport, 726 cpus_allowed=None, cpus_allowed_policy="shared", 727 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs): 728 729 super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport, 730 cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy, 731 cpu_frequency=cpu_frequency, fio_bin=fio_bin) 732 733 self.extra_params = "" 734 if kwargs["extra_params"]: 735 self.extra_params = kwargs["extra_params"] 736 737 def __del__(self): 738 self.ssh_connection.close() 739 740 def kernel_init_connect(self, address_list, subsys_no): 741 subsystems = self.discover_subsystems(address_list, subsys_no) 742 self.log_print("Below connection attempts may result in error messages, this is expected!") 743 for subsystem in subsystems: 744 self.log_print("Trying to connect %s %s %s" % subsystem) 745 self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin, 746 self.transport, 747 *subsystem, 748 self.extra_params)) 749 time.sleep(2) 750 751 def kernel_init_disconnect(self, address_list, subsys_no): 752 subsystems = self.discover_subsystems(address_list, subsys_no) 753 for subsystem in subsystems: 754 self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1])) 755 time.sleep(1) 756 757 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 758 out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'") 759 nvme_list = [x for x in out.split("\n") if "nvme" in x] 760 761 filename_section = "" 762 nvme_per_split = int(len(nvme_list) / len(threads)) 763 remainder = len(nvme_list) % len(threads) 764 iterator = iter(nvme_list) 765 result = [] 766 for i in range(len(threads)): 767 result.append([]) 768 for j in range(nvme_per_split): 769 result[i].append(next(iterator)) 770 if remainder: 771 result[i].append(next(iterator)) 772 remainder -= 1 773 for i, r in enumerate(result): 774 header = "[filename%s]" % i 775 disks = "\n".join(["filename=%s" % x for x in r]) 776 job_section_qd = round((io_depth * len(r)) / num_jobs) 777 if job_section_qd == 0: 778 job_section_qd = 1 779 iodepth = "iodepth=%s" % job_section_qd 780 filename_section = "\n".join([filename_section, header, disks, iodepth]) 781 782 return filename_section 783 784 785class SPDKInitiator(Initiator): 786 def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", 787 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared", 788 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs): 789 super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport, 790 cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy, 791 cpu_frequency=cpu_frequency, fio_bin=fio_bin) 792 793 self.num_cores = num_cores 794 795 def install_spdk(self, local_spdk_zip): 796 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 797 self.log_print("Copied sources zip from target") 798 self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir) 799 800 self.log_print("Sources unpacked") 801 self.log_print("Using fio binary %s" % self.fio_bin) 802 self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;" 803 "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin))) 804 805 self.log_print("SPDK built") 806 self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir) 807 808 def gen_spdk_bdev_conf(self, remote_subsystem_list): 809 header = "[Nvme]" 810 row_template = """ TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}""" 811 812 bdev_rows = [row_template.format(transport=self.transport, 813 svc=x[0], 814 nqn=x[1], 815 ip=x[2], 816 i=i) for i, x in enumerate(remote_subsystem_list)] 817 bdev_rows = "\n".join(bdev_rows) 818 bdev_section = "\n".join([header, bdev_rows]) 819 return bdev_section 820 821 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 822 filename_section = "" 823 if len(threads) >= len(subsystems): 824 threads = range(0, len(subsystems)) 825 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 826 nvme_per_split = int(len(subsystems) / len(threads)) 827 remainder = len(subsystems) % len(threads) 828 iterator = iter(filenames) 829 result = [] 830 for i in range(len(threads)): 831 result.append([]) 832 for j in range(nvme_per_split): 833 result[i].append(next(iterator)) 834 if remainder: 835 result[i].append(next(iterator)) 836 remainder -= 1 837 for i, r in enumerate(result): 838 header = "[filename%s]" % i 839 disks = "\n".join(["filename=%s" % x for x in r]) 840 job_section_qd = round((io_depth * len(r)) / num_jobs) 841 if job_section_qd == 0: 842 job_section_qd = 1 843 iodepth = "iodepth=%s" % job_section_qd 844 filename_section = "\n".join([filename_section, header, disks, iodepth]) 845 846 return filename_section 847 848 849if __name__ == "__main__": 850 spdk_zip_path = "/tmp/spdk.zip" 851 target_results_dir = "/tmp/results" 852 853 if (len(sys.argv) > 1): 854 config_file_path = sys.argv[1] 855 else: 856 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 857 config_file_path = os.path.join(script_full_dir, "config.json") 858 859 print("Using config file: %s" % config_file_path) 860 with open(config_file_path, "r") as config: 861 data = json.load(config) 862 863 initiators = [] 864 fio_cases = [] 865 866 for k, v in data.items(): 867 if "target" in k: 868 if data[k]["mode"] == "spdk": 869 target_obj = SPDKTarget(name=k, **data["general"], **v) 870 elif data[k]["mode"] == "kernel": 871 target_obj = KernelTarget(name=k, **data["general"], **v) 872 elif "initiator" in k: 873 if data[k]["mode"] == "spdk": 874 init_obj = SPDKInitiator(name=k, **data["general"], **v) 875 elif data[k]["mode"] == "kernel": 876 init_obj = KernelInitiator(name=k, **data["general"], **v) 877 initiators.append(init_obj) 878 elif "fio" in k: 879 fio_workloads = itertools.product(data[k]["bs"], 880 data[k]["qd"], 881 data[k]["rw"]) 882 883 fio_run_time = data[k]["run_time"] 884 fio_ramp_time = data[k]["ramp_time"] 885 fio_rw_mix_read = data[k]["rwmixread"] 886 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 887 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 888 else: 889 continue 890 891 # Copy and install SPDK on remote initiators 892 if "skip_spdk_install" not in data["general"]: 893 target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path) 894 threads = [] 895 for i in initiators: 896 if i.mode == "spdk": 897 t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,)) 898 threads.append(t) 899 t.start() 900 for t in threads: 901 t.join() 902 903 target_obj.tgt_start() 904 905 # Poor mans threading 906 # Run FIO tests 907 for block_size, io_depth, rw in fio_workloads: 908 threads = [] 909 configs = [] 910 for i in initiators: 911 if i.mode == "kernel": 912 i.kernel_init_connect(i.nic_ips, target_obj.subsys_no) 913 914 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 915 fio_num_jobs, fio_ramp_time, fio_run_time) 916 configs.append(cfg) 917 918 for i, cfg in zip(initiators, configs): 919 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 920 threads.append(t) 921 if target_obj.enable_sar: 922 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 923 sar_file_name = ".".join([sar_file_name, "txt"]) 924 t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name)) 925 threads.append(t) 926 927 if target_obj.enable_pcm: 928 pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)]) 929 pcm_file_name = ".".join([pcm_file_name, "csv"]) 930 t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,)) 931 threads.append(t) 932 933 if target_obj.enable_pcm_memory: 934 pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)]) 935 pcm_file_name = ".".join([pcm_file_name, "csv"]) 936 t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,)) 937 threads.append(t) 938 939 if target_obj.enable_bandwidth: 940 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 941 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 942 t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,)) 943 threads.append(t) 944 945 if target_obj.enable_dpdk_memory: 946 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir)) 947 threads.append(t) 948 949 for t in threads: 950 t.start() 951 for t in threads: 952 t.join() 953 954 for i in initiators: 955 if i.mode == "kernel": 956 i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no) 957 i.copy_result_files(target_results_dir) 958 959 target_obj.parse_results(target_results_dir) 960