#!/usr/bin/env python3

import os
import re
import sys
import json
import paramiko
import zipfile
import threading
import subprocess
import itertools
import time
import uuid
import rpc
import rpc.client
from common import *


class Server:
    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        print("[%s] %s" % (self.name, msg), flush=True)


class Target(Server):
    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False

        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log_print("Zipping SPDK source directory")
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")
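
    # For reference: read_json_stats() below assumes fio's JSON output layout.
    # Depending on the fio version, latency buckets are reported in nanoseconds
    # ("lat_ns"/"clat_ns") or in microseconds ("lat"/"clat"); get_lat_unit()
    # picks whichever key is present. Roughly (values illustrative):
    #
    #   {"jobs": [{"read":  {"iops": 215000.0, "bw": 860000.0,
    #                        "lat_ns":  {"mean": ..., "min": ..., "max": ...},
    #                        "clat_ns": {"percentile": {"99.000000": ...}}},
    #              "write": {...}}]}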

    def read_json_stats(self, file):
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because group_reporting aggregates results into a single job entry

            # Check if latency is in nano- or microseconds to choose the correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key_prefix - lat, clat or slat.
                # dict_section - portion of json containing the latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])

            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, read_p99_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, write_p99_lat]
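
    # parse_results() below pairs each .fio config with its per-initiator .json
    # result files and writes one CSV row per job: the job name followed by the
    # 12 numeric columns from the header. A row might look like (numbers
    # illustrative):
    #
    #   4k_128_randrw_m_70,215000.000,860000.000,94.523,...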

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        files = os.listdir(results_dir)
        fio_files = [x for x in files if ".fio" in x]
        json_files = [x for x in files if ".json" in x]

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            header_line = ",".join(["Name",
                                    "read_iops", "read_bw", "read_avg_lat_us",
                                    "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                    "write_iops", "write_bw", "write_avg_lat_us",
                                    "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us"])
            fh.write(header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in the name - ignore it
            # Initiators for the same job could have a different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in the test, need to check that
            # Result files are named so that the string after the last "_" separator is the server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job; calculate average results for the initiator
                i_results = [x for x in job_result_files if i in x]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                z = [sum(c) for c in zip(*separate_stats)]
                z = [c / len(separate_stats) for c in z]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Save results to file
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        self.log_print("Waiting %d seconds before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        # pcm-memory.x takes no sample count option, so let it run for
        # pcm_count seconds and then kill it
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                                                         results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                                                         results_dir, pcm_file_name), shell=True, check=True)


class Initiator(Server):
    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 nvmecli_bin="nvme", workspace="/tmp/spdk", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        self.fio_bin = fio_bin
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get a list of result files from the initiator and copy them back to the target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")
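
    # discover_subsystems() scrapes the text output of "nvme discover"; with a
    # typical nvme-cli version each discovery log entry looks roughly like:
    #
    #   =====Discovery Log Entry 0======
    #   trtype:  rdma
    #   ...
    #   trsvcid: 4420
    #   subnqn:  nqn.2018-09.io.spdk:cnode1
    #   traddr:  192.168.100.8
    #
    # which is what the trsvcid/subnqn/traddr regex below relies on.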

    def discover_subsystems(self, address_list, subsys_no):
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_offset in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_offset))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_offset),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems
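
    # gen_fio_config() renders the template below into a config file; in "spdk"
    # mode a result might look roughly like this (paths per the defaults used
    # here, workload values illustrative):
    #
    #   [global]
    #   ioengine=/tmp/spdk/examples/bdev/fio_plugin/fio_plugin
    #   spdk_conf=/tmp/spdk/bdev.conf
    #   thread=1
    #   group_reporting=1
    #   direct=1
    #   norandommap=1
    #   rw=randrw
    #   rwmixread=70
    #   bs=4k
    #   iodepth=128
    #   time_based=1
    #   ramp_time=0
    #   runtime=10
    #
    #   [filename0]
    #   filename=Nvme0n1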

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
            filename_section = self.gen_fio_filename_conf(subsystems)
        else:
            ioengine = "libaio"
            spdk_conf = ""
            filename_section = self.gen_fio_filename_conf()

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s" % num_jobs
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def run_fio(self, fio_config_file, run_num=None):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)
        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)


class KernelTarget(Target):
    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IPs
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                sys.exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")
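
# Note on disk-to-IP distribution (both target flavors use the same slicing):
# e.g. 4 NVMe drives over 2 NIC IPs gives disks_per_ip=2 and chunks
# [[disk0, disk1], [disk2, disk3]]; with 5 drives over 2 IPs, the remainder
# drive is simply not exported. The kernel target also bumps trsvcid per port
# (4420, 4421, ...), whereas the SPDK target keeps all listeners on 4420.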


class SPDKTarget(Target):

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(1, req_num_disks + 1)
        disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
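
    # Unlike the kernel target, every SPDK subsystem listens on the same
    # trsvcid (4420); initiators tell them apart by NQN (cnode1, cnode2, ...).
    # tgt_start() below blocks until the nvmf_tgt app creates its RPC socket
    # at /var/tmp/spdk.sock before issuing any of the RPCs above.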

    def tgt_start(self):
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()


class KernelInitiator(Initiator):
    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              fio_bin=fio_bin)

        self.extra_params = kwargs.get("extra_params", "")

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self):
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        for i, nvme in enumerate(nvme_list):
            filename_section = "\n".join([filename_section,
                                          "[filename%s]" % i,
                                          "filename=%s" % nvme])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=%s;"
                         "make clean; make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)
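
    # gen_spdk_bdev_conf() renders the INI-style bdev config consumed via the
    # spdk_conf fio option; for one discovered subsystem tuple, e.g.
    # ("4420", "nqn.2018-09.io.spdk:cnode1", "192.168.100.8"), it produces
    # something like:
    #
    #   [Nvme]
    #    TransportId "trtype:rdma adrfam:IPv4 traddr:192.168.100.8 trsvcid:4420 subnqn:nqn.2018-09.io.spdk:cnode1" Nvme0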

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        header = "[Nvme]"
        row_template = """ TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        bdev_rows = [row_template.format(transport=self.transport,
                                         svc=x[0],
                                         nqn=x[1],
                                         ip=x[2],
                                         i=i) for i, x in enumerate(remote_subsystem_list)]
        bdev_rows = "\n".join(bdev_rows)
        bdev_section = "\n".join([header, bdev_rows])
        return bdev_section

    def gen_fio_filename_conf(self, remote_subsystem_list):
        subsystems = [str(x) for x in range(0, len(remote_subsystem_list))]

        # If num_cores exists then limit FIO to this number of CPUs
        # Otherwise - each connected subsystem gets its own CPU
        if hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        n = len(subsystems) // len(threads)

        filename_section = ""
        for t in threads:
            header = "[filename%s]" % t
            disks = "\n".join(["filename=Nvme%sn1" % x for x in subsystems[n * t:(t + 1) * n]])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section
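
# The script is driven by a JSON config file (default: config.json next to
# the script). Section names are matched by substring ("target", "initiator",
# "fio") and each section's keys are forwarded to the matching constructor,
# so a minimal config might look roughly like this (keys per the constructor
# signatures above, values illustrative):
#
#   {
#     "general": {"username": "root", "password": "...", "transport": "rdma"},
#     "target": {"mode": "spdk", "nic_ips": ["192.168.100.8"], "num_cores": "0xF"},
#     "initiator1": {"mode": "spdk", "ip": "10.0.0.2", "nic_ips": ["192.168.100.8"]},
#     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 70,
#             "run_time": 10, "ramp_time": 0, "run_num": 3}
#   }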


if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
        else:
            continue

    # Copy and install SPDK on remote initiators
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for i in initiators:
        if i.mode == "spdk":
            t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Poor man's threading: one thread per initiator plus optional measurement
    # threads on the target, all joined after each workload
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)