xref: /spdk/examples/bdev/bdevperf/bdevperf.c (revision b45556e2b27d951c8d4271f03ff5a4199ce5936b)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2016 Intel Corporation.
3  *   Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES.
4  *   All rights reserved.
5  */
6 
7 #include "spdk/stdinc.h"
8 
9 #include "spdk/bdev.h"
10 #include "spdk/accel.h"
11 #include "spdk/endian.h"
12 #include "spdk/env.h"
13 #include "spdk/event.h"
14 #include "spdk/log.h"
15 #include "spdk/util.h"
16 #include "spdk/thread.h"
17 #include "spdk/string.h"
18 #include "spdk/rpc.h"
19 #include "spdk/bit_array.h"
20 #include "spdk/conf.h"
21 #include "spdk/zipf.h"
22 
/* Limits and sentinel values used while parsing the job config file. */
#define BDEVPERF_CONFIG_MAX_FILENAME 1024
#define BDEVPERF_CONFIG_UNDEFINED -1
#define BDEVPERF_CONFIG_ERROR -2
26 
/* Per-I/O context.  One task tracks one outstanding I/O; idle tasks are
 * recycled through the owning job's task_list. */
struct bdevperf_task {
	struct iovec			iov;		/* single-element iovec used for writes */
	struct bdevperf_job		*job;		/* owning job */
	struct spdk_bdev_io		*bdev_io;	/* zcopy path: bdev_io holding the buffer */
	void				*buf;		/* data buffer (non-zcopy paths) */
	void				*md_buf;	/* separate metadata buffer, if any */
	uint64_t			offset_blocks;	/* absolute starting LBA of this I/O */
	struct bdevperf_task		*task_to_abort;	/* target task when io_type == ABORT */
	enum spdk_bdev_io_type		io_type;	/* type of the pending submission */
	TAILQ_ENTRY(bdevperf_task)	link;		/* linkage on job->task_list free list */
	struct spdk_bdev_io_wait_entry	bdev_io_wait;	/* retry entry used on -ENOMEM */
};
39 
/* Global defaults, mostly set from the command line; per-job config file
 * sections may override the workload-related ones. */
static const char *g_workload_type = NULL;
static int g_io_size = 0;
/* initialize to invalid value so we can detect if user overrides it. */
static int g_rw_percentage = -1;
static bool g_verify = false;
static bool g_reset = false;
static bool g_continue_on_failure = false;
static bool g_abort = false;
static bool g_error_to_exit = false;		/* set on first fatal I/O error to stop all jobs */
static int g_queue_depth = 0;
static uint64_t g_time_in_usec;			/* total test duration */
static int g_show_performance_real_time = 0;
static uint64_t g_show_performance_period_in_usec = 1000000;
static uint64_t g_show_performance_period_num = 0;
static uint64_t g_show_performance_ema_period = 0;	/* 0 => report CMA instead of EMA */
static int g_run_rc = 0;			/* sticky return code of the whole run */
static bool g_shutdown = false;			/* set when a shutdown signal arrives */
static uint64_t g_start_tsc;
static uint64_t g_shutdown_tsc;
static bool g_zcopy = false;			/* use zcopy start/end instead of read/write */
static struct spdk_thread *g_main_thread;
static int g_time_in_sec = 0;
static bool g_mix_specified = false;
static const char *g_job_bdev_name;
static bool g_wait_for_tests = false;
static struct spdk_jsonrpc_request *g_request = NULL;
static bool g_multithread_mode = false;
static int g_timeout_in_sec;			/* per-I/O timeout passed to spdk_bdev_set_timeout */
static struct spdk_conf *g_bdevperf_conf = NULL;
static const char *g_bdevperf_conf_file = NULL;
static double g_zipf_theta;			/* non-zero enables zipfian offset distribution */

static struct spdk_cpuset g_all_cpuset;
static struct spdk_poller *g_perf_timer = NULL;
74 
75 static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
76 static void rpc_perform_tests_cb(void);
77 
/* Per-bdev workload state.  Each job runs on its own SPDK thread and owns
 * its descriptor, I/O channel, and a pool of bdevperf_tasks. */
struct bdevperf_job {
	char				*name;
	struct spdk_bdev		*bdev;
	struct spdk_bdev_desc		*bdev_desc;
	struct spdk_io_channel		*ch;
	TAILQ_ENTRY(bdevperf_job)	link;		/* linkage on g_bdevperf.jobs */
	struct spdk_thread		*thread;	/* thread this job runs on */

	/* Workload configuration (resolved from globals / config file). */
	const char			*workload_type;
	int				io_size;
	int				rw_percentage;	/* percentage of reads in a mixed workload */
	bool				is_random;
	bool				verify;
	bool				reset;
	bool				continue_on_failure;
	bool				unmap;
	bool				write_zeroes;
	bool				flush;
	bool				abort;
	int				queue_depth;
	unsigned int			seed;		/* rand_r() state for random offsets */

	/* Runtime statistics and progress. */
	uint64_t			io_completed;
	uint64_t			io_failed;
	uint64_t			io_timeout;
	uint64_t			prev_io_completed;	/* snapshot for EMA delta */
	double				ema_io_per_second;
	int				current_queue_depth;	/* I/Os currently outstanding */
	uint64_t			size_in_ios;	/* LBA range of this job, in I/O units */
	uint64_t			ios_base;	/* first I/O-unit offset of this job's range */
	uint64_t			offset_in_ios;	/* next sequential offset */
	uint64_t			io_size_blocks;
	uint64_t			buf_size;
	uint32_t			dif_check_flags;
	bool				is_draining;	/* stop resubmitting; wait for outstanding I/O */
	struct spdk_poller		*run_timer;	/* ends the run after g_time_in_usec */
	struct spdk_poller		*reset_timer;	/* periodic reset submission */
	struct spdk_bit_array		*outstanding;	/* verify mode: offsets with in-flight I/O */
	struct spdk_zipf		*zipf;		/* zipfian offset generator, if enabled */
	TAILQ_HEAD(, bdevperf_task)	task_list;	/* free task pool */
	uint64_t			run_time_in_usec;
};
120 
/* Top-level test state: all configured jobs and how many are still running. */
struct spdk_bdevperf {
	TAILQ_HEAD(, bdevperf_job)	jobs;
	uint32_t			running_jobs;
};

static struct spdk_bdevperf g_bdevperf = {
	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
	.running_jobs = 0,
};
130 
/* Workload patterns accepted by the "rw" key of a job config section. */
enum job_config_rw {
	JOB_CONFIG_RW_READ = 0,
	JOB_CONFIG_RW_WRITE,
	JOB_CONFIG_RW_RANDREAD,
	JOB_CONFIG_RW_RANDWRITE,
	JOB_CONFIG_RW_RW,		/* mixed sequential read/write */
	JOB_CONFIG_RW_RANDRW,		/* mixed random read/write */
	JOB_CONFIG_RW_VERIFY,
	JOB_CONFIG_RW_RESET,
	JOB_CONFIG_RW_UNMAP,
	JOB_CONFIG_RW_FLUSH,
	JOB_CONFIG_RW_WRITE_ZEROES,
};
144 
/* Storing values from a section of job config file */
struct job_config {
	const char			*name;		/* section name */
	const char			*filename;	/* bdev name(s) this job targets */
	struct spdk_cpuset		cpumask;
	int				bs;		/* block (I/O) size in bytes */
	int				iodepth;
	int				rwmixread;	/* read percentage for mixed workloads */
	int64_t				offset;		/* starting offset within the bdev */
	uint64_t			length;		/* length of the LBA range to use */
	enum job_config_rw		rw;
	TAILQ_ENTRY(job_config)	link;
};
158 
/* All parsed job config sections, consumed when jobs are constructed. */
TAILQ_HEAD(, job_config) job_config_list
	= TAILQ_HEAD_INITIALIZER(job_config_list);

/* True while a periodic performance dump is walking the jobs list. */
static bool g_performance_dump_active = false;
163 
/* Totals accumulated across jobs during one dump (periodic or final). */
struct bdevperf_aggregate_stats {
	struct bdevperf_job		*current_job;	/* cursor for the cross-thread dump walk */
	uint64_t			io_time_in_usec;	/* time base used for rate computation */
	uint64_t			ema_period;	/* 0 => use CMA */
	double				total_io_per_second;
	double				total_mb_per_second;
	double				total_failed_per_second;
	double				total_timeout_per_second;
};

/* Aggregate stats for the final dump at end of test. */
static struct bdevperf_aggregate_stats g_stats = {};
175 
176 /*
177  * Cumulative Moving Average (CMA): average of all data up to current
178  * Exponential Moving Average (EMA): weighted mean of the previous n data and more weight is given to recent
179  * Simple Moving Average (SMA): unweighted mean of the previous n data
180  *
181  * Bdevperf supports CMA and EMA.
182  */
183 static double
184 get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
185 {
186 	return (double)job->io_completed * 1000000 / io_time_in_usec;
187 }
188 
189 static double
190 get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
191 {
192 	double io_completed, io_per_second;
193 
194 	io_completed = job->io_completed;
195 	io_per_second = (double)(io_completed - job->prev_io_completed) * 1000000
196 			/ g_show_performance_period_in_usec;
197 	job->prev_io_completed = io_completed;
198 
199 	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
200 				  / (ema_period + 1);
201 	return job->ema_io_per_second;
202 }
203 
/*
 * Print one job's statistics line and fold its rates into the aggregate
 * totals.  Uses CMA when stats->ema_period is 0, EMA otherwise.
 */
