xref: /spdk/examples/nvme/abort/abort.c (revision c6c1234de9e0015e670dd0b51bf6ce39ee0e07bd)
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2020 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/nvme.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

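/*
 * Single-character options accepted by getopt_long(); see usage() for the
 * meaning of each letter.  Example invocation (hypothetical values):
 *
 *   ./abort -q 128 -o 4096 -w randrw -M 50 -t 10 -a 4 \
 *           -r 'trtype:PCIe traddr:0000:04:00.0'
 */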
#define ABORT_GETOPT_STRING "a:c:i:l:o:q:r:s:t:w:GM:T:"

struct ctrlr_entry {
	struct spdk_nvme_ctrlr		*ctrlr;
	enum spdk_nvme_transport_type	trtype;

	TAILQ_ENTRY(ctrlr_entry)	link;
	char				name[1024];
};

struct ns_entry {
	struct spdk_nvme_ctrlr		*ctrlr;
	struct spdk_nvme_ns		*ns;

	TAILQ_ENTRY(ns_entry)		link;
	uint32_t			io_size_blocks;
	uint32_t			num_io_requests;
	uint64_t			size_in_ios;
	uint32_t			block_size;
	char				name[1024];
};

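/*
 * Per-controller context owned by the worker on the main core.  The abort
 * statistics and current_queue_depth are updated both from abort_task()
 * (called on I/O worker cores) and from admin completion processing on the
 * main core, so they are guarded by the mutex.
 */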
struct ctrlr_worker_ctx {
	pthread_mutex_t			mutex;
	struct ctrlr_entry		*entry;
	uint64_t			abort_submitted;
	uint64_t			abort_submit_failed;
	uint64_t			successful_abort;
	uint64_t			unsuccessful_abort;
	uint64_t			abort_failed;
	uint64_t			current_queue_depth;
	struct spdk_nvme_ctrlr		*ctrlr;
	TAILQ_ENTRY(ctrlr_worker_ctx)	link;
};

struct ns_worker_ctx {
	struct ns_entry			*entry;
	uint64_t			io_submitted;
	uint64_t			io_completed;
	uint64_t			io_aborted;
	uint64_t			io_failed;
	uint64_t			current_queue_depth;
	uint64_t			offset_in_ios;
	bool				is_draining;
	struct spdk_nvme_qpair		*qpair;
	struct ctrlr_worker_ctx		*ctrlr_ctx;
	TAILQ_ENTRY(ns_worker_ctx)	link;
};

struct perf_task {
	struct ns_worker_ctx		*ns_ctx;
	void				*buf;
};

struct worker_thread {
	TAILQ_HEAD(, ns_worker_ctx)	ns_ctx;
	TAILQ_HEAD(, ctrlr_worker_ctx)	ctrlr_ctx;
	TAILQ_ENTRY(worker_thread)	link;
	unsigned			lcore;
	int				status;
};

static const char *g_workload_type = "read";
static TAILQ_HEAD(, ctrlr_entry) g_controllers = TAILQ_HEAD_INITIALIZER(g_controllers);
static TAILQ_HEAD(, ns_entry) g_namespaces = TAILQ_HEAD_INITIALIZER(g_namespaces);
static int g_num_namespaces;
static TAILQ_HEAD(, worker_thread) g_workers = TAILQ_HEAD_INITIALIZER(g_workers);
static int g_num_workers = 0;
static uint32_t g_main_core;

static int g_abort_interval = 1;

static uint64_t g_tsc_rate;

static uint32_t g_io_size_bytes = 131072;
static uint32_t g_max_io_size_blocks;
static int g_rw_percentage = -1;
static int g_is_random;
static int g_queue_depth = 128;
static int g_time_in_sec = 3;
static int g_dpdk_mem;
static int g_shm_id = -1;
static bool g_no_pci;
static bool g_warn;
static bool g_mix_specified;
static bool g_no_hugepages;

static const char *g_core_mask;

static const struct option g_abort_cmdline_opts[] = {
#define ABORT_NO_HUGE        257
	{"no-huge",			no_argument,	NULL, ABORT_NO_HUGE},
	{0, 0, 0, 0}
};

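/*
 * One entry per -r option on the command line.  An nsid of 0 means "use
 * every active namespace on the controller"; a nonzero value (parsed from
 * an optional "ns:" key in the transport ID string) selects a single
 * namespace.
 */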
struct trid_entry {
	struct spdk_nvme_transport_id	trid;
	uint16_t			nsid;
	TAILQ_ENTRY(trid_entry)		tailq;
};

static TAILQ_HEAD(, trid_entry) g_trid_list = TAILQ_HEAD_INITIALIZER(g_trid_list);

static void io_complete(void *ctx, const struct spdk_nvme_cpl *cpl);

static int
build_nvme_name(char *name, size_t length, struct spdk_nvme_ctrlr *ctrlr)
{
	const struct spdk_nvme_transport_id *trid;
	int res = 0;

	trid = spdk_nvme_ctrlr_get_transport_id(ctrlr);

	switch (trid->trtype) {
	case SPDK_NVME_TRANSPORT_PCIE:
		res = snprintf(name, length, "PCIE (%s)", trid->traddr);
		break;
	case SPDK_NVME_TRANSPORT_RDMA:
		res = snprintf(name, length, "RDMA (addr:%s subnqn:%s)", trid->traddr, trid->subnqn);
		break;
	case SPDK_NVME_TRANSPORT_TCP:
		res = snprintf(name, length, "TCP (addr:%s subnqn:%s)", trid->traddr, trid->subnqn);
		break;
	case SPDK_NVME_TRANSPORT_CUSTOM:
		res = snprintf(name, length, "CUSTOM (%s)", trid->traddr);
		break;
	default:
		fprintf(stderr, "Unknown transport type %d\n", trid->trtype);
		break;
	}
	return res;
}

static void
build_nvme_ns_name(char *name, size_t length, struct spdk_nvme_ctrlr *ctrlr, uint32_t nsid)
{
	int res = 0;

	res = build_nvme_name(name, length, ctrlr);
	if (res > 0) {
		snprintf(name + res, length - res, " NSID %u", nsid);
	}
}

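/*
 * Validate a namespace and record the parameters needed to drive I/O to it.
 * Namespaces that are inactive, smaller than one I/O, or whose sector size
 * exceeds the I/O size are skipped with a warning.
 */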
static void
register_ns(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_ns *ns)
{
	struct ns_entry *entry;
	const struct spdk_nvme_ctrlr_data *cdata;
	uint32_t max_xfer_size, entries, sector_size;
	uint64_t ns_size;
	struct spdk_nvme_io_qpair_opts opts;

	cdata = spdk_nvme_ctrlr_get_data(ctrlr);

	if (!spdk_nvme_ns_is_active(ns)) {
		printf("Controller %-20.20s (%-20.20s): Skipping inactive NS %u\n",
		       cdata->mn, cdata->sn,
		       spdk_nvme_ns_get_id(ns));
		g_warn = true;
		return;
	}

	ns_size = spdk_nvme_ns_get_size(ns);
	sector_size = spdk_nvme_ns_get_sector_size(ns);

	if (ns_size < g_io_size_bytes || sector_size > g_io_size_bytes) {
		printf("WARNING: controller %-20.20s (%-20.20s) ns %u has invalid "
		       "ns size %" PRIu64 " / block size %u for I/O size %u\n",
		       cdata->mn, cdata->sn, spdk_nvme_ns_get_id(ns),
		       ns_size, spdk_nvme_ns_get_sector_size(ns), g_io_size_bytes);
		g_warn = true;
		return;
	}

	max_xfer_size = spdk_nvme_ns_get_max_io_xfer_size(ns);
	spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
	/* The NVMe driver may add additional entries based on
	 * stripe size and maximum transfer size; we assume
	 * 1 more entry will be used for the stripe.
	 */
	entries = (g_io_size_bytes - 1) / max_xfer_size + 2;
	if ((g_queue_depth * entries) > opts.io_queue_size) {
		printf("controller IO queue size %u is less than required\n",
		       opts.io_queue_size);
		printf("Consider using a lower queue depth or smaller IO size, because "
		       "IO requests may be queued at the NVMe driver.\n");
	}
	/* For requests which have child requests, the parent request
	 * itself will also occupy 1 entry.
	 */
	entries += 1;

	entry = calloc(1, sizeof(struct ns_entry));
	if (entry == NULL) {
		perror("ns_entry calloc");
		exit(1);
	}

	entry->ctrlr = ctrlr;
	entry->ns = ns;
	entry->num_io_requests = g_queue_depth * entries;

	entry->size_in_ios = ns_size / g_io_size_bytes;
	entry->io_size_blocks = g_io_size_bytes / sector_size;

	entry->block_size = spdk_nvme_ns_get_sector_size(ns);

	if (g_max_io_size_blocks < entry->io_size_blocks) {
		g_max_io_size_blocks = entry->io_size_blocks;
	}

	build_nvme_ns_name(entry->name, sizeof(entry->name), ctrlr, spdk_nvme_ns_get_id(ns));

	g_num_namespaces++;
	TAILQ_INSERT_TAIL(&g_namespaces, entry, link);
}

static void
unregister_namespaces(void)
{
	struct ns_entry *entry, *tmp;

	TAILQ_FOREACH_SAFE(entry, &g_namespaces, link, tmp) {
		TAILQ_REMOVE(&g_namespaces, entry, link);
		free(entry);
	}
}

static void
register_ctrlr(struct spdk_nvme_ctrlr *ctrlr, struct trid_entry *trid_entry)
{
	struct spdk_nvme_ns *ns;
	struct ctrlr_entry *entry = malloc(sizeof(struct ctrlr_entry));
	uint32_t nsid;

	if (entry == NULL) {
		perror("ctrlr_entry malloc");
		exit(1);
	}

	build_nvme_name(entry->name, sizeof(entry->name), ctrlr);

	entry->ctrlr = ctrlr;
	entry->trtype = trid_entry->trid.trtype;
	TAILQ_INSERT_TAIL(&g_controllers, entry, link);

	if (trid_entry->nsid == 0) {
		for (nsid = spdk_nvme_ctrlr_get_first_active_ns(ctrlr);
		     nsid != 0; nsid = spdk_nvme_ctrlr_get_next_active_ns(ctrlr, nsid)) {
			ns = spdk_nvme_ctrlr_get_ns(ctrlr, nsid);
			if (ns == NULL) {
				continue;
			}
			register_ns(ctrlr, ns);
		}
	} else {
		ns = spdk_nvme_ctrlr_get_ns(ctrlr, trid_entry->nsid);
		if (!ns) {
			fprintf(stderr, "Namespace does not exist.\n");
			exit(1);
		}

		register_ns(ctrlr, ns);
	}
}

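/*
 * Completion callback for the Abort admin command.  Per the NVMe spec, bit 0
 * of completion Dword 0 reports the outcome: 0 means the target command was
 * aborted, 1 means it was not (e.g. it had already completed).
 */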
static void
abort_complete(void *ctx, const struct spdk_nvme_cpl *cpl)
{
	struct ctrlr_worker_ctx	*ctrlr_ctx = ctx;

	ctrlr_ctx->current_queue_depth--;
	if (spdk_unlikely(spdk_nvme_cpl_is_error(cpl))) {
		ctrlr_ctx->abort_failed++;
	} else if ((cpl->cdw0 & 0x1) == 0) {
		ctrlr_ctx->successful_abort++;
	} else {
		ctrlr_ctx->unsuccessful_abort++;
	}
}

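/*
 * Submit an Abort admin command targeting the I/O identified by its callback
 * argument (task) on the namespace's qpair.  Aborts are submitted from I/O
 * worker cores but completed on the main core, so the mutex guards the
 * controller context's counters across threads.
 */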
static void
abort_task(struct perf_task *task)
{
	struct ns_worker_ctx	*ns_ctx = task->ns_ctx;
	struct ctrlr_worker_ctx	*ctrlr_ctx = ns_ctx->ctrlr_ctx;
	int			rc;

	/* Hold mutex to guard ctrlr_ctx->current_queue_depth. */
	pthread_mutex_lock(&ctrlr_ctx->mutex);

	rc = spdk_nvme_ctrlr_cmd_abort_ext(ctrlr_ctx->ctrlr, ns_ctx->qpair, task, abort_complete,
					   ctrlr_ctx);

	if (spdk_unlikely(rc != 0)) {
		ctrlr_ctx->abort_submit_failed++;
	} else {
		ctrlr_ctx->current_queue_depth++;
		ctrlr_ctx->abort_submitted++;
	}

	pthread_mutex_unlock(&ctrlr_ctx->mutex);
}

static __thread unsigned int seed = 0;

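/*
 * Pick the next LBA (random or sequential), issue a read or write according
 * to the configured mix, and submit an abort for every g_abort_interval-th
 * I/O that is successfully queued.
 */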
static inline void
submit_single_io(struct perf_task *task)
{
	uint64_t		offset_in_ios, lba;
	int			rc;
	struct ns_worker_ctx	*ns_ctx = task->ns_ctx;
	struct ns_entry		*entry = ns_ctx->entry;

	if (g_is_random) {
		offset_in_ios = rand_r(&seed) % entry->size_in_ios;
	} else {
		offset_in_ios = ns_ctx->offset_in_ios++;
		if (ns_ctx->offset_in_ios == entry->size_in_ios) {
			ns_ctx->offset_in_ios = 0;
		}
	}

	lba = offset_in_ios * entry->io_size_blocks;

	if ((g_rw_percentage == 100) ||
	    (g_rw_percentage != 0 && (rand_r(&seed) % 100) < g_rw_percentage)) {
		rc = spdk_nvme_ns_cmd_read(entry->ns, ns_ctx->qpair, task->buf,
					   lba, entry->io_size_blocks, io_complete, task, 0);
	} else {
		rc = spdk_nvme_ns_cmd_write(entry->ns, ns_ctx->qpair, task->buf,
					    lba, entry->io_size_blocks, io_complete, task, 0);
	}

	if (spdk_unlikely(rc != 0)) {
		fprintf(stderr, "I/O submission failed\n");
	} else {
		ns_ctx->current_queue_depth++;
		ns_ctx->io_submitted++;

		if ((ns_ctx->io_submitted % g_abort_interval) == 0) {
			abort_task(task);
		}
	}
}

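/*
 * I/O completion callback.  Note that an I/O the controller actually aborts
 * completes with an error status (Command Abort Requested), so aborted I/O
 * is counted here under io_failed.
 */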
static void
io_complete(void *ctx, const struct spdk_nvme_cpl *cpl)
{
	struct perf_task	*task = ctx;
	struct ns_worker_ctx	*ns_ctx = task->ns_ctx;

	ns_ctx->current_queue_depth--;
	if (spdk_unlikely(spdk_nvme_cpl_is_error(cpl))) {
		ns_ctx->io_failed++;
	} else {
		ns_ctx->io_completed++;
	}

	/* is_draining indicates when time has expired for the test run and we are
	 * just waiting for the previously submitted I/O to complete. In this case,
	 * do not submit a new I/O to replace the one just completed.
	 */
	if (spdk_unlikely(ns_ctx->is_draining)) {
		spdk_dma_free(task->buf);
		free(task);
	} else {
		submit_single_io(task);
	}
}

static struct perf_task *
allocate_task(struct ns_worker_ctx *ns_ctx)
{
	struct perf_task *task;

	task = calloc(1, sizeof(*task));
	if (task == NULL) {
		fprintf(stderr, "Failed to allocate task\n");
		exit(1);
	}

	task->buf = spdk_dma_zmalloc(g_io_size_bytes, 0x200, NULL);
	if (task->buf == NULL) {
		free(task);
		fprintf(stderr, "Failed to allocate task->buf\n");
		exit(1);
	}

	task->ns_ctx = ns_ctx;

	return task;
}

static void
submit_io(struct ns_worker_ctx *ns_ctx, int queue_depth)
{
	struct perf_task *task;

	while (queue_depth-- > 0) {
		task = allocate_task(ns_ctx);
		submit_single_io(task);
	}
}

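/*
 * Per-core worker loop: allocate a qpair per assigned namespace, keep each
 * qpair at the requested queue depth until the timer expires, then drain.
 * Only the worker on the main core also polls the admin queues for abort
 * completions.
 */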
static int
work_fn(void *arg)
{
	struct worker_thread *worker = (struct worker_thread *)arg;
	struct ns_worker_ctx *ns_ctx;
	struct ctrlr_worker_ctx *ctrlr_ctx;
	struct ns_entry *ns_entry;
	struct spdk_nvme_io_qpair_opts opts;
	uint64_t tsc_end;
	uint32_t unfinished_ctx;
	int rc = 0;

	/* Allocate a queue pair for each namespace. */
	TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
		ns_entry = ns_ctx->entry;

		spdk_nvme_ctrlr_get_default_io_qpair_opts(ns_entry->ctrlr, &opts, sizeof(opts));
		if (opts.io_queue_requests < ns_entry->num_io_requests) {
			opts.io_queue_requests = ns_entry->num_io_requests;
		}

		ns_ctx->qpair = spdk_nvme_ctrlr_alloc_io_qpair(ns_entry->ctrlr, &opts, sizeof(opts));
		if (ns_ctx->qpair == NULL) {
			fprintf(stderr, "spdk_nvme_ctrlr_alloc_io_qpair failed\n");
			worker->status = -ENOMEM;
			goto out;
		}
	}

	tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	/* Submit initial I/O for each namespace. */
	TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
		submit_io(ns_ctx, g_queue_depth);
	}

	while (1) {
		TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
			rc = spdk_nvme_qpair_process_completions(ns_ctx->qpair, 0);
			if (rc < 0) {
				fprintf(stderr, "spdk_nvme_qpair_process_completions returned "
					"%d\n", rc);
				worker->status = rc;
				goto out;
			}
		}

		if (worker->lcore == g_main_core) {
			TAILQ_FOREACH(ctrlr_ctx, &worker->ctrlr_ctx, link) {
				/* Hold mutex to guard ctrlr_ctx->current_queue_depth. */
				pthread_mutex_lock(&ctrlr_ctx->mutex);
				rc = spdk_nvme_ctrlr_process_admin_completions(ctrlr_ctx->ctrlr);
				pthread_mutex_unlock(&ctrlr_ctx->mutex);
				if (rc < 0) {
					fprintf(stderr, "spdk_nvme_ctrlr_process_admin_completions "
						"returned %d\n", rc);
					worker->status = rc;
					goto out;
				}
			}
		}

		if (spdk_get_ticks() > tsc_end) {
			break;
		}
	}

	do {
		unfinished_ctx = 0;

		TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
			if (!ns_ctx->is_draining) {
				ns_ctx->is_draining = true;
			}
			if (ns_ctx->current_queue_depth > 0) {
				rc = spdk_nvme_qpair_process_completions(ns_ctx->qpair, 0);
				if (rc < 0) {
					fprintf(stderr, "spdk_nvme_qpair_process_completions "
						"returned %d\n", rc);
					worker->status = rc;
					goto out;
				}
				unfinished_ctx++;
			}
		}
	} while (unfinished_ctx > 0);

	if (worker->lcore == g_main_core) {
		do {
			unfinished_ctx = 0;

			TAILQ_FOREACH(ctrlr_ctx, &worker->ctrlr_ctx, link) {
				pthread_mutex_lock(&ctrlr_ctx->mutex);
				if (ctrlr_ctx->current_queue_depth > 0) {
					rc = spdk_nvme_ctrlr_process_admin_completions(ctrlr_ctx->ctrlr);
					unfinished_ctx++;
				}
				pthread_mutex_unlock(&ctrlr_ctx->mutex);
				if (rc < 0) {
					fprintf(stderr, "spdk_nvme_ctrlr_process_admin_completions "
						"returned %d\n", rc);
					worker->status = rc;
					goto out;
				}
			}
		} while (unfinished_ctx > 0);
	}
out:
	TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
		/* Make sure we don't submit any I/O at this point. */
		ns_ctx->is_draining = true;
		spdk_nvme_ctrlr_free_io_qpair(ns_ctx->qpair);
	}

	return worker->status != 0;
}

static void
usage(char *program_name)
{
	printf("%s options", program_name);

	printf("\n");
	printf("\t[-q io depth]\n");
	printf("\t[-o io size in bytes]\n");
	printf("\t[-w io pattern type, must be one of\n");
	printf("\t\t(read, write, randread, randwrite, rw, randrw)]\n");
	printf("\t[-M rwmixread (100 for reads, 0 for writes)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-c core mask for I/O submission/completion.]\n");
	printf("\t\t(default: 1)\n");
	printf("\t[-r Transport ID for local PCIe NVMe or NVMeoF]\n");
	printf("\t Format: 'key:value [key:value] ...'\n");
	printf("\t Keys:\n");
	printf("\t  trtype      Transport type (e.g. PCIe, RDMA)\n");
	printf("\t  adrfam      Address family (e.g. IPv4, IPv6)\n");
	printf("\t  traddr      Transport address (e.g. 0000:04:00.0 for PCIe or 192.168.100.8 for RDMA)\n");
	printf("\t  trsvcid     Transport service identifier (e.g. 4420)\n");
	printf("\t  subnqn      Subsystem NQN (default: %s)\n", SPDK_NVMF_DISCOVERY_NQN);
	printf("\t Example: -r 'trtype:PCIe traddr:0000:04:00.0' for PCIe or\n");
	printf("\t          -r 'trtype:RDMA adrfam:IPv4 traddr:192.168.100.8 trsvcid:4420' for NVMeoF\n");
	printf("\t[-s DPDK huge memory size in MB.]\n");
	printf("\t[-i shared memory group ID]\n");
	printf("\t[-a abort interval.]\n");
	printf("\t[--no-huge SPDK is run without hugepages]\n");
	printf("\t");
	spdk_log_usage(stdout, "-T");
#ifdef DEBUG
	printf("\t[-G enable debug logging]\n");
#else
	printf("\t[-G enable debug logging (flag disabled, must reconfigure with --enable-debug)]\n");
#endif
	printf("\t[-l log level]\n");
	printf("\t Available log levels:\n");
	printf("\t  disabled, error, warning, notice, info, debug\n");
}

static void
unregister_trids(void)
{
	struct trid_entry *trid_entry, *tmp;

	TAILQ_FOREACH_SAFE(trid_entry, &g_trid_list, tailq, tmp) {
		TAILQ_REMOVE(&g_trid_list, trid_entry, tailq);
		free(trid_entry);
	}
}

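/*
 * Parse one -r argument into a trid_entry.  The standard transport ID keys
 * are handled by spdk_nvme_transport_id_parse(); an additional, app-specific
 * "ns:" key may follow to restrict the run to a single namespace ID.
 */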
static int
add_trid(const char *trid_str)
{
	struct trid_entry *trid_entry;
	struct spdk_nvme_transport_id *trid;
	char *ns;

	trid_entry = calloc(1, sizeof(*trid_entry));
	if (trid_entry == NULL) {
		return -1;
	}

	trid = &trid_entry->trid;
	trid->trtype = SPDK_NVME_TRANSPORT_PCIE;
	snprintf(trid->subnqn, sizeof(trid->subnqn), "%s", SPDK_NVMF_DISCOVERY_NQN);

	if (spdk_nvme_transport_id_parse(trid, trid_str) != 0) {
		fprintf(stderr, "Invalid transport ID format '%s'\n", trid_str);
		free(trid_entry);
		return 1;
	}

	spdk_nvme_transport_id_populate_trstring(trid,
			spdk_nvme_transport_id_trtype_str(trid->trtype));

	ns = strcasestr(trid_str, "ns:");
	if (ns) {
		char nsid_str[6]; /* 5 digits maximum in an nsid */
		int len;
		int nsid;

		ns += 3;

		len = strcspn(ns, " \t\n");
		if (len > 5) {
			fprintf(stderr, "NVMe namespace IDs must be 5 digits or less\n");
			free(trid_entry);
			return 1;
		}

		memcpy(nsid_str, ns, len);
		nsid_str[len] = '\0';

		nsid = spdk_strtol(nsid_str, 10);
		if (nsid <= 0 || nsid > 65535) {
			fprintf(stderr, "NVMe namespace IDs must be greater than 0 and less than 65536\n");
			free(trid_entry);
			return 1;
		}

		trid_entry->nsid = (uint16_t)nsid;
	}

	TAILQ_INSERT_TAIL(&g_trid_list, trid_entry, tailq);
	return 0;
}

static int
parse_args(int argc, char **argv)
{
	int op, opt_index;
	long int val;
	int rc;

	while ((op = getopt_long(argc, argv, ABORT_GETOPT_STRING, g_abort_cmdline_opts,
				 &opt_index)) != -1) {
		switch (op) {
		case 'a':
		case 'i':
		case 'o':
		case 'q':
		case 's':
		case 't':
		case 'M':
			val = spdk_strtol(optarg, 10);
			if (val < 0) {
				fprintf(stderr, "Failed to convert '%s' to a non-negative integer\n", optarg);
				return val;
			}
			switch (op) {
			case 'a':
				g_abort_interval = val;
				break;
			case 'i':
				g_shm_id = val;
				break;
			case 'o':
				g_io_size_bytes = val;
				break;
			case 'q':
				g_queue_depth = val;
				break;
			case 's':
				g_dpdk_mem = val;
				break;
			case 't':
				g_time_in_sec = val;
				break;
			case 'M':
				g_rw_percentage = val;
				g_mix_specified = true;
				break;
			}
			break;
		case 'c':
			g_core_mask = optarg;
			break;
		case 'r':
			if (add_trid(optarg)) {
				usage(argv[0]);
				return 1;
			}
			break;
		case 'w':
			g_workload_type = optarg;
			break;
		case 'G':
#ifndef DEBUG
			fprintf(stderr, "%s must be configured with --enable-debug for the -G flag\n",
				argv[0]);
			usage(argv[0]);
			return 1;
#else
			spdk_log_set_flag("nvme");
			spdk_log_set_print_level(SPDK_LOG_DEBUG);
			break;
#endif
		case 'T':
			rc = spdk_log_set_flag(optarg);
			if (rc < 0) {
				fprintf(stderr, "unknown log flag '%s'\n", optarg);
				usage(argv[0]);
				exit(EXIT_FAILURE);
			}
#ifdef DEBUG
			spdk_log_set_print_level(SPDK_LOG_DEBUG);
#endif
			break;
		case 'l':
			if (!strcmp(optarg, "disabled")) {
				spdk_log_set_print_level(SPDK_LOG_DISABLED);
			} else if (!strcmp(optarg, "error")) {
				spdk_log_set_print_level(SPDK_LOG_ERROR);
			} else if (!strcmp(optarg, "warning")) {
				spdk_log_set_print_level(SPDK_LOG_WARN);
			} else if (!strcmp(optarg, "notice")) {
				spdk_log_set_print_level(SPDK_LOG_NOTICE);
			} else if (!strcmp(optarg, "info")) {
				spdk_log_set_print_level(SPDK_LOG_INFO);
			} else if (!strcmp(optarg, "debug")) {
				spdk_log_set_print_level(SPDK_LOG_DEBUG);
			} else {
				fprintf(stderr, "Unrecognized log level: %s\n", optarg);
				return 1;
			}
			break;
		case ABORT_NO_HUGE:
			g_no_hugepages = true;
			break;
		default:
			usage(argv[0]);
			return 1;
		}
	}

	if (!g_queue_depth) {
		fprintf(stderr, "missing -q (queue depth) operand\n");
		usage(argv[0]);
		return 1;
	}
	if (!g_io_size_bytes) {
		fprintf(stderr, "missing -o (io size) operand\n");
		usage(argv[0]);
		return 1;
	}
	if (!g_workload_type) {
		fprintf(stderr, "missing -w (io pattern type) operand\n");
		usage(argv[0]);
		return 1;
	}

	if (!g_time_in_sec) {
		fprintf(stderr, "missing -t (time in seconds) operand\n");
		usage(argv[0]);
		return 1;
	}

	if (strncmp(g_workload_type, "rand", 4) == 0) {
		g_is_random = 1;
		g_workload_type = &g_workload_type[4];
	}

	if (strcmp(g_workload_type, "read") == 0 || strcmp(g_workload_type, "write") == 0) {
		g_rw_percentage = strcmp(g_workload_type, "read") == 0 ? 100 : 0;
		if (g_mix_specified) {
			fprintf(stderr, "Ignoring -M option... Please use the -M option"
				" only with rw or randrw.\n");
		}
	} else if (strcmp(g_workload_type, "rw") == 0) {
		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
			fprintf(stderr,
				"-M must be set to a value from 0 to 100 "
				"for rw or randrw.\n");
			return 1;
		}
	} else {
		fprintf(stderr,
			"io pattern type must be one of\n"
			"(read, write, randread, randwrite, rw, randrw)\n");
		return 1;
	}

	if (TAILQ_EMPTY(&g_trid_list)) {
		/* If no transport IDs were specified, default to enumerating all local PCIe devices. */
		if (add_trid("trtype:PCIe") != 0) {
			return 1;
		}
	} else {
		struct trid_entry *trid_entry, *trid_entry_tmp;

		g_no_pci = true;
		/* Check whether any local PCIe transport IDs were given. */
		TAILQ_FOREACH_SAFE(trid_entry, &g_trid_list, tailq, trid_entry_tmp) {
			if (trid_entry->trid.trtype == SPDK_NVME_TRANSPORT_PCIE) {
				g_no_pci = false;
				break;
			}
		}
	}

	return 0;
}

static int
register_workers(void)
{
	uint32_t i;
	struct worker_thread *worker;

	SPDK_ENV_FOREACH_CORE(i) {
		worker = calloc(1, sizeof(*worker));
		if (worker == NULL) {
			fprintf(stderr, "Unable to allocate worker\n");
			return -1;
		}

		TAILQ_INIT(&worker->ns_ctx);
		TAILQ_INIT(&worker->ctrlr_ctx);
		worker->lcore = i;
		TAILQ_INSERT_TAIL(&g_workers, worker, link);
		g_num_workers++;
	}

	return 0;
}

static void
unregister_workers(void)
{
	struct worker_thread *worker, *tmp_worker;
	struct ns_worker_ctx *ns_ctx, *tmp_ns_ctx;
	struct ctrlr_worker_ctx *ctrlr_ctx, *tmp_ctrlr_ctx;

	/* Free the namespace contexts and worker threads, printing final statistics. */
	TAILQ_FOREACH_SAFE(worker, &g_workers, link, tmp_worker) {
		TAILQ_REMOVE(&g_workers, worker, link);

		TAILQ_FOREACH_SAFE(ns_ctx, &worker->ns_ctx, link, tmp_ns_ctx) {
			TAILQ_REMOVE(&worker->ns_ctx, ns_ctx, link);
			printf("NS: %s I/O completed: %" PRIu64 ", failed: %" PRIu64 "\n",
			       ns_ctx->entry->name, ns_ctx->io_completed, ns_ctx->io_failed);
			free(ns_ctx);
		}

		TAILQ_FOREACH_SAFE(ctrlr_ctx, &worker->ctrlr_ctx, link, tmp_ctrlr_ctx) {
			TAILQ_REMOVE(&worker->ctrlr_ctx, ctrlr_ctx, link);
			printf("CTRLR: %s abort submitted %" PRIu64 ", failed to submit %" PRIu64 "\n",
			       ctrlr_ctx->entry->name, ctrlr_ctx->abort_submitted,
			       ctrlr_ctx->abort_submit_failed);
			printf("\t success %" PRIu64 ", unsuccessful %" PRIu64 ", failed %" PRIu64 "\n",
			       ctrlr_ctx->successful_abort, ctrlr_ctx->unsuccessful_abort,
			       ctrlr_ctx->abort_failed);
			free(ctrlr_ctx);
		}

		free(worker);
	}
}

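/*
 * Called once per discovered controller; returning true attaches it.  Sizes
 * the admin queue to fit the worst-case number of outstanding aborts (one
 * per g_abort_interval I/Os at full queue depth, rounded up), plus slack for
 * other admin commands.
 */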
static bool
probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
	 struct spdk_nvme_ctrlr_opts *opts)
{
	uint16_t min_aq_size;

	/* We need to make sure the admin queue is big enough to handle all of the aborts that
	 * will be sent by this test app.  We add a few extra entries to account for any admin
	 * commands other than the aborts. */
	min_aq_size = spdk_divide_round_up(g_queue_depth, g_abort_interval) + 8;
	opts->admin_queue_size = spdk_max(opts->admin_queue_size, min_aq_size);

	/* Avoid a possible nvme_qpair_abort_queued_reqs_with_cbarg ERROR when the IO queue size is 128. */
	opts->disable_error_logging = true;

	return true;
}

static void
attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
	  struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
	struct trid_entry       *trid_entry = cb_ctx;
	struct spdk_pci_addr    pci_addr;
	struct spdk_pci_device  *pci_dev;
	struct spdk_pci_id      pci_id;

	if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
		printf("Attached to NVMe over Fabrics controller at %s:%s: %s\n",
		       trid->traddr, trid->trsvcid,
		       trid->subnqn);
	} else {
		if (spdk_pci_addr_parse(&pci_addr, trid->traddr)) {
			return;
		}

		pci_dev = spdk_nvme_ctrlr_get_pci_device(ctrlr);
		if (!pci_dev) {
			return;
		}

		pci_id = spdk_pci_device_get_id(pci_dev);

		printf("Attached to NVMe Controller at %s [%04x:%04x]\n",
		       trid->traddr,
		       pci_id.vendor_id, pci_id.device_id);
	}

	register_ctrlr(ctrlr, trid_entry);
}

static int
register_controllers(void)
{
	struct trid_entry *trid_entry;

	printf("Initializing NVMe Controllers\n");

	TAILQ_FOREACH(trid_entry, &g_trid_list, tailq) {
		if (spdk_nvme_probe(&trid_entry->trid, trid_entry, probe_cb, attach_cb, NULL) != 0) {
			fprintf(stderr, "spdk_nvme_probe() failed for transport address '%s'\n",
				trid_entry->trid.traddr);
			return -1;
		}
	}

	return 0;
}

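/*
 * Detach all controllers.  spdk_nvme_detach_async() chains each detach onto
 * a single shared context, which is then polled until every detach finishes.
 */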
static void
unregister_controllers(void)
{
	struct ctrlr_entry *entry, *tmp;
	struct spdk_nvme_detach_ctx *detach_ctx = NULL;

	TAILQ_FOREACH_SAFE(entry, &g_controllers, link, tmp) {
		TAILQ_REMOVE(&g_controllers, entry, link);
		spdk_nvme_detach_async(entry->ctrlr, &detach_ctx);
		free(entry);
	}

	if (detach_ctx) {
		spdk_nvme_detach_poll(detach_ctx);
	}
}

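/*
 * Attach a ctrlr_worker_ctx for every controller to the worker on the main
 * core, since only the main core processes admin completions.
 */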
static int
associate_main_worker_with_ctrlr(void)
{
	struct ctrlr_entry	*entry;
	struct worker_thread	*worker;
	struct ctrlr_worker_ctx	*ctrlr_ctx;

	TAILQ_FOREACH(worker, &g_workers, link) {
		if (worker->lcore == g_main_core) {
			break;
		}
	}

	if (!worker) {
		return -1;
	}

	TAILQ_FOREACH(entry, &g_controllers, link) {
		ctrlr_ctx = calloc(1, sizeof(struct ctrlr_worker_ctx));
		if (!ctrlr_ctx) {
			return -1;
		}

		pthread_mutex_init(&ctrlr_ctx->mutex, NULL);
		ctrlr_ctx->entry = entry;
		ctrlr_ctx->ctrlr = entry->ctrlr;

		TAILQ_INSERT_TAIL(&worker->ctrlr_ctx, ctrlr_ctx, link);
	}

	return 0;
}

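/* Look up the main worker's ctrlr_worker_ctx for the given controller. */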
static struct ctrlr_worker_ctx *
get_ctrlr_worker_ctx(struct spdk_nvme_ctrlr *ctrlr)
{
	struct worker_thread	*worker;
	struct ctrlr_worker_ctx *ctrlr_ctx;

	TAILQ_FOREACH(worker, &g_workers, link) {
		if (worker->lcore == g_main_core) {
			break;
		}
	}

	if (!worker) {
		return NULL;
	}

	TAILQ_FOREACH(ctrlr_ctx, &worker->ctrlr_ctx, link) {
		if (ctrlr_ctx->ctrlr == ctrlr) {
			return ctrlr_ctx;
		}
	}

	return NULL;
}

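/*
 * Distribute namespaces across workers round-robin.  If there are more
 * workers than namespaces, namespaces are reused so that every worker gets
 * at least one; if there are more namespaces than workers, workers are
 * reused likewise.
 */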
static int
associate_workers_with_ns(void)
{
	struct ns_entry		*entry = TAILQ_FIRST(&g_namespaces);
	struct worker_thread	*worker = TAILQ_FIRST(&g_workers);
	struct ns_worker_ctx	*ns_ctx;
	int			i, count;

	count = g_num_namespaces > g_num_workers ? g_num_namespaces : g_num_workers;

	for (i = 0; i < count; i++) {
		if (entry == NULL) {
			break;
		}

		ns_ctx = calloc(1, sizeof(struct ns_worker_ctx));
		if (!ns_ctx) {
			return -1;
		}

		printf("Associating %s with lcore %u\n", entry->name, worker->lcore);
		ns_ctx->entry = entry;
		ns_ctx->ctrlr_ctx = get_ctrlr_worker_ctx(entry->ctrlr);
		if (!ns_ctx->ctrlr_ctx) {
			free(ns_ctx);
			return -1;
		}

		TAILQ_INSERT_TAIL(&worker->ns_ctx, ns_ctx, link);

		worker = TAILQ_NEXT(worker, link);
		if (worker == NULL) {
			worker = TAILQ_FIRST(&g_workers);
		}

		entry = TAILQ_NEXT(entry, link);
		if (entry == NULL) {
			entry = TAILQ_FIRST(&g_namespaces);
		}
	}

	return 0;
}

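/*
 * Main flow: parse arguments, initialize the SPDK environment, enumerate
 * controllers and namespaces, bind them to worker cores, run work_fn() on
 * every core (the main core runs it inline), then tear everything down.
 */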
int
main(int argc, char **argv)
{
	int rc;
	struct worker_thread *worker, *main_worker;
	struct spdk_env_opts opts;

	rc = parse_args(argc, argv);
	if (rc != 0) {
		return rc;
	}

	opts.opts_size = sizeof(opts);
	spdk_env_opts_init(&opts);
	opts.name = "abort";
	opts.shm_id = g_shm_id;
	if (g_core_mask) {
		opts.core_mask = g_core_mask;
	}

	if (g_dpdk_mem) {
		opts.mem_size = g_dpdk_mem;
	}
	if (g_no_pci) {
		opts.no_pci = g_no_pci;
	}
	if (g_no_hugepages) {
		opts.no_huge = true;
	}
	if (spdk_env_init(&opts) < 0) {
		fprintf(stderr, "Unable to initialize SPDK env\n");
		unregister_trids();
		return -1;
	}

	g_tsc_rate = spdk_get_ticks_hz();

	/* The main core must be known before controller contexts are bound to it
	 * in associate_main_worker_with_ctrlr().
	 */
	g_main_core = spdk_env_get_current_core();

	if (register_workers() != 0) {
		rc = -1;
		goto cleanup;
	}

	if (register_controllers() != 0) {
		rc = -1;
		goto cleanup;
	}

	if (g_warn) {
		printf("WARNING: Some requested NVMe devices were skipped\n");
	}

	if (g_num_namespaces == 0) {
		fprintf(stderr, "No valid NVMe namespaces found\n");
		rc = -1;
		goto cleanup;
	}

	if (associate_main_worker_with_ctrlr() != 0) {
		rc = -1;
		goto cleanup;
	}

	if (associate_workers_with_ns() != 0) {
		rc = -1;
		goto cleanup;
	}

	printf("Initialization complete. Launching workers.\n");

	/* Launch all of the secondary workers. */
	main_worker = NULL;
	TAILQ_FOREACH(worker, &g_workers, link) {
		if (worker->lcore != g_main_core) {
			spdk_env_thread_launch_pinned(worker->lcore, work_fn, worker);
		} else {
			assert(main_worker == NULL);
			main_worker = worker;
		}
	}

	assert(main_worker != NULL);
	rc = work_fn(main_worker);

	spdk_env_thread_wait_all();

	TAILQ_FOREACH(worker, &g_workers, link) {
		if (worker->status != 0) {
			rc = 1;
			break;
		}
	}

cleanup:
	unregister_trids();
	unregister_workers();
	unregister_namespaces();
	unregister_controllers();

	spdk_env_fini();

	if (rc != 0) {
		fprintf(stderr, "%s: errors occurred\n", argv[0]);
	}

	return rc;
}