1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright 2017 Cavium, Inc. 4 */ 5 6 #include "test_pipeline_common.h" 7 8 /* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */ 9 10 static __rte_always_inline int 11 pipeline_queue_nb_event_queues(struct evt_options *opt) 12 { 13 uint16_t eth_count = rte_eth_dev_count_avail(); 14 15 return (eth_count * opt->nb_stages) + eth_count; 16 } 17 18 typedef int (*pipeline_queue_worker_t)(void *arg); 19 20 static __rte_noinline int 21 pipeline_queue_worker_single_stage_tx(void *arg) 22 { 23 PIPELINE_WORKER_SINGLE_STAGE_INIT; 24 uint8_t enq = 0, deq = 0; 25 26 while (t->done == false) { 27 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 28 29 if (!deq) { 30 rte_pause(); 31 continue; 32 } 33 34 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 35 enq = pipeline_event_tx(dev, port, &ev, t); 36 ev.op = RTE_EVENT_OP_RELEASE; 37 w->processed_pkts++; 38 } else { 39 ev.queue_id++; 40 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 41 enq = pipeline_event_enqueue(dev, port, &ev, t); 42 } 43 } 44 pipeline_worker_cleanup(dev, port, &ev, enq, deq); 45 46 return 0; 47 } 48 49 static __rte_noinline int 50 pipeline_queue_worker_single_stage_fwd(void *arg) 51 { 52 PIPELINE_WORKER_SINGLE_STAGE_INIT; 53 const uint8_t *tx_queue = t->tx_evqueue_id; 54 uint8_t enq = 0, deq = 0; 55 56 while (t->done == false) { 57 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 58 59 if (!deq) { 60 rte_pause(); 61 continue; 62 } 63 64 ev.queue_id = tx_queue[ev.mbuf->port]; 65 rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0); 66 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 67 enq = pipeline_event_enqueue(dev, port, &ev, t); 68 w->processed_pkts++; 69 } 70 pipeline_worker_cleanup(dev, port, &ev, enq, deq); 71 72 return 0; 73 } 74 75 static __rte_noinline int 76 pipeline_queue_worker_single_stage_burst_tx(void *arg) 77 { 78 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; 79 uint16_t nb_rx = 0, nb_tx = 0; 80 81 while (t->done == false) { 82 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 83 84 if (!nb_rx) { 85 rte_pause(); 86 continue; 87 } 88 89 for (i = 0; i < nb_rx; i++) { 90 rte_prefetch0(ev[i + 1].mbuf); 91 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 92 pipeline_event_tx(dev, port, &ev[i], t); 93 ev[i].op = RTE_EVENT_OP_RELEASE; 94 w->processed_pkts++; 95 } else { 96 ev[i].queue_id++; 97 pipeline_fwd_event(&ev[i], 98 RTE_SCHED_TYPE_ATOMIC); 99 } 100 } 101 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); 102 } 103 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); 104 105 return 0; 106 } 107 108 static __rte_noinline int 109 pipeline_queue_worker_single_stage_burst_fwd(void *arg) 110 { 111 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; 112 const uint8_t *tx_queue = t->tx_evqueue_id; 113 uint16_t nb_rx = 0, nb_tx = 0; 114 115 while (t->done == false) { 116 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 117 118 if (!nb_rx) { 119 rte_pause(); 120 continue; 121 } 122 123 for (i = 0; i < nb_rx; i++) { 124 rte_prefetch0(ev[i + 1].mbuf); 125 ev[i].queue_id = tx_queue[ev[i].mbuf->port]; 126 rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0); 127 pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 128 } 129 130 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); 131 w->processed_pkts += nb_rx; 132 } 133 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); 134 135 return 0; 136 } 137 138 static __rte_noinline int 139 pipeline_queue_worker_single_stage_tx_vector(void *arg) 140 { 141 PIPELINE_WORKER_SINGLE_STAGE_INIT; 142 uint8_t enq = 0, deq = 0; 143 uint16_t vector_sz; 144 145 while (!t->done) { 146 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 147 148 if (!deq) { 149 rte_pause(); 150 continue; 151 } 152 153 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 154 vector_sz = ev.vec->nb_elem; 155 enq = pipeline_event_tx_vector(dev, port, &ev, t); 156 ev.op = RTE_EVENT_OP_RELEASE; 157 w->processed_pkts += vector_sz; 158 } else { 159 ev.queue_id++; 160 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC); 161 enq = pipeline_event_enqueue(dev, port, &ev, t); 162 } 163 } 164 pipeline_worker_cleanup(dev, port, &ev, enq, deq); 165 166 return 0; 167 } 168 169 static __rte_noinline int 170 pipeline_queue_worker_single_stage_fwd_vector(void *arg) 171 { 172 PIPELINE_WORKER_SINGLE_STAGE_INIT; 173 const uint8_t *tx_queue = t->tx_evqueue_id; 174 uint8_t enq = 0, deq = 0; 175 uint16_t vector_sz; 176 177 while (!t->done) { 178 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 179 180 if (!deq) { 181 rte_pause(); 182 continue; 183 } 184 185 ev.queue_id = tx_queue[ev.vec->port]; 186 ev.vec->queue = 0; 187 vector_sz = ev.vec->nb_elem; 188 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC); 189 enq = pipeline_event_enqueue(dev, port, &ev, t); 190 w->processed_pkts += vector_sz; 191 } 192 pipeline_worker_cleanup(dev, port, &ev, enq, deq); 193 194 return 0; 195 } 196 197 static __rte_noinline int 198 pipeline_queue_worker_single_stage_burst_tx_vector(void *arg) 199 { 200 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; 201 uint16_t nb_rx = 0, nb_tx = 0; 202 uint16_t vector_sz; 203 204 while (!t->done) { 205 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 206 207 if (!nb_rx) { 208 rte_pause(); 209 continue; 210 } 211 212 for (i = 0; i < nb_rx; i++) { 213 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 214 vector_sz = ev[i].vec->nb_elem; 215 pipeline_event_tx_vector(dev, port, &ev[i], t); 216 ev[i].op = RTE_EVENT_OP_RELEASE; 217 w->processed_pkts += vector_sz; 218 } else { 219 ev[i].queue_id++; 220 pipeline_fwd_event_vector( 221 &ev[i], RTE_SCHED_TYPE_ATOMIC); 222 } 223 } 224 225 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); 226 } 227 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); 228 229 return 0; 230 } 231 232 static __rte_noinline int 233 pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg) 234 { 235 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; 236 const uint8_t *tx_queue = t->tx_evqueue_id; 237 uint16_t nb_rx = 0, nb_tx = 0; 238 uint16_t vector_sz; 239 240 while (!t->done) { 241 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 242 243 if (!nb_rx) { 244 rte_pause(); 245 continue; 246 } 247 248 vector_sz = 0; 249 for (i = 0; i < nb_rx; i++) { 250 ev[i].queue_id = tx_queue[ev[i].vec->port]; 251 ev[i].vec->queue = 0; 252 vector_sz += ev[i].vec->nb_elem; 253 pipeline_fwd_event_vector(&ev[i], 254 RTE_SCHED_TYPE_ATOMIC); 255 } 256 257 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); 258 w->processed_pkts += vector_sz; 259 } 260 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); 261 262 return 0; 263 } 264 265 static __rte_noinline int 266 pipeline_queue_worker_multi_stage_tx(void *arg) 267 { 268 PIPELINE_WORKER_MULTI_STAGE_INIT; 269 const uint8_t *tx_queue = t->tx_evqueue_id; 270 uint8_t enq = 0, deq = 0; 271 272 while (t->done == false) { 273 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 274 275 if (!deq) { 276 rte_pause(); 277 continue; 278 } 279 280 cq_id = ev.queue_id % nb_stages; 281 282 if (ev.queue_id == tx_queue[ev.mbuf->port]) { 283 enq = pipeline_event_tx(dev, port, &ev, t); 284 ev.op = RTE_EVENT_OP_RELEASE; 285 w->processed_pkts++; 286 continue; 287 } 288 289 ev.queue_id++; 290 pipeline_fwd_event(&ev, cq_id != last_queue ? 291 sched_type_list[cq_id] : 292 RTE_SCHED_TYPE_ATOMIC); 293 enq = pipeline_event_enqueue(dev, port, &ev, t); 294 } 295 pipeline_worker_cleanup(dev, port, &ev, enq, deq); 296 297 return 0; 298 } 299 300 static __rte_noinline int 301 pipeline_queue_worker_multi_stage_fwd(void *arg) 302 { 303 PIPELINE_WORKER_MULTI_STAGE_INIT; 304 const uint8_t *tx_queue = t->tx_evqueue_id; 305 uint8_t enq = 0, deq = 0; 306 307 while (t->done == false) { 308 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 309 310 if (!deq) { 311 rte_pause(); 312 continue; 313 } 314 315 cq_id = ev.queue_id % nb_stages; 316 317 if (cq_id == last_queue) { 318 ev.queue_id = tx_queue[ev.mbuf->port]; 319 rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0); 320 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 321 enq = pipeline_event_enqueue(dev, port, &ev, t); 322 w->processed_pkts++; 323 } else { 324 ev.queue_id++; 325 pipeline_fwd_event(&ev, sched_type_list[cq_id]); 326 enq = pipeline_event_enqueue(dev, port, &ev, t); 327 } 328 } 329 pipeline_worker_cleanup(dev, port, &ev, enq, deq); 330 331 return 0; 332 } 333 334 static __rte_noinline int 335 pipeline_queue_worker_multi_stage_burst_tx(void *arg) 336 { 337 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; 338 const uint8_t *tx_queue = t->tx_evqueue_id; 339 uint16_t nb_rx = 0, nb_tx = 0; 340 341 while (t->done == false) { 342 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 343 344 if (!nb_rx) { 345 rte_pause(); 346 continue; 347 } 348 349 for (i = 0; i < nb_rx; i++) { 350 rte_prefetch0(ev[i + 1].mbuf); 351 cq_id = ev[i].queue_id % nb_stages; 352 353 if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) { 354 pipeline_event_tx(dev, port, &ev[i], t); 355 ev[i].op = RTE_EVENT_OP_RELEASE; 356 w->processed_pkts++; 357 continue; 358 } 359 360 ev[i].queue_id++; 361 pipeline_fwd_event(&ev[i], cq_id != last_queue ? 362 sched_type_list[cq_id] : 363 RTE_SCHED_TYPE_ATOMIC); 364 } 365 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); 366 } 367 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); 368 369 return 0; 370 } 371 372 static __rte_noinline int 373 pipeline_queue_worker_multi_stage_burst_fwd(void *arg) 374 { 375 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; 376 const uint8_t *tx_queue = t->tx_evqueue_id; 377 uint16_t nb_rx = 0, nb_tx = 0; 378 379 while (t->done == false) { 380 uint16_t processed_pkts = 0; 381 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 382 383 if (!nb_rx) { 384 rte_pause(); 385 continue; 386 } 387 388 for (i = 0; i < nb_rx; i++) { 389 rte_prefetch0(ev[i + 1].mbuf); 390 cq_id = ev[i].queue_id % nb_stages; 391 392 if (cq_id == last_queue) { 393 ev[i].queue_id = tx_queue[ev[i].mbuf->port]; 394 rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0); 395 pipeline_fwd_event(&ev[i], 396 RTE_SCHED_TYPE_ATOMIC); 397 processed_pkts++; 398 } else { 399 ev[i].queue_id++; 400 pipeline_fwd_event(&ev[i], 401 sched_type_list[cq_id]); 402 } 403 } 404 405 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); 406 w->processed_pkts += processed_pkts; 407 } 408 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); 409 410 return 0; 411 } 412 413 static __rte_noinline int 414 pipeline_queue_worker_multi_stage_tx_vector(void *arg) 415 { 416 PIPELINE_WORKER_MULTI_STAGE_INIT; 417 const uint8_t *tx_queue = t->tx_evqueue_id; 418 uint8_t enq = 0, deq = 0; 419 uint16_t vector_sz; 420 421 while (!t->done) { 422 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 423 424 if (!deq) { 425 rte_pause(); 426 continue; 427 } 428 429 cq_id = ev.queue_id % nb_stages; 430 431 if (ev.queue_id == tx_queue[ev.vec->port]) { 432 vector_sz = ev.vec->nb_elem; 433 enq = pipeline_event_tx_vector(dev, port, &ev, t); 434 w->processed_pkts += vector_sz; 435 ev.op = RTE_EVENT_OP_RELEASE; 436 continue; 437 } 438 439 ev.queue_id++; 440 pipeline_fwd_event_vector(&ev, cq_id != last_queue 441 ? sched_type_list[cq_id] 442 : RTE_SCHED_TYPE_ATOMIC); 443 enq = pipeline_event_enqueue(dev, port, &ev, t); 444 } 445 pipeline_worker_cleanup(dev, port, &ev, enq, deq); 446 447 return 0; 448 } 449 450 static __rte_noinline int 451 pipeline_queue_worker_multi_stage_fwd_vector(void *arg) 452 { 453 PIPELINE_WORKER_MULTI_STAGE_INIT; 454 const uint8_t *tx_queue = t->tx_evqueue_id; 455 uint8_t enq = 0, deq = 0; 456 uint16_t vector_sz; 457 458 while (!t->done) { 459 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 460 461 if (!deq) { 462 rte_pause(); 463 continue; 464 } 465 466 cq_id = ev.queue_id % nb_stages; 467 468 if (cq_id == last_queue) { 469 vector_sz = ev.vec->nb_elem; 470 ev.queue_id = tx_queue[ev.vec->port]; 471 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC); 472 w->processed_pkts += vector_sz; 473 } else { 474 ev.queue_id++; 475 pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]); 476 } 477 478 enq = pipeline_event_enqueue(dev, port, &ev, t); 479 } 480 pipeline_worker_cleanup(dev, port, &ev, enq, deq); 481 482 return 0; 483 } 484 485 static __rte_noinline int 486 pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg) 487 { 488 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; 489 const uint8_t *tx_queue = t->tx_evqueue_id; 490 uint16_t nb_rx = 0, nb_tx = 0; 491 uint16_t vector_sz; 492 493 while (!t->done) { 494 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 495 496 if (!nb_rx) { 497 rte_pause(); 498 continue; 499 } 500 501 for (i = 0; i < nb_rx; i++) { 502 cq_id = ev[i].queue_id % nb_stages; 503 504 if (ev[i].queue_id == tx_queue[ev[i].vec->port]) { 505 vector_sz = ev[i].vec->nb_elem; 506 pipeline_event_tx_vector(dev, port, &ev[i], t); 507 ev[i].op = RTE_EVENT_OP_RELEASE; 508 w->processed_pkts += vector_sz; 509 continue; 510 } 511 512 ev[i].queue_id++; 513 pipeline_fwd_event_vector( 514 &ev[i], cq_id != last_queue 515 ? sched_type_list[cq_id] 516 : RTE_SCHED_TYPE_ATOMIC); 517 } 518 519 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); 520 } 521 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); 522 523 return 0; 524 } 525 526 static __rte_noinline int 527 pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg) 528 { 529 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; 530 const uint8_t *tx_queue = t->tx_evqueue_id; 531 uint16_t nb_rx = 0, nb_tx = 0; 532 uint16_t vector_sz; 533 534 while (!t->done) { 535 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 536 537 if (!nb_rx) { 538 rte_pause(); 539 continue; 540 } 541 542 for (i = 0; i < nb_rx; i++) { 543 cq_id = ev[i].queue_id % nb_stages; 544 545 if (cq_id == last_queue) { 546 ev[i].queue_id = tx_queue[ev[i].vec->port]; 547 vector_sz = ev[i].vec->nb_elem; 548 pipeline_fwd_event_vector( 549 &ev[i], RTE_SCHED_TYPE_ATOMIC); 550 w->processed_pkts += vector_sz; 551 } else { 552 ev[i].queue_id++; 553 pipeline_fwd_event_vector( 554 &ev[i], sched_type_list[cq_id]); 555 } 556 } 557 558 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t); 559 } 560 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx); 561 562 return 0; 563 } 564 565 static int 566 worker_wrapper(void *arg) 567 { 568 struct worker_data *w = arg; 569 struct evt_options *opt = w->t->opt; 570 const bool burst = evt_has_burst_mode(w->dev_id); 571 const bool internal_port = w->t->internal_port; 572 const uint8_t nb_stages = opt->nb_stages; 573 /*vector/burst/internal_port*/ 574 const pipeline_queue_worker_t 575 pipeline_queue_worker_single_stage[2][2][2] = { 576 [0][0][0] = pipeline_queue_worker_single_stage_fwd, 577 [0][0][1] = pipeline_queue_worker_single_stage_tx, 578 [0][1][0] = pipeline_queue_worker_single_stage_burst_fwd, 579 [0][1][1] = pipeline_queue_worker_single_stage_burst_tx, 580 [1][0][0] = pipeline_queue_worker_single_stage_fwd_vector, 581 [1][0][1] = pipeline_queue_worker_single_stage_tx_vector, 582 [1][1][0] = pipeline_queue_worker_single_stage_burst_fwd_vector, 583 [1][1][1] = pipeline_queue_worker_single_stage_burst_tx_vector, 584 }; 585 const pipeline_queue_worker_t 586 pipeline_queue_worker_multi_stage[2][2][2] = { 587 [0][0][0] = pipeline_queue_worker_multi_stage_fwd, 588 [0][0][1] = pipeline_queue_worker_multi_stage_tx, 589 [0][1][0] = pipeline_queue_worker_multi_stage_burst_fwd, 590 [0][1][1] = pipeline_queue_worker_multi_stage_burst_tx, 591 [1][0][0] = pipeline_queue_worker_multi_stage_fwd_vector, 592 [1][0][1] = pipeline_queue_worker_multi_stage_tx_vector, 593 [1][1][0] = pipeline_queue_worker_multi_stage_burst_fwd_vector, 594 [1][1][1] = pipeline_queue_worker_multi_stage_burst_tx_vector, 595 }; 596 597 if (nb_stages == 1) 598 return (pipeline_queue_worker_single_stage[opt->ena_vector] 599 [burst] 600 [internal_port])(arg); 601 else 602 return (pipeline_queue_worker_multi_stage[opt->ena_vector] 603 [burst] 604 [internal_port])(arg); 605 606 rte_panic("invalid worker\n"); 607 } 608 609 static int 610 pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt) 611 { 612 return pipeline_launch_lcores(test, opt, worker_wrapper); 613 } 614 615 static int 616 pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt) 617 { 618 int ret; 619 int nb_ports; 620 int nb_queues; 621 int nb_stages = opt->nb_stages; 622 uint8_t queue; 623 uint8_t tx_evport_id = 0; 624 uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS]; 625 uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV]; 626 uint8_t nb_worker_queues = 0; 627 uint16_t prod = 0; 628 struct rte_event_dev_info info; 629 struct test_pipeline *t = evt_test_priv(test); 630 631 nb_ports = evt_nr_active_lcores(opt->wlcores); 632 nb_queues = rte_eth_dev_count_avail() * (nb_stages); 633 634 /* One queue for Tx adapter per port */ 635 nb_queues += rte_eth_dev_count_avail(); 636 637 memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS); 638 memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV); 639 640 rte_event_dev_info_get(opt->dev_id, &info); 641 ret = evt_configure_eventdev(opt, nb_queues, nb_ports); 642 if (ret) { 643 evt_err("failed to configure eventdev %d", opt->dev_id); 644 return ret; 645 } 646 647 struct rte_event_queue_conf q_conf = { 648 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, 649 .nb_atomic_flows = opt->nb_flows, 650 .nb_atomic_order_sequences = opt->nb_flows, 651 }; 652 /* queue configurations */ 653 for (queue = 0; queue < nb_queues; queue++) { 654 uint8_t slot; 655 656 q_conf.event_queue_cfg = 0; 657 slot = queue % (nb_stages + 1); 658 if (slot == nb_stages) { 659 q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC; 660 if (!t->internal_port) { 661 q_conf.event_queue_cfg = 662 RTE_EVENT_QUEUE_CFG_SINGLE_LINK; 663 } 664 tx_evqueue_id[prod++] = queue; 665 } else { 666 q_conf.schedule_type = opt->sched_type_list[slot]; 667 queue_arr[nb_worker_queues] = queue; 668 nb_worker_queues++; 669 } 670 671 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf); 672 if (ret) { 673 evt_err("failed to setup queue=%d", queue); 674 return ret; 675 } 676 } 677 678 if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth) 679 opt->wkr_deq_dep = info.max_event_port_dequeue_depth; 680 681 /* port configuration */ 682 const struct rte_event_port_conf p_conf = { 683 .dequeue_depth = opt->wkr_deq_dep, 684 .enqueue_depth = info.max_event_port_dequeue_depth, 685 .new_event_threshold = info.max_num_events, 686 }; 687 688 if (!t->internal_port) { 689 ret = pipeline_event_port_setup(test, opt, queue_arr, 690 nb_worker_queues, p_conf); 691 if (ret) 692 return ret; 693 } else 694 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues, 695 p_conf); 696 697 if (ret) 698 return ret; 699 /* 700 * The pipelines are setup in the following manner: 701 * 702 * eth_dev_count = 2, nb_stages = 2. 703 * 704 * queues = 6 705 * stride = 3 706 * 707 * event queue pipelines: 708 * eth0 -> q0 -> q1 -> (q2->tx) 709 * eth1 -> q3 -> q4 -> (q5->tx) 710 * 711 * q2, q5 configured as ATOMIC | SINGLE_LINK 712 * 713 */ 714 ret = pipeline_event_rx_adapter_setup(opt, nb_stages + 1, p_conf); 715 if (ret) 716 return ret; 717 718 ret = pipeline_event_tx_adapter_setup(opt, p_conf); 719 if (ret) 720 return ret; 721 722 if (!evt_has_distributed_sched(opt->dev_id)) { 723 uint32_t service_id; 724 rte_event_dev_service_id_get(opt->dev_id, &service_id); 725 ret = evt_service_setup(service_id); 726 if (ret) { 727 evt_err("No service lcore found to run event dev."); 728 return ret; 729 } 730 } 731 732 /* Connect the tx_evqueue_id to the Tx adapter port */ 733 if (!t->internal_port) { 734 RTE_ETH_FOREACH_DEV(prod) { 735 ret = rte_event_eth_tx_adapter_event_port_get(prod, 736 &tx_evport_id); 737 if (ret) { 738 evt_err("Unable to get Tx adptr[%d] evprt[%d]", 739 prod, tx_evport_id); 740 return ret; 741 } 742 743 if (rte_event_port_link(opt->dev_id, tx_evport_id, 744 &tx_evqueue_id[prod], 745 NULL, 1) != 1) { 746 evt_err("Unable to link Tx adptr[%d] evprt[%d]", 747 prod, tx_evport_id); 748 return ret; 749 } 750 } 751 } 752 753 ret = rte_event_dev_start(opt->dev_id); 754 if (ret) { 755 evt_err("failed to start eventdev %d", opt->dev_id); 756 return ret; 757 } 758 759 760 RTE_ETH_FOREACH_DEV(prod) { 761 ret = rte_eth_dev_start(prod); 762 if (ret) { 763 evt_err("Ethernet dev [%d] failed to start." 764 " Using synthetic producer", prod); 765 return ret; 766 } 767 768 } 769 770 RTE_ETH_FOREACH_DEV(prod) { 771 ret = rte_event_eth_rx_adapter_start(prod); 772 if (ret) { 773 evt_err("Rx adapter[%d] start failed", prod); 774 return ret; 775 } 776 777 ret = rte_event_eth_tx_adapter_start(prod); 778 if (ret) { 779 evt_err("Tx adapter[%d] start failed", prod); 780 return ret; 781 } 782 } 783 784 memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) * 785 RTE_MAX_ETHPORTS); 786 787 return 0; 788 } 789 790 static void 791 pipeline_queue_opt_dump(struct evt_options *opt) 792 { 793 pipeline_opt_dump(opt, pipeline_queue_nb_event_queues(opt)); 794 } 795 796 static int 797 pipeline_queue_opt_check(struct evt_options *opt) 798 { 799 return pipeline_opt_check(opt, pipeline_queue_nb_event_queues(opt)); 800 } 801 802 static bool 803 pipeline_queue_capability_check(struct evt_options *opt) 804 { 805 struct rte_event_dev_info dev_info; 806 807 rte_event_dev_info_get(opt->dev_id, &dev_info); 808 if (dev_info.max_event_queues < pipeline_queue_nb_event_queues(opt) || 809 dev_info.max_event_ports < 810 evt_nr_active_lcores(opt->wlcores)) { 811 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", 812 pipeline_queue_nb_event_queues(opt), 813 dev_info.max_event_queues, 814 evt_nr_active_lcores(opt->wlcores), 815 dev_info.max_event_ports); 816 } 817 818 return true; 819 } 820 821 static const struct evt_test_ops pipeline_queue = { 822 .cap_check = pipeline_queue_capability_check, 823 .opt_check = pipeline_queue_opt_check, 824 .opt_dump = pipeline_queue_opt_dump, 825 .test_setup = pipeline_test_setup, 826 .mempool_setup = pipeline_mempool_setup, 827 .ethdev_setup = pipeline_ethdev_setup, 828 .eventdev_setup = pipeline_queue_eventdev_setup, 829 .launch_lcores = pipeline_queue_launch_lcores, 830 .ethdev_rx_stop = pipeline_ethdev_rx_stop, 831 .eventdev_destroy = pipeline_eventdev_destroy, 832 .mempool_destroy = pipeline_mempool_destroy, 833 .ethdev_destroy = pipeline_ethdev_destroy, 834 .test_result = pipeline_test_result, 835 .test_destroy = pipeline_test_destroy, 836 }; 837 838 EVT_TEST_REGISTER(pipeline_queue); 839