static void
performance_dump_job(struct bdevperf_aggregate_stats *stats, struct bdevperf_job *job)
{
	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
	uint64_t time_in_usec;

	printf("\r Job: %s (Core Mask 0x%s)\n", spdk_thread_get_name(job->thread),
	       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

	if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		printf("\r Job: %s ended in about %.2f seconds with error\n",
		       spdk_thread_get_name(job->thread), (double)job->run_time_in_usec / 1000000);
	}
	if (job->verify) {
		printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
		       job->ios_base, job->size_in_ios);
	}

	/* Choose the time base: a periodic dump uses the elapsed dump interval,
	 * the final dump (or a job that failed early) uses the job's own
	 * measured run time. */
	if (g_performance_dump_active == true) {
		/* Use job's actual run time as Job has ended */
		if (job->io_failed > 0 && !job->continue_on_failure) {
			time_in_usec = job->run_time_in_usec;
		} else {
			time_in_usec = stats->io_time_in_usec;
		}
	} else {
		time_in_usec = job->run_time_in_usec;
	}

	if (stats->ema_period == 0) {
		io_per_second = get_cma_io_per_second(job, time_in_usec);
	} else {
		io_per_second = get_ema_io_per_second(job, stats->ema_period);
	}
	mb_per_second = io_per_second * job->io_size / (1024 * 1024);

	failed_per_second = (double)job->io_failed * 1000000 / time_in_usec;
	timeout_per_second = (double)job->io_timeout * 1000000 / time_in_usec;

	printf("\t %-20s: %10.2f %10.2f %10.2f",
	       job->name, (float)time_in_usec / 1000000, io_per_second, mb_per_second);
	printf(" %10.2f %8.2f\n",
	       failed_per_second, timeout_per_second);

	stats->total_io_per_second += io_per_second;
	stats->total_mb_per_second += mb_per_second;
	stats->total_failed_per_second += failed_per_second;
	stats->total_timeout_per_second += timeout_per_second;
}
253 
/*
 * Fill a data buffer (and metadata) with a deterministic per-block pattern
 * used by verify/reset workloads: each dword of a block's data region holds
 * offset_blocks + byte-offset, and the block's metadata bytes are memset to
 * offset_blocks.  md_buf == NULL means metadata is interleaved at the tail
 * of each block; otherwise it lives in the separate md_buf.
 * Silently does nothing if buf_len cannot hold num_blocks full blocks.
 */
static void
generate_data(void *buf, int buf_len, int block_size, void *md_buf, int md_size,
	      int num_blocks)
{
	int offset_blocks = 0, md_offset, data_block_size, inner_offset;
	bool md_interleaved = (md_buf == NULL);

	if (buf_len < num_blocks * block_size) {
		return;
	}

	if (md_interleaved) {
		data_block_size = block_size - md_size;
		md_buf = (char *)buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	while (offset_blocks < num_blocks) {
		inner_offset = 0;
		while (inner_offset < data_block_size) {
			*(uint32_t *)buf = offset_blocks + inner_offset;
			inner_offset += sizeof(uint32_t);
			buf = (char *)buf + sizeof(uint32_t);
		}
		if (md_interleaved) {
			/* Skip the interleaved metadata region so the next block's
			 * data does not overwrite this block's metadata.  The previous
			 * code advanced buf only by data_block_size per block, which
			 * clobbered the just-written metadata of the prior block. */
			buf = (char *)buf + md_size;
		}
		memset(md_buf, offset_blocks, md_size);
		md_buf = (char *)md_buf + md_offset;
		offset_blocks++;
	}
}
285 
/*
 * Copy num_blocks of data (and optional separate metadata) from the read
 * buffers into the write buffers.  Returns false without copying anything
 * if either data buffer is too small for the full transfer.
 */
static bool
copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
{
	int data_len = block_size * num_blocks;

	if (wr_buf_len < data_len || rd_buf_len < data_len) {
		return false;
	}

	/* Metadata pointers must be both set or both unset. */
	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	memcpy(wr_buf, rd_buf, data_len);
	if (wr_md_buf != NULL) {
		memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
	}

	return true;
}
304 
/*
 * Compare num_blocks of written data against read-back data, optionally
 * including the per-block metadata (md_check).  Metadata is either in the
 * separate wr_md_buf/rd_md_buf, or (when those are NULL) interleaved at the
 * tail of each block.  Returns false on any mismatch or undersized buffer.
 */
static bool
verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
{
	int block, md_offset, data_block_size;
	char *wr = wr_buf, *rd = rd_buf;
	char *wr_md, *rd_md;

	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	/* Metadata pointers must be both set or both unset. */
	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	if (wr_md_buf == NULL) {
		/* Interleaved metadata lives at the tail of every block. */
		data_block_size = block_size - md_size;
		wr_md = wr + data_block_size;
		rd_md = rd + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		wr_md = wr_md_buf;
		rd_md = rd_md_buf;
		md_offset = md_size;
	}

	for (block = 0; block < num_blocks; block++) {
		if (memcmp(wr, rd, data_block_size) != 0) {
			return false;
		}
		wr += block_size;
		rd += block_size;

		if (md_check) {
			if (memcmp(wr_md, rd_md, md_size) != 0) {
				return false;
			}
			wr_md += md_offset;
			rd_md += md_offset;
		}
	}

	return true;
}
349 
350 static void
351 free_job_config(void)
352 {
353 	struct job_config *config, *tmp;
354 
355 	spdk_conf_free(g_bdevperf_conf);
356 	g_bdevperf_conf = NULL;
357 
358 	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
359 		TAILQ_REMOVE(&job_config_list, config, link);
360 		free(config);
361 	}
362 }
363 
/*
 * Runs on the main thread once the whole test is over: prints the final
 * per-job and aggregate statistics, frees every job and its task pool, and
 * stops the app (or answers the pending RPC when the run was RPC-driven).
 */
static void
bdevperf_test_done(void *ctx)
{
	struct bdevperf_job *job, *jtmp;
	struct bdevperf_task *task, *ttmp;
	int rc;
	uint64_t time_in_usec;

	if (g_time_in_usec) {
		g_stats.io_time_in_usec = g_time_in_usec;

		/* A periodic dump is still walking the jobs list; defer the
		 * final teardown by re-posting this message to ourselves. */
		if (!g_run_rc && g_performance_dump_active) {
			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
			return;
		}
	}

	if (g_show_performance_real_time) {
		spdk_poller_unregister(&g_perf_timer);
	}

	if (g_shutdown) {
		/* Clamp the reported duration to the actual elapsed time. */
		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
		time_in_usec = g_shutdown_tsc * 1000000 / spdk_get_ticks_hz();
		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
		printf("Received shutdown signal, test time was about %.6f seconds\n",
		       (double)g_time_in_usec / 1000000);
	}

	printf("\n\r %-*s: %10s %10s %10s %10s %8s\n",
	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s");

	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);

		performance_dump_job(&g_stats, job);

		/* Free the job's task pool and per-job resources. */
		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
			TAILQ_REMOVE(&job->task_list, task, link);
			spdk_free(task->buf);
			spdk_free(task->md_buf);
			free(task);
		}

		if (job->verify) {
			spdk_bit_array_free(&job->outstanding);
		}
		spdk_zipf_free(&job->zipf);
		free(job->name);
		free(job);
	}

	printf("\r ==================================================================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", g_stats.total_io_per_second, g_stats.total_mb_per_second);
	printf(" %10.2f %8.2f\n",
	       g_stats.total_failed_per_second, g_stats.total_timeout_per_second);
	fflush(stdout);

	rc = g_run_rc;
	if (g_request && !g_shutdown) {
		/* RPC-driven run: send the response; only stop the app on error. */
		rpc_perform_tests_cb();
		if (rc != 0) {
			spdk_app_stop(rc);
		}
	} else {
		spdk_app_stop(rc);
	}
}
433 
434 static void
435 bdevperf_job_end(void *ctx)
436 {
437 	assert(g_main_thread == spdk_get_thread());
438 
439 	if (--g_bdevperf.running_jobs == 0) {
440 		bdevperf_test_done(NULL);
441 	}
442 }
443 
444 static void
445 bdevperf_end_task(struct bdevperf_task *task)
446 {
447 	struct bdevperf_job     *job = task->job;
448 	uint64_t		end_tsc = 0;
449 
450 	TAILQ_INSERT_TAIL(&job->task_list, task, link);
451 	if (job->is_draining) {
452 		if (job->current_queue_depth == 0) {
453 			end_tsc = spdk_get_ticks() - g_start_tsc;
454 			job->run_time_in_usec = end_tsc * 1000000 / spdk_get_ticks_hz();
455 			spdk_put_io_channel(job->ch);
456 			spdk_bdev_close(job->bdev_desc);
457 			spdk_thread_send_msg(g_main_thread, bdevperf_job_end, NULL);
458 		}
459 	}
460 }
461 
462 static void
463 bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
464 {
465 	struct bdevperf_job	*job = task->job;
466 
467 	task->bdev_io_wait.bdev = job->bdev;
468 	task->bdev_io_wait.cb_fn = cb_fn;
469 	task->bdev_io_wait.cb_arg = task;
470 	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
471 }
472 
473 static int
474 bdevperf_job_drain(void *ctx)
475 {
476 	struct bdevperf_job *job = ctx;
477 
478 	spdk_poller_unregister(&job->run_timer);
479 	if (job->reset) {
480 		spdk_poller_unregister(&job->reset_timer);
481 	}
482 
483 	job->is_draining = true;
484 
485 	return -1;
486 }
487 
488 static void
489 bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
490 {
491 	struct bdevperf_task	*task = cb_arg;
492 	struct bdevperf_job	*job = task->job;
493 
494 	job->current_queue_depth--;
495 
496 	if (success) {
497 		job->io_completed++;
498 	} else {
499 		job->io_failed++;
500 		if (!job->continue_on_failure) {
501 			bdevperf_job_drain(job);
502 			g_run_rc = -1;
503 		}
504 	}
505 
506 	spdk_bdev_free_io(bdev_io);
507 	bdevperf_end_task(task);
508 }
509 
/*
 * Verify DIF/DIX protection information on the data just read.
 * Returns 0 on success or the negative error from the DIF library.
 */
