/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/accel.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"
#include "spdk/string.h"
#include "spdk/rpc.h"
#include "spdk/bit_array.h"
#include "spdk/conf.h"
#include "spdk/zipf.h"
#include "spdk/histogram_data.h"

#define BDEVPERF_CONFIG_MAX_FILENAME 1024
#define BDEVPERF_CONFIG_UNDEFINED -1
#define BDEVPERF_CONFIG_ERROR -2
#define PATTERN_TYPES_STR "(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush, write_zeroes)"
#define BDEVPERF_MAX_COREMASK_STRING 64

struct bdevperf_task {
	struct iovec			iov;
	struct bdevperf_job		*job;
	struct spdk_bdev_io		*bdev_io;
	void				*buf;
	void				*verify_buf;
	void				*md_buf;
	uint64_t			offset_blocks;
	struct bdevperf_task		*task_to_abort;
	enum spdk_bdev_io_type		io_type;
	TAILQ_ENTRY(bdevperf_task)	link;
	struct spdk_bdev_io_wait_entry	bdev_io_wait;
};

static char *g_workload_type = NULL;
static int g_io_size = 0;
/* initialize to invalid value so we can detect if user overrides it. */
static int g_rw_percentage = -1;
static bool g_verify = false;
static bool g_reset = false;
static bool g_continue_on_failure = false;
static bool g_abort = false;
static bool g_error_to_exit = false;
static int g_queue_depth = 0;
static uint64_t g_time_in_usec;
static bool g_summarize_performance = true;
static uint64_t g_show_performance_period_in_usec = SPDK_SEC_TO_USEC;
static uint64_t g_show_performance_period_num = 0;
static uint64_t g_show_performance_ema_period = 0;
static int g_run_rc = 0;
static bool g_shutdown = false;
static uint64_t g_start_tsc;
static uint64_t g_shutdown_tsc;
static bool g_zcopy = false;
static struct spdk_thread *g_main_thread;
static int g_time_in_sec = 0;
static bool g_mix_specified = false;
static const char *g_job_bdev_name;
static bool g_wait_for_tests = false;
static struct spdk_jsonrpc_request *g_request = NULL;
static bool g_multithread_mode = false;
static int g_timeout_in_sec;
static struct spdk_conf *g_bdevperf_conf = NULL;
static const char *g_bdevperf_conf_file = NULL;
static double g_zipf_theta;
static bool g_random_map = false;
static bool g_unique_writes = false;

static struct spdk_cpuset g_all_cpuset;
static struct spdk_poller *g_perf_timer = NULL;

static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
static void rpc_perform_tests_cb(void);
static int bdevperf_parse_arg(int ch, char *arg);
static int verify_test_params(void);
static void bdevperf_usage(void);

static uint32_t g_bdev_count = 0;
static uint32_t g_latency_display_level;

static bool g_one_thread_per_lcore = false;

static const double g_latency_cutoffs[] = {
	0.01,
	0.10,
	0.25,
	0.50,
	0.75,
	0.90,
	0.95,
	0.98,
	0.99,
	0.995,
	0.999,
	0.9999,
	0.99999,
	0.999999,
	0.9999999,
	-1,
};

static const char *g_rpc_log_file_name = NULL;
static FILE *g_rpc_log_file = NULL;

struct latency_info {
	uint64_t	min;
	uint64_t	max;
	uint64_t	total;
};

enum job_config_rw {
	JOB_CONFIG_RW_READ = 0,
	JOB_CONFIG_RW_WRITE,
	JOB_CONFIG_RW_RANDREAD,
	JOB_CONFIG_RW_RANDWRITE,
	JOB_CONFIG_RW_RW,
	JOB_CONFIG_RW_RANDRW,
	JOB_CONFIG_RW_VERIFY,
	JOB_CONFIG_RW_RESET,
	JOB_CONFIG_RW_UNMAP,
	JOB_CONFIG_RW_FLUSH,
	JOB_CONFIG_RW_WRITE_ZEROES,
};

struct bdevperf_job {
	char				*name;
	struct spdk_bdev		*bdev;
	struct spdk_bdev_desc		*bdev_desc;
	struct spdk_io_channel		*ch;
	TAILQ_ENTRY(bdevperf_job)	link;
	struct spdk_thread		*thread;

	enum job_config_rw		workload_type;
	int				io_size;
	int				rw_percentage;
	bool				is_random;
	bool				verify;
	bool				reset;
	bool				continue_on_failure;
	bool				unmap;
	bool				write_zeroes;
	bool				flush;
	bool				abort;
	int				queue_depth;
	unsigned int			seed;

	uint64_t			io_completed;
	uint64_t			io_failed;
	uint64_t			io_timeout;
	uint64_t			prev_io_completed;
	double				ema_io_per_second;
	int				current_queue_depth;
	uint64_t			size_in_ios;
	uint64_t			ios_base;
	uint64_t			offset_in_ios;
	uint64_t			io_size_blocks;
	uint64_t			buf_size;
	uint32_t			dif_check_flags;
	bool				is_draining;
	struct spdk_poller		*run_timer;
	struct spdk_poller		*reset_timer;
	struct spdk_bit_array		*outstanding;
	struct spdk_zipf		*zipf;
	TAILQ_HEAD(, bdevperf_task)	task_list;
	uint64_t			run_time_in_usec;

	/* keep the channel's histogram data before the channel is destroyed */
	struct spdk_histogram_data	*histogram;
	struct spdk_bit_array		*random_map;

	/* counter used for generating unique write data (-U option) */
	uint32_t			write_io_count;
};

struct spdk_bdevperf {
	TAILQ_HEAD(, bdevperf_job)	jobs;
	uint32_t			running_jobs;
};

static struct spdk_bdevperf g_bdevperf = {
	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
	.running_jobs = 0,
};

/* Stores values from a section of the job config file */
struct job_config {
	const char			*name;
	const char			*filename;
	struct spdk_cpuset		cpumask;
	int				bs;
	int				iodepth;
	int				rwmixread;
	uint32_t			lcore;
	int64_t				offset;
	uint64_t			length;
	enum job_config_rw		rw;
	TAILQ_ENTRY(job_config)	link;
};

TAILQ_HEAD(, job_config) job_config_list
	= TAILQ_HEAD_INITIALIZER(job_config_list);

static bool g_performance_dump_active = false;

struct bdevperf_stats {
	uint64_t			io_time_in_usec;
	double				total_io_per_second;
	double				total_mb_per_second;
	double				total_failed_per_second;
	double				total_timeout_per_second;
	double				min_latency;
	double				max_latency;
	double				average_latency;
	uint64_t			total_io_completed;
	uint64_t			total_tsc;
};

struct bdevperf_aggregate_stats {
	struct bdevperf_job		*current_job;
	struct bdevperf_stats		total;
};

static struct bdevperf_aggregate_stats g_stats = {.total.min_latency = (double)UINT64_MAX};

struct lcore_thread {
	struct spdk_thread		*thread;
	uint32_t			lcore;
	TAILQ_ENTRY(lcore_thread)	link;
};

TAILQ_HEAD(, lcore_thread) g_lcore_thread_list
	= TAILQ_HEAD_INITIALIZER(g_lcore_thread_list);

static char *
parse_workload_type(enum job_config_rw ret)
{
	switch (ret) {
	case JOB_CONFIG_RW_READ:
		return "read";
	case JOB_CONFIG_RW_RANDREAD:
		return "randread";
	case JOB_CONFIG_RW_WRITE:
		return "write";
	case JOB_CONFIG_RW_RANDWRITE:
		return "randwrite";
	case JOB_CONFIG_RW_VERIFY:
		return "verify";
	case JOB_CONFIG_RW_RESET:
		return "reset";
	case JOB_CONFIG_RW_UNMAP:
		return "unmap";
	case JOB_CONFIG_RW_WRITE_ZEROES:
		return "write_zeroes";
	case JOB_CONFIG_RW_FLUSH:
		return "flush";
	case JOB_CONFIG_RW_RW:
		return "rw";
	case JOB_CONFIG_RW_RANDRW:
		return "randrw";
	default:
		fprintf(stderr, "unknown workload_type code\n");
	}

	return NULL;
}

/*
 * Cumulative Moving Average (CMA): average of all data up to the current point.
 * Exponential Moving Average (EMA): weighted mean of the previous n data points,
 * with more weight given to recent ones.
 * Simple Moving Average (SMA): unweighted mean of the previous n data points.
 *
 * Bdevperf supports CMA and EMA.
 */
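/*
 * Example: with ema_period = 4 the new sample is weighted by 2 / (4 + 1) = 0.4
 * in get_ema_io_per_second() below, so an EMA of 1000 IOPS followed by a period
 * that measures 2000 IOPS moves to 1000 + (2000 - 1000) * 0.4 = 1400 IOPS.
 */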
static double
get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
{
	return (double)job->io_completed * SPDK_SEC_TO_USEC / io_time_in_usec;
}

static double
get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
{
	double io_completed, io_per_second;

	io_completed = job->io_completed;
	io_per_second = (double)(io_completed - job->prev_io_completed) * SPDK_SEC_TO_USEC
			/ g_show_performance_period_in_usec;
	job->prev_io_completed = io_completed;

	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
				  / (ema_period + 1);
	return job->ema_io_per_second;
}

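/*
 * Histogram iteration callback: approximates each bucket's latency by its
 * midpoint ((start + end) / 2, in ticks) and accumulates it weighted by the
 * bucket's count.  The first non-empty bucket (so_far == count) supplies the
 * minimum and the last one (so_far == total) the maximum.
 */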
static void
get_avg_latency(void *ctx, uint64_t start, uint64_t end, uint64_t count,
		uint64_t total, uint64_t so_far)
{
	struct latency_info *latency_info = ctx;

	if (count == 0) {
		return;
	}

	latency_info->total += (start + end) / 2 * count;

	if (so_far == count) {
		latency_info->min = start;
	}

	if (so_far == total) {
		latency_info->max = end;
	}
}

static void
bdevperf_job_stats_accumulate(struct bdevperf_stats *aggr_stats,
			      struct bdevperf_stats *job_stats)
{
	aggr_stats->total_io_per_second += job_stats->total_io_per_second;
	aggr_stats->total_mb_per_second += job_stats->total_mb_per_second;
	aggr_stats->total_failed_per_second += job_stats->total_failed_per_second;
	aggr_stats->total_timeout_per_second += job_stats->total_timeout_per_second;
	aggr_stats->total_io_completed += job_stats->total_io_completed;
	aggr_stats->total_tsc += job_stats->total_tsc;

	if (job_stats->min_latency < aggr_stats->min_latency) {
		aggr_stats->min_latency = job_stats->min_latency;
	}
	if (job_stats->max_latency > aggr_stats->max_latency) {
		aggr_stats->max_latency = job_stats->max_latency;
	}
}

static void
bdevperf_job_get_stats(struct bdevperf_job *job,
		       struct bdevperf_stats *job_stats,
		       uint64_t time_in_usec,
		       uint64_t ema_period)
{
	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
	double average_latency = 0.0, min_latency, max_latency;
	uint64_t tsc_rate;
	uint64_t total_io;
	struct latency_info latency_info = {};

	if (ema_period == 0) {
		io_per_second = get_cma_io_per_second(job, time_in_usec);
	} else {
		io_per_second = get_ema_io_per_second(job, ema_period);
	}
	tsc_rate = spdk_get_ticks_hz();
	mb_per_second = io_per_second * job->io_size / (1024 * 1024);

	spdk_histogram_data_iterate(job->histogram, get_avg_latency, &latency_info);

	total_io = job->io_completed + job->io_failed;
	if (total_io != 0) {
		average_latency = (double)latency_info.total / total_io * SPDK_SEC_TO_USEC / tsc_rate;
	}
	min_latency = (double)latency_info.min * SPDK_SEC_TO_USEC / tsc_rate;
	max_latency = (double)latency_info.max * SPDK_SEC_TO_USEC / tsc_rate;

	failed_per_second = (double)job->io_failed * SPDK_SEC_TO_USEC / time_in_usec;
	timeout_per_second = (double)job->io_timeout * SPDK_SEC_TO_USEC / time_in_usec;

	job_stats->total_io_per_second = io_per_second;
	job_stats->total_mb_per_second = mb_per_second;
	job_stats->total_failed_per_second = failed_per_second;
	job_stats->total_timeout_per_second = timeout_per_second;
	job_stats->total_io_completed = total_io;
	job_stats->total_tsc = latency_info.total;
	job_stats->average_latency = average_latency;
	job_stats->min_latency = min_latency;
	job_stats->max_latency = max_latency;
	job_stats->io_time_in_usec = time_in_usec;
}

static void
performance_dump_job_stdout(struct bdevperf_job *job,
			    struct bdevperf_stats *job_stats)
{
	if (job->workload_type == JOB_CONFIG_RW_RW || job->workload_type == JOB_CONFIG_RW_RANDRW) {
		printf("Job: %s (Core Mask 0x%s, workload: %s, percentage: %d, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->rw_percentage,
		       job->queue_depth, job->io_size);
	} else {
		printf("Job: %s (Core Mask 0x%s, workload: %s, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->queue_depth, job->io_size);
	}

	if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		printf("Job: %s ended in about %.2f seconds with error\n",
		       job->name, (double)job->run_time_in_usec / SPDK_SEC_TO_USEC);
	}
	if (job->verify) {
		printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
		       job->ios_base, job->size_in_ios);
	}

	printf("\t %-20s: %10.2f %10.2f %10.2f",
	       job->name,
	       (float)job_stats->io_time_in_usec / SPDK_SEC_TO_USEC,
	       job_stats->total_io_per_second,
	       job_stats->total_mb_per_second);
	printf(" %10.2f %8.2f",
	       job_stats->total_failed_per_second,
	       job_stats->total_timeout_per_second);
	printf(" %10.2f %10.2f %10.2f\n",
	       job_stats->average_latency,
	       job_stats->min_latency,
	       job_stats->max_latency);
}

static void
performance_dump_job_json(struct bdevperf_job *job,
			  struct spdk_json_write_ctx *w,
			  struct bdevperf_stats *job_stats)
{
	char core_mask_string[BDEVPERF_MAX_COREMASK_STRING] = {0};

	spdk_json_write_named_string(w, "job", job->name);
	snprintf(core_mask_string, BDEVPERF_MAX_COREMASK_STRING,
		 "0x%s", spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
	spdk_json_write_named_string(w, "core_mask", core_mask_string);
	spdk_json_write_named_string(w, "workload", parse_workload_type(job->workload_type));

	if (job->workload_type == JOB_CONFIG_RW_RW || job->workload_type == JOB_CONFIG_RW_RANDRW) {
		spdk_json_write_named_uint32(w, "percentage", job->rw_percentage);
	}

	if (g_shutdown) {
		spdk_json_write_named_string(w, "status", "terminated");
	} else if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		spdk_json_write_named_string(w, "status", "failed");
	} else {
		spdk_json_write_named_string(w, "status", "finished");
	}

	if (job->verify) {
		spdk_json_write_named_object_begin(w, "verify_range");
		spdk_json_write_named_uint64(w, "start", job->ios_base);
		spdk_json_write_named_uint64(w, "length", job->size_in_ios);
		spdk_json_write_object_end(w);
	}

	spdk_json_write_named_uint32(w, "queue_depth", job->queue_depth);
	spdk_json_write_named_uint32(w, "io_size", job->io_size);
	spdk_json_write_named_double(w, "runtime", (double)job_stats->io_time_in_usec / SPDK_SEC_TO_USEC);
	spdk_json_write_named_double(w, "iops", job_stats->total_io_per_second);
	spdk_json_write_named_double(w, "mibps", job_stats->total_mb_per_second);
	spdk_json_write_named_uint64(w, "io_failed", job->io_failed);
	spdk_json_write_named_uint64(w, "io_timeout", job->io_timeout);
	spdk_json_write_named_double(w, "avg_latency_us", job_stats->average_latency);
	spdk_json_write_named_double(w, "min_latency_us", job_stats->min_latency);
	spdk_json_write_named_double(w, "max_latency_us", job_stats->max_latency);
}

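/*
 * Fill the data (and metadata) buffer with a deterministic pattern so a
 * subsequent read can be verified.  For interleaved metadata (md_buf == NULL)
 * the metadata lives inside each block right after the data; for separate
 * metadata it advances through md_buf in md_size strides.  With -U (unique
 * writes) each block is instead stamped with the write counter in the upper
 * 32 bits and a per-block offset in the lower bits, so every write carries
 * distinct data.
 */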
static void
generate_data(struct bdevperf_job *job, void *buf, void *md_buf, bool unique)
{
	int offset_blocks = 0, md_offset, data_block_size, inner_offset;
	int buf_len = job->buf_size;
	int block_size = spdk_bdev_get_block_size(job->bdev);
	int md_size = spdk_bdev_get_md_size(job->bdev);
	int num_blocks = job->io_size_blocks;

	if (buf_len < num_blocks * block_size) {
		return;
	}

	if (md_buf == NULL) {
		data_block_size = block_size - md_size;
		md_buf = (char *)buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	if (unique) {
		uint64_t io_count = job->write_io_count++;
		unsigned int i;

		assert(md_size == 0 || md_size >= (int)sizeof(uint64_t));

		while (offset_blocks < num_blocks) {
			inner_offset = 0;
			while (inner_offset < data_block_size) {
				*(uint64_t *)buf = (io_count << 32) | (offset_blocks + inner_offset);
				inner_offset += sizeof(uint64_t);
				buf += sizeof(uint64_t);
			}
			for (i = 0; i < md_size / sizeof(uint64_t); i++) {
				((uint64_t *)md_buf)[i] = (io_count << 32) | offset_blocks;
			}
			md_buf += md_offset;
			offset_blocks++;
		}
		return;
	}

	while (offset_blocks < num_blocks) {
		inner_offset = 0;
		while (inner_offset < data_block_size) {
			*(uint32_t *)buf = offset_blocks + inner_offset;
			inner_offset += sizeof(uint32_t);
			buf += sizeof(uint32_t);
		}
		memset(md_buf, offset_blocks, md_size);
		md_buf += md_offset;
		offset_blocks++;
	}
}

static bool
copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
{
	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	memcpy(wr_buf, rd_buf, block_size * num_blocks);

	if (wr_md_buf != NULL) {
		memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
	}

	return true;
}

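/*
 * Compare a write buffer against the data read back, block by block.  When
 * metadata is interleaved (wr_md_buf == NULL) each block's trailing md_size
 * bytes are checked in place.  Callers pass md_check == false when the bdev
 * has a DIF type enabled, since the protection information in the metadata is
 * managed via DIF rather than by the generated pattern.
 */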
static bool
verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
{
	int offset_blocks = 0, md_offset, data_block_size;

	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	if (wr_md_buf == NULL) {
		data_block_size = block_size - md_size;
		wr_md_buf = (char *)wr_buf + data_block_size;
		rd_md_buf = (char *)rd_buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	while (offset_blocks < num_blocks) {
		if (memcmp(wr_buf, rd_buf, data_block_size) != 0) {
			printf("data_block_size %d, num_blocks %d, offset %d\n", data_block_size, num_blocks,
			       offset_blocks);
			spdk_log_dump(stdout, "rd_buf", rd_buf, data_block_size);
			spdk_log_dump(stdout, "wr_buf", wr_buf, data_block_size);
			return false;
		}

		wr_buf += block_size;
		rd_buf += block_size;

		if (md_check) {
			if (memcmp(wr_md_buf, rd_md_buf, md_size) != 0) {
				printf("md_size %d, num_blocks %d, offset %d\n", md_size, num_blocks, offset_blocks);
				spdk_log_dump(stdout, "rd_md_buf", rd_md_buf, md_size);
				spdk_log_dump(stdout, "wr_md_buf", wr_md_buf, md_size);
				return false;
			}

			wr_md_buf += md_offset;
			rd_md_buf += md_offset;
		}

		offset_blocks++;
	}

	return true;
}

static void
free_job_config(void)
{
	struct job_config *config, *tmp;

	spdk_conf_free(g_bdevperf_conf);
	g_bdevperf_conf = NULL;

	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
		TAILQ_REMOVE(&job_config_list, config, link);
		free(config);
	}
}

static void
bdevperf_job_free(struct bdevperf_job *job)
{
	spdk_histogram_data_free(job->histogram);
	spdk_bit_array_free(&job->outstanding);
	spdk_bit_array_free(&job->random_map);
	spdk_zipf_free(&job->zipf);
	free(job->name);
	free(job);
}

static void
job_thread_exit(void *ctx)
{
	spdk_thread_exit(spdk_get_thread());
}

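/*
 * Histogram iteration callback used for the latency summary: walks the
 * g_latency_cutoffs percentile table (terminated by -1) and prints the bucket
 * end (converted from ticks to microseconds) for every cutoff that the
 * cumulative count has crossed.
 */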
static void
check_cutoff(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	double **cutoff = ctx;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far / total;
	while (so_far_pct >= **cutoff && **cutoff > 0) {
		printf("%9.5f%% : %9.3fus\n", **cutoff * 100, (double)end * SPDK_SEC_TO_USEC / tsc_rate);
		(*cutoff)++;
	}
}

static void
print_bucket(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far * 100 / total;
	printf("%9.3f - %9.3f: %9.4f%%  (%9ju)\n",
	       (double)start * SPDK_SEC_TO_USEC / tsc_rate,
	       (double)end * SPDK_SEC_TO_USEC / tsc_rate,
	       so_far_pct, count);
}

static void
bdevperf_test_done(void *ctx)
{
	struct bdevperf_job *job, *jtmp;
	struct bdevperf_task *task, *ttmp;
	struct lcore_thread *lthread, *lttmp;
	double average_latency = 0.0;
	uint64_t time_in_usec;
	int rc;
	struct spdk_json_write_ctx *w = NULL;
	struct bdevperf_stats job_stats = {0};
	struct spdk_cpuset cpu_mask;

	if (g_time_in_usec) {
		g_stats.total.io_time_in_usec = g_time_in_usec;

		if (!g_run_rc && g_performance_dump_active) {
			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
			return;
		}
	}

	spdk_poller_unregister(&g_perf_timer);

	if (g_shutdown) {
		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
		time_in_usec = g_shutdown_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
		printf("Received shutdown signal, test time was about %.6f seconds\n",
		       (double)g_time_in_usec / SPDK_SEC_TO_USEC);
	}
	/* Send an RPC response if g_run_rc indicates success or a shutdown request was
	 * sent to bdevperf. rpc_perform_tests_cb will send an error response in case of error.
	 */
	if ((g_run_rc == 0 || g_shutdown) && g_request) {
		w = spdk_jsonrpc_begin_result(g_request);
		spdk_json_write_object_begin(w);
		spdk_json_write_named_array_begin(w, "results");
	}

	printf("\n%*s\n", 107, "Latency(us)");
	printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");

	spdk_cpuset_zero(&cpu_mask);
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		spdk_cpuset_or(&cpu_mask, spdk_thread_get_cpumask(job->thread));
		memset(&job_stats, 0, sizeof(job_stats));
		bdevperf_job_get_stats(job, &job_stats, job->run_time_in_usec, 0);
		bdevperf_job_stats_accumulate(&g_stats.total, &job_stats);
		performance_dump_job_stdout(job, &job_stats);
		if (w) {
			spdk_json_write_object_begin(w);
			performance_dump_job_json(job, w, &job_stats);
			spdk_json_write_object_end(w);
		}
	}

	if (w) {
		spdk_json_write_array_end(w);
		spdk_json_write_named_uint32(w, "core_count", spdk_cpuset_count(&cpu_mask));
		spdk_json_write_object_end(w);
		spdk_jsonrpc_end_result(g_request, w);
	}
	printf("\r =================================================================================="
	       "=================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", g_stats.total.total_io_per_second, g_stats.total.total_mb_per_second);
	printf(" %10.2f %8.2f",
	       g_stats.total.total_failed_per_second, g_stats.total.total_timeout_per_second);

	if (g_stats.total.total_io_completed != 0) {
		average_latency = ((double)g_stats.total.total_tsc / g_stats.total.total_io_completed) *
				  SPDK_SEC_TO_USEC /
				  spdk_get_ticks_hz();
	}
	printf(" %10.2f %10.2f %10.2f\n", average_latency, g_stats.total.min_latency,
	       g_stats.total.max_latency);

	if (g_latency_display_level == 0 || g_stats.total.total_io_completed == 0) {
		goto clean;
	}

	printf("\n Latency summary\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		const double *cutoff = g_latency_cutoffs;

		spdk_histogram_data_iterate(job->histogram, check_cutoff, &cutoff);

		printf("\n");
	}

	if (g_latency_display_level == 1) {
		goto clean;
	}

	printf("\r Latency histogram\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		spdk_histogram_data_iterate(job->histogram, print_bucket, NULL);
		printf("\n");
	}

clean:
	fflush(stdout);

	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);

		if (!g_one_thread_per_lcore) {
			spdk_thread_send_msg(job->thread, job_thread_exit, NULL);
		}

		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
			TAILQ_REMOVE(&job->task_list, task, link);
			spdk_free(task->buf);
			spdk_free(task->verify_buf);
			spdk_free(task->md_buf);
			free(task);
		}

		bdevperf_job_free(job);
	}

	if (g_one_thread_per_lcore) {
		TAILQ_FOREACH_SAFE(lthread, &g_lcore_thread_list, link, lttmp) {
			TAILQ_REMOVE(&g_lcore_thread_list, lthread, link);
			spdk_thread_send_msg(lthread->thread, job_thread_exit, NULL);
			free(lthread);
		}
	}

	if (g_bdevperf_conf == NULL) {
		free_job_config();
	}

	rc = g_run_rc;
	if (g_request && !g_shutdown) {
		rpc_perform_tests_cb();
		if (rc != 0) {
			spdk_app_stop(rc);
		}
	} else {
		spdk_app_stop(rc);
	}
}

static void
bdevperf_job_end(void *ctx)
{
	assert(g_main_thread == spdk_get_thread());

	if (--g_bdevperf.running_jobs == 0) {
		bdevperf_test_done(NULL);
	}
}

static void
bdevperf_channel_get_histogram_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
{
	struct spdk_histogram_data *job_hist = cb_arg;

	if (status == 0) {
		spdk_histogram_data_merge(job_hist, histogram);
	}
}

static void
bdevperf_job_empty(struct bdevperf_job *job)
{
	uint64_t end_tsc = 0;

	end_tsc = spdk_get_ticks() - g_start_tsc;
	job->run_time_in_usec = end_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
	/* keep histogram info before channel is destroyed */
	spdk_bdev_channel_get_histogram(job->ch, bdevperf_channel_get_histogram_cb,
					job->histogram);
	spdk_put_io_channel(job->ch);
	spdk_bdev_close(job->bdev_desc);
	spdk_thread_send_msg(g_main_thread, bdevperf_job_end, NULL);
}

static void
bdevperf_end_task(struct bdevperf_task *task)
{
	struct bdevperf_job     *job = task->job;

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	if (job->is_draining) {
		if (job->current_queue_depth == 0) {
			bdevperf_job_empty(job);
		}
	}
}

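/*
 * Standard bdev ENOMEM handling: when a submission returns -ENOMEM, park the
 * task on the bdev's io_wait queue and retry via cb_fn once a completed I/O
 * frees resources.
 */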
static void
bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
{
	struct bdevperf_job	*job = task->job;

	task->bdev_io_wait.bdev = job->bdev;
	task->bdev_io_wait.cb_fn = cb_fn;
	task->bdev_io_wait.cb_arg = task;
	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
}

static int
bdevperf_job_drain(void *ctx)
{
	struct bdevperf_job *job = ctx;

	spdk_poller_unregister(&job->run_timer);
	if (job->reset) {
		spdk_poller_unregister(&job->reset_timer);
	}

	job->is_draining = true;

	return -1;
}

static int
bdevperf_job_drain_timer(void *ctx)
{
	struct bdevperf_job *job = ctx;

	bdevperf_job_drain(ctx);
	if (job->current_queue_depth == 0) {
		bdevperf_job_empty(job);
	}

	return SPDK_POLLER_BUSY;
}

static void
bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
		if (!job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	}

	spdk_bdev_free_io(bdev_io);
	bdevperf_end_task(task);
}

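/*
 * Verify the DIF (interleaved metadata) or DIX (separate metadata) protection
 * information of a completed read against a context seeded with the task's
 * starting LBA.  The ctx-init/verify split mirrors bdevperf_generate_dif()
 * used on the write path below.
 */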
static int
bdevperf_verify_dif(struct bdevperf_task *task)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	struct spdk_dif_error	err_blk = {};
	int			rc;
	struct spdk_dif_ctx_init_ext_opts dif_opts;

	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
	dif_opts.dif_pi_format = spdk_bdev_get_dif_pi_format(bdev);
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		rc = spdk_dif_verify(&task->iov, 1, job->io_size_blocks, &dif_ctx, &err_blk);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_verify(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
	}

	if (rc != 0) {
		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
			err_blk.err_type, err_blk.err_offset);
	}

	return rc;
}

static void
bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	bool			md_check;
	uint64_t		offset_in_ios;
	int			rc;

	job = task->job;
	md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;

	if (g_error_to_exit == true) {
		bdevperf_job_drain(job);
	} else if (!success) {
		if (!job->reset && !job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
			g_error_to_exit = true;
			printf("task offset: %" PRIu64 " on job bdev=%s fails\n",
			       task->offset_blocks, job->name);
		}
	} else if (job->verify || job->reset) {
		if (!verify_data(task->buf, job->buf_size,
				 task->iov.iov_base, job->buf_size,
				 spdk_bdev_get_block_size(job->bdev),
				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
				 spdk_bdev_get_md_size(job->bdev),
				 job->io_size_blocks, md_check)) {
			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	} else if (job->dif_check_flags != 0) {
		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_get_md_size(job->bdev) != 0) {
			rc = bdevperf_verify_dif(task);
			if (rc != 0) {
				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
				       task->offset_blocks, job->name);

				success = false;
				if (!job->reset && !job->continue_on_failure) {
					bdevperf_job_drain(job);
					g_run_rc = -1;
					g_error_to_exit = true;
				}
			}
		}
	}

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
	}

	if (job->verify) {
		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
		spdk_bit_array_clear(job->outstanding, offset_in_ios);
	}

	spdk_bdev_free_io(bdev_io);

	/*
	 * is_draining indicates when time has expired for the test run
	 * and we are just waiting for the previously submitted I/O
	 * to complete.  In this case, do not submit a new I/O to replace
	 * the one just completed.
	 */
	if (!job->is_draining) {
		bdevperf_submit_single(job, task);
	} else {
		bdevperf_end_task(task);
	}
}

static void
bdevperf_verify_submit_read(void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	int			rc;

	job = task->job;

	task->iov.iov_base = task->verify_buf;
	task->iov.iov_len = job->buf_size;

	/* Read the data back in */
	rc = spdk_bdev_readv_blocks_with_md(job->bdev_desc, job->ch, &task->iov, 1, NULL,
					    task->offset_blocks, job->io_size_blocks,
					    bdevperf_complete, task);

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
	} else if (rc != 0) {
		printf("Failed to submit read: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = rc;
	}
}

static void
bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
			       void *cb_arg)
{
	if (success) {
		spdk_bdev_free_io(bdev_io);
		bdevperf_verify_submit_read(cb_arg);
	} else {
		bdevperf_complete(bdev_io, success, cb_arg);
	}
}

static void
bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	if (!success) {
		bdevperf_complete(bdev_io, success, cb_arg);
		return;
	}

	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
}

static int
bdevperf_generate_dif(struct bdevperf_task *task)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	int			rc;
	struct spdk_dif_ctx_init_ext_opts dif_opts;

	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
	dif_opts.dif_pi_format = spdk_bdev_get_dif_pi_format(bdev);
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
	}

	if (rc != 0) {
		fprintf(stderr, "Generation of DIF/DIX failed\n");
	}

	return rc;
}

static void
bdevperf_submit_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev_desc	*desc;
	struct spdk_io_channel	*ch;
	spdk_bdev_io_completion_cb cb_fn;
	uint64_t		offset_in_ios;
	int			rc = 0;

	desc = job->bdev_desc;
	ch = job->ch;

	switch (task->io_type) {
	case SPDK_BDEV_IO_TYPE_WRITE:
		if (spdk_bdev_get_md_size(job->bdev) != 0 && job->dif_check_flags != 0) {
			rc = bdevperf_generate_dif(task);
		}
		if (rc == 0) {
			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;

			if (g_zcopy) {
				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
				return;
			} else {
				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
								     task->md_buf,
								     task->offset_blocks,
								     job->io_size_blocks,
								     cb_fn, task);
			}
		}
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
						   job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_READ:
		if (g_zcopy) {
			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
						   true, bdevperf_zcopy_populate_complete, task);
		} else {
			rc = spdk_bdev_readv_blocks_with_md(desc, ch, &task->iov, 1,
							    task->md_buf,
							    task->offset_blocks,
							    job->io_size_blocks,
							    bdevperf_complete, task);
		}
		break;
	case SPDK_BDEV_IO_TYPE_ABORT:
		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
		break;
	default:
		assert(false);
		rc = -EINVAL;
		break;
	}

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
		return;
	} else if (rc != 0) {
		printf("Failed to submit bdev_io: %d\n", rc);
		if (job->verify) {
			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
			spdk_bit_array_clear(job->outstanding, offset_in_ios);
		}
		bdevperf_job_drain(job);
		g_run_rc = rc;
		return;
	}

	job->current_queue_depth++;
}

static void
bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;
	struct iovec		*iovs;
	int			iovcnt;

	if (!success) {
		bdevperf_job_drain(job);
		g_run_rc = -1;
		return;
	}

	task->bdev_io = bdev_io;
	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;

	if (job->verify || job->reset) {
		/* When job->verify or job->reset is enabled, task->buf is used for
		 *  verification of read after write.  For write I/O, when zcopy APIs
		 *  are used, task->buf cannot be used, and data must be written to
		 *  the data buffer allocated underneath bdev layer instead.
		 *  Hence we copy task->buf to the allocated data buffer here.
		 */
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		assert(iovcnt == 1);
		assert(iovs != NULL);

		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
			  spdk_bdev_get_block_size(job->bdev),
			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
			  spdk_bdev_get_md_size(job->bdev), job->io_size_blocks);
	}

	bdevperf_submit_task(task);
}

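/*
 * Zcopy write path: spdk_bdev_zcopy_start(populate=false) asks the bdev for a
 * write buffer, bdevperf_zcopy_get_buf_complete() fills it (copying task->buf
 * when verify/reset needs the data later), and the write is committed with
 * spdk_bdev_zcopy_end(commit=true) from bdevperf_submit_task().
 */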
static void
bdevperf_prep_zcopy_write_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	int			rc;

	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
				   task->offset_blocks, job->io_size_blocks,
				   false, bdevperf_zcopy_get_buf_complete, task);
	if (rc != 0) {
		assert(rc == -ENOMEM);
		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
		return;
	}

	job->current_queue_depth++;
}

static struct bdevperf_task *
bdevperf_job_get_task(struct bdevperf_job *job)
{
	struct bdevperf_task *task;

	task = TAILQ_FIRST(&job->task_list);
	if (!task) {
		printf("Task allocation failed\n");
		abort();
	}

	TAILQ_REMOVE(&job->task_list, task, link);
	return task;
}

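/*
 * Pick the next offset for a task and submit it.  Sequential jobs simply walk
 * offset_in_ios; random jobs draw from rand_r() or a Zipf generator; with
 * g_random_map set, a bit array additionally guarantees every I/O-sized slot
 * is hit exactly once before any offset repeats.
 */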
static void
bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
{
	uint64_t offset_in_ios;
	uint64_t rand_value;
	uint32_t first_clear;

	if (job->zipf) {
		offset_in_ios = spdk_zipf_generate(job->zipf);
	} else if (job->is_random) {
		/* RAND_MAX is only INT32_MAX, so use 2 calls to rand_r to
		 * get a large enough value to ensure we are issuing I/O
		 * uniformly across the whole bdev.
		 */
		rand_value = (uint64_t)rand_r(&job->seed) * RAND_MAX + rand_r(&job->seed);
		offset_in_ios = rand_value % job->size_in_ios;

		if (g_random_map) {
			/* Make sure that the offset does not exceed the maximum size
			 * of the bit array (verified during job creation).
			 */
			assert(offset_in_ios < UINT32_MAX);

			first_clear = spdk_bit_array_find_first_clear(job->random_map, (uint32_t)offset_in_ios);

			if (first_clear == UINT32_MAX) {
				first_clear = spdk_bit_array_find_first_clear(job->random_map, 0);

				if (first_clear == UINT32_MAX) {
					/* If there are no more clear bits in the array, start over
					 * and use the previously selected random value.
					 */
					spdk_bit_array_clear_mask(job->random_map);
					first_clear = (uint32_t)offset_in_ios;
				}
			}

			spdk_bit_array_set(job->random_map, first_clear);

			offset_in_ios = first_clear;
		}
	} else {
		offset_in_ios = job->offset_in_ios++;
		if (job->offset_in_ios == job->size_in_ios) {
			job->offset_in_ios = 0;
		}

		/* Increment offset_in_ios if there's already an outstanding I/O
		 * to that location. This is only needed with job->verify, as random
		 * offsets are not supported with job->verify at this time.
		 */
		if (job->verify) {
			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);

			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
				offset_in_ios = job->offset_in_ios++;
				if (job->offset_in_ios == job->size_in_ios) {
					job->offset_in_ios = 0;
				}
			}
			spdk_bit_array_set(job->outstanding, offset_in_ios);
		}
	}

	/* When multiple threads share the same job, offset_in_ios is relative
	 * to the LBA range assigned to that job, while task->offset_blocks
	 * is absolute (the entire bdev LBA range).
	 */
	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;

	if (job->flush) {
		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
	} else if (job->unmap) {
		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
	} else if (job->write_zeroes) {
		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
	} else if ((job->rw_percentage == 100) ||
		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
		assert(!job->verify);
		task->io_type = SPDK_BDEV_IO_TYPE_READ;
		if (!g_zcopy) {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
		}
	} else {
		if (job->verify || job->reset || g_unique_writes) {
			generate_data(job, task->buf, task->md_buf, g_unique_writes);
		}
		if (g_zcopy) {
			bdevperf_prep_zcopy_write_task(task);
			return;
		} else {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
		}
	}

	bdevperf_submit_task(task);
}

static int reset_job(void *arg);

static void
reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	if (!success) {
		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	spdk_bdev_free_io(bdev_io);

	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
						10 * SPDK_SEC_TO_USEC);
}

static int
reset_job(void *arg)
{
	struct bdevperf_job *job = arg;
	struct bdevperf_task *task;
	int rc;

	spdk_poller_unregister(&job->reset_timer);

	/* Do reset. */
	task = bdevperf_job_get_task(job);
	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
			     reset_cb, task);
	if (rc) {
		printf("Reset failed: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	return -1;
}

static void
bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
{
	struct bdevperf_job *job = cb_arg;
	struct bdevperf_task *task;

	job->io_timeout++;

	if (job->is_draining || !job->abort ||
	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
		return;
	}

	task = bdevperf_job_get_task(job);
	if (task == NULL) {
		return;
	}

	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;

	bdevperf_submit_task(task);
}

static void
bdevperf_job_run(void *ctx)
{
	struct bdevperf_job *job = ctx;
	struct bdevperf_task *task;
	int i;

	/* Submit initial I/O for this job. Each time one
	 * completes, another will be submitted. */

	/* Start a timer to stop this I/O chain when the run is over */
	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain_timer, job, g_time_in_usec);
	if (job->reset) {
		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
							10 * SPDK_SEC_TO_USEC);
	}

	spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);

	for (i = 0; i < job->queue_depth; i++) {
		task = bdevperf_job_get_task(job);
		bdevperf_submit_single(job, task);
	}
}

static void
_performance_dump_done(void *ctx)
{
	struct bdevperf_aggregate_stats *aggregate = ctx;
	struct bdevperf_stats *stats = &aggregate->total;
	double average_latency;

	if (g_summarize_performance) {
		printf("%12.2f IOPS, %8.2f MiB/s", stats->total_io_per_second, stats->total_mb_per_second);
		printf("\r");
	} else {
		printf("\r =================================================================================="
		       "=================================\n");
		printf("\r %-28s: %10s %10.2f %10.2f",
		       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
		printf(" %10.2f %8.2f",
		       stats->total_failed_per_second, stats->total_timeout_per_second);

		average_latency = ((double)stats->total_tsc / stats->total_io_completed) * SPDK_SEC_TO_USEC /
				  spdk_get_ticks_hz();
		printf(" %10.2f %10.2f %10.2f\n", average_latency, stats->min_latency, stats->max_latency);
		printf("\n");
	}

	fflush(stdout);

	g_performance_dump_active = false;

	free(aggregate);
}

static void
_performance_dump(void *ctx)
{
	struct bdevperf_aggregate_stats *stats = ctx;
	struct bdevperf_stats job_stats = {0};
	struct bdevperf_job *job = stats->current_job;
	uint64_t time_in_usec;

	if (job->io_failed > 0 && !job->continue_on_failure) {
		time_in_usec = job->run_time_in_usec;
	} else {
		time_in_usec = stats->total.io_time_in_usec;
	}

	bdevperf_job_get_stats(job, &job_stats, time_in_usec, g_show_performance_ema_period);
	bdevperf_job_stats_accumulate(&stats->total, &job_stats);
	if (!g_summarize_performance) {
		performance_dump_job_stdout(stats->current_job, &job_stats);
	}

	/* This assumes the jobs list is static after start up time.
	 * That's true right now, but if that ever changed this would need a lock. */
	stats->current_job = TAILQ_NEXT(stats->current_job, link);
	if (stats->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
	} else {
		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
	}
}

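/*
 * Periodic poller on the main thread: allocates a fresh aggregate and
 * daisy-chains _performance_dump() across the job threads via
 * spdk_thread_send_msg(), so each job's stats are read on the thread that
 * owns them; _performance_dump_done() then prints the totals back on the
 * main thread.
 */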
static int
performance_statistics_thread(void *arg)
{
	struct bdevperf_aggregate_stats *aggregate;
	struct bdevperf_stats *stats;

	if (g_performance_dump_active) {
		return -1;
	}

	g_performance_dump_active = true;

	aggregate = calloc(1, sizeof(*aggregate));
	if (aggregate == NULL) {
		return -1;
	}
	stats = &aggregate->total;
	stats->min_latency = (double)UINT64_MAX;

	g_show_performance_period_num++;

	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;

	/* Iterate all of the jobs to gather stats
	 * These jobs will not get removed here until a final performance dump is run,
	 * so this should be safe without locking.
	 */
	aggregate->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
	if (aggregate->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, aggregate);
	} else {
		spdk_thread_send_msg(aggregate->current_job->thread, _performance_dump, aggregate);
	}

	return -1;
}

static void
bdevperf_test(void)
{
	struct bdevperf_job *job;

	if (TAILQ_EMPTY(&g_bdevperf.jobs)) {
		if (g_request) {
			spdk_jsonrpc_send_error_response_fmt(g_request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
							     "No jobs defined or bdevs created");
			g_request = NULL;
		}
		return;
	}

	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / (uint64_t)SPDK_SEC_TO_USEC);
	fflush(stdout);

	/* Start a timer to dump performance numbers */
	g_start_tsc = spdk_get_ticks();
	if (!g_summarize_performance) {
		printf("%*s\n", 107, "Latency(us)");
		printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");
	}
	if (!g_perf_timer) {
		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
						    g_show_performance_period_in_usec);
	}

	/* Iterate jobs to start all I/O */
	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
		g_bdevperf.running_jobs++;
		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
	}
}

static void
bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct bdevperf_job *job = event_ctx;

	if (SPDK_BDEV_EVENT_REMOVE == type) {
		bdevperf_job_drain(job);
	}
}

static void
bdevperf_histogram_status_cb(void *cb_arg, int status)
{
	if (status != 0) {
		g_run_rc = status;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}

	if (--g_bdev_count == 0) {
		if (g_run_rc == 0) {
			/* Ready to run the test */
			bdevperf_test();
		} else {
			bdevperf_test_done(NULL);
		}
	}
}

static uint32_t g_construct_job_count = 0;

static int
_bdevperf_enable_histogram(void *ctx, struct spdk_bdev *bdev)
{
	bool *enable = ctx;

	g_bdev_count++;

	spdk_bdev_histogram_enable(bdev, bdevperf_histogram_status_cb, NULL, *enable);

	return 0;
}

static void
bdevperf_enable_histogram(bool enable)
{
	struct spdk_bdev *bdev;
	int rc;

	/* increment initial g_bdev_count so that it will never reach 0 in the middle of iteration */
	g_bdev_count = 1;

	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (bdev) {
			rc = _bdevperf_enable_histogram(&enable, bdev);
		} else {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			rc = -1;
		}
	} else {
		rc = spdk_for_each_bdev_leaf(&enable, _bdevperf_enable_histogram);
	}

	bdevperf_histogram_status_cb(NULL, rc);
}

static void
_bdevperf_construct_job_done(void *ctx)
{
	if (--g_construct_job_count == 0) {
		if (g_run_rc != 0) {
			/* Something failed. */
			bdevperf_test_done(NULL);
			return;
		}

		/* always enable histogram. */
		bdevperf_enable_histogram(true);
	} else if (g_run_rc != 0) {
		/* Reset the error since some jobs were constructed correctly */
		g_run_rc = 0;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}
}

/* The format checker does not allow using an inlined type;
   this is a workaround. */
typedef struct spdk_thread *spdk_thread_t;

static spdk_thread_t
construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
{
	struct spdk_cpuset tmp;

	/* This function runs on the main thread. */
	assert(g_main_thread == spdk_get_thread());

	/* Handle default mask */
	if (spdk_cpuset_count(cpumask) == 0) {
		cpumask = &g_all_cpuset;
	}

	/* Warn user that mask might need to be changed */
	spdk_cpuset_copy(&tmp, cpumask);
	spdk_cpuset_or(&tmp, &g_all_cpuset);
	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
	}

	return spdk_thread_create(tag, cpumask);
}

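/*
 * Round-robin over the cores in the application's core mask, wrapping back
 * to the first core; lets callers spread job threads evenly across cores.
 */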
static uint32_t
_get_next_core(void)
{
	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;

	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
		return current_core;
	}

	current_core = spdk_env_get_next_core(current_core);
	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
	}

	return current_core;
}

static void
_bdevperf_construct_job(void *ctx)
{
	struct bdevperf_job *job = ctx;
	int rc;

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(job->bdev), true, bdevperf_bdev_removed, job,
				&job->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Could not open leaf bdev %s, error=%d\n", spdk_bdev_get_name(job->bdev), rc);
		g_run_rc = -EINVAL;
		goto end;
	}

	if (g_zcopy) {
		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
			g_run_rc = -ENOTSUP;
			goto end;
		}
	}

	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
	if (!job->ch) {
		SPDK_ERRLOG("Could not get io_channel for device %s\n",
			    spdk_bdev_get_name(job->bdev));
1797 		spdk_bdev_close(job->bdev_desc);
1798 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
1799 		g_run_rc = -ENOMEM;
1800 		goto end;
1801 	}
1802 
1803 end:
1804 	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
1805 }
1806 
1807 static void
1808 job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
1809 {
1810 	switch (rw) {
1811 	case JOB_CONFIG_RW_READ:
1812 		job->rw_percentage = 100;
1813 		break;
1814 	case JOB_CONFIG_RW_WRITE:
1815 		job->rw_percentage = 0;
1816 		break;
1817 	case JOB_CONFIG_RW_RANDREAD:
1818 		job->is_random = true;
1819 		job->rw_percentage = 100;
1820 		job->seed = rand();
1821 		break;
1822 	case JOB_CONFIG_RW_RANDWRITE:
1823 		job->is_random = true;
1824 		job->rw_percentage = 0;
1825 		job->seed = rand();
1826 		break;
1827 	case JOB_CONFIG_RW_RW:
1828 		job->is_random = false;
1829 		break;
1830 	case JOB_CONFIG_RW_RANDRW:
1831 		job->is_random = true;
1832 		job->seed = rand();
1833 		break;
1834 	case JOB_CONFIG_RW_RESET:
1835 		/* Reset shares the flow with verify. */
1836 		job->reset = true;
1837 	/* fallthrough */
1838 	case JOB_CONFIG_RW_VERIFY:
1839 		job->verify = true;
		/* For the verify flow, reads are issued only from the write
		 * completion callback, so rw_percentage is not used. */
1842 		job->rw_percentage = 0;
1843 		break;
1844 	case JOB_CONFIG_RW_UNMAP:
1845 		job->unmap = true;
1846 		break;
1847 	case JOB_CONFIG_RW_FLUSH:
1848 		job->flush = true;
1849 		break;
1850 	case JOB_CONFIG_RW_WRITE_ZEROES:
1851 		job->write_zeroes = true;
1852 		break;
1853 	}
1854 }
1855 
1856 static int
1857 bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
1858 		       struct spdk_thread *thread)
1859 {
1860 	struct bdevperf_job *job;
1861 	struct bdevperf_task *task;
1862 	int block_size, data_block_size;
1863 	int rc;
1864 	int task_num, n;
1865 	int32_t numa_id;
1866 
1867 	block_size = spdk_bdev_get_block_size(bdev);
1868 	data_block_size = spdk_bdev_get_data_block_size(bdev);
1869 
1870 	job = calloc(1, sizeof(struct bdevperf_job));
1871 	if (!job) {
1872 		fprintf(stderr, "Unable to allocate memory for new job.\n");
1873 		return -ENOMEM;
1874 	}
1875 
1876 	job->name = strdup(spdk_bdev_get_name(bdev));
1877 	if (!job->name) {
1878 		fprintf(stderr, "Unable to allocate memory for job name.\n");
1879 		bdevperf_job_free(job);
1880 		return -ENOMEM;
1881 	}
1882 
1883 	job->workload_type = config->rw;
1884 	job->io_size = config->bs;
1885 	job->rw_percentage = config->rwmixread;
1886 	job->continue_on_failure = g_continue_on_failure;
1887 	job->queue_depth = config->iodepth;
1888 	job->bdev = bdev;
1889 	job->io_size_blocks = job->io_size / data_block_size;
1890 	job->buf_size = job->io_size_blocks * block_size;
1891 	job->abort = g_abort;
1892 	job_init_rw(job, config->rw);
1893 
1894 	if ((job->io_size % data_block_size) != 0) {
		SPDK_ERRLOG("IO size (%d) is not a multiple of the data block size of bdev %s (%"PRIu32")\n",
			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
1897 		bdevperf_job_free(job);
1898 		return -ENOTSUP;
1899 	}
1900 
1901 	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1902 		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
1903 		bdevperf_job_free(job);
1904 		return -ENOTSUP;
1905 	}
1906 
1907 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_REFTAG)) {
1908 		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
1909 	}
1910 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_GUARD)) {
1911 		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
1912 	}
1913 
1914 	job->offset_in_ios = 0;
1915 
1916 	if (config->length != 0) {
1917 		/* Use subset of disk */
1918 		job->size_in_ios = config->length / job->io_size_blocks;
1919 		job->ios_base = config->offset / job->io_size_blocks;
1920 	} else {
1921 		/* Use whole disk */
1922 		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
1923 		job->ios_base = 0;
1924 	}
1925 
1926 	if (job->is_random && g_zipf_theta > 0) {
1927 		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
1928 	}
1929 
1930 	if (job->verify) {
1931 		if (job->size_in_ios >= UINT32_MAX) {
1932 			SPDK_ERRLOG("Due to constraints of verify operation, the job storage capacity is too large\n");
1933 			bdevperf_job_free(job);
1934 			return -ENOMEM;
1935 		}
1936 		job->outstanding = spdk_bit_array_create(job->size_in_ios);
1937 		if (job->outstanding == NULL) {
1938 			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
1939 				    spdk_bdev_get_name(bdev));
1940 			bdevperf_job_free(job);
1941 			return -ENOMEM;
1942 		}
1943 		if (job->queue_depth > (int)job->size_in_ios) {
1944 			SPDK_WARNLOG("Due to constraints of verify job, queue depth (-q, %d) can't exceed the number of IO "
1945 				     "requests which can be submitted to the bdev %s simultaneously (%"PRIu64"). "
1946 				     "Queue depth is limited to %"PRIu64"\n",
1947 				     job->queue_depth, job->name, job->size_in_ios, job->size_in_ios);
1948 			job->queue_depth = (int)job->size_in_ios;
1949 		}
1950 	}
1951 
1952 	job->histogram = spdk_histogram_data_alloc();
1953 	if (job->histogram == NULL) {
1954 		fprintf(stderr, "Failed to allocate histogram\n");
1955 		bdevperf_job_free(job);
1956 		return -ENOMEM;
1957 	}
1958 
1959 	TAILQ_INIT(&job->task_list);
1960 
1961 	if (g_random_map) {
1962 		if (job->size_in_ios >= UINT32_MAX) {
1963 			SPDK_ERRLOG("Due to constraints of the random map, the job storage capacity is too large\n");
1964 			bdevperf_job_free(job);
1965 			return -ENOMEM;
1966 		}
1967 		job->random_map = spdk_bit_array_create(job->size_in_ios);
1968 		if (job->random_map == NULL) {
1969 			SPDK_ERRLOG("Could not create random_map array bitmap for bdev %s\n",
1970 				    spdk_bdev_get_name(bdev));
1971 			bdevperf_job_free(job);
1972 			return -ENOMEM;
1973 		}
1974 	}
1975 
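	/* Allocate one task per queue slot, one extra task for the reset I/O
	 * itself when resets are enabled, and one extra task per queue slot to
	 * carry aborts when -X is enabled. */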
1976 	task_num = job->queue_depth;
1977 	if (job->reset) {
1978 		task_num += 1;
1979 	}
1980 	if (job->abort) {
1981 		task_num += job->queue_depth;
1982 	}
1983 
1984 	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);
1985 
1986 	numa_id = spdk_bdev_get_numa_id(job->bdev);
1987 
1988 	for (n = 0; n < task_num; n++) {
1989 		task = calloc(1, sizeof(struct bdevperf_task));
1990 		if (!task) {
			fprintf(stderr, "Failed to allocate memory for task\n");
1992 			spdk_zipf_free(&job->zipf);
1993 			return -ENOMEM;
1994 		}
1995 
1996 		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
1997 					 numa_id, SPDK_MALLOC_DMA);
1998 		if (!task->buf) {
1999 			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
2000 			spdk_zipf_free(&job->zipf);
2001 			free(task);
2002 			return -ENOMEM;
2003 		}
2004 
2005 		if (job->verify && job->buf_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2006 			task->verify_buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
2007 							numa_id, SPDK_MALLOC_DMA);
2008 			if (!task->verify_buf) {
				fprintf(stderr, "Cannot allocate verify_buf for task=%p\n", task);
2010 				spdk_free(task->buf);
2011 				spdk_zipf_free(&job->zipf);
2012 				free(task);
2013 				return -ENOMEM;
2014 			}
		}
2017 
2018 		if (spdk_bdev_is_md_separate(job->bdev)) {
2019 			task->md_buf = spdk_zmalloc(job->io_size_blocks *
2020 						    spdk_bdev_get_md_size(job->bdev), 0, NULL,
2021 						    numa_id, SPDK_MALLOC_DMA);
2022 			if (!task->md_buf) {
2023 				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
2024 				spdk_zipf_free(&job->zipf);
2025 				spdk_free(task->verify_buf);
2026 				spdk_free(task->buf);
2027 				free(task);
2028 				return -ENOMEM;
2029 			}
2030 		}
2031 
2032 		task->job = job;
2033 		TAILQ_INSERT_TAIL(&job->task_list, task, link);
2034 	}
2035 
2036 	job->thread = thread;
2037 
2038 	g_construct_job_count++;
2039 
2040 	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
2041 	assert(rc == 0);
2042 
2043 	return rc;
2044 }
2045 
2046 static int
2047 parse_rw(const char *str, enum job_config_rw ret)
2048 {
2049 	if (str == NULL) {
2050 		return ret;
2051 	}
2052 
2053 	if (!strcmp(str, "read")) {
2054 		ret = JOB_CONFIG_RW_READ;
2055 	} else if (!strcmp(str, "randread")) {
2056 		ret = JOB_CONFIG_RW_RANDREAD;
2057 	} else if (!strcmp(str, "write")) {
2058 		ret = JOB_CONFIG_RW_WRITE;
2059 	} else if (!strcmp(str, "randwrite")) {
2060 		ret = JOB_CONFIG_RW_RANDWRITE;
2061 	} else if (!strcmp(str, "verify")) {
2062 		ret = JOB_CONFIG_RW_VERIFY;
2063 	} else if (!strcmp(str, "reset")) {
2064 		ret = JOB_CONFIG_RW_RESET;
2065 	} else if (!strcmp(str, "unmap")) {
2066 		ret = JOB_CONFIG_RW_UNMAP;
2067 	} else if (!strcmp(str, "write_zeroes")) {
2068 		ret = JOB_CONFIG_RW_WRITE_ZEROES;
2069 	} else if (!strcmp(str, "flush")) {
2070 		ret = JOB_CONFIG_RW_FLUSH;
2071 	} else if (!strcmp(str, "rw")) {
2072 		ret = JOB_CONFIG_RW_RW;
2073 	} else if (!strcmp(str, "randrw")) {
2074 		ret = JOB_CONFIG_RW_RANDRW;
2075 	} else {
2076 		fprintf(stderr, "rw must be one of\n"
2077 			PATTERN_TYPES_STR "\n");
2078 		ret = BDEVPERF_CONFIG_ERROR;
2079 	}
2080 
2081 	return ret;
2082 }
2083 
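/* Copy the next ':'-separated token from 'filename' into 'out', skipping
 * spaces and tabs, and return a pointer to the character that ended the
 * token. For example, "Malloc0:Malloc1: Nvme0n1" yields "Malloc0",
 * "Malloc1" and "Nvme0n1" on successive calls. */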
2084 static const char *
2085 config_filename_next(const char *filename, char *out)
2086 {
2087 	int i, k;
2088 
2089 	if (filename == NULL) {
2090 		out[0] = '\0';
2091 		return NULL;
2092 	}
2093 
2094 	if (filename[0] == ':') {
2095 		filename++;
2096 	}
2097 
2098 	for (i = 0, k = 0;
2099 	     filename[i] != '\0' &&
2100 	     filename[i] != ':' &&
2101 	     i < BDEVPERF_CONFIG_MAX_FILENAME &&
2102 	     k < (BDEVPERF_CONFIG_MAX_FILENAME - 1);
2103 	     i++) {
2104 		if (filename[i] == ' ' || filename[i] == '\t') {
2105 			continue;
2106 		}
2107 
2108 		out[k++] = filename[i];
2109 	}
2110 	out[k] = 0;
2111 
2112 	return filename + i;
2113 }
2114 
2115 static struct spdk_thread *
2116 get_lcore_thread(uint32_t lcore)
2117 {
2118 	struct lcore_thread *lthread;
2119 
2120 	TAILQ_FOREACH(lthread, &g_lcore_thread_list, link) {
2121 		if (lthread->lcore == lcore) {
2122 			return lthread->thread;
2123 		}
2124 	}
2125 
2126 	return NULL;
2127 }
2128 
2129 static void
2130 create_lcore_thread(uint32_t lcore)
2131 {
2132 	struct lcore_thread *lthread;
2133 	struct spdk_cpuset cpumask = {};
2134 	char name[32];
2135 
2136 	lthread = calloc(1, sizeof(*lthread));
2137 	assert(lthread != NULL);
2138 
2139 	lthread->lcore = lcore;
2140 
2141 	snprintf(name, sizeof(name), "lcore_%u", lcore);
2142 	spdk_cpuset_set_cpu(&cpumask, lcore, true);
2143 
2144 	lthread->thread = spdk_thread_create(name, &cpumask);
2145 	assert(lthread->thread != NULL);
2146 
2147 	TAILQ_INSERT_TAIL(&g_lcore_thread_list, lthread, link);
2148 }
2149 
2150 static void
2151 bdevperf_construct_jobs(void)
2152 {
2153 	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
2154 	struct spdk_thread *thread;
2155 	struct job_config *config;
2156 	struct spdk_bdev *bdev;
2157 	const char *filenames;
2158 	uint32_t i;
2159 	int rc;
2160 
2161 	if (g_one_thread_per_lcore) {
2162 		SPDK_ENV_FOREACH_CORE(i) {
2163 			create_lcore_thread(i);
2164 		}
2165 	}
2166 
2167 	TAILQ_FOREACH(config, &job_config_list, link) {
2168 		filenames = config->filename;
2169 
2170 		if (!g_one_thread_per_lcore) {
2171 			thread = construct_job_thread(&config->cpumask, config->name);
2172 		} else {
2173 			thread = get_lcore_thread(config->lcore);
2174 		}
2175 		assert(thread);
2176 
2177 		while (filenames) {
2178 			filenames = config_filename_next(filenames, filename);
2179 			if (strlen(filename) == 0) {
2180 				break;
2181 			}
2182 
2183 			bdev = spdk_bdev_get_by_name(filename);
2184 			if (!bdev) {
2185 				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
2186 				g_run_rc = -EINVAL;
2187 				return;
2188 			}
2189 
2190 			rc = bdevperf_construct_job(bdev, config, thread);
2191 			if (rc < 0) {
2192 				g_run_rc = rc;
2193 				return;
2194 			}
2195 		}
2196 	}
2197 }
2198 
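/* Build a job_config equivalent to one parsed from a config file, using the
 * global CLI parameters and pinning the job to the next core in round-robin
 * order. */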
2199 static int
2200 make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
2201 {
2202 	struct job_config *config = calloc(1, sizeof(*config));
2203 
2204 	if (config == NULL) {
2205 		fprintf(stderr, "Unable to allocate memory for job config\n");
2206 		return -ENOMEM;
2207 	}
2208 
2209 	config->name = filename;
2210 	config->filename = filename;
2211 	config->lcore = _get_next_core();
2212 	spdk_cpuset_zero(&config->cpumask);
2213 	spdk_cpuset_set_cpu(&config->cpumask, config->lcore, true);
2214 	config->bs = g_io_size;
2215 	config->iodepth = g_queue_depth;
2216 	config->rwmixread = g_rw_percentage;
2217 	config->offset = offset;
2218 	config->length = range;
2219 	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
2220 	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2221 		free(config);
2222 		return -EINVAL;
2223 	}
2224 
2225 	TAILQ_INSERT_TAIL(&job_config_list, config, link);
2226 	return 0;
2227 }
2228 
2229 static int
2230 bdevperf_construct_multithread_job_config(void *ctx, struct spdk_bdev *bdev)
2231 {
2232 	uint32_t *num_cores = ctx;
2233 	uint32_t i;
2234 	uint64_t blocks_per_job;
2235 	int64_t offset;
2236 	int rc;
2237 
2238 	blocks_per_job = spdk_bdev_get_num_blocks(bdev) / *num_cores;
2239 	offset = 0;
2240 
2241 	SPDK_ENV_FOREACH_CORE(i) {
2242 		rc = make_cli_job_config(spdk_bdev_get_name(bdev), offset, blocks_per_job);
2243 		if (rc) {
2244 			return rc;
2245 		}
2246 
2247 		offset += blocks_per_job;
2248 	}
2249 
2250 	return 0;
2251 }
2252 
2253 static void
2254 bdevperf_construct_multithread_job_configs(void)
2255 {
2256 	struct spdk_bdev *bdev;
2257 	uint32_t i;
2258 	uint32_t num_cores;
2259 
2260 	num_cores = 0;
2261 	SPDK_ENV_FOREACH_CORE(i) {
2262 		num_cores++;
2263 	}
2264 
2265 	if (num_cores == 0) {
2266 		g_run_rc = -EINVAL;
2267 		return;
2268 	}
2269 
2270 	if (g_job_bdev_name != NULL) {
2271 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (!bdev) {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			g_run_rc = -EINVAL;
			return;
		}
2276 		g_run_rc = bdevperf_construct_multithread_job_config(&num_cores, bdev);
2277 	} else {
2278 		g_run_rc = spdk_for_each_bdev_leaf(&num_cores, bdevperf_construct_multithread_job_config);
2279 	}
2280 
}
2283 static int
2284 bdevperf_construct_job_config(void *ctx, struct spdk_bdev *bdev)
2285 {
2286 	/* Construct the job */
2287 	return make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
2288 }
2289 
2290 static void
2291 bdevperf_construct_job_configs(void)
2292 {
2293 	struct spdk_bdev *bdev;
2294 
2295 	/* There are three different modes for allocating jobs. Standard mode
2296 	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
2297 	 *
	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
	 * one spdk_thread per bdev PER CORE and runs a copy of the job on each.
	 * This effectively runs multiple threads per bdev.
	 *
	 * The -j flag implies "FIO" mode, which tries to mimic the semantics of FIO jobs.
	 * In "FIO" mode, threads are spawned per-job instead of per-bdev.
	 * Each FIO job can be individually parameterized by filename, cpumask, etc.,
	 * unlike the other modes, which only support global options.
2306 	 *
2307 	 * Both for standard mode and "multithread" mode, if the -E flag is specified,
2308 	 * it creates one spdk_thread PER CORE. On each core, one spdk_thread is shared by
2309 	 * multiple jobs.
2310 	 */
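	/* Illustrative invocations of each mode (bdev setup omitted; names and
	 * values are examples only):
	 *
	 *   bdevperf -q 128 -o 4096 -w randread -t 60        standard mode
	 *   bdevperf -C -q 128 -o 4096 -w randread -t 60     "multithread" mode
	 *   bdevperf -j jobs.conf -t 60                      "FIO" job config mode
	 *   bdevperf -E -q 128 -o 4096 -w randread -t 60     one thread per core
	 */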
2311 
2312 	if (g_bdevperf_conf) {
2313 		goto end;
2314 	}
2315 
2316 	if (g_multithread_mode) {
2317 		bdevperf_construct_multithread_job_configs();
2318 	} else if (g_job_bdev_name != NULL) {
2319 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2320 		if (bdev) {
2321 			/* Construct the job */
2322 			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
		} else {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			g_run_rc = -EINVAL;
		}
2326 	} else {
2327 		g_run_rc = spdk_for_each_bdev_leaf(NULL, bdevperf_construct_job_config);
2328 	}
2329 
2330 end:
2331 	/* Increment initial construct_jobs count so that it will never reach 0 in the middle
2332 	 * of iteration.
2333 	 */
2334 	g_construct_job_count = 1;
2335 
2336 	if (g_run_rc == 0) {
2337 		bdevperf_construct_jobs();
2338 	}
2339 
2340 	_bdevperf_construct_job_done(NULL);
2341 }
2342 
2343 static int
2344 parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
2345 {
2346 	const char *job_name;
2347 	int tmp;
2348 
2349 	tmp = spdk_conf_section_get_intval(s, name);
2350 	if (tmp == -1) {
		/* The field was not found, so check the default value.
		 * In the [global] section it is OK to have undefined values,
		 * but for other sections it is not. */
2354 		if (def == BDEVPERF_CONFIG_UNDEFINED) {
2355 			job_name = spdk_conf_section_get_name(s);
2356 			if (strcmp(job_name, "global") == 0) {
2357 				return def;
2358 			}
2359 
2360 			fprintf(stderr,
2361 				"Job '%s' has no '%s' assigned\n",
2362 				job_name, name);
2363 			return BDEVPERF_CONFIG_ERROR;
2364 		}
2365 		return def;
2366 	}
2367 
2368 	/* NOTE: get_intval returns nonnegative on success */
2369 	if (tmp < 0) {
2370 		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
2371 			spdk_conf_section_get_name(s), name);
2372 		return BDEVPERF_CONFIG_ERROR;
2373 	}
2374 
2375 	return tmp;
2376 }
2377 
2378 /* CLI arguments override parameters for global sections */
2379 static void
2380 config_set_cli_args(struct job_config *config)
2381 {
2382 	if (g_job_bdev_name) {
2383 		config->filename = g_job_bdev_name;
2384 	}
2385 	if (g_io_size > 0) {
2386 		config->bs = g_io_size;
2387 	}
2388 	if (g_queue_depth > 0) {
2389 		config->iodepth = g_queue_depth;
2390 	}
2391 	if (g_rw_percentage > 0) {
2392 		config->rwmixread = g_rw_percentage;
2393 	}
2394 	if (g_workload_type) {
2395 		config->rw = parse_rw(g_workload_type, config->rw);
2396 	}
2397 }
2398 
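/* A minimal, illustrative job config file for -j (INI-style sections parsed
 * by spdk_conf; the section and bdev names below are hypothetical):
 *
 *   [global]
 *   bs=4096
 *   iodepth=32
 *   rw=randread
 *
 *   [job0]
 *   filename=Malloc0
 *   cpumask=0x1
 *
 *   [job1]
 *   filename=Malloc1
 *   rw=write
 */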
2399 static int
2400 read_job_config(void)
2401 {
2402 	struct job_config global_default_config;
2403 	struct job_config global_config;
2404 	struct spdk_conf_section *s;
2405 	struct job_config *config = NULL;
2406 	const char *cpumask;
2407 	const char *rw;
2408 	bool is_global;
2409 	int n = 0;
2410 	int val;
2411 
2412 	if (g_bdevperf_conf_file == NULL) {
2413 		return 0;
2414 	}
2415 
2416 	g_bdevperf_conf = spdk_conf_allocate();
2417 	if (g_bdevperf_conf == NULL) {
2418 		fprintf(stderr, "Could not allocate job config structure\n");
2419 		return 1;
2420 	}
2421 
2422 	spdk_conf_disable_sections_merge(g_bdevperf_conf);
2423 	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
		fprintf(stderr, "Invalid job config\n");
2425 		return 1;
2426 	}
2427 
2428 	/* Initialize global defaults */
2429 	global_default_config.filename = NULL;
	/* A zero mask is equivalent to g_all_cpuset, but g_all_cpuset is not
	 * initialized yet, so use the zero mask as the default instead. */
2433 	spdk_cpuset_zero(&global_default_config.cpumask);
2434 	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
2435 	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
2436 	/* bdevperf has no default for -M option but in FIO the default is 50 */
2437 	global_default_config.rwmixread = 50;
2438 	global_default_config.offset = 0;
2439 	/* length 0 means 100% */
2440 	global_default_config.length = 0;
2441 	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
2442 	config_set_cli_args(&global_default_config);
2443 
2444 	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
2445 		return 1;
2446 	}
2447 
	/* There is only a single instance of the global job_config;
	 * its value is reset whenever a new [global] section is encountered. */
2450 	global_config = global_default_config;
2451 
2452 	for (s = spdk_conf_first_section(g_bdevperf_conf);
2453 	     s != NULL;
2454 	     s = spdk_conf_next_section(s)) {
2455 		config = calloc(1, sizeof(*config));
2456 		if (config == NULL) {
2457 			fprintf(stderr, "Unable to allocate memory for job config\n");
2458 			return 1;
2459 		}
2460 
2461 		config->name = spdk_conf_section_get_name(s);
2462 		is_global = strcmp(config->name, "global") == 0;
2463 
2464 		if (is_global) {
2465 			global_config = global_default_config;
2466 		}
2467 
2468 		config->filename = spdk_conf_section_get_val(s, "filename");
2469 		if (config->filename == NULL) {
2470 			config->filename = global_config.filename;
2471 		}
2472 		if (!is_global) {
2473 			if (config->filename == NULL) {
2474 				fprintf(stderr, "Job '%s' expects 'filename' parameter\n", config->name);
2475 				goto error;
2476 			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
2477 				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
2478 				fprintf(stderr,
2479 					"filename for '%s' job is too long. Max length is %d\n",
2480 					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
2481 				goto error;
2482 			}
2483 		}
2484 
2485 		cpumask = spdk_conf_section_get_val(s, "cpumask");
2486 		if (cpumask == NULL) {
2487 			config->cpumask = global_config.cpumask;
2488 		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
2489 			fprintf(stderr, "Job '%s' has bad 'cpumask' value\n", config->name);
2490 			goto error;
2491 		}
2492 
2493 		config->bs = parse_uint_option(s, "bs", global_config.bs);
2494 		if (config->bs == BDEVPERF_CONFIG_ERROR) {
2495 			goto error;
2496 		} else if (config->bs == 0) {
2497 			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
2498 			goto error;
2499 		}
2500 
2501 		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
2502 		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
2503 			goto error;
2504 		} else if (config->iodepth == 0) {
2505 			fprintf(stderr,
2506 				"'iodepth' of job '%s' must be greater than 0\n",
2507 				config->name);
2508 			goto error;
2509 		}
2510 
2511 		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
2512 		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
2513 			goto error;
2514 		} else if (config->rwmixread > 100) {
2515 			fprintf(stderr,
2516 				"'rwmixread' value of '%s' job is not in 0-100 range\n",
2517 				config->name);
2518 			goto error;
2519 		}
2520 
2521 		config->offset = parse_uint_option(s, "offset", global_config.offset);
2522 		if (config->offset == BDEVPERF_CONFIG_ERROR) {
2523 			goto error;
2524 		}
2525 
2526 		val = parse_uint_option(s, "length", global_config.length);
2527 		if (val == BDEVPERF_CONFIG_ERROR) {
2528 			goto error;
2529 		}
2530 		config->length = val;
2531 
2532 		rw = spdk_conf_section_get_val(s, "rw");
2533 		config->rw = parse_rw(rw, global_config.rw);
2534 		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2535 			fprintf(stderr, "Job '%s' has bad 'rw' value\n", config->name);
2536 			goto error;
2537 		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
2538 			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
2539 			goto error;
2540 		}
2541 
2542 		if (is_global) {
2543 			config_set_cli_args(config);
2544 			global_config = *config;
2545 			free(config);
2546 			config = NULL;
2547 		} else {
2548 			TAILQ_INSERT_TAIL(&job_config_list, config, link);
2549 			n++;
2550 		}
2551 	}
2552 
2553 	if (g_rpc_log_file_name != NULL) {
2554 		g_rpc_log_file = fopen(g_rpc_log_file_name, "a");
2555 		if (g_rpc_log_file == NULL) {
2556 			fprintf(stderr, "Failed to open %s\n", g_rpc_log_file_name);
2557 			goto error;
2558 		}
2559 	}
2560 
2561 	printf("Using job config with %d jobs\n", n);
2562 	return 0;
2563 error:
2564 	free(config);
2565 	return 1;
2566 }
2567 
2568 static void
2569 bdevperf_run(void *arg1)
2570 {
2571 	uint32_t i;
2572 
2573 	g_main_thread = spdk_get_thread();
2574 
2575 	spdk_cpuset_zero(&g_all_cpuset);
2576 	SPDK_ENV_FOREACH_CORE(i) {
2577 		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
2578 	}
2579 
2580 	if (g_wait_for_tests) {
2581 		/* Do not perform any tests until RPC is received */
2582 		return;
2583 	}
2584 
2585 	bdevperf_construct_job_configs();
2586 }
2587 
2588 static void
2589 rpc_perform_tests_reset(void)
2590 {
2591 	/* Reset g_run_rc to 0 for the next test run. */
2592 	g_run_rc = 0;
2593 
2594 	/* Reset g_stats to 0 for the next test run. */
2595 	memset(&g_stats, 0, sizeof(g_stats));
2596 
2597 	/* Reset g_show_performance_period_num to 0 for the next test run. */
2598 	g_show_performance_period_num = 0;
2599 }
2600 
2601 static void
2602 rpc_perform_tests_cb(void)
2603 {
2604 	struct spdk_jsonrpc_request *request = g_request;
2605 
2606 	g_request = NULL;
2607 
2608 	if (g_run_rc) {
2609 		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2610 						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
2611 	}
2612 
2613 	rpc_perform_tests_reset();
2614 }
2615 
2616 struct rpc_bdevperf_params {
2617 	int	time_in_sec;
2618 	char	*workload_type;
2619 	int	queue_depth;
2620 	char	*io_size;
2621 	int	rw_percentage;
2622 };
2623 
2624 static const struct spdk_json_object_decoder rpc_bdevperf_params_decoders[] = {
2625 	{"time_in_sec", offsetof(struct rpc_bdevperf_params, time_in_sec), spdk_json_decode_int32, true},
2626 	{"workload_type", offsetof(struct rpc_bdevperf_params, workload_type), spdk_json_decode_string, true},
2627 	{"queue_depth", offsetof(struct rpc_bdevperf_params, queue_depth), spdk_json_decode_int32, true},
2628 	{"io_size", offsetof(struct rpc_bdevperf_params, io_size), spdk_json_decode_string, true},
2629 	{"rw_percentage", offsetof(struct rpc_bdevperf_params, rw_percentage), spdk_json_decode_int32, true},
2630 };
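
/* An illustrative "perform_tests" JSON-RPC request matching the decoders
 * above (every parameter is optional; note that io_size is a string):
 *
 *   {
 *     "jsonrpc": "2.0", "method": "perform_tests", "id": 1,
 *     "params": {
 *       "time_in_sec": 30, "workload_type": "randwrite",
 *       "queue_depth": 64, "io_size": "4096", "rw_percentage": 70
 *     }
 *   }
 */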
2631 
2632 static void
2633 rpc_apply_bdevperf_params(struct rpc_bdevperf_params *params)
2634 {
2635 	if (params->workload_type) {
		/* Free the previously set value to avoid a memory leak. */
2637 		free(g_workload_type);
2638 		g_workload_type = strdup(params->workload_type);
2639 	}
2640 	if (params->queue_depth) {
2641 		g_queue_depth = params->queue_depth;
2642 	}
2643 	if (params->io_size) {
2644 		bdevperf_parse_arg('o', params->io_size);
2645 	}
2646 	if (params->time_in_sec) {
2647 		g_time_in_sec = params->time_in_sec;
2648 	}
2649 	if (params->rw_percentage) {
2650 		g_rw_percentage = params->rw_percentage;
2651 		g_mix_specified = true;
2652 	} else {
2653 		g_mix_specified = false;
2654 	}
2655 }
2656 
2657 static void
2658 rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
2659 {
2660 	struct rpc_bdevperf_params req = {}, backup = {};
2661 	int rc;
2662 
2663 	if (g_request != NULL) {
2664 		fprintf(stderr, "Another test is already in progress.\n");
2665 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2666 						 spdk_strerror(-EINPROGRESS));
2667 		return;
2668 	}
2669 
2670 	if (params) {
2671 		if (spdk_json_decode_object_relaxed(params, rpc_bdevperf_params_decoders,
2672 						    SPDK_COUNTOF(rpc_bdevperf_params_decoders),
2673 						    &req)) {
2674 			spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2675 							 "spdk_json_decode_object failed");
2676 			return;
2677 		}
2678 
2679 		if (g_workload_type) {
2680 			backup.workload_type = strdup(g_workload_type);
2681 		}
2682 		backup.queue_depth = g_queue_depth;
2683 		if (asprintf(&backup.io_size, "%d", g_io_size) < 0) {
			fprintf(stderr, "Couldn't allocate memory for the I/O size backup\n");
2685 			goto rpc_error;
2686 		}
2687 		backup.time_in_sec = g_time_in_sec;
2688 		backup.rw_percentage = g_rw_percentage;
2689 
2690 		rpc_apply_bdevperf_params(&req);
2691 
2692 		free(req.workload_type);
2693 		free(req.io_size);
2694 	}
2695 
2696 	rc = verify_test_params();
2697 
2698 	if (rc) {
2699 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2700 						 "Invalid parameters provided");
2701 		/* restore old params on error */
2702 		rpc_apply_bdevperf_params(&backup);
2703 		goto rpc_error;
2704 	}
2705 
2706 	g_request = request;
2707 
	/* Only construct job configs on the first test run. */
2709 	if (TAILQ_EMPTY(&job_config_list)) {
2710 		bdevperf_construct_job_configs();
2711 	} else {
2712 		bdevperf_construct_jobs();
2713 	}
2714 
2715 rpc_error:
2716 	free(backup.io_size);
2717 	free(backup.workload_type);
2718 }
2719 SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)
2720 
2721 static void
2722 _bdevperf_job_drain(void *ctx)
2723 {
2724 	bdevperf_job_drain(ctx);
2725 }
2726 
2727 static void
2728 spdk_bdevperf_shutdown_cb(void)
2729 {
	struct bdevperf_job *job, *tmp;

	g_shutdown = true;
2732 
2733 	if (g_bdevperf.running_jobs == 0) {
2734 		bdevperf_test_done(NULL);
2735 		return;
2736 	}
2737 
2738 	/* Iterate jobs to stop all I/O */
2739 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
2740 		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
2741 	}
2742 }
2743 
2744 static int
2745 bdevperf_parse_arg(int ch, char *arg)
2746 {
2747 	long long tmp;
2748 
2749 	if (ch == 'w') {
2750 		g_workload_type = strdup(arg);
2751 	} else if (ch == 'T') {
2752 		g_job_bdev_name = arg;
2753 	} else if (ch == 'z') {
2754 		g_wait_for_tests = true;
2755 	} else if (ch == 'Z') {
2756 		g_zcopy = true;
2757 	} else if (ch == 'X') {
2758 		g_abort = true;
2759 	} else if (ch == 'C') {
2760 		g_multithread_mode = true;
2761 	} else if (ch == 'f') {
2762 		g_continue_on_failure = true;
2763 	} else if (ch == 'j') {
2764 		g_bdevperf_conf_file = arg;
2765 	} else if (ch == 'F') {
2766 		char *endptr;
2767 
2768 		errno = 0;
2769 		g_zipf_theta = strtod(arg, &endptr);
2770 		if (errno || arg == endptr || g_zipf_theta < 0) {
2771 			fprintf(stderr, "Illegal zipf theta value %s\n", arg);
2772 			return -EINVAL;
2773 		}
2774 	} else if (ch == 'l') {
2775 		g_latency_display_level++;
2776 	} else if (ch == 'D') {
2777 		g_random_map = true;
2778 	} else if (ch == 'E') {
2779 		g_one_thread_per_lcore = true;
2780 	} else if (ch == 'J') {
2781 		g_rpc_log_file_name = arg;
2782 	} else if (ch == 'o') {
2783 		uint64_t size;
2784 
2785 		if (spdk_parse_capacity(arg, &size, NULL) != 0) {
2786 			fprintf(stderr, "Invalid IO size: %s\n", arg);
2787 			return -EINVAL;
2788 		}
2789 		g_io_size = (int)size;
2790 	} else if (ch == 'U') {
2791 		g_unique_writes = true;
2792 	} else {
2793 		tmp = spdk_strtoll(arg, 10);
2794 		if (tmp < 0) {
			fprintf(stderr, "Failed to parse the value for option -%c.\n", ch);
2796 			return tmp;
2797 		} else if (tmp >= INT_MAX) {
			fprintf(stderr, "Value for option -%c is too large.\n", ch);
2799 			return -ERANGE;
2800 		}
2801 
2802 		switch (ch) {
2803 		case 'q':
2804 			g_queue_depth = tmp;
2805 			break;
2806 		case 't':
2807 			g_time_in_sec = tmp;
2808 			break;
2809 		case 'k':
2810 			g_timeout_in_sec = tmp;
2811 			break;
2812 		case 'M':
2813 			g_rw_percentage = tmp;
2814 			g_mix_specified = true;
2815 			break;
2816 		case 'P':
2817 			g_show_performance_ema_period = tmp;
2818 			break;
2819 		case 'S':
2820 			g_summarize_performance = false;
2821 			g_show_performance_period_in_usec = tmp * SPDK_SEC_TO_USEC;
2822 			break;
2823 		default:
2824 			return -EINVAL;
2825 		}
2826 	}
2827 	return 0;
2828 }
2829 
2830 static void
2831 bdevperf_usage(void)
2832 {
2833 	printf(" -q <depth>                io depth\n");
2834 	printf(" -o <size>                 io size in bytes\n");
2835 	printf(" -w <type>                 io pattern type, must be one of " PATTERN_TYPES_STR "\n");
2836 	printf(" -t <time>                 time in seconds\n");
2837 	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
2838 	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
	printf(" -P <num>                  number of moving average periods\n");
2840 	printf("\t\t(If set to n, show weighted mean of the previous n IO/s in real time)\n");
2841 	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
2842 	printf("\t\t(only valid with -S)\n");
2843 	printf(" -S <period>               show performance result in real time every <period> seconds\n");
2844 	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
2845 	printf(" -f                        continue processing I/O even after failures\n");
2846 	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
2847 	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
2848 	printf(" -z                        start bdevperf, but wait for perform_tests RPC to start tests\n");
2849 	printf("                           (See examples/bdev/bdevperf/bdevperf.py)\n");
2850 	printf(" -X                        abort timed out I/O\n");
2851 	printf(" -C                        enable every core to send I/Os to each bdev\n");
2852 	printf(" -j <filename>             use job config file\n");
2853 	printf(" -l                        display latency histogram, default: disable. -l display summary, -ll display details\n");
2854 	printf(" -D                        use a random map for picking offsets not previously read or written (for all jobs)\n");
	printf(" -E                        share one thread per lcore among all jobs. Available only if -j is not used.\n");
	printf(" -J <filename>             file name to open in append mode for logging JSON-RPC calls\n");
2857 	printf(" -U                        generate unique data for each write I/O, has no effect on non-write I/O\n");
2858 }
2859 
2860 static void
2861 bdevperf_fini(void)
2862 {
2863 	free_job_config();
2864 	free(g_workload_type);
2865 
2866 	if (g_rpc_log_file != NULL) {
2867 		fclose(g_rpc_log_file);
2868 		g_rpc_log_file = NULL;
2869 	}
2870 }
2871 
2872 static int
2873 verify_test_params(void)
2874 {
2875 	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
2876 		goto out;
2877 	}
2878 	if (!g_bdevperf_conf_file && g_io_size <= 0) {
2879 		goto out;
2880 	}
2881 	if (!g_bdevperf_conf_file && !g_workload_type) {
2882 		goto out;
2883 	}
2884 	if (g_bdevperf_conf_file && g_one_thread_per_lcore) {
		printf("Per-lcore threads (-E) cannot be used together with a bdevperf config file (-j)\n");
2886 		goto out;
2887 	}
2888 	if (g_time_in_sec <= 0) {
2889 		goto out;
2890 	}
2891 	g_time_in_usec = g_time_in_sec * SPDK_SEC_TO_USEC;
2892 
2893 	if (g_timeout_in_sec < 0) {
2894 		goto out;
2895 	}
2896 
2897 	if (g_abort && !g_timeout_in_sec) {
		printf("Timeout (-k) must be set for the abort option; ignoring -X\n");
2899 	}
2900 
2901 	if (g_show_performance_ema_period > 0 && g_summarize_performance) {
2902 		fprintf(stderr, "-P option must be specified with -S option\n");
2903 		return 1;
2904 	}
2905 
2906 	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2907 		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
2908 		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
2909 		printf("Zero copy mechanism will not be used.\n");
2910 		g_zcopy = false;
2911 	}
2912 
2913 	if (g_bdevperf_conf_file) {
2914 		/* workload_type verification happens during config file parsing */
2915 		return 0;
2916 	}
2917 
2918 	if (!strcmp(g_workload_type, "verify") ||
2919 	    !strcmp(g_workload_type, "reset")) {
2920 		g_rw_percentage = 50;
2921 		g_verify = true;
2922 		if (!strcmp(g_workload_type, "reset")) {
2923 			g_reset = true;
2924 		}
2925 	}
2926 
2927 	if (!strcmp(g_workload_type, "read") ||
2928 	    !strcmp(g_workload_type, "randread") ||
2929 	    !strcmp(g_workload_type, "write") ||
2930 	    !strcmp(g_workload_type, "randwrite") ||
2931 	    !strcmp(g_workload_type, "verify") ||
2932 	    !strcmp(g_workload_type, "reset") ||
2933 	    !strcmp(g_workload_type, "unmap") ||
2934 	    !strcmp(g_workload_type, "write_zeroes") ||
2935 	    !strcmp(g_workload_type, "flush")) {
2936 		if (g_mix_specified) {
2937 			fprintf(stderr, "Ignoring -M option... Please use -M option"
2938 				" only when using rw or randrw.\n");
2939 		}
2940 	}
2941 
2942 	if (!strcmp(g_workload_type, "rw") ||
2943 	    !strcmp(g_workload_type, "randrw")) {
2944 		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
2945 			fprintf(stderr,
				"-M must be set to a value from 0 to 100 "
				"for rw or randrw.\n");
2948 			return 1;
2949 		}
2950 	}
2951 
2952 	if (strcmp(g_workload_type, "randread") &&
2953 	    strcmp(g_workload_type, "randwrite") &&
2954 	    strcmp(g_workload_type, "randrw")) {
2955 		if (g_random_map) {
			fprintf(stderr, "-D option can only be used with randread,"
				" randwrite or randrw.\n");
2958 			return 1;
2959 		}
2960 	}
2961 
2962 	return 0;
2963 out:
2964 	return 1;
2965 }
2966 
2967 int
2968 main(int argc, char **argv)
2969 {
2970 	struct spdk_app_opts opts = {};
2971 	int rc;
2972 
2973 	/* Use the runtime PID to set the random seed */
2974 	srand(getpid());
2975 
2976 	spdk_app_opts_init(&opts, sizeof(opts));
2977 	opts.name = "bdevperf";
2978 	opts.rpc_addr = NULL;
2979 	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;
2980 
2981 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CEF:J:M:P:S:T:Xlj:DU", NULL,
2982 				      bdevperf_parse_arg, bdevperf_usage)) !=
2983 	    SPDK_APP_PARSE_ARGS_SUCCESS) {
2984 		return rc;
2985 	}
2986 
2987 	/* Set the default address if no rpc_addr was provided in args
2988 	 * and RPC is used for starting tests */
2989 	if (g_wait_for_tests && opts.rpc_addr == NULL) {
2990 		opts.rpc_addr = SPDK_DEFAULT_RPC_ADDR;
2991 	}
2992 
2993 	if (read_job_config()) {
2994 		bdevperf_fini();
2995 		return 1;
2996 	}
2997 
2998 	if (g_rpc_log_file != NULL) {
2999 		opts.rpc_log_file = g_rpc_log_file;
3000 	}
3001 
3002 	if (verify_test_params() != 0 && !g_wait_for_tests) {
3003 		spdk_app_usage();
3004 		bdevperf_usage();
3005 		bdevperf_fini();
3006 		exit(1);
3007 	}
3008 
3009 	rc = spdk_app_start(&opts, bdevperf_run, NULL);
3010 
3011 	spdk_app_fini();
3012 	bdevperf_fini();
3013 	return rc;
3014 }
3015