xref: /spdk/examples/bdev/bdevperf/bdevperf.c (revision a8d21b9b550dde7d3e7ffc0cd1171528a136165f)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES.
4  *   All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/bdev.h"
10 #include "spdk/accel.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/event.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/thread.h"
17 #include "spdk/string.h"
18 #include "spdk/rpc.h"
19 #include "spdk/bit_array.h"
20 #include "spdk/conf.h"
21 #include "spdk/zipf.h"
22 #include "spdk/histogram_data.h"
23 
24 #define BDEVPERF_CONFIG_MAX_FILENAME 1024
25 #define BDEVPERF_CONFIG_UNDEFINED -1
26 #define BDEVPERF_CONFIG_ERROR -2
27 
28 struct bdevperf_task {
29 	struct iovec			iov;
30 	struct bdevperf_job		*job;
31 	struct spdk_bdev_io		*bdev_io;
32 	void				*buf;
33 	void				*md_buf;
34 	uint64_t			offset_blocks;
35 	struct bdevperf_task		*task_to_abort;
36 	enum spdk_bdev_io_type		io_type;
37 	TAILQ_ENTRY(bdevperf_task)	link;
38 	struct spdk_bdev_io_wait_entry	bdev_io_wait;
39 };
40 
41 static const char *g_workload_type = NULL;
42 static int g_io_size = 0;
43 /* Initialize to an invalid value so we can detect whether the user overrides it. */
44 static int g_rw_percentage = -1;
45 static bool g_verify = false;
46 static bool g_reset = false;
47 static bool g_continue_on_failure = false;
48 static bool g_abort = false;
49 static bool g_error_to_exit = false;
50 static int g_queue_depth = 0;
51 static uint64_t g_time_in_usec;
52 static int g_show_performance_real_time = 0;
53 static uint64_t g_show_performance_period_in_usec = SPDK_SEC_TO_USEC;
54 static uint64_t g_show_performance_period_num = 0;
55 static uint64_t g_show_performance_ema_period = 0;
56 static int g_run_rc = 0;
57 static bool g_shutdown = false;
58 static uint64_t g_start_tsc;
59 static uint64_t g_shutdown_tsc;
60 static bool g_zcopy = false;
61 static struct spdk_thread *g_main_thread;
62 static int g_time_in_sec = 0;
63 static bool g_mix_specified = false;
64 static const char *g_job_bdev_name;
65 static bool g_wait_for_tests = false;
66 static struct spdk_jsonrpc_request *g_request = NULL;
67 static bool g_multithread_mode = false;
68 static int g_timeout_in_sec;
69 static struct spdk_conf *g_bdevperf_conf = NULL;
70 static const char *g_bdevperf_conf_file = NULL;
71 static double g_zipf_theta;
72 
73 static struct spdk_cpuset g_all_cpuset;
74 static struct spdk_poller *g_perf_timer = NULL;
75 
76 static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
77 static void rpc_perform_tests_cb(void);
78 
79 static uint32_t g_bdev_count = 0;
80 static uint32_t g_latency_display_level;
81 
82 static const double g_latency_cutoffs[] = {
83 	0.01,
84 	0.10,
85 	0.25,
86 	0.50,
87 	0.75,
88 	0.90,
89 	0.95,
90 	0.98,
91 	0.99,
92 	0.995,
93 	0.999,
94 	0.9999,
95 	0.99999,
96 	0.999999,
97 	0.9999999,
98 	-1,
99 };
100 
101 struct latency_info {
102 	uint64_t	min;
103 	uint64_t	max;
104 	uint64_t	total;
105 };
106 
107 struct bdevperf_job {
108 	char				*name;
109 	struct spdk_bdev		*bdev;
110 	struct spdk_bdev_desc		*bdev_desc;
111 	struct spdk_io_channel		*ch;
112 	TAILQ_ENTRY(bdevperf_job)	link;
113 	struct spdk_thread		*thread;
114 
115 	const char			*workload_type;
116 	int				io_size;
117 	int				rw_percentage;
118 	bool				is_random;
119 	bool				verify;
120 	bool				reset;
121 	bool				continue_on_failure;
122 	bool				unmap;
123 	bool				write_zeroes;
124 	bool				flush;
125 	bool				abort;
126 	int				queue_depth;
127 	unsigned int			seed;
128 
129 	uint64_t			io_completed;
130 	uint64_t			io_failed;
131 	uint64_t			io_timeout;
132 	uint64_t			prev_io_completed;
133 	double				ema_io_per_second;
134 	int				current_queue_depth;
135 	uint64_t			size_in_ios;
136 	uint64_t			ios_base;
137 	uint64_t			offset_in_ios;
138 	uint64_t			io_size_blocks;
139 	uint64_t			buf_size;
140 	uint32_t			dif_check_flags;
141 	bool				is_draining;
142 	struct spdk_poller		*run_timer;
143 	struct spdk_poller		*reset_timer;
144 	struct spdk_bit_array		*outstanding;
145 	struct spdk_zipf		*zipf;
146 	TAILQ_HEAD(, bdevperf_task)	task_list;
147 	uint64_t			run_time_in_usec;
148 
149 	/* preserves the channel's histogram data before the channel is destroyed */
150 	struct spdk_histogram_data	*histogram;
151 };
152 
153 struct spdk_bdevperf {
154 	TAILQ_HEAD(, bdevperf_job)	jobs;
155 	uint32_t			running_jobs;
156 };
157 
158 static struct spdk_bdevperf g_bdevperf = {
159 	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
160 	.running_jobs = 0,
161 };
162 
163 enum job_config_rw {
164 	JOB_CONFIG_RW_READ = 0,
165 	JOB_CONFIG_RW_WRITE,
166 	JOB_CONFIG_RW_RANDREAD,
167 	JOB_CONFIG_RW_RANDWRITE,
168 	JOB_CONFIG_RW_RW,
169 	JOB_CONFIG_RW_RANDRW,
170 	JOB_CONFIG_RW_VERIFY,
171 	JOB_CONFIG_RW_RESET,
172 	JOB_CONFIG_RW_UNMAP,
173 	JOB_CONFIG_RW_FLUSH,
174 	JOB_CONFIG_RW_WRITE_ZEROES,
175 };
176 
177 /* Stores the values from one section of the job config file */
178 struct job_config {
179 	const char			*name;
180 	const char			*filename;
181 	struct spdk_cpuset		cpumask;
182 	int				bs;
183 	int				iodepth;
184 	int				rwmixread;
185 	int64_t				offset;
186 	uint64_t			length;
187 	enum job_config_rw		rw;
188 	TAILQ_ENTRY(job_config)	link;
189 };
190 
191 TAILQ_HEAD(, job_config) job_config_list
192 	= TAILQ_HEAD_INITIALIZER(job_config_list);
193 
194 static bool g_performance_dump_active = false;
195 
196 struct bdevperf_aggregate_stats {
197 	struct bdevperf_job		*current_job;
198 	uint64_t			io_time_in_usec;
199 	uint64_t			ema_period;
200 	double				total_io_per_second;
201 	double				total_mb_per_second;
202 	double				total_failed_per_second;
203 	double				total_timeout_per_second;
204 	double				min_latency;
205 	double				max_latency;
206 	uint64_t			total_io_completed;
207 	uint64_t			total_tsc;
208 };
209 
210 static struct bdevperf_aggregate_stats g_stats = {.min_latency = (double)UINT64_MAX};
211 
212 /*
213  * Cumulative Moving Average (CMA): average of all data points up to the current one
214  * Exponential Moving Average (EMA): weighted mean of the previous n data points, with more weight given to recent ones
215  * Simple Moving Average (SMA): unweighted mean of the previous n data points
216  *
217  * Bdevperf supports CMA and EMA.
218  */
219 static double
220 get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
221 {
222 	return (double)job->io_completed * SPDK_SEC_TO_USEC / io_time_in_usec;
223 }
224 
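/*
 * Worked example (hypothetical numbers): the update below uses a smoothing
 * factor of alpha = 2 / (ema_period + 1). With ema_period = 9, alpha = 0.2,
 * so if the previous EMA was 1000 IOPS and the current period measured
 * 2000 IOPS, the EMA becomes 1000 + (2000 - 1000) * 0.2 = 1200 IOPS.
 */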
225 static double
226 get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
227 {
228 	double io_completed, io_per_second;
229 
230 	io_completed = job->io_completed;
231 	io_per_second = (double)(io_completed - job->prev_io_completed) * SPDK_SEC_TO_USEC
232 			/ g_show_performance_period_in_usec;
233 	job->prev_io_completed = io_completed;
234 
235 	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
236 				  / (ema_period + 1);
237 	return job->ema_io_per_second;
238 }
239 
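/*
 * Histogram iteration callback that approximates each I/O's latency by the
 * midpoint of its bucket. so_far == count holds only for the first non-empty
 * bucket (yielding min), and so_far == total only from the last non-empty
 * bucket onward (yielding max).
 */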
240 static void
241 get_avg_latency(void *ctx, uint64_t start, uint64_t end, uint64_t count,
242 		uint64_t total, uint64_t so_far)
243 {
244 	struct latency_info *latency_info = ctx;
245 
246 	if (count == 0) {
247 		return;
248 	}
249 
250 	latency_info->total += (start + end) / 2 * count;
251 
252 	if (so_far == count) {
253 		latency_info->min = start;
254 	}
255 
256 	if (so_far == total) {
257 		latency_info->max = end;
258 	}
259 }
260 
261 static void
262 performance_dump_job(struct bdevperf_aggregate_stats *stats, struct bdevperf_job *job)
263 {
264 	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
265 	double average_latency = 0.0, min_latency, max_latency;
266 	uint64_t time_in_usec;
267 	uint64_t tsc_rate;
268 	uint64_t total_io;
269 	struct latency_info latency_info = {};
270 
271 	printf("\r Job: %s (Core Mask 0x%s)\n", spdk_thread_get_name(job->thread),
272 	       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
273 
274 	if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
275 		printf("\r Job: %s ended in about %.2f seconds with error\n",
276 		       spdk_thread_get_name(job->thread), (double)job->run_time_in_usec / SPDK_SEC_TO_USEC);
277 	}
278 	if (job->verify) {
279 		printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
280 		       job->ios_base, job->size_in_ios);
281 	}
282 
283 	if (g_performance_dump_active == true) {
284 		/* If the job has ended early due to failure, use its actual run time */
285 		if (job->io_failed > 0 && !job->continue_on_failure) {
286 			time_in_usec = job->run_time_in_usec;
287 		} else {
288 			time_in_usec = stats->io_time_in_usec;
289 		}
290 	} else {
291 		time_in_usec = job->run_time_in_usec;
292 	}
293 
294 	if (stats->ema_period == 0) {
295 		io_per_second = get_cma_io_per_second(job, time_in_usec);
296 	} else {
297 		io_per_second = get_ema_io_per_second(job, stats->ema_period);
298 	}
299 
300 	tsc_rate = spdk_get_ticks_hz();
301 	mb_per_second = io_per_second * job->io_size / (1024 * 1024);
302 
303 	spdk_histogram_data_iterate(job->histogram, get_avg_latency, &latency_info);
304 
305 	total_io = job->io_completed + job->io_failed;
306 	if (total_io != 0) {
307 		average_latency = (double)latency_info.total / total_io * SPDK_SEC_TO_USEC / tsc_rate;
308 	}
309 	min_latency = (double)latency_info.min * SPDK_SEC_TO_USEC / tsc_rate;
310 	max_latency = (double)latency_info.max * SPDK_SEC_TO_USEC / tsc_rate;
311 
312 	failed_per_second = (double)job->io_failed * SPDK_SEC_TO_USEC / time_in_usec;
313 	timeout_per_second = (double)job->io_timeout * SPDK_SEC_TO_USEC / time_in_usec;
314 
315 	printf("\t %-20s: %10.2f %10.2f %10.2f",
316 	       job->name, (float)time_in_usec / SPDK_SEC_TO_USEC, io_per_second, mb_per_second);
317 	printf(" %10.2f %8.2f",
318 	       failed_per_second, timeout_per_second);
319 	printf(" %10.2f %10.2f %10.2f\n",
320 	       average_latency, min_latency, max_latency);
321 
322 	stats->total_io_per_second += io_per_second;
323 	stats->total_mb_per_second += mb_per_second;
324 	stats->total_failed_per_second += failed_per_second;
325 	stats->total_timeout_per_second += timeout_per_second;
326 	stats->total_io_completed += job->io_completed + job->io_failed;
327 	stats->total_tsc += latency_info.total;
328 	if (min_latency < stats->min_latency) {
329 		stats->min_latency = min_latency;
330 	}
331 	if (max_latency > stats->max_latency) {
332 		stats->max_latency = max_latency;
333 	}
334 }
335 
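/*
 * Fills the write buffer with a deterministic pattern (each 32-bit data word
 * derived from the block index plus byte offset) and memsets the metadata
 * with the block index, so verify_data() has known contents to compare
 * against after the read-back.
 */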
336 static void
337 generate_data(void *buf, int buf_len, int block_size, void *md_buf, int md_size,
338 	      int num_blocks)
339 {
340 	int offset_blocks = 0, md_offset, data_block_size, inner_offset;
341 
342 	if (buf_len < num_blocks * block_size) {
343 		return;
344 	}
345 
346 	if (md_buf == NULL) {
347 		data_block_size = block_size - md_size;
348 		md_buf = (char *)buf + data_block_size;
349 		md_offset = block_size;
350 	} else {
351 		data_block_size = block_size;
352 		md_offset = md_size;
353 	}
354 
355 	while (offset_blocks < num_blocks) {
356 		inner_offset = 0;
357 		while (inner_offset < data_block_size) {
358 			*(uint32_t *)buf = offset_blocks + inner_offset;
359 			inner_offset += sizeof(uint32_t);
360 			buf += sizeof(uint32_t);
361 		}
362 		memset(md_buf, offset_blocks, md_size);
363 		md_buf += md_offset;
364 		offset_blocks++;
365 	}
366 }
367 
368 static bool
369 copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
370 	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
371 {
372 	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
373 		return false;
374 	}
375 
376 	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));
377 
378 	memcpy(wr_buf, rd_buf, block_size * num_blocks);
379 
380 	if (wr_md_buf != NULL) {
381 		memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
382 	}
383 
384 	return true;
385 }
386 
387 static bool
388 verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
389 	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
390 {
391 	int offset_blocks = 0, md_offset, data_block_size;
392 
393 	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
394 		return false;
395 	}
396 
397 	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));
398 
399 	if (wr_md_buf == NULL) {
400 		data_block_size = block_size - md_size;
401 		wr_md_buf = (char *)wr_buf + data_block_size;
402 		rd_md_buf = (char *)rd_buf + data_block_size;
403 		md_offset = block_size;
404 	} else {
405 		data_block_size = block_size;
406 		md_offset = md_size;
407 	}
408 
409 	while (offset_blocks < num_blocks) {
410 		if (memcmp(wr_buf, rd_buf, data_block_size) != 0) {
411 			return false;
412 		}
413 
414 		wr_buf += block_size;
415 		rd_buf += block_size;
416 
417 		if (md_check) {
418 			if (memcmp(wr_md_buf, rd_md_buf, md_size) != 0) {
419 				return false;
420 			}
421 
422 			wr_md_buf += md_offset;
423 			rd_md_buf += md_offset;
424 		}
425 
426 		offset_blocks++;
427 	}
428 
429 	return true;
430 }
431 
432 static void
433 free_job_config(void)
434 {
435 	struct job_config *config, *tmp;
436 
437 	spdk_conf_free(g_bdevperf_conf);
438 	g_bdevperf_conf = NULL;
439 
440 	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
441 		TAILQ_REMOVE(&job_config_list, config, link);
442 		free(config);
443 	}
444 }
445 
446 static void
447 bdevperf_job_free(struct bdevperf_job *job)
448 {
449 	spdk_histogram_data_free(job->histogram);
450 	spdk_bit_array_free(&job->outstanding);
451 	spdk_zipf_free(&job->zipf);
452 	free(job->name);
453 	free(job);
454 }
455 
456 static void
457 job_thread_exit(void *ctx)
458 {
459 	spdk_thread_exit(spdk_get_thread());
460 }
461 
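/*
 * Histogram iteration callback for the latency summary. Prints one line per
 * cutoff once the cumulative percentage reaches it, for example (values are
 * hypothetical):
 *   99.00000% :   123.456us
 */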
462 static void
463 check_cutoff(void *ctx, uint64_t start, uint64_t end, uint64_t count,
464 	     uint64_t total, uint64_t so_far)
465 {
466 	double so_far_pct;
467 	double **cutoff = ctx;
468 	uint64_t tsc_rate;
469 
470 	if (count == 0) {
471 		return;
472 	}
473 
474 	tsc_rate = spdk_get_ticks_hz();
475 	so_far_pct = (double)so_far / total;
476 	while (so_far_pct >= **cutoff && **cutoff > 0) {
477 		printf("%9.5f%% : %9.3fus\n", **cutoff * 100, (double)end * SPDK_SEC_TO_USEC / tsc_rate);
478 		(*cutoff)++;
479 	}
480 }
481 
482 static void
483 print_bucket(void *ctx, uint64_t start, uint64_t end, uint64_t count,
484 	     uint64_t total, uint64_t so_far)
485 {
486 	double so_far_pct;
487 	uint64_t tsc_rate;
488 
489 	if (count == 0) {
490 		return;
491 	}
492 
493 	tsc_rate = spdk_get_ticks_hz();
494 	so_far_pct = (double)so_far * 100 / total;
495 	printf("%9.3f - %9.3f: %9.4f%%  (%9ju)\n",
496 	       (double)start * SPDK_SEC_TO_USEC / tsc_rate,
497 	       (double)end * SPDK_SEC_TO_USEC / tsc_rate,
498 	       so_far_pct, count);
499 }
500 
501 static void
502 bdevperf_test_done(void *ctx)
503 {
504 	struct bdevperf_job *job, *jtmp;
505 	struct bdevperf_task *task, *ttmp;
506 	double average_latency = 0.0;
507 	uint64_t time_in_usec;
508 	int rc;
509 
510 	if (g_time_in_usec) {
511 		g_stats.io_time_in_usec = g_time_in_usec;
512 
513 		if (!g_run_rc && g_performance_dump_active) {
514 			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
515 			return;
516 		}
517 	}
518 
519 	if (g_show_performance_real_time) {
520 		spdk_poller_unregister(&g_perf_timer);
521 	}
522 
523 	if (g_shutdown) {
524 		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
525 		time_in_usec = g_shutdown_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
526 		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
527 		printf("Received shutdown signal, test time was about %.6f seconds\n",
528 		       (double)g_time_in_usec / SPDK_SEC_TO_USEC);
529 	}
530 
531 	printf("\n%*s\n", 107, "Latency(us)");
532 	printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
533 	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");
534 
535 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
536 		performance_dump_job(&g_stats, job);
537 	}
538 
539 	printf("\r =================================================================================="
540 	       "=================================\n");
541 	printf("\r %-28s: %10s %10.2f %10.2f",
542 	       "Total", "", g_stats.total_io_per_second, g_stats.total_mb_per_second);
543 	printf(" %10.2f %8.2f",
544 	       g_stats.total_failed_per_second, g_stats.total_timeout_per_second);
545 
546 	if (g_stats.total_io_completed != 0) {
547 		average_latency = ((double)g_stats.total_tsc / g_stats.total_io_completed) * SPDK_SEC_TO_USEC /
548 				  spdk_get_ticks_hz();
549 	}
550 	printf(" %10.2f %10.2f %10.2f\n", average_latency, g_stats.min_latency, g_stats.max_latency);
551 
552 	fflush(stdout);
553 
554 	if (g_latency_display_level == 0 || g_stats.total_io_completed == 0) {
555 		goto clean;
556 	}
557 
558 	printf("\n Latency summary\n");
559 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
560 		printf("\r =============================================\n");
561 		printf("\r Job: %s (Core Mask 0x%s)\n", spdk_thread_get_name(job->thread),
562 		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
563 
564 		const double *cutoff = g_latency_cutoffs;
565 
566 		spdk_histogram_data_iterate(job->histogram, check_cutoff, &cutoff);
567 
568 		printf("\n");
569 	}
570 
571 	if (g_latency_display_level == 1) {
572 		goto clean;
573 	}
574 
575 	printf("\r Latency histogram\n");
576 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
577 		printf("\r =============================================\n");
578 		printf("\r Job: %s (Core Mask 0x%s)\n", spdk_thread_get_name(job->thread),
579 		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
580 
581 		spdk_histogram_data_iterate(job->histogram, print_bucket, NULL);
582 		printf("\n");
583 	}
584 
585 clean:
586 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
587 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
588 
589 		spdk_thread_send_msg(job->thread, job_thread_exit, NULL);
590 
591 		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
592 			TAILQ_REMOVE(&job->task_list, task, link);
593 			spdk_free(task->buf);
594 			spdk_free(task->md_buf);
595 			free(task);
596 		}
597 
598 		bdevperf_job_free(job);
599 	}
600 
601 	rc = g_run_rc;
602 	if (g_request && !g_shutdown) {
603 		rpc_perform_tests_cb();
604 		if (rc != 0) {
605 			spdk_app_stop(rc);
606 		}
607 	} else {
608 		spdk_app_stop(rc);
609 	}
610 }
611 
612 static void
613 bdevperf_job_end(void *ctx)
614 {
615 	assert(g_main_thread == spdk_get_thread());
616 
617 	if (--g_bdevperf.running_jobs == 0) {
618 		bdevperf_test_done(NULL);
619 	}
620 }
621 
622 static void
623 bdevperf_channel_get_histogram_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
624 {
625 	struct spdk_histogram_data *job_hist = cb_arg;
626 
627 	if (status == 0) {
628 		spdk_histogram_data_merge(job_hist, histogram);
629 	}
630 }
631 
632 static void
633 bdevperf_job_empty(struct bdevperf_job *job)
634 {
635 	uint64_t end_tsc = 0;
636 
637 	end_tsc = spdk_get_ticks() - g_start_tsc;
638 	job->run_time_in_usec = end_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
639 	/* keep histogram info before channel is destroyed */
640 	spdk_bdev_channel_get_histogram(job->ch, bdevperf_channel_get_histogram_cb,
641 					job->histogram);
642 	spdk_put_io_channel(job->ch);
643 	spdk_bdev_close(job->bdev_desc);
644 	spdk_thread_send_msg(g_main_thread, bdevperf_job_end, NULL);
645 }
646 
647 static void
648 bdevperf_end_task(struct bdevperf_task *task)
649 {
650 	struct bdevperf_job     *job = task->job;
651 
652 	TAILQ_INSERT_TAIL(&job->task_list, task, link);
653 	if (job->is_draining) {
654 		if (job->current_queue_depth == 0) {
655 			bdevperf_job_empty(job);
656 		}
657 	}
658 }
659 
660 static void
661 bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
662 {
663 	struct bdevperf_job	*job = task->job;
664 
665 	task->bdev_io_wait.bdev = job->bdev;
666 	task->bdev_io_wait.cb_fn = cb_fn;
667 	task->bdev_io_wait.cb_arg = task;
668 	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
669 }
670 
671 static int
672 bdevperf_job_drain(void *ctx)
673 {
674 	struct bdevperf_job *job = ctx;
675 
676 	spdk_poller_unregister(&job->run_timer);
677 	if (job->reset) {
678 		spdk_poller_unregister(&job->reset_timer);
679 	}
680 
681 	job->is_draining = true;
682 
683 	return -1;
684 }
685 
686 static int
687 bdevperf_job_drain_timer(void *ctx)
688 {
689 	struct bdevperf_job *job = ctx;
690 
691 	bdevperf_job_drain(ctx);
692 	if (job->current_queue_depth == 0) {
693 		bdevperf_job_empty(job);
694 	}
695 
696 	return SPDK_POLLER_BUSY;
697 }
698 
699 static void
700 bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
701 {
702 	struct bdevperf_task	*task = cb_arg;
703 	struct bdevperf_job	*job = task->job;
704 
705 	job->current_queue_depth--;
706 
707 	if (success) {
708 		job->io_completed++;
709 	} else {
710 		job->io_failed++;
711 		if (!job->continue_on_failure) {
712 			bdevperf_job_drain(job);
713 			g_run_rc = -1;
714 		}
715 	}
716 
717 	spdk_bdev_free_io(bdev_io);
718 	bdevperf_end_task(task);
719 }
720 
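/*
 * Verifies the DIF/DIX protection information on read data: interleaved
 * metadata is checked in place via spdk_dif_verify(), while separate
 * metadata is checked against task->md_buf via spdk_dix_verify().
 */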
721 static int
722 bdevperf_verify_dif(struct bdevperf_task *task, struct iovec *iovs, int iovcnt)
723 {
724 	struct bdevperf_job	*job = task->job;
725 	struct spdk_bdev	*bdev = job->bdev;
726 	struct spdk_dif_ctx	dif_ctx;
727 	struct spdk_dif_error	err_blk = {};
728 	int			rc;
729 
730 	rc = spdk_dif_ctx_init(&dif_ctx,
731 			       spdk_bdev_get_block_size(bdev),
732 			       spdk_bdev_get_md_size(bdev),
733 			       spdk_bdev_is_md_interleaved(bdev),
734 			       spdk_bdev_is_dif_head_of_md(bdev),
735 			       spdk_bdev_get_dif_type(bdev),
736 			       job->dif_check_flags,
737 			       task->offset_blocks, 0, 0, 0, 0);
738 	if (rc != 0) {
739 		fprintf(stderr, "Initialization of DIF context failed\n");
740 		return rc;
741 	}
742 
743 	if (spdk_bdev_is_md_interleaved(bdev)) {
744 		rc = spdk_dif_verify(iovs, iovcnt, job->io_size_blocks, &dif_ctx, &err_blk);
745 	} else {
746 		struct iovec md_iov = {
747 			.iov_base	= task->md_buf,
748 			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
749 		};
750 
751 		rc = spdk_dix_verify(iovs, iovcnt, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
752 	}
753 
754 	if (rc != 0) {
755 		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
756 			err_blk.err_type, err_blk.err_offset);
757 	}
758 
759 	return rc;
760 }
761 
762 static void
763 bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
764 {
765 	struct bdevperf_job	*job;
766 	struct bdevperf_task	*task = cb_arg;
767 	struct iovec		*iovs;
768 	int			iovcnt;
769 	bool			md_check;
770 	uint64_t		offset_in_ios;
771 	int			rc;
772 
773 	job = task->job;
774 	md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;
775 
776 	if (g_error_to_exit == true) {
777 		bdevperf_job_drain(job);
778 	} else if (!success) {
779 		if (!job->reset && !job->continue_on_failure) {
780 			bdevperf_job_drain(job);
781 			g_run_rc = -1;
782 			g_error_to_exit = true;
783 			printf("task offset: %" PRIu64 " on job bdev=%s fails\n",
784 			       task->offset_blocks, job->name);
785 		}
786 	} else if (job->verify || job->reset) {
787 		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
788 		assert(iovcnt == 1);
789 		assert(iovs != NULL);
790 		if (!verify_data(task->buf, job->buf_size, iovs[0].iov_base, iovs[0].iov_len,
791 				 spdk_bdev_get_block_size(job->bdev),
792 				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
793 				 spdk_bdev_get_md_size(job->bdev),
794 				 job->io_size_blocks, md_check)) {
795 			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
796 			printf("   First dword expected 0x%x got 0x%x\n", *(int *)task->buf, *(int *)iovs[0].iov_base);
797 			bdevperf_job_drain(job);
798 			g_run_rc = -1;
799 		}
800 	} else if (job->dif_check_flags != 0) {
801 		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_get_md_size(job->bdev) != 0) {
802 			spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
803 			assert(iovcnt == 1);
804 			assert(iovs != NULL);
805 			rc = bdevperf_verify_dif(task, iovs, iovcnt);
806 			if (rc != 0) {
807 				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
808 				       task->offset_blocks, job->name);
809 
810 				success = false;
811 				if (!job->reset && !job->continue_on_failure) {
812 					bdevperf_job_drain(job);
813 					g_run_rc = -1;
814 					g_error_to_exit = true;
815 				}
816 			}
817 		}
818 	}
819 
820 	job->current_queue_depth--;
821 
822 	if (success) {
823 		job->io_completed++;
824 	} else {
825 		job->io_failed++;
826 	}
827 
828 	if (job->verify) {
829 		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
830 		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;
831 
832 		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
833 		spdk_bit_array_clear(job->outstanding, offset_in_ios);
834 	}
835 
836 	spdk_bdev_free_io(bdev_io);
837 
838 	/*
839 	 * is_draining indicates when time has expired for the test run
840 	 * and we are just waiting for the previously submitted I/O
841 	 * to complete.  In this case, do not submit a new I/O to replace
842 	 * the one just completed.
843 	 */
844 	if (!job->is_draining) {
845 		bdevperf_submit_single(job, task);
846 	} else {
847 		bdevperf_end_task(task);
848 	}
849 }
850 
851 static void
852 bdevperf_verify_submit_read(void *cb_arg)
853 {
854 	struct bdevperf_job	*job;
855 	struct bdevperf_task	*task = cb_arg;
856 	int			rc;
857 
858 	job = task->job;
859 
860 	/* Read the data back in */
861 	rc = spdk_bdev_read_blocks_with_md(job->bdev_desc, job->ch, NULL, NULL,
862 					   task->offset_blocks, job->io_size_blocks,
863 					   bdevperf_complete, task);
864 
865 	if (rc == -ENOMEM) {
866 		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
867 	} else if (rc != 0) {
868 		printf("Failed to submit read: %d\n", rc);
869 		bdevperf_job_drain(job);
870 		g_run_rc = rc;
871 	}
872 }
873 
874 static void
875 bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
876 			       void *cb_arg)
877 {
878 	if (success) {
879 		spdk_bdev_free_io(bdev_io);
880 		bdevperf_verify_submit_read(cb_arg);
881 	} else {
882 		bdevperf_complete(bdev_io, success, cb_arg);
883 	}
884 }
885 
886 static void
887 bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
888 {
889 	if (!success) {
890 		bdevperf_complete(bdev_io, success, cb_arg);
891 		return;
892 	}
893 
894 	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
895 }
896 
897 static int
898 bdevperf_generate_dif(struct bdevperf_task *task)
899 {
900 	struct bdevperf_job	*job = task->job;
901 	struct spdk_bdev	*bdev = job->bdev;
902 	struct spdk_dif_ctx	dif_ctx;
903 	int			rc;
904 
905 	rc = spdk_dif_ctx_init(&dif_ctx,
906 			       spdk_bdev_get_block_size(bdev),
907 			       spdk_bdev_get_md_size(bdev),
908 			       spdk_bdev_is_md_interleaved(bdev),
909 			       spdk_bdev_is_dif_head_of_md(bdev),
910 			       spdk_bdev_get_dif_type(bdev),
911 			       job->dif_check_flags,
912 			       task->offset_blocks, 0, 0, 0, 0);
913 	if (rc != 0) {
914 		fprintf(stderr, "Initialization of DIF context failed\n");
915 		return rc;
916 	}
917 
918 	if (spdk_bdev_is_md_interleaved(bdev)) {
919 		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
920 	} else {
921 		struct iovec md_iov = {
922 			.iov_base	= task->md_buf,
923 			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
924 		};
925 
926 		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
927 	}
928 
929 	if (rc != 0) {
930 		fprintf(stderr, "Generation of DIF/DIX failed\n");
931 	}
932 
933 	return rc;
934 }
935 
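/*
 * Dispatches one I/O according to task->io_type. On -ENOMEM the task is
 * parked on the bdev layer's io_wait queue and resubmitted later; any other
 * submission error drains the job and records the failure in g_run_rc.
 */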
936 static void
937 bdevperf_submit_task(void *arg)
938 {
939 	struct bdevperf_task	*task = arg;
940 	struct bdevperf_job	*job = task->job;
941 	struct spdk_bdev_desc	*desc;
942 	struct spdk_io_channel	*ch;
943 	spdk_bdev_io_completion_cb cb_fn;
944 	uint64_t		offset_in_ios;
945 	int			rc = 0;
946 
947 	desc = job->bdev_desc;
948 	ch = job->ch;
949 
950 	switch (task->io_type) {
951 	case SPDK_BDEV_IO_TYPE_WRITE:
952 		if (spdk_bdev_get_md_size(job->bdev) != 0 && job->dif_check_flags != 0) {
953 			rc = bdevperf_generate_dif(task);
954 		}
955 		if (rc == 0) {
956 			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;
957 
958 			if (g_zcopy) {
959 				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
960 				return;
961 			} else {
962 				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
963 								     task->md_buf,
964 								     task->offset_blocks,
965 								     job->io_size_blocks,
966 								     cb_fn, task);
967 			}
968 		}
969 		break;
970 	case SPDK_BDEV_IO_TYPE_FLUSH:
971 		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
972 					    job->io_size_blocks, bdevperf_complete, task);
973 		break;
974 	case SPDK_BDEV_IO_TYPE_UNMAP:
975 		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
976 					    job->io_size_blocks, bdevperf_complete, task);
977 		break;
978 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
979 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
980 						   job->io_size_blocks, bdevperf_complete, task);
981 		break;
982 	case SPDK_BDEV_IO_TYPE_READ:
983 		if (g_zcopy) {
984 			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
985 						   true, bdevperf_zcopy_populate_complete, task);
986 		} else {
987 			rc = spdk_bdev_read_blocks_with_md(desc, ch, task->buf, task->md_buf,
988 							   task->offset_blocks,
989 							   job->io_size_blocks,
990 							   bdevperf_complete, task);
991 		}
992 		break;
993 	case SPDK_BDEV_IO_TYPE_ABORT:
994 		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
995 		break;
996 	default:
997 		assert(false);
998 		rc = -EINVAL;
999 		break;
1000 	}
1001 
1002 	if (rc == -ENOMEM) {
1003 		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
1004 		return;
1005 	} else if (rc != 0) {
1006 		printf("Failed to submit bdev_io: %d\n", rc);
1007 		if (job->verify) {
1008 			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
1009 			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;
1010 
1011 			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
1012 			spdk_bit_array_clear(job->outstanding, offset_in_ios);
1013 		}
1014 		bdevperf_job_drain(job);
1015 		g_run_rc = rc;
1016 		return;
1017 	}
1018 
1019 	job->current_queue_depth++;
1020 }
1021 
1022 static void
1023 bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1024 {
1025 	struct bdevperf_task	*task = cb_arg;
1026 	struct bdevperf_job	*job = task->job;
1027 	struct iovec		*iovs;
1028 	int			iovcnt;
1029 
1030 	if (!success) {
1031 		bdevperf_job_drain(job);
1032 		g_run_rc = -1;
1033 		return;
1034 	}
1035 
1036 	task->bdev_io = bdev_io;
1037 	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
1038 
1039 	if (job->verify || job->reset) {
1040 		/* When job->verify or job->reset is enabled, task->buf is used for
1041 		 *  verification of read after write.  For write I/O, when zcopy APIs
1042 		 *  are used, task->buf cannot be used, and data must be written to
1043 		 *  the data buffer allocated beneath the bdev layer instead.
1044 		 *  Hence we copy task->buf to the allocated data buffer here.
1045 		 */
1046 		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
1047 		assert(iovcnt == 1);
1048 		assert(iovs != NULL);
1049 
1050 		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
1051 			  spdk_bdev_get_block_size(job->bdev),
1052 			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
1053 			  spdk_bdev_get_md_size(job->bdev), job->io_size_blocks);
1054 	}
1055 
1056 	bdevperf_submit_task(task);
1057 }
1058 
1059 static void
1060 bdevperf_prep_zcopy_write_task(void *arg)
1061 {
1062 	struct bdevperf_task	*task = arg;
1063 	struct bdevperf_job	*job = task->job;
1064 	int			rc;
1065 
1066 	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
1067 				   task->offset_blocks, job->io_size_blocks,
1068 				   false, bdevperf_zcopy_get_buf_complete, task);
1069 	if (rc != 0) {
1070 		assert(rc == -ENOMEM);
1071 		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
1072 		return;
1073 	}
1074 
1075 	job->current_queue_depth++;
1076 }
1077 
1078 static struct bdevperf_task *
1079 bdevperf_job_get_task(struct bdevperf_job *job)
1080 {
1081 	struct bdevperf_task *task;
1082 
1083 	task = TAILQ_FIRST(&job->task_list);
1084 	if (!task) {
1085 		printf("Task allocation failed\n");
1086 		abort();
1087 	}
1088 
1089 	TAILQ_REMOVE(&job->task_list, task, link);
1090 	return task;
1091 }
1092 
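/*
 * Picks the next offset for this task: Zipf-distributed when a zipf theta is
 * configured for a random workload, uniformly random for other random
 * workloads, and otherwise sequential with wraparound (skipping offsets that
 * still have an outstanding I/O when verify is enabled).
 */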
1093 static void
1094 bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
1095 {
1096 	uint64_t offset_in_ios;
1097 
1098 	if (job->zipf) {
1099 		offset_in_ios = spdk_zipf_generate(job->zipf);
1100 	} else if (job->is_random) {
1101 		offset_in_ios = rand_r(&job->seed) % job->size_in_ios;
1102 	} else {
1103 		offset_in_ios = job->offset_in_ios++;
1104 		if (job->offset_in_ios == job->size_in_ios) {
1105 			job->offset_in_ios = 0;
1106 		}
1107 
1108 		/* Increment offset_in_ios again if there's already an outstanding I/O
1109 		 * to that location. We only need this with job->verify, as random
1110 		 * offsets are not supported with job->verify at this time.
1111 		 */
1112 		if (job->verify) {
1113 			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);
1114 
1115 			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
1116 				offset_in_ios = job->offset_in_ios++;
1117 				if (job->offset_in_ios == job->size_in_ios) {
1118 					job->offset_in_ios = 0;
1119 				}
1120 			}
1121 			spdk_bit_array_set(job->outstanding, offset_in_ios);
1122 		}
1123 	}
1124 
1125 	/* When multiple threads drive the same bdev, offset_in_ios is relative
1126 	 * to the LBA range assigned to this job. task->offset_blocks
1127 	 * is absolute (within the entire bdev LBA range).
1128 	 */
1129 	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;
1130 
1131 	if (job->verify || job->reset) {
1132 		generate_data(task->buf, job->buf_size,
1133 			      spdk_bdev_get_block_size(job->bdev),
1134 			      task->md_buf, spdk_bdev_get_md_size(job->bdev),
1135 			      job->io_size_blocks);
1136 		if (g_zcopy) {
1137 			bdevperf_prep_zcopy_write_task(task);
1138 			return;
1139 		} else {
1140 			task->iov.iov_base = task->buf;
1141 			task->iov.iov_len = job->buf_size;
1142 			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
1143 		}
1144 	} else if (job->flush) {
1145 		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
1146 	} else if (job->unmap) {
1147 		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
1148 	} else if (job->write_zeroes) {
1149 		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1150 	} else if ((job->rw_percentage == 100) ||
1151 		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
1152 		task->io_type = SPDK_BDEV_IO_TYPE_READ;
1153 	} else {
1154 		if (g_zcopy) {
1155 			bdevperf_prep_zcopy_write_task(task);
1156 			return;
1157 		} else {
1158 			task->iov.iov_base = task->buf;
1159 			task->iov.iov_len = job->buf_size;
1160 			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
1161 		}
1162 	}
1163 
1164 	bdevperf_submit_task(task);
1165 }
1166 
1167 static int reset_job(void *arg);
1168 
1169 static void
1170 reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1171 {
1172 	struct bdevperf_task	*task = cb_arg;
1173 	struct bdevperf_job	*job = task->job;
1174 
1175 	if (!success) {
1176 		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
1177 		bdevperf_job_drain(job);
1178 		g_run_rc = -1;
1179 	}
1180 
1181 	TAILQ_INSERT_TAIL(&job->task_list, task, link);
1182 	spdk_bdev_free_io(bdev_io);
1183 
1184 	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
1185 						10 * SPDK_SEC_TO_USEC);
1186 }
1187 
1188 static int
1189 reset_job(void *arg)
1190 {
1191 	struct bdevperf_job *job = arg;
1192 	struct bdevperf_task *task;
1193 	int rc;
1194 
1195 	spdk_poller_unregister(&job->reset_timer);
1196 
1197 	/* Do reset. */
1198 	task = bdevperf_job_get_task(job);
1199 	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
1200 			     reset_cb, task);
1201 	if (rc) {
1202 		printf("Reset failed: %d\n", rc);
1203 		bdevperf_job_drain(job);
1204 		g_run_rc = -1;
1205 	}
1206 
1207 	return -1;
1208 }
1209 
1210 static void
1211 bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1212 {
1213 	struct bdevperf_job *job = cb_arg;
1214 	struct bdevperf_task *task;
1215 
1216 	job->io_timeout++;
1217 
1218 	if (job->is_draining || !job->abort ||
1219 	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
1220 		return;
1221 	}
1222 
1223 	task = bdevperf_job_get_task(job);
1224 	if (task == NULL) {
1225 		return;
1226 	}
1227 
1228 	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
1229 	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;
1230 
1231 	bdevperf_submit_task(task);
1232 }
1233 
1234 static void
1235 bdevperf_job_run(void *ctx)
1236 {
1237 	struct bdevperf_job *job = ctx;
1238 	struct bdevperf_task *task;
1239 	int i;
1240 
1241 	/* Submit initial I/O for this job. Each time one
1242 	 * completes, another will be submitted. */
1243 
1244 	/* Start a timer to stop this I/O chain when the run is over */
1245 	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain_timer, job, g_time_in_usec);
1246 	if (job->reset) {
1247 		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
1248 							10 * SPDK_SEC_TO_USEC);
1249 	}
1250 
1251 	spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);
1252 
1253 	for (i = 0; i < job->queue_depth; i++) {
1254 		task = bdevperf_job_get_task(job);
1255 		bdevperf_submit_single(job, task);
1256 	}
1257 }
1258 
1259 static void
1260 _performance_dump_done(void *ctx)
1261 {
1262 	struct bdevperf_aggregate_stats *stats = ctx;
1263 	double average_latency;
1264 
1265 	printf("\r =================================================================================="
1266 	       "=================================\n");
1267 	printf("\r %-28s: %10s %10.2f %10.2f",
1268 	       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
1269 	printf(" %10.2f %8.2f",
1270 	       stats->total_failed_per_second, stats->total_timeout_per_second);
1271 
1272 	average_latency = stats->total_io_completed == 0 ? 0.0 :
1273 			  ((double)stats->total_tsc / stats->total_io_completed) * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
1274 	printf(" %10.2f %10.2f %10.2f\n", average_latency, stats->min_latency, stats->max_latency);
1275 	printf("\n");
1276 
1277 	fflush(stdout);
1278 
1279 	g_performance_dump_active = false;
1280 
1281 	free(stats);
1282 }
1283 
1284 static void
1285 _performance_dump(void *ctx)
1286 {
1287 	struct bdevperf_aggregate_stats *stats = ctx;
1288 
1289 	performance_dump_job(stats, stats->current_job);
1290 
1291 	/* This assumes the jobs list is static after start up time.
1292 	 * That's true right now, but if that ever changed this would need a lock. */
1293 	stats->current_job = TAILQ_NEXT(stats->current_job, link);
1294 	if (stats->current_job == NULL) {
1295 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1296 	} else {
1297 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1298 	}
1299 }
1300 
1301 static int
1302 performance_statistics_thread(void *arg)
1303 {
1304 	struct bdevperf_aggregate_stats *stats;
1305 
1306 	if (g_performance_dump_active) {
1307 		return -1;
1308 	}
1309 
1310 	g_performance_dump_active = true;
1311 
1312 	stats = calloc(1, sizeof(*stats));
1313 	if (stats == NULL) {
1314 		return -1;
1315 	}
1316 
1317 	stats->min_latency = (double)UINT64_MAX;
1318 
1319 	g_show_performance_period_num++;
1320 
1321 	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;
1322 	stats->ema_period = g_show_performance_ema_period;
1323 
1324 	/* Iterate all of the jobs to gather stats.
1325 	 * These jobs will not get removed here until a final performance dump is run,
1326 	 * so this should be safe without locking.
1327 	 */
1328 	stats->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
1329 	if (stats->current_job == NULL) {
1330 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1331 	} else {
1332 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1333 	}
1334 
1335 	return -1;
1336 }
1337 
1338 static void
1339 bdevperf_test(void)
1340 {
1341 	struct bdevperf_job *job;
1342 
1343 	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / (uint64_t)SPDK_SEC_TO_USEC);
1344 	fflush(stdout);
1345 
1346 	/* Start a timer to dump performance numbers */
1347 	g_start_tsc = spdk_get_ticks();
1348 	if (g_show_performance_real_time && !g_perf_timer) {
1349 		printf("%*s\n", 107, "Latency(us)");
1350 		printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
1351 		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");
1352 
1353 		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
1354 						    g_show_performance_period_in_usec);
1355 	}
1356 
1357 	/* Iterate jobs to start all I/O */
1358 	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
1359 		g_bdevperf.running_jobs++;
1360 		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
1361 	}
1362 }
1363 
1364 static void
1365 bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1366 {
1367 	struct bdevperf_job *job = event_ctx;
1368 
1369 	if (SPDK_BDEV_EVENT_REMOVE == type) {
1370 		bdevperf_job_drain(job);
1371 	}
1372 }
1373 
1374 static void
1375 bdevperf_histogram_status_cb(void *cb_arg, int status)
1376 {
1377 	if (status != 0) {
1378 		g_run_rc = status;
1379 		if (g_continue_on_failure == false) {
1380 			g_error_to_exit = true;
1381 		}
1382 	}
1383 
1384 	if (--g_bdev_count == 0) {
1385 		if (g_run_rc == 0) {
1386 			/* Ready to run the test */
1387 			bdevperf_test();
1388 		} else {
1389 			bdevperf_test_done(NULL);
1390 		}
1391 	}
1392 }
1393 
1394 static uint32_t g_construct_job_count = 0;
1395 
1396 static void
1397 _bdevperf_enable_histogram(bool enable)
1398 {
1399 	struct spdk_bdev *bdev;
1400 	/* increment initial g_bdev_count so that it will never reach 0 in the middle of iteration */
1401 	g_bdev_count = 1;
1402 
1403 	if (g_job_bdev_name != NULL) {
1404 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
1405 		if (bdev) {
1406 			g_bdev_count++;
1407 
1408 			spdk_bdev_histogram_enable(bdev, bdevperf_histogram_status_cb, NULL, enable);
1409 		} else {
1410 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
1411 		}
1412 	} else {
1413 		bdev = spdk_bdev_first_leaf();
1414 
1415 		while (bdev != NULL) {
1416 			g_bdev_count++;
1417 
1418 			spdk_bdev_histogram_enable(bdev, bdevperf_histogram_status_cb, NULL, enable);
1419 			bdev = spdk_bdev_next_leaf(bdev);
1420 		}
1421 	}
1422 
1423 	bdevperf_histogram_status_cb(NULL, 0);
1424 }
1425 
1426 static void
1427 _bdevperf_construct_job_done(void *ctx)
1428 {
1429 	if (--g_construct_job_count == 0) {
1430 		if (g_run_rc != 0) {
1431 			/* Something failed. */
1432 			bdevperf_test_done(NULL);
1433 			return;
1434 		}
1435 
1436 		/* Always enable the histogram. */
1437 		_bdevperf_enable_histogram(true);
1438 	} else if (g_run_rc != 0) {
1439 		/* Reset the error since some jobs were constructed successfully */
1440 		g_run_rc = 0;
1441 		if (g_continue_on_failure == false) {
1442 			g_error_to_exit = true;
1443 		}
1444 	}
1445 }
1446 
1447 /* The format checker will not allow using an inlined type,
1448    so this is a workaround */
1449 typedef struct spdk_thread *spdk_thread_t;
1450 
1451 static spdk_thread_t
1452 construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
1453 {
1454 	struct spdk_cpuset tmp;
1455 
1456 	/* This function runs on the main thread. */
1457 	assert(g_main_thread == spdk_get_thread());
1458 
1459 	/* Handle default mask */
1460 	if (spdk_cpuset_count(cpumask) == 0) {
1461 		cpumask = &g_all_cpuset;
1462 	}
1463 
1464 	/* Warn user that mask might need to be changed */
1465 	spdk_cpuset_copy(&tmp, cpumask);
1466 	spdk_cpuset_or(&tmp, &g_all_cpuset);
1467 	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
1468 		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
1469 	}
1470 
1471 	return spdk_thread_create(tag, cpumask);
1472 }
1473 
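/*
 * Round-robins across the cores in the application core mask. As a
 * hypothetical example, with cores {0, 2, 4} successive calls return
 * 0, 2, 4, 0, 2, ...
 */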
1474 static uint32_t
1475 _get_next_core(void)
1476 {
1477 	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;
1478 
1479 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1480 		current_core = spdk_env_get_first_core();
1481 		return current_core;
1482 	}
1483 
1484 	current_core = spdk_env_get_next_core(current_core);
1485 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1486 		current_core = spdk_env_get_first_core();
1487 	}
1488 
1489 	return current_core;
1490 }
1491 
1492 static void
1493 _bdevperf_construct_job(void *ctx)
1494 {
1495 	struct bdevperf_job *job = ctx;
1496 	int rc;
1497 
1498 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(job->bdev), true, bdevperf_bdev_removed, job,
1499 				&job->bdev_desc);
1500 	if (rc != 0) {
1501 		SPDK_ERRLOG("Could not open leaf bdev %s, error=%d\n", spdk_bdev_get_name(job->bdev), rc);
1502 		g_run_rc = -EINVAL;
1503 		goto end;
1504 	}
1505 
1506 	if (g_zcopy) {
1507 		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
1508 			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
1509 			g_run_rc = -ENOTSUP;
1510 			goto end;
1511 		}
1512 	}
1513 
1514 	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
1515 	if (!job->ch) {
1516 		SPDK_ERRLOG("Could not get io_channel for device %s\n",
1517 			    spdk_bdev_get_name(job->bdev));
1518 		spdk_bdev_close(job->bdev_desc);
1519 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
1520 		g_run_rc = -ENOMEM;
1521 		goto end;
1522 	}
1523 
1524 end:
1525 	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
1526 }
1527 
1528 static void
1529 job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
1530 {
1531 	switch (rw) {
1532 	case JOB_CONFIG_RW_READ:
1533 		job->rw_percentage = 100;
1534 		break;
1535 	case JOB_CONFIG_RW_WRITE:
1536 		job->rw_percentage = 0;
1537 		break;
1538 	case JOB_CONFIG_RW_RANDREAD:
1539 		job->is_random = true;
1540 		job->rw_percentage = 100;
1541 		job->seed = rand();
1542 		break;
1543 	case JOB_CONFIG_RW_RANDWRITE:
1544 		job->is_random = true;
1545 		job->rw_percentage = 0;
1546 		job->seed = rand();
1547 		break;
1548 	case JOB_CONFIG_RW_RW:
1549 		job->is_random = false;
1550 		break;
1551 	case JOB_CONFIG_RW_RANDRW:
1552 		job->is_random = true;
1553 		job->seed = rand();
1554 		break;
1555 	case JOB_CONFIG_RW_VERIFY:
1556 		job->verify = true;
1557 		job->rw_percentage = 50;
1558 		break;
1559 	case JOB_CONFIG_RW_RESET:
1560 		job->reset = true;
1561 		job->verify = true;
1562 		job->rw_percentage = 50;
1563 		break;
1564 	case JOB_CONFIG_RW_UNMAP:
1565 		job->unmap = true;
1566 		break;
1567 	case JOB_CONFIG_RW_FLUSH:
1568 		job->flush = true;
1569 		break;
1570 	case JOB_CONFIG_RW_WRITE_ZEROES:
1571 		job->write_zeroes = true;
1572 		break;
1573 	}
1574 }
1575 
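/*
 * Note that io_size_blocks is derived from the data block size, while the
 * per-task buffer size uses the full block size, so for interleaved-metadata
 * bdevs each task buffer also has room for the per-block metadata.
 */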
1576 static int
1577 bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
1578 		       struct spdk_thread *thread)
1579 {
1580 	struct bdevperf_job *job;
1581 	struct bdevperf_task *task;
1582 	int block_size, data_block_size;
1583 	int rc;
1584 	int task_num, n;
1585 
1586 	block_size = spdk_bdev_get_block_size(bdev);
1587 	data_block_size = spdk_bdev_get_data_block_size(bdev);
1588 
1589 	job = calloc(1, sizeof(struct bdevperf_job));
1590 	if (!job) {
1591 		fprintf(stderr, "Unable to allocate memory for new job.\n");
1592 		return -ENOMEM;
1593 	}
1594 
1595 	job->name = strdup(spdk_bdev_get_name(bdev));
1596 	if (!job->name) {
1597 		fprintf(stderr, "Unable to allocate memory for job name.\n");
1598 		bdevperf_job_free(job);
1599 		return -ENOMEM;
1600 	}
1601 
1602 	job->workload_type = g_workload_type;
1603 	job->io_size = config->bs;
1604 	job->rw_percentage = config->rwmixread;
1605 	job->continue_on_failure = g_continue_on_failure;
1606 	job->queue_depth = config->iodepth;
1607 	job->bdev = bdev;
1608 	job->io_size_blocks = job->io_size / data_block_size;
1609 	job->buf_size = job->io_size_blocks * block_size;
1610 	job->abort = g_abort;
1611 	job_init_rw(job, config->rw);
1612 
1613 	if ((job->io_size % data_block_size) != 0) {
1614 		SPDK_ERRLOG("IO size (%d) is not a multiple of the data block size of bdev %s (%"PRIu32")\n",
1615 			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
1616 		bdevperf_job_free(job);
1617 		return -ENOTSUP;
1618 	}
1619 
1620 	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1621 		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
1622 		bdevperf_job_free(job);
1623 		return -ENOTSUP;
1624 	}
1625 
1626 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_REFTAG)) {
1627 		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
1628 	}
1629 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_GUARD)) {
1630 		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
1631 	}
1632 
1633 	job->offset_in_ios = 0;
1634 
1635 	if (config->length != 0) {
1636 		/* Use subset of disk */
1637 		job->size_in_ios = config->length / job->io_size_blocks;
1638 		job->ios_base = config->offset / job->io_size_blocks;
1639 	} else {
1640 		/* Use whole disk */
1641 		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
1642 		job->ios_base = 0;
1643 	}
1644 
1645 	if (job->is_random && g_zipf_theta > 0) {
1646 		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
1647 	}
1648 
1649 	if (job->verify) {
1650 		job->outstanding = spdk_bit_array_create(job->size_in_ios);
1651 		if (job->outstanding == NULL) {
1652 			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
1653 				    spdk_bdev_get_name(bdev));
1654 			bdevperf_job_free(job);
1655 			return -ENOMEM;
1656 		}
1657 		if (job->queue_depth > (int)job->size_in_ios) {
1658 			SPDK_WARNLOG("Due to constraints of verify job, queue depth (-q, %d) can't exceed the number of IO "
1659 				     "requests which can be submitted to the bdev %s simultaneously (%"PRIu64"). "
1660 				     "Queue depth is limited to %"PRIu64"\n",
1661 				     job->queue_depth, job->name, job->size_in_ios, job->size_in_ios);
1662 			job->queue_depth = (int)job->size_in_ios;
1663 		}
1664 	}
1665 
1666 	job->histogram = spdk_histogram_data_alloc();
1667 	if (job->histogram == NULL) {
1668 		fprintf(stderr, "Failed to allocate histogram\n");
1669 		bdevperf_job_free(job);
1670 		return -ENOMEM;
1671 	}
1672 
1673 	TAILQ_INIT(&job->task_list);
1674 
1675 	task_num = job->queue_depth;
1676 	if (job->reset) {
1677 		task_num += 1;
1678 	}
1679 	if (job->abort) {
1680 		task_num += job->queue_depth;
1681 	}
1682 
1683 	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);
1684 
1685 	for (n = 0; n < task_num; n++) {
1686 		task = calloc(1, sizeof(struct bdevperf_task));
1687 		if (!task) {
1688 			fprintf(stderr, "Failed to allocate memory for task\n");
1689 			return -ENOMEM;
1690 		}
1691 
1692 		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
1693 					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1694 		if (!task->buf) {
1695 			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
1696 			free(task);
1697 			return -ENOMEM;
1698 		}
1699 
1700 		if (spdk_bdev_is_md_separate(job->bdev)) {
1701 			task->md_buf = spdk_zmalloc(job->io_size_blocks *
1702 						    spdk_bdev_get_md_size(job->bdev), 0, NULL,
1703 						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
1704 			if (!task->md_buf) {
1705 				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
1706 				spdk_free(task->buf);
1707 				free(task);
1708 				return -ENOMEM;
1709 			}
1710 		}
1711 
1712 		task->job = job;
1713 		TAILQ_INSERT_TAIL(&job->task_list, task, link);
1714 	}
1715 
1716 	job->thread = thread;
1717 
1718 	g_construct_job_count++;
1719 
1720 	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
1721 	assert(rc == 0);
1722 
1723 	return rc;
1724 }
1725 
1726 static int
1727 parse_rw(const char *str, enum job_config_rw ret)
1728 {
1729 	if (str == NULL) {
1730 		return ret;
1731 	}
1732 
1733 	if (!strcmp(str, "read")) {
1734 		ret = JOB_CONFIG_RW_READ;
1735 	} else if (!strcmp(str, "randread")) {
1736 		ret = JOB_CONFIG_RW_RANDREAD;
1737 	} else if (!strcmp(str, "write")) {
1738 		ret = JOB_CONFIG_RW_WRITE;
1739 	} else if (!strcmp(str, "randwrite")) {
1740 		ret = JOB_CONFIG_RW_RANDWRITE;
1741 	} else if (!strcmp(str, "verify")) {
1742 		ret = JOB_CONFIG_RW_VERIFY;
1743 	} else if (!strcmp(str, "reset")) {
1744 		ret = JOB_CONFIG_RW_RESET;
1745 	} else if (!strcmp(str, "unmap")) {
1746 		ret = JOB_CONFIG_RW_UNMAP;
1747 	} else if (!strcmp(str, "write_zeroes")) {
1748 		ret = JOB_CONFIG_RW_WRITE_ZEROES;
1749 	} else if (!strcmp(str, "flush")) {
1750 		ret = JOB_CONFIG_RW_FLUSH;
1751 	} else if (!strcmp(str, "rw")) {
1752 		ret = JOB_CONFIG_RW_RW;
1753 	} else if (!strcmp(str, "randrw")) {
1754 		ret = JOB_CONFIG_RW_RANDRW;
1755 	} else {
1756 		fprintf(stderr, "rw must be one of\n"
1757 			"(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush)\n");
1758 		ret = BDEVPERF_CONFIG_ERROR;
1759 	}
1760 
1761 	return ret;
1762 }
1763 
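/*
 * Consumes one name from a colon-separated filename list, stripping spaces
 * and tabs. For example (hypothetical input), given "Malloc0:Malloc1" it
 * writes "Malloc0" to out and returns a pointer to ":Malloc1" for the next
 * call.
 */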
1764 static const char *
1765 config_filename_next(const char *filename, char *out)
1766 {
1767 	int i, k;
1768 
1769 	if (filename == NULL) {
1770 		out[0] = '\0';
1771 		return NULL;
1772 	}
1773 
1774 	if (filename[0] == ':') {
1775 		filename++;
1776 	}
1777 
1778 	for (i = 0, k = 0;
1779 	     filename[i] != '\0' &&
1780 	     filename[i] != ':' &&
1781 	     i < BDEVPERF_CONFIG_MAX_FILENAME - 1; /* leave room for the NUL terminator */
1782 	     i++) {
1783 		if (filename[i] == ' ' || filename[i] == '\t') {
1784 			continue;
1785 		}
1786 
1787 		out[k++] = filename[i];
1788 	}
1789 	out[k] = 0;
1790 
1791 	return filename + i;
1792 }
1793 
1794 static void
1795 bdevperf_construct_jobs(void)
1796 {
1797 	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
1798 	struct spdk_thread *thread;
1799 	struct job_config *config;
1800 	struct spdk_bdev *bdev;
1801 	const char *filenames;
1802 	int rc;
1803 
1804 	TAILQ_FOREACH(config, &job_config_list, link) {
1805 		filenames = config->filename;
1806 
1807 		thread = construct_job_thread(&config->cpumask, config->name);
1808 		assert(thread);
1809 
1810 		while (filenames) {
1811 			filenames = config_filename_next(filenames, filename);
1812 			if (strlen(filename) == 0) {
1813 				break;
1814 			}
1815 
1816 			bdev = spdk_bdev_get_by_name(filename);
1817 			if (!bdev) {
1818 				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
1819 				g_run_rc = -EINVAL;
1820 				return;
1821 			}
1822 
1823 			rc = bdevperf_construct_job(bdev, config, thread);
1824 			if (rc < 0) {
1825 				g_run_rc = rc;
1826 				return;
1827 			}
1828 		}
1829 	}
1830 }
1831 
1832 static int
1833 make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
1834 {
1835 	struct job_config *config = calloc(1, sizeof(*config));
1836 
1837 	if (config == NULL) {
1838 		fprintf(stderr, "Unable to allocate memory for job config\n");
1839 		return -ENOMEM;
1840 	}
1841 
1842 	config->name = filename;
1843 	config->filename = filename;
1844 	spdk_cpuset_zero(&config->cpumask);
1845 	spdk_cpuset_set_cpu(&config->cpumask, _get_next_core(), true);
1846 	config->bs = g_io_size;
1847 	config->iodepth = g_queue_depth;
1848 	config->rwmixread = g_rw_percentage;
1849 	config->offset = offset;
1850 	config->length = range;
1851 	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
1852 	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
1853 		free(config);
1854 		return -EINVAL;
1855 	}
1856 
1857 	TAILQ_INSERT_TAIL(&job_config_list, config, link);
1858 	return 0;
1859 }
1860 
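/*
 * Splits each bdev's LBA range evenly across all cores in multithread (-C)
 * mode. As a hypothetical example, a 1000-block bdev on 4 cores yields four
 * jobs of 250 blocks at offsets 0, 250, 500 and 750.
 */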
static void
bdevperf_construct_multithread_job_configs(void)
{
	struct spdk_bdev *bdev;
	uint32_t i;
	uint32_t num_cores;
	uint64_t blocks_per_job;
	int64_t offset;

	num_cores = 0;
	SPDK_ENV_FOREACH_CORE(i) {
		num_cores++;
	}

	if (num_cores == 0) {
		g_run_rc = -EINVAL;
		return;
	}

	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (!bdev) {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			g_run_rc = -EINVAL;
			return;
		}

		blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
		offset = 0;

		SPDK_ENV_FOREACH_CORE(i) {
			g_run_rc = make_cli_job_config(g_job_bdev_name, offset, blocks_per_job);
			if (g_run_rc) {
				return;
			}

			offset += blocks_per_job;
		}
	} else {
		bdev = spdk_bdev_first_leaf();
		while (bdev != NULL) {
			blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
			offset = 0;

			SPDK_ENV_FOREACH_CORE(i) {
				g_run_rc = make_cli_job_config(spdk_bdev_get_name(bdev),
							       offset, blocks_per_job);
				if (g_run_rc) {
					return;
				}

				offset += blocks_per_job;
			}

			bdev = spdk_bdev_next_leaf(bdev);
		}
	}
}

static void
bdevperf_construct_job_configs(void)
{
	struct spdk_bdev *bdev;

	/* There are three different modes for allocating jobs. Standard mode
	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
	 *
	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
	 * Effectively, this runs multiple copies of the job per bdev.
	 *
	 * The -j flag selects "FIO" mode, which tries to mimic the semantics of
	 * FIO jobs.  In "FIO" mode, threads are spawned per-job instead of
	 * per-bdev, and each job can be individually parameterized by filename,
	 * cpu mask, etc., whereas the other modes only support global options.
	 */
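
	/* Illustrative invocations of the three modes (bdev and file names are
	 * hypothetical):
	 *
	 *   bdevperf -q 32 -o 4096 -w randread -t 60 -T Malloc0
	 *   bdevperf -q 32 -o 4096 -w randread -t 60 -C
	 *   bdevperf -j ./jobs.conf -t 60
	 */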

	if (g_bdevperf_conf) {
		goto end;
	} else if (g_multithread_mode) {
		bdevperf_construct_multithread_job_configs();
		goto end;
	}

	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (bdev) {
			/* Construct the job */
			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
		} else {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			g_run_rc = -EINVAL;
		}
	} else {
		bdev = spdk_bdev_first_leaf();

		while (bdev != NULL) {
			/* Construct the job */
			g_run_rc = make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
			if (g_run_rc) {
				break;
			}

			bdev = spdk_bdev_next_leaf(bdev);
		}
	}

end:
	/* Increment the initial construct_jobs count so that it can never reach
	 * 0 in the middle of the iteration.
	 */
	g_construct_job_count = 1;

	if (g_run_rc == 0) {
		bdevperf_construct_jobs();
	}

	_bdevperf_construct_job_done(NULL);
}

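/* Read a nonnegative integer option 'name' from section 's'.  Returns the
 * parsed value, 'def' when the field is absent, or BDEVPERF_CONFIG_ERROR for
 * a negative value or a missing mandatory field.
 */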
static int
parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
{
	const char *job_name;
	int tmp;

	tmp = spdk_conf_section_get_intval(s, name);
	if (tmp == -1) {
		/* The field was not found, so fall back to the default value.
		 * In the [global] section it is ok to have undefined values,
		 * but for other sections it is not. */
		if (def == BDEVPERF_CONFIG_UNDEFINED) {
			job_name = spdk_conf_section_get_name(s);
			if (strcmp(job_name, "global") == 0) {
				return def;
			}

			fprintf(stderr,
				"Job '%s' has no '%s' assigned\n",
				job_name, name);
			return BDEVPERF_CONFIG_ERROR;
		}
		return def;
	}

	/* NOTE: get_intval returns nonnegative on success */
	if (tmp < 0) {
		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
			spdk_conf_section_get_name(s), name);
		return BDEVPERF_CONFIG_ERROR;
	}

	return tmp;
}

/* CLI arguments override parameters for global sections */
static void
config_set_cli_args(struct job_config *config)
{
	if (g_job_bdev_name) {
		config->filename = g_job_bdev_name;
	}
	if (g_io_size > 0) {
		config->bs = g_io_size;
	}
	if (g_queue_depth > 0) {
		config->iodepth = g_queue_depth;
	}
	/* g_rw_percentage stays at -1 unless -M was given, so any nonnegative
	 * value (including 0, i.e. all writes) is a user override. */
	if (g_rw_percentage >= 0) {
		config->rwmixread = g_rw_percentage;
	}
	if (g_workload_type) {
		config->rw = parse_rw(g_workload_type, config->rw);
	}
}

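/* Example of a job config file accepted via -j (values illustrative;
 * "Malloc0" is a placeholder bdev name):
 *
 *   [global]
 *   filename=Malloc0
 *   bs=4096
 *   iodepth=32
 *   rw=randrw
 *   rwmixread=70
 *
 *   [job0]
 *   cpumask=0x1
 */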
static int
read_job_config(void)
{
	struct job_config global_default_config;
	struct job_config global_config;
	struct spdk_conf_section *s;
	struct job_config *config;
	const char *cpumask;
	const char *rw;
	bool is_global;
	int n = 0;
	int val;

	if (g_bdevperf_conf_file == NULL) {
		return 0;
	}

	g_bdevperf_conf = spdk_conf_allocate();
	if (g_bdevperf_conf == NULL) {
		fprintf(stderr, "Could not allocate job config structure\n");
		return 1;
	}

	spdk_conf_disable_sections_merge(g_bdevperf_conf);
	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
		fprintf(stderr, "Invalid job config\n");
		return 1;
	}

	/* Initialize global defaults */
	global_default_config.filename = NULL;
	/* A zero mask is the same as g_all_cpuset, but g_all_cpuset is not
	 * initialized yet, so use the zero mask as the default instead. */
	spdk_cpuset_zero(&global_default_config.cpumask);
	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
	/* bdevperf has no default for the -M option, but in FIO the default is 50 */
	global_default_config.rwmixread = 50;
	global_default_config.offset = 0;
	/* length 0 means 100% */
	global_default_config.length = 0;
	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
	config_set_cli_args(&global_default_config);

	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
		return 1;
	}

	/* There is only a single instance of the global job_config.
	 * Its value is reset whenever a new [global] section is encountered. */
	global_config = global_default_config;

	for (s = spdk_conf_first_section(g_bdevperf_conf);
	     s != NULL;
	     s = spdk_conf_next_section(s)) {
		config = calloc(1, sizeof(*config));
		if (config == NULL) {
			fprintf(stderr, "Unable to allocate memory for job config\n");
			return 1;
		}

		config->name = spdk_conf_section_get_name(s);
		is_global = strcmp(config->name, "global") == 0;

		if (is_global) {
			global_config = global_default_config;
		}

		config->filename = spdk_conf_section_get_val(s, "filename");
		if (config->filename == NULL) {
			config->filename = global_config.filename;
		}
		if (!is_global) {
			if (config->filename == NULL) {
				fprintf(stderr, "Job '%s' expects 'filename' parameter\n", config->name);
				goto error;
			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
				fprintf(stderr,
					"filename for '%s' job is too long. Max length is %d\n",
					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
				goto error;
			}
		}

		cpumask = spdk_conf_section_get_val(s, "cpumask");
		if (cpumask == NULL) {
			config->cpumask = global_config.cpumask;
		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
			fprintf(stderr, "Job '%s' has bad 'cpumask' value\n", config->name);
			goto error;
		}

		config->bs = parse_uint_option(s, "bs", global_config.bs);
		if (config->bs == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->bs == 0) {
			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
			goto error;
		}

		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->iodepth == 0) {
			fprintf(stderr,
				"'iodepth' of job '%s' must be greater than 0\n",
				config->name);
			goto error;
		}

		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->rwmixread > 100) {
			fprintf(stderr,
				"'rwmixread' value of '%s' job is not in the 0-100 range\n",
				config->name);
			goto error;
		}

		config->offset = parse_uint_option(s, "offset", global_config.offset);
		if (config->offset == BDEVPERF_CONFIG_ERROR) {
			goto error;
		}

		val = parse_uint_option(s, "length", global_config.length);
		if (val == BDEVPERF_CONFIG_ERROR) {
			goto error;
		}
		config->length = val;

		rw = spdk_conf_section_get_val(s, "rw");
		config->rw = parse_rw(rw, global_config.rw);
		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
			fprintf(stderr, "Job '%s' has bad 'rw' value\n", config->name);
			goto error;
		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
			goto error;
		}

		if (is_global) {
			config_set_cli_args(config);
			global_config = *config;
			free(config);
		} else {
			TAILQ_INSERT_TAIL(&job_config_list, config, link);
			n++;
		}
	}

	printf("Using job config with %d jobs\n", n);
	return 0;
error:
	free(config);
	return 1;
}

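/* Entry point invoked by the SPDK app framework once the reactors are up.
 * With -z, job construction is deferred until a "perform_tests" RPC arrives.
 */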
static void
bdevperf_run(void *arg1)
{
	uint32_t i;

	g_main_thread = spdk_get_thread();

	spdk_cpuset_zero(&g_all_cpuset);
	SPDK_ENV_FOREACH_CORE(i) {
		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
	}

	if (g_wait_for_tests) {
		/* Do not perform any tests until RPC is received */
		return;
	}

	bdevperf_construct_job_configs();
}

static void
rpc_perform_tests_reset(void)
{
	/* Reset the run status, statistics, and period counter for the next
	 * test run. */
	g_run_rc = 0;
	memset(&g_stats, 0, sizeof(g_stats));
	g_show_performance_period_num = 0;
}

static void
rpc_perform_tests_cb(void)
{
	struct spdk_json_write_ctx *w;
	struct spdk_jsonrpc_request *request = g_request;

	g_request = NULL;

	if (g_run_rc == 0) {
		w = spdk_jsonrpc_begin_result(request);
		spdk_json_write_uint32(w, g_run_rc);
		spdk_jsonrpc_end_result(request, w);
	} else {
		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
	}

	rpc_perform_tests_reset();
}

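/* Handler for the "perform_tests" RPC, sent over the app's JSON-RPC socket
 * when bdevperf was started with -z.  It takes no parameters and only one
 * test may run at a time.
 */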
static void
rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
{
	if (params != NULL) {
		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
						 "perform_tests method requires no parameters");
		return;
	}
	if (g_request != NULL) {
		fprintf(stderr, "Another test is already in progress.\n");
		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
						 spdk_strerror(-EINPROGRESS));
		return;
	}
	g_request = request;

	/* Only construct job configs at the first test run. */
	if (TAILQ_EMPTY(&job_config_list)) {
		bdevperf_construct_job_configs();
	} else {
		bdevperf_construct_jobs();
	}
}
SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)

static void
_bdevperf_job_drain(void *ctx)
{
	bdevperf_job_drain(ctx);
}

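/* App shutdown hook: flag the shutdown and ask every running job's thread to
 * drain its outstanding I/O.
 */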
static void
spdk_bdevperf_shutdown_cb(void)
{
	struct bdevperf_job *job, *tmp;

	g_shutdown = true;

	if (g_bdevperf.running_jobs == 0) {
		bdevperf_test_done(NULL);
		return;
	}

	/* Iterate jobs to stop all I/O */
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
	}
}

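/* Option callback for spdk_app_parse_args(); returns 0 on success or a
 * negative errno when a value is malformed or out of range.
 */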
static int
bdevperf_parse_arg(int ch, char *arg)
{
	long long tmp;

	if (ch == 'w') {
		g_workload_type = arg;
	} else if (ch == 'T') {
		g_job_bdev_name = arg;
	} else if (ch == 'z') {
		g_wait_for_tests = true;
	} else if (ch == 'Z') {
		g_zcopy = true;
	} else if (ch == 'X') {
		g_abort = true;
	} else if (ch == 'C') {
		g_multithread_mode = true;
	} else if (ch == 'f') {
		g_continue_on_failure = true;
	} else if (ch == 'j') {
		g_bdevperf_conf_file = arg;
	} else if (ch == 'F') {
		char *endptr;

		errno = 0;
		g_zipf_theta = strtod(arg, &endptr);
		if (errno || arg == endptr || g_zipf_theta < 0) {
			fprintf(stderr, "Illegal zipf theta value %s\n", arg);
			return -EINVAL;
		}
	} else if (ch == 'l') {
		g_latency_display_level++;
	} else {
		tmp = spdk_strtoll(arg, 10);
		if (tmp < 0) {
			fprintf(stderr, "Failed to parse option %c.\n", ch);
			return tmp;
		} else if (tmp >= INT_MAX) {
			fprintf(stderr, "Value for option %c is too large.\n", ch);
			return -ERANGE;
		}

		switch (ch) {
		case 'q':
			g_queue_depth = tmp;
			break;
		case 'o':
			g_io_size = tmp;
			break;
		case 't':
			g_time_in_sec = tmp;
			break;
		case 'k':
			g_timeout_in_sec = tmp;
			break;
		case 'M':
			g_rw_percentage = tmp;
			g_mix_specified = true;
			break;
		case 'P':
			g_show_performance_ema_period = tmp;
			break;
		case 'S':
			g_show_performance_real_time = 1;
			g_show_performance_period_in_usec = tmp * SPDK_SEC_TO_USEC;
			break;
		default:
			return -EINVAL;
		}
	}
	return 0;
}

static void
bdevperf_usage(void)
{
	printf(" -q <depth>                io depth\n");
	printf(" -o <size>                 io size in bytes\n");
	printf(" -w <type>                 io pattern type, must be one of (read, write, randread, randwrite, rw, randrw, verify, reset, unmap, write_zeroes, flush)\n");
	printf(" -t <time>                 time in seconds\n");
	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
	printf(" -P <num>                  number of moving average periods\n");
	printf("\t\t(If set to n, show weighted mean of the previous n IO/s in real time)\n");
	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
	printf("\t\t(only valid with -S)\n");
	printf(" -S <period>               show performance result in real time every <period> seconds\n");
	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
	printf(" -f                        continue processing I/O even after failures\n");
	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
	printf(" -z                        start bdevperf, but wait for RPC to start tests\n");
	printf(" -X                        abort timed out I/O\n");
	printf(" -C                        enable every core to send I/Os to each bdev\n");
	printf(" -j <filename>             use job config file\n");
	printf(" -l                        display latency histogram, default: disable. -l display summary, -ll display details\n");
}

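/* Validate the combined CLI parameters before the app starts.  Returns 0 on
 * success; prints usage and returns nonzero on an invalid combination.
 */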
static int
verify_test_params(struct spdk_app_opts *opts)
{
	/* When RPC is used for starting tests and
	 * no rpc_addr was configured for the app,
	 * use the default address. */
	if (g_wait_for_tests && opts->rpc_addr == NULL) {
		opts->rpc_addr = SPDK_DEFAULT_RPC_ADDR;
	}

	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
		goto out;
	}
	if (!g_bdevperf_conf_file && g_io_size <= 0) {
		goto out;
	}
	if (!g_bdevperf_conf_file && !g_workload_type) {
		goto out;
	}
	if (g_time_in_sec <= 0) {
		goto out;
	}
	g_time_in_usec = g_time_in_sec * SPDK_SEC_TO_USEC;

	if (g_timeout_in_sec < 0) {
		goto out;
	}

	if (g_abort && !g_timeout_in_sec) {
		printf("Timeout must be set for the abort option; ignoring -X\n");
	}

	if (g_show_performance_ema_period > 0 &&
	    g_show_performance_real_time == 0) {
		fprintf(stderr, "-P option must be specified with -S option\n");
		return 1;
	}

	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
		printf("Zero copy mechanism will not be used.\n");
		g_zcopy = false;
	}

	if (g_bdevperf_conf_file) {
		/* workload_type verification happens during config file parsing */
		return 0;
	}

	if (!strcmp(g_workload_type, "verify") ||
	    !strcmp(g_workload_type, "reset")) {
		g_rw_percentage = 50;
		if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
			fprintf(stderr, "I/O size for verify cannot exceed the max of %d (%d provided).\n",
				SPDK_BDEV_LARGE_BUF_MAX_SIZE, g_io_size);
			return 1;
		}
		g_verify = true;
		if (!strcmp(g_workload_type, "reset")) {
			g_reset = true;
		}
	}

	if (!strcmp(g_workload_type, "read") ||
	    !strcmp(g_workload_type, "randread") ||
	    !strcmp(g_workload_type, "write") ||
	    !strcmp(g_workload_type, "randwrite") ||
	    !strcmp(g_workload_type, "verify") ||
	    !strcmp(g_workload_type, "reset") ||
	    !strcmp(g_workload_type, "unmap") ||
	    !strcmp(g_workload_type, "write_zeroes") ||
	    !strcmp(g_workload_type, "flush")) {
		if (g_mix_specified) {
			fprintf(stderr, "Ignoring -M option... Please use the -M option"
				" only with rw or randrw.\n");
		}
	}

	if (!strcmp(g_workload_type, "rw") ||
	    !strcmp(g_workload_type, "randrw")) {
		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
			fprintf(stderr,
				"-M must be set to a value from 0 to 100 "
				"for rw or randrw.\n");
			return 1;
		}
	}

	return 0;
out:
	spdk_app_usage();
	bdevperf_usage();
	return 1;
}

int
main(int argc, char **argv)
{
	struct spdk_app_opts opts = {};
	int rc;

	/* Use the runtime PID to set the random seed */
	srand(getpid());

	spdk_app_opts_init(&opts, sizeof(opts));
	opts.name = "bdevperf";
	opts.rpc_addr = NULL;
	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;

	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CF:M:P:S:T:Xlj:", NULL,
				      bdevperf_parse_arg, bdevperf_usage)) !=
	    SPDK_APP_PARSE_ARGS_SUCCESS) {
		return rc;
	}

	if (read_job_config()) {
		free_job_config();
		return 1;
	}

	if (verify_test_params(&opts) != 0) {
		free_job_config();
		return 1;
	}

	rc = spdk_app_start(&opts, bdevperf_run, NULL);

	spdk_app_fini();
	free_job_config();
	return rc;
}