xref: /spdk/examples/bdev/bdevperf/bdevperf.c (revision dcb296a32b5e183ccf86cad08fe23a06f0ee7376)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES.
4  *   All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/bdev.h"
10 #include "spdk/accel.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/event.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/thread.h"
17 #include "spdk/string.h"
18 #include "spdk/rpc.h"
19 #include "spdk/bit_array.h"
20 #include "spdk/conf.h"
21 #include "spdk/zipf.h"
22 
/* Limits/sentinels used while parsing the job config file. */
#define BDEVPERF_CONFIG_MAX_FILENAME 1024
#define BDEVPERF_CONFIG_UNDEFINED -1
#define BDEVPERF_CONFIG_ERROR -2

/* One outstanding I/O. Tasks are pre-allocated per job and recycled through
 * job->task_list (see bdevperf_job_get_task()/bdevperf_end_task()).
 */
struct bdevperf_task {
	struct iovec			iov;		/* data buffer descriptor for writev submissions */
	struct bdevperf_job		*job;		/* owning job */
	struct spdk_bdev_io		*bdev_io;	/* zcopy: bdev_io returned by spdk_bdev_zcopy_start() */
	void				*buf;		/* data buffer */
	void				*md_buf;	/* separate metadata buffer (NULL when metadata is interleaved) */
	uint64_t			offset_blocks;	/* absolute LBA of this I/O */
	struct bdevperf_task		*task_to_abort;	/* target task when io_type == SPDK_BDEV_IO_TYPE_ABORT */
	enum spdk_bdev_io_type		io_type;	/* operation to submit */
	TAILQ_ENTRY(bdevperf_task)	link;		/* entry on job->task_list while idle */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;	/* retry context after a submission returned -ENOMEM */
};
39 
/* Global test parameters, mostly populated from the command line or RPC. */
static const char *g_workload_type = NULL;	/* workload string (read/write/verify/...) */
static int g_io_size = 0;			/* I/O size in bytes */
/* initialize to invalid value so we can detect if user overrides it. */
static int g_rw_percentage = -1;		/* read percentage for mixed workloads */
static bool g_verify = false;
static bool g_reset = false;
static bool g_continue_on_failure = false;
static bool g_abort = false;
static bool g_error_to_exit = false;		/* set on first fatal I/O error; stops all jobs */
static int g_queue_depth = 0;
static uint64_t g_time_in_usec;			/* total test duration */
static int g_show_performance_real_time = 0;
static uint64_t g_show_performance_period_in_usec = 1000000;
static uint64_t g_show_performance_period_num = 0;
static uint64_t g_show_performance_ema_period = 0;
static int g_run_rc = 0;			/* final exit code of the run */
static bool g_shutdown = false;
static uint64_t g_start_tsc;			/* tick counter at test start */
static uint64_t g_shutdown_tsc;			/* elapsed ticks at shutdown signal */
static bool g_zcopy = false;			/* use zcopy start/end APIs instead of read/write */
static struct spdk_thread *g_main_thread;
static int g_time_in_sec = 0;
static bool g_mix_specified = false;
static const char *g_job_bdev_name;
static bool g_wait_for_tests = false;		/* wait for RPC to start the test */
static struct spdk_jsonrpc_request *g_request = NULL;
static bool g_multithread_mode = false;
static int g_timeout_in_sec;
static struct spdk_conf *g_bdevperf_conf = NULL;
static const char *g_bdevperf_conf_file = NULL;
static double g_zipf_theta;

static struct spdk_cpuset g_all_cpuset;
static struct spdk_poller *g_perf_timer = NULL;

static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
static void rpc_perform_tests_cb(void);
77 
/* Per-bdev (per-thread) test state. Each job runs on its own SPDK thread
 * and owns one descriptor/channel pair plus a pool of pre-allocated tasks.
 */
struct bdevperf_job {
	char				*name;		/* bdev name (heap-allocated, freed in bdevperf_job_free) */
	struct spdk_bdev		*bdev;
	struct spdk_bdev_desc		*bdev_desc;
	struct spdk_io_channel		*ch;
	TAILQ_ENTRY(bdevperf_job)	link;		/* entry on g_bdevperf.jobs */
	struct spdk_thread		*thread;	/* thread this job submits I/O from */

	/* Workload configuration (copied from global/config-file options). */
	const char			*workload_type;
	int				io_size;
	int				rw_percentage;
	bool				is_random;
	bool				verify;
	bool				reset;
	bool				continue_on_failure;
	bool				unmap;
	bool				write_zeroes;
	bool				flush;
	bool				abort;
	int				queue_depth;
	unsigned int			seed;		/* rand_r() state for random offsets / rw mix */

	/* Runtime statistics and submission state. */
	uint64_t			io_completed;
	uint64_t			io_failed;
	uint64_t			io_timeout;
	uint64_t			prev_io_completed;	/* snapshot for EMA calculation */
	double				ema_io_per_second;
	int				current_queue_depth;
	uint64_t			size_in_ios;	/* LBA range assigned to this job, in I/O units */
	uint64_t			ios_base;	/* first I/O-unit offset of the job's range */
	uint64_t			offset_in_ios;	/* next sequential offset */
	uint64_t			io_size_blocks;	/* io_size expressed in bdev blocks */
	uint64_t			buf_size;	/* data buffer size incl. interleaved metadata */
	uint32_t			dif_check_flags;
	bool				is_draining;	/* stop resubmitting; waiting for in-flight I/O */
	struct spdk_poller		*run_timer;	/* ends the job after g_time_in_usec */
	struct spdk_poller		*reset_timer;	/* periodic reset submission (reset workload) */
	struct spdk_bit_array		*outstanding;	/* verify: bit per in-flight I/O-unit offset */
	struct spdk_zipf		*zipf;		/* optional zipf offset generator */
	TAILQ_HEAD(, bdevperf_task)	task_list;	/* idle tasks */
	uint64_t			run_time_in_usec;
};
120 
/* Top-level test state: all jobs plus the count of jobs still running. */
struct spdk_bdevperf {
	TAILQ_HEAD(, bdevperf_job)	jobs;
	uint32_t			running_jobs;
};

static struct spdk_bdevperf g_bdevperf = {
	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
	.running_jobs = 0,
};
130 
/* Workload kinds accepted by the "rw" option of a job config section. */
enum job_config_rw {
	JOB_CONFIG_RW_READ = 0,
	JOB_CONFIG_RW_WRITE,
	JOB_CONFIG_RW_RANDREAD,
	JOB_CONFIG_RW_RANDWRITE,
	JOB_CONFIG_RW_RW,
	JOB_CONFIG_RW_RANDRW,
	JOB_CONFIG_RW_VERIFY,
	JOB_CONFIG_RW_RESET,
	JOB_CONFIG_RW_UNMAP,
	JOB_CONFIG_RW_FLUSH,
	JOB_CONFIG_RW_WRITE_ZEROES,
};
144 
/* Storing values from a section of job config file */
struct job_config {
	const char			*name;		/* section name */
	const char			*filename;	/* bdev name(s) this job targets */
	struct spdk_cpuset		cpumask;
	int				bs;		/* block (I/O) size in bytes */
	int				iodepth;
	int				rwmixread;	/* read percentage for mixed workloads */
	int64_t				offset;		/* starting offset within the bdev */
	uint64_t			length;		/* length of the LBA range to exercise */
	enum job_config_rw		rw;
	TAILQ_ENTRY(job_config)	link;
};

TAILQ_HEAD(, job_config) job_config_list
	= TAILQ_HEAD_INITIALIZER(job_config_list);
161 
/* True while a periodic performance dump is walking the job list. */
static bool g_performance_dump_active = false;

/* Accumulator passed from job to job while dumping statistics. */
struct bdevperf_aggregate_stats {
	struct bdevperf_job		*current_job;	/* job currently being dumped */
	uint64_t			io_time_in_usec;	/* measurement interval */
	uint64_t			ema_period;	/* 0 selects CMA, otherwise EMA period */
	double				total_io_per_second;
	double				total_mb_per_second;
	double				total_failed_per_second;
	double				total_timeout_per_second;
};

static struct bdevperf_aggregate_stats g_stats = {};
175 
176 /*
177  * Cumulative Moving Average (CMA): average of all data up to current
178  * Exponential Moving Average (EMA): weighted mean of the previous n data and more weight is given to recent
179  * Simple Moving Average (SMA): unweighted mean of the previous n data
180  *
181  * Bdevperf supports CMA and EMA.
182  */
/* Cumulative moving average: total IOPS over the whole measured interval. */
static double
get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
{
	return (double)job->io_completed * 1000000 / io_time_in_usec;
}
188 
/* Exponential moving average of IOPS. Updates job->prev_io_completed and
 * job->ema_io_per_second as a side effect, so it must be called exactly once
 * per display period.
 */
