/* xref: /spdk/examples/bdev/bdevperf/bdevperf.c (revision 42d1bd28396630ca9cfb81bf7934fb8872df47f0) */
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2016 Intel Corporation.
 *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/bdev.h"
#include "spdk/accel.h"
#include "spdk/endian.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/log.h"
#include "spdk/util.h"
#include "spdk/thread.h"
#include "spdk/string.h"
#include "spdk/rpc.h"
#include "spdk/bit_array.h"
#include "spdk/conf.h"
#include "spdk/zipf.h"
#include "spdk/histogram_data.h"

#define BDEVPERF_CONFIG_MAX_FILENAME 1024
#define BDEVPERF_CONFIG_UNDEFINED -1
#define BDEVPERF_CONFIG_ERROR -2
#define PATTERN_TYPES_STR "(read, write, randread, randwrite, rw, randrw, verify, reset, unmap, flush, write_zeroes)"
#define BDEVPERF_MAX_COREMASK_STRING 64

struct bdevperf_task {
	struct iovec			iov;
	struct bdevperf_job		*job;
	struct spdk_bdev_io		*bdev_io;
	void				*buf;
	void				*verify_buf;
	void				*md_buf;
	void				*verify_md_buf;
	uint64_t			offset_blocks;
	struct bdevperf_task		*task_to_abort;
	enum spdk_bdev_io_type		io_type;
	TAILQ_ENTRY(bdevperf_task)	link;
	struct spdk_bdev_io_wait_entry	bdev_io_wait;
};

static char *g_workload_type = NULL;
static int g_io_size = 0;
/* Initialize to an invalid value so we can detect whether the user overrides it. */
static int g_rw_percentage = -1;
static bool g_verify = false;
static bool g_reset = false;
static bool g_continue_on_failure = false;
static bool g_abort = false;
static bool g_error_to_exit = false;
static int g_queue_depth = 0;
static uint64_t g_time_in_usec;
static bool g_summarize_performance = true;
static uint64_t g_show_performance_period_in_usec = SPDK_SEC_TO_USEC;
static uint64_t g_show_performance_period_num = 0;
static uint64_t g_show_performance_ema_period = 0;
static int g_run_rc = 0;
static bool g_shutdown = false;
static uint64_t g_start_tsc;
static uint64_t g_shutdown_tsc;
static bool g_zcopy = false;
static struct spdk_thread *g_main_thread;
static int g_time_in_sec = 0;
static bool g_mix_specified = false;
static const char *g_job_bdev_name;
static bool g_wait_for_tests = false;
static struct spdk_jsonrpc_request *g_request = NULL;
static bool g_multithread_mode = false;
static int g_timeout_in_sec;
static struct spdk_conf *g_bdevperf_conf = NULL;
static const char *g_bdevperf_conf_file = NULL;
static double g_zipf_theta;
static bool g_random_map = false;
static bool g_unique_writes = false;

static struct spdk_cpuset g_all_cpuset;
static struct spdk_poller *g_perf_timer = NULL;

static void bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task);
static void rpc_perform_tests_cb(void);
static int bdevperf_parse_arg(int ch, char *arg);
static int verify_test_params(void);
static void bdevperf_usage(void);

static uint32_t g_bdev_count = 0;
static uint32_t g_latency_display_level;

static bool g_one_thread_per_lcore = false;

static const double g_latency_cutoffs[] = {
	0.01,
	0.10,
	0.25,
	0.50,
	0.75,
	0.90,
	0.95,
	0.98,
	0.99,
	0.995,
	0.999,
	0.9999,
	0.99999,
	0.999999,
	0.9999999,
	-1,
};

static const char *g_rpc_log_file_name = NULL;
static FILE *g_rpc_log_file = NULL;

struct latency_info {
	uint64_t	min;
	uint64_t	max;
	uint64_t	total;
};


enum job_config_rw {
	JOB_CONFIG_RW_READ = 0,
	JOB_CONFIG_RW_WRITE,
	JOB_CONFIG_RW_RANDREAD,
	JOB_CONFIG_RW_RANDWRITE,
	JOB_CONFIG_RW_RW,
	JOB_CONFIG_RW_RANDRW,
	JOB_CONFIG_RW_VERIFY,
	JOB_CONFIG_RW_RESET,
	JOB_CONFIG_RW_UNMAP,
	JOB_CONFIG_RW_FLUSH,
	JOB_CONFIG_RW_WRITE_ZEROES,
};

struct bdevperf_job {
	char				*name;
	struct spdk_bdev		*bdev;
	struct spdk_bdev_desc		*bdev_desc;
	struct spdk_io_channel		*ch;
	TAILQ_ENTRY(bdevperf_job)	link;
	struct spdk_thread		*thread;

	enum job_config_rw		workload_type;
	int				io_size;
	int				rw_percentage;
	bool				is_random;
	bool				verify;
	bool				reset;
	bool				continue_on_failure;
	bool				unmap;
	bool				write_zeroes;
	bool				flush;
	bool				abort;
	int				queue_depth;
	unsigned int			seed;

	uint64_t			io_completed;
	uint64_t			io_failed;
	uint64_t			io_timeout;
	uint64_t			prev_io_completed;
	double				ema_io_per_second;
	int				current_queue_depth;
	uint64_t			size_in_ios;
	uint64_t			ios_base;
	uint64_t			offset_in_ios;
	uint64_t			io_size_blocks;
	uint64_t			buf_size;
	uint32_t			dif_check_flags;
	bool				is_draining;
	struct spdk_poller		*run_timer;
	struct spdk_poller		*reset_timer;
	struct spdk_bit_array		*outstanding;
	struct spdk_zipf		*zipf;
	TAILQ_HEAD(, bdevperf_task)	task_list;
	uint64_t			run_time_in_usec;

	/* Keep the channel's histogram data before it is destroyed. */
	struct spdk_histogram_data	*histogram;
	struct spdk_bit_array		*random_map;

	/* Counter used for generating unique write data (-U option). */
	uint32_t			write_io_count;
};

struct spdk_bdevperf {
	TAILQ_HEAD(, bdevperf_job)	jobs;
	uint32_t			running_jobs;
};

static struct spdk_bdevperf g_bdevperf = {
	.jobs = TAILQ_HEAD_INITIALIZER(g_bdevperf.jobs),
	.running_jobs = 0,
};

/* Stores values from a section of the job config file. */
struct job_config {
	const char			*name;
	const char			*filename;
	struct spdk_cpuset		cpumask;
	int				bs;
	int				iodepth;
	int				rwmixread;
	uint32_t			lcore;
	int64_t				offset;
	uint64_t			length;
	enum job_config_rw		rw;
	TAILQ_ENTRY(job_config)	link;
};

TAILQ_HEAD(, job_config) job_config_list
	= TAILQ_HEAD_INITIALIZER(job_config_list);

static bool g_performance_dump_active = false;

struct bdevperf_stats {
	uint64_t			io_time_in_usec;
	double				total_io_per_second;
	double				total_mb_per_second;
	double				total_failed_per_second;
	double				total_timeout_per_second;
	double				min_latency;
	double				max_latency;
	double				average_latency;
	uint64_t			total_io_completed;
	uint64_t			total_tsc;
};

struct bdevperf_aggregate_stats {
	struct bdevperf_job		*current_job;
	struct bdevperf_stats		total;
};

static struct bdevperf_aggregate_stats g_stats = {.total.min_latency = (double)UINT64_MAX};

struct lcore_thread {
	struct spdk_thread		*thread;
	uint32_t			lcore;
	TAILQ_ENTRY(lcore_thread)	link;
};

TAILQ_HEAD(, lcore_thread) g_lcore_thread_list
	= TAILQ_HEAD_INITIALIZER(g_lcore_thread_list);


static char *
parse_workload_type(enum job_config_rw ret)
{
	switch (ret) {
	case JOB_CONFIG_RW_READ:
		return "read";
	case JOB_CONFIG_RW_RANDREAD:
		return "randread";
	case JOB_CONFIG_RW_WRITE:
		return "write";
	case JOB_CONFIG_RW_RANDWRITE:
		return "randwrite";
	case JOB_CONFIG_RW_VERIFY:
		return "verify";
	case JOB_CONFIG_RW_RESET:
		return "reset";
	case JOB_CONFIG_RW_UNMAP:
		return "unmap";
	case JOB_CONFIG_RW_WRITE_ZEROES:
		return "write_zeroes";
	case JOB_CONFIG_RW_FLUSH:
		return "flush";
	case JOB_CONFIG_RW_RW:
		return "rw";
	case JOB_CONFIG_RW_RANDRW:
		return "randrw";
	default:
		fprintf(stderr, "unknown workload_type code\n");
	}

	return NULL;
}

/*
 * Cumulative Moving Average (CMA): average of all data points up to the current one.
 * Exponential Moving Average (EMA): weighted mean of the previous n data points, with more weight given to recent ones.
 * Simple Moving Average (SMA): unweighted mean of the previous n data points.
 *
 * Bdevperf supports CMA and EMA.
 */
static double
get_cma_io_per_second(struct bdevperf_job *job, uint64_t io_time_in_usec)
{
	return (double)job->io_completed * SPDK_SEC_TO_USEC / io_time_in_usec;
}

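/*
 * The EMA uses the standard smoothing recurrence with factor 2 / (N + 1):
 *
 *   ema_new = ema_old + (sample - ema_old) * 2 / (N + 1)
 *
 * where N is the configured EMA period and the sample is the IOPS measured
 * over the last display interval.
 */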
static double
get_ema_io_per_second(struct bdevperf_job *job, uint64_t ema_period)
{
	double io_completed, io_per_second;

	io_completed = job->io_completed;
	io_per_second = (double)(io_completed - job->prev_io_completed) * SPDK_SEC_TO_USEC
			/ g_show_performance_period_in_usec;
	job->prev_io_completed = io_completed;

	job->ema_io_per_second += (io_per_second - job->ema_io_per_second) * 2
				  / (ema_period + 1);
	return job->ema_io_per_second;
}

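/*
 * Histogram iteration callback: approximates the latency sum by attributing
 * each bucket's midpoint, (start + end) / 2, to every I/O counted in that
 * bucket, and records the first and last non-empty buckets as min and max.
 */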
static void
get_avg_latency(void *ctx, uint64_t start, uint64_t end, uint64_t count,
		uint64_t total, uint64_t so_far)
{
	struct latency_info *latency_info = ctx;

	if (count == 0) {
		return;
	}

	latency_info->total += (start + end) / 2 * count;

	if (so_far == count) {
		latency_info->min = start;
	}

	if (so_far == total) {
		latency_info->max = end;
	}
}

static void
bdevperf_job_stats_accumulate(struct bdevperf_stats *aggr_stats,
			      struct bdevperf_stats *job_stats)
{
	aggr_stats->total_io_per_second += job_stats->total_io_per_second;
	aggr_stats->total_mb_per_second += job_stats->total_mb_per_second;
	aggr_stats->total_failed_per_second += job_stats->total_failed_per_second;
	aggr_stats->total_timeout_per_second += job_stats->total_timeout_per_second;
	aggr_stats->total_io_completed += job_stats->total_io_completed;
	aggr_stats->total_tsc += job_stats->total_tsc;

	if (job_stats->min_latency < aggr_stats->min_latency) {
		aggr_stats->min_latency = job_stats->min_latency;
	}
	if (job_stats->max_latency > aggr_stats->max_latency) {
		aggr_stats->max_latency = job_stats->max_latency;
	}
}

static void
bdevperf_job_get_stats(struct bdevperf_job *job,
		       struct bdevperf_stats *job_stats,
		       uint64_t time_in_usec,
		       uint64_t ema_period)
{
	double io_per_second, mb_per_second, failed_per_second, timeout_per_second;
	double average_latency = 0.0, min_latency, max_latency;
	uint64_t tsc_rate;
	uint64_t total_io;
	struct latency_info latency_info = {};

	if (ema_period == 0) {
		io_per_second = get_cma_io_per_second(job, time_in_usec);
	} else {
		io_per_second = get_ema_io_per_second(job, ema_period);
	}
	tsc_rate = spdk_get_ticks_hz();
	mb_per_second = io_per_second * job->io_size / (1024 * 1024);

	spdk_histogram_data_iterate(job->histogram, get_avg_latency, &latency_info);

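	/* latency_info is accumulated in TSC ticks; the conversions below use
	 * ticks * SPDK_SEC_TO_USEC / tsc_rate to express latencies in microseconds.
	 */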
	total_io = job->io_completed + job->io_failed;
	if (total_io != 0) {
		average_latency = (double)latency_info.total / total_io * SPDK_SEC_TO_USEC / tsc_rate;
	}
	min_latency = (double)latency_info.min * SPDK_SEC_TO_USEC / tsc_rate;
	max_latency = (double)latency_info.max * SPDK_SEC_TO_USEC / tsc_rate;

	failed_per_second = (double)job->io_failed * SPDK_SEC_TO_USEC / time_in_usec;
	timeout_per_second = (double)job->io_timeout * SPDK_SEC_TO_USEC / time_in_usec;

	job_stats->total_io_per_second = io_per_second;
	job_stats->total_mb_per_second = mb_per_second;
	job_stats->total_failed_per_second = failed_per_second;
	job_stats->total_timeout_per_second = timeout_per_second;
	job_stats->total_io_completed = total_io;
	job_stats->total_tsc = latency_info.total;
	job_stats->average_latency = average_latency;
	job_stats->min_latency = min_latency;
	job_stats->max_latency = max_latency;
	job_stats->io_time_in_usec = time_in_usec;
}

static void
performance_dump_job_stdout(struct bdevperf_job *job,
			    struct bdevperf_stats *job_stats)
{
	if (job->workload_type == JOB_CONFIG_RW_RW || job->workload_type == JOB_CONFIG_RW_RANDRW) {
		printf("Job: %s (Core Mask 0x%s, workload: %s, percentage: %d, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->rw_percentage,
		       job->queue_depth, job->io_size);
	} else {
		printf("Job: %s (Core Mask 0x%s, workload: %s, depth: %d, IO size: %d)\n",
		       job->name, spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)),
		       parse_workload_type(job->workload_type), job->queue_depth, job->io_size);
	}


	if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		printf("Job: %s ended in about %.2f seconds with error\n",
		       job->name, (double)job->run_time_in_usec / SPDK_SEC_TO_USEC);
	}
	if (job->verify) {
		printf("\t Verification LBA range: start 0x%" PRIx64 " length 0x%" PRIx64 "\n",
		       job->ios_base, job->size_in_ios);
	}

	printf("\t %-20s: %10.2f %10.2f %10.2f",
	       job->name,
	       (float)job_stats->io_time_in_usec / SPDK_SEC_TO_USEC,
	       job_stats->total_io_per_second,
	       job_stats->total_mb_per_second);
	printf(" %10.2f %8.2f",
	       job_stats->total_failed_per_second,
	       job_stats->total_timeout_per_second);
	printf(" %10.2f %10.2f %10.2f\n",
	       job_stats->average_latency,
	       job_stats->min_latency,
	       job_stats->max_latency);
}

static void
performance_dump_job_json(struct bdevperf_job *job,
			  struct spdk_json_write_ctx *w,
			  struct bdevperf_stats *job_stats)
{
	char core_mask_string[BDEVPERF_MAX_COREMASK_STRING] = {0};

	spdk_json_write_named_string(w, "job", job->name);
	snprintf(core_mask_string, BDEVPERF_MAX_COREMASK_STRING,
		 "0x%s", spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));
	spdk_json_write_named_string(w, "core_mask", core_mask_string);
	spdk_json_write_named_string(w, "workload", parse_workload_type(job->workload_type));

	if (job->workload_type == JOB_CONFIG_RW_RW || job->workload_type == JOB_CONFIG_RW_RANDRW) {
		spdk_json_write_named_uint32(w, "percentage", job->rw_percentage);
	}

	if (g_shutdown) {
		spdk_json_write_named_string(w, "status", "terminated");
	} else if (job->io_failed > 0 && !job->reset && !job->continue_on_failure) {
		spdk_json_write_named_string(w, "status", "failed");
	} else {
		spdk_json_write_named_string(w, "status", "finished");
	}

	if (job->verify) {
		spdk_json_write_named_object_begin(w, "verify_range");
		spdk_json_write_named_uint64(w, "start", job->ios_base);
		spdk_json_write_named_uint64(w, "length", job->size_in_ios);
		spdk_json_write_object_end(w);
	}

	spdk_json_write_named_uint32(w, "queue_depth", job->queue_depth);
	spdk_json_write_named_uint32(w, "io_size", job->io_size);
	spdk_json_write_named_double(w, "runtime", (double)job_stats->io_time_in_usec / SPDK_SEC_TO_USEC);
	spdk_json_write_named_double(w, "iops", job_stats->total_io_per_second);
	spdk_json_write_named_double(w, "mibps", job_stats->total_mb_per_second);
	spdk_json_write_named_uint64(w, "io_failed", job->io_failed);
	spdk_json_write_named_uint64(w, "io_timeout", job->io_timeout);
	spdk_json_write_named_double(w, "avg_latency_us", job_stats->average_latency);
	spdk_json_write_named_double(w, "min_latency_us", job_stats->min_latency);
	spdk_json_write_named_double(w, "max_latency_us", job_stats->max_latency);
}

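/*
 * Fill the write buffer with a deterministic pattern. By default each 32-bit
 * word holds (block index + byte offset within the block), which lets
 * verify_data() detect misplaced or stale blocks. With unique writes enabled
 * (-U), each 64-bit word is additionally tagged with a per-job write counter
 * in the upper 32 bits so that successive writes are distinguishable.
 */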
static void
generate_data(struct bdevperf_job *job, void *buf, void *md_buf, bool unique)
{
	int offset_blocks = 0, md_offset, data_block_size, inner_offset;
	int buf_len = job->buf_size;
	int block_size = spdk_bdev_get_block_size(job->bdev);
	int md_size = spdk_bdev_get_md_size(job->bdev);
	int num_blocks = job->io_size_blocks;

	if (buf_len < num_blocks * block_size) {
		return;
	}

	if (md_buf == NULL) {
		data_block_size = block_size - md_size;
		md_buf = (char *)buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	if (unique) {
		uint64_t io_count = job->write_io_count++;
		unsigned int i;

		assert(md_size == 0 || md_size >= (int)sizeof(uint64_t));

		while (offset_blocks < num_blocks) {
			inner_offset = 0;
			while (inner_offset < data_block_size) {
				*(uint64_t *)buf = (io_count << 32) | (offset_blocks + inner_offset);
				inner_offset += sizeof(uint64_t);
				buf += sizeof(uint64_t);
			}
			for (i = 0; i < md_size / sizeof(uint64_t); i++) {
				((uint64_t *)md_buf)[i] = (io_count << 32) | offset_blocks;
			}
			md_buf += md_offset;
			offset_blocks++;
		}
		return;
	}

	while (offset_blocks < num_blocks) {
		inner_offset = 0;
		while (inner_offset < data_block_size) {
			*(uint32_t *)buf = offset_blocks + inner_offset;
			inner_offset += sizeof(uint32_t);
			buf += sizeof(uint32_t);
		}
		memset(md_buf, offset_blocks, md_size);
		md_buf += md_offset;
		offset_blocks++;
	}
}

static bool
copy_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	  void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks)
{
	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	memcpy(wr_buf, rd_buf, block_size * num_blocks);

	if (wr_md_buf != NULL) {
		memcpy(wr_md_buf, rd_md_buf, md_size * num_blocks);
	}

	return true;
}

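/*
 * Compare a write buffer with the data read back. When metadata is
 * interleaved (wr_md_buf == NULL), each block's metadata sits at the tail of
 * the block itself, so the walk advances in block_size strides and checks the
 * trailing md_size bytes separately.
 */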
static bool
verify_data(void *wr_buf, int wr_buf_len, void *rd_buf, int rd_buf_len, int block_size,
	    void *wr_md_buf, void *rd_md_buf, int md_size, int num_blocks, bool md_check)
{
	int offset_blocks = 0, md_offset, data_block_size;

	if (wr_buf_len < num_blocks * block_size || rd_buf_len < num_blocks * block_size) {
		return false;
	}

	assert((wr_md_buf != NULL) == (rd_md_buf != NULL));

	if (wr_md_buf == NULL) {
		data_block_size = block_size - md_size;
		wr_md_buf = (char *)wr_buf + data_block_size;
		rd_md_buf = (char *)rd_buf + data_block_size;
		md_offset = block_size;
	} else {
		data_block_size = block_size;
		md_offset = md_size;
	}

	while (offset_blocks < num_blocks) {
		if (memcmp(wr_buf, rd_buf, data_block_size) != 0) {
			printf("data_block_size %d, num_blocks %d, offset %d\n", data_block_size, num_blocks,
			       offset_blocks);
			spdk_log_dump(stdout, "rd_buf", rd_buf, data_block_size);
			spdk_log_dump(stdout, "wr_buf", wr_buf, data_block_size);
			return false;
		}

		wr_buf += block_size;
		rd_buf += block_size;

		if (md_check) {
			if (memcmp(wr_md_buf, rd_md_buf, md_size) != 0) {
				printf("md_size %d, num_blocks %d, offset %d\n", md_size, num_blocks, offset_blocks);
				spdk_log_dump(stdout, "rd_md_buf", rd_md_buf, md_size);
				spdk_log_dump(stdout, "wr_md_buf", wr_md_buf, md_size);
				return false;
			}

			wr_md_buf += md_offset;
			rd_md_buf += md_offset;
		}

		offset_blocks++;
	}

	return true;
}

static void
free_job_config(void)
{
	struct job_config *config, *tmp;

	spdk_conf_free(g_bdevperf_conf);
	g_bdevperf_conf = NULL;

	TAILQ_FOREACH_SAFE(config, &job_config_list, link, tmp) {
		TAILQ_REMOVE(&job_config_list, config, link);
		free(config);
	}
}

static void
bdevperf_job_free(struct bdevperf_job *job)
{
	spdk_histogram_data_free(job->histogram);
	spdk_bit_array_free(&job->outstanding);
	spdk_bit_array_free(&job->random_map);
	spdk_zipf_free(&job->zipf);
	free(job->name);
	free(job);
}

static void
job_thread_exit(void *ctx)
{
	spdk_thread_exit(spdk_get_thread());
}

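/*
 * Histogram iteration callback that prints latency percentiles: it advances
 * through g_latency_cutoffs (terminated by -1) whenever the cumulative
 * fraction so_far/total crosses the next cutoff, printing the bucket's upper
 * bound as that percentile's latency.
 */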
static void
check_cutoff(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	double **cutoff = ctx;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far / total;
	while (so_far_pct >= **cutoff && **cutoff > 0) {
		printf("%9.5f%% : %9.3fus\n", **cutoff * 100, (double)end * SPDK_SEC_TO_USEC / tsc_rate);
		(*cutoff)++;
	}
}

static void
print_bucket(void *ctx, uint64_t start, uint64_t end, uint64_t count,
	     uint64_t total, uint64_t so_far)
{
	double so_far_pct;
	uint64_t tsc_rate;

	if (count == 0) {
		return;
	}

	tsc_rate = spdk_get_ticks_hz();
	so_far_pct = (double)so_far * 100 / total;
	printf("%9.3f - %9.3f: %9.4f%%  (%9ju)\n",
	       (double)start * SPDK_SEC_TO_USEC / tsc_rate,
	       (double)end * SPDK_SEC_TO_USEC / tsc_rate,
	       so_far_pct, count);
}

static void
bdevperf_test_done(void *ctx)
{
	struct bdevperf_job *job, *jtmp;
	struct bdevperf_task *task, *ttmp;
	struct lcore_thread *lthread, *lttmp;
	double average_latency = 0.0;
	uint64_t time_in_usec;
	int rc;
	struct spdk_json_write_ctx *w = NULL;
	struct bdevperf_stats job_stats = {0};
	struct spdk_cpuset cpu_mask;

	if (g_time_in_usec) {
		g_stats.total.io_time_in_usec = g_time_in_usec;

		if (!g_run_rc && g_performance_dump_active) {
			spdk_thread_send_msg(spdk_get_thread(), bdevperf_test_done, NULL);
			return;
		}
	}

	spdk_poller_unregister(&g_perf_timer);

	if (g_shutdown) {
		g_shutdown_tsc = spdk_get_ticks() - g_start_tsc;
		time_in_usec = g_shutdown_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
		g_time_in_usec = (g_time_in_usec > time_in_usec) ? time_in_usec : g_time_in_usec;
		printf("Received shutdown signal, test time was about %.6f seconds\n",
		       (double)g_time_in_usec / SPDK_SEC_TO_USEC);
	}
	/* Send the RPC response if g_run_rc indicates success, or if a shutdown request
	 * was sent to bdevperf. rpc_perform_tests_cb will send an error response in case of error.
	 */
	if ((g_run_rc == 0 || g_shutdown) && g_request) {
		w = spdk_jsonrpc_begin_result(g_request);
		spdk_json_write_object_begin(w);
		spdk_json_write_named_array_begin(w, "results");
	}

	printf("\n%*s\n", 107, "Latency(us)");
	printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
	       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");


	spdk_cpuset_zero(&cpu_mask);
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		spdk_cpuset_or(&cpu_mask, spdk_thread_get_cpumask(job->thread));
		memset(&job_stats, 0, sizeof(job_stats));
		bdevperf_job_get_stats(job, &job_stats, job->run_time_in_usec, 0);
		bdevperf_job_stats_accumulate(&g_stats.total, &job_stats);
		performance_dump_job_stdout(job, &job_stats);
		if (w) {
			spdk_json_write_object_begin(w);
			performance_dump_job_json(job, w, &job_stats);
			spdk_json_write_object_end(w);
		}
	}

	if (w) {
		spdk_json_write_array_end(w);
		spdk_json_write_named_uint32(w, "core_count", spdk_cpuset_count(&cpu_mask));
		spdk_json_write_object_end(w);
		spdk_jsonrpc_end_result(g_request, w);
	}
	printf("\r =================================================================================="
	       "=================================\n");
	printf("\r %-28s: %10s %10.2f %10.2f",
	       "Total", "", g_stats.total.total_io_per_second, g_stats.total.total_mb_per_second);
	printf(" %10.2f %8.2f",
	       g_stats.total.total_failed_per_second, g_stats.total.total_timeout_per_second);

	if (g_stats.total.total_io_completed != 0) {
		average_latency = ((double)g_stats.total.total_tsc / g_stats.total.total_io_completed) *
				  SPDK_SEC_TO_USEC /
				  spdk_get_ticks_hz();
	}
	printf(" %10.2f %10.2f %10.2f\n", average_latency, g_stats.total.min_latency,
	       g_stats.total.max_latency);

	if (g_latency_display_level == 0 || g_stats.total.total_io_completed == 0) {
		goto clean;
	}

	printf("\n Latency summary\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		const double *cutoff = g_latency_cutoffs;

		spdk_histogram_data_iterate(job->histogram, check_cutoff, &cutoff);

		printf("\n");
	}

	if (g_latency_display_level == 1) {
		goto clean;
	}

	printf("\r Latency histogram\n");
	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		printf("\r =============================================\n");
		printf("\r Job: %s (Core Mask 0x%s)\n", job->name,
		       spdk_cpuset_fmt(spdk_thread_get_cpumask(job->thread)));

		spdk_histogram_data_iterate(job->histogram, print_bucket, NULL);
		printf("\n");
	}

clean:
	fflush(stdout);

	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, jtmp) {
		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);

		if (!g_one_thread_per_lcore) {
			spdk_thread_send_msg(job->thread, job_thread_exit, NULL);
		}

		TAILQ_FOREACH_SAFE(task, &job->task_list, link, ttmp) {
			TAILQ_REMOVE(&job->task_list, task, link);
			spdk_free(task->buf);
			spdk_free(task->verify_buf);
			spdk_free(task->md_buf);
			spdk_free(task->verify_md_buf);
			free(task);
		}

		bdevperf_job_free(job);
	}

	if (g_one_thread_per_lcore) {
		TAILQ_FOREACH_SAFE(lthread, &g_lcore_thread_list, link, lttmp) {
			TAILQ_REMOVE(&g_lcore_thread_list, lthread, link);
			spdk_thread_send_msg(lthread->thread, job_thread_exit, NULL);
			free(lthread);
		}
	}

	if (g_bdevperf_conf == NULL) {
		free_job_config();
	}

	rc = g_run_rc;
	if (g_request && !g_shutdown) {
		rpc_perform_tests_cb();
		if (rc != 0) {
			spdk_app_stop(rc);
		}
	} else {
		spdk_app_stop(rc);
	}
}

static void
bdevperf_job_end(void *ctx)
{
	assert(g_main_thread == spdk_get_thread());

	if (--g_bdevperf.running_jobs == 0) {
		bdevperf_test_done(NULL);
	}
}

static void
bdevperf_channel_get_histogram_cb(void *cb_arg, int status, struct spdk_histogram_data *histogram)
{
	struct spdk_histogram_data *job_hist = cb_arg;

	if (status == 0) {
		spdk_histogram_data_merge(job_hist, histogram);
	}
}

static void
bdevperf_job_empty(struct bdevperf_job *job)
{
	uint64_t end_tsc = 0;

	end_tsc = spdk_get_ticks() - g_start_tsc;
	job->run_time_in_usec = end_tsc * SPDK_SEC_TO_USEC / spdk_get_ticks_hz();
	/* Keep histogram info before the channel is destroyed. */
	spdk_bdev_channel_get_histogram(job->ch, bdevperf_channel_get_histogram_cb,
					job->histogram);
	spdk_put_io_channel(job->ch);
	spdk_bdev_close(job->bdev_desc);
	spdk_thread_send_msg(g_main_thread, bdevperf_job_end, NULL);
}

static void
bdevperf_end_task(struct bdevperf_task *task)
{
	struct bdevperf_job     *job = task->job;

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	if (job->is_draining) {
		if (job->current_queue_depth == 0) {
			bdevperf_job_empty(job);
		}
	}
}

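/*
 * ENOMEM back-pressure helper: when a submission returns -ENOMEM, park the
 * task on the bdev's io_wait queue so that cb_fn retries the submission once
 * an spdk_bdev_io becomes available again.
 */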
static void
bdevperf_queue_io_wait_with_cb(struct bdevperf_task *task, spdk_bdev_io_wait_cb cb_fn)
{
	struct bdevperf_job	*job = task->job;

	task->bdev_io_wait.bdev = job->bdev;
	task->bdev_io_wait.cb_fn = cb_fn;
	task->bdev_io_wait.cb_arg = task;
	spdk_bdev_queue_io_wait(job->bdev, job->ch, &task->bdev_io_wait);
}

static int
bdevperf_job_drain(void *ctx)
{
	struct bdevperf_job *job = ctx;

	spdk_poller_unregister(&job->run_timer);
	if (job->reset) {
		spdk_poller_unregister(&job->reset_timer);
	}

	job->is_draining = true;

	return -1;
}

static int
bdevperf_job_drain_timer(void *ctx)
{
	struct bdevperf_job *job = ctx;

	bdevperf_job_drain(ctx);
	if (job->current_queue_depth == 0) {
		bdevperf_job_empty(job);
	}

	return SPDK_POLLER_BUSY;
}

static void
bdevperf_abort_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
		if (!job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	}

	spdk_bdev_free_io(bdev_io);
	bdevperf_end_task(task);
}

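/*
 * Re-check end-to-end protection information on the read path:
 * spdk_dif_verify() covers interleaved metadata (DIF), while spdk_dix_verify()
 * covers a separate metadata buffer (DIX).
 */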
static int
bdevperf_verify_dif(struct bdevperf_task *task)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	struct spdk_dif_error	err_blk = {};
	int			rc;
	struct spdk_dif_ctx_init_ext_opts dif_opts;

	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
	dif_opts.dif_pi_format = spdk_bdev_get_dif_pi_format(bdev);
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		rc = spdk_dif_verify(&task->iov, 1, job->io_size_blocks, &dif_ctx, &err_blk);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_verify(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx, &err_blk);
	}

	if (rc != 0) {
		fprintf(stderr, "DIF/DIX error detected. type=%d, offset=%" PRIu32 "\n",
			err_blk.err_type, err_blk.err_offset);
	}

	return rc;
}

static void
bdevperf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	bool			md_check;
	uint64_t		offset_in_ios;
	int			rc;

	job = task->job;
	md_check = spdk_bdev_get_dif_type(job->bdev) == SPDK_DIF_DISABLE;

	if (g_error_to_exit == true) {
		bdevperf_job_drain(job);
	} else if (!success) {
		if (!job->reset && !job->continue_on_failure) {
			bdevperf_job_drain(job);
			g_run_rc = -1;
			g_error_to_exit = true;
			printf("task offset: %" PRIu64 " on job bdev=%s fails\n",
			       task->offset_blocks, job->name);
		}
	} else if (job->verify || job->reset) {
		if (!verify_data(task->buf, job->buf_size,
				 task->iov.iov_base, job->buf_size,
				 spdk_bdev_get_block_size(job->bdev),
				 task->md_buf, spdk_bdev_io_get_md_buf(bdev_io),
				 spdk_bdev_get_md_size(job->bdev),
				 job->io_size_blocks, md_check)) {
			printf("Buffer mismatch! Target: %s Disk Offset: %" PRIu64 "\n", job->name, task->offset_blocks);
			bdevperf_job_drain(job);
			g_run_rc = -1;
		}
	} else if (job->dif_check_flags != 0) {
		if (task->io_type == SPDK_BDEV_IO_TYPE_READ && spdk_bdev_get_md_size(job->bdev) != 0) {
			rc = bdevperf_verify_dif(task);
			if (rc != 0) {
				printf("DIF error detected. task offset: %" PRIu64 " on job bdev=%s\n",
				       task->offset_blocks, job->name);

				success = false;
				if (!job->reset && !job->continue_on_failure) {
					bdevperf_job_drain(job);
					g_run_rc = -1;
					g_error_to_exit = true;
				}
			}
		}
	}

	job->current_queue_depth--;

	if (success) {
		job->io_completed++;
	} else {
		job->io_failed++;
	}

	if (job->verify) {
		assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
		offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

		assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
		spdk_bit_array_clear(job->outstanding, offset_in_ios);
	}

	spdk_bdev_free_io(bdev_io);

	/*
	 * is_draining indicates when time has expired for the test run
	 * and we are just waiting for the previously submitted I/O
	 * to complete.  In this case, do not submit a new I/O to replace
	 * the one just completed.
	 */
	if (!job->is_draining) {
		bdevperf_submit_single(job, task);
	} else {
		bdevperf_end_task(task);
	}
}

static void
bdevperf_verify_submit_read(void *cb_arg)
{
	struct bdevperf_job	*job;
	struct bdevperf_task	*task = cb_arg;
	int			rc;

	job = task->job;

	task->iov.iov_base = task->verify_buf;
	task->iov.iov_len = job->buf_size;

	/* Read the data back in */
	rc = spdk_bdev_readv_blocks_with_md(job->bdev_desc, job->ch, &task->iov, 1, task->verify_md_buf,
					    task->offset_blocks, job->io_size_blocks,
					    bdevperf_complete, task);

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_verify_submit_read);
	} else if (rc != 0) {
		printf("Failed to submit read: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = rc;
	}
}

static void
bdevperf_verify_write_complete(struct spdk_bdev_io *bdev_io, bool success,
			       void *cb_arg)
{
	if (success) {
		spdk_bdev_free_io(bdev_io);
		bdevperf_verify_submit_read(cb_arg);
	} else {
		bdevperf_complete(bdev_io, success, cb_arg);
	}
}

static void
bdevperf_zcopy_populate_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	if (!success) {
		bdevperf_complete(bdev_io, success, cb_arg);
		return;
	}

	spdk_bdev_zcopy_end(bdev_io, false, bdevperf_complete, cb_arg);
}

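/*
 * Write-path counterpart of bdevperf_verify_dif(): generate protection
 * information into the payload (DIF) or into the separate metadata buffer
 * (DIX) before the buffer is submitted.
 */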
static int
bdevperf_generate_dif(struct bdevperf_task *task)
{
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev	*bdev = job->bdev;
	struct spdk_dif_ctx	dif_ctx;
	int			rc;
	struct spdk_dif_ctx_init_ext_opts dif_opts;

	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
	dif_opts.dif_pi_format = spdk_bdev_get_dif_pi_format(bdev);
	rc = spdk_dif_ctx_init(&dif_ctx,
			       spdk_bdev_get_block_size(bdev),
			       spdk_bdev_get_md_size(bdev),
			       spdk_bdev_is_md_interleaved(bdev),
			       spdk_bdev_is_dif_head_of_md(bdev),
			       spdk_bdev_get_dif_type(bdev),
			       job->dif_check_flags,
			       task->offset_blocks, 0, 0, 0, 0, &dif_opts);
	if (rc != 0) {
		fprintf(stderr, "Initialization of DIF context failed\n");
		return rc;
	}

	if (spdk_bdev_is_md_interleaved(bdev)) {
		rc = spdk_dif_generate(&task->iov, 1, job->io_size_blocks, &dif_ctx);
	} else {
		struct iovec md_iov = {
			.iov_base	= task->md_buf,
			.iov_len	= spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
		};

		rc = spdk_dix_generate(&task->iov, 1, &md_iov, job->io_size_blocks, &dif_ctx);
	}

	if (rc != 0) {
		fprintf(stderr, "Generation of DIF/DIX failed\n");
	}

	return rc;
}

static void
bdevperf_submit_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	struct spdk_bdev_desc	*desc;
	struct spdk_io_channel	*ch;
	spdk_bdev_io_completion_cb cb_fn;
	uint64_t		offset_in_ios;
	int			rc = 0;

	desc = job->bdev_desc;
	ch = job->ch;

	switch (task->io_type) {
	case SPDK_BDEV_IO_TYPE_WRITE:
		if (spdk_bdev_get_md_size(job->bdev) != 0 && job->dif_check_flags != 0) {
			rc = bdevperf_generate_dif(task);
		}
		if (rc == 0) {
			cb_fn = (job->verify || job->reset) ? bdevperf_verify_write_complete : bdevperf_complete;

			if (g_zcopy) {
				spdk_bdev_zcopy_end(task->bdev_io, true, cb_fn, task);
				return;
			} else {
				rc = spdk_bdev_writev_blocks_with_md(desc, ch, &task->iov, 1,
								     task->md_buf,
								     task->offset_blocks,
								     job->io_size_blocks,
								     cb_fn, task);
			}
		}
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		rc = spdk_bdev_flush_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		rc = spdk_bdev_unmap_blocks(desc, ch, task->offset_blocks,
					    job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(desc, ch, task->offset_blocks,
						   job->io_size_blocks, bdevperf_complete, task);
		break;
	case SPDK_BDEV_IO_TYPE_READ:
		if (g_zcopy) {
			rc = spdk_bdev_zcopy_start(desc, ch, NULL, 0, task->offset_blocks, job->io_size_blocks,
						   true, bdevperf_zcopy_populate_complete, task);
		} else {
			rc = spdk_bdev_readv_blocks_with_md(desc, ch, &task->iov, 1,
							    task->md_buf,
							    task->offset_blocks,
							    job->io_size_blocks,
							    bdevperf_complete, task);
		}
		break;
	case SPDK_BDEV_IO_TYPE_ABORT:
		rc = spdk_bdev_abort(desc, ch, task->task_to_abort, bdevperf_abort_complete, task);
		break;
	default:
		assert(false);
		rc = -EINVAL;
		break;
	}

	if (rc == -ENOMEM) {
		bdevperf_queue_io_wait_with_cb(task, bdevperf_submit_task);
		return;
	} else if (rc != 0) {
		printf("Failed to submit bdev_io: %d\n", rc);
		if (job->verify) {
			assert(task->offset_blocks / job->io_size_blocks >= job->ios_base);
			offset_in_ios = task->offset_blocks / job->io_size_blocks - job->ios_base;

			assert(spdk_bit_array_get(job->outstanding, offset_in_ios) == true);
			spdk_bit_array_clear(job->outstanding, offset_in_ios);
		}
		bdevperf_job_drain(job);
		g_run_rc = rc;
		return;
	}

	job->current_queue_depth++;
}

static void
bdevperf_zcopy_get_buf_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;
	struct iovec		*iovs;
	int			iovcnt;

	if (!success) {
		bdevperf_job_drain(job);
		g_run_rc = -1;
		return;
	}

	task->bdev_io = bdev_io;
	task->io_type = SPDK_BDEV_IO_TYPE_WRITE;

	if (job->verify || job->reset) {
		/* When job->verify or job->reset is enabled, task->buf is used for
		 *  verification of read after write.  For write I/O, when zcopy APIs
		 *  are used, task->buf cannot be used, and data must be written to
		 *  the data buffer allocated underneath bdev layer instead.
		 *  Hence we copy task->buf to the allocated data buffer here.
		 */
		spdk_bdev_io_get_iovec(bdev_io, &iovs, &iovcnt);
		assert(iovcnt == 1);
		assert(iovs != NULL);

		copy_data(iovs[0].iov_base, iovs[0].iov_len, task->buf, job->buf_size,
			  spdk_bdev_get_block_size(job->bdev),
			  spdk_bdev_io_get_md_buf(bdev_io), task->md_buf,
			  spdk_bdev_get_md_size(job->bdev), job->io_size_blocks);
	}

	bdevperf_submit_task(task);
}

static void
bdevperf_prep_zcopy_write_task(void *arg)
{
	struct bdevperf_task	*task = arg;
	struct bdevperf_job	*job = task->job;
	int			rc;

	rc = spdk_bdev_zcopy_start(job->bdev_desc, job->ch, NULL, 0,
				   task->offset_blocks, job->io_size_blocks,
				   false, bdevperf_zcopy_get_buf_complete, task);
	if (rc != 0) {
		assert(rc == -ENOMEM);
		bdevperf_queue_io_wait_with_cb(task, bdevperf_prep_zcopy_write_task);
		return;
	}

	job->current_queue_depth++;
}

static struct bdevperf_task *
bdevperf_job_get_task(struct bdevperf_job *job)
{
	struct bdevperf_task *task;

	task = TAILQ_FIRST(&job->task_list);
	if (!task) {
		printf("Task allocation failed\n");
		abort();
	}

	TAILQ_REMOVE(&job->task_list, task, link);
	return task;
}

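/*
 * Pick the next offset and I/O type for a task and submit it. Random offsets
 * come either from a Zipf generator or from rand_r(); with the random map
 * enabled, a bit array additionally guarantees that every offset is visited
 * exactly once before the map is cleared and a new pass begins.
 */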
static void
bdevperf_submit_single(struct bdevperf_job *job, struct bdevperf_task *task)
{
	uint64_t offset_in_ios;
	uint64_t rand_value;
	uint32_t first_clear;

	if (job->zipf) {
		offset_in_ios = spdk_zipf_generate(job->zipf);
	} else if (job->is_random) {
		/* RAND_MAX is only INT32_MAX, so use 2 calls to rand_r to
		 * get a large enough value to ensure we are issuing I/O
		 * uniformly across the whole bdev.
		 */
		rand_value = (uint64_t)rand_r(&job->seed) * RAND_MAX + rand_r(&job->seed);
		offset_in_ios = rand_value % job->size_in_ios;

		if (g_random_map) {
			/* Make sure that the offset does not exceed the maximum size
			 * of the bit array (verified during job creation).
			 */
			assert(offset_in_ios < UINT32_MAX);

			first_clear = spdk_bit_array_find_first_clear(job->random_map, (uint32_t)offset_in_ios);

			if (first_clear == UINT32_MAX) {
				first_clear = spdk_bit_array_find_first_clear(job->random_map, 0);

				if (first_clear == UINT32_MAX) {
					/* If there are no more clear bits in the array, we start over
					 * and select the previously selected random value.
					 */
					spdk_bit_array_clear_mask(job->random_map);
					first_clear = (uint32_t)offset_in_ios;
				}
			}

			spdk_bit_array_set(job->random_map, first_clear);

			offset_in_ios = first_clear;
		}
	} else {
		offset_in_ios = job->offset_in_ios++;
		if (job->offset_in_ios == job->size_in_ios) {
			job->offset_in_ios = 0;
		}

		/* Increment offset_in_ios if there's already an outstanding I/O
		 * to that location. We only need this with job->verify, as random
		 * offsets are not supported with job->verify at this time.
		 */
		if (job->verify) {
			assert(spdk_bit_array_find_first_clear(job->outstanding, 0) != UINT32_MAX);

			while (spdk_bit_array_get(job->outstanding, offset_in_ios)) {
				offset_in_ios = job->offset_in_ios++;
				if (job->offset_in_ios == job->size_in_ios) {
					job->offset_in_ios = 0;
				}
			}
			spdk_bit_array_set(job->outstanding, offset_in_ios);
		}
	}

	/* For multiple threads on the same job, offset_in_ios is relative
	 * to the LBA range assigned to that job. task->offset_blocks
	 * is absolute (entire bdev LBA range).
	 */
	task->offset_blocks = (offset_in_ios + job->ios_base) * job->io_size_blocks;

	if (job->flush) {
		task->io_type = SPDK_BDEV_IO_TYPE_FLUSH;
	} else if (job->unmap) {
		task->io_type = SPDK_BDEV_IO_TYPE_UNMAP;
	} else if (job->write_zeroes) {
		task->io_type = SPDK_BDEV_IO_TYPE_WRITE_ZEROES;
	} else if ((job->rw_percentage == 100) ||
		   (job->rw_percentage != 0 && ((rand_r(&job->seed) % 100) < job->rw_percentage))) {
		assert(!job->verify);
		task->io_type = SPDK_BDEV_IO_TYPE_READ;
		if (!g_zcopy) {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
		}
	} else {
		if (job->verify || job->reset || g_unique_writes) {
			generate_data(job, task->buf, task->md_buf, g_unique_writes);
		}
		if (g_zcopy) {
			bdevperf_prep_zcopy_write_task(task);
			return;
		} else {
			task->iov.iov_base = task->buf;
			task->iov.iov_len = job->buf_size;
			task->io_type = SPDK_BDEV_IO_TYPE_WRITE;
		}
	}

	bdevperf_submit_task(task);
}

static int reset_job(void *arg);

static void
reset_cb(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct bdevperf_task	*task = cb_arg;
	struct bdevperf_job	*job = task->job;

	if (!success) {
		printf("Reset blockdev=%s failed\n", spdk_bdev_get_name(job->bdev));
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	TAILQ_INSERT_TAIL(&job->task_list, task, link);
	spdk_bdev_free_io(bdev_io);

	job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
						10 * SPDK_SEC_TO_USEC);
}

static int
reset_job(void *arg)
{
	struct bdevperf_job *job = arg;
	struct bdevperf_task *task;
	int rc;

	spdk_poller_unregister(&job->reset_timer);

	/* Do a reset; reset_cb re-arms the timer, so a reset is issued every 10 seconds. */
	task = bdevperf_job_get_task(job);
	rc = spdk_bdev_reset(job->bdev_desc, job->ch,
			     reset_cb, task);
	if (rc) {
		printf("Reset failed: %d\n", rc);
		bdevperf_job_drain(job);
		g_run_rc = -1;
	}

	return -1;
}

static void
bdevperf_timeout_cb(void *cb_arg, struct spdk_bdev_io *bdev_io)
{
	struct bdevperf_job *job = cb_arg;
	struct bdevperf_task *task;

	job->io_timeout++;

	if (job->is_draining || !job->abort ||
	    !spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ABORT)) {
		return;
	}

	task = bdevperf_job_get_task(job);
	if (task == NULL) {
		return;
	}

	task->task_to_abort = spdk_bdev_io_get_cb_arg(bdev_io);
	task->io_type = SPDK_BDEV_IO_TYPE_ABORT;

	bdevperf_submit_task(task);
}

static void
bdevperf_job_run(void *ctx)
{
	struct bdevperf_job *job = ctx;
	struct bdevperf_task *task;
	int i;

	/* Submit initial I/O for this job. Each time one
	 * completes, another will be submitted. */

	/* Start a timer to stop this I/O chain when the run is over */
	job->run_timer = SPDK_POLLER_REGISTER(bdevperf_job_drain_timer, job, g_time_in_usec);
	if (job->reset) {
		job->reset_timer = SPDK_POLLER_REGISTER(reset_job, job,
							10 * SPDK_SEC_TO_USEC);
	}

	spdk_bdev_set_timeout(job->bdev_desc, g_timeout_in_sec, bdevperf_timeout_cb, job);

	for (i = 0; i < job->queue_depth; i++) {
		task = bdevperf_job_get_task(job);
		bdevperf_submit_single(job, task);
	}
}

static void
_performance_dump_done(void *ctx)
{
	struct bdevperf_aggregate_stats *aggregate = ctx;
	struct bdevperf_stats *stats = &aggregate->total;
	double average_latency;

	if (g_summarize_performance) {
		printf("%12.2f IOPS, %8.2f MiB/s", stats->total_io_per_second, stats->total_mb_per_second);
		printf("\r");
	} else {
		printf("\r =================================================================================="
		       "=================================\n");
		printf("\r %-28s: %10s %10.2f %10.2f",
		       "Total", "", stats->total_io_per_second, stats->total_mb_per_second);
		printf(" %10.2f %8.2f",
		       stats->total_failed_per_second, stats->total_timeout_per_second);

		average_latency = ((double)stats->total_tsc / stats->total_io_completed) * SPDK_SEC_TO_USEC /
				  spdk_get_ticks_hz();
		printf(" %10.2f %10.2f %10.2f\n", average_latency, stats->min_latency, stats->max_latency);
		printf("\n");
	}

	fflush(stdout);

	g_performance_dump_active = false;

	free(aggregate);
}

static void
_performance_dump(void *ctx)
{
	struct bdevperf_aggregate_stats *stats = ctx;
	struct bdevperf_stats job_stats = {0};
	struct bdevperf_job *job = stats->current_job;
	uint64_t time_in_usec;

	if (job->io_failed > 0 && !job->continue_on_failure) {
		time_in_usec = job->run_time_in_usec;
	} else {
		time_in_usec = stats->total.io_time_in_usec;
	}

	bdevperf_job_get_stats(job, &job_stats, time_in_usec, g_show_performance_ema_period);
	bdevperf_job_stats_accumulate(&stats->total, &job_stats);
	if (!g_summarize_performance) {
		performance_dump_job_stdout(stats->current_job, &job_stats);
	}

	/* This assumes the jobs list is static after start up time.
	 * That's true right now, but if that ever changed this would need a lock. */
	stats->current_job = TAILQ_NEXT(stats->current_job, link);
	if (stats->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, stats);
	} else {
		spdk_thread_send_msg(stats->current_job->thread, _performance_dump, stats);
	}
}

static int
performance_statistics_thread(void *arg)
{
	struct bdevperf_aggregate_stats *aggregate;
	struct bdevperf_stats *stats;


	if (g_performance_dump_active) {
		return -1;
	}

	g_performance_dump_active = true;

	aggregate = calloc(1, sizeof(*aggregate));
	if (aggregate == NULL) {
		return -1;
	}
	stats = &aggregate->total;
	stats->min_latency = (double)UINT64_MAX;

	g_show_performance_period_num++;

	stats->io_time_in_usec = g_show_performance_period_num * g_show_performance_period_in_usec;

	/* Iterate all of the jobs to gather stats
	 * These jobs will not get removed here until a final performance dump is run,
	 * so this should be safe without locking.
	 */
	aggregate->current_job = TAILQ_FIRST(&g_bdevperf.jobs);
	if (aggregate->current_job == NULL) {
		spdk_thread_send_msg(g_main_thread, _performance_dump_done, aggregate);
	} else {
		spdk_thread_send_msg(aggregate->current_job->thread, _performance_dump, aggregate);
	}

	return -1;
}

static void
bdevperf_test(void)
{
	struct bdevperf_job *job;

	if (TAILQ_EMPTY(&g_bdevperf.jobs)) {
		if (g_request) {
			spdk_jsonrpc_send_error_response_fmt(g_request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
							     "No jobs defined or bdevs created");
			g_request = NULL;
		}
		return;
	}

	printf("Running I/O for %" PRIu64 " seconds...\n", g_time_in_usec / (uint64_t)SPDK_SEC_TO_USEC);
	fflush(stdout);

	/* Start a timer to dump performance numbers */
	g_start_tsc = spdk_get_ticks();
	if (!g_summarize_performance) {
		printf("%*s\n", 107, "Latency(us)");
		printf("\r %-*s: %10s %10s %10s %10s %8s %10s %10s %10s\n",
		       28, "Device Information", "runtime(s)", "IOPS", "MiB/s", "Fail/s", "TO/s", "Average", "min", "max");
	}
	if (!g_perf_timer) {
		g_perf_timer = SPDK_POLLER_REGISTER(performance_statistics_thread, NULL,
						    g_show_performance_period_in_usec);
	}

	/* Iterate jobs to start all I/O */
	TAILQ_FOREACH(job, &g_bdevperf.jobs, link) {
		g_bdevperf.running_jobs++;
		spdk_thread_send_msg(job->thread, bdevperf_job_run, job);
	}
}

static void
bdevperf_bdev_removed(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct bdevperf_job *job = event_ctx;

	if (SPDK_BDEV_EVENT_REMOVE == type) {
		bdevperf_job_drain(job);
	}
}

static void
bdevperf_histogram_status_cb(void *cb_arg, int status)
{
	if (status != 0) {
		g_run_rc = status;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}

	if (--g_bdev_count == 0) {
		if (g_run_rc == 0) {
			/* Ready to run the test */
			bdevperf_test();
		} else {
			bdevperf_test_done(NULL);
		}
	}
}

static uint32_t g_construct_job_count = 0;

static int
_bdevperf_enable_histogram(void *ctx, struct spdk_bdev *bdev)
{
	bool *enable = ctx;

	g_bdev_count++;

	spdk_bdev_histogram_enable(bdev, bdevperf_histogram_status_cb, NULL, *enable);

	return 0;
}

static void
bdevperf_enable_histogram(bool enable)
{
	struct spdk_bdev *bdev;
	int rc;

	/* Increment the initial g_bdev_count so that it never reaches 0 in the middle of the iteration. */
	g_bdev_count = 1;

	if (g_job_bdev_name != NULL) {
		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
		if (bdev) {
			rc = _bdevperf_enable_histogram(&enable, bdev);
		} else {
			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
			rc = -1;
		}
	} else {
		rc = spdk_for_each_bdev_leaf(&enable, _bdevperf_enable_histogram);
	}

	bdevperf_histogram_status_cb(NULL, rc);
}

static void
_bdevperf_construct_job_done(void *ctx)
{
	if (--g_construct_job_count == 0) {
		if (g_run_rc != 0) {
			/* Something failed. */
			bdevperf_test_done(NULL);
			return;
		}

		/* Always enable the histogram. */
		bdevperf_enable_histogram(true);
	} else if (g_run_rc != 0) {
		/* Reset the error, as some jobs were constructed correctly. */
		g_run_rc = 0;
		if (g_continue_on_failure == false) {
			g_error_to_exit = true;
		}
	}
}

/* check_format will not allow use of an inlined type;
   this is a workaround. */
typedef struct spdk_thread *spdk_thread_t;

static spdk_thread_t
construct_job_thread(struct spdk_cpuset *cpumask, const char *tag)
{
	struct spdk_cpuset tmp;

	/* This function runs on the main thread. */
	assert(g_main_thread == spdk_get_thread());

	/* Handle the default mask. */
	if (spdk_cpuset_count(cpumask) == 0) {
		cpumask = &g_all_cpuset;
	}

	/* Warn the user that the mask might need to be changed. */
	spdk_cpuset_copy(&tmp, cpumask);
	spdk_cpuset_or(&tmp, &g_all_cpuset);
	if (!spdk_cpuset_equal(&tmp, &g_all_cpuset)) {
		fprintf(stderr, "cpumask for '%s' is too big\n", tag);
	}

	return spdk_thread_create(tag, cpumask);
}

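/*
 * Round-robin lcore allocator used when spreading job threads across cores:
 * start at the first core in the application mask and wrap around after the
 * last one.
 */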
1755 static uint32_t
1756 _get_next_core(void)
1757 {
1758 	static uint32_t current_core = SPDK_ENV_LCORE_ID_ANY;
1759 
1760 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1761 		current_core = spdk_env_get_first_core();
1762 		return current_core;
1763 	}
1764 
1765 	current_core = spdk_env_get_next_core(current_core);
1766 	if (current_core == SPDK_ENV_LCORE_ID_ANY) {
1767 		current_core = spdk_env_get_first_core();
1768 	}
1769 
1770 	return current_core;
1771 }
1772 
1773 static void
1774 _bdevperf_construct_job(void *ctx)
1775 {
1776 	struct bdevperf_job *job = ctx;
1777 	int rc;
1778 
1779 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(job->bdev), true, bdevperf_bdev_removed, job,
1780 				&job->bdev_desc);
1781 	if (rc != 0) {
1782 		SPDK_ERRLOG("Could not open leaf bdev %s, error=%d\n", spdk_bdev_get_name(job->bdev), rc);
1783 		g_run_rc = -EINVAL;
1784 		goto end;
1785 	}
1786 
1787 	if (g_zcopy) {
1788 		if (!spdk_bdev_io_type_supported(job->bdev, SPDK_BDEV_IO_TYPE_ZCOPY)) {
1789 			printf("Test requires ZCOPY but bdev module does not support ZCOPY\n");
1790 			g_run_rc = -ENOTSUP;
1791 			goto end;
1792 		}
1793 	}
1794 
1795 	job->ch = spdk_bdev_get_io_channel(job->bdev_desc);
1796 	if (!job->ch) {
1797 		/* rc is 0 here (the open above succeeded), so don't report it as an error code. */
1798 		SPDK_ERRLOG("Could not get io_channel for device %s\n", spdk_bdev_get_name(job->bdev));
1799 		spdk_bdev_close(job->bdev_desc);
1800 		TAILQ_REMOVE(&g_bdevperf.jobs, job, link);
1801 		g_run_rc = -ENOMEM;
1802 		goto end;
1803 	}
1804 
1805 end:
1806 	spdk_thread_send_msg(g_main_thread, _bdevperf_construct_job_done, NULL);
1807 }
1808 
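/* Translate the parsed workload type into the per-job flags and read
 * percentage used by the I/O submission path. */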
1809 static void
1810 job_init_rw(struct bdevperf_job *job, enum job_config_rw rw)
1811 {
1812 	switch (rw) {
1813 	case JOB_CONFIG_RW_READ:
1814 		job->rw_percentage = 100;
1815 		break;
1816 	case JOB_CONFIG_RW_WRITE:
1817 		job->rw_percentage = 0;
1818 		break;
1819 	case JOB_CONFIG_RW_RANDREAD:
1820 		job->is_random = true;
1821 		job->rw_percentage = 100;
1822 		job->seed = rand();
1823 		break;
1824 	case JOB_CONFIG_RW_RANDWRITE:
1825 		job->is_random = true;
1826 		job->rw_percentage = 0;
1827 		job->seed = rand();
1828 		break;
1829 	case JOB_CONFIG_RW_RW:
1830 		job->is_random = false;
1831 		break;
1832 	case JOB_CONFIG_RW_RANDRW:
1833 		job->is_random = true;
1834 		job->seed = rand();
1835 		break;
1836 	case JOB_CONFIG_RW_RESET:
1837 		/* Reset shares the flow with verify. */
1838 		job->reset = true;
1839 	/* fallthrough */
1840 	case JOB_CONFIG_RW_VERIFY:
1841 		job->verify = true;
1842 		/* For the verify flow, reads are issued only from the write
1843 		 * completion callback, so rw_percentage is not used. */
1844 		job->rw_percentage = 0;
1845 		break;
1846 	case JOB_CONFIG_RW_UNMAP:
1847 		job->unmap = true;
1848 		break;
1849 	case JOB_CONFIG_RW_FLUSH:
1850 		job->flush = true;
1851 		break;
1852 	case JOB_CONFIG_RW_WRITE_ZEROES:
1853 		job->write_zeroes = true;
1854 		break;
1855 	}
1856 }
1857 
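/* Allocate and initialize a bdevperf_job for the given bdev and config:
 * validate the I/O size against the bdev's block size, pre-allocate the task
 * pool (with DMA buffers) sized for the queue depth plus extras for the
 * reset/abort flows, and kick off the bdev open on the job's thread. */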
1858 static int
1859 bdevperf_construct_job(struct spdk_bdev *bdev, struct job_config *config,
1860 		       struct spdk_thread *thread)
1861 {
1862 	struct bdevperf_job *job;
1863 	struct bdevperf_task *task;
1864 	int block_size, data_block_size;
1865 	int rc;
1866 	int task_num, n;
1867 	int32_t numa_id;
1868 
1869 	block_size = spdk_bdev_get_block_size(bdev);
1870 	data_block_size = spdk_bdev_get_data_block_size(bdev);
1871 
1872 	job = calloc(1, sizeof(struct bdevperf_job));
1873 	if (!job) {
1874 		fprintf(stderr, "Unable to allocate memory for new job.\n");
1875 		return -ENOMEM;
1876 	}
1877 
1878 	job->name = strdup(spdk_bdev_get_name(bdev));
1879 	if (!job->name) {
1880 		fprintf(stderr, "Unable to allocate memory for job name.\n");
1881 		bdevperf_job_free(job);
1882 		return -ENOMEM;
1883 	}
1884 
1885 	job->workload_type = config->rw;
1886 	job->io_size = config->bs;
1887 	job->rw_percentage = config->rwmixread;
1888 	job->continue_on_failure = g_continue_on_failure;
1889 	job->queue_depth = config->iodepth;
1890 	job->bdev = bdev;
1891 	job->io_size_blocks = job->io_size / data_block_size;
1892 	job->buf_size = job->io_size_blocks * block_size;
1893 	job->abort = g_abort;
1894 	job_init_rw(job, config->rw);
1895 
1896 	if ((job->io_size % data_block_size) != 0) {
1897 		SPDK_ERRLOG("IO size (%d) is not a multiple of the data block size of bdev %s (%"PRIu32")\n",
1898 			    job->io_size, spdk_bdev_get_name(bdev), data_block_size);
1899 		bdevperf_job_free(job);
1900 		return -ENOTSUP;
1901 	}
1902 
1903 	if (job->unmap && !spdk_bdev_io_type_supported(bdev, SPDK_BDEV_IO_TYPE_UNMAP)) {
1904 		printf("Skipping %s because it does not support unmap\n", spdk_bdev_get_name(bdev));
1905 		bdevperf_job_free(job);
1906 		return -ENOTSUP;
1907 	}
1908 
1909 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_REFTAG)) {
1910 		job->dif_check_flags |= SPDK_DIF_FLAGS_REFTAG_CHECK;
1911 	}
1912 	if (spdk_bdev_is_dif_check_enabled(bdev, SPDK_DIF_CHECK_TYPE_GUARD)) {
1913 		job->dif_check_flags |= SPDK_DIF_FLAGS_GUARD_CHECK;
1914 	}
1915 
1916 	job->offset_in_ios = 0;
1917 
1918 	if (config->length != 0) {
1919 		/* Use subset of disk */
1920 		job->size_in_ios = config->length / job->io_size_blocks;
1921 		job->ios_base = config->offset / job->io_size_blocks;
1922 	} else {
1923 		/* Use whole disk */
1924 		job->size_in_ios = spdk_bdev_get_num_blocks(bdev) / job->io_size_blocks;
1925 		job->ios_base = 0;
1926 	}
1927 
1928 	if (job->is_random && g_zipf_theta > 0) {
1929 		job->zipf = spdk_zipf_create(job->size_in_ios, g_zipf_theta, 0);
1930 	}
1931 
1932 	if (job->verify) {
1933 		if (job->size_in_ios >= UINT32_MAX) {
1934 			SPDK_ERRLOG("Due to the constraints of the verify operation, the job storage capacity is too large\n");
1935 			bdevperf_job_free(job);
1936 			return -ENOMEM;
1937 		}
1938 		job->outstanding = spdk_bit_array_create(job->size_in_ios);
1939 		if (job->outstanding == NULL) {
1940 			SPDK_ERRLOG("Could not create outstanding array bitmap for bdev %s\n",
1941 				    spdk_bdev_get_name(bdev));
1942 			bdevperf_job_free(job);
1943 			return -ENOMEM;
1944 		}
1945 		if (job->queue_depth > (int)job->size_in_ios) {
1946 			SPDK_WARNLOG("Due to the constraints of the verify job, the queue depth (-q, %d) can't exceed the number of IO "
1947 				     "requests that can be submitted to the bdev %s simultaneously (%"PRIu64"). "
1948 				     "Queue depth is limited to %"PRIu64"\n",
1949 				     job->queue_depth, job->name, job->size_in_ios, job->size_in_ios);
1950 			job->queue_depth = (int)job->size_in_ios;
1951 		}
1952 	}
1953 
1954 	job->histogram = spdk_histogram_data_alloc();
1955 	if (job->histogram == NULL) {
1956 		fprintf(stderr, "Failed to allocate histogram\n");
1957 		bdevperf_job_free(job);
1958 		return -ENOMEM;
1959 	}
1960 
1961 	TAILQ_INIT(&job->task_list);
1962 
1963 	if (g_random_map) {
1964 		if (job->size_in_ios >= UINT32_MAX) {
1965 			SPDK_ERRLOG("Due to the constraints of the random map, the job storage capacity is too large\n");
1966 			bdevperf_job_free(job);
1967 			return -ENOMEM;
1968 		}
1969 		job->random_map = spdk_bit_array_create(job->size_in_ios);
1970 		if (job->random_map == NULL) {
1971 			SPDK_ERRLOG("Could not create random_map array bitmap for bdev %s\n",
1972 				    spdk_bdev_get_name(bdev));
1973 			bdevperf_job_free(job);
1974 			return -ENOMEM;
1975 		}
1976 	}
1977 
1978 	task_num = job->queue_depth;
1979 	if (job->reset) {
1980 		task_num += 1;
1981 	}
1982 	if (job->abort) {
1983 		task_num += job->queue_depth;
1984 	}
1985 
1986 	TAILQ_INSERT_TAIL(&g_bdevperf.jobs, job, link);
1987 
1988 	numa_id = spdk_bdev_get_numa_id(job->bdev);
1989 
1990 	for (n = 0; n < task_num; n++) {
1991 		task = calloc(1, sizeof(struct bdevperf_task));
1992 		if (!task) {
1993 			fprintf(stderr, "Failed to allocate memory for task\n");
1994 			spdk_zipf_free(&job->zipf);
1995 			return -ENOMEM;
1996 		}
1997 
1998 		task->buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
1999 					 numa_id, SPDK_MALLOC_DMA);
2000 		if (!task->buf) {
2001 			fprintf(stderr, "Cannot allocate buf for task=%p\n", task);
2002 			spdk_zipf_free(&job->zipf);
2003 			free(task);
2004 			return -ENOMEM;
2005 		}
2006 
2007 		if (job->verify && job->buf_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2008 			task->verify_buf = spdk_zmalloc(job->buf_size, spdk_bdev_get_buf_align(job->bdev), NULL,
2009 							numa_id, SPDK_MALLOC_DMA);
2010 			if (!task->verify_buf) {
2011 				fprintf(stderr, "Cannot allocate verify_buf for task=%p\n", task);
2012 				spdk_free(task->buf);
2013 				spdk_zipf_free(&job->zipf);
2014 				free(task);
2015 				return -ENOMEM;
2016 			}
2017 
2018 			if (spdk_bdev_is_md_separate(job->bdev)) {
2019 				task->verify_md_buf = spdk_zmalloc(spdk_bdev_get_md_size(bdev) * job->io_size_blocks,
2020 								   spdk_bdev_get_buf_align(job->bdev), NULL, numa_id, SPDK_MALLOC_DMA);
2021 				if (!task->verify_md_buf) {
2022 					fprintf(stderr, "Cannot allocate verify_md_buf for task=%p\n", task);
2023 					spdk_free(task->buf);
2024 					spdk_free(task->verify_buf);
2025 					spdk_zipf_free(&job->zipf);
2026 					free(task);
2027 					return -ENOMEM;
2028 				}
2029 			}
2030 		}
2031 
2032 		if (spdk_bdev_is_md_separate(job->bdev)) {
2033 			task->md_buf = spdk_zmalloc(job->io_size_blocks *
2034 						    spdk_bdev_get_md_size(job->bdev), 0, NULL,
2035 						    numa_id, SPDK_MALLOC_DMA);
2036 			if (!task->md_buf) {
2037 				fprintf(stderr, "Cannot allocate md buf for task=%p\n", task);
2038 				spdk_zipf_free(&job->zipf);
2039 				spdk_free(task->verify_buf);
2040 				spdk_free(task->verify_md_buf);
2041 				spdk_free(task->buf);
2042 				free(task);
2043 				return -ENOMEM;
2044 			}
2045 		}
2046 
2047 		task->job = job;
2048 		TAILQ_INSERT_TAIL(&job->task_list, task, link);
2049 	}
2050 
2051 	job->thread = thread;
2052 
2053 	g_construct_job_count++;
2054 
2055 	rc = spdk_thread_send_msg(thread, _bdevperf_construct_job, job);
2056 	assert(rc == 0);
2057 
2058 	return rc;
2059 }
2060 
2061 static int
2062 parse_rw(const char *str, enum job_config_rw ret)
2063 {
2064 	if (str == NULL) {
2065 		return ret;
2066 	}
2067 
2068 	if (!strcmp(str, "read")) {
2069 		ret = JOB_CONFIG_RW_READ;
2070 	} else if (!strcmp(str, "randread")) {
2071 		ret = JOB_CONFIG_RW_RANDREAD;
2072 	} else if (!strcmp(str, "write")) {
2073 		ret = JOB_CONFIG_RW_WRITE;
2074 	} else if (!strcmp(str, "randwrite")) {
2075 		ret = JOB_CONFIG_RW_RANDWRITE;
2076 	} else if (!strcmp(str, "verify")) {
2077 		ret = JOB_CONFIG_RW_VERIFY;
2078 	} else if (!strcmp(str, "reset")) {
2079 		ret = JOB_CONFIG_RW_RESET;
2080 	} else if (!strcmp(str, "unmap")) {
2081 		ret = JOB_CONFIG_RW_UNMAP;
2082 	} else if (!strcmp(str, "write_zeroes")) {
2083 		ret = JOB_CONFIG_RW_WRITE_ZEROES;
2084 	} else if (!strcmp(str, "flush")) {
2085 		ret = JOB_CONFIG_RW_FLUSH;
2086 	} else if (!strcmp(str, "rw")) {
2087 		ret = JOB_CONFIG_RW_RW;
2088 	} else if (!strcmp(str, "randrw")) {
2089 		ret = JOB_CONFIG_RW_RANDRW;
2090 	} else {
2091 		fprintf(stderr, "rw must be one of\n"
2092 			PATTERN_TYPES_STR "\n");
2093 		ret = BDEVPERF_CONFIG_ERROR;
2094 	}
2095 
2096 	return ret;
2097 }
2098 
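/* Copy the next name from a ':'-separated filename list into 'out' (spaces
 * and tabs stripped) and return a pointer to the unconsumed remainder, or
 * NULL when 'filename' itself is NULL. */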
2099 static const char *
2100 config_filename_next(const char *filename, char *out)
2101 {
2102 	int i, k;
2103 
2104 	if (filename == NULL) {
2105 		out[0] = '\0';
2106 		return NULL;
2107 	}
2108 
2109 	if (filename[0] == ':') {
2110 		filename++;
2111 	}
2112 
2113 	for (i = 0, k = 0;
2114 	     filename[i] != '\0' &&
2115 	     filename[i] != ':' &&
2116 	     i < BDEVPERF_CONFIG_MAX_FILENAME &&
2117 	     k < (BDEVPERF_CONFIG_MAX_FILENAME - 1);
2118 	     i++) {
2119 		if (filename[i] == ' ' || filename[i] == '\t') {
2120 			continue;
2121 		}
2122 
2123 		out[k++] = filename[i];
2124 	}
2125 	out[k] = 0;
2126 
2127 	return filename + i;
2128 }
2129 
2130 static struct spdk_thread *
2131 get_lcore_thread(uint32_t lcore)
2132 {
2133 	struct lcore_thread *lthread;
2134 
2135 	TAILQ_FOREACH(lthread, &g_lcore_thread_list, link) {
2136 		if (lthread->lcore == lcore) {
2137 			return lthread->thread;
2138 		}
2139 	}
2140 
2141 	return NULL;
2142 }
2143 
2144 static void
2145 create_lcore_thread(uint32_t lcore)
2146 {
2147 	struct lcore_thread *lthread;
2148 	struct spdk_cpuset cpumask = {};
2149 	char name[32];
2150 
2151 	lthread = calloc(1, sizeof(*lthread));
2152 	assert(lthread != NULL);
2153 
2154 	lthread->lcore = lcore;
2155 
2156 	snprintf(name, sizeof(name), "lcore_%u", lcore);
2157 	spdk_cpuset_set_cpu(&cpumask, lcore, true);
2158 
2159 	lthread->thread = spdk_thread_create(name, &cpumask);
2160 	assert(lthread->thread != NULL);
2161 
2162 	TAILQ_INSERT_TAIL(&g_lcore_thread_list, lthread, link);
2163 }
2164 
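/* Create a job for every (config, filename) pair: resolve each name in the
 * config's ':'-separated filename list to a bdev and construct a job for it
 * on the config's thread (or on its lcore's shared thread when -E is used). */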
2165 static void
2166 bdevperf_construct_jobs(void)
2167 {
2168 	char filename[BDEVPERF_CONFIG_MAX_FILENAME];
2169 	struct spdk_thread *thread;
2170 	struct job_config *config;
2171 	struct spdk_bdev *bdev;
2172 	const char *filenames;
2173 	uint32_t i;
2174 	int rc;
2175 
2176 	if (g_one_thread_per_lcore) {
2177 		SPDK_ENV_FOREACH_CORE(i) {
2178 			create_lcore_thread(i);
2179 		}
2180 	}
2181 
2182 	TAILQ_FOREACH(config, &job_config_list, link) {
2183 		filenames = config->filename;
2184 
2185 		if (!g_one_thread_per_lcore) {
2186 			thread = construct_job_thread(&config->cpumask, config->name);
2187 		} else {
2188 			thread = get_lcore_thread(config->lcore);
2189 		}
2190 		assert(thread);
2191 
2192 		while (filenames) {
2193 			filenames = config_filename_next(filenames, filename);
2194 			if (strlen(filename) == 0) {
2195 				break;
2196 			}
2197 
2198 			bdev = spdk_bdev_get_by_name(filename);
2199 			if (!bdev) {
2200 				fprintf(stderr, "Unable to find bdev '%s'\n", filename);
2201 				g_run_rc = -EINVAL;
2202 				return;
2203 			}
2204 
2205 			rc = bdevperf_construct_job(bdev, config, thread);
2206 			if (rc < 0) {
2207 				g_run_rc = rc;
2208 				return;
2209 			}
2210 		}
2211 	}
2212 }
2213 
2214 static int
2215 make_cli_job_config(const char *filename, int64_t offset, uint64_t range)
2216 {
2217 	struct job_config *config = calloc(1, sizeof(*config));
2218 
2219 	if (config == NULL) {
2220 		fprintf(stderr, "Unable to allocate memory for job config\n");
2221 		return -ENOMEM;
2222 	}
2223 
2224 	config->name = filename;
2225 	config->filename = filename;
2226 	config->lcore = _get_next_core();
2227 	spdk_cpuset_zero(&config->cpumask);
2228 	spdk_cpuset_set_cpu(&config->cpumask, config->lcore, true);
2229 	config->bs = g_io_size;
2230 	config->iodepth = g_queue_depth;
2231 	config->rwmixread = g_rw_percentage;
2232 	config->offset = offset;
2233 	config->length = range;
2234 	config->rw = parse_rw(g_workload_type, BDEVPERF_CONFIG_ERROR);
2235 	if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2236 		free(config);
2237 		return -EINVAL;
2238 	}
2239 
2240 	TAILQ_INSERT_TAIL(&job_config_list, config, link);
2241 	return 0;
2242 }
2243 
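/* -C mode helper: split the bdev evenly across all cores, giving each core a
 * job config covering its own blocks_per_job-sized slice. Remainder blocks
 * from the integer division are left uncovered. */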
2244 static int
2245 bdevperf_construct_multithread_job_config(void *ctx, struct spdk_bdev *bdev)
2246 {
2247 	uint32_t *num_cores = ctx;
2248 	uint32_t i;
2249 	uint64_t blocks_per_job;
2250 	int64_t offset;
2251 	int rc;
2252 
2253 	blocks_per_job = spdk_bdev_get_num_blocks(bdev) / *num_cores;
2254 	offset = 0;
2255 
2256 	SPDK_ENV_FOREACH_CORE(i) {
2257 		rc = make_cli_job_config(spdk_bdev_get_name(bdev), offset, blocks_per_job);
2258 		if (rc) {
2259 			return rc;
2260 		}
2261 
2262 		offset += blocks_per_job;
2263 	}
2264 
2265 	return 0;
2266 }
2267 
2268 static void
2269 bdevperf_construct_multithread_job_configs(void)
2270 {
2271 	struct spdk_bdev *bdev;
2272 	uint32_t i;
2273 	uint32_t num_cores;
2274 
2275 	num_cores = 0;
2276 	SPDK_ENV_FOREACH_CORE(i) {
2277 		num_cores++;
2278 	}
2279 
2280 	if (num_cores == 0) {
2281 		g_run_rc = -EINVAL;
2282 		return;
2283 	}
2284 
2285 	if (g_job_bdev_name != NULL) {
2286 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2287 		if (!bdev) {
2288 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2289 			return;
2290 		}
2291 		g_run_rc = bdevperf_construct_multithread_job_config(&num_cores, bdev);
2292 	} else {
2293 		g_run_rc = spdk_for_each_bdev_leaf(&num_cores, bdevperf_construct_multithread_job_config);
2294 	}
2296 }
2297 
2298 static int
2299 bdevperf_construct_job_config(void *ctx, struct spdk_bdev *bdev)
2300 {
2301 	/* Construct the job */
2302 	return make_cli_job_config(spdk_bdev_get_name(bdev), 0, 0);
2303 }
2304 
2305 static void
2306 bdevperf_construct_job_configs(void)
2307 {
2308 	struct spdk_bdev *bdev;
2309 
2310 	/* There are three different modes for allocating jobs. Standard mode
2311 	 * (the default) creates one spdk_thread per bdev and runs the I/O job there.
2312 	 *
2313 	 * The -C flag places bdevperf into "multithread" mode, meaning it creates
2314 	 * one spdk_thread per bdev PER CORE, and runs a copy of the job on each.
2315 	 * This runs multiple threads per bdev, effectively.
2316 	 *
2317 	 * The -j flag implies "FIO" mode, which tries to mimic the semantics of FIO jobs.
2318 	 * In "FIO" mode, threads are spawned per-job instead of per-bdev.
2319 	 * Each FIO job can be individually parameterized by filename, cpu mask, etc.,
2320 	 * unlike the other modes, which only support global options.
2321 	 *
2322 	 * For both standard mode and "multithread" mode, if the -E flag is specified,
2323 	 * bdevperf creates one spdk_thread PER CORE. On each core, that one spdk_thread
2324 	 * is shared by multiple jobs.
2325 	 */
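
	/* For illustration only (hypothetical bdev name "Malloc0"; sizes, depths
	 * and times are arbitrary):
	 *   standard:     bdevperf -q 32 -o 4096 -w randread -t 60 -T Malloc0
	 *   multithread:  bdevperf -C -q 32 -o 4096 -w randread -t 60
	 *   "FIO" mode:   bdevperf -j ./job.conf -t 60
	 */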
2326 
2327 	if (g_bdevperf_conf) {
2328 		goto end;
2329 	}
2330 
2331 	if (g_multithread_mode) {
2332 		bdevperf_construct_multithread_job_configs();
2333 	} else if (g_job_bdev_name != NULL) {
2334 		bdev = spdk_bdev_get_by_name(g_job_bdev_name);
2335 		if (bdev) {
2336 			/* Construct the job */
2337 			g_run_rc = make_cli_job_config(g_job_bdev_name, 0, 0);
2338 		} else {
2339 			fprintf(stderr, "Unable to find bdev '%s'\n", g_job_bdev_name);
2340 		}
2341 	} else {
2342 		g_run_rc = spdk_for_each_bdev_leaf(NULL, bdevperf_construct_job_config);
2343 	}
2344 
2345 end:
2346 	/* Increment initial construct_jobs count so that it will never reach 0 in the middle
2347 	 * of iteration.
2348 	 */
2349 	g_construct_job_count = 1;
2350 
2351 	if (g_run_rc == 0) {
2352 		bdevperf_construct_jobs();
2353 	}
2354 
2355 	_bdevperf_construct_job_done(NULL);
2356 }
2357 
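/* Read a non-negative integer option from a config section. Returns 'def'
 * when the field is absent, except that a missing mandatory field (def ==
 * BDEVPERF_CONFIG_UNDEFINED) is tolerated only in the [global] section;
 * elsewhere it is reported as BDEVPERF_CONFIG_ERROR. */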
2358 static int
2359 parse_uint_option(struct spdk_conf_section *s, const char *name, int def)
2360 {
2361 	const char *job_name;
2362 	int tmp;
2363 
2364 	tmp = spdk_conf_section_get_intval(s, name);
2365 	if (tmp == -1) {
2366 		/* The field was not found, so check the default value.
2367 		 * In the [global] section it is OK to have undefined values,
2368 		 * but for other sections it is not. */
2369 		if (def == BDEVPERF_CONFIG_UNDEFINED) {
2370 			job_name = spdk_conf_section_get_name(s);
2371 			if (strcmp(job_name, "global") == 0) {
2372 				return def;
2373 			}
2374 
2375 			fprintf(stderr,
2376 				"Job '%s' has no '%s' assigned\n",
2377 				job_name, name);
2378 			return BDEVPERF_CONFIG_ERROR;
2379 		}
2380 		return def;
2381 	}
2382 
2383 	/* NOTE: get_intval returns nonnegative on success */
2384 	if (tmp < 0) {
2385 		fprintf(stderr, "Job '%s' has bad '%s' value.\n",
2386 			spdk_conf_section_get_name(s), name);
2387 		return BDEVPERF_CONFIG_ERROR;
2388 	}
2389 
2390 	return tmp;
2391 }
2392 
2393 /* CLI arguments override parameters for global sections */
2394 static void
2395 config_set_cli_args(struct job_config *config)
2396 {
2397 	if (g_job_bdev_name) {
2398 		config->filename = g_job_bdev_name;
2399 	}
2400 	if (g_io_size > 0) {
2401 		config->bs = g_io_size;
2402 	}
2403 	if (g_queue_depth > 0) {
2404 		config->iodepth = g_queue_depth;
2405 	}
2406 	if (g_rw_percentage > 0) {
2407 		config->rwmixread = g_rw_percentage;
2408 	}
2409 	if (g_workload_type) {
2410 		config->rw = parse_rw(g_workload_type, config->rw);
2411 	}
2412 }
2413 
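/* Parse the -j job config file. The file is INI-style: a [global] section
 * resets the defaults for the sections that follow it, and every other
 * section defines one job. A minimal illustrative file (the bdev name
 * "Malloc0" is hypothetical):
 *
 *   [global]
 *   bs=4096
 *   iodepth=32
 *   rw=randread
 *
 *   [job0]
 *   filename=Malloc0
 *   cpumask=0x1
 */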
2414 static int
2415 read_job_config(void)
2416 {
2417 	struct job_config global_default_config;
2418 	struct job_config global_config;
2419 	struct spdk_conf_section *s;
2420 	struct job_config *config = NULL;
2421 	const char *cpumask;
2422 	const char *rw;
2423 	bool is_global;
2424 	int n = 0;
2425 	int val;
2426 
2427 	if (g_bdevperf_conf_file == NULL) {
2428 		return 0;
2429 	}
2430 
2431 	g_bdevperf_conf = spdk_conf_allocate();
2432 	if (g_bdevperf_conf == NULL) {
2433 		fprintf(stderr, "Could not allocate job config structure\n");
2434 		return 1;
2435 	}
2436 
2437 	spdk_conf_disable_sections_merge(g_bdevperf_conf);
2438 	if (spdk_conf_read(g_bdevperf_conf, g_bdevperf_conf_file)) {
2439 		fprintf(stderr, "Invalid job config\n");
2440 		return 1;
2441 	}
2442 
2443 	/* Initialize global defaults */
2444 	global_default_config.filename = NULL;
2445 	/* A zero mask is equivalent to g_all_cpuset, but
2446 	 * g_all_cpuset is not initialized yet,
2447 	 * so use the zero mask as the default instead. */
2448 	spdk_cpuset_zero(&global_default_config.cpumask);
2449 	global_default_config.bs = BDEVPERF_CONFIG_UNDEFINED;
2450 	global_default_config.iodepth = BDEVPERF_CONFIG_UNDEFINED;
2451 	/* bdevperf has no default for the -M option, but in FIO the default is 50 */
2452 	global_default_config.rwmixread = 50;
2453 	global_default_config.offset = 0;
2454 	/* length 0 means 100% */
2455 	global_default_config.length = 0;
2456 	global_default_config.rw = BDEVPERF_CONFIG_UNDEFINED;
2457 	config_set_cli_args(&global_default_config);
2458 
2459 	if ((int)global_default_config.rw == BDEVPERF_CONFIG_ERROR) {
2460 		return 1;
2461 	}
2462 
2463 	/* There is only a single instance of the global job_config.
2464 	 * We just reset its value whenever we encounter a new [global] section. */
2465 	global_config = global_default_config;
2466 
2467 	for (s = spdk_conf_first_section(g_bdevperf_conf);
2468 	     s != NULL;
2469 	     s = spdk_conf_next_section(s)) {
2470 		config = calloc(1, sizeof(*config));
2471 		if (config == NULL) {
2472 			fprintf(stderr, "Unable to allocate memory for job config\n");
2473 			return 1;
2474 		}
2475 
2476 		config->name = spdk_conf_section_get_name(s);
2477 		is_global = strcmp(config->name, "global") == 0;
2478 
2479 		if (is_global) {
2480 			global_config = global_default_config;
2481 		}
2482 
2483 		config->filename = spdk_conf_section_get_val(s, "filename");
2484 		if (config->filename == NULL) {
2485 			config->filename = global_config.filename;
2486 		}
2487 		if (!is_global) {
2488 			if (config->filename == NULL) {
2489 				fprintf(stderr, "Job '%s' expects 'filename' parameter\n", config->name);
2490 				goto error;
2491 			} else if (strnlen(config->filename, BDEVPERF_CONFIG_MAX_FILENAME)
2492 				   >= BDEVPERF_CONFIG_MAX_FILENAME) {
2493 				fprintf(stderr,
2494 					"filename for '%s' job is too long. Max length is %d\n",
2495 					config->name, BDEVPERF_CONFIG_MAX_FILENAME);
2496 				goto error;
2497 			}
2498 		}
2499 
2500 		cpumask = spdk_conf_section_get_val(s, "cpumask");
2501 		if (cpumask == NULL) {
2502 			config->cpumask = global_config.cpumask;
2503 		} else if (spdk_cpuset_parse(&config->cpumask, cpumask)) {
2504 			fprintf(stderr, "Job '%s' has bad 'cpumask' value\n", config->name);
2505 			goto error;
2506 		}
2507 
2508 		config->bs = parse_uint_option(s, "bs", global_config.bs);
2509 		if (config->bs == BDEVPERF_CONFIG_ERROR) {
2510 			goto error;
2511 		} else if (config->bs == 0) {
2512 			fprintf(stderr, "'bs' of job '%s' must be greater than 0\n", config->name);
2513 			goto error;
2514 		}
2515 
2516 		config->iodepth = parse_uint_option(s, "iodepth", global_config.iodepth);
2517 		if (config->iodepth == BDEVPERF_CONFIG_ERROR) {
2518 			goto error;
2519 		} else if (config->iodepth == 0) {
2520 			fprintf(stderr,
2521 				"'iodepth' of job '%s' must be greater than 0\n",
2522 				config->name);
2523 			goto error;
2524 		}
2525 
2526 		config->rwmixread = parse_uint_option(s, "rwmixread", global_config.rwmixread);
2527 		if (config->rwmixread == BDEVPERF_CONFIG_ERROR) {
2528 			goto error;
2529 		} else if (config->rwmixread > 100) {
2530 			fprintf(stderr,
2531 				"'rwmixread' value of '%s' job is not in 0-100 range\n",
2532 				config->name);
2533 			goto error;
2534 		}
2535 
2536 		config->offset = parse_uint_option(s, "offset", global_config.offset);
2537 		if (config->offset == BDEVPERF_CONFIG_ERROR) {
2538 			goto error;
2539 		}
2540 
2541 		val = parse_uint_option(s, "length", global_config.length);
2542 		if (val == BDEVPERF_CONFIG_ERROR) {
2543 			goto error;
2544 		}
2545 		config->length = val;
2546 
2547 		rw = spdk_conf_section_get_val(s, "rw");
2548 		config->rw = parse_rw(rw, global_config.rw);
2549 		if ((int)config->rw == BDEVPERF_CONFIG_ERROR) {
2550 			fprintf(stderr, "Job '%s' has bad 'rw' value\n", config->name);
2551 			goto error;
2552 		} else if (!is_global && (int)config->rw == BDEVPERF_CONFIG_UNDEFINED) {
2553 			fprintf(stderr, "Job '%s' has no 'rw' assigned\n", config->name);
2554 			goto error;
2555 		}
2556 
2557 		if (is_global) {
2558 			config_set_cli_args(config);
2559 			global_config = *config;
2560 			free(config);
2561 			config = NULL;
2562 		} else {
2563 			TAILQ_INSERT_TAIL(&job_config_list, config, link);
2564 			n++;
2565 		}
2566 	}
2567 
2568 	if (g_rpc_log_file_name != NULL) {
2569 		g_rpc_log_file = fopen(g_rpc_log_file_name, "a");
2570 		if (g_rpc_log_file == NULL) {
2571 			fprintf(stderr, "Failed to open %s\n", g_rpc_log_file_name);
2572 			goto error;
2573 		}
2574 	}
2575 
2576 	printf("Using job config with %d jobs\n", n);
2577 	return 0;
2578 error:
2579 	free(config);
2580 	return 1;
2581 }
2582 
2583 static void
2584 bdevperf_run(void *arg1)
2585 {
2586 	uint32_t i;
2587 
2588 	g_main_thread = spdk_get_thread();
2589 
2590 	spdk_cpuset_zero(&g_all_cpuset);
2591 	SPDK_ENV_FOREACH_CORE(i) {
2592 		spdk_cpuset_set_cpu(&g_all_cpuset, i, true);
2593 	}
2594 
2595 	if (g_wait_for_tests) {
2596 		/* Do not perform any tests until RPC is received */
2597 		return;
2598 	}
2599 
2600 	bdevperf_construct_job_configs();
2601 }
2602 
2603 static void
2604 rpc_perform_tests_reset(void)
2605 {
2606 	/* Reset g_run_rc to 0 for the next test run. */
2607 	g_run_rc = 0;
2608 
2609 	/* Reset g_stats to 0 for the next test run. */
2610 	memset(&g_stats, 0, sizeof(g_stats));
2611 
2612 	/* Reset g_show_performance_period_num to 0 for the next test run. */
2613 	g_show_performance_period_num = 0;
2614 }
2615 
2616 static void
2617 rpc_perform_tests_cb(void)
2618 {
	struct spdk_json_write_ctx *w;
2619 	struct spdk_jsonrpc_request *request = g_request;
2620 
2621 	g_request = NULL;
2622 
2623 	if (g_run_rc) {
2624 		spdk_jsonrpc_send_error_response_fmt(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2625 						     "bdevperf failed with error %s", spdk_strerror(-g_run_rc));
	} else {
		/* Reply on success as well; without a result here the RPC client
		 * would never receive an answer for a successful run. */
		w = spdk_jsonrpc_begin_result(request);
		spdk_json_write_uint32(w, g_run_rc);
		spdk_jsonrpc_end_result(request, w);
2626 	}
2627 
2628 	rpc_perform_tests_reset();
2629 }
2630 
2631 struct rpc_bdevperf_params {
2632 	int	time_in_sec;
2633 	char	*workload_type;
2634 	int	queue_depth;
2635 	char	*io_size;
2636 	int	rw_percentage;
2637 };
2638 
2639 static const struct spdk_json_object_decoder rpc_bdevperf_params_decoders[] = {
2640 	{"time_in_sec", offsetof(struct rpc_bdevperf_params, time_in_sec), spdk_json_decode_int32, true},
2641 	{"workload_type", offsetof(struct rpc_bdevperf_params, workload_type), spdk_json_decode_string, true},
2642 	{"queue_depth", offsetof(struct rpc_bdevperf_params, queue_depth), spdk_json_decode_int32, true},
2643 	{"io_size", offsetof(struct rpc_bdevperf_params, io_size), spdk_json_decode_string, true},
2644 	{"rw_percentage", offsetof(struct rpc_bdevperf_params, rw_percentage), spdk_json_decode_int32, true},
2645 };
2646 
2647 static void
2648 rpc_apply_bdevperf_params(struct rpc_bdevperf_params *params)
2649 {
2650 	if (params->workload_type) {
2651 		/* Free the previously set value to avoid a memory leak. */
2652 		free(g_workload_type);
2653 		g_workload_type = strdup(params->workload_type);
2654 	}
2655 	if (params->queue_depth) {
2656 		g_queue_depth = params->queue_depth;
2657 	}
2658 	if (params->io_size) {
2659 		bdevperf_parse_arg('o', params->io_size);
2660 	}
2661 	if (params->time_in_sec) {
2662 		g_time_in_sec = params->time_in_sec;
2663 	}
2664 	if (params->rw_percentage) {
2665 		g_rw_percentage = params->rw_percentage;
2666 		g_mix_specified = true;
2667 	} else {
2668 		g_mix_specified = false;
2669 	}
2670 }
2671 
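/* Handler for the perform_tests RPC. All parameters are optional; any that
 * are supplied replace the corresponding CLI values before the run. An
 * illustrative request (values are arbitrary):
 *
 *   {"method": "perform_tests", "params": {"time_in_sec": 10,
 *    "workload_type": "randread", "queue_depth": 32, "io_size": "4096"}}
 */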
2672 static void
2673 rpc_perform_tests(struct spdk_jsonrpc_request *request, const struct spdk_json_val *params)
2674 {
2675 	struct rpc_bdevperf_params req = {}, backup = {};
2676 	int rc;
2677 
2678 	if (g_request != NULL) {
2679 		fprintf(stderr, "Another test is already in progress.\n");
2680 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
2681 						 spdk_strerror(-EINPROGRESS));
2682 		return;
2683 	}
2684 
2685 	if (params) {
2686 		if (spdk_json_decode_object_relaxed(params, rpc_bdevperf_params_decoders,
2687 						    SPDK_COUNTOF(rpc_bdevperf_params_decoders),
2688 						    &req)) {
2689 			spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2690 							 "spdk_json_decode_object failed");
2691 			return;
2692 		}
2693 
2694 		if (g_workload_type) {
2695 			backup.workload_type = strdup(g_workload_type);
2696 		}
2697 		backup.queue_depth = g_queue_depth;
2698 		if (asprintf(&backup.io_size, "%d", g_io_size) < 0) {
2699 			fprintf(stderr, "Couldn't allocate memory for the io_size backup\n");
2700 			goto rpc_error;
2701 		}
2702 		backup.time_in_sec = g_time_in_sec;
2703 		backup.rw_percentage = g_rw_percentage;
2704 
2705 		rpc_apply_bdevperf_params(&req);
2706 
2707 		free(req.workload_type);
2708 		free(req.io_size);
2709 	}
2710 
2711 	rc = verify_test_params();
2712 
2713 	if (rc) {
2714 		spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
2715 						 "Invalid parameters provided");
2716 		/* restore old params on error */
2717 		rpc_apply_bdevperf_params(&backup);
2718 		goto rpc_error;
2719 	}
2720 
2721 	g_request = request;
2722 
2723 	/* Only construct job configs on the first test run. */
2724 	if (TAILQ_EMPTY(&job_config_list)) {
2725 		bdevperf_construct_job_configs();
2726 	} else {
2727 		bdevperf_construct_jobs();
2728 	}
2729 
2730 rpc_error:
2731 	free(backup.io_size);
2732 	free(backup.workload_type);
2733 }
2734 SPDK_RPC_REGISTER("perform_tests", rpc_perform_tests, SPDK_RPC_RUNTIME)
2735 
2736 static void
2737 _bdevperf_job_drain(void *ctx)
2738 {
2739 	bdevperf_job_drain(ctx);
2740 }
2741 
2742 static void
2743 spdk_bdevperf_shutdown_cb(void)
2744 {
2745 	struct bdevperf_job *job, *tmp;
2746 	g_shutdown = true;
2747 
2748 	if (g_bdevperf.running_jobs == 0) {
2749 		bdevperf_test_done(NULL);
2750 		return;
2751 	}
2752 
2753 	/* Iterate jobs to stop all I/O */
2754 	TAILQ_FOREACH_SAFE(job, &g_bdevperf.jobs, link, tmp) {
2755 		spdk_thread_send_msg(job->thread, _bdevperf_job_drain, job);
2756 	}
2757 }
2758 
2759 static int
2760 bdevperf_parse_arg(int ch, char *arg)
2761 {
2762 	long long tmp;
2763 
2764 	if (ch == 'w') {
2765 		g_workload_type = strdup(arg);
2766 	} else if (ch == 'T') {
2767 		g_job_bdev_name = arg;
2768 	} else if (ch == 'z') {
2769 		g_wait_for_tests = true;
2770 	} else if (ch == 'Z') {
2771 		g_zcopy = true;
2772 	} else if (ch == 'X') {
2773 		g_abort = true;
2774 	} else if (ch == 'C') {
2775 		g_multithread_mode = true;
2776 	} else if (ch == 'f') {
2777 		g_continue_on_failure = true;
2778 	} else if (ch == 'j') {
2779 		g_bdevperf_conf_file = arg;
2780 	} else if (ch == 'F') {
2781 		char *endptr;
2782 
2783 		errno = 0;
2784 		g_zipf_theta = strtod(arg, &endptr);
2785 		if (errno || arg == endptr || g_zipf_theta < 0) {
2786 			fprintf(stderr, "Illegal zipf theta value %s\n", arg);
2787 			return -EINVAL;
2788 		}
2789 	} else if (ch == 'l') {
2790 		g_latency_display_level++;
2791 	} else if (ch == 'D') {
2792 		g_random_map = true;
2793 	} else if (ch == 'E') {
2794 		g_one_thread_per_lcore = true;
2795 	} else if (ch == 'J') {
2796 		g_rpc_log_file_name = arg;
2797 	} else if (ch == 'o') {
2798 		uint64_t size;
2799 
2800 		if (spdk_parse_capacity(arg, &size, NULL) != 0) {
2801 			fprintf(stderr, "Invalid IO size: %s\n", arg);
2802 			return -EINVAL;
2803 		}
2804 		g_io_size = (int)size;
2805 	} else if (ch == 'U') {
2806 		g_unique_writes = true;
2807 	} else {
2808 		tmp = spdk_strtoll(arg, 10);
2809 		if (tmp < 0) {
2810 			fprintf(stderr, "Failed to parse the value for option -%c.\n", ch);
2811 			return tmp;
2812 		} else if (tmp >= INT_MAX) {
2813 			fprintf(stderr, "Value for option -%c is too large.\n", ch);
2814 			return -ERANGE;
2815 		}
2816 
2817 		switch (ch) {
2818 		case 'q':
2819 			g_queue_depth = tmp;
2820 			break;
2821 		case 't':
2822 			g_time_in_sec = tmp;
2823 			break;
2824 		case 'k':
2825 			g_timeout_in_sec = tmp;
2826 			break;
2827 		case 'M':
2828 			g_rw_percentage = tmp;
2829 			g_mix_specified = true;
2830 			break;
2831 		case 'P':
2832 			g_show_performance_ema_period = tmp;
2833 			break;
2834 		case 'S':
2835 			g_summarize_performance = false;
2836 			g_show_performance_period_in_usec = tmp * SPDK_SEC_TO_USEC;
2837 			break;
2838 		default:
2839 			return -EINVAL;
2840 		}
2841 	}
2842 	return 0;
2843 }
2844 
2845 static void
2846 bdevperf_usage(void)
2847 {
2848 	printf(" -q <depth>                io depth\n");
2849 	printf(" -o <size>                 io size in bytes\n");
2850 	printf(" -w <type>                 io pattern type, must be one of " PATTERN_TYPES_STR "\n");
2851 	printf(" -t <time>                 time in seconds\n");
2852 	printf(" -k <timeout>              timeout in seconds to detect starved I/O (default is 0 and disabled)\n");
2853 	printf(" -M <percent>              rwmixread (100 for reads, 0 for writes)\n");
2854 	printf(" -P <num>                  number of moving average periods\n");
2855 	printf("\t\t(If set to n, show weighted mean of the previous n IO/s in real time)\n");
2856 	printf("\t\t(Formula: M = 2 / (n + 1), EMA[i+1] = IO/s * M + (1 - M) * EMA[i])\n");
2857 	printf("\t\t(only valid with -S)\n");
2858 	printf(" -S <period>               show performance result in real time every <period> seconds\n");
2859 	printf(" -T <bdev>                 bdev to run against. Default: all available bdevs.\n");
2860 	printf(" -f                        continue processing I/O even after failures\n");
2861 	printf(" -F <zipf theta>           use zipf distribution for random I/O\n");
2862 	printf(" -Z                        enable using zcopy bdev API for read or write I/O\n");
2863 	printf(" -z                        start bdevperf, but wait for perform_tests RPC to start tests\n");
2864 	printf("                           (See examples/bdev/bdevperf/bdevperf.py)\n");
2865 	printf(" -X                        abort timed out I/O\n");
2866 	printf(" -C                        enable every core to send I/Os to each bdev\n");
2867 	printf(" -j <filename>             use job config file\n");
2868 	printf(" -l                        display latency histogram (disabled by default; -l shows a summary, -ll shows details)\n");
2869 	printf(" -D                        use a random map for picking offsets not previously read or written (for all jobs)\n");
2870 	printf(" -E                        share one thread per lcore among jobs. Available only if -j is not used.\n");
2871 	printf(" -J <filename>             file name to open in append mode for logging JSON-RPC calls.\n");
2872 	printf(" -U                        generate unique data for each write I/O, has no effect on non-write I/O\n");
2873 }
2874 
2875 static void
2876 bdevperf_fini(void)
2877 {
2878 	free_job_config();
2879 	free(g_workload_type);
2880 
2881 	if (g_rpc_log_file != NULL) {
2882 		fclose(g_rpc_log_file);
2883 		g_rpc_log_file = NULL;
2884 	}
2885 }
2886 
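/* Validate the effective parameters (from the CLI or RPC overrides). Returns
 * nonzero when a required option is missing or a combination is invalid; the
 * workload-specific checks run only when no -j config file is in use. */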
2887 static int
2888 verify_test_params(void)
2889 {
2890 	if (!g_bdevperf_conf_file && g_queue_depth <= 0) {
2891 		goto out;
2892 	}
2893 	if (!g_bdevperf_conf_file && g_io_size <= 0) {
2894 		goto out;
2895 	}
2896 	if (!g_bdevperf_conf_file && !g_workload_type) {
2897 		goto out;
2898 	}
2899 	if (g_bdevperf_conf_file && g_one_thread_per_lcore) {
2900 		printf("If bdevperf's config file is used, per-lcore threads cannot be used\n");
2901 		goto out;
2902 	}
2903 	if (g_time_in_sec <= 0) {
2904 		goto out;
2905 	}
2906 	g_time_in_usec = g_time_in_sec * SPDK_SEC_TO_USEC;
2907 
2908 	if (g_timeout_in_sec < 0) {
2909 		goto out;
2910 	}
2911 
2912 	if (g_abort && !g_timeout_in_sec) {
2913 		printf("Timeout must be set for the abort option; ignoring g_abort\n");
2914 	}
2915 
2916 	if (g_show_performance_ema_period > 0 && g_summarize_performance) {
2917 		fprintf(stderr, "-P option must be specified with -S option\n");
2918 		return 1;
2919 	}
2920 
2921 	if (g_io_size > SPDK_BDEV_LARGE_BUF_MAX_SIZE) {
2922 		printf("I/O size of %d is greater than zero copy threshold (%d).\n",
2923 		       g_io_size, SPDK_BDEV_LARGE_BUF_MAX_SIZE);
2924 		printf("Zero copy mechanism will not be used.\n");
2925 		g_zcopy = false;
2926 	}
2927 
2928 	if (g_bdevperf_conf_file) {
2929 		/* workload_type verification happens during config file parsing */
2930 		return 0;
2931 	}
2932 
2933 	if (!strcmp(g_workload_type, "verify") ||
2934 	    !strcmp(g_workload_type, "reset")) {
2935 		g_rw_percentage = 50;
2936 		g_verify = true;
2937 		if (!strcmp(g_workload_type, "reset")) {
2938 			g_reset = true;
2939 		}
2940 	}
2941 
2942 	if (!strcmp(g_workload_type, "read") ||
2943 	    !strcmp(g_workload_type, "randread") ||
2944 	    !strcmp(g_workload_type, "write") ||
2945 	    !strcmp(g_workload_type, "randwrite") ||
2946 	    !strcmp(g_workload_type, "verify") ||
2947 	    !strcmp(g_workload_type, "reset") ||
2948 	    !strcmp(g_workload_type, "unmap") ||
2949 	    !strcmp(g_workload_type, "write_zeroes") ||
2950 	    !strcmp(g_workload_type, "flush")) {
2951 		if (g_mix_specified) {
2952 			fprintf(stderr, "Ignoring -M option... Please use -M option"
2953 				" only when using rw or randrw.\n");
2954 		}
2955 	}
2956 
2957 	if (!strcmp(g_workload_type, "rw") ||
2958 	    !strcmp(g_workload_type, "randrw")) {
2959 		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
2960 			fprintf(stderr,
2961 				"-M must be set to a value from 0 to 100 "
2962 				"for rw or randrw.\n");
2963 			return 1;
2964 		}
2965 	}
2966 
2967 	if (strcmp(g_workload_type, "randread") &&
2968 	    strcmp(g_workload_type, "randwrite") &&
2969 	    strcmp(g_workload_type, "randrw")) {
2970 		if (g_random_map) {
2971 			fprintf(stderr, "The -D option can only be used when the workload is"
2972 				" randread, randwrite or randrw.\n");
2973 			return 1;
2974 		}
2975 	}
2976 
2977 	return 0;
2978 out:
2979 	return 1;
2980 }
2981 
2982 int
2983 main(int argc, char **argv)
2984 {
2985 	struct spdk_app_opts opts = {};
2986 	int rc;
2987 
2988 	/* Use the runtime PID to set the random seed */
2989 	srand(getpid());
2990 
2991 	spdk_app_opts_init(&opts, sizeof(opts));
2992 	opts.name = "bdevperf";
2993 	opts.rpc_addr = NULL;
2994 	opts.shutdown_cb = spdk_bdevperf_shutdown_cb;
2995 
2996 	if ((rc = spdk_app_parse_args(argc, argv, &opts, "Zzfq:o:t:w:k:CEF:J:M:P:S:T:Xlj:DU", NULL,
2997 				      bdevperf_parse_arg, bdevperf_usage)) !=
2998 	    SPDK_APP_PARSE_ARGS_SUCCESS) {
2999 		return rc;
3000 	}
3001 
3002 	/* Set the default address if no rpc_addr was provided in args
3003 	 * and RPC is used for starting tests */
3004 	if (g_wait_for_tests && opts.rpc_addr == NULL) {
3005 		opts.rpc_addr = SPDK_DEFAULT_RPC_ADDR;
3006 	}
3007 
3008 	if (read_job_config()) {
3009 		bdevperf_fini();
3010 		return 1;
3011 	}
3012 
	/* When -J is used without -j, read_job_config() returns before reaching
	 * the fopen() of the RPC log file, so open it here in that case. */
	if (g_rpc_log_file_name != NULL && g_rpc_log_file == NULL) {
		g_rpc_log_file = fopen(g_rpc_log_file_name, "a");
		if (g_rpc_log_file == NULL) {
			fprintf(stderr, "Failed to open %s\n", g_rpc_log_file_name);
			bdevperf_fini();
			return 1;
		}
	}

3013 	if (g_rpc_log_file != NULL) {
3014 		opts.rpc_log_file = g_rpc_log_file;
3015 	}
3016 
3017 	if (verify_test_params() != 0 && !g_wait_for_tests) {
3018 		spdk_app_usage();
3019 		bdevperf_usage();
3020 		bdevperf_fini();
3021 		exit(1);
3022 	}
3023 
3024 	rc = spdk_app_start(&opts, bdevperf_run, NULL);
3025 
3026 	spdk_app_fini();
3027 	bdevperf_fini();
3028 	return rc;
3029 }
3030