/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/accel.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"
#include "spdk/string.h"
#include "spdk/rpc.h"
#include "spdk/bit_array.h"
#include "spdk/conf.h"
#include "spdk/zipf.h"
#include "spdk/histogram_data.h"

#define BDEVPERF_CONFIG_MAX_FILENAME 1024
#define BDEVPERF_CONFIG_UNDEFINED -1
#define BDEVPERF_CONFIG_ERROR -2
#define PATTERN_TYPES_STR "(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush, write_zeroes)"

struct bdevperf_task {
	struct iovec			iov;
	struct bdevperf_job		*job;
	struct spdk_bdev_io		*bdev_io;
	void				*buf;
	void				*verify_buf;
	void				*md_buf;
	uint64_t			offset_blocks;
	struct bdevperf_task		*task_to_abort;
	enum spdk_bdev_io_type		io_type;
	TAILQ_ENTRY(bdevperf_task)	link;
	struct spdk_bdev_io_wait_entry	bdev_io_wait;
};

static char *g_workload_type = NULL;
static int g_io_size = 0;
/* initialize to invalid value so we can detect if user overrides it. */
static int g_rw_percentage = -1;
static bool g_verify = false;
static bool g_reset = false;
static bool g_continue_on_failure = false;
static bool g_abort = false;
static bool g_error_to_exit = false;
static int g_queue_depth = 0;
static uint64_t g_time_in_usec;
static int g_show_performance_real_time = 0;
static uint64_t g_show_performance_period_in_usec = SPDK_SEC_TO_USEC;
static uint64_t g_show_performance_period_num = 0;
static uint64_t g_show_performance_ema_period = 0;
static int g_run_rc = 0;
static bool g_shutdown = false;
static uint64_t g_start_tsc;
static uint64_t g_shutdown_tsc;
static bool g_zcopy = false;
static struct spdk_thread *g_main_thread;
static int g_time_in_sec = 0;
static bool g_mix_specified = false;
static const char *g_job_bdev_name;
static bool g_wait_for_tests = false;
static struct spdk_jsonrpc_request *g_request = NULL;
static bool g_multithread_mode = false;
static int g_timeout_in_sec;
static struct spdk_conf *g_bdevperf_conf = NULL;
static const char *g_bdevperf_conf_file = NULL;
static double g_zipf_theta;
static bool g_random_map = false;
static bool g_unique_writes = false;

static struct spdk_cpuset g_all_cpuset;
static struct spdk_poller *g_perf_timer = NULL;

static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
static void rpc_perform_tests_cb(void);
static int bdevperf_parse_arg(int ch, char *arg);
static int verify_test_params(void);
static void bdevperf_usage(void);

static uint32_t g_bdev_count = 0;
static uint32_t g_latency_display_level;

static bool g_one_thread_per_lcore = false;

static const double g_latency_cutoffs[] = {
	0.01,
	0.10,
	0.25,
	0.50,
	0.75,
	0.90,
	0.95,
	0.98,
	0.99,
	0.995,
	0.999,
	0.9999,
	0.99999,
	0.999999,
	0.9999999,
	-1,
};

static const char *g_rpc_log_file_name = NULL;
static FILE *g_rpc_log_file = NULL;

struct latency_info {
	uint64_t	min;
	uint64_t	max;
	uint64_t	total;
};

enum job_config_rw {
	JOB_CONFIG_RW_READ = 0,
	JOB_CONFIG_RW_WRITE,
	JOB_CONFIG_RW_RANDREAD,
	JOB_CONFIG_RW_RANDWRITE,
	JOB_CONFIG_RW_RW,
	JOB_CONFIG_RW_RANDRW,
	JOB_CONFIG_RW_VERIFY,
	JOB_CONFIG_RW_RESET,
	JOB_CONFIG_RW_UNMAP,
	JOB_CONFIG_RW_FLUSH,
	JOB_CONFIG_RW_WRITE_ZEROES,
};

struct bdevperf_job {
	char				*name;
	struct spdk_bdev		*bdev;
	struct spdk_bdev_desc		*bdev_desc;
	struct spdk_io_channel		*ch;
	TAILQ_ENTRY(bdevperf_job)	link;
	struct spdk_thread		*thread;

	enum job_config_rw		workload_type;
	int				io_size;
	int				rw_percentage;
	bool				is_random;
	bool				verify;
	bool				reset;
	bool				continue_on_failure;
	bool				unmap;
	bool				write_zeroes;
	bool				flush;
	bool				abort;
	int				queue_depth;
	unsigned int			seed;

	uint64_t			io_completed;
	uint64_t			io_failed;
	uint64_t			io_timeout;
	uint64_t			prev_io_completed;
	double				ema_io_per_second;
	int				current_queue_depth;
	uint64_t			size_in_ios;
	uint64_t			ios_base;
	uint64_t			offset_in_ios;
	uint64_t			io_size_blocks;
	uint64_t			buf_size;
	uint32_t			dif_check_flags;
	bool				is_draining;
	struct spdk_poller		*run_timer;
	struct spdk_poller		*reset_timer;
	struct spdk_bit_array		*outstanding;
	struct spdk_zipf		*zipf;
	TAILQ_HEAD(, bdevperf_task)	task_list;
	uint64_t			run_time_in_usec;

	/* keep channel's histogram data before being destroyed */
	struct spdk_histogram_data	*histogram;
	struct spdk_bit_array		*random_map;

	/* counter used for generating unique write data (-U option) */
	uint32_t			write_io_count;
};

struct spdk_bdevperf {
	TAILQ_HEAD(, bdevperf_job)	jobs;
	uint32_t			running_jobs;
};

static struct spdk_bdevperf g_bdevperf = {
	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
	.running_jobs = 0,
};

/* Storing values from a section of job config file */
struct job_config {
	const char			*name;
	const char			*filename;
	struct spdk_cpuset		cpumask;
	int				bs;
	int				iodepth;
	int				rwmixread;
	uint32_t			lcore;
	int64_t				offset;
	uint64_t			length;
	enum job_config_rw		rw;
	TAILQ_ENTRY(job_config)	link;
};

TAILQ_HEAD(, job_config) job_config_list
	= TAILQ_HEAD_INITIALIZER(job_config_list);

static bool g_performance_dump_active = false;

struct bdevperf_aggregate_stats {
	struct bdevperf_job		*current_job;
	uint64_t			io_time_in_usec;
	uint64_t			ema_period;
	double				total_io_per_second;
	double				total_mb_per_second;
	double				total_failed_per_second;
	double				total_timeout_per_second;
	double				min_latency;
	double				max_latency;
	uint64_t			total_io_completed;
	uint64_t			total_tsc;
};

static struct bdevperf_aggregate_stats g_stats = {.min_latency = (double)UINT64_MAX};

struct lcore_thread {
	struct spdk_thread		*thread;
	uint32_t			lcore;
	TAILQ_ENTRY(lcore_thread)	link;
};

TAILQ_HEAD(, lcore_thread) g_lcore_thread_list
	= TAILQ_HEAD_INITIALIZER(g_lcore_thread_list);

static char *
parse_workload_type(enum job_config_rw ret)
{
	switch (ret) {
	case JOB_CONFIG_RW_READ:
		return "read";
	case JOB_CONFIG_RW_RANDREAD:
		return "randread";
	case JOB_CONFIG_RW_WRITE:
		return "write";
	case JOB_CONFIG_RW_RANDWRITE:
		return "randwrite";
	case JOB_CONFIG_RW_VERIFY:
		return "verify";
	case JOB_CONFIG_RW_RESET:
		return "reset";
	case JOB_CONFIG_RW_UNMAP:
		return "unmap";
	case JOB_CONFIG_RW_WRITE_ZEROES:
		return "write_zeroes";
	case JOB_CONFIG_RW_FLUSH:
		return "flush";
	case JOB_CONFIG_RW_RW:
		return "rw";
	case JOB_CONFIG_RW_RANDRW:
		return "randrw";
	default:
		fprintf(stderr, "wrong workload_type code\n");
	}

	return NULL;
}

/*
 * Cumulative Moving Average (CMA): average of all data up to current
 * Exponential Moving Average (EMA): weighted mean of the previous n data and more weight is given to recent
 * Simple Moving Average (SMA): unweighted mean of the previous n data
 *
 * Bdevperf supports CMA and EMA.
 */
static double
get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
{
	return (double)job->io_completed * SPDK_SEC_TO_USEC / io_time_in_usec;
}

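/* Standard exponential smoothing with alpha = 2 / (ema_period + 1).  For
 * example, with an EMA period of 9 (alpha = 0.2), a new sample of 1000 IOPS
 * against a previous EMA of 900 yields 900 + (1000 - 900) * 0.2 = 920.
 */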
static double
get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
{
	double io_completed, io_per_second;

	io_completed = job->io_completed;
	io_per_second = (double)(io_completed - job->prev_io_completed) * SPDK_SEC_TO_USEC
			/ g_show_performance_period_in_usec;
	job->prev_io_completed = io_completed;

	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
				  / (ema_period + 1);
	return job->ema_io_per_second;
}

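/* spdk_histogram_data_iterate() callback: accumulates an approximate latency
 * total (bucket midpoint times count) and records the first and last
 * non-empty buckets as min and max.
 */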
static void
get_avg_latency(void *ctx, uint64_t start, uint64_t end, uint64_t count,
		uint64_t total, uint64_t so_far)
{
	struct latency_info *latency_info = ctx;

	if (count == 0) {
		return;
	}

	latency_info->total += (start + end) / 2 * count;

	if (so_far == count) {
		latency_info->min = start;
	}

	if (so_far == total) {
		latency_info->max = end;
	}
}

static void
performance_dump_job(struct bdevperf_aggregate_stats *stats, struct bdevperf_job *job)
{
	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
	double average_latency = 0.0, min_latency, max_latency;
	uint64_t time_in_usec;
	uint64_t tsc_rate;
	uint64_t total_io;
	struct latency_info latency_info = {};

	if (job->workload_type == JOB_CONFIG_RW_RW || job->workload_type == JOB_CONFIG_RW_RANDRW) {
		printf("\r Job: %s (Core Mask 0x%s, workload: %s, percentage: %d, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->rw_percentage,
		       job->queue_depth, job->io_size);
	} else {
		printf("\r Job: %s (Core Mask 0x%s, workload: %s, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->queue_depth, job->io_size);
	}

	if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		printf("\r Job: %s ended in about %.2f seconds with error\n",
		       job->name, (double)job->run_time_in_usec / SPDK_SEC_TO_USEC);
	}
	if (job->verify) {
		printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
		       job->ios_base, job->size_in_ios);
	}

	if (g_performance_dump_active == true) {
		/* If the job ended early due to I/O failure, use its actual run time. */
		if (job->io_failed > 0 && !job->continue_on_failure) {
			time_in_usec = job->run_time_in_usec;
		} else {
			time_in_usec = stats->io_time_in_usec;
		}
	} else {
		time_in_usec = job->run_time_in_usec;
	}

	if (stats->ema_period == 0) {
		io_per_second = get_cma_io_per_second(job, time_in_usec);
	} else {
		io_per_second = get_ema_io_per_second(job, stats->ema_period);
	}

	tsc_rate = spdk_get_ticks_hz();
	mb_per_second = io_per_second * job->io_size / (1024 * 1024);

	spdk_histogram_data_iterate(job->histogram, get_avg_latency, &latency_info);

	total_io = job->io_completed + job->io_failed;
	if (total_io != 0) {
		average_latency = (double)latency_info.total / total_io * SPDK_SEC_TO_USEC / tsc_rate;
	}
	min_latency = (double)latency_info.min * SPDK_SEC_TO_USEC / tsc_rate;
	max_latency = (double)latency_info.max * SPDK_SEC_TO_USEC / tsc_rate;

	failed_per_second = (double)job->io_failed * SPDK_SEC_TO_USEC / time_in_usec;
	timeout_per_second = (double)job->io_timeout * SPDK_SEC_TO_USEC / time_in_usec;

	printf("\t %-20s: %10.2f %10.2f %10.2f",
	       job->name, (float)time_in_usec / SPDK_SEC_TO_USEC, io_per_second, mb_per_second);
	printf(" %10.2f %8.2f",
	       failed_per_second, timeout_per_second);
	printf(" %10.2f %10.2f %10.2f\n",
	       average_latency, min_latency, max_latency);

	stats->total_io_per_second += io_per_second;
	stats->total_mb_per_second += mb_per_second;
	stats->total_failed_per_second += failed_per_second;
	stats->total_timeout_per_second += timeout_per_second;
	stats->total_io_completed += job->io_completed + job->io_failed;
	stats->total_tsc += latency_info.total;
	if (min_latency < stats->min_latency) {
		stats->min_latency = min_latency;
	}
	if (max_latency > stats->max_latency) {
		stats->max_latency = max_latency;
	}
}

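/* Fill the data buffer (and metadata, interleaved or separate) with a
 * per-block pattern derived from the block index.  In unique mode, every
 * 8-byte word carries the job's write counter in its upper 32 bits, so each
 * write is distinguishable from earlier ones.
 */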
static void
generate_data(struct bdevperf_job *job, void *buf, void *md_buf, bool unique)
{
	int offset_blocks = 0, md_offset, data_block_size, inner_offset;
	int buf_len = job->buf_size;
	int block_size = spdk_bdev_get_block_size(job->bdev);
	int md_size = spdk_bdev_get_md_size(job->bdev);
	int num_blocks = job->io_size_blocks;

	if (buf_len < num_blocks * block_size) {
		return;
	}

	if (md_buf == NULL) {
		data_block_size = block_size - md_size;
		md_buf = (char *)buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	if (unique) {
		uint64_t io_count = job->write_io_count++;
		unsigned int i;

		assert(md_size == 0 || md_size >= (int)sizeof(uint64_t));

		while (offset_blocks < num_blocks) {
			inner_offset = 0;
			while (inner_offset < data_block_size) {
				*(uint64_t *)buf = (io_count << 32) | (offset_blocks + inner_offset);
				inner_offset += sizeof(uint64_t);
				buf += sizeof(uint64_t);
			}
			for (i = 0; i < md_size / sizeof(uint64_t); i++) {
				((uint64_t *)md_buf)[i] = (io_count << 32) | offset_blocks;
			}
			md_buf += md_offset;
			offset_blocks++;
		}
		return;
	}

	while (offset_blocks < num_blocks) {
		inner_offset = 0;
		while (inner_offset < data_block_size) {
			*(uint32_t *)buf = offset_blocks + inner_offset;
			inner_offset += sizeof(uint32_t);
			buf += sizeof(uint32_t);
		}
		memset(md_buf, offset_blocks, md_size);
		md_buf += md_offset;
		offset_blocks++;
	}
}

static bool
copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
{
	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	memcpy(wr_buf, rd_buf, block_size * num_blocks);

	if (wr_md_buf != NULL) {
		memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
	}

	return true;
}

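/* Compare write and read buffers block by block.  When metadata is
 * interleaved (wr_md_buf == NULL), the metadata pointers are derived from
 * the tail of each block; md_check selects whether metadata is compared at
 * all.
 */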
static bool
verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
{
	int offset_blocks = 0, md_offset, data_block_size;

	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	if (wr_md_buf == NULL) {
		data_block_size = block_size - md_size;
		wr_md_buf = (char *)wr_buf + data_block_size;
		rd_md_buf = (char *)rd_buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	while (offset_blocks < num_blocks) {
		if (memcmp(wr_buf, rd_buf, data_block_size) != 0) {
			return false;
		}

		wr_buf += block_size;
		rd_buf += block_size;

		if (md_check) {
			if (memcmp(wr_md_buf, rd_md_buf, md_size) != 0) {
				return false;
			}

			wr_md_buf += md_offset;
			rd_md_buf += md_offset;
		}

		offset_blocks++;
	}

	return true;
}

static void
free_job_config(void)
{
	struct job_config *config, *tmp;

	spdk_conf_free(g_bdevperf_conf);
	g_bdevperf_conf = NULL;

	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
		TAILQ_REMOVE(&job_config_list, config, link);
		free(config);
	}
}

static void
bdevperf_job_free(struct bdevperf_job *job)
{
	spdk_histogram_data_free(job->histogram);
	spdk_bit_array_free(&job->outstanding);
	spdk_bit_array_free(&job->random_map);
	spdk_zipf_free(&job->zipf);
	free(job->name);
	free(job);
}

static void
job_thread_exit(void *ctx)
{
	spdk_thread_exit(spdk_get_thread());
}

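/* Histogram iterate callback: walks the cumulative distribution and prints
 * each configured percentile cutoff once the running percentage crosses it.
 */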
static void
check_cutoff(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	double **cutoff = ctx;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far / total;
	while (so_far_pct >= **cutoff && **cutoff > 0) {
		printf("%9.5f%% : %9.3fus\n", **cutoff * 100, (double)end * SPDK_SEC_TO_USEC / tsc_rate);
		(*cutoff)++;
	}
}

static void
print_bucket(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far * 100 / total;
	printf("%9.3f - %9.3f: %9.4f%%  (%9ju)\n",
	       (double)start * SPDK_SEC_TO_USEC / tsc_rate,
	       (double)end * SPDK_SEC_TO_USEC / tsc_rate,
	       so_far_pct, count);
}

static void
bdevperf_test_done(void *ctx)
{
	struct bdevperf_job *job, *jtmp;
	struct bdevperf_task *task, *ttmp;
	struct lcore_thread *lthread, *lttmp;
	double average_latency = 0.0;
	uint64_t time_in_usec;
	int rc;

	if (g_time_in_usec) {
		g_stats.io_time_in_usec = g_time_in_usec;

		if (!g_run_rc && g_performance_dump_active) {
			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
			return;
		}
	}

	if (g_show_performance_real_time) {
		spdk_poller_unregister(&g_perf_timer);
	}

	if (g_shutdown) {
		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
		time_in_usec = g_shutdown_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
		printf("Received shutdown signal, test time was about %.6f seconds\n",
		       (double)g_time_in_usec / SPDK_SEC_TO_USEC);
	}

	printf("\n%*s\n", 107, "Latency(us)");
	printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");

	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		performance_dump_job(&g_stats, job);
	}

	printf("\r =================================================================================="
	       "=================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", g_stats.total_io_per_second, g_stats.total_mb_per_second);
	printf(" %10.2f %8.2f",
	       g_stats.total_failed_per_second, g_stats.total_timeout_per_second);

	if (g_stats.total_io_completed != 0) {
		average_latency = ((double)g_stats.total_tsc / g_stats.total_io_completed) * SPDK_SEC_TO_USEC /
				  spdk_get_ticks_hz();
	}
	printf(" %10.2f %10.2f %10.2f\n", average_latency, g_stats.min_latency, g_stats.max_latency);

	fflush(stdout);

	if (g_latency_display_level == 0 || g_stats.total_io_completed == 0) {
		goto clean;
	}

	printf("\n Latency summary\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		const double *cutoff = g_latency_cutoffs;

		spdk_histogram_data_iterate(job->histogram, check_cutoff, &cutoff);

		printf("\n");
	}

	if (g_latency_display_level == 1) {
		goto clean;
	}

	printf("\r Latency histogram\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		spdk_histogram_data_iterate(job->histogram, print_bucket, NULL);
		printf("\n");
	}

clean:
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);

		if (!g_one_thread_per_lcore) {
			spdk_thread_send_msg(job->thread, job_thread_exit, NULL);
		}

		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
			TAILQ_REMOVE(&job->task_list, task, link);
			spdk_free(task->buf);
			spdk_free(task->verify_buf);
			spdk_free(task->md_buf);
			free(task);
		}

		bdevperf_job_free(job);
	}

	if (g_one_thread_per_lcore) {
		TAILQ_FOREACH_SAFE(lthread, &g_lcore_thread_list, link, lttmp) {
			TAILQ_REMOVE(&g_lcore_thread_list, lthread, link);
			spdk_thread_send_msg(lthread->thread, job_thread_exit, NULL);
			free(lthread);
		}
	}

	if (g_bdevperf_conf == NULL) {
		free_job_config();
	}

	rc = g_run_rc;
	if (g_request && !g_shutdown) {
		rpc_perform_tests_cb();
		if (rc != 0) {
			spdk_app_stop(rc);
		}
	} else {
		spdk_app_stop(rc);
	}
}

static void
bdevperf_job_end(void *ctx)
{
	assert(g_main_thread == spdk_get_thread());

	if (--g_bdevperf.running_jobs == 0) {
		bdevperf_test_done(NULL);
	}
}

static void
bdevperf_channel_get_histogram_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
{
	struct spdk_histogram_data *job_hist = cb_arg;

	if (status == 0) {
		spdk_histogram_data_merge(job_hist, histogram);
	}
}

static void
bdevperf_job_empty(struct bdevperf_job *job)
{
	uint64_t end_tsc = 0;

	end_tsc = spdk_get_ticks() - g_start_tsc;
	job->run_time_in_usec = end_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
	/* keep histogram info before channel is destroyed */
	spdk_bdev_channel_get_histogram(job->ch, bdevperf_channel_get_histogram_cb,
					job->histogram);
	spdk_put_io_channel(job->ch);
	spdk_bdev_close(job->bdev_desc);
	spdk_thread_send_msg(g_main_thread, bdevperf_job_end, NULL);
}

static void
bdevperf_end_task(struct bdevperf_task *task)
{
	struct bdevperf_job     *job = task->job;

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	if (job->is_draining) {
		if (job->current_queue_depth == 0) {
			bdevperf_job_empty(job);
		}
	}
}

static void
bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
{
	struct bdevperf_job	*job = task->job;

	task->bdev_io_wait.bdev = job->bdev;
	task->bdev_io_wait.cb_fn = cb_fn;
	task->bdev_io_wait.cb_arg = task;
	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
}

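/* Stop the job's timers and mark it draining: I/O completions will then
 * retire their tasks instead of resubmitting new I/O.
 */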
static int
bdevperf_job_drain(void *ctx)
{
	struct bdevperf_job *job = ctx;

	spdk_poller_unregister(&job->run_timer);
	if (job->reset) {
		spdk_poller_unregister(&job->reset_timer);
	}

	job->is_draining = true;

	return -1;
}

static int
bdevperf_job_drain_timer(void *ctx)
{
	struct bdevperf_job *job = ctx;

	bdevperf_job_drain(ctx);
	if (job->current_queue_depth == 0) {
		bdevperf_job_empty(job);
	}

	return SPDK_POLLER_BUSY;
}

static void
bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
		if (!job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	}

	spdk_bdev_free_io(bdev_io);
	bdevperf_end_task(task);
}

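/* Re-derive the DIF context for this I/O's starting LBA and verify the
 * protection information in the read buffers (DIF for interleaved metadata,
 * DIX for a separate metadata buffer).
 */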
static int
bdevperf_verify_dif(struct bdevperf_task *task, struct iovec *iovs, int iovcnt)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	struct spdk_dif_error	err_blk = {};
	int			rc;
	struct spdk_dif_ctx_init_ext_opts dif_opts;

	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		rc = spdk_dif_verify(iovs, iovcnt, job->io_size_blocks, &dif_ctx, &err_blk);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_verify(iovs, iovcnt, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
	}

	if (rc != 0) {
		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
			err_blk.err_type, err_blk.err_offset);
	}

	return rc;
}

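/* Common completion callback for data I/O: performs read-back verification
 * or DIF checking as configured, updates the job's counters, and either
 * resubmits via bdevperf_submit_single() or retires the task while the job
 * is draining.
 */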
static void
bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	struct iovec		*iovs;
	int			iovcnt;
	bool			md_check;
	uint64_t		offset_in_ios;
	int			rc;

	job = task->job;
	md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;

	if (g_error_to_exit == true) {
		bdevperf_job_drain(job);
	} else if (!success) {
		if (!job->reset && !job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
			g_error_to_exit = true;
			printf("task offset: %" PRIu64 " on job bdev=%s fails\n",
			       task->offset_blocks, job->name);
		}
	} else if (job->verify || job->reset) {
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		assert(iovcnt == 1);
		assert(iovs != NULL);
		if (!verify_data(task->buf, job->buf_size, iovs[0].iov_base, iovs[0].iov_len,
				 spdk_bdev_get_block_size(job->bdev),
				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
				 spdk_bdev_get_md_size(job->bdev),
				 job->io_size_blocks, md_check)) {
			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
			printf("   First dword expected 0x%x got 0x%x\n", *(int *)task->buf, *(int *)iovs[0].iov_base);
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	} else if (job->dif_check_flags != 0) {
		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_get_md_size(job->bdev) != 0) {
			spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
			assert(iovcnt == 1);
			assert(iovs != NULL);
			rc = bdevperf_verify_dif(task, iovs, iovcnt);
			if (rc != 0) {
				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
				       task->offset_blocks, job->name);

				success = false;
				if (!job->reset && !job->continue_on_failure) {
					bdevperf_job_drain(job);
					g_run_rc = -1;
					g_error_to_exit = true;
				}
			}
		}
	}

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
	}

	if (job->verify) {
		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
		spdk_bit_array_clear(job->outstanding, offset_in_ios);
	}

	spdk_bdev_free_io(bdev_io);

	/*
	 * is_draining indicates when time has expired for the test run
	 * and we are just waiting for the previously submitted I/O
	 * to complete.  In this case, do not submit a new I/O to replace
	 * the one just completed.
	 */
	if (!job->is_draining) {
		bdevperf_submit_single(job, task);
	} else {
		bdevperf_end_task(task);
	}
}

static void
bdevperf_verify_submit_read(void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	int			rc;

	job = task->job;

	/* Read the data back in */
	rc = spdk_bdev_read_blocks_with_md(job->bdev_desc, job->ch, task->verify_buf, NULL,
					   task->offset_blocks, job->io_size_blocks,
					   bdevperf_complete, task);

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
	} else if (rc != 0) {
		printf("Failed to submit read: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = rc;
	}
}

static void
bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
			       void *cb_arg)
{
	if (success) {
		spdk_bdev_free_io(bdev_io);
		bdevperf_verify_submit_read(cb_arg);
	} else {
		bdevperf_complete(bdev_io, success, cb_arg);
	}
}

static void
bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	if (!success) {
		bdevperf_complete(bdev_io, success, cb_arg);
		return;
	}

	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
}

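/* Counterpart of bdevperf_verify_dif(): generates protection information in
 * the write buffers before submission (DIF for interleaved metadata, DIX for
 * a separate metadata buffer).
 */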
static int
bdevperf_generate_dif(struct bdevperf_task *task)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	int			rc;
	struct spdk_dif_ctx_init_ext_opts dif_opts;

	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
	}

	if (rc != 0) {
		fprintf(stderr, "Generation of DIF/DIX failed\n");
	}

	return rc;
}

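/* Central submission dispatch: issues the bdev I/O matching task->io_type.
 * On -ENOMEM the task is parked on the bdev's io_wait queue and retried;
 * any other error drains the job.
 */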
static void
bdevperf_submit_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev_desc	*desc;
	struct spdk_io_channel	*ch;
	spdk_bdev_io_completion_cb cb_fn;
	uint64_t		offset_in_ios;
	int			rc = 0;

	desc = job->bdev_desc;
	ch = job->ch;

	switch (task->io_type) {
	case SPDK_BDEV_IO_TYPE_WRITE:
		if (spdk_bdev_get_md_size(job->bdev) != 0 && job->dif_check_flags != 0) {
			rc = bdevperf_generate_dif(task);
		}
		if (rc == 0) {
			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;

			if (g_zcopy) {
				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
				return;
			} else {
				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
								     task->md_buf,
								     task->offset_blocks,
								     job->io_size_blocks,
								     cb_fn, task);
			}
		}
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
						   job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_READ:
		if (g_zcopy) {
			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
						   true, bdevperf_zcopy_populate_complete, task);
		} else {
			rc = spdk_bdev_read_blocks_with_md(desc, ch, task->buf, task->md_buf,
							   task->offset_blocks,
							   job->io_size_blocks,
							   bdevperf_complete, task);
		}
		break;
	case SPDK_BDEV_IO_TYPE_ABORT:
		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
		break;
	default:
		assert(false);
		rc = -EINVAL;
		break;
	}

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
		return;
	} else if (rc != 0) {
		printf("Failed to submit bdev_io: %d\n", rc);
		if (job->verify) {
			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
			spdk_bit_array_clear(job->outstanding, offset_in_ios);
		}
		bdevperf_job_drain(job);
		g_run_rc = rc;
		return;
	}

	job->current_queue_depth++;
}

static void
bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;
	struct iovec		*iovs;
	int			iovcnt;

	if (!success) {
		bdevperf_job_drain(job);
		g_run_rc = -1;
		return;
	}

	task->bdev_io = bdev_io;
	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;

	if (job->verify || job->reset) {
		/* When job->verify or job->reset is enabled, task->buf is used for
		 *  verification of read after write.  For write I/O, when zcopy APIs
		 *  are used, task->buf cannot be used, and data must be written to
		 *  the data buffer allocated underneath bdev layer instead.
		 *  Hence we copy task->buf to the allocated data buffer here.
		 */
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		assert(iovcnt == 1);
		assert(iovs != NULL);

		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
			  spdk_bdev_get_block_size(job->bdev),
			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
			  spdk_bdev_get_md_size(job->bdev), job->io_size_blocks);
	}

	bdevperf_submit_task(task);
}

static void
bdevperf_prep_zcopy_write_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	int			rc;

	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
				   task->offset_blocks, job->io_size_blocks,
				   false, bdevperf_zcopy_get_buf_complete, task);
	if (rc != 0) {
		assert(rc == -ENOMEM);
		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
		return;
	}

	job->current_queue_depth++;
}

static struct bdevperf_task *
bdevperf_job_get_task(struct bdevperf_job *job)
{
	struct bdevperf_task *task;

	task = TAILQ_FIRST(&job->task_list);
	if (!task) {
		printf("Task allocation failed\n");
		abort();
	}

	TAILQ_REMOVE(&job->task_list, task, link);
	return task;
}

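/* Pick the next offset (zipf, uniform random, optionally de-duplicated via
 * the random map, or sequential), decide the I/O type from the job flags and
 * the read/write mix, and submit one task.
 */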
static void
bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
{
	uint64_t offset_in_ios;
	uint64_t rand_value;
	uint32_t first_clear;

	if (job->zipf) {
		offset_in_ios = spdk_zipf_generate(job->zipf);
	} else if (job->is_random) {
		/* RAND_MAX is only INT32_MAX, so use 2 calls to rand_r to
		 * get a large enough value to ensure we are issuing I/O
		 * uniformly across the whole bdev.
		 */
		rand_value = (uint64_t)rand_r(&job->seed) * RAND_MAX + rand_r(&job->seed);
		offset_in_ios = rand_value % job->size_in_ios;

		if (g_random_map) {
			/* Make sure that the offset does not exceed the maximum size
			 * of the bit array (verified during job creation).
			 */
			assert(offset_in_ios < UINT32_MAX);

			first_clear = spdk_bit_array_find_first_clear(job->random_map, (uint32_t)offset_in_ios);

			if (first_clear == UINT32_MAX) {
				first_clear = spdk_bit_array_find_first_clear(job->random_map, 0);

				if (first_clear == UINT32_MAX) {
					/* If there are no more clear bits in the array, clear it
					 * entirely and start over with the originally generated
					 * random value.
					 */
					spdk_bit_array_clear_mask(job->random_map);
					first_clear = (uint32_t)offset_in_ios;
				}
			}

			spdk_bit_array_set(job->random_map, first_clear);

			offset_in_ios = first_clear;
		}
	} else {
		offset_in_ios = job->offset_in_ios++;
		if (job->offset_in_ios == job->size_in_ios) {
			job->offset_in_ios = 0;
		}

		/* Keep incrementing offset_in_ios while there's already an
		 * outstanding I/O to that location. We only need this with
		 * job->verify, as random offsets are not supported with
		 * job->verify at this time.
		 */
		if (job->verify) {
			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);

			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
				offset_in_ios = job->offset_in_ios++;
				if (job->offset_in_ios == job->size_in_ios) {
					job->offset_in_ios = 0;
				}
			}
			spdk_bit_array_set(job->outstanding, offset_in_ios);
		}
	}

	/* When multiple threads drive the same job, offset_in_ios is relative
	 * to the LBA range assigned to that job. task->offset_blocks is
	 * absolute (entire bdev LBA range).
	 */
	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;

	if (job->flush) {
		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
	} else if (job->unmap) {
		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
	} else if (job->write_zeroes) {
		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
	} else if ((job->rw_percentage == 100) ||
		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
		task->io_type = SPDK_BDEV_IO_TYPE_READ;
	} else {
		if (job->verify || job->reset || g_unique_writes) {
			generate_data(job, task->buf, task->md_buf, g_unique_writes);
		}
		if (g_zcopy) {
			bdevperf_prep_zcopy_write_task(task);
			return;
		} else {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
		}
	}

	bdevperf_submit_task(task);
}

static int reset_job(void *arg);

static void
reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	if (!success) {
		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	spdk_bdev_free_io(bdev_io);

	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
						10 * SPDK_SEC_TO_USEC);
}

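/* One-shot poller: issues a bdev reset; reset_cb re-arms the timer, so a
 * reset job resets the bdev every 10 seconds while it runs.
 */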
static int
reset_job(void *arg)
{
	struct bdevperf_job *job = arg;
	struct bdevperf_task *task;
	int rc;

	spdk_poller_unregister(&job->reset_timer);

	/* Do reset. */
	task = bdevperf_job_get_task(job);
	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
			     reset_cb, task);
	if (rc) {
		printf("Reset failed: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	return -1;
}

static void
bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
{
	struct bdevperf_job *job = cb_arg;
	struct bdevperf_task *task;

	job->io_timeout++;

	if (job->is_draining || !job->abort ||
	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
		return;
	}

	task = bdevperf_job_get_task(job);
	if (task == NULL) {
		return;
	}

	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;

	bdevperf_submit_task(task);
}

static void
bdevperf_job_run(void *ctx)
{
	struct bdevperf_job *job = ctx;
	struct bdevperf_task *task;
	int i;

	/* Submit initial I/O for this job. Each time one
	 * completes, another will be submitted. */

	/* Start a timer to stop this I/O chain when the run is over */
	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain_timer, job, g_time_in_usec);
	if (job->reset) {
		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
							10 * SPDK_SEC_TO_USEC);
	}

	spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);

	for (i = 0; i < job->queue_depth; i++) {
		task = bdevperf_job_get_task(job);
		bdevperf_submit_single(job, task);
	}
}

static void
_performance_dump_done(void *ctx)
{
	struct bdevperf_aggregate_stats *stats = ctx;
	double average_latency;

	printf("\r =================================================================================="
	       "=================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
	printf(" %10.2f %8.2f",
	       stats->total_failed_per_second, stats->total_timeout_per_second);

	if (stats->total_io_completed != 0) {
		average_latency = ((double)stats->total_tsc / stats->total_io_completed) * SPDK_SEC_TO_USEC /
				  spdk_get_ticks_hz();
	} else {
		average_latency = 0.0;
	}
	printf(" %10.2f %10.2f %10.2f\n", average_latency, stats->min_latency, stats->max_latency);
	printf("\n");

	fflush(stdout);

	g_performance_dump_active = false;

	free(stats);
}

static void
_performance_dump(void *ctx)
{
	struct bdevperf_aggregate_stats *stats = ctx;

	performance_dump_job(stats, stats->current_job);

	/* This assumes the jobs list is static after start up time.
	 * That's true right now, but if that ever changed this would need a lock. */
	stats->current_job = TAILQ_NEXT(stats->current_job, link);
	if (stats->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
	} else {
		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
	}
}

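/* Periodic poller on the main thread: allocates a fresh stats snapshot and
 * hops it from job thread to job thread via _performance_dump(), finishing
 * back on the main thread in _performance_dump_done().
 */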
static int
performance_statistics_thread(void *arg)
{
	struct bdevperf_aggregate_stats *stats;

	if (g_performance_dump_active) {
		return -1;
	}

	g_performance_dump_active = true;

	stats = calloc(1, sizeof(*stats));
	if (stats == NULL) {
		return -1;
	}

	stats->min_latency = (double)UINT64_MAX;

	g_show_performance_period_num++;

	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;
	stats->ema_period = g_show_performance_ema_period;

	/* Iterate all of the jobs to gather stats
	 * These jobs will not get removed here until a final performance dump is run,
	 * so this should be safe without locking.
	 */
	stats->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
	if (stats->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
	} else {
		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
	}

	return -1;
}

static void
bdevperf_test(void)
{
	struct bdevperf_job *job;

	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / (uint64_t)SPDK_SEC_TO_USEC);
	fflush(stdout);

	/* Start a timer to dump performance numbers */
	g_start_tsc = spdk_get_ticks();
	if (g_show_performance_real_time && !g_perf_timer) {
		printf("%*s\n", 107, "Latency(us)");
		printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");

		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
						    g_show_performance_period_in_usec);
	}

	/* Iterate jobs to start all I/O */
	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
		g_bdevperf.running_jobs++;
		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
	}
}

static void
bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct bdevperf_job *job = event_ctx;

	if (SPDK_BDEV_EVENT_REMOVE == type) {
		bdevperf_job_drain(job);
	}
}

static void
bdevperf_histogram_status_cb(void *cb_arg, int status)
{
	if (status != 0) {
		g_run_rc = status;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}

	if (--g_bdev_count == 0) {
		if (g_run_rc == 0) {
			/* Ready to run the test */
			bdevperf_test();
		} else {
			bdevperf_test_done(NULL);
		}
	}
}

static uint32_t g_construct_job_count = 0;

static int
_bdevperf_enable_histogram(void *ctx, struct spdk_bdev *bdev)
{
	bool *enable = ctx;

	g_bdev_count++;

	spdk_bdev_histogram_enable(bdev, bdevperf_histogram_status_cb, NULL, *enable);

	return 0;
}

static void
bdevperf_enable_histogram(bool enable)
{
	struct spdk_bdev *bdev;
	int rc;

	/* increment initial g_bdev_count so that it will never reach 0 in the middle of iteration */
	g_bdev_count = 1;

	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (bdev) {
			rc = _bdevperf_enable_histogram(&enable, bdev);
		} else {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			rc = -1;
		}
	} else {
		rc = spdk_for_each_bdev_leaf(&enable, _bdevperf_enable_histogram);
	}

	bdevperf_histogram_status_cb(NULL, rc);
}

static void
_bdevperf_construct_job_done(void *ctx)
{
	if (--g_construct_job_count == 0) {
		if (g_run_rc != 0) {
			/* Something failed. */
			bdevperf_test_done(NULL);
			return;
		}

		/* always enable histogram. */
		bdevperf_enable_histogram(true);
	} else if (g_run_rc != 0) {
		/* Reset the error since some jobs were constructed successfully */
		g_run_rc = 0;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}
}

/* The format checker will not allow an inlined type here,
   so this typedef is a workaround. */
typedef struct spdk_thread *spdk_thread_t;

static spdk_thread_t
construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
{
	struct spdk_cpuset tmp;

	/* This function runs on the main thread. */
	assert(g_main_thread == spdk_get_thread());

	/* Handle default mask */
	if (spdk_cpuset_count(cpumask) == 0) {
		cpumask = &g_all_cpuset;
	}

	/* Warn user that mask might need to be changed */
	spdk_cpuset_copy(&tmp, cpumask);
	spdk_cpuset_or(&tmp, &g_all_cpuset);
	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
	}

	return spdk_thread_create(tag, cpumask);
}

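/* Round-robin over the cores in the environment's core mask, wrapping back
 * to the first core after the last one.
 */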
static uint32_t
_get_next_core(void)
{
	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;

	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
		return current_core;
	}

	current_core = spdk_env_get_next_core(current_core);
	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
	}

	return current_core;
}

static void
_bdevperf_construct_job(void *ctx)
{
	struct bdevperf_job *job = ctx;
	int rc;

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(job->bdev), true, bdevperf_bdev_removed, job,
				&job->bdev_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Could not open leaf bdev %s, error=%d\n", spdk_bdev_get_name(job->bdev), rc);
		g_run_rc = -EINVAL;
		goto end;
	}

	if (g_zcopy) {
		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
			g_run_rc = -ENOTSUP;
			goto end;
		}
	}

	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
	if (!job->ch) {
		SPDK_ERRLOG("Could not get io_channel for device %s\n", spdk_bdev_get_name(job->bdev));
		spdk_bdev_close(job->bdev_desc);
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
		g_run_rc = -ENOMEM;
		goto end;
	}

end:
	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
}

static void
job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
{
	switch (rw) {
	case JOB_CONFIG_RW_READ:
		job->rw_percentage = 100;
		break;
	case JOB_CONFIG_RW_WRITE:
		job->rw_percentage = 0;
		break;
	case JOB_CONFIG_RW_RANDREAD:
		job->is_random = true;
		job->rw_percentage = 100;
		job->seed = rand();
		break;
	case JOB_CONFIG_RW_RANDWRITE:
		job->is_random = true;
		job->rw_percentage = 0;
		job->seed = rand();
		break;
	case JOB_CONFIG_RW_RW:
		job->is_random = false;
		break;
	case JOB_CONFIG_RW_RANDRW:
		job->is_random = true;
		job->seed = rand();
		break;
	case JOB_CONFIG_RW_VERIFY:
		job->verify = true;
		job->rw_percentage = 50;
		break;
	case JOB_CONFIG_RW_RESET:
		job->reset = true;
		job->verify = true;
		job->rw_percentage = 50;
		break;
	case JOB_CONFIG_RW_UNMAP:
		job->unmap = true;
		break;
	case JOB_CONFIG_RW_FLUSH:
		job->flush = true;
		break;
	case JOB_CONFIG_RW_WRITE_ZEROES:
		job->write_zeroes = true;
		break;
	}
}

static int
bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
		       struct spdk_thread *thread)
{
	struct bdevperf_job *job;
	struct bdevperf_task *task;
	int block_size, data_block_size;
	int rc;
	int task_num, n;

	block_size = spdk_bdev_get_block_size(bdev);
	data_block_size = spdk_bdev_get_data_block_size(bdev);

	job = calloc(1, sizeof(struct bdevperf_job));
	if (!job) {
		fprintf(stderr, "Unable to allocate memory for new job.\n");
		return -ENOMEM;
	}

	job->name = strdup(spdk_bdev_get_name(bdev));
	if (!job->name) {
		fprintf(stderr, "Unable to allocate memory for job name.\n");
		bdevperf_job_free(job);
		return -ENOMEM;
	}

	job->workload_type = config->rw;
	job->io_size = config->bs;
	job->rw_percentage = config->rwmixread;
	job->continue_on_failure = g_continue_on_failure;
	job->queue_depth = config->iodepth;
	job->bdev = bdev;
	job->io_size_blocks = job->io_size / data_block_size;
	job->buf_size = job->io_size_blocks * block_size;
	job->abort = g_abort;
	job_init_rw(job, config->rw);

	if ((job->io_size % data_block_size) != 0) {
		SPDK_ERRLOG("IO size (%d) is not a multiple of the data block size of bdev %s (%"PRIu32")\n",
1761 			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
1762 		bdevperf_job_free(job);
1763 		return -ENOTSUP;
1764 	}
1765 
1766 	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1767 		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
1768 		bdevperf_job_free(job);
1769 		return -ENOTSUP;
1770 	}
1771 
1772 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_REFTAG)) {
1773 		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
1774 	}
1775 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_GUARD)) {
1776 		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
1777 	}
1778 
1779 	job->offset_in_ios = 0;
1780 
1781 	if (config->length != 0) {
1782 		/* Use subset of disk */
1783 		job->size_in_ios = config->length / job->io_size_blocks;
1784 		job->ios_base = config->offset / job->io_size_blocks;
1785 	} else {
1786 		/* Use whole disk */
1787 		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
1788 		job->ios_base = 0;
1789 	}
1790 
1791 	if (job->is_random && g_zipf_theta > 0) {
1792 		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
1793 	}
1794 
1795 	if (job->verify) {
1796 		if (job->size_in_ios >= UINT32_MAX) {
1797 			SPDK_ERRLOG("Due to constraints of verify operation, the job storage capacity is too large\n");
1798 			bdevperf_job_free(job);
1799 			return -ENOMEM;
1800 		}
1801 		job->outstanding = spdk_bit_array_create(job->size_in_ios);
1802 		if (job->outstanding == NULL) {
1803 			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
1804 				    spdk_bdev_get_name(bdev));
1805 			bdevperf_job_free(job);
1806 			return -ENOMEM;
1807 		}
1808 		if (job->queue_depth > (int)job->size_in_ios) {
1809 			SPDK_WARNLOG("Due to constraints of verify job, queue depth (-q, %d) can't exceed the number of IO "
1810 				     "requests which can be submitted to the bdev %s simultaneously (%"PRIu64"). "
1811 				     "Queue depth is limited to %"PRIu64"\n",
1812 				     job->queue_depth, job->name, job->size_in_ios, job->size_in_ios);
1813 			job->queue_depth = (int)job->size_in_ios;
1814 		}
1815 	}
1816 
1817 	job->histogram = spdk_histogram_data_alloc();
1818 	if (job->histogram == NULL) {
1819 		fprintf(stderr, "Failed to allocate histogram\n");
1820 		bdevperf_job_free(job);
1821 		return -ENOMEM;
1822 	}
1823 
1824 	TAILQ_INIT(&job->task_list);
1825 
1826 	if (g_random_map) {
1827 		if (job->size_in_ios >= UINT32_MAX) {
1828 			SPDK_ERRLOG("Due to constraints of the random map, the job storage capacity is too large\n");
1829 			bdevperf_job_free(job);
1830 			return -ENOMEM;
1831 		}
1832 		job->random_map = spdk_bit_array_create(job->size_in_ios);
1833 		if (job->random_map == NULL) {
1834 			SPDK_ERRLOG("Could not create random_map array bitmap for bdev %s\n",
1835 				    spdk_bdev_get_name(bdev));
1836 			bdevperf_job_free(job);
1837 			return -ENOMEM;
1838 		}
1839 	}
1840 
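	/*
	 * Allocate one task per queue slot, plus one extra task for a reset
	 * job and, when aborts are enabled, one abort task per queue slot.
	 */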
1841 	task_num = job->queue_depth;
1842 	if (job->reset) {
1843 		task_num += 1;
1844 	}
1845 	if (job->abort) {
1846 		task_num += job->queue_depth;
1847 	}
1848 
1849 	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);
1850 
1851 	for (n = 0; n < task_num; n++) {
1852 		task = calloc(1, sizeof(struct bdevperf_task));
1853 		if (!task) {
			fprintf(stderr, "Failed to allocate memory for task\n");
1855 			spdk_zipf_free(&job->zipf);
1856 			return -ENOMEM;
1857 		}
1858 
1859 		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
1860 					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1861 		if (!task->buf) {
1862 			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
1863 			spdk_zipf_free(&job->zipf);
1864 			free(task);
1865 			return -ENOMEM;
1866 		}
1867 
1868 		if (job->verify && job->buf_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
1869 			task->verify_buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
1870 							SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1871 			if (!task->verify_buf) {
				fprintf(stderr, "Cannot allocate verify_buf for task=%p\n", task);
1873 				spdk_free(task->buf);
1874 				spdk_zipf_free(&job->zipf);
1875 				free(task);
1876 				return -ENOMEM;
1877 			}
		}
1880 
1881 		if (spdk_bdev_is_md_separate(job->bdev)) {
1882 			task->md_buf = spdk_zmalloc(job->io_size_blocks *
1883 						    spdk_bdev_get_md_size(job->bdev), 0, NULL,
1884 						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1885 			if (!task->md_buf) {
1886 				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
1887 				spdk_zipf_free(&job->zipf);
1888 				spdk_free(task->verify_buf);
1889 				spdk_free(task->buf);
1890 				free(task);
1891 				return -ENOMEM;
1892 			}
1893 		}
1894 
1895 		task->job = job;
1896 		TAILQ_INSERT_TAIL(&job->task_list, task, link);
1897 	}
1898 
1899 	job->thread = thread;
1900 
1901 	g_construct_job_count++;
1902 
1903 	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
1904 	assert(rc == 0);
1905 
1906 	return rc;
1907 }
1908 
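/*
 * Map a workload string to its job_config_rw value. A NULL string returns the
 * caller-supplied default "ret" unchanged; an unrecognized string returns
 * BDEVPERF_CONFIG_ERROR.
 */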
1909 static int
1910 parse_rw(const char *str, enum job_config_rw ret)
1911 {
1912 	if (str == NULL) {
1913 		return ret;
1914 	}
1915 
1916 	if (!strcmp(str, "read")) {
1917 		ret = JOB_CONFIG_RW_READ;
1918 	} else if (!strcmp(str, "randread")) {
1919 		ret = JOB_CONFIG_RW_RANDREAD;
1920 	} else if (!strcmp(str, "write")) {
1921 		ret = JOB_CONFIG_RW_WRITE;
1922 	} else if (!strcmp(str, "randwrite")) {
1923 		ret = JOB_CONFIG_RW_RANDWRITE;
1924 	} else if (!strcmp(str, "verify")) {
1925 		ret = JOB_CONFIG_RW_VERIFY;
1926 	} else if (!strcmp(str, "reset")) {
1927 		ret = JOB_CONFIG_RW_RESET;
1928 	} else if (!strcmp(str, "unmap")) {
1929 		ret = JOB_CONFIG_RW_UNMAP;
1930 	} else if (!strcmp(str, "write_zeroes")) {
1931 		ret = JOB_CONFIG_RW_WRITE_ZEROES;
1932 	} else if (!strcmp(str, "flush")) {
1933 		ret = JOB_CONFIG_RW_FLUSH;
1934 	} else if (!strcmp(str, "rw")) {
1935 		ret = JOB_CONFIG_RW_RW;
1936 	} else if (!strcmp(str, "randrw")) {
1937 		ret = JOB_CONFIG_RW_RANDRW;
1938 	} else {
1939 		fprintf(stderr, "rw must be one of\n"
1940 			PATTERN_TYPES_STR "\n");
1941 		ret = BDEVPERF_CONFIG_ERROR;
1942 	}
1943 
1944 	return ret;
1945 }
1946 
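/*
 * Copy the next filename from a colon-separated list into "out", stripping
 * spaces and tabs, and return a pointer to the remainder of the list. For
 * example (illustrative), "Malloc0: Malloc1" yields "Malloc0" first and then
 * "Malloc1" when called again on the returned remainder.
 */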
1947 static const char *
1948 config_filename_next(const char *filename, char *out)
1949 {
1950 	int i, k;
1951 
1952 	if (filename == NULL) {
1953 		out[0] = '\0';
1954 		return NULL;
1955 	}
1956 
1957 	if (filename[0] == ':') {
1958 		filename++;
1959 	}
1960 
1961 	for (i = 0, k = 0;
1962 	     filename[i] != '\0' &&
1963 	     filename[i] != ':' &&
1964 	     i < BDEVPERF_CONFIG_MAX_FILENAME &&
1965 	     k < (BDEVPERF_CONFIG_MAX_FILENAME - 1);
1966 	     i++) {
1967 		if (filename[i] == ' ' || filename[i] == '\t') {
1968 			continue;
1969 		}
1970 
1971 		out[k++] = filename[i];
1972 	}
1973 	out[k] = 0;
1974 
1975 	return filename + i;
1976 }
1977 
1978 static struct spdk_thread *
1979 get_lcore_thread(uint32_t lcore)
1980 {
1981 	struct lcore_thread *lthread;
1982 
1983 	TAILQ_FOREACH(lthread, &g_lcore_thread_list, link) {
1984 		if (lthread->lcore == lcore) {
1985 			return lthread->thread;
1986 		}
1987 	}
1988 
1989 	return NULL;
1990 }
1991 
1992 static void
1993 create_lcore_thread(uint32_t lcore)
1994 {
1995 	struct lcore_thread *lthread;
1996 	struct spdk_cpuset cpumask = {};
1997 	char name[32];
1998 
1999 	lthread = calloc(1, sizeof(*lthread));
2000 	assert(lthread != NULL);
2001 
2002 	lthread->lcore = lcore;
2003 
2004 	snprintf(name, sizeof(name), "lcore_%u", lcore);
2005 	spdk_cpuset_set_cpu(&cpumask, lcore, true);
2006 
2007 	lthread->thread = spdk_thread_create(name, &cpumask);
2008 	assert(lthread->thread != NULL);
2009 
2010 	TAILQ_INSERT_TAIL(&g_lcore_thread_list, lthread, link);
2011 }
2012 
2013 static void
2014 bdevperf_construct_jobs(void)
2015 {
2016 	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
2017 	struct spdk_thread *thread;
2018 	struct job_config *config;
2019 	struct spdk_bdev *bdev;
2020 	const char *filenames;
2021 	uint32_t i;
2022 	int rc;
2023 
2024 	if (g_one_thread_per_lcore) {
2025 		SPDK_ENV_FOREACH_CORE(i) {
2026 			create_lcore_thread(i);
2027 		}
2028 	}
2029 
2030 	TAILQ_FOREACH(config, &job_config_list, link) {
2031 		filenames = config->filename;
2032 
2033 		if (!g_one_thread_per_lcore) {
2034 			thread = construct_job_thread(&config->cpumask, config->name);
2035 		} else {
2036 			thread = get_lcore_thread(config->lcore);
2037 		}
2038 		assert(thread);
2039 
2040 		while (filenames) {
2041 			filenames = config_filename_next(filenames, filename);
2042 			if (strlen(filename) == 0) {
2043 				break;
2044 			}
2045 
2046 			bdev = spdk_bdev_get_by_name(filename);
2047 			if (!bdev) {
2048 				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
2049 				g_run_rc = -EINVAL;
2050 				return;
2051 			}
2052 
2053 			rc = bdevperf_construct_job(bdev, config, thread);
2054 			if (rc < 0) {
2055 				g_run_rc = rc;
2056 				return;
2057 			}
2058 		}
2059 	}
2060 }
2061 
2062 static int
2063 make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
2064 {
2065 	struct job_config *config = calloc(1, sizeof(*config));
2066 
2067 	if (config == NULL) {
2068 		fprintf(stderr, "Unable to allocate memory for job config\n");
2069 		return -ENOMEM;
2070 	}
2071 
2072 	config->name = filename;
2073 	config->filename = filename;
2074 	config->lcore = _get_next_core();
2075 	spdk_cpuset_zero(&config->cpumask);
2076 	spdk_cpuset_set_cpu(&config->cpumask, config->lcore, true);
2077 	config->bs = g_io_size;
2078 	config->iodepth = g_queue_depth;
2079 	config->rwmixread = g_rw_percentage;
2080 	config->offset = offset;
2081 	config->length = range;
2082 	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
2083 	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2084 		free(config);
2085 		return -EINVAL;
2086 	}
2087 
2088 	TAILQ_INSERT_TAIL(&job_config_list, config, link);
2089 	return 0;
2090 }
2091 
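/*
 * Split one bdev into per-core jobs. As a worked example (illustrative), with
 * 4 cores and a 1000000-block bdev, each core gets a 250000-block job at
 * offsets 0, 250000, 500000 and 750000; any remainder blocks from the integer
 * division are left unused.
 */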
2092 static int
2093 bdevperf_construct_multithread_job_config(void *ctx, struct spdk_bdev *bdev)
2094 {
2095 	uint32_t *num_cores = ctx;
2096 	uint32_t i;
2097 	uint64_t blocks_per_job;
2098 	int64_t offset;
2099 	int rc;
2100 
2101 	blocks_per_job = spdk_bdev_get_num_blocks(bdev) / *num_cores;
2102 	offset = 0;
2103 
2104 	SPDK_ENV_FOREACH_CORE(i) {
2105 		rc = make_cli_job_config(spdk_bdev_get_name(bdev), offset, blocks_per_job);
2106 		if (rc) {
2107 			return rc;
2108 		}
2109 
2110 		offset += blocks_per_job;
2111 	}
2112 
2113 	return 0;
2114 }
2115 
2116 static void
2117 bdevperf_construct_multithread_job_configs(void)
2118 {
2119 	struct spdk_bdev *bdev;
2120 	uint32_t i;
2121 	uint32_t num_cores;
2122 
2123 	num_cores = 0;
2124 	SPDK_ENV_FOREACH_CORE(i) {
2125 		num_cores++;
2126 	}
2127 
2128 	if (num_cores == 0) {
2129 		g_run_rc = -EINVAL;
2130 		return;
2131 	}
2132 
2133 	if (g_job_bdev_name != NULL) {
2134 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2135 		if (!bdev) {
2136 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2137 			return;
2138 		}
2139 		g_run_rc = bdevperf_construct_multithread_job_config(&num_cores, bdev);
2140 	} else {
2141 		g_run_rc = spdk_for_each_bdev_leaf(&num_cores, bdevperf_construct_multithread_job_config);
2142 	}
}
2145 
2146 static int
2147 bdevperf_construct_job_config(void *ctx, struct spdk_bdev *bdev)
2148 {
2149 	/* Construct the job */
2150 	return make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
2151 }
2152 
2153 static void
2154 bdevperf_construct_job_configs(void)
2155 {
2156 	struct spdk_bdev *bdev;
2157 
	/* There are three different modes for allocating jobs. Standard mode
	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
	 *
	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
	 * one spdk_thread per bdev PER CORE and runs a copy of the job on each.
	 * Effectively, this runs multiple threads per bdev.
	 *
	 * The -j flag enables "FIO" mode, which tries to mimic the semantics of
	 * FIO jobs. In "FIO" mode, threads are spawned per job instead of per
	 * bdev. Each FIO job can be individually parameterized by filename,
	 * cpu mask, etc., unlike the other modes, which only support global
	 * options.
	 *
	 * For both standard mode and "multithread" mode, if the -E flag is
	 * specified, one spdk_thread is created PER CORE and shared by the
	 * jobs running on that core.
	 */
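
	/*
	 * Illustrative invocations for each mode (the flag values here are
	 * arbitrary examples, not defaults):
	 *   standard:             bdevperf -q 128 -o 4096 -w randread -t 60
	 *   multithread (-C):     bdevperf -C -q 128 -o 4096 -w randread -t 60
	 *   FIO-style (-j):       bdevperf -j bdevperf.conf -t 60
	 *   thread per core (-E): bdevperf -E -q 128 -o 4096 -w randread -t 60
	 */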
2174 
2175 	if (g_bdevperf_conf) {
2176 		goto end;
2177 	}
2178 
2179 	if (g_multithread_mode) {
2180 		bdevperf_construct_multithread_job_configs();
2181 	} else if (g_job_bdev_name != NULL) {
2182 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2183 		if (bdev) {
2184 			/* Construct the job */
2185 			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
2186 		} else {
2187 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2188 		}
2189 	} else {
2190 		g_run_rc = spdk_for_each_bdev_leaf(NULL, bdevperf_construct_job_config);
2191 	}
2192 
2193 end:
2194 	/* Increment initial construct_jobs count so that it will never reach 0 in the middle
2195 	 * of iteration.
2196 	 */
2197 	g_construct_job_count = 1;
2198 
2199 	if (g_run_rc == 0) {
2200 		bdevperf_construct_jobs();
2201 	}
2202 
2203 	_bdevperf_construct_job_done(NULL);
2204 }
2205 
2206 static int
2207 parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
2208 {
2209 	const char *job_name;
2210 	int tmp;
2211 
2212 	tmp = spdk_conf_section_get_intval(s, name);
2213 	if (tmp == -1) {
		/* The field was not found, so check the default value. In the
		 * [global] section it is ok to have undefined values, but for
		 * other sections it is not. */
2217 		if (def == BDEVPERF_CONFIG_UNDEFINED) {
2218 			job_name = spdk_conf_section_get_name(s);
2219 			if (strcmp(job_name, "global") == 0) {
2220 				return def;
2221 			}
2222 
2223 			fprintf(stderr,
2224 				"Job '%s' has no '%s' assigned\n",
2225 				job_name, name);
2226 			return BDEVPERF_CONFIG_ERROR;
2227 		}
2228 		return def;
2229 	}
2230 
2231 	/* NOTE: get_intval returns nonnegative on success */
2232 	if (tmp < 0) {
2233 		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
2234 			spdk_conf_section_get_name(s), name);
2235 		return BDEVPERF_CONFIG_ERROR;
2236 	}
2237 
2238 	return tmp;
2239 }
2240 
2241 /* CLI arguments override parameters for global sections */
2242 static void
2243 config_set_cli_args(struct job_config *config)
2244 {
2245 	if (g_job_bdev_name) {
2246 		config->filename = g_job_bdev_name;
2247 	}
2248 	if (g_io_size > 0) {
2249 		config->bs = g_io_size;
2250 	}
2251 	if (g_queue_depth > 0) {
2252 		config->iodepth = g_queue_depth;
2253 	}
2254 	if (g_rw_percentage > 0) {
2255 		config->rwmixread = g_rw_percentage;
2256 	}
2257 	if (g_workload_type) {
2258 		config->rw = parse_rw(g_workload_type, config->rw);
2259 	}
2260 }
2261 
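/*
 * A minimal example job file (illustrative values; the section layout and key
 * names match the parsing in read_job_config() below):
 *
 *   [global]
 *   rw=randread
 *   bs=4096
 *   iodepth=128
 *
 *   [job0]
 *   filename=Malloc0
 *   cpumask=0x1
 */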
2262 static int
2263 read_job_config(void)
2264 {
2265 	struct job_config global_default_config;
2266 	struct job_config global_config;
2267 	struct spdk_conf_section *s;
2268 	struct job_config *config = NULL;
2269 	const char *cpumask;
2270 	const char *rw;
2271 	bool is_global;
2272 	int n = 0;
2273 	int val;
2274 
2275 	if (g_bdevperf_conf_file == NULL) {
2276 		return 0;
2277 	}
2278 
2279 	g_bdevperf_conf = spdk_conf_allocate();
2280 	if (g_bdevperf_conf == NULL) {
2281 		fprintf(stderr, "Could not allocate job config structure\n");
2282 		return 1;
2283 	}
2284 
2285 	spdk_conf_disable_sections_merge(g_bdevperf_conf);
2286 	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
		fprintf(stderr, "Invalid job config\n");
2288 		return 1;
2289 	}
2290 
2291 	/* Initialize global defaults */
2292 	global_default_config.filename = NULL;
	/* A zero mask is the same as g_all_cpuset, but g_all_cpuset is not
	 * initialized yet, so use the zero mask as the default instead. */
2296 	spdk_cpuset_zero(&global_default_config.cpumask);
2297 	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
2298 	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
2299 	/* bdevperf has no default for -M option but in FIO the default is 50 */
2300 	global_default_config.rwmixread = 50;
2301 	global_default_config.offset = 0;
2302 	/* length 0 means 100% */
2303 	global_default_config.length = 0;
2304 	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
2305 	config_set_cli_args(&global_default_config);
2306 
2307 	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
2308 		return 1;
2309 	}
2310 
	/* There is only a single instance of the global job_config.
	 * We just reset its value when we encounter a new [global] section. */
2313 	global_config = global_default_config;
2314 
2315 	for (s = spdk_conf_first_section(g_bdevperf_conf);
2316 	     s != NULL;
2317 	     s = spdk_conf_next_section(s)) {
2318 		config = calloc(1, sizeof(*config));
2319 		if (config == NULL) {
2320 			fprintf(stderr, "Unable to allocate memory for job config\n");
2321 			return 1;
2322 		}
2323 
2324 		config->name = spdk_conf_section_get_name(s);
2325 		is_global = strcmp(config->name, "global") == 0;
2326 
2327 		if (is_global) {
2328 			global_config = global_default_config;
2329 		}
2330 
2331 		config->filename = spdk_conf_section_get_val(s, "filename");
2332 		if (config->filename == NULL) {
2333 			config->filename = global_config.filename;
2334 		}
2335 		if (!is_global) {
2336 			if (config->filename == NULL) {
2337 				fprintf(stderr, "Job '%s' expects 'filename' parameter\n", config->name);
2338 				goto error;
2339 			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
2340 				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
2341 				fprintf(stderr,
2342 					"filename for '%s' job is too long. Max length is %d\n",
2343 					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
2344 				goto error;
2345 			}
2346 		}
2347 
2348 		cpumask = spdk_conf_section_get_val(s, "cpumask");
2349 		if (cpumask == NULL) {
2350 			config->cpumask = global_config.cpumask;
2351 		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
2352 			fprintf(stderr, "Job '%s' has bad 'cpumask' value\n", config->name);
2353 			goto error;
2354 		}
2355 
2356 		config->bs = parse_uint_option(s, "bs", global_config.bs);
2357 		if (config->bs == BDEVPERF_CONFIG_ERROR) {
2358 			goto error;
2359 		} else if (config->bs == 0) {
2360 			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
2361 			goto error;
2362 		}
2363 
2364 		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
2365 		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
2366 			goto error;
2367 		} else if (config->iodepth == 0) {
2368 			fprintf(stderr,
2369 				"'iodepth' of job '%s' must be greater than 0\n",
2370 				config->name);
2371 			goto error;
2372 		}
2373 
2374 		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
2375 		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
2376 			goto error;
2377 		} else if (config->rwmixread > 100) {
2378 			fprintf(stderr,
2379 				"'rwmixread' value of '%s' job is not in 0-100 range\n",
2380 				config->name);
2381 			goto error;
2382 		}
2383 
2384 		config->offset = parse_uint_option(s, "offset", global_config.offset);
2385 		if (config->offset == BDEVPERF_CONFIG_ERROR) {
2386 			goto error;
2387 		}
2388 
2389 		val = parse_uint_option(s, "length", global_config.length);
2390 		if (val == BDEVPERF_CONFIG_ERROR) {
2391 			goto error;
2392 		}
2393 		config->length = val;
2394 
2395 		rw = spdk_conf_section_get_val(s, "rw");
2396 		config->rw = parse_rw(rw, global_config.rw);
2397 		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2398 			fprintf(stderr, "Job '%s' has bad 'rw' value\n", config->name);
2399 			goto error;
2400 		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
2401 			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
2402 			goto error;
2403 		}
2404 
2405 		if (is_global) {
2406 			config_set_cli_args(config);
2407 			global_config = *config;
2408 			free(config);
2409 			config = NULL;
2410 		} else {
2411 			TAILQ_INSERT_TAIL(&job_config_list, config, link);
2412 			n++;
2413 		}
2414 	}
2415 
2416 	if (g_rpc_log_file_name != NULL) {
2417 		g_rpc_log_file = fopen(g_rpc_log_file_name, "a");
2418 		if (g_rpc_log_file == NULL) {
2419 			fprintf(stderr, "Failed to open %s\n", g_rpc_log_file_name);
2420 			goto error;
2421 		}
2422 	}
2423 
2424 	printf("Using job config with %d jobs\n", n);
2425 	return 0;
2426 error:
2427 	free(config);
2428 	return 1;
2429 }
2430 
2431 static void
2432 bdevperf_run(void *arg1)
2433 {
2434 	uint32_t i;
2435 
2436 	g_main_thread = spdk_get_thread();
2437 
2438 	spdk_cpuset_zero(&g_all_cpuset);
2439 	SPDK_ENV_FOREACH_CORE(i) {
2440 		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
2441 	}
2442 
2443 	if (g_wait_for_tests) {
2444 		/* Do not perform any tests until RPC is received */
2445 		return;
2446 	}
2447 
2448 	bdevperf_construct_job_configs();
2449 }
2450 
2451 static void
2452 rpc_perform_tests_reset(void)
2453 {
2454 	/* Reset g_run_rc to 0 for the next test run. */
2455 	g_run_rc = 0;
2456 
2457 	/* Reset g_stats to 0 for the next test run. */
2458 	memset(&g_stats, 0, sizeof(g_stats));
2459 
2460 	/* Reset g_show_performance_period_num to 0 for the next test run. */
2461 	g_show_performance_period_num = 0;
2462 }
2463 
2464 static void
2465 rpc_perform_tests_cb(void)
2466 {
2467 	struct spdk_json_write_ctx *w;
2468 	struct spdk_jsonrpc_request *request = g_request;
2469 
2470 	g_request = NULL;
2471 
2472 	if (g_run_rc == 0) {
2473 		w = spdk_jsonrpc_begin_result(request);
2474 		spdk_json_write_uint32(w, g_run_rc);
2475 		spdk_jsonrpc_end_result(request, w);
2476 	} else {
2477 		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2478 						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
2479 	}
2480 
2481 	rpc_perform_tests_reset();
2482 }
2483 
2484 struct rpc_bdevperf_params {
2485 	int	time_in_sec;
2486 	char	*workload_type;
2487 	int	queue_depth;
2488 	char	*io_size;
2489 	int	rw_percentage;
2490 };
2491 
2492 static const struct spdk_json_object_decoder rpc_bdevperf_params_decoders[] = {
2493 	{"time_in_sec", offsetof(struct rpc_bdevperf_params, time_in_sec), spdk_json_decode_int32, true},
2494 	{"workload_type", offsetof(struct rpc_bdevperf_params, workload_type), spdk_json_decode_string, true},
2495 	{"queue_depth", offsetof(struct rpc_bdevperf_params, queue_depth), spdk_json_decode_int32, true},
2496 	{"io_size", offsetof(struct rpc_bdevperf_params, io_size), spdk_json_decode_string, true},
2497 	{"rw_percentage", offsetof(struct rpc_bdevperf_params, rw_percentage), spdk_json_decode_int32, true},
2498 };
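
/*
 * Example "perform_tests" request parameters (illustrative values; every field
 * is optional, and io_size is decoded as a string so size suffixes can be
 * parsed):
 *
 *   {"time_in_sec": 10, "workload_type": "randrw", "queue_depth": 64,
 *    "io_size": "4096", "rw_percentage": 70}
 */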
2499 
2500 static void
2501 rpc_apply_bdevperf_params(struct rpc_bdevperf_params *params)
2502 {
2503 	if (params->workload_type) {
		/* Free the previously set value to avoid a memory leak. */
2505 		free(g_workload_type);
2506 		g_workload_type = strdup(params->workload_type);
2507 	}
2508 	if (params->queue_depth) {
2509 		g_queue_depth = params->queue_depth;
2510 	}
2511 	if (params->io_size) {
2512 		bdevperf_parse_arg('o', params->io_size);
2513 	}
2514 	if (params->time_in_sec) {
2515 		g_time_in_sec = params->time_in_sec;
2516 	}
2517 	if (params->rw_percentage) {
2518 		g_rw_percentage = params->rw_percentage;
2519 		g_mix_specified = true;
2520 	} else {
2521 		g_mix_specified = false;
2522 	}
2523 }
2524 
2525 static void
2526 rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
2527 {
2528 	struct rpc_bdevperf_params req = {}, backup = {};
2529 	int rc;
2530 
2531 	if (g_request != NULL) {
2532 		fprintf(stderr, "Another test is already in progress.\n");
2533 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2534 						 spdk_strerror(-EINPROGRESS));
2535 		return;
2536 	}
2537 
2538 	if (params) {
2539 		if (spdk_json_decode_object_relaxed(params, rpc_bdevperf_params_decoders,
2540 						    SPDK_COUNTOF(rpc_bdevperf_params_decoders),
2541 						    &req)) {
2542 			spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2543 							 "spdk_json_decode_object failed");
2544 			return;
2545 		}
2546 
2547 		if (g_workload_type) {
2548 			backup.workload_type = strdup(g_workload_type);
2549 		}
2550 		backup.queue_depth = g_queue_depth;
2551 		if (asprintf(&backup.io_size, "%d", g_io_size) < 0) {
			fprintf(stderr, "Couldn't allocate memory for the io size backup\n");
2553 			goto rpc_error;
2554 		}
2555 		backup.time_in_sec = g_time_in_sec;
2556 		backup.rw_percentage = g_rw_percentage;
2557 
2558 		rpc_apply_bdevperf_params(&req);
2559 
2560 		free(req.workload_type);
2561 		free(req.io_size);
2562 	}
2563 
2564 	rc = verify_test_params();
2565 
2566 	if (rc) {
2567 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2568 						 "Invalid parameters provided");
2569 		/* restore old params on error */
2570 		rpc_apply_bdevperf_params(&backup);
2571 		goto rpc_error;
2572 	}
2573 
2574 	g_request = request;
2575 
	/* Construct job configs only on the first test run. */
2577 	if (TAILQ_EMPTY(&job_config_list)) {
2578 		bdevperf_construct_job_configs();
2579 	} else {
2580 		bdevperf_construct_jobs();
2581 	}
2582 
2583 rpc_error:
2584 	free(backup.io_size);
2585 	free(backup.workload_type);
2586 }
2587 SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)
2588 
2589 static void
2590 _bdevperf_job_drain(void *ctx)
2591 {
2592 	bdevperf_job_drain(ctx);
2593 }
2594 
2595 static void
2596 spdk_bdevperf_shutdown_cb(void)
2597 {
	struct bdevperf_job *job, *tmp;

	g_shutdown = true;
2600 
2601 	if (g_bdevperf.running_jobs == 0) {
2602 		bdevperf_test_done(NULL);
2603 		return;
2604 	}
2605 
2606 	/* Iterate jobs to stop all I/O */
2607 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
2608 		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
2609 	}
2610 }
2611 
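/*
 * Parse a single CLI option. The -o value goes through spdk_parse_capacity(),
 * so a suffixed size such as "4K" is expected to parse as 4096 bytes; plain
 * integer options are parsed with spdk_strtoll().
 */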
2612 static int
2613 bdevperf_parse_arg(int ch, char *arg)
2614 {
2615 	long long tmp;
2616 
2617 	if (ch == 'w') {
2618 		g_workload_type = strdup(arg);
2619 	} else if (ch == 'T') {
2620 		g_job_bdev_name = arg;
2621 	} else if (ch == 'z') {
2622 		g_wait_for_tests = true;
2623 	} else if (ch == 'Z') {
2624 		g_zcopy = true;
2625 	} else if (ch == 'X') {
2626 		g_abort = true;
2627 	} else if (ch == 'C') {
2628 		g_multithread_mode = true;
2629 	} else if (ch == 'f') {
2630 		g_continue_on_failure = true;
2631 	} else if (ch == 'j') {
2632 		g_bdevperf_conf_file = arg;
2633 	} else if (ch == 'F') {
2634 		char *endptr;
2635 
2636 		errno = 0;
2637 		g_zipf_theta = strtod(arg, &endptr);
2638 		if (errno || arg == endptr || g_zipf_theta < 0) {
2639 			fprintf(stderr, "Illegal zipf theta value %s\n", arg);
2640 			return -EINVAL;
2641 		}
2642 	} else if (ch == 'l') {
2643 		g_latency_display_level++;
2644 	} else if (ch == 'D') {
2645 		g_random_map = true;
2646 	} else if (ch == 'E') {
2647 		g_one_thread_per_lcore = true;
2648 	} else if (ch == 'J') {
2649 		g_rpc_log_file_name = arg;
2650 	} else if (ch == 'o') {
2651 		uint64_t size;
2652 
2653 		if (spdk_parse_capacity(arg, &size, NULL) != 0) {
2654 			fprintf(stderr, "Invalid IO size: %s\n", arg);
2655 			return -EINVAL;
2656 		}
2657 		g_io_size = (int)size;
2658 	} else if (ch == 'U') {
2659 		g_unique_writes = true;
2660 	} else {
2661 		tmp = spdk_strtoll(arg, 10);
2662 		if (tmp < 0) {
			fprintf(stderr, "Failed to parse the value for option -%c.\n", ch);
2664 			return tmp;
2665 		} else if (tmp >= INT_MAX) {
			fprintf(stderr, "Value for option -%c is too large.\n", ch);
2667 			return -ERANGE;
2668 		}
2669 
2670 		switch (ch) {
2671 		case 'q':
2672 			g_queue_depth = tmp;
2673 			break;
2674 		case 't':
2675 			g_time_in_sec = tmp;
2676 			break;
2677 		case 'k':
2678 			g_timeout_in_sec = tmp;
2679 			break;
2680 		case 'M':
2681 			g_rw_percentage = tmp;
2682 			g_mix_specified = true;
2683 			break;
2684 		case 'P':
2685 			g_show_performance_ema_period = tmp;
2686 			break;
2687 		case 'S':
2688 			g_show_performance_real_time = 1;
2689 			g_show_performance_period_in_usec = tmp * SPDK_SEC_TO_USEC;
2690 			break;
2691 		default:
2692 			return -EINVAL;
2693 		}
2694 	}
2695 	return 0;
2696 }
2697 
2698 static void
2699 bdevperf_usage(void)
2700 {
2701 	printf(" -q <depth>                io depth\n");
2702 	printf(" -o <size>                 io size in bytes\n");
2703 	printf(" -w <type>                 io pattern type, must be one of " PATTERN_TYPES_STR "\n");
2704 	printf(" -t <time>                 time in seconds\n");
2705 	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
2706 	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
	printf(" -P <num>                  number of moving average periods\n");
	printf("\t\t(If set to n, show the weighted mean of the previous n IO/s in real time)\n");
2709 	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
2710 	printf("\t\t(only valid with -S)\n");
2711 	printf(" -S <period>               show performance result in real time every <period> seconds\n");
2712 	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
2713 	printf(" -f                        continue processing I/O even after failures\n");
2714 	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
2715 	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
2716 	printf(" -z                        start bdevperf, but wait for perform_tests RPC to start tests\n");
2717 	printf("                           (See examples/bdev/bdevperf/bdevperf.py)\n");
2718 	printf(" -X                        abort timed out I/O\n");
2719 	printf(" -C                        enable every core to send I/Os to each bdev\n");
2720 	printf(" -j <filename>             use job config file\n");
	printf(" -l                        display latency histogram (disabled by default); -l shows a summary, -ll shows details\n");
	printf(" -D                        use a random map for picking offsets not previously read or written (for all jobs)\n");
	printf(" -E                        share one thread per lcore among jobs. Available only if -j is not used.\n");
	printf(" -J <filename>             open <filename> in append mode and log JSON-RPC calls to it\n");
	printf(" -U                        generate unique data for each write I/O, has no effect on non-write I/O\n");
2726 }
2727 
2728 static void
2729 bdevperf_fini(void)
2730 {
2731 	free_job_config();
2732 	free(g_workload_type);
2733 
2734 	if (g_rpc_log_file != NULL) {
2735 		fclose(g_rpc_log_file);
2736 		g_rpc_log_file = NULL;
2737 	}
2738 }
2739 
2740 static int
2741 verify_test_params(void)
2742 {
2743 	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
2744 		goto out;
2745 	}
2746 	if (!g_bdevperf_conf_file && g_io_size <= 0) {
2747 		goto out;
2748 	}
2749 	if (!g_bdevperf_conf_file && !g_workload_type) {
2750 		goto out;
2751 	}
2752 	if (g_bdevperf_conf_file && g_one_thread_per_lcore) {
		printf("The per-lcore thread option (-E) cannot be used together with a bdevperf config file (-j)\n");
2754 		goto out;
2755 	}
2756 	if (g_time_in_sec <= 0) {
2757 		goto out;
2758 	}
2759 	g_time_in_usec = g_time_in_sec * SPDK_SEC_TO_USEC;
2760 
2761 	if (g_timeout_in_sec < 0) {
2762 		goto out;
2763 	}
2764 
2765 	if (g_abort && !g_timeout_in_sec) {
		printf("A timeout (-k) must be set for the abort (-X) option; ignoring abort\n");
2767 	}
2768 
2769 	if (g_show_performance_ema_period > 0 &&
2770 	    g_show_performance_real_time == 0) {
2771 		fprintf(stderr, "-P option must be specified with -S option\n");
2772 		return 1;
2773 	}
2774 
2775 	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2776 		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
2777 		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
2778 		printf("Zero copy mechanism will not be used.\n");
2779 		g_zcopy = false;
2780 	}
2781 
2782 	if (g_bdevperf_conf_file) {
2783 		/* workload_type verification happens during config file parsing */
2784 		return 0;
2785 	}
2786 
2787 	if (!strcmp(g_workload_type, "verify") ||
2788 	    !strcmp(g_workload_type, "reset")) {
2789 		g_rw_percentage = 50;
2790 		g_verify = true;
2791 		if (!strcmp(g_workload_type, "reset")) {
2792 			g_reset = true;
2793 		}
2794 	}
2795 
2796 	if (!strcmp(g_workload_type, "read") ||
2797 	    !strcmp(g_workload_type, "randread") ||
2798 	    !strcmp(g_workload_type, "write") ||
2799 	    !strcmp(g_workload_type, "randwrite") ||
2800 	    !strcmp(g_workload_type, "verify") ||
2801 	    !strcmp(g_workload_type, "reset") ||
2802 	    !strcmp(g_workload_type, "unmap") ||
2803 	    !strcmp(g_workload_type, "write_zeroes") ||
2804 	    !strcmp(g_workload_type, "flush")) {
2805 		if (g_mix_specified) {
			fprintf(stderr, "Ignoring the -M option; -M only applies"
				" to rw and randrw workloads.\n");
2808 		}
2809 	}
2810 
2811 	if (!strcmp(g_workload_type, "rw") ||
2812 	    !strcmp(g_workload_type, "randrw")) {
2813 		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
			fprintf(stderr,
				"-M must be set to a value from 0 to 100 "
				"for rw or randrw.\n");
2817 			return 1;
2818 		}
2819 	}
2820 
2821 	if (strcmp(g_workload_type, "randread") &&
2822 	    strcmp(g_workload_type, "randwrite") &&
2823 	    strcmp(g_workload_type, "randrw")) {
2824 		if (g_random_map) {
			fprintf(stderr, "The -D option can only be used with"
				" randread, randwrite or randrw workloads.\n");
2827 			return 1;
2828 		}
2829 	}
2830 
2831 	return 0;
2832 out:
2833 	return 1;
2834 }
2835 
2836 int
2837 main(int argc, char **argv)
2838 {
2839 	struct spdk_app_opts opts = {};
2840 	int rc;
2841 
2842 	/* Use the runtime PID to set the random seed */
2843 	srand(getpid());
2844 
2845 	spdk_app_opts_init(&opts, sizeof(opts));
2846 	opts.name = "bdevperf";
2847 	opts.rpc_addr = NULL;
2848 	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;
2849 
2850 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CEF:J:M:P:S:T:Xlj:DU", NULL,
2851 				      bdevperf_parse_arg, bdevperf_usage)) !=
2852 	    SPDK_APP_PARSE_ARGS_SUCCESS) {
2853 		return rc;
2854 	}
2855 
2856 	/* Set the default address if no rpc_addr was provided in args
2857 	 * and RPC is used for starting tests */
2858 	if (g_wait_for_tests && opts.rpc_addr == NULL) {
2859 		opts.rpc_addr = SPDK_DEFAULT_RPC_ADDR;
2860 	}
2861 
2862 	if (read_job_config()) {
2863 		bdevperf_fini();
2864 		return 1;
2865 	}
2866 
2867 	if (g_rpc_log_file != NULL) {
2868 		opts.rpc_log_file = g_rpc_log_file;
2869 	}
2870 
2871 	if (verify_test_params() != 0 && !g_wait_for_tests) {
2872 		spdk_app_usage();
2873 		bdevperf_usage();
2874 		bdevperf_fini();
2875 		exit(1);
2876 	}
2877 
2878 	rc = spdk_app_start(&opts, bdevperf_run, NULL);
2879 
2880 	spdk_app_fini();
2881 	bdevperf_fini();
2882 	return rc;
2883 }
2884