#!/usr/bin/env python3

import os
import re
import sys
import json
import paramiko
import zipfile
import threading
import subprocess
import itertools
import time
import uuid
import rpc
import rpc.client
import pandas as pd
from common import *


class Server:
    """Base class for one machine taking part in the NVMe-oF test.

    Holds SSH credentials, the NIC IP addresses used for NVMe-oF traffic
    and the transport type. ``mode`` ("spdk" or "kernel") selects the
    subclass behavior.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # The name ends up embedded in result file names, so keep it
        # restricted to filename-safe characters.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        # Prefix with the server name so interleaved output from worker
        # threads remains attributable; flush so ordering is visible live.
        print("[%s] %s" % (self.name, msg), flush=True)


class Target(Server):
    """Common part of the NVMe-oF target side (shared by SPDK and kernel
    targets): result parsing and SAR / PCM measurement helpers."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False

        # sar_settings: (enable, delay, interval, count)
        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        # pcm_settings: (dir, enable_pcm, enable_pcm_memory, delay, interval, count)
        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the whole SPDK source tree into ``dest_file`` so it can be
        shipped to the initiators."""
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Extract IOPS, bandwidth and latency statistics from a fio JSON
        output file.

        Latencies are normalized to microseconds (fio reports either
        "lat_ns" or "lat_us" buckets depending on version).

        Returns a flat list of 18 floats:
        [read_iops, read_bw, read avg/min/max lat,
         read p99/p99.9/p99.99/p99.999 lat,
         then the same nine values for writes].
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_post = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate all per-initiator fio JSON results in ``results_dir``
        into a single nvmf_results.csv.

        For each fio job config: average the repeated runs of each
        initiator, then sum those averages across all initiators that ran
        the job.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            header_line = ",".join(["Name",
                                    "read_iops", "read_bw", "read_avg_lat_us",
                                    "read_min_lat_us", "read_max_lat_us", "read_p99_lat_us",
                                    "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                                    "write_iops", "write_bw", "write_avg_lat_us",
                                    "write_min_lat_us", "write_max_lat_us", "write_p99_lat_us",
                                    "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"])
            fh.write(header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise mean over the runs of this initiator
                z = [sum(c) for c in zip(*separate_stats)]
                z = [c / len(separate_stats) for c in z]
                inits_avg_results.append(z)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(z)

            # Sum average results of all initiators running this FIO job
            self.log_print("\tTotal results for %s from all initiators" % fio_config)
            for a in inits_avg_results:
                self.log_print(a)
            total = ["{0:.3f}".format(sum(c)) for c in zip(*inits_avg_results)]
            rows.add(",".join([job_name, *total]))

        # Save results to file (single open instead of re-opening per row)
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect per-CPU utilization with sar for the configured
        interval/count and save the raw output; echo the summary lines."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in the background for ~pcm_count seconds and
        leave its CSV output in results_dir."""
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        # NOTE(review): with shell=True this kills the shell wrapper;
        # pcm-memory.x itself may outlive it - confirm on target system.
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x for the configured number of samples, then extract the
        UPI columns into a separate skt_-prefixed CSV."""
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)


class Initiator(Server):
    """Common part of the initiator (host) side: SSH session management,
    file transfer, subsystem discovery and fio configuration/execution."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        # SPDK_WORKSPACE env var overrides the configured workspace path
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a local file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a file from the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Execute ``cmd`` on the initiator over SSH.

        Returns (stdout, stderr) as decoded strings; blocks until the
        command finishes.
        """
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Fetch every fio result file from the initiator's nvmf_perf
        directory into local ``dest_dir``."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run "nvme discover" against ports 4420..4420+subsys_no-1 on each
        address and return a sorted, de-duplicated list of
        (trsvcid, subnqn, traddr) tuples restricted to ``address_list``."""
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # port_offset renamed from subsys_no to avoid shadowing the parameter
        for ip, port_offset in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + port_offset))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + port_offset),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a fio job file on the initiator and return its remote
        path.

        Selects the SPDK bdev plugin or libaio engine depending on
        ``self.mode``, distributes target devices over the available
        threads and honors cpus_allowed / num_cores limits.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