static int
bdevperf_verify_dif(struct bdevperf_task *task, struct iovec *iovs, int iovcnt)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	struct spdk_dif_error	err_blk = {};
	int			rc;

	/* Rebuild the DIF context as it applied to this I/O; the starting
	 * reference tag is the task's LBA. */
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		/* DIF: protection info interleaved with the data blocks. */
		rc = spdk_dif_verify(iovs, iovcnt, job->io_size_blocks, &dif_ctx, &err_blk);
	} else {
		/* DIX: protection info kept in the separate metadata buffer. */
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_verify(iovs, iovcnt, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
	}

	if (rc != 0) {
		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
			err_blk.err_type, err_blk.err_offset);
	}

	return rc;
}
550 
/*
 * Common I/O completion callback.  Accounts the result, performs
 * read-after-write verification or DIF checking when configured, clears
 * the verify-mode outstanding bit, and either resubmits the task or
 * retires it when the job is draining.
 */
static void
bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	struct iovec		*iovs;
	int			iovcnt;
	bool			md_check;
	uint64_t		offset_in_ios;
	int			rc;

	job = task->job;
	/* Raw metadata can only be byte-compared when DIF is disabled;
	 * otherwise the protection fields legitimately differ. */
	md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;

	if (g_error_to_exit == true) {
		/* Another I/O already hit a fatal error; just drain. */
		bdevperf_job_drain(job);
	} else if (!success) {
		if (!job->reset && !job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
			g_error_to_exit = true;
			printf("task offset: %" PRIu64 " on job bdev=%s fails\n",
			       task->offset_blocks, job->name);
		}
	} else if (job->verify || job->reset) {
		/* Read-after-write verification: compare the data just read
		 * against the pattern kept in task->buf. */
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		assert(iovcnt == 1);
		assert(iovs != NULL);
		if (!verify_data(task->buf, job->buf_size, iovs[0].iov_base, iovs[0].iov_len,
				 spdk_bdev_get_block_size(job->bdev),
				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
				 spdk_bdev_get_md_size(job->bdev),
				 job->io_size_blocks, md_check)) {
			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
			printf("   First dword expected 0x%x got 0x%x\n", *(int *)task->buf, *(int *)iovs[0].iov_base);
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	} else if (job->dif_check_flags != 0) {
		/* DIF/DIX checking only applies to reads on bdevs with metadata. */
		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_get_md_size(job->bdev) != 0) {
			spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
			assert(iovcnt == 1);
			assert(iovs != NULL);
			rc = bdevperf_verify_dif(task, iovs, iovcnt);
			if (rc != 0) {
				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
				       task->offset_blocks, job->name);

				success = false;
				if (!job->reset && !job->continue_on_failure) {
					bdevperf_job_drain(job);
					g_run_rc = -1;
					g_error_to_exit = true;
				}
			}
		}
	}

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
	}

	if (job->verify) {
		/* This offset no longer has an outstanding I/O. */
		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
		spdk_bit_array_clear(job->outstanding, offset_in_ios);
	}

	spdk_bdev_free_io(bdev_io);

	/*
	 * is_draining indicates when time has expired for the test run
	 * and we are just waiting for the previously submitted I/O
	 * to complete.  In this case, do not submit a new I/O to replace
	 * the one just completed.
	 */
	if (!job->is_draining) {
		bdevperf_submit_single(job, task);
	} else {
		bdevperf_end_task(task);
	}
}
639 
640 static void
641 bdevperf_verify_submit_read(void *cb_arg)
642 {
643 	struct bdevperf_job	*job;
644 	struct bdevperf_task	*task = cb_arg;
645 	int			rc;
646 
647 	job = task->job;
648 
649 	/* Read the data back in */
650 	rc = spdk_bdev_read_blocks_with_md(job->bdev_desc, job->ch, NULL, NULL,
651 					   task->offset_blocks, job->io_size_blocks,
652 					   bdevperf_complete, task);
653 
654 	if (rc == -ENOMEM) {
655 		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
656 	} else if (rc != 0) {
657 		printf("Failed to submit read: %d\n", rc);
658 		bdevperf_job_drain(job);
659 		g_run_rc = rc;
660 	}
661 }
662 
/* Write phase of verify finished: on success, kick off the read-back;
 * on failure, let the common completion path account for it. */
static void
bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
			       void *cb_arg)
{
	if (!success) {
		bdevperf_complete(bdev_io, success, cb_arg);
		return;
	}

	spdk_bdev_free_io(bdev_io);
	bdevperf_verify_submit_read(cb_arg);
}
674 
675 static void
676 bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
677 {
678 	if (!success) {
679 		bdevperf_complete(bdev_io, success, cb_arg);
680 		return;
681 	}
682 
683 	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
684 }
685 
/*
 * Generate DIF/DIX protection information for the data about to be written
 * (task->iov / task->md_buf).  Returns 0 on success or the negative error
 * from the DIF library.
 */
static int
bdevperf_generate_dif(struct bdevperf_task *task)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	int			rc;

	/* The starting reference tag is the task's LBA. */
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		/* DIF: protection info interleaved with the data blocks. */
		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
	} else {
		/* DIX: protection info written into the separate md buffer. */
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
	}

	if (rc != 0) {
		fprintf(stderr, "Generation of DIF/DIX failed\n");
	}

	return rc;
}
724 
/*
 * Submit the I/O described by task->io_type to the bdev.  On -ENOMEM the
 * task is parked on the bdev's io_wait queue and resubmitted later; any
 * other submission error is fatal for the job.
 */
static void
bdevperf_submit_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev_desc	*desc;
	struct spdk_io_channel	*ch;
	spdk_bdev_io_completion_cb cb_fn;
	uint64_t		offset_in_ios;
	int			rc = 0;

	desc = job->bdev_desc;
	ch = job->ch;

	switch (task->io_type) {
	case SPDK_BDEV_IO_TYPE_WRITE:
		/* Generate protection info first when the bdev has metadata
		 * and DIF checking is enabled. */
		if (spdk_bdev_get_md_size(job->bdev) != 0 && job->dif_check_flags != 0) {
			rc = bdevperf_generate_dif(task);
		}
		if (rc == 0) {
			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;

			if (g_zcopy) {
				/* Commit the zcopy buffer obtained earlier by
				 * bdevperf_prep_zcopy_write_task(). */
				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
				return;
			} else {
				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
								     task->md_buf,
								     task->offset_blocks,
								     job->io_size_blocks,
								     cb_fn, task);
			}
		}
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
						   job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_READ:
		if (g_zcopy) {
			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
						   true, bdevperf_zcopy_populate_complete, task);
		} else {
			rc = spdk_bdev_read_blocks_with_md(desc, ch, task->buf, task->md_buf,
							   task->offset_blocks,
							   job->io_size_blocks,
							   bdevperf_complete, task);
		}
		break;
	case SPDK_BDEV_IO_TYPE_ABORT:
		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
		break;
	default:
		assert(false);
		rc = -EINVAL;
		break;
	}

	if (rc == -ENOMEM) {
		/* No bdev_io available right now: retry when one frees up. */
		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
		return;
	} else if (rc != 0) {
		printf("Failed to submit bdev_io: %d\n", rc);
		if (job->verify) {
			/* Release the verify-mode claim on this offset since the
			 * I/O never went out. */
			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
			spdk_bit_array_clear(job->outstanding, offset_in_ios);
		}
		bdevperf_job_drain(job);
		g_run_rc = rc;
		return;
	}

	job->current_queue_depth++;
}
810 
/*
 * Zcopy write, step 2: the bdev handed us its buffer.  Stash the bdev_io
 * (committed later via spdk_bdev_zcopy_end), copy the verify pattern into
 * the buffer when needed, and proceed to submission.
 */
