/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/log.h"
#include "spdk/nvme.h"
#include "spdk/queue.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

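/*
 * This example exercises the NVMe Abort admin command: each worker core
 * submits read/write I/O to its namespaces while periodically issuing
 * spdk_nvme_ctrlr_cmd_abort_ext() against the most recently submitted I/O,
 * then reports how many aborts were submitted and how many succeeded.
 */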
struct ctrlr_entry {
	struct spdk_nvme_ctrlr		*ctrlr;
	enum spdk_nvme_transport_type	trtype;

	TAILQ_ENTRY(ctrlr_entry)	link;
	char				name[1024];
};

struct ns_entry {
	struct spdk_nvme_ctrlr		*ctrlr;
	struct spdk_nvme_ns		*ns;

	TAILQ_ENTRY(ns_entry)		link;
	uint32_t			io_size_blocks;
	uint32_t			num_io_requests;
	uint64_t			size_in_ios;
	uint32_t			block_size;
	char				name[1024];
};

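/*
 * Per-controller abort statistics. Abort commands are admin commands, so a
 * single ctrlr_worker_ctx per controller is attached to the main core's
 * worker; the mutex serializes abort submission (from any I/O worker) with
 * admin completion processing and guards current_queue_depth.
 */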
struct ctrlr_worker_ctx {
	pthread_mutex_t			mutex;
	struct ctrlr_entry		*entry;
	uint64_t			abort_submitted;
	uint64_t			abort_submit_failed;
	uint64_t			successful_abort;
	uint64_t			unsuccessful_abort;
	uint64_t			abort_failed;
	uint64_t			current_queue_depth;
	struct spdk_nvme_ctrlr		*ctrlr;
	TAILQ_ENTRY(ctrlr_worker_ctx)	link;
};

struct ns_worker_ctx {
	struct ns_entry			*entry;
	uint64_t			io_submitted;
	uint64_t			io_completed;
	uint64_t			io_aborted;
	uint64_t			io_failed;
	uint64_t			current_queue_depth;
	uint64_t			offset_in_ios;
	bool				is_draining;
	struct spdk_nvme_qpair		*qpair;
	struct ctrlr_worker_ctx		*ctrlr_ctx;
	TAILQ_ENTRY(ns_worker_ctx)	link;
};

struct perf_task {
	struct ns_worker_ctx		*ns_ctx;
	void				*buf;
};

struct worker_thread {
	TAILQ_HEAD(, ns_worker_ctx)	ns_ctx;
	TAILQ_HEAD(, ctrlr_worker_ctx)	ctrlr_ctx;
	TAILQ_ENTRY(worker_thread)	link;
	unsigned			lcore;
};

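/* Global run-time state; most values are set from the command line in parse_args(). */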
static const char *g_workload_type = "read";
static TAILQ_HEAD(, ctrlr_entry) g_controllers = TAILQ_HEAD_INITIALIZER(g_controllers);
static TAILQ_HEAD(, ns_entry) g_namespaces = TAILQ_HEAD_INITIALIZER(g_namespaces);
static int g_num_namespaces;
static TAILQ_HEAD(, worker_thread) g_workers = TAILQ_HEAD_INITIALIZER(g_workers);
static int g_num_workers = 0;
static uint32_t g_main_core;

static int g_abort_interval = 1;

static uint64_t g_tsc_rate;

static uint32_t g_io_size_bytes = 131072;
static uint32_t g_max_io_size_blocks;
static int g_rw_percentage = -1;
static int g_is_random;
static int g_queue_depth = 128;
static int g_time_in_sec = 3;
static int g_dpdk_mem;
static int g_shm_id = -1;
static bool g_no_pci;
static bool g_warn;
static bool g_mix_specified;

static const char *g_core_mask;

struct trid_entry {
	struct spdk_nvme_transport_id	trid;
	uint16_t			nsid;
	TAILQ_ENTRY(trid_entry)		tailq;
};

static TAILQ_HEAD(, trid_entry) g_trid_list = TAILQ_HEAD_INITIALIZER(g_trid_list);

static void io_complete(void *ctx, const struct spdk_nvme_cpl *cpl);

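/* Format a human-readable controller name from its transport ID, e.g. "PCIE (0000:04:00.0)". */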
static int
build_nvme_name(char *name, size_t length, struct spdk_nvme_ctrlr *ctrlr)
{
	const struct spdk_nvme_transport_id *trid;
	int res = 0;

	trid = spdk_nvme_ctrlr_get_transport_id(ctrlr);

	switch (trid->trtype) {
	case SPDK_NVME_TRANSPORT_PCIE:
		res = snprintf(name, length, "PCIE (%s)", trid->traddr);
		break;
	case SPDK_NVME_TRANSPORT_RDMA:
		res = snprintf(name, length, "RDMA (addr:%s subnqn:%s)", trid->traddr, trid->subnqn);
		break;
	case SPDK_NVME_TRANSPORT_TCP:
		res = snprintf(name, length, "TCP (addr:%s subnqn:%s)", trid->traddr, trid->subnqn);
		break;
	case SPDK_NVME_TRANSPORT_CUSTOM:
		res = snprintf(name, length, "CUSTOM (%s)", trid->traddr);
		break;
	default:
		fprintf(stderr, "Unknown transport type %d\n", trid->trtype);
		break;
	}
	return res;
}

static void
build_nvme_ns_name(char *name, size_t length, struct spdk_nvme_ctrlr *ctrlr, uint32_t nsid)
{
	int res = 0;

	res = build_nvme_name(name, length, ctrlr);
	if (res > 0) {
		snprintf(name + res, length - res, " NSID %u", nsid);
	}
}

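/*
 * Validate that the namespace can service g_io_size_bytes I/O and record how
 * many driver request objects a qpair needs to sustain g_queue_depth such I/O.
 */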
static void
register_ns(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_ns *ns)
{
	struct ns_entry *entry;
	const struct spdk_nvme_ctrlr_data *cdata;
	uint32_t max_xfer_size, entries, sector_size;
	uint64_t ns_size;
	struct spdk_nvme_io_qpair_opts opts;

	cdata = spdk_nvme_ctrlr_get_data(ctrlr);

	if (!spdk_nvme_ns_is_active(ns)) {
		printf("Controller %-20.20s (%-20.20s): Skipping inactive NS %u\n",
		       cdata->mn, cdata->sn,
		       spdk_nvme_ns_get_id(ns));
		g_warn = true;
		return;
	}

	ns_size = spdk_nvme_ns_get_size(ns);
	sector_size = spdk_nvme_ns_get_sector_size(ns);

	if (ns_size < g_io_size_bytes || sector_size > g_io_size_bytes) {
		printf("WARNING: controller %-20.20s (%-20.20s) ns %u has invalid "
		       "ns size %" PRIu64 " / block size %u for I/O size %u\n",
		       cdata->mn, cdata->sn, spdk_nvme_ns_get_id(ns),
		       ns_size, spdk_nvme_ns_get_sector_size(ns), g_io_size_bytes);
		g_warn = true;
		return;
	}

	max_xfer_size = spdk_nvme_ns_get_max_io_xfer_size(ns);
	spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
	/* The NVMe driver may add additional entries based on the
	 * stripe size and maximum transfer size; we assume one more
	 * entry is used for the stripe.
	 */
	entries = (g_io_size_bytes - 1) / max_xfer_size + 2;
	if ((g_queue_depth * entries) > opts.io_queue_size) {
		printf("controller IO queue size %u is less than required\n",
		       opts.io_queue_size);
		printf("Consider using a lower queue depth or smaller IO size, because "
		       "IO requests may be queued at the NVMe driver.\n");
	}
	/* For requests that have child requests, the parent request itself
	 * will also occupy one entry.
	 */
	entries += 1;

	entry = calloc(1, sizeof(struct ns_entry));
	if (entry == NULL) {
		perror("ns_entry calloc");
		exit(1);
	}

	entry->ctrlr = ctrlr;
	entry->ns = ns;
	entry->num_io_requests = g_queue_depth * entries;

	entry->size_in_ios = ns_size / g_io_size_bytes;
	entry->io_size_blocks = g_io_size_bytes / sector_size;

	entry->block_size = spdk_nvme_ns_get_sector_size(ns);

	if (g_max_io_size_blocks < entry->io_size_blocks) {
		g_max_io_size_blocks = entry->io_size_blocks;
	}

	build_nvme_ns_name(entry->name, sizeof(entry->name), ctrlr, spdk_nvme_ns_get_id(ns));

	g_num_namespaces++;
	TAILQ_INSERT_TAIL(&g_namespaces, entry, link);
}

static void
unregister_namespaces(void)
{
	struct ns_entry *entry, *tmp;

	TAILQ_FOREACH_SAFE(entry, &g_namespaces, link, tmp) {
		TAILQ_REMOVE(&g_namespaces, entry, link);
		free(entry);
	}
}

static void
register_ctrlr(struct spdk_nvme_ctrlr *ctrlr, struct trid_entry *trid_entry)
{
	struct spdk_nvme_ns *ns;
	struct ctrlr_entry *entry = malloc(sizeof(struct ctrlr_entry));
	uint32_t nsid;

	if (entry == NULL) {
		perror("ctrlr_entry malloc");
		exit(1);
	}

	build_nvme_name(entry->name, sizeof(entry->name), ctrlr);

	entry->ctrlr = ctrlr;
	entry->trtype = trid_entry->trid.trtype;
	TAILQ_INSERT_TAIL(&g_controllers, entry, link);

	if (trid_entry->nsid == 0) {
		for (nsid = spdk_nvme_ctrlr_get_first_active_ns(ctrlr);
		     nsid != 0; nsid = spdk_nvme_ctrlr_get_next_active_ns(ctrlr, nsid)) {
			ns = spdk_nvme_ctrlr_get_ns(ctrlr, nsid);
			if (ns == NULL) {
				continue;
			}
			register_ns(ctrlr, ns);
		}
	} else {
		ns = spdk_nvme_ctrlr_get_ns(ctrlr, trid_entry->nsid);
		if (!ns) {
			fprintf(stderr, "Namespace does not exist.\n");
			exit(1);
		}

		register_ns(ctrlr, ns);
	}
}

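/*
 * Completion callback for an Abort command. Per the NVMe specification,
 * bit 0 of completion dword 0 is cleared to 0 if the target command was
 * aborted and set to 1 if it was not (e.g. it had already completed).
 */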
static void
abort_complete(void *ctx, const struct spdk_nvme_cpl *cpl)
{
	struct ctrlr_worker_ctx	*ctrlr_ctx = ctx;

	ctrlr_ctx->current_queue_depth--;
	if (spdk_unlikely(spdk_nvme_cpl_is_error(cpl))) {
		ctrlr_ctx->abort_failed++;
	} else if ((cpl->cdw0 & 0x1) == 0) {
		ctrlr_ctx->successful_abort++;
	} else {
		ctrlr_ctx->unsuccessful_abort++;
	}
}

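/*
 * Issue an Abort admin command targeting the I/O identified by 'task' on this
 * namespace's qpair. Called from the I/O workers, hence the mutex shared with
 * the main core's admin completion polling.
 */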
static void
abort_task(struct perf_task *task)
{
	struct ns_worker_ctx	*ns_ctx = task->ns_ctx;
	struct ctrlr_worker_ctx	*ctrlr_ctx = ns_ctx->ctrlr_ctx;
	int			rc;

	/* Hold mutex to guard ctrlr_ctx->current_queue_depth. */
	pthread_mutex_lock(&ctrlr_ctx->mutex);

	rc = spdk_nvme_ctrlr_cmd_abort_ext(ctrlr_ctx->ctrlr, ns_ctx->qpair, task, abort_complete,
					   ctrlr_ctx);

	if (spdk_unlikely(rc != 0)) {
		ctrlr_ctx->abort_submit_failed++;
	} else {
		ctrlr_ctx->current_queue_depth++;
		ctrlr_ctx->abort_submitted++;
	}

	pthread_mutex_unlock(&ctrlr_ctx->mutex);
}

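/* Per-thread rand_r() seed so worker cores do not share RNG state. */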
static __thread unsigned int seed = 0;

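/*
 * Submit one read or write according to the configured access pattern and
 * read percentage, and issue an abort for every g_abort_interval-th I/O.
 */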
static inline void
submit_single_io(struct perf_task *task)
{
	uint64_t		offset_in_ios, lba;
	int			rc;
	struct ns_worker_ctx	*ns_ctx = task->ns_ctx;
	struct ns_entry		*entry = ns_ctx->entry;

	if (g_is_random) {
		offset_in_ios = rand_r(&seed) % entry->size_in_ios;
	} else {
		offset_in_ios = ns_ctx->offset_in_ios++;
		if (ns_ctx->offset_in_ios == entry->size_in_ios) {
			ns_ctx->offset_in_ios = 0;
		}
	}

	lba = offset_in_ios * entry->io_size_blocks;

	if ((g_rw_percentage == 100) ||
	    (g_rw_percentage != 0 && (rand_r(&seed) % 100) < g_rw_percentage)) {
		rc = spdk_nvme_ns_cmd_read(entry->ns, ns_ctx->qpair, task->buf,
					   lba, entry->io_size_blocks, io_complete, task, 0);
	} else {
		rc = spdk_nvme_ns_cmd_write(entry->ns, ns_ctx->qpair, task->buf,
					    lba, entry->io_size_blocks, io_complete, task, 0);
	}

	if (spdk_unlikely(rc != 0)) {
		fprintf(stderr, "I/O submission failed\n");
	} else {
		ns_ctx->current_queue_depth++;
		ns_ctx->io_submitted++;

		if ((ns_ctx->io_submitted % g_abort_interval) == 0) {
			abort_task(task);
		}
	}
}

static void
io_complete(void *ctx, const struct spdk_nvme_cpl *cpl)
{
	struct perf_task	*task = ctx;
	struct ns_worker_ctx	*ns_ctx = task->ns_ctx;

	ns_ctx->current_queue_depth--;
	if (spdk_unlikely(spdk_nvme_cpl_is_error(cpl))) {
		ns_ctx->io_failed++;
	} else {
		ns_ctx->io_completed++;
	}

	/* is_draining indicates when time has expired for the test run and we are
	 * just waiting for the previously submitted I/O to complete. In this case,
	 * do not submit a new I/O to replace the one just completed.
	 */
	if (spdk_unlikely(ns_ctx->is_draining)) {
		spdk_dma_free(task->buf);
		free(task);
	} else {
		submit_single_io(task);
	}
}

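/* Allocate a task and its I/O buffer; the buffer must be DMA-safe, hence spdk_dma_zmalloc(). */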
static struct perf_task *
allocate_task(struct ns_worker_ctx *ns_ctx)
{
	struct perf_task *task;

	task = calloc(1, sizeof(*task));
	if (task == NULL) {
		fprintf(stderr, "Failed to allocate task\n");
		exit(1);
	}

	task->buf = spdk_dma_zmalloc(g_io_size_bytes, 0x200, NULL);
	if (task->buf == NULL) {
		free(task);
		fprintf(stderr, "Failed to allocate task->buf\n");
		exit(1);
	}

	task->ns_ctx = ns_ctx;

	return task;
}

static void
submit_io(struct ns_worker_ctx *ns_ctx, int queue_depth)
{
	struct perf_task *task;

	while (queue_depth-- > 0) {
		task = allocate_task(ns_ctx);
		submit_single_io(task);
	}
}

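/*
 * Per-core worker: allocate a qpair for each assigned namespace, keep
 * g_queue_depth I/O outstanding on each until the timer expires, then drain.
 * The main core additionally polls admin completions for the abort commands.
 */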
static int
work_fn(void *arg)
{
	struct worker_thread *worker = (struct worker_thread *)arg;
	struct ns_worker_ctx *ns_ctx;
	struct ctrlr_worker_ctx *ctrlr_ctx;
	struct ns_entry *ns_entry;
	struct spdk_nvme_io_qpair_opts opts;
	uint64_t tsc_end;
	uint32_t unfinished_ctx;

	/* Allocate queue pair for each namespace. */
	TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
		ns_entry = ns_ctx->entry;

		spdk_nvme_ctrlr_get_default_io_qpair_opts(ns_entry->ctrlr, &opts, sizeof(opts));
		if (opts.io_queue_requests < ns_entry->num_io_requests) {
			opts.io_queue_requests = ns_entry->num_io_requests;
		}

		ns_ctx->qpair = spdk_nvme_ctrlr_alloc_io_qpair(ns_entry->ctrlr, &opts, sizeof(opts));
		if (ns_ctx->qpair == NULL) {
			fprintf(stderr, "spdk_nvme_ctrlr_alloc_io_qpair failed\n");
			return 1;
		}
	}

	tsc_end = spdk_get_ticks() + g_time_in_sec * g_tsc_rate;

	/* Submit initial I/O for each namespace. */
	TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
		submit_io(ns_ctx, g_queue_depth);
	}

	while (1) {
		TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
			spdk_nvme_qpair_process_completions(ns_ctx->qpair, 0);
		}

		if (worker->lcore == g_main_core) {
			TAILQ_FOREACH(ctrlr_ctx, &worker->ctrlr_ctx, link) {
				/* Hold mutex to guard ctrlr_ctx->current_queue_depth. */
				pthread_mutex_lock(&ctrlr_ctx->mutex);
				spdk_nvme_ctrlr_process_admin_completions(ctrlr_ctx->ctrlr);
				pthread_mutex_unlock(&ctrlr_ctx->mutex);
			}
		}

		if (spdk_get_ticks() > tsc_end) {
			break;
		}
	}

	do {
		unfinished_ctx = 0;

		TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
			if (!ns_ctx->is_draining) {
				ns_ctx->is_draining = true;
			}
			if (ns_ctx->current_queue_depth > 0) {
				spdk_nvme_qpair_process_completions(ns_ctx->qpair, 0);
				if (ns_ctx->current_queue_depth == 0) {
					spdk_nvme_ctrlr_free_io_qpair(ns_ctx->qpair);
				} else {
					unfinished_ctx++;
				}
			}
		}
	} while (unfinished_ctx > 0);

	if (worker->lcore == g_main_core) {
		do {
			unfinished_ctx = 0;

			TAILQ_FOREACH(ctrlr_ctx, &worker->ctrlr_ctx, link) {
				pthread_mutex_lock(&ctrlr_ctx->mutex);
				if (ctrlr_ctx->current_queue_depth > 0) {
					spdk_nvme_ctrlr_process_admin_completions(ctrlr_ctx->ctrlr);
					if (ctrlr_ctx->current_queue_depth > 0) {
						unfinished_ctx++;
					}
				}
				pthread_mutex_unlock(&ctrlr_ctx->mutex);
			}
		} while (unfinished_ctx > 0);
	}

	return 0;
}

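/*
 * Example invocation (illustrative values): run a 50/50 random read/write
 * workload at queue depth 128, aborting every 16th I/O, for 10 seconds:
 *
 *     ./abort -w randrw -M 50 -q 128 -o 4096 -t 10 -a 16
 */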
static void
usage(char *program_name)
{
	printf("%s options", program_name);

	printf("\n");
	printf("\t[-q io depth]\n");
	printf("\t[-o io size in bytes]\n");
	printf("\t[-w io pattern type, must be one of\n");
	printf("\t\t(read, write, randread, randwrite, rw, randrw)]\n");
	printf("\t[-M rwmixread (100 for reads, 0 for writes)]\n");
	printf("\t[-t time in seconds]\n");
	printf("\t[-c core mask for I/O submission/completion.]\n");
	printf("\t\t(default: 1)\n");
	printf("\t[-r Transport ID for local PCIe NVMe or NVMeoF]\n");
	printf("\t Format: 'key:value [key:value] ...'\n");
	printf("\t Keys:\n");
	printf("\t  trtype      Transport type (e.g. PCIe, RDMA)\n");
	printf("\t  adrfam      Address family (e.g. IPv4, IPv6)\n");
	printf("\t  traddr      Transport address (e.g. 0000:04:00.0 for PCIe or 192.168.100.8 for RDMA)\n");
	printf("\t  trsvcid     Transport service identifier (e.g. 4420)\n");
	printf("\t  subnqn      Subsystem NQN (default: %s)\n", SPDK_NVMF_DISCOVERY_NQN);
	printf("\t Example: -r 'trtype:PCIe traddr:0000:04:00.0' for PCIe or\n");
	printf("\t          -r 'trtype:RDMA adrfam:IPv4 traddr:192.168.100.8 trsvcid:4420' for NVMeoF\n");
	printf("\t[-s DPDK huge memory size in MB.]\n");
	printf("\t[-i shared memory group ID]\n");
	printf("\t[-a abort interval.]\n");
	printf("\t");
	spdk_log_usage(stdout, "-T");
#ifdef DEBUG
	printf("\t[-G enable debug logging]\n");
#else
	printf("\t[-G enable debug logging (flag disabled, must reconfigure with --enable-debug)]\n");
#endif
}

static void
unregister_trids(void)
{
	struct trid_entry *trid_entry, *tmp;

	TAILQ_FOREACH_SAFE(trid_entry, &g_trid_list, tailq, tmp) {
		TAILQ_REMOVE(&g_trid_list, trid_entry, tailq);
		free(trid_entry);
	}
}

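/*
 * Parse a transport ID string into a trid_entry. An optional "ns:<id>" key,
 * which is not part of struct spdk_nvme_transport_id, is extracted by hand.
 */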
static int
add_trid(const char *trid_str)
{
	struct trid_entry *trid_entry;
	struct spdk_nvme_transport_id *trid;
	char *ns;

	trid_entry = calloc(1, sizeof(*trid_entry));
	if (trid_entry == NULL) {
		return -1;
	}

	trid = &trid_entry->trid;
	trid->trtype = SPDK_NVME_TRANSPORT_PCIE;
	snprintf(trid->subnqn, sizeof(trid->subnqn), "%s", SPDK_NVMF_DISCOVERY_NQN);

	if (spdk_nvme_transport_id_parse(trid, trid_str) != 0) {
		fprintf(stderr, "Invalid transport ID format '%s'\n", trid_str);
		free(trid_entry);
		return 1;
	}

	spdk_nvme_transport_id_populate_trstring(trid,
			spdk_nvme_transport_id_trtype_str(trid->trtype));

	ns = strcasestr(trid_str, "ns:");
	if (ns) {
		char nsid_str[6]; /* 5 digits maximum in an nsid */
		int len;
		int nsid;

		ns += 3;

		len = strcspn(ns, " \t\n");
		if (len > 5) {
			fprintf(stderr, "NVMe namespace IDs must be 5 digits or less\n");
			free(trid_entry);
			return 1;
		}

		memcpy(nsid_str, ns, len);
		nsid_str[len] = '\0';

		nsid = spdk_strtol(nsid_str, 10);
		if (nsid <= 0 || nsid > 65535) {
			fprintf(stderr, "NVMe namespace IDs must be less than 65536 and greater than 0\n");
			free(trid_entry);
			return 1;
		}

		trid_entry->nsid = (uint16_t)nsid;
	}

	TAILQ_INSERT_TAIL(&g_trid_list, trid_entry, tailq);
	return 0;
}

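/* Parse command-line options and validate the resulting workload configuration. */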
static int
parse_args(int argc, char **argv)
{
	int op;
	long int val;
	int rc;

	while ((op = getopt(argc, argv, "a:c:i:o:q:r:s:t:w:GM:T:")) != -1) {
		switch (op) {
		case 'a':
		case 'i':
		case 'o':
		case 'q':
		case 's':
		case 't':
		case 'M':
			val = spdk_strtol(optarg, 10);
			if (val < 0) {
				fprintf(stderr, "Failed to convert '%s' to a non-negative integer\n", optarg);
				return val;
			}
			switch (op) {
			case 'a':
				g_abort_interval = val;
				break;
			case 'i':
				g_shm_id = val;
				break;
			case 'o':
				g_io_size_bytes = val;
				break;
			case 'q':
				g_queue_depth = val;
				break;
			case 's':
				g_dpdk_mem = val;
				break;
			case 't':
				g_time_in_sec = val;
				break;
			case 'M':
				g_rw_percentage = val;
				g_mix_specified = true;
				break;
			}
			break;
		case 'c':
			g_core_mask = optarg;
			break;
		case 'r':
			if (add_trid(optarg)) {
				usage(argv[0]);
				return 1;
			}
			break;
		case 'w':
			g_workload_type = optarg;
			break;
		case 'G':
#ifndef DEBUG
			fprintf(stderr, "%s must be configured with --enable-debug for -G flag\n",
				argv[0]);
			usage(argv[0]);
			return 1;
#else
			spdk_log_set_flag("nvme");
			spdk_log_set_print_level(SPDK_LOG_DEBUG);
			break;
#endif
		case 'T':
			rc = spdk_log_set_flag(optarg);
			if (rc < 0) {
				fprintf(stderr, "unknown flag\n");
				usage(argv[0]);
				exit(EXIT_FAILURE);
			}
#ifdef DEBUG
			spdk_log_set_print_level(SPDK_LOG_DEBUG);
#endif
			break;
		default:
			usage(argv[0]);
			return 1;
		}
	}

	if (!g_queue_depth) {
		fprintf(stderr, "missing -q (queue size) operand\n");
		usage(argv[0]);
		return 1;
	}
	if (!g_io_size_bytes) {
		fprintf(stderr, "missing -o (io size) operand\n");
		usage(argv[0]);
		return 1;
	}
	if (!g_workload_type) {
		fprintf(stderr, "missing -w (io pattern type) operand\n");
		usage(argv[0]);
		return 1;
	}

	if (!g_time_in_sec) {
		fprintf(stderr, "missing -t (time in seconds) operand\n");
		usage(argv[0]);
		return 1;
	}

	if (strncmp(g_workload_type, "rand", 4) == 0) {
		g_is_random = 1;
		g_workload_type = &g_workload_type[4];
	}

	if (strcmp(g_workload_type, "read") == 0 || strcmp(g_workload_type, "write") == 0) {
		g_rw_percentage = strcmp(g_workload_type, "read") == 0 ? 100 : 0;
		if (g_mix_specified) {
			fprintf(stderr, "Ignoring -M option... Please use the -M option"
				" only with rw or randrw.\n");
		}
	} else if (strcmp(g_workload_type, "rw") == 0) {
		if (g_rw_percentage < 0 || g_rw_percentage > 100) {
			fprintf(stderr,
				"-M must be set to a value from 0 to 100 "
				"for rw or randrw.\n");
			return 1;
		}
	} else {
		fprintf(stderr,
			"io pattern type must be one of\n"
			"(read, write, randread, randwrite, rw, randrw)\n");
		return 1;
	}

	if (TAILQ_EMPTY(&g_trid_list)) {
		/* If no transport IDs were specified, default to enumerating all local PCIe devices. */
		add_trid("trtype:PCIe");
	} else {
		struct trid_entry *trid_entry, *trid_entry_tmp;

		g_no_pci = true;
		/* Check whether any local PCIe transport IDs were given. */
		TAILQ_FOREACH_SAFE(trid_entry, &g_trid_list, tailq, trid_entry_tmp) {
			if (trid_entry->trid.trtype == SPDK_NVME_TRANSPORT_PCIE) {
				g_no_pci = false;
				break;
			}
		}
	}

	return 0;
}

static int
register_workers(void)
{
	uint32_t i;
	struct worker_thread *worker;

	SPDK_ENV_FOREACH_CORE(i) {
		worker = calloc(1, sizeof(*worker));
		if (worker == NULL) {
			fprintf(stderr, "Unable to allocate worker\n");
			return -1;
		}

		TAILQ_INIT(&worker->ns_ctx);
		TAILQ_INIT(&worker->ctrlr_ctx);
		worker->lcore = i;
		TAILQ_INSERT_TAIL(&g_workers, worker, link);
		g_num_workers++;
	}

	return 0;
}

static void
unregister_workers(void)
{
	struct worker_thread *worker, *tmp_worker;
	struct ns_worker_ctx *ns_ctx, *tmp_ns_ctx;
	struct ctrlr_worker_ctx *ctrlr_ctx, *tmp_ctrlr_ctx;

	/* Free the namespace contexts, controller contexts, and worker threads,
	 * printing per-context statistics along the way.
	 */
	TAILQ_FOREACH_SAFE(worker, &g_workers, link, tmp_worker) {
		TAILQ_REMOVE(&g_workers, worker, link);

		TAILQ_FOREACH_SAFE(ns_ctx, &worker->ns_ctx, link, tmp_ns_ctx) {
			TAILQ_REMOVE(&worker->ns_ctx, ns_ctx, link);
			printf("NS: %s I/O completed: %" PRIu64 ", failed: %" PRIu64 "\n",
			       ns_ctx->entry->name, ns_ctx->io_completed, ns_ctx->io_failed);
			free(ns_ctx);
		}

		TAILQ_FOREACH_SAFE(ctrlr_ctx, &worker->ctrlr_ctx, link, tmp_ctrlr_ctx) {
			TAILQ_REMOVE(&worker->ctrlr_ctx, ctrlr_ctx, link);
			printf("CTRLR: %s abort submitted %" PRIu64 ", failed to submit %" PRIu64 "\n",
			       ctrlr_ctx->entry->name, ctrlr_ctx->abort_submitted,
			       ctrlr_ctx->abort_submit_failed);
			printf("\t successful %" PRIu64 ", unsuccessful %" PRIu64 ", failed %" PRIu64 "\n",
			       ctrlr_ctx->successful_abort, ctrlr_ctx->unsuccessful_abort,
			       ctrlr_ctx->abort_failed);
			free(ctrlr_ctx);
		}

		free(worker);
	}
}

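/* Accept every controller reported by the probe. */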
static bool
probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
	 struct spdk_nvme_ctrlr_opts *opts)
{
	return true;
}

static void
attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
	  struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
	struct trid_entry       *trid_entry = cb_ctx;
	struct spdk_pci_addr    pci_addr;
	struct spdk_pci_device  *pci_dev;
	struct spdk_pci_id      pci_id;

	if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
		printf("Attached to NVMe over Fabrics controller at %s:%s: %s\n",
		       trid->traddr, trid->trsvcid,
		       trid->subnqn);
	} else {
		if (spdk_pci_addr_parse(&pci_addr, trid->traddr)) {
			return;
		}

		pci_dev = spdk_nvme_ctrlr_get_pci_device(ctrlr);
		if (!pci_dev) {
			return;
		}

		pci_id = spdk_pci_device_get_id(pci_dev);

		printf("Attached to NVMe Controller at %s [%04x:%04x]\n",
		       trid->traddr,
		       pci_id.vendor_id, pci_id.device_id);
	}

	register_ctrlr(ctrlr, trid_entry);
}

static int
register_controllers(void)
{
	struct trid_entry *trid_entry;

	printf("Initializing NVMe Controllers\n");

	TAILQ_FOREACH(trid_entry, &g_trid_list, tailq) {
		if (spdk_nvme_probe(&trid_entry->trid, trid_entry, probe_cb, attach_cb, NULL) != 0) {
			fprintf(stderr, "spdk_nvme_probe() failed for transport address '%s'\n",
				trid_entry->trid.traddr);
			return -1;
		}
	}

	return 0;
}

static void
unregister_controllers(void)
{
	struct ctrlr_entry *entry, *tmp;
	struct spdk_nvme_detach_ctx *detach_ctx = NULL;

	TAILQ_FOREACH_SAFE(entry, &g_controllers, link, tmp) {
		TAILQ_REMOVE(&g_controllers, entry, link);
		spdk_nvme_detach_async(entry->ctrlr, &detach_ctx);
		free(entry);
	}

	while (detach_ctx && spdk_nvme_detach_poll_async(detach_ctx) == -EAGAIN) {
		;
	}
}

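/*
 * Abort commands complete on the admin queue, which only the main core polls,
 * so every controller context is attached to the main core's worker.
 */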
static int
associate_main_worker_with_ctrlr(void)
{
	struct ctrlr_entry	*entry;
	struct worker_thread	*worker;
	struct ctrlr_worker_ctx	*ctrlr_ctx;

	TAILQ_FOREACH(worker, &g_workers, link) {
		if (worker->lcore == g_main_core) {
			break;
		}
	}

	if (!worker) {
		return -1;
	}

	TAILQ_FOREACH(entry, &g_controllers, link) {
		ctrlr_ctx = calloc(1, sizeof(struct ctrlr_worker_ctx));
		if (!ctrlr_ctx) {
			return -1;
		}

		pthread_mutex_init(&ctrlr_ctx->mutex, NULL);
		ctrlr_ctx->entry = entry;
		ctrlr_ctx->ctrlr = entry->ctrlr;

		TAILQ_INSERT_TAIL(&worker->ctrlr_ctx, ctrlr_ctx, link);
	}

	return 0;
}

static struct ctrlr_worker_ctx *
get_ctrlr_worker_ctx(struct spdk_nvme_ctrlr *ctrlr)
{
	struct worker_thread	*worker;
	struct ctrlr_worker_ctx *ctrlr_ctx;

	TAILQ_FOREACH(worker, &g_workers, link) {
		if (worker->lcore == g_main_core) {
			break;
		}
	}

	if (!worker) {
		return NULL;
	}

	TAILQ_FOREACH(ctrlr_ctx, &worker->ctrlr_ctx, link) {
		if (ctrlr_ctx->ctrlr == ctrlr) {
			return ctrlr_ctx;
		}
	}

	return NULL;
}

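/*
 * Distribute namespaces across workers round-robin, wrapping either list so
 * every namespace gets a context and every worker gets work when possible.
 */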
static int
associate_workers_with_ns(void)
{
	struct ns_entry		*entry = TAILQ_FIRST(&g_namespaces);
	struct worker_thread	*worker = TAILQ_FIRST(&g_workers);
	struct ns_worker_ctx	*ns_ctx;
	int			i, count;

	count = g_num_namespaces > g_num_workers ? g_num_namespaces : g_num_workers;

	for (i = 0; i < count; i++) {
		if (entry == NULL) {
			break;
		}

		ns_ctx = calloc(1, sizeof(struct ns_worker_ctx));
		if (!ns_ctx) {
			return -1;
		}

		printf("Associating %s with lcore %u\n", entry->name, worker->lcore);
		ns_ctx->entry = entry;
		ns_ctx->ctrlr_ctx = get_ctrlr_worker_ctx(entry->ctrlr);
		if (!ns_ctx->ctrlr_ctx) {
			free(ns_ctx);
			return -1;
		}

		TAILQ_INSERT_TAIL(&worker->ns_ctx, ns_ctx, link);

		worker = TAILQ_NEXT(worker, link);
		if (worker == NULL) {
			worker = TAILQ_FIRST(&g_workers);
		}

		entry = TAILQ_NEXT(entry, link);
		if (entry == NULL) {
			entry = TAILQ_FIRST(&g_namespaces);
		}
	}

	return 0;
}

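/*
 * Set up the SPDK environment, enumerate controllers and namespaces, bind
 * them to workers, run the workload on every core, then tear everything down.
 */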
int main(int argc, char **argv)
{
	int rc;
	struct worker_thread *worker, *main_worker;
	struct spdk_env_opts opts;

	rc = parse_args(argc, argv);
	if (rc != 0) {
		return rc;
	}

	spdk_env_opts_init(&opts);
	opts.name = "abort";
	opts.shm_id = g_shm_id;
	if (g_core_mask) {
		opts.core_mask = g_core_mask;
	}

	if (g_dpdk_mem) {
		opts.mem_size = g_dpdk_mem;
	}
	if (g_no_pci) {
		opts.no_pci = g_no_pci;
	}
	if (spdk_env_init(&opts) < 0) {
		fprintf(stderr, "Unable to initialize SPDK env\n");
		rc = -1;
		goto cleanup;
	}

	g_tsc_rate = spdk_get_ticks_hz();

	if (register_workers() != 0) {
		rc = -1;
		goto cleanup;
	}

	if (register_controllers() != 0) {
		rc = -1;
		goto cleanup;
	}

	if (g_warn) {
		printf("WARNING: Some requested NVMe devices were skipped\n");
	}

	if (g_num_namespaces == 0) {
		fprintf(stderr, "No valid namespaces found\n");
		rc = -1;
		goto cleanup;
	}

	if (associate_main_worker_with_ctrlr() != 0) {
		rc = -1;
		goto cleanup;
	}

	if (associate_workers_with_ns() != 0) {
		rc = -1;
		goto cleanup;
	}

	printf("Initialization complete. Launching workers.\n");

	/* Launch all of the secondary workers */
	g_main_core = spdk_env_get_current_core();
	main_worker = NULL;
	TAILQ_FOREACH(worker, &g_workers, link) {
		if (worker->lcore != g_main_core) {
			spdk_env_thread_launch_pinned(worker->lcore, work_fn, worker);
		} else {
			assert(main_worker == NULL);
			main_worker = worker;
		}
	}

	assert(main_worker != NULL);
	rc = work_fn(main_worker);

	spdk_env_thread_wait_all();

cleanup:
	unregister_trids();
	unregister_workers();
	unregister_namespaces();
	unregister_controllers();

	if (rc != 0) {
		fprintf(stderr, "%s: errors occurred\n", argv[0]);
	}

	return rc;
}