1#!/usr/bin/env python3 2 3import os 4import re 5import sys 6import json 7import paramiko 8import zipfile 9import threading 10import subprocess 11import itertools 12import time 13import uuid 14import rpc 15import rpc.client 16from common import * 17 18 19class Server: 20 def __init__(self, name, username, password, mode, rdma_ips): 21 self.name = name 22 self.mode = mode 23 self.username = username 24 self.password = password 25 self.rdma_ips = rdma_ips 26 27 if not re.match("^[A-Za-z0-9]*$", name): 28 self.log_print("Please use a name which contains only letters or numbers") 29 sys.exit(1) 30 31 def log_print(self, msg): 32 print("[%s] %s" % (self.name, msg), flush=True) 33 34 35class Target(Server): 36 def __init__(self, name, username, password, mode, rdma_ips, use_null_block=False, sar_settings=None): 37 super(Target, self).__init__(name, username, password, mode, rdma_ips) 38 self.null_block = bool(use_null_block) 39 self.enable_sar = False 40 if sar_settings: 41 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings 42 43 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 44 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 45 46 def zip_spdk_sources(self, spdk_dir, dest_file): 47 self.log_print("Zipping SPDK source directory") 48 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 49 for root, directories, files in os.walk(spdk_dir): 50 for file in files: 51 fh.write(os.path.relpath(os.path.join(root, file))) 52 fh.close() 53 self.log_print("Done zipping") 54 55 def read_json_stats(self, file): 56 with open(file, "r") as json_data: 57 data = json.load(json_data) 58 job_pos = 0 # job_post = 0 because using aggregated results 59 60 # Check if latency is in nano or microseconds to choose correct dict key 61 def get_lat_unit(key_prefix, dict_section): 62 # key prefix - lat, clat or slat. 63 # dict section - portion of json containing latency bucket in question 64 # Return dict key to access the bucket and unit as string 65 for k, v in dict_section.items(): 66 if k.startswith(key_prefix): 67 return k, k.split("_")[1] 68 69 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 70 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 71 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 72 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 73 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 74 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 75 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 76 read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"]) 77 78 if "ns" in lat_unit: 79 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 80 if "ns" in clat_unit: 81 read_p99_lat = read_p99_lat / 1000 82 83 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 84 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 85 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 86 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 87 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 88 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 89 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 90 write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"]) 91 92 if "ns" in lat_unit: 93 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 94 if "ns" in clat_unit: 95 write_p99_lat = write_p99_lat / 1000 96 97 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, read_p99_lat, 98 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, write_p99_lat] 99 100 def parse_results(self, results_dir, initiator_count=None, run_num=None): 101 files = os.listdir(results_dir) 102 fio_files = filter(lambda x: ".fio" in x, files) 103 json_files = [x for x in files if ".json" in x] 104 105 # Create empty results file 106 csv_file = "nvmf_results.csv" 107 with open(os.path.join(results_dir, csv_file), "w") as fh: 108 header_line = ",".join(["Name", 109 "read_iops", "read_bw", "read_avg_lat_us", 110 "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us", 111 "write_iops", "write_bw", "write_avg_lat_us", 112 "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us"]) 113 fh.write(header_line + "\n") 114 rows = set() 115 116 for fio_config in fio_files: 117 self.log_print("Getting FIO stats for %s" % fio_config) 118 job_name, _ = os.path.splitext(fio_config) 119 120 # If "_CPU" exists in name - ignore it 121 # Initiators for the same job could have diffrent num_cores parameter 122 job_name = re.sub(r"_\d+CPU", "", job_name) 123 job_result_files = [x for x in json_files if job_name in x] 124 self.log_print("Matching result files for current fio config:") 125 for j in job_result_files: 126 self.log_print("\t %s" % j) 127 128 # There may have been more than 1 initiator used in test, need to check that 129 # Result files are created so that string after last "_" separator is server name 130 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 131 inits_avg_results = [] 132 for i in inits_names: 133 self.log_print("\tGetting stats for initiator %s" % i) 134 # There may have been more than 1 test run for this job, calculate average results for initiator 135 i_results = [x for x in job_result_files if i in x] 136 137 separate_stats = [] 138 for r in i_results: 139 stats = self.read_json_stats(os.path.join(results_dir, r)) 140 separate_stats.append(stats) 141 self.log_print(stats) 142 143 z = [sum(c) for c in zip(*separate_stats)] 144 z = [c/len(separate_stats) for c in z] 145 inits_avg_results.append(z) 146 147 self.log_print("\tAverage results for initiator %s" % i) 148 self.log_print(z) 149 150 # Sum average results of all initiators running this FIO job 151 self.log_print("\tTotal results for %s from all initiators" % fio_config) 152 for a in inits_avg_results: 153 self.log_print(a) 154 total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)] 155 rows.add(",".join([job_name, *total])) 156 157 # Save results to file 158 for row in rows: 159 with open(os.path.join(results_dir, csv_file), "a") as fh: 160 fh.write(row + "\n") 161 162 def measure_sar(self, results_dir, sar_file_name): 163 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 164 time.sleep(self.sar_delay) 165 out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8") 166 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 167 for line in out.split("\n"): 168 if "Average" in line and "CPU" in line: 169 self.log_print("Summary CPU utilization from SAR:") 170 self.log_print(line) 171 if "Average" in line and "all" in line: 172 self.log_print(line) 173 fh.write(out) 174 175 176class Initiator(Server): 177 def __init__(self, name, username, password, mode, rdma_ips, ip, workspace="/tmp/spdk"): 178 super(Initiator, self).__init__(name, username, password, mode, rdma_ips) 179 self.ip = ip 180 self.spdk_dir = workspace 181 182 self.ssh_connection = paramiko.SSHClient() 183 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy) 184 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 185 stdin, stdout, stderr = self.ssh_connection.exec_command("sudo rm -rf %s/nvmf_perf" % self.spdk_dir) 186 stdout.channel.recv_exit_status() 187 stdin, stdout, stderr = self.ssh_connection.exec_command("mkdir -p %s" % self.spdk_dir) 188 stdout.channel.recv_exit_status() 189 190 def __del__(self): 191 self.ssh_connection.close() 192 193 def put_file(self, local, remote_dest): 194 ftp = self.ssh_connection.open_sftp() 195 ftp.put(local, remote_dest) 196 ftp.close() 197 198 def get_file(self, remote, local_dest): 199 ftp = self.ssh_connection.open_sftp() 200 ftp.get(remote, local_dest) 201 ftp.close() 202 203 def copy_result_files(self, dest_dir): 204 self.log_print("Copying results") 205 206 if not os.path.exists(dest_dir): 207 os.mkdir(dest_dir) 208 209 # Get list of result files from initiator and copy them back to target 210 stdin, stdout, stderr = self.ssh_connection.exec_command("ls %s/nvmf_perf" % self.spdk_dir) 211 212 file_list = stdout.read().decode(encoding="utf-8").strip().split("\n") 213 for file in file_list: 214 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 215 os.path.join(dest_dir, file)) 216 self.log_print("Done copying results") 217 218 def discover_subsystems(self, address_list, subsys_no): 219 num_nvmes = range(0, subsys_no) 220 nvme_discover_output = "" 221 for ip, subsys_no in itertools.product(address_list, num_nvmes): 222 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 223 nvme_discover_cmd = ["sudo", "nvme", "discover", "-t rdma", "-s %s" % (4420 + subsys_no), "-a %s" % ip] 224 nvme_discover_cmd = " ".join(nvme_discover_cmd) 225 226 stdin, stdout, stderr = self.ssh_connection.exec_command(nvme_discover_cmd) 227 out = stdout.read().decode(encoding="utf-8") 228 if out: 229 nvme_discover_output = nvme_discover_output + out 230 231 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 232 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 233 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 234 nvme_discover_output) # from nvme discovery output 235 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 236 subsystems = list(set(subsystems)) 237 subsystems.sort(key=lambda x: x[1]) 238 self.log_print("Found matching subsystems on target side:") 239 for s in subsystems: 240 self.log_print(s) 241 242 return subsystems 243 244 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10): 245 fio_conf_template = """ 246[global] 247ioengine={ioengine} 248{spdk_conf} 249thread=1 250group_reporting=1 251direct=1 252 253norandommap=1 254rw={rw} 255rwmixread={rwmixread} 256bs={block_size} 257iodepth={io_depth} 258time_based=1 259ramp_time={ramp_time} 260runtime={run_time} 261""" 262 if "spdk" in self.mode: 263 subsystems = self.discover_subsystems(self.rdma_ips, subsys_no) 264 bdev_conf = self.gen_spdk_bdev_conf(subsystems) 265 stdin, stdout, stderr = self.ssh_connection.exec_command("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir)) 266 ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir 267 spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir 268 filename_section = self.gen_fio_filename_conf(subsystems) 269 else: 270 ioengine = "libaio" 271 spdk_conf = "" 272 filename_section = self.gen_fio_filename_conf() 273 274 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 275 rw=rw, rwmixread=rwmixread, block_size=block_size, 276 io_depth=io_depth, ramp_time=ramp_time, run_time=run_time) 277 if num_jobs: 278 fio_config = fio_config + "numjobs=%s" % num_jobs 279 fio_config = fio_config + filename_section 280 281 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 282 if hasattr(self, "num_cores"): 283 fio_config_filename += "_%sCPU" % self.num_cores 284 fio_config_filename += ".fio" 285 286 stdin, stdout, stderr = self.ssh_connection.exec_command("mkdir -p %s/nvmf_perf" % self.spdk_dir) 287 stdin, stdout, stderr = self.ssh_connection.exec_command( 288 "echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename)) 289 stdout.channel.recv_exit_status() 290 self.log_print("Created FIO Config:") 291 self.log_print(fio_config) 292 293 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 294 295 def run_fio(self, fio_config_file, run_num=None): 296 job_name, _ = os.path.splitext(fio_config_file) 297 self.log_print("Starting FIO run for job: %s" % job_name) 298 if run_num: 299 for i in range(1, run_num + 1): 300 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 301 cmd = "sudo /usr/src/fio/fio %s --output-format=json --output=%s" % (fio_config_file, output_filename) 302 stdin, stdout, stderr = self.ssh_connection.exec_command(cmd) 303 output = stdout.read().decode(encoding="utf-8") 304 error = stderr.read().decode(encoding="utf-8") 305 self.log_print(output) 306 self.log_print(error) 307 else: 308 output_filename = job_name + "_" + self.name + ".json" 309 cmd = "sudo /usr/src/fio/fio %s --output-format=json --output=%s" % (fio_config_file, output_filename) 310 stdin, stdout, stderr = self.ssh_connection.exec_command(cmd) 311 output = stdout.read().decode(encoding="utf-8") 312 error = stderr.read().decode(encoding="utf-8") 313 self.log_print(output) 314 self.log_print(error) 315 self.log_print("FIO run finished. Results in: %s" % output_filename) 316 317 318class KernelTarget(Target): 319 def __init__(self, name, username, password, mode, rdma_ips, use_null_block=False, sar_settings=None, nvmet_dir=None, **kwargs): 320 super(KernelTarget, self).__init__(name, username, password, mode, rdma_ips, use_null_block, sar_settings) 321 322 if nvmet_dir: 323 self.nvmet_bin = os.path.join(nvmet_dir, "nvmetcli") 324 else: 325 self.nvmet_bin = "nvmetcli" 326 327 def __del__(self): 328 nvmet_command(self.nvmet_bin, "clear") 329 330 def kernel_tgt_gen_nullblock_conf(self, address): 331 nvmet_cfg = { 332 "ports": [], 333 "hosts": [], 334 "subsystems": [], 335 } 336 337 nvmet_cfg["subsystems"].append({ 338 "allowed_hosts": [], 339 "attr": { 340 "allow_any_host": "1", 341 "version": "1.3" 342 }, 343 "namespaces": [ 344 { 345 "device": { 346 "path": "/dev/nullb0", 347 "uuid": "%s" % uuid.uuid4() 348 }, 349 "enable": 1, 350 "nsid": 1 351 } 352 ], 353 "nqn": "nqn.2018-09.io.spdk:cnode1" 354 }) 355 356 nvmet_cfg["ports"].append({ 357 "addr": { 358 "adrfam": "ipv4", 359 "traddr": address, 360 "trsvcid": "4420", 361 "trtype": "rdma" 362 }, 363 "portid": 1, 364 "referrals": [], 365 "subsystems": ["nqn.2018-09.io.spdk:cnode1"] 366 }) 367 with open("kernel.conf", 'w') as fh: 368 fh.write(json.dumps(nvmet_cfg, indent=2)) 369 370 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 371 372 nvmet_cfg = { 373 "ports": [], 374 "hosts": [], 375 "subsystems": [], 376 } 377 378 # Split disks between NIC IP's 379 disks_per_ip = int(len(nvme_list) / len(address_list)) 380 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 381 382 subsys_no = 1 383 port_no = 0 384 for ip, chunk in zip(address_list, disk_chunks): 385 for disk in chunk: 386 nvmet_cfg["subsystems"].append({ 387 "allowed_hosts": [], 388 "attr": { 389 "allow_any_host": "1", 390 "version": "1.3" 391 }, 392 "namespaces": [ 393 { 394 "device": { 395 "path": disk, 396 "uuid": "%s" % uuid.uuid4() 397 }, 398 "enable": 1, 399 "nsid": subsys_no 400 } 401 ], 402 "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no 403 }) 404 405 nvmet_cfg["ports"].append({ 406 "addr": { 407 "adrfam": "ipv4", 408 "traddr": ip, 409 "trsvcid": "%s" % (4420 + port_no), 410 "trtype": "rdma" 411 }, 412 "portid": subsys_no, 413 "referrals": [], 414 "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no] 415 }) 416 subsys_no += 1 417 port_no += 1 418 419 with open("kernel.conf", "w") as fh: 420 fh.write(json.dumps(nvmet_cfg, indent=2)) 421 pass 422 423 def tgt_start(self): 424 self.log_print("Configuring kernel NVMeOF Target") 425 426 if self.null_block: 427 print("Configuring with null block device.") 428 if len(self.rdma_ips) > 1: 429 print("Testing with null block limited to single RDMA NIC.") 430 print("Please specify only 1 IP address.") 431 exit(1) 432 self.subsys_no = 1 433 self.kernel_tgt_gen_nullblock_conf(self.rdma_ips[0]) 434 else: 435 print("Configuring with NVMe drives.") 436 nvme_list = get_nvme_devices() 437 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.rdma_ips) 438 self.subsys_no = len(nvme_list) 439 440 nvmet_command(self.nvmet_bin, "clear") 441 nvmet_command(self.nvmet_bin, "restore kernel.conf") 442 self.log_print("Done configuring kernel NVMeOF Target") 443 444 445class SPDKTarget(Target): 446 def __init__(self, name, username, password, mode, rdma_ips, num_cores, num_shared_buffers=4096, 447 use_null_block=False, sar_settings=None, **kwargs): 448 super(SPDKTarget, self).__init__(name, username, password, mode, rdma_ips, use_null_block, sar_settings) 449 self.num_cores = num_cores 450 self.num_shared_buffers = num_shared_buffers 451 452 def spdk_tgt_configure(self): 453 self.log_print("Configuring SPDK NVMeOF target via RPC") 454 numa_list = get_used_numa_nodes() 455 456 # Create RDMA transport layer 457 rpc.nvmf.nvmf_create_transport(self.client, trtype="RDMA", num_shared_buffers=self.num_shared_buffers) 458 self.log_print("SPDK NVMeOF transport layer:") 459 rpc.client.print_dict(rpc.nvmf.get_nvmf_transports(self.client)) 460 461 if self.null_block: 462 nvme_section = self.spdk_tgt_add_nullblock() 463 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.rdma_ips, req_num_disks=1) 464 else: 465 nvme_section = self.spdk_tgt_add_nvme_conf() 466 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.rdma_ips) 467 self.log_print("Done configuring SPDK NVMeOF Target") 468 469 def spdk_tgt_add_nullblock(self): 470 self.log_print("Adding null block bdev to config via RPC") 471 rpc.bdev.construct_null_bdev(self.client, 102400, 4096, "Nvme0n1") 472 self.log_print("SPDK Bdevs configuration:") 473 rpc.client.print_dict(rpc.bdev.get_bdevs(self.client)) 474 475 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 476 self.log_print("Adding NVMe bdevs to config via RPC") 477 478 bdfs = get_nvme_devices_bdf() 479 bdfs = [b.replace(":", ".") for b in bdfs] 480 481 if req_num_disks: 482 if req_num_disks > len(bdfs): 483 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 484 sys.exit(1) 485 else: 486 bdfs = bdfs[0:req_num_disks] 487 488 for i, bdf in enumerate(bdfs): 489 rpc.bdev.construct_nvme_bdev(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 490 491 self.log_print("SPDK Bdevs configuration:") 492 rpc.client.print_dict(rpc.bdev.get_bdevs(self.client)) 493 494 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 495 self.log_print("Adding subsystems to config") 496 if not req_num_disks: 497 req_num_disks = get_nvme_devices_count() 498 499 # Distribute bdevs between provided NICs 500 num_disks = range(1, req_num_disks + 1) 501 disks_per_ip = int(len(num_disks) / len(ips)) 502 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 503 504 # Create subsystems, add bdevs to namespaces, add listeners 505 for ip, chunk in zip(ips, disk_chunks): 506 for c in chunk: 507 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 508 serial = "SPDK00%s" % c 509 bdev_name = "Nvme%sn1" % (c - 1) 510 rpc.nvmf.nvmf_subsystem_create(self.client, nqn, serial, 511 allow_any_host=True, max_namespaces=8) 512 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 513 514 rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn, 515 trtype="RDMA", 516 traddr=ip, 517 trsvcid="4420", 518 adrfam="ipv4") 519 520 self.log_print("SPDK NVMeOF subsystem configuration:") 521 rpc.client.print_dict(rpc.nvmf.get_nvmf_subsystems(self.client)) 522 523 def tgt_start(self): 524 self.subsys_no = get_nvme_devices_count() 525 self.log_print("Starting SPDK NVMeOF Target process") 526 nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt") 527 command = " ".join([nvmf_app_path, "-m", self.num_cores]) 528 proc = subprocess.Popen(command, shell=True) 529 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 530 531 with open(self.pid, "w") as fh: 532 fh.write(str(proc.pid)) 533 self.nvmf_proc = proc 534 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 535 self.log_print("Waiting for spdk to initilize...") 536 while True: 537 if os.path.exists("/var/tmp/spdk.sock"): 538 break 539 time.sleep(1) 540 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 541 542 self.spdk_tgt_configure() 543 544 def __del__(self): 545 if hasattr(self, "nvmf_proc"): 546 try: 547 self.nvmf_proc.terminate() 548 self.nvmf_proc.wait() 549 except Exception as e: 550 self.log_print(e) 551 self.nvmf_proc.kill() 552 self.nvmf_proc.communicate() 553 554 555class KernelInitiator(Initiator): 556 def __init__(self, name, username, password, mode, rdma_ips, ip, **kwargs): 557 super(KernelInitiator, self).__init__(name, username, password, mode, rdma_ips, ip) 558 559 def __del__(self): 560 self.ssh_connection.close() 561 562 def kernel_init_connect(self, address_list, subsys_no): 563 subsystems = self.discover_subsystems(address_list, subsys_no) 564 self.log_print("Below connection attempts may result in error messages, this is expected!") 565 for subsystem in subsystems: 566 self.log_print("Trying to connect %s %s %s" % subsystem) 567 cmd = "sudo nvme connect -t rdma -s %s -n %s -a %s" % subsystem 568 stdin, stdout, stderr = self.ssh_connection.exec_command(cmd) 569 time.sleep(2) 570 571 def kernel_init_disconnect(self, address_list, subsys_no): 572 subsystems = self.discover_subsystems(address_list, subsys_no) 573 for subsystem in subsystems: 574 cmd = "sudo nvme disconnect -n %s" % subsystem[1] 575 stdin, stdout, stderr = self.ssh_connection.exec_command(cmd) 576 time.sleep(1) 577 578 def gen_fio_filename_conf(self): 579 stdin, stdout, stderr = self.ssh_connection.exec_command("lsblk -o NAME -nlp") 580 out = stdout.read().decode(encoding="utf-8") 581 nvme_list = [x for x in out.split("\n") if "nvme" in x] 582 583 filename_section = "" 584 for i, nvme in enumerate(nvme_list): 585 filename_section = "\n".join([filename_section, 586 "[filename%s]" % i, 587 "filename=%s" % nvme]) 588 589 return filename_section 590 591 592class SPDKInitiator(Initiator): 593 def __init__(self, name, username, password, mode, rdma_ips, ip, num_cores=None, **kwargs): 594 super(SPDKInitiator, self).__init__(name, username, password, mode, rdma_ips, ip) 595 if num_cores: 596 self.num_cores = num_cores 597 598 def install_spdk(self, local_spdk_zip): 599 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 600 self.log_print("Copied sources zip from target") 601 stdin, stdout, stderr = self.ssh_connection.exec_command("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir) 602 stdout.channel.recv_exit_status() 603 604 self.log_print("Sources unpacked") 605 stdin, stdout, stderr = self.ssh_connection.exec_command( 606 "cd %s; git submodule update --init; ./configure --with-rdma --with-fio=/usr/src/fio;" 607 "make clean; make -j$(($(nproc)*2))" % self.spdk_dir) 608 stdout.channel.recv_exit_status() 609 610 self.log_print("SPDK built") 611 stdin, stdout, stderr = self.ssh_connection.exec_command("sudo %s/scripts/setup.sh" % self.spdk_dir) 612 stdout.channel.recv_exit_status() 613 614 def gen_spdk_bdev_conf(self, remote_subsystem_list): 615 header = "[Nvme]" 616 row_template = """ TransportId "trtype:RDMA adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}""" 617 618 bdev_rows = [row_template.format(svc=x[0], 619 nqn=x[1], 620 ip=x[2], 621 i=i) for i, x in enumerate(remote_subsystem_list)] 622 bdev_rows = "\n".join(bdev_rows) 623 bdev_section = "\n".join([header, bdev_rows]) 624 return bdev_section 625 626 def gen_fio_filename_conf(self, remote_subsystem_list): 627 subsystems = [str(x) for x in range(0, len(remote_subsystem_list))] 628 629 # If num_cpus exists then limit FIO to this number of CPUs 630 # Otherwise - each connected subsystem gets its own CPU 631 if hasattr(self, 'num_cores'): 632 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 633 threads = range(0, int(self.num_cores)) 634 else: 635 threads = range(0, len(subsystems)) 636 637 n = int(len(subsystems) / len(threads)) 638 639 filename_section = "" 640 for t in threads: 641 header = "[filename%s]" % t 642 disks = "\n".join(["filename=Nvme%sn1" % x for x in subsystems[n * t:n + n * t]]) 643 filename_section = "\n".join([filename_section, header, disks]) 644 645 return filename_section 646 647 648if __name__ == "__main__": 649 spdk_zip_path = "/tmp/spdk.zip" 650 target_results_dir = "/tmp/results" 651 652 with open("./scripts/perf/nvmf/config.json", "r") as config: 653 data = json.load(config) 654 655 initiators = [] 656 fio_cases = [] 657 658 # Read user/pass first as order of objects is undeffined 659 uname = data['general']["username"] 660 passwd = data['general']["password"] 661 662 for k, v in data.items(): 663 if "target" in k: 664 if data[k]["mode"] == "spdk": 665 target_obj = SPDKTarget(name=k, username=uname, password=passwd, **v) 666 elif data[k]["mode"] == "kernel": 667 target_obj = KernelTarget(name=k, username=uname, password=passwd, **v) 668 elif "initiator" in k: 669 if data[k]["mode"] == "spdk": 670 init_obj = SPDKInitiator(name=k, username=uname, password=passwd, **v) 671 elif data[k]["mode"] == "kernel": 672 init_obj = KernelInitiator(name=k, username=uname, password=passwd, **v) 673 initiators.append(init_obj) 674 elif "fio" in k: 675 fio_workloads = itertools.product(data[k]["bs"], 676 data[k]["qd"], 677 data[k]["rw"]) 678 679 fio_run_time = data[k]["run_time"] 680 fio_ramp_time = data[k]["ramp_time"] 681 fio_rw_mix_read = data[k]["rwmixread"] 682 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 683 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 684 else: 685 continue 686 687 # Copy and install SPDK on remote initiators 688 target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path) 689 threads = [] 690 for i in initiators: 691 if i.mode == "spdk": 692 t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,)) 693 threads.append(t) 694 t.start() 695 for t in threads: 696 t.join() 697 698 target_obj.tgt_start() 699 700 # Poor mans threading 701 # Run FIO tests 702 for block_size, io_depth, rw in fio_workloads: 703 threads = [] 704 configs = [] 705 for i in initiators: 706 if i.mode == "kernel": 707 i.kernel_init_connect(i.rdma_ips, target_obj.subsys_no) 708 709 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 710 fio_num_jobs, fio_ramp_time, fio_run_time) 711 configs.append(cfg) 712 713 for i, cfg in zip(initiators, configs): 714 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 715 threads.append(t) 716 if target_obj.enable_sar: 717 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 718 sar_file_name = ".".join([sar_file_name, "txt"]) 719 t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name)) 720 threads.append(t) 721 722 for t in threads: 723 t.start() 724 for t in threads: 725 t.join() 726 727 for i in initiators: 728 if i.mode == "kernel": 729 i.kernel_init_disconnect(i.rdma_ips, target_obj.subsys_no) 730 i.copy_result_files(target_results_dir) 731 732 target_obj.parse_results(target_results_dir) 733