iodepth={io_depth}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/examples/bdev/fio_plugin/fio_plugin" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("lsblk -o NAME -nlp")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # CPU ranges like "0-3" are inclusive on both ends,
                    # so count b - a + 1 cores (range(a, b) was one short).
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads)
        else:
            filename_section = self.gen_fio_filename_conf(threads)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              io_depth=io_depth, ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency on the initiator via cpupower, when
        requested. Requires the userspace governor (intel_pstate off)."""
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Execute fio on the initiator with the given job file.

        When ``run_num`` is set the job is repeated that many times, each
        run writing a separate "<job>_run_<i>_<name>.json" result file.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)


class KernelTarget(Target):
    """NVMe-oF target driven by the Linux kernel nvmet stack via nvmetcli."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        """Write kernel.conf exposing a single /dev/nullb0 namespace on
        port 4420 of ``address``."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf exposing each NVMe drive as its own subsystem,
        with the drives split evenly between the NIC IP addresses and each
        subsystem listening on its own port (4420, 4421, ...)."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet configuration and load it with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block limited to single RDMA NIC.")
                print("Please specify only 1 IP address.")
                exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt app, configured over its
    JSON-RPC socket."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        """Create the transport, bdevs and subsystems on a running
        nvmf_tgt via RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # These RPC helpers configure the target as a side effect; their
        # return values are not used.
        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        """Create one 100 MiB null bdev named Nvme0n1 via RPC."""
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives (all, or the first req_num_disks) as
        SPDK bdevs named Nvme0, Nvme1, ..."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + namespace + listener per bdev, with the
        bdevs distributed evenly across the provided NIC IPs."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(1, req_num_disks + 1)
        disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % (c - 1)
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, then configure it."""
        self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "app/nvmf_tgt/nvmf_tgt")
        command = " ".join([nvmf_app_path, "-m", self.num_cores])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Log the actual process id, not the pid-file path
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()


class KernelInitiator(Initiator):
    """Initiator using the kernel NVMe-oF host driver (nvme-cli connect)."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # .get() so a config without "extra_params" does not raise KeyError
        self.extra_params = kwargs.get("extra_params") or ""

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Discover and "nvme connect" to every matching subsystem."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect from every matching subsystem by NQN."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads):
        """Build fio [filenameN] sections mapping the connected kernel
        /dev/nvmeXn1 devices round-robin onto ``threads`` jobs."""
        out, err = self.remote_call("lsblk -o NAME -nlp")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        filenames = ["nvme%sn1" % x for x in range(0, len(nvme_list))]
        # "threads" is a range object - divide by its length (the original
        # code divided by the range itself, which raises TypeError)
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=/dev/%s" % x for x in r])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section


class SPDKInitiator(Initiator):
    """Initiator using the SPDK bdev fio plugin; builds SPDK on the remote
    host from the zipped sources."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Copy the SPDK source zip to the initiator, build it with the fio
        plugin and run setup.sh."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; ./configure --with-rdma --with-fio=%s;"
                         "make clean; make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Render an [Nvme] config section with one TransportId line per
        discovered (svcid, nqn, ip) subsystem tuple."""
        header = "[Nvme]"
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        bdev_rows = [row_template.format(transport=self.transport,
                                         svc=x[0],
                                         nqn=x[1],
                                         ip=x[2],
                                         i=i) for i, x in enumerate(remote_subsystem_list)]
        bdev_rows = "\n".join(bdev_rows)
        bdev_section = "\n".join([header, bdev_rows])
        return bdev_section

    def gen_fio_filename_conf(self, subsystems, threads):
        """Build fio [filenameN] sections mapping SPDK bdevs (NvmeXn1)
        round-robin onto the available threads."""
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            filename_section = "\n".join([filename_section, header, disks])

        return filename_section


if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file path: first CLI argument, or config.json next to the script
    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Instantiate target/initiator objects and collect fio parameters
    # from the JSON config sections.
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
    threads = []
    for i in initiators:
        if i.mode == "spdk":
            t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
            threads.append(t)
            t.start()
    for t in threads:
        t.join()

    target_obj.tgt_start()

    # Poor mans threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)