static double
get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
{
	double io_completed, io_per_second;

	/* IOPS over the last display period only. */
	io_completed = job->io_completed;
	io_per_second = (double)(io_completed - job->prev_io_completed) * 1000000
			/ g_show_performance_period_in_usec;
	job->prev_io_completed = io_completed;

	/* Standard EMA update with smoothing factor 2 / (period + 1). */
	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
				  / (ema_period + 1);
	return job->ema_io_per_second;
}
203 
/* Print one job's statistics line and fold its rates into *stats.
 * Called both for periodic dumps (g_performance_dump_active) and for the
 * final summary in bdevperf_test_done().
 */
static void
performance_dump_job(struct bdevperf_aggregate_stats *stats, struct bdevperf_job *job)
{
	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
	uint64_t time_in_usec;

	printf("\r Job: %s (Core Mask 0x%s)\n", spdk_thread_get_name(job->thread),
	       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

	if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		printf("\r Job: %s ended in about %.2f seconds with error\n",
		       spdk_thread_get_name(job->thread), (double)job->run_time_in_usec / 1000000);
	}
	if (job->verify) {
		printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
		       job->ios_base, job->size_in_ios);
	}

	/* Pick the time base for rate calculations. */
	if (g_performance_dump_active == true) {
		/* Use job's actual run time as Job has ended */
		if (job->io_failed > 0 && !job->continue_on_failure) {
			time_in_usec = job->run_time_in_usec;
		} else {
			time_in_usec = stats->io_time_in_usec;
		}
	} else {
		time_in_usec = job->run_time_in_usec;
	}

	/* ema_period == 0 selects the cumulative average. */
	if (stats->ema_period == 0) {
		io_per_second = get_cma_io_per_second(job, time_in_usec);
	} else {
		io_per_second = get_ema_io_per_second(job, stats->ema_period);
	}
	mb_per_second = io_per_second * job->io_size / (1024 * 1024);

	failed_per_second = (double)job->io_failed * 1000000 / time_in_usec;
	timeout_per_second = (double)job->io_timeout * 1000000 / time_in_usec;

	printf("\t %-20s: %10.2f %10.2f %10.2f",
	       job->name, (float)time_in_usec / 1000000, io_per_second, mb_per_second);
	printf(" %10.2f %8.2f\n",
	       failed_per_second, timeout_per_second);

	stats->total_io_per_second += io_per_second;
	stats->total_mb_per_second += mb_per_second;
	stats->total_failed_per_second += failed_per_second;
	stats->total_timeout_per_second += timeout_per_second;
}
253 
/* Fill a data (and metadata) buffer with a repeatable pattern for verify/reset
 * workloads: each block's data words hold (block index + word offset) and its
 * metadata bytes are memset to the block index.
 *
 * md_buf == NULL means the metadata is interleaved: each block_size-byte block
 * in buf ends with md_size bytes of metadata. Otherwise data blocks are
 * block_size bytes and metadata lives in the separate md_buf.
 *
 * Silently does nothing if buf is too small for num_blocks full blocks.
 */
static void
generate_data(void *buf, int buf_len, int block_size, void *md_buf, int md_size,
	      int num_blocks)
{
	int offset_blocks = 0, md_offset, data_block_size, inner_offset;

	if (buf_len < num_blocks * block_size) {
		return;
	}

	if (md_buf == NULL) {
		/* Interleaved: metadata occupies the tail of each block in buf. */
		data_block_size = block_size - md_size;
		md_buf = (char *)buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	while (offset_blocks < num_blocks) {
		inner_offset = 0;
		while (inner_offset < data_block_size) {
			*(uint32_t *)buf = offset_blocks + inner_offset;
			inner_offset += sizeof(uint32_t);
			buf += sizeof(uint32_t);
		}
		/* In the interleaved case skip over this block's metadata region
		 * so the next block's data lands block_size bytes after the
		 * previous one (matching verify_data()'s traversal) instead of
		 * overwriting the metadata just written below. No-op when
		 * metadata is separate (data_block_size == block_size).
		 */
		buf += block_size - data_block_size;
		memset(md_buf, offset_blocks, md_size);
		md_buf += md_offset;
		offset_blocks++;
	}
}
285 
/* Copy num_blocks blocks of data (and, when present, metadata) from the read
 * buffers into the write buffers. Returns false without copying anything if
 * either data buffer is too small to hold all the blocks.
 */
static bool
copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
{
	int data_len = num_blocks * block_size;

	if (wr_buf_len < data_len || rd_buf_len < data_len) {
		return false;
	}

	/* Metadata pointers must be both set or both unset. */
	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	memcpy(wr_buf, rd_buf, data_len);
	if (wr_md_buf == NULL) {
		return true;
	}

	memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
	return true;
}
304 
/* Compare the written pattern against what was read back, block by block.
 * Data is always compared; metadata is compared only when md_check is set.
 * With interleaved metadata (wr_md_buf == NULL) each block_size-byte block
 * ends with md_size metadata bytes. Returns false on size mismatch or on the
 * first differing block.
 */
static bool
verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
{
	int i, md_stride, data_block_size;

	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	/* Metadata pointers must be both set or both unset. */
	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	if (wr_md_buf != NULL) {
		/* Separate metadata buffers: blocks are pure data. */
		data_block_size = block_size;
		md_stride = md_size;
	} else {
		/* Interleaved: metadata is the tail of each block. */
		data_block_size = block_size - md_size;
		wr_md_buf = (char *)wr_buf + data_block_size;
		rd_md_buf = (char *)rd_buf + data_block_size;
		md_stride = block_size;
	}

	for (i = 0; i < num_blocks; i++) {
		if (memcmp(wr_buf, rd_buf, data_block_size) != 0) {
			return false;
		}

		wr_buf += block_size;
		rd_buf += block_size;

		if (md_check) {
			if (memcmp(wr_md_buf, rd_md_buf, md_size) != 0) {
				return false;
			}

			wr_md_buf += md_stride;
			rd_md_buf += md_stride;
		}
	}

	return true;
}
349 
350 static void
351 free_job_config(void)
352 {
353 	struct job_config *config, *tmp;
354 
355 	spdk_conf_free(g_bdevperf_conf);
356 	g_bdevperf_conf = NULL;
357 
358 	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
359 		TAILQ_REMOVE(&job_config_list, config, link);
360 		free(config);
361 	}
362 }
363 
/* Free everything a job owns (its tasks are freed separately by the caller,
 * see bdevperf_test_done()).
 */
static void
bdevperf_job_free(struct bdevperf_job *job)
{
	spdk_bit_array_free(&job->outstanding);
	spdk_zipf_free(&job->zipf);
	free(job->name);
	free(job);
}
372 
/* Final teardown, run on the main thread once every job has ended: print the
 * summary table, free all jobs and tasks, then stop the app (or hand the
 * result back to the RPC caller).
 */
static void
bdevperf_test_done(void *ctx)
{
	struct bdevperf_job *job, *jtmp;
	struct bdevperf_task *task, *ttmp;
	int rc;
	uint64_t time_in_usec;

	if (g_time_in_usec) {
		g_stats.io_time_in_usec = g_time_in_usec;

		/* A periodic performance dump is still walking the job list;
		 * re-queue ourselves until it finishes so the jobs are not
		 * freed out from under it.
		 */
		if (!g_run_rc && g_performance_dump_active) {
			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
			return;
		}
	}

	if (g_show_performance_real_time) {
		spdk_poller_unregister(&g_perf_timer);
	}

	if (g_shutdown) {
		/* Clamp the reported test time to the actual elapsed time. */
		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
		time_in_usec = g_shutdown_tsc * 1000000 / spdk_get_ticks_hz();
		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
		printf("Received shutdown signal, test time was about %.6f seconds\n",
		       (double)g_time_in_usec / 1000000);
	}

	printf("\n\r %-*s: %10s %10s %10s %10s %8s\n",
	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s");

	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);

		performance_dump_job(&g_stats, job);

		/* All tasks are idle (back on task_list) by now; free them. */
		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
			TAILQ_REMOVE(&job->task_list, task, link);
			spdk_free(task->buf);
			spdk_free(task->md_buf);
			free(task);
		}

		bdevperf_job_free(job);
	}

	printf("\r ==================================================================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", g_stats.total_io_per_second, g_stats.total_mb_per_second);
	printf(" %10.2f %8.2f\n",
	       g_stats.total_failed_per_second, g_stats.total_timeout_per_second);
	fflush(stdout);

	rc = g_run_rc;
	if (g_request && !g_shutdown) {
		/* Test was driven over RPC: answer the request; only stop the
		 * app on failure.
		 */
		rpc_perform_tests_cb();
		if (rc != 0) {
			spdk_app_stop(rc);
		}
	} else {
		spdk_app_stop(rc);
	}
}
437 
438 static void
439 bdevperf_job_end(void *ctx)
440 {
441 	assert(g_main_thread == spdk_get_thread());
442 
443 	if (--g_bdevperf.running_jobs == 0) {
444 		bdevperf_test_done(NULL);
445 	}
446 }
447 
448 static void
449 bdevperf_end_task(struct bdevperf_task *task)
450 {
451 	struct bdevperf_job     *job = task->job;
452 	uint64_t		end_tsc = 0;
453 
454 	TAILQ_INSERT_TAIL(&job->task_list, task, link);
455 	if (job->is_draining) {
456 		if (job->current_queue_depth == 0) {
457 			end_tsc = spdk_get_ticks() - g_start_tsc;
458 			job->run_time_in_usec = end_tsc * 1000000 / spdk_get_ticks_hz();
459 			spdk_put_io_channel(job->ch);
460 			spdk_bdev_close(job->bdev_desc);
461 			spdk_thread_send_msg(g_main_thread, bdevperf_job_end, NULL);
462 		}
463 	}
464 }
465 
/* Arrange for cb_fn(task) to run once the bdev has resources again; used
 * after a submission returned -ENOMEM.
 */
