xref: /spdk/test/nvme/fused_ordering/fused_ordering.c (revision c6c1234de9e0015e670dd0b51bf6ce39ee0e07bd)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2022 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "spdk/nvme.h"
9 #include "spdk/env.h"
10 #include "spdk/string.h"
11 #include "spdk/log.h"
12 #include "spdk/util.h"
13 #include "spdk/likely.h"
14 
enum {
	/* Blocks per large write; sized so the data does not fit in-capsule. */
	WRITE_BLOCKS = 128,
	/* Blocks covered by the fused compare-and-write pair. */
	FUSED_BLOCKS = 1,
	/* Large writes submitted ahead of the fused pair on every iteration. */
	NO_WRITE_CMDS = 8,
};
18 
/* Per-core state for one fused-ordering worker. */
struct worker_thread {
	/* Linkage in g_workers. */
	TAILQ_ENTRY(worker_thread) link;
	/* Core the worker is pinned to. */
	unsigned lcore;
	/* I/O qpair used exclusively by this worker. */
	struct spdk_nvme_qpair *qpair;
	/* Source buffers; the same pointers are handed to every worker. */
	void *cw_buf;		/* data for the fused compare and write */
	void *large_buf;	/* data for the large writes */
	/* Completion polls to perform between the two halves of the fused pair. */
	uint32_t poll_count;
	/* Commands submitted but not yet completed. */
	uint32_t outstanding;
	/* Result of the most recent fused_ordering() run. */
	int status;
};
29 
30 static struct spdk_nvme_ctrlr *g_ctrlr;
31 static struct spdk_nvme_ns *g_ns;
32 static struct spdk_nvme_transport_id g_trid = {};
33 static uint32_t g_num_workers = 0;
34 static TAILQ_HEAD(, worker_thread) g_workers = TAILQ_HEAD_INITIALIZER(g_workers);
35 
36 
37 static void
38 io_complete(void *arg, const struct spdk_nvme_cpl *cpl)
39 {
40 	struct worker_thread *worker = arg;
41 
42 	if (spdk_nvme_cpl_is_error(cpl)) {
43 		spdk_nvme_print_completion(spdk_nvme_qpair_get_id(worker->qpair),
44 					   (struct spdk_nvme_cpl *)cpl);
45 		exit(1);
46 	}
47 
48 	worker->outstanding--;
49 }
50 
51 static int
52 register_workers(void)
53 {
54 	uint32_t i;
55 	struct worker_thread *worker;
56 
57 	SPDK_ENV_FOREACH_CORE(i) {
58 		worker = calloc(1, sizeof(*worker));
59 		if (worker == NULL) {
60 			fprintf(stderr, "Unable to allocate worker\n");
61 			return -1;
62 		}
63 
64 		worker->lcore = i;
65 		TAILQ_INSERT_TAIL(&g_workers, worker, link);
66 		g_num_workers++;
67 	}
68 
69 	return 0;
70 }
71 
72 static void
73 unregister_workers(void)
74 {
75 	struct worker_thread *worker, *tmp_worker;
76 
77 	TAILQ_FOREACH_SAFE(worker, &g_workers, link, tmp_worker) {
78 		TAILQ_REMOVE(&g_workers, worker, link);
79 		free(worker);
80 	}
81 }
82 
83 static unsigned
84 init_workers(void)
85 {
86 	void *cw_buf = NULL, *large_buf = NULL;
87 	struct worker_thread *worker;
88 	int rc = 0;
89 
90 	assert(g_num_workers);
91 
92 	cw_buf = spdk_zmalloc(FUSED_BLOCKS * 4096, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
93 			      SPDK_MALLOC_DMA);
94 	if (cw_buf == NULL) {
95 		printf("ERROR: buffer allocation failed.\n");
96 		rc = -1;
97 		goto error;
98 	}
99 
100 	large_buf = spdk_zmalloc(WRITE_BLOCKS * 4096, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY,
101 				 SPDK_MALLOC_DMA);
102 	if (large_buf == NULL) {
103 		printf("ERROR: buffer allocation failed.\n");
104 		rc = -1;
105 		goto error;
106 	}
107 
108 	TAILQ_FOREACH(worker, &g_workers, link) {
109 		worker->qpair = spdk_nvme_ctrlr_alloc_io_qpair(g_ctrlr, NULL, 0);
110 		if (worker->qpair == NULL) {
111 			printf("ERROR: spdk_nvme_ctrlr_alloc_io_qpair() failed.\n");
112 			rc = -1;
113 			goto error;
114 		}
115 		worker->cw_buf = cw_buf;
116 		worker->large_buf = large_buf;
117 	}
118 	goto exit;
119 
120 error:
121 	TAILQ_FOREACH(worker, &g_workers, link) {
122 		spdk_nvme_ctrlr_free_io_qpair(worker->qpair);
123 	}
124 	spdk_free(large_buf);
125 	spdk_free(cw_buf);
126 exit:
127 	return rc;
128 }
129 
130 static void
131 fini_workers(void)
132 {
133 	void *cw_buf = NULL, *large_buf = NULL;
134 	struct worker_thread *worker;
135 
136 	TAILQ_FOREACH(worker, &g_workers, link) {
137 		spdk_nvme_ctrlr_free_io_qpair(worker->qpair);
138 		cw_buf = worker->cw_buf;
139 		large_buf = worker->large_buf;
140 	}
141 
142 	spdk_free(large_buf);
143 	spdk_free(cw_buf);
144 }
145 
146 static int
147 fused_ordering(void *arg)
148 {
149 	struct worker_thread *worker = (struct worker_thread *)arg;
150 	uint32_t i;
151 	uint32_t rc = 0;
152 
153 	/* Issue relatively large writes - big enough that the data will not fit
154 	 * in-capsule - followed by the compare command. Then poll the completion queue a number of
155 	 * times matching the poll_count variable. This adds a variable amount of delay between
156 	 * the compare and the subsequent fused write submission.
157 	 *
158 	 * GitHub issue #2428 showed a problem where once the non-in-capsule data had been fetched from
159 	 * the host, that request could get sent to the target layer between the two fused commands. This
160 	 * variable delay would eventually induce this condition before the fix.
161 	 */
162 	/* Submit 8 write commands per queue */
163 	for (i = 0; i < NO_WRITE_CMDS; i++) {
164 		rc = spdk_nvme_ns_cmd_write(g_ns, worker->qpair, worker->large_buf,
165 					    0,
166 					    WRITE_BLOCKS, io_complete,
167 					    worker,
168 					    0);
169 		if (rc != 0) {
170 			fprintf(stderr, "starting write I/O failed\n");
171 			goto out;
172 		}
173 
174 		worker->outstanding++;
175 	}
176 
177 	/* Submit first fuse command, per queue */
178 	rc = spdk_nvme_ns_cmd_compare(g_ns, worker->qpair, worker->cw_buf,
179 				      0,
180 				      FUSED_BLOCKS, io_complete,
181 				      worker,
182 				      SPDK_NVME_IO_FLAGS_FUSE_FIRST);
183 	if (rc != 0) {
184 		fprintf(stderr, "starting compare I/O failed\n");
185 		goto out;
186 	}
187 
188 	worker->outstanding++;
189 
190 	/* Process completions */
191 	while (worker->poll_count-- > 0) {
192 		spdk_nvme_qpair_process_completions(worker->qpair, 0);
193 	}
194 
195 	/* Submit second fuse command, one per queue */
196 	rc = spdk_nvme_ns_cmd_write(g_ns, worker->qpair, worker->cw_buf, 0,
197 				    FUSED_BLOCKS, io_complete,
198 				    worker,
199 				    SPDK_NVME_IO_FLAGS_FUSE_SECOND);
200 	if (rc != 0) {
201 		fprintf(stderr, "starting write I/O failed\n");
202 		goto out;
203 	}
204 
205 	worker->outstanding++;
206 
207 	/* Process completions */
208 	while (worker->outstanding > 0) {
209 		spdk_nvme_qpair_process_completions(worker->qpair, 0);
210 	}
211 
212 out:
213 	worker->status = rc;
214 	return rc;
215 }
216 
/*
 * Print the command-line synopsis to stdout.
 * The -L description depends on whether the build defines DEBUG. -c is
 * printed unconditionally because parse_args() accepts it in every build.
 */