static void
bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;
	struct iovec		*iovs;
	int			iovcnt;

	if (!success) {
		bdevperf_job_drain(job);
		g_run_rc = -1;
		return;
	}

	task->bdev_io = bdev_io;
	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;

	if (job->verify || job->reset) {
		/* When job->verify or job->reset is enabled, task->buf is used for
		 *  verification of read after write.  For write I/O, when zcopy APIs
		 *  are used, task->buf cannot be used, and data must be written to
		 *  the data buffer allocated underneath bdev layer instead.
		 *  Hence we copy task->buf to the allocated data buffer here.
		 */
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		assert(iovcnt == 1);
		assert(iovs != NULL);

		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
			  spdk_bdev_get_block_size(job->bdev),
			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
			  spdk_bdev_get_md_size(job->bdev), job->io_size_blocks);
	}

	bdevperf_submit_task(task);
}
847 
848 static void
849 bdevperf_prep_zcopy_write_task(void *arg)
850 {
851 	struct bdevperf_task	*task = arg;
852 	struct bdevperf_job	*job = task->job;
853 	int			rc;
854 
855 	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
856 				   task->offset_blocks, job->io_size_blocks,
857 				   false, bdevperf_zcopy_get_buf_complete, task);
858 	if (rc != 0) {
859 		assert(rc == -ENOMEM);
860 		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
861 		return;
862 	}
863 
864 	job->current_queue_depth++;
865 }
866 
867 static struct bdevperf_task *
868 bdevperf_job_get_task(struct bdevperf_job *job)
869 {
870 	struct bdevperf_task *task;
871 
872 	task = TAILQ_FIRST(&job->task_list);
873 	if (!task) {
874 		printf("Task allocation failed\n");
875 		abort();
876 	}
877 
878 	TAILQ_REMOVE(&job->task_list, task, link);
879 	return task;
880 }
881 
/*
 * Choose the next offset and I/O type for a task and submit it.  Offsets
 * come from the zipf generator, rand_r(), or a sequential counter; verify
 * mode additionally tracks in-flight offsets in a bit array so the same
 * LBA is never outstanding twice.
 */
static void
bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
{
	uint64_t offset_in_ios;

	if (job->zipf) {
		offset_in_ios = spdk_zipf_generate(job->zipf);
	} else if (job->is_random) {
		offset_in_ios = rand_r(&job->seed) % job->size_in_ios;
	} else {
		/* Sequential with wraparound. */
		offset_in_ios = job->offset_in_ios++;
		if (job->offset_in_ios == job->size_in_ios) {
			job->offset_in_ios = 0;
		}

		/* Increment of offset_in_ios if there's already an outstanding IO
		 * to that location. We only need this with job->verify as random
		 * offsets are not supported with job->verify at this time.
		 */
		if (job->verify) {
			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);

			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
				offset_in_ios = job->offset_in_ios++;
				if (job->offset_in_ios == job->size_in_ios) {
					job->offset_in_ios = 0;
				}
			}
			spdk_bit_array_set(job->outstanding, offset_in_ios);
		}
	}

	/* For multi-thread to same job, offset_in_ios is relative
	 * to the LBA range assigned for that job. job->offset_blocks
	 * is absolute (entire bdev LBA range).
	 */
	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;

	if (job->verify || job->reset) {
		/* Verify/reset always writes a generated pattern first; the
		 * read-back happens from the write completion. */
		generate_data(task->buf, job->buf_size,
			      spdk_bdev_get_block_size(job->bdev),
			      task->md_buf, spdk_bdev_get_md_size(job->bdev),
			      job->io_size_blocks);
		if (g_zcopy) {
			bdevperf_prep_zcopy_write_task(task);
			return;
		} else {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
		}
	} else if (job->flush) {
		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
	} else if (job->unmap) {
		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
	} else if (job->write_zeroes) {
		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
	} else if ((job->rw_percentage == 100) ||
		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
		/* Read path: read-only workloads, or the read share of a mix. */
		task->io_type = SPDK_BDEV_IO_TYPE_READ;
	} else {
		if (g_zcopy) {
			bdevperf_prep_zcopy_write_task(task);
			return;
		} else {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
		}
	}

	bdevperf_submit_task(task);
}
955 
956 static int reset_job(void *arg);
957 
958 static void
959 reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
960 {
961 	struct bdevperf_task	*task = cb_arg;
962 	struct bdevperf_job	*job = task->job;
963 
964 	if (!success) {
965 		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
966 		bdevperf_job_drain(job);
967 		g_run_rc = -1;
968 	}
969 
970 	TAILQ_INSERT_TAIL(&job->task_list, task, link);
971 	spdk_bdev_free_io(bdev_io);
972 
973 	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
974 						10 * 1000000);
975 }
976 
977 static int
978 reset_job(void *arg)
979 {
980 	struct bdevperf_job *job = arg;
981 	struct bdevperf_task *task;
982 	int rc;
983 
984 	spdk_poller_unregister(&job->reset_timer);
985 
986 	/* Do reset. */
987 	task = bdevperf_job_get_task(job);
988 	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
989 			     reset_cb, task);
990 	if (rc) {
991 		printf("Reset failed: %d\n", rc);
992 		bdevperf_job_drain(job);
993 		g_run_rc = -1;
994 	}
995 
996 	return -1;
997 }
998 
999 static void
1000 bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
1001 {
1002 	struct bdevperf_job *job = cb_arg;
1003 	struct bdevperf_task *task;
1004 
1005 	job->io_timeout++;
1006 
1007 	if (job->is_draining || !job->abort ||
1008 	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
1009 		return;
1010 	}
1011 
1012 	task = bdevperf_job_get_task(job);
1013 	if (task == NULL) {
1014 		return;
1015 	}
1016 
1017 	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
1018 	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;
1019 
1020 	bdevperf_submit_task(task);
1021 }
1022 
1023 static void
1024 bdevperf_job_run(void *ctx)
1025 {
1026 	struct bdevperf_job *job = ctx;
1027 	struct bdevperf_task *task;
1028 	int i;
1029 
1030 	/* Submit initial I/O for this job. Each time one
1031 	 * completes, another will be submitted. */
1032 
1033 	/* Start a timer to stop this I/O chain when the run is over */
1034 	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain, job, g_time_in_usec);
1035 	if (job->reset) {
1036 		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
1037 							10 * 1000000);
1038 	}
1039 
1040 	spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);
1041 
1042 	for (i = 0; i < job->queue_depth; i++) {
1043 		task = bdevperf_job_get_task(job);
1044 		bdevperf_submit_single(job, task);
1045 	}
1046 }
1047 
/* Runs on the main thread after the last job's stats line has been printed:
 * emit the aggregate "Total" footer, re-enable periodic dumps, and free the
 * stats context allocated by performance_statistics_thread(). */
