#!/usr/bin/env python3

import os
import re
import sys
import json
import paramiko
import zipfile
import threading
import subprocess
import itertools
import time
import uuid
import rpc
import rpc.client
import pandas as pd
from collections import OrderedDict
from common import *


class Server:
    """Base class for a host taking part in the NVMe-oF benchmark.

    Holds the credentials and network settings shared by both the
    target and initiator roles.
    """

    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        # Result file names embed the server name and are later parsed
        # with "_" as separator; restrict the name to alphanumerics so
        # that parsing stays unambiguous.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        # Prefix each message with the server name so interleaved output
        # from several hosts can be told apart; flush for live progress.
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        # Drop empty lines and '#'-comments (e.g. from sysctl.conf).
        return [l for l in lines if l and not l.startswith('#')]


class Target(Server):
    """NVMe-oF target host. Also collects system statistics (SAR, PCM,
    bandwidth, DPDK memory) and post-processes fio result files."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 null_block_devices=0, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None, zcopy_settings=None,
                 scheduler_settings="static"):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = null_block_devices
        # All measurements default to off; the *_settings tuples below
        # enable them and carry their parameters.
        self.enable_sar = False
        self.enable_pcm_power = False
        self.enable_pcm_memory = False
        self.enable_pcm = False
        self.enable_bandwidth = False
        self.enable_dpdk_memory = False
        self.enable_zcopy = False
        self.scheduler_name = scheduler_settings

        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.enable_pcm_power,\
                self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        if bandwidth_settings:
            self.enable_bandwidth, self.bandwidth_count = bandwidth_settings

        if dpdk_settings:
            self.enable_dpdk_memory, self.dpdk_wait_time = dpdk_settings

        if zcopy_settings:
            self.enable_zcopy = zcopy_settings

        # Script lives in scripts/perf/nvmf, so SPDK root is 3 levels up.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.sys_config()

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree so it can be shipped to initiators."""
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse one fio JSON output file.

        Returns a flat list of 18 floats: read iops/bw/avg/min/max and
        p99/p99.9/p99.99/p99.999 latencies, then the same for writes.
        All latencies are converted to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate per-initiator fio JSON results in *results_dir* into
        per-initiator CSV files and a combined nvmf_results.csv."""
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            # Derive the read percentage from the job file name:
            # pure read -> 1.0, pure write -> 0.0, mixed -> "m_<pct>" / 100.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                # Per-initiator CSV name: strip the "run_N_" prefix of the first run.
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise mean over all runs of this initiator.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            # Sum per-initiator averages; iops/bw add up, latencies are
            # then divided back down to a mean over initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    # Weighted mean of read/write latency by the r/w mix.
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect per-CPU utilization with sar and save raw output."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            # Echo the "Average" summary lines to the log for quick reading.
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run Intel pcm-memory for pcm_count seconds, CSV output to results_dir."""
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                                                         results_dir, pcm_file_name), shell=True)
        # pcm-memory has no sample-count option; let it run then kill it.
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        # Body of measure_pcm: run Intel pcm, then extract the UPI columns
        # into a separate "skt_" CSV for socket interconnect statistics.
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                                                         results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        # Clear pandas' auto-generated "Unnamed:*" labels in the 2-level header.
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run Intel pcm-power and save its raw output."""
        time.sleep(self.pcm_delay)
        out = subprocess.check_output("%s/pcm-power.x %s -i=%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count),
                                      shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample NIC bandwidth with bwm-ng into a CSV file."""
        bwm = subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name,
                                                                            self.bandwidth_count), shell=True, check=True)

    def measure_dpdk_memory(self, results_dir):
        """Dump DPDK memory statistics and move the dump to results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # NOTE(review): this is a bare attribute access that never invokes
        # the RPC — it likely should be
        # rpc.env.env_dpdk_get_mem_stats(self.client); confirm against the
        # SPDK RPC API before relying on the dump below being fresh.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log local kernel/sysctl/CPU-power configuration for the report."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        # Tail of Target.sys_config: CPU frequency governor info and
        # feature flags chosen for this run.
        subprocess.run("cpupower frequency-info", shell=True, check=True)
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    """A remote host that runs fio against the target, driven over SSH."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        # SPDK_WORKSPACE env var overrides the configured workspace path.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean workspace on the remote host.
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()
        self.sys_config()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        """Upload a local file to the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a file from the initiator over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        """Run *cmd* on the initiator; return (stdout, stderr) as strings."""
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        """Copy all fio result files from the initiator into *dest_dir*."""
        self.log_print("Copying results")
365 366 if not os.path.exists(dest_dir): 367 os.mkdir(dest_dir) 368 369 # Get list of result files from initiator and copy them back to target 370 stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir) 371 file_list = stdout.strip().split("\n") 372 373 for file in file_list: 374 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 375 os.path.join(dest_dir, file)) 376 self.log_print("Done copying results") 377 378 def discover_subsystems(self, address_list, subsys_no): 379 num_nvmes = range(0, subsys_no) 380 nvme_discover_output = "" 381 for ip, subsys_no in itertools.product(address_list, num_nvmes): 382 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 383 nvme_discover_cmd = ["sudo", 384 "%s" % self.nvmecli_bin, 385 "discover", "-t %s" % self.transport, 386 "-s %s" % (4420 + subsys_no), 387 "-a %s" % ip] 388 nvme_discover_cmd = " ".join(nvme_discover_cmd) 389 390 stdout, stderr = self.remote_call(nvme_discover_cmd) 391 if stdout: 392 nvme_discover_output = nvme_discover_output + stdout 393 394 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 395 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 396 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 397 nvme_discover_output) # from nvme discovery output 398 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 399 subsystems = list(set(subsystems)) 400 subsystems.sort(key=lambda x: x[1]) 401 self.log_print("Found matching subsystems on target side:") 402 for s in subsystems: 403 self.log_print(s) 404 405 return subsystems 406 407 def gen_fio_filename_conf(self, *args, **kwargs): 408 # Logic implemented in SPDKInitiator and KernelInitiator classes 409 pass 410 411 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10): 412 fio_conf_template = """ 413[global] 414ioengine={ioengine} 415{spdk_conf} 416thread=1 417group_reporting=1 418direct=1 
419percentile_list=50:90:99:99.5:99.9:99.99:99.999 420 421norandommap=1 422rw={rw} 423rwmixread={rwmixread} 424bs={block_size} 425time_based=1 426ramp_time={ramp_time} 427runtime={run_time} 428""" 429 if "spdk" in self.mode: 430 subsystems = self.discover_subsystems(self.nic_ips, subsys_no) 431 bdev_conf = self.gen_spdk_bdev_conf(subsystems) 432 self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir)) 433 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 434 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 435 else: 436 ioengine = "libaio" 437 spdk_conf = "" 438 out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'") 439 subsystems = [x for x in out.split("\n") if "nvme" in x] 440 441 if self.cpus_allowed is not None: 442 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 443 cpus_num = 0 444 cpus = self.cpus_allowed.split(",") 445 for cpu in cpus: 446 if "-" in cpu: 447 a, b = cpu.split("-") 448 a = int(a) 449 b = int(b) 450 cpus_num += len(range(a, b)) 451 else: 452 cpus_num += 1 453 threads = range(0, cpus_num) 454 elif hasattr(self, 'num_cores'): 455 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 456 threads = range(0, int(self.num_cores)) 457 else: 458 threads = range(0, len(subsystems)) 459 460 if "spdk" in self.mode: 461 filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs) 462 else: 463 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 464 465 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 466 rw=rw, rwmixread=rwmixread, block_size=block_size, 467 ramp_time=ramp_time, run_time=run_time) 468 if num_jobs: 469 fio_config = fio_config + "numjobs=%s \n" % num_jobs 470 if self.cpus_allowed is not None: 471 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 472 fio_config = fio_config + "cpus_allowed_policy=%s \n" % 
self.cpus_allowed_policy 473 fio_config = fio_config + filename_section 474 475 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 476 if hasattr(self, "num_cores"): 477 fio_config_filename += "_%sCPU" % self.num_cores 478 fio_config_filename += ".fio" 479 480 self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir) 481 self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename)) 482 self.log_print("Created FIO Config:") 483 self.log_print(fio_config) 484 485 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 486 487 def set_cpu_frequency(self): 488 if self.cpu_frequency is not None: 489 try: 490 self.remote_call('sudo cpupower frequency-set -g userspace') 491 self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency) 492 cmd = "sudo cpupower frequency-info" 493 output, error = self.remote_call(cmd) 494 self.log_print(output) 495 self.log_print(error) 496 except Exception: 497 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 498 sys.exit() 499 else: 500 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 501 502 def run_fio(self, fio_config_file, run_num=None): 503 job_name, _ = os.path.splitext(fio_config_file) 504 self.log_print("Starting FIO run for job: %s" % job_name) 505 self.log_print("Using FIO: %s" % self.fio_bin) 506 507 if run_num: 508 for i in range(1, run_num + 1): 509 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 510 cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename) 511 output, error = self.remote_call(cmd) 512 self.log_print(output) 513 self.log_print(error) 514 else: 515 output_filename = job_name + "_" + self.name + ".json" 516 cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename) 517 output, error = self.remote_call(cmd) 518 self.log_print(output) 
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log the initiator's kernel/sysctl/CPU-power configuration."""
        self.log_print("====Kernel release:====")
        self.log_print(self.remote_call('uname -r')[0])
        self.log_print("====Kernel command line:====")
        cmdline, error = self.remote_call('cat /proc/cmdline')
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl, error = self.remote_call('cat /etc/sysctl.conf')
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.remote_call("cpupower frequency-info")


