1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright 2017 Cavium, Inc. 4 */ 5 6 #include "test_pipeline_common.h" 7 8 /* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */ 9 10 static __rte_always_inline int 11 pipeline_atq_nb_event_queues(struct evt_options *opt) 12 { 13 RTE_SET_USED(opt); 14 15 return rte_eth_dev_count(); 16 } 17 18 static int 19 pipeline_atq_worker_single_stage_tx(void *arg) 20 { 21 PIPELINE_WROKER_SINGLE_STAGE_INIT; 22 23 while (t->done == false) { 24 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 25 26 if (!event) { 27 rte_pause(); 28 continue; 29 } 30 31 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 32 pipeline_tx_pkt(ev.mbuf); 33 w->processed_pkts++; 34 continue; 35 } 36 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 37 pipeline_event_enqueue(dev, port, &ev); 38 } 39 40 return 0; 41 } 42 43 static int 44 pipeline_atq_worker_single_stage_fwd(void *arg) 45 { 46 PIPELINE_WROKER_SINGLE_STAGE_INIT; 47 const uint8_t tx_queue = t->tx_service.queue_id; 48 49 while (t->done == false) { 50 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 51 52 if (!event) { 53 rte_pause(); 54 continue; 55 } 56 57 w->processed_pkts++; 58 ev.queue_id = tx_queue; 59 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 60 pipeline_event_enqueue(dev, port, &ev); 61 } 62 63 return 0; 64 } 65 66 static int 67 pipeline_atq_worker_single_stage_burst_tx(void *arg) 68 { 69 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT; 70 71 while (t->done == false) { 72 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 73 BURST_SIZE, 0); 74 75 if (!nb_rx) { 76 rte_pause(); 77 continue; 78 } 79 80 for (i = 0; i < nb_rx; i++) { 81 rte_prefetch0(ev[i + 1].mbuf); 82 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 83 84 pipeline_tx_pkt(ev[i].mbuf); 85 ev[i].op = RTE_EVENT_OP_RELEASE; 86 w->processed_pkts++; 87 } else 88 pipeline_fwd_event(&ev[i], 89 RTE_SCHED_TYPE_ATOMIC); 90 } 91 92 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 93 } 94 95 return 0; 96 } 97 98 static int 99 pipeline_atq_worker_single_stage_burst_fwd(void *arg) 100 { 101 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT; 102 const uint8_t tx_queue = t->tx_service.queue_id; 103 104 while (t->done == false) { 105 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 106 BURST_SIZE, 0); 107 108 if (!nb_rx) { 109 rte_pause(); 110 continue; 111 } 112 113 for (i = 0; i < nb_rx; i++) { 114 rte_prefetch0(ev[i + 1].mbuf); 115 ev[i].queue_id = tx_queue; 116 pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); 117 w->processed_pkts++; 118 } 119 120 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 121 } 122 123 return 0; 124 } 125 126 static int 127 pipeline_atq_worker_multi_stage_tx(void *arg) 128 { 129 PIPELINE_WROKER_MULTI_STAGE_INIT; 130 const uint8_t nb_stages = t->opt->nb_stages; 131 132 133 while (t->done == false) { 134 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 135 136 if (!event) { 137 rte_pause(); 138 continue; 139 } 140 141 cq_id = ev.sub_event_type % nb_stages; 142 143 if (cq_id == last_queue) { 144 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { 145 146 pipeline_tx_pkt(ev.mbuf); 147 w->processed_pkts++; 148 continue; 149 } 150 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 151 } else { 152 ev.sub_event_type++; 153 pipeline_fwd_event(&ev, sched_type_list[cq_id]); 154 } 155 156 pipeline_event_enqueue(dev, port, &ev); 157 } 158 return 0; 159 } 160 161 static int 162 pipeline_atq_worker_multi_stage_fwd(void *arg) 163 { 164 PIPELINE_WROKER_MULTI_STAGE_INIT; 165 const uint8_t nb_stages = t->opt->nb_stages; 166 const uint8_t tx_queue = t->tx_service.queue_id; 167 168 while (t->done == false) { 169 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 170 171 if (!event) { 172 rte_pause(); 173 continue; 174 } 175 176 cq_id = ev.sub_event_type % nb_stages; 177 178 if (cq_id == last_queue) { 179 w->processed_pkts++; 180 ev.queue_id = tx_queue; 181 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); 182 } else { 183 ev.sub_event_type++; 184 pipeline_fwd_event(&ev, sched_type_list[cq_id]); 185 } 186 187 pipeline_event_enqueue(dev, port, &ev); 188 } 189 return 0; 190 } 191 192 static int 193 pipeline_atq_worker_multi_stage_burst_tx(void *arg) 194 { 195 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT; 196 const uint8_t nb_stages = t->opt->nb_stages; 197 198 while (t->done == false) { 199 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 200 BURST_SIZE, 0); 201 202 if (!nb_rx) { 203 rte_pause(); 204 continue; 205 } 206 207 for (i = 0; i < nb_rx; i++) { 208 rte_prefetch0(ev[i + 1].mbuf); 209 cq_id = ev[i].sub_event_type % nb_stages; 210 211 if (cq_id == last_queue) { 212 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { 213 214 pipeline_tx_pkt(ev[i].mbuf); 215 ev[i].op = RTE_EVENT_OP_RELEASE; 216 w->processed_pkts++; 217 continue; 218 } 219 220 pipeline_fwd_event(&ev[i], 221 RTE_SCHED_TYPE_ATOMIC); 222 } else { 223 ev[i].sub_event_type++; 224 pipeline_fwd_event(&ev[i], 225 sched_type_list[cq_id]); 226 } 227 } 228 229 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 230 } 231 return 0; 232 } 233 234 static int 235 pipeline_atq_worker_multi_stage_burst_fwd(void *arg) 236 { 237 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT; 238 const uint8_t nb_stages = t->opt->nb_stages; 239 const uint8_t tx_queue = t->tx_service.queue_id; 240 241 while (t->done == false) { 242 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, 243 BURST_SIZE, 0); 244 245 if (!nb_rx) { 246 rte_pause(); 247 continue; 248 } 249 250 for (i = 0; i < nb_rx; i++) { 251 rte_prefetch0(ev[i + 1].mbuf); 252 cq_id = ev[i].sub_event_type % nb_stages; 253 254 if (cq_id == last_queue) { 255 w->processed_pkts++; 256 ev[i].queue_id = tx_queue; 257 pipeline_fwd_event(&ev[i], 258 RTE_SCHED_TYPE_ATOMIC); 259 } else { 260 ev[i].sub_event_type++; 261 pipeline_fwd_event(&ev[i], 262 sched_type_list[cq_id]); 263 } 264 } 265 266 pipeline_event_enqueue_burst(dev, port, ev, nb_rx); 267 } 268 return 0; 269 } 270 271 static int 272 worker_wrapper(void *arg) 273 { 274 struct worker_data *w = arg; 275 struct evt_options *opt = w->t->opt; 276 const bool burst = evt_has_burst_mode(w->dev_id); 277 const bool mt_safe = !w->t->mt_unsafe; 278 const uint8_t nb_stages = opt->nb_stages; 279 RTE_SET_USED(opt); 280 281 if (nb_stages == 1) { 282 if (!burst && mt_safe) 283 return pipeline_atq_worker_single_stage_tx(arg); 284 else if (!burst && !mt_safe) 285 return pipeline_atq_worker_single_stage_fwd(arg); 286 else if (burst && mt_safe) 287 return pipeline_atq_worker_single_stage_burst_tx(arg); 288 else if (burst && !mt_safe) 289 return pipeline_atq_worker_single_stage_burst_fwd(arg); 290 } else { 291 if (!burst && mt_safe) 292 return pipeline_atq_worker_multi_stage_tx(arg); 293 else if (!burst && !mt_safe) 294 return pipeline_atq_worker_multi_stage_fwd(arg); 295 if (burst && mt_safe) 296 return pipeline_atq_worker_multi_stage_burst_tx(arg); 297 else if (burst && !mt_safe) 298 return pipeline_atq_worker_multi_stage_burst_fwd(arg); 299 } 300 rte_panic("invalid worker\n"); 301 } 302 303 static int 304 pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt) 305 { 306 struct test_pipeline *t = evt_test_priv(test); 307 308 if (t->mt_unsafe) 309 rte_service_component_runstate_set(t->tx_service.service_id, 1); 310 return pipeline_launch_lcores(test, opt, worker_wrapper); 311 } 312 313 static int 314 pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt) 315 { 316 int ret; 317 int nb_ports; 318 int nb_queues; 319 uint8_t queue; 320 struct rte_event_dev_info info; 321 struct test_pipeline *t = evt_test_priv(test); 322 uint8_t tx_evqueue_id = 0; 323 uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV]; 324 uint8_t nb_worker_queues = 0; 325 326 nb_ports = evt_nr_active_lcores(opt->wlcores); 327 nb_queues = rte_eth_dev_count(); 328 329 /* One extra port and queueu for Tx service */ 330 if (t->mt_unsafe) { 331 tx_evqueue_id = nb_queues; 332 nb_ports++; 333 nb_queues++; 334 } 335 336 337 rte_event_dev_info_get(opt->dev_id, &info); 338 339 const struct rte_event_dev_config config = { 340 .nb_event_queues = nb_queues, 341 .nb_event_ports = nb_ports, 342 .nb_events_limit = info.max_num_events, 343 .nb_event_queue_flows = opt->nb_flows, 344 .nb_event_port_dequeue_depth = 345 info.max_event_port_dequeue_depth, 346 .nb_event_port_enqueue_depth = 347 info.max_event_port_enqueue_depth, 348 }; 349 ret = rte_event_dev_configure(opt->dev_id, &config); 350 if (ret) { 351 evt_err("failed to configure eventdev %d", opt->dev_id); 352 return ret; 353 } 354 355 struct rte_event_queue_conf q_conf = { 356 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, 357 .nb_atomic_flows = opt->nb_flows, 358 .nb_atomic_order_sequences = opt->nb_flows, 359 }; 360 /* queue configurations */ 361 for (queue = 0; queue < nb_queues; queue++) { 362 q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES; 363 364 if (t->mt_unsafe) { 365 if (queue == tx_evqueue_id) { 366 q_conf.event_queue_cfg = 367 RTE_EVENT_QUEUE_CFG_SINGLE_LINK; 368 } else { 369 queue_arr[nb_worker_queues] = queue; 370 nb_worker_queues++; 371 } 372 } 373 374 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf); 375 if (ret) { 376 evt_err("failed to setup queue=%d", queue); 377 return ret; 378 } 379 } 380 381 /* port configuration */ 382 const struct rte_event_port_conf p_conf = { 383 .dequeue_depth = opt->wkr_deq_dep, 384 .enqueue_depth = info.max_event_port_dequeue_depth, 385 .new_event_threshold = info.max_num_events, 386 }; 387 388 if (t->mt_unsafe) { 389 ret = pipeline_event_port_setup(test, opt, queue_arr, 390 nb_worker_queues, p_conf); 391 if (ret) 392 return ret; 393 394 ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id, 395 nb_ports - 1, p_conf); 396 } else 397 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues, 398 p_conf); 399 400 if (ret) 401 return ret; 402 403 /* 404 * The pipelines are setup in the following manner: 405 * 406 * eth_dev_count = 2, nb_stages = 2, atq mode 407 * 408 * Multi thread safe : 409 * queues = 2 410 * stride = 1 411 * 412 * event queue pipelines: 413 * eth0 -> q0 ->tx 414 * eth1 -> q1 ->tx 415 * 416 * q0, q1 are configured as ATQ so, all the different stages can 417 * be enqueued on the same queue. 418 * 419 * Multi thread unsafe : 420 * queues = 3 421 * stride = 1 422 * 423 * event queue pipelines: 424 * eth0 -> q0 425 * } (q3->tx) Tx service 426 * eth1 -> q1 427 * 428 * q0,q1 are configured as stated above. 429 * q3 configured as SINGLE_LINK|ATOMIC. 430 */ 431 ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf); 432 if (ret) 433 return ret; 434 435 if (!evt_has_distributed_sched(opt->dev_id)) { 436 uint32_t service_id; 437 rte_event_dev_service_id_get(opt->dev_id, &service_id); 438 ret = evt_service_setup(service_id); 439 if (ret) { 440 evt_err("No service lcore found to run event dev."); 441 return ret; 442 } 443 } 444 445 ret = rte_event_dev_start(opt->dev_id); 446 if (ret) { 447 evt_err("failed to start eventdev %d", opt->dev_id); 448 return ret; 449 } 450 451 return 0; 452 } 453 454 static void 455 pipeline_atq_opt_dump(struct evt_options *opt) 456 { 457 pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt)); 458 } 459 460 static int 461 pipeline_atq_opt_check(struct evt_options *opt) 462 { 463 return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt)); 464 } 465 466 static bool 467 pipeline_atq_capability_check(struct evt_options *opt) 468 { 469 struct rte_event_dev_info dev_info; 470 471 rte_event_dev_info_get(opt->dev_id, &dev_info); 472 if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) || 473 dev_info.max_event_ports < 474 evt_nr_active_lcores(opt->wlcores)) { 475 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d", 476 pipeline_atq_nb_event_queues(opt), 477 dev_info.max_event_queues, 478 evt_nr_active_lcores(opt->wlcores), 479 dev_info.max_event_ports); 480 } 481 482 return true; 483 } 484 485 static const struct evt_test_ops pipeline_atq = { 486 .cap_check = pipeline_atq_capability_check, 487 .opt_check = pipeline_atq_opt_check, 488 .opt_dump = pipeline_atq_opt_dump, 489 .test_setup = pipeline_test_setup, 490 .mempool_setup = pipeline_mempool_setup, 491 .ethdev_setup = pipeline_ethdev_setup, 492 .eventdev_setup = pipeline_atq_eventdev_setup, 493 .launch_lcores = pipeline_atq_launch_lcores, 494 .eventdev_destroy = pipeline_eventdev_destroy, 495 .mempool_destroy = pipeline_mempool_destroy, 496 .ethdev_destroy = pipeline_ethdev_destroy, 497 .test_result = pipeline_test_result, 498 .test_destroy = pipeline_test_destroy, 499 }; 500 501 EVT_TEST_REGISTER(pipeline_atq); 502