/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/accel.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"
#include "spdk/string.h"
#include "spdk/rpc.h"
#include "spdk/bit_array.h"
#include "spdk/conf.h"
#include "spdk/zipf.h"
#include "spdk/histogram_data.h"

#define BDEVPERF_CONFIG_MAX_FILENAME 1024
#define BDEVPERF_CONFIG_UNDEFINED -1
#define BDEVPERF_CONFIG_ERROR -2
#define PATTERN_TYPES_STR "(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush, write_zeroes)"

struct bdevperf_task {
	struct iovec			iov;
	struct bdevperf_job		*job;
	struct spdk_bdev_io		*bdev_io;
	void				*buf;
	void				*md_buf;
	uint64_t			offset_blocks;
	struct bdevperf_task		*task_to_abort;
	enum spdk_bdev_io_type		io_type;
	TAILQ_ENTRY(bdevperf_task)	link;
	struct spdk_bdev_io_wait_entry	bdev_io_wait;
};

static char *g_workload_type = NULL;
static int g_io_size = 0;
/* initialize to invalid value so we can detect if user overrides it. */
static int g_rw_percentage = -1;
static bool g_verify = false;
static bool g_reset = false;
static bool g_continue_on_failure = false;
static bool g_abort = false;
static bool g_error_to_exit = false;
static int g_queue_depth = 0;
static uint64_t g_time_in_usec;
static int g_show_performance_real_time = 0;
static uint64_t g_show_performance_period_in_usec = SPDK_SEC_TO_USEC;
static uint64_t g_show_performance_period_num = 0;
static uint64_t g_show_performance_ema_period = 0;
static int g_run_rc = 0;
static bool g_shutdown = false;
static uint64_t g_start_tsc;
static uint64_t g_shutdown_tsc;
static bool g_zcopy = false;
static struct spdk_thread *g_main_thread;
static int g_time_in_sec = 0;
static bool g_mix_specified = false;
static const char *g_job_bdev_name;
static bool g_wait_for_tests = false;
static struct spdk_jsonrpc_request *g_request = NULL;
static bool g_multithread_mode = false;
static int g_timeout_in_sec;
static struct spdk_conf *g_bdevperf_conf = NULL;
static const char *g_bdevperf_conf_file = NULL;
static double g_zipf_theta;
static bool g_random_map = false;
static bool g_unique_writes = false;

static struct spdk_cpuset g_all_cpuset;
static struct spdk_poller *g_perf_timer = NULL;

static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
static void rpc_perform_tests_cb(void);
static int bdevperf_parse_arg(int ch, char *arg);
static int verify_test_params(void);
static void bdevperf_usage(void);

static uint32_t g_bdev_count = 0;
static uint32_t g_latency_display_level;

static bool g_one_thread_per_lcore = false;

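/* Percentile cutoffs for the latency summary; the list is terminated by -1. */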
static const double g_latency_cutoffs[] = {
	0.01,
	0.10,
	0.25,
	0.50,
	0.75,
	0.90,
	0.95,
	0.98,
	0.99,
	0.995,
	0.999,
	0.9999,
	0.99999,
	0.999999,
	0.9999999,
	-1,
};

static const char *g_rpc_log_file_name = NULL;
static FILE *g_rpc_log_file = NULL;

struct latency_info {
	uint64_t	min;
	uint64_t	max;
	uint64_t	total;
};


enum job_config_rw {
	JOB_CONFIG_RW_READ = 0,
	JOB_CONFIG_RW_WRITE,
	JOB_CONFIG_RW_RANDREAD,
	JOB_CONFIG_RW_RANDWRITE,
	JOB_CONFIG_RW_RW,
	JOB_CONFIG_RW_RANDRW,
	JOB_CONFIG_RW_VERIFY,
	JOB_CONFIG_RW_RESET,
	JOB_CONFIG_RW_UNMAP,
	JOB_CONFIG_RW_FLUSH,
	JOB_CONFIG_RW_WRITE_ZEROES,
};

struct bdevperf_job {
	char				*name;
	struct spdk_bdev		*bdev;
	struct spdk_bdev_desc		*bdev_desc;
	struct spdk_io_channel		*ch;
	TAILQ_ENTRY(bdevperf_job)	link;
	struct spdk_thread		*thread;

	enum job_config_rw		workload_type;
	int				io_size;
	int				rw_percentage;
	bool				is_random;
	bool				verify;
	bool				reset;
	bool				continue_on_failure;
	bool				unmap;
	bool				write_zeroes;
	bool				flush;
	bool				abort;
	int				queue_depth;
	unsigned int			seed;

	uint64_t			io_completed;
	uint64_t			io_failed;
	uint64_t			io_timeout;
	uint64_t			prev_io_completed;
	double				ema_io_per_second;
	int				current_queue_depth;
	uint64_t			size_in_ios;
	uint64_t			ios_base;
	uint64_t			offset_in_ios;
	uint64_t			io_size_blocks;
	uint64_t			buf_size;
	uint32_t			dif_check_flags;
	bool				is_draining;
	struct spdk_poller		*run_timer;
	struct spdk_poller		*reset_timer;
	struct spdk_bit_array		*outstanding;
	struct spdk_zipf		*zipf;
	TAILQ_HEAD(, bdevperf_task)	task_list;
	uint64_t			run_time_in_usec;

	/* keep the channel's histogram data before the channel is destroyed */
	struct spdk_histogram_data	*histogram;
	struct spdk_bit_array		*random_map;

	/* counter used for generating unique write data (-U option) */
	uint32_t			write_io_count;
};

struct spdk_bdevperf {
	TAILQ_HEAD(, bdevperf_job)	jobs;
	uint32_t			running_jobs;
};

static struct spdk_bdevperf g_bdevperf = {
	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
	.running_jobs = 0,
};

/* Stores the values parsed from one section of the job config file */
struct job_config {
	const char			*name;
	const char			*filename;
	struct spdk_cpuset		cpumask;
	int				bs;
	int				iodepth;
	int				rwmixread;
	uint32_t			lcore;
	int64_t				offset;
	uint64_t			length;
	enum job_config_rw		rw;
	TAILQ_ENTRY(job_config)	link;
};
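
/*
 * Illustrative example of a job file section that would populate this
 * struct (the key names mirror the struct fields; the values here are
 * made up, not taken from a real configuration):
 *
 *   [job0]
 *   filename=Malloc0
 *   bs=4096
 *   iodepth=32
 *   rw=randrw
 *   rwmixread=70
 */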

TAILQ_HEAD(, job_config) job_config_list
	= TAILQ_HEAD_INITIALIZER(job_config_list);

static bool g_performance_dump_active = false;

struct bdevperf_aggregate_stats {
	struct bdevperf_job		*current_job;
	uint64_t			io_time_in_usec;
	uint64_t			ema_period;
	double				total_io_per_second;
	double				total_mb_per_second;
	double				total_failed_per_second;
	double				total_timeout_per_second;
	double				min_latency;
	double				max_latency;
	uint64_t			total_io_completed;
	uint64_t			total_tsc;
};

static struct bdevperf_aggregate_stats g_stats = {.min_latency = (double)UINT64_MAX};

struct lcore_thread {
	struct spdk_thread		*thread;
	uint32_t			lcore;
	TAILQ_ENTRY(lcore_thread)	link;
};

TAILQ_HEAD(, lcore_thread) g_lcore_thread_list
	= TAILQ_HEAD_INITIALIZER(g_lcore_thread_list);


static const char *
parse_workload_type(enum job_config_rw ret)
{
	switch (ret) {
	case JOB_CONFIG_RW_READ:
		return "read";
	case JOB_CONFIG_RW_RANDREAD:
		return "randread";
	case JOB_CONFIG_RW_WRITE:
		return "write";
	case JOB_CONFIG_RW_RANDWRITE:
		return "randwrite";
	case JOB_CONFIG_RW_VERIFY:
		return "verify";
	case JOB_CONFIG_RW_RESET:
		return "reset";
	case JOB_CONFIG_RW_UNMAP:
		return "unmap";
	case JOB_CONFIG_RW_WRITE_ZEROES:
		return "write_zeroes";
	case JOB_CONFIG_RW_FLUSH:
		return "flush";
	case JOB_CONFIG_RW_RW:
		return "rw";
	case JOB_CONFIG_RW_RANDRW:
		return "randrw";
	default:
		fprintf(stderr, "unknown workload_type code\n");
	}

	return NULL;
}

/*
 * Cumulative Moving Average (CMA): the average of all data collected so far.
 * Exponential Moving Average (EMA): a weighted mean of the previous n data points,
 * with more weight given to more recent data.
 * Simple Moving Average (SMA): the unweighted mean of the previous n data points.
 *
 * Bdevperf supports CMA and EMA.
 */
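/*
 * Worked example (made-up numbers): with ema_period N, each new sample is
 * blended in with weight alpha = 2 / (N + 1), i.e. ema += alpha * (sample - ema).
 * For N = 9 (alpha = 0.2), an EMA of 100k IOPS and a new one-period sample of
 * 150k IOPS update the EMA to 100k + 0.2 * (150k - 100k) = 110k IOPS.
 */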
static double
get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
{
	return (double)job->io_completed * SPDK_SEC_TO_USEC / io_time_in_usec;
}

static double
get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
{
	double io_completed, io_per_second;

	io_completed = job->io_completed;
	io_per_second = (double)(io_completed - job->prev_io_completed) * SPDK_SEC_TO_USEC
			/ g_show_performance_period_in_usec;
	job->prev_io_completed = io_completed;

	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
				  / (ema_period + 1);
	return job->ema_io_per_second;
}

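/* Histogram iterator callback: accumulates the latency total using each
 * bucket's midpoint, and records the min/max from the first and last
 * non-empty buckets. */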
static void
get_avg_latency(void *ctx, uint64_t start, uint64_t end, uint64_t count,
		uint64_t total, uint64_t so_far)
{
	struct latency_info *latency_info = ctx;

	if (count == 0) {
		return;
	}

	latency_info->total += (start + end) / 2 * count;

	if (so_far == count) {
		latency_info->min = start;
	}

	if (so_far == total) {
		latency_info->max = end;
	}
}

static void
performance_dump_job(struct bdevperf_aggregate_stats *stats, struct bdevperf_job *job)
{
	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
	double average_latency = 0.0, min_latency, max_latency;
	uint64_t time_in_usec;
	uint64_t tsc_rate;
	uint64_t total_io;
	struct latency_info latency_info = {};

	if (job->workload_type == JOB_CONFIG_RW_RW || job->workload_type == JOB_CONFIG_RW_RANDRW) {
		printf("\r Job: %s (Core Mask 0x%s, workload: %s, percentage: %d, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->rw_percentage,
		       job->queue_depth, job->io_size);
	} else {
		printf("\r Job: %s (Core Mask 0x%s, workload: %s, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->queue_depth, job->io_size);
	}


	if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		printf("\r Job: %s ended in about %.2f seconds with error\n",
		       job->name, (double)job->run_time_in_usec / SPDK_SEC_TO_USEC);
	}
	if (job->verify) {
		printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
		       job->ios_base, job->size_in_ios);
	}

	if (g_performance_dump_active == true) {
		/* Use the job's actual run time if the job has already ended */
		if (job->io_failed > 0 && !job->continue_on_failure) {
			time_in_usec = job->run_time_in_usec;
		} else {
			time_in_usec = stats->io_time_in_usec;
		}
	} else {
		time_in_usec = job->run_time_in_usec;
	}

	if (stats->ema_period == 0) {
		io_per_second = get_cma_io_per_second(job, time_in_usec);
	} else {
		io_per_second = get_ema_io_per_second(job, stats->ema_period);
	}

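	/* Latency histogram values are in TSC ticks; convert to microseconds
	 * as ticks * SPDK_SEC_TO_USEC / tsc_rate. */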
	tsc_rate = spdk_get_ticks_hz();
	mb_per_second = io_per_second * job->io_size / (1024 * 1024);

	spdk_histogram_data_iterate(job->histogram, get_avg_latency, &latency_info);

	total_io = job->io_completed + job->io_failed;
	if (total_io != 0) {
		average_latency = (double)latency_info.total / total_io * SPDK_SEC_TO_USEC / tsc_rate;
	}
	min_latency = (double)latency_info.min * SPDK_SEC_TO_USEC / tsc_rate;
	max_latency = (double)latency_info.max * SPDK_SEC_TO_USEC / tsc_rate;

	failed_per_second = (double)job->io_failed * SPDK_SEC_TO_USEC / time_in_usec;
	timeout_per_second = (double)job->io_timeout * SPDK_SEC_TO_USEC / time_in_usec;

	printf("\t %-20s: %10.2f %10.2f %10.2f",
	       job->name, (float)time_in_usec / SPDK_SEC_TO_USEC, io_per_second, mb_per_second);
	printf(" %10.2f %8.2f",
	       failed_per_second, timeout_per_second);
	printf(" %10.2f %10.2f %10.2f\n",
	       average_latency, min_latency, max_latency);

	stats->total_io_per_second += io_per_second;
	stats->total_mb_per_second += mb_per_second;
	stats->total_failed_per_second += failed_per_second;
	stats->total_timeout_per_second += timeout_per_second;
	stats->total_io_completed += job->io_completed + job->io_failed;
	stats->total_tsc += latency_info.total;
	if (min_latency < stats->min_latency) {
		stats->min_latency = min_latency;
	}
	if (max_latency > stats->max_latency) {
		stats->max_latency = max_latency;
	}
}

static void
generate_data(struct bdevperf_job *job, void *buf, void *md_buf, bool unique)
{
	int offset_blocks = 0, md_offset, data_block_size, inner_offset;
	int buf_len = job->buf_size;
	int block_size = spdk_bdev_get_block_size(job->bdev);
	int md_size = spdk_bdev_get_md_size(job->bdev);
	int num_blocks = job->io_size_blocks;

	if (buf_len < num_blocks * block_size) {
		return;
	}

	if (md_buf == NULL) {
		data_block_size = block_size - md_size;
		md_buf = (char *)buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	if (unique) {
		uint64_t io_count = job->write_io_count++;
		unsigned int i;

		assert(md_size == 0 || md_size >= (int)sizeof(uint64_t));

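		/* Stamp each 8-byte word with the per-job write counter in the
		 * upper 32 bits and an offset-derived value in the lower 32 bits,
		 * so successive writes to the same LBA carry different data
		 * (-U option). */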
		while (offset_blocks < num_blocks) {
			inner_offset = 0;
			while (inner_offset < data_block_size) {
				*(uint64_t *)buf = (io_count << 32) | (offset_blocks + inner_offset);
				inner_offset += sizeof(uint64_t);
				buf += sizeof(uint64_t);
			}
			for (i = 0; i < md_size / sizeof(uint64_t); i++) {
				((uint64_t *)md_buf)[i] = (io_count << 32) | offset_blocks;
			}
			md_buf += md_offset;
			offset_blocks++;
		}
		return;
	}

	while (offset_blocks < num_blocks) {
		inner_offset = 0;
		while (inner_offset < data_block_size) {
			*(uint32_t *)buf = offset_blocks + inner_offset;
			inner_offset += sizeof(uint32_t);
			buf += sizeof(uint32_t);
		}
		memset(md_buf, offset_blocks, md_size);
		md_buf += md_offset;
		offset_blocks++;
	}
}

static bool
copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
{
	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	memcpy(wr_buf, rd_buf, block_size * num_blocks);

	if (wr_md_buf != NULL) {
		memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
	}

	return true;
}

static bool
verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
{
	int offset_blocks = 0, md_offset, data_block_size;

	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	if (wr_md_buf == NULL) {
		data_block_size = block_size - md_size;
		wr_md_buf = (char *)wr_buf + data_block_size;
		rd_md_buf = (char *)rd_buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	while (offset_blocks < num_blocks) {
		if (memcmp(wr_buf, rd_buf, data_block_size) != 0) {
			return false;
		}

		wr_buf += block_size;
		rd_buf += block_size;

		if (md_check) {
			if (memcmp(wr_md_buf, rd_md_buf, md_size) != 0) {
				return false;
			}

			wr_md_buf += md_offset;
			rd_md_buf += md_offset;
		}

		offset_blocks++;
	}

	return true;
}

static void
free_job_config(void)
{
	struct job_config *config, *tmp;

	spdk_conf_free(g_bdevperf_conf);
	g_bdevperf_conf = NULL;

	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
		TAILQ_REMOVE(&job_config_list, config, link);
		free(config);
	}
}

static void
bdevperf_job_free(struct bdevperf_job *job)
{
	spdk_histogram_data_free(job->histogram);
	spdk_bit_array_free(&job->outstanding);
	spdk_bit_array_free(&job->random_map);
	spdk_zipf_free(&job->zipf);
	free(job->name);
	free(job);
}

static void
job_thread_exit(void *ctx)
{
	spdk_thread_exit(spdk_get_thread());
}

static void
check_cutoff(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	double **cutoff = ctx;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far / total;
	while (so_far_pct >= **cutoff && **cutoff > 0) {
		printf("%9.5f%% : %9.3fus\n", **cutoff * 100, (double)end * SPDK_SEC_TO_USEC / tsc_rate);
		(*cutoff)++;
	}
}

static void
print_bucket(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far * 100 / total;
	printf("%9.3f - %9.3f: %9.4f%%  (%9ju)\n",
	       (double)start * SPDK_SEC_TO_USEC / tsc_rate,
	       (double)end * SPDK_SEC_TO_USEC / tsc_rate,
	       so_far_pct, count);
}

static void
bdevperf_test_done(void *ctx)
{
	struct bdevperf_job *job, *jtmp;
	struct bdevperf_task *task, *ttmp;
	struct lcore_thread *lthread, *lttmp;
	double average_latency = 0.0;
	uint64_t time_in_usec;
	int rc;

	if (g_time_in_usec) {
		g_stats.io_time_in_usec = g_time_in_usec;

		if (!g_run_rc && g_performance_dump_active) {
			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
			return;
		}
	}

	if (g_show_performance_real_time) {
		spdk_poller_unregister(&g_perf_timer);
	}

	if (g_shutdown) {
		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
		time_in_usec = g_shutdown_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
		printf("Received shutdown signal, test time was about %.6f seconds\n",
		       (double)g_time_in_usec / SPDK_SEC_TO_USEC);
	}

	printf("\n%*s\n", 107, "Latency(us)");
	printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");

	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		performance_dump_job(&g_stats, job);
	}

	printf("\r =================================================================================="
	       "=================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", g_stats.total_io_per_second, g_stats.total_mb_per_second);
	printf(" %10.2f %8.2f",
	       g_stats.total_failed_per_second, g_stats.total_timeout_per_second);

	if (g_stats.total_io_completed != 0) {
		average_latency = ((double)g_stats.total_tsc / g_stats.total_io_completed) * SPDK_SEC_TO_USEC /
				  spdk_get_ticks_hz();
	}
	printf(" %10.2f %10.2f %10.2f\n", average_latency, g_stats.min_latency, g_stats.max_latency);

	fflush(stdout);

	if (g_latency_display_level == 0 || g_stats.total_io_completed == 0) {
		goto clean;
	}

	printf("\n Latency summary\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		const double *cutoff = g_latency_cutoffs;

		spdk_histogram_data_iterate(job->histogram, check_cutoff, &cutoff);

		printf("\n");
	}

	if (g_latency_display_level == 1) {
		goto clean;
	}

	printf("\r Latency histogram\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		spdk_histogram_data_iterate(job->histogram, print_bucket, NULL);
		printf("\n");
	}

clean:
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);

		if (!g_one_thread_per_lcore) {
			spdk_thread_send_msg(job->thread, job_thread_exit, NULL);
		}

		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
			TAILQ_REMOVE(&job->task_list, task, link);
			spdk_free(task->buf);
			spdk_free(task->md_buf);
			free(task);
		}

		bdevperf_job_free(job);
	}

	if (g_one_thread_per_lcore) {
		TAILQ_FOREACH_SAFE(lthread, &g_lcore_thread_list, link, lttmp) {
			TAILQ_REMOVE(&g_lcore_thread_list, lthread, link);
			spdk_thread_send_msg(lthread->thread, job_thread_exit, NULL);
			free(lthread);
		}
	}

	if (g_bdevperf_conf == NULL) {
		free_job_config();
	}

	rc = g_run_rc;
	if (g_request && !g_shutdown) {
		rpc_perform_tests_cb();
		if (rc != 0) {
			spdk_app_stop(rc);
		}
	} else {
		spdk_app_stop(rc);
	}
}

static void
bdevperf_job_end(void *ctx)
{
	assert(g_main_thread == spdk_get_thread());

	if (--g_bdevperf.running_jobs == 0) {
		bdevperf_test_done(NULL);
	}
}

static void
bdevperf_channel_get_histogram_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
{
	struct spdk_histogram_data *job_hist = cb_arg;

	if (status == 0) {
		spdk_histogram_data_merge(job_hist, histogram);
	}
}

static void
bdevperf_job_empty(struct bdevperf_job *job)
{
	uint64_t end_tsc = 0;

	end_tsc = spdk_get_ticks() - g_start_tsc;
	job->run_time_in_usec = end_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
	/* keep histogram info before channel is destroyed */
	spdk_bdev_channel_get_histogram(job->ch, bdevperf_channel_get_histogram_cb,
					job->histogram);
	spdk_put_io_channel(job->ch);
	spdk_bdev_close(job->bdev_desc);
	spdk_thread_send_msg(g_main_thread, bdevperf_job_end, NULL);
}

static void
bdevperf_end_task(struct bdevperf_task *task)
{
	struct bdevperf_job     *job = task->job;

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	if (job->is_draining) {
		if (job->current_queue_depth == 0) {
			bdevperf_job_empty(job);
		}
	}
}

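/* Park the task on the bdev's io_wait queue; cb_fn re-submits it once the
 * bdev layer has resources again (used after a submission returns -ENOMEM). */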
771 bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
772 {
773 	struct bdevperf_job	*job = task->job;
774 
775 	task->bdev_io_wait.bdev = job->bdev;
776 	task->bdev_io_wait.cb_fn = cb_fn;
777 	task->bdev_io_wait.cb_arg = task;
778 	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
779 }
780 
781 static int
782 bdevperf_job_drain(void *ctx)
783 {
784 	struct bdevperf_job *job = ctx;
785 
786 	spdk_poller_unregister(&job->run_timer);
787 	if (job->reset) {
788 		spdk_poller_unregister(&job->reset_timer);
789 	}
790 
791 	job->is_draining = true;
792 
793 	return -1;
794 }
795 
796 static int
797 bdevperf_job_drain_timer(void *ctx)
798 {
799 	struct bdevperf_job *job = ctx;
800 
801 	bdevperf_job_drain(ctx);
802 	if (job->current_queue_depth == 0) {
803 		bdevperf_job_empty(job);
804 	}
805 
806 	return SPDK_POLLER_BUSY;
807 }
808 
809 static void
810 bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
811 {
812 	struct bdevperf_task	*task = cb_arg;
813 	struct bdevperf_job	*job = task->job;
814 
815 	job->current_queue_depth--;
816 
817 	if (success) {
818 		job->io_completed++;
819 	} else {
820 		job->io_failed++;
821 		if (!job->continue_on_failure) {
822 			bdevperf_job_drain(job);
823 			g_run_rc = -1;
824 		}
825 	}
826 
827 	spdk_bdev_free_io(bdev_io);
828 	bdevperf_end_task(task);
829 }
830 
831 static int
832 bdevperf_verify_dif(struct bdevperf_task *task, struct iovec *iovs, int iovcnt)
833 {
834 	struct bdevperf_job	*job = task->job;
835 	struct spdk_bdev	*bdev = job->bdev;
836 	struct spdk_dif_ctx	dif_ctx;
837 	struct spdk_dif_error	err_blk = {};
838 	int			rc;
839 	struct spdk_dif_ctx_init_ext_opts dif_opts;
840 
841 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
842 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
843 	rc = spdk_dif_ctx_init(&dif_ctx,
844 			       spdk_bdev_get_block_size(bdev),
845 			       spdk_bdev_get_md_size(bdev),
846 			       spdk_bdev_is_md_interleaved(bdev),
847 			       spdk_bdev_is_dif_head_of_md(bdev),
848 			       spdk_bdev_get_dif_type(bdev),
849 			       job->dif_check_flags,
850 			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
851 	if (rc != 0) {
852 		fprintf(stderr, "Initialization of DIF context failed\n");
853 		return rc;
854 	}
855 
856 	if (spdk_bdev_is_md_interleaved(bdev)) {
857 		rc = spdk_dif_verify(iovs, iovcnt, job->io_size_blocks, &dif_ctx, &err_blk);
858 	} else {
859 		struct iovec md_iov = {
860 			.iov_base	= task->md_buf,
861 			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
862 		};
863 
864 		rc = spdk_dix_verify(iovs, iovcnt, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
865 	}
866 
867 	if (rc != 0) {
868 		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
869 			err_blk.err_type, err_blk.err_offset);
870 	}
871 
872 	return rc;
873 }
874 
875 static void
876 bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
877 {
878 	struct bdevperf_job	*job;
879 	struct bdevperf_task	*task = cb_arg;
880 	struct iovec		*iovs;
881 	int			iovcnt;
882 	bool			md_check;
883 	uint64_t		offset_in_ios;
884 	int			rc;
885 
886 	job = task->job;
887 	md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;
888 
889 	if (g_error_to_exit == true) {
890 		bdevperf_job_drain(job);
891 	} else if (!success) {
892 		if (!job->reset && !job->continue_on_failure) {
893 			bdevperf_job_drain(job);
894 			g_run_rc = -1;
895 			g_error_to_exit = true;
896 			printf("task offset: %" PRIu64 " on job bdev=%s fails\n",
897 			       task->offset_blocks, job->name);
898 		}
899 	} else if (job->verify || job->reset) {
900 		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
901 		assert(iovcnt == 1);
902 		assert(iovs != NULL);
903 		if (!verify_data(task->buf, job->buf_size, iovs[0].iov_base, iovs[0].iov_len,
904 				 spdk_bdev_get_block_size(job->bdev),
905 				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
906 				 spdk_bdev_get_md_size(job->bdev),
907 				 job->io_size_blocks, md_check)) {
908 			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
909 			printf("   First dword expected 0x%x got 0x%x\n", *(int *)task->buf, *(int *)iovs[0].iov_base);
910 			bdevperf_job_drain(job);
911 			g_run_rc = -1;
912 		}
913 	} else if (job->dif_check_flags != 0) {
914 		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_get_md_size(job->bdev) != 0) {
915 			spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
916 			assert(iovcnt == 1);
917 			assert(iovs != NULL);
918 			rc = bdevperf_verify_dif(task, iovs, iovcnt);
919 			if (rc != 0) {
920 				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
921 				       task->offset_blocks, job->name);
922 
923 				success = false;
924 				if (!job->reset && !job->continue_on_failure) {
925 					bdevperf_job_drain(job);
926 					g_run_rc = -1;
927 					g_error_to_exit = true;
928 				}
929 			}
930 		}
931 	}
932 
933 	job->current_queue_depth--;
934 
935 	if (success) {
936 		job->io_completed++;
937 	} else {
938 		job->io_failed++;
939 	}
940 
941 	if (job->verify) {
942 		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
943 		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;
944 
945 		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
946 		spdk_bit_array_clear(job->outstanding, offset_in_ios);
947 	}
948 
949 	spdk_bdev_free_io(bdev_io);
950 
951 	/*
952 	 * is_draining indicates when time has expired for the test run
953 	 * and we are just waiting for the previously submitted I/O
954 	 * to complete.  In this case, do not submit a new I/O to replace
955 	 * the one just completed.
956 	 */
957 	if (!job->is_draining) {
958 		bdevperf_submit_single(job, task);
959 	} else {
960 		bdevperf_end_task(task);
961 	}
962 }
963 
964 static void
965 bdevperf_verify_submit_read(void *cb_arg)
966 {
967 	struct bdevperf_job	*job;
968 	struct bdevperf_task	*task = cb_arg;
969 	int			rc;
970 
971 	job = task->job;
972 
973 	/* Read the data back in */
974 	rc = spdk_bdev_read_blocks_with_md(job->bdev_desc, job->ch, NULL, NULL,
975 					   task->offset_blocks, job->io_size_blocks,
976 					   bdevperf_complete, task);
977 
978 	if (rc == -ENOMEM) {
979 		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
980 	} else if (rc != 0) {
981 		printf("Failed to submit read: %d\n", rc);
982 		bdevperf_job_drain(job);
983 		g_run_rc = rc;
984 	}
985 }
986 
987 static void
988 bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
989 			       void *cb_arg)
990 {
991 	if (success) {
992 		spdk_bdev_free_io(bdev_io);
993 		bdevperf_verify_submit_read(cb_arg);
994 	} else {
995 		bdevperf_complete(bdev_io, success, cb_arg);
996 	}
997 }
998 
999 static void
1000 bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1001 {
1002 	if (!success) {
1003 		bdevperf_complete(bdev_io, success, cb_arg);
1004 		return;
1005 	}
1006 
1007 	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
1008 }
1009 
1010 static int
1011 bdevperf_generate_dif(struct bdevperf_task *task)
1012 {
1013 	struct bdevperf_job	*job = task->job;
1014 	struct spdk_bdev	*bdev = job->bdev;
1015 	struct spdk_dif_ctx	dif_ctx;
1016 	int			rc;
1017 	struct spdk_dif_ctx_init_ext_opts dif_opts;
1018 
1019 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
1020 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
1021 	rc = spdk_dif_ctx_init(&dif_ctx,
1022 			       spdk_bdev_get_block_size(bdev),
1023 			       spdk_bdev_get_md_size(bdev),
1024 			       spdk_bdev_is_md_interleaved(bdev),
1025 			       spdk_bdev_is_dif_head_of_md(bdev),
1026 			       spdk_bdev_get_dif_type(bdev),
1027 			       job->dif_check_flags,
1028 			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
1029 	if (rc != 0) {
1030 		fprintf(stderr, "Initialization of DIF context failed\n");
1031 		return rc;
1032 	}
1033 
1034 	if (spdk_bdev_is_md_interleaved(bdev)) {
1035 		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
1036 	} else {
1037 		struct iovec md_iov = {
1038 			.iov_base	= task->md_buf,
1039 			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
1040 		};
1041 
1042 		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
1043 	}
1044 
1045 	if (rc != 0) {
1046 		fprintf(stderr, "Generation of DIF/DIX failed\n");
1047 	}
1048 
1049 	return rc;
1050 }
1051 
1052 static void
1053 bdevperf_submit_task(void *arg)
1054 {
1055 	struct bdevperf_task	*task = arg;
1056 	struct bdevperf_job	*job = task->job;
1057 	struct spdk_bdev_desc	*desc;
1058 	struct spdk_io_channel	*ch;
1059 	spdk_bdev_io_completion_cb cb_fn;
1060 	uint64_t		offset_in_ios;
1061 	int			rc = 0;
1062 
1063 	desc = job->bdev_desc;
1064 	ch = job->ch;
1065 
1066 	switch (task->io_type) {
1067 	case SPDK_BDEV_IO_TYPE_WRITE:
1068 		if (spdk_bdev_get_md_size(job->bdev) != 0 && job->dif_check_flags != 0) {
1069 			rc = bdevperf_generate_dif(task);
1070 		}
1071 		if (rc == 0) {
1072 			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;
1073 
1074 			if (g_zcopy) {
1075 				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
1076 				return;
1077 			} else {
1078 				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
1079 								     task->md_buf,
1080 								     task->offset_blocks,
1081 								     job->io_size_blocks,
1082 								     cb_fn, task);
1083 			}
1084 		}
1085 		break;
1086 	case SPDK_BDEV_IO_TYPE_FLUSH:
1087 		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
1088 					    job->io_size_blocks, bdevperf_complete, task);
1089 		break;
1090 	case SPDK_BDEV_IO_TYPE_UNMAP:
1091 		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
1092 					    job->io_size_blocks, bdevperf_complete, task);
1093 		break;
1094 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1095 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
1096 						   job->io_size_blocks, bdevperf_complete, task);
1097 		break;
1098 	case SPDK_BDEV_IO_TYPE_READ:
1099 		if (g_zcopy) {
1100 			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
1101 						   true, bdevperf_zcopy_populate_complete, task);
1102 		} else {
1103 			rc = spdk_bdev_read_blocks_with_md(desc, ch, task->buf, task->md_buf,
1104 							   task->offset_blocks,
1105 							   job->io_size_blocks,
1106 							   bdevperf_complete, task);
1107 		}
1108 		break;
1109 	case SPDK_BDEV_IO_TYPE_ABORT:
1110 		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
1111 		break;
1112 	default:
1113 		assert(false);
1114 		rc = -EINVAL;
1115 		break;
1116 	}
1117 
1118 	if (rc == -ENOMEM) {
1119 		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
1120 		return;
1121 	} else if (rc != 0) {
1122 		printf("Failed to submit bdev_io: %d\n", rc);
1123 		if (job->verify) {
1124 			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
1125 			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;
1126 
1127 			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
1128 			spdk_bit_array_clear(job->outstanding, offset_in_ios);
1129 		}
1130 		bdevperf_job_drain(job);
1131 		g_run_rc = rc;
1132 		return;
1133 	}
1134 
1135 	job->current_queue_depth++;
1136 }
1137 
1138 static void
1139 bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1140 {
1141 	struct bdevperf_task	*task = cb_arg;
1142 	struct bdevperf_job	*job = task->job;
1143 	struct iovec		*iovs;
1144 	int			iovcnt;
1145 
1146 	if (!success) {
1147 		bdevperf_job_drain(job);
1148 		g_run_rc = -1;
1149 		return;
1150 	}
1151 
1152 	task->bdev_io = bdev_io;
1153 	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
1154 
1155 	if (job->verify || job->reset) {
1156 		/* When job->verify or job->reset is enabled, task->buf is used for
1157 		 *  verification of read after write.  For write I/O, when zcopy APIs
1158 		 *  are used, task->buf cannot be used, and data must be written to
1159 		 *  the data buffer allocated underneath bdev layer instead.
1160 		 *  Hence we copy task->buf to the allocated data buffer here.
1161 		 */
1162 		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
1163 		assert(iovcnt == 1);
1164 		assert(iovs != NULL);
1165 
1166 		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
1167 			  spdk_bdev_get_block_size(job->bdev),
1168 			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
1169 			  spdk_bdev_get_md_size(job->bdev), job->io_size_blocks);
1170 	}
1171 
1172 	bdevperf_submit_task(task);
1173 }
1174 
1175 static void
1176 bdevperf_prep_zcopy_write_task(void *arg)
1177 {
1178 	struct bdevperf_task	*task = arg;
1179 	struct bdevperf_job	*job = task->job;
1180 	int			rc;
1181 
1182 	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
1183 				   task->offset_blocks, job->io_size_blocks,
1184 				   false, bdevperf_zcopy_get_buf_complete, task);
1185 	if (rc != 0) {
1186 		assert(rc == -ENOMEM);
1187 		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
1188 		return;
1189 	}
1190 
1191 	job->current_queue_depth++;
1192 }
1193 
1194 static struct bdevperf_task *
1195 bdevperf_job_get_task(struct bdevperf_job *job)
1196 {
1197 	struct bdevperf_task *task;
1198 
1199 	task = TAILQ_FIRST(&job->task_list);
1200 	if (!task) {
1201 		printf("Task allocation failed\n");
1202 		abort();
1203 	}
1204 
1205 	TAILQ_REMOVE(&job->task_list, task, link);
1206 	return task;
1207 }
1208 
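/* Pick the next offset for a task (zipf, uniform random, or sequential),
 * choose the I/O type from the configured pattern, and submit it. */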
static void
bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
{
	uint64_t offset_in_ios;
	uint64_t rand_value;
	uint32_t first_clear;

	if (job->zipf) {
		offset_in_ios = spdk_zipf_generate(job->zipf);
	} else if (job->is_random) {
		/* RAND_MAX is only INT32_MAX, so use two calls to rand_r to
		 * get a large enough value to ensure we are issuing I/O
		 * uniformly across the whole bdev.
		 */
		rand_value = (uint64_t)rand_r(&job->seed) * RAND_MAX + rand_r(&job->seed);
		offset_in_ios = rand_value % job->size_in_ios;

		if (g_random_map) {
			/* Make sure that the offset does not exceed the maximum size
			 * of the bit array (verified during job creation).
			 */
			assert(offset_in_ios < UINT32_MAX);

			first_clear = spdk_bit_array_find_first_clear(job->random_map, (uint32_t)offset_in_ios);

			if (first_clear == UINT32_MAX) {
				first_clear = spdk_bit_array_find_first_clear(job->random_map, 0);

				if (first_clear == UINT32_MAX) {
					/* If there are no clear bits left in the array, start over
					 * and use the previously selected random value.
					 */
					spdk_bit_array_clear_mask(job->random_map);
					first_clear = (uint32_t)offset_in_ios;
				}
			}

			spdk_bit_array_set(job->random_map, first_clear);

			offset_in_ios = first_clear;
		}
	} else {
		offset_in_ios = job->offset_in_ios++;
		if (job->offset_in_ios == job->size_in_ios) {
			job->offset_in_ios = 0;
		}

		/* Increment offset_in_ios if there's already an outstanding I/O
		 * to that location. We only need this with job->verify, as random
		 * offsets are not supported with job->verify at this time.
		 */
		if (job->verify) {
			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);

			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
				offset_in_ios = job->offset_in_ios++;
				if (job->offset_in_ios == job->size_in_ios) {
					job->offset_in_ios = 0;
				}
			}
			spdk_bit_array_set(job->outstanding, offset_in_ios);
		}
	}

	/* When multiple threads drive the same job, offset_in_ios is relative
	 * to the LBA range assigned to that job. task->offset_blocks
	 * is absolute (within the entire bdev LBA range).
	 */
	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;

	if (job->flush) {
		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
	} else if (job->unmap) {
		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
	} else if (job->write_zeroes) {
		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
	} else if ((job->rw_percentage == 100) ||
		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
		task->io_type = SPDK_BDEV_IO_TYPE_READ;
	} else {
		if (job->verify || job->reset || g_unique_writes) {
			generate_data(job, task->buf, task->md_buf, g_unique_writes);
		}
		if (g_zcopy) {
			bdevperf_prep_zcopy_write_task(task);
			return;
		} else {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
		}
	}

	bdevperf_submit_task(task);
}

static int reset_job(void *arg);

static void
reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	if (!success) {
		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	spdk_bdev_free_io(bdev_io);

	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
						10 * SPDK_SEC_TO_USEC);
}

static int
reset_job(void *arg)
{
	struct bdevperf_job *job = arg;
	struct bdevperf_task *task;
	int rc;

	spdk_poller_unregister(&job->reset_timer);

	/* Do reset. */
	task = bdevperf_job_get_task(job);
	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
			     reset_cb, task);
	if (rc) {
		printf("Reset failed: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	return -1;
}

static void
bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
{
	struct bdevperf_job *job = cb_arg;
	struct bdevperf_task *task;

	job->io_timeout++;

	if (job->is_draining || !job->abort ||
	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
		return;
	}

	task = bdevperf_job_get_task(job);
	if (task == NULL) {
		return;
	}

	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;

	bdevperf_submit_task(task);
}

static void
bdevperf_job_run(void *ctx)
{
	struct bdevperf_job *job = ctx;
	struct bdevperf_task *task;
	int i;

	/* Submit initial I/O for this job. Each time one
	 * completes, another will be submitted. */

	/* Start a timer to stop this I/O chain when the run is over */
	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain_timer, job, g_time_in_usec);
	if (job->reset) {
		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
							10 * SPDK_SEC_TO_USEC);
	}

	spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);

	for (i = 0; i < job->queue_depth; i++) {
		task = bdevperf_job_get_task(job);
		bdevperf_submit_single(job, task);
	}
}

static void
_performance_dump_done(void *ctx)
{
	struct bdevperf_aggregate_stats *stats = ctx;
	double average_latency;

	printf("\r =================================================================================="
	       "=================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
	printf(" %10.2f %8.2f",
	       stats->total_failed_per_second, stats->total_timeout_per_second);

	average_latency = ((double)stats->total_tsc / stats->total_io_completed) * SPDK_SEC_TO_USEC /
			  spdk_get_ticks_hz();
	printf(" %10.2f %10.2f %10.2f\n", average_latency, stats->min_latency, stats->max_latency);
	printf("\n");

	fflush(stdout);

	g_performance_dump_active = false;

	free(stats);
}

static void
_performance_dump(void *ctx)
{
	struct bdevperf_aggregate_stats *stats = ctx;

	performance_dump_job(stats, stats->current_job);

	/* This assumes the jobs list is static after start up time.
	 * That's true right now, but if that ever changed this would need a lock. */
	stats->current_job = TAILQ_NEXT(stats->current_job, link);
	if (stats->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
	} else {
		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
	}
}

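/* Periodic poller on the main thread: allocates a stats snapshot, then walks
 * the job list by hopping to each job's thread before printing the totals. */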
static int
performance_statistics_thread(void *arg)
{
	struct bdevperf_aggregate_stats *stats;

	if (g_performance_dump_active) {
		return -1;
	}

	g_performance_dump_active = true;

	stats = calloc(1, sizeof(*stats));
	if (stats == NULL) {
		return -1;
	}

	stats->min_latency = (double)UINT64_MAX;

	g_show_performance_period_num++;

	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;
	stats->ema_period = g_show_performance_ema_period;

	/* Iterate all of the jobs to gather stats.
	 * These jobs will not get removed here until a final performance dump is run,
	 * so this should be safe without locking.
	 */
	stats->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
	if (stats->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
	} else {
		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
	}

	return -1;
}

static void
bdevperf_test(void)
{
	struct bdevperf_job *job;

	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / (uint64_t)SPDK_SEC_TO_USEC);
	fflush(stdout);

	/* Start a timer to dump performance numbers */
	g_start_tsc = spdk_get_ticks();
	if (g_show_performance_real_time && !g_perf_timer) {
		printf("%*s\n", 107, "Latency(us)");
		printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");

		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
						    g_show_performance_period_in_usec);
	}

	/* Iterate jobs to start all I/O */
	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
		g_bdevperf.running_jobs++;
		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
	}
}

static void
bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct bdevperf_job *job = event_ctx;

	if (SPDK_BDEV_EVENT_REMOVE == type) {
		bdevperf_job_drain(job);
	}
}

static void
bdevperf_histogram_status_cb(void *cb_arg, int status)
{
	if (status != 0) {
		g_run_rc = status;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}

	if (--g_bdev_count == 0) {
		if (g_run_rc == 0) {
			/* Ready to run the test */
			bdevperf_test();
		} else {
			bdevperf_test_done(NULL);
		}
	}
}

static uint32_t g_construct_job_count = 0;

static int
_bdevperf_enable_histogram(void *ctx, struct spdk_bdev *bdev)
{
	bool *enable = ctx;

	g_bdev_count++;

	spdk_bdev_histogram_enable(bdev, bdevperf_histogram_status_cb, NULL, *enable);

	return 0;
}

static void
bdevperf_enable_histogram(bool enable)
{
	struct spdk_bdev *bdev;
	int rc;

	/* Increment the initial g_bdev_count so that it never reaches 0 in the middle of the iteration */
	g_bdev_count = 1;

	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (bdev) {
			rc = _bdevperf_enable_histogram(&enable, bdev);
		} else {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			rc = -1;
		}
	} else {
		rc = spdk_for_each_bdev_leaf(&enable, _bdevperf_enable_histogram);
	}

	bdevperf_histogram_status_cb(NULL, rc);
}

static void
_bdevperf_construct_job_done(void *ctx)
{
	if (--g_construct_job_count == 0) {
		if (g_run_rc != 0) {
			/* Something failed. */
			bdevperf_test_done(NULL);
			return;
		}

		/* Always enable the histogram. */
		bdevperf_enable_histogram(true);
	} else if (g_run_rc != 0) {
		/* Reset the error since some jobs were constructed correctly */
		g_run_rc = 0;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}
}

/* Checkformat does not allow using an inlined type;
   this is a workaround */
typedef struct spdk_thread *spdk_thread_t;

static spdk_thread_t
construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
{
	struct spdk_cpuset tmp;

	/* This function runs on the main thread. */
	assert(g_main_thread == spdk_get_thread());

	/* Handle the default mask */
	if (spdk_cpuset_count(cpumask) == 0) {
		cpumask = &g_all_cpuset;
	}

	/* Warn the user that the mask might need to be changed */
	spdk_cpuset_copy(&tmp, cpumask);
	spdk_cpuset_or(&tmp, &g_all_cpuset);
	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
	}

	return spdk_thread_create(tag, cpumask);
}

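/* Round-robin over the allowed cores, wrapping back to the first core once
 * the end of the core list is reached. */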
static uint32_t
_get_next_core(void)
{
	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;

	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
		return current_core;
	}

	current_core = spdk_env_get_next_core(current_core);
	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
	}

	return current_core;
}

static void
_bdevperf_construct_job(void *ctx)
{
	struct bdevperf_job *job = ctx;
	int rc;

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(job->bdev), true, bdevperf_bdev_removed, job,
				&job->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Could not open leaf bdev %s, error=%d\n", spdk_bdev_get_name(job->bdev), rc);
		g_run_rc = -EINVAL;
		goto end;
	}

	if (g_zcopy) {
		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
			g_run_rc = -ENOTSUP;
			goto end;
		}
	}

	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
	if (!job->ch) {
		SPDK_ERRLOG("Could not get io_channel for device %s\n", spdk_bdev_get_name(job->bdev));
		spdk_bdev_close(job->bdev_desc);
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
		g_run_rc = -ENOMEM;
		goto end;
	}

end:
	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
}

static void
job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
{
	switch (rw) {
	case JOB_CONFIG_RW_READ:
		job->rw_percentage = 100;
		break;
	case JOB_CONFIG_RW_WRITE:
		job->rw_percentage = 0;
		break;
	case JOB_CONFIG_RW_RANDREAD:
		job->is_random = true;
		job->rw_percentage = 100;
		job->seed = rand();
		break;
	case JOB_CONFIG_RW_RANDWRITE:
		job->is_random = true;
		job->rw_percentage = 0;
		job->seed = rand();
		break;
	case JOB_CONFIG_RW_RW:
		job->is_random = false;
		break;
	case JOB_CONFIG_RW_RANDRW:
		job->is_random = true;
		job->seed = rand();
		break;
	case JOB_CONFIG_RW_VERIFY:
		job->verify = true;
		job->rw_percentage = 50;
		break;
	case JOB_CONFIG_RW_RESET:
		job->reset = true;
		job->verify = true;
		job->rw_percentage = 50;
		break;
	case JOB_CONFIG_RW_UNMAP:
		job->unmap = true;
		break;
	case JOB_CONFIG_RW_FLUSH:
		job->flush = true;
		break;
	case JOB_CONFIG_RW_WRITE_ZEROES:
		job->write_zeroes = true;
		break;
	}
}

static int
bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
		       struct spdk_thread *thread)
{
	struct bdevperf_job *job;
	struct bdevperf_task *task;
	int block_size, data_block_size;
	int rc;
	int task_num, n;

	block_size = spdk_bdev_get_block_size(bdev);
	data_block_size = spdk_bdev_get_data_block_size(bdev);

	job = calloc(1, sizeof(struct bdevperf_job));
	if (!job) {
		fprintf(stderr, "Unable to allocate memory for new job.\n");
		return -ENOMEM;
	}

	job->name = strdup(spdk_bdev_get_name(bdev));
	if (!job->name) {
		fprintf(stderr, "Unable to allocate memory for job name.\n");
		bdevperf_job_free(job);
		return -ENOMEM;
	}

	job->workload_type = config->rw;
	job->io_size = config->bs;
	job->rw_percentage = config->rwmixread;
	job->continue_on_failure = g_continue_on_failure;
	job->queue_depth = config->iodepth;
	job->bdev = bdev;
	job->io_size_blocks = job->io_size / data_block_size;
	job->buf_size = job->io_size_blocks * block_size;
	job->abort = g_abort;
	job_init_rw(job, config->rw);

	if ((job->io_size % data_block_size) != 0) {
		SPDK_ERRLOG("IO size (%d) is not a multiple of the data block size of bdev %s (%"PRIu32")\n",
			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
		bdevperf_job_free(job);
		return -ENOTSUP;
	}

	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
		bdevperf_job_free(job);
		return -ENOTSUP;
	}

	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_REFTAG)) {
		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
	}
	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_GUARD)) {
		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
	}

	job->offset_in_ios = 0;

	if (config->length != 0) {
		/* Use a subset of the disk */
		job->size_in_ios = config->length / job->io_size_blocks;
		job->ios_base = config->offset / job->io_size_blocks;
	} else {
		/* Use the whole disk */
		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
		job->ios_base = 0;
	}

	if (job->is_random && g_zipf_theta > 0) {
		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
	}

	if (job->verify) {
		if (job->size_in_ios >= UINT32_MAX) {
			SPDK_ERRLOG("Due to constraints of verify operation, the job storage capacity is too large\n");
			bdevperf_job_free(job);
			return -ENOMEM;
		}
		job->outstanding = spdk_bit_array_create(job->size_in_ios);
		if (job->outstanding == NULL) {
			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
				    spdk_bdev_get_name(bdev));
			bdevperf_job_free(job);
			return -ENOMEM;
		}
		if (job->queue_depth > (int)job->size_in_ios) {
			SPDK_WARNLOG("Due to constraints of verify job, queue depth (-q, %d) can't exceed the number of IO "
				     "requests which can be submitted to the bdev %s simultaneously (%"PRIu64"). "
				     "Queue depth is limited to %"PRIu64"\n",
				     job->queue_depth, job->name, job->size_in_ios, job->size_in_ios);
			job->queue_depth = (int)job->size_in_ios;
		}
	}

	job->histogram = spdk_histogram_data_alloc();
	if (job->histogram == NULL) {
		fprintf(stderr, "Failed to allocate histogram\n");
		bdevperf_job_free(job);
		return -ENOMEM;
	}

	TAILQ_INIT(&job->task_list);

	if (g_random_map) {
		if (job->size_in_ios >= UINT32_MAX) {
			SPDK_ERRLOG("Due to constraints of the random map, the job storage capacity is too large\n");
			bdevperf_job_free(job);
			return -ENOMEM;
		}
		job->random_map = spdk_bit_array_create(job->size_in_ios);
		if (job->random_map == NULL) {
			SPDK_ERRLOG("Could not create random_map array bitmap for bdev %s\n",
				    spdk_bdev_get_name(bdev));
			bdevperf_job_free(job);
			return -ENOMEM;
		}
	}

1839 	task_num = job->queue_depth;
1840 	if (job->reset) {
1841 		task_num += 1;
1842 	}
1843 	if (job->abort) {
1844 		task_num += job->queue_depth;
1845 	}
1846 
1847 	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);
1848 
1849 	for (n = 0; n < task_num; n++) {
1850 		task = calloc(1, sizeof(struct bdevperf_task));
1851 		if (!task) {
1852 			fprintf(stderr, "Failed to allocate task from memory\n");
1853 			spdk_zipf_free(&job->zipf);
1854 			return -ENOMEM;
1855 		}
1856 
1857 		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
1858 					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1859 		if (!task->buf) {
1860 			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
1861 			spdk_zipf_free(&job->zipf);
1862 			free(task);
1863 			return -ENOMEM;
1864 		}
1865 
1866 		if (spdk_bdev_is_md_separate(job->bdev)) {
1867 			task->md_buf = spdk_zmalloc(job->io_size_blocks *
1868 						    spdk_bdev_get_md_size(job->bdev), 0, NULL,
1869 						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1870 			if (!task->md_buf) {
1871 				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
1872 				spdk_zipf_free(&job->zipf);
1873 				spdk_free(task->buf);
1874 				free(task);
1875 				return -ENOMEM;
1876 			}
1877 		}
1878 
1879 		task->job = job;
1880 		TAILQ_INSERT_TAIL(&job->task_list, task, link);
1881 	}
1882 
1883 	job->thread = thread;
1884 
1885 	g_construct_job_count++;
1886 
1887 	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
1888 	assert(rc == 0);
1889 
1890 	return rc;
1891 }
1892 
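/* Map a workload string (see PATTERN_TYPES_STR) to its job_config_rw value.
 * A NULL string returns the provided default 'ret'; an unrecognized string
 * returns BDEVPERF_CONFIG_ERROR, which is why callers compare the result as
 * an int.
 */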
1893 static int
1894 parse_rw(const char *str, enum job_config_rw ret)
1895 {
1896 	if (str == NULL) {
1897 		return ret;
1898 	}
1899 
1900 	if (!strcmp(str, "read")) {
1901 		ret = JOB_CONFIG_RW_READ;
1902 	} else if (!strcmp(str, "randread")) {
1903 		ret = JOB_CONFIG_RW_RANDREAD;
1904 	} else if (!strcmp(str, "write")) {
1905 		ret = JOB_CONFIG_RW_WRITE;
1906 	} else if (!strcmp(str, "randwrite")) {
1907 		ret = JOB_CONFIG_RW_RANDWRITE;
1908 	} else if (!strcmp(str, "verify")) {
1909 		ret = JOB_CONFIG_RW_VERIFY;
1910 	} else if (!strcmp(str, "reset")) {
1911 		ret = JOB_CONFIG_RW_RESET;
1912 	} else if (!strcmp(str, "unmap")) {
1913 		ret = JOB_CONFIG_RW_UNMAP;
1914 	} else if (!strcmp(str, "write_zeroes")) {
1915 		ret = JOB_CONFIG_RW_WRITE_ZEROES;
1916 	} else if (!strcmp(str, "flush")) {
1917 		ret = JOB_CONFIG_RW_FLUSH;
1918 	} else if (!strcmp(str, "rw")) {
1919 		ret = JOB_CONFIG_RW_RW;
1920 	} else if (!strcmp(str, "randrw")) {
1921 		ret = JOB_CONFIG_RW_RANDRW;
1922 	} else {
1923 		fprintf(stderr, "rw must be one of\n"
1924 			PATTERN_TYPES_STR "\n");
1925 		ret = BDEVPERF_CONFIG_ERROR;
1926 	}
1927 
1928 	return ret;
1929 }
1930 
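/* Copy the next entry of a colon-separated filename list into 'out',
 * stripping spaces and tabs, and return a pointer just past the consumed
 * entry. For example, "Malloc0:Malloc1" yields "Malloc0" first, and then
 * "Malloc1" when called again on the returned pointer.
 */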
1931 static const char *
1932 config_filename_next(const char *filename, char *out)
1933 {
1934 	int i, k;
1935 
1936 	if (filename == NULL) {
1937 		out[0] = '\0';
1938 		return NULL;
1939 	}
1940 
1941 	if (filename[0] == ':') {
1942 		filename++;
1943 	}
1944 
1945 	for (i = 0, k = 0;
1946 	     filename[i] != '\0' &&
1947 	     filename[i] != ':' &&
1948 	     i < BDEVPERF_CONFIG_MAX_FILENAME &&
1949 	     k < (BDEVPERF_CONFIG_MAX_FILENAME - 1);
1950 	     i++) {
1951 		if (filename[i] == ' ' || filename[i] == '\t') {
1952 			continue;
1953 		}
1954 
1955 		out[k++] = filename[i];
1956 	}
1957 	out[k] = 0;
1958 
1959 	return filename + i;
1960 }
1961 
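/* In one-thread-per-lcore mode (-E), look up the spdk_thread that was
 * created for the given lcore. Returns NULL if no such thread exists.
 */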
1962 static struct spdk_thread *
1963 get_lcore_thread(uint32_t lcore)
1964 {
1965 	struct lcore_thread *lthread;
1966 
1967 	TAILQ_FOREACH(lthread, &g_lcore_thread_list, link) {
1968 		if (lthread->lcore == lcore) {
1969 			return lthread->thread;
1970 		}
1971 	}
1972 
1973 	return NULL;
1974 }
1975 
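/* In one-thread-per-lcore mode (-E), create a single spdk_thread pinned to
 * the given lcore and register it in g_lcore_thread_list so all jobs
 * scheduled on that lcore can share it.
 */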
1976 static void
1977 create_lcore_thread(uint32_t lcore)
1978 {
1979 	struct lcore_thread *lthread;
1980 	struct spdk_cpuset cpumask = {};
1981 	char name[32];
1982 
1983 	lthread = calloc(1, sizeof(*lthread));
1984 	assert(lthread != NULL);
1985 
1986 	lthread->lcore = lcore;
1987 
1988 	snprintf(name, sizeof(name), "lcore_%u", lcore);
1989 	spdk_cpuset_set_cpu(&cpumask, lcore, true);
1990 
1991 	lthread->thread = spdk_thread_create(name, &cpumask);
1992 	assert(lthread->thread != NULL);
1993 
1994 	TAILQ_INSERT_TAIL(&g_lcore_thread_list, lthread, link);
1995 }
1996 
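/* Walk the parsed job configs, resolve each colon-separated filename to a
 * bdev, choose (or look up) the thread the job should run on, and construct
 * the job. The first failure is recorded in g_run_rc and stops iteration.
 */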
1997 static void
1998 bdevperf_construct_jobs(void)
1999 {
2000 	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
2001 	struct spdk_thread *thread;
2002 	struct job_config *config;
2003 	struct spdk_bdev *bdev;
2004 	const char *filenames;
2005 	uint32_t i;
2006 	int rc;
2007 
2008 	if (g_one_thread_per_lcore) {
2009 		SPDK_ENV_FOREACH_CORE(i) {
2010 			create_lcore_thread(i);
2011 		}
2012 	}
2013 
2014 	TAILQ_FOREACH(config, &job_config_list, link) {
2015 		filenames = config->filename;
2016 
2017 		if (!g_one_thread_per_lcore) {
2018 			thread = construct_job_thread(&config->cpumask, config->name);
2019 		} else {
2020 			thread = get_lcore_thread(config->lcore);
2021 		}
2022 		assert(thread);
2023 
2024 		while (filenames) {
2025 			filenames = config_filename_next(filenames, filename);
2026 			if (strlen(filename) == 0) {
2027 				break;
2028 			}
2029 
2030 			bdev = spdk_bdev_get_by_name(filename);
2031 			if (!bdev) {
2032 				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
2033 				g_run_rc = -EINVAL;
2034 				return;
2035 			}
2036 
2037 			rc = bdevperf_construct_job(bdev, config, thread);
2038 			if (rc < 0) {
2039 				g_run_rc = rc;
2040 				return;
2041 			}
2042 		}
2043 	}
2044 }
2045 
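/* Build a job_config for one bdev from the CLI-level globals. 'offset' and
 * 'range' select a block subrange of the bdev; a range of 0 means the whole
 * bdev.
 */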
2046 static int
2047 make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
2048 {
2049 	struct job_config *config = calloc(1, sizeof(*config));
2050 
2051 	if (config == NULL) {
2052 		fprintf(stderr, "Unable to allocate memory for job config\n");
2053 		return -ENOMEM;
2054 	}
2055 
2056 	config->name = filename;
2057 	config->filename = filename;
2058 	config->lcore = _get_next_core();
2059 	spdk_cpuset_zero(&config->cpumask);
2060 	spdk_cpuset_set_cpu(&config->cpumask, config->lcore, true);
2061 	config->bs = g_io_size;
2062 	config->iodepth = g_queue_depth;
2063 	config->rwmixread = g_rw_percentage;
2064 	config->offset = offset;
2065 	config->length = range;
2066 	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
2067 	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2068 		free(config);
2069 		return -EINVAL;
2070 	}
2071 
2072 	TAILQ_INSERT_TAIL(&job_config_list, config, link);
2073 	return 0;
2074 }
2075 
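/* For multithread mode (-C): split the bdev evenly across all cores and
 * create one job per core, each covering blocks_per_job blocks at its own
 * offset. Any remainder blocks left over by the integer division are not
 * covered by any job.
 */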
2076 static int
2077 bdevperf_construct_multithread_job_config(void *ctx, struct spdk_bdev *bdev)
2078 {
2079 	uint32_t *num_cores = ctx;
2080 	uint32_t i;
2081 	uint64_t blocks_per_job;
2082 	int64_t offset;
2083 	int rc;
2084 
2085 	blocks_per_job = spdk_bdev_get_num_blocks(bdev) / *num_cores;
2086 	offset = 0;
2087 
2088 	SPDK_ENV_FOREACH_CORE(i) {
2089 		rc = make_cli_job_config(spdk_bdev_get_name(bdev), offset, blocks_per_job);
2090 		if (rc) {
2091 			return rc;
2092 		}
2093 
2094 		offset += blocks_per_job;
2095 	}
2096 
2097 	return 0;
2098 }
2099 
2100 static void
2101 bdevperf_construct_multithread_job_configs(void)
2102 {
2103 	struct spdk_bdev *bdev;
2104 	uint32_t i;
2105 	uint32_t num_cores;
2106 
2107 	num_cores = 0;
2108 	SPDK_ENV_FOREACH_CORE(i) {
2109 		num_cores++;
2110 	}
2111 
2112 	if (num_cores == 0) {
2113 		g_run_rc = -EINVAL;
2114 		return;
2115 	}
2116 
2117 	if (g_job_bdev_name != NULL) {
2118 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2119 		if (!bdev) {
2120 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2121 			return;
2122 		}
2123 		g_run_rc = bdevperf_construct_multithread_job_config(&num_cores, bdev);
2124 	} else {
2125 		g_run_rc = spdk_for_each_bdev_leaf(&num_cores, bdevperf_construct_multithread_job_config);
2126 	}
2127 
2128 }
2129 
2130 static int
2131 bdevperf_construct_job_config(void *ctx, struct spdk_bdev *bdev)
2132 {
2133 	/* Construct the job */
2134 	return make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
2135 }
2136 
2137 static void
2138 bdevperf_construct_job_configs(void)
2139 {
2140 	struct spdk_bdev *bdev;
2141 
2142 	/* There are three different modes for allocating jobs. Standard mode
2143 	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
2144 	 *
2145 	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
2146 	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
2147 	 * This runs multiple threads per bdev, effectively.
2148 	 *
	 * The -j flag places bdevperf into "FIO" mode, which mimics the semantics
	 * of FIO jobs. In "FIO" mode, threads are spawned per-job instead of
	 * per-bdev. Each FIO job can be individually parameterized by filename,
	 * cpu mask, etc., unlike the other modes, which only support global
	 * options.
	 *
	 * For both standard mode and "multithread" mode, if the -E flag is
	 * specified, one spdk_thread is created PER CORE and shared by the
	 * multiple jobs running on that core.
2157 	 */
2158 
2159 	if (g_bdevperf_conf) {
2160 		goto end;
2161 	}
2162 
2163 	if (g_multithread_mode) {
2164 		bdevperf_construct_multithread_job_configs();
2165 	} else if (g_job_bdev_name != NULL) {
2166 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2167 		if (bdev) {
2168 			/* Construct the job */
2169 			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
2170 		} else {
2171 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2172 		}
2173 	} else {
2174 		g_run_rc = spdk_for_each_bdev_leaf(NULL, bdevperf_construct_job_config);
2175 	}
2176 
2177 end:
2178 	/* Increment initial construct_jobs count so that it will never reach 0 in the middle
2179 	 * of iteration.
2180 	 */
2181 	g_construct_job_count = 1;
2182 
2183 	if (g_run_rc == 0) {
2184 		bdevperf_construct_jobs();
2185 	}
2186 
2187 	_bdevperf_construct_job_done(NULL);
2188 }
2189 
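/* Read a nonnegative integer option from a config file section, falling back
 * to 'def' when the field is absent. An undefined default is tolerated only
 * in the [global] section; otherwise it is an error. Returns
 * BDEVPERF_CONFIG_ERROR on bad or missing values.
 */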
2190 static int
2191 parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
2192 {
2193 	const char *job_name;
2194 	int tmp;
2195 
2196 	tmp = spdk_conf_section_get_intval(s, name);
2197 	if (tmp == -1) {
		/* Field was not found. Check the default value. In the [global]
		 * section it is ok to have undefined values, but for other
		 * sections it is not. */
2201 		if (def == BDEVPERF_CONFIG_UNDEFINED) {
2202 			job_name = spdk_conf_section_get_name(s);
2203 			if (strcmp(job_name, "global") == 0) {
2204 				return def;
2205 			}
2206 
2207 			fprintf(stderr,
2208 				"Job '%s' has no '%s' assigned\n",
2209 				job_name, name);
2210 			return BDEVPERF_CONFIG_ERROR;
2211 		}
2212 		return def;
2213 	}
2214 
2215 	/* NOTE: get_intval returns nonnegative on success */
2216 	if (tmp < 0) {
2217 		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
2218 			spdk_conf_section_get_name(s), name);
2219 		return BDEVPERF_CONFIG_ERROR;
2220 	}
2221 
2222 	return tmp;
2223 }
2224 
2225 /* CLI arguments override parameters for global sections */
2226 static void
2227 config_set_cli_args(struct job_config *config)
2228 {
2229 	if (g_job_bdev_name) {
2230 		config->filename = g_job_bdev_name;
2231 	}
2232 	if (g_io_size > 0) {
2233 		config->bs = g_io_size;
2234 	}
2235 	if (g_queue_depth > 0) {
2236 		config->iodepth = g_queue_depth;
2237 	}
2238 	if (g_rw_percentage > 0) {
2239 		config->rwmixread = g_rw_percentage;
2240 	}
2241 	if (g_workload_type) {
2242 		config->rw = parse_rw(g_workload_type, config->rw);
2243 	}
2244 }
2245 
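/* Parse the INI-style job config file given with -j. A hypothetical example
 * (section names and values are illustrative only):
 *
 *   [global]
 *   filename=Malloc0
 *   bs=4096
 *   iodepth=128
 *   rw=randrw
 *   rwmixread=70
 *
 *   [job0]
 *   cpumask=0x1
 *   offset=0
 *   length=1048576
 *
 * Each [global] section resets the defaults applied to the jobs that follow
 * it, and CLI arguments override [global] values.
 */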
2246 static int
2247 read_job_config(void)
2248 {
2249 	struct job_config global_default_config;
2250 	struct job_config global_config;
2251 	struct spdk_conf_section *s;
2252 	struct job_config *config = NULL;
2253 	const char *cpumask;
2254 	const char *rw;
2255 	bool is_global;
2256 	int n = 0;
2257 	int val;
2258 
2259 	if (g_bdevperf_conf_file == NULL) {
2260 		return 0;
2261 	}
2262 
2263 	g_bdevperf_conf = spdk_conf_allocate();
2264 	if (g_bdevperf_conf == NULL) {
2265 		fprintf(stderr, "Could not allocate job config structure\n");
2266 		return 1;
2267 	}
2268 
2269 	spdk_conf_disable_sections_merge(g_bdevperf_conf);
2270 	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
2271 		fprintf(stderr, "Invalid job config");
2272 		return 1;
2273 	}
2274 
2275 	/* Initialize global defaults */
2276 	global_default_config.filename = NULL;
	/* A zero mask is the same as g_all_cpuset.
	 * The g_all_cpuset is not initialized yet,
	 * so use the zero mask as the default instead. */
2280 	spdk_cpuset_zero(&global_default_config.cpumask);
2281 	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
2282 	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
	/* bdevperf has no default for the -M option, but in FIO the default is 50 */
2284 	global_default_config.rwmixread = 50;
2285 	global_default_config.offset = 0;
2286 	/* length 0 means 100% */
2287 	global_default_config.length = 0;
2288 	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
2289 	config_set_cli_args(&global_default_config);
2290 
2291 	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
2292 		return 1;
2293 	}
2294 
	/* There is only a single instance of the global job_config.
	 * We just reset its value when we encounter a new [global] section. */
2297 	global_config = global_default_config;
2298 
2299 	for (s = spdk_conf_first_section(g_bdevperf_conf);
2300 	     s != NULL;
2301 	     s = spdk_conf_next_section(s)) {
2302 		config = calloc(1, sizeof(*config));
2303 		if (config == NULL) {
2304 			fprintf(stderr, "Unable to allocate memory for job config\n");
2305 			return 1;
2306 		}
2307 
2308 		config->name = spdk_conf_section_get_name(s);
2309 		is_global = strcmp(config->name, "global") == 0;
2310 
2311 		if (is_global) {
2312 			global_config = global_default_config;
2313 		}
2314 
2315 		config->filename = spdk_conf_section_get_val(s, "filename");
2316 		if (config->filename == NULL) {
2317 			config->filename = global_config.filename;
2318 		}
2319 		if (!is_global) {
2320 			if (config->filename == NULL) {
2321 				fprintf(stderr, "Job '%s' expects 'filename' parameter\n", config->name);
2322 				goto error;
2323 			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
2324 				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
2325 				fprintf(stderr,
2326 					"filename for '%s' job is too long. Max length is %d\n",
2327 					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
2328 				goto error;
2329 			}
2330 		}
2331 
2332 		cpumask = spdk_conf_section_get_val(s, "cpumask");
2333 		if (cpumask == NULL) {
2334 			config->cpumask = global_config.cpumask;
2335 		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
2336 			fprintf(stderr, "Job '%s' has bad 'cpumask' value\n", config->name);
2337 			goto error;
2338 		}
2339 
2340 		config->bs = parse_uint_option(s, "bs", global_config.bs);
2341 		if (config->bs == BDEVPERF_CONFIG_ERROR) {
2342 			goto error;
2343 		} else if (config->bs == 0) {
2344 			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
2345 			goto error;
2346 		}
2347 
2348 		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
2349 		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
2350 			goto error;
2351 		} else if (config->iodepth == 0) {
2352 			fprintf(stderr,
2353 				"'iodepth' of job '%s' must be greater than 0\n",
2354 				config->name);
2355 			goto error;
2356 		}
2357 
2358 		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
2359 		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
2360 			goto error;
2361 		} else if (config->rwmixread > 100) {
2362 			fprintf(stderr,
2363 				"'rwmixread' value of '%s' job is not in 0-100 range\n",
2364 				config->name);
2365 			goto error;
2366 		}
2367 
2368 		config->offset = parse_uint_option(s, "offset", global_config.offset);
2369 		if (config->offset == BDEVPERF_CONFIG_ERROR) {
2370 			goto error;
2371 		}
2372 
2373 		val = parse_uint_option(s, "length", global_config.length);
2374 		if (val == BDEVPERF_CONFIG_ERROR) {
2375 			goto error;
2376 		}
2377 		config->length = val;
2378 
2379 		rw = spdk_conf_section_get_val(s, "rw");
2380 		config->rw = parse_rw(rw, global_config.rw);
2381 		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2382 			fprintf(stderr, "Job '%s' has bad 'rw' value\n", config->name);
2383 			goto error;
2384 		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
2385 			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
2386 			goto error;
2387 		}
2388 
2389 		if (is_global) {
2390 			config_set_cli_args(config);
2391 			global_config = *config;
2392 			free(config);
2393 			config = NULL;
2394 		} else {
2395 			TAILQ_INSERT_TAIL(&job_config_list, config, link);
2396 			n++;
2397 		}
2398 	}
2399 
2408 	printf("Using job config with %d jobs\n", n);
2409 	return 0;
2410 error:
2411 	free(config);
2412 	return 1;
2413 }
2414 
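/* Application start callback: record the main thread, build the set of all
 * reactor cores, and either wait for the perform_tests RPC (-z) or start
 * constructing job configs immediately.
 */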
2415 static void
2416 bdevperf_run(void *arg1)
2417 {
2418 	uint32_t i;
2419 
2420 	g_main_thread = spdk_get_thread();
2421 
2422 	spdk_cpuset_zero(&g_all_cpuset);
2423 	SPDK_ENV_FOREACH_CORE(i) {
2424 		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
2425 	}
2426 
2427 	if (g_wait_for_tests) {
2428 		/* Do not perform any tests until RPC is received */
2429 		return;
2430 	}
2431 
2432 	bdevperf_construct_job_configs();
2433 }
2434 
2435 static void
2436 rpc_perform_tests_reset(void)
2437 {
2438 	/* Reset g_run_rc to 0 for the next test run. */
2439 	g_run_rc = 0;
2440 
2441 	/* Reset g_stats to 0 for the next test run. */
2442 	memset(&g_stats, 0, sizeof(g_stats));
2443 
2444 	/* Reset g_show_performance_period_num to 0 for the next test run. */
2445 	g_show_performance_period_num = 0;
2446 }
2447 
2448 static void
2449 rpc_perform_tests_cb(void)
2450 {
2451 	struct spdk_json_write_ctx *w;
2452 	struct spdk_jsonrpc_request *request = g_request;
2453 
2454 	g_request = NULL;
2455 
2456 	if (g_run_rc == 0) {
2457 		w = spdk_jsonrpc_begin_result(request);
2458 		spdk_json_write_uint32(w, g_run_rc);
2459 		spdk_jsonrpc_end_result(request, w);
2460 	} else {
2461 		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2462 						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
2463 	}
2464 
2465 	rpc_perform_tests_reset();
2466 }
2467 
2468 struct rpc_bdevperf_params {
2469 	int	time_in_sec;
2470 	char	*workload_type;
2471 	int	queue_depth;
2472 	char	*io_size;
2473 	int	rw_percentage;
2474 };
2475 
2476 static const struct spdk_json_object_decoder rpc_bdevperf_params_decoders[] = {
2477 	{"time_in_sec", offsetof(struct rpc_bdevperf_params, time_in_sec), spdk_json_decode_int32, true},
2478 	{"workload_type", offsetof(struct rpc_bdevperf_params, workload_type), spdk_json_decode_string, true},
2479 	{"queue_depth", offsetof(struct rpc_bdevperf_params, queue_depth), spdk_json_decode_int32, true},
2480 	{"io_size", offsetof(struct rpc_bdevperf_params, io_size), spdk_json_decode_string, true},
2481 	{"rw_percentage", offsetof(struct rpc_bdevperf_params, rw_percentage), spdk_json_decode_int32, true},
2482 };
2483 
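/* Apply parameters received over RPC to the bdevperf globals. Only fields
 * present in the request overwrite the current values, except rw_percentage:
 * an absent (zero) value clears g_mix_specified.
 */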
2484 static void
2485 rpc_apply_bdevperf_params(struct rpc_bdevperf_params *params)
2486 {
2487 	if (params->workload_type) {
		/* Free any previously set value to avoid a memory leak */
2489 		free(g_workload_type);
2490 		g_workload_type = strdup(params->workload_type);
2491 	}
2492 	if (params->queue_depth) {
2493 		g_queue_depth = params->queue_depth;
2494 	}
2495 	if (params->io_size) {
2496 		bdevperf_parse_arg('o', params->io_size);
2497 	}
2498 	if (params->time_in_sec) {
2499 		g_time_in_sec = params->time_in_sec;
2500 	}
2501 	if (params->rw_percentage) {
2502 		g_rw_percentage = params->rw_percentage;
2503 		g_mix_specified = true;
2504 	} else {
2505 		g_mix_specified = false;
2506 	}
2507 }
2508 
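/* Handler for the "perform_tests" RPC. A hypothetical request (all
 * parameters are optional; values are illustrative only):
 *
 *   { "jsonrpc": "2.0", "id": 1, "method": "perform_tests",
 *     "params": { "time_in_sec": 10, "workload_type": "randread",
 *                 "queue_depth": 64, "io_size": "4096",
 *                 "rw_percentage": 100 } }
 *
 * The current globals are backed up first and restored if the requested
 * parameters fail validation.
 */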
2509 static void
2510 rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
2511 {
2512 	struct rpc_bdevperf_params req = {}, backup = {};
2513 	int rc;
2514 
2515 	if (g_request != NULL) {
2516 		fprintf(stderr, "Another test is already in progress.\n");
2517 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2518 						 spdk_strerror(-EINPROGRESS));
2519 		return;
2520 	}
2521 
2522 	if (params) {
2523 		if (spdk_json_decode_object_relaxed(params, rpc_bdevperf_params_decoders,
2524 						    SPDK_COUNTOF(rpc_bdevperf_params_decoders),
2525 						    &req)) {
2526 			spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2527 							 "spdk_json_decode_object failed");
2528 			return;
2529 		}
2530 
2531 		if (g_workload_type) {
2532 			backup.workload_type = strdup(g_workload_type);
2533 		}
2534 		backup.queue_depth = g_queue_depth;
2535 		if (asprintf(&backup.io_size, "%d", g_io_size) < 0) {
2536 			fprintf(stderr, "Couldn't allocate memory for queue depth");
2537 			goto rpc_error;
2538 		}
2539 		backup.time_in_sec = g_time_in_sec;
2540 		backup.rw_percentage = g_rw_percentage;
2541 
2542 		rpc_apply_bdevperf_params(&req);
2543 
2544 		free(req.workload_type);
2545 		free(req.io_size);
2546 	}
2547 
2548 	rc = verify_test_params();
2549 
2550 	if (rc) {
2551 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2552 						 "Invalid parameters provided");
2553 		/* restore old params on error */
2554 		rpc_apply_bdevperf_params(&backup);
2555 		goto rpc_error;
2556 	}
2557 
2558 	g_request = request;
2559 
	/* Only construct job configs on the first test run. */
2561 	if (TAILQ_EMPTY(&job_config_list)) {
2562 		bdevperf_construct_job_configs();
2563 	} else {
2564 		bdevperf_construct_jobs();
2565 	}
2566 
2567 rpc_error:
2568 	free(backup.io_size);
2569 	free(backup.workload_type);
2570 }
2571 SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)
2572 
2573 static void
2574 _bdevperf_job_drain(void *ctx)
2575 {
2576 	bdevperf_job_drain(ctx);
2577 }
2578 
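/* Shutdown callback registered with the app framework (e.g. on SIGINT):
 * mark the run as shut down and ask each job's thread to drain its
 * outstanding I/O, or finish immediately if no jobs are running.
 */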
2579 static void
2580 spdk_bdevperf_shutdown_cb(void)
2581 {
	struct bdevperf_job *job, *tmp;

	g_shutdown = true;
2584 
2585 	if (g_bdevperf.running_jobs == 0) {
2586 		bdevperf_test_done(NULL);
2587 		return;
2588 	}
2589 
2590 	/* Iterate jobs to stop all I/O */
2591 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
2592 		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
2593 	}
2594 }
2595 
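/* Parse a single command line option. Flag and string options are handled
 * directly; the remaining options are parsed as decimal integers via
 * spdk_strtoll and dispatched in the switch below.
 */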
2596 static int
2597 bdevperf_parse_arg(int ch, char *arg)
2598 {
2599 	long long tmp;
2600 
2601 	if (ch == 'w') {
2602 		g_workload_type = strdup(arg);
2603 	} else if (ch == 'T') {
2604 		g_job_bdev_name = arg;
2605 	} else if (ch == 'z') {
2606 		g_wait_for_tests = true;
2607 	} else if (ch == 'Z') {
2608 		g_zcopy = true;
2609 	} else if (ch == 'X') {
2610 		g_abort = true;
2611 	} else if (ch == 'C') {
2612 		g_multithread_mode = true;
2613 	} else if (ch == 'f') {
2614 		g_continue_on_failure = true;
2615 	} else if (ch == 'j') {
2616 		g_bdevperf_conf_file = arg;
2617 	} else if (ch == 'F') {
2618 		char *endptr;
2619 
2620 		errno = 0;
2621 		g_zipf_theta = strtod(arg, &endptr);
2622 		if (errno || arg == endptr || g_zipf_theta < 0) {
2623 			fprintf(stderr, "Illegal zipf theta value %s\n", arg);
2624 			return -EINVAL;
2625 		}
2626 	} else if (ch == 'l') {
2627 		g_latency_display_level++;
2628 	} else if (ch == 'D') {
2629 		g_random_map = true;
2630 	} else if (ch == 'E') {
2631 		g_one_thread_per_lcore = true;
2632 	} else if (ch == 'J') {
2633 		g_rpc_log_file_name = arg;
2634 	} else if (ch == 'o') {
2635 		uint64_t size;
2636 
2637 		if (spdk_parse_capacity(arg, &size, NULL) != 0) {
2638 			fprintf(stderr, "Invalid IO size: %s\n", arg);
2639 			return -EINVAL;
2640 		}
2641 		g_io_size = (int)size;
2642 	} else if (ch == 'U') {
2643 		g_unique_writes = true;
2644 	} else {
2645 		tmp = spdk_strtoll(arg, 10);
2646 		if (tmp < 0) {
2647 			fprintf(stderr, "Parse failed for the option %c.\n", ch);
2648 			return tmp;
2649 		} else if (tmp >= INT_MAX) {
2650 			fprintf(stderr, "Parsed option was too large %c.\n", ch);
2651 			return -ERANGE;
2652 		}
2653 
2654 		switch (ch) {
2655 		case 'q':
2656 			g_queue_depth = tmp;
2657 			break;
2658 		case 't':
2659 			g_time_in_sec = tmp;
2660 			break;
2661 		case 'k':
2662 			g_timeout_in_sec = tmp;
2663 			break;
2664 		case 'M':
2665 			g_rw_percentage = tmp;
2666 			g_mix_specified = true;
2667 			break;
2668 		case 'P':
2669 			g_show_performance_ema_period = tmp;
2670 			break;
2671 		case 'S':
2672 			g_show_performance_real_time = 1;
2673 			g_show_performance_period_in_usec = tmp * SPDK_SEC_TO_USEC;
2674 			break;
2675 		default:
2676 			return -EINVAL;
2677 		}
2678 	}
2679 	return 0;
2680 }
2681 
2682 static void
2683 bdevperf_usage(void)
2684 {
2685 	printf(" -q <depth>                io depth\n");
2686 	printf(" -o <size>                 io size in bytes\n");
2687 	printf(" -w <type>                 io pattern type, must be one of " PATTERN_TYPES_STR "\n");
2688 	printf(" -t <time>                 time in seconds\n");
2689 	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
2690 	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
2691 	printf(" -P <num>                  number of moving average period\n");
2692 	printf("\t\t(If set to n, show weighted mean of the previous n IO/s in real time)\n");
2693 	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
2694 	printf("\t\t(only valid with -S)\n");
2695 	printf(" -S <period>               show performance result in real time every <period> seconds\n");
2696 	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
2697 	printf(" -f                        continue processing I/O even after failures\n");
2698 	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
2699 	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
2700 	printf(" -z                        start bdevperf, but wait for perform_tests RPC to start tests\n");
2701 	printf("                           (See examples/bdev/bdevperf/bdevperf.py)\n");
2702 	printf(" -X                        abort timed out I/O\n");
2703 	printf(" -C                        enable every core to send I/Os to each bdev\n");
2704 	printf(" -j <filename>             use job config file\n");
2705 	printf(" -l                        display latency histogram, default: disable. -l display summary, -ll display details\n");
2706 	printf(" -D                        use a random map for picking offsets not previously read or written (for all jobs)\n");
2707 	printf(" -E                        share per lcore thread among jobs. Available only if -j is not used.\n");
2708 	printf(" -J                        File name to open with append mode and log JSON RPC calls.\n");
2709 	printf(" -U                        generate unique data for each write I/O, has no effect on non-write I/O\n");
2710 }
2711 
2712 static void
2713 bdevperf_fini(void)
2714 {
2715 	free_job_config();
2716 	free(g_workload_type);
2717 
2718 	if (g_rpc_log_file != NULL) {
2719 		fclose(g_rpc_log_file);
2720 		g_rpc_log_file = NULL;
2721 	}
2722 }
2723 
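/* Validate the combination of global parameters before the test starts.
 * Returns 0 on success and nonzero on invalid combinations. When a job
 * config file (-j) is used, most per-job validation happens during config
 * parsing instead.
 */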
2724 static int
2725 verify_test_params(void)
2726 {
2727 	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
2728 		goto out;
2729 	}
2730 	if (!g_bdevperf_conf_file && g_io_size <= 0) {
2731 		goto out;
2732 	}
2733 	if (!g_bdevperf_conf_file && !g_workload_type) {
2734 		goto out;
2735 	}
2736 	if (g_bdevperf_conf_file && g_one_thread_per_lcore) {
2737 		printf("If bdevperf's config file is used, per lcore thread cannot be used\n");
2738 		goto out;
2739 	}
2740 	if (g_time_in_sec <= 0) {
2741 		goto out;
2742 	}
2743 	g_time_in_usec = g_time_in_sec * SPDK_SEC_TO_USEC;
2744 
2745 	if (g_timeout_in_sec < 0) {
2746 		goto out;
2747 	}
2748 
2749 	if (g_abort && !g_timeout_in_sec) {
2750 		printf("Timeout must be set for abort option, Ignoring g_abort\n");
2751 	}
2752 
2753 	if (g_show_performance_ema_period > 0 &&
2754 	    g_show_performance_real_time == 0) {
2755 		fprintf(stderr, "-P option must be specified with -S option\n");
2756 		return 1;
2757 	}
2758 
2759 	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2760 		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
2761 		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
2762 		printf("Zero copy mechanism will not be used.\n");
2763 		g_zcopy = false;
2764 	}
2765 
2766 	if (g_bdevperf_conf_file) {
2767 		/* workload_type verification happens during config file parsing */
2768 		return 0;
2769 	}
2770 
2771 	if (!strcmp(g_workload_type, "verify") ||
2772 	    !strcmp(g_workload_type, "reset")) {
2773 		g_rw_percentage = 50;
2774 		if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2775 			fprintf(stderr, "Unable to exceed max I/O size of %d for verify. (%d provided).\n",
2776 				SPDK_BDEV_LARGE_BUF_MAX_SIZE, g_io_size);
2777 			return 1;
2778 		}
2779 		g_verify = true;
2780 		if (!strcmp(g_workload_type, "reset")) {
2781 			g_reset = true;
2782 		}
2783 	}
2784 
2785 	if (!strcmp(g_workload_type, "read") ||
2786 	    !strcmp(g_workload_type, "randread") ||
2787 	    !strcmp(g_workload_type, "write") ||
2788 	    !strcmp(g_workload_type, "randwrite") ||
2789 	    !strcmp(g_workload_type, "verify") ||
2790 	    !strcmp(g_workload_type, "reset") ||
2791 	    !strcmp(g_workload_type, "unmap") ||
2792 	    !strcmp(g_workload_type, "write_zeroes") ||
2793 	    !strcmp(g_workload_type, "flush")) {
2794 		if (g_mix_specified) {
2795 			fprintf(stderr, "Ignoring -M option... Please use -M option"
2796 				" only when using rw or randrw.\n");
2797 		}
2798 	}
2799 
2800 	if (!strcmp(g_workload_type, "rw") ||
2801 	    !strcmp(g_workload_type, "randrw")) {
2802 		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
			fprintf(stderr,
				"-M must be set to a value from 0 to 100 "
				"for rw or randrw.\n");
2806 			return 1;
2807 		}
2808 	}
2809 
2810 	if (strcmp(g_workload_type, "randread") &&
2811 	    strcmp(g_workload_type, "randwrite") &&
2812 	    strcmp(g_workload_type, "randrw")) {
2813 		if (g_random_map) {
2814 			fprintf(stderr, "Ignoring -D option... Please use -D option"
2815 				" only when using randread, randwrite or randrw.\n");
2816 			return 1;
2817 		}
2818 	}
2819 
2820 	return 0;
2821 out:
2822 	return 1;
2823 }
2824 
2825 int
2826 main(int argc, char **argv)
2827 {
2828 	struct spdk_app_opts opts = {};
2829 	int rc;
2830 
2831 	/* Use the runtime PID to set the random seed */
2832 	srand(getpid());
2833 
2834 	spdk_app_opts_init(&opts, sizeof(opts));
2835 	opts.name = "bdevperf";
2836 	opts.rpc_addr = NULL;
2837 	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;
2838 
2839 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CEF:J:M:P:S:T:Xlj:DU", NULL,
2840 				      bdevperf_parse_arg, bdevperf_usage)) !=
2841 	    SPDK_APP_PARSE_ARGS_SUCCESS) {
2842 		return rc;
2843 	}
2844 
2845 	/* Set the default address if no rpc_addr was provided in args
2846 	 * and RPC is used for starting tests */
2847 	if (g_wait_for_tests && opts.rpc_addr == NULL) {
2848 		opts.rpc_addr = SPDK_DEFAULT_RPC_ADDR;
2849 	}
2850 
2851 	if (read_job_config()) {
2852 		bdevperf_fini();
2853 		return 1;
2854 	}
2855 
	/* Open the RPC log file (-J) here so it works whether or not a job
	 * config file was used. */
	if (g_rpc_log_file_name != NULL) {
		g_rpc_log_file = fopen(g_rpc_log_file_name, "a");
		if (g_rpc_log_file == NULL) {
			fprintf(stderr, "Failed to open %s\n", g_rpc_log_file_name);
			bdevperf_fini();
			return 1;
		}
		opts.rpc_log_file = g_rpc_log_file;
	}
2859 
2860 	if (verify_test_params() != 0 && !g_wait_for_tests) {
2861 		spdk_app_usage();
2862 		bdevperf_usage();
2863 		bdevperf_fini();
2864 		exit(1);
2865 	}
2866 
2867 	rc = spdk_app_start(&opts, bdevperf_run, NULL);
2868 
2869 	spdk_app_fini();
2870 	bdevperf_fini();
2871 	return rc;
2872 }
2873