xref: /spdk/examples/bdev/bdevperf/bdevperf.c (revision c2c1a76766891540d4e7c9053f6d8e0af31043cb)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES.
4  *   All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/bdev.h"
10 #include "spdk/accel.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/event.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/thread.h"
17 #include "spdk/string.h"
18 #include "spdk/rpc.h"
19 #include "spdk/bit_array.h"
20 #include "spdk/conf.h"
21 #include "spdk/zipf.h"
22 #include "spdk/histogram_data.h"
23 
24 #define BDEVPERF_CONFIG_MAX_FILENAME 1024
25 #define BDEVPERF_CONFIG_UNDEFINED -1
26 #define BDEVPERF_CONFIG_ERROR -2
27 #define PATTERN_TYPES_STR "(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush, write_zeroes)"
28 
29 struct bdevperf_task {
30 	struct iovec			iov;
31 	struct bdevperf_job		*job;
32 	struct spdk_bdev_io		*bdev_io;
33 	void				*buf;
34 	void				*verify_buf;
35 	void				*md_buf;
36 	uint64_t			offset_blocks;
37 	struct bdevperf_task		*task_to_abort;
38 	enum spdk_bdev_io_type		io_type;
39 	TAILQ_ENTRY(bdevperf_task)	link;
40 	struct spdk_bdev_io_wait_entry	bdev_io_wait;
41 };
42 
43 static char *g_workload_type = NULL;
44 static int g_io_size = 0;
45 /* initialize to invalid value so we can detect if user overrides it. */
46 static int g_rw_percentage = -1;
47 static bool g_verify = false;
48 static bool g_reset = false;
49 static bool g_continue_on_failure = false;
50 static bool g_abort = false;
51 static bool g_error_to_exit = false;
52 static int g_queue_depth = 0;
53 static uint64_t g_time_in_usec;
54 static bool g_summarize_performance = true;
55 static uint64_t g_show_performance_period_in_usec = SPDK_SEC_TO_USEC;
56 static uint64_t g_show_performance_period_num = 0;
57 static uint64_t g_show_performance_ema_period = 0;
58 static int g_run_rc = 0;
59 static bool g_shutdown = false;
60 static uint64_t g_start_tsc;
61 static uint64_t g_shutdown_tsc;
62 static bool g_zcopy = false;
63 static struct spdk_thread *g_main_thread;
64 static int g_time_in_sec = 0;
65 static bool g_mix_specified = false;
66 static const char *g_job_bdev_name;
67 static bool g_wait_for_tests = false;
68 static struct spdk_jsonrpc_request *g_request = NULL;
69 static bool g_multithread_mode = false;
70 static int g_timeout_in_sec;
71 static struct spdk_conf *g_bdevperf_conf = NULL;
72 static const char *g_bdevperf_conf_file = NULL;
73 static double g_zipf_theta;
74 static bool g_random_map = false;
75 static bool g_unique_writes = false;
76 
77 static struct spdk_cpuset g_all_cpuset;
78 static struct spdk_poller *g_perf_timer = NULL;
79 
80 static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
81 static void rpc_perform_tests_cb(void);
82 static int bdevperf_parse_arg(int ch, char *arg);
83 static int verify_test_params(void);
84 static void bdevperf_usage(void);
85 
86 static uint32_t g_bdev_count = 0;
87 static uint32_t g_latency_display_level;
88 
89 static bool g_one_thread_per_lcore = false;
90 
91 static const double g_latency_cutoffs[] = {
92 	0.01,
93 	0.10,
94 	0.25,
95 	0.50,
96 	0.75,
97 	0.90,
98 	0.95,
99 	0.98,
100 	0.99,
101 	0.995,
102 	0.999,
103 	0.9999,
104 	0.99999,
105 	0.999999,
106 	0.9999999,
107 	-1,
108 };
109 
110 static const char *g_rpc_log_file_name = NULL;
111 static FILE *g_rpc_log_file = NULL;
112 
113 struct latency_info {
114 	uint64_t	min;
115 	uint64_t	max;
116 	uint64_t	total;
117 };
118 
119 
120 enum job_config_rw {
121 	JOB_CONFIG_RW_READ = 0,
122 	JOB_CONFIG_RW_WRITE,
123 	JOB_CONFIG_RW_RANDREAD,
124 	JOB_CONFIG_RW_RANDWRITE,
125 	JOB_CONFIG_RW_RW,
126 	JOB_CONFIG_RW_RANDRW,
127 	JOB_CONFIG_RW_VERIFY,
128 	JOB_CONFIG_RW_RESET,
129 	JOB_CONFIG_RW_UNMAP,
130 	JOB_CONFIG_RW_FLUSH,
131 	JOB_CONFIG_RW_WRITE_ZEROES,
132 };
133 
134 struct bdevperf_job {
135 	char				*name;
136 	struct spdk_bdev		*bdev;
137 	struct spdk_bdev_desc		*bdev_desc;
138 	struct spdk_io_channel		*ch;
139 	TAILQ_ENTRY(bdevperf_job)	link;
140 	struct spdk_thread		*thread;
141 
142 	enum job_config_rw		workload_type;
143 	int				io_size;
144 	int				rw_percentage;
145 	bool				is_random;
146 	bool				verify;
147 	bool				reset;
148 	bool				continue_on_failure;
149 	bool				unmap;
150 	bool				write_zeroes;
151 	bool				flush;
152 	bool				abort;
153 	int				queue_depth;
154 	unsigned int			seed;
155 
156 	uint64_t			io_completed;
157 	uint64_t			io_failed;
158 	uint64_t			io_timeout;
159 	uint64_t			prev_io_completed;
160 	double				ema_io_per_second;
161 	int				current_queue_depth;
162 	uint64_t			size_in_ios;
163 	uint64_t			ios_base;
164 	uint64_t			offset_in_ios;
165 	uint64_t			io_size_blocks;
166 	uint64_t			buf_size;
167 	uint32_t			dif_check_flags;
168 	bool				is_draining;
169 	struct spdk_poller		*run_timer;
170 	struct spdk_poller		*reset_timer;
171 	struct spdk_bit_array		*outstanding;
172 	struct spdk_zipf		*zipf;
173 	TAILQ_HEAD(, bdevperf_task)	task_list;
174 	uint64_t			run_time_in_usec;
175 
176 	/* keep channel's histogram data before being destroyed */
177 	struct spdk_histogram_data	*histogram;
178 	struct spdk_bit_array		*random_map;
179 
180 	/* counter used for generating unique write data (-U option) */
181 	uint32_t			write_io_count;
182 };
183 
184 struct spdk_bdevperf {
185 	TAILQ_HEAD(, bdevperf_job)	jobs;
186 	uint32_t			running_jobs;
187 };
188 
189 static struct spdk_bdevperf g_bdevperf = {
190 	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
191 	.running_jobs = 0,
192 };
193 
194 /* Storing values from a section of job config file */
195 struct job_config {
196 	const char			*name;
197 	const char			*filename;
198 	struct spdk_cpuset		cpumask;
199 	int				bs;
200 	int				iodepth;
201 	int				rwmixread;
202 	uint32_t			lcore;
203 	int64_t				offset;
204 	uint64_t			length;
205 	enum job_config_rw		rw;
206 	TAILQ_ENTRY(job_config)	link;
207 };
208 
209 TAILQ_HEAD(, job_config) job_config_list
210 	= TAILQ_HEAD_INITIALIZER(job_config_list);
211 
212 static bool g_performance_dump_active = false;
213 
214 struct bdevperf_aggregate_stats {
215 	struct bdevperf_job		*current_job;
216 	uint64_t			io_time_in_usec;
217 	uint64_t			ema_period;
218 	double				total_io_per_second;
219 	double				total_mb_per_second;
220 	double				total_failed_per_second;
221 	double				total_timeout_per_second;
222 	double				min_latency;
223 	double				max_latency;
224 	uint64_t			total_io_completed;
225 	uint64_t			total_tsc;
226 };
227 
228 static struct bdevperf_aggregate_stats g_stats = {.min_latency = (double)UINT64_MAX};
229 
230 struct lcore_thread {
231 	struct spdk_thread		*thread;
232 	uint32_t			lcore;
233 	TAILQ_ENTRY(lcore_thread)	link;
234 };
235 
236 TAILQ_HEAD(, lcore_thread) g_lcore_thread_list
237 	= TAILQ_HEAD_INITIALIZER(g_lcore_thread_list);
238 
239 
240 static char *
241 parse_workload_type(enum job_config_rw ret)
242 {
243 	switch (ret) {
244 	case JOB_CONFIG_RW_READ:
245 		return "read";
246 	case JOB_CONFIG_RW_RANDREAD:
247 		return "randread";
248 	case JOB_CONFIG_RW_WRITE:
249 		return "write";
250 	case JOB_CONFIG_RW_RANDWRITE:
251 		return "randwrite";
252 	case JOB_CONFIG_RW_VERIFY:
253 		return "verify";
254 	case JOB_CONFIG_RW_RESET:
255 		return "reset";
256 	case JOB_CONFIG_RW_UNMAP:
257 		return "unmap";
258 	case JOB_CONFIG_RW_WRITE_ZEROES:
259 		return "write_zeroes";
260 	case JOB_CONFIG_RW_FLUSH:
261 		return "flush";
262 	case JOB_CONFIG_RW_RW:
263 		return "rw";
264 	case JOB_CONFIG_RW_RANDRW:
265 		return "randrw";
266 	default:
267 		fprintf(stderr, "wrong workload_type code\n");
268 	}
269 
270 	return NULL;
271 }
272 
273 /*
274  * Cumulative Moving Average (CMA): average of all data up to current
275  * Exponential Moving Average (EMA): weighted mean of the previous n data and more weight is given to recent
276  * Simple Moving Average (SMA): unweighted mean of the previous n data
277  *
278  * Bdevperf supports CMA and EMA.
279  */
280 static double
281 get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
282 {
283 	return (double)job->io_completed * SPDK_SEC_TO_USEC / io_time_in_usec;
284 }
285 
286 static double
287 get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
288 {
289 	double io_completed, io_per_second;
290 
291 	io_completed = job->io_completed;
292 	io_per_second = (double)(io_completed - job->prev_io_completed) * SPDK_SEC_TO_USEC
293 			/ g_show_performance_period_in_usec;
294 	job->prev_io_completed = io_completed;
295 
296 	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
297 				  / (ema_period + 1);
298 	return job->ema_io_per_second;
299 }
300 
301 static void
302 get_avg_latency(void *ctx, uint64_t start, uint64_t end, uint64_t count,
303 		uint64_t total, uint64_t so_far)
304 {
305 	struct latency_info *latency_info = ctx;
306 
307 	if (count == 0) {
308 		return;
309 	}
310 
311 	latency_info->total += (start + end) / 2 * count;
312 
313 	if (so_far == count) {
314 		latency_info->min = start;
315 	}
316 
317 	if (so_far == total) {
318 		latency_info->max = end;
319 	}
320 }
321 
322 static void
323 performance_dump_job(struct bdevperf_aggregate_stats *stats, struct bdevperf_job *job,
324 		     bool print_job_info)
325 {
326 	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
327 	double average_latency = 0.0, min_latency, max_latency;
328 	uint64_t time_in_usec;
329 	uint64_t tsc_rate;
330 	uint64_t total_io;
331 	struct latency_info latency_info = {};
332 
333 	if (g_performance_dump_active == true) {
334 		/* Use job's actual run time as Job has ended */
335 		if (job->io_failed > 0 && !job->continue_on_failure) {
336 			time_in_usec = job->run_time_in_usec;
337 		} else {
338 			time_in_usec = stats->io_time_in_usec;
339 		}
340 	} else {
341 		time_in_usec = job->run_time_in_usec;
342 	}
343 
344 	if (stats->ema_period == 0) {
345 		io_per_second = get_cma_io_per_second(job, time_in_usec);
346 	} else {
347 		io_per_second = get_ema_io_per_second(job, stats->ema_period);
348 	}
349 
350 	tsc_rate = spdk_get_ticks_hz();
351 	mb_per_second = io_per_second * job->io_size / (1024 * 1024);
352 
353 	spdk_histogram_data_iterate(job->histogram, get_avg_latency, &latency_info);
354 
355 	total_io = job->io_completed + job->io_failed;
356 	if (total_io != 0) {
357 		average_latency = (double)latency_info.total / total_io * SPDK_SEC_TO_USEC / tsc_rate;
358 	}
359 	min_latency = (double)latency_info.min * SPDK_SEC_TO_USEC / tsc_rate;
360 	max_latency = (double)latency_info.max * SPDK_SEC_TO_USEC / tsc_rate;
361 
362 	failed_per_second = (double)job->io_failed * SPDK_SEC_TO_USEC / time_in_usec;
363 	timeout_per_second = (double)job->io_timeout * SPDK_SEC_TO_USEC / time_in_usec;
364 
365 	if (print_job_info) {
366 		if (job->workload_type == JOB_CONFIG_RW_RW || job->workload_type == JOB_CONFIG_RW_RANDRW) {
367 			printf("Job: %s (Core Mask 0x%s, workload: %s, percentage: %d, depth: %d, IO size: %d)\n",
368 			       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
369 			       parse_workload_type(job->workload_type), job->rw_percentage,
370 			       job->queue_depth, job->io_size);
371 		} else {
372 			printf("Job: %s (Core Mask 0x%s, workload: %s, depth: %d, IO size: %d)\n",
373 			       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
374 			       parse_workload_type(job->workload_type), job->queue_depth, job->io_size);
375 		}
376 
377 
378 		if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
379 			printf("Job: %s ended in about %.2f seconds with error\n",
380 			       job->name, (double)job->run_time_in_usec / SPDK_SEC_TO_USEC);
381 		}
382 		if (job->verify) {
383 			printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
384 			       job->ios_base, job->size_in_ios);
385 		}
386 
387 		printf("\t %-20s: %10.2f %10.2f %10.2f",
388 		       job->name, (float)time_in_usec / SPDK_SEC_TO_USEC, io_per_second, mb_per_second);
389 		printf(" %10.2f %8.2f",
390 		       failed_per_second, timeout_per_second);
391 		printf(" %10.2f %10.2f %10.2f\n",
392 		       average_latency, min_latency, max_latency);
393 	}
394 
395 	stats->total_io_per_second += io_per_second;
396 	stats->total_mb_per_second += mb_per_second;
397 	stats->total_failed_per_second += failed_per_second;
398 	stats->total_timeout_per_second += timeout_per_second;
399 	stats->total_io_completed += job->io_completed + job->io_failed;
400 	stats->total_tsc += latency_info.total;
401 	if (min_latency < stats->min_latency) {
402 		stats->min_latency = min_latency;
403 	}
404 	if (max_latency > stats->max_latency) {
405 		stats->max_latency = max_latency;
406 	}
407 }
408 
409 static void
410 generate_data(struct bdevperf_job *job, void *buf, void *md_buf, bool unique)
411 {
412 	int offset_blocks = 0, md_offset, data_block_size, inner_offset;
413 	int buf_len = job->buf_size;
414 	int block_size = spdk_bdev_get_block_size(job->bdev);
415 	int md_size = spdk_bdev_get_md_size(job->bdev);
416 	int num_blocks = job->io_size_blocks;
417 
418 	if (buf_len < num_blocks * block_size) {
419 		return;
420 	}
421 
422 	if (md_buf == NULL) {
423 		data_block_size = block_size - md_size;
424 		md_buf = (char *)buf + data_block_size;
425 		md_offset = block_size;
426 	} else {
427 		data_block_size = block_size;
428 		md_offset = md_size;
429 	}
430 
431 	if (unique) {
432 		uint64_t io_count = job->write_io_count++;
433 		unsigned int i;
434 
435 		assert(md_size == 0 || md_size >= (int)sizeof(uint64_t));
436 
437 		while (offset_blocks < num_blocks) {
438 			inner_offset = 0;
439 			while (inner_offset < data_block_size) {
440 				*(uint64_t *)buf = (io_count << 32) | (offset_blocks + inner_offset);
441 				inner_offset += sizeof(uint64_t);
442 				buf += sizeof(uint64_t);
443 			}
444 			for (i = 0; i < md_size / sizeof(uint64_t); i++) {
445 				((uint64_t *)md_buf)[i] = (io_count << 32) | offset_blocks;
446 			}
447 			md_buf += md_offset;
448 			offset_blocks++;
449 		}
450 		return;
451 	}
452 
453 	while (offset_blocks < num_blocks) {
454 		inner_offset = 0;
455 		while (inner_offset < data_block_size) {
456 			*(uint32_t *)buf = offset_blocks + inner_offset;
457 			inner_offset += sizeof(uint32_t);
458 			buf += sizeof(uint32_t);
459 		}
460 		memset(md_buf, offset_blocks, md_size);
461 		md_buf += md_offset;
462 		offset_blocks++;
463 	}
464 }
465 
466 static bool
467 copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
468 	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
469 {
470 	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
471 		return false;
472 	}
473 
474 	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));
475 
476 	memcpy(wr_buf, rd_buf, block_size * num_blocks);
477 
478 	if (wr_md_buf != NULL) {
479 		memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
480 	}
481 
482 	return true;
483 }
484 
485 static bool
486 verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
487 	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
488 {
489 	int offset_blocks = 0, md_offset, data_block_size;
490 
491 	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
492 		return false;
493 	}
494 
495 	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));
496 
497 	if (wr_md_buf == NULL) {
498 		data_block_size = block_size - md_size;
499 		wr_md_buf = (char *)wr_buf + data_block_size;
500 		rd_md_buf = (char *)rd_buf + data_block_size;
501 		md_offset = block_size;
502 	} else {
503 		data_block_size = block_size;
504 		md_offset = md_size;
505 	}
506 
507 	while (offset_blocks < num_blocks) {
508 		if (memcmp(wr_buf, rd_buf, data_block_size) != 0) {
509 			printf("data_block_size %d, num_blocks %d, offset %d\n", data_block_size, num_blocks,
510 			       offset_blocks);
511 			spdk_log_dump(stdout, "rd_buf", rd_buf, data_block_size);
512 			spdk_log_dump(stdout, "wr_buf", wr_buf, data_block_size);
513 			return false;
514 		}
515 
516 		wr_buf += block_size;
517 		rd_buf += block_size;
518 
519 		if (md_check) {
520 			if (memcmp(wr_md_buf, rd_md_buf, md_size) != 0) {
521 				printf("md_size %d, num_blocks %d, offset %d\n", md_size, num_blocks, offset_blocks);
522 				spdk_log_dump(stdout, "rd_md_buf", rd_md_buf, md_size);
523 				spdk_log_dump(stdout, "wr_md_buf", wr_md_buf, md_size);
524 				return false;
525 			}
526 
527 			wr_md_buf += md_offset;
528 			rd_md_buf += md_offset;
529 		}
530 
531 		offset_blocks++;
532 	}
533 
534 	return true;
535 }
536 
537 static void
538 free_job_config(void)
539 {
540 	struct job_config *config, *tmp;
541 
542 	spdk_conf_free(g_bdevperf_conf);
543 	g_bdevperf_conf = NULL;
544 
545 	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
546 		TAILQ_REMOVE(&job_config_list, config, link);
547 		free(config);
548 	}
549 }
550 
551 static void
552 bdevperf_job_free(struct bdevperf_job *job)
553 {
554 	spdk_histogram_data_free(job->histogram);
555 	spdk_bit_array_free(&job->outstanding);
556 	spdk_bit_array_free(&job->random_map);
557 	spdk_zipf_free(&job->zipf);
558 	free(job->name);
559 	free(job);
560 }
561 
562 static void
563 job_thread_exit(void *ctx)
564 {
565 	spdk_thread_exit(spdk_get_thread());
566 }
567 
568 static void
569 check_cutoff(void *ctx, uint64_t start, uint64_t end, uint64_t count,
570 	     uint64_t total, uint64_t so_far)
571 {
572 	double so_far_pct;
573 	double **cutoff = ctx;
574 	uint64_t tsc_rate;
575 
576 	if (count == 0) {
577 		return;
578 	}
579 
580 	tsc_rate = spdk_get_ticks_hz();
581 	so_far_pct = (double)so_far / total;
582 	while (so_far_pct >= **cutoff && **cutoff > 0) {
583 		printf("%9.5f%% : %9.3fus\n", **cutoff * 100, (double)end * SPDK_SEC_TO_USEC / tsc_rate);
584 		(*cutoff)++;
585 	}
586 }
587 
588 static void
589 print_bucket(void *ctx, uint64_t start, uint64_t end, uint64_t count,
590 	     uint64_t total, uint64_t so_far)
591 {
592 	double so_far_pct;
593 	uint64_t tsc_rate;
594 
595 	if (count == 0) {
596 		return;
597 	}
598 
599 	tsc_rate = spdk_get_ticks_hz();
600 	so_far_pct = (double)so_far * 100 / total;
601 	printf("%9.3f - %9.3f: %9.4f%%  (%9ju)\n",
602 	       (double)start * SPDK_SEC_TO_USEC / tsc_rate,
603 	       (double)end * SPDK_SEC_TO_USEC / tsc_rate,
604 	       so_far_pct, count);
605 }
606 
607 static void
608 bdevperf_test_done(void *ctx)
609 {
610 	struct bdevperf_job *job, *jtmp;
611 	struct bdevperf_task *task, *ttmp;
612 	struct lcore_thread *lthread, *lttmp;
613 	double average_latency = 0.0;
614 	uint64_t time_in_usec;
615 	int rc;
616 
617 	if (g_time_in_usec) {
618 		g_stats.io_time_in_usec = g_time_in_usec;
619 
620 		if (!g_run_rc && g_performance_dump_active) {
621 			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
622 			return;
623 		}
624 	}
625 
626 	spdk_poller_unregister(&g_perf_timer);
627 
628 	if (g_shutdown) {
629 		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
630 		time_in_usec = g_shutdown_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
631 		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
632 		printf("Received shutdown signal, test time was about %.6f seconds\n",
633 		       (double)g_time_in_usec / SPDK_SEC_TO_USEC);
634 	}
635 
636 	printf("\n%*s\n", 107, "Latency(us)");
637 	printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
638 	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");
639 
640 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
641 		performance_dump_job(&g_stats, job, true);
642 	}
643 
644 	printf("\r =================================================================================="
645 	       "=================================\n");
646 	printf("\r %-28s: %10s %10.2f %10.2f",
647 	       "Total", "", g_stats.total_io_per_second, g_stats.total_mb_per_second);
648 	printf(" %10.2f %8.2f",
649 	       g_stats.total_failed_per_second, g_stats.total_timeout_per_second);
650 
651 	if (g_stats.total_io_completed != 0) {
652 		average_latency = ((double)g_stats.total_tsc / g_stats.total_io_completed) * SPDK_SEC_TO_USEC /
653 				  spdk_get_ticks_hz();
654 	}
655 	printf(" %10.2f %10.2f %10.2f\n", average_latency, g_stats.min_latency, g_stats.max_latency);
656 
657 	if (g_latency_display_level == 0 || g_stats.total_io_completed == 0) {
658 		goto clean;
659 	}
660 
661 	printf("\n Latency summary\n");
662 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
663 		printf("\r =============================================\n");
664 		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
665 		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
666 
667 		const double *cutoff = g_latency_cutoffs;
668 
669 		spdk_histogram_data_iterate(job->histogram, check_cutoff, &cutoff);
670 
671 		printf("\n");
672 	}
673 
674 	if (g_latency_display_level == 1) {
675 		goto clean;
676 	}
677 
678 	printf("\r Latency histogram\n");
679 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
680 		printf("\r =============================================\n");
681 		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
682 		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
683 
684 		spdk_histogram_data_iterate(job->histogram, print_bucket, NULL);
685 		printf("\n");
686 	}
687 
688 clean:
689 	fflush(stdout);
690 
691 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
692 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
693 
694 		if (!g_one_thread_per_lcore) {
695 			spdk_thread_send_msg(job->thread, job_thread_exit, NULL);
696 		}
697 
698 		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
699 			TAILQ_REMOVE(&job->task_list, task, link);
700 			spdk_free(task->buf);
701 			spdk_free(task->verify_buf);
702 			spdk_free(task->md_buf);
703 			free(task);
704 		}
705 
706 		bdevperf_job_free(job);
707 	}
708 
709 	if (g_one_thread_per_lcore) {
710 		TAILQ_FOREACH_SAFE(lthread, &g_lcore_thread_list, link, lttmp) {
711 			TAILQ_REMOVE(&g_lcore_thread_list, lthread, link);
712 			spdk_thread_send_msg(lthread->thread, job_thread_exit, NULL);
713 			free(lthread);
714 		}
715 	}
716 
717 	if (g_bdevperf_conf == NULL) {
718 		free_job_config();
719 	}
720 
721 	rc = g_run_rc;
722 	if (g_request && !g_shutdown) {
723 		rpc_perform_tests_cb();
724 		if (rc != 0) {
725 			spdk_app_stop(rc);
726 		}
727 	} else {
728 		spdk_app_stop(rc);
729 	}
730 }
731 
732 static void
733 bdevperf_job_end(void *ctx)
734 {
735 	assert(g_main_thread == spdk_get_thread());
736 
737 	if (--g_bdevperf.running_jobs == 0) {
738 		bdevperf_test_done(NULL);
739 	}
740 }
741 
742 static void
743 bdevperf_channel_get_histogram_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
744 {
745 	struct spdk_histogram_data *job_hist = cb_arg;
746 
747 	if (status == 0) {
748 		spdk_histogram_data_merge(job_hist, histogram);
749 	}
750 }
751 
752 static void
753 bdevperf_job_empty(struct bdevperf_job *job)
754 {
755 	uint64_t end_tsc = 0;
756 
757 	end_tsc = spdk_get_ticks() - g_start_tsc;
758 	job->run_time_in_usec = end_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
759 	/* keep histogram info before channel is destroyed */
760 	spdk_bdev_channel_get_histogram(job->ch, bdevperf_channel_get_histogram_cb,
761 					job->histogram);
762 	spdk_put_io_channel(job->ch);
763 	spdk_bdev_close(job->bdev_desc);
764 	spdk_thread_send_msg(g_main_thread, bdevperf_job_end, NULL);
765 }
766 
767 static void
768 bdevperf_end_task(struct bdevperf_task *task)
769 {
770 	struct bdevperf_job     *job = task->job;
771 
772 	TAILQ_INSERT_TAIL(&job->task_list, task, link);
773 	if (job->is_draining) {
774 		if (job->current_queue_depth == 0) {
775 			bdevperf_job_empty(job);
776 		}
777 	}
778 }
779 
780 static void
781 bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
782 {
783 	struct bdevperf_job	*job = task->job;
784 
785 	task->bdev_io_wait.bdev = job->bdev;
786 	task->bdev_io_wait.cb_fn = cb_fn;
787 	task->bdev_io_wait.cb_arg = task;
788 	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
789 }
790 
791 static int
792 bdevperf_job_drain(void *ctx)
793 {
794 	struct bdevperf_job *job = ctx;
795 
796 	spdk_poller_unregister(&job->run_timer);
797 	if (job->reset) {
798 		spdk_poller_unregister(&job->reset_timer);
799 	}
800 
801 	job->is_draining = true;
802 
803 	return -1;
804 }
805 
806 static int
807 bdevperf_job_drain_timer(void *ctx)
808 {
809 	struct bdevperf_job *job = ctx;
810 
811 	bdevperf_job_drain(ctx);
812 	if (job->current_queue_depth == 0) {
813 		bdevperf_job_empty(job);
814 	}
815 
816 	return SPDK_POLLER_BUSY;
817 }
818 
819 static void
820 bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
821 {
822 	struct bdevperf_task	*task = cb_arg;
823 	struct bdevperf_job	*job = task->job;
824 
825 	job->current_queue_depth--;
826 
827 	if (success) {
828 		job->io_completed++;
829 	} else {
830 		job->io_failed++;
831 		if (!job->continue_on_failure) {
832 			bdevperf_job_drain(job);
833 			g_run_rc = -1;
834 		}
835 	}
836 
837 	spdk_bdev_free_io(bdev_io);
838 	bdevperf_end_task(task);
839 }
840 
841 static int
842 bdevperf_verify_dif(struct bdevperf_task *task)
843 {
844 	struct bdevperf_job	*job = task->job;
845 	struct spdk_bdev	*bdev = job->bdev;
846 	struct spdk_dif_ctx	dif_ctx;
847 	struct spdk_dif_error	err_blk = {};
848 	int			rc;
849 	struct spdk_dif_ctx_init_ext_opts dif_opts;
850 
851 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
852 	dif_opts.dif_pi_format = spdk_bdev_get_dif_pi_format(bdev);
853 	rc = spdk_dif_ctx_init(&dif_ctx,
854 			       spdk_bdev_get_block_size(bdev),
855 			       spdk_bdev_get_md_size(bdev),
856 			       spdk_bdev_is_md_interleaved(bdev),
857 			       spdk_bdev_is_dif_head_of_md(bdev),
858 			       spdk_bdev_get_dif_type(bdev),
859 			       job->dif_check_flags,
860 			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
861 	if (rc != 0) {
862 		fprintf(stderr, "Initialization of DIF context failed\n");
863 		return rc;
864 	}
865 
866 	if (spdk_bdev_is_md_interleaved(bdev)) {
867 		rc = spdk_dif_verify(&task->iov, 1, job->io_size_blocks, &dif_ctx, &err_blk);
868 	} else {
869 		struct iovec md_iov = {
870 			.iov_base	= task->md_buf,
871 			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
872 		};
873 
874 		rc = spdk_dix_verify(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
875 	}
876 
877 	if (rc != 0) {
878 		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
879 			err_blk.err_type, err_blk.err_offset);
880 	}
881 
882 	return rc;
883 }
884 
885 static void
886 bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
887 {
888 	struct bdevperf_job	*job;
889 	struct bdevperf_task	*task = cb_arg;
890 	bool			md_check;
891 	uint64_t		offset_in_ios;
892 	int			rc;
893 
894 	job = task->job;
895 	md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;
896 
897 	if (g_error_to_exit == true) {
898 		bdevperf_job_drain(job);
899 	} else if (!success) {
900 		if (!job->reset && !job->continue_on_failure) {
901 			bdevperf_job_drain(job);
902 			g_run_rc = -1;
903 			g_error_to_exit = true;
904 			printf("task offset: %" PRIu64 " on job bdev=%s fails\n",
905 			       task->offset_blocks, job->name);
906 		}
907 	} else if (job->verify || job->reset) {
908 		if (!verify_data(task->buf, job->buf_size,
909 				 task->iov.iov_base, job->buf_size,
910 				 spdk_bdev_get_block_size(job->bdev),
911 				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
912 				 spdk_bdev_get_md_size(job->bdev),
913 				 job->io_size_blocks, md_check)) {
914 			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
915 			bdevperf_job_drain(job);
916 			g_run_rc = -1;
917 		}
918 	} else if (job->dif_check_flags != 0) {
919 		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_get_md_size(job->bdev) != 0) {
920 			rc = bdevperf_verify_dif(task);
921 			if (rc != 0) {
922 				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
923 				       task->offset_blocks, job->name);
924 
925 				success = false;
926 				if (!job->reset && !job->continue_on_failure) {
927 					bdevperf_job_drain(job);
928 					g_run_rc = -1;
929 					g_error_to_exit = true;
930 				}
931 			}
932 		}
933 	}
934 
935 	job->current_queue_depth--;
936 
937 	if (success) {
938 		job->io_completed++;
939 	} else {
940 		job->io_failed++;
941 	}
942 
943 	if (job->verify) {
944 		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
945 		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;
946 
947 		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
948 		spdk_bit_array_clear(job->outstanding, offset_in_ios);
949 	}
950 
951 	spdk_bdev_free_io(bdev_io);
952 
953 	/*
954 	 * is_draining indicates when time has expired for the test run
955 	 * and we are just waiting for the previously submitted I/O
956 	 * to complete.  In this case, do not submit a new I/O to replace
957 	 * the one just completed.
958 	 */
959 	if (!job->is_draining) {
960 		bdevperf_submit_single(job, task);
961 	} else {
962 		bdevperf_end_task(task);
963 	}
964 }
965 
966 static void
967 bdevperf_verify_submit_read(void *cb_arg)
968 {
969 	struct bdevperf_job	*job;
970 	struct bdevperf_task	*task = cb_arg;
971 	int			rc;
972 
973 	job = task->job;
974 
975 	task->iov.iov_base = task->verify_buf;
976 	task->iov.iov_len = job->buf_size;
977 
978 	/* Read the data back in */
979 	rc = spdk_bdev_readv_blocks_with_md(job->bdev_desc, job->ch, &task->iov, 1, NULL,
980 					    task->offset_blocks, job->io_size_blocks,
981 					    bdevperf_complete, task);
982 
983 	if (rc == -ENOMEM) {
984 		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
985 	} else if (rc != 0) {
986 		printf("Failed to submit read: %d\n", rc);
987 		bdevperf_job_drain(job);
988 		g_run_rc = rc;
989 	}
990 }
991 
992 static void
993 bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
994 			       void *cb_arg)
995 {
996 	if (success) {
997 		spdk_bdev_free_io(bdev_io);
998 		bdevperf_verify_submit_read(cb_arg);
999 	} else {
1000 		bdevperf_complete(bdev_io, success, cb_arg);
1001 	}
1002 }
1003 
1004 static void
1005 bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1006 {
1007 	if (!success) {
1008 		bdevperf_complete(bdev_io, success, cb_arg);
1009 		return;
1010 	}
1011 
1012 	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
1013 }
1014 
1015 static int
1016 bdevperf_generate_dif(struct bdevperf_task *task)
1017 {
1018 	struct bdevperf_job	*job = task->job;
1019 	struct spdk_bdev	*bdev = job->bdev;
1020 	struct spdk_dif_ctx	dif_ctx;
1021 	int			rc;
1022 	struct spdk_dif_ctx_init_ext_opts dif_opts;
1023 
1024 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
1025 	dif_opts.dif_pi_format = spdk_bdev_get_dif_pi_format(bdev);
1026 	rc = spdk_dif_ctx_init(&dif_ctx,
1027 			       spdk_bdev_get_block_size(bdev),
1028 			       spdk_bdev_get_md_size(bdev),
1029 			       spdk_bdev_is_md_interleaved(bdev),
1030 			       spdk_bdev_is_dif_head_of_md(bdev),
1031 			       spdk_bdev_get_dif_type(bdev),
1032 			       job->dif_check_flags,
1033 			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
1034 	if (rc != 0) {
1035 		fprintf(stderr, "Initialization of DIF context failed\n");
1036 		return rc;
1037 	}
1038 
1039 	if (spdk_bdev_is_md_interleaved(bdev)) {
1040 		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
1041 	} else {
1042 		struct iovec md_iov = {
1043 			.iov_base	= task->md_buf,
1044 			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
1045 		};
1046 
1047 		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
1048 	}
1049 
1050 	if (rc != 0) {
1051 		fprintf(stderr, "Generation of DIF/DIX failed\n");
1052 	}
1053 
1054 	return rc;
1055 }
1056 
1057 static void
1058 bdevperf_submit_task(void *arg)
1059 {
1060 	struct bdevperf_task	*task = arg;
1061 	struct bdevperf_job	*job = task->job;
1062 	struct spdk_bdev_desc	*desc;
1063 	struct spdk_io_channel	*ch;
1064 	spdk_bdev_io_completion_cb cb_fn;
1065 	uint64_t		offset_in_ios;
1066 	int			rc = 0;
1067 
1068 	desc = job->bdev_desc;
1069 	ch = job->ch;
1070 
1071 	switch (task->io_type) {
1072 	case SPDK_BDEV_IO_TYPE_WRITE:
1073 		if (spdk_bdev_get_md_size(job->bdev) != 0 && job->dif_check_flags != 0) {
1074 			rc = bdevperf_generate_dif(task);
1075 		}
1076 		if (rc == 0) {
1077 			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;
1078 
1079 			if (g_zcopy) {
1080 				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
1081 				return;
1082 			} else {
1083 				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
1084 								     task->md_buf,
1085 								     task->offset_blocks,
1086 								     job->io_size_blocks,
1087 								     cb_fn, task);
1088 			}
1089 		}
1090 		break;
1091 	case SPDK_BDEV_IO_TYPE_FLUSH:
1092 		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
1093 					    job->io_size_blocks, bdevperf_complete, task);
1094 		break;
1095 	case SPDK_BDEV_IO_TYPE_UNMAP:
1096 		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
1097 					    job->io_size_blocks, bdevperf_complete, task);
1098 		break;
1099 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1100 		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
1101 						   job->io_size_blocks, bdevperf_complete, task);
1102 		break;
1103 	case SPDK_BDEV_IO_TYPE_READ:
1104 		if (g_zcopy) {
1105 			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
1106 						   true, bdevperf_zcopy_populate_complete, task);
1107 		} else {
1108 			rc = spdk_bdev_readv_blocks_with_md(desc, ch, &task->iov, 1,
1109 							    task->md_buf,
1110 							    task->offset_blocks,
1111 							    job->io_size_blocks,
1112 							    bdevperf_complete, task);
1113 		}
1114 		break;
1115 	case SPDK_BDEV_IO_TYPE_ABORT:
1116 		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
1117 		break;
1118 	default:
1119 		assert(false);
1120 		rc = -EINVAL;
1121 		break;
1122 	}
1123 
1124 	if (rc == -ENOMEM) {
1125 		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
1126 		return;
1127 	} else if (rc != 0) {
1128 		printf("Failed to submit bdev_io: %d\n", rc);
1129 		if (job->verify) {
1130 			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
1131 			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;
1132 
1133 			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
1134 			spdk_bit_array_clear(job->outstanding, offset_in_ios);
1135 		}
1136 		bdevperf_job_drain(job);
1137 		g_run_rc = rc;
1138 		return;
1139 	}
1140 
1141 	job->current_queue_depth++;
1142 }
1143 
1144 static void
1145 bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1146 {
1147 	struct bdevperf_task	*task = cb_arg;
1148 	struct bdevperf_job	*job = task->job;
1149 	struct iovec		*iovs;
1150 	int			iovcnt;
1151 
1152 	if (!success) {
1153 		bdevperf_job_drain(job);
1154 		g_run_rc = -1;
1155 		return;
1156 	}
1157 
1158 	task->bdev_io = bdev_io;
1159 	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
1160 
1161 	if (job->verify || job->reset) {
1162 		/* When job->verify or job->reset is enabled, task->buf is used for
1163 		 *  verification of read after write.  For write I/O, when zcopy APIs
1164 		 *  are used, task->buf cannot be used, and data must be written to
1165 		 *  the data buffer allocated underneath bdev layer instead.
1166 		 *  Hence we copy task->buf to the allocated data buffer here.
1167 		 */
1168 		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
1169 		assert(iovcnt == 1);
1170 		assert(iovs != NULL);
1171 
1172 		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
1173 			  spdk_bdev_get_block_size(job->bdev),
1174 			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
1175 			  spdk_bdev_get_md_size(job->bdev), job->io_size_blocks);
1176 	}
1177 
1178 	bdevperf_submit_task(task);
1179 }
1180 
1181 static void
1182 bdevperf_prep_zcopy_write_task(void *arg)
1183 {
1184 	struct bdevperf_task	*task = arg;
1185 	struct bdevperf_job	*job = task->job;
1186 	int			rc;
1187 
1188 	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
1189 				   task->offset_blocks, job->io_size_blocks,
1190 				   false, bdevperf_zcopy_get_buf_complete, task);
1191 	if (rc != 0) {
1192 		assert(rc == -ENOMEM);
1193 		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
1194 		return;
1195 	}
1196 
1197 	job->current_queue_depth++;
1198 }
1199 
1200 static struct bdevperf_task *
1201 bdevperf_job_get_task(struct bdevperf_job *job)
1202 {
1203 	struct bdevperf_task *task;
1204 
1205 	task = TAILQ_FIRST(&job->task_list);
1206 	if (!task) {
1207 		printf("Task allocation failed\n");
1208 		abort();
1209 	}
1210 
1211 	TAILQ_REMOVE(&job->task_list, task, link);
1212 	return task;
1213 }
1214 
1215 static void
1216 bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
1217 {
1218 	uint64_t offset_in_ios;
1219 	uint64_t rand_value;
1220 	uint32_t first_clear;
1221 
1222 	if (job->zipf) {
1223 		offset_in_ios = spdk_zipf_generate(job->zipf);
1224 	} else if (job->is_random) {
1225 		/* RAND_MAX is only INT32_MAX, so use 2 calls to rand_r to
1226 		 * get a large enough value to ensure we are issuing I/O
1227 		 * uniformly across the whole bdev.
1228 		 */
1229 		rand_value = (uint64_t)rand_r(&job->seed) * RAND_MAX + rand_r(&job->seed);
1230 		offset_in_ios = rand_value % job->size_in_ios;
1231 
1232 		if (g_random_map) {
1233 			/* Make sure, that the offset does not exceed the maximum size
1234 			 * of the bit array (verified during job creation)
1235 			 */
1236 			assert(offset_in_ios < UINT32_MAX);
1237 
1238 			first_clear = spdk_bit_array_find_first_clear(job->random_map, (uint32_t)offset_in_ios);
1239 
1240 			if (first_clear == UINT32_MAX) {
1241 				first_clear = spdk_bit_array_find_first_clear(job->random_map, 0);
1242 
1243 				if (first_clear == UINT32_MAX) {
1244 					/* If there are no more clear bits in the array, we start over
1245 					 * and select the previously selected random value.
1246 					 */
1247 					spdk_bit_array_clear_mask(job->random_map);
1248 					first_clear = (uint32_t)offset_in_ios;
1249 				}
1250 			}
1251 
1252 			spdk_bit_array_set(job->random_map, first_clear);
1253 
1254 			offset_in_ios = first_clear;
1255 		}
1256 	} else {
1257 		offset_in_ios = job->offset_in_ios++;
1258 		if (job->offset_in_ios == job->size_in_ios) {
1259 			job->offset_in_ios = 0;
1260 		}
1261 
1262 		/* Increment of offset_in_ios if there's already an outstanding IO
1263 		 * to that location. We only need this with job->verify as random
1264 		 * offsets are not supported with job->verify at this time.
1265 		 */
1266 		if (job->verify) {
1267 			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);
1268 
1269 			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
1270 				offset_in_ios = job->offset_in_ios++;
1271 				if (job->offset_in_ios == job->size_in_ios) {
1272 					job->offset_in_ios = 0;
1273 				}
1274 			}
1275 			spdk_bit_array_set(job->outstanding, offset_in_ios);
1276 		}
1277 	}
1278 
1279 	/* For multi-thread to same job, offset_in_ios is relative
1280 	 * to the LBA range assigned for that job. job->offset_blocks
1281 	 * is absolute (entire bdev LBA range).
1282 	 */
1283 	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;
1284 
1285 	if (job->flush) {
1286 		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
1287 	} else if (job->unmap) {
1288 		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
1289 	} else if (job->write_zeroes) {
1290 		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
1291 	} else if ((job->rw_percentage == 100) ||
1292 		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
1293 		assert(!job->verify);
1294 		task->io_type = SPDK_BDEV_IO_TYPE_READ;
1295 		if (!g_zcopy) {
1296 			task->iov.iov_base = task->buf;
1297 			task->iov.iov_len = job->buf_size;
1298 		}
1299 	} else {
1300 		if (job->verify || job->reset || g_unique_writes) {
1301 			generate_data(job, task->buf, task->md_buf, g_unique_writes);
1302 		}
1303 		if (g_zcopy) {
1304 			bdevperf_prep_zcopy_write_task(task);
1305 			return;
1306 		} else {
1307 			task->iov.iov_base = task->buf;
1308 			task->iov.iov_len = job->buf_size;
1309 			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
1310 		}
1311 	}
1312 
1313 	bdevperf_submit_task(task);
1314 }
1315 
1316 static int reset_job(void *arg);
1317 
1318 static void
1319 reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1320 {
1321 	struct bdevperf_task	*task = cb_arg;
1322 	struct bdevperf_job	*job = task->job;
1323 
1324 	if (!success) {
1325 		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
1326 		bdevperf_job_drain(job);
1327 		g_run_rc = -1;
1328 	}
1329 
1330 	TAILQ_INSERT_TAIL(&job->task_list, task, link);
1331 	spdk_bdev_free_io(bdev_io);
1332 
1333 	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
1334 						10 * SPDK_SEC_TO_USEC);
1335 }
1336 
1337 static int
1338 reset_job(void *arg)
1339 {
1340 	struct bdevperf_job *job = arg;
1341 	struct bdevperf_task *task;
1342 	int rc;
1343 
1344 	spdk_poller_unregister(&job->reset_timer);
1345 
1346 	/* Do reset. */
1347 	task = bdevperf_job_get_task(job);
1348 	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
1349 			     reset_cb, task);
1350 	if (rc) {
1351 		printf("Reset failed: %d\n", rc);
1352 		bdevperf_job_drain(job);
1353 		g_run_rc = -1;
1354 	}
1355 
1356 	return -1;
1357 }
1358 
1359 static void
1360 bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1361 {
1362 	struct bdevperf_job *job = cb_arg;
1363 	struct bdevperf_task *task;
1364 
1365 	job->io_timeout++;
1366 
1367 	if (job->is_draining || !job->abort ||
1368 	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
1369 		return;
1370 	}
1371 
1372 	task = bdevperf_job_get_task(job);
1373 	if (task == NULL) {
1374 		return;
1375 	}
1376 
1377 	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
1378 	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;
1379 
1380 	bdevperf_submit_task(task);
1381 }
1382 
1383 static void
1384 bdevperf_job_run(void *ctx)
1385 {
1386 	struct bdevperf_job *job = ctx;
1387 	struct bdevperf_task *task;
1388 	int i;
1389 
1390 	/* Submit initial I/O for this job. Each time one
1391 	 * completes, another will be submitted. */
1392 
1393 	/* Start a timer to stop this I/O chain when the run is over */
1394 	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain_timer, job, g_time_in_usec);
1395 	if (job->reset) {
1396 		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
1397 							10 * SPDK_SEC_TO_USEC);
1398 	}
1399 
1400 	spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);
1401 
1402 	for (i = 0; i < job->queue_depth; i++) {
1403 		task = bdevperf_job_get_task(job);
1404 		bdevperf_submit_single(job, task);
1405 	}
1406 }
1407 
1408 static void
1409 _performance_dump_done(void *ctx)
1410 {
1411 	struct bdevperf_aggregate_stats *stats = ctx;
1412 	double average_latency;
1413 
1414 	if (g_summarize_performance) {
1415 		printf("%12.2f IOPS, %8.2f MiB/s", stats->total_io_per_second, stats->total_mb_per_second);
1416 		printf("\r");
1417 	} else {
1418 		printf("\r =================================================================================="
1419 		       "=================================\n");
1420 		printf("\r %-28s: %10s %10.2f %10.2f",
1421 		       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
1422 		printf(" %10.2f %8.2f",
1423 		       stats->total_failed_per_second, stats->total_timeout_per_second);
1424 
1425 		average_latency = ((double)stats->total_tsc / stats->total_io_completed) * SPDK_SEC_TO_USEC /
1426 				  spdk_get_ticks_hz();
1427 		printf(" %10.2f %10.2f %10.2f\n", average_latency, stats->min_latency, stats->max_latency);
1428 		printf("\n");
1429 	}
1430 
1431 	fflush(stdout);
1432 
1433 	g_performance_dump_active = false;
1434 
1435 	free(stats);
1436 }
1437 
1438 static void
1439 _performance_dump(void *ctx)
1440 {
1441 	struct bdevperf_aggregate_stats *stats = ctx;
1442 
1443 	performance_dump_job(stats, stats->current_job, !g_summarize_performance);
1444 
1445 	/* This assumes the jobs list is static after start up time.
1446 	 * That's true right now, but if that ever changed this would need a lock. */
1447 	stats->current_job = TAILQ_NEXT(stats->current_job, link);
1448 	if (stats->current_job == NULL) {
1449 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1450 	} else {
1451 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1452 	}
1453 }
1454 
1455 static int
1456 performance_statistics_thread(void *arg)
1457 {
1458 	struct bdevperf_aggregate_stats *stats;
1459 
1460 	if (g_performance_dump_active) {
1461 		return -1;
1462 	}
1463 
1464 	g_performance_dump_active = true;
1465 
1466 	stats = calloc(1, sizeof(*stats));
1467 	if (stats == NULL) {
1468 		return -1;
1469 	}
1470 
1471 	stats->min_latency = (double)UINT64_MAX;
1472 
1473 	g_show_performance_period_num++;
1474 
1475 	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;
1476 	stats->ema_period = g_show_performance_ema_period;
1477 
1478 	/* Iterate all of the jobs to gather stats
1479 	 * These jobs will not get removed here until a final performance dump is run,
1480 	 * so this should be safe without locking.
1481 	 */
1482 	stats->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
1483 	if (stats->current_job == NULL) {
1484 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1485 	} else {
1486 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1487 	}
1488 
1489 	return -1;
1490 }
1491 
1492 static void
1493 bdevperf_test(void)
1494 {
1495 	struct bdevperf_job *job;
1496 
1497 	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / (uint64_t)SPDK_SEC_TO_USEC);
1498 	fflush(stdout);
1499 
1500 	/* Start a timer to dump performance numbers */
1501 	g_start_tsc = spdk_get_ticks();
1502 	if (!g_summarize_performance) {
1503 		printf("%*s\n", 107, "Latency(us)");
1504 		printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
1505 		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");
1506 	}
1507 	if (!g_perf_timer) {
1508 		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
1509 						    g_show_performance_period_in_usec);
1510 	}
1511 
1512 	/* Iterate jobs to start all I/O */
1513 	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
1514 		g_bdevperf.running_jobs++;
1515 		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
1516 	}
1517 }
1518 
1519 static void
1520 bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1521 {
1522 	struct bdevperf_job *job = event_ctx;
1523 
1524 	if (SPDK_BDEV_EVENT_REMOVE == type) {
1525 		bdevperf_job_drain(job);
1526 	}
1527 }
1528 
1529 static void
1530 bdevperf_histogram_status_cb(void *cb_arg, int status)
1531 {
1532 	if (status != 0) {
1533 		g_run_rc = status;
1534 		if (g_continue_on_failure == false) {
1535 			g_error_to_exit = true;
1536 		}
1537 	}
1538 
1539 	if (--g_bdev_count == 0) {
1540 		if (g_run_rc == 0) {
1541 			/* Ready to run the test */
1542 			bdevperf_test();
1543 		} else {
1544 			bdevperf_test_done(NULL);
1545 		}
1546 	}
1547 }
1548 
1549 static uint32_t g_construct_job_count = 0;
1550 
1551 static int
1552 _bdevperf_enable_histogram(void *ctx, struct spdk_bdev *bdev)
1553 {
1554 	bool *enable = ctx;
1555 
1556 	g_bdev_count++;
1557 
1558 	spdk_bdev_histogram_enable(bdev, bdevperf_histogram_status_cb, NULL, *enable);
1559 
1560 	return 0;
1561 }
1562 
1563 static void
1564 bdevperf_enable_histogram(bool enable)
1565 {
1566 	struct spdk_bdev *bdev;
1567 	int rc;
1568 
1569 	/* increment initial g_bdev_count so that it will never reach 0 in the middle of iteration */
1570 	g_bdev_count = 1;
1571 
1572 	if (g_job_bdev_name != NULL) {
1573 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
1574 		if (bdev) {
1575 			rc = _bdevperf_enable_histogram(&enable, bdev);
1576 		} else {
1577 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
1578 			rc = -1;
1579 		}
1580 	} else {
1581 		rc = spdk_for_each_bdev_leaf(&enable, _bdevperf_enable_histogram);
1582 	}
1583 
1584 	bdevperf_histogram_status_cb(NULL, rc);
1585 }
1586 
1587 static void
1588 _bdevperf_construct_job_done(void *ctx)
1589 {
1590 	if (--g_construct_job_count == 0) {
1591 		if (g_run_rc != 0) {
1592 			/* Something failed. */
1593 			bdevperf_test_done(NULL);
1594 			return;
1595 		}
1596 
1597 		/* always enable histogram. */
1598 		bdevperf_enable_histogram(true);
1599 	} else if (g_run_rc != 0) {
1600 		/* Reset error as some jobs constructed right */
1601 		g_run_rc = 0;
1602 		if (g_continue_on_failure == false) {
1603 			g_error_to_exit = true;
1604 		}
1605 	}
1606 }
1607 
1608 /* Checkformat will not allow to use inlined type,
1609    this is a workaround */
1610 typedef struct spdk_thread *spdk_thread_t;
1611 
1612 static spdk_thread_t
1613 construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
1614 {
1615 	struct spdk_cpuset tmp;
1616 
1617 	/* This function runs on the main thread. */
1618 	assert(g_main_thread == spdk_get_thread());
1619 
1620 	/* Handle default mask */
1621 	if (spdk_cpuset_count(cpumask) == 0) {
1622 		cpumask = &g_all_cpuset;
1623 	}
1624 
1625 	/* Warn user that mask might need to be changed */
1626 	spdk_cpuset_copy(&tmp, cpumask);
1627 	spdk_cpuset_or(&tmp, &g_all_cpuset);
1628 	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
1629 		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
1630 	}
1631 
1632 	return spdk_thread_create(tag, cpumask);
1633 }
1634 
1635 static uint32_t
1636 _get_next_core(void)
1637 {
1638 	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;
1639 
1640 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1641 		current_core = spdk_env_get_first_core();
1642 		return current_core;
1643 	}
1644 
1645 	current_core = spdk_env_get_next_core(current_core);
1646 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1647 		current_core = spdk_env_get_first_core();
1648 	}
1649 
1650 	return current_core;
1651 }
1652 
1653 static void
1654 _bdevperf_construct_job(void *ctx)
1655 {
1656 	struct bdevperf_job *job = ctx;
1657 	int rc;
1658 
1659 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(job->bdev), true, bdevperf_bdev_removed, job,
1660 				&job->bdev_desc);
1661 	if (rc != 0) {
1662 		SPDK_ERRLOG("Could not open leaf bdev %s, error=%d\n", spdk_bdev_get_name(job->bdev), rc);
1663 		g_run_rc = -EINVAL;
1664 		goto end;
1665 	}
1666 
1667 	if (g_zcopy) {
1668 		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
1669 			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
1670 			g_run_rc = -ENOTSUP;
1671 			goto end;
1672 		}
1673 	}
1674 
1675 	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
1676 	if (!job->ch) {
1677 		SPDK_ERRLOG("Could not get io_channel for device %s, error=%d\n", spdk_bdev_get_name(job->bdev),
1678 			    rc);
1679 		spdk_bdev_close(job->bdev_desc);
1680 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
1681 		g_run_rc = -ENOMEM;
1682 		goto end;
1683 	}
1684 
1685 end:
1686 	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
1687 }
1688 
1689 static void
1690 job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
1691 {
1692 	switch (rw) {
1693 	case JOB_CONFIG_RW_READ:
1694 		job->rw_percentage = 100;
1695 		break;
1696 	case JOB_CONFIG_RW_WRITE:
1697 		job->rw_percentage = 0;
1698 		break;
1699 	case JOB_CONFIG_RW_RANDREAD:
1700 		job->is_random = true;
1701 		job->rw_percentage = 100;
1702 		job->seed = rand();
1703 		break;
1704 	case JOB_CONFIG_RW_RANDWRITE:
1705 		job->is_random = true;
1706 		job->rw_percentage = 0;
1707 		job->seed = rand();
1708 		break;
1709 	case JOB_CONFIG_RW_RW:
1710 		job->is_random = false;
1711 		break;
1712 	case JOB_CONFIG_RW_RANDRW:
1713 		job->is_random = true;
1714 		job->seed = rand();
1715 		break;
1716 	case JOB_CONFIG_RW_RESET:
1717 		/* Reset shares the flow with verify. */
1718 		job->reset = true;
1719 	/* fallthrough */
1720 	case JOB_CONFIG_RW_VERIFY:
1721 		job->verify = true;
1722 		/* For verify flow read is done on write completion
1723 		 * callback only, rw_percentage shall not be used. */
1724 		job->rw_percentage = 0;
1725 		break;
1726 	case JOB_CONFIG_RW_UNMAP:
1727 		job->unmap = true;
1728 		break;
1729 	case JOB_CONFIG_RW_FLUSH:
1730 		job->flush = true;
1731 		break;
1732 	case JOB_CONFIG_RW_WRITE_ZEROES:
1733 		job->write_zeroes = true;
1734 		break;
1735 	}
1736 }
1737 
1738 static int
1739 bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
1740 		       struct spdk_thread *thread)
1741 {
1742 	struct bdevperf_job *job;
1743 	struct bdevperf_task *task;
1744 	int block_size, data_block_size;
1745 	int rc;
1746 	int task_num, n;
1747 	int32_t numa_id;
1748 
1749 	block_size = spdk_bdev_get_block_size(bdev);
1750 	data_block_size = spdk_bdev_get_data_block_size(bdev);
1751 
1752 	job = calloc(1, sizeof(struct bdevperf_job));
1753 	if (!job) {
1754 		fprintf(stderr, "Unable to allocate memory for new job.\n");
1755 		return -ENOMEM;
1756 	}
1757 
1758 	job->name = strdup(spdk_bdev_get_name(bdev));
1759 	if (!job->name) {
1760 		fprintf(stderr, "Unable to allocate memory for job name.\n");
1761 		bdevperf_job_free(job);
1762 		return -ENOMEM;
1763 	}
1764 
1765 	job->workload_type = config->rw;
1766 	job->io_size = config->bs;
1767 	job->rw_percentage = config->rwmixread;
1768 	job->continue_on_failure = g_continue_on_failure;
1769 	job->queue_depth = config->iodepth;
1770 	job->bdev = bdev;
1771 	job->io_size_blocks = job->io_size / data_block_size;
1772 	job->buf_size = job->io_size_blocks * block_size;
1773 	job->abort = g_abort;
1774 	job_init_rw(job, config->rw);
1775 
1776 	if ((job->io_size % data_block_size) != 0) {
1777 		SPDK_ERRLOG("IO size (%d) is not multiples of data block size of bdev %s (%"PRIu32")\n",
1778 			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
1779 		bdevperf_job_free(job);
1780 		return -ENOTSUP;
1781 	}
1782 
1783 	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1784 		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
1785 		bdevperf_job_free(job);
1786 		return -ENOTSUP;
1787 	}
1788 
1789 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_REFTAG)) {
1790 		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
1791 	}
1792 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_GUARD)) {
1793 		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
1794 	}
1795 
1796 	job->offset_in_ios = 0;
1797 
1798 	if (config->length != 0) {
1799 		/* Use subset of disk */
1800 		job->size_in_ios = config->length / job->io_size_blocks;
1801 		job->ios_base = config->offset / job->io_size_blocks;
1802 	} else {
1803 		/* Use whole disk */
1804 		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
1805 		job->ios_base = 0;
1806 	}
1807 
1808 	if (job->is_random && g_zipf_theta > 0) {
1809 		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
1810 	}
1811 
1812 	if (job->verify) {
1813 		if (job->size_in_ios >= UINT32_MAX) {
1814 			SPDK_ERRLOG("Due to constraints of verify operation, the job storage capacity is too large\n");
1815 			bdevperf_job_free(job);
1816 			return -ENOMEM;
1817 		}
1818 		job->outstanding = spdk_bit_array_create(job->size_in_ios);
1819 		if (job->outstanding == NULL) {
1820 			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
1821 				    spdk_bdev_get_name(bdev));
1822 			bdevperf_job_free(job);
1823 			return -ENOMEM;
1824 		}
1825 		if (job->queue_depth > (int)job->size_in_ios) {
1826 			SPDK_WARNLOG("Due to constraints of verify job, queue depth (-q, %d) can't exceed the number of IO "
1827 				     "requests which can be submitted to the bdev %s simultaneously (%"PRIu64"). "
1828 				     "Queue depth is limited to %"PRIu64"\n",
1829 				     job->queue_depth, job->name, job->size_in_ios, job->size_in_ios);
1830 			job->queue_depth = (int)job->size_in_ios;
1831 		}
1832 	}
1833 
1834 	job->histogram = spdk_histogram_data_alloc();
1835 	if (job->histogram == NULL) {
1836 		fprintf(stderr, "Failed to allocate histogram\n");
1837 		bdevperf_job_free(job);
1838 		return -ENOMEM;
1839 	}
1840 
1841 	TAILQ_INIT(&job->task_list);
1842 
1843 	if (g_random_map) {
1844 		if (job->size_in_ios >= UINT32_MAX) {
1845 			SPDK_ERRLOG("Due to constraints of the random map, the job storage capacity is too large\n");
1846 			bdevperf_job_free(job);
1847 			return -ENOMEM;
1848 		}
1849 		job->random_map = spdk_bit_array_create(job->size_in_ios);
1850 		if (job->random_map == NULL) {
1851 			SPDK_ERRLOG("Could not create random_map array bitmap for bdev %s\n",
1852 				    spdk_bdev_get_name(bdev));
1853 			bdevperf_job_free(job);
1854 			return -ENOMEM;
1855 		}
1856 	}
1857 
1858 	task_num = job->queue_depth;
1859 	if (job->reset) {
1860 		task_num += 1;
1861 	}
1862 	if (job->abort) {
1863 		task_num += job->queue_depth;
1864 	}
1865 
1866 	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);
1867 
1868 	numa_id = spdk_bdev_get_numa_id(job->bdev);
1869 
1870 	for (n = 0; n < task_num; n++) {
1871 		task = calloc(1, sizeof(struct bdevperf_task));
1872 		if (!task) {
1873 			fprintf(stderr, "Failed to allocate task from memory\n");
1874 			spdk_zipf_free(&job->zipf);
1875 			return -ENOMEM;
1876 		}
1877 
1878 		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
1879 					 numa_id, SPDK_MALLOC_DMA);
1880 		if (!task->buf) {
1881 			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
1882 			spdk_zipf_free(&job->zipf);
1883 			free(task);
1884 			return -ENOMEM;
1885 		}
1886 
1887 		if (job->verify && job->buf_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
1888 			task->verify_buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
1889 							numa_id, SPDK_MALLOC_DMA);
1890 			if (!task->verify_buf) {
1891 				fprintf(stderr, "Cannot allocate buf_verify for task=%p\n", task);
1892 				spdk_free(task->buf);
1893 				spdk_zipf_free(&job->zipf);
1894 				free(task);
1895 				return -ENOMEM;
1896 			}
1897 
1898 		}
1899 
1900 		if (spdk_bdev_is_md_separate(job->bdev)) {
1901 			task->md_buf = spdk_zmalloc(job->io_size_blocks *
1902 						    spdk_bdev_get_md_size(job->bdev), 0, NULL,
1903 						    numa_id, SPDK_MALLOC_DMA);
1904 			if (!task->md_buf) {
1905 				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
1906 				spdk_zipf_free(&job->zipf);
1907 				spdk_free(task->verify_buf);
1908 				spdk_free(task->buf);
1909 				free(task);
1910 				return -ENOMEM;
1911 			}
1912 		}
1913 
1914 		task->job = job;
1915 		TAILQ_INSERT_TAIL(&job->task_list, task, link);
1916 	}
1917 
1918 	job->thread = thread;
1919 
1920 	g_construct_job_count++;
1921 
1922 	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
1923 	assert(rc == 0);
1924 
1925 	return rc;
1926 }
1927 
1928 static int
1929 parse_rw(const char *str, enum job_config_rw ret)
1930 {
1931 	if (str == NULL) {
1932 		return ret;
1933 	}
1934 
1935 	if (!strcmp(str, "read")) {
1936 		ret = JOB_CONFIG_RW_READ;
1937 	} else if (!strcmp(str, "randread")) {
1938 		ret = JOB_CONFIG_RW_RANDREAD;
1939 	} else if (!strcmp(str, "write")) {
1940 		ret = JOB_CONFIG_RW_WRITE;
1941 	} else if (!strcmp(str, "randwrite")) {
1942 		ret = JOB_CONFIG_RW_RANDWRITE;
1943 	} else if (!strcmp(str, "verify")) {
1944 		ret = JOB_CONFIG_RW_VERIFY;
1945 	} else if (!strcmp(str, "reset")) {
1946 		ret = JOB_CONFIG_RW_RESET;
1947 	} else if (!strcmp(str, "unmap")) {
1948 		ret = JOB_CONFIG_RW_UNMAP;
1949 	} else if (!strcmp(str, "write_zeroes")) {
1950 		ret = JOB_CONFIG_RW_WRITE_ZEROES;
1951 	} else if (!strcmp(str, "flush")) {
1952 		ret = JOB_CONFIG_RW_FLUSH;
1953 	} else if (!strcmp(str, "rw")) {
1954 		ret = JOB_CONFIG_RW_RW;
1955 	} else if (!strcmp(str, "randrw")) {
1956 		ret = JOB_CONFIG_RW_RANDRW;
1957 	} else {
1958 		fprintf(stderr, "rw must be one of\n"
1959 			PATTERN_TYPES_STR "\n");
1960 		ret = BDEVPERF_CONFIG_ERROR;
1961 	}
1962 
1963 	return ret;
1964 }
1965 
1966 static const char *
1967 config_filename_next(const char *filename, char *out)
1968 {
1969 	int i, k;
1970 
1971 	if (filename == NULL) {
1972 		out[0] = '\0';
1973 		return NULL;
1974 	}
1975 
1976 	if (filename[0] == ':') {
1977 		filename++;
1978 	}
1979 
1980 	for (i = 0, k = 0;
1981 	     filename[i] != '\0' &&
1982 	     filename[i] != ':' &&
1983 	     i < BDEVPERF_CONFIG_MAX_FILENAME &&
1984 	     k < (BDEVPERF_CONFIG_MAX_FILENAME - 1);
1985 	     i++) {
1986 		if (filename[i] == ' ' || filename[i] == '\t') {
1987 			continue;
1988 		}
1989 
1990 		out[k++] = filename[i];
1991 	}
1992 	out[k] = 0;
1993 
1994 	return filename + i;
1995 }
1996 
1997 static struct spdk_thread *
1998 get_lcore_thread(uint32_t lcore)
1999 {
2000 	struct lcore_thread *lthread;
2001 
2002 	TAILQ_FOREACH(lthread, &g_lcore_thread_list, link) {
2003 		if (lthread->lcore == lcore) {
2004 			return lthread->thread;
2005 		}
2006 	}
2007 
2008 	return NULL;
2009 }
2010 
2011 static void
2012 create_lcore_thread(uint32_t lcore)
2013 {
2014 	struct lcore_thread *lthread;
2015 	struct spdk_cpuset cpumask = {};
2016 	char name[32];
2017 
2018 	lthread = calloc(1, sizeof(*lthread));
2019 	assert(lthread != NULL);
2020 
2021 	lthread->lcore = lcore;
2022 
2023 	snprintf(name, sizeof(name), "lcore_%u", lcore);
2024 	spdk_cpuset_set_cpu(&cpumask, lcore, true);
2025 
2026 	lthread->thread = spdk_thread_create(name, &cpumask);
2027 	assert(lthread->thread != NULL);
2028 
2029 	TAILQ_INSERT_TAIL(&g_lcore_thread_list, lthread, link);
2030 }
2031 
2032 static void
2033 bdevperf_construct_jobs(void)
2034 {
2035 	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
2036 	struct spdk_thread *thread;
2037 	struct job_config *config;
2038 	struct spdk_bdev *bdev;
2039 	const char *filenames;
2040 	uint32_t i;
2041 	int rc;
2042 
2043 	if (g_one_thread_per_lcore) {
2044 		SPDK_ENV_FOREACH_CORE(i) {
2045 			create_lcore_thread(i);
2046 		}
2047 	}
2048 
2049 	TAILQ_FOREACH(config, &job_config_list, link) {
2050 		filenames = config->filename;
2051 
2052 		if (!g_one_thread_per_lcore) {
2053 			thread = construct_job_thread(&config->cpumask, config->name);
2054 		} else {
2055 			thread = get_lcore_thread(config->lcore);
2056 		}
2057 		assert(thread);
2058 
2059 		while (filenames) {
2060 			filenames = config_filename_next(filenames, filename);
2061 			if (strlen(filename) == 0) {
2062 				break;
2063 			}
2064 
2065 			bdev = spdk_bdev_get_by_name(filename);
2066 			if (!bdev) {
2067 				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
2068 				g_run_rc = -EINVAL;
2069 				return;
2070 			}
2071 
2072 			rc = bdevperf_construct_job(bdev, config, thread);
2073 			if (rc < 0) {
2074 				g_run_rc = rc;
2075 				return;
2076 			}
2077 		}
2078 	}
2079 }
2080 
2081 static int
2082 make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
2083 {
2084 	struct job_config *config = calloc(1, sizeof(*config));
2085 
2086 	if (config == NULL) {
2087 		fprintf(stderr, "Unable to allocate memory for job config\n");
2088 		return -ENOMEM;
2089 	}
2090 
2091 	config->name = filename;
2092 	config->filename = filename;
2093 	config->lcore = _get_next_core();
2094 	spdk_cpuset_zero(&config->cpumask);
2095 	spdk_cpuset_set_cpu(&config->cpumask, config->lcore, true);
2096 	config->bs = g_io_size;
2097 	config->iodepth = g_queue_depth;
2098 	config->rwmixread = g_rw_percentage;
2099 	config->offset = offset;
2100 	config->length = range;
2101 	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
2102 	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2103 		free(config);
2104 		return -EINVAL;
2105 	}
2106 
2107 	TAILQ_INSERT_TAIL(&job_config_list, config, link);
2108 	return 0;
2109 }
2110 
2111 static int
2112 bdevperf_construct_multithread_job_config(void *ctx, struct spdk_bdev *bdev)
2113 {
2114 	uint32_t *num_cores = ctx;
2115 	uint32_t i;
2116 	uint64_t blocks_per_job;
2117 	int64_t offset;
2118 	int rc;
2119 
2120 	blocks_per_job = spdk_bdev_get_num_blocks(bdev) / *num_cores;
2121 	offset = 0;
2122 
2123 	SPDK_ENV_FOREACH_CORE(i) {
2124 		rc = make_cli_job_config(spdk_bdev_get_name(bdev), offset, blocks_per_job);
2125 		if (rc) {
2126 			return rc;
2127 		}
2128 
2129 		offset += blocks_per_job;
2130 	}
2131 
2132 	return 0;
2133 }
2134 
2135 static void
2136 bdevperf_construct_multithread_job_configs(void)
2137 {
2138 	struct spdk_bdev *bdev;
2139 	uint32_t i;
2140 	uint32_t num_cores;
2141 
2142 	num_cores = 0;
2143 	SPDK_ENV_FOREACH_CORE(i) {
2144 		num_cores++;
2145 	}
2146 
2147 	if (num_cores == 0) {
2148 		g_run_rc = -EINVAL;
2149 		return;
2150 	}
2151 
2152 	if (g_job_bdev_name != NULL) {
2153 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2154 		if (!bdev) {
2155 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2156 			return;
2157 		}
2158 		g_run_rc = bdevperf_construct_multithread_job_config(&num_cores, bdev);
2159 	} else {
2160 		g_run_rc = spdk_for_each_bdev_leaf(&num_cores, bdevperf_construct_multithread_job_config);
2161 	}
2162 
2163 }
2164 
2165 static int
2166 bdevperf_construct_job_config(void *ctx, struct spdk_bdev *bdev)
2167 {
2168 	/* Construct the job */
2169 	return make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
2170 }
2171 
2172 static void
2173 bdevperf_construct_job_configs(void)
2174 {
2175 	struct spdk_bdev *bdev;
2176 
2177 	/* There are three different modes for allocating jobs. Standard mode
2178 	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
2179 	 *
2180 	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
2181 	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
2182 	 * This runs multiple threads per bdev, effectively.
2183 	 *
2184 	 * The -j flag implies "FIO" mode which tries to mimic semantic of FIO jobs.
2185 	 * In "FIO" mode, threads are spawned per-job instead of per-bdev.
2186 	 * Each FIO job can be individually parameterized by filename, cpu mask, etc,
2187 	 * which is different from other modes in that they only support global options.
2188 	 *
2189 	 * Both for standard mode and "multithread" mode, if the -E flag is specified,
2190 	 * it creates one spdk_thread PER CORE. On each core, one spdk_thread is shared by
2191 	 * multiple jobs.
2192 	 */
2193 
2194 	if (g_bdevperf_conf) {
2195 		goto end;
2196 	}
2197 
2198 	if (g_multithread_mode) {
2199 		bdevperf_construct_multithread_job_configs();
2200 	} else if (g_job_bdev_name != NULL) {
2201 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2202 		if (bdev) {
2203 			/* Construct the job */
2204 			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
2205 		} else {
2206 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2207 		}
2208 	} else {
2209 		g_run_rc = spdk_for_each_bdev_leaf(NULL, bdevperf_construct_job_config);
2210 	}
2211 
2212 end:
2213 	/* Increment initial construct_jobs count so that it will never reach 0 in the middle
2214 	 * of iteration.
2215 	 */
2216 	g_construct_job_count = 1;
2217 
2218 	if (g_run_rc == 0) {
2219 		bdevperf_construct_jobs();
2220 	}
2221 
2222 	_bdevperf_construct_job_done(NULL);
2223 }
2224 
2225 static int
2226 parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
2227 {
2228 	const char *job_name;
2229 	int tmp;
2230 
2231 	tmp = spdk_conf_section_get_intval(s, name);
2232 	if (tmp == -1) {
2233 		/* Field was not found. Check default value
2234 		 * In [global] section it is ok to have undefined values
2235 		 * but for other sections it is not ok */
2236 		if (def == BDEVPERF_CONFIG_UNDEFINED) {
2237 			job_name = spdk_conf_section_get_name(s);
2238 			if (strcmp(job_name, "global") == 0) {
2239 				return def;
2240 			}
2241 
2242 			fprintf(stderr,
2243 				"Job '%s' has no '%s' assigned\n",
2244 				job_name, name);
2245 			return BDEVPERF_CONFIG_ERROR;
2246 		}
2247 		return def;
2248 	}
2249 
2250 	/* NOTE: get_intval returns nonnegative on success */
2251 	if (tmp < 0) {
2252 		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
2253 			spdk_conf_section_get_name(s), name);
2254 		return BDEVPERF_CONFIG_ERROR;
2255 	}
2256 
2257 	return tmp;
2258 }
2259 
2260 /* CLI arguments override parameters for global sections */
2261 static void
2262 config_set_cli_args(struct job_config *config)
2263 {
2264 	if (g_job_bdev_name) {
2265 		config->filename = g_job_bdev_name;
2266 	}
2267 	if (g_io_size > 0) {
2268 		config->bs = g_io_size;
2269 	}
2270 	if (g_queue_depth > 0) {
2271 		config->iodepth = g_queue_depth;
2272 	}
2273 	if (g_rw_percentage > 0) {
2274 		config->rwmixread = g_rw_percentage;
2275 	}
2276 	if (g_workload_type) {
2277 		config->rw = parse_rw(g_workload_type, config->rw);
2278 	}
2279 }
2280 
2281 static int
2282 read_job_config(void)
2283 {
2284 	struct job_config global_default_config;
2285 	struct job_config global_config;
2286 	struct spdk_conf_section *s;
2287 	struct job_config *config = NULL;
2288 	const char *cpumask;
2289 	const char *rw;
2290 	bool is_global;
2291 	int n = 0;
2292 	int val;
2293 
2294 	if (g_bdevperf_conf_file == NULL) {
2295 		return 0;
2296 	}
2297 
2298 	g_bdevperf_conf = spdk_conf_allocate();
2299 	if (g_bdevperf_conf == NULL) {
2300 		fprintf(stderr, "Could not allocate job config structure\n");
2301 		return 1;
2302 	}
2303 
2304 	spdk_conf_disable_sections_merge(g_bdevperf_conf);
2305 	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
2306 		fprintf(stderr, "Invalid job config");
2307 		return 1;
2308 	}
2309 
2310 	/* Initialize global defaults */
2311 	global_default_config.filename = NULL;
2312 	/* Zero mask is the same as g_all_cpuset
2313 	 * The g_all_cpuset is not initialized yet,
2314 	 * so use zero mask as the default instead */
2315 	spdk_cpuset_zero(&global_default_config.cpumask);
2316 	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
2317 	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
2318 	/* bdevperf has no default for -M option but in FIO the default is 50 */
2319 	global_default_config.rwmixread = 50;
2320 	global_default_config.offset = 0;
2321 	/* length 0 means 100% */
2322 	global_default_config.length = 0;
2323 	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
2324 	config_set_cli_args(&global_default_config);
2325 
2326 	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
2327 		return 1;
2328 	}
2329 
2330 	/* There is only a single instance of global job_config
2331 	 * We just reset its value when we encounter new [global] section */
2332 	global_config = global_default_config;
2333 
2334 	for (s = spdk_conf_first_section(g_bdevperf_conf);
2335 	     s != NULL;
2336 	     s = spdk_conf_next_section(s)) {
2337 		config = calloc(1, sizeof(*config));
2338 		if (config == NULL) {
2339 			fprintf(stderr, "Unable to allocate memory for job config\n");
2340 			return 1;
2341 		}
2342 
2343 		config->name = spdk_conf_section_get_name(s);
2344 		is_global = strcmp(config->name, "global") == 0;
2345 
2346 		if (is_global) {
2347 			global_config = global_default_config;
2348 		}
2349 
2350 		config->filename = spdk_conf_section_get_val(s, "filename");
2351 		if (config->filename == NULL) {
2352 			config->filename = global_config.filename;
2353 		}
2354 		if (!is_global) {
2355 			if (config->filename == NULL) {
2356 				fprintf(stderr, "Job '%s' expects 'filename' parameter\n", config->name);
2357 				goto error;
2358 			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
2359 				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
2360 				fprintf(stderr,
2361 					"filename for '%s' job is too long. Max length is %d\n",
2362 					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
2363 				goto error;
2364 			}
2365 		}
2366 
2367 		cpumask = spdk_conf_section_get_val(s, "cpumask");
2368 		if (cpumask == NULL) {
2369 			config->cpumask = global_config.cpumask;
2370 		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
2371 			fprintf(stderr, "Job '%s' has bad 'cpumask' value\n", config->name);
2372 			goto error;
2373 		}
2374 
2375 		config->bs = parse_uint_option(s, "bs", global_config.bs);
2376 		if (config->bs == BDEVPERF_CONFIG_ERROR) {
2377 			goto error;
2378 		} else if (config->bs == 0) {
2379 			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
2380 			goto error;
2381 		}
2382 
2383 		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
2384 		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
2385 			goto error;
2386 		} else if (config->iodepth == 0) {
2387 			fprintf(stderr,
2388 				"'iodepth' of job '%s' must be greater than 0\n",
2389 				config->name);
2390 			goto error;
2391 		}
2392 
2393 		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
2394 		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
2395 			goto error;
2396 		} else if (config->rwmixread > 100) {
2397 			fprintf(stderr,
2398 				"'rwmixread' value of '%s' job is not in 0-100 range\n",
2399 				config->name);
2400 			goto error;
2401 		}
2402 
2403 		config->offset = parse_uint_option(s, "offset", global_config.offset);
2404 		if (config->offset == BDEVPERF_CONFIG_ERROR) {
2405 			goto error;
2406 		}
2407 
2408 		val = parse_uint_option(s, "length", global_config.length);
2409 		if (val == BDEVPERF_CONFIG_ERROR) {
2410 			goto error;
2411 		}
2412 		config->length = val;
2413 
2414 		rw = spdk_conf_section_get_val(s, "rw");
2415 		config->rw = parse_rw(rw, global_config.rw);
2416 		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2417 			fprintf(stderr, "Job '%s' has bad 'rw' value\n", config->name);
2418 			goto error;
2419 		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
2420 			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
2421 			goto error;
2422 		}
2423 
2424 		if (is_global) {
2425 			config_set_cli_args(config);
2426 			global_config = *config;
2427 			free(config);
2428 			config = NULL;
2429 		} else {
2430 			TAILQ_INSERT_TAIL(&job_config_list, config, link);
2431 			n++;
2432 		}
2433 	}
2434 
2435 	if (g_rpc_log_file_name != NULL) {
2436 		g_rpc_log_file = fopen(g_rpc_log_file_name, "a");
2437 		if (g_rpc_log_file == NULL) {
2438 			fprintf(stderr, "Failed to open %s\n", g_rpc_log_file_name);
2439 			goto error;
2440 		}
2441 	}
2442 
2443 	printf("Using job config with %d jobs\n", n);
2444 	return 0;
2445 error:
2446 	free(config);
2447 	return 1;
2448 }
2449 
2450 static void
2451 bdevperf_run(void *arg1)
2452 {
2453 	uint32_t i;
2454 
2455 	g_main_thread = spdk_get_thread();
2456 
2457 	spdk_cpuset_zero(&g_all_cpuset);
2458 	SPDK_ENV_FOREACH_CORE(i) {
2459 		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
2460 	}
2461 
2462 	if (g_wait_for_tests) {
2463 		/* Do not perform any tests until RPC is received */
2464 		return;
2465 	}
2466 
2467 	bdevperf_construct_job_configs();
2468 }
2469 
2470 static void
2471 rpc_perform_tests_reset(void)
2472 {
2473 	/* Reset g_run_rc to 0 for the next test run. */
2474 	g_run_rc = 0;
2475 
2476 	/* Reset g_stats to 0 for the next test run. */
2477 	memset(&g_stats, 0, sizeof(g_stats));
2478 
2479 	/* Reset g_show_performance_period_num to 0 for the next test run. */
2480 	g_show_performance_period_num = 0;
2481 }
2482 
2483 static void
2484 rpc_perform_tests_cb(void)
2485 {
2486 	struct spdk_json_write_ctx *w;
2487 	struct spdk_jsonrpc_request *request = g_request;
2488 
2489 	g_request = NULL;
2490 
2491 	if (g_run_rc == 0) {
2492 		w = spdk_jsonrpc_begin_result(request);
2493 		spdk_json_write_uint32(w, g_run_rc);
2494 		spdk_jsonrpc_end_result(request, w);
2495 	} else {
2496 		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2497 						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
2498 	}
2499 
2500 	rpc_perform_tests_reset();
2501 }
2502 
2503 struct rpc_bdevperf_params {
2504 	int	time_in_sec;
2505 	char	*workload_type;
2506 	int	queue_depth;
2507 	char	*io_size;
2508 	int	rw_percentage;
2509 };
2510 
2511 static const struct spdk_json_object_decoder rpc_bdevperf_params_decoders[] = {
2512 	{"time_in_sec", offsetof(struct rpc_bdevperf_params, time_in_sec), spdk_json_decode_int32, true},
2513 	{"workload_type", offsetof(struct rpc_bdevperf_params, workload_type), spdk_json_decode_string, true},
2514 	{"queue_depth", offsetof(struct rpc_bdevperf_params, queue_depth), spdk_json_decode_int32, true},
2515 	{"io_size", offsetof(struct rpc_bdevperf_params, io_size), spdk_json_decode_string, true},
2516 	{"rw_percentage", offsetof(struct rpc_bdevperf_params, rw_percentage), spdk_json_decode_int32, true},
2517 };
2518 
2519 static void
2520 rpc_apply_bdevperf_params(struct rpc_bdevperf_params *params)
2521 {
2522 	if (params->workload_type) {
2523 		/* we need to clear previously settled parameter to avoid memory leak */
2524 		free(g_workload_type);
2525 		g_workload_type = strdup(params->workload_type);
2526 	}
2527 	if (params->queue_depth) {
2528 		g_queue_depth = params->queue_depth;
2529 	}
2530 	if (params->io_size) {
2531 		bdevperf_parse_arg('o', params->io_size);
2532 	}
2533 	if (params->time_in_sec) {
2534 		g_time_in_sec = params->time_in_sec;
2535 	}
2536 	if (params->rw_percentage) {
2537 		g_rw_percentage = params->rw_percentage;
2538 		g_mix_specified = true;
2539 	} else {
2540 		g_mix_specified = false;
2541 	}
2542 }
2543 
2544 static void
2545 rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
2546 {
2547 	struct rpc_bdevperf_params req = {}, backup = {};
2548 	int rc;
2549 
2550 	if (g_request != NULL) {
2551 		fprintf(stderr, "Another test is already in progress.\n");
2552 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2553 						 spdk_strerror(-EINPROGRESS));
2554 		return;
2555 	}
2556 
2557 	if (params) {
2558 		if (spdk_json_decode_object_relaxed(params, rpc_bdevperf_params_decoders,
2559 						    SPDK_COUNTOF(rpc_bdevperf_params_decoders),
2560 						    &req)) {
2561 			spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2562 							 "spdk_json_decode_object failed");
2563 			return;
2564 		}
2565 
2566 		if (g_workload_type) {
2567 			backup.workload_type = strdup(g_workload_type);
2568 		}
2569 		backup.queue_depth = g_queue_depth;
2570 		if (asprintf(&backup.io_size, "%d", g_io_size) < 0) {
2571 			fprintf(stderr, "Couldn't allocate memory for queue depth");
2572 			goto rpc_error;
2573 		}
2574 		backup.time_in_sec = g_time_in_sec;
2575 		backup.rw_percentage = g_rw_percentage;
2576 
2577 		rpc_apply_bdevperf_params(&req);
2578 
2579 		free(req.workload_type);
2580 		free(req.io_size);
2581 	}
2582 
2583 	rc = verify_test_params();
2584 
2585 	if (rc) {
2586 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2587 						 "Invalid parameters provided");
2588 		/* restore old params on error */
2589 		rpc_apply_bdevperf_params(&backup);
2590 		goto rpc_error;
2591 	}
2592 
2593 	g_request = request;
2594 
2595 	/* Only construct job configs at the first test run.  */
2596 	if (TAILQ_EMPTY(&job_config_list)) {
2597 		bdevperf_construct_job_configs();
2598 	} else {
2599 		bdevperf_construct_jobs();
2600 	}
2601 
2602 rpc_error:
2603 	free(backup.io_size);
2604 	free(backup.workload_type);
2605 }
2606 SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)
2607 
2608 static void
2609 _bdevperf_job_drain(void *ctx)
2610 {
2611 	bdevperf_job_drain(ctx);
2612 }
2613 
2614 static void
2615 spdk_bdevperf_shutdown_cb(void)
2616 {
2617 	g_shutdown = true;
2618 	struct bdevperf_job *job, *tmp;
2619 
2620 	if (g_bdevperf.running_jobs == 0) {
2621 		bdevperf_test_done(NULL);
2622 		return;
2623 	}
2624 
2625 	/* Iterate jobs to stop all I/O */
2626 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
2627 		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
2628 	}
2629 }
2630 
2631 static int
2632 bdevperf_parse_arg(int ch, char *arg)
2633 {
2634 	long long tmp;
2635 
2636 	if (ch == 'w') {
2637 		g_workload_type = strdup(arg);
2638 	} else if (ch == 'T') {
2639 		g_job_bdev_name = arg;
2640 	} else if (ch == 'z') {
2641 		g_wait_for_tests = true;
2642 	} else if (ch == 'Z') {
2643 		g_zcopy = true;
2644 	} else if (ch == 'X') {
2645 		g_abort = true;
2646 	} else if (ch == 'C') {
2647 		g_multithread_mode = true;
2648 	} else if (ch == 'f') {
2649 		g_continue_on_failure = true;
2650 	} else if (ch == 'j') {
2651 		g_bdevperf_conf_file = arg;
2652 	} else if (ch == 'F') {
2653 		char *endptr;
2654 
2655 		errno = 0;
2656 		g_zipf_theta = strtod(arg, &endptr);
2657 		if (errno || arg == endptr || g_zipf_theta < 0) {
2658 			fprintf(stderr, "Illegal zipf theta value %s\n", arg);
2659 			return -EINVAL;
2660 		}
2661 	} else if (ch == 'l') {
2662 		g_latency_display_level++;
2663 	} else if (ch == 'D') {
2664 		g_random_map = true;
2665 	} else if (ch == 'E') {
2666 		g_one_thread_per_lcore = true;
2667 	} else if (ch == 'J') {
2668 		g_rpc_log_file_name = arg;
2669 	} else if (ch == 'o') {
2670 		uint64_t size;
2671 
2672 		if (spdk_parse_capacity(arg, &size, NULL) != 0) {
2673 			fprintf(stderr, "Invalid IO size: %s\n", arg);
2674 			return -EINVAL;
2675 		}
2676 		g_io_size = (int)size;
2677 	} else if (ch == 'U') {
2678 		g_unique_writes = true;
2679 	} else {
2680 		tmp = spdk_strtoll(arg, 10);
2681 		if (tmp < 0) {
2682 			fprintf(stderr, "Parse failed for the option %c.\n", ch);
2683 			return tmp;
2684 		} else if (tmp >= INT_MAX) {
2685 			fprintf(stderr, "Parsed option was too large %c.\n", ch);
2686 			return -ERANGE;
2687 		}
2688 
2689 		switch (ch) {
2690 		case 'q':
2691 			g_queue_depth = tmp;
2692 			break;
2693 		case 't':
2694 			g_time_in_sec = tmp;
2695 			break;
2696 		case 'k':
2697 			g_timeout_in_sec = tmp;
2698 			break;
2699 		case 'M':
2700 			g_rw_percentage = tmp;
2701 			g_mix_specified = true;
2702 			break;
2703 		case 'P':
2704 			g_show_performance_ema_period = tmp;
2705 			break;
2706 		case 'S':
2707 			g_summarize_performance = false;
2708 			g_show_performance_period_in_usec = tmp * SPDK_SEC_TO_USEC;
2709 			break;
2710 		default:
2711 			return -EINVAL;
2712 		}
2713 	}
2714 	return 0;
2715 }
2716 
2717 static void
2718 bdevperf_usage(void)
2719 {
2720 	printf(" -q <depth>                io depth\n");
2721 	printf(" -o <size>                 io size in bytes\n");
2722 	printf(" -w <type>                 io pattern type, must be one of " PATTERN_TYPES_STR "\n");
2723 	printf(" -t <time>                 time in seconds\n");
2724 	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
2725 	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
2726 	printf(" -P <num>                  number of moving average period\n");
2727 	printf("\t\t(If set to n, show weighted mean of the previous n IO/s in real time)\n");
2728 	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
2729 	printf("\t\t(only valid with -S)\n");
2730 	printf(" -S <period>               show performance result in real time every <period> seconds\n");
2731 	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
2732 	printf(" -f                        continue processing I/O even after failures\n");
2733 	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
2734 	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
2735 	printf(" -z                        start bdevperf, but wait for perform_tests RPC to start tests\n");
2736 	printf("                           (See examples/bdev/bdevperf/bdevperf.py)\n");
2737 	printf(" -X                        abort timed out I/O\n");
2738 	printf(" -C                        enable every core to send I/Os to each bdev\n");
2739 	printf(" -j <filename>             use job config file\n");
2740 	printf(" -l                        display latency histogram, default: disable. -l display summary, -ll display details\n");
2741 	printf(" -D                        use a random map for picking offsets not previously read or written (for all jobs)\n");
2742 	printf(" -E                        share per lcore thread among jobs. Available only if -j is not used.\n");
2743 	printf(" -J                        File name to open with append mode and log JSON RPC calls.\n");
2744 	printf(" -U                        generate unique data for each write I/O, has no effect on non-write I/O\n");
2745 }
2746 
2747 static void
2748 bdevperf_fini(void)
2749 {
2750 	free_job_config();
2751 	free(g_workload_type);
2752 
2753 	if (g_rpc_log_file != NULL) {
2754 		fclose(g_rpc_log_file);
2755 		g_rpc_log_file = NULL;
2756 	}
2757 }
2758 
2759 static int
2760 verify_test_params(void)
2761 {
2762 	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
2763 		goto out;
2764 	}
2765 	if (!g_bdevperf_conf_file && g_io_size <= 0) {
2766 		goto out;
2767 	}
2768 	if (!g_bdevperf_conf_file && !g_workload_type) {
2769 		goto out;
2770 	}
2771 	if (g_bdevperf_conf_file && g_one_thread_per_lcore) {
2772 		printf("If bdevperf's config file is used, per lcore thread cannot be used\n");
2773 		goto out;
2774 	}
2775 	if (g_time_in_sec <= 0) {
2776 		goto out;
2777 	}
2778 	g_time_in_usec = g_time_in_sec * SPDK_SEC_TO_USEC;
2779 
2780 	if (g_timeout_in_sec < 0) {
2781 		goto out;
2782 	}
2783 
2784 	if (g_abort && !g_timeout_in_sec) {
2785 		printf("Timeout must be set for abort option, Ignoring g_abort\n");
2786 	}
2787 
2788 	if (g_show_performance_ema_period > 0 && g_summarize_performance) {
2789 		fprintf(stderr, "-P option must be specified with -S option\n");
2790 		return 1;
2791 	}
2792 
2793 	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2794 		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
2795 		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
2796 		printf("Zero copy mechanism will not be used.\n");
2797 		g_zcopy = false;
2798 	}
2799 
2800 	if (g_bdevperf_conf_file) {
2801 		/* workload_type verification happens during config file parsing */
2802 		return 0;
2803 	}
2804 
2805 	if (!strcmp(g_workload_type, "verify") ||
2806 	    !strcmp(g_workload_type, "reset")) {
2807 		g_rw_percentage = 50;
2808 		g_verify = true;
2809 		if (!strcmp(g_workload_type, "reset")) {
2810 			g_reset = true;
2811 		}
2812 	}
2813 
2814 	if (!strcmp(g_workload_type, "read") ||
2815 	    !strcmp(g_workload_type, "randread") ||
2816 	    !strcmp(g_workload_type, "write") ||
2817 	    !strcmp(g_workload_type, "randwrite") ||
2818 	    !strcmp(g_workload_type, "verify") ||
2819 	    !strcmp(g_workload_type, "reset") ||
2820 	    !strcmp(g_workload_type, "unmap") ||
2821 	    !strcmp(g_workload_type, "write_zeroes") ||
2822 	    !strcmp(g_workload_type, "flush")) {
2823 		if (g_mix_specified) {
2824 			fprintf(stderr, "Ignoring -M option... Please use -M option"
2825 				" only when using rw or randrw.\n");
2826 		}
2827 	}
2828 
2829 	if (!strcmp(g_workload_type, "rw") ||
2830 	    !strcmp(g_workload_type, "randrw")) {
2831 		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
2832 			fprintf(stderr,
2833 				"-M must be specified to value from 0 to 100 "
2834 				"for rw or randrw.\n");
2835 			return 1;
2836 		}
2837 	}
2838 
2839 	if (strcmp(g_workload_type, "randread") &&
2840 	    strcmp(g_workload_type, "randwrite") &&
2841 	    strcmp(g_workload_type, "randrw")) {
2842 		if (g_random_map) {
2843 			fprintf(stderr, "Ignoring -D option... Please use -D option"
2844 				" only when using randread, randwrite or randrw.\n");
2845 			return 1;
2846 		}
2847 	}
2848 
2849 	return 0;
2850 out:
2851 	return 1;
2852 }
2853 
2854 int
2855 main(int argc, char **argv)
2856 {
2857 	struct spdk_app_opts opts = {};
2858 	int rc;
2859 
2860 	/* Use the runtime PID to set the random seed */
2861 	srand(getpid());
2862 
2863 	spdk_app_opts_init(&opts, sizeof(opts));
2864 	opts.name = "bdevperf";
2865 	opts.rpc_addr = NULL;
2866 	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;
2867 
2868 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CEF:J:M:P:S:T:Xlj:DU", NULL,
2869 				      bdevperf_parse_arg, bdevperf_usage)) !=
2870 	    SPDK_APP_PARSE_ARGS_SUCCESS) {
2871 		return rc;
2872 	}
2873 
2874 	/* Set the default address if no rpc_addr was provided in args
2875 	 * and RPC is used for starting tests */
2876 	if (g_wait_for_tests && opts.rpc_addr == NULL) {
2877 		opts.rpc_addr = SPDK_DEFAULT_RPC_ADDR;
2878 	}
2879 
2880 	if (read_job_config()) {
2881 		bdevperf_fini();
2882 		return 1;
2883 	}
2884 
2885 	if (g_rpc_log_file != NULL) {
2886 		opts.rpc_log_file = g_rpc_log_file;
2887 	}
2888 
2889 	if (verify_test_params() != 0 && !g_wait_for_tests) {
2890 		spdk_app_usage();
2891 		bdevperf_usage();
2892 		bdevperf_fini();
2893 		exit(1);
2894 	}
2895 
2896 	rc = spdk_app_start(&opts, bdevperf_run, NULL);
2897 
2898 	spdk_app_fini();
2899 	bdevperf_fini();
2900 	return rc;
2901 }
2902