/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2017 Cavium, Inc.
 */

#include "test_pipeline_common.h"

/* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */

static __rte_always_inline int
pipeline_atq_nb_event_queues(struct evt_options *opt)
{
	RTE_SET_USED(opt);

	return rte_eth_dev_count_avail();
}

static __rte_noinline int
pipeline_atq_worker_single_stage_tx(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_INIT;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		pipeline_event_tx(dev, port, &ev);
		w->processed_pkts++;
	}

	return 0;
}

static __rte_noinline int
pipeline_atq_worker_single_stage_fwd(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		ev.queue_id = tx_queue[ev.mbuf->port];
		pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
		pipeline_event_enqueue(dev, port, &ev);
		w->processed_pkts++;
	}

	return 0;
}

static __rte_noinline int
pipeline_atq_worker_single_stage_burst_tx(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
		}

		pipeline_event_tx_burst(dev, port, ev, nb_rx);
		w->processed_pkts += nb_rx;
	}

	return 0;
}

static __rte_noinline int
pipeline_atq_worker_single_stage_burst_fwd(void *arg)
{
	PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
			ev[i].queue_id = tx_queue[ev[i].mbuf->port];
			pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
		w->processed_pkts += nb_rx;
	}

	return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_tx(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_INIT;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.sub_event_type % nb_stages;

		if (cq_id == last_queue) {
			pipeline_event_tx(dev, port, &ev);
			w->processed_pkts++;
			continue;
		}

		ev.sub_event_type++;
		pipeline_fwd_event(&ev, sched_type_list[cq_id]);
		pipeline_event_enqueue(dev, port, &ev);
	}

	return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_fwd(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);

		if (!event) {
			rte_pause();
			continue;
		}

		cq_id = ev.sub_event_type % nb_stages;

		if (cq_id == last_queue) {
			ev.queue_id = tx_queue[ev.mbuf->port];
			pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
			w->processed_pkts++;
		} else {
			ev.sub_event_type++;
			pipeline_fwd_event(&ev, sched_type_list[cq_id]);
		}

		pipeline_event_enqueue(dev, port, &ev);
	}

	return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_tx(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			cq_id = ev[i].sub_event_type % nb_stages;

			if (cq_id == last_queue) {
				pipeline_event_tx(dev, port, &ev[i]);
				ev[i].op = RTE_EVENT_OP_RELEASE;
				w->processed_pkts++;
				continue;
			}

			ev[i].sub_event_type++;
			pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

static __rte_noinline int
pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
{
	PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
	const uint8_t *tx_queue = t->tx_evqueue_id;

	while (t->done == false) {
		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
				BURST_SIZE, 0);

		if (!nb_rx) {
			rte_pause();
			continue;
		}

		for (i = 0; i < nb_rx; i++) {
			rte_prefetch0(ev[i + 1].mbuf);
			cq_id = ev[i].sub_event_type % nb_stages;

			if (cq_id == last_queue) {
				w->processed_pkts++;
				ev[i].queue_id = tx_queue[ev[i].mbuf->port];
				pipeline_fwd_event(&ev[i],
						RTE_SCHED_TYPE_ATOMIC);
			} else {
				ev[i].sub_event_type++;
				pipeline_fwd_event(&ev[i],
						sched_type_list[cq_id]);
			}
		}

		pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
	}

	return 0;
}

static int
worker_wrapper(void *arg)
{
	struct worker_data *w = arg;
	struct evt_options *opt = w->t->opt;
	const bool burst = evt_has_burst_mode(w->dev_id);
	const bool internal_port = w->t->internal_port;
	const uint8_t nb_stages = opt->nb_stages;
	RTE_SET_USED(opt);

	if (nb_stages == 1) {
		if (!burst && internal_port)
			return pipeline_atq_worker_single_stage_tx(arg);
		else if (!burst && !internal_port)
			return pipeline_atq_worker_single_stage_fwd(arg);
		else if (burst && internal_port)
			return pipeline_atq_worker_single_stage_burst_tx(arg);
		else if (burst && !internal_port)
			return pipeline_atq_worker_single_stage_burst_fwd(arg);
	} else {
		if (!burst && internal_port)
			return pipeline_atq_worker_multi_stage_tx(arg);
		else if (!burst && !internal_port)
			return pipeline_atq_worker_multi_stage_fwd(arg);
		else if (burst && internal_port)
			return pipeline_atq_worker_multi_stage_burst_tx(arg);
		else if (burst && !internal_port)
			return pipeline_atq_worker_multi_stage_burst_fwd(arg);
	}

	rte_panic("invalid worker\n");
}

static int
pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
{
	return pipeline_launch_lcores(test, opt, worker_wrapper);
}

static int
pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
{
	int ret;
	int nb_ports;
	int nb_queues;
	uint8_t queue;
	uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
	uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
	uint8_t nb_worker_queues = 0;
	uint8_t tx_evport_id = 0;
	uint16_t prod = 0;
	struct rte_event_dev_info info;
	struct test_pipeline *t = evt_test_priv(test);

	nb_ports = evt_nr_active_lcores(opt->wlcores);
	nb_queues = rte_eth_dev_count_avail();

	memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
	memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
	/* One queue for Tx adapter per port */
	if (!t->internal_port) {
		RTE_ETH_FOREACH_DEV(prod) {
			tx_evqueue_id[prod] = nb_queues;
			nb_queues++;
		}
	}

	rte_event_dev_info_get(opt->dev_id, &info);

	const struct rte_event_dev_config config = {
			.nb_event_queues = nb_queues,
			.nb_event_ports = nb_ports,
			.nb_events_limit = info.max_num_events,
			.nb_event_queue_flows = opt->nb_flows,
			.nb_event_port_dequeue_depth =
				info.max_event_port_dequeue_depth,
			.nb_event_port_enqueue_depth =
				info.max_event_port_enqueue_depth,
	};
	ret = rte_event_dev_configure(opt->dev_id, &config);
	if (ret) {
		evt_err("failed to configure eventdev %d", opt->dev_id);
		return ret;
	}

	struct rte_event_queue_conf q_conf = {
			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
			.nb_atomic_flows = opt->nb_flows,
			.nb_atomic_order_sequences = opt->nb_flows,
	};
	/* queue configurations */
	for (queue = 0; queue < nb_queues; queue++) {
		q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;

		if (!t->internal_port) {
			RTE_ETH_FOREACH_DEV(prod) {
				if (queue == tx_evqueue_id[prod]) {
					q_conf.event_queue_cfg =
						RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
				} else {
					queue_arr[nb_worker_queues] = queue;
					nb_worker_queues++;
				}
			}
		}

		ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
		if (ret) {
			evt_err("failed to setup queue=%d", queue);
			return ret;
		}
	}

	if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
		opt->wkr_deq_dep = info.max_event_port_dequeue_depth;

	/* port configuration */
	const struct rte_event_port_conf p_conf = {
			.dequeue_depth = opt->wkr_deq_dep,
			.enqueue_depth = info.max_event_port_dequeue_depth,
			.new_event_threshold = info.max_num_events,
	};

	if (!t->internal_port)
		ret = pipeline_event_port_setup(test, opt, queue_arr,
				nb_worker_queues, p_conf);
	else
		ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
				p_conf);

	if (ret)
		return ret;

	/*
	 * The pipelines are setup in the following manner:
	 *
	 * eth_dev_count = 2, nb_stages = 2, atq mode
	 *
	 * eth0, eth1 have Internal port capability:
	 *	queues = 2
	 *	stride = 1
	 *
	 *	event queue pipelines:
	 *	eth0 -> q0 -> Tx
	 *	eth1 -> q1 -> Tx
	 *
	 *	q0, q1 are configured as ATQ, so all the different stages can
	 *	be enqueued on the same queue.
	 *
	 * eth0, eth1 use the Tx adapter service core:
	 *	queues = 4
	 *	stride = 1
	 *
	 *	event queue pipelines:
	 *	eth0 -> q0 -> q2 -> Tx
	 *	eth1 -> q1 -> q3 -> Tx
	 *
	 *	q0, q1 are configured as stated above.
	 *	q2, q3 are configured as SINGLE_LINK.
	 */
	ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
	if (ret)
		return ret;
	ret = pipeline_event_tx_adapter_setup(opt, p_conf);
	if (ret)
		return ret;

	if (!evt_has_distributed_sched(opt->dev_id)) {
		uint32_t service_id;
		rte_event_dev_service_id_get(opt->dev_id, &service_id);
		ret = evt_service_setup(service_id);
		if (ret) {
			evt_err("No service lcore found to run event dev.");
			return ret;
		}
	}

	/* Connect the tx_evqueue_id to the Tx adapter port */
	if (!t->internal_port) {
		RTE_ETH_FOREACH_DEV(prod) {
			ret = rte_event_eth_tx_adapter_event_port_get(prod,
					&tx_evport_id);
			if (ret) {
				evt_err("Unable to get Tx adapter[%d]", prod);
				return ret;
			}

			if (rte_event_port_link(opt->dev_id, tx_evport_id,
						&tx_evqueue_id[prod],
						NULL, 1) != 1) {
				evt_err("Unable to link Tx adptr[%d] evprt[%d]",
						prod, tx_evport_id);
				return -EINVAL;
			}
		}
	}

	RTE_ETH_FOREACH_DEV(prod) {
		ret = rte_eth_dev_start(prod);
		if (ret) {
			evt_err("Ethernet dev [%d] failed to start", prod);
			return ret;
		}
	}

	ret = rte_event_dev_start(opt->dev_id);
	if (ret) {
		evt_err("failed to start eventdev %d", opt->dev_id);
		return ret;
	}

	RTE_ETH_FOREACH_DEV(prod) {
		ret = rte_event_eth_rx_adapter_start(prod);
		if (ret) {
			evt_err("Rx adapter[%d] start failed", prod);
			return ret;
		}

		ret = rte_event_eth_tx_adapter_start(prod);
		if (ret) {
			evt_err("Tx adapter[%d] start failed", prod);
			return ret;
		}
	}

	memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
			RTE_MAX_ETHPORTS);

	return 0;
}

static void
pipeline_atq_opt_dump(struct evt_options *opt)
{
	pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
}

static int
pipeline_atq_opt_check(struct evt_options *opt)
{
	return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
}

static bool
pipeline_atq_capability_check(struct evt_options *opt)
{
	struct rte_event_dev_info dev_info;

	rte_event_dev_info_get(opt->dev_id, &dev_info);
	if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
			dev_info.max_event_ports <
			evt_nr_active_lcores(opt->wlcores)) {
		evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
			pipeline_atq_nb_event_queues(opt),
			dev_info.max_event_queues,
			evt_nr_active_lcores(opt->wlcores),
			dev_info.max_event_ports);
		return false;
	}

	return true;
}

static const struct evt_test_ops pipeline_atq = {
	.cap_check = pipeline_atq_capability_check,
	.opt_check = pipeline_atq_opt_check,
	.opt_dump = pipeline_atq_opt_dump,
	.test_setup = pipeline_test_setup,
	.mempool_setup = pipeline_mempool_setup,
	.ethdev_setup = pipeline_ethdev_setup,
	.eventdev_setup = pipeline_atq_eventdev_setup,
	.launch_lcores = pipeline_atq_launch_lcores,
	.eventdev_destroy = pipeline_eventdev_destroy,
	.mempool_destroy = pipeline_mempool_destroy,
	.ethdev_destroy = pipeline_ethdev_destroy,
	.test_result = pipeline_test_result,
	.test_destroy = pipeline_test_destroy,
};

EVT_TEST_REGISTER(pipeline_atq);
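
/*
 * Illustrative usage: the pipeline_atq test registered above is selected at
 * runtime through the dpdk-test-eventdev command line described in the guide
 * referenced at the top of this file. A minimal sketch of an invocation,
 * assuming a software eventdev vdev, one worker lcore and an atomic stage
 * list; exact option names, core masks and devices depend on the DPDK
 * version and the target platform:
 *
 *   sudo ./dpdk-test-eventdev -l 0-3 -s 0x8 --vdev=event_sw0 -- \
 *       --test=pipeline_atq --wlcores=1 --stlist=a --prod_type_ethdev
 */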