/* xref: /spdk/examples/bdev/bdevperf/bdevperf.c (revision 971ec01268cdf972b0fd02014ffe2998b80931e9) */
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/accel.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"
#include "spdk/string.h"
#include "spdk/rpc.h"
#include "spdk/bit_array.h"
#include "spdk/conf.h"
#include "spdk/zipf.h"
#include "spdk/histogram_data.h"

#define BDEVPERF_CONFIG_MAX_FILENAME 1024
#define BDEVPERF_CONFIG_UNDEFINED -1
#define BDEVPERF_CONFIG_ERROR -2
#define PATTERN_TYPES_STR "(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush, write_zeroes)"
#define BDEVPERF_MAX_COREMASK_STRING 64

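/* Per-I/O context. Tasks are preallocated for each job and recycled through
 * job->task_list as I/O completes.
 */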
struct bdevperf_task {
	struct iovec			iov;
	struct bdevperf_job		*job;
	struct spdk_bdev_io		*bdev_io;
	void				*buf;
	void				*verify_buf;
	void				*md_buf;
	void				*verify_md_buf;
	uint64_t			offset_blocks;
	struct bdevperf_task		*task_to_abort;
	enum spdk_bdev_io_type		io_type;
	TAILQ_ENTRY(bdevperf_task)	link;
	struct spdk_bdev_io_wait_entry	bdev_io_wait;
};

static char *g_workload_type = NULL;
static int g_io_size = 0;
/* Initialize to an invalid value so we can detect if the user overrides it. */
static int g_rw_percentage = -1;
static bool g_verify = false;
static bool g_reset = false;
static bool g_continue_on_failure = false;
static bool g_abort = false;
static bool g_error_to_exit = false;
static int g_queue_depth = 0;
static uint64_t g_time_in_usec;
static bool g_summarize_performance = true;
static uint64_t g_show_performance_period_in_usec = SPDK_SEC_TO_USEC;
static uint64_t g_show_performance_period_num = 0;
static uint64_t g_show_performance_ema_period = 0;
static int g_run_rc = 0;
static bool g_shutdown = false;
static uint64_t g_start_tsc;
static uint64_t g_shutdown_tsc;
static bool g_zcopy = false;
static struct spdk_thread *g_main_thread;
static int g_time_in_sec = 0;
static bool g_mix_specified = false;
static const char *g_job_bdev_name;
static bool g_wait_for_tests = false;
static struct spdk_jsonrpc_request *g_request = NULL;
static bool g_multithread_mode = false;
static int g_timeout_in_sec;
static struct spdk_conf *g_bdevperf_conf = NULL;
static const char *g_bdevperf_conf_file = NULL;
static double g_zipf_theta;
static bool g_random_map = false;
static bool g_unique_writes = false;
static bool g_hide_metadata = false;

static struct spdk_cpuset g_all_cpuset;
static struct spdk_poller *g_perf_timer = NULL;

static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
static void rpc_perform_tests_cb(void);
static int bdevperf_parse_arg(int ch, char *arg);
static int verify_test_params(void);
static void bdevperf_usage(void);

static uint32_t g_bdev_count = 0;
static uint32_t g_latency_display_level;

static bool g_one_thread_per_lcore = false;

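/* Percentile cutoffs printed in the latency summary. The array is terminated
 * by a negative sentinel (see check_cutoff()).
 */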
static const double g_latency_cutoffs[] = {
	0.01,
	0.10,
	0.25,
	0.50,
	0.75,
	0.90,
	0.95,
	0.98,
	0.99,
	0.995,
	0.999,
	0.9999,
	0.99999,
	0.999999,
	0.9999999,
	-1,
};

static const char *g_rpc_log_file_name = NULL;
static FILE *g_rpc_log_file = NULL;

struct latency_info {
	uint64_t	min;
	uint64_t	max;
	uint64_t	total;
};

enum job_config_rw {
	JOB_CONFIG_RW_READ = 0,
	JOB_CONFIG_RW_WRITE,
	JOB_CONFIG_RW_RANDREAD,
	JOB_CONFIG_RW_RANDWRITE,
	JOB_CONFIG_RW_RW,
	JOB_CONFIG_RW_RANDRW,
	JOB_CONFIG_RW_VERIFY,
	JOB_CONFIG_RW_RESET,
	JOB_CONFIG_RW_UNMAP,
	JOB_CONFIG_RW_FLUSH,
	JOB_CONFIG_RW_WRITE_ZEROES,
};

struct bdevperf_job {
	char				*name;
	struct spdk_bdev		*bdev;
	struct spdk_bdev_desc		*bdev_desc;
	struct spdk_io_channel		*ch;
	TAILQ_ENTRY(bdevperf_job)	link;
	struct spdk_thread		*thread;

	enum job_config_rw		workload_type;
	int				io_size;
	int				rw_percentage;
	bool				is_random;
	bool				verify;
	bool				reset;
	bool				continue_on_failure;
	bool				unmap;
	bool				write_zeroes;
	bool				flush;
	bool				abort;
	int				queue_depth;
	unsigned int			seed;

	uint64_t			io_completed;
	uint64_t			io_failed;
	uint64_t			io_timeout;
	uint64_t			prev_io_completed;
	double				ema_io_per_second;
	int				current_queue_depth;
	uint64_t			size_in_ios;
	uint64_t			ios_base;
	uint64_t			offset_in_ios;
	uint64_t			io_size_blocks;
	uint64_t			buf_size;
	uint32_t			dif_check_flags;
	bool				is_draining;
	bool				md_check;
	struct spdk_poller		*run_timer;
	struct spdk_poller		*reset_timer;
	struct spdk_bit_array		*outstanding;
	struct spdk_zipf		*zipf;
	TAILQ_HEAD(, bdevperf_task)	task_list;
	uint64_t			run_time_in_usec;

	/* keep the channel's histogram data before the channel is destroyed */
	struct spdk_histogram_data	*histogram;
	struct spdk_bit_array		*random_map;

	/* counter used for generating unique write data (-U option) */
	uint32_t			write_io_count;
};

struct spdk_bdevperf {
	TAILQ_HEAD(, bdevperf_job)	jobs;
	uint32_t			running_jobs;
};

static struct spdk_bdevperf g_bdevperf = {
	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
	.running_jobs = 0,
};

/* Stores the values parsed from one section of the job config file */
struct job_config {
	const char			*name;
	const char			*filename;
	struct spdk_cpuset		cpumask;
	int				bs;
	int				iodepth;
	int				rwmixread;
	uint32_t			lcore;
	int64_t				offset;
	uint64_t			length;
	enum job_config_rw		rw;
	TAILQ_ENTRY(job_config)	link;
};

TAILQ_HEAD(, job_config) job_config_list
	= TAILQ_HEAD_INITIALIZER(job_config_list);

static bool g_performance_dump_active = false;

struct bdevperf_stats {
	uint64_t			io_time_in_usec;
	double				total_io_per_second;
	double				total_mb_per_second;
	double				total_failed_per_second;
	double				total_timeout_per_second;
	double				min_latency;
	double				max_latency;
	double				average_latency;
	uint64_t			total_io_completed;
	uint64_t			total_tsc;
};

struct bdevperf_aggregate_stats {
	struct bdevperf_job		*current_job;
	struct bdevperf_stats		total;
};

static struct bdevperf_aggregate_stats g_stats = {.total.min_latency = (double)UINT64_MAX};

struct lcore_thread {
	struct spdk_thread		*thread;
	uint32_t			lcore;
	TAILQ_ENTRY(lcore_thread)	link;
};

TAILQ_HEAD(, lcore_thread) g_lcore_thread_list
	= TAILQ_HEAD_INITIALIZER(g_lcore_thread_list);

static char *
parse_workload_type(enum job_config_rw ret)
{
	switch (ret) {
	case JOB_CONFIG_RW_READ:
		return "read";
	case JOB_CONFIG_RW_RANDREAD:
		return "randread";
	case JOB_CONFIG_RW_WRITE:
		return "write";
	case JOB_CONFIG_RW_RANDWRITE:
		return "randwrite";
	case JOB_CONFIG_RW_VERIFY:
		return "verify";
	case JOB_CONFIG_RW_RESET:
		return "reset";
	case JOB_CONFIG_RW_UNMAP:
		return "unmap";
	case JOB_CONFIG_RW_WRITE_ZEROES:
		return "write_zeroes";
	case JOB_CONFIG_RW_FLUSH:
		return "flush";
	case JOB_CONFIG_RW_RW:
		return "rw";
	case JOB_CONFIG_RW_RANDRW:
		return "randrw";
	default:
		fprintf(stderr, "wrong workload_type code\n");
	}

	return NULL;
}

/*
 * Cumulative Moving Average (CMA): the average of all data points up to the current one
 * Exponential Moving Average (EMA): a weighted mean of the previous n data points, with more weight given to recent ones
 * Simple Moving Average (SMA): an unweighted mean of the previous n data points
 *
 * Bdevperf supports CMA and EMA.
 */
static double
get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
{
	return (double)job->io_completed * SPDK_SEC_TO_USEC / io_time_in_usec;
}

static double
get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
{
	double io_completed, io_per_second;

	io_completed = job->io_completed;
	io_per_second = (double)(io_completed - job->prev_io_completed) * SPDK_SEC_TO_USEC
			/ g_show_performance_period_in_usec;
	job->prev_io_completed = io_completed;

	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
				  / (ema_period + 1);
	return job->ema_io_per_second;
}

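/* spdk_histogram_data_iterate() callback. Approximates total latency using
 * each bucket's midpoint, and records the min (start of the first non-empty
 * bucket) and max (end of the bucket where the running count reaches the total).
 */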
static void
get_avg_latency(void *ctx, uint64_t start, uint64_t end, uint64_t count,
		uint64_t total, uint64_t so_far)
{
	struct latency_info *latency_info = ctx;

	if (count == 0) {
		return;
	}

	latency_info->total += (start + end) / 2 * count;

	if (so_far == count) {
		latency_info->min = start;
	}

	if (so_far == total) {
		latency_info->max = end;
	}
}

static void
bdevperf_job_stats_accumulate(struct bdevperf_stats *aggr_stats,
			      struct bdevperf_stats *job_stats)
{
	aggr_stats->total_io_per_second += job_stats->total_io_per_second;
	aggr_stats->total_mb_per_second += job_stats->total_mb_per_second;
	aggr_stats->total_failed_per_second += job_stats->total_failed_per_second;
	aggr_stats->total_timeout_per_second += job_stats->total_timeout_per_second;
	aggr_stats->total_io_completed += job_stats->total_io_completed;
	aggr_stats->total_tsc += job_stats->total_tsc;

	if (job_stats->min_latency < aggr_stats->min_latency) {
		aggr_stats->min_latency = job_stats->min_latency;
	}
	if (job_stats->max_latency > aggr_stats->max_latency) {
		aggr_stats->max_latency = job_stats->max_latency;
	}
}

static void
bdevperf_job_get_stats(struct bdevperf_job *job,
		       struct bdevperf_stats *job_stats,
		       uint64_t time_in_usec,
		       uint64_t ema_period)
{
	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
	double average_latency = 0.0, min_latency, max_latency;
	uint64_t tsc_rate;
	uint64_t total_io;
	struct latency_info latency_info = {};

	if (ema_period == 0) {
		io_per_second = get_cma_io_per_second(job, time_in_usec);
	} else {
		io_per_second = get_ema_io_per_second(job, ema_period);
	}
	tsc_rate = spdk_get_ticks_hz();
	mb_per_second = io_per_second * job->io_size / (1024 * 1024);

	spdk_histogram_data_iterate(job->histogram, get_avg_latency, &latency_info);

	total_io = job->io_completed + job->io_failed;
	if (total_io != 0) {
		average_latency = (double)latency_info.total / total_io * SPDK_SEC_TO_USEC / tsc_rate;
	}
	min_latency = (double)latency_info.min * SPDK_SEC_TO_USEC / tsc_rate;
	max_latency = (double)latency_info.max * SPDK_SEC_TO_USEC / tsc_rate;

	failed_per_second = (double)job->io_failed * SPDK_SEC_TO_USEC / time_in_usec;
	timeout_per_second = (double)job->io_timeout * SPDK_SEC_TO_USEC / time_in_usec;

	job_stats->total_io_per_second = io_per_second;
	job_stats->total_mb_per_second = mb_per_second;
	job_stats->total_failed_per_second = failed_per_second;
	job_stats->total_timeout_per_second = timeout_per_second;
	job_stats->total_io_completed = total_io;
	job_stats->total_tsc = latency_info.total;
	job_stats->average_latency = average_latency;
	job_stats->min_latency = min_latency;
	job_stats->max_latency = max_latency;
	job_stats->io_time_in_usec = time_in_usec;
}

static void
performance_dump_job_stdout(struct bdevperf_job *job,
			    struct bdevperf_stats *job_stats)
{
	if (job->workload_type == JOB_CONFIG_RW_RW || job->workload_type == JOB_CONFIG_RW_RANDRW) {
		printf("Job: %s (Core Mask 0x%s, workload: %s, percentage: %d, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->rw_percentage,
		       job->queue_depth, job->io_size);
	} else {
		printf("Job: %s (Core Mask 0x%s, workload: %s, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->queue_depth, job->io_size);
	}

	if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		printf("Job: %s ended in about %.2f seconds with error\n",
		       job->name, (double)job->run_time_in_usec / SPDK_SEC_TO_USEC);
	}
	if (job->verify) {
		printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
		       job->ios_base, job->size_in_ios);
	}

	printf("\t %-20s: %10.2f %10.2f %10.2f",
	       job->name,
	       (float)job_stats->io_time_in_usec / SPDK_SEC_TO_USEC,
	       job_stats->total_io_per_second,
	       job_stats->total_mb_per_second);
	printf(" %10.2f %8.2f",
	       job_stats->total_failed_per_second,
	       job_stats->total_timeout_per_second);
	printf(" %10.2f %10.2f %10.2f\n",
	       job_stats->average_latency,
	       job_stats->min_latency,
	       job_stats->max_latency);
}

static void
performance_dump_job_json(struct bdevperf_job *job,
			  struct spdk_json_write_ctx *w,
			  struct bdevperf_stats *job_stats)
{
	char core_mask_string[BDEVPERF_MAX_COREMASK_STRING] = {0};

	spdk_json_write_named_string(w, "job", job->name);
	snprintf(core_mask_string, BDEVPERF_MAX_COREMASK_STRING,
		 "0x%s", spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
	spdk_json_write_named_string(w, "core_mask", core_mask_string);
	spdk_json_write_named_string(w, "workload", parse_workload_type(job->workload_type));

	if (job->workload_type == JOB_CONFIG_RW_RW || job->workload_type == JOB_CONFIG_RW_RANDRW) {
		spdk_json_write_named_uint32(w, "percentage", job->rw_percentage);
	}

	if (g_shutdown) {
		spdk_json_write_named_string(w, "status", "terminated");
	} else if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		spdk_json_write_named_string(w, "status", "failed");
	} else {
		spdk_json_write_named_string(w, "status", "finished");
	}

	if (job->verify) {
		spdk_json_write_named_object_begin(w, "verify_range");
		spdk_json_write_named_uint64(w, "start", job->ios_base);
		spdk_json_write_named_uint64(w, "length", job->size_in_ios);
		spdk_json_write_object_end(w);
	}

	spdk_json_write_named_uint32(w, "queue_depth", job->queue_depth);
	spdk_json_write_named_uint32(w, "io_size", job->io_size);
	spdk_json_write_named_double(w, "runtime", (double)job_stats->io_time_in_usec / SPDK_SEC_TO_USEC);
	spdk_json_write_named_double(w, "iops", job_stats->total_io_per_second);
	spdk_json_write_named_double(w, "mibps", job_stats->total_mb_per_second);
	spdk_json_write_named_uint64(w, "io_failed", job->io_failed);
	spdk_json_write_named_uint64(w, "io_timeout", job->io_timeout);
	spdk_json_write_named_double(w, "avg_latency_us", job_stats->average_latency);
	spdk_json_write_named_double(w, "min_latency_us", job_stats->min_latency);
	spdk_json_write_named_double(w, "max_latency_us", job_stats->max_latency);
}

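/* Fill the data and metadata buffers with a deterministic pattern. With
 * unique writes (-U), each block is stamped with the job's write counter in
 * the upper 32 bits so that successive writes are distinguishable.
 */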
static void
generate_data(struct bdevperf_job *job, void *buf, void *md_buf, bool unique)
{
	int offset_blocks = 0, md_offset, data_block_size, inner_offset;
	int buf_len = job->buf_size;
	int block_size = spdk_bdev_desc_get_block_size(job->bdev_desc);
	int md_size = spdk_bdev_desc_get_md_size(job->bdev_desc);
	int num_blocks = job->io_size_blocks;

	if (buf_len < num_blocks * block_size) {
		return;
	}

	if (md_buf == NULL) {
		data_block_size = block_size - md_size;
		md_buf = (char *)buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	if (unique) {
		uint64_t io_count = job->write_io_count++;
		unsigned int i;

		assert(md_size == 0 || md_size >= (int)sizeof(uint64_t));

		while (offset_blocks < num_blocks) {
			inner_offset = 0;
			while (inner_offset < data_block_size) {
				*(uint64_t *)buf = (io_count << 32) | (offset_blocks + inner_offset);
				inner_offset += sizeof(uint64_t);
				buf += sizeof(uint64_t);
			}
			for (i = 0; i < md_size / sizeof(uint64_t); i++) {
				((uint64_t *)md_buf)[i] = (io_count << 32) | offset_blocks;
			}
			md_buf += md_offset;
			offset_blocks++;
		}
		return;
	}

	while (offset_blocks < num_blocks) {
		inner_offset = 0;
		while (inner_offset < data_block_size) {
			*(uint32_t *)buf = offset_blocks + inner_offset;
			inner_offset += sizeof(uint32_t);
			buf += sizeof(uint32_t);
		}
		memset(md_buf, offset_blocks, md_size);
		md_buf += md_offset;
		offset_blocks++;
	}
}

static bool
copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
{
	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	memcpy(wr_buf, rd_buf, block_size * num_blocks);

	if (wr_md_buf != NULL) {
		memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
	}

	return true;
}

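/* Compare written and read buffers block by block, dumping the first
 * mismatching block. Metadata is compared separately when md_check is set.
 */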
static bool
verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
{
	int offset_blocks = 0, md_offset, data_block_size;

	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	if (wr_md_buf == NULL) {
		data_block_size = block_size - md_size;
		wr_md_buf = (char *)wr_buf + data_block_size;
		rd_md_buf = (char *)rd_buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	while (offset_blocks < num_blocks) {
		if (memcmp(wr_buf, rd_buf, data_block_size) != 0) {
			printf("data_block_size %d, num_blocks %d, offset %d\n", data_block_size, num_blocks,
			       offset_blocks);
			spdk_log_dump(stdout, "rd_buf", rd_buf, data_block_size);
			spdk_log_dump(stdout, "wr_buf", wr_buf, data_block_size);
			return false;
		}

		wr_buf += block_size;
		rd_buf += block_size;

		if (md_check) {
			if (memcmp(wr_md_buf, rd_md_buf, md_size) != 0) {
				printf("md_size %d, num_blocks %d, offset %d\n", md_size, num_blocks, offset_blocks);
				spdk_log_dump(stdout, "rd_md_buf", rd_md_buf, md_size);
				spdk_log_dump(stdout, "wr_md_buf", wr_md_buf, md_size);
				return false;
			}

			wr_md_buf += md_offset;
			rd_md_buf += md_offset;
		}

		offset_blocks++;
	}

	return true;
}

static void
free_job_config(void)
{
	struct job_config *config, *tmp;

	spdk_conf_free(g_bdevperf_conf);
	g_bdevperf_conf = NULL;

	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
		TAILQ_REMOVE(&job_config_list, config, link);
		free(config);
	}
}

static void
bdevperf_job_free(struct bdevperf_job *job)
{
	if (job->bdev_desc != NULL) {
		spdk_bdev_close(job->bdev_desc);
	}

	spdk_histogram_data_free(job->histogram);
	spdk_bit_array_free(&job->outstanding);
	spdk_bit_array_free(&job->random_map);
	spdk_zipf_free(&job->zipf);
	free(job->name);
	free(job);
}

static void
job_thread_exit(void *ctx)
{
	spdk_thread_exit(spdk_get_thread());
}

static void
check_cutoff(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	double **cutoff = ctx;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far / total;
	while (so_far_pct >= **cutoff && **cutoff > 0) {
		printf("%9.5f%% : %9.3fus\n", **cutoff * 100, (double)end * SPDK_SEC_TO_USEC / tsc_rate);
		(*cutoff)++;
	}
}

static void
print_bucket(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far * 100 / total;
	printf("%9.3f - %9.3f: %9.4f%%  (%9ju)\n",
	       (double)start * SPDK_SEC_TO_USEC / tsc_rate,
	       (double)end * SPDK_SEC_TO_USEC / tsc_rate,
	       so_far_pct, count);
}

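/* Final results path: print the per-job and aggregate tables, the optional
 * latency summary and histogram, answer any pending RPC, and tear down all
 * jobs, tasks, and threads.
 */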
static void
bdevperf_test_done(void *ctx)
{
	struct bdevperf_job *job, *jtmp;
	struct bdevperf_task *task, *ttmp;
	struct lcore_thread *lthread, *lttmp;
	double average_latency = 0.0;
	uint64_t time_in_usec;
	int rc;
	struct spdk_json_write_ctx *w = NULL;
	struct bdevperf_stats job_stats = {0};
	struct spdk_cpuset cpu_mask;

	if (g_time_in_usec) {
		g_stats.total.io_time_in_usec = g_time_in_usec;

		if (!g_run_rc && g_performance_dump_active) {
			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
			return;
		}
	}

	spdk_poller_unregister(&g_perf_timer);

	if (g_shutdown) {
		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
		time_in_usec = g_shutdown_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
		printf("Received shutdown signal, test time was about %.6f seconds\n",
		       (double)g_time_in_usec / SPDK_SEC_TO_USEC);
	}
	/* Send the RPC response if g_run_rc indicates success or a shutdown request was
	 * sent to bdevperf. rpc_perform_tests_cb() will send an error response in case of error.
	 */
	if ((g_run_rc == 0 || g_shutdown) && g_request) {
		w = spdk_jsonrpc_begin_result(g_request);
		spdk_json_write_object_begin(w);
		spdk_json_write_named_array_begin(w, "results");
	}

	printf("\n%*s\n", 107, "Latency(us)");
	printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");

	spdk_cpuset_zero(&cpu_mask);
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		spdk_cpuset_or(&cpu_mask, spdk_thread_get_cpumask(job->thread));
		memset(&job_stats, 0, sizeof(job_stats));
		bdevperf_job_get_stats(job, &job_stats, job->run_time_in_usec, 0);
		bdevperf_job_stats_accumulate(&g_stats.total, &job_stats);
		performance_dump_job_stdout(job, &job_stats);
		if (w) {
			spdk_json_write_object_begin(w);
			performance_dump_job_json(job, w, &job_stats);
			spdk_json_write_object_end(w);
		}
	}

	if (w) {
		spdk_json_write_array_end(w);
		spdk_json_write_named_uint32(w, "core_count", spdk_cpuset_count(&cpu_mask));
		spdk_json_write_object_end(w);
		spdk_jsonrpc_end_result(g_request, w);
	}
	printf("\r =================================================================================="
	       "=================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", g_stats.total.total_io_per_second, g_stats.total.total_mb_per_second);
	printf(" %10.2f %8.2f",
	       g_stats.total.total_failed_per_second, g_stats.total.total_timeout_per_second);

	if (g_stats.total.total_io_completed != 0) {
		average_latency = ((double)g_stats.total.total_tsc / g_stats.total.total_io_completed) *
				  SPDK_SEC_TO_USEC /
				  spdk_get_ticks_hz();
	}
	printf(" %10.2f %10.2f %10.2f\n", average_latency, g_stats.total.min_latency,
	       g_stats.total.max_latency);

	if (g_latency_display_level == 0 || g_stats.total.total_io_completed == 0) {
		goto clean;
	}

	printf("\n Latency summary\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		const double *cutoff = g_latency_cutoffs;

		spdk_histogram_data_iterate(job->histogram, check_cutoff, &cutoff);

		printf("\n");
	}

	if (g_latency_display_level == 1) {
		goto clean;
	}

	printf("\r Latency histogram\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		spdk_histogram_data_iterate(job->histogram, print_bucket, NULL);
		printf("\n");
	}

clean:
	fflush(stdout);

	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);

		if (!g_one_thread_per_lcore) {
			spdk_thread_send_msg(job->thread, job_thread_exit, NULL);
		}

		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
			TAILQ_REMOVE(&job->task_list, task, link);
			spdk_free(task->buf);
			spdk_free(task->verify_buf);
			spdk_free(task->md_buf);
			spdk_free(task->verify_md_buf);
			free(task);
		}

		bdevperf_job_free(job);
	}

	if (g_one_thread_per_lcore) {
		TAILQ_FOREACH_SAFE(lthread, &g_lcore_thread_list, link, lttmp) {
			TAILQ_REMOVE(&g_lcore_thread_list, lthread, link);
			spdk_thread_send_msg(lthread->thread, job_thread_exit, NULL);
			free(lthread);
		}
	}

	if (g_bdevperf_conf == NULL) {
		free_job_config();
	}

	rc = g_run_rc;
	if (g_request && !g_shutdown) {
		rpc_perform_tests_cb();
		if (rc != 0) {
			spdk_app_stop(rc);
		}
	} else {
		spdk_app_stop(rc);
	}
}

static void
bdevperf_job_end(void *ctx)
{
	struct bdevperf_job *job = ctx;

	assert(g_main_thread == spdk_get_thread());

	if (job->bdev_desc != NULL) {
		spdk_bdev_close(job->bdev_desc);
		job->bdev_desc = NULL;
	}

	if (--g_bdevperf.running_jobs == 0) {
		bdevperf_test_done(NULL);
	}
}

static void
bdevperf_channel_get_histogram_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
{
	struct spdk_histogram_data *job_hist = cb_arg;

	if (status == 0) {
		spdk_histogram_data_merge(job_hist, histogram);
	}
}

static void
bdevperf_job_empty(struct bdevperf_job *job)
{
	uint64_t end_tsc = 0;

	end_tsc = spdk_get_ticks() - g_start_tsc;
	job->run_time_in_usec = end_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
	/* keep histogram info before the channel is destroyed */
	spdk_bdev_channel_get_histogram(job->ch, bdevperf_channel_get_histogram_cb,
					job->histogram);
	spdk_put_io_channel(job->ch);
	spdk_thread_send_msg(g_main_thread, bdevperf_job_end, job);
}

static void
bdevperf_end_task(struct bdevperf_task *task)
{
	struct bdevperf_job     *job = task->job;

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	if (job->is_draining) {
		if (job->current_queue_depth == 0) {
			bdevperf_job_empty(job);
		}
	}
}

static void
bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
{
	struct bdevperf_job	*job = task->job;

	task->bdev_io_wait.bdev = job->bdev;
	task->bdev_io_wait.cb_fn = cb_fn;
	task->bdev_io_wait.cb_arg = task;
	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
}

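/* Stop submitting new I/O for this job: unregister its timers and mark it as
 * draining. I/O already in flight completes normally.
 */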
static int
bdevperf_job_drain(void *ctx)
{
	struct bdevperf_job *job = ctx;

	spdk_poller_unregister(&job->run_timer);
	if (job->reset) {
		spdk_poller_unregister(&job->reset_timer);
	}

	job->is_draining = true;

	return -1;
}

static int
bdevperf_job_drain_timer(void *ctx)
{
	struct bdevperf_job *job = ctx;

	bdevperf_job_drain(ctx);
	if (job->current_queue_depth == 0) {
		bdevperf_job_empty(job);
	}

	return SPDK_POLLER_BUSY;
}

static void
bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
		if (!job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	}

	spdk_bdev_free_io(bdev_io);
	bdevperf_end_task(task);
}

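/* Verify the DIF/DIX protection information of a completed read, using the
 * bdev's PI format and the job's check flags.
 */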
static int
bdevperf_verify_dif(struct bdevperf_task *task)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	struct spdk_dif_error	err_blk = {};
	int			rc;
	struct spdk_dif_ctx_init_ext_opts dif_opts;

	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
	dif_opts.dif_pi_format = spdk_bdev_get_dif_pi_format(bdev);
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		rc = spdk_dif_verify(&task->iov, 1, job->io_size_blocks, &dif_ctx, &err_blk);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_verify(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
	}

	if (rc != 0) {
		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
			err_blk.err_type, err_blk.err_offset);
	}

	return rc;
}

static void
bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	uint64_t		offset_in_ios;
	int			rc;

	job = task->job;

	if (g_error_to_exit == true) {
		bdevperf_job_drain(job);
	} else if (!success) {
		if (!job->reset && !job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
			g_error_to_exit = true;
			printf("task offset: %" PRIu64 " on job bdev=%s fails\n",
			       task->offset_blocks, job->name);
		}
	} else if (job->verify || job->reset) {
		if (!verify_data(task->buf, job->buf_size,
				 task->iov.iov_base, job->buf_size,
				 spdk_bdev_desc_get_block_size(job->bdev_desc),
				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
				 spdk_bdev_desc_get_md_size(job->bdev_desc),
				 job->io_size_blocks, job->md_check)) {
			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	} else if (job->dif_check_flags != 0) {
		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_desc_get_md_size(job->bdev_desc) != 0) {
			rc = bdevperf_verify_dif(task);
			if (rc != 0) {
				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
				       task->offset_blocks, job->name);

				success = false;
				if (!job->reset && !job->continue_on_failure) {
					bdevperf_job_drain(job);
					g_run_rc = -1;
					g_error_to_exit = true;
				}
			}
		}
	}

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
	}

	if (job->verify) {
		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
		spdk_bit_array_clear(job->outstanding, offset_in_ios);
	}

	spdk_bdev_free_io(bdev_io);

	/*
	 * is_draining indicates when time has expired for the test run
	 * and we are just waiting for the previously submitted I/O
	 * to complete.  In this case, do not submit a new I/O to replace
	 * the one just completed.
	 */
	if (!job->is_draining) {
		bdevperf_submit_single(job, task);
	} else {
		bdevperf_end_task(task);
	}
}

static void
bdevperf_verify_submit_read(void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	int			rc;

	job = task->job;

	task->iov.iov_base = task->verify_buf;
	task->iov.iov_len = job->buf_size;

	/* Read the data back in */
	rc = spdk_bdev_readv_blocks_with_md(job->bdev_desc, job->ch, &task->iov, 1, task->verify_md_buf,
					    task->offset_blocks, job->io_size_blocks,
					    bdevperf_complete, task);

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
	} else if (rc != 0) {
		printf("Failed to submit read: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = rc;
	}
}

static void
bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
			       void *cb_arg)
{
	if (success) {
		spdk_bdev_free_io(bdev_io);
		bdevperf_verify_submit_read(cb_arg);
	} else {
		bdevperf_complete(bdev_io, success, cb_arg);
	}
}

static void
bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	if (!success) {
		bdevperf_complete(bdev_io, success, cb_arg);
		return;
	}

	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
}

static int
bdevperf_generate_dif(struct bdevperf_task *task)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev_desc	*desc = job->bdev_desc;
	struct spdk_dif_ctx	dif_ctx;
	int			rc;
	struct spdk_dif_ctx_init_ext_opts dif_opts;

	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
	dif_opts.dif_pi_format = spdk_bdev_desc_get_dif_pi_format(desc);
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_desc_get_block_size(desc),
			       spdk_bdev_desc_get_md_size(desc),
			       spdk_bdev_desc_is_md_interleaved(desc),
			       spdk_bdev_desc_is_dif_head_of_md(desc),
			       spdk_bdev_desc_get_dif_type(desc),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_desc_is_md_interleaved(desc)) {
		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_desc_get_md_size(desc) * job->io_size_blocks,
		};

		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
	}

	if (rc != 0) {
		fprintf(stderr, "Generation of DIF/DIX failed\n");
	}

	return rc;
}

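/* Submit the I/O described by task->io_type. On -ENOMEM the task is queued to
 * retry once bdev I/O resources free up; any other error drains the job.
 */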
static void
bdevperf_submit_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev_desc	*desc;
	struct spdk_io_channel	*ch;
	spdk_bdev_io_completion_cb cb_fn;
	uint64_t		offset_in_ios;
	int			rc = 0;

	desc = job->bdev_desc;
	ch = job->ch;

	switch (task->io_type) {
	case SPDK_BDEV_IO_TYPE_WRITE:
		if (spdk_bdev_desc_get_md_size(desc) != 0 && job->dif_check_flags != 0) {
			rc = bdevperf_generate_dif(task);
		}
		if (rc == 0) {
			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;

			if (g_zcopy) {
				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
				return;
			} else {
				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
								     task->md_buf,
								     task->offset_blocks,
								     job->io_size_blocks,
								     cb_fn, task);
			}
		}
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
						   job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_READ:
		if (g_zcopy) {
			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
						   true, bdevperf_zcopy_populate_complete, task);
		} else {
			rc = spdk_bdev_readv_blocks_with_md(desc, ch, &task->iov, 1,
							    task->md_buf,
							    task->offset_blocks,
							    job->io_size_blocks,
							    bdevperf_complete, task);
		}
		break;
	case SPDK_BDEV_IO_TYPE_ABORT:
		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
		break;
	default:
		assert(false);
		rc = -EINVAL;
		break;
	}

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
		return;
	} else if (rc != 0) {
		printf("Failed to submit bdev_io: %d\n", rc);
		if (job->verify) {
			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
			spdk_bit_array_clear(job->outstanding, offset_in_ios);
		}
		bdevperf_job_drain(job);
		g_run_rc = rc;
		return;
	}

	job->current_queue_depth++;
}

static void
bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;
	struct iovec		*iovs;
	int			iovcnt;

	if (!success) {
		bdevperf_job_drain(job);
		g_run_rc = -1;
		return;
	}

	task->bdev_io = bdev_io;
	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;

	if (job->verify || job->reset) {
		/* When job->verify or job->reset is enabled, task->buf is used for
		 *  verification of read after write.  For write I/O, when zcopy APIs
		 *  are used, task->buf cannot be used, and data must be written to
		 *  the data buffer allocated underneath bdev layer instead.
		 *  Hence we copy task->buf to the allocated data buffer here.
		 */
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		assert(iovcnt == 1);
		assert(iovs != NULL);

		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
			  spdk_bdev_desc_get_block_size(job->bdev_desc),
			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
			  spdk_bdev_desc_get_md_size(job->bdev_desc), job->io_size_blocks);
	}

	bdevperf_submit_task(task);
}

static void
bdevperf_prep_zcopy_write_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	int			rc;

	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
				   task->offset_blocks, job->io_size_blocks,
				   false, bdevperf_zcopy_get_buf_complete, task);
	if (rc != 0) {
		assert(rc == -ENOMEM);
		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
		return;
	}

	job->current_queue_depth++;
}

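/* Pop a preallocated task from the job's free list. The pool is sized at job
 * construction time, so running out indicates a logic error and aborts.
 */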
static struct bdevperf_task *
bdevperf_job_get_task(struct bdevperf_job *job)
{
	struct bdevperf_task *task;

	task = TAILQ_FIRST(&job->task_list);
	if (!task) {
		printf("Task allocation failed\n");
		abort();
	}

	TAILQ_REMOVE(&job->task_list, task, link);
	return task;
}

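/* Pick the next offset (zipf, uniform random, or sequential), choose the I/O
 * type from the job's workload, and submit a single I/O.
 */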
static void
bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
{
	uint64_t offset_in_ios;
	uint64_t rand_value;
	uint32_t first_clear;

	if (job->zipf) {
		offset_in_ios = spdk_zipf_generate(job->zipf);
	} else if (job->is_random) {
		/* RAND_MAX is only INT32_MAX, so use 2 calls to rand_r to
		 * get a large enough value to ensure we are issuing I/O
		 * uniformly across the whole bdev.
		 */
		rand_value = (uint64_t)rand_r(&job->seed) * RAND_MAX + rand_r(&job->seed);
		offset_in_ios = rand_value % job->size_in_ios;

		if (g_random_map) {
			/* Make sure that the offset does not exceed the maximum size
			 * of the bit array (verified during job creation)
			 */
			assert(offset_in_ios < UINT32_MAX);

			first_clear = spdk_bit_array_find_first_clear(job->random_map, (uint32_t)offset_in_ios);

			if (first_clear == UINT32_MAX) {
				first_clear = spdk_bit_array_find_first_clear(job->random_map, 0);

				if (first_clear == UINT32_MAX) {
					/* There are no clear bits left in the array, so clear
					 * the whole map and start over with the originally
					 * generated random offset.
					 */
					spdk_bit_array_clear_mask(job->random_map);
					first_clear = (uint32_t)offset_in_ios;
				}
			}

			spdk_bit_array_set(job->random_map, first_clear);

			offset_in_ios = first_clear;
		}
	} else {
		offset_in_ios = job->offset_in_ios++;
		if (job->offset_in_ios == job->size_in_ios) {
			job->offset_in_ios = 0;
		}

		/* Advance offset_in_ios if there is already an outstanding I/O
		 * to that location. We only need this with job->verify, as random
		 * offsets are not supported with job->verify at this time.
		 */
		if (job->verify) {
			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);

			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
				offset_in_ios = job->offset_in_ios++;
				if (job->offset_in_ios == job->size_in_ios) {
					job->offset_in_ios = 0;
				}
			}
			spdk_bit_array_set(job->outstanding, offset_in_ios);
		}
	}

	/* When multiple threads drive the same bdev, offset_in_ios is relative
	 * to the LBA range assigned to this job. task->offset_blocks is absolute
	 * (entire bdev LBA range).
	 */
	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;

	if (job->flush) {
		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
	} else if (job->unmap) {
		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
	} else if (job->write_zeroes) {
		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
	} else if ((job->rw_percentage == 100) ||
		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
		assert(!job->verify);
		task->io_type = SPDK_BDEV_IO_TYPE_READ;
		if (!g_zcopy) {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
		}
	} else {
		if (job->verify || job->reset || g_unique_writes) {
			generate_data(job, task->buf, task->md_buf, g_unique_writes);
		}
		if (g_zcopy) {
			bdevperf_prep_zcopy_write_task(task);
			return;
		} else {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
		}
	}

	bdevperf_submit_task(task);
}

static int reset_job(void *arg);

static void
reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	if (!success) {
		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	spdk_bdev_free_io(bdev_io);

	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
						10 * SPDK_SEC_TO_USEC);
}

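/* Fires every 10 seconds while the job runs: issues a bdev reset and is
 * re-armed from reset_cb() once the reset completes.
 */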
static int
reset_job(void *arg)
{
	struct bdevperf_job *job = arg;
	struct bdevperf_task *task;
	int rc;

	spdk_poller_unregister(&job->reset_timer);

	/* Do reset. */
	task = bdevperf_job_get_task(job);
	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
			     reset_cb, task);
	if (rc) {
		printf("Reset failed: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	return -1;
}

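/* Called for each I/O that exceeds the configured timeout: count it and, if
 * aborts are enabled and supported by the bdev, submit an abort targeting the
 * timed-out I/O.
 */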
static void
bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
{
	struct bdevperf_job *job = cb_arg;
	struct bdevperf_task *task;

	job->io_timeout++;

	if (job->is_draining || !job->abort ||
	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
		return;
	}

	task = bdevperf_job_get_task(job);
	if (task == NULL) {
		return;
	}

	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;

	bdevperf_submit_task(task);
}

static void
bdevperf_job_run(void *ctx)
{
	struct bdevperf_job *job = ctx;
	struct bdevperf_task *task;
	int i;

	/* Submit initial I/O for this job. Each time one
	 * completes, another will be submitted. */

	/* Start a timer to stop this I/O chain when the run is over */
	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain_timer, job, g_time_in_usec);
	if (job->reset) {
		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
							10 * SPDK_SEC_TO_USEC);
	}

	for (i = 0; i < job->queue_depth; i++) {
		task = bdevperf_job_get_task(job);
		bdevperf_submit_single(job, task);
	}
}

static void
_performance_dump_done(void *ctx)
{
	struct bdevperf_aggregate_stats *aggregate = ctx;
	struct bdevperf_stats *stats = &aggregate->total;
	double average_latency;

	if (g_summarize_performance) {
		printf("%12.2f IOPS, %8.2f MiB/s", stats->total_io_per_second, stats->total_mb_per_second);
		printf("\r");
	} else {
		printf("\r =================================================================================="
		       "=================================\n");
		printf("\r %-28s: %10s %10.2f %10.2f",
		       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
		printf(" %10.2f %8.2f",
		       stats->total_failed_per_second, stats->total_timeout_per_second);

		average_latency = ((double)stats->total_tsc / stats->total_io_completed) * SPDK_SEC_TO_USEC /
				  spdk_get_ticks_hz();
		printf(" %10.2f %10.2f %10.2f\n", average_latency, stats->min_latency, stats->max_latency);
		printf("\n");
	}

	fflush(stdout);

	g_performance_dump_active = false;

	free(aggregate);
}

static void
_performance_dump(void *ctx)
{
	struct bdevperf_aggregate_stats *stats = ctx;
	struct bdevperf_stats job_stats = {0};
	struct bdevperf_job *job = stats->current_job;
	uint64_t time_in_usec;

	if (job->io_failed > 0 && !job->continue_on_failure) {
		time_in_usec = job->run_time_in_usec;
	} else {
		time_in_usec = stats->total.io_time_in_usec;
	}

	bdevperf_job_get_stats(job, &job_stats, time_in_usec, g_show_performance_ema_period);
	bdevperf_job_stats_accumulate(&stats->total, &job_stats);
	if (!g_summarize_performance) {
		performance_dump_job_stdout(stats->current_job, &job_stats);
	}

	/* This assumes the jobs list is static after start up time.
	 * That's true right now, but if that ever changed this would need a lock. */
	stats->current_job = TAILQ_NEXT(stats->current_job, link);
	if (stats->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
	} else {
		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
	}
}

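/* Periodic poller that kicks off a stats pass: the pass hops across job
 * threads via _performance_dump() and finishes in _performance_dump_done().
 */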
static int
performance_statistics_thread(void *arg)
{
	struct bdevperf_aggregate_stats *aggregate;
	struct bdevperf_stats *stats;

	if (g_performance_dump_active) {
		return -1;
	}

	g_performance_dump_active = true;

	aggregate = calloc(1, sizeof(*aggregate));
	if (aggregate == NULL) {
		return -1;
	}
	stats = &aggregate->total;
	stats->min_latency = (double)UINT64_MAX;

	g_show_performance_period_num++;

	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;

	/* Iterate all of the jobs to gather stats. Jobs will not be removed
	 * here until a final performance dump is run, so this should be safe
	 * without locking.
	 */
	aggregate->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
	if (aggregate->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, aggregate);
	} else {
		spdk_thread_send_msg(aggregate->current_job->thread, _performance_dump, aggregate);
	}

	return -1;
}

static void
bdevperf_test(void)
{
	struct bdevperf_job *job;

	if (TAILQ_EMPTY(&g_bdevperf.jobs)) {
		if (g_request) {
			spdk_jsonrpc_send_error_response_fmt(g_request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
							     "No jobs defined or bdevs created");
			g_request = NULL;
		}
		return;
	}

	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / (uint64_t)SPDK_SEC_TO_USEC);
	fflush(stdout);

	/* Start a timer to dump performance numbers */
	g_start_tsc = spdk_get_ticks();
	if (!g_summarize_performance) {
		printf("%*s\n", 107, "Latency(us)");
		printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");
	}
	if (!g_perf_timer) {
		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
						    g_show_performance_period_in_usec);
	}

	/* Iterate jobs to start all I/O */
	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
		spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);

		g_bdevperf.running_jobs++;
		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
	}
}

static void
_bdevperf_job_drain(void *ctx)
{
	bdevperf_job_drain(ctx);
}

static void
bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct bdevperf_job *job = event_ctx;

	if (SPDK_BDEV_EVENT_REMOVE == type) {
		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
	}
}

static void
bdevperf_histogram_status_cb(void *cb_arg, int status)
{
	if (status != 0) {
		g_run_rc = status;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}

	if (--g_bdev_count == 0) {
		if (g_run_rc == 0) {
			/* Ready to run the test */
			bdevperf_test();
		} else {
			bdevperf_test_done(NULL);
		}
	}
}

static uint32_t g_construct_job_count = 0;

static int
_bdevperf_enable_histogram(void *ctx, struct spdk_bdev *bdev)
{
	bool *enable = ctx;

	g_bdev_count++;

	spdk_bdev_histogram_enable(bdev, bdevperf_histogram_status_cb, NULL, *enable);

	return 0;
}

static void
bdevperf_enable_histogram(bool enable)
{
	struct spdk_bdev *bdev;
	int rc;

	/* Start g_bdev_count at 1 so that it can never reach 0 in the middle of the iteration */
	g_bdev_count = 1;

	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (bdev) {
			rc = _bdevperf_enable_histogram(&enable, bdev);
		} else {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			rc = -1;
		}
	} else {
		rc = spdk_for_each_bdev_leaf(&enable, _bdevperf_enable_histogram);
	}

	bdevperf_histogram_status_cb(NULL, rc);
}

static void
_bdevperf_construct_job_done(void *ctx)
{
	if (--g_construct_job_count == 0) {
		if (g_run_rc != 0) {
			/* Something failed. */
			bdevperf_test_done(NULL);
			return;
		}

		/* always enable histogram. */
		bdevperf_enable_histogram(true);
	} else if (g_run_rc != 0) {
		/* Reset the error since some jobs were constructed successfully */
		g_run_rc = 0;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}
}

/* Checkformat does not allow using an inlined type here,
   so this typedef is a workaround. */
typedef struct spdk_thread *spdk_thread_t;

static spdk_thread_t
construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
{
	struct spdk_cpuset tmp;

	/* This function runs on the main thread. */
	assert(g_main_thread == spdk_get_thread());

	/* Handle default mask */
	if (spdk_cpuset_count(cpumask) == 0) {
		cpumask = &g_all_cpuset;
	}

	/* Warn user that mask might need to be changed */
	spdk_cpuset_copy(&tmp, cpumask);
	spdk_cpuset_or(&tmp, &g_all_cpuset);
	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
	}

	return spdk_thread_create(tag, cpumask);
}

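/* Round-robin over the cores in the application's core mask, wrapping back to
 * the first core at the end.
 */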
static uint32_t
_get_next_core(void)
{
	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;

	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
		return current_core;
	}

	current_core = spdk_env_get_next_core(current_core);
	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
		current_core = spdk_env_get_first_core();
	}

	return current_core;
}

static void
_bdevperf_construct_job(void *ctx)
{
	struct bdevperf_job *job = ctx;

	if (g_zcopy) {
		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
			g_run_rc = -ENOTSUP;
			goto end;
		}
	}

	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
	if (!job->ch) {
		SPDK_ERRLOG("Could not get io_channel for device %s\n", spdk_bdev_get_name(job->bdev));
		g_run_rc = -ENOMEM;
		goto end;
	}

end:
	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
}

static void
job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
{
	switch (rw) {
	case JOB_CONFIG_RW_READ:
		job->rw_percentage = 100;
		break;
	case JOB_CONFIG_RW_WRITE:
		job->rw_percentage = 0;
		break;
	case JOB_CONFIG_RW_RANDREAD:
		job->is_random = true;
		job->rw_percentage = 100;
		job->seed = rand();
		break;
	case JOB_CONFIG_RW_RANDWRITE:
		job->is_random = true;
		job->rw_percentage = 0;
		job->seed = rand();
		break;
	case JOB_CONFIG_RW_RW:
		job->is_random = false;
		break;
	case JOB_CONFIG_RW_RANDRW:
		job->is_random = true;
		job->seed = rand();
		break;
	case JOB_CONFIG_RW_RESET:
		/* Reset shares the flow with verify. */
		job->reset = true;
	/* fallthrough */
	case JOB_CONFIG_RW_VERIFY:
		job->verify = true;
1846 		/* In the verify flow the read is issued only from the write
1847 		 * completion callback, so rw_percentage is not used. */
1848 		job->rw_percentage = 0;
1849 		break;
1850 	case JOB_CONFIG_RW_UNMAP:
1851 		job->unmap = true;
1852 		break;
1853 	case JOB_CONFIG_RW_FLUSH:
1854 		job->flush = true;
1855 		break;
1856 	case JOB_CONFIG_RW_WRITE_ZEROES:
1857 		job->write_zeroes = true;
1858 		break;
1859 	}
1860 }
1861 
1862 static int
1863 bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
1864 		       struct spdk_thread *thread)
1865 {
1866 	struct bdevperf_job *job;
1867 	struct spdk_bdev_open_opts opts = {};
1868 	struct bdevperf_task *task;
1869 	int block_size, data_block_size;
1870 	int rc;
1871 	int task_num, n;
1872 	int32_t numa_id;
1873 
1874 	job = calloc(1, sizeof(struct bdevperf_job));
1875 	if (!job) {
1876 		fprintf(stderr, "Unable to allocate memory for new job.\n");
1877 		return -ENOMEM;
1878 	}
1879 
1880 	job->thread = thread;
1881 
1882 	job->name = strdup(spdk_bdev_get_name(bdev));
1883 	if (!job->name) {
1884 		fprintf(stderr, "Unable to allocate memory for job name.\n");
1885 		bdevperf_job_free(job);
1886 		return -ENOMEM;
1887 	}
1888 
1889 	spdk_bdev_open_opts_init(&opts, sizeof(opts));
1890 	opts.hide_metadata = g_hide_metadata;
1891 
1892 	rc = spdk_bdev_open_ext_v2(job->name, true, bdevperf_bdev_removed, job, &opts,
1893 				   &job->bdev_desc);
1894 	if (rc != 0) {
1895 		fprintf(stderr, "Could not open leaf bdev %s, error=%d\n", job->name, rc);
1896 		bdevperf_job_free(job);
1897 		return rc;
1898 	}
1899 
1900 	block_size = spdk_bdev_desc_get_block_size(job->bdev_desc);
1901 	data_block_size = spdk_bdev_get_data_block_size(bdev);
1902 
1903 	job->workload_type = config->rw;
1904 	job->io_size = config->bs;
1905 	job->rw_percentage = config->rwmixread;
1906 	job->continue_on_failure = g_continue_on_failure;
1907 	job->queue_depth = config->iodepth;
1908 	job->bdev = bdev;
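	/* io_size is expressed in data bytes; convert it to blocks using the
	 * data block size, but size the I/O buffer with the descriptor's block
	 * size, which may also include interleaved metadata. */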
1909 	job->io_size_blocks = job->io_size / data_block_size;
1910 	job->buf_size = job->io_size_blocks * block_size;
1911 	job->abort = g_abort;
1912 	job_init_rw(job, config->rw);
1913 	job->md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;
1914 
1915 	if ((job->io_size % data_block_size) != 0) {
1916 		SPDK_ERRLOG("IO size (%d) is not a multiple of the data block size of bdev %s (%"PRIu32")\n",
1917 			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
1918 		bdevperf_job_free(job);
1919 		return -ENOTSUP;
1920 	}
1921 
1922 	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1923 		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
1924 		bdevperf_job_free(job);
1925 		return -ENOTSUP;
1926 	}
1927 
1928 	if (spdk_bdev_desc_is_dif_check_enabled(job->bdev_desc, SPDK_DIF_CHECK_TYPE_REFTAG)) {
1929 		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
1930 	}
1931 	if (spdk_bdev_desc_is_dif_check_enabled(job->bdev_desc, SPDK_DIF_CHECK_TYPE_GUARD)) {
1932 		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
1933 	}
1934 
1935 	job->offset_in_ios = 0;
1936 
1937 	if (config->length != 0) {
1938 		/* Use subset of disk */
1939 		job->size_in_ios = config->length / job->io_size_blocks;
1940 		job->ios_base = config->offset / job->io_size_blocks;
1941 	} else {
1942 		/* Use whole disk */
1943 		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
1944 		job->ios_base = 0;
1945 	}
1946 
1947 	if (job->is_random && g_zipf_theta > 0) {
1948 		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
1949 	}
1950 
1951 	if (job->verify) {
1952 		if (job->size_in_ios >= UINT32_MAX) {
1953 			SPDK_ERRLOG("The job storage capacity is too large for the verify operation (must span fewer than UINT32_MAX I/Os)\n");
1954 			bdevperf_job_free(job);
1955 			return -ENOMEM;
1956 		}
1957 		job->outstanding = spdk_bit_array_create(job->size_in_ios);
1958 		if (job->outstanding == NULL) {
1959 			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
1960 				    spdk_bdev_get_name(bdev));
1961 			bdevperf_job_free(job);
1962 			return -ENOMEM;
1963 		}
1964 		if (job->queue_depth > (int)job->size_in_ios) {
1965 			SPDK_WARNLOG("Due to constraints of verify job, queue depth (-q, %d) can't exceed the number of IO "
1966 				     "requests which can be submitted to the bdev %s simultaneously (%"PRIu64"). "
1967 				     "Queue depth is limited to %"PRIu64"\n",
1968 				     job->queue_depth, job->name, job->size_in_ios, job->size_in_ios);
1969 			job->queue_depth = (int)job->size_in_ios;
1970 		}
1971 	}
1972 
1973 	job->histogram = spdk_histogram_data_alloc();
1974 	if (job->histogram == NULL) {
1975 		fprintf(stderr, "Failed to allocate histogram\n");
1976 		bdevperf_job_free(job);
1977 		return -ENOMEM;
1978 	}
1979 
1980 	TAILQ_INIT(&job->task_list);
1981 
1982 	if (g_random_map) {
1983 		if (job->size_in_ios >= UINT32_MAX) {
1984 			SPDK_ERRLOG("The job storage capacity is too large for the random map (must span fewer than UINT32_MAX I/Os)\n");
1985 			bdevperf_job_free(job);
1986 			return -ENOMEM;
1987 		}
1988 		job->random_map = spdk_bit_array_create(job->size_in_ios);
1989 		if (job->random_map == NULL) {
1990 			SPDK_ERRLOG("Could not create random_map array bitmap for bdev %s\n",
1991 				    spdk_bdev_get_name(bdev));
1992 			bdevperf_job_free(job);
1993 			return -ENOMEM;
1994 		}
1995 	}
1996 
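	/* Reserve one extra task for the reset flow and, when aborts are
	 * enabled, one extra abort task per queued I/O. */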
1997 	task_num = job->queue_depth;
1998 	if (job->reset) {
1999 		task_num += 1;
2000 	}
2001 	if (job->abort) {
2002 		task_num += job->queue_depth;
2003 	}
2004 
2005 	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);
2006 
2007 	numa_id = spdk_bdev_get_numa_id(job->bdev);
2008 
2009 	for (n = 0; n < task_num; n++) {
2010 		task = calloc(1, sizeof(struct bdevperf_task));
2011 		if (!task) {
2012 			fprintf(stderr, "Failed to allocate memory for task\n");
2013 			spdk_zipf_free(&job->zipf);
2014 			return -ENOMEM;
2015 		}
2016 
2017 		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
2018 					 numa_id, SPDK_MALLOC_DMA);
2019 		if (!task->buf) {
2020 			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
2021 			spdk_zipf_free(&job->zipf);
2022 			free(task);
2023 			return -ENOMEM;
2024 		}
2025 
2026 		if (job->verify && job->buf_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2027 			task->verify_buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
2028 							numa_id, SPDK_MALLOC_DMA);
2029 			if (!task->verify_buf) {
2030 				fprintf(stderr, "Cannot allocate verify_buf for task=%p\n", task);
2031 				spdk_free(task->buf);
2032 				spdk_zipf_free(&job->zipf);
2033 				free(task);
2034 				return -ENOMEM;
2035 			}
2036 
2037 			if (spdk_bdev_is_md_separate(job->bdev)) {
2038 				task->verify_md_buf = spdk_zmalloc(spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
2039 								   spdk_bdev_get_buf_align(job->bdev), NULL, numa_id, SPDK_MALLOC_DMA);
2040 				if (!task->verify_md_buf) {
2041 					fprintf(stderr, "Cannot allocate verify_md_buf for task=%p\n", task);
2042 					spdk_free(task->buf);
2043 					spdk_free(task->verify_buf);
2044 					spdk_zipf_free(&job->zipf);
2045 					free(task);
2046 					return -ENOMEM;
2047 				}
2048 			}
2049 		}
2050 
2051 		if (spdk_bdev_desc_is_md_separate(job->bdev_desc)) {
2052 			task->md_buf = spdk_zmalloc(job->io_size_blocks *
2053 						    spdk_bdev_desc_get_md_size(job->bdev_desc), 0, NULL,
2054 						    numa_id, SPDK_MALLOC_DMA);
2055 			if (!task->md_buf) {
2056 				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
2057 				spdk_zipf_free(&job->zipf);
2058 				spdk_free(task->verify_buf);
2059 				spdk_free(task->verify_md_buf);
2060 				spdk_free(task->buf);
2061 				free(task);
2062 				return -ENOMEM;
2063 			}
2064 		}
2065 
2066 		task->job = job;
2067 		TAILQ_INSERT_TAIL(&job->task_list, task, link);
2068 	}
2069 
2070 	g_construct_job_count++;
2071 
2072 	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
2073 	assert(rc == 0);
2074 
2075 	return rc;
2076 }
2077 
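/* Map an rw pattern string to its job_config_rw value. Returns 'ret'
 * unchanged when 'str' is NULL, or BDEVPERF_CONFIG_ERROR for an
 * unrecognized pattern.
 */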
2078 static int
2079 parse_rw(const char *str, enum job_config_rw ret)
2080 {
2081 	if (str == NULL) {
2082 		return ret;
2083 	}
2084 
2085 	if (!strcmp(str, "read")) {
2086 		ret = JOB_CONFIG_RW_READ;
2087 	} else if (!strcmp(str, "randread")) {
2088 		ret = JOB_CONFIG_RW_RANDREAD;
2089 	} else if (!strcmp(str, "write")) {
2090 		ret = JOB_CONFIG_RW_WRITE;
2091 	} else if (!strcmp(str, "randwrite")) {
2092 		ret = JOB_CONFIG_RW_RANDWRITE;
2093 	} else if (!strcmp(str, "verify")) {
2094 		ret = JOB_CONFIG_RW_VERIFY;
2095 	} else if (!strcmp(str, "reset")) {
2096 		ret = JOB_CONFIG_RW_RESET;
2097 	} else if (!strcmp(str, "unmap")) {
2098 		ret = JOB_CONFIG_RW_UNMAP;
2099 	} else if (!strcmp(str, "write_zeroes")) {
2100 		ret = JOB_CONFIG_RW_WRITE_ZEROES;
2101 	} else if (!strcmp(str, "flush")) {
2102 		ret = JOB_CONFIG_RW_FLUSH;
2103 	} else if (!strcmp(str, "rw")) {
2104 		ret = JOB_CONFIG_RW_RW;
2105 	} else if (!strcmp(str, "randrw")) {
2106 		ret = JOB_CONFIG_RW_RANDRW;
2107 	} else {
2108 		fprintf(stderr, "rw must be one of\n"
2109 			PATTERN_TYPES_STR "\n");
2110 		ret = BDEVPERF_CONFIG_ERROR;
2111 	}
2112 
2113 	return ret;
2114 }
2115 
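/* Copy the next entry of a colon-separated filename list into 'out',
 * stripping spaces and tabs, and return a pointer to the remainder.
 * For example (hypothetical input), "Malloc0: Malloc1" yields "Malloc0"
 * first; calling again on the returned remainder yields "Malloc1".
 */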
2116 static const char *
2117 config_filename_next(const char *filename, char *out)
2118 {
2119 	int i, k;
2120 
2121 	if (filename == NULL) {
2122 		out[0] = '\0';
2123 		return NULL;
2124 	}
2125 
2126 	if (filename[0] == ':') {
2127 		filename++;
2128 	}
2129 
2130 	for (i = 0, k = 0;
2131 	     filename[i] != '\0' &&
2132 	     filename[i] != ':' &&
2133 	     i < BDEVPERF_CONFIG_MAX_FILENAME &&
2134 	     k < (BDEVPERF_CONFIG_MAX_FILENAME - 1);
2135 	     i++) {
2136 		if (filename[i] == ' ' || filename[i] == '\t') {
2137 			continue;
2138 		}
2139 
2140 		out[k++] = filename[i];
2141 	}
2142 	out[k] = 0;
2143 
2144 	return filename + i;
2145 }
2146 
2147 static struct spdk_thread *
2148 get_lcore_thread(uint32_t lcore)
2149 {
2150 	struct lcore_thread *lthread;
2151 
2152 	TAILQ_FOREACH(lthread, &g_lcore_thread_list, link) {
2153 		if (lthread->lcore == lcore) {
2154 			return lthread->thread;
2155 		}
2156 	}
2157 
2158 	return NULL;
2159 }
2160 
2161 static void
2162 create_lcore_thread(uint32_t lcore)
2163 {
2164 	struct lcore_thread *lthread;
2165 	struct spdk_cpuset cpumask = {};
2166 	char name[32];
2167 
2168 	lthread = calloc(1, sizeof(*lthread));
2169 	assert(lthread != NULL);
2170 
2171 	lthread->lcore = lcore;
2172 
2173 	snprintf(name, sizeof(name), "lcore_%u", lcore);
2174 	spdk_cpuset_set_cpu(&cpumask, lcore, true);
2175 
2176 	lthread->thread = spdk_thread_create(name, &cpumask);
2177 	assert(lthread->thread != NULL);
2178 
2179 	TAILQ_INSERT_TAIL(&g_lcore_thread_list, lthread, link);
2180 }
2181 
2182 static void
2183 bdevperf_construct_jobs(void)
2184 {
2185 	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
2186 	struct spdk_thread *thread;
2187 	struct job_config *config;
2188 	struct spdk_bdev *bdev;
2189 	const char *filenames;
2190 	uint32_t i;
2191 	int rc;
2192 
2193 	if (g_one_thread_per_lcore) {
2194 		SPDK_ENV_FOREACH_CORE(i) {
2195 			create_lcore_thread(i);
2196 		}
2197 	}
2198 
2199 	TAILQ_FOREACH(config, &job_config_list, link) {
2200 		filenames = config->filename;
2201 
2202 		if (!g_one_thread_per_lcore) {
2203 			thread = construct_job_thread(&config->cpumask, config->name);
2204 		} else {
2205 			thread = get_lcore_thread(config->lcore);
2206 		}
2207 		assert(thread);
2208 
2209 		while (filenames) {
2210 			filenames = config_filename_next(filenames, filename);
2211 			if (strlen(filename) == 0) {
2212 				break;
2213 			}
2214 
2215 			bdev = spdk_bdev_get_by_name(filename);
2216 			if (!bdev) {
2217 				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
2218 				g_run_rc = -EINVAL;
2219 				return;
2220 			}
2221 
2222 			rc = bdevperf_construct_job(bdev, config, thread);
2223 			if (rc < 0) {
2224 				g_run_rc = rc;
2225 				return;
2226 			}
2227 		}
2228 	}
2229 }
2230 
2231 static int
2232 make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
2233 {
2234 	struct job_config *config = calloc(1, sizeof(*config));
2235 
2236 	if (config == NULL) {
2237 		fprintf(stderr, "Unable to allocate memory for job config\n");
2238 		return -ENOMEM;
2239 	}
2240 
2241 	config->name = filename;
2242 	config->filename = filename;
2243 	config->lcore = _get_next_core();
2244 	spdk_cpuset_zero(&config->cpumask);
2245 	spdk_cpuset_set_cpu(&config->cpumask, config->lcore, true);
2246 	config->bs = g_io_size;
2247 	config->iodepth = g_queue_depth;
2248 	config->rwmixread = g_rw_percentage;
2249 	config->offset = offset;
2250 	config->length = range;
2251 	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
2252 	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2253 		free(config);
2254 		return -EINVAL;
2255 	}
2256 
2257 	TAILQ_INSERT_TAIL(&job_config_list, config, link);
2258 	return 0;
2259 }
2260 
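/* In multithread mode each core gets an equal, disjoint slice of the bdev.
 * For example (hypothetical numbers), a 4096-block bdev on a 4-core mask
 * yields four jobs of 1024 blocks at offsets 0, 1024, 2048 and 3072.
 */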
2261 static int
2262 bdevperf_construct_multithread_job_config(void *ctx, struct spdk_bdev *bdev)
2263 {
2264 	uint32_t *num_cores = ctx;
2265 	uint32_t i;
2266 	uint64_t blocks_per_job;
2267 	int64_t offset;
2268 	int rc;
2269 
2270 	blocks_per_job = spdk_bdev_get_num_blocks(bdev) / *num_cores;
2271 	offset = 0;
2272 
2273 	SPDK_ENV_FOREACH_CORE(i) {
2274 		rc = make_cli_job_config(spdk_bdev_get_name(bdev), offset, blocks_per_job);
2275 		if (rc) {
2276 			return rc;
2277 		}
2278 
2279 		offset += blocks_per_job;
2280 	}
2281 
2282 	return 0;
2283 }
2284 
2285 static void
2286 bdevperf_construct_multithread_job_configs(void)
2287 {
2288 	struct spdk_bdev *bdev;
2289 	uint32_t i;
2290 	uint32_t num_cores;
2291 
2292 	num_cores = 0;
2293 	SPDK_ENV_FOREACH_CORE(i) {
2294 		num_cores++;
2295 	}
2296 
2297 	if (num_cores == 0) {
2298 		g_run_rc = -EINVAL;
2299 		return;
2300 	}
2301 
2302 	if (g_job_bdev_name != NULL) {
2303 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2304 		if (!bdev) {
2305 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2306 			return;
2307 		}
2308 		g_run_rc = bdevperf_construct_multithread_job_config(&num_cores, bdev);
2309 	} else {
2310 		g_run_rc = spdk_for_each_bdev_leaf(&num_cores, bdevperf_construct_multithread_job_config);
2311 	}
2312 
2313 }
2314 
2315 static int
2316 bdevperf_construct_job_config(void *ctx, struct spdk_bdev *bdev)
2317 {
2318 	/* Construct the job */
2319 	return make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
2320 }
2321 
2322 static void
2323 bdevperf_construct_job_configs(void)
2324 {
2325 	struct spdk_bdev *bdev;
2326 
2327 	/* There are three different modes for allocating jobs. Standard mode
2328 	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
2329 	 *
2330 	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
2331 	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
2332 	 * This runs multiple threads per bdev, effectively.
2333 	 *
2334 	 * The -j flag selects "FIO" mode, which tries to mimic the semantics of FIO jobs.
2335 	 * In "FIO" mode, threads are spawned per-job instead of per-bdev.
2336 	 * Each FIO job can be parameterized individually by filename, cpumask, etc.,
2337 	 * unlike the other modes, which only support global options.
2338 	 *
2339 	 * Both for standard mode and "multithread" mode, if the -E flag is specified,
2340 	 * it creates one spdk_thread PER CORE. On each core, one spdk_thread is shared by
2341 	 * multiple jobs.
2342 	 */
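
	/* Illustrative example invocations for each mode (hypothetical values,
	 * not parsed or exercised by this code):
	 *   bdevperf -q 128 -o 4096 -w randread -t 60        standard mode
	 *   bdevperf -q 128 -o 4096 -w randread -t 60 -C     "multithread" mode
	 *   bdevperf -j ./jobs.conf -t 60                     "FIO" mode
	 *   bdevperf -q 128 -o 4096 -w randread -t 60 -E     one thread per lcore
	 */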
2343 
2344 	if (g_bdevperf_conf) {
2345 		goto end;
2346 	}
2347 
2348 	if (g_multithread_mode) {
2349 		bdevperf_construct_multithread_job_configs();
2350 	} else if (g_job_bdev_name != NULL) {
2351 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2352 		if (bdev) {
2353 			/* Construct the job */
2354 			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
2355 		} else {
2356 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2357 		}
2358 	} else {
2359 		g_run_rc = spdk_for_each_bdev_leaf(NULL, bdevperf_construct_job_config);
2360 	}
2361 
2362 end:
2363 	/* Increment initial construct_jobs count so that it will never reach 0 in the middle
2364 	 * of iteration.
2365 	 */
2366 	g_construct_job_count = 1;
2367 
2368 	if (g_run_rc == 0) {
2369 		bdevperf_construct_jobs();
2370 	}
2371 
2372 	_bdevperf_construct_job_done(NULL);
2373 }
2374 
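/* Read integer option 'name' from config section 's'. Returns the parsed
 * nonnegative value, or 'def' when the key is absent. A key that is absent
 * with no default (BDEVPERF_CONFIG_UNDEFINED) is only allowed in the
 * [global] section; anything else yields BDEVPERF_CONFIG_ERROR.
 */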
2375 static int
2376 parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
2377 {
2378 	const char *job_name;
2379 	int tmp;
2380 
2381 	tmp = spdk_conf_section_get_intval(s, name);
2382 	if (tmp == -1) {
2383 		/* The field was not found, so check the default value.
2384 		 * Undefined values are allowed in the [global] section,
2385 		 * but not in any other section. */
2386 		if (def == BDEVPERF_CONFIG_UNDEFINED) {
2387 			job_name = spdk_conf_section_get_name(s);
2388 			if (strcmp(job_name, "global") == 0) {
2389 				return def;
2390 			}
2391 
2392 			fprintf(stderr,
2393 				"Job '%s' has no '%s' assigned\n",
2394 				job_name, name);
2395 			return BDEVPERF_CONFIG_ERROR;
2396 		}
2397 		return def;
2398 	}
2399 
2400 	/* NOTE: get_intval returns nonnegative on success */
2401 	if (tmp < 0) {
2402 		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
2403 			spdk_conf_section_get_name(s), name);
2404 		return BDEVPERF_CONFIG_ERROR;
2405 	}
2406 
2407 	return tmp;
2408 }
2409 
2410 /* CLI arguments override parameters for global sections */
2411 static void
2412 config_set_cli_args(struct job_config *config)
2413 {
2414 	if (g_job_bdev_name) {
2415 		config->filename = g_job_bdev_name;
2416 	}
2417 	if (g_io_size > 0) {
2418 		config->bs = g_io_size;
2419 	}
2420 	if (g_queue_depth > 0) {
2421 		config->iodepth = g_queue_depth;
2422 	}
2423 	if (g_rw_percentage > 0) {
2424 		config->rwmixread = g_rw_percentage;
2425 	}
2426 	if (g_workload_type) {
2427 		config->rw = parse_rw(g_workload_type, config->rw);
2428 	}
2429 }
2430 
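/* Example of a job config file (hypothetical bdev names and values):
 *
 *   [global]
 *   bs=4096
 *   iodepth=128
 *   rw=randrw
 *   rwmixread=70
 *
 *   [job0]
 *   filename=Malloc0
 *   cpumask=0x1
 *   offset=0
 *   length=0
 *
 * Keys missing from a job section fall back to the most recent [global]
 * values, and CLI arguments override [global] settings.
 */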
2431 static int
2432 read_job_config(void)
2433 {
2434 	struct job_config global_default_config;
2435 	struct job_config global_config;
2436 	struct spdk_conf_section *s;
2437 	struct job_config *config = NULL;
2438 	const char *cpumask;
2439 	const char *rw;
2440 	bool is_global;
2441 	int n = 0;
2442 	int val;
2443 
2444 	if (g_bdevperf_conf_file == NULL) {
2445 		return 0;
2446 	}
2447 
2448 	g_bdevperf_conf = spdk_conf_allocate();
2449 	if (g_bdevperf_conf == NULL) {
2450 		fprintf(stderr, "Could not allocate job config structure\n");
2451 		return 1;
2452 	}
2453 
2454 	spdk_conf_disable_sections_merge(g_bdevperf_conf);
2455 	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
2456 		fprintf(stderr, "Invalid job config\n");
2457 		return 1;
2458 	}
2459 
2460 	/* Initialize global defaults */
2461 	global_default_config.filename = NULL;
2462 	/* A zero mask is equivalent to g_all_cpuset, but
2463 	 * g_all_cpuset is not initialized yet,
2464 	 * so use the zero mask as the default instead. */
2465 	spdk_cpuset_zero(&global_default_config.cpumask);
2466 	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
2467 	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
2468 	/* bdevperf has no default for -M option but in FIO the default is 50 */
2469 	global_default_config.rwmixread = 50;
2470 	global_default_config.offset = 0;
2471 	/* length 0 means 100% */
2472 	global_default_config.length = 0;
2473 	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
2474 	config_set_cli_args(&global_default_config);
2475 
2476 	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
2477 		return 1;
2478 	}
2479 
2480 	/* There is only a single instance of the global job_config.
2481 	 * Its value is reset whenever a new [global] section is encountered. */
2482 	global_config = global_default_config;
2483 
2484 	for (s = spdk_conf_first_section(g_bdevperf_conf);
2485 	     s != NULL;
2486 	     s = spdk_conf_next_section(s)) {
2487 		config = calloc(1, sizeof(*config));
2488 		if (config == NULL) {
2489 			fprintf(stderr, "Unable to allocate memory for job config\n");
2490 			return 1;
2491 		}
2492 
2493 		config->name = spdk_conf_section_get_name(s);
2494 		is_global = strcmp(config->name, "global") == 0;
2495 
2496 		if (is_global) {
2497 			global_config = global_default_config;
2498 		}
2499 
2500 		config->filename = spdk_conf_section_get_val(s, "filename");
2501 		if (config->filename == NULL) {
2502 			config->filename = global_config.filename;
2503 		}
2504 		if (!is_global) {
2505 			if (config->filename == NULL) {
2506 				fprintf(stderr, "Job '%s' expects 'filename' parameter\n", config->name);
2507 				goto error;
2508 			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
2509 				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
2510 				fprintf(stderr,
2511 					"filename for '%s' job is too long. Max length is %d\n",
2512 					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
2513 				goto error;
2514 			}
2515 		}
2516 
2517 		cpumask = spdk_conf_section_get_val(s, "cpumask");
2518 		if (cpumask == NULL) {
2519 			config->cpumask = global_config.cpumask;
2520 		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
2521 			fprintf(stderr, "Job '%s' has bad 'cpumask' value\n", config->name);
2522 			goto error;
2523 		}
2524 
2525 		config->bs = parse_uint_option(s, "bs", global_config.bs);
2526 		if (config->bs == BDEVPERF_CONFIG_ERROR) {
2527 			goto error;
2528 		} else if (config->bs == 0) {
2529 			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
2530 			goto error;
2531 		}
2532 
2533 		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
2534 		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
2535 			goto error;
2536 		} else if (config->iodepth == 0) {
2537 			fprintf(stderr,
2538 				"'iodepth' of job '%s' must be greater than 0\n",
2539 				config->name);
2540 			goto error;
2541 		}
2542 
2543 		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
2544 		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
2545 			goto error;
2546 		} else if (config->rwmixread > 100) {
2547 			fprintf(stderr,
2548 				"'rwmixread' value of '%s' job is not in 0-100 range\n",
2549 				config->name);
2550 			goto error;
2551 		}
2552 
2553 		config->offset = parse_uint_option(s, "offset", global_config.offset);
2554 		if (config->offset == BDEVPERF_CONFIG_ERROR) {
2555 			goto error;
2556 		}
2557 
2558 		val = parse_uint_option(s, "length", global_config.length);
2559 		if (val == BDEVPERF_CONFIG_ERROR) {
2560 			goto error;
2561 		}
2562 		config->length = val;
2563 
2564 		rw = spdk_conf_section_get_val(s, "rw");
2565 		config->rw = parse_rw(rw, global_config.rw);
2566 		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2567 			fprintf(stderr, "Job '%s' has bad 'rw' value\n", config->name);
2568 			goto error;
2569 		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
2570 			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
2571 			goto error;
2572 		}
2573 
2574 		if (is_global) {
2575 			config_set_cli_args(config);
2576 			global_config = *config;
2577 			free(config);
2578 			config = NULL;
2579 		} else {
2580 			TAILQ_INSERT_TAIL(&job_config_list, config, link);
2581 			n++;
2582 		}
2583 	}
2584 
2585 	if (g_rpc_log_file_name != NULL) {
2586 		g_rpc_log_file = fopen(g_rpc_log_file_name, "a");
2587 		if (g_rpc_log_file == NULL) {
2588 			fprintf(stderr, "Failed to open %s\n", g_rpc_log_file_name);
2589 			goto error;
2590 		}
2591 	}
2592 
2593 	printf("Using job config with %d jobs\n", n);
2594 	return 0;
2595 error:
2596 	free(config);
2597 	return 1;
2598 }
2599 
2600 static void
2601 bdevperf_run(void *arg1)
2602 {
2603 	uint32_t i;
2604 
2605 	g_main_thread = spdk_get_thread();
2606 
2607 	spdk_cpuset_zero(&g_all_cpuset);
2608 	SPDK_ENV_FOREACH_CORE(i) {
2609 		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
2610 	}
2611 
2612 	if (g_wait_for_tests) {
2613 		/* Do not perform any tests until RPC is received */
2614 		return;
2615 	}
2616 
2617 	bdevperf_construct_job_configs();
2618 }
2619 
2620 static void
2621 rpc_perform_tests_reset(void)
2622 {
2623 	/* Reset state for the next test run. */
2624 	g_run_rc = 0;
2625 	memset(&g_stats, 0, sizeof(g_stats));
2626 	g_show_performance_period_num = 0;
2631 }
2632 
2633 static void
2634 rpc_perform_tests_cb(void)
2635 {
2636 	struct spdk_jsonrpc_request *request = g_request;
2637 
2638 	g_request = NULL;
2639 
2640 	if (g_run_rc) {
2641 		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2642 						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
2643 	}
2644 
2645 	rpc_perform_tests_reset();
2646 }
2647 
2648 struct rpc_bdevperf_params {
2649 	int	time_in_sec;
2650 	char	*workload_type;
2651 	int	queue_depth;
2652 	char	*io_size;
2653 	int	rw_percentage;
2654 };
2655 
2656 static const struct spdk_json_object_decoder rpc_bdevperf_params_decoders[] = {
2657 	{"time_in_sec", offsetof(struct rpc_bdevperf_params, time_in_sec), spdk_json_decode_int32, true},
2658 	{"workload_type", offsetof(struct rpc_bdevperf_params, workload_type), spdk_json_decode_string, true},
2659 	{"queue_depth", offsetof(struct rpc_bdevperf_params, queue_depth), spdk_json_decode_int32, true},
2660 	{"io_size", offsetof(struct rpc_bdevperf_params, io_size), spdk_json_decode_string, true},
2661 	{"rw_percentage", offsetof(struct rpc_bdevperf_params, rw_percentage), spdk_json_decode_int32, true},
2662 };
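
/* Example "perform_tests" request (hypothetical values); every parameter is
 * optional and io_size is passed as a string (see
 * examples/bdev/bdevperf/bdevperf.py for a client):
 *
 *   {"method": "perform_tests",
 *    "params": {"time_in_sec": 30, "workload_type": "randread",
 *               "queue_depth": 64, "io_size": "4096", "rw_percentage": 100}}
 */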
2663 
2664 static void
2665 rpc_apply_bdevperf_params(struct rpc_bdevperf_params *params)
2666 {
2667 	if (params->workload_type) {
2668 		/* Free the previously set value to avoid a memory leak. */
2669 		free(g_workload_type);
2670 		g_workload_type = strdup(params->workload_type);
2671 	}
2672 	if (params->queue_depth) {
2673 		g_queue_depth = params->queue_depth;
2674 	}
2675 	if (params->io_size) {
2676 		bdevperf_parse_arg('o', params->io_size);
2677 	}
2678 	if (params->time_in_sec) {
2679 		g_time_in_sec = params->time_in_sec;
2680 	}
2681 	if (params->rw_percentage) {
2682 		g_rw_percentage = params->rw_percentage;
2683 		g_mix_specified = true;
2684 	} else {
2685 		g_mix_specified = false;
2686 	}
2687 }
2688 
2689 static void
2690 rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
2691 {
2692 	struct rpc_bdevperf_params req = {}, backup = {};
2693 	int rc;
2694 
2695 	if (g_request != NULL) {
2696 		fprintf(stderr, "Another test is already in progress.\n");
2697 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2698 						 spdk_strerror(-EINPROGRESS));
2699 		return;
2700 	}
2701 
2702 	if (params) {
2703 		if (spdk_json_decode_object_relaxed(params, rpc_bdevperf_params_decoders,
2704 						    SPDK_COUNTOF(rpc_bdevperf_params_decoders),
2705 						    &req)) {
2706 			spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2707 							 "spdk_json_decode_object failed");
2708 			return;
2709 		}
2710 
2711 		if (g_workload_type) {
2712 			backup.workload_type = strdup(g_workload_type);
2713 		}
2714 		backup.queue_depth = g_queue_depth;
2715 		if (asprintf(&backup.io_size, "%d", g_io_size) < 0) {
2716 			fprintf(stderr, "Couldn't allocate memory for the io_size backup\n");
2717 			goto rpc_error;
2718 		}
2719 		backup.time_in_sec = g_time_in_sec;
2720 		backup.rw_percentage = g_rw_percentage;
2721 
2722 		rpc_apply_bdevperf_params(&req);
2723 
2724 		free(req.workload_type);
2725 		free(req.io_size);
2726 	}
2727 
2728 	rc = verify_test_params();
2729 
2730 	if (rc) {
2731 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2732 						 "Invalid parameters provided");
2733 		/* restore old params on error */
2734 		rpc_apply_bdevperf_params(&backup);
2735 		goto rpc_error;
2736 	}
2737 
2738 	g_request = request;
2739 
2740 	/* Only construct job configs on the first test run. */
2741 	if (TAILQ_EMPTY(&job_config_list)) {
2742 		bdevperf_construct_job_configs();
2743 	} else {
2744 		bdevperf_construct_jobs();
2745 	}
2746 
2747 rpc_error:
2748 	free(backup.io_size);
2749 	free(backup.workload_type);
2750 }
2751 SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)
2752 
2753 static void
2754 spdk_bdevperf_shutdown_cb(void)
2755 {
2756 	struct bdevperf_job *job, *tmp;
2757 	g_shutdown = true;
2758 
2759 	if (g_bdevperf.running_jobs == 0) {
2760 		bdevperf_test_done(NULL);
2761 		return;
2762 	}
2763 
2764 	/* Iterate jobs to stop all I/O */
2765 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
2766 		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
2767 	}
2768 }
2769 
2770 static int
2771 bdevperf_parse_arg(int ch, char *arg)
2772 {
2773 	long long tmp;
2774 
2775 	if (ch == 'w') {
2776 		g_workload_type = strdup(arg);
2777 	} else if (ch == 'T') {
2778 		g_job_bdev_name = arg;
2779 	} else if (ch == 'z') {
2780 		g_wait_for_tests = true;
2781 	} else if (ch == 'Z') {
2782 		g_zcopy = true;
2783 	} else if (ch == 'X') {
2784 		g_abort = true;
2785 	} else if (ch == 'C') {
2786 		g_multithread_mode = true;
2787 	} else if (ch == 'f') {
2788 		g_continue_on_failure = true;
2789 	} else if (ch == 'j') {
2790 		g_bdevperf_conf_file = arg;
2791 	} else if (ch == 'F') {
2792 		char *endptr;
2793 
2794 		errno = 0;
2795 		g_zipf_theta = strtod(arg, &endptr);
2796 		if (errno || arg == endptr || g_zipf_theta < 0) {
2797 			fprintf(stderr, "Illegal zipf theta value %s\n", arg);
2798 			return -EINVAL;
2799 		}
2800 	} else if (ch == 'l') {
2801 		g_latency_display_level++;
2802 	} else if (ch == 'D') {
2803 		g_random_map = true;
2804 	} else if (ch == 'E') {
2805 		g_one_thread_per_lcore = true;
2806 	} else if (ch == 'J') {
2807 		g_rpc_log_file_name = arg;
2808 	} else if (ch == 'o') {
2809 		uint64_t size;
2810 
2811 		if (spdk_parse_capacity(arg, &size, NULL) != 0) {
2812 			fprintf(stderr, "Invalid IO size: %s\n", arg);
2813 			return -EINVAL;
2814 		}
2815 		g_io_size = (int)size;
2816 	} else if (ch == 'U') {
2817 		g_unique_writes = true;
2818 	} else if (ch == 'N') {
2819 		g_hide_metadata = true;
2820 	} else {
2821 		tmp = spdk_strtoll(arg, 10);
2822 		if (tmp < 0) {
2823 			fprintf(stderr, "Failed to parse the value for option -%c.\n", ch);
2824 			return tmp;
2825 		} else if (tmp >= INT_MAX) {
2826 			fprintf(stderr, "Value for option -%c is too large.\n", ch);
2827 			return -ERANGE;
2828 		}
2829 
2830 		switch (ch) {
2831 		case 'q':
2832 			g_queue_depth = tmp;
2833 			break;
2834 		case 't':
2835 			g_time_in_sec = tmp;
2836 			break;
2837 		case 'k':
2838 			g_timeout_in_sec = tmp;
2839 			break;
2840 		case 'M':
2841 			g_rw_percentage = tmp;
2842 			g_mix_specified = true;
2843 			break;
2844 		case 'P':
2845 			g_show_performance_ema_period = tmp;
2846 			break;
2847 		case 'S':
2848 			g_summarize_performance = false;
2849 			g_show_performance_period_in_usec = tmp * SPDK_SEC_TO_USEC;
2850 			break;
2851 		default:
2852 			return -EINVAL;
2853 		}
2854 	}
2855 	return 0;
2856 }
2857 
2858 static void
2859 bdevperf_usage(void)
2860 {
2861 	printf(" -q <depth>                io depth\n");
2862 	printf(" -o <size>                 io size in bytes\n");
2863 	printf(" -w <type>                 io pattern type, must be one of " PATTERN_TYPES_STR "\n");
2864 	printf(" -t <time>                 time in seconds\n");
2865 	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
2866 	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
2867 	printf(" -P <num>                  number of moving-average periods\n");
2868 	printf("\t\t(If set to n, show weighted mean of the previous n IO/s in real time)\n");
2869 	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
2870 	printf("\t\t(only valid with -S)\n");
2871 	printf(" -S <period>               show performance result in real time every <period> seconds\n");
2872 	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
2873 	printf(" -f                        continue processing I/O even after failures\n");
2874 	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
2875 	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
2876 	printf(" -z                        start bdevperf, but wait for perform_tests RPC to start tests\n");
2877 	printf("                           (See examples/bdev/bdevperf/bdevperf.py)\n");
2878 	printf(" -X                        abort timed out I/O\n");
2879 	printf(" -C                        enable every core to send I/Os to each bdev\n");
2880 	printf(" -j <filename>             use job config file\n");
2881 	printf(" -l                        display latency histogram, default: disable. -l display summary, -ll display details\n");
2882 	printf(" -D                        use a random map for picking offsets not previously read or written (for all jobs)\n");
2883 	printf(" -E                        share a single per-lcore thread among jobs. Available only if -j is not used.\n");
2884 	printf(" -J <filename>             file name to open in append mode for logging JSON-RPC calls.\n");
2885 	printf(" -U                        generate unique data for each write I/O, has no effect on non-write I/O\n");
2886 	printf(" -N                        enable the hide_metadata option for each bdev\n");
2887 }
2888 
2889 static void
2890 bdevperf_fini(void)
2891 {
2892 	free_job_config();
2893 	free(g_workload_type);
2894 
2895 	if (g_rpc_log_file != NULL) {
2896 		fclose(g_rpc_log_file);
2897 		g_rpc_log_file = NULL;
2898 	}
2899 }
2900 
2901 static int
2902 verify_test_params(void)
2903 {
2904 	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
2905 		goto out;
2906 	}
2907 	if (!g_bdevperf_conf_file && g_io_size <= 0) {
2908 		goto out;
2909 	}
2910 	if (!g_bdevperf_conf_file && !g_workload_type) {
2911 		goto out;
2912 	}
2913 	if (g_bdevperf_conf_file && g_one_thread_per_lcore) {
2914 		printf("A per-lcore thread (-E) cannot be used together with a bdevperf config file (-j)\n");
2915 		goto out;
2916 	}
2917 	if (g_time_in_sec <= 0) {
2918 		goto out;
2919 	}
2920 	g_time_in_usec = g_time_in_sec * SPDK_SEC_TO_USEC;
2921 
2922 	if (g_timeout_in_sec < 0) {
2923 		goto out;
2924 	}
2925 
2926 	if (g_abort && !g_timeout_in_sec) {
2927 		printf("Timeout must be set for the abort option; ignoring g_abort\n");
2928 	}
2929 
2930 	if (g_show_performance_ema_period > 0 && g_summarize_performance) {
2931 		fprintf(stderr, "-P option must be specified with -S option\n");
2932 		return 1;
2933 	}
2934 
2935 	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2936 		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
2937 		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
2938 		printf("Zero copy mechanism will not be used.\n");
2939 		g_zcopy = false;
2940 	}
2941 
2942 	if (g_bdevperf_conf_file) {
2943 		/* workload_type verification happens during config file parsing */
2944 		return 0;
2945 	}
2946 
2947 	if (!strcmp(g_workload_type, "verify") ||
2948 	    !strcmp(g_workload_type, "reset")) {
2949 		g_rw_percentage = 50;
2950 		g_verify = true;
2951 		if (!strcmp(g_workload_type, "reset")) {
2952 			g_reset = true;
2953 		}
2954 	}
2955 
2956 	if (!strcmp(g_workload_type, "read") ||
2957 	    !strcmp(g_workload_type, "randread") ||
2958 	    !strcmp(g_workload_type, "write") ||
2959 	    !strcmp(g_workload_type, "randwrite") ||
2960 	    !strcmp(g_workload_type, "verify") ||
2961 	    !strcmp(g_workload_type, "reset") ||
2962 	    !strcmp(g_workload_type, "unmap") ||
2963 	    !strcmp(g_workload_type, "write_zeroes") ||
2964 	    !strcmp(g_workload_type, "flush")) {
2965 		if (g_mix_specified) {
2966 			fprintf(stderr, "Ignoring -M option... Please use -M option"
2967 				" only when using rw or randrw.\n");
2968 		}
2969 	}
2970 
2971 	if (!strcmp(g_workload_type, "rw") ||
2972 	    !strcmp(g_workload_type, "randrw")) {
2973 		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
2974 			fprintf(stderr,
2975 				"-M must be set to a value from 0 to 100 "
2976 				"for rw or randrw.\n");
2977 			return 1;
2978 		}
2979 	}
2980 
2981 	if (strcmp(g_workload_type, "randread") &&
2982 	    strcmp(g_workload_type, "randwrite") &&
2983 	    strcmp(g_workload_type, "randrw")) {
2984 		if (g_random_map) {
2985 			fprintf(stderr, "Invalid -D option... Please use -D option"
2986 				" only when using randread, randwrite or randrw.\n");
2987 			return 1;
2988 		}
2989 	}
2990 
2991 	return 0;
2992 out:
2993 	return 1;
2994 }
2995 
2996 int
2997 main(int argc, char **argv)
2998 {
2999 	struct spdk_app_opts opts = {};
3000 	int rc;
3001 
3002 	/* Use the runtime PID to set the random seed */
3003 	srand(getpid());
3004 
3005 	spdk_app_opts_init(&opts, sizeof(opts));
3006 	opts.name = "bdevperf";
3007 	opts.rpc_addr = NULL;
3008 	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;
3009 
3010 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CEF:J:M:P:S:T:Xlj:DUN", NULL,
3011 				      bdevperf_parse_arg, bdevperf_usage)) !=
3012 	    SPDK_APP_PARSE_ARGS_SUCCESS) {
3013 		return rc;
3014 	}
3015 
3016 	/* Set the default address if no rpc_addr was provided in args
3017 	 * and RPC is used for starting tests */
3018 	if (g_wait_for_tests && opts.rpc_addr == NULL) {
3019 		opts.rpc_addr = SPDK_DEFAULT_RPC_ADDR;
3020 	}
3021 
3022 	if (read_job_config()) {
3023 		bdevperf_fini();
3024 		return 1;
3025 	}
3026 
3027 	if (g_rpc_log_file != NULL) {
3028 		opts.rpc_log_file = g_rpc_log_file;
3029 	}
3030 
3031 	if (verify_test_params() != 0 && !g_wait_for_tests) {
3032 		spdk_app_usage();
3033 		bdevperf_usage();
3034 		bdevperf_fini();
3035 		exit(1);
3036 	}
3037 
3038 	rc = spdk_app_start(&opts, bdevperf_run, NULL);
3039 
3040 	spdk_app_fini();
3041 	bdevperf_fini();
3042 	return rc;
3043 }
3044