/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <ctype.h>
#include <getopt.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sched.h>

#include "pipeline_common.h"

struct config_data cdata = {
	.num_packets = (1L << 25), /* do ~32M packets */
	.num_fids = 512,
	.queue_type = RTE_SCHED_TYPE_ATOMIC,
	.next_qid = {-1},
	.qid = {-1},
	.num_stages = 1,
	.worker_cq_depth = 16
};

static bool
core_in_use(unsigned int lcore_id) {
	return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
		fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
}

static void
eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
			void *userdata)
{
	int port_id = (uintptr_t) userdata;
	unsigned int _sent = 0;

	do {
		/* Note: hard-coded TX queue */
		_sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
					  unsent - _sent);
	} while (_sent != unsent);
}

/*
 * Parse the coremask given as argument (hexadecimal string) and fill
 * the global configuration (core role and core count) with the parsed
 * value.
 */
static int xdigit2val(unsigned char c)
{
	int val;

	if (isdigit(c))
		val = c - '0';
	else if (isupper(c))
		val = c - 'A' + 10;
	else
		val = c - 'a' + 10;
	return val;
}
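
/*
 * For example, parse_coremask("0xf0") scans from the last hex digit: the
 * trailing '0' leaves bits 0-3 clear, then 'f' sets bits 4-7, so the
 * returned mask is 0xf0 (cores 4-7 selected). On any parse error the
 * function returns (uint64_t)-1.
 */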
static uint64_t
parse_coremask(const char *coremask)
{
	int i, j, idx = 0;
	unsigned int count = 0;
	char c;
	int val;
	uint64_t mask = 0;
	const int32_t BITS_HEX = 4;

	if (coremask == NULL)
		return -1;
	/* Skip leading and trailing blank characters and remove the
	 * 0x/0X prefix if it exists.
	 */
	while (isblank(*coremask))
		coremask++;
	if (coremask[0] == '0' && ((coremask[1] == 'x')
		|| (coremask[1] == 'X')))
		coremask += 2;
	i = strlen(coremask);
	while ((i > 0) && isblank(coremask[i - 1]))
		i--;
	if (i == 0)
		return -1;

	for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
		c = coremask[i];
		if (isxdigit(c) == 0) {
			/* invalid characters */
			return -1;
		}
		val = xdigit2val(c);
		for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
			if ((1 << j) & val) {
				mask |= (1UL << idx);
				count++;
			}
		}
	}
	for (; i >= 0; i--)
		if (coremask[i] != '0')
			return -1;
	if (count == 0)
		return -1;
	return mask;
}

static struct option long_options[] = {
	{"workers", required_argument, 0, 'w'},
	{"packets", required_argument, 0, 'n'},
	{"atomic-flows", required_argument, 0, 'f'},
	{"num_stages", required_argument, 0, 's'},
	{"rx-mask", required_argument, 0, 'r'},
	{"tx-mask", required_argument, 0, 't'},
	{"sched-mask", required_argument, 0, 'e'},
	{"cq-depth", required_argument, 0, 'c'},
	{"work-cycles", required_argument, 0, 'W'},
	{"mempool-size", required_argument, 0, 'm'},
	{"queue-priority", no_argument, 0, 'P'},
	{"parallel", no_argument, 0, 'p'},
	{"ordered", no_argument, 0, 'o'},
	{"quiet", no_argument, 0, 'q'},
	{"use-atq", no_argument, 0, 'a'},
	{"dump", no_argument, 0, 'D'},
	{0, 0, 0, 0}
};

static void
usage(void)
{
	const char *usage_str =
		"  Usage: eventdev_demo [options]\n"
		"  Options:\n"
		"  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
		"  -f, --atomic-flows=N         Use N random flows from 1 to N (default 512)\n"
		"  -s, --num_stages=N           Use N atomic stages (default 1)\n"
		"  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
		"  -w, --workers=core mask      Run worker on CPUs in core mask\n"
		"  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
		"  -e, --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
		"  -c, --cq-depth=N             Worker CQ depth (default 16)\n"
		"  -W, --work-cycles=N          Worker cycles (default 0)\n"
		"  -P, --queue-priority         Enable scheduler queue prioritization\n"
		"  -o, --ordered                Use ordered scheduling\n"
		"  -p, --parallel               Use parallel scheduling\n"
		"  -q, --quiet                  Minimize printed output\n"
		"  -a, --use-atq                Use all type queues\n"
		"  -m, --mempool-size=N         Dictate the mempool size\n"
		"  -D, --dump                   Print detailed statistics before exit"
		"\n";
	fprintf(stderr, "%s", usage_str);
	exit(1);
}
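
/*
 * Example invocation (illustrative core layout; assumes a software
 * eventdev vdev such as event_sw0 is available):
 *
 *   ./eventdev_pipeline --vdev event_sw0 -- -r1 -t1 -e4 -w F0 -s3
 *
 * This runs NIC rx and tx on core 0, the scheduler on core 2, and four
 * workers on cores 4-7, with a three-stage pipeline.
 */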
static void
parse_app_args(int argc, char **argv)
{
	/* Parse cli options*/
	int option_index;
	int c;
	opterr = 0;
	uint64_t rx_lcore_mask = 0;
	uint64_t tx_lcore_mask = 0;
	uint64_t sched_lcore_mask = 0;
	uint64_t worker_lcore_mask = 0;
	int i;

	for (;;) {
		c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:m:paoPqDW:",
				long_options, &option_index);
		if (c == -1)
			break;

		int popcnt = 0;
		switch (c) {
		case 'n':
			cdata.num_packets = (int64_t)atol(optarg);
			if (cdata.num_packets == 0)
				cdata.num_packets = INT64_MAX;
			break;
		case 'f':
			cdata.num_fids = (unsigned int)atoi(optarg);
			break;
		case 's':
			cdata.num_stages = (unsigned int)atoi(optarg);
			break;
		case 'c':
			cdata.worker_cq_depth = (unsigned int)atoi(optarg);
			break;
		case 'W':
			cdata.worker_cycles = (unsigned int)atoi(optarg);
			break;
		case 'P':
			cdata.enable_queue_priorities = 1;
			break;
		case 'o':
			cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
			break;
		case 'p':
			cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
			break;
		case 'a':
			cdata.all_type_queues = 1;
			break;
		case 'q':
			cdata.quiet = 1;
			break;
		case 'D':
			cdata.dump_dev = 1;
			break;
		case 'w':
			worker_lcore_mask = parse_coremask(optarg);
			break;
		case 'r':
			rx_lcore_mask = parse_coremask(optarg);
			popcnt = __builtin_popcountll(rx_lcore_mask);
			fdata->rx_single = (popcnt == 1);
			break;
		case 't':
			tx_lcore_mask = parse_coremask(optarg);
			popcnt = __builtin_popcountll(tx_lcore_mask);
			fdata->tx_single = (popcnt == 1);
			break;
		case 'e':
			sched_lcore_mask = parse_coremask(optarg);
			popcnt = __builtin_popcountll(sched_lcore_mask);
			fdata->sched_single = (popcnt == 1);
			break;
		case 'm':
			cdata.num_mbuf = (uint64_t)atol(optarg);
			break;
		default:
			usage();
		}
	}

	cdata.worker_lcore_mask = worker_lcore_mask;
	cdata.sched_lcore_mask = sched_lcore_mask;
	cdata.rx_lcore_mask = rx_lcore_mask;
	cdata.tx_lcore_mask = tx_lcore_mask;

	if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
		usage();

	for (i = 0; i < MAX_NUM_CORE; i++) {
		fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
		fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
		fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
		fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));

		if (fdata->worker_core[i])
			cdata.num_workers++;
		if (core_in_use(i))
			cdata.active_cores++;
	}
}

/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
	static const struct rte_eth_conf port_conf_default = {
		.rxmode = {
			.mq_mode = ETH_MQ_RX_RSS,
			.max_rx_pkt_len = ETHER_MAX_LEN,
		},
		.rx_adv_conf = {
			.rss_conf = {
				.rss_hf = ETH_RSS_IP |
					  ETH_RSS_TCP |
					  ETH_RSS_UDP,
			}
		}
	};
	const uint16_t rx_rings = 1, tx_rings = 1;
	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
	struct rte_eth_conf port_conf = port_conf_default;
	int retval;
	uint16_t q;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;

	if (!rte_eth_dev_is_valid_port(port))
		return -1;

	rte_eth_dev_info_get(port, &dev_info);
	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;

	port_conf.rx_adv_conf.rss_conf.rss_hf &=
		dev_info.flow_type_rss_offloads;
	if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
			port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
		printf("Port %u modified RSS hash function based on hardware support, "
			"requested:%#"PRIx64" configured:%#"PRIx64"\n",
			port,
			port_conf_default.rx_adv_conf.rss_conf.rss_hf,
			port_conf.rx_adv_conf.rss_conf.rss_hf);
	}

	/* Configure the Ethernet device. */
	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
	if (retval != 0)
		return retval;

	/* Allocate and set up 1 RX queue per Ethernet port. */
	for (q = 0; q < rx_rings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf.txmode.offloads;
	/* Allocate and set up 1 TX queue per Ethernet port. */
	for (q = 0; q < tx_rings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
				rte_eth_dev_socket_id(port), &txconf);
		if (retval < 0)
			return retval;
	}

	/* Start the Ethernet port. */
	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;

	/* Display the port MAC address. */
	struct ether_addr addr;
	rte_eth_macaddr_get(port, &addr);
	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
			" %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
			(unsigned int)port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	/* Enable RX in promiscuous mode for the Ethernet device. */
	rte_eth_promiscuous_enable(port);

	return 0;
}
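
/*
 * init_ports() creates the shared mbuf pool, initializes every available
 * Ethernet port, and attaches a 32-entry software TX buffer to each one.
 * When a buffer flush cannot send everything, eth_tx_buffer_retry()
 * (registered below as the error callback) keeps retrying on TX queue 0
 * until every packet has gone out.
 */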
static int
init_ports(uint16_t num_ports)
{
	uint16_t portid, i;

	if (!cdata.num_mbuf)
		cdata.num_mbuf = 16384 * num_ports;

	struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
			/* mbufs */ cdata.num_mbuf,
			/* cache_size */ 512,
			/* priv_size*/ 0,
			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
			rte_socket_id());
	if (mp == NULL)
		rte_panic("Cannot create mbuf pool\n");

	RTE_ETH_FOREACH_DEV(portid)
		if (port_init(portid, mp) != 0)
			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
					portid);

	RTE_ETH_FOREACH_DEV(i) {
		void *userdata = (void *)(uintptr_t) i;
		fdata->tx_buf[i] =
			rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
		if (fdata->tx_buf[i] == NULL)
			rte_panic("Out of memory\n");
		rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
		rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
						   eth_tx_buffer_retry,
						   userdata);
	}

	return 0;
}

static void
do_capability_setup(uint8_t eventdev_id)
{
	uint16_t i;
	uint8_t mt_unsafe = 0;
	uint8_t burst = 0;

	RTE_ETH_FOREACH_DEV(i) {
		struct rte_eth_dev_info dev_info;
		memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));

		rte_eth_dev_info_get(i, &dev_info);
		/* Check if it is safe to ask workers to tx. */
		mt_unsafe |= !(dev_info.tx_offload_capa &
				DEV_TX_OFFLOAD_MT_LOCKFREE);
	}

	struct rte_event_dev_info eventdev_info;
	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));

	rte_event_dev_info_get(eventdev_id, &eventdev_info);
	burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
			0;

	/* If any port lacks lock-free tx, fall back to the generic pipeline
	 * where a dedicated core transmits; otherwise let workers tx directly.
	 */
	if (mt_unsafe)
		set_worker_generic_setup_data(&fdata->cap, burst);
	else
		set_worker_tx_setup_data(&fdata->cap, burst);
}

static void
signal_handler(int signum)
{
	if (fdata->done)
		rte_exit(1, "Exiting on signal %d\n", signum);
	if (signum == SIGINT || signum == SIGTERM) {
		printf("\n\nSignal %d received, preparing to exit...\n",
				signum);
		fdata->done = 1;
	}
	if (signum == SIGTSTP)
		rte_event_dev_dump(0, stdout);
}

static inline uint64_t
port_stat(int dev_id, int32_t p)
{
	char statname[64];
	snprintf(statname, sizeof(statname), "port_%u_rx", (unsigned int)p);
	return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
}
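
/*
 * main() ties the pieces together: EAL and signal setup, argument parsing,
 * capability-based worker selection, eventdev and port initialization, then
 * one launched loop per active lcore. On exit it optionally dumps device
 * state and prints the per-worker packet distribution.
 */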
int
main(int argc, char **argv)
{
	struct worker_data *worker_data;
	uint16_t num_ports;
	int lcore_id;
	int err;

	signal(SIGINT, signal_handler);
	signal(SIGTERM, signal_handler);
	signal(SIGTSTP, signal_handler);

	err = rte_eal_init(argc, argv);
	if (err < 0)
		rte_panic("Invalid EAL arguments\n");

	argc -= err;
	argv += err;

	fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
	if (fdata == NULL)
		rte_panic("Out of memory\n");

	/* Parse cli options*/
	parse_app_args(argc, argv);

	num_ports = rte_eth_dev_count_avail();
	if (num_ports == 0)
		rte_panic("No Ethernet ports found\n");

	const unsigned int cores_needed = cdata.active_cores;

	if (!cdata.quiet) {
		printf("  Config:\n");
		printf("\tports: %u\n", num_ports);
		printf("\tworkers: %u\n", cdata.num_workers);
		printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
		printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
		if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
			printf("\tqid0 type: ordered\n");
		if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
			printf("\tqid0 type: atomic\n");
		if (cdata.queue_type == RTE_SCHED_TYPE_PARALLEL)
			printf("\tqid0 type: parallel\n");
		printf("\tCores available: %u\n", rte_lcore_count());
		printf("\tCores used: %u\n", cores_needed);
	}

	if (rte_lcore_count() < cores_needed)
		rte_panic("Too few cores (%u < %u)\n", rte_lcore_count(),
				cores_needed);

	const unsigned int ndevs = rte_event_dev_count();
	if (ndevs == 0)
		rte_panic("No event devices found. Pass in a --vdev eventdev.\n");
	if (ndevs > 1)
		fprintf(stderr, "Warning: More than one eventdev, using idx 0\n");

	do_capability_setup(0);
	fdata->cap.check_opt();

	worker_data = rte_calloc(0, cdata.num_workers,
			sizeof(worker_data[0]), 0);
	if (worker_data == NULL)
		rte_panic("rte_calloc failed\n");

	int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
	if (dev_id < 0)
		rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");

	init_ports(num_ports);
	fdata->cap.adptr_setup(num_ports);

	int worker_idx = 0;
	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (lcore_id >= MAX_NUM_CORE)
			break;

		if (!fdata->rx_core[lcore_id] &&
				!fdata->worker_core[lcore_id] &&
				!fdata->tx_core[lcore_id] &&
				!fdata->sched_core[lcore_id])
			continue;

		if (fdata->rx_core[lcore_id])
			printf(
				"[%s()] lcore %d executing NIC Rx\n",
				__func__, lcore_id);

		if (fdata->tx_core[lcore_id])
			printf(
				"[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
				__func__, lcore_id, cons_data.port_id);

		if (fdata->sched_core[lcore_id])
			printf("[%s()] lcore %d executing scheduler\n",
					__func__, lcore_id);

		if (fdata->worker_core[lcore_id])
			printf(
				"[%s()] lcore %d executing worker, using eventdev port %u\n",
				__func__, lcore_id,
				worker_data[worker_idx].port_id);

		err = rte_eal_remote_launch(fdata->cap.worker,
				&worker_data[worker_idx], lcore_id);
		if (err)
			rte_panic("Failed to launch worker on core %d\n",
					lcore_id);

		if (fdata->worker_core[lcore_id])
			worker_idx++;
	}

	lcore_id = rte_lcore_id();

	if (core_in_use(lcore_id))
		fdata->cap.worker(&worker_data[worker_idx++]);

	rte_eal_mp_wait_lcore();

	if (cdata.dump_dev)
		rte_event_dev_dump(dev_id, stdout);

	if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
			(uint64_t)-ENOTSUP)) {
		printf("\nPort Workload distribution:\n");
		uint32_t i;
		uint64_t tot_pkts = 0;
		uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
		for (i = 0; i < cdata.num_workers; i++) {
			pkts_per_wkr[i] =
				port_stat(dev_id, worker_data[i].port_id);
			tot_pkts += pkts_per_wkr[i];
		}
		for (i = 0; i < cdata.num_workers; i++) {
			float pc = pkts_per_wkr[i] * 100 /
				((float)tot_pkts);
			printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
					i, pc, pkts_per_wkr[i]);
		}
	}

	return 0;
}