static void
usage(const char *program_name)
{
	printf("%s [options]", program_name);
	printf("\t\n");
	printf("options:\n");
	printf("\t[-r remote NVMe over Fabrics target address]\n");
#ifdef DEBUG
	printf("\t[-L enable debug logging]\n");
#else
	printf("\t[-L enable debug logging (flag disabled, must reconfigure with --enable-debug)]\n");
#endif
	printf("\t[-c core mask]\n");
	printf("\t[-s memory size in MB for DPDK (default: 0MB)]\n");
	printf("\t[--no-huge SPDK is run without hugepages]\n");
}
233 
/* Short options handled by parse_args(); every letter here has a case there. */
#define FUSED_GETOPT_STRING "r:L:c:s:"
/* Long-only option ids; values above 255 cannot collide with a short option character. */
#define FUSED_NO_HUGE        257
static const struct option g_fused_cmdline_opts[] = {
	{"no-huge", no_argument, NULL, FUSED_NO_HUGE},
	{0, 0, 0, 0}
};
240 
241 static int
242 parse_args(int argc, char **argv, struct spdk_env_opts *env_opts)
243 {
244 	int op, rc, opt_index;
245 	long int value;
246 
247 	while ((op = getopt_long(argc, argv, FUSED_GETOPT_STRING, g_fused_cmdline_opts,
248 				 &opt_index)) != -1) {
249 		switch (op) {
250 		case 'r':
251 			if (spdk_nvme_transport_id_parse(&g_trid, optarg) != 0) {
252 				fprintf(stderr, "Error parsing transport address\n");
253 				return 1;
254 			}
255 			break;
256 		case 'L':
257 			rc = spdk_log_set_flag(optarg);
258 			if (rc < 0) {
259 				fprintf(stderr, "unknown flag\n");
260 				usage(argv[0]);
261 				exit(EXIT_FAILURE);
262 			}
263 #ifdef DEBUG
264 			spdk_log_set_print_level(SPDK_LOG_DEBUG);
265 #endif
266 			break;
267 		case 'c':
268 			env_opts->core_mask = optarg;
269 			break;
270 		case 's':
271 			value = spdk_strtol(optarg, 10);
272 			if (value < 0) {
273 				fprintf(stderr, "converting a string to integer failed\n");
274 				return -EINVAL;
275 			}
276 			env_opts->mem_size = value;
277 			break;
278 		case FUSED_NO_HUGE:
279 			env_opts->no_huge = true;
280 			break;
281 		default:
282 			usage(argv[0]);
283 			return 1;
284 		}
285 	}
286 
287 	return 0;
288 }
289 
290 int
291 main(int argc, char **argv)
292 {
293 	int rc, i;
294 	struct spdk_env_opts opts;
295 	struct spdk_nvme_ctrlr_opts ctrlr_opts;
296 	int nsid;
297 	const struct spdk_nvme_ctrlr_opts *ctrlr_opts_actual;
298 	uint32_t ctrlr_io_queues;
299 	uint32_t main_core;
300 	struct worker_thread *main_worker = NULL, *worker = NULL;
301 
302 	opts.opts_size = sizeof(opts);
303 	spdk_env_opts_init(&opts);
304 	spdk_log_set_print_level(SPDK_LOG_NOTICE);
305 	rc = parse_args(argc, argv, &opts);
306 	if (rc != 0) {
307 		return rc;
308 	}
309 
310 	opts.name = "fused_ordering";
311 	if (spdk_env_init(&opts) < 0) {
312 		fprintf(stderr, "Unable to initialize SPDK env\n");
313 		return 1;
314 	}
315 
316 	if (register_workers() != 0) {
317 		rc = -1;
318 		goto exit;
319 	}
320 
321 	spdk_nvme_ctrlr_get_default_ctrlr_opts(&ctrlr_opts, sizeof(ctrlr_opts));
322 	ctrlr_opts.keep_alive_timeout_ms = 60 * 1000;
323 	g_ctrlr = spdk_nvme_connect(&g_trid, &ctrlr_opts, sizeof(ctrlr_opts));
324 	if (g_ctrlr == NULL) {
325 		fprintf(stderr, "spdk_nvme_connect() failed\n");
326 		rc = 1;
327 		goto exit;
328 	}
329 
330 	printf("Attached to %s\n", g_trid.subnqn);
331 
332 	nsid = spdk_nvme_ctrlr_get_first_active_ns(g_ctrlr);
333 	if (nsid == 0) {
334 		perror("No active namespaces");
335 		exit(1);
336 	}
337 	g_ns = spdk_nvme_ctrlr_get_ns(g_ctrlr, nsid);
338 
339 	printf("  Namespace ID: %d size: %juGB\n", spdk_nvme_ns_get_id(g_ns),
340 	       spdk_nvme_ns_get_size(g_ns) / 1000000000);
341 
342 	ctrlr_opts_actual = spdk_nvme_ctrlr_get_opts(g_ctrlr);
343 	ctrlr_io_queues = ctrlr_opts_actual->num_io_queues;
344 
345 	/* One qpair per core */
346 	if (g_num_workers > ctrlr_io_queues) {
347 		printf("ERROR: Number of IO queues requested %d more then ctrlr caps %d.\n", g_num_workers,
348 		       ctrlr_io_queues);
349 		rc = -1;
350 		goto exit;
351 	}
352 
353 	rc = init_workers();
354 	if (rc) {
355 		printf("ERROR: Workers initialization failed.\n");
356 		goto exit;
357 	}
358 
359 	for (i = 0; i < 1024; i++) {
360 		printf("fused_ordering(%d)\n", i);
361 		main_core = spdk_env_get_current_core();
362 		TAILQ_FOREACH(worker, &g_workers, link) {
363 			worker->poll_count = i;
364 			if (worker->lcore != main_core) {
365 				spdk_env_thread_launch_pinned(worker->lcore, fused_ordering, worker);
366 			} else {
367 				main_worker = worker;
368 			}
369 		}
370 
371 		if (main_worker != NULL) {
372 			fused_ordering(main_worker);
373 		}
374 
375 		spdk_env_thread_wait_all();
376 
377 		TAILQ_FOREACH(worker, &g_workers, link) {
378 			if (spdk_unlikely(worker->status != 0)) {
379 				SPDK_ERRLOG("Iteration of fused ordering(%d) failed.\n", i - 1);
380 				rc = -1;
381 				goto exit;
382 			}
383 		}
384 	}
385 
386 exit:
387 	fini_workers();
388 	unregister_workers();
389 	spdk_nvme_detach(g_ctrlr);
390 	spdk_env_fini();
391 	return rc;
392 }
393