class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet driver,
    configured through nvmetcli."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 null_block_devices=0, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None, nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
                                           dpdk_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        # Tear down the kernel target configuration on exit.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Generate a kernel.conf nvmet JSON config: one subsystem and one
        listener port per disk, split evenly between the NIC IPs."""

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                # Each subsystem gets its own port, 4420 + port_no, so
                # initiators can discover them by scanning the port range.
                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))
        pass

    def tgt_start(self):
        """Generate the nvmet config for null-block or NVMe devices and
        load it with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt app, configured via JSON-RPC."""

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 null_block_devices=0, null_block_dif_type=0, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, dpdk_settings=None, zcopy_settings=None,
                 scheduler_settings="static", num_shared_buffers=4096, num_cores=1,
                 dif_insert_strip=False, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         null_block_devices, sar_settings, pcm_settings, bandwidth_settings,
                                         dpdk_settings, zcopy_settings, scheduler_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers
        self.null_block_dif_type = null_block_dif_type
        self.dif_insert_strip = dif_insert_strip

    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt over RPC: transport layer,
        bdevs (null-block or NVMe) and the NVMe-oF subsystems."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        # NOTE(review): numa_list is computed but never used here.
        numa_list = get_used_numa_nodes()

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       dif_insert_or_strip=self.dif_insert_strip)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # The helpers below configure via RPC and return None; the
        # assignments only document the intent of each call.
        if self.null_block:
            nvme_section = self.spdk_tgt_add_nullblock(self.null_block)
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            nvme_section = self.spdk_tgt_add_nvme_conf()
            subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs named NvmeXn1; enable DIF
        metadata when null_block_dif_type is non-zero."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe controllers (all, or the first req_num_disks)."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one NVMe-oF subsystem per bdev, split the bdevs between
        the given NIC IPs and add a 4420 listener on each."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt in --wait-for-rpc mode and record its PID."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt --wait-for-rpc")
        # NOTE(review): " ".join requires self.num_cores to be a string
        # (it is taken verbatim from the JSON config) — confirm.
        command = " ".join([nvmf_app_path, "-m", self.num_cores])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # NOTE(review): self.pid holds the pid *file path*, so this logs
        # the path rather than the process id.
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initilize...")
while True: 738 if os.path.exists("/var/tmp/spdk.sock"): 739 break 740 time.sleep(1) 741 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 742 743 if self.enable_zcopy: 744 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 745 enable_zerocopy_send=True) 746 self.log_print("Target socket options:") 747 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 748 749 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 750 751 rpc.framework_start_init(self.client) 752 self.spdk_tgt_configure() 753 754 def __del__(self): 755 if hasattr(self, "nvmf_proc"): 756 try: 757 self.nvmf_proc.terminate() 758 self.nvmf_proc.wait() 759 except Exception as e: 760 self.log_print(e) 761 self.nvmf_proc.kill() 762 self.nvmf_proc.communicate() 763 764 765class KernelInitiator(Initiator): 766 def __init__(self, name, username, password, mode, nic_ips, ip, transport, 767 cpus_allowed=None, cpus_allowed_policy="shared", 768 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs): 769 770 super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport, 771 cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy, 772 cpu_frequency=cpu_frequency, fio_bin=fio_bin) 773 774 self.extra_params = "" 775 if kwargs["extra_params"]: 776 self.extra_params = kwargs["extra_params"] 777 778 def __del__(self): 779 self.ssh_connection.close() 780 781 def kernel_init_connect(self, address_list, subsys_no): 782 subsystems = self.discover_subsystems(address_list, subsys_no) 783 self.log_print("Below connection attempts may result in error messages, this is expected!") 784 for subsystem in subsystems: 785 self.log_print("Trying to connect %s %s %s" % subsystem) 786 self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin, 787 self.transport, 788 *subsystem, 789 self.extra_params)) 790 time.sleep(2) 791 792 def kernel_init_disconnect(self, address_list, subsys_no): 793 
        # Body of kernel_init_disconnect: drop every connected subsystem.
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build fio [filename] sections from the connected /dev/nvme*
        devices, distributing them round-robin across *threads*."""
        out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        # Give each thread nvme_per_split disks, spreading the remainder
        # one extra disk at a time over the first threads.
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale queue depth by disks per section and jobs, minimum 1.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section


