1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2010-2017 Intel Corporation 3 */ 4 5 #include "test.h" 6 7 #include <unistd.h> 8 #include <string.h> 9 #include <rte_cycles.h> 10 #include <rte_errno.h> 11 #include <rte_mempool.h> 12 #include <rte_mbuf.h> 13 #include <rte_mbuf_dyn.h> 14 #include <rte_distributor.h> 15 #include <rte_string_fns.h> 16 17 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */ 18 #define BURST 32 19 #define BIG_BATCH 1024 20 21 typedef uint32_t seq_dynfield_t; 22 static int seq_dynfield_offset = -1; 23 24 static inline seq_dynfield_t * 25 seq_field(struct rte_mbuf *mbuf) 26 { 27 return RTE_MBUF_DYNFIELD(mbuf, seq_dynfield_offset, seq_dynfield_t *); 28 } 29 30 struct worker_params { 31 char name[64]; 32 struct rte_distributor *dist; 33 }; 34 35 struct worker_params worker_params; 36 37 /* statics - all zero-initialized by default */ 38 static volatile int quit; /**< general quit variable for all threads */ 39 static volatile int zero_quit; /**< var for when we just want thr0 to quit*/ 40 static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping*/ 41 static volatile unsigned worker_idx; 42 static volatile unsigned zero_idx; 43 44 struct worker_stats { 45 volatile unsigned handled_packets; 46 } __rte_cache_aligned; 47 struct worker_stats worker_stats[RTE_MAX_LCORE]; 48 49 /* returns the total count of the number of packets handled by the worker 50 * functions given below. 51 */ 52 static inline unsigned 53 total_packet_count(void) 54 { 55 unsigned i, count = 0; 56 for (i = 0; i < worker_idx; i++) 57 count += __atomic_load_n(&worker_stats[i].handled_packets, 58 __ATOMIC_RELAXED); 59 return count; 60 } 61 62 /* resets the packet counts for a new test */ 63 static inline void 64 clear_packet_count(void) 65 { 66 unsigned int i; 67 for (i = 0; i < RTE_MAX_LCORE; i++) 68 __atomic_store_n(&worker_stats[i].handled_packets, 0, 69 __ATOMIC_RELAXED); 70 } 71 72 /* this is the basic worker function for sanity test 73 * it does nothing but return packets and count them. 74 */ 75 static int 76 handle_work(void *arg) 77 { 78 struct rte_mbuf *buf[8] __rte_cache_aligned; 79 struct worker_params *wp = arg; 80 struct rte_distributor *db = wp->dist; 81 unsigned int num; 82 unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); 83 84 num = rte_distributor_get_pkt(db, id, buf, NULL, 0); 85 while (!quit) { 86 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 87 __ATOMIC_RELAXED); 88 num = rte_distributor_get_pkt(db, id, 89 buf, buf, num); 90 } 91 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 92 __ATOMIC_RELAXED); 93 rte_distributor_return_pkt(db, id, buf, num); 94 return 0; 95 } 96 97 /* do basic sanity testing of the distributor. This test tests the following: 98 * - send 32 packets through distributor with the same tag and ensure they 99 * all go to the one worker 100 * - send 32 packets through the distributor with two different tags and 101 * verify that they go equally to two different workers. 102 * - send 32 packets with different tags through the distributors and 103 * just verify we get all packets back. 104 * - send 1024 packets through the distributor, gathering the returned packets 105 * as we go. Then verify that we correctly got all 1024 pointers back again, 106 * not necessarily in the same order (as different flows). 107 */ 108 static int 109 sanity_test(struct worker_params *wp, struct rte_mempool *p) 110 { 111 struct rte_distributor *db = wp->dist; 112 struct rte_mbuf *bufs[BURST]; 113 struct rte_mbuf *returns[BURST*2]; 114 unsigned int i, count; 115 unsigned int retries; 116 unsigned int processed; 117 118 printf("=== Basic distributor sanity tests ===\n"); 119 clear_packet_count(); 120 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 121 printf("line %d: Error getting mbufs from pool\n", __LINE__); 122 return -1; 123 } 124 125 /* now set all hash values in all buffers to zero, so all pkts go to the 126 * one worker thread */ 127 for (i = 0; i < BURST; i++) 128 bufs[i]->hash.usr = 0; 129 130 processed = 0; 131 while (processed < BURST) 132 processed += rte_distributor_process(db, &bufs[processed], 133 BURST - processed); 134 135 count = 0; 136 do { 137 138 rte_distributor_flush(db); 139 count += rte_distributor_returned_pkts(db, 140 returns, BURST*2); 141 } while (count < BURST); 142 143 if (total_packet_count() != BURST) { 144 printf("Line %d: Error, not all packets flushed. " 145 "Expected %u, got %u\n", 146 __LINE__, BURST, total_packet_count()); 147 rte_mempool_put_bulk(p, (void *)bufs, BURST); 148 return -1; 149 } 150 151 for (i = 0; i < rte_lcore_count() - 1; i++) 152 printf("Worker %u handled %u packets\n", i, 153 __atomic_load_n(&worker_stats[i].handled_packets, 154 __ATOMIC_RELAXED)); 155 printf("Sanity test with all zero hashes done.\n"); 156 157 /* pick two flows and check they go correctly */ 158 if (rte_lcore_count() >= 3) { 159 clear_packet_count(); 160 for (i = 0; i < BURST; i++) 161 bufs[i]->hash.usr = (i & 1) << 8; 162 163 rte_distributor_process(db, bufs, BURST); 164 count = 0; 165 do { 166 rte_distributor_flush(db); 167 count += rte_distributor_returned_pkts(db, 168 returns, BURST*2); 169 } while (count < BURST); 170 if (total_packet_count() != BURST) { 171 printf("Line %d: Error, not all packets flushed. " 172 "Expected %u, got %u\n", 173 __LINE__, BURST, total_packet_count()); 174 rte_mempool_put_bulk(p, (void *)bufs, BURST); 175 return -1; 176 } 177 178 for (i = 0; i < rte_lcore_count() - 1; i++) 179 printf("Worker %u handled %u packets\n", i, 180 __atomic_load_n( 181 &worker_stats[i].handled_packets, 182 __ATOMIC_RELAXED)); 183 printf("Sanity test with two hash values done\n"); 184 } 185 186 /* give a different hash value to each packet, 187 * so load gets distributed */ 188 clear_packet_count(); 189 for (i = 0; i < BURST; i++) 190 bufs[i]->hash.usr = i+1; 191 192 rte_distributor_process(db, bufs, BURST); 193 count = 0; 194 do { 195 rte_distributor_flush(db); 196 count += rte_distributor_returned_pkts(db, 197 returns, BURST*2); 198 } while (count < BURST); 199 if (total_packet_count() != BURST) { 200 printf("Line %d: Error, not all packets flushed. " 201 "Expected %u, got %u\n", 202 __LINE__, BURST, total_packet_count()); 203 rte_mempool_put_bulk(p, (void *)bufs, BURST); 204 return -1; 205 } 206 207 for (i = 0; i < rte_lcore_count() - 1; i++) 208 printf("Worker %u handled %u packets\n", i, 209 __atomic_load_n(&worker_stats[i].handled_packets, 210 __ATOMIC_RELAXED)); 211 printf("Sanity test with non-zero hashes done\n"); 212 213 rte_mempool_put_bulk(p, (void *)bufs, BURST); 214 215 /* sanity test with BIG_BATCH packets to ensure they all arrived back 216 * from the returned packets function */ 217 clear_packet_count(); 218 struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH]; 219 unsigned num_returned = 0; 220 unsigned int num_being_processed = 0; 221 unsigned int return_buffer_capacity = 127;/* RTE_DISTRIB_RETURNS_MASK */ 222 223 /* flush out any remaining packets */ 224 rte_distributor_flush(db); 225 rte_distributor_clear_returns(db); 226 227 if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) { 228 printf("line %d: Error getting mbufs from pool\n", __LINE__); 229 return -1; 230 } 231 for (i = 0; i < BIG_BATCH; i++) 232 many_bufs[i]->hash.usr = i << 2; 233 234 printf("=== testing big burst (%s) ===\n", wp->name); 235 for (i = 0; i < BIG_BATCH/BURST; i++) { 236 rte_distributor_process(db, 237 &many_bufs[i*BURST], BURST); 238 num_being_processed += BURST; 239 do { 240 count = rte_distributor_returned_pkts(db, 241 &return_bufs[num_returned], 242 BIG_BATCH - num_returned); 243 num_being_processed -= count; 244 num_returned += count; 245 rte_distributor_flush(db); 246 } while (num_being_processed + BURST > return_buffer_capacity); 247 } 248 retries = 0; 249 do { 250 rte_distributor_flush(db); 251 count = rte_distributor_returned_pkts(db, 252 &return_bufs[num_returned], 253 BIG_BATCH - num_returned); 254 num_returned += count; 255 retries++; 256 } while ((num_returned < BIG_BATCH) && (retries < 100)); 257 258 if (num_returned != BIG_BATCH) { 259 printf("line %d: Missing packets, expected %d\n", 260 __LINE__, num_returned); 261 rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); 262 return -1; 263 } 264 265 /* big check - make sure all packets made it back!! */ 266 for (i = 0; i < BIG_BATCH; i++) { 267 unsigned j; 268 struct rte_mbuf *src = many_bufs[i]; 269 for (j = 0; j < BIG_BATCH; j++) { 270 if (return_bufs[j] == src) 271 break; 272 } 273 274 if (j == BIG_BATCH) { 275 printf("Error: could not find source packet #%u\n", i); 276 rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); 277 return -1; 278 } 279 } 280 printf("Sanity test of returned packets done\n"); 281 282 rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); 283 284 printf("\n"); 285 return 0; 286 } 287 288 289 /* to test that the distributor does not lose packets, we use this worker 290 * function which frees mbufs when it gets them. The distributor thread does 291 * the mbuf allocation. If distributor drops packets we'll eventually run out 292 * of mbufs. 293 */ 294 static int 295 handle_work_with_free_mbufs(void *arg) 296 { 297 struct rte_mbuf *buf[8] __rte_cache_aligned; 298 struct worker_params *wp = arg; 299 struct rte_distributor *d = wp->dist; 300 unsigned int i; 301 unsigned int num; 302 unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); 303 304 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 305 while (!quit) { 306 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 307 __ATOMIC_RELAXED); 308 for (i = 0; i < num; i++) 309 rte_pktmbuf_free(buf[i]); 310 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 311 } 312 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 313 __ATOMIC_RELAXED); 314 rte_distributor_return_pkt(d, id, buf, num); 315 return 0; 316 } 317 318 /* Perform a sanity test of the distributor with a large number of packets, 319 * where we allocate a new set of mbufs for each burst. The workers then 320 * free the mbufs. This ensures that we don't have any packet leaks in the 321 * library. 322 */ 323 static int 324 sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p) 325 { 326 struct rte_distributor *d = wp->dist; 327 unsigned i; 328 struct rte_mbuf *bufs[BURST]; 329 unsigned int processed; 330 331 printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name); 332 333 clear_packet_count(); 334 for (i = 0; i < ((1<<ITER_POWER)); i += BURST) { 335 unsigned j; 336 while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0) 337 rte_distributor_process(d, NULL, 0); 338 for (j = 0; j < BURST; j++) { 339 bufs[j]->hash.usr = (i+j) << 1; 340 } 341 342 processed = 0; 343 while (processed < BURST) 344 processed += rte_distributor_process(d, 345 &bufs[processed], BURST - processed); 346 } 347 348 rte_distributor_flush(d); 349 350 rte_delay_us(10000); 351 352 if (total_packet_count() < (1<<ITER_POWER)) { 353 printf("Line %u: Packet count is incorrect, %u, expected %u\n", 354 __LINE__, total_packet_count(), 355 (1<<ITER_POWER)); 356 return -1; 357 } 358 359 printf("Sanity test with mbuf alloc/free passed\n\n"); 360 return 0; 361 } 362 363 static int 364 handle_work_for_shutdown_test(void *arg) 365 { 366 struct rte_mbuf *buf[8] __rte_cache_aligned; 367 struct worker_params *wp = arg; 368 struct rte_distributor *d = wp->dist; 369 unsigned int num; 370 unsigned int zero_id = 0; 371 unsigned int zero_unset; 372 const unsigned int id = __atomic_fetch_add(&worker_idx, 1, 373 __ATOMIC_RELAXED); 374 375 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 376 377 if (num > 0) { 378 zero_unset = RTE_MAX_LCORE; 379 __atomic_compare_exchange_n(&zero_idx, &zero_unset, id, 380 false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE); 381 } 382 zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE); 383 384 /* wait for quit single globally, or for worker zero, wait 385 * for zero_quit */ 386 while (!quit && !(id == zero_id && zero_quit)) { 387 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 388 __ATOMIC_RELAXED); 389 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 390 391 if (num > 0) { 392 zero_unset = RTE_MAX_LCORE; 393 __atomic_compare_exchange_n(&zero_idx, &zero_unset, id, 394 false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE); 395 } 396 zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE); 397 } 398 399 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 400 __ATOMIC_RELAXED); 401 if (id == zero_id) { 402 rte_distributor_return_pkt(d, id, NULL, 0); 403 404 /* for worker zero, allow it to restart to pick up last packet 405 * when all workers are shutting down. 406 */ 407 __atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE); 408 while (zero_quit) 409 usleep(100); 410 __atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE); 411 412 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 413 414 while (!quit) { 415 __atomic_fetch_add(&worker_stats[id].handled_packets, 416 num, __ATOMIC_RELAXED); 417 num = rte_distributor_get_pkt(d, id, buf, NULL, 0); 418 } 419 } 420 rte_distributor_return_pkt(d, id, buf, num); 421 return 0; 422 } 423 424 425 /* Perform a sanity test of the distributor with a large number of packets, 426 * where we allocate a new set of mbufs for each burst. The workers then 427 * free the mbufs. This ensures that we don't have any packet leaks in the 428 * library. 429 */ 430 static int 431 sanity_test_with_worker_shutdown(struct worker_params *wp, 432 struct rte_mempool *p) 433 { 434 struct rte_distributor *d = wp->dist; 435 struct rte_mbuf *bufs[BURST]; 436 struct rte_mbuf *bufs2[BURST]; 437 unsigned int i; 438 unsigned int failed = 0; 439 unsigned int processed = 0; 440 441 printf("=== Sanity test of worker shutdown ===\n"); 442 443 clear_packet_count(); 444 445 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 446 printf("line %d: Error getting mbufs from pool\n", __LINE__); 447 return -1; 448 } 449 450 /* 451 * Now set all hash values in all buffers to same value so all 452 * pkts go to the one worker thread 453 */ 454 for (i = 0; i < BURST; i++) 455 bufs[i]->hash.usr = 1; 456 457 processed = 0; 458 while (processed < BURST) 459 processed += rte_distributor_process(d, &bufs[processed], 460 BURST - processed); 461 rte_distributor_flush(d); 462 463 /* at this point, we will have processed some packets and have a full 464 * backlog for the other ones at worker 0. 465 */ 466 467 /* get more buffers to queue up, again setting them to the same flow */ 468 if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) { 469 printf("line %d: Error getting mbufs from pool\n", __LINE__); 470 rte_mempool_put_bulk(p, (void *)bufs, BURST); 471 return -1; 472 } 473 for (i = 0; i < BURST; i++) 474 bufs2[i]->hash.usr = 1; 475 476 /* get worker zero to quit */ 477 zero_quit = 1; 478 rte_distributor_process(d, bufs2, BURST); 479 480 /* flush the distributor */ 481 rte_distributor_flush(d); 482 while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) 483 rte_distributor_flush(d); 484 485 zero_quit = 0; 486 while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) 487 rte_delay_us(100); 488 489 for (i = 0; i < rte_lcore_count() - 1; i++) 490 printf("Worker %u handled %u packets\n", i, 491 __atomic_load_n(&worker_stats[i].handled_packets, 492 __ATOMIC_RELAXED)); 493 494 if (total_packet_count() != BURST * 2) { 495 printf("Line %d: Error, not all packets flushed. " 496 "Expected %u, got %u\n", 497 __LINE__, BURST * 2, total_packet_count()); 498 failed = 1; 499 } 500 501 rte_mempool_put_bulk(p, (void *)bufs, BURST); 502 rte_mempool_put_bulk(p, (void *)bufs2, BURST); 503 504 if (failed) 505 return -1; 506 507 printf("Sanity test with worker shutdown passed\n\n"); 508 return 0; 509 } 510 511 /* Test that the flush function is able to move packets between workers when 512 * one worker shuts down.. 513 */ 514 static int 515 test_flush_with_worker_shutdown(struct worker_params *wp, 516 struct rte_mempool *p) 517 { 518 struct rte_distributor *d = wp->dist; 519 struct rte_mbuf *bufs[BURST]; 520 unsigned int i; 521 unsigned int failed = 0; 522 unsigned int processed; 523 524 printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name); 525 526 clear_packet_count(); 527 if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { 528 printf("line %d: Error getting mbufs from pool\n", __LINE__); 529 return -1; 530 } 531 532 /* now set all hash values in all buffers to zero, so all pkts go to the 533 * one worker thread */ 534 for (i = 0; i < BURST; i++) 535 bufs[i]->hash.usr = 0; 536 537 processed = 0; 538 while (processed < BURST) 539 processed += rte_distributor_process(d, &bufs[processed], 540 BURST - processed); 541 /* at this point, we will have processed some packets and have a full 542 * backlog for the other ones at worker 0. 543 */ 544 545 /* get worker zero to quit */ 546 zero_quit = 1; 547 548 /* flush the distributor */ 549 rte_distributor_flush(d); 550 551 while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) 552 rte_distributor_flush(d); 553 554 zero_quit = 0; 555 556 while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) 557 rte_delay_us(100); 558 559 for (i = 0; i < rte_lcore_count() - 1; i++) 560 printf("Worker %u handled %u packets\n", i, 561 __atomic_load_n(&worker_stats[i].handled_packets, 562 __ATOMIC_RELAXED)); 563 564 if (total_packet_count() != BURST) { 565 printf("Line %d: Error, not all packets flushed. " 566 "Expected %u, got %u\n", 567 __LINE__, BURST, total_packet_count()); 568 failed = 1; 569 } 570 571 rte_mempool_put_bulk(p, (void *)bufs, BURST); 572 573 if (failed) 574 return -1; 575 576 printf("Flush test with worker shutdown passed\n\n"); 577 return 0; 578 } 579 580 static int 581 handle_and_mark_work(void *arg) 582 { 583 struct rte_mbuf *buf[8] __rte_cache_aligned; 584 struct worker_params *wp = arg; 585 struct rte_distributor *db = wp->dist; 586 unsigned int num, i; 587 unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); 588 num = rte_distributor_get_pkt(db, id, buf, NULL, 0); 589 while (!quit) { 590 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 591 __ATOMIC_RELAXED); 592 for (i = 0; i < num; i++) 593 *seq_field(buf[i]) += id + 1; 594 num = rte_distributor_get_pkt(db, id, 595 buf, buf, num); 596 } 597 __atomic_fetch_add(&worker_stats[id].handled_packets, num, 598 __ATOMIC_RELAXED); 599 rte_distributor_return_pkt(db, id, buf, num); 600 return 0; 601 } 602 603 /* sanity_mark_test sends packets to workers which mark them. 604 * Every packet has also encoded sequence number. 605 * The returned packets are sorted and verified if they were handled 606 * by proper workers. 607 */ 608 static int 609 sanity_mark_test(struct worker_params *wp, struct rte_mempool *p) 610 { 611 const unsigned int buf_count = 24; 612 const unsigned int burst = 8; 613 const unsigned int shift = 12; 614 const unsigned int seq_shift = 10; 615 616 struct rte_distributor *db = wp->dist; 617 struct rte_mbuf *bufs[buf_count]; 618 struct rte_mbuf *returns[buf_count]; 619 unsigned int i, count, id; 620 unsigned int sorted[buf_count], seq; 621 unsigned int failed = 0; 622 unsigned int processed; 623 624 printf("=== Marked packets test ===\n"); 625 clear_packet_count(); 626 if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) { 627 printf("line %d: Error getting mbufs from pool\n", __LINE__); 628 return -1; 629 } 630 631 /* bufs' hashes will be like these below, but shifted left. 632 * The shifting is for avoiding collisions with backlogs 633 * and in-flight tags left by previous tests. 634 * [1, 1, 1, 1, 1, 1, 1, 1 635 * 1, 1, 1, 1, 2, 2, 2, 2 636 * 2, 2, 2, 2, 1, 1, 1, 1] 637 */ 638 for (i = 0; i < burst; i++) { 639 bufs[0 * burst + i]->hash.usr = 1 << shift; 640 bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2) 641 << shift; 642 bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1) 643 << shift; 644 } 645 /* Assign a sequence number to each packet. The sequence is shifted, 646 * so that lower bits will hold mark from worker. 647 */ 648 for (i = 0; i < buf_count; i++) 649 *seq_field(bufs[i]) = i << seq_shift; 650 651 count = 0; 652 for (i = 0; i < buf_count/burst; i++) { 653 processed = 0; 654 while (processed < burst) 655 processed += rte_distributor_process(db, 656 &bufs[i * burst + processed], 657 burst - processed); 658 count += rte_distributor_returned_pkts(db, &returns[count], 659 buf_count - count); 660 } 661 662 do { 663 rte_distributor_flush(db); 664 count += rte_distributor_returned_pkts(db, &returns[count], 665 buf_count - count); 666 } while (count < buf_count); 667 668 for (i = 0; i < rte_lcore_count() - 1; i++) 669 printf("Worker %u handled %u packets\n", i, 670 __atomic_load_n(&worker_stats[i].handled_packets, 671 __ATOMIC_RELAXED)); 672 673 /* Sort returned packets by sent order (sequence numbers). */ 674 for (i = 0; i < buf_count; i++) { 675 seq = *seq_field(returns[i]) >> seq_shift; 676 id = *seq_field(returns[i]) - (seq << seq_shift); 677 sorted[seq] = id; 678 } 679 680 /* Verify that packets [0-11] and [20-23] were processed 681 * by the same worker 682 */ 683 for (i = 1; i < 12; i++) { 684 if (sorted[i] != sorted[0]) { 685 printf("Packet number %u processed by worker %u," 686 " but should be processes by worker %u\n", 687 i, sorted[i], sorted[0]); 688 failed = 1; 689 } 690 } 691 for (i = 20; i < 24; i++) { 692 if (sorted[i] != sorted[0]) { 693 printf("Packet number %u processed by worker %u," 694 " but should be processes by worker %u\n", 695 i, sorted[i], sorted[0]); 696 failed = 1; 697 } 698 } 699 /* And verify that packets [12-19] were processed 700 * by the another worker 701 */ 702 for (i = 13; i < 20; i++) { 703 if (sorted[i] != sorted[12]) { 704 printf("Packet number %u processed by worker %u," 705 " but should be processes by worker %u\n", 706 i, sorted[i], sorted[12]); 707 failed = 1; 708 } 709 } 710 711 rte_mempool_put_bulk(p, (void *)bufs, buf_count); 712 713 if (failed) 714 return -1; 715 716 printf("Marked packets test passed\n"); 717 return 0; 718 } 719 720 static 721 int test_error_distributor_create_name(void) 722 { 723 struct rte_distributor *d = NULL; 724 struct rte_distributor *db = NULL; 725 char *name = NULL; 726 727 d = rte_distributor_create(name, rte_socket_id(), 728 rte_lcore_count() - 1, 729 RTE_DIST_ALG_SINGLE); 730 if (d != NULL || rte_errno != EINVAL) { 731 printf("ERROR: No error on create() with NULL name param\n"); 732 return -1; 733 } 734 735 db = rte_distributor_create(name, rte_socket_id(), 736 rte_lcore_count() - 1, 737 RTE_DIST_ALG_BURST); 738 if (db != NULL || rte_errno != EINVAL) { 739 printf("ERROR: No error on create() with NULL param\n"); 740 return -1; 741 } 742 743 return 0; 744 } 745 746 747 static 748 int test_error_distributor_create_numworkers(void) 749 { 750 struct rte_distributor *ds = NULL; 751 struct rte_distributor *db = NULL; 752 753 ds = rte_distributor_create("test_numworkers", rte_socket_id(), 754 RTE_MAX_LCORE + 10, 755 RTE_DIST_ALG_SINGLE); 756 if (ds != NULL || rte_errno != EINVAL) { 757 printf("ERROR: No error on create() with num_workers > MAX\n"); 758 return -1; 759 } 760 761 db = rte_distributor_create("test_numworkers", rte_socket_id(), 762 RTE_MAX_LCORE + 10, 763 RTE_DIST_ALG_BURST); 764 if (db != NULL || rte_errno != EINVAL) { 765 printf("ERROR: No error on create() num_workers > MAX\n"); 766 return -1; 767 } 768 769 return 0; 770 } 771 772 773 /* Useful function which ensures that all worker functions terminate */ 774 static void 775 quit_workers(struct worker_params *wp, struct rte_mempool *p) 776 { 777 struct rte_distributor *d = wp->dist; 778 const unsigned num_workers = rte_lcore_count() - 1; 779 unsigned i; 780 struct rte_mbuf *bufs[RTE_MAX_LCORE]; 781 struct rte_mbuf *returns[RTE_MAX_LCORE]; 782 if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) { 783 printf("line %d: Error getting mbufs from pool\n", __LINE__); 784 return; 785 } 786 787 zero_quit = 0; 788 quit = 1; 789 for (i = 0; i < num_workers; i++) { 790 bufs[i]->hash.usr = i << 1; 791 rte_distributor_process(d, &bufs[i], 1); 792 } 793 794 rte_distributor_process(d, NULL, 0); 795 rte_distributor_flush(d); 796 rte_eal_mp_wait_lcore(); 797 798 while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE)) 799 ; 800 801 rte_distributor_clear_returns(d); 802 rte_mempool_put_bulk(p, (void *)bufs, num_workers); 803 804 quit = 0; 805 worker_idx = 0; 806 zero_idx = RTE_MAX_LCORE; 807 zero_quit = 0; 808 zero_sleep = 0; 809 } 810 811 static int 812 test_distributor(void) 813 { 814 static struct rte_distributor *ds; 815 static struct rte_distributor *db; 816 static struct rte_distributor *dist[2]; 817 static struct rte_mempool *p; 818 int i; 819 820 static const struct rte_mbuf_dynfield seq_dynfield_desc = { 821 .name = "test_distributor_dynfield_seq", 822 .size = sizeof(seq_dynfield_t), 823 .align = __alignof__(seq_dynfield_t), 824 }; 825 seq_dynfield_offset = 826 rte_mbuf_dynfield_register(&seq_dynfield_desc); 827 if (seq_dynfield_offset < 0) { 828 printf("Error registering mbuf field\n"); 829 return TEST_FAILED; 830 } 831 832 if (rte_lcore_count() < 2) { 833 printf("Not enough cores for distributor_autotest, expecting at least 2\n"); 834 return TEST_SKIPPED; 835 } 836 837 if (db == NULL) { 838 db = rte_distributor_create("Test_dist_burst", rte_socket_id(), 839 rte_lcore_count() - 1, 840 RTE_DIST_ALG_BURST); 841 if (db == NULL) { 842 printf("Error creating burst distributor\n"); 843 return -1; 844 } 845 } else { 846 rte_distributor_flush(db); 847 rte_distributor_clear_returns(db); 848 } 849 850 if (ds == NULL) { 851 ds = rte_distributor_create("Test_dist_single", 852 rte_socket_id(), 853 rte_lcore_count() - 1, 854 RTE_DIST_ALG_SINGLE); 855 if (ds == NULL) { 856 printf("Error creating single distributor\n"); 857 return -1; 858 } 859 } else { 860 rte_distributor_flush(ds); 861 rte_distributor_clear_returns(ds); 862 } 863 864 const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ? 865 (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count()); 866 if (p == NULL) { 867 p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST, 868 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); 869 if (p == NULL) { 870 printf("Error creating mempool\n"); 871 return -1; 872 } 873 } 874 875 dist[0] = ds; 876 dist[1] = db; 877 878 for (i = 0; i < 2; i++) { 879 880 worker_params.dist = dist[i]; 881 if (i) 882 strlcpy(worker_params.name, "burst", 883 sizeof(worker_params.name)); 884 else 885 strlcpy(worker_params.name, "single", 886 sizeof(worker_params.name)); 887 888 rte_eal_mp_remote_launch(handle_work, 889 &worker_params, SKIP_MAIN); 890 if (sanity_test(&worker_params, p) < 0) 891 goto err; 892 quit_workers(&worker_params, p); 893 894 rte_eal_mp_remote_launch(handle_work_with_free_mbufs, 895 &worker_params, SKIP_MAIN); 896 if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0) 897 goto err; 898 quit_workers(&worker_params, p); 899 900 if (rte_lcore_count() > 2) { 901 rte_eal_mp_remote_launch(handle_work_for_shutdown_test, 902 &worker_params, 903 SKIP_MAIN); 904 if (sanity_test_with_worker_shutdown(&worker_params, 905 p) < 0) 906 goto err; 907 quit_workers(&worker_params, p); 908 909 rte_eal_mp_remote_launch(handle_work_for_shutdown_test, 910 &worker_params, 911 SKIP_MAIN); 912 if (test_flush_with_worker_shutdown(&worker_params, 913 p) < 0) 914 goto err; 915 quit_workers(&worker_params, p); 916 917 rte_eal_mp_remote_launch(handle_and_mark_work, 918 &worker_params, SKIP_MAIN); 919 if (sanity_mark_test(&worker_params, p) < 0) 920 goto err; 921 quit_workers(&worker_params, p); 922 923 } else { 924 printf("Too few cores to run worker shutdown test\n"); 925 } 926 927 } 928 929 if (test_error_distributor_create_numworkers() == -1 || 930 test_error_distributor_create_name() == -1) { 931 printf("rte_distributor_create parameter check tests failed"); 932 return -1; 933 } 934 935 return 0; 936 937 err: 938 quit_workers(&worker_params, p); 939 return -1; 940 } 941 942 REGISTER_TEST_COMMAND(distributor_autotest, test_distributor); 943