1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2019 Intel Corporation 3 */ 4 5 #include <string.h> 6 #include <stdint.h> 7 8 #include <rte_mbuf.h> 9 #include <rte_malloc.h> 10 11 #include "rte_port_eventdev.h" 12 13 /* 14 * Port EVENTDEV Reader 15 */ 16 #ifdef RTE_PORT_STATS_COLLECT 17 18 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) \ 19 do {port->stats.n_pkts_in += val;} while (0) 20 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) \ 21 do {port->stats.n_pkts_drop += val;} while (0) 22 23 #else 24 25 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(port, val) 26 #define RTE_PORT_EVENTDEV_READER_STATS_PKTS_DROP_ADD(port, val) 27 28 #endif 29 30 struct rte_port_eventdev_reader { 31 struct rte_port_in_stats stats; 32 33 uint8_t eventdev_id; 34 uint16_t port_id; 35 36 struct rte_event ev[RTE_PORT_IN_BURST_SIZE_MAX]; 37 }; 38 39 static void * 40 rte_port_eventdev_reader_create(void *params, int socket_id) 41 { 42 struct rte_port_eventdev_reader_params *conf = 43 params; 44 struct rte_port_eventdev_reader *port; 45 46 /* Check input parameters */ 47 if (conf == NULL) { 48 RTE_LOG(ERR, PORT, "%s: params is NULL\n", __func__); 49 return NULL; 50 } 51 52 /* Memory allocation */ 53 port = rte_zmalloc_socket("PORT", sizeof(*port), 54 RTE_CACHE_LINE_SIZE, socket_id); 55 if (port == NULL) { 56 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__); 57 return NULL; 58 } 59 60 /* Initialization */ 61 port->eventdev_id = conf->eventdev_id; 62 port->port_id = conf->port_id; 63 64 return port; 65 } 66 67 static int 68 rte_port_eventdev_reader_rx(void *port, struct rte_mbuf **pkts, uint32_t n_pkts) 69 { 70 struct rte_port_eventdev_reader *p = port; 71 uint16_t rx_evts_cnt, i; 72 73 rx_evts_cnt = rte_event_dequeue_burst(p->eventdev_id, p->port_id, 74 p->ev, n_pkts, 0); 75 76 for (i = 0; i < rx_evts_cnt; i++) 77 pkts[i] = p->ev[i].mbuf; 78 79 RTE_PORT_EVENTDEV_READER_STATS_PKTS_IN_ADD(p, rx_evts_cnt); 80 81 return rx_evts_cnt; 82 } 83 84 static int 85 rte_port_eventdev_reader_free(void *port) 86 { 87 if (port == NULL) { 88 RTE_LOG(ERR, PORT, "%s: port is NULL\n", __func__); 89 return -EINVAL; 90 } 91 92 rte_free(port); 93 94 return 0; 95 } 96 97 static int rte_port_eventdev_reader_stats_read(void *port, 98 struct rte_port_in_stats *stats, int clear) 99 { 100 struct rte_port_eventdev_reader *p = 101 port; 102 103 if (stats != NULL) 104 memcpy(stats, &p->stats, sizeof(p->stats)); 105 106 if (clear) 107 memset(&p->stats, 0, sizeof(p->stats)); 108 109 return 0; 110 } 111 112 /* 113 * Port EVENTDEV Writer 114 */ 115 #ifdef RTE_PORT_STATS_COLLECT 116 117 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) \ 118 do {port->stats.n_pkts_in += val;} while (0) 119 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) \ 120 do {port->stats.n_pkts_drop += val;} while (0) 121 122 #else 123 124 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(port, val) 125 #define RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(port, val) 126 127 #endif 128 129 struct rte_port_eventdev_writer { 130 struct rte_port_out_stats stats; 131 132 struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX]; 133 134 uint32_t enq_burst_sz; 135 uint32_t enq_buf_count; 136 uint64_t bsz_mask; 137 138 uint8_t eventdev_id; 139 uint8_t port_id; 140 uint8_t queue_id; 141 uint8_t sched_type; 142 uint8_t evt_op; 143 }; 144 145 static void * 146 rte_port_eventdev_writer_create(void *params, int socket_id) 147 { 148 struct rte_port_eventdev_writer_params *conf = 149 params; 150 struct rte_port_eventdev_writer *port; 151 unsigned int i; 152 153 /* Check input parameters */ 154 if ((conf == NULL) || 155 (conf->enq_burst_sz == 0) || 156 (conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) || 157 (!rte_is_power_of_2(conf->enq_burst_sz))) { 158 RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__); 159 return NULL; 160 } 161 162 /* Memory allocation */ 163 port = rte_zmalloc_socket("PORT", sizeof(*port), 164 RTE_CACHE_LINE_SIZE, socket_id); 165 if (port == NULL) { 166 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__); 167 return NULL; 168 } 169 170 /* Initialization */ 171 port->enq_burst_sz = conf->enq_burst_sz; 172 port->enq_buf_count = 0; 173 port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1); 174 175 port->eventdev_id = conf->eventdev_id; 176 port->port_id = conf->port_id; 177 port->queue_id = conf->queue_id; 178 port->sched_type = conf->sched_type; 179 port->evt_op = conf->evt_op; 180 memset(&port->ev, 0, sizeof(port->ev)); 181 182 for (i = 0; i < RTE_DIM(port->ev); i++) { 183 port->ev[i].queue_id = port->queue_id; 184 port->ev[i].sched_type = port->sched_type; 185 port->ev[i].op = port->evt_op; 186 } 187 188 return port; 189 } 190 191 static inline void 192 send_burst(struct rte_port_eventdev_writer *p) 193 { 194 uint32_t nb_enq; 195 196 nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id, 197 p->ev, p->enq_buf_count); 198 199 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, p->enq_buf_count - 200 nb_enq); 201 202 for (; nb_enq < p->enq_buf_count; nb_enq++) 203 rte_pktmbuf_free(p->ev[nb_enq].mbuf); 204 205 p->enq_buf_count = 0; 206 } 207 208 static int 209 rte_port_eventdev_writer_tx(void *port, struct rte_mbuf *pkt) 210 { 211 struct rte_port_eventdev_writer *p = port; 212 213 p->ev[p->enq_buf_count++].mbuf = pkt; 214 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1); 215 if (p->enq_buf_count >= p->enq_burst_sz) 216 send_burst(p); 217 218 return 0; 219 } 220 221 static int 222 rte_port_eventdev_writer_tx_bulk(void *port, 223 struct rte_mbuf **pkts, 224 uint64_t pkts_mask) 225 { 226 struct rte_port_eventdev_writer *p = 227 port; 228 uint64_t bsz_mask = p->bsz_mask; 229 uint32_t enq_buf_count = p->enq_buf_count; 230 uint64_t expr = (pkts_mask & (pkts_mask + 1)) | 231 ((pkts_mask & bsz_mask) ^ bsz_mask); 232 233 if (expr == 0) { 234 uint64_t n_pkts = __builtin_popcountll(pkts_mask); 235 uint32_t i, n_enq_ok; 236 237 if (enq_buf_count) 238 send_burst(p); 239 240 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, n_pkts); 241 242 struct rte_event events[2 * RTE_PORT_IN_BURST_SIZE_MAX] = {}; 243 for (i = 0; i < n_pkts; i++) { 244 events[i].mbuf = pkts[i]; 245 events[i].queue_id = p->queue_id; 246 events[i].sched_type = p->sched_type; 247 events[i].op = p->evt_op; 248 } 249 250 n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id, 251 events, n_pkts); 252 253 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_DROP_ADD(p, 254 n_pkts - n_enq_ok); 255 for (; n_enq_ok < n_pkts; n_enq_ok++) 256 rte_pktmbuf_free(pkts[n_enq_ok]); 257 258 } else { 259 for (; pkts_mask;) { 260 uint32_t pkt_index = __builtin_ctzll(pkts_mask); 261 uint64_t pkt_mask = 1LLU << pkt_index; 262 263 p->ev[enq_buf_count++].mbuf = pkts[pkt_index]; 264 265 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1); 266 pkts_mask &= ~pkt_mask; 267 } 268 269 p->enq_buf_count = enq_buf_count; 270 if (enq_buf_count >= p->enq_burst_sz) 271 send_burst(p); 272 } 273 274 return 0; 275 } 276 277 static int 278 rte_port_eventdev_writer_flush(void *port) 279 { 280 struct rte_port_eventdev_writer *p = 281 port; 282 283 if (p->enq_buf_count > 0) 284 send_burst(p); 285 286 return 0; 287 } 288 289 static int 290 rte_port_eventdev_writer_free(void *port) 291 { 292 if (port == NULL) { 293 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__); 294 return -EINVAL; 295 } 296 297 rte_port_eventdev_writer_flush(port); 298 rte_free(port); 299 300 return 0; 301 } 302 303 static int rte_port_eventdev_writer_stats_read(void *port, 304 struct rte_port_out_stats *stats, int clear) 305 { 306 struct rte_port_eventdev_writer *p = 307 port; 308 309 if (stats != NULL) 310 memcpy(stats, &p->stats, sizeof(p->stats)); 311 312 if (clear) 313 memset(&p->stats, 0, sizeof(p->stats)); 314 315 return 0; 316 } 317 318 /* 319 * Port EVENTDEV Writer Nodrop 320 */ 321 #ifdef RTE_PORT_STATS_COLLECT 322 323 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) \ 324 do {port->stats.n_pkts_in += val;} while (0) 325 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) \ 326 do {port->stats.n_pkts_drop += val;} while (0) 327 328 #else 329 330 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(port, val) 331 #define RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(port, val) 332 333 #endif 334 335 struct rte_port_eventdev_writer_nodrop { 336 struct rte_port_out_stats stats; 337 338 struct rte_event ev[2 * RTE_PORT_IN_BURST_SIZE_MAX]; 339 340 uint32_t enq_burst_sz; 341 uint32_t enq_buf_count; 342 uint64_t bsz_mask; 343 uint64_t n_retries; 344 uint8_t eventdev_id; 345 uint8_t port_id; 346 uint8_t queue_id; 347 uint8_t sched_type; 348 uint8_t evt_op; 349 }; 350 351 352 static void * 353 rte_port_eventdev_writer_nodrop_create(void *params, int socket_id) 354 { 355 struct rte_port_eventdev_writer_nodrop_params *conf = 356 params; 357 struct rte_port_eventdev_writer_nodrop *port; 358 unsigned int i; 359 360 /* Check input parameters */ 361 if ((conf == NULL) || 362 (conf->enq_burst_sz == 0) || 363 (conf->enq_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX) || 364 (!rte_is_power_of_2(conf->enq_burst_sz))) { 365 RTE_LOG(ERR, PORT, "%s: Invalid input parameters\n", __func__); 366 return NULL; 367 } 368 369 /* Memory allocation */ 370 port = rte_zmalloc_socket("PORT", sizeof(*port), 371 RTE_CACHE_LINE_SIZE, socket_id); 372 if (port == NULL) { 373 RTE_LOG(ERR, PORT, "%s: Failed to allocate port\n", __func__); 374 return NULL; 375 } 376 377 /* Initialization */ 378 port->enq_burst_sz = conf->enq_burst_sz; 379 port->enq_buf_count = 0; 380 port->bsz_mask = 1LLU << (conf->enq_burst_sz - 1); 381 382 port->eventdev_id = conf->eventdev_id; 383 port->port_id = conf->port_id; 384 port->queue_id = conf->queue_id; 385 port->sched_type = conf->sched_type; 386 port->evt_op = conf->evt_op; 387 memset(&port->ev, 0, sizeof(port->ev)); 388 389 for (i = 0; i < RTE_DIM(port->ev); i++) { 390 port->ev[i].queue_id = port->queue_id; 391 port->ev[i].sched_type = port->sched_type; 392 port->ev[i].op = port->evt_op; 393 } 394 /* 395 * When n_retries is 0 it means that we should wait for every event to 396 * send no matter how many retries should it take. To limit number of 397 * branches in fast path, we use UINT64_MAX instead of branching. 398 */ 399 port->n_retries = (conf->n_retries == 0) ? UINT64_MAX : conf->n_retries; 400 401 return port; 402 } 403 404 static inline void 405 send_burst_nodrop(struct rte_port_eventdev_writer_nodrop *p) 406 { 407 uint32_t nb_enq, i; 408 409 nb_enq = rte_event_enqueue_burst(p->eventdev_id, p->port_id, 410 p->ev, p->enq_buf_count); 411 412 /* We sent all the packets in a first try */ 413 if (nb_enq >= p->enq_buf_count) { 414 p->enq_buf_count = 0; 415 return; 416 } 417 418 for (i = 0; i < p->n_retries; i++) { 419 nb_enq += rte_event_enqueue_burst(p->eventdev_id, p->port_id, 420 p->ev + nb_enq, 421 p->enq_buf_count - nb_enq); 422 423 /* We sent all the events in more than one try */ 424 if (nb_enq >= p->enq_buf_count) { 425 p->enq_buf_count = 0; 426 return; 427 } 428 } 429 /* We didn't send the events in maximum allowed attempts */ 430 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_DROP_ADD(p, 431 p->enq_buf_count - nb_enq); 432 for (; nb_enq < p->enq_buf_count; nb_enq++) 433 rte_pktmbuf_free(p->ev[nb_enq].mbuf); 434 435 p->enq_buf_count = 0; 436 } 437 438 static int 439 rte_port_eventdev_writer_nodrop_tx(void *port, struct rte_mbuf *pkt) 440 { 441 struct rte_port_eventdev_writer_nodrop *p = port; 442 443 p->ev[p->enq_buf_count++].mbuf = pkt; 444 445 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, 1); 446 if (p->enq_buf_count >= p->enq_burst_sz) 447 send_burst_nodrop(p); 448 449 return 0; 450 } 451 452 static int 453 rte_port_eventdev_writer_nodrop_tx_bulk(void *port, 454 struct rte_mbuf **pkts, 455 uint64_t pkts_mask) 456 { 457 struct rte_port_eventdev_writer_nodrop *p = 458 port; 459 460 uint64_t bsz_mask = p->bsz_mask; 461 uint32_t enq_buf_count = p->enq_buf_count; 462 uint64_t expr = (pkts_mask & (pkts_mask + 1)) | 463 ((pkts_mask & bsz_mask) ^ bsz_mask); 464 465 if (expr == 0) { 466 uint64_t n_pkts = __builtin_popcountll(pkts_mask); 467 uint32_t i, n_enq_ok; 468 469 if (enq_buf_count) 470 send_burst_nodrop(p); 471 472 RTE_PORT_EVENTDEV_WRITER_NODROP_STATS_PKTS_IN_ADD(p, n_pkts); 473 474 struct rte_event events[RTE_PORT_IN_BURST_SIZE_MAX] = {}; 475 476 for (i = 0; i < n_pkts; i++) { 477 events[i].mbuf = pkts[i]; 478 events[i].queue_id = p->queue_id; 479 events[i].sched_type = p->sched_type; 480 events[i].op = p->evt_op; 481 } 482 483 n_enq_ok = rte_event_enqueue_burst(p->eventdev_id, p->port_id, 484 events, n_pkts); 485 486 if (n_enq_ok >= n_pkts) 487 return 0; 488 489 /* 490 * If we did not manage to enqueue all events in single burst, 491 * move remaining events to the buffer and call send burst. 492 */ 493 for (; n_enq_ok < n_pkts; n_enq_ok++) { 494 struct rte_mbuf *pkt = pkts[n_enq_ok]; 495 p->ev[p->enq_buf_count++].mbuf = pkt; 496 } 497 send_burst_nodrop(p); 498 } else { 499 for (; pkts_mask;) { 500 uint32_t pkt_index = __builtin_ctzll(pkts_mask); 501 uint64_t pkt_mask = 1LLU << pkt_index; 502 503 p->ev[enq_buf_count++].mbuf = pkts[pkt_index]; 504 505 RTE_PORT_EVENTDEV_WRITER_STATS_PKTS_IN_ADD(p, 1); 506 pkts_mask &= ~pkt_mask; 507 } 508 509 p->enq_buf_count = enq_buf_count; 510 if (enq_buf_count >= p->enq_burst_sz) 511 send_burst_nodrop(p); 512 } 513 514 return 0; 515 } 516 517 static int 518 rte_port_eventdev_writer_nodrop_flush(void *port) 519 { 520 struct rte_port_eventdev_writer_nodrop *p = 521 port; 522 523 if (p->enq_buf_count > 0) 524 send_burst_nodrop(p); 525 526 return 0; 527 } 528 529 static int 530 rte_port_eventdev_writer_nodrop_free(void *port) 531 { 532 if (port == NULL) { 533 RTE_LOG(ERR, PORT, "%s: Port is NULL\n", __func__); 534 return -EINVAL; 535 } 536 537 rte_port_eventdev_writer_nodrop_flush(port); 538 rte_free(port); 539 540 return 0; 541 } 542 543 static int rte_port_eventdev_writer_nodrop_stats_read(void *port, 544 struct rte_port_out_stats *stats, int clear) 545 { 546 struct rte_port_eventdev_writer_nodrop *p = 547 port; 548 549 if (stats != NULL) 550 memcpy(stats, &p->stats, sizeof(p->stats)); 551 552 if (clear) 553 memset(&p->stats, 0, sizeof(p->stats)); 554 555 return 0; 556 } 557 558 /* 559 * Summary of port operations 560 */ 561 struct rte_port_in_ops rte_port_eventdev_reader_ops = { 562 .f_create = rte_port_eventdev_reader_create, 563 .f_free = rte_port_eventdev_reader_free, 564 .f_rx = rte_port_eventdev_reader_rx, 565 .f_stats = rte_port_eventdev_reader_stats_read, 566 }; 567 568 struct rte_port_out_ops rte_port_eventdev_writer_ops = { 569 .f_create = rte_port_eventdev_writer_create, 570 .f_free = rte_port_eventdev_writer_free, 571 .f_tx = rte_port_eventdev_writer_tx, 572 .f_tx_bulk = rte_port_eventdev_writer_tx_bulk, 573 .f_flush = rte_port_eventdev_writer_flush, 574 .f_stats = rte_port_eventdev_writer_stats_read, 575 }; 576 577 struct rte_port_out_ops rte_port_eventdev_writer_nodrop_ops = { 578 .f_create = rte_port_eventdev_writer_nodrop_create, 579 .f_free = rte_port_eventdev_writer_nodrop_free, 580 .f_tx = rte_port_eventdev_writer_nodrop_tx, 581 .f_tx_bulk = rte_port_eventdev_writer_nodrop_tx_bulk, 582 .f_flush = rte_port_eventdev_writer_nodrop_flush, 583 .f_stats = rte_port_eventdev_writer_nodrop_stats_read, 584 }; 585