class SPDKInitiator(Initiator):
    """Initiator that builds SPDK and runs fio with the SPDK bdev plugin
    against NVMe-oF subsystems discovered on the target."""

    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        """Ship the SPDK source zip to the initiator, build it with the
        fio plugin and run setup.sh to bind devices."""
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
                         "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev-subsystem config attaching one NVMe-oF
        controller per discovered (port, nqn, addr) tuple."""
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }
            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build fio [filename] sections for the SPDK bdev names NvmeXn1,
        distributing the bdevs round-robin across *threads*."""
        filename_section = ""
        # Never use more threads than there are subsystems.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        # Same distribution scheme as KernelInitiator.gen_fio_filename_conf.
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
__name__ == "__main__": 905 spdk_zip_path = "/tmp/spdk.zip" 906 target_results_dir = "/tmp/results" 907 908 if (len(sys.argv) > 1): 909 config_file_path = sys.argv[1] 910 else: 911 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 912 config_file_path = os.path.join(script_full_dir, "config.json") 913 914 print("Using config file: %s" % config_file_path) 915 with open(config_file_path, "r") as config: 916 data = json.load(config) 917 918 initiators = [] 919 fio_cases = [] 920 921 for k, v in data.items(): 922 if "target" in k: 923 if data[k]["mode"] == "spdk": 924 target_obj = SPDKTarget(name=k, **data["general"], **v) 925 elif data[k]["mode"] == "kernel": 926 target_obj = KernelTarget(name=k, **data["general"], **v) 927 elif "initiator" in k: 928 if data[k]["mode"] == "spdk": 929 init_obj = SPDKInitiator(name=k, **data["general"], **v) 930 elif data[k]["mode"] == "kernel": 931 init_obj = KernelInitiator(name=k, **data["general"], **v) 932 initiators.append(init_obj) 933 elif "fio" in k: 934 fio_workloads = itertools.product(data[k]["bs"], 935 data[k]["qd"], 936 data[k]["rw"]) 937 938 fio_run_time = data[k]["run_time"] 939 fio_ramp_time = data[k]["ramp_time"] 940 fio_rw_mix_read = data[k]["rwmixread"] 941 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 942 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 943 else: 944 continue 945 946 # Copy and install SPDK on remote initiators 947 if "skip_spdk_install" not in data["general"]: 948 target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path) 949 threads = [] 950 for i in initiators: 951 if i.mode == "spdk": 952 t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,)) 953 threads.append(t) 954 t.start() 955 for t in threads: 956 t.join() 957 958 target_obj.tgt_start() 959 960 # Poor mans threading 961 # Run FIO tests 962 for block_size, io_depth, rw in fio_workloads: 963 threads = [] 964 configs = [] 965 for i in initiators: 966 if 
i.mode == "kernel": 967 i.kernel_init_connect(i.nic_ips, target_obj.subsys_no) 968 969 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 970 fio_num_jobs, fio_ramp_time, fio_run_time) 971 configs.append(cfg) 972 973 for i, cfg in zip(initiators, configs): 974 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 975 threads.append(t) 976 if target_obj.enable_sar: 977 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 978 sar_file_name = ".".join([sar_file_name, "txt"]) 979 t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name)) 980 threads.append(t) 981 982 if target_obj.enable_pcm: 983 pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)]) 984 pcm_file_name = ".".join([pcm_file_name, "csv"]) 985 t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,)) 986 threads.append(t) 987 988 if target_obj.enable_pcm_memory: 989 pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)]) 990 pcm_file_name = ".".join([pcm_file_name, "csv"]) 991 t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,)) 992 threads.append(t) 993 994 if target_obj.enable_pcm_power: 995 pcm_file_name = "_".join(["pcm_power", str(block_size), str(rw), str(io_depth)]) 996 pcm_file_name = ".".join([pcm_file_name, "csv"]) 997 t = threading.Thread(target=target_obj.measure_pcm_power, args=(target_results_dir, pcm_file_name,)) 998 threads.append(t) 999 1000 if target_obj.enable_bandwidth: 1001 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1002 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1003 t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,)) 1004 threads.append(t) 1005 1006 if target_obj.enable_dpdk_memory: 1007 t = 
threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir)) 1008 threads.append(t) 1009 1010 for t in threads: 1011 t.start() 1012 for t in threads: 1013 t.join() 1014 1015 for i in initiators: 1016 if i.mode == "kernel": 1017 i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no) 1018 i.copy_result_files(target_results_dir) 1019 1020 target_obj.parse_results(target_results_dir) 1021