static void
bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
{
	struct bdevperf_job	*job = task->job;

	task->bdev_io_wait.bdev = job->bdev;
	task->bdev_io_wait.cb_fn = cb_fn;
	task->bdev_io_wait.cb_arg = task;
	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
}
476 
/* Stop a job: unregister its timers and mark it draining so completions stop
 * resubmitting I/O. Registered as the run-timer poller callback and also
 * called directly on fatal errors; always returns -1.
 */
static int
bdevperf_job_drain(void *ctx)
{
	struct bdevperf_job *job = ctx;

	spdk_poller_unregister(&job->run_timer);
	if (job->reset) {
		spdk_poller_unregister(&job->reset_timer);
	}

	job->is_draining = true;

	return -1;
}
491 
492 static void
493 bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
494 {
495 	struct bdevperf_task	*task = cb_arg;
496 	struct bdevperf_job	*job = task->job;
497 
498 	job->current_queue_depth--;
499 
500 	if (success) {
501 		job->io_completed++;
502 	} else {
503 		job->io_failed++;
504 		if (!job->continue_on_failure) {
505 			bdevperf_job_drain(job);
506 			g_run_rc = -1;
507 		}
508 	}
509 
510 	spdk_bdev_free_io(bdev_io);
511 	bdevperf_end_task(task);
512 }
513 
/* Verify the DIF/DIX protection information of a completed read described by
 * iovs. Uses spdk_dif_verify() when metadata is interleaved, spdk_dix_verify()
 * with the task's separate metadata buffer otherwise. Returns 0 on success or
 * a negative errno, logging the first error block on mismatch.
 */
static int
bdevperf_verify_dif(struct bdevperf_task *task, struct iovec *iovs, int iovcnt)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	struct spdk_dif_error	err_blk = {};
	int			rc;

	/* Seed the context with the I/O's starting LBA so reference tags match. */
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		rc = spdk_dif_verify(iovs, iovcnt, job->io_size_blocks, &dif_ctx, &err_blk);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_verify(iovs, iovcnt, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
	}

	if (rc != 0) {
		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
			err_blk.err_type, err_blk.err_offset);
	}

	return rc;
}
554 
/* Common I/O completion callback: check the result (data verification for
 * verify/reset jobs, DIF verification for protected reads), update counters,
 * and either submit a replacement I/O or recycle the task if the job is
 * draining.
 */
static void
bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	struct iovec		*iovs;
	int			iovcnt;
	bool			md_check;
	uint64_t		offset_in_ios;
	int			rc;

	job = task->job;
	/* Raw metadata bytes are only compared when DIF is disabled; otherwise
	 * the protection fields legitimately differ from the generated pattern.
	 */
	md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;

	if (g_error_to_exit == true) {
		/* Another task already hit a fatal error: just drain. */
		bdevperf_job_drain(job);
	} else if (!success) {
		if (!job->reset && !job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
			g_error_to_exit = true;
			printf("task offset: %" PRIu64 " on job bdev=%s fails\n",
			       task->offset_blocks, job->name);
		}
	} else if (job->verify || job->reset) {
		/* Compare the read-back data against the pattern kept in task->buf. */
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		assert(iovcnt == 1);
		assert(iovs != NULL);
		if (!verify_data(task->buf, job->buf_size, iovs[0].iov_base, iovs[0].iov_len,
				 spdk_bdev_get_block_size(job->bdev),
				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
				 spdk_bdev_get_md_size(job->bdev),
				 job->io_size_blocks, md_check)) {
			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
			printf("   First dword expected 0x%x got 0x%x\n", *(int *)task->buf, *(int *)iovs[0].iov_base);
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	} else if (job->dif_check_flags != 0) {
		/* Protected bdev without verify mode: check DIF/DIX on reads. */
		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_get_md_size(job->bdev) != 0) {
			spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
			assert(iovcnt == 1);
			assert(iovs != NULL);
			rc = bdevperf_verify_dif(task, iovs, iovcnt);
			if (rc != 0) {
				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
				       task->offset_blocks, job->name);

				success = false;
				if (!job->reset && !job->continue_on_failure) {
					bdevperf_job_drain(job);
					g_run_rc = -1;
					g_error_to_exit = true;
				}
			}
		}
	}

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
	}

	if (job->verify) {
		/* Release the offset so a future I/O may target it again. */
		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
		spdk_bit_array_clear(job->outstanding, offset_in_ios);
	}

	spdk_bdev_free_io(bdev_io);

	/*
	 * is_draining indicates when time has expired for the test run
	 * and we are just waiting for the previously submitted I/O
	 * to complete.  In this case, do not submit a new I/O to replace
	 * the one just completed.
	 */
	if (!job->is_draining) {
		bdevperf_submit_single(job, task);
	} else {
		bdevperf_end_task(task);
	}
}
643 
/* Verify flow, step 2: after the write completed, read the same blocks back
 * so bdevperf_complete() can compare them against task->buf. Retries via the
 * io_wait queue on -ENOMEM.
 */
static void
bdevperf_verify_submit_read(void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	int			rc;

	job = task->job;

	/* Read the data back in */
	rc = spdk_bdev_read_blocks_with_md(job->bdev_desc, job->ch, NULL, NULL,
					   task->offset_blocks, job->io_size_blocks,
					   bdevperf_complete, task);

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
	} else if (rc != 0) {
		printf("Failed to submit read: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = rc;
	}
}
666 
/* Verify flow, step 1 completion: a successful write is followed by a
 * read-back; a failed write goes through the normal completion path so the
 * error is accounted there.
 */
static void
bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
			       void *cb_arg)
{
	if (!success) {
		bdevperf_complete(bdev_io, success, cb_arg);
		return;
	}

	spdk_bdev_free_io(bdev_io);
	bdevperf_verify_submit_read(cb_arg);
}
678 
679 static void
680 bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
681 {
682 	if (!success) {
683 		bdevperf_complete(bdev_io, success, cb_arg);
684 		return;
685 	}
686 
687 	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
688 }
689 
/* Generate DIF/DIX protection information for the write described by
 * task->iov before it is submitted. Mirrors bdevperf_verify_dif(): interleaved
 * metadata uses spdk_dif_generate(), separate metadata uses
 * spdk_dix_generate(). Returns 0 on success or a negative errno.
 */
static int
bdevperf_generate_dif(struct bdevperf_task *task)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	int			rc;

	/* Seed the context with the I/O's starting LBA so reference tags match. */
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
	}

	if (rc != 0) {
		fprintf(stderr, "Generation of DIF/DIX failed\n");
	}

	return rc;
}
728 
/* Submit the operation described by task->io_type to the bdev. On -ENOMEM the
 * task is queued to retry via spdk_bdev_queue_io_wait(); any other submission
 * error drains the job. On success the job's queue depth is incremented.
 */
static void
bdevperf_submit_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev_desc	*desc;
	struct spdk_io_channel	*ch;
	spdk_bdev_io_completion_cb cb_fn;
	uint64_t		offset_in_ios;
	int			rc = 0;

	desc = job->bdev_desc;
	ch = job->ch;

	switch (task->io_type) {
	case SPDK_BDEV_IO_TYPE_WRITE:
		/* Protected bdev: fill in DIF/DIX before writing. */
		if (spdk_bdev_get_md_size(job->bdev) != 0 && job->dif_check_flags != 0) {
			rc = bdevperf_generate_dif(task);
		}
		if (rc == 0) {
			/* Verify/reset writes are followed by a read-back. */
			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;

			if (g_zcopy) {
				/* Commit the buffer obtained via zcopy_start (see
				 * bdevperf_prep_zcopy_write_task()).
				 */
				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
				return;
			} else {
				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
								     task->md_buf,
								     task->offset_blocks,
								     job->io_size_blocks,
								     cb_fn, task);
			}
		}
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
						   job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_READ:
		if (g_zcopy) {
			/* populate=true: buffer is filled with the read data. */
			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
						   true, bdevperf_zcopy_populate_complete, task);
		} else {
			rc = spdk_bdev_read_blocks_with_md(desc, ch, task->buf, task->md_buf,
							   task->offset_blocks,
							   job->io_size_blocks,
							   bdevperf_complete, task);
		}
		break;
	case SPDK_BDEV_IO_TYPE_ABORT:
		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
		break;
	default:
		assert(false);
		rc = -EINVAL;
		break;
	}

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
		return;
	} else if (rc != 0) {
		printf("Failed to submit bdev_io: %d\n", rc);
		if (job->verify) {
			/* Release the offset reserved in bdevperf_submit_single(). */
			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
			spdk_bit_array_clear(job->outstanding, offset_in_ios);
		}
		bdevperf_job_drain(job);
		g_run_rc = rc;
		return;
	}

	job->current_queue_depth++;
}
814 
/* Completion of zcopy_start(populate=false) for a write: the bdev handed us
 * its internal buffer; stash the bdev_io so bdevperf_submit_task() can commit
 * it with spdk_bdev_zcopy_end().
 */
