1#!/usr/bin/env python3 2 3import os 4import re 5import sys 6import json 7import paramiko 8import zipfile 9import threading 10import subprocess 11import itertools 12import time 13import uuid 14import rpc 15import rpc.client 16import pandas as pd 17from collections import OrderedDict 18from common import * 19 20 21class Server: 22 def __init__(self, name, username, password, mode, nic_ips, transport): 23 self.name = name 24 self.mode = mode 25 self.username = username 26 self.password = password 27 self.nic_ips = nic_ips 28 self.transport = transport.lower() 29 30 if not re.match("^[A-Za-z0-9]*$", name): 31 self.log_print("Please use a name which contains only letters or numbers") 32 sys.exit(1) 33 34 def log_print(self, msg): 35 print("[%s] %s" % (self.name, msg), flush=True) 36 37 def get_uncommented_lines(self, lines): 38 return [l for l in lines if l and not l.startswith('#')] 39 40 41class Target(Server): 42 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", 43 null_block_devices=0, sar_settings=None, pcm_settings=None, 44 bandwidth_settings=None, dpdk_settings=None, zcopy_settings=None, 45 scheduler_settings="static"): 46 47 super(Target, self).__init__(name, username, password, mode, nic_ips, transport) 48 self.null_block = null_block_devices 49 self.enable_sar = False 50 self.enable_pcm_memory = False 51 self.enable_pcm = False 52 self.enable_bandwidth = False 53 self.enable_dpdk_memory = False 54 self.enable_zcopy = False 55 self.scheduler_name = scheduler_settings 56 57 if sar_settings: 58 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings 59 60 if pcm_settings: 61 self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings 62 63 if bandwidth_settings: 64 self.enable_bandwidth, self.bandwidth_count = bandwidth_settings 65 66 if dpdk_settings: 67 self.enable_dpdk_memory, self.dpdk_wait_time = dpdk_settings 68 69 if zcopy_settings: 70 self.enable_zcopy = zcopy_settings 71 72 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 73 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 74 self.sys_config() 75 76 def zip_spdk_sources(self, spdk_dir, dest_file): 77 self.log_print("Zipping SPDK source directory") 78 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 79 for root, directories, files in os.walk(spdk_dir, followlinks=True): 80 for file in files: 81 fh.write(os.path.relpath(os.path.join(root, file))) 82 fh.close() 83 self.log_print("Done zipping") 84 85 def read_json_stats(self, file): 86 with open(file, "r") as json_data: 87 data = json.load(json_data) 88 job_pos = 0 # job_post = 0 because using aggregated results 89 90 # Check if latency is in nano or microseconds to choose correct dict key 91 def get_lat_unit(key_prefix, dict_section): 92 # key prefix - lat, clat or slat. 93 # dict section - portion of json containing latency bucket in question 94 # Return dict key to access the bucket and unit as string 95 for k, v in dict_section.items(): 96 if k.startswith(key_prefix): 97 return k, k.split("_")[1] 98 99 def get_clat_percentiles(clat_dict_leaf): 100 if "percentile" in clat_dict_leaf: 101 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 102 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 103 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 104 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 105 106 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 107 else: 108 # Latest fio versions do not provide "percentile" results if no 109 # measurements were done, so just return zeroes 110 return [0, 0, 0, 0] 111 112 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 113 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 114 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 115 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 116 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 117 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 118 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 119 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 120 data["jobs"][job_pos]["read"][clat_key]) 121 122 if "ns" in lat_unit: 123 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 124 if "ns" in clat_unit: 125 read_p99_lat = read_p99_lat / 1000 126 read_p99_9_lat = read_p99_9_lat / 1000 127 read_p99_99_lat = read_p99_99_lat / 1000 128 read_p99_999_lat = read_p99_999_lat / 1000 129 130 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 131 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 132 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 133 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 134 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 135 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 136 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 137 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 138 data["jobs"][job_pos]["write"][clat_key]) 139 140 if "ns" in lat_unit: 141 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 142 if "ns" in clat_unit: 143 write_p99_lat = write_p99_lat / 1000 144 write_p99_9_lat = write_p99_9_lat / 1000 145 write_p99_99_lat = write_p99_99_lat / 1000 146 write_p99_999_lat = write_p99_999_lat / 1000 147 148 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 149 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 150 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 151 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 152 153 def parse_results(self, results_dir, initiator_count=None, run_num=None): 154 files = os.listdir(results_dir) 155 fio_files = filter(lambda x: ".fio" in x, files) 156 json_files = [x for x in files if ".json" in x] 157 158 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 159 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 160 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 161 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 162 163 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 164 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 165 166 header_line = ",".join(["Name", *headers]) 167 aggr_header_line = ",".join(["Name", *aggr_headers]) 168 169 # Create empty results file 170 csv_file = "nvmf_results.csv" 171 with open(os.path.join(results_dir, csv_file), "w") as fh: 172 fh.write(aggr_header_line + "\n") 173 rows = set() 174 175 for fio_config in fio_files: 176 self.log_print("Getting FIO stats for %s" % fio_config) 177 job_name, _ = os.path.splitext(fio_config) 178 179 # Look in the filename for rwmixread value. Function arguments do 180 # not have that information. 181 # TODO: Improve this function by directly using workload params instead 182 # of regexing through filenames. 183 if "read" in job_name: 184 rw_mixread = 1 185 elif "write" in job_name: 186 rw_mixread = 0 187 else: 188 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 189 190 # If "_CPU" exists in name - ignore it 191 # Initiators for the same job could have diffrent num_cores parameter 192 job_name = re.sub(r"_\d+CPU", "", job_name) 193 job_result_files = [x for x in json_files if job_name in x] 194 self.log_print("Matching result files for current fio config:") 195 for j in job_result_files: 196 self.log_print("\t %s" % j) 197 198 # There may have been more than 1 initiator used in test, need to check that 199 # Result files are created so that string after last "_" separator is server name 200 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 201 inits_avg_results = [] 202 for i in inits_names: 203 self.log_print("\tGetting stats for initiator %s" % i) 204 # There may have been more than 1 test run for this job, calculate average results for initiator 205 i_results = [x for x in job_result_files if i in x] 206 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 207 208 separate_stats = [] 209 for r in i_results: 210 stats = self.read_json_stats(os.path.join(results_dir, r)) 211 separate_stats.append(stats) 212 self.log_print(stats) 213 214 init_results = [sum(x) for x in zip(*separate_stats)] 215 init_results = [x / len(separate_stats) for x in init_results] 216 inits_avg_results.append(init_results) 217 218 self.log_print("\tAverage results for initiator %s" % i) 219 self.log_print(init_results) 220 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 221 fh.write(header_line + "\n") 222 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 223 224 # Sum results of all initiators running this FIO job. 225 # Latency results are an average of latencies from accros all initiators. 226 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 227 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 228 for key in inits_avg_results: 229 if "lat" in key: 230 inits_avg_results[key] /= len(inits_names) 231 232 # Aggregate separate read/write values into common labels 233 # Take rw_mixread into consideration for mixed read/write workloads. 234 aggregate_results = OrderedDict() 235 for h in aggr_headers: 236 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 237 if "lat" in h: 238 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 239 else: 240 _ = read_stat + write_stat 241 aggregate_results[h] = "{0:.3f}".format(_) 242 243 rows.add(",".join([job_name, *aggregate_results.values()])) 244 245 # Save results to file 246 for row in rows: 247 with open(os.path.join(results_dir, csv_file), "a") as fh: 248 fh.write(row + "\n") 249 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 250 251 def measure_sar(self, results_dir, sar_file_name): 252 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 253 time.sleep(self.sar_delay) 254 out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8") 255 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 256 for line in out.split("\n"): 257 if "Average" in line and "CPU" in line: 258 self.log_print("Summary CPU utilization from SAR:") 259 self.log_print(line) 260 if "Average" in line and "all" in line: 261 self.log_print(line) 262 fh.write(out) 263 264 def measure_pcm_memory(self, results_dir, pcm_file_name): 265 time.sleep(self.pcm_delay) 266 pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, 267 results_dir, pcm_file_name), shell=True) 268 time.sleep(self.pcm_count) 269 pcm_memory.kill() 270 271 def measure_pcm(self, results_dir, pcm_file_name): 272 time.sleep(self.pcm_delay) 273 subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count, 274 results_dir, pcm_file_name), shell=True, check=True) 275 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 276 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 277 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 278 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 279 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 280 281 def measure_bandwidth(self, results_dir, bandwidth_file_name): 282 bwm = subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name, 283 self.bandwidth_count), shell=True, check=True) 284 285 def measure_dpdk_memory(self, results_dir): 286 self.log_print("INFO: waiting to generate DPDK memory usage") 287 time.sleep(self.dpdk_wait_time) 288 self.log_print("INFO: generating DPDK memory usage") 289 rpc.env.env_dpdk_get_mem_stats 290 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 291 292 def sys_config(self): 293 self.log_print("====Kernel release:====") 294 self.log_print(os.uname().release) 295 self.log_print("====Kernel command line:====") 296 with open('/proc/cmdline') as f: 297 cmdline = f.readlines() 298 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 299 self.log_print("====sysctl conf:====") 300 with open('/etc/sysctl.conf') as f: 301 sysctl = f.readlines() 302 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 303 self.log_print("====Cpu power info:====") 304 subprocess.run("cpupower frequency-info", shell=True, check=True) 305 self.log_print("====zcopy settings:====") 306 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 307 self.log_print("====Scheduler settings:====") 308 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 309 310 311class Initiator(Server): 312 def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None, 313 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None, 314 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"): 315 316 super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport) 317 318 self.ip = ip 319 self.spdk_dir = workspace 320 if os.getenv('SPDK_WORKSPACE'): 321 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 322 self.fio_bin = fio_bin 323 self.cpus_allowed = cpus_allowed 324 self.cpus_allowed_policy = cpus_allowed_policy 325 self.cpu_frequency = cpu_frequency 326 self.nvmecli_bin = nvmecli_bin 327 self.ssh_connection = paramiko.SSHClient() 328 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 329 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 330 self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir) 331 self.remote_call("mkdir -p %s" % self.spdk_dir) 332 self.set_cpu_frequency() 333 self.sys_config() 334 335 def __del__(self): 336 self.ssh_connection.close() 337 338 def put_file(self, local, remote_dest): 339 ftp = self.ssh_connection.open_sftp() 340 ftp.put(local, remote_dest) 341 ftp.close() 342 343 def get_file(self, remote, local_dest): 344 ftp = self.ssh_connection.open_sftp() 345 ftp.get(remote, local_dest) 346 ftp.close() 347 348 def remote_call(self, cmd): 349 stdin, stdout, stderr = self.ssh_connection.exec_command(cmd) 350 out = stdout.read().decode(encoding="utf-8") 351 err = stderr.read().decode(encoding="utf-8") 352 return out, err 353 354 def copy_result_files(self, dest_dir): 355 self.log_print("Copying results") 356 357 if not os.path.exists(dest_dir): 358 os.mkdir(dest_dir) 359 360 # Get list of result files from initiator and copy them back to target 361 stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir) 362 file_list = stdout.strip().split("\n") 363 364 for file in file_list: 365 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 366 os.path.join(dest_dir, file)) 367 self.log_print("Done copying results") 368 369 def discover_subsystems(self, address_list, subsys_no): 370 num_nvmes = range(0, subsys_no) 371 nvme_discover_output = "" 372 for ip, subsys_no in itertools.product(address_list, num_nvmes): 373 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 374 nvme_discover_cmd = ["sudo", 375 "%s" % self.nvmecli_bin, 376 "discover", "-t %s" % self.transport, 377 "-s %s" % (4420 + subsys_no), 378 "-a %s" % ip] 379 nvme_discover_cmd = " ".join(nvme_discover_cmd) 380 381 stdout, stderr = self.remote_call(nvme_discover_cmd) 382 if stdout: 383 nvme_discover_output = nvme_discover_output + stdout 384 385 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 386 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 387 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 388 nvme_discover_output) # from nvme discovery output 389 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 390 subsystems = list(set(subsystems)) 391 subsystems.sort(key=lambda x: x[1]) 392 self.log_print("Found matching subsystems on target side:") 393 for s in subsystems: 394 self.log_print(s) 395 396 return subsystems 397 398 def gen_fio_filename_conf(self, *args, **kwargs): 399 # Logic implemented in SPDKInitiator and KernelInitiator classes 400 pass 401 402 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10): 403 fio_conf_template = """ 404[global] 405ioengine={ioengine} 406{spdk_conf} 407thread=1 408group_reporting=1 409direct=1 410percentile_list=50:90:99:99.5:99.9:99.99:99.999 411 412norandommap=1 413rw={rw} 414rwmixread={rwmixread} 415bs={block_size} 416time_based=1 417ramp_time={ramp_time} 418runtime={run_time} 419""" 420 if "spdk" in self.mode: 421 subsystems = self.discover_subsystems(self.nic_ips, subsys_no) 422 bdev_conf = self.gen_spdk_bdev_conf(subsystems) 423 self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir)) 424 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 425 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 426 else: 427 ioengine = "libaio" 428 spdk_conf = "" 429 out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'") 430 subsystems = [x for x in out.split("\n") if "nvme" in x] 431 432 if self.cpus_allowed is not None: 433 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 434 cpus_num = 0 435 cpus = self.cpus_allowed.split(",") 436 for cpu in cpus: 437 if "-" in cpu: 438 a, b = cpu.split("-") 439 a = int(a) 440 b = int(b) 441 cpus_num += len(range(a, b)) 442 else: 443 cpus_num += 1 444 threads = range(0, cpus_num) 445 elif hasattr(self, 'num_cores'): 446 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 447 threads = range(0, int(self.num_cores)) 448 else: 449 threads = range(0, len(subsystems)) 450 451 if "spdk" in self.mode: 452 filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs) 453 else: 454 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 455 456 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 457 rw=rw, rwmixread=rwmixread, block_size=block_size, 458 ramp_time=ramp_time, run_time=run_time) 459 if num_jobs: 460 fio_config = fio_config + "numjobs=%s \n" % num_jobs 461 if self.cpus_allowed is not None: 462 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 463 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 464 fio_config = fio_config + filename_section 465 466 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 467 if hasattr(self, "num_cores"): 468 fio_config_filename += "_%sCPU" % self.num_cores 469 fio_config_filename += ".fio" 470 471 self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir) 472 self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename)) 473 self.log_print("Created FIO Config:") 474 self.log_print(fio_config) 475 476 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 477 478 def set_cpu_frequency(self): 479 if self.cpu_frequency is not None: 480 try: 481 self.remote_call('sudo cpupower frequency-set -g userspace') 482 self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency) 483 cmd = "sudo cpupower frequency-info" 484 output, error = self.remote_call(cmd) 485 self.log_print(output) 486 self.log_print(error) 487 except Exception: 488 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 489 sys.exit() 490 else: 491 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 492 493 def run_fio(self, fio_config_file, run_num=None): 494 job_name, _ = os.path.splitext(fio_config_file) 495 self.log_print("Starting FIO run for job: %s" % job_name) 496 self.log_print("Using FIO: %s" % self.fio_bin) 497 498 if run_num: 499 for i in range(1, run_num + 1): 500 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 501 cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename) 502 output, error = self.remote_call(cmd) 503 self.log_print(output) 504 self.log_print(error) 505 else: 506 output_filename = job_name + "_" + self.name + ".json" 507 cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename) 508 output, error = self.remote_call(cmd) 509 self.log_print(output) 510 self.log_print(error) 511 self.log_print("FIO run finished. Results in: %s" % output_filename) 512 513 def sys_config(self): 514 self.log_print("====Kernel release:====") 515 self.log_print(self.remote_call('uname -r')[0]) 516 self.log_print("====Kernel command line:====") 517 cmdline, error = self.remote_call('cat /proc/cmdline') 518 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 519 self.log_print("====sysctl conf:====") 520 sysctl, error = self.remote_call('cat /etc/sysctl.conf') 521 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 522 self.log_print("====Cpu power info:====") 523 self.remote_call("cpupower frequency-info") 524 525 526class KernelTarget(Target): 527 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", 528 null_block_devices=0, sar_settings=None, pcm_settings=None, 529 bandwidth_settings=None, dpdk_settings=None, nvmet_bin="nvmetcli", **kwargs): 530 531 super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport, 532 null_block_devices, sar_settings, pcm_settings, bandwidth_settings, 533 dpdk_settings) 534 self.nvmet_bin = nvmet_bin 535 536 def __del__(self): 537 nvmet_command(self.nvmet_bin, "clear") 538 539 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 540 541 nvmet_cfg = { 542 "ports": [], 543 "hosts": [], 544 "subsystems": [], 545 } 546 547 # Split disks between NIC IP's 548 disks_per_ip = int(len(nvme_list) / len(address_list)) 549 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 550 551 subsys_no = 1 552 port_no = 0 553 for ip, chunk in zip(address_list, disk_chunks): 554 for disk in chunk: 555 nvmet_cfg["subsystems"].append({ 556 "allowed_hosts": [], 557 "attr": { 558 "allow_any_host": "1", 559 "serial": "SPDK00%s" % subsys_no, 560 "version": "1.3" 561 }, 562 "namespaces": [ 563 { 564 "device": { 565 "path": disk, 566 "uuid": "%s" % uuid.uuid4() 567 }, 568 "enable": 1, 569 "nsid": subsys_no 570 } 571 ], 572 "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no 573 }) 574 575 nvmet_cfg["ports"].append({ 576 "addr": { 577 "adrfam": "ipv4", 578 "traddr": ip, 579 "trsvcid": "%s" % (4420 + port_no), 580 "trtype": "%s" % self.transport 581 }, 582 "portid": subsys_no, 583 "referrals": [], 584 "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no] 585 }) 586 subsys_no += 1 587 port_no += 1 588 589 with open("kernel.conf", "w") as fh: 590 fh.write(json.dumps(nvmet_cfg, indent=2)) 591 pass 592 593 def tgt_start(self): 594 self.log_print("Configuring kernel NVMeOF Target") 595 596 if self.null_block: 597 print("Configuring with null block device.") 598 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 599 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 600 self.subsys_no = len(null_blk_list) 601 else: 602 print("Configuring with NVMe drives.") 603 nvme_list = get_nvme_devices() 604 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 605 self.subsys_no = len(nvme_list) 606 607 nvmet_command(self.nvmet_bin, "clear") 608 nvmet_command(self.nvmet_bin, "restore kernel.conf") 609 self.log_print("Done configuring kernel NVMeOF Target") 610 611 612class SPDKTarget(Target): 613 614 def __init__(self, name, username, password, mode, nic_ips, transport="rdma", 615 null_block_devices=0, null_block_dif_type=0, sar_settings=None, pcm_settings=None, 616 bandwidth_settings=None, dpdk_settings=None, zcopy_settings=None, 617 scheduler_settings="static", num_shared_buffers=4096, num_cores=1, 618 dif_insert_strip=False, **kwargs): 619 620 super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport, 621 null_block_devices, sar_settings, pcm_settings, bandwidth_settings, 622 dpdk_settings, zcopy_settings, scheduler_settings) 623 self.num_cores = num_cores 624 self.num_shared_buffers = num_shared_buffers 625 self.null_block_dif_type = null_block_dif_type 626 self.dif_insert_strip = dif_insert_strip 627 628 def spdk_tgt_configure(self): 629 self.log_print("Configuring SPDK NVMeOF target via RPC") 630 numa_list = get_used_numa_nodes() 631 632 # Create RDMA transport layer 633 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 634 num_shared_buffers=self.num_shared_buffers, 635 dif_insert_or_strip=self.dif_insert_strip) 636 self.log_print("SPDK NVMeOF transport layer:") 637 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 638 639 if self.null_block: 640 nvme_section = self.spdk_tgt_add_nullblock(self.null_block) 641 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 642 else: 643 nvme_section = self.spdk_tgt_add_nvme_conf() 644 subsystems_section = self.spdk_tgt_add_subsystem_conf(self.nic_ips) 645 self.log_print("Done configuring SPDK NVMeOF Target") 646 647 def spdk_tgt_add_nullblock(self, null_block_count): 648 md_size = 0 649 block_size = 4096 650 if self.null_block_dif_type != 0: 651 md_size = 128 652 653 self.log_print("Adding null block bdevices to config via RPC") 654 for i in range(null_block_count): 655 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 656 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 657 dif_type=self.null_block_dif_type, md_size=md_size) 658 self.log_print("SPDK Bdevs configuration:") 659 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 660 661 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 662 self.log_print("Adding NVMe bdevs to config via RPC") 663 664 bdfs = get_nvme_devices_bdf() 665 bdfs = [b.replace(":", ".") for b in bdfs] 666 667 if req_num_disks: 668 if req_num_disks > len(bdfs): 669 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 670 sys.exit(1) 671 else: 672 bdfs = bdfs[0:req_num_disks] 673 674 for i, bdf in enumerate(bdfs): 675 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 676 677 self.log_print("SPDK Bdevs configuration:") 678 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 679 680 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 681 self.log_print("Adding subsystems to config") 682 if not req_num_disks: 683 req_num_disks = get_nvme_devices_count() 684 685 # Distribute bdevs between provided NICs 686 num_disks = range(0, req_num_disks) 687 if len(num_disks) == 1: 688 disks_per_ip = 1 689 else: 690 disks_per_ip = int(len(num_disks) / len(ips)) 691 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 692 693 # Create subsystems, add bdevs to namespaces, add listeners 694 for ip, chunk in zip(ips, disk_chunks): 695 for c in chunk: 696 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 697 serial = "SPDK00%s" % c 698 bdev_name = "Nvme%sn1" % c 699 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 700 allow_any_host=True, max_namespaces=8) 701 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 702 703 rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn, 704 trtype=self.transport, 705 traddr=ip, 706 trsvcid="4420", 707 adrfam="ipv4") 708 709 self.log_print("SPDK NVMeOF subsystem configuration:") 710 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 711 712 def tgt_start(self): 713 if self.null_block: 714 self.subsys_no = 1 715 else: 716 self.subsys_no = get_nvme_devices_count() 717 self.log_print("Starting SPDK NVMeOF Target process") 718 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt --wait-for-rpc") 719 command = " ".join([nvmf_app_path, "-m", self.num_cores]) 720 proc = subprocess.Popen(command, shell=True) 721 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 722 723 with open(self.pid, "w") as fh: 724 fh.write(str(proc.pid)) 725 self.nvmf_proc = proc 726 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 727 self.log_print("Waiting for spdk to initilize...") 728 while True: 729 if os.path.exists("/var/tmp/spdk.sock"): 730 break 731 time.sleep(1) 732 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 733 734 if self.enable_zcopy: 735 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 736 enable_zerocopy_send=True) 737 self.log_print("Target socket options:") 738 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 739 740 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 741 742 rpc.framework_start_init(self.client) 743 self.spdk_tgt_configure() 744 745 def __del__(self): 746 if hasattr(self, "nvmf_proc"): 747 try: 748 self.nvmf_proc.terminate() 749 self.nvmf_proc.wait() 750 except Exception as e: 751 self.log_print(e) 752 self.nvmf_proc.kill() 753 self.nvmf_proc.communicate() 754 755 756class KernelInitiator(Initiator): 757 def __init__(self, name, username, password, mode, nic_ips, ip, transport, 758 cpus_allowed=None, cpus_allowed_policy="shared", 759 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs): 760 761 super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport, 762 cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy, 763 cpu_frequency=cpu_frequency, fio_bin=fio_bin) 764 765 self.extra_params = "" 766 if kwargs["extra_params"]: 767 self.extra_params = kwargs["extra_params"] 768 769 def __del__(self): 770 self.ssh_connection.close() 771 772 def kernel_init_connect(self, address_list, subsys_no): 773 subsystems = self.discover_subsystems(address_list, subsys_no) 774 self.log_print("Below connection attempts may result in error messages, this is expected!") 775 for subsystem in subsystems: 776 self.log_print("Trying to connect %s %s %s" % subsystem) 777 self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin, 778 self.transport, 779 *subsystem, 780 self.extra_params)) 781 time.sleep(2) 782 783 def kernel_init_disconnect(self, address_list, subsys_no): 784 subsystems = self.discover_subsystems(address_list, subsys_no) 785 for subsystem in subsystems: 786 self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1])) 787 time.sleep(1) 788 789 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 790 out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'") 791 nvme_list = [x for x in out.split("\n") if "nvme" in x] 792 793 filename_section = "" 794 nvme_per_split = int(len(nvme_list) / len(threads)) 795 remainder = len(nvme_list) % len(threads) 796 iterator = iter(nvme_list) 797 result = [] 798 for i in range(len(threads)): 799 result.append([]) 800 for j in range(nvme_per_split): 801 result[i].append(next(iterator)) 802 if remainder: 803 result[i].append(next(iterator)) 804 remainder -= 1 805 for i, r in enumerate(result): 806 header = "[filename%s]" % i 807 disks = "\n".join(["filename=%s" % x for x in r]) 808 job_section_qd = round((io_depth * len(r)) / num_jobs) 809 if job_section_qd == 0: 810 job_section_qd = 1 811 iodepth = "iodepth=%s" % job_section_qd 812 filename_section = "\n".join([filename_section, header, disks, iodepth]) 813 814 return filename_section 815 816 817class SPDKInitiator(Initiator): 818 def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", 819 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared", 820 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs): 821 super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport, 822 cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy, 823 cpu_frequency=cpu_frequency, fio_bin=fio_bin) 824 825 self.num_cores = num_cores 826 827 def install_spdk(self, local_spdk_zip): 828 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 829 self.log_print("Copied sources zip from target") 830 self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir) 831 832 self.log_print("Sources unpacked") 833 self.log_print("Using fio binary %s" % self.fio_bin) 834 self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;" 835 "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin))) 836 837 self.log_print("SPDK built") 838 self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir) 839 840 def gen_spdk_bdev_conf(self, remote_subsystem_list): 841 bdev_cfg_section = { 842 "subsystems": [ 843 { 844 "subsystem": "bdev", 845 "config": [] 846 } 847 ] 848 } 849 850 for i, subsys in enumerate(remote_subsystem_list): 851 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 852 nvme_ctrl = { 853 "method": "bdev_nvme_attach_controller", 854 "params": { 855 "name": "Nvme{}".format(i), 856 "trtype": self.transport, 857 "traddr": sub_addr, 858 "trsvcid": sub_port, 859 "subnqn": sub_nqn, 860 "adrfam": "IPv4" 861 } 862 } 863 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 864 865 return json.dumps(bdev_cfg_section, indent=2) 866 867 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 868 filename_section = "" 869 if len(threads) >= len(subsystems): 870 threads = range(0, len(subsystems)) 871 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 872 nvme_per_split = int(len(subsystems) / len(threads)) 873 remainder = len(subsystems) % len(threads) 874 iterator = iter(filenames) 875 result = [] 876 for i in range(len(threads)): 877 result.append([]) 878 for j in range(nvme_per_split): 879 result[i].append(next(iterator)) 880 if remainder: 881 result[i].append(next(iterator)) 882 remainder -= 1 883 for i, r in enumerate(result): 884 header = "[filename%s]" % i 885 disks = "\n".join(["filename=%s" % x for x in r]) 886 job_section_qd = round((io_depth * len(r)) / num_jobs) 887 if job_section_qd == 0: 888 job_section_qd = 1 889 iodepth = "iodepth=%s" % job_section_qd 890 filename_section = "\n".join([filename_section, header, disks, iodepth]) 891 892 return filename_section 893 894 895if __name__ == "__main__": 896 spdk_zip_path = "/tmp/spdk.zip" 897 target_results_dir = "/tmp/results" 898 899 if (len(sys.argv) > 1): 900 config_file_path = sys.argv[1] 901 else: 902 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 903 config_file_path = os.path.join(script_full_dir, "config.json") 904 905 print("Using config file: %s" % config_file_path) 906 with open(config_file_path, "r") as config: 907 data = json.load(config) 908 909 initiators = [] 910 fio_cases = [] 911 912 for k, v in data.items(): 913 if "target" in k: 914 if data[k]["mode"] == "spdk": 915 target_obj = SPDKTarget(name=k, **data["general"], **v) 916 elif data[k]["mode"] == "kernel": 917 target_obj = KernelTarget(name=k, **data["general"], **v) 918 elif "initiator" in k: 919 if data[k]["mode"] == "spdk": 920 init_obj = SPDKInitiator(name=k, **data["general"], **v) 921 elif data[k]["mode"] == "kernel": 922 init_obj = KernelInitiator(name=k, **data["general"], **v) 923 initiators.append(init_obj) 924 elif "fio" in k: 925 fio_workloads = itertools.product(data[k]["bs"], 926 data[k]["qd"], 927 data[k]["rw"]) 928 929 fio_run_time = data[k]["run_time"] 930 fio_ramp_time = data[k]["ramp_time"] 931 fio_rw_mix_read = data[k]["rwmixread"] 932 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 933 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 934 else: 935 continue 936 937 # Copy and install SPDK on remote initiators 938 if "skip_spdk_install" not in data["general"]: 939 target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path) 940 threads = [] 941 for i in initiators: 942 if i.mode == "spdk": 943 t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,)) 944 threads.append(t) 945 t.start() 946 for t in threads: 947 t.join() 948 949 target_obj.tgt_start() 950 951 # Poor mans threading 952 # Run FIO tests 953 for block_size, io_depth, rw in fio_workloads: 954 threads = [] 955 configs = [] 956 for i in initiators: 957 if i.mode == "kernel": 958 i.kernel_init_connect(i.nic_ips, target_obj.subsys_no) 959 960 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 961 fio_num_jobs, fio_ramp_time, fio_run_time) 962 configs.append(cfg) 963 964 for i, cfg in zip(initiators, configs): 965 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 966 threads.append(t) 967 if target_obj.enable_sar: 968 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 969 sar_file_name = ".".join([sar_file_name, "txt"]) 970 t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name)) 971 threads.append(t) 972 973 if target_obj.enable_pcm: 974 pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)]) 975 pcm_file_name = ".".join([pcm_file_name, "csv"]) 976 t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,)) 977 threads.append(t) 978 979 if target_obj.enable_pcm_memory: 980 pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)]) 981 pcm_file_name = ".".join([pcm_file_name, "csv"]) 982 t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,)) 983 threads.append(t) 984 985 if target_obj.enable_bandwidth: 986 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 987 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 988 t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,)) 989 threads.append(t) 990 991 if target_obj.enable_dpdk_memory: 992 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir)) 993 threads.append(t) 994 995 for t in threads: 996 t.start() 997 for t in threads: 998 t.join() 999 1000 for i in initiators: 1001 if i.mode == "kernel": 1002 i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no) 1003 i.copy_result_files(target_results_dir) 1004 1005 target_obj.parse_results(target_results_dir) 1006