1#!/usr/bin/env python3 2 3import os 4import re 5import sys 6import json 7import paramiko 8import zipfile 9import threading 10import subprocess 11import itertools 12import time 13import uuid 14import rpc 15import rpc.client 16import pandas as pd 17from collections import OrderedDict 18from common import * 19 20 21class Server: 22 def __init__(self, name, username, password, mode, nic_ips, transport): 23 self.name = name 24 self.mode = mode 25 self.username = username 26 self.password = password 27 self.nic_ips = nic_ips 28 self.transport = transport.lower() 29 30 if not re.match("^[A-Za-z0-9]*$", name): 31 self.log_print("Please use a name which contains only letters or numbers") 32 sys.exit(1) 33 34 def log_print(self, msg): 35 print("[%s] %s" % (self.name, msg), flush=True) 36 37 38class Target(Server): 39 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", 40 use_null_block=False, sar_settings=None, pcm_settings=None): 41 42 super(Target, self).__init__(name, username, password, mode, nic_ips, transport) 43 self.null_block = bool(use_null_block) 44 self.enable_sar = False 45 self.enable_pcm_memory = False 46 self.enable_pcm = False 47 48 if sar_settings: 49 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings 50 51 if pcm_settings: 52 self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings 53 54 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 55 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 56 57 def zip_spdk_sources(self, spdk_dir, dest_file): 58 self.log_print("Zipping SPDK source directory") 59 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 60 for root, directories, files in os.walk(spdk_dir, followlinks=True): 61 for file in files: 62 fh.write(os.path.relpath(os.path.join(root, file))) 63 fh.close() 64 self.log_print("Done zipping") 65 66 def read_json_stats(self, file): 67 with open(file, "r") as json_data: 68 data = json.load(json_data) 69 job_pos = 0 # job_post = 0 because using aggregated results 70 71 # Check if latency is in nano or microseconds to choose correct dict key 72 def get_lat_unit(key_prefix, dict_section): 73 # key prefix - lat, clat or slat. 74 # dict section - portion of json containing latency bucket in question 75 # Return dict key to access the bucket and unit as string 76 for k, v in dict_section.items(): 77 if k.startswith(key_prefix): 78 return k, k.split("_")[1] 79 80 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 81 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 82 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 83 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 84 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 85 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 86 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 87 read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"]) 88 read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"]) 89 read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"]) 90 read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"]) 91 92 if "ns" in lat_unit: 93 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 94 if "ns" in clat_unit: 95 read_p99_lat = read_p99_lat / 1000 96 read_p99_9_lat = read_p99_9_lat / 1000 97 read_p99_99_lat = read_p99_99_lat / 1000 98 read_p99_999_lat = read_p99_999_lat / 1000 99 100 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 101 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 102 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 103 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 104 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 105 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 106 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 107 write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"]) 108 write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"]) 109 write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"]) 110 write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"]) 111 112 if "ns" in lat_unit: 113 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 114 if "ns" in clat_unit: 115 write_p99_lat = write_p99_lat / 1000 116 write_p99_9_lat = write_p99_9_lat / 1000 117 write_p99_99_lat = write_p99_99_lat / 1000 118 write_p99_999_lat = write_p99_999_lat / 1000 119 120 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 121 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 122 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 123 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 124 125 def parse_results(self, results_dir, initiator_count=None, run_num=None): 126 files = os.listdir(results_dir) 127 fio_files = filter(lambda x: ".fio" in x, files) 128 json_files = [x for x in files if ".json" in x] 129 130 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 131 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 132 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 133 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 134 135 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 136 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 137 138 header_line = ",".join(["Name", *headers]) 139 aggr_header_line = ",".join(["Name", *aggr_headers]) 140 141 # Create empty results file 142 csv_file = "nvmf_results.csv" 143 with open(os.path.join(results_dir, csv_file), "w") as fh: 144 fh.write(aggr_header_line + "\n") 145 rows = set() 146 147 for fio_config in fio_files: 148 self.log_print("Getting FIO stats for %s" % fio_config) 149 job_name, _ = os.path.splitext(fio_config) 150 151 # Look in the filename for rwmixread value. Function arguments do 152 # not have that information. 153 # TODO: Improve this function by directly using workload params instead 154 # of regexing through filenames. 155 if "read" in job_name: 156 rw_mixread = 1 157 elif "write" in job_name: 158 rw_mixread = 0 159 else: 160 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 161 162 # If "_CPU" exists in name - ignore it 163 # Initiators for the same job could have diffrent num_cores parameter 164 job_name = re.sub(r"_\d+CPU", "", job_name) 165 job_result_files = [x for x in json_files if job_name in x] 166 self.log_print("Matching result files for current fio config:") 167 for j in job_result_files: 168 self.log_print("\t %s" % j) 169 170 # There may have been more than 1 initiator used in test, need to check that 171 # Result files are created so that string after last "_" separator is server name 172 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 173 inits_avg_results = [] 174 for i in inits_names: 175 self.log_print("\tGetting stats for initiator %s" % i) 176 # There may have been more than 1 test run for this job, calculate average results for initiator 177 i_results = [x for x in job_result_files if i in x] 178 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 179 180 separate_stats = [] 181 for r in i_results: 182 stats = self.read_json_stats(os.path.join(results_dir, r)) 183 separate_stats.append(stats) 184 self.log_print(stats) 185 186 init_results = [sum(x) for x in zip(*separate_stats)] 187 init_results = [x / len(separate_stats) for x in init_results] 188 inits_avg_results.append(init_results) 189 190 self.log_print("\tAverage results for initiator %s" % i) 191 self.log_print(init_results) 192 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 193 fh.write(header_line + "\n") 194 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 195 196 # Sum results of all initiators running this FIO job. 197 # Latency results are an average of latencies from accros all initiators. 198 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 199 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 200 for key in inits_avg_results: 201 if "lat" in key: 202 inits_avg_results[key] /= len(inits_names) 203 204 # Aggregate separate read/write values into common labels 205 # Take rw_mixread into consideration for mixed read/write workloads. 206 aggregate_results = OrderedDict() 207 for h in aggr_headers: 208 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 209 if "lat" in h: 210 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 211 else: 212 _ = read_stat + write_stat 213 aggregate_results[h] = "{0:.3f}".format(_) 214 215 rows.add(",".join([job_name, *aggregate_results.values()])) 216 217 # Save results to file 218 for row in rows: 219 with open(os.path.join(results_dir, csv_file), "a") as fh: 220 fh.write(row + "\n") 221 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 222 223 def measure_sar(self, results_dir, sar_file_name): 224 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 225 time.sleep(self.sar_delay) 226 out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8") 227 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 228 for line in out.split("\n"): 229 if "Average" in line and "CPU" in line: 230 self.log_print("Summary CPU utilization from SAR:") 231 self.log_print(line) 232 if "Average" in line and "all" in line: 233 self.log_print(line) 234 fh.write(out) 235 236 def measure_pcm_memory(self, results_dir, pcm_file_name): 237 time.sleep(self.pcm_delay) 238 pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, 239 results_dir, pcm_file_name), shell=True) 240 time.sleep(self.pcm_count) 241 pcm_memory.kill() 242 243 def measure_pcm(self, results_dir, pcm_file_name): 244 time.sleep(self.pcm_delay) 245 subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count, 246 results_dir, pcm_file_name), shell=True, check=True) 247 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 248 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 249 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 250 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 251 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 252 253 254class Initiator(Server): 255 def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None, 256 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None, 257 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"): 258 259 super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport) 260 261 self.ip = ip 262 self.spdk_dir = workspace 263 if os.getenv('SPDK_WORKSPACE'): 264 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 265 self.fio_bin = fio_bin 266 self.cpus_allowed = cpus_allowed 267 self.cpus_allowed_policy = cpus_allowed_policy 268 self.cpu_frequency = cpu_frequency 269 self.nvmecli_bin = nvmecli_bin 270 self.ssh_connection = paramiko.SSHClient() 271 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 272 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 273 self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir) 274 self.remote_call("mkdir -p %s" % self.spdk_dir) 275 self.set_cpu_frequency() 276 277 def __del__(self): 278 self.ssh_connection.close() 279 280 def put_file(self, local, remote_dest): 281 ftp = self.ssh_connection.open_sftp() 282 ftp.put(local, remote_dest) 283 ftp.close() 284 285 def get_file(self, remote, local_dest): 286 ftp = self.ssh_connection.open_sftp() 287 ftp.get(remote, local_dest) 288 ftp.close() 289 290 def remote_call(self, cmd): 291 stdin, stdout, stderr = self.ssh_connection.exec_command(cmd) 292 out = stdout.read().decode(encoding="utf-8") 293 err = stderr.read().decode(encoding="utf-8") 294 return out, err 295 296 def copy_result_files(self, dest_dir): 297 self.log_print("Copying results") 298 299 if not os.path.exists(dest_dir): 300 os.mkdir(dest_dir) 301 302 # Get list of result files from initiator and copy them back to target 303 stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir) 304 file_list = stdout.strip().split("\n") 305 306 for file in file_list: 307 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 308 os.path.join(dest_dir, file)) 309 self.log_print("Done copying results") 310 311 def discover_subsystems(self, address_list, subsys_no): 312 num_nvmes = range(0, subsys_no) 313 nvme_discover_output = "" 314 for ip, subsys_no in itertools.product(address_list, num_nvmes): 315 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 316 nvme_discover_cmd = ["sudo", 317 "%s" % self.nvmecli_bin, 318 "discover", "-t %s" % self.transport, 319 "-s %s" % (4420 + subsys_no), 320 "-a %s" % ip] 321 nvme_discover_cmd = " ".join(nvme_discover_cmd) 322 323 stdout, stderr = self.remote_call(nvme_discover_cmd) 324 if stdout: 325 nvme_discover_output = nvme_discover_output + stdout 326 327 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 328 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 329 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 330 nvme_discover_output) # from nvme discovery output 331 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 332 subsystems = list(set(subsystems)) 333 subsystems.sort(key=lambda x: x[1]) 334 self.log_print("Found matching subsystems on target side:") 335 for s in subsystems: 336 self.log_print(s) 337 338 return subsystems 339 340 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10): 341 fio_conf_template = """ 342[global] 343ioengine={ioengine} 344{spdk_conf} 345thread=1 346group_reporting=1 347direct=1 348percentile_list=50:90:99:99.5:99.9:99.99:99.999 349 350norandommap=1 351rw={rw} 352rwmixread={rwmixread} 353bs={block_size} 354time_based=1 355ramp_time={ramp_time} 356runtime={run_time} 357""" 358 if "spdk" in self.mode: 359 subsystems = self.discover_subsystems(self.nic_ips, subsys_no) 360 bdev_conf = self.gen_spdk_bdev_conf(subsystems) 361 self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir)) 362 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 363 spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir 364 else: 365 ioengine = "libaio" 366 spdk_conf = "" 367 out, err = self.remote_call("lsblk -o NAME -nlp") 368 subsystems = [x for x in out.split("\n") if "nvme" in x] 369 370 if self.cpus_allowed is not None: 371 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 372 cpus_num = 0 373 cpus = self.cpus_allowed.split(",") 374 for cpu in cpus: 375 if "-" in cpu: 376 a, b = cpu.split("-") 377 a = int(a) 378 b = int(b) 379 cpus_num += len(range(a, b)) 380 else: 381 cpus_num += 1 382 threads = range(0, cpus_num) 383 elif hasattr(self, 'num_cores'): 384 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 385 threads = range(0, int(self.num_cores)) 386 else: 387 threads = range(0, len(subsystems)) 388 389 if "spdk" in self.mode: 390 filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs) 391 else: 392 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 393 394 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 395 rw=rw, rwmixread=rwmixread, block_size=block_size, 396 ramp_time=ramp_time, run_time=run_time) 397 if num_jobs: 398 fio_config = fio_config + "numjobs=%s \n" % num_jobs 399 if self.cpus_allowed is not None: 400 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 401 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 402 fio_config = fio_config + filename_section 403 404 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 405 if hasattr(self, "num_cores"): 406 fio_config_filename += "_%sCPU" % self.num_cores 407 fio_config_filename += ".fio" 408 409 self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir) 410 self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename)) 411 self.log_print("Created FIO Config:") 412 self.log_print(fio_config) 413 414 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 415 416 def set_cpu_frequency(self): 417 if self.cpu_frequency is not None: 418 try: 419 self.remote_call('sudo cpupower frequency-set -g userspace') 420 self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency) 421 cmd = "sudo cpupower frequency-info" 422 output, error = self.remote_call(cmd) 423 self.log_print(output) 424 self.log_print(error) 425 except Exception: 426 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 427 sys.exit() 428 else: 429 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 430 431 def run_fio(self, fio_config_file, run_num=None): 432 job_name, _ = os.path.splitext(fio_config_file) 433 self.log_print("Starting FIO run for job: %s" % job_name) 434 self.log_print("Using FIO: %s" % self.fio_bin) 435 436 if run_num: 437 for i in range(1, run_num + 1): 438 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 439 cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename) 440 output, error = self.remote_call(cmd) 441 self.log_print(output) 442 self.log_print(error) 443 else: 444 output_filename = job_name + "_" + self.name + ".json" 445 cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename) 446 output, error = self.remote_call(cmd) 447 self.log_print(output) 448 self.log_print(error) 449 self.log_print("FIO run finished. Results in: %s" % output_filename) 450 451 452class KernelTarget(Target): 453 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", 454 use_null_block=False, sar_settings=None, pcm_settings=None, 455 nvmet_bin="nvmetcli", **kwargs): 456 457 super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport, 458 use_null_block, sar_settings, pcm_settings) 459 self.nvmet_bin = nvmet_bin 460 461 def __del__(self): 462 nvmet_command(self.nvmet_bin, "clear") 463 464 def kernel_tgt_gen_nullblock_conf(self, address): 465 nvmet_cfg = { 466 "ports": [], 467 "hosts": [], 468 "subsystems": [], 469 } 470 471 nvmet_cfg["subsystems"].append({ 472 "allowed_hosts": [], 473 "attr": { 474 "allow_any_host": "1", 475 "version": "1.3" 476 }, 477 "namespaces": [ 478 { 479 "device": { 480 "path": "/dev/nullb0", 481 "uuid": "%s" % uuid.uuid4() 482 }, 483 "enable": 1, 484 "nsid": 1 485 } 486 ], 487 "nqn": "nqn.2018-09.io.spdk:cnode1" 488 }) 489 490 nvmet_cfg["ports"].append({ 491 "addr": { 492 "adrfam": "ipv4", 493 "traddr": address, 494 "trsvcid": "4420", 495 "trtype": "%s" % self.transport, 496 }, 497 "portid": 1, 498 "referrals": [], 499 "subsystems": ["nqn.2018-09.io.spdk:cnode1"] 500 }) 501 with open("kernel.conf", 'w') as fh: 502 fh.write(json.dumps(nvmet_cfg, indent=2)) 503 504 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 505 506 nvmet_cfg = { 507 "ports": [], 508 "hosts": [], 509 "subsystems": [], 510 } 511 512 # Split disks between NIC IP's 513 disks_per_ip = int(len(nvme_list) / len(address_list)) 514 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 515 516 subsys_no = 1 517 port_no = 0 518 for ip, chunk in zip(address_list, disk_chunks): 519 for disk in chunk: 520 nvmet_cfg["subsystems"].append({ 521 "allowed_hosts": [], 522 "attr": { 523 "allow_any_host": "1", 524 "version": "1.3" 525 }, 526 "namespaces": [ 527 { 528 "device": { 529 "path": disk, 530 "uuid": "%s" % uuid.uuid4() 531 }, 532 "enable": 1, 533 "nsid": subsys_no 534 } 535 ], 536 "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no 537 }) 538 539 nvmet_cfg["ports"].append({ 540 "addr": { 541 "adrfam": "ipv4", 542 "traddr": ip, 543 "trsvcid": "%s" % (4420 + port_no), 544 "trtype": "%s" % self.transport 545 }, 546 "portid": subsys_no, 547 "referrals": [], 548 "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no] 549 }) 550 subsys_no += 1 551 port_no += 1 552 553 with open("kernel.conf", "w") as fh: 554 fh.write(json.dumps(nvmet_cfg, indent=2)) 555 pass 556 557 def tgt_start(self): 558 self.log_print("Configuring kernel NVMeOF Target") 559 self.subsys_no = get_nvme_devices_count() 560 561 if self.null_block: 562 print("Configuring with null block device.") 563 if len(self.nic_ips) > 1: 564 print("Testing with null block limited to single RDMA NIC.") 565 print("Please specify only 1 IP address.") 566 exit(1) 567 self.subsys_no = 1 568 self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0]) 569 else: 570 print("Configuring with NVMe drives.") 571 nvme_list = get_nvme_devices() 572 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 573 self.subsys_no = len(nvme_list) 574 575 nvmet_command(self.nvmet_bin, "clear") 576 nvmet_command(self.nvmet_bin, "restore kernel.conf") 577 self.log_print("Done configuring kernel NVMeOF Target") 578 579 580class SPDKTarget(Target): 581 582 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", 583 use_null_block=False, sar_settings=None, pcm_settings=None, 584 num_shared_buffers=4096, num_cores=1, **kwargs): 585 586 super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport, 587 use_null_block, sar_settings, pcm_settings) 588 self.num_cores = num_cores 589 self.num_shared_buffers = num_shared_buffers 590 591 def spdk_tgt_configure(self): 592 self.log_print("Configuring SPDK NVMeOF target via RPC") 593 numa_list = get_used_numa_nodes() 594 595 # Create RDMA transport layer 596 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers) 597 self.log_print("SPDK NVMeOF transport layer:") 598 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 599 600 if self.null_block: 601 nvme_section = self.spdk_tgt_add_nullblock() 602 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1) 603 else: 604 nvme_section = self.spdk_tgt_add_nvme_conf() 605 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips) 606 self.log_print("Done configuring SPDK NVMeOF Target") 607 608 def spdk_tgt_add_nullblock(self): 609 self.log_print("Adding null block bdev to config via RPC") 610 rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1") 611 self.log_print("SPDK Bdevs configuration:") 612 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 613 614 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 615 self.log_print("Adding NVMe bdevs to config via RPC") 616 617 bdfs = get_nvme_devices_bdf() 618 bdfs = [b.replace(":", ".") for b in bdfs] 619 620 if req_num_disks: 621 if req_num_disks > len(bdfs): 622 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 623 sys.exit(1) 624 else: 625 bdfs = bdfs[0:req_num_disks] 626 627 for i, bdf in enumerate(bdfs): 628 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 629 630 self.log_print("SPDK Bdevs configuration:") 631 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 632 633 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 634 self.log_print("Adding subsystems to config") 635 if not req_num_disks: 636 req_num_disks = get_nvme_devices_count() 637 638 # Distribute bdevs between provided NICs 639 num_disks = range(0, req_num_disks) 640 if len(num_disks) == 1: 641 disks_per_ip = 1 642 else: 643 disks_per_ip = int(len(num_disks) / len(ips)) 644 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 645 646 # Create subsystems, add bdevs to namespaces, add listeners 647 for ip, chunk in zip(ips, disk_chunks): 648 for c in chunk: 649 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 650 serial = "SPDK00%s" % c 651 bdev_name = "Nvme%sn1" % c 652 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 653 allow_any_host=True, max_namespaces=8) 654 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 655 656 rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn, 657 trtype=self.transport, 658 traddr=ip, 659 trsvcid="4420", 660 adrfam="ipv4") 661 662 self.log_print("SPDK NVMeOF subsystem configuration:") 663 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 664 665 def tgt_start(self): 666 self.subsys_no = get_nvme_devices_count() 667 if self.null_block: 668 self.subsys_no = 1 669 self.log_print("Starting SPDK NVMeOF Target process") 670 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 671 command = " ".join([nvmf_app_path, "-m", self.num_cores]) 672 proc = subprocess.Popen(command, shell=True) 673 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 674 675 with open(self.pid, "w") as fh: 676 fh.write(str(proc.pid)) 677 self.nvmf_proc = proc 678 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 679 self.log_print("Waiting for spdk to initilize...") 680 while True: 681 if os.path.exists("/var/tmp/spdk.sock"): 682 break 683 time.sleep(1) 684 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 685 686 self.spdk_tgt_configure() 687 688 def __del__(self): 689 if hasattr(self, "nvmf_proc"): 690 try: 691 self.nvmf_proc.terminate() 692 self.nvmf_proc.wait() 693 except Exception as e: 694 self.log_print(e) 695 self.nvmf_proc.kill() 696 self.nvmf_proc.communicate() 697 698 699class KernelInitiator(Initiator): 700 def __init__(self, name, username, password, mode, nic_ips, ip, transport, 701 cpus_allowed=None, cpus_allowed_policy="shared", 702 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs): 703 704 super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport, 705 cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy, 706 cpu_frequency=cpu_frequency, fio_bin=fio_bin) 707 708 self.extra_params = "" 709 if kwargs["extra_params"]: 710 self.extra_params = kwargs["extra_params"] 711 712 def __del__(self): 713 self.ssh_connection.close() 714 715 def kernel_init_connect(self, address_list, subsys_no): 716 subsystems = self.discover_subsystems(address_list, subsys_no) 717 self.log_print("Below connection attempts may result in error messages, this is expected!") 718 for subsystem in subsystems: 719 self.log_print("Trying to connect %s %s %s" % subsystem) 720 self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin, 721 self.transport, 722 *subsystem, 723 self.extra_params)) 724 time.sleep(2) 725 726 def kernel_init_disconnect(self, address_list, subsys_no): 727 subsystems = self.discover_subsystems(address_list, subsys_no) 728 for subsystem in subsystems: 729 self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1])) 730 time.sleep(1) 731 732 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 733 out, err = self.remote_call("lsblk -o NAME -nlp") 734 nvme_list = [x for x in out.split("\n") if "nvme" in x] 735 736 filename_section = "" 737 nvme_per_split = int(len(nvme_list) / len(threads)) 738 remainder = len(nvme_list) % len(threads) 739 iterator = iter(nvme_list) 740 result = [] 741 for i in range(len(threads)): 742 result.append([]) 743 for j in range(nvme_per_split): 744 result[i].append(next(iterator)) 745 if remainder: 746 result[i].append(next(iterator)) 747 remainder -= 1 748 for i, r in enumerate(result): 749 header = "[filename%s]" % i 750 disks = "\n".join(["filename=%s" % x for x in r]) 751 job_section_qd = round((io_depth * len(r)) / num_jobs) 752 iodepth = "iodepth=%s" % job_section_qd 753 filename_section = "\n".join([filename_section, header, disks, iodepth]) 754 755 return filename_section 756 757 758class SPDKInitiator(Initiator): 759 def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", 760 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared", 761 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs): 762 super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport, 763 cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy, 764 cpu_frequency=cpu_frequency, fio_bin=fio_bin) 765 766 self.num_cores = num_cores 767 768 def install_spdk(self, local_spdk_zip): 769 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 770 self.log_print("Copied sources zip from target") 771 self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir) 772 773 self.log_print("Sources unpacked") 774 self.log_print("Using fio binary %s" % self.fio_bin) 775 self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;" 776 "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin))) 777 778 self.log_print("SPDK built") 779 self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir) 780 781 def gen_spdk_bdev_conf(self, remote_subsystem_list): 782 header = "[Nvme]" 783 row_template = """ TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}""" 784 785 bdev_rows = [row_template.format(transport=self.transport, 786 svc=x[0], 787 nqn=x[1], 788 ip=x[2], 789 i=i) for i, x in enumerate(remote_subsystem_list)] 790 bdev_rows = "\n".join(bdev_rows) 791 bdev_section = "\n".join([header, bdev_rows]) 792 return bdev_section 793 794 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 795 filename_section = "" 796 if len(threads) >= len(subsystems): 797 threads = range(0, len(subsystems)) 798 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 799 nvme_per_split = int(len(subsystems) / len(threads)) 800 remainder = len(subsystems) % len(threads) 801 iterator = iter(filenames) 802 result = [] 803 for i in range(len(threads)): 804 result.append([]) 805 for j in range(nvme_per_split): 806 result[i].append(next(iterator)) 807 if remainder: 808 result[i].append(next(iterator)) 809 remainder -= 1 810 for i, r in enumerate(result): 811 header = "[filename%s]" % i 812 disks = "\n".join(["filename=%s" % x for x in r]) 813 job_section_qd = round((io_depth * len(r)) / num_jobs) 814 iodepth = "iodepth=%s" % job_section_qd 815 filename_section = "\n".join([filename_section, header, disks, iodepth]) 816 817 return filename_section 818 819 820if __name__ == "__main__": 821 spdk_zip_path = "/tmp/spdk.zip" 822 target_results_dir = "/tmp/results" 823 824 if (len(sys.argv) > 1): 825 config_file_path = sys.argv[1] 826 else: 827 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 828 config_file_path = os.path.join(script_full_dir, "config.json") 829 830 print("Using config file: %s" % config_file_path) 831 with open(config_file_path, "r") as config: 832 data = json.load(config) 833 834 initiators = [] 835 fio_cases = [] 836 837 for k, v in data.items(): 838 if "target" in k: 839 if data[k]["mode"] == "spdk": 840 target_obj = SPDKTarget(name=k, **data["general"], **v) 841 elif data[k]["mode"] == "kernel": 842 target_obj = KernelTarget(name=k, **data["general"], **v) 843 elif "initiator" in k: 844 if data[k]["mode"] == "spdk": 845 init_obj = SPDKInitiator(name=k, **data["general"], **v) 846 elif data[k]["mode"] == "kernel": 847 init_obj = KernelInitiator(name=k, **data["general"], **v) 848 initiators.append(init_obj) 849 elif "fio" in k: 850 fio_workloads = itertools.product(data[k]["bs"], 851 data[k]["qd"], 852 data[k]["rw"]) 853 854 fio_run_time = data[k]["run_time"] 855 fio_ramp_time = data[k]["ramp_time"] 856 fio_rw_mix_read = data[k]["rwmixread"] 857 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 858 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 859 else: 860 continue 861 862 # Copy and install SPDK on remote initiators 863 if "skip_spdk_install" not in data["general"]: 864 target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path) 865 threads = [] 866 for i in initiators: 867 if i.mode == "spdk": 868 t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,)) 869 threads.append(t) 870 t.start() 871 for t in threads: 872 t.join() 873 874 target_obj.tgt_start() 875 876 # Poor mans threading 877 # Run FIO tests 878 for block_size, io_depth, rw in fio_workloads: 879 threads = [] 880 configs = [] 881 for i in initiators: 882 if i.mode == "kernel": 883 i.kernel_init_connect(i.nic_ips, target_obj.subsys_no) 884 885 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 886 fio_num_jobs, fio_ramp_time, fio_run_time) 887 configs.append(cfg) 888 889 for i, cfg in zip(initiators, configs): 890 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 891 threads.append(t) 892 if target_obj.enable_sar: 893 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 894 sar_file_name = ".".join([sar_file_name, "txt"]) 895 t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name)) 896 threads.append(t) 897 898 if target_obj.enable_pcm: 899 pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)]) 900 pcm_file_name = ".".join([pcm_file_name, "csv"]) 901 t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,)) 902 threads.append(t) 903 904 if target_obj.enable_pcm_memory: 905 pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)]) 906 pcm_file_name = ".".join([pcm_file_name, "csv"]) 907 t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,)) 908 threads.append(t) 909 910 for t in threads: 911 t.start() 912 for t in threads: 913 t.join() 914 915 for i in initiators: 916 if i.mode == "kernel": 917 i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no) 918 i.copy_result_files(target_results_dir) 919 920 target_obj.parse_results(target_results_dir) 921