static void
bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;
	struct iovec		*iovs;
	int			iovcnt;

	if (!success) {
		bdevperf_job_drain(job);
		g_run_rc = -1;
		return;
	}

	task->bdev_io = bdev_io;
	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;

	if (job->verify || job->reset) {
		/* When job->verify or job->reset is enabled, task->buf is used for
		 *  verification of read after write.  For write I/O, when zcopy APIs
		 *  are used, task->buf cannot be used, and data must be written to
		 *  the data buffer allocated underneath bdev layer instead.
		 *  Hence we copy task->buf to the allocated data buffer here.
		 */
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		assert(iovcnt == 1);
		assert(iovs != NULL);

		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
			  spdk_bdev_get_block_size(job->bdev),
			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
			  spdk_bdev_get_md_size(job->bdev), job->io_size_blocks);
	}

	bdevperf_submit_task(task);
}
851 
/* Start a zcopy write by asking the bdev for a buffer (populate=false).
 * NOTE(review): any nonzero return is treated as -ENOMEM and re-queued; the
 * assert only catches other errors in debug builds.
 */
static void
bdevperf_prep_zcopy_write_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	int			rc;

	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
				   task->offset_blocks, job->io_size_blocks,
				   false, bdevperf_zcopy_get_buf_complete, task);
	if (rc != 0) {
		assert(rc == -ENOMEM);
		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
		return;
	}

	job->current_queue_depth++;
}
870 
871 static struct bdevperf_task *
872 bdevperf_job_get_task(struct bdevperf_job *job)
873 {
874 	struct bdevperf_task *task;
875 
876 	task = TAILQ_FIRST(&job->task_list);
877 	if (!task) {
878 		printf("Task allocation failed\n");
879 		abort();
880 	}
881 
882 	TAILQ_REMOVE(&job->task_list, task, link);
883 	return task;
884 }
885 
/* Choose the next offset and operation for a task according to the job's
 * workload (zipf / random / sequential; read vs. write mix; verify, flush,
 * unmap, write_zeroes) and submit it.
 */
static void
bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
{
	uint64_t offset_in_ios;

	if (job->zipf) {
		offset_in_ios = spdk_zipf_generate(job->zipf);
	} else if (job->is_random) {
		offset_in_ios = rand_r(&job->seed) % job->size_in_ios;
	} else {
		/* Sequential, wrapping at the end of the job's range. */
		offset_in_ios = job->offset_in_ios++;
		if (job->offset_in_ios == job->size_in_ios) {
			job->offset_in_ios = 0;
		}

		/* Increment of offset_in_ios if there's already an outstanding IO
		 * to that location. We only need this with job->verify as random
		 * offsets are not supported with job->verify at this time.
		 */
		if (job->verify) {
			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);

			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
				offset_in_ios = job->offset_in_ios++;
				if (job->offset_in_ios == job->size_in_ios) {
					job->offset_in_ios = 0;
				}
			}
			spdk_bit_array_set(job->outstanding, offset_in_ios);
		}
	}

	/* For multi-thread to same job, offset_in_ios is relative
	 * to the LBA range assigned for that job. job->offset_blocks
	 * is absolute (entire bdev LBA range).
	 */
	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;

	if (job->verify || job->reset) {
		/* Write a known pattern first; the completion path reads it back. */
		generate_data(task->buf, job->buf_size,
			      spdk_bdev_get_block_size(job->bdev),
			      task->md_buf, spdk_bdev_get_md_size(job->bdev),
			      job->io_size_blocks);
		if (g_zcopy) {
			bdevperf_prep_zcopy_write_task(task);
			return;
		} else {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
		}
	} else if (job->flush) {
		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
	} else if (job->unmap) {
		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
	} else if (job->write_zeroes) {
		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
	} else if ((job->rw_percentage == 100) ||
		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
		/* Read path, chosen by the configured read percentage. */
		task->io_type = SPDK_BDEV_IO_TYPE_READ;
	} else {
		if (g_zcopy) {
			bdevperf_prep_zcopy_write_task(task);
			return;
		} else {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
		}
	}

	bdevperf_submit_task(task);
}
959 
960 static int reset_job(void *arg);
961 
/* Completion of a bdev reset: recycle the task and re-arm the 10-second
 * reset timer; a failed reset is fatal for the job.
 */
static void
reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	if (!success) {
		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	spdk_bdev_free_io(bdev_io);

	/* Next reset in 10 seconds. */
	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
						10 * 1000000);
}
980 
/* Poller callback for reset workloads: submit one bdev reset. The timer is
 * one-shot here; reset_cb() re-registers it after completion. Always returns
 * -1 (the poller unregisters itself anyway).
 */