static void
_performance_dump_done(void *ctx)
{
	struct bdevperf_aggregate_stats *stats = ctx;

	printf("\r ==================================================================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
	printf(" %10.2f %8.2f\n",
	       stats->total_failed_per_second, stats->total_timeout_per_second);
	fflush(stdout);

	/* Allow the next periodic dump to start. */
	g_performance_dump_active = false;

	free(stats);
}
1064 
1065 static void
1066 _performance_dump(void *ctx)
1067 {
1068 	struct bdevperf_aggregate_stats *stats = ctx;
1069 
1070 	performance_dump_job(stats, stats->current_job);
1071 
1072 	/* This assumes the jobs list is static after start up time.
1073 	 * That's true right now, but if that ever changed this would need a lock. */
1074 	stats->current_job = TAILQ_NEXT(stats->current_job, link);
1075 	if (stats->current_job == NULL) {
1076 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1077 	} else {
1078 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1079 	}
1080 }
1081 
1082 static int
1083 performance_statistics_thread(void *arg)
1084 {
1085 	struct bdevperf_aggregate_stats *stats;
1086 
1087 	if (g_performance_dump_active) {
1088 		return -1;
1089 	}
1090 
1091 	g_performance_dump_active = true;
1092 
1093 	stats = calloc(1, sizeof(*stats));
1094 	if (stats == NULL) {
1095 		return -1;
1096 	}
1097 
1098 	g_show_performance_period_num++;
1099 
1100 	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;
1101 	stats->ema_period = g_show_performance_ema_period;
1102 
1103 	/* Iterate all of the jobs to gather stats
1104 	 * These jobs will not get removed here until a final performance dump is run,
1105 	 * so this should be safe without locking.
1106 	 */
1107 	stats->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
1108 	if (stats->current_job == NULL) {
1109 		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
1110 	} else {
1111 		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
1112 	}
1113 
1114 	return -1;
1115 }
1116 
/* Begin the measured run: announce the duration, record the start timestamp,
 * optionally arm the periodic stats poller (-S), and start every job on its
 * assigned thread. */
static void
bdevperf_test(void)
{
	struct bdevperf_job *job;

	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / 1000000);
	fflush(stdout);

	/* Start a timer to dump performance numbers */
	g_start_tsc = spdk_get_ticks();
	if (g_show_performance_real_time && !g_perf_timer) {
		/* Header for the periodic per-job stats lines. */
		printf("\r %-*s: %10s %10s %10s %10s %8s\n",
		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s");

		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
						    g_show_performance_period_in_usec);
	}

	/* Iterate jobs to start all I/O */
	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
		g_bdevperf.running_jobs++;
		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
	}
}
1141 
1142 static void
1143 bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1144 {
1145 	struct bdevperf_job *job = event_ctx;
1146 
1147 	if (SPDK_BDEV_EVENT_REMOVE == type) {
1148 		bdevperf_job_drain(job);
1149 	}
1150 }
1151 
1152 static uint32_t g_construct_job_count = 0;
1153 
1154 static void
1155 _bdevperf_construct_job_done(void *ctx)
1156 {
1157 	if (--g_construct_job_count == 0) {
1158 
1159 		if (g_run_rc != 0) {
1160 			/* Something failed. */
1161 			bdevperf_test_done(NULL);
1162 			return;
1163 		}
1164 
1165 		/* Ready to run the test */
1166 		bdevperf_test();
1167 	} else if (g_run_rc != 0) {
1168 		/* Reset error as some jobs constructed right */
1169 		g_run_rc = 0;
1170 		if (g_continue_on_failure == false) {
1171 			g_error_to_exit = true;
1172 		}
1173 	}
1174 }
1175 
1176 /* Checkformat will not allow to use inlined type,
1177    this is a workaround */
1178 typedef struct spdk_thread *spdk_thread_t;
1179 
1180 static spdk_thread_t
1181 construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
1182 {
1183 	struct spdk_cpuset tmp;
1184 
1185 	/* This function runs on the main thread. */
1186 	assert(g_main_thread == spdk_get_thread());
1187 
1188 	/* Handle default mask */
1189 	if (spdk_cpuset_count(cpumask) == 0) {
1190 		cpumask = &g_all_cpuset;
1191 	}
1192 
1193 	/* Warn user that mask might need to be changed */
1194 	spdk_cpuset_copy(&tmp, cpumask);
1195 	spdk_cpuset_or(&tmp, &g_all_cpuset);
1196 	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
1197 		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
1198 	}
1199 
1200 	return spdk_thread_create(tag, cpumask);
1201 }
1202 
1203 static uint32_t
1204 _get_next_core(void)
1205 {
1206 	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;
1207 
1208 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1209 		current_core = spdk_env_get_first_core();
1210 		return current_core;
1211 	}
1212 
1213 	current_core = spdk_env_get_next_core(current_core);
1214 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1215 		current_core = spdk_env_get_first_core();
1216 	}
1217 
1218 	return current_core;
1219 }
1220 
1221 static void
1222 _bdevperf_construct_job(void *ctx)
1223 {
1224 	struct bdevperf_job *job = ctx;
1225 	int rc;
1226 
1227 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(job->bdev), true, bdevperf_bdev_removed, job,
1228 				&job->bdev_desc);
1229 	if (rc != 0) {
1230 		SPDK_ERRLOG("Could not open leaf bdev %s, error=%d\n", spdk_bdev_get_name(job->bdev), rc);
1231 		g_run_rc = -EINVAL;
1232 		goto end;
1233 	}
1234 
1235 	if (g_zcopy) {
1236 		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
1237 			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
1238 			g_run_rc = -ENOTSUP;
1239 			goto end;
1240 		}
1241 	}
1242 
1243 	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
1244 	if (!job->ch) {
1245 		SPDK_ERRLOG("Could not get io_channel for device %s, error=%d\n", spdk_bdev_get_name(job->bdev),
1246 			    rc);
1247 		spdk_bdev_close(job->bdev_desc);
1248 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
1249 		g_run_rc = -ENOMEM;
1250 		goto end;
1251 	}
1252 
1253 end:
1254 	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
1255 }
1256 
1257 static void
1258 job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
1259 {
1260 	switch (rw) {
1261 	case JOB_CONFIG_RW_READ:
1262 		job->rw_percentage = 100;
1263 		break;
1264 	case JOB_CONFIG_RW_WRITE:
1265 		job->rw_percentage = 0;
1266 		break;
1267 	case JOB_CONFIG_RW_RANDREAD:
1268 		job->is_random = true;
1269 		job->rw_percentage = 100;
1270 		job->seed = rand();
1271 		break;
1272 	case JOB_CONFIG_RW_RANDWRITE:
1273 		job->is_random = true;
1274 		job->rw_percentage = 0;
1275 		job->seed = rand();
1276 		break;
1277 	case JOB_CONFIG_RW_RW:
1278 		job->is_random = false;
1279 		break;
1280 	case JOB_CONFIG_RW_RANDRW:
1281 		job->is_random = true;
1282 		job->seed = rand();
1283 		break;
1284 	case JOB_CONFIG_RW_VERIFY:
1285 		job->verify = true;
1286 		job->rw_percentage = 50;
1287 		break;
1288 	case JOB_CONFIG_RW_RESET:
1289 		job->reset = true;
1290 		job->verify = true;
1291 		job->rw_percentage = 50;
1292 		break;
1293 	case JOB_CONFIG_RW_UNMAP:
1294 		job->unmap = true;
1295 		break;
1296 	case JOB_CONFIG_RW_FLUSH:
1297 		job->flush = true;
1298 		break;
1299 	case JOB_CONFIG_RW_WRITE_ZEROES:
1300 		job->write_zeroes = true;
1301 		break;
1302 	}
1303 }
1304 
/* Build a bdevperf_job for 'bdev' from 'config', preallocate its task pool,
 * link it into g_bdevperf.jobs, and schedule the open/channel setup on
 * 'thread'.  Returns 0 on success or a negative errno on failure. */
static int
bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
		       struct spdk_thread *thread)
{
	struct bdevperf_job *job;
	struct bdevperf_task *task;
	int block_size, data_block_size;
	int rc;
	int task_num, n;

	/* block_size includes metadata (if interleaved); data_block_size does not. */
	block_size = spdk_bdev_get_block_size(bdev);
	data_block_size = spdk_bdev_get_data_block_size(bdev);

	job = calloc(1, sizeof(struct bdevperf_job));
	if (!job) {
		fprintf(stderr, "Unable to allocate memory for new job.\n");
		return -ENOMEM;
	}

	job->name = strdup(spdk_bdev_get_name(bdev));
	if (!job->name) {
		fprintf(stderr, "Unable to allocate memory for job name.\n");
		free(job);
		return -ENOMEM;
	}

	job->workload_type = g_workload_type;
	job->io_size = config->bs;
	job->rw_percentage = config->rwmixread;
	job->continue_on_failure = g_continue_on_failure;
	job->queue_depth = config->iodepth;
	job->bdev = bdev;
	/* I/O size is expressed in data bytes; buffers must cover metadata too. */
	job->io_size_blocks = job->io_size / data_block_size;
	job->buf_size = job->io_size_blocks * block_size;
	job->abort = g_abort;
	job_init_rw(job, config->rw);

	if ((job->io_size % data_block_size) != 0) {
		SPDK_ERRLOG("IO size (%d) is not multiples of data block size of bdev %s (%"PRIu32")\n",
			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
		free(job->name);
		free(job);
		return -ENOTSUP;
	}

	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
		free(job->name);
		free(job);
		return -ENOTSUP;
	}

