#!/usr/bin/env python3
"""NVMe-oF performance test orchestrator.

Reads a JSON config describing one target host (kernel nvmet or SPDK
nvmf_tgt) and one or more remote initiator hosts (kernel nvme-cli or
SPDK bdev fio_plugin), deploys SPDK to the initiators, runs the
configured FIO workloads and aggregates per-initiator JSON results
into a single CSV on the target.
"""

import os
import re
import sys
import json
import paramiko
import zipfile
import threading
import subprocess
import itertools
import time
import uuid
import rpc
import rpc.client
import pandas as pd
from common import *


class Server:
    """Common base for target and initiator hosts."""

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # The name is embedded in result file names (and split back out of
        # them later), so restrict it to letters and digits.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        # Prefix every line with the host name so interleaved output from
        # several threads stays attributable.
        print("[%s] %s" % (self.name, msg), flush=True)


class Target(Server):
    """Base class for the NVMe-oF target side (result parsing + host stats)."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False

        # sar_settings: (enable, delay, interval, count)
        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        # pcm_settings: (pcm_dir, enable_pcm, enable_pcm_memory, delay, interval, count)
        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        # Script lives in scripts/perf/nvmf, so the repo root is 3 levels up.
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree so it can be shipped to initiators."""
        self.log_print("Zipping SPDK source directory")
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse one FIO JSON result file.

        Returns a flat list of 18 floats: read iops, bw, avg/min/max
        latency and p99/p99.9/p99.99/p99.999 latency, followed by the
        same 9 values for writes. Latencies are normalized to
        microseconds regardless of the unit FIO reported in.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
        job_pos = 0  # job_pos = 0 because using aggregated results

        # Check if latency is in nano or microseconds to choose correct dict key
        def get_lat_unit(key_prefix, dict_section):
            # key prefix - lat, clat or slat.
            # dict section - portion of json containing latency bucket in question
            # Return dict key to access the bucket and unit as string
            for k, v in dict_section.items():
                if k.startswith(key_prefix):
                    return k, k.split("_")[1]

        read_iops = float(data["jobs"][job_pos]["read"]["iops"])
        read_bw = float(data["jobs"][job_pos]["read"]["bw"])
        lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
        read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
        read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
        read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
        clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
        # Fix: the original used identifiers with dots (read_p99.9_lat),
        # which is not valid Python - renamed with underscores.
        read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
        read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
        read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
        read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

        if "ns" in lat_unit:
            read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
        if "ns" in clat_unit:
            read_p99_lat = read_p99_lat / 1000
            read_p99_9_lat = read_p99_9_lat / 1000
            read_p99_99_lat = read_p99_99_lat / 1000
            read_p99_999_lat = read_p99_999_lat / 1000

        write_iops = float(data["jobs"][job_pos]["write"]["iops"])
        write_bw = float(data["jobs"][job_pos]["write"]["bw"])
        lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
        write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
        write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
        write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
        clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
        write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
        write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
        write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
        write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

        if "ns" in lat_unit:
            write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
        if "ns" in clat_unit:
            write_p99_lat = write_p99_lat / 1000
            write_p99_9_lat = write_p99_9_lat / 1000
            write_p99_99_lat = write_p99_99_lat / 1000
            write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate all JSON results in results_dir into nvmf_results.csv.

        For each FIO config file, results from multiple runs of one
        initiator are averaged, then averages of all initiators are
        summed into a single CSV row. initiator_count and run_num are
        accepted for interface compatibility but unused.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        csv_file = "nvmf_results.csv"
        header_line = ",".join(["Name",
                                "read_iops", "read_bw", "read_avg_lat_us",
                                "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                                "write_iops", "write_bw", "write_avg_lat_us",
                                "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us",
                                "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"])
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise average over all runs of this initiator.
                z = [sum(c) for c in zip(*separate_stats)]
                z = [c / len(separate_stats) for c in z]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Save results to file; open once instead of re-opening per row.
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(header_line + "\n")
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect sar CPU statistics during a test run and save them."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x for pcm_count seconds, writing CSV output."""
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                                                         results_dir, pcm_file_name), shell=True)
        # pcm-memory.x runs until killed; pcm_count doubles as a duration here.
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x and extract socket UPI columns to a separate CSV."""
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                                                         results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)


class Initiator(Server):
    """Base class for initiator hosts, driven over SSH (paramiko)."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote workspace.
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Copy a local file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Copy a file from the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Run a shell command on the initiator; return (stdout, stderr)."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Fetch all result files from the initiator into dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run nvme discover against each (IP, port) pair and return a sorted
        de-duplicated list of (trsvcid, subnqn, traddr) tuples whose address
        is in address_list. Ports are assumed to start at 4420."""
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, port_offset in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + port_offset))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + port_offset),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Render an FIO config file on the initiator and return its remote path."""
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("lsblk -o NAME -nlp")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    # Fix: split() yields strings and a "0-3" style range is
                    # inclusive, so count int(a)..int(b) inclusive.
                    cpus_num += len(range(int(a), int(b) + 1))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads)
        else:
            filename_section = self.gen_fio_filename_conf(threads)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency governor if cpu_frequency was configured."""
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Execute FIO on the initiator, once or run_num times, with JSON output."""
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)