static int
reset_job(void *arg)
{
	struct bdevperf_job *job = arg;
	struct bdevperf_task *task;
	int rc;

	spdk_poller_unregister(&job->reset_timer);

	/* Do reset. */
	task = bdevperf_job_get_task(job);
	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
			     reset_cb, task);
	if (rc) {
		printf("Reset failed: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	return -1;
}
1002 
/* spdk_bdev_set_timeout() callback: count the timeout and, if the job is
 * configured to abort and the bdev supports it, submit an abort for the
 * timed-out I/O.
 */
static void
bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
{
	struct bdevperf_job *job = cb_arg;
	struct bdevperf_task *task;

	job->io_timeout++;

	if (job->is_draining || !job->abort ||
	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
		return;
	}

	task = bdevperf_job_get_task(job);
	/* NOTE(review): defensive only — bdevperf_job_get_task() aborts rather
	 * than returning NULL, so this branch is currently unreachable.
	 */
	if (task == NULL) {
		return;
	}

	/* The bdev_io's cb_arg is the bdevperf_task that submitted it. */
	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;

	bdevperf_submit_task(task);
}
1026 
/* Start a job on its own thread: arm the run (and optional reset) timers,
 * install the I/O timeout handler, and prime queue_depth I/O chains.
 */
static void
bdevperf_job_run(void *ctx)
{
	struct bdevperf_job *job = ctx;
	struct bdevperf_task *task;
	int i;

	/* Submit initial I/O for this job. Each time one
	 * completes, another will be submitted. */

	/* Start a timer to stop this I/O chain when the run is over */
	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain, job, g_time_in_usec);
	if (job->reset) {
		/* First reset fires after 10 seconds; see reset_job(). */
		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
							10 * 1000000);
	}

	spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);

	for (i = 0; i < job->queue_depth; i++) {
		task = bdevperf_job_get_task(job);
		bdevperf_submit_single(job, task);
	}
}
1051 
1052 static void
1053 _performance_dump_done(void *ctx)
1054 {
1055 	struct bdevperf_aggregate_stats *stats = ctx;
1056 
1057 	printf("\r ==================================================================================\n");
1058 	printf("\r %-28s: %10s %10.2f %10.2f",
1059 	       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
1060 	printf(" %10.2f %8.2f\n",
1061 	       stats->total_failed_per_second, stats->total_timeout_per_second);
1062 	fflush(stdout);
1063 
1064 	g_performance_dump_active = false;
1065 
1066 	free(stats);
1067 }
1068 
1069 static void
1070 _performance_dump(void *ctx)
1071 {
1072 	struct bdevperf_aggregate_stats *stats = ctx;
1073 
1074 	performance_dump_job(stats, stats->current_job);
1075 
1076 	/* This assumes the jobs list is static after start up time.
1077 	 * That's true right now, but if that ever changed this would need a lock. */
1078 	stats->current_job = TAILQ_NEXT(stats->current_job, link);
1079 	if (stats->current_job == NULL) {
1080 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1081 	} else {
1082 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1083 	}
1084 }
1085 
1086 static int
1087 performance_statistics_thread(void *arg)
1088 {
1089 	struct bdevperf_aggregate_stats *stats;
1090 
1091 	if (g_performance_dump_active) {
1092 		return -1;
1093 	}
1094 
1095 	g_performance_dump_active = true;
1096 
1097 	stats = calloc(1, sizeof(*stats));
1098 	if (stats == NULL) {
1099 		return -1;
1100 	}
1101 
1102 	g_show_performance_period_num++;
1103 
1104 	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;
1105 	stats->ema_period = g_show_performance_ema_period;
1106 
1107 	/* Iterate all of the jobs to gather stats
1108 	 * These jobs will not get removed here until a final performance dump is run,
1109 	 * so this should be safe without locking.
1110 	 */
1111 	stats->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
1112 	if (stats->current_job == NULL) {
1113 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1114 	} else {
1115 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1116 	}
1117 
1118 	return -1;
1119 }
1120 
1121 static void
1122 bdevperf_test(void)
1123 {
1124 	struct bdevperf_job *job;
1125 
1126 	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / 1000000);
1127 	fflush(stdout);
1128 
1129 	/* Start a timer to dump performance numbers */
1130 	g_start_tsc = spdk_get_ticks();
1131 	if (g_show_performance_real_time && !g_perf_timer) {
1132 		printf("\r %-*s: %10s %10s %10s %10s %8s\n",
1133 		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s");
1134 
1135 		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
1136 						    g_show_performance_period_in_usec);
1137 	}
1138 
1139 	/* Iterate jobs to start all I/O */
1140 	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
1141 		g_bdevperf.running_jobs++;
1142 		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
1143 	}
1144 }
1145 
1146 static void
1147 bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1148 {
1149 	struct bdevperf_job *job = event_ctx;
1150 
1151 	if (SPDK_BDEV_EVENT_REMOVE == type) {
1152 		bdevperf_job_drain(job);
1153 	}
1154 }
1155 
1156 static uint32_t g_construct_job_count = 0;
1157 
1158 static void
1159 _bdevperf_construct_job_done(void *ctx)
1160 {
1161 	if (--g_construct_job_count == 0) {
1162 
1163 		if (g_run_rc != 0) {
1164 			/* Something failed. */
1165 			bdevperf_test_done(NULL);
1166 			return;
1167 		}
1168 
1169 		/* Ready to run the test */
1170 		bdevperf_test();
1171 	} else if (g_run_rc != 0) {
1172 		/* Reset error as some jobs constructed right */
1173 		g_run_rc = 0;
1174 		if (g_continue_on_failure == false) {
1175 			g_error_to_exit = true;
1176 		}
1177 	}
1178 }
1179 
1180 /* Checkformat will not allow to use inlined type,
1181    this is a workaround */
1182 typedef struct spdk_thread *spdk_thread_t;
1183 
1184 static spdk_thread_t
1185 construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
1186 {
1187 	struct spdk_cpuset tmp;
1188 
1189 	/* This function runs on the main thread. */
1190 	assert(g_main_thread == spdk_get_thread());
1191 
1192 	/* Handle default mask */
1193 	if (spdk_cpuset_count(cpumask) == 0) {
1194 		cpumask = &g_all_cpuset;
1195 	}
1196 
1197 	/* Warn user that mask might need to be changed */
1198 	spdk_cpuset_copy(&tmp, cpumask);
1199 	spdk_cpuset_or(&tmp, &g_all_cpuset);
1200 	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
1201 		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
1202 	}
1203 
1204 	return spdk_thread_create(tag, cpumask);
1205 }
1206 
1207 static uint32_t
1208 _get_next_core(void)
1209 {
1210 	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;
1211 
1212 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1213 		current_core = spdk_env_get_first_core();
1214 		return current_core;
1215 	}
1216 
1217 	current_core = spdk_env_get_next_core(current_core);
1218 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1219 		current_core = spdk_env_get_first_core();
1220 	}
1221 
1222 	return current_core;
1223 }
1224 
1225 static void
1226 _bdevperf_construct_job(void *ctx)
1227 {
1228 	struct bdevperf_job *job = ctx;
1229 	int rc;
1230 
1231 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(job->bdev), true, bdevperf_bdev_removed, job,
1232 				&job->bdev_desc);
1233 	if (rc != 0) {
1234 		SPDK_ERRLOG("Could not open leaf bdev %s, error=%d\n", spdk_bdev_get_name(job->bdev), rc);
1235 		g_run_rc = -EINVAL;
1236 		goto end;
1237 	}
1238 
1239 	if (g_zcopy) {
1240 		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
1241 			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
1242 			g_run_rc = -ENOTSUP;
1243 			goto end;
1244 		}
1245 	}
1246 
1247 	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
1248 	if (!job->ch) {
1249 		SPDK_ERRLOG("Could not get io_channel for device %s, error=%d\n", spdk_bdev_get_name(job->bdev),
1250 			    rc);
1251 		spdk_bdev_close(job->bdev_desc);
1252 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
1253 		g_run_rc = -ENOMEM;
1254 		goto end;
1255 	}
1256 
1257 end:
1258 	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
1259 }
1260 
1261 static void
1262 job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
1263 {
1264 	switch (rw) {
1265 	case JOB_CONFIG_RW_READ:
1266 		job->rw_percentage = 100;
1267 		break;
1268 	case JOB_CONFIG_RW_WRITE:
1269 		job->rw_percentage = 0;
1270 		break;
1271 	case JOB_CONFIG_RW_RANDREAD:
1272 		job->is_random = true;
1273 		job->rw_percentage = 100;
1274 		job->seed = rand();
1275 		break;
1276 	case JOB_CONFIG_RW_RANDWRITE:
1277 		job->is_random = true;
1278 		job->rw_percentage = 0;
1279 		job->seed = rand();
1280 		break;
1281 	case JOB_CONFIG_RW_RW:
1282 		job->is_random = false;
1283 		break;
1284 	case JOB_CONFIG_RW_RANDRW:
1285 		job->is_random = true;
1286 		job->seed = rand();
1287 		break;
1288 	case JOB_CONFIG_RW_VERIFY:
1289 		job->verify = true;
1290 		job->rw_percentage = 50;
1291 		break;
1292 	case JOB_CONFIG_RW_RESET:
1293 		job->reset = true;
1294 		job->verify = true;
1295 		job->rw_percentage = 50;
1296 		break;
1297 	case JOB_CONFIG_RW_UNMAP:
1298 		job->unmap = true;
1299 		break;
1300 	case JOB_CONFIG_RW_FLUSH:
1301 		job->flush = true;
1302 		break;
1303 	case JOB_CONFIG_RW_WRITE_ZEROES:
1304 		job->write_zeroes = true;
1305 		break;
1306 	}
1307 }
1308 
/* Create one bdevperf_job for "bdev" from "config": populate the job from the
 * config and CLI globals, preallocate its task pool (with DMA-able I/O and
 * metadata buffers), insert it on g_bdevperf.jobs, and send it to "thread"
 * where _bdevperf_construct_job() opens the bdev and gets an I/O channel.
 * Returns 0 on success or a negative errno.
 * NOTE(review): after the job is inserted into g_bdevperf.jobs, later failure
 * paths return without removing it — presumably the common test-teardown path
 * frees it; confirm against bdevperf_test_done (not visible here). */
static int
bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
		       struct spdk_thread *thread)
{
	struct bdevperf_job *job;
	struct bdevperf_task *task;
	int block_size, data_block_size;
	int rc;
	int task_num, n;

	/* block_size may include interleaved metadata; data_block_size is the
	 * data portion only.  The user-specified I/O size is in data bytes. */
	block_size = spdk_bdev_get_block_size(bdev);
	data_block_size = spdk_bdev_get_data_block_size(bdev);

	job = calloc(1, sizeof(struct bdevperf_job));
	if (!job) {
		fprintf(stderr, "Unable to allocate memory for new job.\n");
		return -ENOMEM;
	}

	job->name = strdup(spdk_bdev_get_name(bdev));
	if (!job->name) {
		fprintf(stderr, "Unable to allocate memory for job name.\n");
		bdevperf_job_free(job);
		return -ENOMEM;
	}

	job->workload_type = g_workload_type;
	job->io_size = config->bs;
	job->rw_percentage = config->rwmixread;
	job->continue_on_failure = g_continue_on_failure;
	job->queue_depth = config->iodepth;
	job->bdev = bdev;
	/* io_size_blocks counts logical blocks per I/O; buf_size covers the
	 * same blocks including any interleaved metadata. */
	job->io_size_blocks = job->io_size / data_block_size;
	job->buf_size = job->io_size_blocks * block_size;
	job->abort = g_abort;
	job_init_rw(job, config->rw);

	if ((job->io_size % data_block_size) != 0) {
		SPDK_ERRLOG("IO size (%d) is not multiples of data block size of bdev %s (%"PRIu32")\n",
			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
		bdevperf_job_free(job);
		return -ENOTSUP;
	}

	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
		bdevperf_job_free(job);
		return -ENOTSUP;
	}