	/* Mirror the bdev's enabled DIF checks so generated I/O matches. */
	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_REFTAG)) {
		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
	}
	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_GUARD)) {
		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
	}

	job->offset_in_ios = 0;

	if (config->length != 0) {
		/* Use subset of disk */
		job->size_in_ios = config->length / job->io_size_blocks;
		job->ios_base = config->offset / job->io_size_blocks;
	} else {
		/* Use whole disk */
		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
		job->ios_base = 0;
	}

	if (job->is_random && g_zipf_theta > 0) {
		/* NOTE(review): spdk_zipf_create() failure is not checked here --
		 * presumably a NULL zipf falls back to uniform offsets; confirm. */
		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
	}

	if (job->verify) {
		/* Bitmap of offsets with an I/O in flight, so verify never
		 * issues overlapping I/O to the same blocks. */
		job->outstanding = spdk_bit_array_create(job->size_in_ios);
		if (job->outstanding == NULL) {
			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
				    spdk_bdev_get_name(bdev));
			free(job->name);
			free(job);
			return -ENOMEM;
		}
	}

	TAILQ_INIT(&job->task_list);

	/* Reserve extra tasks: one for the periodic reset, and one abort task
	 * per queued I/O when -X is enabled. */
	task_num = job->queue_depth;
	if (job->reset) {
		task_num += 1;
	}
	if (job->abort) {
		task_num += job->queue_depth;
	}

	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);

	for (n = 0; n < task_num; n++) {
		task = calloc(1, sizeof(struct bdevperf_task));
		if (!task) {
			fprintf(stderr, "Failed to allocate task from memory\n");
			/* NOTE(review): the job stays on g_bdevperf.jobs here with a
			 * partial task pool -- presumably freed by the common
			 * teardown path; confirm. */
			return -ENOMEM;
		}

		/* DMA-able data buffer, aligned as the bdev requires. */
		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
					 SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (!task->buf) {
			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
			free(task);
			return -ENOMEM;
		}

		/* Separate metadata buffer only for bdevs with non-interleaved MD. */
		if (spdk_bdev_is_md_separate(job->bdev)) {
			task->md_buf = spdk_zmalloc(job->io_size_blocks *
						    spdk_bdev_get_md_size(job->bdev), 0, NULL,
						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
			if (!task->md_buf) {
				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
				spdk_free(task->buf);
				free(task);
				return -ENOMEM;
			}
		}

		task->job = job;
		TAILQ_INSERT_TAIL(&job->task_list, task, link);
	}

	job->thread = thread;

	g_construct_job_count++;

	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
	assert(rc == 0);

	return rc;
}
1443 
1444 static int
1445 parse_rw(const char *str, enum job_config_rw ret)
1446 {
1447 	if (str == NULL) {
1448 		return ret;
1449 	}
1450 
1451 	if (!strcmp(str, "read")) {
1452 		ret = JOB_CONFIG_RW_READ;
1453 	} else if (!strcmp(str, "randread")) {
1454 		ret = JOB_CONFIG_RW_RANDREAD;
1455 	} else if (!strcmp(str, "write")) {
1456 		ret = JOB_CONFIG_RW_WRITE;
1457 	} else if (!strcmp(str, "randwrite")) {
1458 		ret = JOB_CONFIG_RW_RANDWRITE;
1459 	} else if (!strcmp(str, "verify")) {
1460 		ret = JOB_CONFIG_RW_VERIFY;
1461 	} else if (!strcmp(str, "reset")) {
1462 		ret = JOB_CONFIG_RW_RESET;
1463 	} else if (!strcmp(str, "unmap")) {
1464 		ret = JOB_CONFIG_RW_UNMAP;
1465 	} else if (!strcmp(str, "write_zeroes")) {
1466 		ret = JOB_CONFIG_RW_WRITE_ZEROES;
1467 	} else if (!strcmp(str, "flush")) {
1468 		ret = JOB_CONFIG_RW_FLUSH;
1469 	} else if (!strcmp(str, "rw")) {
1470 		ret = JOB_CONFIG_RW_RW;
1471 	} else if (!strcmp(str, "randrw")) {
1472 		ret = JOB_CONFIG_RW_RANDRW;
1473 	} else {
1474 		fprintf(stderr, "rw must be one of\n"
1475 			"(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush)\n");
1476 		ret = BDEVPERF_CONFIG_ERROR;
1477 	}
1478 
1479 	return ret;
1480 }
1481 
/* Copy the next ':'-separated token from 'filename' into 'out', stripping
 * spaces and tabs, and return a pointer just past the consumed characters so
 * the caller can continue scanning.  'out' must hold at least
 * BDEVPERF_CONFIG_MAX_FILENAME bytes; the result is always NUL-terminated. */
static const char *
config_filename_next(const char *filename, char *out)
{
	int i, k;

	if (filename == NULL) {
		out[0] = '\0';
		return NULL;
	}

	if (filename[0] == ':') {
		filename++;
	}

	/* Bound the copy to MAX - 1 characters so the terminator below can
	 * never be written past the end of 'out' (k never exceeds i). */
	for (i = 0, k = 0;
	     filename[i] != '\0' &&
	     filename[i] != ':' &&
	     i < BDEVPERF_CONFIG_MAX_FILENAME - 1;
	     i++) {
		if (filename[i] == ' ' || filename[i] == '\t') {
			continue;
		}

		out[k++] = filename[i];
	}
	out[k] = 0;

	return filename + i;
}
1511 
1512 static void
1513 bdevperf_construct_jobs(void)
1514 {
1515 	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
1516 	struct spdk_thread *thread;
1517 	struct job_config *config;
1518 	struct spdk_bdev *bdev;
1519 	const char *filenames;
1520 	int rc;
1521 
1522 	TAILQ_FOREACH(config, &job_config_list, link) {
1523 		filenames = config->filename;
1524 
1525 		thread = construct_job_thread(&config->cpumask, config->name);
1526 		assert(thread);
1527 
1528 		while (filenames) {
1529 			filenames = config_filename_next(filenames, filename);
1530 			if (strlen(filename) == 0) {
1531 				break;
1532 			}
1533 
1534 			bdev = spdk_bdev_get_by_name(filename);
1535 			if (!bdev) {
1536 				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
1537 				g_run_rc = -EINVAL;
1538 				return;
1539 			}
1540 
1541 			rc = bdevperf_construct_job(bdev, config, thread);
1542 			if (rc < 0) {
1543 				g_run_rc = rc;
1544 				return;
1545 			}
1546 		}
1547 	}
1548 }
1549 
1550 static int
1551 make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
1552 {
1553 	struct job_config *config = calloc(1, sizeof(*config));
1554 
1555 	if (config == NULL) {
1556 		fprintf(stderr, "Unable to allocate memory for job config\n");
1557 		return -ENOMEM;
1558 	}
1559 
1560 	config->name = filename;
1561 	config->filename = filename;
1562 	spdk_cpuset_zero(&config->cpumask);
1563 	spdk_cpuset_set_cpu(&config->cpumask, _get_next_core(), true);
1564 	config->bs = g_io_size;
1565 	config->iodepth = g_queue_depth;
1566 	config->rwmixread = g_rw_percentage;
1567 	config->offset = offset;
1568 	config->length = range;
1569 	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
1570 	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
1571 		return -EINVAL;
1572 	}
1573 
1574 	TAILQ_INSERT_TAIL(&job_config_list, config, link);
1575 	return 0;
1576 }
1577 
1578 static void
1579 bdevperf_construct_multithread_job_configs(void)
1580 {
1581 	struct spdk_bdev *bdev;
1582 	uint32_t i;
1583 	uint32_t num_cores;
1584 	uint64_t blocks_per_job;
1585 	int64_t offset;
1586 
1587 	num_cores = 0;
1588 	SPDK_ENV_FOREACH_CORE(i) {
1589 		num_cores++;
1590 	}
1591 
1592 	if (num_cores == 0) {
1593 		g_run_rc = -EINVAL;
1594 		return;
1595 	}
1596 
1597 	if (g_job_bdev_name != NULL) {
1598 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
1599 		if (!bdev) {
1600 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
1601 			return;
1602 		}
1603 
1604 		blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
1605 		offset = 0;
1606 
1607 		SPDK_ENV_FOREACH_CORE(i) {
1608 			g_run_rc = make_cli_job_config(g_job_bdev_name, offset, blocks_per_job);
1609 			if (g_run_rc) {
1610 				return;
1611 			}
1612 
1613 			offset += blocks_per_job;
1614 		}
1615 	} else {
1616 		bdev = spdk_bdev_first_leaf();
1617 		while (bdev != NULL) {
1618 			blocks_per_job = spdk_bdev_get_num_blocks(bdev) / num_cores;
1619 			offset = 0;
1620 
1621 			SPDK_ENV_FOREACH_CORE(i) {
1622 				g_run_rc = make_cli_job_config(spdk_bdev_get_name(bdev),
1623 							       offset, blocks_per_job);
1624 				if (g_run_rc) {
1625 					return;
1626 				}
1627 
1628 				offset += blocks_per_job;
1629 			}
1630 
1631 			bdev = spdk_bdev_next_leaf(bdev);
1632 		}
1633 	}
1634 }
1635 
1636 static void
1637 bdevperf_construct_job_configs(void)
1638 {
1639 	struct spdk_bdev *bdev;
1640 
1641 	/* There are three different modes for allocating jobs. Standard mode
1642 	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
1643 	 *
1644 	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
1645 	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
1646 	 * This runs multiple threads per bdev, effectively.
1647 	 *
1648 	 * The -j flag implies "FIO" mode which tries to mimic semantic of FIO jobs.
1649 	 * In "FIO" mode, threads are spawned per-job instead of per-bdev.
1650 	 * Each FIO job can be individually parameterized by filename, cpu mask, etc,
1651 	 * which is different from other modes in that they only support global options.
1652 	 */
1653 
1654 	if (g_bdevperf_conf) {
1655 		goto end;
1656 	} else if (g_multithread_mode) {
1657 		bdevperf_construct_multithread_job_configs();
1658 		goto end;
1659 	}
1660 
1661 	if (g_job_bdev_name != NULL) {
1662 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
1663 		if (bdev) {
1664 			/* Construct the job */
1665 			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
1666 		} else {
1667 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
1668 		}
1669 	} else {
1670 		bdev = spdk_bdev_first_leaf();
1671 
1672 		while (bdev != NULL) {
1673 			/* Construct the job */
1674 			g_run_rc = make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
1675 			if (g_run_rc) {
1676 				break;
1677 			}
1678 
1679 			bdev = spdk_bdev_next_leaf(bdev);
1680 		}
1681 	}
1682 
1683 end:
1684 	/* Increment initial construct_jobs count so that it will never reach 0 in the middle
1685 	 * of iteration.
1686 	 */
1687 	g_construct_job_count = 1;
1688 
1689 	if (g_run_rc == 0) {
1690 		bdevperf_construct_jobs();
1691 	}
1692 
1693 	_bdevperf_construct_job_done(NULL);
1694 }
1695 
1696 static int
1697 parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
1698 {
1699 	const char *job_name;
1700 	int tmp;
1701 
1702 	tmp = spdk_conf_section_get_intval(s, name);
1703 	if (tmp == -1) {
1704 		/* Field was not found. Check default value
1705 		 * In [global] section it is ok to have undefined values
1706 		 * but for other sections it is not ok */
1707 		if (def == BDEVPERF_CONFIG_UNDEFINED) {
1708 			job_name = spdk_conf_section_get_name(s);
1709 			if (strcmp(job_name, "global") == 0) {
1710 				return def;
1711 			}
1712 
1713 			fprintf(stderr,
1714 				"Job '%s' has no '%s' assigned\n",
1715 				job_name, name);
1716 			return BDEVPERF_CONFIG_ERROR;
1717 		}
1718 		return def;
1719 	}
1720 
1721 	/* NOTE: get_intval returns nonnegative on success */
1722 	if (tmp < 0) {
1723 		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
1724 			spdk_conf_section_get_name(s), name);
1725 		return BDEVPERF_CONFIG_ERROR;
1726 	}
1727 
1728 	return tmp;
1729 }
1730 
/* CLI arguments override parameters for global sections */
static void
config_set_cli_args(struct job_config *config)
{
	if (g_job_bdev_name) {
		config->filename = g_job_bdev_name;
	}
	if (g_io_size > 0) {
		config->bs = g_io_size;
	}
	if (g_queue_depth > 0) {
		config->iodepth = g_queue_depth;
	}
	/* NOTE(review): '-M 0' (pure writes) does not override here because the
	 * check is '> 0' rather than '>= 0' -- confirm whether that is intended. */
	if (g_rw_percentage > 0) {
		config->rwmixread = g_rw_percentage;
	}
	if (g_workload_type) {
		config->rw = parse_rw(g_workload_type, config->rw);
	}
}
1751 
/* Parse the -j job config file into job_config_list.  Each non-[global]
 * section becomes one job config; [global] sections reset the defaults that
 * subsequent sections inherit, with CLI options taking precedence over
 * [global] values.  Returns 0 on success (including when no -j file was
 * given) and 1 on any error. */
