#!/usr/bin/env python3
"""Automated NVMe-oF performance test orchestrator.

Reads a JSON config describing one target and one or more initiators
(each in "kernel" or "spdk" mode), distributes SPDK sources to the
initiators, starts the target, runs a matrix of FIO workloads on the
initiators and aggregates the JSON results into a CSV summary.
"""

import os
import re
import sys
import json
import paramiko
import zipfile
import threading
import subprocess
import itertools
import time
import uuid
import rpc
import rpc.client
import pandas as pd
from common import *


class Server:
    """Common base for every host (target or initiator) taking part in the test."""

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # The server name ends up embedded in result file names, so restrict
        # it to alphanumerics to keep the later "_"-based parsing reliable.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        print("[%s] %s" % (self.name, msg), flush=True)


class Target(Server):
    """Base class for NVMe-oF target hosts; also owns result parsing and
    the SAR/PCM measurement helpers that run on the target machine."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False

        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree so it can be pushed to the initiators."""
        self.log_print("Zipping SPDK source directory")
        # Context manager ensures the archive is closed (and flushed) even if
        # a file disappears mid-walk and fh.write() raises.
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse one FIO JSON result file.

        Returns a flat list: read iops/bw, read avg/min/max latency, read
        p99/p99.9/p99.99/p99.999 latency, then the same eight write stats.
        All latencies are normalized to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
        job_pos = 0  # job_pos = 0 because using aggregated results

        # Check if latency is in nano or microseconds to choose correct dict key
        def get_lat_unit(key_prefix, dict_section):
            # key prefix - lat, clat or slat.
            # dict section - portion of json containing latency bucket in question
            # Return dict key to access the bucket and unit as string
            for k, v in dict_section.items():
                if k.startswith(key_prefix):
                    return k, k.split("_")[1]

        read_iops = float(data["jobs"][job_pos]["read"]["iops"])
        read_bw = float(data["jobs"][job_pos]["read"]["bw"])
        lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
        read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
        read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
        read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
        clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
        read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
        read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
        read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
        read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

        if "ns" in lat_unit:
            read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
        if "ns" in clat_unit:
            read_p99_lat = read_p99_lat / 1000
            read_p99_9_lat = read_p99_9_lat / 1000
            read_p99_99_lat = read_p99_99_lat / 1000
            read_p99_999_lat = read_p99_999_lat / 1000

        write_iops = float(data["jobs"][job_pos]["write"]["iops"])
        write_bw = float(data["jobs"][job_pos]["write"]["bw"])
        lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
        write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
        write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
        write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
        clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
        write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
        write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
        write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
        write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

        if "ns" in lat_unit:
            write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
        if "ns" in clat_unit:
            write_p99_lat = write_p99_lat / 1000
            write_p99_9_lat = write_p99_9_lat / 1000
            write_p99_99_lat = write_p99_99_lat / 1000
            write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate per-initiator FIO JSON results into nvmf_results.csv.

        For each .fio config found in results_dir, averages the result files
        of each initiator over its runs, then sums the averages across
        initiators into one CSV row per workload.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            header_line = ",".join(["Name",
                                    "read_iops", "read_bw", "read_avg_lat_us",
                                    "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                    "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                                    "write_iops", "write_bw", "write_avg_lat_us",
                                    "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us",
                                    "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"])
            fh.write(header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise mean over this initiator's runs.
                z = [sum(c) for c in zip(*separate_stats)]
                z = [c / len(separate_stats) for c in z]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Save results to file
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect per-CPU utilization with sar and save it to results_dir."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run Intel pcm-memory for pcm_count seconds, writing CSV output."""
        time.sleep(self.pcm_delay)
        # pcm-memory.x runs until killed; let it sample for pcm_count seconds.
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                                                         results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run Intel pcm and extract per-socket UPI columns into a second CSV."""
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                                                         results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)


class Initiator(Server):
    """Base class for initiator hosts, driven remotely over SSH."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean workspace on the remote host.
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Copy a local file to the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Fetch a file from the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Run a shell command on the initiator; return (stdout, stderr) text."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy all FIO result files from the initiator back to dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run nvme discover against every (address, port) pair and return a
        sorted, de-duplicated list of (trsvcid, subnqn, traddr) tuples whose
        address is in address_list."""
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # Ports are allocated sequentially starting at 4420, one per subsystem.
        for ip, subsys in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate an FIO config file on the initiator and return its remote path."""
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("lsblk -o NAME -nlp")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # A cpus_allowed range "a-b" is inclusive: "0-3" means 4 CPUs.
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads)
        else:
            filename_section = self.gen_fio_filename_conf(threads)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency on the initiator if the config requests it."""
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Execute the given FIO config on the initiator, once or run_num times,
        saving JSON output next to the config."""
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)


class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel target, configured via nvmetcli."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing a single null block device on one port."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing one subsystem per NVMe drive, with the
        drives split evenly between the given NIC addresses."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate kernel.conf and load it into the kernel target via nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                sys.exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application, configured via RPC."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        """Create the transport, bdevs and subsystems on a running nvmf_tgt."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            nvme_section = self.spdk_tgt_add_nullblock()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            nvme_section = self.spdk_tgt_add_nvme_conf()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a single 100 MiB / 4k null bdev named Nvme0n1."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives (all, or the first req_num_disks) as bdevs."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, distributed across the NICs."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(1, req_num_disks + 1)
        disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, then configure it."""
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        # num_cores may come from JSON as an int; str() keeps join() happy.
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Log the actual PID, not the path of the PID file.
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initilize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()


class KernelInitiator(Initiator):
    """Initiator using the kernel NVMe-oF host stack via nvme-cli."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # extra_params is optional in the config; .get() avoids a KeyError.
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Discover and connect to every matching subsystem on the target."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect from every subsystem previously connected to."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads):
        """Build FIO [filename] sections distributing the connected kernel
        NVMe devices round-robin across the given threads."""
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        filenames = ["nvme%sn1" % x for x in range(0, len(nvme_list))]
        # threads is a range object; use its length (the original code divided
        # by the range itself, which raises TypeError).
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=/dev/%s" % x for x in r])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section


class SPDKInitiator(Initiator):
    """Initiator using the SPDK bdev FIO plugin built from the zipped sources."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Push the SPDK source zip to the initiator, build it with the FIO
        plugin and run setup.sh."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=%s;"
                         "make clean; make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Render a [Nvme] bdev config section from (svcid, nqn, ip) tuples."""
        header = "[Nvme]"
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        bdev_rows = [row_template.format(transport=self.transport,
                                         svc=x[0],
                                         nqn=x[1],
                                         ip=x[2],
                                         i=i) for i, x in enumerate(remote_subsystem_list)]
        bdev_rows = "\n".join(bdev_rows)
        bdev_section = "\n".join([header, bdev_rows])
        return bdev_section

    def gen_fio_filename_conf(self, subsystems, threads):
        """Build FIO [filename] sections distributing SPDK bdevs round-robin
        across threads (capped at one thread per subsystem)."""
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section


if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    if (len(sys.argv) > 1):
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for i in initiators:
        if i.mode == "spdk":
            t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Poor man's threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)