xref: /spdk/examples/bdev/bdevperf/bdevperf.c (revision 15d04459cdb621d306f63e7eeb92ceb7c80972ee)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES.
4  *   All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/bdev.h"
10 #include "spdk/accel.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/event.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/thread.h"
17 #include "spdk/string.h"
18 #include "spdk/rpc.h"
19 #include "spdk/bit_array.h"
20 #include "spdk/conf.h"
21 #include "spdk/zipf.h"
22 #include "spdk/histogram_data.h"
23 
24 #define BDEVPERF_CONFIG_MAX_FILENAME 1024
25 #define BDEVPERF_CONFIG_UNDEFINED -1
26 #define BDEVPERF_CONFIG_ERROR -2
27 
28 struct bdevperf_task {
29 	struct iovec			iov;
30 	struct bdevperf_job		*job;
31 	struct spdk_bdev_io		*bdev_io;
32 	void				*buf;
33 	void				*md_buf;
34 	uint64_t			offset_blocks;
35 	struct bdevperf_task		*task_to_abort;
36 	enum spdk_bdev_io_type		io_type;
37 	TAILQ_ENTRY(bdevperf_task)	link;
38 	struct spdk_bdev_io_wait_entry	bdev_io_wait;
39 };
40 
41 static const char *g_workload_type = NULL;
42 static int g_io_size = 0;
43 /* initialize to invalid value so we can detect if user overrides it. */
44 static int g_rw_percentage = -1;
45 static bool g_verify = false;
46 static bool g_reset = false;
47 static bool g_continue_on_failure = false;
48 static bool g_abort = false;
49 static bool g_error_to_exit = false;
50 static int g_queue_depth = 0;
51 static uint64_t g_time_in_usec;
52 static int g_show_performance_real_time = 0;
53 static uint64_t g_show_performance_period_in_usec = SPDK_SEC_TO_USEC;
54 static uint64_t g_show_performance_period_num = 0;
55 static uint64_t g_show_performance_ema_period = 0;
56 static int g_run_rc = 0;
57 static bool g_shutdown = false;
58 static uint64_t g_start_tsc;
59 static uint64_t g_shutdown_tsc;
60 static bool g_zcopy = false;
61 static struct spdk_thread *g_main_thread;
62 static int g_time_in_sec = 0;
63 static bool g_mix_specified = false;
64 static const char *g_job_bdev_name;
65 static bool g_wait_for_tests = false;
66 static struct spdk_jsonrpc_request *g_request = NULL;
67 static bool g_multithread_mode = false;
68 static int g_timeout_in_sec;
69 static struct spdk_conf *g_bdevperf_conf = NULL;
70 static const char *g_bdevperf_conf_file = NULL;
71 static double g_zipf_theta;
72 static bool g_random_map = false;
73 
74 static struct spdk_cpuset g_all_cpuset;
75 static struct spdk_poller *g_perf_timer = NULL;
76 
77 static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
78 static void rpc_perform_tests_cb(void);
79 
80 static uint32_t g_bdev_count = 0;
81 static uint32_t g_latency_display_level;
82 
83 static bool g_one_thread_per_lcore = false;
84 
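/* Percentile cutoffs printed in the latency summary. The trailing -1 is a
 * sentinel: check_cutoff() stops advancing its cursor once the cutoff is
 * no longer positive.
 */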
85 static const double g_latency_cutoffs[] = {
86 	0.01,
87 	0.10,
88 	0.25,
89 	0.50,
90 	0.75,
91 	0.90,
92 	0.95,
93 	0.98,
94 	0.99,
95 	0.995,
96 	0.999,
97 	0.9999,
98 	0.99999,
99 	0.999999,
100 	0.9999999,
101 	-1,
102 };
103 
104 static const char *g_rpc_log_file_name = NULL;
105 static FILE *g_rpc_log_file = NULL;
106 
107 struct latency_info {
108 	uint64_t	min;
109 	uint64_t	max;
110 	uint64_t	total;
111 };
112 
113 struct bdevperf_job {
114 	char				*name;
115 	struct spdk_bdev		*bdev;
116 	struct spdk_bdev_desc		*bdev_desc;
117 	struct spdk_io_channel		*ch;
118 	TAILQ_ENTRY(bdevperf_job)	link;
119 	struct spdk_thread		*thread;
120 
121 	const char			*workload_type;
122 	int				io_size;
123 	int				rw_percentage;
124 	bool				is_random;
125 	bool				verify;
126 	bool				reset;
127 	bool				continue_on_failure;
128 	bool				unmap;
129 	bool				write_zeroes;
130 	bool				flush;
131 	bool				abort;
132 	int				queue_depth;
133 	unsigned int			seed;
134 
135 	uint64_t			io_completed;
136 	uint64_t			io_failed;
137 	uint64_t			io_timeout;
138 	uint64_t			prev_io_completed;
139 	double				ema_io_per_second;
140 	int				current_queue_depth;
141 	uint64_t			size_in_ios;
142 	uint64_t			ios_base;
143 	uint64_t			offset_in_ios;
144 	uint64_t			io_size_blocks;
145 	uint64_t			buf_size;
146 	uint32_t			dif_check_flags;
147 	bool				is_draining;
148 	struct spdk_poller		*run_timer;
149 	struct spdk_poller		*reset_timer;
150 	struct spdk_bit_array		*outstanding;
151 	struct spdk_zipf		*zipf;
152 	TAILQ_HEAD(, bdevperf_task)	task_list;
153 	uint64_t			run_time_in_usec;
154 
155 	/* keep the channel's histogram data before the channel is destroyed */
156 	struct spdk_histogram_data	*histogram;
157 	struct spdk_bit_array		*random_map;
158 };
159 
160 struct spdk_bdevperf {
161 	TAILQ_HEAD(, bdevperf_job)	jobs;
162 	uint32_t			running_jobs;
163 };
164 
165 static struct spdk_bdevperf g_bdevperf = {
166 	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
167 	.running_jobs = 0,
168 };
169 
170 enum job_config_rw {
171 	JOB_CONFIG_RW_READ = 0,
172 	JOB_CONFIG_RW_WRITE,
173 	JOB_CONFIG_RW_RANDREAD,
174 	JOB_CONFIG_RW_RANDWRITE,
175 	JOB_CONFIG_RW_RW,
176 	JOB_CONFIG_RW_RANDRW,
177 	JOB_CONFIG_RW_VERIFY,
178 	JOB_CONFIG_RW_RESET,
179 	JOB_CONFIG_RW_UNMAP,
180 	JOB_CONFIG_RW_FLUSH,
181 	JOB_CONFIG_RW_WRITE_ZEROES,
182 };
183 
184 /* Stores the values from one section of the job config file */
185 struct job_config {
186 	const char			*name;
187 	const char			*filename;
188 	struct spdk_cpuset		cpumask;
189 	int				bs;
190 	int				iodepth;
191 	int				rwmixread;
192 	uint32_t			lcore;
193 	int64_t				offset;
194 	uint64_t			length;
195 	enum job_config_rw		rw;
196 	TAILQ_ENTRY(job_config)	link;
197 };
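
/* Illustrative config-file section that would populate the struct above.
 * This is a hedged sketch: the key names simply mirror the struct fields;
 * the authoritative key set is defined by the config parsing code.
 *
 *   [job0]
 *   filename=Malloc0
 *   bs=4096
 *   iodepth=32
 *   rw=randrw
 *   rwmixread=70
 *   offset=0
 *   length=0
 */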
198 
199 TAILQ_HEAD(, job_config) job_config_list
200 	= TAILQ_HEAD_INITIALIZER(job_config_list);
201 
202 static bool g_performance_dump_active = false;
203 
204 struct bdevperf_aggregate_stats {
205 	struct bdevperf_job		*current_job;
206 	uint64_t			io_time_in_usec;
207 	uint64_t			ema_period;
208 	double				total_io_per_second;
209 	double				total_mb_per_second;
210 	double				total_failed_per_second;
211 	double				total_timeout_per_second;
212 	double				min_latency;
213 	double				max_latency;
214 	uint64_t			total_io_completed;
215 	uint64_t			total_tsc;
216 };
217 
218 static struct bdevperf_aggregate_stats g_stats = {.min_latency = (double)UINT64_MAX};
219 
220 struct lcore_thread {
221 	struct spdk_thread		*thread;
222 	uint32_t			lcore;
223 	TAILQ_ENTRY(lcore_thread)	link;
224 };
225 
226 TAILQ_HEAD(, lcore_thread) g_lcore_thread_list
227 	= TAILQ_HEAD_INITIALIZER(g_lcore_thread_list);
228 
229 /*
230  * Cumulative Moving Average (CMA): average of all data points up to the current one
231  * Exponential Moving Average (EMA): weighted mean of the previous n data points, with more weight given to recent ones
232  * Simple Moving Average (SMA): unweighted mean of the previous n data points
233  *
234  * Bdevperf supports CMA and EMA.
235  */
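/*
 * For reference, the two estimators below compute (sketch matching the code):
 *
 *   CMA:  io_completed * SPDK_SEC_TO_USEC / io_time_in_usec
 *   EMA:  ema += (sample - ema) * 2 / (ema_period + 1)
 *
 * where "sample" is the IOPS measured over the most recent display period,
 * and 2 / (N + 1) is the standard EMA smoothing factor.
 */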
236 static double
237 get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
238 {
239 	return (double)job->io_completed * SPDK_SEC_TO_USEC / io_time_in_usec;
240 }
241 
242 static double
243 get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
244 {
245 	double io_completed, io_per_second;
246 
247 	io_completed = job->io_completed;
248 	io_per_second = (double)(io_completed - job->prev_io_completed) * SPDK_SEC_TO_USEC
249 			/ g_show_performance_period_in_usec;
250 	job->prev_io_completed = io_completed;
251 
252 	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
253 				  / (ema_period + 1);
254 	return job->ema_io_per_second;
255 }
256 
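/* Histogram iterator callback: approximates total latency by weighting each
 * bucket's midpoint ((start + end) / 2) by its count, and records the first
 * non-empty bucket's lower bound as min (so_far == count) and the last
 * non-empty bucket's upper bound as max (so_far == total). All values are
 * in TSC ticks; callers convert to microseconds using spdk_get_ticks_hz().
 */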
257 static void
258 get_avg_latency(void *ctx, uint64_t start, uint64_t end, uint64_t count,
259 		uint64_t total, uint64_t so_far)
260 {
261 	struct latency_info *latency_info = ctx;
262 
263 	if (count == 0) {
264 		return;
265 	}
266 
267 	latency_info->total += (start + end) / 2 * count;
268 
269 	if (so_far == count) {
270 		latency_info->min = start;
271 	}
272 
273 	if (so_far == total) {
274 		latency_info->max = end;
275 	}
276 }
277 
278 static void
279 performance_dump_job(struct bdevperf_aggregate_stats *stats, struct bdevperf_job *job)
280 {
281 	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
282 	double average_latency = 0.0, min_latency, max_latency;
283 	uint64_t time_in_usec;
284 	uint64_t tsc_rate;
285 	uint64_t total_io;
286 	struct latency_info latency_info = {};
287 
288 	printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
289 	       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
290 
291 	if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
292 		printf("\r Job: %s ended in about %.2f seconds with error\n",
293 		       job->name, (double)job->run_time_in_usec / SPDK_SEC_TO_USEC);
294 	}
295 	if (job->verify) {
296 		printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
297 		       job->ios_base, job->size_in_ios);
298 	}
299 
300 	if (g_performance_dump_active == true) {
301 		/* If the job already ended early due to I/O failure, use its actual run time */
302 		if (job->io_failed > 0 && !job->continue_on_failure) {
303 			time_in_usec = job->run_time_in_usec;
304 		} else {
305 			time_in_usec = stats->io_time_in_usec;
306 		}
307 	} else {
308 		time_in_usec = job->run_time_in_usec;
309 	}
310 
311 	if (stats->ema_period == 0) {
312 		io_per_second = get_cma_io_per_second(job, time_in_usec);
313 	} else {
314 		io_per_second = get_ema_io_per_second(job, stats->ema_period);
315 	}
316 
317 	tsc_rate = spdk_get_ticks_hz();
318 	mb_per_second = io_per_second * job->io_size / (1024 * 1024);
319 
320 	spdk_histogram_data_iterate(job->histogram, get_avg_latency, &latency_info);
321 
322 	total_io = job->io_completed + job->io_failed;
323 	if (total_io != 0) {
324 		average_latency = (double)latency_info.total / total_io * SPDK_SEC_TO_USEC / tsc_rate;
325 	}
326 	min_latency = (double)latency_info.min * SPDK_SEC_TO_USEC / tsc_rate;
327 	max_latency = (double)latency_info.max * SPDK_SEC_TO_USEC / tsc_rate;
328 
329 	failed_per_second = (double)job->io_failed * SPDK_SEC_TO_USEC / time_in_usec;
330 	timeout_per_second = (double)job->io_timeout * SPDK_SEC_TO_USEC / time_in_usec;
331 
332 	printf("\t %-20s: %10.2f %10.2f %10.2f",
333 	       job->name, (float)time_in_usec / SPDK_SEC_TO_USEC, io_per_second, mb_per_second);
334 	printf(" %10.2f %8.2f",
335 	       failed_per_second, timeout_per_second);
336 	printf(" %10.2f %10.2f %10.2f\n",
337 	       average_latency, min_latency, max_latency);
338 
339 	stats->total_io_per_second += io_per_second;
340 	stats->total_mb_per_second += mb_per_second;
341 	stats->total_failed_per_second += failed_per_second;
342 	stats->total_timeout_per_second += timeout_per_second;
343 	stats->total_io_completed += job->io_completed + job->io_failed;
344 	stats->total_tsc += latency_info.total;
345 	if (min_latency < stats->min_latency) {
346 		stats->min_latency = min_latency;
347 	}
348 	if (max_latency > stats->max_latency) {
349 		stats->max_latency = max_latency;
350 	}
351 }
352 
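/* Fill the data buffer with a deterministic pattern so reads can be verified
 * later. If md_buf is NULL the metadata is interleaved: each block_size bytes
 * consist of data followed by md_size bytes of metadata at the end of the
 * block. Otherwise data and metadata live in separate buffers.
 */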
353 static void
354 generate_data(void *buf, int buf_len, int block_size, void *md_buf, int md_size,
355 	      int num_blocks)
356 {
357 	int offset_blocks = 0, md_offset, data_block_size, inner_offset;
358 
359 	if (buf_len < num_blocks * block_size) {
360 		return;
361 	}
362 
363 	if (md_buf == NULL) {
364 		data_block_size = block_size - md_size;
365 		md_buf = (char *)buf + data_block_size;
366 		md_offset = block_size;
367 	} else {
368 		data_block_size = block_size;
369 		md_offset = md_size;
370 	}
371 
372 	while (offset_blocks < num_blocks) {
373 		inner_offset = 0;
374 		while (inner_offset < data_block_size) {
375 			*(uint32_t *)buf = offset_blocks + inner_offset;
376 			inner_offset += sizeof(uint32_t);
377 			buf += sizeof(uint32_t);
378 		}
379 		memset(md_buf, offset_blocks, md_size);
380 		md_buf += md_offset;
381 		offset_blocks++;
382 	}
383 }
384 
385 static bool
386 copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
387 	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
388 {
389 	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
390 		return false;
391 	}
392 
393 	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));
394 
395 	memcpy(wr_buf, rd_buf, block_size * num_blocks);
396 
397 	if (wr_md_buf != NULL) {
398 		memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
399 	}
400 
401 	return true;
402 }
403 
404 static bool
405 verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
406 	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
407 {
408 	int offset_blocks = 0, md_offset, data_block_size;
409 
410 	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
411 		return false;
412 	}
413 
414 	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));
415 
416 	if (wr_md_buf == NULL) {
417 		data_block_size = block_size - md_size;
418 		wr_md_buf = (char *)wr_buf + data_block_size;
419 		rd_md_buf = (char *)rd_buf + data_block_size;
420 		md_offset = block_size;
421 	} else {
422 		data_block_size = block_size;
423 		md_offset = md_size;
424 	}
425 
426 	while (offset_blocks < num_blocks) {
427 		if (memcmp(wr_buf, rd_buf, data_block_size) != 0) {
428 			return false;
429 		}
430 
431 		wr_buf += block_size;
432 		rd_buf += block_size;
433 
434 		if (md_check) {
435 			if (memcmp(wr_md_buf, rd_md_buf, md_size) != 0) {
436 				return false;
437 			}
438 
439 			wr_md_buf += md_offset;
440 			rd_md_buf += md_offset;
441 		}
442 
443 		offset_blocks++;
444 	}
445 
446 	return true;
447 }
448 
449 static void
450 free_job_config(void)
451 {
452 	struct job_config *config, *tmp;
453 
454 	spdk_conf_free(g_bdevperf_conf);
455 	g_bdevperf_conf = NULL;
456 
457 	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
458 		TAILQ_REMOVE(&job_config_list, config, link);
459 		free(config);
460 	}
461 }
462 
463 static void
464 bdevperf_job_free(struct bdevperf_job *job)
465 {
466 	spdk_histogram_data_free(job->histogram);
467 	spdk_bit_array_free(&job->outstanding);
468 	spdk_bit_array_free(&job->random_map);
469 	spdk_zipf_free(&job->zipf);
470 	free(job->name);
471 	free(job);
472 }
473 
474 static void
475 job_thread_exit(void *ctx)
476 {
477 	spdk_thread_exit(spdk_get_thread());
478 }
479 
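/* Histogram iterator callback for the percentile summary. ctx is a cursor
 * (double **) into g_latency_cutoffs; each time the cumulative fraction
 * so_far/total crosses the next cutoff, the current bucket's upper bound is
 * printed as that percentile's latency estimate and the cursor advances.
 */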
480 static void
481 check_cutoff(void *ctx, uint64_t start, uint64_t end, uint64_t count,
482 	     uint64_t total, uint64_t so_far)
483 {
484 	double so_far_pct;
485 	double **cutoff = ctx;
486 	uint64_t tsc_rate;
487 
488 	if (count == 0) {
489 		return;
490 	}
491 
492 	tsc_rate = spdk_get_ticks_hz();
493 	so_far_pct = (double)so_far / total;
494 	while (so_far_pct >= **cutoff && **cutoff > 0) {
495 		printf("%9.5f%% : %9.3fus\n", **cutoff * 100, (double)end * SPDK_SEC_TO_USEC / tsc_rate);
496 		(*cutoff)++;
497 	}
498 }
499 
500 static void
501 print_bucket(void *ctx, uint64_t start, uint64_t end, uint64_t count,
502 	     uint64_t total, uint64_t so_far)
503 {
504 	double so_far_pct;
505 	uint64_t tsc_rate;
506 
507 	if (count == 0) {
508 		return;
509 	}
510 
511 	tsc_rate = spdk_get_ticks_hz();
512 	so_far_pct = (double)so_far * 100 / total;
513 	printf("%9.3f - %9.3f: %9.4f%%  (%9ju)\n",
514 	       (double)start * SPDK_SEC_TO_USEC / tsc_rate,
515 	       (double)end * SPDK_SEC_TO_USEC / tsc_rate,
516 	       so_far_pct, count);
517 }
518 
519 static void
520 bdevperf_test_done(void *ctx)
521 {
522 	struct bdevperf_job *job, *jtmp;
523 	struct bdevperf_task *task, *ttmp;
524 	struct lcore_thread *lthread, *lttmp;
525 	double average_latency = 0.0;
526 	uint64_t time_in_usec;
527 	int rc;
528 
529 	if (g_time_in_usec) {
530 		g_stats.io_time_in_usec = g_time_in_usec;
531 
532 		if (!g_run_rc && g_performance_dump_active) {
533 			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
534 			return;
535 		}
536 	}
537 
538 	if (g_show_performance_real_time) {
539 		spdk_poller_unregister(&g_perf_timer);
540 	}
541 
542 	if (g_shutdown) {
543 		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
544 		time_in_usec = g_shutdown_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
545 		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
546 		printf("Received shutdown signal, test time was about %.6f seconds\n",
547 		       (double)g_time_in_usec / SPDK_SEC_TO_USEC);
548 	}
549 
550 	printf("\n%*s\n", 107, "Latency(us)");
551 	printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
552 	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");
553 
554 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
555 		performance_dump_job(&g_stats, job);
556 	}
557 
558 	printf("\r =================================================================================="
559 	       "=================================\n");
560 	printf("\r %-28s: %10s %10.2f %10.2f",
561 	       "Total", "", g_stats.total_io_per_second, g_stats.total_mb_per_second);
562 	printf(" %10.2f %8.2f",
563 	       g_stats.total_failed_per_second, g_stats.total_timeout_per_second);
564 
565 	if (g_stats.total_io_completed != 0) {
566 		average_latency = ((double)g_stats.total_tsc / g_stats.total_io_completed) * SPDK_SEC_TO_USEC /
567 				  spdk_get_ticks_hz();
568 	}
569 	printf(" %10.2f %10.2f %10.2f\n", average_latency, g_stats.min_latency, g_stats.max_latency);
570 
571 	fflush(stdout);
572 
573 	if (g_latency_display_level == 0 || g_stats.total_io_completed == 0) {
574 		goto clean;
575 	}
576 
577 	printf("\n Latency summary\n");
578 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
579 		printf("\r =============================================\n");
580 		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
581 		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
582 
583 		const double *cutoff = g_latency_cutoffs;
584 
585 		spdk_histogram_data_iterate(job->histogram, check_cutoff, &cutoff);
586 
587 		printf("\n");
588 	}
589 
590 	if (g_latency_display_level == 1) {
591 		goto clean;
592 	}
593 
594 	printf("\r Latency histogram\n");
595 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
596 		printf("\r =============================================\n");
597 		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
598 		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
599 
600 		spdk_histogram_data_iterate(job->histogram, print_bucket, NULL);
601 		printf("\n");
602 	}
603 
604 clean:
605 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
606 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
607 
608 		if (!g_one_thread_per_lcore) {
609 			spdk_thread_send_msg(job->thread, job_thread_exit, NULL);
610 		}
611 
612 		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
613 			TAILQ_REMOVE(&job->task_list, task, link);
614 			spdk_free(task->buf);
615 			spdk_free(task->md_buf);
616 			free(task);
617 		}
618 
619 		bdevperf_job_free(job);
620 	}
621 
622 	if (g_one_thread_per_lcore) {
623 		TAILQ_FOREACH_SAFE(lthread, &g_lcore_thread_list, link, lttmp) {
624 			TAILQ_REMOVE(&g_lcore_thread_list, lthread, link);
625 			spdk_thread_send_msg(lthread->thread, job_thread_exit, NULL);
626 			free(lthread);
627 		}
628 	}
629 
630 	rc = g_run_rc;
631 	if (g_request && !g_shutdown) {
632 		rpc_perform_tests_cb();
633 		if (rc != 0) {
634 			spdk_app_stop(rc);
635 		}
636 	} else {
637 		spdk_app_stop(rc);
638 	}
639 }
640 
641 static void
642 bdevperf_job_end(void *ctx)
643 {
644 	assert(g_main_thread == spdk_get_thread());
645 
646 	if (--g_bdevperf.running_jobs == 0) {
647 		bdevperf_test_done(NULL);
648 	}
649 }
650 
651 static void
652 bdevperf_channel_get_histogram_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
653 {
654 	struct spdk_histogram_data *job_hist = cb_arg;
655 
656 	if (status == 0) {
657 		spdk_histogram_data_merge(job_hist, histogram);
658 	}
659 }
660 
661 static void
662 bdevperf_job_empty(struct bdevperf_job *job)
663 {
664 	uint64_t end_tsc = 0;
665 
666 	end_tsc = spdk_get_ticks() - g_start_tsc;
667 	job->run_time_in_usec = end_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
668 	/* keep the histogram info before the channel is destroyed */
669 	spdk_bdev_channel_get_histogram(job->ch, bdevperf_channel_get_histogram_cb,
670 					job->histogram);
671 	spdk_put_io_channel(job->ch);
672 	spdk_bdev_close(job->bdev_desc);
673 	spdk_thread_send_msg(g_main_thread, bdevperf_job_end, NULL);
674 }
675 
676 static void
677 bdevperf_end_task(struct bdevperf_task *task)
678 {
679 	struct bdevperf_job     *job = task->job;
680 
681 	TAILQ_INSERT_TAIL(&job->task_list, task, link);
682 	if (job->is_draining) {
683 		if (job->current_queue_depth == 0) {
684 			bdevperf_job_empty(job);
685 		}
686 	}
687 }
688 
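/* Standard SPDK -ENOMEM retry idiom: when a submission fails because the
 * bdev layer has run out of spdk_bdev_io objects, register a wait entry so
 * that cb_fn is invoked with this task once a completion frees one up.
 */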
689 static void
690 bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
691 {
692 	struct bdevperf_job	*job = task->job;
693 
694 	task->bdev_io_wait.bdev = job->bdev;
695 	task->bdev_io_wait.cb_fn = cb_fn;
696 	task->bdev_io_wait.cb_arg = task;
697 	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
698 }
699 
700 static int
701 bdevperf_job_drain(void *ctx)
702 {
703 	struct bdevperf_job *job = ctx;
704 
705 	spdk_poller_unregister(&job->run_timer);
706 	if (job->reset) {
707 		spdk_poller_unregister(&job->reset_timer);
708 	}
709 
710 	job->is_draining = true;
711 
712 	return -1;
713 }
714 
715 static int
716 bdevperf_job_drain_timer(void *ctx)
717 {
718 	struct bdevperf_job *job = ctx;
719 
720 	bdevperf_job_drain(ctx);
721 	if (job->current_queue_depth == 0) {
722 		bdevperf_job_empty(job);
723 	}
724 
725 	return SPDK_POLLER_BUSY;
726 }
727 
728 static void
729 bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
730 {
731 	struct bdevperf_task	*task = cb_arg;
732 	struct bdevperf_job	*job = task->job;
733 
734 	job->current_queue_depth--;
735 
736 	if (success) {
737 		job->io_completed++;
738 	} else {
739 		job->io_failed++;
740 		if (!job->continue_on_failure) {
741 			bdevperf_job_drain(job);
742 			g_run_rc = -1;
743 		}
744 	}
745 
746 	spdk_bdev_free_io(bdev_io);
747 	bdevperf_end_task(task);
748 }
749 
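/* Verify end-to-end protection information on a completed read. For
 * interleaved metadata the DIF fields sit inside each block, so
 * spdk_dif_verify() checks the data iovecs directly; for separate metadata,
 * spdk_dix_verify() checks the data against the metadata in task->md_buf.
 */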
750 static int
751 bdevperf_verify_dif(struct bdevperf_task *task, struct iovec *iovs, int iovcnt)
752 {
753 	struct bdevperf_job	*job = task->job;
754 	struct spdk_bdev	*bdev = job->bdev;
755 	struct spdk_dif_ctx	dif_ctx;
756 	struct spdk_dif_error	err_blk = {};
757 	int			rc;
758 	struct spdk_dif_ctx_init_ext_opts dif_opts;
759 
760 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
761 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
762 	rc = spdk_dif_ctx_init(&dif_ctx,
763 			       spdk_bdev_get_block_size(bdev),
764 			       spdk_bdev_get_md_size(bdev),
765 			       spdk_bdev_is_md_interleaved(bdev),
766 			       spdk_bdev_is_dif_head_of_md(bdev),
767 			       spdk_bdev_get_dif_type(bdev),
768 			       job->dif_check_flags,
769 			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
770 	if (rc != 0) {
771 		fprintf(stderr, "Initialization of DIF context failed\n");
772 		return rc;
773 	}
774 
775 	if (spdk_bdev_is_md_interleaved(bdev)) {
776 		rc = spdk_dif_verify(iovs, iovcnt, job->io_size_blocks, &dif_ctx, &err_blk);
777 	} else {
778 		struct iovec md_iov = {
779 			.iov_base	= task->md_buf,
780 			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
781 		};
782 
783 		rc = spdk_dix_verify(iovs, iovcnt, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
784 	}
785 
786 	if (rc != 0) {
787 		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
788 			err_blk.err_type, err_blk.err_offset);
789 	}
790 
791 	return rc;
792 }
793 
794 static void
795 bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
796 {
797 	struct bdevperf_job	*job;
798 	struct bdevperf_task	*task = cb_arg;
799 	struct iovec		*iovs;
800 	int			iovcnt;
801 	bool			md_check;
802 	uint64_t		offset_in_ios;
803 	int			rc;
804 
805 	job = task->job;
806 	md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;
807 
808 	if (g_error_to_exit == true) {
809 		bdevperf_job_drain(job);
810 	} else if (!success) {
811 		if (!job->reset && !job->continue_on_failure) {
812 			bdevperf_job_drain(job);
813 			g_run_rc = -1;
814 			g_error_to_exit = true;
815 			printf("task offset: %" PRIu64 " on job bdev=%s failed\n",
816 			       task->offset_blocks, job->name);
817 		}
818 	} else if (job->verify || job->reset) {
819 		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
820 		assert(iovcnt == 1);
821 		assert(iovs != NULL);
822 		if (!verify_data(task->buf, job->buf_size, iovs[0].iov_base, iovs[0].iov_len,
823 				 spdk_bdev_get_block_size(job->bdev),
824 				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
825 				 spdk_bdev_get_md_size(job->bdev),
826 				 job->io_size_blocks, md_check)) {
827 			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
828 			printf("   First dword expected 0x%x got 0x%x\n", *(int *)task->buf, *(int *)iovs[0].iov_base);
829 			bdevperf_job_drain(job);
830 			g_run_rc = -1;
831 		}
832 	} else if (job->dif_check_flags != 0) {
833 		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_get_md_size(job->bdev) != 0) {
834 			spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
835 			assert(iovcnt == 1);
836 			assert(iovs != NULL);
837 			rc = bdevperf_verify_dif(task, iovs, iovcnt);
838 			if (rc != 0) {
839 				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
840 				       task->offset_blocks, job->name);
841 
842 				success = false;
843 				if (!job->reset && !job->continue_on_failure) {
844 					bdevperf_job_drain(job);
845 					g_run_rc = -1;
846 					g_error_to_exit = true;
847 				}
848 			}
849 		}
850 	}
851 
852 	job->current_queue_depth--;
853 
854 	if (success) {
855 		job->io_completed++;
856 	} else {
857 		job->io_failed++;
858 	}
859 
860 	if (job->verify) {
861 		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
862 		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;
863 
864 		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
865 		spdk_bit_array_clear(job->outstanding, offset_in_ios);
866 	}
867 
868 	spdk_bdev_free_io(bdev_io);
869 
870 	/*
871 	 * is_draining indicates when time has expired for the test run
872 	 * and we are just waiting for the previously submitted I/O
873 	 * to complete.  In this case, do not submit a new I/O to replace
874 	 * the one just completed.
875 	 */
876 	if (!job->is_draining) {
877 		bdevperf_submit_single(job, task);
878 	} else {
879 		bdevperf_end_task(task);
880 	}
881 }
882 
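/* Verify flow: every write is followed by a read of the same blocks; the
 * read completes in bdevperf_complete(), which compares the returned data
 * against the pattern kept in task->buf.
 */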
883 static void
884 bdevperf_verify_submit_read(void *cb_arg)
885 {
886 	struct bdevperf_job	*job;
887 	struct bdevperf_task	*task = cb_arg;
888 	int			rc;
889 
890 	job = task->job;
891 
892 	/* Read the data back in */
893 	rc = spdk_bdev_read_blocks_with_md(job->bdev_desc, job->ch, NULL, NULL,
894 					   task->offset_blocks, job->io_size_blocks,
895 					   bdevperf_complete, task);
896 
897 	if (rc == -ENOMEM) {
898 		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
899 	} else if (rc != 0) {
900 		printf("Failed to submit read: %d\n", rc);
901 		bdevperf_job_drain(job);
902 		g_run_rc = rc;
903 	}
904 }
905 
906 static void
907 bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
908 			       void *cb_arg)
909 {
910 	if (success) {
911 		spdk_bdev_free_io(bdev_io);
912 		bdevperf_verify_submit_read(cb_arg);
913 	} else {
914 		bdevperf_complete(bdev_io, success, cb_arg);
915 	}
916 }
917 
918 static void
919 bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
920 {
921 	if (!success) {
922 		bdevperf_complete(bdev_io, success, cb_arg);
923 		return;
924 	}
925 
926 	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
927 }
928 
929 static int
930 bdevperf_generate_dif(struct bdevperf_task *task)
931 {
932 	struct bdevperf_job	*job = task->job;
933 	struct spdk_bdev	*bdev = job->bdev;
934 	struct spdk_dif_ctx	dif_ctx;
935 	int			rc;
936 	struct spdk_dif_ctx_init_ext_opts dif_opts;
937 
938 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
939 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
940 	rc = spdk_dif_ctx_init(&dif_ctx,
941 			       spdk_bdev_get_block_size(bdev),
942 			       spdk_bdev_get_md_size(bdev),
943 			       spdk_bdev_is_md_interleaved(bdev),
944 			       spdk_bdev_is_dif_head_of_md(bdev),
945 			       spdk_bdev_get_dif_type(bdev),
946 			       job->dif_check_flags,
947 			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
948 	if (rc != 0) {
949 		fprintf(stderr, "Initialization of DIF context failed\n");
950 		return rc;
951 	}
952 
953 	if (spdk_bdev_is_md_interleaved(bdev)) {
954 		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
955 	} else {
956 		struct iovec md_iov = {
957 			.iov_base	= task->md_buf,
958 			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
959 		};
960 
961 		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
962 	}
963 
964 	if (rc != 0) {
965 		fprintf(stderr, "Generation of DIF/DIX failed\n");
966 	}
967 
968 	return rc;
969 }
970 
971 static void
972 bdevperf_submit_task(void *arg)
973 {
974 	struct bdevperf_task	*task = arg;
975 	struct bdevperf_job	*job = task->job;
976 	struct spdk_bdev_desc	*desc;
977 	struct spdk_io_channel	*ch;
978 	spdk_bdev_io_completion_cb cb_fn;
979 	uint64_t		offset_in_ios;
980 	int			rc = 0;
981 
982 	desc = job->bdev_desc;
983 	ch = job->ch;
984 
985 	switch (task->io_type) {
986 	case SPDK_BDEV_IO_TYPE_WRITE:
987 		if (spdk_bdev_get_md_size(job->bdev) != 0 && job->dif_check_flags != 0) {
988 			rc = bdevperf_generate_dif(task);
989 		}
990 		if (rc == 0) {
991 			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;
992 
993 			if (g_zcopy) {
994 				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
995 				return;
996 			} else {
997 				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
998 								     task->md_buf,
999 								     task->offset_blocks,
1000 								     job->io_size_blocks,
1001 								     cb_fn, task);
1002 			}
1003 		}
1004 		break;
1005 	case SPDK_BDEV_IO_TYPE_FLUSH:
1006 		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
1007 					    job->io_size_blocks, bdevperf_complete, task);
1008 		break;
1009 	case SPDK_BDEV_IO_TYPE_UNMAP:
1010 		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
1011 					    job->io_size_blocks, bdevperf_complete, task);
1012 		break;
1013 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1014 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
1015 						   job->io_size_blocks, bdevperf_complete, task);
1016 		break;
1017 	case SPDK_BDEV_IO_TYPE_READ:
1018 		if (g_zcopy) {
1019 			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
1020 						   true, bdevperf_zcopy_populate_complete, task);
1021 		} else {
1022 			rc = spdk_bdev_read_blocks_with_md(desc, ch, task->buf, task->md_buf,
1023 							   task->offset_blocks,
1024 							   job->io_size_blocks,
1025 							   bdevperf_complete, task);
1026 		}
1027 		break;
1028 	case SPDK_BDEV_IO_TYPE_ABORT:
1029 		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
1030 		break;
1031 	default:
1032 		assert(false);
1033 		rc = -EINVAL;
1034 		break;
1035 	}
1036 
1037 	if (rc == -ENOMEM) {
1038 		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
1039 		return;
1040 	} else if (rc != 0) {
1041 		printf("Failed to submit bdev_io: %d\n", rc);
1042 		if (job->verify) {
1043 			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
1044 			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;
1045 
1046 			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
1047 			spdk_bit_array_clear(job->outstanding, offset_in_ios);
1048 		}
1049 		bdevperf_job_drain(job);
1050 		g_run_rc = rc;
1051 		return;
1052 	}
1053 
1054 	job->current_queue_depth++;
1055 }
1056 
1057 static void
1058 bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1059 {
1060 	struct bdevperf_task	*task = cb_arg;
1061 	struct bdevperf_job	*job = task->job;
1062 	struct iovec		*iovs;
1063 	int			iovcnt;
1064 
1065 	if (!success) {
1066 		bdevperf_job_drain(job);
1067 		g_run_rc = -1;
1068 		return;
1069 	}
1070 
1071 	task->bdev_io = bdev_io;
1072 	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
1073 
1074 	if (job->verify || job->reset) {
1075 		/* When job->verify or job->reset is enabled, task->buf is used for
1076 		 *  verification of read after write.  For write I/O, when zcopy APIs
1077 		 *  are used, task->buf cannot be used, and data must be written to
1078 		 *  the data buffer allocated underneath the bdev layer instead.
1079 		 *  Hence we copy task->buf to the allocated data buffer here.
1080 		 */
1081 		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
1082 		assert(iovcnt == 1);
1083 		assert(iovs != NULL);
1084 
1085 		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
1086 			  spdk_bdev_get_block_size(job->bdev),
1087 			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
1088 			  spdk_bdev_get_md_size(job->bdev), job->io_size_blocks);
1089 	}
1090 
1091 	bdevperf_submit_task(task);
1092 }
1093 
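/* Zcopy write sequence: spdk_bdev_zcopy_start() with populate=false asks the
 * bdev for a buffer, bdevperf_zcopy_get_buf_complete() fills it (copying from
 * task->buf when verifying), and bdevperf_submit_task() then commits it via
 * spdk_bdev_zcopy_end() with commit=true.
 */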
1094 static void
1095 bdevperf_prep_zcopy_write_task(void *arg)
1096 {
1097 	struct bdevperf_task	*task = arg;
1098 	struct bdevperf_job	*job = task->job;
1099 	int			rc;
1100 
1101 	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
1102 				   task->offset_blocks, job->io_size_blocks,
1103 				   false, bdevperf_zcopy_get_buf_complete, task);
1104 	if (rc != 0) {
1105 		assert(rc == -ENOMEM);
1106 		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
1107 		return;
1108 	}
1109 
1110 	job->current_queue_depth++;
1111 }
1112 
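/* Tasks are preallocated in bdevperf_construct_job(), so an empty free list
 * here indicates an accounting bug rather than a transient shortage; abort()
 * instead of trying to continue.
 */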
1113 static struct bdevperf_task *
1114 bdevperf_job_get_task(struct bdevperf_job *job)
1115 {
1116 	struct bdevperf_task *task;
1117 
1118 	task = TAILQ_FIRST(&job->task_list);
1119 	if (!task) {
1120 		printf("Task allocation failed\n");
1121 		abort();
1122 	}
1123 
1124 	TAILQ_REMOVE(&job->task_list, task, link);
1125 	return task;
1126 }
1127 
1128 static void
1129 bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
1130 {
1131 	uint64_t offset_in_ios;
1132 	uint64_t rand_value;
1133 	uint32_t first_clear;
1134 
1135 	if (job->zipf) {
1136 		offset_in_ios = spdk_zipf_generate(job->zipf);
1137 	} else if (job->is_random) {
1138 		/* RAND_MAX is only INT32_MAX, so use 2 calls to rand_r to
1139 		 * get a large enough value to ensure we are issuing I/O
1140 		 * uniformly across the whole bdev.
1141 		 */
1142 		rand_value = (uint64_t)rand_r(&job->seed) * RAND_MAX + rand_r(&job->seed);
1143 		offset_in_ios = rand_value % job->size_in_ios;
1144 
1145 		if (g_random_map) {
1146 			/* Make sure that the offset does not exceed the maximum size
1147 			 * of the bit array (verified during job creation)
1148 			 */
1149 			assert(offset_in_ios < UINT32_MAX);
1150 
1151 			first_clear = spdk_bit_array_find_first_clear(job->random_map, (uint32_t)offset_in_ios);
1152 
1153 			if (first_clear == UINT32_MAX) {
1154 				first_clear = spdk_bit_array_find_first_clear(job->random_map, 0);
1155 
1156 				if (first_clear == UINT32_MAX) {
1157 					/* If there are no more clear bits in the array, clear the
1158 					 * whole map and start a new pass with the random value selected above.
1159 					 */
1160 					spdk_bit_array_clear_mask(job->random_map);
1161 					first_clear = (uint32_t)offset_in_ios;
1162 				}
1163 			}
1164 
1165 			spdk_bit_array_set(job->random_map, first_clear);
1166 
1167 			offset_in_ios = first_clear;
1168 		}
1169 	} else {
1170 		offset_in_ios = job->offset_in_ios++;
1171 		if (job->offset_in_ios == job->size_in_ios) {
1172 			job->offset_in_ios = 0;
1173 		}
1174 
1175 		/* Increment offset_in_ios if there's already an outstanding I/O
1176 		 * to that location. We only need this with job->verify as random
1177 		 * offsets are not supported with job->verify at this time.
1178 		 */
1179 		if (job->verify) {
1180 			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);
1181 
1182 			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
1183 				offset_in_ios = job->offset_in_ios++;
1184 				if (job->offset_in_ios == job->size_in_ios) {
1185 					job->offset_in_ios = 0;
1186 				}
1187 			}
1188 			spdk_bit_array_set(job->outstanding, offset_in_ios);
1189 		}
1190 	}
1191 
1192 	/* When multiple threads target the same bdev, offset_in_ios is relative
1193 	 * to the LBA range assigned to this job. task->offset_blocks
1194 	 * is absolute (entire bdev LBA range).
1195 	 */
1196 	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;
1197 
1198 	if (job->verify || job->reset) {
1199 		generate_data(task->buf, job->buf_size,
1200 			      spdk_bdev_get_block_size(job->bdev),
1201 			      task->md_buf, spdk_bdev_get_md_size(job->bdev),
1202 			      job->io_size_blocks);
1203 		if (g_zcopy) {
1204 			bdevperf_prep_zcopy_write_task(task);
1205 			return;
1206 		} else {
1207 			task->iov.iov_base = task->buf;
1208 			task->iov.iov_len = job->buf_size;
1209 			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
1210 		}
1211 	} else if (job->flush) {
1212 		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
1213 	} else if (job->unmap) {
1214 		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
1215 	} else if (job->write_zeroes) {
1216 		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1217 	} else if ((job->rw_percentage == 100) ||
1218 		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
1219 		task->io_type = SPDK_BDEV_IO_TYPE_READ;
1220 	} else {
1221 		if (g_zcopy) {
1222 			bdevperf_prep_zcopy_write_task(task);
1223 			return;
1224 		} else {
1225 			task->iov.iov_base = task->buf;
1226 			task->iov.iov_len = job->buf_size;
1227 			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
1228 		}
1229 	}
1230 
1231 	bdevperf_submit_task(task);
1232 }
1233 
1234 static int reset_job(void *arg);
1235 
1236 static void
1237 reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1238 {
1239 	struct bdevperf_task	*task = cb_arg;
1240 	struct bdevperf_job	*job = task->job;
1241 
1242 	if (!success) {
1243 		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
1244 		bdevperf_job_drain(job);
1245 		g_run_rc = -1;
1246 	}
1247 
1248 	TAILQ_INSERT_TAIL(&job->task_list, task, link);
1249 	spdk_bdev_free_io(bdev_io);
1250 
1251 	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
1252 						10 * SPDK_SEC_TO_USEC);
1253 }
1254 
1255 static int
1256 reset_job(void *arg)
1257 {
1258 	struct bdevperf_job *job = arg;
1259 	struct bdevperf_task *task;
1260 	int rc;
1261 
1262 	spdk_poller_unregister(&job->reset_timer);
1263 
1264 	/* Do reset. */
1265 	task = bdevperf_job_get_task(job);
1266 	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
1267 			     reset_cb, task);
1268 	if (rc) {
1269 		printf("Reset failed: %d\n", rc);
1270 		bdevperf_job_drain(job);
1271 		g_run_rc = -1;
1272 	}
1273 
1274 	return -1;
1275 }
1276 
1277 static void
1278 bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1279 {
1280 	struct bdevperf_job *job = cb_arg;
1281 	struct bdevperf_task *task;
1282 
1283 	job->io_timeout++;
1284 
1285 	if (job->is_draining || !job->abort ||
1286 	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
1287 		return;
1288 	}
1289 
1290 	task = bdevperf_job_get_task(job);
1291 	if (task == NULL) {
1292 		return;
1293 	}
1294 
1295 	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
1296 	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;
1297 
1298 	bdevperf_submit_task(task);
1299 }
1300 
1301 static void
1302 bdevperf_job_run(void *ctx)
1303 {
1304 	struct bdevperf_job *job = ctx;
1305 	struct bdevperf_task *task;
1306 	int i;
1307 
1308 	/* Submit initial I/O for this job. Each time one
1309 	 * completes, another will be submitted. */
1310 
1311 	/* Start a timer to stop this I/O chain when the run is over */
1312 	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain_timer, job, g_time_in_usec);
1313 	if (job->reset) {
1314 		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
1315 							10 * SPDK_SEC_TO_USEC);
1316 	}
1317 
1318 	spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);
1319 
1320 	for (i = 0; i < job->queue_depth; i++) {
1321 		task = bdevperf_job_get_task(job);
1322 		bdevperf_submit_single(job, task);
1323 	}
1324 }
1325 
1326 static void
1327 _performance_dump_done(void *ctx)
1328 {
1329 	struct bdevperf_aggregate_stats *stats = ctx;
1330 	double average_latency;
1331 
1332 	printf("\r =================================================================================="
1333 	       "=================================\n");
1334 	printf("\r %-28s: %10s %10.2f %10.2f",
1335 	       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
1336 	printf(" %10.2f %8.2f",
1337 	       stats->total_failed_per_second, stats->total_timeout_per_second);
1338 
1339 	average_latency = ((double)stats->total_tsc / stats->total_io_completed) * SPDK_SEC_TO_USEC /
1340 			  spdk_get_ticks_hz();
1341 	printf(" %10.2f %10.2f %10.2f\n", average_latency, stats->min_latency, stats->max_latency);
1342 	printf("\n");
1343 
1344 	fflush(stdout);
1345 
1346 	g_performance_dump_active = false;
1347 
1348 	free(stats);
1349 }
1350 
1351 static void
1352 _performance_dump(void *ctx)
1353 {
1354 	struct bdevperf_aggregate_stats *stats = ctx;
1355 
1356 	performance_dump_job(stats, stats->current_job);
1357 
1358 	/* This assumes the jobs list is static after start up time.
1359 	 * That's true right now, but if that ever changed this would need a lock. */
1360 	stats->current_job = TAILQ_NEXT(stats->current_job, link);
1361 	if (stats->current_job == NULL) {
1362 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1363 	} else {
1364 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1365 	}
1366 }
1367 
1368 static int
1369 performance_statistics_thread(void *arg)
1370 {
1371 	struct bdevperf_aggregate_stats *stats;
1372 
1373 	if (g_performance_dump_active) {
1374 		return -1;
1375 	}
1376 
1377 	g_performance_dump_active = true;
1378 
1379 	stats = calloc(1, sizeof(*stats));
1380 	if (stats == NULL) {
1381 		return -1;
1382 	}
1383 
1384 	stats->min_latency = (double)UINT64_MAX;
1385 
1386 	g_show_performance_period_num++;
1387 
1388 	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;
1389 	stats->ema_period = g_show_performance_ema_period;
1390 
1391 	/* Iterate all of the jobs to gather stats.
1392 	 * These jobs will not be removed here until the final performance dump is run,
1393 	 * so this should be safe without locking.
1394 	 */
1395 	stats->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
1396 	if (stats->current_job == NULL) {
1397 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1398 	} else {
1399 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1400 	}
1401 
1402 	return -1;
1403 }
1404 
1405 static void
1406 bdevperf_test(void)
1407 {
1408 	struct bdevperf_job *job;
1409 
1410 	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / (uint64_t)SPDK_SEC_TO_USEC);
1411 	fflush(stdout);
1412 
1413 	/* Start a timer to dump performance numbers */
1414 	g_start_tsc = spdk_get_ticks();
1415 	if (g_show_performance_real_time && !g_perf_timer) {
1416 		printf("%*s\n", 107, "Latency(us)");
1417 		printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
1418 		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");
1419 
1420 		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
1421 						    g_show_performance_period_in_usec);
1422 	}
1423 
1424 	/* Iterate jobs to start all I/O */
1425 	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
1426 		g_bdevperf.running_jobs++;
1427 		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
1428 	}
1429 }
1430 
1431 static void
1432 bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1433 {
1434 	struct bdevperf_job *job = event_ctx;
1435 
1436 	if (SPDK_BDEV_EVENT_REMOVE == type) {
1437 		bdevperf_job_drain(job);
1438 	}
1439 }
1440 
1441 static void
1442 bdevperf_histogram_status_cb(void *cb_arg, int status)
1443 {
1444 	if (status != 0) {
1445 		g_run_rc = status;
1446 		if (g_continue_on_failure == false) {
1447 			g_error_to_exit = true;
1448 		}
1449 	}
1450 
1451 	if (--g_bdev_count == 0) {
1452 		if (g_run_rc == 0) {
1453 			/* Ready to run the test */
1454 			bdevperf_test();
1455 		} else {
1456 			bdevperf_test_done(NULL);
1457 		}
1458 	}
1459 }
1460 
1461 static uint32_t g_construct_job_count = 0;
1462 
1463 static int
1464 _bdevperf_enable_histogram(void *ctx, struct spdk_bdev *bdev)
1465 {
1466 	bool *enable = ctx;
1467 
1468 	g_bdev_count++;
1469 
1470 	spdk_bdev_histogram_enable(bdev, bdevperf_histogram_status_cb, NULL, *enable);
1471 
1472 	return 0;
1473 }
1474 
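/* g_bdev_count acts as a refcount: it starts at 1 so the histogram status
 * callback cannot reach zero while bdevs are still being iterated. The final
 * bdevperf_histogram_status_cb(NULL, rc) call drops that sentinel reference
 * and, once all callbacks have fired, either starts the test or tears down.
 */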
1475 static void
1476 bdevperf_enable_histogram(bool enable)
1477 {
1478 	struct spdk_bdev *bdev;
1479 	int rc;
1480 
1481 	/* increment initial g_bdev_count so that it will never reach 0 in the middle of iteration */
1482 	g_bdev_count = 1;
1483 
1484 	if (g_job_bdev_name != NULL) {
1485 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
1486 		if (bdev) {
1487 			rc = _bdevperf_enable_histogram(&enable, bdev);
1488 		} else {
1489 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
1490 			rc = -1;
1491 		}
1492 	} else {
1493 		rc = spdk_for_each_bdev_leaf(&enable, _bdevperf_enable_histogram);
1494 	}
1495 
1496 	bdevperf_histogram_status_cb(NULL, rc);
1497 }
1498 
1499 static void
1500 _bdevperf_construct_job_done(void *ctx)
1501 {
1502 	if (--g_construct_job_count == 0) {
1503 		if (g_run_rc != 0) {
1504 			/* Something failed. */
1505 			bdevperf_test_done(NULL);
1506 			return;
1507 		}
1508 
1509 		/* always enable histogram. */
1510 		bdevperf_enable_histogram(true);
1511 	} else if (g_run_rc != 0) {
1512 		/* Reset the error since some jobs were constructed successfully */
1513 		g_run_rc = 0;
1514 		if (g_continue_on_failure == false) {
1515 			g_error_to_exit = true;
1516 		}
1517 	}
1518 }
1519 
1520 /* Checkformat does not allow using an inlined type,
1521    so this typedef is a workaround */
1522 typedef struct spdk_thread *spdk_thread_t;
1523 
1524 static spdk_thread_t
1525 construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
1526 {
1527 	struct spdk_cpuset tmp;
1528 
1529 	/* This function runs on the main thread. */
1530 	assert(g_main_thread == spdk_get_thread());
1531 
1532 	/* Handle default mask */
1533 	if (spdk_cpuset_count(cpumask) == 0) {
1534 		cpumask = &g_all_cpuset;
1535 	}
1536 
1537 	/* Warn user that mask might need to be changed */
1538 	spdk_cpuset_copy(&tmp, cpumask);
1539 	spdk_cpuset_or(&tmp, &g_all_cpuset);
1540 	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
1541 		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
1542 	}
1543 
1544 	return spdk_thread_create(tag, cpumask);
1545 }
1546 
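/* Hand out lcores round-robin: start from the first core in the env core
 * list and wrap back to it after the last one.
 */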
1547 static uint32_t
1548 _get_next_core(void)
1549 {
1550 	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;
1551 
1552 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1553 		current_core = spdk_env_get_first_core();
1554 		return current_core;
1555 	}
1556 
1557 	current_core = spdk_env_get_next_core(current_core);
1558 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1559 		current_core = spdk_env_get_first_core();
1560 	}
1561 
1562 	return current_core;
1563 }
1564 
1565 static void
1566 _bdevperf_construct_job(void *ctx)
1567 {
1568 	struct bdevperf_job *job = ctx;
1569 	int rc;
1570 
1571 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(job->bdev), true, bdevperf_bdev_removed, job,
1572 				&job->bdev_desc);
1573 	if (rc != 0) {
1574 		SPDK_ERRLOG("Could not open leaf bdev %s, error=%d\n", spdk_bdev_get_name(job->bdev), rc);
1575 		g_run_rc = -EINVAL;
1576 		goto end;
1577 	}
1578 
1579 	if (g_zcopy) {
1580 		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
1581 			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
1582 			g_run_rc = -ENOTSUP;
1583 			goto end;
1584 		}
1585 	}
1586 
1587 	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
1588 	if (!job->ch) {
1589 		SPDK_ERRLOG("Could not get io_channel for device %s\n",
1590 			    spdk_bdev_get_name(job->bdev));
1591 		spdk_bdev_close(job->bdev_desc);
1592 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
1593 		g_run_rc = -ENOMEM;
1594 		goto end;
1595 	}
1596 
1597 end:
1598 	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
1599 }
1600 
1601 static void
1602 job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
1603 {
1604 	switch (rw) {
1605 	case JOB_CONFIG_RW_READ:
1606 		job->rw_percentage = 100;
1607 		break;
1608 	case JOB_CONFIG_RW_WRITE:
1609 		job->rw_percentage = 0;
1610 		break;
1611 	case JOB_CONFIG_RW_RANDREAD:
1612 		job->is_random = true;
1613 		job->rw_percentage = 100;
1614 		job->seed = rand();
1615 		break;
1616 	case JOB_CONFIG_RW_RANDWRITE:
1617 		job->is_random = true;
1618 		job->rw_percentage = 0;
1619 		job->seed = rand();
1620 		break;
1621 	case JOB_CONFIG_RW_RW:
1622 		job->is_random = false;
1623 		break;
1624 	case JOB_CONFIG_RW_RANDRW:
1625 		job->is_random = true;
1626 		job->seed = rand();
1627 		break;
1628 	case JOB_CONFIG_RW_VERIFY:
1629 		job->verify = true;
1630 		job->rw_percentage = 50;
1631 		break;
1632 	case JOB_CONFIG_RW_RESET:
1633 		job->reset = true;
1634 		job->verify = true;
1635 		job->rw_percentage = 50;
1636 		break;
1637 	case JOB_CONFIG_RW_UNMAP:
1638 		job->unmap = true;
1639 		break;
1640 	case JOB_CONFIG_RW_FLUSH:
1641 		job->flush = true;
1642 		break;
1643 	case JOB_CONFIG_RW_WRITE_ZEROES:
1644 		job->write_zeroes = true;
1645 		break;
1646 	}
1647 }
1648 
1649 static int
1650 bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
1651 		       struct spdk_thread *thread)
1652 {
1653 	struct bdevperf_job *job;
1654 	struct bdevperf_task *task;
1655 	int block_size, data_block_size;
1656 	int rc;
1657 	int task_num, n;
1658 
1659 	block_size = spdk_bdev_get_block_size(bdev);
1660 	data_block_size = spdk_bdev_get_data_block_size(bdev);
1661 
1662 	job = calloc(1, sizeof(struct bdevperf_job));
1663 	if (!job) {
1664 		fprintf(stderr, "Unable to allocate memory for new job.\n");
1665 		return -ENOMEM;
1666 	}
1667 
1668 	job->name = strdup(spdk_bdev_get_name(bdev));
1669 	if (!job->name) {
1670 		fprintf(stderr, "Unable to allocate memory for job name.\n");
1671 		bdevperf_job_free(job);
1672 		return -ENOMEM;
1673 	}
1674 
1675 	job->workload_type = g_workload_type;
1676 	job->io_size = config->bs;
1677 	job->rw_percentage = config->rwmixread;
1678 	job->continue_on_failure = g_continue_on_failure;
1679 	job->queue_depth = config->iodepth;
1680 	job->bdev = bdev;
1681 	job->io_size_blocks = job->io_size / data_block_size;
1682 	job->buf_size = job->io_size_blocks * block_size;
1683 	job->abort = g_abort;
1684 	job_init_rw(job, config->rw);
1685 
1686 	if ((job->io_size % data_block_size) != 0) {
1687 		SPDK_ERRLOG("IO size (%d) is not a multiple of the data block size of bdev %s (%"PRIu32")\n",
1688 			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
1689 		bdevperf_job_free(job);
1690 		return -ENOTSUP;
1691 	}
1692 
1693 	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1694 		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
1695 		bdevperf_job_free(job);
1696 		return -ENOTSUP;
1697 	}
1698 
1699 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_REFTAG)) {
1700 		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
1701 	}
1702 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_GUARD)) {
1703 		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
1704 	}
1705 
1706 	job->offset_in_ios = 0;
1707 
1708 	if (config->length != 0) {
1709 		/* Use subset of disk */
1710 		job->size_in_ios = config->length / job->io_size_blocks;
1711 		job->ios_base = config->offset / job->io_size_blocks;
1712 	} else {
1713 		/* Use whole disk */
1714 		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
1715 		job->ios_base = 0;
1716 	}
1717 
1718 	if (job->is_random && g_zipf_theta > 0) {
1719 		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
1720 	}
1721 
1722 	if (job->verify) {
1723 		if (job->size_in_ios >= UINT32_MAX) {
1724 			SPDK_ERRLOG("Due to constraints of verify operation, the job storage capacity is too large\n");
1725 			bdevperf_job_free(job);
1726 			return -ENOMEM;
1727 		}
1728 		job->outstanding = spdk_bit_array_create(job->size_in_ios);
1729 		if (job->outstanding == NULL) {
1730 			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
1731 				    spdk_bdev_get_name(bdev));
1732 			bdevperf_job_free(job);
1733 			return -ENOMEM;
1734 		}
1735 		if (job->queue_depth > (int)job->size_in_ios) {
1736 			SPDK_WARNLOG("Due to constraints of verify job, queue depth (-q, %d) can't exceed the number of IO "
1737 				     "requests which can be submitted to the bdev %s simultaneously (%"PRIu64"). "
1738 				     "Queue depth is limited to %"PRIu64"\n",
1739 				     job->queue_depth, job->name, job->size_in_ios, job->size_in_ios);
1740 			job->queue_depth = (int)job->size_in_ios;
1741 		}
1742 	}
1743 
1744 	job->histogram = spdk_histogram_data_alloc();
1745 	if (job->histogram == NULL) {
1746 		fprintf(stderr, "Failed to allocate histogram\n");
1747 		bdevperf_job_free(job);
1748 		return -ENOMEM;
1749 	}
1750 
1751 	TAILQ_INIT(&job->task_list);
1752 
1753 	if (g_random_map) {
1754 		if (job->size_in_ios >= UINT32_MAX) {
1755 			SPDK_ERRLOG("Due to constraints of the random map, the job storage capacity is too large\n");
1756 			bdevperf_job_free(job);
1757 			return -ENOMEM;
1758 		}
1759 		job->random_map = spdk_bit_array_create(job->size_in_ios);
1760 		if (job->random_map == NULL) {
1761 			SPDK_ERRLOG("Could not create random_map array bitmap for bdev %s\n",
1762 				    spdk_bdev_get_name(bdev));
1763 			bdevperf_job_free(job);
1764 			return -ENOMEM;
1765 		}
1766 	}
1767 
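	/* Preallocate tasks: one per queue slot, plus one reserved for the
	 * periodic reset I/O, plus one per queue slot so the timeout callback
	 * can issue an abort for every outstanding I/O.
	 */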
1768 	task_num = job->queue_depth;
1769 	if (job->reset) {
1770 		task_num += 1;
1771 	}
1772 	if (job->abort) {
1773 		task_num += job->queue_depth;
1774 	}
1775 
1776 	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);
1777 
1778 	for (n = 0; n < task_num; n++) {
1779 		task = calloc(1, sizeof(struct bdevperf_task));
1780 		if (!task) {
1781 			fprintf(stderr, "Failed to allocate memory for task\n");
1782 			spdk_zipf_free(&job->zipf);
1783 			return -ENOMEM;
1784 		}
1785 
1786 		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
1787 					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1788 		if (!task->buf) {
1789 			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
1790 			spdk_zipf_free(&job->zipf);
1791 			free(task);
1792 			return -ENOMEM;
1793 		}
1794 
1795 		if (spdk_bdev_is_md_separate(job->bdev)) {
1796 			task->md_buf = spdk_zmalloc(job->io_size_blocks *
1797 						    spdk_bdev_get_md_size(job->bdev), 0, NULL,
1798 						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1799 			if (!task->md_buf) {
1800 				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
1801 				spdk_zipf_free(&job->zipf);
1802 				spdk_free(task->buf);
1803 				free(task);
1804 				return -ENOMEM;
1805 			}
1806 		}
1807 
1808 		task->job = job;
1809 		TAILQ_INSERT_TAIL(&job->task_list, task, link);
1810 	}
1811 
1812 	job->thread = thread;
1813 
1814 	g_construct_job_count++;
1815 
1816 	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
1817 	assert(rc == 0);
1818 
1819 	return rc;
1820 }

static int
parse_rw(const char *str, enum job_config_rw ret)
{
	if (str == NULL) {
		return ret;
	}

	if (!strcmp(str, "read")) {
		ret = JOB_CONFIG_RW_READ;
	} else if (!strcmp(str, "randread")) {
		ret = JOB_CONFIG_RW_RANDREAD;
	} else if (!strcmp(str, "write")) {
		ret = JOB_CONFIG_RW_WRITE;
	} else if (!strcmp(str, "randwrite")) {
		ret = JOB_CONFIG_RW_RANDWRITE;
	} else if (!strcmp(str, "verify")) {
		ret = JOB_CONFIG_RW_VERIFY;
	} else if (!strcmp(str, "reset")) {
		ret = JOB_CONFIG_RW_RESET;
	} else if (!strcmp(str, "unmap")) {
		ret = JOB_CONFIG_RW_UNMAP;
	} else if (!strcmp(str, "write_zeroes")) {
		ret = JOB_CONFIG_RW_WRITE_ZEROES;
	} else if (!strcmp(str, "flush")) {
		ret = JOB_CONFIG_RW_FLUSH;
	} else if (!strcmp(str, "rw")) {
		ret = JOB_CONFIG_RW_RW;
	} else if (!strcmp(str, "randrw")) {
		ret = JOB_CONFIG_RW_RANDRW;
	} else {
		fprintf(stderr, "rw must be one of\n"
			"(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, write_zeroes, flush)\n");
		ret = BDEVPERF_CONFIG_ERROR;
	}

	return ret;
}

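/* Advance through a colon-separated list of filenames, copying the next
 * element into 'out' with spaces and tabs stripped. For example,
 * "Malloc0: Malloc1" yields "Malloc0" first and then "Malloc1" when called
 * again with the returned pointer.
 */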
static const char *
config_filename_next(const char *filename, char *out)
{
	int i, k;

	if (filename == NULL) {
		out[0] = '\0';
		return NULL;
	}

	if (filename[0] == ':') {
		filename++;
	}

	for (i = 0, k = 0;
	     filename[i] != '\0' &&
	     filename[i] != ':' &&
	     i < BDEVPERF_CONFIG_MAX_FILENAME &&
	     k < (BDEVPERF_CONFIG_MAX_FILENAME - 1);
	     i++) {
		if (filename[i] == ' ' || filename[i] == '\t') {
			continue;
		}

		out[k++] = filename[i];
	}
	out[k] = '\0';

	return filename + i;
}

static struct spdk_thread *
get_lcore_thread(uint32_t lcore)
{
	struct lcore_thread *lthread;

	TAILQ_FOREACH(lthread, &g_lcore_thread_list, link) {
		if (lthread->lcore == lcore) {
			return lthread->thread;
		}
	}

	return NULL;
}

static void
bdevperf_construct_jobs(void)
{
	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
	struct spdk_thread *thread;
	struct job_config *config;
	struct spdk_bdev *bdev;
	const char *filenames;
	int rc;

	TAILQ_FOREACH(config, &job_config_list, link) {
		filenames = config->filename;

		if (!g_one_thread_per_lcore) {
			thread = construct_job_thread(&config->cpumask, config->name);
		} else {
			thread = get_lcore_thread(config->lcore);
		}
		assert(thread);

		while (filenames) {
			filenames = config_filename_next(filenames, filename);
			if (strlen(filename) == 0) {
				break;
			}

			bdev = spdk_bdev_get_by_name(filename);
			if (!bdev) {
				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
				g_run_rc = -EINVAL;
				return;
			}

			rc = bdevperf_construct_job(bdev, config, thread);
			if (rc < 0) {
				g_run_rc = rc;
				return;
			}
		}
	}
}

static int
make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
{
	struct job_config *config = calloc(1, sizeof(*config));

	if (config == NULL) {
		fprintf(stderr, "Unable to allocate memory for job config\n");
		return -ENOMEM;
	}

	config->name = filename;
	config->filename = filename;
	config->lcore = _get_next_core();
	spdk_cpuset_zero(&config->cpumask);
	spdk_cpuset_set_cpu(&config->cpumask, config->lcore, true);
	config->bs = g_io_size;
	config->iodepth = g_queue_depth;
	config->rwmixread = g_rw_percentage;
	config->offset = offset;
	config->length = range;
	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
		free(config);
		return -EINVAL;
	}

	TAILQ_INSERT_TAIL(&job_config_list, config, link);
	return 0;
}

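/* Split the bdev's block range evenly across all cores, one job per core.
 * For example, with 4 cores a 1000-block bdev yields jobs covering blocks
 * 0-249, 250-499, 500-749 and 750-999; any remainder blocks from the integer
 * division are simply not exercised.
 */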
static int
bdevperf_construct_multithread_job_config(void *ctx, struct spdk_bdev *bdev)
{
	uint32_t *num_cores = ctx;
	uint32_t i;
	uint64_t blocks_per_job;
	int64_t offset;
	int rc;

	blocks_per_job = spdk_bdev_get_num_blocks(bdev) / *num_cores;
	offset = 0;

	SPDK_ENV_FOREACH_CORE(i) {
		rc = make_cli_job_config(spdk_bdev_get_name(bdev), offset, blocks_per_job);
		if (rc) {
			return rc;
		}

		offset += blocks_per_job;
	}

	return 0;
}

static void
bdevperf_construct_multithread_job_configs(void)
{
	struct spdk_bdev *bdev;
	uint32_t i;
	uint32_t num_cores;

	num_cores = 0;
	SPDK_ENV_FOREACH_CORE(i) {
		num_cores++;
	}

	if (num_cores == 0) {
		g_run_rc = -EINVAL;
		return;
	}

	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (!bdev) {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			g_run_rc = -EINVAL;
			return;
		}
		g_run_rc = bdevperf_construct_multithread_job_config(&num_cores, bdev);
	} else {
		g_run_rc = spdk_for_each_bdev_leaf(&num_cores, bdevperf_construct_multithread_job_config);
	}
}

static int
bdevperf_construct_job_config(void *ctx, struct spdk_bdev *bdev)
{
	/* Construct the job */
	return make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
}

static void
create_lcore_thread(uint32_t lcore)
{
	struct lcore_thread *lthread;
	struct spdk_cpuset cpumask = {};
	char name[32];

	lthread = calloc(1, sizeof(*lthread));
	assert(lthread != NULL);

	lthread->lcore = lcore;

	snprintf(name, sizeof(name), "lcore_%u", lcore);
	spdk_cpuset_set_cpu(&cpumask, lcore, true);

	lthread->thread = spdk_thread_create(name, &cpumask);
	assert(lthread->thread != NULL);

	TAILQ_INSERT_TAIL(&g_lcore_thread_list, lthread, link);
}

static void
bdevperf_construct_job_configs(void)
{
	struct spdk_bdev *bdev;
	uint32_t i;

	/* There are three different modes for allocating jobs. Standard mode
	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
	 *
	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
	 * Effectively, this runs multiple copies of the job on each bdev.
	 *
	 * The -j flag implies "FIO" mode, which tries to mimic the semantics of FIO jobs.
	 * In "FIO" mode, threads are spawned per-job instead of per-bdev.
	 * Each FIO job can be individually parameterized by filename, cpu mask, etc.,
	 * unlike the other modes, which only support global options.
	 *
	 * For both standard mode and "multithread" mode, if the -E flag is specified,
	 * one spdk_thread is created PER CORE and shared by all jobs on that core.
	 */
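
	/* Illustrative command lines for each mode (option values are
	 * placeholders):
	 *
	 *   bdevperf -q 128 -o 4096 -w randread -t 60        standard mode
	 *   bdevperf -q 128 -o 4096 -w randread -t 60 -C     multithread mode
	 *   bdevperf -j jobs.conf -t 60                      "FIO" mode
	 *   bdevperf -q 128 -o 4096 -w randread -t 60 -E     thread per lcore
	 */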

	if (g_bdevperf_conf) {
		goto end;
	}

	if (g_one_thread_per_lcore) {
		SPDK_ENV_FOREACH_CORE(i) {
			create_lcore_thread(i);
		}
	}

	if (g_multithread_mode) {
		bdevperf_construct_multithread_job_configs();
	} else if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (bdev) {
			/* Construct the job */
			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
		} else {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			g_run_rc = -EINVAL;
		}
	} else {
		g_run_rc = spdk_for_each_bdev_leaf(NULL, bdevperf_construct_job_config);
	}

end:
	/* Increment the initial construct_jobs count so that it never reaches 0
	 * in the middle of the iteration.
	 */
	g_construct_job_count = 1;

	if (g_run_rc == 0) {
		bdevperf_construct_jobs();
	}

	_bdevperf_construct_job_done(NULL);
}

static int
parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
{
	const char *job_name;
	int tmp;

	tmp = spdk_conf_section_get_intval(s, name);
	if (tmp == -1) {
		/* Field was not found, so fall back to the default value.
		 * Undefined values are allowed in the [global] section,
		 * but not in the other sections.
		 */
		if (def == BDEVPERF_CONFIG_UNDEFINED) {
			job_name = spdk_conf_section_get_name(s);
			if (strcmp(job_name, "global") == 0) {
				return def;
			}

			fprintf(stderr,
				"Job '%s' has no '%s' assigned\n",
				job_name, name);
			return BDEVPERF_CONFIG_ERROR;
		}
		return def;
	}

	/* NOTE: get_intval returns nonnegative on success */
	if (tmp < 0) {
		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
			spdk_conf_section_get_name(s), name);
		return BDEVPERF_CONFIG_ERROR;
	}

	return tmp;
}

/* CLI arguments override parameters for global sections */
static void
config_set_cli_args(struct job_config *config)
{
	if (g_job_bdev_name) {
		config->filename = g_job_bdev_name;
	}
	if (g_io_size > 0) {
		config->bs = g_io_size;
	}
	if (g_queue_depth > 0) {
		config->iodepth = g_queue_depth;
	}
	if (g_rw_percentage > 0) {
		config->rwmixread = g_rw_percentage;
	}
	if (g_workload_type) {
		config->rw = parse_rw(g_workload_type, config->rw);
	}
}

static int
read_job_config(void)
{
	struct job_config global_default_config;
	struct job_config global_config;
	struct spdk_conf_section *s;
	struct job_config *config = NULL;
	const char *cpumask;
	const char *rw;
	bool is_global;
	int n = 0;
	int val;

	if (g_bdevperf_conf_file == NULL) {
		return 0;
	}

	g_bdevperf_conf = spdk_conf_allocate();
	if (g_bdevperf_conf == NULL) {
		fprintf(stderr, "Could not allocate job config structure\n");
		return 1;
	}

	spdk_conf_disable_sections_merge(g_bdevperf_conf);
	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
		fprintf(stderr, "Invalid job config\n");
		return 1;
	}
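
	/* As an illustration (bdev names are placeholders), a job config file
	 * consists of INI-style sections, where [global] supplies defaults for
	 * the job sections that follow it:
	 *
	 *   [global]
	 *   filename=Malloc0
	 *   bs=4096
	 *   iodepth=128
	 *   rw=randrw
	 *   rwmixread=70
	 *
	 *   [job0]
	 *   cpumask=0x1
	 *
	 *   [job1]
	 *   filename=Malloc1
	 *   rw=read
	 */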

	/* Initialize global defaults */
	global_default_config.filename = NULL;
	/* A zero mask is equivalent to g_all_cpuset. Since g_all_cpuset is not
	 * initialized yet, use the zero mask as the default instead.
	 */
	spdk_cpuset_zero(&global_default_config.cpumask);
	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
	/* bdevperf has no default for the -M option, but in FIO the default is 50 */
	global_default_config.rwmixread = 50;
	global_default_config.offset = 0;
	/* length 0 means 100% */
	global_default_config.length = 0;
	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
	config_set_cli_args(&global_default_config);

	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
		return 1;
	}

	/* There is only a single instance of the global job_config;
	 * its value is reset whenever a new [global] section is encountered.
	 */
	global_config = global_default_config;

	for (s = spdk_conf_first_section(g_bdevperf_conf);
	     s != NULL;
	     s = spdk_conf_next_section(s)) {
		config = calloc(1, sizeof(*config));
		if (config == NULL) {
			fprintf(stderr, "Unable to allocate memory for job config\n");
			return 1;
		}

		config->name = spdk_conf_section_get_name(s);
		is_global = strcmp(config->name, "global") == 0;

		if (is_global) {
			global_config = global_default_config;
		}

		config->filename = spdk_conf_section_get_val(s, "filename");
		if (config->filename == NULL) {
			config->filename = global_config.filename;
		}
		if (!is_global) {
			if (config->filename == NULL) {
				fprintf(stderr, "Job '%s' expects a 'filename' parameter\n", config->name);
				goto error;
			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
				fprintf(stderr,
					"filename for job '%s' is too long. Max length is %d\n",
					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
				goto error;
			}
		}

		cpumask = spdk_conf_section_get_val(s, "cpumask");
		if (cpumask == NULL) {
			config->cpumask = global_config.cpumask;
		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
			fprintf(stderr, "Job '%s' has a bad 'cpumask' value\n", config->name);
			goto error;
		}

		config->bs = parse_uint_option(s, "bs", global_config.bs);
		if (config->bs == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->bs == 0) {
			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
			goto error;
		}

		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->iodepth == 0) {
			fprintf(stderr,
				"'iodepth' of job '%s' must be greater than 0\n",
				config->name);
			goto error;
		}

		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->rwmixread > 100) {
			fprintf(stderr,
				"'rwmixread' value of job '%s' is not in the 0-100 range\n",
				config->name);
			goto error;
		}

		config->offset = parse_uint_option(s, "offset", global_config.offset);
		if (config->offset == BDEVPERF_CONFIG_ERROR) {
			goto error;
		}

		val = parse_uint_option(s, "length", global_config.length);
		if (val == BDEVPERF_CONFIG_ERROR) {
			goto error;
		}
		config->length = val;

		rw = spdk_conf_section_get_val(s, "rw");
		config->rw = parse_rw(rw, global_config.rw);
		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
			fprintf(stderr, "Job '%s' has a bad 'rw' value\n", config->name);
			goto error;
		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
			goto error;
		}

		if (is_global) {
			config_set_cli_args(config);
			global_config = *config;
			free(config);
		} else {
			TAILQ_INSERT_TAIL(&job_config_list, config, link);
			n++;
		}
	}

	printf("Using job config with %d jobs\n", n);
	return 0;
error:
	free(config);
	return 1;
}

static void
bdevperf_run(void *arg1)
{
	uint32_t i;

	g_main_thread = spdk_get_thread();

	spdk_cpuset_zero(&g_all_cpuset);
	SPDK_ENV_FOREACH_CORE(i) {
		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
	}

	if (g_wait_for_tests) {
		/* Do not perform any tests until the RPC is received */
		return;
	}

	bdevperf_construct_job_configs();
}

static void
rpc_perform_tests_reset(void)
{
	/* Reset g_run_rc, g_stats and g_show_performance_period_num
	 * for the next test run.
	 */
	g_run_rc = 0;
	memset(&g_stats, 0, sizeof(g_stats));
	g_show_performance_period_num = 0;
}

static void
rpc_perform_tests_cb(void)
{
	struct spdk_json_write_ctx *w;
	struct spdk_jsonrpc_request *request = g_request;

	g_request = NULL;

	if (g_run_rc == 0) {
		w = spdk_jsonrpc_begin_result(request);
		spdk_json_write_uint32(w, g_run_rc);
		spdk_jsonrpc_end_result(request, w);
	} else {
		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
	}

	rpc_perform_tests_reset();
}

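/* Handler for the "perform_tests" RPC, used together with -z. An illustrative
 * flow (the malloc bdev is just an example target):
 *
 *   ./bdevperf -z -q 128 -o 4096 -w randread -t 60 &
 *   ./scripts/rpc.py bdev_malloc_create -b Malloc0 64 512
 *   ./examples/bdev/bdevperf/bdevperf.py perform_tests
 */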
static void
rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
{
	if (params != NULL) {
		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
						 "perform_tests method requires no parameters");
		return;
	}
	if (g_request != NULL) {
		fprintf(stderr, "Another test is already in progress.\n");
		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
						 spdk_strerror(-EINPROGRESS));
		return;
	}
	g_request = request;

	/* Only construct job configs at the first test run. */
	if (TAILQ_EMPTY(&job_config_list)) {
		bdevperf_construct_job_configs();
	} else {
		bdevperf_construct_jobs();
	}
}
SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)

static void
_bdevperf_job_drain(void *ctx)
{
	bdevperf_job_drain(ctx);
}

static void
spdk_bdevperf_shutdown_cb(void)
{
	struct bdevperf_job *job, *tmp;

	g_shutdown = true;

	if (g_bdevperf.running_jobs == 0) {
		bdevperf_test_done(NULL);
		return;
	}

	/* Iterate over the jobs to stop all I/O */
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
	}
}

static int
bdevperf_parse_arg(int ch, char *arg)
{
	long long tmp;

	if (ch == 'w') {
		g_workload_type = arg;
	} else if (ch == 'T') {
		g_job_bdev_name = arg;
	} else if (ch == 'z') {
		g_wait_for_tests = true;
	} else if (ch == 'Z') {
		g_zcopy = true;
	} else if (ch == 'X') {
		g_abort = true;
	} else if (ch == 'C') {
		g_multithread_mode = true;
	} else if (ch == 'f') {
		g_continue_on_failure = true;
	} else if (ch == 'j') {
		g_bdevperf_conf_file = arg;
	} else if (ch == 'F') {
		char *endptr;

		errno = 0;
		g_zipf_theta = strtod(arg, &endptr);
		if (errno || arg == endptr || g_zipf_theta < 0) {
			fprintf(stderr, "Illegal zipf theta value %s\n", arg);
			return -EINVAL;
		}
	} else if (ch == 'l') {
		g_latency_display_level++;
	} else if (ch == 'D') {
		g_random_map = true;
	} else if (ch == 'E') {
		g_one_thread_per_lcore = true;
	} else if (ch == 'J') {
		g_rpc_log_file_name = arg;
	} else {
		tmp = spdk_strtoll(arg, 10);
		if (tmp < 0) {
			fprintf(stderr, "Parse failed for the option %c.\n", ch);
			return tmp;
		} else if (tmp >= INT_MAX) {
			fprintf(stderr, "Parsed option was too large %c.\n", ch);
			return -ERANGE;
		}

		switch (ch) {
		case 'q':
			g_queue_depth = tmp;
			break;
		case 'o':
			g_io_size = tmp;
			break;
		case 't':
			g_time_in_sec = tmp;
			break;
		case 'k':
			g_timeout_in_sec = tmp;
			break;
		case 'M':
			g_rw_percentage = tmp;
			g_mix_specified = true;
			break;
		case 'P':
			g_show_performance_ema_period = tmp;
			break;
		case 'S':
			g_show_performance_real_time = 1;
			g_show_performance_period_in_usec = tmp * SPDK_SEC_TO_USEC;
			break;
		default:
			return -EINVAL;
		}
	}
	return 0;
}

static void
bdevperf_usage(void)
{
	printf(" -q <depth>                io depth\n");
	printf(" -o <size>                 io size in bytes\n");
	printf(" -w <type>                 io pattern type, must be one of (read, write, randread, randwrite, rw, randrw, verify, reset, unmap, write_zeroes, flush)\n");
	printf(" -t <time>                 time in seconds\n");
	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
	printf(" -P <num>                  number of moving average periods\n");
	printf("\t\t(If set to n, show weighted mean of the previous n IO/s in real time)\n");
	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
	printf("\t\t(only valid with -S)\n");
	printf(" -S <period>               show performance result in real time every <period> seconds\n");
	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
	printf(" -f                        continue processing I/O even after failures\n");
	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
	printf(" -z                        start bdevperf, but wait for RPC to start tests\n");
	printf(" -X                        abort timed out I/O\n");
	printf(" -C                        enable every core to send I/Os to each bdev\n");
	printf(" -j <filename>             use job config file\n");
	printf(" -l                        display latency histogram. Default: disabled. -l displays a summary, -ll displays details\n");
	printf(" -D                        use a random map for picking offsets not previously read or written (for all jobs)\n");
	printf(" -E                        share one thread per lcore among jobs. Available only if -j is not used.\n");
	printf(" -J <filename>             name of the file to open in append mode and log JSON RPC calls to\n");
}

static void
bdevperf_fini(void)
{
	free_job_config();

	if (g_rpc_log_file != NULL) {
		fclose(g_rpc_log_file);
		g_rpc_log_file = NULL;
	}
}

static int
verify_test_params(struct spdk_app_opts *opts)
{
	/* When RPC is used for starting tests and no rpc_addr was configured
	 * for the app, use the default address.
	 */
	if (g_wait_for_tests && opts->rpc_addr == NULL) {
		opts->rpc_addr = SPDK_DEFAULT_RPC_ADDR;
	}

	if (g_rpc_log_file != NULL) {
		opts->rpc_log_file = g_rpc_log_file;
	}

	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
		goto out;
	}
	if (!g_bdevperf_conf_file && g_io_size <= 0) {
		goto out;
	}
	if (!g_bdevperf_conf_file && !g_workload_type) {
		goto out;
	}
	if (g_bdevperf_conf_file && g_one_thread_per_lcore) {
		printf("The -E option (one thread per lcore) cannot be used together with a config file (-j)\n");
		goto out;
	}
	if (g_time_in_sec <= 0) {
		goto out;
	}
	g_time_in_usec = g_time_in_sec * SPDK_SEC_TO_USEC;

	if (g_timeout_in_sec < 0) {
		goto out;
	}

	if (g_abort && !g_timeout_in_sec) {
		printf("Timeout (-k) must be set when the abort option (-X) is used; ignoring -X\n");
	}

	if (g_show_performance_ema_period > 0 &&
	    g_show_performance_real_time == 0) {
		fprintf(stderr, "-P option must be specified with -S option\n");
		return 1;
	}

	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
		printf("Zero copy mechanism will not be used.\n");
		g_zcopy = false;
	}

	if (g_bdevperf_conf_file) {
		/* workload_type verification happens during config file parsing */
		return 0;
	}

	if (!strcmp(g_workload_type, "verify") ||
	    !strcmp(g_workload_type, "reset")) {
		g_rw_percentage = 50;
		if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
			fprintf(stderr, "Unable to exceed max I/O size of %d for verify. (%d provided).\n",
				SPDK_BDEV_LARGE_BUF_MAX_SIZE, g_io_size);
			return 1;
		}
		g_verify = true;
		if (!strcmp(g_workload_type, "reset")) {
			g_reset = true;
		}
	}

	if (!strcmp(g_workload_type, "read") ||
	    !strcmp(g_workload_type, "randread") ||
	    !strcmp(g_workload_type, "write") ||
	    !strcmp(g_workload_type, "randwrite") ||
	    !strcmp(g_workload_type, "verify") ||
	    !strcmp(g_workload_type, "reset") ||
	    !strcmp(g_workload_type, "unmap") ||
	    !strcmp(g_workload_type, "write_zeroes") ||
	    !strcmp(g_workload_type, "flush")) {
		if (g_mix_specified) {
			fprintf(stderr, "Ignoring -M option... Please use the -M option"
				" only with rw or randrw.\n");
		}
	}

	if (!strcmp(g_workload_type, "rw") ||
	    !strcmp(g_workload_type, "randrw")) {
		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
			fprintf(stderr,
				"-M must be set to a value from 0 to 100 "
				"for rw or randrw.\n");
			return 1;
		}
	}

	if (strcmp(g_workload_type, "randread") &&
	    strcmp(g_workload_type, "randwrite") &&
	    strcmp(g_workload_type, "randrw")) {
		if (g_random_map) {
			fprintf(stderr, "The -D option can only be used with the "
				"randread, randwrite or randrw workloads.\n");
			return 1;
		}
	}

	return 0;
out:
	spdk_app_usage();
	bdevperf_usage();
	return 1;
}

int
main(int argc, char **argv)
{
	struct spdk_app_opts opts = {};
	int rc;

	/* Use the runtime PID to set the random seed */
	srand(getpid());

	spdk_app_opts_init(&opts, sizeof(opts));
	opts.name = "bdevperf";
	opts.rpc_addr = NULL;
	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;

	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CEF:J:M:P:S:T:Xlj:D", NULL,
				      bdevperf_parse_arg, bdevperf_usage)) !=
	    SPDK_APP_PARSE_ARGS_SUCCESS) {
		return rc;
	}

	/* Open the RPC log file here rather than in read_job_config() so that
	 * -J also takes effect when no job config file (-j) is used.
	 */
	if (g_rpc_log_file_name != NULL) {
		g_rpc_log_file = fopen(g_rpc_log_file_name, "a");
		if (g_rpc_log_file == NULL) {
			fprintf(stderr, "Failed to open %s\n", g_rpc_log_file_name);
			return 1;
		}
	}

	if (read_job_config()) {
		bdevperf_fini();
		return 1;
	}

	if (verify_test_params(&opts) != 0) {
		bdevperf_fini();
		return 1;
	}

	rc = spdk_app_start(&opts, bdevperf_run, NULL);

	spdk_app_fini();
	bdevperf_fini();
	return rc;
}