1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2019 Intel Corporation 3 */ 4 5 #include <string.h> 6 #include <stdint.h> 7 8 #include <rte_mbuf.h> 9 #include <rte_malloc.h> 10 11 #include "rte_port_eventdev.h" 12 13 #include "port_log.h" 14 15 /* 16 * Port EVENTDEV Reader 17 */ 18 #ifdef RTE_PORT_STATS_COLLECT 19 20 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) \ 21 do {port->stats.n_pkts_in += val;} while (0) 22 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) \ 23 do {port->stats.n_pkts_drop += val;} while (0) 24 25 #else 26 27 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) 28 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) 29 30 #endif 31 32 struct rte_port_eventdev_reader { 33 struct rte_port_in_stats stats; 34 35 uint8_t eventdev_id; 36 uint16_t port_id; 37 38 struct rte_event ev[RTE_PORT_IN_BURST_SIZE_MAX]; 39 }; 40 41 static void * 42 rte_port_eventdev_reader_create(void *params, int socket_id) 43 { 44 struct rte_port_eventdev_reader_params *conf = 45 params; 46 struct rte_port_eventdev_reader *port; 47 48 /* Check input parameters */ 49 if (conf == NULL) { 50 PORT_LOG(ERR, "%s: params is NULL", __func__); 51 return NULL; 52 } 53 54 /* Memory allocation */ 55 port = rte_zmalloc_socket("PORT", sizeof(*port), 56 RTE_CACHE_LINE_SIZE, socket_id); 57 if (port == NULL) { 58 PORT_LOG(ERR, "%s: Failed to allocate port", __func__); 59 return NULL; 60 } 61 62 /* Initialization */ 63 port->eventdev_id = conf->eventdev_id; 64 port->port_id = conf->port_id; 65 66 return port; 67 } 68 69 static int 70 rte_port_eventdev_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts) 71 { 72 struct rte_port_eventdev_reader *p = port; 73 uint16_t rx_evts_cnt, i; 74 75 rx_evts_cnt = rte_event_dequeue_burst(p->eventdev_id, p->port_id, 76 p->ev, n_pkts, 0); 77 78 for (i = 0; i < rx_evts_cnt; i++) 79 pkts[i] = p->ev[i].mbuf; 80 81 RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(p, rx_evts_cnt); 82 83 return rx_evts_cnt; 84 } 85 86 static int 87 rte_port_eventdev_reader_free(void *port) 88 { 89 if (port == NULL) { 90 PORT_LOG(ERR, "%s: port is NULL", __func__); 91 return -EINVAL; 92 } 93 94 rte_free(port); 95 96 return 0; 97 } 98 99 static int rte_port_eventdev_reader_stats_read(void *port, 100 struct rte_port_in_stats *stats, int clear) 101 { 102 struct rte_port_eventdev_reader *p = 103 port; 104 105 if (stats != NULL) 106 memcpy(stats, &p->stats, sizeof(p->stats)); 107 108 if (clear) 109 memset(&p->stats, 0, sizeof(p->stats)); 110 111 return 0; 112 } 113 114 /* 115 * Port EVENTDEV Writer 116 */ 117 #ifdef RTE_PORT_STATS_COLLECT 118 119 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) \ 120 do {port->stats.n_pkts_in += val;} while (0) 121 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) \ 122 do {port->stats.n_pkts_drop += val;} while (0) 123 124 #else 125 126 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) 127 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) 128 129 #endif 130 131 struct rte_port_eventdev_writer { 132 struct rte_port_out_stats stats; 133 134 struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX]; 135 136 uint32_t enq_burst_sz; 137 uint32_t enq_buf_count; 138 uint64_t bsz_mask; 139 140 uint8_t eventdev_id; 141 uint8_t port_id; 142 uint8_t queue_id; 143 uint8_t sched_type; 144 uint8_t evt_op; 145 }; 146 147 static void * 148 rte_port_eventdev_writer_create(void *params, int socket_id) 149 { 150 struct rte_port_eventdev_writer_params *conf = 151 params; 152 struct rte_port_eventdev_writer *port; 153 unsigned int i; 154 155 /* Check input parameters */ 156 if ((conf == NULL) || 157 (conf->enq_burst_sz == 0) || 158 (conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) || 159 (!rte_is_power_of_2(conf->enq_burst_sz))) { 160 PORT_LOG(ERR, "%s: Invalid input parameters", __func__); 161 return NULL; 162 } 163 164 /* Memory allocation */ 165 port = rte_zmalloc_socket("PORT", sizeof(*port), 166 RTE_CACHE_LINE_SIZE, socket_id); 167 if (port == NULL) { 168 PORT_LOG(ERR, "%s: Failed to allocate port", __func__); 169 return NULL; 170 } 171 172 /* Initialization */ 173 port->enq_burst_sz = conf->enq_burst_sz; 174 port->enq_buf_count = 0; 175 port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1); 176 177 port->eventdev_id = conf->eventdev_id; 178 port->port_id = conf->port_id; 179 port->queue_id = conf->queue_id; 180 port->sched_type = conf->sched_type; 181 port->evt_op = conf->evt_op; 182 memset(&port->ev, 0, sizeof(port->ev)); 183 184 for (i = 0; i < RTE_DIM(port->ev); i++) { 185 port->ev[i].queue_id = port->queue_id; 186 port->ev[i].sched_type = port->sched_type; 187 port->ev[i].op = port->evt_op; 188 } 189 190 return port; 191 } 192 193 static inline void 194 send_burst(struct rte_port_eventdev_writer *p) 195 { 196 uint32_t nb_enq; 197 198 nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id, 199 p->ev, p->enq_buf_count); 200 201 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, p->enq_buf_count - 202 nb_enq); 203 204 for (; nb_enq < p->enq_buf_count; nb_enq++) 205 rte_pktmbuf_free(p->ev[nb_enq].mbuf); 206 207 p->enq_buf_count = 0; 208 } 209 210 static int 211 rte_port_eventdev_writer_tx(void *port, struct rte_mbuf *pkt) 212 { 213 struct rte_port_eventdev_writer *p = port; 214 215 p->ev[p->enq_buf_count++].mbuf = pkt; 216 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1); 217 if (p->enq_buf_count >= p->enq_burst_sz) 218 send_burst(p); 219 220 return 0; 221 } 222 223 static int 224 rte_port_eventdev_writer_tx_bulk(void *port, 225 struct rte_mbuf **pkts, 226 uint64_t pkts_mask) 227 { 228 struct rte_port_eventdev_writer *p = 229 port; 230 uint64_t bsz_mask = p->bsz_mask; 231 uint32_t enq_buf_count = p->enq_buf_count; 232 uint64_t expr = (pkts_mask & (pkts_mask + 1)) | 233 ((pkts_mask & bsz_mask) ^ bsz_mask); 234 235 if (expr == 0) { 236 uint64_t n_pkts = rte_popcount64(pkts_mask); 237 uint32_t i, n_enq_ok; 238 239 if (enq_buf_count) 240 send_burst(p); 241 242 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, n_pkts); 243 244 struct rte_event events[2 * RTE_PORT_IN_BURST_SIZE_MAX] = {}; 245 for (i = 0; i < n_pkts; i++) { 246 events[i].mbuf = pkts[i]; 247 events[i].queue_id = p->queue_id; 248 events[i].sched_type = p->sched_type; 249 events[i].op = p->evt_op; 250 } 251 252 n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id, 253 events, n_pkts); 254 255 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, 256 n_pkts - n_enq_ok); 257 for (; n_enq_ok < n_pkts; n_enq_ok++) 258 rte_pktmbuf_free(pkts[n_enq_ok]); 259 260 } else { 261 for (; pkts_mask;) { 262 uint32_t pkt_index = rte_ctz64(pkts_mask); 263 uint64_t pkt_mask = 1LLU << pkt_index; 264 265 p->ev[enq_buf_count++].mbuf = pkts[pkt_index]; 266 267 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1); 268 pkts_mask &= ~pkt_mask; 269 } 270 271 p->enq_buf_count = enq_buf_count; 272 if (enq_buf_count >= p->enq_burst_sz) 273 send_burst(p); 274 } 275 276 return 0; 277 } 278 279 static int 280 rte_port_eventdev_writer_flush(void *port) 281 { 282 struct rte_port_eventdev_writer *p = 283 port; 284 285 if (p->enq_buf_count > 0) 286 send_burst(p); 287 288 return 0; 289 } 290 291 static int 292 rte_port_eventdev_writer_free(void *port) 293 { 294 if (port == NULL) { 295 PORT_LOG(ERR, "%s: Port is NULL", __func__); 296 return -EINVAL; 297 } 298 299 rte_port_eventdev_writer_flush(port); 300 rte_free(port); 301 302 return 0; 303 } 304 305 static int rte_port_eventdev_writer_stats_read(void *port, 306 struct rte_port_out_stats *stats, int clear) 307 { 308 struct rte_port_eventdev_writer *p = 309 port; 310 311 if (stats != NULL) 312 memcpy(stats, &p->stats, sizeof(p->stats)); 313 314 if (clear) 315 memset(&p->stats, 0, sizeof(p->stats)); 316 317 return 0; 318 } 319 320 /* 321 * Port EVENTDEV Writer Nodrop 322 */ 323 #ifdef RTE_PORT_STATS_COLLECT 324 325 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \ 326 do {port->stats.n_pkts_in += val;} while (0) 327 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \ 328 do {port->stats.n_pkts_drop += val;} while (0) 329 330 #else 331 332 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) 333 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) 334 335 #endif 336 337 struct rte_port_eventdev_writer_nodrop { 338 struct rte_port_out_stats stats; 339 340 struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX]; 341 342 uint32_t enq_burst_sz; 343 uint32_t enq_buf_count; 344 uint64_t bsz_mask; 345 uint64_t n_retries; 346 uint8_t eventdev_id; 347 uint8_t port_id; 348 uint8_t queue_id; 349 uint8_t sched_type; 350 uint8_t evt_op; 351 }; 352 353 354 static void * 355 rte_port_eventdev_writer_nodrop_create(void *params, int socket_id) 356 { 357 struct rte_port_eventdev_writer_nodrop_params *conf = 358 params; 359 struct rte_port_eventdev_writer_nodrop *port; 360 unsigned int i; 361 362 /* Check input parameters */ 363 if ((conf == NULL) || 364 (conf->enq_burst_sz == 0) || 365 (conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) || 366 (!rte_is_power_of_2(conf->enq_burst_sz))) { 367 PORT_LOG(ERR, "%s: Invalid input parameters", __func__); 368 return NULL; 369 } 370 371 /* Memory allocation */ 372 port = rte_zmalloc_socket("PORT", sizeof(*port), 373 RTE_CACHE_LINE_SIZE, socket_id); 374 if (port == NULL) { 375 PORT_LOG(ERR, "%s: Failed to allocate port", __func__); 376 return NULL; 377 } 378 379 /* Initialization */ 380 port->enq_burst_sz = conf->enq_burst_sz; 381 port->enq_buf_count = 0; 382 port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1); 383 384 port->eventdev_id = conf->eventdev_id; 385 port->port_id = conf->port_id; 386 port->queue_id = conf->queue_id; 387 port->sched_type = conf->sched_type; 388 port->evt_op = conf->evt_op; 389 memset(&port->ev, 0, sizeof(port->ev)); 390 391 for (i = 0; i < RTE_DIM(port->ev); i++) { 392 port->ev[i].queue_id = port->queue_id; 393 port->ev[i].sched_type = port->sched_type; 394 port->ev[i].op = port->evt_op; 395 } 396 /* 397 * When n_retries is 0 it means that we should wait for every event to 398 * send no matter how many retries should it take. To limit number of 399 * branches in fast path, we use UINT64_MAX instead of branching. 400 */ 401 port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries; 402 403 return port; 404 } 405 406 static inline void 407 send_burst_nodrop(struct rte_port_eventdev_writer_nodrop *p) 408 { 409 uint32_t nb_enq, i; 410 411 nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id, 412 p->ev, p->enq_buf_count); 413 414 /* We sent all the packets in a first try */ 415 if (nb_enq >= p->enq_buf_count) { 416 p->enq_buf_count = 0; 417 return; 418 } 419 420 for (i = 0; i < p->n_retries; i++) { 421 nb_enq += rte_event_enqueue_burst(p->eventdev_id, p->port_id, 422 p->ev + nb_enq, 423 p->enq_buf_count - nb_enq); 424 425 /* We sent all the events in more than one try */ 426 if (nb_enq >= p->enq_buf_count) { 427 p->enq_buf_count = 0; 428 return; 429 } 430 } 431 /* We didn't send the events in maximum allowed attempts */ 432 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, 433 p->enq_buf_count - nb_enq); 434 for (; nb_enq < p->enq_buf_count; nb_enq++) 435 rte_pktmbuf_free(p->ev[nb_enq].mbuf); 436 437 p->enq_buf_count = 0; 438 } 439 440 static int 441 rte_port_eventdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt) 442 { 443 struct rte_port_eventdev_writer_nodrop *p = port; 444 445 p->ev[p->enq_buf_count++].mbuf = pkt; 446 447 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1); 448 if (p->enq_buf_count >= p->enq_burst_sz) 449 send_burst_nodrop(p); 450 451 return 0; 452 } 453 454 static int 455 rte_port_eventdev_writer_nodrop_tx_bulk(void *port, 456 struct rte_mbuf **pkts, 457 uint64_t pkts_mask) 458 { 459 struct rte_port_eventdev_writer_nodrop *p = 460 port; 461 462 uint64_t bsz_mask = p->bsz_mask; 463 uint32_t enq_buf_count = p->enq_buf_count; 464 uint64_t expr = (pkts_mask & (pkts_mask + 1)) | 465 ((pkts_mask & bsz_mask) ^ bsz_mask); 466 467 if (expr == 0) { 468 uint64_t n_pkts = rte_popcount64(pkts_mask); 469 uint32_t i, n_enq_ok; 470 471 if (enq_buf_count) 472 send_burst_nodrop(p); 473 474 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts); 475 476 struct rte_event events[RTE_PORT_IN_BURST_SIZE_MAX] = {}; 477 478 for (i = 0; i < n_pkts; i++) { 479 events[i].mbuf = pkts[i]; 480 events[i].queue_id = p->queue_id; 481 events[i].sched_type = p->sched_type; 482 events[i].op = p->evt_op; 483 } 484 485 n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id, 486 events, n_pkts); 487 488 if (n_enq_ok >= n_pkts) 489 return 0; 490 491 /* 492 * If we did not manage to enqueue all events in single burst, 493 * move remaining events to the buffer and call send burst. 494 */ 495 for (; n_enq_ok < n_pkts; n_enq_ok++) { 496 struct rte_mbuf *pkt = pkts[n_enq_ok]; 497 p->ev[p->enq_buf_count++].mbuf = pkt; 498 } 499 send_burst_nodrop(p); 500 } else { 501 for (; pkts_mask;) { 502 uint32_t pkt_index = rte_ctz64(pkts_mask); 503 uint64_t pkt_mask = 1LLU << pkt_index; 504 505 p->ev[enq_buf_count++].mbuf = pkts[pkt_index]; 506 507 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1); 508 pkts_mask &= ~pkt_mask; 509 } 510 511 p->enq_buf_count = enq_buf_count; 512 if (enq_buf_count >= p->enq_burst_sz) 513 send_burst_nodrop(p); 514 } 515 516 return 0; 517 } 518 519 static int 520 rte_port_eventdev_writer_nodrop_flush(void *port) 521 { 522 struct rte_port_eventdev_writer_nodrop *p = 523 port; 524 525 if (p->enq_buf_count > 0) 526 send_burst_nodrop(p); 527 528 return 0; 529 } 530 531 static int 532 rte_port_eventdev_writer_nodrop_free(void *port) 533 { 534 if (port == NULL) { 535 PORT_LOG(ERR, "%s: Port is NULL", __func__); 536 return -EINVAL; 537 } 538 539 rte_port_eventdev_writer_nodrop_flush(port); 540 rte_free(port); 541 542 return 0; 543 } 544 545 static int rte_port_eventdev_writer_nodrop_stats_read(void *port, 546 struct rte_port_out_stats *stats, int clear) 547 { 548 struct rte_port_eventdev_writer_nodrop *p = 549 port; 550 551 if (stats != NULL) 552 memcpy(stats, &p->stats, sizeof(p->stats)); 553 554 if (clear) 555 memset(&p->stats, 0, sizeof(p->stats)); 556 557 return 0; 558 } 559 560 /* 561 * Summary of port operations 562 */ 563 struct rte_port_in_ops rte_port_eventdev_reader_ops = { 564 .f_create = rte_port_eventdev_reader_create, 565 .f_free = rte_port_eventdev_reader_free, 566 .f_rx = rte_port_eventdev_reader_rx, 567 .f_stats = rte_port_eventdev_reader_stats_read, 568 }; 569 570 struct rte_port_out_ops rte_port_eventdev_writer_ops = { 571 .f_create = rte_port_eventdev_writer_create, 572 .f_free = rte_port_eventdev_writer_free, 573 .f_tx = rte_port_eventdev_writer_tx, 574 .f_tx_bulk = rte_port_eventdev_writer_tx_bulk, 575 .f_flush = rte_port_eventdev_writer_flush, 576 .f_stats = rte_port_eventdev_writer_stats_read, 577 }; 578 579 struct rte_port_out_ops rte_port_eventdev_writer_nodrop_ops = { 580 .f_create = rte_port_eventdev_writer_nodrop_create, 581 .f_free = rte_port_eventdev_writer_nodrop_free, 582 .f_tx = rte_port_eventdev_writer_nodrop_tx, 583 .f_tx_bulk = rte_port_eventdev_writer_nodrop_tx_bulk, 584 .f_flush = rte_port_eventdev_writer_nodrop_flush, 585 .f_stats = rte_port_eventdev_writer_nodrop_stats_read, 586 }; 587