class KernelTarget(Target):
    """Target implementation backed by the Linux kernel nvmet subsystem."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing /dev/nullb0 as a single subsystem."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing each NVMe drive as its own subsystem,
        with drives distributed evenly across the NIC IPs."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate kernel.conf and load it into nvmet via nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                sys.exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    """Target implementation backed by the SPDK nvmf_tgt application."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        """Configure transport, bdevs and subsystems over the RPC socket."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()  # NOTE(review): result currently unused

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create a single null bdev named Nvme0n1 via RPC."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives (optionally limited to req_num_disks)."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, spread over the NIC IPs."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(1, req_num_disks + 1)
        disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, then configure it."""
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        # str() fix: num_cores may come from JSON as an int and str.join
        # requires strings.
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Fix: log the actual process ID, not the pid file path.
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initilize...")
        # NOTE(review): no timeout - this loops forever if nvmf_tgt fails to start.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()


class KernelInitiator(Initiator):
    """Initiator using the kernel NVMe-oF host driver (nvme-cli connect)."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              fio_bin=fio_bin)

        # Fix: kwargs["extra_params"] raised KeyError when the config did not
        # provide the key; .get() keeps the empty-string default.
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Connect to every discovered subsystem with nvme-cli."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect from every discovered subsystem."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads):
        """Build [filenameN] sections distributing /dev/nvme*n1 devices
        as evenly as possible over the FIO threads."""
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        filenames = ["nvme%sn1" % x for x in range(0, len(nvme_list))]
        # Fix: "threads" is a range object; arithmetic needs its length.
        threads_no = len(threads)
        nvme_per_split = int(len(nvme_list) / threads_no)
        remainder = len(nvme_list) % threads_no
        iterator = iter(filenames)
        result = []
        for i in range(threads_no):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=/dev/%s" % x for x in r])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section


class SPDKInitiator(Initiator):
    """Initiator using the SPDK bdev fio_plugin."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Copy the zipped SPDK sources to the initiator, build them and
        run setup.sh to bind NVMe devices."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=%s;"
                         "make clean; make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Render the [Nvme] bdev config section from (svcid, nqn, ip) tuples."""
        header = "[Nvme]"
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        bdev_rows = [row_template.format(transport=self.transport,
                                         svc=x[0],
                                         nqn=x[1],
                                         ip=x[2],
                                         i=i) for i, x in enumerate(remote_subsystem_list)]
        bdev_rows = "\n".join(bdev_rows)
        bdev_section = "\n".join([header, bdev_rows])
        return bdev_section

    def gen_fio_filename_conf(self, subsystems, threads):
        """Build [filenameN] sections distributing NvmeXn1 bdevs as evenly
        as possible over the FIO threads."""
        filename_section = ""
        # Fix: "subsystems" is a list of tuples and "threads" a range object;
        # the original passed them directly to range()/division.
        subsys_no = len(subsystems)
        threads_no = len(threads)
        filenames = ["Nvme%sn1" % x for x in range(0, subsys_no)]
        nvme_per_split = int(subsys_no / threads_no)
        remainder = subsys_no % threads_no
        iterator = iter(filenames)
        result = []
        for i in range(threads_no):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section


if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    if (len(sys.argv) > 1):
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Instantiate target/initiator objects and collect FIO workload settings.
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])
            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for i in initiators:
        if i.mode == "spdk":
            t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Poor mans threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)