1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright 2017 Cavium, Inc. 4 */ 5 6 #include "test_pipeline_common.h" 7 8 /* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */ 9 10 static __rte_always_inline int 11 pipeline_atq_nb_event_queues(struct evt_options *opt) 12 { 13 RTE_SET_USED(opt); 14 15 return rte_eth_dev_count_avail(); 16 } 17 18 static __rte_noinline int 19 pipeline_atq_worker_single_stage_tx(void *arg) 20 { 21 PIPELINE_WORKER_SINGLE_STAGE_INIT; 22 23 while (t->done == false) { 24 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 25 26 if (!event) { 27 rte_pause(); 28 continue; 29 } 30 31 pipeline_event_tx(dev, port, &ev); 32 w->processed_pkts++; 33 } 34 35 return 0; 36 } 37 38 static __rte_noinline int 39 pipeline_atq_worker_single_stage_fwd(void *arg) 40 { 41 PIPELINE_WORKER_SINGLE_STAGE_INIT; 42 const uint8_t *tx_queue = t->tx_evqueue_id; 43 44 while (t->done == false) { 45 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 46 47 if (!event) { 48 rte_pause(); 49 continue; 50 } 51 52 ev.queue_id = tx_queue[ev.mbuf->port]; 53 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 54 pipeline_event_enqueue(dev, port, &ev); 55 w->processed_pkts++; 56 } 57 58 return 0; 59 } 60 61 static __rte_noinline int 62 pipeline_atq_worker_single_stage_burst_tx(void *arg) 63 { 64 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; 65 66 while (t->done == false) { 67 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 68 BURST_SIZE, 0); 69 70 if (!nb_rx) { 71 rte_pause(); 72 continue; 73 } 74 75 for (i = 0; i < nb_rx; i++) { 76 rte_prefetch0(ev[i + 1].mbuf); 77 rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0); 78 } 79 80 pipeline_event_tx_burst(dev, port, ev, nb_rx); 81 w->processed_pkts += nb_rx; 82 } 83 84 return 0; 85 } 86 87 static __rte_noinline int 88 pipeline_atq_worker_single_stage_burst_fwd(void *arg) 89 { 90 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT; 91 const uint8_t *tx_queue = t->tx_evqueue_id; 92 93 while (t->done == false) { 94 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 95 BURST_SIZE, 0); 96 97 if (!nb_rx) { 98 rte_pause(); 99 continue; 100 } 101 102 for (i = 0; i < nb_rx; i++) { 103 rte_prefetch0(ev[i + 1].mbuf); 104 rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0); 105 ev[i].queue_id = tx_queue[ev[i].mbuf->port]; 106 pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 107 } 108 109 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 110 w->processed_pkts += nb_rx; 111 } 112 113 return 0; 114 } 115 116 static __rte_noinline int 117 pipeline_atq_worker_multi_stage_tx(void *arg) 118 { 119 PIPELINE_WORKER_MULTI_STAGE_INIT; 120 121 while (t->done == false) { 122 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 123 124 if (!event) { 125 rte_pause(); 126 continue; 127 } 128 129 cq_id = ev.sub_event_type % nb_stages; 130 131 if (cq_id == last_queue) { 132 pipeline_event_tx(dev, port, &ev); 133 w->processed_pkts++; 134 continue; 135 } 136 137 ev.sub_event_type++; 138 pipeline_fwd_event(&ev, sched_type_list[cq_id]); 139 pipeline_event_enqueue(dev, port, &ev); 140 } 141 142 return 0; 143 } 144 145 static __rte_noinline int 146 pipeline_atq_worker_multi_stage_fwd(void *arg) 147 { 148 PIPELINE_WORKER_MULTI_STAGE_INIT; 149 const uint8_t *tx_queue = t->tx_evqueue_id; 150 151 while (t->done == false) { 152 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 153 154 if (!event) { 155 rte_pause(); 156 continue; 157 } 158 159 cq_id = ev.sub_event_type % nb_stages; 160 161 if (cq_id == last_queue) { 162 ev.queue_id = tx_queue[ev.mbuf->port]; 163 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 164 w->processed_pkts++; 165 } else { 166 ev.sub_event_type++; 167 pipeline_fwd_event(&ev, sched_type_list[cq_id]); 168 } 169 170 pipeline_event_enqueue(dev, port, &ev); 171 } 172 173 return 0; 174 } 175 176 static __rte_noinline int 177 pipeline_atq_worker_multi_stage_burst_tx(void *arg) 178 { 179 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; 180 181 while (t->done == false) { 182 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 183 BURST_SIZE, 0); 184 185 if (!nb_rx) { 186 rte_pause(); 187 continue; 188 } 189 190 for (i = 0; i < nb_rx; i++) { 191 rte_prefetch0(ev[i + 1].mbuf); 192 cq_id = ev[i].sub_event_type % nb_stages; 193 194 if (cq_id == last_queue) { 195 pipeline_event_tx(dev, port, &ev[i]); 196 ev[i].op = RTE_EVENT_OP_RELEASE; 197 w->processed_pkts++; 198 continue; 199 } 200 201 ev[i].sub_event_type++; 202 pipeline_fwd_event(&ev[i], sched_type_list[cq_id]); 203 } 204 205 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 206 } 207 208 return 0; 209 } 210 211 static __rte_noinline int 212 pipeline_atq_worker_multi_stage_burst_fwd(void *arg) 213 { 214 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT; 215 const uint8_t *tx_queue = t->tx_evqueue_id; 216 217 while (t->done == false) { 218 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 219 BURST_SIZE, 0); 220 221 if (!nb_rx) { 222 rte_pause(); 223 continue; 224 } 225 226 for (i = 0; i < nb_rx; i++) { 227 rte_prefetch0(ev[i + 1].mbuf); 228 cq_id = ev[i].sub_event_type % nb_stages; 229 230 if (cq_id == last_queue) { 231 w->processed_pkts++; 232 ev[i].queue_id = tx_queue[ev[i].mbuf->port]; 233 pipeline_fwd_event(&ev[i], 234 RTE_SCHED_TYPE_ATOMIC); 235 } else { 236 ev[i].sub_event_type++; 237 pipeline_fwd_event(&ev[i], 238 sched_type_list[cq_id]); 239 } 240 } 241 242 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 243 } 244 245 return 0; 246 } 247 248 static int 249 worker_wrapper(void *arg) 250 { 251 struct worker_data *w = arg; 252 struct evt_options *opt = w->t->opt; 253 const bool burst = evt_has_burst_mode(w->dev_id); 254 const bool internal_port = w->t->internal_port; 255 const uint8_t nb_stages = opt->nb_stages; 256 RTE_SET_USED(opt); 257 258 if (nb_stages == 1) { 259 if (!burst && internal_port) 260 return pipeline_atq_worker_single_stage_tx(arg); 261 else if (!burst && !internal_port) 262 return pipeline_atq_worker_single_stage_fwd(arg); 263 else if (burst && internal_port) 264 return pipeline_atq_worker_single_stage_burst_tx(arg); 265 else if (burst && !internal_port) 266 return pipeline_atq_worker_single_stage_burst_fwd(arg); 267 } else { 268 if (!burst && internal_port) 269 return pipeline_atq_worker_multi_stage_tx(arg); 270 else if (!burst && !internal_port) 271 return pipeline_atq_worker_multi_stage_fwd(arg); 272 if (burst && internal_port) 273 return pipeline_atq_worker_multi_stage_burst_tx(arg); 274 else if (burst && !internal_port) 275 return pipeline_atq_worker_multi_stage_burst_fwd(arg); 276 } 277 278 rte_panic("invalid worker\n"); 279 } 280 281 static int 282 pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt) 283 { 284 return pipeline_launch_lcores(test, opt, worker_wrapper); 285 } 286 287 static int 288 pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt) 289 { 290 int ret; 291 int nb_ports; 292 int nb_queues; 293 uint8_t queue; 294 uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS]; 295 uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV]; 296 uint8_t nb_worker_queues = 0; 297 uint8_t tx_evport_id = 0; 298 uint16_t prod = 0; 299 struct rte_event_dev_info info; 300 struct test_pipeline *t = evt_test_priv(test); 301 302 nb_ports = evt_nr_active_lcores(opt->wlcores); 303 nb_queues = rte_eth_dev_count_avail(); 304 305 memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS); 306 memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV); 307 /* One queue for Tx adapter per port */ 308 if (!t->internal_port) { 309 RTE_ETH_FOREACH_DEV(prod) { 310 tx_evqueue_id[prod] = nb_queues; 311 nb_queues++; 312 } 313 } 314 315 rte_event_dev_info_get(opt->dev_id, &info); 316 317 ret = evt_configure_eventdev(opt, nb_queues, nb_ports); 318 if (ret) { 319 evt_err("failed to configure eventdev %d", opt->dev_id); 320 return ret; 321 } 322 323 struct rte_event_queue_conf q_conf = { 324 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, 325 .nb_atomic_flows = opt->nb_flows, 326 .nb_atomic_order_sequences = opt->nb_flows, 327 }; 328 /* queue configurations */ 329 for (queue = 0; queue < nb_queues; queue++) { 330 q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES; 331 332 if (!t->internal_port) { 333 RTE_ETH_FOREACH_DEV(prod) { 334 if (queue == tx_evqueue_id[prod]) { 335 q_conf.event_queue_cfg = 336 RTE_EVENT_QUEUE_CFG_SINGLE_LINK; 337 } else { 338 queue_arr[nb_worker_queues] = queue; 339 nb_worker_queues++; 340 } 341 } 342 } 343 344 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf); 345 if (ret) { 346 evt_err("failed to setup queue=%d", queue); 347 return ret; 348 } 349 } 350 351 if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth) 352 opt->wkr_deq_dep = info.max_event_port_dequeue_depth; 353 354 /* port configuration */ 355 const struct rte_event_port_conf p_conf = { 356 .dequeue_depth = opt->wkr_deq_dep, 357 .enqueue_depth = info.max_event_port_dequeue_depth, 358 .new_event_threshold = info.max_num_events, 359 }; 360 361 if (!t->internal_port) 362 ret = pipeline_event_port_setup(test, opt, queue_arr, 363 nb_worker_queues, p_conf); 364 else 365 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues, 366 p_conf); 367 368 if (ret) 369 return ret; 370 371 /* 372 * The pipelines are setup in the following manner: 373 * 374 * eth_dev_count = 2, nb_stages = 2, atq mode 375 * 376 * eth0, eth1 have Internal port capability : 377 * queues = 2 378 * stride = 1 379 * 380 * event queue pipelines: 381 * eth0 -> q0 ->Tx 382 * eth1 -> q1 ->Tx 383 * 384 * q0, q1 are configured as ATQ so, all the different stages can 385 * be enqueued on the same queue. 386 * 387 * eth0, eth1 use Tx adapters service core : 388 * queues = 4 389 * stride = 1 390 * 391 * event queue pipelines: 392 * eth0 -> q0 -> q2 -> Tx 393 * eth1 -> q1 -> q3 -> Tx 394 * 395 * q0, q1 are configured as stated above. 396 * q2, q3 configured as SINGLE_LINK. 397 */ 398 ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf); 399 if (ret) 400 return ret; 401 ret = pipeline_event_tx_adapter_setup(opt, p_conf); 402 if (ret) 403 return ret; 404 405 if (!evt_has_distributed_sched(opt->dev_id)) { 406 uint32_t service_id; 407 rte_event_dev_service_id_get(opt->dev_id, &service_id); 408 ret = evt_service_setup(service_id); 409 if (ret) { 410 evt_err("No service lcore found to run event dev."); 411 return ret; 412 } 413 } 414 415 /* Connect the tx_evqueue_id to the Tx adapter port */ 416 if (!t->internal_port) { 417 RTE_ETH_FOREACH_DEV(prod) { 418 ret = rte_event_eth_tx_adapter_event_port_get(prod, 419 &tx_evport_id); 420 if (ret) { 421 evt_err("Unable to get Tx adapter[%d]", prod); 422 return ret; 423 } 424 425 if (rte_event_port_link(opt->dev_id, tx_evport_id, 426 &tx_evqueue_id[prod], 427 NULL, 1) != 1) { 428 evt_err("Unable to link Tx adptr[%d] evprt[%d]", 429 prod, tx_evport_id); 430 return ret; 431 } 432 } 433 } 434 435 ret = rte_event_dev_start(opt->dev_id); 436 if (ret) { 437 evt_err("failed to start eventdev %d", opt->dev_id); 438 return ret; 439 } 440 441 442 RTE_ETH_FOREACH_DEV(prod) { 443 ret = rte_eth_dev_start(prod); 444 if (ret) { 445 evt_err("Ethernet dev [%d] failed to start." 446 " Using synthetic producer", prod); 447 return ret; 448 } 449 } 450 451 RTE_ETH_FOREACH_DEV(prod) { 452 ret = rte_event_eth_rx_adapter_start(prod); 453 if (ret) { 454 evt_err("Rx adapter[%d] start failed", prod); 455 return ret; 456 } 457 458 ret = rte_event_eth_tx_adapter_start(prod); 459 if (ret) { 460 evt_err("Tx adapter[%d] start failed", prod); 461 return ret; 462 } 463 } 464 465 memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) * 466 RTE_MAX_ETHPORTS); 467 468 return 0; 469 } 470 471 static void 472 pipeline_atq_opt_dump(struct evt_options *opt) 473 { 474 pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt)); 475 } 476 477 static int 478 pipeline_atq_opt_check(struct evt_options *opt) 479 { 480 return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt)); 481 } 482 483 static bool 484 pipeline_atq_capability_check(struct evt_options *opt) 485 { 486 struct rte_event_dev_info dev_info; 487 488 rte_event_dev_info_get(opt->dev_id, &dev_info); 489 if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) || 490 dev_info.max_event_ports < 491 evt_nr_active_lcores(opt->wlcores)) { 492 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", 493 pipeline_atq_nb_event_queues(opt), 494 dev_info.max_event_queues, 495 evt_nr_active_lcores(opt->wlcores), 496 dev_info.max_event_ports); 497 } 498 499 return true; 500 } 501 502 static const struct evt_test_ops pipeline_atq = { 503 .cap_check = pipeline_atq_capability_check, 504 .opt_check = pipeline_atq_opt_check, 505 .opt_dump = pipeline_atq_opt_dump, 506 .test_setup = pipeline_test_setup, 507 .mempool_setup = pipeline_mempool_setup, 508 .ethdev_setup = pipeline_ethdev_setup, 509 .eventdev_setup = pipeline_atq_eventdev_setup, 510 .launch_lcores = pipeline_atq_launch_lcores, 511 .eventdev_destroy = pipeline_eventdev_destroy, 512 .mempool_destroy = pipeline_mempool_destroy, 513 .ethdev_destroy = pipeline_ethdev_destroy, 514 .test_result = pipeline_test_result, 515 .test_destroy = pipeline_test_destroy, 516 }; 517 518 EVT_TEST_REGISTER(pipeline_atq); 519