1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright (C) 2022 Intel Corporation. 3 * All rights reserved. 4 */ 5 6 #include "spdk/stdinc.h" 7 8 #include "spdk/nvme.h" 9 #include "spdk/env.h" 10 #include "spdk/string.h" 11 #include "spdk/log.h" 12 #include "spdk/util.h" 13 #include "spdk/likely.h" 14 15 #define WRITE_BLOCKS 128 16 #define FUSED_BLOCKS 1 17 #define NO_WRITE_CMDS 8 18 19 struct worker_thread { 20 TAILQ_ENTRY(worker_thread) link; 21 unsigned lcore; 22 void *cw_buf; 23 void *large_buf; 24 struct spdk_nvme_qpair *qpair; 25 uint32_t poll_count; 26 uint32_t outstanding; 27 int status; 28 }; 29 30 static struct spdk_nvme_ctrlr *g_ctrlr; 31 static struct spdk_nvme_ns *g_ns; 32 static struct spdk_nvme_transport_id g_trid = {}; 33 static uint32_t g_num_workers = 0; 34 static TAILQ_HEAD(, worker_thread) g_workers = TAILQ_HEAD_INITIALIZER(g_workers); 35 36 37 static void 38 io_complete(void *arg, const struct spdk_nvme_cpl *cpl) 39 { 40 struct worker_thread *worker = arg; 41 42 if (spdk_nvme_cpl_is_error(cpl)) { 43 spdk_nvme_print_completion(spdk_nvme_qpair_get_id(worker->qpair), 44 (struct spdk_nvme_cpl *)cpl); 45 exit(1); 46 } 47 48 worker->outstanding--; 49 } 50 51 static int 52 register_workers(void) 53 { 54 uint32_t i; 55 struct worker_thread *worker; 56 57 SPDK_ENV_FOREACH_CORE(i) { 58 worker = calloc(1, sizeof(*worker)); 59 if (worker == NULL) { 60 fprintf(stderr, "Unable to allocate worker\n"); 61 return -1; 62 } 63 64 worker->lcore = i; 65 TAILQ_INSERT_TAIL(&g_workers, worker, link); 66 g_num_workers++; 67 } 68 69 return 0; 70 } 71 72 static void 73 unregister_workers(void) 74 { 75 struct worker_thread *worker, *tmp_worker; 76 77 TAILQ_FOREACH_SAFE(worker, &g_workers, link, tmp_worker) { 78 TAILQ_REMOVE(&g_workers, worker, link); 79 free(worker); 80 } 81 } 82 83 static unsigned 84 init_workers(void) 85 { 86 void *cw_buf = NULL, *large_buf = NULL; 87 struct worker_thread *worker; 88 int rc = 0; 89 90 assert(g_num_workers); 91 92 cw_buf = spdk_zmalloc(FUSED_BLOCKS * 4096, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 93 SPDK_MALLOC_DMA); 94 if (cw_buf == NULL) { 95 printf("ERROR: buffer allocation failed.\n"); 96 rc = -1; 97 goto error; 98 } 99 100 large_buf = spdk_zmalloc(WRITE_BLOCKS * 4096, 0x1000, NULL, SPDK_ENV_SOCKET_ID_ANY, 101 SPDK_MALLOC_DMA); 102 if (large_buf == NULL) { 103 printf("ERROR: buffer allocation failed.\n"); 104 rc = -1; 105 goto error; 106 } 107 108 TAILQ_FOREACH(worker, &g_workers, link) { 109 worker->qpair = spdk_nvme_ctrlr_alloc_io_qpair(g_ctrlr, NULL, 0); 110 if (worker->qpair == NULL) { 111 printf("ERROR: spdk_nvme_ctrlr_alloc_io_qpair() failed.\n"); 112 rc = -1; 113 goto error; 114 } 115 worker->cw_buf = cw_buf; 116 worker->large_buf = large_buf; 117 } 118 goto exit; 119 120 error: 121 TAILQ_FOREACH(worker, &g_workers, link) { 122 spdk_nvme_ctrlr_free_io_qpair(worker->qpair); 123 } 124 spdk_free(large_buf); 125 spdk_free(cw_buf); 126 exit: 127 return rc; 128 } 129 130 static void 131 fini_workers(void) 132 { 133 void *cw_buf = NULL, *large_buf = NULL; 134 struct worker_thread *worker; 135 136 TAILQ_FOREACH(worker, &g_workers, link) { 137 spdk_nvme_ctrlr_free_io_qpair(worker->qpair); 138 cw_buf = worker->cw_buf; 139 large_buf = worker->large_buf; 140 } 141 142 spdk_free(large_buf); 143 spdk_free(cw_buf); 144 } 145 146 static int 147 fused_ordering(void *arg) 148 { 149 struct worker_thread *worker = (struct worker_thread *)arg; 150 uint32_t i; 151 uint32_t rc = 0; 152 153 /* Issue relatively large writes - big enough that the data will not fit 154 * in-capsule - followed by the compare command. Then poll the completion queue a number of 155 * times matching the poll_count variable. This adds a variable amount of delay between 156 * the compare and the subsequent fused write submission. 157 * 158 * GitHub issue #2428 showed a problem where once the non-in-capsule data had been fetched from 159 * the host, that request could get sent to the target layer between the two fused commands. This 160 * variable delay would eventually induce this condition before the fix. 161 */ 162 /* Submit 8 write commands per queue */ 163 for (i = 0; i < NO_WRITE_CMDS; i++) { 164 rc = spdk_nvme_ns_cmd_write(g_ns, worker->qpair, worker->large_buf, 165 0, 166 WRITE_BLOCKS, io_complete, 167 worker, 168 0); 169 if (rc != 0) { 170 fprintf(stderr, "starting write I/O failed\n"); 171 goto out; 172 } 173 174 worker->outstanding++; 175 } 176 177 /* Submit first fuse command, per queue */ 178 rc = spdk_nvme_ns_cmd_compare(g_ns, worker->qpair, worker->cw_buf, 179 0, 180 FUSED_BLOCKS, io_complete, 181 worker, 182 SPDK_NVME_IO_FLAGS_FUSE_FIRST); 183 if (rc != 0) { 184 fprintf(stderr, "starting compare I/O failed\n"); 185 goto out; 186 } 187 188 worker->outstanding++; 189 190 /* Process completions */ 191 while (worker->poll_count-- > 0) { 192 spdk_nvme_qpair_process_completions(worker->qpair, 0); 193 } 194 195 /* Submit second fuse command, one per queue */ 196 rc = spdk_nvme_ns_cmd_write(g_ns, worker->qpair, worker->cw_buf, 0, 197 FUSED_BLOCKS, io_complete, 198 worker, 199 SPDK_NVME_IO_FLAGS_FUSE_SECOND); 200 if (rc != 0) { 201 fprintf(stderr, "starting write I/O failed\n"); 202 goto out; 203 } 204 205 worker->outstanding++; 206 207 /* Process completions */ 208 while (worker->outstanding > 0) { 209 spdk_nvme_qpair_process_completions(worker->qpair, 0); 210 } 211 212 out: 213 worker->status = rc; 214 return rc; 215 } 216 217 static void 218 usage(const char *program_name) 219 { 220 printf("%s [options]", program_name); 221 printf("\t\n"); 222 printf("options:\n"); 223 printf("\t[-r remote NVMe over Fabrics target address]\n"); 224 #ifdef DEBUG 225 printf("\t[-L enable debug logging]\n"); 226 #else 227 printf("\t[-L enable debug logging (flag disabled, must reconfigure with --enable-debug)]\n"); 228 printf("\t[-c core mask]\n"); 229 #endif 230 printf("\t[-s memory size in MB for DPDK (default: 0MB)]\n"); 231 printf("\t[--no-huge SPDK is run without hugepages]\n"); 232 } 233 234 #define FUSED_GETOPT_STRING "r:L:q:c:s:" 235 static const struct option g_fused_cmdline_opts[] = { 236 #define FUSED_NO_HUGE 257 237 {"no-huge", no_argument, NULL, FUSED_NO_HUGE}, 238 {0, 0, 0, 0} 239 }; 240 241 static int 242 parse_args(int argc, char **argv, struct spdk_env_opts *env_opts) 243 { 244 int op, rc, opt_index; 245 long int value; 246 247 while ((op = getopt_long(argc, argv, FUSED_GETOPT_STRING, g_fused_cmdline_opts, 248 &opt_index)) != -1) { 249 switch (op) { 250 case 'r': 251 if (spdk_nvme_transport_id_parse(&g_trid, optarg) != 0) { 252 fprintf(stderr, "Error parsing transport address\n"); 253 return 1; 254 } 255 break; 256 case 'L': 257 rc = spdk_log_set_flag(optarg); 258 if (rc < 0) { 259 fprintf(stderr, "unknown flag\n"); 260 usage(argv[0]); 261 exit(EXIT_FAILURE); 262 } 263 #ifdef DEBUG 264 spdk_log_set_print_level(SPDK_LOG_DEBUG); 265 #endif 266 break; 267 case 'c': 268 env_opts->core_mask = optarg; 269 break; 270 case 's': 271 value = spdk_strtol(optarg, 10); 272 if (value < 0) { 273 fprintf(stderr, "converting a string to integer failed\n"); 274 return -EINVAL; 275 } 276 env_opts->mem_size = value; 277 break; 278 case FUSED_NO_HUGE: 279 env_opts->no_huge = true; 280 break; 281 default: 282 usage(argv[0]); 283 return 1; 284 } 285 } 286 287 return 0; 288 } 289 290 int 291 main(int argc, char **argv) 292 { 293 int rc, i; 294 struct spdk_env_opts opts; 295 struct spdk_nvme_ctrlr_opts ctrlr_opts; 296 int nsid; 297 const struct spdk_nvme_ctrlr_opts *ctrlr_opts_actual; 298 uint32_t ctrlr_io_queues; 299 uint32_t main_core; 300 struct worker_thread *main_worker = NULL, *worker = NULL; 301 302 spdk_env_opts_init(&opts); 303 spdk_log_set_print_level(SPDK_LOG_NOTICE); 304 rc = parse_args(argc, argv, &opts); 305 if (rc != 0) { 306 return rc; 307 } 308 309 opts.name = "fused_ordering"; 310 if (spdk_env_init(&opts) < 0) { 311 fprintf(stderr, "Unable to initialize SPDK env\n"); 312 return 1; 313 } 314 315 if (register_workers() != 0) { 316 rc = -1; 317 goto exit; 318 } 319 320 spdk_nvme_ctrlr_get_default_ctrlr_opts(&ctrlr_opts, sizeof(ctrlr_opts)); 321 ctrlr_opts.keep_alive_timeout_ms = 60 * 1000; 322 g_ctrlr = spdk_nvme_connect(&g_trid, &ctrlr_opts, sizeof(ctrlr_opts)); 323 if (g_ctrlr == NULL) { 324 fprintf(stderr, "spdk_nvme_connect() failed\n"); 325 rc = 1; 326 goto exit; 327 } 328 329 printf("Attached to %s\n", g_trid.subnqn); 330 331 nsid = spdk_nvme_ctrlr_get_first_active_ns(g_ctrlr); 332 if (nsid == 0) { 333 perror("No active namespaces"); 334 exit(1); 335 } 336 g_ns = spdk_nvme_ctrlr_get_ns(g_ctrlr, nsid); 337 338 printf(" Namespace ID: %d size: %juGB\n", spdk_nvme_ns_get_id(g_ns), 339 spdk_nvme_ns_get_size(g_ns) / 1000000000); 340 341 ctrlr_opts_actual = spdk_nvme_ctrlr_get_opts(g_ctrlr); 342 ctrlr_io_queues = ctrlr_opts_actual->num_io_queues; 343 344 /* One qpair per core */ 345 if (g_num_workers > ctrlr_io_queues) { 346 printf("ERROR: Number of IO queues requested %d more then ctrlr caps %d.\n", g_num_workers, 347 ctrlr_io_queues); 348 rc = -1; 349 goto exit; 350 } 351 352 rc = init_workers(); 353 if (rc) { 354 printf("ERROR: Workers initialization failed.\n"); 355 goto exit; 356 } 357 358 for (i = 0; i < 1024; i++) { 359 printf("fused_ordering(%d)\n", i); 360 main_core = spdk_env_get_current_core(); 361 TAILQ_FOREACH(worker, &g_workers, link) { 362 worker->poll_count = i; 363 if (worker->lcore != main_core) { 364 spdk_env_thread_launch_pinned(worker->lcore, fused_ordering, worker); 365 } else { 366 main_worker = worker; 367 } 368 } 369 370 if (main_worker != NULL) { 371 fused_ordering(main_worker); 372 } 373 374 spdk_env_thread_wait_all(); 375 376 TAILQ_FOREACH(worker, &g_workers, link) { 377 if (spdk_unlikely(worker->status != 0)) { 378 SPDK_ERRLOG("Iteration of fused ordering(%d) failed.\n", i - 1); 379 rc = -1; 380 goto exit; 381 } 382 } 383 } 384 385 exit: 386 fini_workers(); 387 unregister_workers(); 388 spdk_nvme_detach(g_ctrlr); 389 spdk_env_fini(); 390 return rc; 391 } 392