static int
read_job_config(void)
{
	struct job_config global_default_config;
	struct job_config global_config;
	struct spdk_conf_section *s;
	struct job_config *config;
	const char *cpumask;
	const char *rw;
	bool is_global;
	int n = 0;
	int val;

	/* Nothing to do when running without a config file. */
	if (g_bdevperf_conf_file == NULL) {
		return 0;
	}

	g_bdevperf_conf = spdk_conf_allocate();
	if (g_bdevperf_conf == NULL) {
		fprintf(stderr, "Could not allocate job config structure\n");
		return 1;
	}

	spdk_conf_disable_sections_merge(g_bdevperf_conf);
	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
		fprintf(stderr, "Invalid job config");
		return 1;
	}

	/* Initialize global defaults */
	global_default_config.filename = NULL;
	/* Zero mask is the same as g_all_cpuset
	 * The g_all_cpuset is not initialized yet,
	 * so use zero mask as the default instead */
	spdk_cpuset_zero(&global_default_config.cpumask);
	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
	/* bdevperf has no default for -M option but in FIO the default is 50 */
	global_default_config.rwmixread = 50;
	global_default_config.offset = 0;
	/* length 0 means 100% */
	global_default_config.length = 0;
	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
	config_set_cli_args(&global_default_config);

	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
		return 1;
	}

	/* There is only a single instance of global job_config
	 * We just reset its value when we encounter new [global] section */
	global_config = global_default_config;

	for (s = spdk_conf_first_section(g_bdevperf_conf);
	     s != NULL;
	     s = spdk_conf_next_section(s)) {
		config = calloc(1, sizeof(*config));
		if (config == NULL) {
			fprintf(stderr, "Unable to allocate memory for job config\n");
			return 1;
		}

		config->name = spdk_conf_section_get_name(s);
		is_global = strcmp(config->name, "global") == 0;

		if (is_global) {
			/* A new [global] starts from the pristine defaults. */
			global_config = global_default_config;
		}

		/* Each option falls back to the current [global] value. */
		config->filename = spdk_conf_section_get_val(s, "filename");
		if (config->filename == NULL) {
			config->filename = global_config.filename;
		}
		if (!is_global) {
			if (config->filename == NULL) {
				fprintf(stderr, "Job '%s' expects 'filename' parameter\n", config->name);
				goto error;
			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
				fprintf(stderr,
					"filename for '%s' job is too long. Max length is %d\n",
					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
				goto error;
			}
		}

		cpumask = spdk_conf_section_get_val(s, "cpumask");
		if (cpumask == NULL) {
			config->cpumask = global_config.cpumask;
		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
			fprintf(stderr, "Job '%s' has bad 'cpumask' value\n", config->name);
			goto error;
		}

		config->bs = parse_uint_option(s, "bs", global_config.bs);
		if (config->bs == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->bs == 0) {
			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
			goto error;
		}

		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->iodepth == 0) {
			fprintf(stderr,
				"'iodepth' of job '%s' must be greater than 0\n",
				config->name);
			goto error;
		}

		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
			goto error;
		} else if (config->rwmixread > 100) {
			fprintf(stderr,
				"'rwmixread' value of '%s' job is not in 0-100 range\n",
				config->name);
			goto error;
		}

		config->offset = parse_uint_option(s, "offset", global_config.offset);
		if (config->offset == BDEVPERF_CONFIG_ERROR) {
			goto error;
		}

		val = parse_uint_option(s, "length", global_config.length);
		if (val == BDEVPERF_CONFIG_ERROR) {
			goto error;
		}
		config->length = val;

		rw = spdk_conf_section_get_val(s, "rw");
		config->rw = parse_rw(rw, global_config.rw);
		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
			fprintf(stderr, "Job '%s' has bad 'rw' value\n", config->name);
			goto error;
		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
			goto error;
		}

		if (is_global) {
			/* [global] itself is not a job: fold CLI overrides in,
			 * keep it as the new inheritance base, and discard it. */
			config_set_cli_args(config);
			global_config = *config;
			free(config);
		} else {
			TAILQ_INSERT_TAIL(&job_config_list, config, link);
			n++;
		}
	}

	printf("Using job config with %d jobs\n", n);
	return 0;
error:
	free(config);
	return 1;
}
1911 
1912 static void
1913 bdevperf_run(void *arg1)
1914 {
1915 	uint32_t i;
1916 
1917 	g_main_thread = spdk_get_thread();
1918 
1919 	spdk_cpuset_zero(&g_all_cpuset);
1920 	SPDK_ENV_FOREACH_CORE(i) {
1921 		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
1922 	}
1923 
1924 	if (g_wait_for_tests) {
1925 		/* Do not perform any tests until RPC is received */
1926 		return;
1927 	}
1928 
1929 	bdevperf_construct_job_configs();
1930 }
1931 
1932 static void
1933 rpc_perform_tests_cb(void)
1934 {
1935 	struct spdk_json_write_ctx *w;
1936 	struct spdk_jsonrpc_request *request = g_request;
1937 
1938 	g_request = NULL;
1939 
1940 	if (g_run_rc == 0) {
1941 		w = spdk_jsonrpc_begin_result(request);
1942 		spdk_json_write_uint32(w, g_run_rc);
1943 		spdk_jsonrpc_end_result(request, w);
1944 	} else {
1945 		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
1946 						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
1947 	}
1948 
1949 	/* Reset g_run_rc to 0 for the next test run. */
1950 	g_run_rc = 0;
1951 
1952 	/* Reset g_stats to 0 for the next test run. */
1953 	memset(&g_stats, 0, sizeof(g_stats));
1954 }
1955 
1956 static void
1957 rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
1958 {
1959 	if (params != NULL) {
1960 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
1961 						 "perform_tests method requires no parameters");
1962 		return;
1963 	}
1964 	if (g_request != NULL) {
1965 		fprintf(stderr, "Another test is already in progress.\n");
1966 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
1967 						 spdk_strerror(-EINPROGRESS));
1968 		return;
1969 	}
1970 	g_request = request;
1971 
1972 	/* Only construct job configs at the first test run.  */
1973 	if (TAILQ_EMPTY(&job_config_list)) {
1974 		bdevperf_construct_job_configs();
1975 	} else {
1976 		bdevperf_construct_jobs();
1977 	}
1978 }
1979 SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)
1980 
/* Adapter: bdevperf_job_drain() is a poller callback returning int, but
 * spdk_thread_send_msg() requires a void (*)(void *) function. */
