#!/usr/bin/env python3

import os
import re
import sys
import json
import paramiko
import zipfile
import threading
import subprocess
import itertools
import time
import uuid
import rpc
import rpc.client
import pandas as pd
from collections import OrderedDict
from common import *


class Server:
    """Base class for a benchmark node (target or initiator).

    Holds identity (name), SSH credentials, the node mode ("spdk"/"kernel"),
    the list of NIC IP addresses and the NVMe-oF transport type.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # Node names end up embedded in result file names, so restrict them
        # to plain alphanumerics.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print a message prefixed with this node's name, unbuffered."""
        print("[%s] %s" % (self.name, msg), flush=True)


class Target(Server):
    """Common NVMe-oF target functionality: shipping SPDK sources, parsing fio
    results and running side measurements (SAR, PCM, bandwidth, DPDK memory).
    """

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 null_block_devices=0, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = null_block_devices
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False
        self.enable_bandwidth = False
        self.enable_dpdk_memory = False

        # Each *_settings argument is an optional tuple unpacked into the
        # corresponding enable flag plus its measurement parameters.
        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        if bandwidth_settings:
            self.enable_bandwidth, self.bandwidth_count = bandwidth_settings

        if dpdk_settings:
            self.enable_dpdk_memory, self.dpdk_wait_time = dpdk_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree so it can be copied to the initiators."""
        self.log_print("Zipping SPDK source directory")
        # Context manager guarantees the archive is closed (and flushed)
        # even if os.walk or fh.write raises. The original leaked the handle
        # on error.
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse a single fio JSON output file.

        Returns a flat list of 18 floats: read iops/bw/avg/min/max latency and
        p99/p99.9/p99.99/p99.999 percentiles, followed by the same nine values
        for writes. All latencies are normalized to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

            # Convert nanoseconds to microseconds where needed.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate fio JSON results from all initiators/runs in results_dir
        into per-initiator CSV files and one summary nvmf_results.csv.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    # Latencies are blended proportionally to the read/write mix.
                    aggr_stat = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    aggr_stat = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(aggr_stat)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file. Open the file once instead of once per row.
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Run sar for the configured interval/count and save its output."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory for pcm_count seconds, writing CSV output."""
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        # pcm-memory runs until killed; pcm_count here acts as a duration.
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm, then extract the UPI socket columns into a second CSV."""
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample NIC bandwidth with bwm-ng into a CSV file."""
        subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name,
                       self.bandwidth_count), shell=True, check=True)

    def measure_dpdk_memory(self, results_dir):
        """Trigger a DPDK memory stats dump via RPC and collect the file."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # BUGFIX: the original line was a bare attribute access
        # (rpc.env.env_dpdk_get_mem_stats) which never issued the RPC,
        # so /tmp/spdk_mem_dump.txt was never (re)generated.
        rpc.env.env_dpdk_get_mem_stats(self.client)
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))


class Initiator(Server):
    """An initiator node driven over SSH: uploads fio configs, runs fio and
    copies result files back to the target machine."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Copy a local file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Copy a file from the initiator to the local machine over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Run cmd on the initiator; return (stdout, stderr) as strings."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy all result files from the initiator's nvmf_perf directory."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run nvme discover against each (address, port) pair and return the
        sorted list of matching (trsvcid, subnqn, traddr) tuples."""
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_idx in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_idx))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_idx),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a fio config file on the initiator and return its path."""
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # BUGFIX: fio CPU ranges are inclusive on both ends,
                    # e.g. "0-3" means 4 CPUs; len(range(a, b)) undercounted by one.
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency with cpupower if one was requested."""
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio with the given config, once or run_num times, producing
        one JSON output file per run."""
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)


class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet subsystem, configured
    through nvmetcli with a generated JSON config."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 null_block_devices=0, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None, nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
                                           dpdk_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf: one nvmet subsystem + port per disk, with disks
        distributed evenly across the provided NIC addresses."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet config (null block or real NVMe) and load it."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log_print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            self.log_print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application, configured at
    runtime over the SPDK JSON-RPC socket."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 null_block_devices=0, null_block_dif_type=0, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None, num_shared_buffers=4096,
                 num_cores=1, dif_insert_strip=False, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
                                         dpdk_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers
        self.null_block_dif_type = null_block_dif_type
        self.dif_insert_strip = dif_insert_strip

    def spdk_tgt_configure(self):
        """Create the transport, bdevs and subsystems via JSON-RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        # NOTE(review): numa_list is unused here — kept to preserve behavior;
        # confirm whether get_used_numa_nodes() is needed before removing.
        numa_list = get_used_numa_nodes()

        # Create transport layer (trtype from config; default RDMA).
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       dif_insert_or_strip=self.dif_insert_strip)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # The add_* helpers configure via RPC and return None, so the results
        # are not captured.
        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null bdevs (with optional DIF metadata) via RPC."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe controllers (by PCI BDF) as bdevs via RPC."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + namespace + listener per bdev, spreading
        bdevs evenly across the provided NIC addresses."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, then configure it."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        # str() guards against num_cores arriving as an int (default is 1);
        # " ".join requires strings.
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initilize...")
        # Poll until the RPC socket appears.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()


class KernelInitiator(Initiator):
    """Initiator using the kernel NVMe-oF host driver via nvme-cli."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # .get avoids a KeyError when "extra_params" is absent from the config.
        self.extra_params = ""
        if kwargs.get("extra_params"):
            self.extra_params = kwargs["extra_params"]

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Connect to every discovered subsystem with nvme-cli."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect from every discovered subsystem."""
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build fio [filename] job sections distributing the connected
        kernel nvme devices across threads, splitting io_depth per job."""
        out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section


class SPDKInitiator(Initiator):
    """Initiator using the SPDK bdev fio plugin; builds SPDK on the remote
    machine and generates a bdev JSON config from discovered subsystems."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Ship the SPDK sources to the initiator, build them and run setup."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
                         "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev subsystem config attaching one NVMe-oF
        controller per discovered (port, nqn, address) tuple."""
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }
            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build fio [filename] job sections distributing the SPDK Nvme bdevs
        across threads, splitting io_depth per job."""
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section


if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    if (len(sys.argv) > 1):
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Instantiate target/initiator objects and read fio workload matrix
    # from the JSON config sections.
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    if "skip_spdk_install" not in data["general"]:
        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
        threads = []
        for i in initiators:
            if i.mode == "spdk":
                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
                threads.append(t)
                t.start()
        for t in threads:
            t.join()

    target_obj.tgt_start()

    # Poor mans threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
threads.append(t) 912 913 if target_obj.enable_pcm: 914 pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)]) 915 pcm_file_name = ".".join([pcm_file_name, "csv"]) 916 t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,)) 917 threads.append(t) 918 919 if target_obj.enable_pcm_memory: 920 pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)]) 921 pcm_file_name = ".".join([pcm_file_name, "csv"]) 922 t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,)) 923 threads.append(t) 924 925 if target_obj.enable_bandwidth: 926 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 927 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 928 t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,)) 929 threads.append(t) 930 931 if target_obj.enable_dpdk_memory: 932 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir)) 933 threads.append(t) 934 935 for t in threads: 936 t.start() 937 for t in threads: 938 t.join() 939 940 for i in initiators: 941 if i.mode == "kernel": 942 i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no) 943 i.copy_result_files(target_results_dir) 944 945 target_obj.parse_results(target_results_dir) 946