1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright 2017 Cavium, Inc. 4 */ 5 6 #include "test_pipeline_common.h" 7 8 /* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */ 9 10 static __rte_always_inline int 11 pipeline_queue_nb_event_queues(struct evt_options *opt) 12 { 13 uint16_t eth_count = rte_eth_dev_count_avail(); 14 15 return (eth_count * opt->nb_stages) + eth_count; 16 } 17 18 static int 19 pipeline_queue_worker_single_stage_tx(void *arg) 20 { 21 PIPELINE_WROKER_SINGLE_STAGE_INIT; 22 23 while (t->done == false) { 24 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 25 26 if (!event) { 27 rte_pause(); 28 continue; 29 } 30 31 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 32 pipeline_tx_pkt(ev.mbuf); 33 w->processed_pkts++; 34 } else { 35 ev.queue_id++; 36 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 37 pipeline_event_enqueue(dev, port, &ev); 38 } 39 } 40 41 return 0; 42 } 43 44 static int 45 pipeline_queue_worker_single_stage_fwd(void *arg) 46 { 47 PIPELINE_WROKER_SINGLE_STAGE_INIT; 48 const uint8_t tx_queue = t->tx_service.queue_id; 49 50 while (t->done == false) { 51 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 52 53 if (!event) { 54 rte_pause(); 55 continue; 56 } 57 58 ev.queue_id = tx_queue; 59 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 60 pipeline_event_enqueue(dev, port, &ev); 61 w->processed_pkts++; 62 } 63 64 return 0; 65 } 66 67 static int 68 pipeline_queue_worker_single_stage_burst_tx(void *arg) 69 { 70 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT; 71 72 while (t->done == false) { 73 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 74 BURST_SIZE, 0); 75 76 if (!nb_rx) { 77 rte_pause(); 78 continue; 79 } 80 81 for (i = 0; i < nb_rx; i++) { 82 rte_prefetch0(ev[i + 1].mbuf); 83 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 84 85 pipeline_tx_pkt(ev[i].mbuf); 86 ev[i].op = RTE_EVENT_OP_RELEASE; 87 w->processed_pkts++; 88 } else { 89 ev[i].queue_id++; 90 pipeline_fwd_event(&ev[i], 91 RTE_SCHED_TYPE_ATOMIC); 92 } 93 } 94 95 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 96 } 97 98 return 0; 99 } 100 101 static int 102 pipeline_queue_worker_single_stage_burst_fwd(void *arg) 103 { 104 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT; 105 const uint8_t tx_queue = t->tx_service.queue_id; 106 107 while (t->done == false) { 108 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 109 BURST_SIZE, 0); 110 111 if (!nb_rx) { 112 rte_pause(); 113 continue; 114 } 115 116 for (i = 0; i < nb_rx; i++) { 117 rte_prefetch0(ev[i + 1].mbuf); 118 ev[i].queue_id = tx_queue; 119 pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 120 w->processed_pkts++; 121 } 122 123 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 124 } 125 126 return 0; 127 } 128 129 130 static int 131 pipeline_queue_worker_multi_stage_tx(void *arg) 132 { 133 PIPELINE_WROKER_MULTI_STAGE_INIT; 134 const uint8_t nb_stages = t->opt->nb_stages + 1; 135 136 while (t->done == false) { 137 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 138 139 if (!event) { 140 rte_pause(); 141 continue; 142 } 143 144 cq_id = ev.queue_id % nb_stages; 145 146 if (cq_id >= last_queue) { 147 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 148 149 pipeline_tx_pkt(ev.mbuf); 150 w->processed_pkts++; 151 continue; 152 } 153 ev.queue_id += (cq_id == last_queue) ? 1 : 0; 154 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 155 } else { 156 ev.queue_id++; 157 pipeline_fwd_event(&ev, sched_type_list[cq_id]); 158 } 159 160 pipeline_event_enqueue(dev, port, &ev); 161 } 162 return 0; 163 } 164 165 static int 166 pipeline_queue_worker_multi_stage_fwd(void *arg) 167 { 168 PIPELINE_WROKER_MULTI_STAGE_INIT; 169 const uint8_t nb_stages = t->opt->nb_stages + 1; 170 const uint8_t tx_queue = t->tx_service.queue_id; 171 172 while (t->done == false) { 173 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 174 175 if (!event) { 176 rte_pause(); 177 continue; 178 } 179 180 cq_id = ev.queue_id % nb_stages; 181 182 if (cq_id == last_queue) { 183 ev.queue_id = tx_queue; 184 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 185 w->processed_pkts++; 186 } else { 187 ev.queue_id++; 188 pipeline_fwd_event(&ev, sched_type_list[cq_id]); 189 } 190 191 pipeline_event_enqueue(dev, port, &ev); 192 } 193 return 0; 194 } 195 196 static int 197 pipeline_queue_worker_multi_stage_burst_tx(void *arg) 198 { 199 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT; 200 const uint8_t nb_stages = t->opt->nb_stages + 1; 201 202 while (t->done == false) { 203 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 204 BURST_SIZE, 0); 205 206 if (!nb_rx) { 207 rte_pause(); 208 continue; 209 } 210 211 for (i = 0; i < nb_rx; i++) { 212 rte_prefetch0(ev[i + 1].mbuf); 213 cq_id = ev[i].queue_id % nb_stages; 214 215 if (cq_id >= last_queue) { 216 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 217 218 pipeline_tx_pkt(ev[i].mbuf); 219 ev[i].op = RTE_EVENT_OP_RELEASE; 220 w->processed_pkts++; 221 continue; 222 } 223 224 ev[i].queue_id += (cq_id == last_queue) ? 1 : 0; 225 pipeline_fwd_event(&ev[i], 226 RTE_SCHED_TYPE_ATOMIC); 227 } else { 228 ev[i].queue_id++; 229 pipeline_fwd_event(&ev[i], 230 sched_type_list[cq_id]); 231 } 232 233 } 234 235 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 236 } 237 return 0; 238 } 239 240 static int 241 pipeline_queue_worker_multi_stage_burst_fwd(void *arg) 242 { 243 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT; 244 const uint8_t nb_stages = t->opt->nb_stages + 1; 245 const uint8_t tx_queue = t->tx_service.queue_id; 246 247 while (t->done == false) { 248 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 249 BURST_SIZE, 0); 250 251 if (!nb_rx) { 252 rte_pause(); 253 continue; 254 } 255 256 for (i = 0; i < nb_rx; i++) { 257 rte_prefetch0(ev[i + 1].mbuf); 258 cq_id = ev[i].queue_id % nb_stages; 259 260 if (cq_id == last_queue) { 261 ev[i].queue_id = tx_queue; 262 pipeline_fwd_event(&ev[i], 263 RTE_SCHED_TYPE_ATOMIC); 264 w->processed_pkts++; 265 } else { 266 ev[i].queue_id++; 267 pipeline_fwd_event(&ev[i], 268 sched_type_list[cq_id]); 269 } 270 } 271 272 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 273 } 274 return 0; 275 } 276 277 static int 278 worker_wrapper(void *arg) 279 { 280 struct worker_data *w = arg; 281 struct evt_options *opt = w->t->opt; 282 const bool burst = evt_has_burst_mode(w->dev_id); 283 const bool mt_safe = !w->t->mt_unsafe; 284 const uint8_t nb_stages = opt->nb_stages; 285 RTE_SET_USED(opt); 286 287 if (nb_stages == 1) { 288 if (!burst && mt_safe) 289 return pipeline_queue_worker_single_stage_tx(arg); 290 else if (!burst && !mt_safe) 291 return pipeline_queue_worker_single_stage_fwd(arg); 292 else if (burst && mt_safe) 293 return pipeline_queue_worker_single_stage_burst_tx(arg); 294 else if (burst && !mt_safe) 295 return pipeline_queue_worker_single_stage_burst_fwd( 296 arg); 297 } else { 298 if (!burst && mt_safe) 299 return pipeline_queue_worker_multi_stage_tx(arg); 300 else if (!burst && !mt_safe) 301 return pipeline_queue_worker_multi_stage_fwd(arg); 302 else if (burst && mt_safe) 303 return pipeline_queue_worker_multi_stage_burst_tx(arg); 304 else if (burst && !mt_safe) 305 return pipeline_queue_worker_multi_stage_burst_fwd(arg); 306 307 } 308 rte_panic("invalid worker\n"); 309 } 310 311 static int 312 pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt) 313 { 314 struct test_pipeline *t = evt_test_priv(test); 315 316 if (t->mt_unsafe) 317 rte_service_component_runstate_set(t->tx_service.service_id, 1); 318 return pipeline_launch_lcores(test, opt, worker_wrapper); 319 } 320 321 static int 322 pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt) 323 { 324 int ret; 325 int nb_ports; 326 int nb_queues; 327 int nb_stages = opt->nb_stages; 328 uint8_t queue; 329 struct rte_event_dev_info info; 330 struct test_pipeline *t = evt_test_priv(test); 331 uint8_t tx_evqueue_id = 0; 332 uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV]; 333 uint8_t nb_worker_queues = 0; 334 335 nb_ports = evt_nr_active_lcores(opt->wlcores); 336 nb_queues = rte_eth_dev_count_avail() * (nb_stages); 337 338 /* Extra port for Tx service. */ 339 if (t->mt_unsafe) { 340 tx_evqueue_id = nb_queues; 341 nb_ports++; 342 nb_queues++; 343 } else 344 nb_queues += rte_eth_dev_count_avail(); 345 346 rte_event_dev_info_get(opt->dev_id, &info); 347 348 const struct rte_event_dev_config config = { 349 .nb_event_queues = nb_queues, 350 .nb_event_ports = nb_ports, 351 .nb_events_limit = info.max_num_events, 352 .nb_event_queue_flows = opt->nb_flows, 353 .nb_event_port_dequeue_depth = 354 info.max_event_port_dequeue_depth, 355 .nb_event_port_enqueue_depth = 356 info.max_event_port_enqueue_depth, 357 }; 358 ret = rte_event_dev_configure(opt->dev_id, &config); 359 if (ret) { 360 evt_err("failed to configure eventdev %d", opt->dev_id); 361 return ret; 362 } 363 364 struct rte_event_queue_conf q_conf = { 365 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, 366 .nb_atomic_flows = opt->nb_flows, 367 .nb_atomic_order_sequences = opt->nb_flows, 368 }; 369 /* queue configurations */ 370 for (queue = 0; queue < nb_queues; queue++) { 371 uint8_t slot; 372 373 if (!t->mt_unsafe) { 374 slot = queue % (nb_stages + 1); 375 q_conf.schedule_type = slot == nb_stages ? 376 RTE_SCHED_TYPE_ATOMIC : 377 opt->sched_type_list[slot]; 378 } else { 379 slot = queue % nb_stages; 380 381 if (queue == tx_evqueue_id) { 382 q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC; 383 q_conf.event_queue_cfg = 384 RTE_EVENT_QUEUE_CFG_SINGLE_LINK; 385 } else { 386 q_conf.schedule_type = 387 opt->sched_type_list[slot]; 388 queue_arr[nb_worker_queues] = queue; 389 nb_worker_queues++; 390 } 391 } 392 393 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf); 394 if (ret) { 395 evt_err("failed to setup queue=%d", queue); 396 return ret; 397 } 398 } 399 400 if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth) 401 opt->wkr_deq_dep = info.max_event_port_dequeue_depth; 402 403 /* port configuration */ 404 const struct rte_event_port_conf p_conf = { 405 .dequeue_depth = opt->wkr_deq_dep, 406 .enqueue_depth = info.max_event_port_dequeue_depth, 407 .new_event_threshold = info.max_num_events, 408 }; 409 410 /* 411 * If tx is multi thread safe then allow workers to do Tx else use Tx 412 * service to Tx packets. 413 */ 414 if (t->mt_unsafe) { 415 ret = pipeline_event_port_setup(test, opt, queue_arr, 416 nb_worker_queues, p_conf); 417 if (ret) 418 return ret; 419 420 ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id, 421 nb_ports - 1, p_conf); 422 423 } else 424 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues, 425 p_conf); 426 427 if (ret) 428 return ret; 429 /* 430 * The pipelines are setup in the following manner: 431 * 432 * eth_dev_count = 2, nb_stages = 2. 433 * 434 * Multi thread safe : 435 * queues = 6 436 * stride = 3 437 * 438 * event queue pipelines: 439 * eth0 -> q0 -> q1 -> (q2->tx) 440 * eth1 -> q3 -> q4 -> (q5->tx) 441 * 442 * q2, q5 configured as ATOMIC 443 * 444 * Multi thread unsafe : 445 * queues = 5 446 * stride = 2 447 * 448 * event queue pipelines: 449 * eth0 -> q0 -> q1 450 * } (q4->tx) Tx service 451 * eth1 -> q2 -> q3 452 * 453 * q4 configured as SINGLE_LINK|ATOMIC 454 */ 455 ret = pipeline_event_rx_adapter_setup(opt, 456 t->mt_unsafe ? nb_stages : nb_stages + 1, p_conf); 457 if (ret) 458 return ret; 459 460 if (!evt_has_distributed_sched(opt->dev_id)) { 461 uint32_t service_id; 462 rte_event_dev_service_id_get(opt->dev_id, &service_id); 463 ret = evt_service_setup(service_id); 464 if (ret) { 465 evt_err("No service lcore found to run event dev."); 466 return ret; 467 } 468 } 469 470 ret = rte_event_dev_start(opt->dev_id); 471 if (ret) { 472 evt_err("failed to start eventdev %d", opt->dev_id); 473 return ret; 474 } 475 476 return 0; 477 } 478 479 static void 480 pipeline_queue_opt_dump(struct evt_options *opt) 481 { 482 pipeline_opt_dump(opt, pipeline_queue_nb_event_queues(opt)); 483 } 484 485 static int 486 pipeline_queue_opt_check(struct evt_options *opt) 487 { 488 return pipeline_opt_check(opt, pipeline_queue_nb_event_queues(opt)); 489 } 490 491 static bool 492 pipeline_queue_capability_check(struct evt_options *opt) 493 { 494 struct rte_event_dev_info dev_info; 495 496 rte_event_dev_info_get(opt->dev_id, &dev_info); 497 if (dev_info.max_event_queues < pipeline_queue_nb_event_queues(opt) || 498 dev_info.max_event_ports < 499 evt_nr_active_lcores(opt->wlcores)) { 500 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", 501 pipeline_queue_nb_event_queues(opt), 502 dev_info.max_event_queues, 503 evt_nr_active_lcores(opt->wlcores), 504 dev_info.max_event_ports); 505 } 506 507 return true; 508 } 509 510 static const struct evt_test_ops pipeline_queue = { 511 .cap_check = pipeline_queue_capability_check, 512 .opt_check = pipeline_queue_opt_check, 513 .opt_dump = pipeline_queue_opt_dump, 514 .test_setup = pipeline_test_setup, 515 .mempool_setup = pipeline_mempool_setup, 516 .ethdev_setup = pipeline_ethdev_setup, 517 .eventdev_setup = pipeline_queue_eventdev_setup, 518 .launch_lcores = pipeline_queue_launch_lcores, 519 .eventdev_destroy = pipeline_eventdev_destroy, 520 .mempool_destroy = pipeline_mempool_destroy, 521 .ethdev_destroy = pipeline_ethdev_destroy, 522 .test_result = pipeline_test_result, 523 .test_destroy = pipeline_test_destroy, 524 }; 525 526 EVT_TEST_REGISTER(pipeline_queue); 527