	/* Mirror the bdev's enabled DIF checks so submitted I/O carries the
	 * matching protection-information flags. */
	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_REFTAG)) {
		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
	}
	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_GUARD)) {
		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
	}

	job->offset_in_ios = 0;

	if (config->length != 0) {
		/* Use subset of disk */
		job->size_in_ios = config->length / job->io_size_blocks;
		job->ios_base = config->offset / job->io_size_blocks;
	} else {
		/* Use whole disk */
		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
		job->ios_base = 0;
	}

	if (job->is_random && g_zipf_theta > 0) {
		/* -F: skew random offsets with a zipf distribution. */
		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
	}

	if (job->verify) {
		/* One bit per I/O slot, used to track offsets with I/O in flight. */
		job->outstanding = spdk_bit_array_create(job->size_in_ios);
		if (job->outstanding == NULL) {
			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
				    spdk_bdev_get_name(bdev));
			bdevperf_job_free(job);
			return -ENOMEM;
		}
	}

	TAILQ_INIT(&job->task_list);

	/* queue_depth tasks for regular I/O, plus one spare for reset and a
	 * spare per queue slot for abort requests. */
	task_num = job->queue_depth;
	if (job->reset) {
		task_num += 1;
	}
	if (job->abort) {
		task_num += job->queue_depth;
	}

	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);

	for (n = 0; n < task_num; n++) {
		task = calloc(1, sizeof(struct bdevperf_task));
		if (!task) {
			fprintf(stderr, "Failed to allocate task from memory\n");
			return -ENOMEM;
		}

		/* DMA-able data buffer, aligned as the bdev requires. */
		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (!task->buf) {
			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
			free(task);
			return -ENOMEM;
		}

		/* Separate metadata buffer only when metadata is not interleaved. */
		if (spdk_bdev_is_md_separate(job->bdev)) {
			task->md_buf = spdk_zmalloc(job->io_size_blocks *
						    spdk_bdev_get_md_size(job->bdev), 0, NULL,
						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
			if (!task->md_buf) {
				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
				spdk_free(task->buf);
				free(task);
				return -ENOMEM;
			}
		}

		task->job = job;
		TAILQ_INSERT_TAIL(&job->task_list, task, link);
	}

	job->thread = thread;

	g_construct_job_count++;

	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
	assert(rc == 0);

	return rc;
}
1444 
1445 static int
1446 parse_rw(const char *str, enum job_config_rw ret)
1447 {
1448 	if (str == NULL) {
1449 		return ret;
1450 	}
1451 
1452 	if (!strcmp(str, "read")) {
1453 		ret = JOB_CONFIG_RW_READ;
1454 	} else if (!strcmp(str, "randread")) {
1455 		ret = JOB_CONFIG_RW_RANDREAD;
1456 	} else if (!strcmp(str, "write")) {
1457 		ret = JOB_CONFIG_RW_WRITE;
1458 	} else if (!strcmp(str, "randwrite")) {
1459 		ret = JOB_CONFIG_RW_RANDWRITE;
1460 	} else if (!strcmp(str, "verify")) {
1461 		ret = JOB_CONFIG_RW_VERIFY;
1462 	} else if (!strcmp(str, "reset")) {
1463 		ret = JOB_CONFIG_RW_RESET;
1464 	} else if (!strcmp(str, "unmap")) {
1465 		ret = JOB_CONFIG_RW_UNMAP;
1466 	} else if (!strcmp(str, "write_zeroes")) {
1467 		ret = JOB_CONFIG_RW_WRITE_ZEROES;
1468 	} else if (!strcmp(str, "flush")) {
1469 		ret = JOB_CONFIG_RW_FLUSH;
1470 	} else if (!strcmp(str, "rw")) {
1471 		ret = JOB_CONFIG_RW_RW;
1472 	} else if (!strcmp(str, "randrw")) {
1473 		ret = JOB_CONFIG_RW_RANDRW;
1474 	} else {
1475 		fprintf(stderr, "rw must be one of\n"
1476 			"(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush)\n");
1477 		ret = BDEVPERF_CONFIG_ERROR;
1478 	}
1479 
1480 	return ret;
1481 }
1482 
1483 static const char *
1484 config_filename_next(const char *filename, char *out)
1485 {
1486 	int i, k;
1487 
1488 	if (filename == NULL) {
1489 		out[0] = '\0';
1490 		return NULL;
1491 	}
1492 
1493 	if (filename[0] == ':') {
1494 		filename++;
1495 	}
1496 
1497 	for (i = 0, k = 0;
1498 	     filename[i] != '\0' &&
1499 	     filename[i] != ':' &&
1500 	     i < BDEVPERF_CONFIG_MAX_FILENAME;
1501 	     i++) {
1502 		if (filename[i] == ' ' || filename[i] == '\t') {
1503 			continue;
1504 		}
1505 
1506 		out[k++] = filename[i];
1507 	}
1508 	out[k] = 0;
1509 
1510 	return filename + i;
1511 }
1512 
1513 static void
1514 bdevperf_construct_jobs(void)
1515 {
1516 	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
1517 	struct spdk_thread *thread;
1518 	struct job_config *config;
1519 	struct spdk_bdev *bdev;
1520 	const char *filenames;
1521 	int rc;
1522 
1523 	TAILQ_FOREACH(config, &job_config_list, link) {
1524 		filenames = config->filename;
1525 
1526 		thread = construct_job_thread(&config->cpumask, config->name);
1527 		assert(thread);
1528 
1529 		while (filenames) {
1530 			filenames = config_filename_next(filenames, filename);
1531 			if (strlen(filename) == 0) {
1532 				break;
1533 			}
1534 
1535 			bdev = spdk_bdev_get_by_name(filename);
1536 			if (!bdev) {
1537 				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
1538 				g_run_rc = -EINVAL;
1539 				return;
1540 			}
1541 
1542 			rc = bdevperf_construct_job(bdev, config, thread);
1543 			if (rc < 0) {
1544 				g_run_rc = rc;
1545 				return;
1546 			}
1547 		}
1548 	}
1549 }
1550 
1551 static int
1552 make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
1553 {
1554 	struct job_config *config = calloc(1, sizeof(*config));
1555 
1556 	if (config == NULL) {
1557 		fprintf(stderr, "Unable to allocate memory for job config\n");
1558 		return -ENOMEM;
1559 	}
1560 
1561 	config->name = filename;
1562 	config->filename = filename;
1563 	spdk_cpuset_zero(&config->cpumask);
1564 	spdk_cpuset_set_cpu(&config->cpumask, _get_next_core(), true);
1565 	config->bs = g_io_size;
1566 	config->iodepth = g_queue_depth;
1567 	config->rwmixread = g_rw_percentage;
1568 	config->offset = offset;
1569 	config->length = range;
1570 	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
1571 	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
1572 		return -EINVAL;
1573 	}
1574 
1575 	TAILQ_INSERT_TAIL(&job_config_list, config, link);
1576 	return 0;
1577 }
1578 
1579 static void
1580 bdevperf_construct_multithread_job_configs(void)
1581 {
1582 	struct spdk_bdev *bdev;
1583 	uint32_t i;
1584 	uint32_t num_cores;
1585 	uint64_t blocks_per_job;
1586 	int64_t offset;
1587 
1588 	num_cores = 0;
1589 	SPDK_ENV_FOREACH_CORE(i) {
1590 		num_cores++;
1591 	}
1592 
1593 	if (num_cores == 0) {
1594 		g_run_rc = -EINVAL;
1595 		return;
1596 	}
1597 
1598 	if (g_job_bdev_name != NULL) {
1599 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
1600 		if (!bdev) {
1601 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
1602 			return;
1603 		}
1604 
1605 		blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
1606 		offset = 0;
1607 
1608 		SPDK_ENV_FOREACH_CORE(i) {
1609 			g_run_rc = make_cli_job_config(g_job_bdev_name, offset, blocks_per_job);
1610 			if (g_run_rc) {
1611 				return;
1612 			}
1613 
1614 			offset += blocks_per_job;
1615 		}
1616 	} else {
1617 		bdev = spdk_bdev_first_leaf();
1618 		while (bdev != NULL) {
1619 			blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
1620 			offset = 0;
1621 
1622 			SPDK_ENV_FOREACH_CORE(i) {
1623 				g_run_rc = make_cli_job_config(spdk_bdev_get_name(bdev),
1624 							       offset, blocks_per_job);
1625 				if (g_run_rc) {
1626 					return;
1627 				}
1628 
1629 				offset += blocks_per_job;
1630 			}
1631 
1632 			bdev = spdk_bdev_next_leaf(bdev);
1633 		}
1634 	}
1635 }
1636 
1637 static void
1638 bdevperf_construct_job_configs(void)
1639 {
1640 	struct spdk_bdev *bdev;
1641 
1642 	/* There are three different modes for allocating jobs. Standard mode
1643 	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
1644 	 *
1645 	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
1646 	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
1647 	 * This runs multiple threads per bdev, effectively.
1648 	 *
1649 	 * The -j flag implies "FIO" mode which tries to mimic semantic of FIO jobs.
1650 	 * In "FIO" mode, threads are spawned per-job instead of per-bdev.
1651 	 * Each FIO job can be individually parameterized by filename, cpu mask, etc,
1652 	 * which is different from other modes in that they only support global options.
1653 	 */
1654 
1655 	if (g_bdevperf_conf) {
1656 		goto end;
1657 	} else if (g_multithread_mode) {
1658 		bdevperf_construct_multithread_job_configs();
1659 		goto end;
1660 	}
1661 
1662 	if (g_job_bdev_name != NULL) {
1663 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
1664 		if (bdev) {
1665 			/* Construct the job */
1666 			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
1667 		} else {
1668 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
1669 		}
1670 	} else {
1671 		bdev = spdk_bdev_first_leaf();
1672 
1673 		while (bdev != NULL) {
1674 			/* Construct the job */
1675 			g_run_rc = make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
1676 			if (g_run_rc) {
1677 				break;
1678 			}
1679 
1680 			bdev = spdk_bdev_next_leaf(bdev);
1681 		}
1682 	}
1683 
1684 end:
1685 	/* Increment initial construct_jobs count so that it will never reach 0 in the middle
1686 	 * of iteration.
1687 	 */
1688 	g_construct_job_count = 1;
1689 
1690 	if (g_run_rc == 0) {
1691 		bdevperf_construct_jobs();
1692 	}
1693 
1694 	_bdevperf_construct_job_done(NULL);
1695 }
1696 
1697 static int
1698 parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
1699 {
1700 	const char *job_name;
1701 	int tmp;
1702 
1703 	tmp = spdk_conf_section_get_intval(s, name);
1704 	if (tmp == -1) {
1705 		/* Field was not found. Check default value
1706 		 * In [global] section it is ok to have undefined values
1707 		 * but for other sections it is not ok */
1708 		if (def == BDEVPERF_CONFIG_UNDEFINED) {
1709 			job_name = spdk_conf_section_get_name(s);
1710 			if (strcmp(job_name, "global") == 0) {
1711 				return def;
1712 			}
1713 
1714 			fprintf(stderr,
1715 				"Job '%s' has no '%s' assigned\n",
1716 				job_name, name);
1717 			return BDEVPERF_CONFIG_ERROR;
1718 		}
1719 		return def;
1720 	}
1721 
1722 	/* NOTE: get_intval returns nonnegative on success */
1723 	if (tmp < 0) {
1724 		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
1725 			spdk_conf_section_get_name(s), name);
1726 		return BDEVPERF_CONFIG_ERROR;
1727 	}
1728 
1729 	return tmp;
1730 }
1731 
1732 /* CLI arguments override parameters for global sections */
1733 static void
1734 config_set_cli_args(struct job_config *config)
1735 {
1736 	if (g_job_bdev_name) {
1737 		config->filename = g_job_bdev_name;
1738 	}
1739 	if (g_io_size > 0) {
1740 		config->bs = g_io_size;
1741 	}
1742 	if (g_queue_depth > 0) {
1743 		config->iodepth = g_queue_depth;
1744 	}
1745 	if (g_rw_percentage > 0) {
1746 		config->rwmixread = g_rw_percentage;
1747 	}
1748 	if (g_workload_type) {
1749 		config->rw = parse_rw(g_workload_type, config->rw);
1750 	}
1751 }
1752 
/* Parse the -j job config file (INI-style, FIO-like) into job_config_list.
 * [global] sections reset the defaults applied to subsequent job sections,
 * and CLI arguments override [global] values.  Returns 0 on success (also
 * when no config file was given) or 1 on error.
 * NOTE(review): section-string values (filename, name) point into
 * g_bdevperf_conf, which must therefore outlive the configs. */
