1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright 2017 Cavium, Inc. 4 */ 5 6 #include "test_pipeline_common.h" 7 8 static int32_t 9 pipeline_event_tx_burst_service_func(void *args) 10 { 11 12 int i; 13 struct tx_service_data *tx = args; 14 const uint8_t dev = tx->dev_id; 15 const uint8_t port = tx->port_id; 16 struct rte_event ev[BURST_SIZE + 1]; 17 18 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0); 19 20 if (!nb_rx) { 21 for (i = 0; i < tx->nb_ethports; i++) 22 rte_eth_tx_buffer_flush(i, 0, tx->tx_buf[i]); 23 return 0; 24 } 25 26 for (i = 0; i < nb_rx; i++) { 27 struct rte_mbuf *m = ev[i].mbuf; 28 rte_eth_tx_buffer(m->port, 0, tx->tx_buf[m->port], m); 29 } 30 tx->processed_pkts += nb_rx; 31 32 return 0; 33 } 34 35 static int32_t 36 pipeline_event_tx_service_func(void *args) 37 { 38 39 int i; 40 struct tx_service_data *tx = args; 41 const uint8_t dev = tx->dev_id; 42 const uint8_t port = tx->port_id; 43 struct rte_event ev; 44 45 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, &ev, 1, 0); 46 47 if (!nb_rx) { 48 for (i = 0; i < tx->nb_ethports; i++) 49 rte_eth_tx_buffer_flush(i, 0, tx->tx_buf[i]); 50 return 0; 51 } 52 53 struct rte_mbuf *m = ev.mbuf; 54 rte_eth_tx_buffer(m->port, 0, tx->tx_buf[m->port], m); 55 tx->processed_pkts++; 56 57 return 0; 58 } 59 60 int 61 pipeline_test_result(struct evt_test *test, struct evt_options *opt) 62 { 63 RTE_SET_USED(opt); 64 int i; 65 uint64_t total = 0; 66 struct test_pipeline *t = evt_test_priv(test); 67 68 printf("Packet distribution across worker cores :\n"); 69 for (i = 0; i < t->nb_workers; i++) 70 total += t->worker[i].processed_pkts; 71 for (i = 0; i < t->nb_workers; i++) 72 printf("Worker %d packets: "CLGRN"%"PRIx64" "CLNRM"percentage:" 73 CLGRN" %3.2f\n"CLNRM, i, 74 t->worker[i].processed_pkts, 75 (((double)t->worker[i].processed_pkts)/total) 76 * 100); 77 return t->result; 78 } 79 80 void 81 pipeline_opt_dump(struct evt_options *opt, uint8_t nb_queues) 82 { 83 evt_dump("nb_worker_lcores", "%d", evt_nr_active_lcores(opt->wlcores)); 84 evt_dump_worker_lcores(opt); 85 evt_dump_nb_stages(opt); 86 evt_dump("nb_evdev_ports", "%d", pipeline_nb_event_ports(opt)); 87 evt_dump("nb_evdev_queues", "%d", nb_queues); 88 evt_dump_queue_priority(opt); 89 evt_dump_sched_type_list(opt); 90 evt_dump_producer_type(opt); 91 } 92 93 static inline uint64_t 94 processed_pkts(struct test_pipeline *t) 95 { 96 uint8_t i; 97 uint64_t total = 0; 98 99 rte_smp_rmb(); 100 if (t->mt_unsafe) 101 total = t->tx_service.processed_pkts; 102 else 103 for (i = 0; i < t->nb_workers; i++) 104 total += t->worker[i].processed_pkts; 105 106 return total; 107 } 108 109 int 110 pipeline_launch_lcores(struct evt_test *test, struct evt_options *opt, 111 int (*worker)(void *)) 112 { 113 int ret, lcore_id; 114 struct test_pipeline *t = evt_test_priv(test); 115 116 int port_idx = 0; 117 /* launch workers */ 118 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 119 if (!(opt->wlcores[lcore_id])) 120 continue; 121 122 ret = rte_eal_remote_launch(worker, 123 &t->worker[port_idx], lcore_id); 124 if (ret) { 125 evt_err("failed to launch worker %d", lcore_id); 126 return ret; 127 } 128 port_idx++; 129 } 130 131 uint64_t perf_cycles = rte_get_timer_cycles(); 132 const uint64_t perf_sample = rte_get_timer_hz(); 133 134 static float total_mpps; 135 static uint64_t samples; 136 137 uint64_t prev_pkts = 0; 138 139 while (t->done == false) { 140 const uint64_t new_cycles = rte_get_timer_cycles(); 141 142 if ((new_cycles - perf_cycles) > perf_sample) { 143 const uint64_t curr_pkts = processed_pkts(t); 144 145 float mpps = (float)(curr_pkts - prev_pkts)/1000000; 146 147 prev_pkts = curr_pkts; 148 perf_cycles = new_cycles; 149 total_mpps += mpps; 150 ++samples; 151 printf(CLGRN"\r%.3f mpps avg %.3f mpps"CLNRM, 152 mpps, total_mpps/samples); 153 fflush(stdout); 154 } 155 } 156 printf("\n"); 157 return 0; 158 } 159 160 int 161 pipeline_opt_check(struct evt_options *opt, uint64_t nb_queues) 162 { 163 unsigned int lcores; 164 /* 165 * N worker + 1 master 166 */ 167 lcores = 2; 168 169 if (!rte_eth_dev_count_avail()) { 170 evt_err("test needs minimum 1 ethernet dev"); 171 return -1; 172 } 173 174 if (rte_lcore_count() < lcores) { 175 evt_err("test need minimum %d lcores", lcores); 176 return -1; 177 } 178 179 /* Validate worker lcores */ 180 if (evt_lcores_has_overlap(opt->wlcores, rte_get_master_lcore())) { 181 evt_err("worker lcores overlaps with master lcore"); 182 return -1; 183 } 184 if (evt_has_disabled_lcore(opt->wlcores)) { 185 evt_err("one or more workers lcores are not enabled"); 186 return -1; 187 } 188 if (!evt_has_active_lcore(opt->wlcores)) { 189 evt_err("minimum one worker is required"); 190 return -1; 191 } 192 193 if (nb_queues > EVT_MAX_QUEUES) { 194 evt_err("number of queues exceeds %d", EVT_MAX_QUEUES); 195 return -1; 196 } 197 if (pipeline_nb_event_ports(opt) > EVT_MAX_PORTS) { 198 evt_err("number of ports exceeds %d", EVT_MAX_PORTS); 199 return -1; 200 } 201 202 if (evt_has_invalid_stage(opt)) 203 return -1; 204 205 if (evt_has_invalid_sched_type(opt)) 206 return -1; 207 208 return 0; 209 } 210 211 #define NB_RX_DESC 128 212 #define NB_TX_DESC 512 213 int 214 pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt) 215 { 216 uint16_t i; 217 uint8_t nb_queues = 1; 218 uint8_t mt_state = 0; 219 struct test_pipeline *t = evt_test_priv(test); 220 struct rte_eth_rxconf rx_conf; 221 struct rte_eth_conf port_conf = { 222 .rxmode = { 223 .mq_mode = ETH_MQ_RX_RSS, 224 .max_rx_pkt_len = ETHER_MAX_LEN, 225 .offloads = DEV_RX_OFFLOAD_CRC_STRIP, 226 .ignore_offload_bitfield = 1, 227 }, 228 .rx_adv_conf = { 229 .rss_conf = { 230 .rss_key = NULL, 231 .rss_hf = ETH_RSS_IP, 232 }, 233 }, 234 }; 235 236 RTE_SET_USED(opt); 237 if (!rte_eth_dev_count_avail()) { 238 evt_err("No ethernet ports found.\n"); 239 return -ENODEV; 240 } 241 242 RTE_ETH_FOREACH_DEV(i) { 243 struct rte_eth_dev_info dev_info; 244 245 memset(&dev_info, 0, sizeof(struct rte_eth_dev_info)); 246 rte_eth_dev_info_get(i, &dev_info); 247 mt_state = !(dev_info.tx_offload_capa & 248 DEV_TX_OFFLOAD_MT_LOCKFREE); 249 rx_conf = dev_info.default_rxconf; 250 rx_conf.offloads = port_conf.rxmode.offloads; 251 252 if (rte_eth_dev_configure(i, nb_queues, nb_queues, 253 &port_conf) 254 < 0) { 255 evt_err("Failed to configure eth port [%d]\n", i); 256 return -EINVAL; 257 } 258 259 if (rte_eth_rx_queue_setup(i, 0, NB_RX_DESC, 260 rte_socket_id(), &rx_conf, t->pool) < 0) { 261 evt_err("Failed to setup eth port [%d] rx_queue: %d.\n", 262 i, 0); 263 return -EINVAL; 264 } 265 if (rte_eth_tx_queue_setup(i, 0, NB_TX_DESC, 266 rte_socket_id(), NULL) < 0) { 267 evt_err("Failed to setup eth port [%d] tx_queue: %d.\n", 268 i, 0); 269 return -EINVAL; 270 } 271 272 t->mt_unsafe |= mt_state; 273 t->tx_service.tx_buf[i] = 274 rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(BURST_SIZE), 0); 275 if (t->tx_service.tx_buf[i] == NULL) 276 rte_panic("Unable to allocate Tx buffer memory."); 277 rte_eth_promiscuous_enable(i); 278 } 279 280 return 0; 281 } 282 283 int 284 pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt, 285 uint8_t *queue_arr, uint8_t nb_queues, 286 const struct rte_event_port_conf p_conf) 287 { 288 int i; 289 int ret; 290 uint8_t port; 291 struct test_pipeline *t = evt_test_priv(test); 292 293 294 /* setup one port per worker, linking to all queues */ 295 for (port = 0; port < evt_nr_active_lcores(opt->wlcores); port++) { 296 struct worker_data *w = &t->worker[port]; 297 298 w->dev_id = opt->dev_id; 299 w->port_id = port; 300 w->t = t; 301 w->processed_pkts = 0; 302 303 ret = rte_event_port_setup(opt->dev_id, port, &p_conf); 304 if (ret) { 305 evt_err("failed to setup port %d", port); 306 return ret; 307 } 308 309 if (queue_arr == NULL) { 310 if (rte_event_port_link(opt->dev_id, port, NULL, NULL, 311 0) != nb_queues) 312 goto link_fail; 313 } else { 314 for (i = 0; i < nb_queues; i++) { 315 if (rte_event_port_link(opt->dev_id, port, 316 &queue_arr[i], NULL, 1) != 1) 317 goto link_fail; 318 } 319 } 320 } 321 322 return 0; 323 324 link_fail: 325 evt_err("failed to link all queues to port %d", port); 326 return -EINVAL; 327 } 328 329 int 330 pipeline_event_rx_adapter_setup(struct evt_options *opt, uint8_t stride, 331 struct rte_event_port_conf prod_conf) 332 { 333 int ret = 0; 334 uint16_t prod; 335 struct rte_event_eth_rx_adapter_queue_conf queue_conf; 336 337 memset(&queue_conf, 0, 338 sizeof(struct rte_event_eth_rx_adapter_queue_conf)); 339 queue_conf.ev.sched_type = opt->sched_type_list[0]; 340 RTE_ETH_FOREACH_DEV(prod) { 341 uint32_t cap; 342 343 ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id, 344 prod, &cap); 345 if (ret) { 346 evt_err("failed to get event rx adapter[%d]" 347 " capabilities", 348 opt->dev_id); 349 return ret; 350 } 351 queue_conf.ev.queue_id = prod * stride; 352 ret = rte_event_eth_rx_adapter_create(prod, opt->dev_id, 353 &prod_conf); 354 if (ret) { 355 evt_err("failed to create rx adapter[%d]", prod); 356 return ret; 357 } 358 ret = rte_event_eth_rx_adapter_queue_add(prod, prod, -1, 359 &queue_conf); 360 if (ret) { 361 evt_err("failed to add rx queues to adapter[%d]", prod); 362 return ret; 363 } 364 365 if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) { 366 uint32_t service_id; 367 368 rte_event_eth_rx_adapter_service_id_get(prod, 369 &service_id); 370 ret = evt_service_setup(service_id); 371 if (ret) { 372 evt_err("Failed to setup service core" 373 " for Rx adapter\n"); 374 return ret; 375 } 376 } 377 378 ret = rte_eth_dev_start(prod); 379 if (ret) { 380 evt_err("Ethernet dev [%d] failed to start." 381 " Using synthetic producer", prod); 382 return ret; 383 } 384 385 ret = rte_event_eth_rx_adapter_start(prod); 386 if (ret) { 387 evt_err("Rx adapter[%d] start failed", prod); 388 return ret; 389 } 390 printf("%s: Port[%d] using Rx adapter[%d] started\n", __func__, 391 prod, prod); 392 } 393 394 return ret; 395 } 396 397 int 398 pipeline_event_tx_service_setup(struct evt_test *test, struct evt_options *opt, 399 uint8_t tx_queue_id, uint8_t tx_port_id, 400 const struct rte_event_port_conf p_conf) 401 { 402 int ret; 403 struct rte_service_spec serv; 404 struct test_pipeline *t = evt_test_priv(test); 405 struct tx_service_data *tx = &t->tx_service; 406 407 ret = rte_event_port_setup(opt->dev_id, tx_port_id, &p_conf); 408 if (ret) { 409 evt_err("failed to setup port %d", tx_port_id); 410 return ret; 411 } 412 413 if (rte_event_port_link(opt->dev_id, tx_port_id, &tx_queue_id, 414 NULL, 1) != 1) { 415 evt_err("failed to link queues to port %d", tx_port_id); 416 return -EINVAL; 417 } 418 419 tx->dev_id = opt->dev_id; 420 tx->queue_id = tx_queue_id; 421 tx->port_id = tx_port_id; 422 tx->nb_ethports = rte_eth_dev_count_avail(); 423 tx->t = t; 424 425 /* Register Tx service */ 426 memset(&serv, 0, sizeof(struct rte_service_spec)); 427 snprintf(serv.name, sizeof(serv.name), "Tx_service"); 428 429 if (evt_has_burst_mode(opt->dev_id)) 430 serv.callback = pipeline_event_tx_burst_service_func; 431 else 432 serv.callback = pipeline_event_tx_service_func; 433 434 serv.callback_userdata = (void *)tx; 435 ret = rte_service_component_register(&serv, &tx->service_id); 436 if (ret) { 437 evt_err("failed to register Tx service"); 438 return ret; 439 } 440 441 ret = evt_service_setup(tx->service_id); 442 if (ret) { 443 evt_err("Failed to setup service core for Tx service\n"); 444 return ret; 445 } 446 447 rte_service_runstate_set(tx->service_id, 1); 448 449 return 0; 450 } 451 452 453 void 454 pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt) 455 { 456 uint16_t i; 457 RTE_SET_USED(test); 458 RTE_SET_USED(opt); 459 struct test_pipeline *t = evt_test_priv(test); 460 461 if (t->mt_unsafe) { 462 rte_service_component_runstate_set(t->tx_service.service_id, 0); 463 rte_service_runstate_set(t->tx_service.service_id, 0); 464 rte_service_component_unregister(t->tx_service.service_id); 465 } 466 467 RTE_ETH_FOREACH_DEV(i) { 468 rte_event_eth_rx_adapter_stop(i); 469 rte_eth_dev_stop(i); 470 rte_eth_dev_close(i); 471 } 472 } 473 474 void 475 pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt) 476 { 477 RTE_SET_USED(test); 478 479 rte_event_dev_stop(opt->dev_id); 480 rte_event_dev_close(opt->dev_id); 481 } 482 483 int 484 pipeline_mempool_setup(struct evt_test *test, struct evt_options *opt) 485 { 486 struct test_pipeline *t = evt_test_priv(test); 487 488 t->pool = rte_pktmbuf_pool_create(test->name, /* mempool name */ 489 opt->pool_sz, /* number of elements*/ 490 512, /* cache size*/ 491 0, 492 RTE_MBUF_DEFAULT_BUF_SIZE, 493 opt->socket_id); /* flags */ 494 495 if (t->pool == NULL) { 496 evt_err("failed to create mempool"); 497 return -ENOMEM; 498 } 499 500 return 0; 501 } 502 503 void 504 pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt) 505 { 506 RTE_SET_USED(opt); 507 struct test_pipeline *t = evt_test_priv(test); 508 509 rte_mempool_free(t->pool); 510 } 511 512 int 513 pipeline_test_setup(struct evt_test *test, struct evt_options *opt) 514 { 515 void *test_pipeline; 516 517 test_pipeline = rte_zmalloc_socket(test->name, 518 sizeof(struct test_pipeline), RTE_CACHE_LINE_SIZE, 519 opt->socket_id); 520 if (test_pipeline == NULL) { 521 evt_err("failed to allocate test_pipeline memory"); 522 goto nomem; 523 } 524 test->test_priv = test_pipeline; 525 526 struct test_pipeline *t = evt_test_priv(test); 527 528 t->nb_workers = evt_nr_active_lcores(opt->wlcores); 529 t->outstand_pkts = opt->nb_pkts * evt_nr_active_lcores(opt->wlcores); 530 t->done = false; 531 t->nb_flows = opt->nb_flows; 532 t->result = EVT_TEST_FAILED; 533 t->opt = opt; 534 opt->prod_type = EVT_PROD_TYPE_ETH_RX_ADPTR; 535 memcpy(t->sched_type_list, opt->sched_type_list, 536 sizeof(opt->sched_type_list)); 537 return 0; 538 nomem: 539 return -ENOMEM; 540 } 541 542 void 543 pipeline_test_destroy(struct evt_test *test, struct evt_options *opt) 544 { 545 RTE_SET_USED(opt); 546 547 rte_free(test->test_priv); 548 } 549