1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2017 Intel Corporation 3 */ 4 5 #include "test.h" 6 7 #include <unistd.h> 8 #include <string.h> 9 #include <rte_cycles.h> 10 #include <rte_errno.h> 11 #include <rte_mempool.h> 12 #include <rte_mbuf.h> 13 #include <rte_distributor.h> 14 #include <rte_string_fns.h> 15 16 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */ 17 #define BURST 32 18 #define BIG_BATCH 1024 19 20 struct worker_params { 21 char name[64]; 22 struct rte_distributor *dist; 23 }; 24 25 struct worker_params worker_params; 26 27 /* statics - all zero-initialized by default */ 28 static volatile int quit; /**< general quit variable for all threads */ 29 static volatile int zero_quit; /**< var for when we just want thr0 to quit*/ 30 static volatile unsigned worker_idx; 31 32 struct worker_stats { 33 volatile unsigned handled_packets; 34 } __rte_cache_aligned; 35 struct worker_stats worker_stats[RTE_MAX_LCORE]; 36 37 /* returns the total count of the number of packets handled by the worker 38 * functions given below. 39 */ 40 static inline unsigned 41 total_packet_count(void) 42 { 43 unsigned i, count = 0; 44 for (i = 0; i < worker_idx; i++) 45 count += worker_stats[i].handled_packets; 46 return count; 47 } 48 49 /* resets the packet counts for a new test */ 50 static inline void 51 clear_packet_count(void) 52 { 53 memset(&worker_stats, 0, sizeof(worker_stats)); 54 } 55 56 /* this is the basic worker function for sanity test 57 * it does nothing but return packets and count them. 58 */ 59 static int 60 handle_work(void *arg) 61 { 62 struct rte_mbuf *buf[8] __rte_cache_aligned; 63 struct worker_params *wp = arg; 64 struct rte_distributor *db = wp->dist; 65 unsigned int count = 0, num = 0; 66 unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); 67 int i; 68 69 for (i = 0; i < 8; i++) 70 buf[i] = NULL; 71 num = rte_distributor_get_pkt(db, id, buf, buf, num); 72 while (!quit) { 73 worker_stats[id].handled_packets += num; 74 count += num; 75 num = rte_distributor_get_pkt(db, id, 76 buf, buf, num); 77 } 78 worker_stats[id].handled_packets += num; 79 count += num; 80 rte_distributor_return_pkt(db, id, buf, num); 81 return 0; 82 } 83 84 /* do basic sanity testing of the distributor. This test tests the following: 85 * - send 32 packets through distributor with the same tag and ensure they 86 * all go to the one worker 87 * - send 32 packets through the distributor with two different tags and 88 * verify that they go equally to two different workers. 89 * - send 32 packets with different tags through the distributors and 90 * just verify we get all packets back. 91 * - send 1024 packets through the distributor, gathering the returned packets 92 * as we go. Then verify that we correctly got all 1024 pointers back again, 93 * not necessarily in the same order (as different flows). 94 */ 95 static int 96 sanity_test(struct worker_params *wp, struct rte_mempool *p) 97 { 98 struct rte_distributor *db = wp->dist; 99 struct rte_mbuf *bufs[BURST]; 100 struct rte_mbuf *returns[BURST*2]; 101 unsigned int i, count; 102 unsigned int retries; 103 104 printf("=== Basic distributor sanity tests ===\n"); 105 clear_packet_count(); 106 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 107 printf("line %d: Error getting mbufs from pool\n", __LINE__); 108 return -1; 109 } 110 111 /* now set all hash values in all buffers to zero, so all pkts go to the 112 * one worker thread */ 113 for (i = 0; i < BURST; i++) 114 bufs[i]->hash.usr = 0; 115 116 rte_distributor_process(db, bufs, BURST); 117 count = 0; 118 do { 119 120 rte_distributor_flush(db); 121 count += rte_distributor_returned_pkts(db, 122 returns, BURST*2); 123 } while (count < BURST); 124 125 if (total_packet_count() != BURST) { 126 printf("Line %d: Error, not all packets flushed. " 127 "Expected %u, got %u\n", 128 __LINE__, BURST, total_packet_count()); 129 return -1; 130 } 131 132 for (i = 0; i < rte_lcore_count() - 1; i++) 133 printf("Worker %u handled %u packets\n", i, 134 worker_stats[i].handled_packets); 135 printf("Sanity test with all zero hashes done.\n"); 136 137 /* pick two flows and check they go correctly */ 138 if (rte_lcore_count() >= 3) { 139 clear_packet_count(); 140 for (i = 0; i < BURST; i++) 141 bufs[i]->hash.usr = (i & 1) << 8; 142 143 rte_distributor_process(db, bufs, BURST); 144 count = 0; 145 do { 146 rte_distributor_flush(db); 147 count += rte_distributor_returned_pkts(db, 148 returns, BURST*2); 149 } while (count < BURST); 150 if (total_packet_count() != BURST) { 151 printf("Line %d: Error, not all packets flushed. " 152 "Expected %u, got %u\n", 153 __LINE__, BURST, total_packet_count()); 154 return -1; 155 } 156 157 for (i = 0; i < rte_lcore_count() - 1; i++) 158 printf("Worker %u handled %u packets\n", i, 159 worker_stats[i].handled_packets); 160 printf("Sanity test with two hash values done\n"); 161 } 162 163 /* give a different hash value to each packet, 164 * so load gets distributed */ 165 clear_packet_count(); 166 for (i = 0; i < BURST; i++) 167 bufs[i]->hash.usr = i+1; 168 169 rte_distributor_process(db, bufs, BURST); 170 count = 0; 171 do { 172 rte_distributor_flush(db); 173 count += rte_distributor_returned_pkts(db, 174 returns, BURST*2); 175 } while (count < BURST); 176 if (total_packet_count() != BURST) { 177 printf("Line %d: Error, not all packets flushed. " 178 "Expected %u, got %u\n", 179 __LINE__, BURST, total_packet_count()); 180 return -1; 181 } 182 183 for (i = 0; i < rte_lcore_count() - 1; i++) 184 printf("Worker %u handled %u packets\n", i, 185 worker_stats[i].handled_packets); 186 printf("Sanity test with non-zero hashes done\n"); 187 188 rte_mempool_put_bulk(p, (void *)bufs, BURST); 189 190 /* sanity test with BIG_BATCH packets to ensure they all arrived back 191 * from the returned packets function */ 192 clear_packet_count(); 193 struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH]; 194 unsigned num_returned = 0; 195 196 /* flush out any remaining packets */ 197 rte_distributor_flush(db); 198 rte_distributor_clear_returns(db); 199 200 if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) { 201 printf("line %d: Error getting mbufs from pool\n", __LINE__); 202 return -1; 203 } 204 for (i = 0; i < BIG_BATCH; i++) 205 many_bufs[i]->hash.usr = i << 2; 206 207 printf("=== testing big burst (%s) ===\n", wp->name); 208 for (i = 0; i < BIG_BATCH/BURST; i++) { 209 rte_distributor_process(db, 210 &many_bufs[i*BURST], BURST); 211 count = rte_distributor_returned_pkts(db, 212 &return_bufs[num_returned], 213 BIG_BATCH - num_returned); 214 num_returned += count; 215 } 216 rte_distributor_flush(db); 217 count = rte_distributor_returned_pkts(db, 218 &return_bufs[num_returned], 219 BIG_BATCH - num_returned); 220 num_returned += count; 221 retries = 0; 222 do { 223 rte_distributor_flush(db); 224 count = rte_distributor_returned_pkts(db, 225 &return_bufs[num_returned], 226 BIG_BATCH - num_returned); 227 num_returned += count; 228 retries++; 229 } while ((num_returned < BIG_BATCH) && (retries < 100)); 230 231 if (num_returned != BIG_BATCH) { 232 printf("line %d: Missing packets, expected %d\n", 233 __LINE__, num_returned); 234 return -1; 235 } 236 237 /* big check - make sure all packets made it back!! */ 238 for (i = 0; i < BIG_BATCH; i++) { 239 unsigned j; 240 struct rte_mbuf *src = many_bufs[i]; 241 for (j = 0; j < BIG_BATCH; j++) { 242 if (return_bufs[j] == src) 243 break; 244 } 245 246 if (j == BIG_BATCH) { 247 printf("Error: could not find source packet #%u\n", i); 248 return -1; 249 } 250 } 251 printf("Sanity test of returned packets done\n"); 252 253 rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); 254 255 printf("\n"); 256 return 0; 257 } 258 259 260 /* to test that the distributor does not lose packets, we use this worker 261 * function which frees mbufs when it gets them. The distributor thread does 262 * the mbuf allocation. If distributor drops packets we'll eventually run out 263 * of mbufs. 264 */ 265 static int 266 handle_work_with_free_mbufs(void *arg) 267 { 268 struct rte_mbuf *buf[8] __rte_cache_aligned; 269 struct worker_params *wp = arg; 270 struct rte_distributor *d = wp->dist; 271 unsigned int count = 0; 272 unsigned int i; 273 unsigned int num = 0; 274 unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); 275 276 for (i = 0; i < 8; i++) 277 buf[i] = NULL; 278 num = rte_distributor_get_pkt(d, id, buf, buf, num); 279 while (!quit) { 280 worker_stats[id].handled_packets += num; 281 count += num; 282 for (i = 0; i < num; i++) 283 rte_pktmbuf_free(buf[i]); 284 num = rte_distributor_get_pkt(d, 285 id, buf, buf, num); 286 } 287 worker_stats[id].handled_packets += num; 288 count += num; 289 rte_distributor_return_pkt(d, id, buf, num); 290 return 0; 291 } 292 293 /* Perform a sanity test of the distributor with a large number of packets, 294 * where we allocate a new set of mbufs for each burst. The workers then 295 * free the mbufs. This ensures that we don't have any packet leaks in the 296 * library. 297 */ 298 static int 299 sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p) 300 { 301 struct rte_distributor *d = wp->dist; 302 unsigned i; 303 struct rte_mbuf *bufs[BURST]; 304 305 printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name); 306 307 clear_packet_count(); 308 for (i = 0; i < ((1<<ITER_POWER)); i += BURST) { 309 unsigned j; 310 while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0) 311 rte_distributor_process(d, NULL, 0); 312 for (j = 0; j < BURST; j++) { 313 bufs[j]->hash.usr = (i+j) << 1; 314 rte_mbuf_refcnt_set(bufs[j], 1); 315 } 316 317 rte_distributor_process(d, bufs, BURST); 318 } 319 320 rte_distributor_flush(d); 321 322 rte_delay_us(10000); 323 324 if (total_packet_count() < (1<<ITER_POWER)) { 325 printf("Line %u: Packet count is incorrect, %u, expected %u\n", 326 __LINE__, total_packet_count(), 327 (1<<ITER_POWER)); 328 return -1; 329 } 330 331 printf("Sanity test with mbuf alloc/free passed\n\n"); 332 return 0; 333 } 334 335 static int 336 handle_work_for_shutdown_test(void *arg) 337 { 338 struct rte_mbuf *pkt = NULL; 339 struct rte_mbuf *buf[8] __rte_cache_aligned; 340 struct worker_params *wp = arg; 341 struct rte_distributor *d = wp->dist; 342 unsigned int count = 0; 343 unsigned int num = 0; 344 unsigned int total = 0; 345 unsigned int i; 346 unsigned int returned = 0; 347 const unsigned int id = __atomic_fetch_add(&worker_idx, 1, 348 __ATOMIC_RELAXED); 349 350 num = rte_distributor_get_pkt(d, id, buf, buf, num); 351 352 /* wait for quit single globally, or for worker zero, wait 353 * for zero_quit */ 354 while (!quit && !(id == 0 && zero_quit)) { 355 worker_stats[id].handled_packets += num; 356 count += num; 357 for (i = 0; i < num; i++) 358 rte_pktmbuf_free(buf[i]); 359 num = rte_distributor_get_pkt(d, 360 id, buf, buf, num); 361 total += num; 362 } 363 worker_stats[id].handled_packets += num; 364 count += num; 365 returned = rte_distributor_return_pkt(d, id, buf, num); 366 367 if (id == 0) { 368 /* for worker zero, allow it to restart to pick up last packet 369 * when all workers are shutting down. 370 */ 371 while (zero_quit) 372 usleep(100); 373 374 num = rte_distributor_get_pkt(d, 375 id, buf, buf, num); 376 377 while (!quit) { 378 worker_stats[id].handled_packets += num; 379 count += num; 380 rte_pktmbuf_free(pkt); 381 num = rte_distributor_get_pkt(d, id, buf, buf, num); 382 } 383 returned = rte_distributor_return_pkt(d, 384 id, buf, num); 385 printf("Num returned = %d\n", returned); 386 } 387 return 0; 388 } 389 390 391 /* Perform a sanity test of the distributor with a large number of packets, 392 * where we allocate a new set of mbufs for each burst. The workers then 393 * free the mbufs. This ensures that we don't have any packet leaks in the 394 * library. 395 */ 396 static int 397 sanity_test_with_worker_shutdown(struct worker_params *wp, 398 struct rte_mempool *p) 399 { 400 struct rte_distributor *d = wp->dist; 401 struct rte_mbuf *bufs[BURST]; 402 unsigned i; 403 404 printf("=== Sanity test of worker shutdown ===\n"); 405 406 clear_packet_count(); 407 408 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 409 printf("line %d: Error getting mbufs from pool\n", __LINE__); 410 return -1; 411 } 412 413 /* 414 * Now set all hash values in all buffers to same value so all 415 * pkts go to the one worker thread 416 */ 417 for (i = 0; i < BURST; i++) 418 bufs[i]->hash.usr = 1; 419 420 rte_distributor_process(d, bufs, BURST); 421 rte_distributor_flush(d); 422 423 /* at this point, we will have processed some packets and have a full 424 * backlog for the other ones at worker 0. 425 */ 426 427 /* get more buffers to queue up, again setting them to the same flow */ 428 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 429 printf("line %d: Error getting mbufs from pool\n", __LINE__); 430 return -1; 431 } 432 for (i = 0; i < BURST; i++) 433 bufs[i]->hash.usr = 1; 434 435 /* get worker zero to quit */ 436 zero_quit = 1; 437 rte_distributor_process(d, bufs, BURST); 438 439 /* flush the distributor */ 440 rte_distributor_flush(d); 441 rte_delay_us(10000); 442 443 for (i = 0; i < rte_lcore_count() - 1; i++) 444 printf("Worker %u handled %u packets\n", i, 445 worker_stats[i].handled_packets); 446 447 if (total_packet_count() != BURST * 2) { 448 printf("Line %d: Error, not all packets flushed. " 449 "Expected %u, got %u\n", 450 __LINE__, BURST * 2, total_packet_count()); 451 return -1; 452 } 453 454 printf("Sanity test with worker shutdown passed\n\n"); 455 return 0; 456 } 457 458 /* Test that the flush function is able to move packets between workers when 459 * one worker shuts down.. 460 */ 461 static int 462 test_flush_with_worker_shutdown(struct worker_params *wp, 463 struct rte_mempool *p) 464 { 465 struct rte_distributor *d = wp->dist; 466 struct rte_mbuf *bufs[BURST]; 467 unsigned i; 468 469 printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name); 470 471 clear_packet_count(); 472 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 473 printf("line %d: Error getting mbufs from pool\n", __LINE__); 474 return -1; 475 } 476 477 /* now set all hash values in all buffers to zero, so all pkts go to the 478 * one worker thread */ 479 for (i = 0; i < BURST; i++) 480 bufs[i]->hash.usr = 0; 481 482 rte_distributor_process(d, bufs, BURST); 483 /* at this point, we will have processed some packets and have a full 484 * backlog for the other ones at worker 0. 485 */ 486 487 /* get worker zero to quit */ 488 zero_quit = 1; 489 490 /* flush the distributor */ 491 rte_distributor_flush(d); 492 493 rte_delay_us(10000); 494 495 zero_quit = 0; 496 for (i = 0; i < rte_lcore_count() - 1; i++) 497 printf("Worker %u handled %u packets\n", i, 498 worker_stats[i].handled_packets); 499 500 if (total_packet_count() != BURST) { 501 printf("Line %d: Error, not all packets flushed. " 502 "Expected %u, got %u\n", 503 __LINE__, BURST, total_packet_count()); 504 return -1; 505 } 506 507 printf("Flush test with worker shutdown passed\n\n"); 508 return 0; 509 } 510 511 static 512 int test_error_distributor_create_name(void) 513 { 514 struct rte_distributor *d = NULL; 515 struct rte_distributor *db = NULL; 516 char *name = NULL; 517 518 d = rte_distributor_create(name, rte_socket_id(), 519 rte_lcore_count() - 1, 520 RTE_DIST_ALG_SINGLE); 521 if (d != NULL || rte_errno != EINVAL) { 522 printf("ERROR: No error on create() with NULL name param\n"); 523 return -1; 524 } 525 526 db = rte_distributor_create(name, rte_socket_id(), 527 rte_lcore_count() - 1, 528 RTE_DIST_ALG_BURST); 529 if (db != NULL || rte_errno != EINVAL) { 530 printf("ERROR: No error on create() with NULL param\n"); 531 return -1; 532 } 533 534 return 0; 535 } 536 537 538 static 539 int test_error_distributor_create_numworkers(void) 540 { 541 struct rte_distributor *ds = NULL; 542 struct rte_distributor *db = NULL; 543 544 ds = rte_distributor_create("test_numworkers", rte_socket_id(), 545 RTE_MAX_LCORE + 10, 546 RTE_DIST_ALG_SINGLE); 547 if (ds != NULL || rte_errno != EINVAL) { 548 printf("ERROR: No error on create() with num_workers > MAX\n"); 549 return -1; 550 } 551 552 db = rte_distributor_create("test_numworkers", rte_socket_id(), 553 RTE_MAX_LCORE + 10, 554 RTE_DIST_ALG_BURST); 555 if (db != NULL || rte_errno != EINVAL) { 556 printf("ERROR: No error on create() num_workers > MAX\n"); 557 return -1; 558 } 559 560 return 0; 561 } 562 563 564 /* Useful function which ensures that all worker functions terminate */ 565 static void 566 quit_workers(struct worker_params *wp, struct rte_mempool *p) 567 { 568 struct rte_distributor *d = wp->dist; 569 const unsigned num_workers = rte_lcore_count() - 1; 570 unsigned i; 571 struct rte_mbuf *bufs[RTE_MAX_LCORE]; 572 rte_mempool_get_bulk(p, (void *)bufs, num_workers); 573 574 zero_quit = 0; 575 quit = 1; 576 for (i = 0; i < num_workers; i++) 577 bufs[i]->hash.usr = i << 1; 578 rte_distributor_process(d, bufs, num_workers); 579 580 rte_mempool_put_bulk(p, (void *)bufs, num_workers); 581 582 rte_distributor_process(d, NULL, 0); 583 rte_distributor_flush(d); 584 rte_eal_mp_wait_lcore(); 585 quit = 0; 586 worker_idx = 0; 587 } 588 589 static int 590 test_distributor(void) 591 { 592 static struct rte_distributor *ds; 593 static struct rte_distributor *db; 594 static struct rte_distributor *dist[2]; 595 static struct rte_mempool *p; 596 int i; 597 598 if (rte_lcore_count() < 2) { 599 printf("Not enough cores for distributor_autotest, expecting at least 2\n"); 600 return TEST_SKIPPED; 601 } 602 603 if (db == NULL) { 604 db = rte_distributor_create("Test_dist_burst", rte_socket_id(), 605 rte_lcore_count() - 1, 606 RTE_DIST_ALG_BURST); 607 if (db == NULL) { 608 printf("Error creating burst distributor\n"); 609 return -1; 610 } 611 } else { 612 rte_distributor_flush(db); 613 rte_distributor_clear_returns(db); 614 } 615 616 if (ds == NULL) { 617 ds = rte_distributor_create("Test_dist_single", 618 rte_socket_id(), 619 rte_lcore_count() - 1, 620 RTE_DIST_ALG_SINGLE); 621 if (ds == NULL) { 622 printf("Error creating single distributor\n"); 623 return -1; 624 } 625 } else { 626 rte_distributor_flush(ds); 627 rte_distributor_clear_returns(ds); 628 } 629 630 const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ? 631 (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count()); 632 if (p == NULL) { 633 p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST, 634 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 635 if (p == NULL) { 636 printf("Error creating mempool\n"); 637 return -1; 638 } 639 } 640 641 dist[0] = ds; 642 dist[1] = db; 643 644 for (i = 0; i < 2; i++) { 645 646 worker_params.dist = dist[i]; 647 if (i) 648 strlcpy(worker_params.name, "burst", 649 sizeof(worker_params.name)); 650 else 651 strlcpy(worker_params.name, "single", 652 sizeof(worker_params.name)); 653 654 rte_eal_mp_remote_launch(handle_work, 655 &worker_params, SKIP_MASTER); 656 if (sanity_test(&worker_params, p) < 0) 657 goto err; 658 quit_workers(&worker_params, p); 659 660 rte_eal_mp_remote_launch(handle_work_with_free_mbufs, 661 &worker_params, SKIP_MASTER); 662 if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0) 663 goto err; 664 quit_workers(&worker_params, p); 665 666 if (rte_lcore_count() > 2) { 667 rte_eal_mp_remote_launch(handle_work_for_shutdown_test, 668 &worker_params, 669 SKIP_MASTER); 670 if (sanity_test_with_worker_shutdown(&worker_params, 671 p) < 0) 672 goto err; 673 quit_workers(&worker_params, p); 674 675 rte_eal_mp_remote_launch(handle_work_for_shutdown_test, 676 &worker_params, 677 SKIP_MASTER); 678 if (test_flush_with_worker_shutdown(&worker_params, 679 p) < 0) 680 goto err; 681 quit_workers(&worker_params, p); 682 683 } else { 684 printf("Too few cores to run worker shutdown test\n"); 685 } 686 687 } 688 689 if (test_error_distributor_create_numworkers() == -1 || 690 test_error_distributor_create_name() == -1) { 691 printf("rte_distributor_create parameter check tests failed"); 692 return -1; 693 } 694 695 return 0; 696 697 err: 698 quit_workers(&worker_params, p); 699 return -1; 700 } 701 702 REGISTER_TEST_COMMAND(distributor_autotest, test_distributor); 703