1#!/usr/bin/env python3 2 3import os 4import re 5import sys 6import json 7import paramiko 8import zipfile 9import threading 10import subprocess 11import itertools 12import time 13import uuid 14import rpc 15import rpc.client 16from common import * 17 18 19class Server: 20 def __init__(self, name, username, password, mode, nic_ips, transport): 21 self.name = name 22 self.mode = mode 23 self.username = username 24 self.password = password 25 self.nic_ips = nic_ips 26 self.transport = transport.lower() 27 28 if not re.match("^[A-Za-z0-9]*$", name): 29 self.log_print("Please use a name which contains only letters or numbers") 30 sys.exit(1) 31 32 def log_print(self, msg): 33 print("[%s] %s" % (self.name, msg), flush=True) 34 35 36class Target(Server): 37 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", use_null_block=False, sar_settings=None): 38 super(Target, self).__init__(name, username, password, mode, nic_ips, transport) 39 self.null_block = bool(use_null_block) 40 self.enable_sar = False 41 if sar_settings: 42 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings 43 44 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 45 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 46 47 def zip_spdk_sources(self, spdk_dir, dest_file): 48 self.log_print("Zipping SPDK source directory") 49 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 50 for root, directories, files in os.walk(spdk_dir): 51 for file in files: 52 fh.write(os.path.relpath(os.path.join(root, file))) 53 fh.close() 54 self.log_print("Done zipping") 55 56 def read_json_stats(self, file): 57 with open(file, "r") as json_data: 58 data = json.load(json_data) 59 job_pos = 0 # job_post = 0 because using aggregated results 60 61 # Check if latency is in nano or microseconds to choose correct dict key 62 def get_lat_unit(key_prefix, dict_section): 63 # key prefix - lat, clat or slat. 64 # dict section - portion of json containing latency bucket in question 65 # Return dict key to access the bucket and unit as string 66 for k, v in dict_section.items(): 67 if k.startswith(key_prefix): 68 return k, k.split("_")[1] 69 70 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 71 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 72 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 73 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 74 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 75 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 76 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 77 read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"]) 78 79 if "ns" in lat_unit: 80 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 81 if "ns" in clat_unit: 82 read_p99_lat = read_p99_lat / 1000 83 84 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 85 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 86 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 87 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 88 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 89 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 90 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 91 write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"]) 92 93 if "ns" in lat_unit: 94 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 95 if "ns" in clat_unit: 96 write_p99_lat = write_p99_lat / 1000 97 98 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, read_p99_lat, 99 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, write_p99_lat] 100 101 def parse_results(self, results_dir, initiator_count=None, run_num=None): 102 files = os.listdir(results_dir) 103 fio_files = filter(lambda x: ".fio" in x, files) 104 json_files = [x for x in files if ".json" in x] 105 106 # Create empty results file 107 csv_file = "nvmf_results.csv" 108 with open(os.path.join(results_dir, csv_file), "w") as fh: 109 header_line = ",".join(["Name", 110 "read_iops", "read_bw", "read_avg_lat_us", 111 "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us", 112 "write_iops", "write_bw", "write_avg_lat_us", 113 "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us"]) 114 fh.write(header_line + "\n") 115 rows = set() 116 117 for fio_config in fio_files: 118 self.log_print("Getting FIO stats for %s" % fio_config) 119 job_name, _ = os.path.splitext(fio_config) 120 121 # If "_CPU" exists in name - ignore it 122 # Initiators for the same job could have diffrent num_cores parameter 123 job_name = re.sub(r"_\d+CPU", "", job_name) 124 job_result_files = [x for x in json_files if job_name in x] 125 self.log_print("Matching result files for current fio config:") 126 for j in job_result_files: 127 self.log_print("\t %s" % j) 128 129 # There may have been more than 1 initiator used in test, need to check that 130 # Result files are created so that string after last "_" separator is server name 131 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 132 inits_avg_results = [] 133 for i in inits_names: 134 self.log_print("\tGetting stats for initiator %s" % i) 135 # There may have been more than 1 test run for this job, calculate average results for initiator 136 i_results = [x for x in job_result_files if i in x] 137 138 separate_stats = [] 139 for r in i_results: 140 stats = self.read_json_stats(os.path.join(results_dir, r)) 141 separate_stats.append(stats) 142 self.log_print(stats) 143 144 z = [sum(c) for c in zip(*separate_stats)] 145 z = [c/len(separate_stats) for c in z] 146 inits_avg_results.append(z) 147 148 self.log_print("\tAverage results for initiator %s" % i) 149 self.log_print(z) 150 151 # Sum average results of all initiators running this FIO job 152 self.log_print("\tTotal results for %s from all initiators" % fio_config) 153 for a in inits_avg_results: 154 self.log_print(a) 155 total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)] 156 rows.add(",".join([job_name, *total])) 157 158 # Save results to file 159 for row in rows: 160 with open(os.path.join(results_dir, csv_file), "a") as fh: 161 fh.write(row + "\n") 162 163 def measure_sar(self, results_dir, sar_file_name): 164 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 165 time.sleep(self.sar_delay) 166 out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8") 167 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 168 for line in out.split("\n"): 169 if "Average" in line and "CPU" in line: 170 self.log_print("Summary CPU utilization from SAR:") 171 self.log_print(line) 172 if "Average" in line and "all" in line: 173 self.log_print(line) 174 fh.write(out) 175 176 177class Initiator(Server): 178 def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", nvmecli_dir=None, workspace="/tmp/spdk"): 179 super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport) 180 self.ip = ip 181 self.spdk_dir = workspace 182 183 if nvmecli_dir: 184 self.nvmecli_bin = os.path.join(nvmecli_dir, "nvme") 185 else: 186 self.nvmecli_bin = "nvme" # Use system-wide nvme-cli 187 188 self.ssh_connection = paramiko.SSHClient() 189 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy) 190 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 191 self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir) 192 self.remote_call("mkdir -p %s" % self.spdk_dir) 193 194 def __del__(self): 195 self.ssh_connection.close() 196 197 def put_file(self, local, remote_dest): 198 ftp = self.ssh_connection.open_sftp() 199 ftp.put(local, remote_dest) 200 ftp.close() 201 202 def get_file(self, remote, local_dest): 203 ftp = self.ssh_connection.open_sftp() 204 ftp.get(remote, local_dest) 205 ftp.close() 206 207 def remote_call(self, cmd): 208 stdin, stdout, stderr = self.ssh_connection.exec_command(cmd) 209 out = stdout.read().decode(encoding="utf-8") 210 err = stderr.read().decode(encoding="utf-8") 211 return out, err 212 213 def copy_result_files(self, dest_dir): 214 self.log_print("Copying results") 215 216 if not os.path.exists(dest_dir): 217 os.mkdir(dest_dir) 218 219 # Get list of result files from initiator and copy them back to target 220 stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir) 221 file_list = stdout.strip().split("\n") 222 223 for file in file_list: 224 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 225 os.path.join(dest_dir, file)) 226 self.log_print("Done copying results") 227 228 def discover_subsystems(self, address_list, subsys_no): 229 num_nvmes = range(0, subsys_no) 230 nvme_discover_output = "" 231 for ip, subsys_no in itertools.product(address_list, num_nvmes): 232 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 233 nvme_discover_cmd = ["sudo", 234 "%s" % self.nvmecli_bin, 235 "discover", "-t %s" % self.transport, 236 "-s %s" % (4420 + subsys_no), 237 "-a %s" % ip] 238 nvme_discover_cmd = " ".join(nvme_discover_cmd) 239 240 stdout, stderr = self.remote_call(nvme_discover_cmd) 241 if stdout: 242 nvme_discover_output = nvme_discover_output + stdout 243 244 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 245 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 246 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 247 nvme_discover_output) # from nvme discovery output 248 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 249 subsystems = list(set(subsystems)) 250 subsystems.sort(key=lambda x: x[1]) 251 self.log_print("Found matching subsystems on target side:") 252 for s in subsystems: 253 self.log_print(s) 254 255 return subsystems 256 257 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10): 258 fio_conf_template = """ 259[global] 260ioengine={ioengine} 261{spdk_conf} 262thread=1 263group_reporting=1 264direct=1 265 266norandommap=1 267rw={rw} 268rwmixread={rwmixread} 269bs={block_size} 270iodepth={io_depth} 271time_based=1 272ramp_time={ramp_time} 273runtime={run_time} 274""" 275 if "spdk" in self.mode: 276 subsystems = self.discover_subsystems(self.nic_ips, subsys_no) 277 bdev_conf = self.gen_spdk_bdev_conf(subsystems) 278 self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir)) 279 ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir 280 spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir 281 filename_section = self.gen_fio_filename_conf(subsystems) 282 else: 283 ioengine = "libaio" 284 spdk_conf = "" 285 filename_section = self.gen_fio_filename_conf() 286 287 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 288 rw=rw, rwmixread=rwmixread, block_size=block_size, 289 io_depth=io_depth, ramp_time=ramp_time, run_time=run_time) 290 if num_jobs: 291 fio_config = fio_config + "numjobs=%s" % num_jobs 292 fio_config = fio_config + filename_section 293 294 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 295 if hasattr(self, "num_cores"): 296 fio_config_filename += "_%sCPU" % self.num_cores 297 fio_config_filename += ".fio" 298 299 self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir) 300 self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename)) 301 self.log_print("Created FIO Config:") 302 self.log_print(fio_config) 303 304 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 305 306 def run_fio(self, fio_config_file, run_num=None): 307 job_name, _ = os.path.splitext(fio_config_file) 308 self.log_print("Starting FIO run for job: %s" % job_name) 309 if run_num: 310 for i in range(1, run_num + 1): 311 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 312 cmd = "sudo /usr/src/fio/fio %s --output-format=json --output=%s" % (fio_config_file, output_filename) 313 output, error = self.remote_call(cmd) 314 self.log_print(output) 315 self.log_print(error) 316 else: 317 output_filename = job_name + "_" + self.name + ".json" 318 cmd = "sudo /usr/src/fio/fio %s --output-format=json --output=%s" % (fio_config_file, output_filename) 319 output, error = self.remote_call(cmd) 320 self.log_print(output) 321 self.log_print(error) 322 self.log_print("FIO run finished. Results in: %s" % output_filename) 323 324 325class KernelTarget(Target): 326 def __init__(self, name, username, password, mode, nic_ips, 327 use_null_block=False, sar_settings=None, transport="rdma", nvmet_dir=None, **kwargs): 328 super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, 329 transport, use_null_block, sar_settings) 330 331 if nvmet_dir: 332 self.nvmet_bin = os.path.join(nvmet_dir, "nvmetcli") 333 else: 334 self.nvmet_bin = "nvmetcli" 335 336 def __del__(self): 337 nvmet_command(self.nvmet_bin, "clear") 338 339 def kernel_tgt_gen_nullblock_conf(self, address): 340 nvmet_cfg = { 341 "ports": [], 342 "hosts": [], 343 "subsystems": [], 344 } 345 346 nvmet_cfg["subsystems"].append({ 347 "allowed_hosts": [], 348 "attr": { 349 "allow_any_host": "1", 350 "version": "1.3" 351 }, 352 "namespaces": [ 353 { 354 "device": { 355 "path": "/dev/nullb0", 356 "uuid": "%s" % uuid.uuid4() 357 }, 358 "enable": 1, 359 "nsid": 1 360 } 361 ], 362 "nqn": "nqn.2018-09.io.spdk:cnode1" 363 }) 364 365 nvmet_cfg["ports"].append({ 366 "addr": { 367 "adrfam": "ipv4", 368 "traddr": address, 369 "trsvcid": "4420", 370 "trtype": "%s" % self.transport, 371 }, 372 "portid": 1, 373 "referrals": [], 374 "subsystems": ["nqn.2018-09.io.spdk:cnode1"] 375 }) 376 with open("kernel.conf", 'w') as fh: 377 fh.write(json.dumps(nvmet_cfg, indent=2)) 378 379 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 380 381 nvmet_cfg = { 382 "ports": [], 383 "hosts": [], 384 "subsystems": [], 385 } 386 387 # Split disks between NIC IP's 388 disks_per_ip = int(len(nvme_list) / len(address_list)) 389 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 390 391 subsys_no = 1 392 port_no = 0 393 for ip, chunk in zip(address_list, disk_chunks): 394 for disk in chunk: 395 nvmet_cfg["subsystems"].append({ 396 "allowed_hosts": [], 397 "attr": { 398 "allow_any_host": "1", 399 "version": "1.3" 400 }, 401 "namespaces": [ 402 { 403 "device": { 404 "path": disk, 405 "uuid": "%s" % uuid.uuid4() 406 }, 407 "enable": 1, 408 "nsid": subsys_no 409 } 410 ], 411 "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no 412 }) 413 414 nvmet_cfg["ports"].append({ 415 "addr": { 416 "adrfam": "ipv4", 417 "traddr": ip, 418 "trsvcid": "%s" % (4420 + port_no), 419 "trtype": "%s" % self.transport 420 }, 421 "portid": subsys_no, 422 "referrals": [], 423 "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no] 424 }) 425 subsys_no += 1 426 port_no += 1 427 428 with open("kernel.conf", "w") as fh: 429 fh.write(json.dumps(nvmet_cfg, indent=2)) 430 pass 431 432 def tgt_start(self): 433 self.log_print("Configuring kernel NVMeOF Target") 434 435 if self.null_block: 436 print("Configuring with null block device.") 437 if len(self.nic_ips) > 1: 438 print("Testing with null block limited to single RDMA NIC.") 439 print("Please specify only 1 IP address.") 440 exit(1) 441 self.subsys_no = 1 442 self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0]) 443 else: 444 print("Configuring with NVMe drives.") 445 nvme_list = get_nvme_devices() 446 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 447 self.subsys_no = len(nvme_list) 448 449 nvmet_command(self.nvmet_bin, "clear") 450 nvmet_command(self.nvmet_bin, "restore kernel.conf") 451 self.log_print("Done configuring kernel NVMeOF Target") 452 453 454class SPDKTarget(Target): 455 def __init__(self, name, username, password, mode, nic_ips, num_cores, num_shared_buffers=4096, 456 use_null_block=False, sar_settings=None, transport="rdma", **kwargs): 457 super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport, use_null_block, sar_settings) 458 self.num_cores = num_cores 459 self.num_shared_buffers = num_shared_buffers 460 461 def spdk_tgt_configure(self): 462 self.log_print("Configuring SPDK NVMeOF target via RPC") 463 numa_list = get_used_numa_nodes() 464 465 # Create RDMA transport layer 466 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers) 467 self.log_print("SPDK NVMeOF transport layer:") 468 rpc.client.print_dict(rpc.nvmf.get_nvmf_transports(self.client)) 469 470 if self.null_block: 471 nvme_section = self.spdk_tgt_add_nullblock() 472 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1) 473 else: 474 nvme_section = self.spdk_tgt_add_nvme_conf() 475 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips) 476 self.log_print("Done configuring SPDK NVMeOF Target") 477 478 def spdk_tgt_add_nullblock(self): 479 self.log_print("Adding null block bdev to config via RPC") 480 rpc.bdev.construct_null_bdev(self.client, 102400, 4096, "Nvme0n1") 481 self.log_print("SPDK Bdevs configuration:") 482 rpc.client.print_dict(rpc.bdev.get_bdevs(self.client)) 483 484 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 485 self.log_print("Adding NVMe bdevs to config via RPC") 486 487 bdfs = get_nvme_devices_bdf() 488 bdfs = [b.replace(":", ".") for b in bdfs] 489 490 if req_num_disks: 491 if req_num_disks > len(bdfs): 492 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 493 sys.exit(1) 494 else: 495 bdfs = bdfs[0:req_num_disks] 496 497 for i, bdf in enumerate(bdfs): 498 rpc.bdev.construct_nvme_bdev(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 499 500 self.log_print("SPDK Bdevs configuration:") 501 rpc.client.print_dict(rpc.bdev.get_bdevs(self.client)) 502 503 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 504 self.log_print("Adding subsystems to config") 505 if not req_num_disks: 506 req_num_disks = get_nvme_devices_count() 507 508 # Distribute bdevs between provided NICs 509 num_disks = range(1, req_num_disks + 1) 510 disks_per_ip = int(len(num_disks) / len(ips)) 511 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 512 513 # Create subsystems, add bdevs to namespaces, add listeners 514 for ip, chunk in zip(ips, disk_chunks): 515 for c in chunk: 516 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 517 serial = "SPDK00%s" % c 518 bdev_name = "Nvme%sn1" % (c - 1) 519 rpc.nvmf.nvmf_subsystem_create(self.client, nqn, serial, 520 allow_any_host=True, max_namespaces=8) 521 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 522 523 rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn, 524 trtype=self.transport, 525 traddr=ip, 526 trsvcid="4420", 527 adrfam="ipv4") 528 529 self.log_print("SPDK NVMeOF subsystem configuration:") 530 rpc.client.print_dict(rpc.nvmf.get_nvmf_subsystems(self.client)) 531 532 def tgt_start(self): 533 self.subsys_no = get_nvme_devices_count() 534 self.log_print("Starting SPDK NVMeOF Target process") 535 nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt") 536 command = " ".join([nvmf_app_path, "-m", self.num_cores]) 537 proc = subprocess.Popen(command, shell=True) 538 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 539 540 with open(self.pid, "w") as fh: 541 fh.write(str(proc.pid)) 542 self.nvmf_proc = proc 543 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 544 self.log_print("Waiting for spdk to initilize...") 545 while True: 546 if os.path.exists("/var/tmp/spdk.sock"): 547 break 548 time.sleep(1) 549 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 550 551 self.spdk_tgt_configure() 552 553 def __del__(self): 554 if hasattr(self, "nvmf_proc"): 555 try: 556 self.nvmf_proc.terminate() 557 self.nvmf_proc.wait() 558 except Exception as e: 559 self.log_print(e) 560 self.nvmf_proc.kill() 561 self.nvmf_proc.communicate() 562 563 564class KernelInitiator(Initiator): 565 def __init__(self, name, username, password, mode, nic_ips, ip, transport, **kwargs): 566 super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport) 567 568 def __del__(self): 569 self.ssh_connection.close() 570 571 def kernel_init_connect(self, address_list, subsys_no): 572 subsystems = self.discover_subsystems(address_list, subsys_no) 573 self.log_print("Below connection attempts may result in error messages, this is expected!") 574 for subsystem in subsystems: 575 self.log_print("Trying to connect %s %s %s" % subsystem) 576 self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s -i 8" % (self.nvmecli_bin, self.transport, *subsystem)) 577 time.sleep(2) 578 579 def kernel_init_disconnect(self, address_list, subsys_no): 580 subsystems = self.discover_subsystems(address_list, subsys_no) 581 for subsystem in subsystems: 582 self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1])) 583 time.sleep(1) 584 585 def gen_fio_filename_conf(self): 586 out, err = self.remote_call("lsblk -o NAME -nlp") 587 nvme_list = [x for x in out.split("\n") if "nvme" in x] 588 589 filename_section = "" 590 for i, nvme in enumerate(nvme_list): 591 filename_section = "\n".join([filename_section, 592 "[filename%s]" % i, 593 "filename=%s" % nvme]) 594 595 return filename_section 596 597 598class SPDKInitiator(Initiator): 599 def __init__(self, name, username, password, mode, nic_ips, ip, num_cores=None, transport="rdma", **kwargs): 600 super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport) 601 if num_cores: 602 self.num_cores = num_cores 603 604 def install_spdk(self, local_spdk_zip): 605 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 606 self.log_print("Copied sources zip from target") 607 self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir) 608 609 self.log_print("Sources unpacked") 610 self.remote_call("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=/usr/src/fio;" 611 "make clean; make -j$(($(nproc)*2))" % self.spdk_dir) 612 613 self.log_print("SPDK built") 614 self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir) 615 616 def gen_spdk_bdev_conf(self, remote_subsystem_list): 617 header = "[Nvme]" 618 row_template = """ TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}""" 619 620 bdev_rows = [row_template.format(transport=self.transport, 621 svc=x[0], 622 nqn=x[1], 623 ip=x[2], 624 i=i) for i, x in enumerate(remote_subsystem_list)] 625 bdev_rows = "\n".join(bdev_rows) 626 bdev_section = "\n".join([header, bdev_rows]) 627 return bdev_section 628 629 def gen_fio_filename_conf(self, remote_subsystem_list): 630 subsystems = [str(x) for x in range(0, len(remote_subsystem_list))] 631 632 # If num_cpus exists then limit FIO to this number of CPUs 633 # Otherwise - each connected subsystem gets its own CPU 634 if hasattr(self, 'num_cores'): 635 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 636 threads = range(0, int(self.num_cores)) 637 else: 638 threads = range(0, len(subsystems)) 639 640 n = int(len(subsystems) / len(threads)) 641 642 filename_section = "" 643 for t in threads: 644 header = "[filename%s]" % t 645 disks = "\n".join(["filename=Nvme%sn1" % x for x in subsystems[n * t:n + n * t]]) 646 filename_section = "\n".join([filename_section, header, disks]) 647 648 return filename_section 649 650 651if __name__ == "__main__": 652 spdk_zip_path = "/tmp/spdk.zip" 653 target_results_dir = "/tmp/results" 654 655 with open("./scripts/perf/nvmf/config.json", "r") as config: 656 data = json.load(config) 657 658 initiators = [] 659 fio_cases = [] 660 661 for k, v in data.items(): 662 if "target" in k: 663 if data[k]["mode"] == "spdk": 664 target_obj = SPDKTarget(name=k, **data["general"], **v) 665 elif data[k]["mode"] == "kernel": 666 target_obj = KernelTarget(name=k, **data["general"], **v) 667 elif "initiator" in k: 668 if data[k]["mode"] == "spdk": 669 init_obj = SPDKInitiator(name=k, **data["general"], **v) 670 elif data[k]["mode"] == "kernel": 671 init_obj = KernelInitiator(name=k, **data["general"], **v) 672 initiators.append(init_obj) 673 elif "fio" in k: 674 fio_workloads = itertools.product(data[k]["bs"], 675 data[k]["qd"], 676 data[k]["rw"]) 677 678 fio_run_time = data[k]["run_time"] 679 fio_ramp_time = data[k]["ramp_time"] 680 fio_rw_mix_read = data[k]["rwmixread"] 681 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 682 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 683 else: 684 continue 685 686 # Copy and install SPDK on remote initiators 687 target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path) 688 threads = [] 689 for i in initiators: 690 if i.mode == "spdk": 691 t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,)) 692 threads.append(t) 693 t.start() 694 for t in threads: 695 t.join() 696 697 target_obj.tgt_start() 698 699 # Poor mans threading 700 # Run FIO tests 701 for block_size, io_depth, rw in fio_workloads: 702 threads = [] 703 configs = [] 704 for i in initiators: 705 if i.mode == "kernel": 706 i.kernel_init_connect(i.nic_ips, target_obj.subsys_no) 707 708 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 709 fio_num_jobs, fio_ramp_time, fio_run_time) 710 configs.append(cfg) 711 712 for i, cfg in zip(initiators, configs): 713 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 714 threads.append(t) 715 if target_obj.enable_sar: 716 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 717 sar_file_name = ".".join([sar_file_name, "txt"]) 718 t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name)) 719 threads.append(t) 720 721 for t in threads: 722 t.start() 723 for t in threads: 724 t.join() 725 726 for i in initiators: 727 if i.mode == "kernel": 728 i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no) 729 i.copy_result_files(target_results_dir) 730 731 target_obj.parse_results(target_results_dir) 732