static void
_bdevperf_job_drain(void *ctx)
{
	bdevperf_job_drain(ctx);
}
1986 
1987 static void
1988 spdk_bdevperf_shutdown_cb(void)
1989 {
1990 	g_shutdown = true;
1991 	struct bdevperf_job *job, *tmp;
1992 
1993 	if (g_bdevperf.running_jobs == 0) {
1994 		bdevperf_test_done(NULL);
1995 		return;
1996 	}
1997 
1998 	/* Iterate jobs to stop all I/O */
1999 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
2000 		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
2001 	}
2002 }
2003 
2004 static int
2005 bdevperf_parse_arg(int ch, char *arg)
2006 {
2007 	long long tmp;
2008 
2009 	if (ch == 'w') {
2010 		g_workload_type = optarg;
2011 	} else if (ch == 'T') {
2012 		g_job_bdev_name = optarg;
2013 	} else if (ch == 'z') {
2014 		g_wait_for_tests = true;
2015 	} else if (ch == 'Z') {
2016 		g_zcopy = true;
2017 	} else if (ch == 'X') {
2018 		g_abort = true;
2019 	} else if (ch == 'C') {
2020 		g_multithread_mode = true;
2021 	} else if (ch == 'f') {
2022 		g_continue_on_failure = true;
2023 	} else if (ch == 'j') {
2024 		g_bdevperf_conf_file = optarg;
2025 	} else if (ch == 'F') {
2026 		char *endptr;
2027 
2028 		errno = 0;
2029 		g_zipf_theta = strtod(optarg, &endptr);
2030 		if (errno || optarg == endptr || g_zipf_theta < 0) {
2031 			fprintf(stderr, "Illegal zipf theta value %s\n", optarg);
2032 			return -EINVAL;
2033 		}
2034 	} else {
2035 		tmp = spdk_strtoll(optarg, 10);
2036 		if (tmp < 0) {
2037 			fprintf(stderr, "Parse failed for the option %c.\n", ch);
2038 			return tmp;
2039 		} else if (tmp >= INT_MAX) {
2040 			fprintf(stderr, "Parsed option was too large %c.\n", ch);
2041 			return -ERANGE;
2042 		}
2043 
2044 		switch (ch) {
2045 		case 'q':
2046 			g_queue_depth = tmp;
2047 			break;
2048 		case 'o':
2049 			g_io_size = tmp;
2050 			break;
2051 		case 't':
2052 			g_time_in_sec = tmp;
2053 			break;
2054 		case 'k':
2055 			g_timeout_in_sec = tmp;
2056 			break;
2057 		case 'M':
2058 			g_rw_percentage = tmp;
2059 			g_mix_specified = true;
2060 			break;
2061 		case 'P':
2062 			g_show_performance_ema_period = tmp;
2063 			break;
2064 		case 'S':
2065 			g_show_performance_real_time = 1;
2066 			g_show_performance_period_in_usec = tmp * 1000000;
2067 			break;
2068 		default:
2069 			return -EINVAL;
2070 		}
2071 	}
2072 	return 0;
2073 }
2074 
/* Print the tool-specific half of the usage text (spdk_app_usage() prints
 * the framework options).  The -w value list matches parse_rw(), including
 * write_zeroes which was previously missing here. */
static void
bdevperf_usage(void)
{
	printf(" -q <depth>                io depth\n");
	printf(" -o <size>                 io size in bytes\n");
	printf(" -w <type>                 io pattern type, must be one of (read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush, write_zeroes)\n");
	printf(" -t <time>                 time in seconds\n");
	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
	printf(" -P <num>                  number of moving average period\n");
	printf("\t\t(If set to n, show weighted mean of the previous n IO/s in real time)\n");
	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
	printf("\t\t(only valid with -S)\n");
	printf(" -S <period>               show performance result in real time every <period> seconds\n");
	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
	printf(" -f                        continue processing I/O even after failures\n");
	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
	printf(" -z                        start bdevperf, but wait for RPC to start tests\n");
	printf(" -X                        abort timed out I/O\n");
	printf(" -C                        enable every core to send I/Os to each bdev\n");
	printf(" -j <filename>             use job config file\n");
}
2098 
2099 static int
2100 verify_test_params(struct spdk_app_opts *opts)
2101 {
2102 	/* When RPC is used for starting tests and
2103 	 * no rpc_addr was configured for the app,
2104 	 * use the default address. */
2105 	if (g_wait_for_tests && opts->rpc_addr == NULL) {
2106 		opts->rpc_addr = SPDK_DEFAULT_RPC_ADDR;
2107 	}
2108 
2109 	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
2110 		goto out;
2111 	}
2112 	if (!g_bdevperf_conf_file && g_io_size <= 0) {
2113 		goto out;
2114 	}
2115 	if (!g_bdevperf_conf_file && !g_workload_type) {
2116 		goto out;
2117 	}
2118 	if (g_time_in_sec <= 0) {
2119 		goto out;
2120 	}
2121 	g_time_in_usec = g_time_in_sec * 1000000LL;
2122 
2123 	if (g_timeout_in_sec < 0) {
2124 		goto out;
2125 	}
2126 
2127 	if (g_abort && !g_timeout_in_sec) {
2128 		printf("Timeout must be set for abort option, Ignoring g_abort\n");
2129 	}
2130 
2131 	if (g_show_performance_ema_period > 0 &&
2132 	    g_show_performance_real_time == 0) {
2133 		fprintf(stderr, "-P option must be specified with -S option\n");
2134 		return 1;
2135 	}
2136 
2137 	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2138 		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
2139 		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
2140 		printf("Zero copy mechanism will not be used.\n");
2141 		g_zcopy = false;
2142 	}
2143 
2144 	if (g_bdevperf_conf_file) {
2145 		/* workload_type verification happens during config file parsing */
2146 		return 0;
2147 	}
2148 
2149 	if (!strcmp(g_workload_type, "verify") ||
2150 	    !strcmp(g_workload_type, "reset")) {
2151 		g_rw_percentage = 50;
2152 		if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2153 			fprintf(stderr, "Unable to exceed max I/O size of %d for verify. (%d provided).\n",
2154 				SPDK_BDEV_LARGE_BUF_MAX_SIZE, g_io_size);
2155 			return 1;
2156 		}
2157 		g_verify = true;
2158 		if (!strcmp(g_workload_type, "reset")) {
2159 			g_reset = true;
2160 		}
2161 	}
2162 
2163 	if (!strcmp(g_workload_type, "read") ||
2164 	    !strcmp(g_workload_type, "randread") ||
2165 	    !strcmp(g_workload_type, "write") ||
2166 	    !strcmp(g_workload_type, "randwrite") ||
2167 	    !strcmp(g_workload_type, "verify") ||
2168 	    !strcmp(g_workload_type, "reset") ||
2169 	    !strcmp(g_workload_type, "unmap") ||
2170 	    !strcmp(g_workload_type, "write_zeroes") ||
2171 	    !strcmp(g_workload_type, "flush")) {
2172 		if (g_mix_specified) {
2173 			fprintf(stderr, "Ignoring -M option... Please use -M option"
2174 				" only when using rw or randrw.\n");
2175 		}
2176 	}
2177 
2178 	if (!strcmp(g_workload_type, "rw") ||
2179 	    !strcmp(g_workload_type, "randrw")) {
2180 		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
2181 			fprintf(stderr,
2182 				"-M must be specified to value from 0 to 100 "
2183 				"for rw or randrw.\n");
2184 			return 1;
2185 		}
2186 	}
2187 
2188 	return 0;
2189 out:
2190 	spdk_app_usage();
2191 	bdevperf_usage();
2192 	return 1;
2193 }
2194 
2195 int
2196 main(int argc, char **argv)
2197 {
2198 	struct spdk_app_opts opts = {};
2199 	int rc;
2200 
2201 	/* Use the runtime PID to set the random seed */
2202 	srand(getpid());
2203 
2204 	spdk_app_opts_init(&opts, sizeof(opts));
2205 	opts.name = "bdevperf";
2206 	opts.rpc_addr = NULL;
2207 	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;
2208 
2209 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CF:M:P:S:T:Xj:", NULL,
2210 				      bdevperf_parse_arg, bdevperf_usage)) !=
2211 	    SPDK_APP_PARSE_ARGS_SUCCESS) {
2212 		return rc;
2213 	}
2214 
2215 	if (read_job_config()) {
2216 		free_job_config();
2217 		return 1;
2218 	}
2219 
2220 	if (verify_test_params(&opts) != 0) {
2221 		free_job_config();
2222 		exit(1);
2223 	}
2224 
2225 	rc = spdk_app_start(&opts, bdevperf_run, NULL);
2226 
2227 	spdk_app_fini();
2228 	free_job_config();
2229 	return rc;
2230 }
2231