static int
read_job_config(void)
{
	struct job_config global_default_config;
	struct job_config global_config;
	struct spdk_conf_section *s;
	struct job_config *config;
	const char *cpumask;
	const char *rw;
	bool is_global;
	int n = 0;
	int val;

	if (g_bdevperf_conf_file == NULL) {
		return 0;
	}

	g_bdevperf_conf = spdk_conf_allocate();
	if (g_bdevperf_conf == NULL) {
		fprintf(stderr, "Could not allocate job config structure\n");
		return 1;
	}

	/* Duplicate section names must stay distinct jobs, so disable merging. */
	spdk_conf_disable_sections_merge(g_bdevperf_conf);
	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
		fprintf(stderr, "Invalid job config");
		return 1;
	}

	/* Initialize global defaults */
	global_default_config.filename = NULL;
	/* Zero mask is the same as g_all_cpuset
	 * The g_all_cpuset is not initialized yet,
	 * so use zero mask as the default instead */
	spdk_cpuset_zero(&global_default_config.cpumask);
	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
	/* bdevperf has no default for -M option but in FIO the default is 50 */
	global_default_config.rwmixread = 50;
	global_default_config.offset = 0;
	/* length 0 means 100% */
	global_default_config.length = 0;
	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
	config_set_cli_args(&global_default_config);

	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
		return 1;
	}

	/* There is only a single instance of global job_config
	 * We just reset its value when we encounter new [global] section */
	global_config = global_default_config;

	for (s = spdk_conf_first_section(g_bdevperf_conf);
	     s != NULL;
	     s = spdk_conf_next_section(s)) {
		config = calloc(1, sizeof(*config));
		if (config == NULL) {
			fprintf(stderr, "Unable to allocate memory for job config\n");
			return 1;
		}

		config->name = spdk_conf_section_get_name(s);
		is_global = strcmp(config->name, "global") == 0;

		if (is_global) {
			global_config = global_default_config;
		}

		/* Each option falls back to the current [global] value when the
		 * section does not set it. */
		config->filename = spdk_conf_section_get_val(s, "filename");
		if (config->filename == NULL) {
			config->filename = global_config.filename;
		}
		if (!is_global) {
			if (config->filename == NULL) {
				fprintf(stderr, "Job '%s' expects 'filename' parameter\n", config->name);
				goto error;
			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
				fprintf(stderr,
					"filename for '%s' job is too long. Max length is %d\n",
					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
				goto error;
			}
		}

		cpumask = spdk_conf_section_get_val(s, "cpumask");
		if (cpumask == NULL) {
			config->cpumask = global_config.cpumask;
		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
			fprintf(stderr, "Job '%s' has bad 'cpumask' value\n", config->name);
			goto error;
		}

		config->bs = parse_uint_option(s, "bs", global_config.bs);
		if (config->bs == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->bs == 0) {
			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
			goto error;
		}

		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->iodepth == 0) {
			fprintf(stderr,
				"'iodepth' of job '%s' must be greater than 0\n",
				config->name);
			goto error;
		}

		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->rwmixread > 100) {
			fprintf(stderr,
				"'rwmixread' value of '%s' job is not in 0-100 range\n",
				config->name);
			goto error;
		}

		config->offset = parse_uint_option(s, "offset", global_config.offset);
		if (config->offset == BDEVPERF_CONFIG_ERROR) {
			goto error;
		}

		val = parse_uint_option(s, "length", global_config.length);
		if (val == BDEVPERF_CONFIG_ERROR) {
			goto error;
		}
		config->length = val;

		rw = spdk_conf_section_get_val(s, "rw");
		config->rw = parse_rw(rw, global_config.rw);
		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
			fprintf(stderr, "Job '%s' has bad 'rw' value\n", config->name);
			goto error;
		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
			goto error;
		}

		if (is_global) {
			/* A [global] section only updates the defaults; it is not
			 * itself a job, so the temporary config is freed here. */
			config_set_cli_args(config);
			global_config = *config;
			free(config);
		} else {
			TAILQ_INSERT_TAIL(&job_config_list, config, link);
			n++;
		}
	}

	printf("Using job config with %d jobs\n", n);
	return 0;
