1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright 2017 Cavium, Inc. 4 */ 5 6 #include "test_pipeline_common.h" 7 8 static int32_t 9 pipeline_event_tx_burst_service_func(void *args) 10 { 11 12 int i; 13 struct tx_service_data *tx = args; 14 const uint8_t dev = tx->dev_id; 15 const uint8_t port = tx->port_id; 16 struct rte_event ev[BURST_SIZE + 1]; 17 18 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 19 20 if (!nb_rx) { 21 for (i = 0; i < tx->nb_ethports; i++) 22 rte_eth_tx_buffer_flush(i, 0, tx->tx_buf[i]); 23 return 0; 24 } 25 26 for (i = 0; i < nb_rx; i++) { 27 struct rte_mbuf *m = ev[i].mbuf; 28 rte_eth_tx_buffer(m->port, 0, tx->tx_buf[m->port], m); 29 } 30 tx->processed_pkts += nb_rx; 31 32 return 0; 33 } 34 35 static int32_t 36 pipeline_event_tx_service_func(void *args) 37 { 38 39 int i; 40 struct tx_service_data *tx = args; 41 const uint8_t dev = tx->dev_id; 42 const uint8_t port = tx->port_id; 43 struct rte_event ev; 44 45 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 46 47 if (!nb_rx) { 48 for (i = 0; i < tx->nb_ethports; i++) 49 rte_eth_tx_buffer_flush(i, 0, tx->tx_buf[i]); 50 return 0; 51 } 52 53 struct rte_mbuf *m = ev.mbuf; 54 rte_eth_tx_buffer(m->port, 0, tx->tx_buf[m->port], m); 55 tx->processed_pkts++; 56 57 return 0; 58 } 59 60 int 61 pipeline_test_result(struct evt_test *test, struct evt_options *opt) 62 { 63 RTE_SET_USED(opt); 64 int i; 65 uint64_t total = 0; 66 struct test_pipeline *t = evt_test_priv(test); 67 68 printf("Packet distribution across worker cores :\n"); 69 for (i = 0; i < t->nb_workers; i++) 70 total += t->worker[i].processed_pkts; 71 for (i = 0; i < t->nb_workers; i++) 72 printf("Worker %d packets: "CLGRN"%"PRIx64" "CLNRM"percentage:" 73 CLGRN" %3.2f\n"CLNRM, i, 74 t->worker[i].processed_pkts, 75 (((double)t->worker[i].processed_pkts)/total) 76 * 100); 77 return t->result; 78 } 79 80 void 81 pipeline_opt_dump(struct evt_options *opt, uint8_t nb_queues) 82 { 83 evt_dump("nb_worker_lcores", "%d", evt_nr_active_lcores(opt->wlcores)); 84 evt_dump_worker_lcores(opt); 85 evt_dump_nb_stages(opt); 86 evt_dump("nb_evdev_ports", "%d", pipeline_nb_event_ports(opt)); 87 evt_dump("nb_evdev_queues", "%d", nb_queues); 88 evt_dump_queue_priority(opt); 89 evt_dump_sched_type_list(opt); 90 evt_dump_producer_type(opt); 91 } 92 93 static inline uint64_t 94 processed_pkts(struct test_pipeline *t) 95 { 96 uint8_t i; 97 uint64_t total = 0; 98 99 rte_smp_rmb(); 100 if (t->mt_unsafe) 101 total = t->tx_service.processed_pkts; 102 else 103 for (i = 0; i < t->nb_workers; i++) 104 total += t->worker[i].processed_pkts; 105 106 return total; 107 } 108 109 int 110 pipeline_launch_lcores(struct evt_test *test, struct evt_options *opt, 111 int (*worker)(void *)) 112 { 113 int ret, lcore_id; 114 struct test_pipeline *t = evt_test_priv(test); 115 116 int port_idx = 0; 117 /* launch workers */ 118 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 119 if (!(opt->wlcores[lcore_id])) 120 continue; 121 122 ret = rte_eal_remote_launch(worker, 123 &t->worker[port_idx], lcore_id); 124 if (ret) { 125 evt_err("failed to launch worker %d", lcore_id); 126 return ret; 127 } 128 port_idx++; 129 } 130 131 uint64_t perf_cycles = rte_get_timer_cycles(); 132 const uint64_t perf_sample = rte_get_timer_hz(); 133 134 static float total_mpps; 135 static uint64_t samples; 136 137 uint64_t prev_pkts = 0; 138 139 while (t->done == false) { 140 const uint64_t new_cycles = rte_get_timer_cycles(); 141 142 if ((new_cycles - perf_cycles) > perf_sample) { 143 const uint64_t curr_pkts = processed_pkts(t); 144 145 float mpps = (float)(curr_pkts - prev_pkts)/1000000; 146 147 prev_pkts = curr_pkts; 148 perf_cycles = new_cycles; 149 total_mpps += mpps; 150 ++samples; 151 printf(CLGRN"\r%.3f mpps avg %.3f mpps"CLNRM, 152 mpps, total_mpps/samples); 153 fflush(stdout); 154 } 155 } 156 printf("\n"); 157 return 0; 158 } 159 160 int 161 pipeline_opt_check(struct evt_options *opt, uint64_t nb_queues) 162 { 163 unsigned int lcores; 164 /* 165 * N worker + 1 master 166 */ 167 lcores = 2; 168 169 if (!rte_eth_dev_count_avail()) { 170 evt_err("test needs minimum 1 ethernet dev"); 171 return -1; 172 } 173 174 if (rte_lcore_count() < lcores) { 175 evt_err("test need minimum %d lcores", lcores); 176 return -1; 177 } 178 179 /* Validate worker lcores */ 180 if (evt_lcores_has_overlap(opt->wlcores, rte_get_master_lcore())) { 181 evt_err("worker lcores overlaps with master lcore"); 182 return -1; 183 } 184 if (evt_has_disabled_lcore(opt->wlcores)) { 185 evt_err("one or more workers lcores are not enabled"); 186 return -1; 187 } 188 if (!evt_has_active_lcore(opt->wlcores)) { 189 evt_err("minimum one worker is required"); 190 return -1; 191 } 192 193 if (nb_queues > EVT_MAX_QUEUES) { 194 evt_err("number of queues exceeds %d", EVT_MAX_QUEUES); 195 return -1; 196 } 197 if (pipeline_nb_event_ports(opt) > EVT_MAX_PORTS) { 198 evt_err("number of ports exceeds %d", EVT_MAX_PORTS); 199 return -1; 200 } 201 202 if (evt_has_invalid_stage(opt)) 203 return -1; 204 205 if (evt_has_invalid_sched_type(opt)) 206 return -1; 207 208 return 0; 209 } 210 211 #define NB_RX_DESC 128 212 #define NB_TX_DESC 512 213 int 214 pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt) 215 { 216 uint16_t i; 217 uint8_t nb_queues = 1; 218 uint8_t mt_state = 0; 219 struct test_pipeline *t = evt_test_priv(test); 220 struct rte_eth_rxconf rx_conf; 221 struct rte_eth_conf port_conf = { 222 .rxmode = { 223 .mq_mode = ETH_MQ_RX_RSS, 224 .max_rx_pkt_len = ETHER_MAX_LEN, 225 }, 226 .rx_adv_conf = { 227 .rss_conf = { 228 .rss_key = NULL, 229 .rss_hf = ETH_RSS_IP, 230 }, 231 }, 232 }; 233 234 RTE_SET_USED(opt); 235 if (!rte_eth_dev_count_avail()) { 236 evt_err("No ethernet ports found.\n"); 237 return -ENODEV; 238 } 239 240 RTE_ETH_FOREACH_DEV(i) { 241 struct rte_eth_dev_info dev_info; 242 struct rte_eth_conf local_port_conf = port_conf; 243 244 rte_eth_dev_info_get(i, &dev_info); 245 mt_state = !(dev_info.tx_offload_capa & 246 DEV_TX_OFFLOAD_MT_LOCKFREE); 247 rx_conf = dev_info.default_rxconf; 248 rx_conf.offloads = port_conf.rxmode.offloads; 249 250 local_port_conf.rx_adv_conf.rss_conf.rss_hf &= 251 dev_info.flow_type_rss_offloads; 252 if (local_port_conf.rx_adv_conf.rss_conf.rss_hf != 253 port_conf.rx_adv_conf.rss_conf.rss_hf) { 254 evt_info("Port %u modified RSS hash function based on hardware support," 255 "requested:%#"PRIx64" configured:%#"PRIx64"\n", 256 i, 257 port_conf.rx_adv_conf.rss_conf.rss_hf, 258 local_port_conf.rx_adv_conf.rss_conf.rss_hf); 259 } 260 261 if (rte_eth_dev_configure(i, nb_queues, nb_queues, 262 &local_port_conf) 263 < 0) { 264 evt_err("Failed to configure eth port [%d]\n", i); 265 return -EINVAL; 266 } 267 268 if (rte_eth_rx_queue_setup(i, 0, NB_RX_DESC, 269 rte_socket_id(), &rx_conf, t->pool) < 0) { 270 evt_err("Failed to setup eth port [%d] rx_queue: %d.\n", 271 i, 0); 272 return -EINVAL; 273 } 274 if (rte_eth_tx_queue_setup(i, 0, NB_TX_DESC, 275 rte_socket_id(), NULL) < 0) { 276 evt_err("Failed to setup eth port [%d] tx_queue: %d.\n", 277 i, 0); 278 return -EINVAL; 279 } 280 281 t->mt_unsafe |= mt_state; 282 t->tx_service.tx_buf[i] = 283 rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(BURST_SIZE), 0); 284 if (t->tx_service.tx_buf[i] == NULL) 285 rte_panic("Unable to allocate Tx buffer memory."); 286 rte_eth_promiscuous_enable(i); 287 } 288 289 return 0; 290 } 291 292 int 293 pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt, 294 uint8_t *queue_arr, uint8_t nb_queues, 295 const struct rte_event_port_conf p_conf) 296 { 297 int i; 298 int ret; 299 uint8_t port; 300 struct test_pipeline *t = evt_test_priv(test); 301 302 303 /* setup one port per worker, linking to all queues */ 304 for (port = 0; port < evt_nr_active_lcores(opt->wlcores); port++) { 305 struct worker_data *w = &t->worker[port]; 306 307 w->dev_id = opt->dev_id; 308 w->port_id = port; 309 w->t = t; 310 w->processed_pkts = 0; 311 312 ret = rte_event_port_setup(opt->dev_id, port, &p_conf); 313 if (ret) { 314 evt_err("failed to setup port %d", port); 315 return ret; 316 } 317 318 if (queue_arr == NULL) { 319 if (rte_event_port_link(opt->dev_id, port, NULL, NULL, 320 0) != nb_queues) 321 goto link_fail; 322 } else { 323 for (i = 0; i < nb_queues; i++) { 324 if (rte_event_port_link(opt->dev_id, port, 325 &queue_arr[i], NULL, 1) != 1) 326 goto link_fail; 327 } 328 } 329 } 330 331 return 0; 332 333 link_fail: 334 evt_err("failed to link all queues to port %d", port); 335 return -EINVAL; 336 } 337 338 int 339 pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride, 340 struct rte_event_port_conf prod_conf) 341 { 342 int ret = 0; 343 uint16_t prod; 344 struct rte_event_eth_rx_adapter_queue_conf queue_conf; 345 346 memset(&queue_conf, 0, 347 sizeof(struct rte_event_eth_rx_adapter_queue_conf)); 348 queue_conf.ev.sched_type = opt->sched_type_list[0]; 349 RTE_ETH_FOREACH_DEV(prod) { 350 uint32_t cap; 351 352 ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id, 353 prod, &cap); 354 if (ret) { 355 evt_err("failed to get event rx adapter[%d]" 356 " capabilities", 357 opt->dev_id); 358 return ret; 359 } 360 queue_conf.ev.queue_id = prod * stride; 361 ret = rte_event_eth_rx_adapter_create(prod, opt->dev_id, 362 &prod_conf); 363 if (ret) { 364 evt_err("failed to create rx adapter[%d]", prod); 365 return ret; 366 } 367 ret = rte_event_eth_rx_adapter_queue_add(prod, prod, -1, 368 &queue_conf); 369 if (ret) { 370 evt_err("failed to add rx queues to adapter[%d]", prod); 371 return ret; 372 } 373 374 if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) { 375 uint32_t service_id; 376 377 rte_event_eth_rx_adapter_service_id_get(prod, 378 &service_id); 379 ret = evt_service_setup(service_id); 380 if (ret) { 381 evt_err("Failed to setup service core" 382 " for Rx adapter\n"); 383 return ret; 384 } 385 } 386 387 ret = rte_eth_dev_start(prod); 388 if (ret) { 389 evt_err("Ethernet dev [%d] failed to start." 390 " Using synthetic producer", prod); 391 return ret; 392 } 393 394 ret = rte_event_eth_rx_adapter_start(prod); 395 if (ret) { 396 evt_err("Rx adapter[%d] start failed", prod); 397 return ret; 398 } 399 printf("%s: Port[%d] using Rx adapter[%d] started\n", __func__, 400 prod, prod); 401 } 402 403 return ret; 404 } 405 406 int 407 pipeline_event_tx_service_setup(struct evt_test *test, struct evt_options *opt, 408 uint8_t tx_queue_id, uint8_t tx_port_id, 409 const struct rte_event_port_conf p_conf) 410 { 411 int ret; 412 struct rte_service_spec serv; 413 struct test_pipeline *t = evt_test_priv(test); 414 struct tx_service_data *tx = &t->tx_service; 415 416 ret = rte_event_port_setup(opt->dev_id, tx_port_id, &p_conf); 417 if (ret) { 418 evt_err("failed to setup port %d", tx_port_id); 419 return ret; 420 } 421 422 if (rte_event_port_link(opt->dev_id, tx_port_id, &tx_queue_id, 423 NULL, 1) != 1) { 424 evt_err("failed to link queues to port %d", tx_port_id); 425 return -EINVAL; 426 } 427 428 tx->dev_id = opt->dev_id; 429 tx->queue_id = tx_queue_id; 430 tx->port_id = tx_port_id; 431 tx->nb_ethports = rte_eth_dev_count_avail(); 432 tx->t = t; 433 434 /* Register Tx service */ 435 memset(&serv, 0, sizeof(struct rte_service_spec)); 436 snprintf(serv.name, sizeof(serv.name), "Tx_service"); 437 438 if (evt_has_burst_mode(opt->dev_id)) 439 serv.callback = pipeline_event_tx_burst_service_func; 440 else 441 serv.callback = pipeline_event_tx_service_func; 442 443 serv.callback_userdata = (void *)tx; 444 ret = rte_service_component_register(&serv, &tx->service_id); 445 if (ret) { 446 evt_err("failed to register Tx service"); 447 return ret; 448 } 449 450 ret = evt_service_setup(tx->service_id); 451 if (ret) { 452 evt_err("Failed to setup service core for Tx service\n"); 453 return ret; 454 } 455 456 rte_service_runstate_set(tx->service_id, 1); 457 458 return 0; 459 } 460 461 462 void 463 pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt) 464 { 465 uint16_t i; 466 RTE_SET_USED(test); 467 RTE_SET_USED(opt); 468 struct test_pipeline *t = evt_test_priv(test); 469 470 if (t->mt_unsafe) { 471 rte_service_component_runstate_set(t->tx_service.service_id, 0); 472 rte_service_runstate_set(t->tx_service.service_id, 0); 473 rte_service_component_unregister(t->tx_service.service_id); 474 } 475 476 RTE_ETH_FOREACH_DEV(i) { 477 rte_event_eth_rx_adapter_stop(i); 478 rte_eth_dev_stop(i); 479 } 480 } 481 482 void 483 pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt) 484 { 485 RTE_SET_USED(test); 486 487 rte_event_dev_stop(opt->dev_id); 488 rte_event_dev_close(opt->dev_id); 489 } 490 491 int 492 pipeline_mempool_setup(struct evt_test *test, struct evt_options *opt) 493 { 494 struct test_pipeline *t = evt_test_priv(test); 495 496 t->pool = rte_pktmbuf_pool_create(test->name, /* mempool name */ 497 opt->pool_sz, /* number of elements*/ 498 512, /* cache size*/ 499 0, 500 RTE_MBUF_DEFAULT_BUF_SIZE, 501 opt->socket_id); /* flags */ 502 503 if (t->pool == NULL) { 504 evt_err("failed to create mempool"); 505 return -ENOMEM; 506 } 507 508 return 0; 509 } 510 511 void 512 pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt) 513 { 514 RTE_SET_USED(opt); 515 struct test_pipeline *t = evt_test_priv(test); 516 517 rte_mempool_free(t->pool); 518 } 519 520 int 521 pipeline_test_setup(struct evt_test *test, struct evt_options *opt) 522 { 523 void *test_pipeline; 524 525 test_pipeline = rte_zmalloc_socket(test->name, 526 sizeof(struct test_pipeline), RTE_CACHE_LINE_SIZE, 527 opt->socket_id); 528 if (test_pipeline == NULL) { 529 evt_err("failed to allocate test_pipeline memory"); 530 goto nomem; 531 } 532 test->test_priv = test_pipeline; 533 534 struct test_pipeline *t = evt_test_priv(test); 535 536 t->nb_workers = evt_nr_active_lcores(opt->wlcores); 537 t->outstand_pkts = opt->nb_pkts * evt_nr_active_lcores(opt->wlcores); 538 t->done = false; 539 t->nb_flows = opt->nb_flows; 540 t->result = EVT_TEST_FAILED; 541 t->opt = opt; 542 opt->prod_type = EVT_PROD_TYPE_ETH_RX_ADPTR; 543 memcpy(t->sched_type_list, opt->sched_type_list, 544 sizeof(opt->sched_type_list)); 545 return 0; 546 nomem: 547 return -ENOMEM; 548 } 549 550 void 551 pipeline_test_destroy(struct evt_test *test, struct evt_options *opt) 552 { 553 RTE_SET_USED(opt); 554 555 rte_free(test->test_priv); 556 } 557