error:
	free(config);
	return 1;
}
1912 
1913 static void
1914 bdevperf_run(void *arg1)
1915 {
1916 	uint32_t i;
1917 
1918 	g_main_thread = spdk_get_thread();
1919 
1920 	spdk_cpuset_zero(&g_all_cpuset);
1921 	SPDK_ENV_FOREACH_CORE(i) {
1922 		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
1923 	}
1924 
1925 	if (g_wait_for_tests) {
1926 		/* Do not perform any tests until RPC is received */
1927 		return;
1928 	}
1929 
1930 	bdevperf_construct_job_configs();
1931 }
1932 
1933 static void
1934 rpc_perform_tests_cb(void)
1935 {
1936 	struct spdk_json_write_ctx *w;
1937 	struct spdk_jsonrpc_request *request = g_request;
1938 
1939 	g_request = NULL;
1940 
1941 	if (g_run_rc == 0) {
1942 		w = spdk_jsonrpc_begin_result(request);
1943 		spdk_json_write_uint32(w, g_run_rc);
1944 		spdk_jsonrpc_end_result(request, w);
1945 	} else {
1946 		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
1947 						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
1948 	}
1949 
1950 	/* Reset g_run_rc to 0 for the next test run. */
1951 	g_run_rc = 0;
1952 
1953 	/* Reset g_stats to 0 for the next test run. */
1954 	memset(&g_stats, 0, sizeof(g_stats));
1955 }
1956 
1957 static void
1958 rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
1959 {
1960 	if (params != NULL) {
1961 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
1962 						 "perform_tests method requires no parameters");
1963 		return;
1964 	}
1965 	if (g_request != NULL) {
1966 		fprintf(stderr, "Another test is already in progress.\n");
1967 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
1968 						 spdk_strerror(-EINPROGRESS));
1969 		return;
1970 	}
1971 	g_request = request;
1972 
1973 	/* Only construct job configs at the first test run.  */
1974 	if (TAILQ_EMPTY(&job_config_list)) {
1975 		bdevperf_construct_job_configs();
1976 	} else {
1977 		bdevperf_construct_jobs();
1978 	}
1979 }
1980 SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)
1981 
static void
_bdevperf_job_drain(void *ctx)
{
	/* spdk_msg_fn adapter so bdevperf_job_drain() can be posted with
	 * spdk_thread_send_msg(). */
	struct bdevperf_job *job = ctx;

	bdevperf_job_drain(job);
}
1987 
1988 static void
1989 spdk_bdevperf_shutdown_cb(void)
1990 {
1991 	g_shutdown = true;
1992 	struct bdevperf_job *job, *tmp;
1993 
1994 	if (g_bdevperf.running_jobs == 0) {
1995 		bdevperf_test_done(NULL);
1996 		return;
1997 	}
1998 
1999 	/* Iterate jobs to stop all I/O */
2000 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
2001 		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
2002 	}
2003 }
2004 
2005 static int
2006 bdevperf_parse_arg(int ch, char *arg)
2007 {
2008 	long long tmp;
2009 
2010 	if (ch == 'w') {
2011 		g_workload_type = optarg;
2012 	} else if (ch == 'T') {
2013 		g_job_bdev_name = optarg;
2014 	} else if (ch == 'z') {
2015 		g_wait_for_tests = true;
2016 	} else if (ch == 'Z') {
2017 		g_zcopy = true;
2018 	} else if (ch == 'X') {
2019 		g_abort = true;
2020 	} else if (ch == 'C') {
2021 		g_multithread_mode = true;
2022 	} else if (ch == 'f') {
2023 		g_continue_on_failure = true;
2024 	} else if (ch == 'j') {
2025 		g_bdevperf_conf_file = optarg;
2026 	} else if (ch == 'F') {
2027 		char *endptr;
2028 
2029 		errno = 0;
2030 		g_zipf_theta = strtod(optarg, &endptr);
2031 		if (errno || optarg == endptr || g_zipf_theta < 0) {
2032 			fprintf(stderr, "Illegal zipf theta value %s\n", optarg);
2033 			return -EINVAL;
2034 		}
2035 	} else {
2036 		tmp = spdk_strtoll(optarg, 10);
2037 		if (tmp < 0) {
2038 			fprintf(stderr, "Parse failed for the option %c.\n", ch);
2039 			return tmp;
2040 		} else if (tmp >= INT_MAX) {
2041 			fprintf(stderr, "Parsed option was too large %c.\n", ch);
2042 			return -ERANGE;
2043 		}
2044 
2045 		switch (ch) {
2046 		case 'q':
2047 			g_queue_depth = tmp;
2048 			break;
2049 		case 'o':
2050 			g_io_size = tmp;
2051 			break;
2052 		case 't':
2053 			g_time_in_sec = tmp;
2054 			break;
2055 		case 'k':
2056 			g_timeout_in_sec = tmp;
2057 			break;
2058 		case 'M':
2059 			g_rw_percentage = tmp;
2060 			g_mix_specified = true;
2061 			break;
2062 		case 'P':
2063 			g_show_performance_ema_period = tmp;
2064 			break;
2065 		case 'S':
2066 			g_show_performance_real_time = 1;
2067 			g_show_performance_period_in_usec = tmp * 1000000;
2068 			break;
2069 		default:
2070 			return -EINVAL;
2071 		}
2072 	}
2073 	return 0;
2074 }
2075 
/* Print the bdevperf-specific command line options; registered with the SPDK
 * app framework to be appended to the common usage output. */
static void
bdevperf_usage(void)
{
	printf(" -q <depth>                io depth\n");
	printf(" -o <size>                 io size in bytes\n");
	printf(" -w <type>                 io pattern type, must be one of (read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush)\n");
	printf(" -t <time>                 time in seconds\n");
	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
	printf(" -P <num>                  number of moving average period\n");
	printf("\t\t(If set to n, show weighted mean of the previous n IO/s in real time)\n");
	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
	printf("\t\t(only valid with -S)\n");
	printf(" -S <period>               show performance result in real time every <period> seconds\n");
	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
	printf(" -f                        continue processing I/O even after failures\n");
	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
	printf(" -z                        start bdevperf, but wait for RPC to start tests\n");
	printf(" -X                        abort timed out I/O\n");
	printf(" -C                        enable every core to send I/Os to each bdev\n");
	printf(" -j <filename>             use job config file\n");
}
2099 
2100 static int
2101 verify_test_params(struct spdk_app_opts *opts)
2102 {
2103 	/* When RPC is used for starting tests and
2104 	 * no rpc_addr was configured for the app,
2105 	 * use the default address. */
2106 	if (g_wait_for_tests && opts->rpc_addr == NULL) {
2107 		opts->rpc_addr = SPDK_DEFAULT_RPC_ADDR;
2108 	}
2109 
2110 	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
2111 		goto out;
2112 	}
2113 	if (!g_bdevperf_conf_file && g_io_size <= 0) {
2114 		goto out;
2115 	}
2116 	if (!g_bdevperf_conf_file && !g_workload_type) {
2117 		goto out;
2118 	}
2119 	if (g_time_in_sec <= 0) {
2120 		goto out;
2121 	}
2122 	g_time_in_usec = g_time_in_sec * 1000000LL;
2123 
2124 	if (g_timeout_in_sec < 0) {
2125 		goto out;
2126 	}
2127 
2128 	if (g_abort && !g_timeout_in_sec) {
2129 		printf("Timeout must be set for abort option, Ignoring g_abort\n");
2130 	}
2131 
2132 	if (g_show_performance_ema_period > 0 &&
2133 	    g_show_performance_real_time == 0) {
2134 		fprintf(stderr, "-P option must be specified with -S option\n");
2135 		return 1;
2136 	}
2137 
2138 	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2139 		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
2140 		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
2141 		printf("Zero copy mechanism will not be used.\n");
2142 		g_zcopy = false;
2143 	}
2144 
2145 	if (g_bdevperf_conf_file) {
2146 		/* workload_type verification happens during config file parsing */
2147 		return 0;
2148 	}
2149 
2150 	if (!strcmp(g_workload_type, "verify") ||
2151 	    !strcmp(g_workload_type, "reset")) {
2152 		g_rw_percentage = 50;
2153 		if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2154 			fprintf(stderr, "Unable to exceed max I/O size of %d for verify. (%d provided).\n",
2155 				SPDK_BDEV_LARGE_BUF_MAX_SIZE, g_io_size);
2156 			return 1;
2157 		}
2158 		g_verify = true;
2159 		if (!strcmp(g_workload_type, "reset")) {
2160 			g_reset = true;
2161 		}
2162 	}
2163 
2164 	if (!strcmp(g_workload_type, "read") ||
2165 	    !strcmp(g_workload_type, "randread") ||
2166 	    !strcmp(g_workload_type, "write") ||
2167 	    !strcmp(g_workload_type, "randwrite") ||
2168 	    !strcmp(g_workload_type, "verify") ||
2169 	    !strcmp(g_workload_type, "reset") ||
2170 	    !strcmp(g_workload_type, "unmap") ||
2171 	    !strcmp(g_workload_type, "write_zeroes") ||
2172 	    !strcmp(g_workload_type, "flush")) {
2173 		if (g_mix_specified) {
2174 			fprintf(stderr, "Ignoring -M option... Please use -M option"
2175 				" only when using rw or randrw.\n");
2176 		}
2177 	}
2178 
2179 	if (!strcmp(g_workload_type, "rw") ||
2180 	    !strcmp(g_workload_type, "randrw")) {
2181 		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
2182 			fprintf(stderr,
2183 				"-M must be specified to value from 0 to 100 "
2184 				"for rw or randrw.\n");
2185 			return 1;
2186 		}
2187 	}
2188 
2189 	return 0;
2190 out:
2191 	spdk_app_usage();
2192 	bdevperf_usage();
2193 	return 1;
2194 }
2195 
2196 int
2197 main(int argc, char **argv)
2198 {
2199 	struct spdk_app_opts opts = {};
2200 	int rc;
2201 
2202 	/* Use the runtime PID to set the random seed */
2203 	srand(getpid());
2204 
2205 	spdk_app_opts_init(&opts, sizeof(opts));
2206 	opts.name = "bdevperf";
2207 	opts.rpc_addr = NULL;
2208 	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;
2209 
2210 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CF:M:P:S:T:Xj:", NULL,
2211 				      bdevperf_parse_arg, bdevperf_usage)) !=
2212 	    SPDK_APP_PARSE_ARGS_SUCCESS) {
2213 		return rc;
2214 	}
2215 
2216 	if (read_job_config()) {
2217 		free_job_config();
2218 		return 1;
2219 	}
2220 
2221 	if (verify_test_params(&opts) != 0) {
2222 		free_job_config();
2223 		exit(1);
2224 	}
2225 
2226 	rc = spdk_app_start(&opts, bdevperf_run, NULL);
2227 
2228 	spdk_app_fini();
2229 	free_job_config();
2230 	return rc;
2231 }
2232