/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2020 Arm Limited
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_PEEK_ZC_H_
#define _RTE_RING_PEEK_ZC_H_

/**
 * @file
 * It is not recommended to include this file directly.
 * Please include <rte_ring_elem.h> instead.
 *
 * Ring Peek Zero Copy APIs
 * These APIs make it possible to split the public enqueue/dequeue API
 * into 3 parts:
 * - enqueue/dequeue start
 * - copy data to/from the ring
 * - enqueue/dequeue finish
 * Along with the advantages of the peek APIs, these APIs provide the
 * ability to avoid copying the data to a temporary area (for example,
 * an array of mbufs on the stack).
 *
 * Note that currently these APIs are available only for two sync modes:
 * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
 * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
 * It is the user's responsibility to create/init the ring with the
 * appropriate sync mode selected.
 *
 * Following are some examples showing the API usage.
 * 1)
 * struct elem_obj {uint64_t a; uint32_t b, c;};
 * struct elem_obj *obj;
 *
 * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
 * // Reserve space on the ring
 * n = rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(struct elem_obj),
 *	1, &zcd, NULL);
 *
 * // Produce the data directly on the ring memory
 * obj = (struct elem_obj *)zcd->ptr1;
 * obj->a = rte_get_a();
 * obj->b = rte_get_b();
 * obj->c = rte_get_c();
 * rte_ring_enqueue_zc_elem_finish(r, n);
 *
 * 2)
 * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
 * // Reserve space on the ring
 * n = rte_ring_enqueue_zc_burst_start(r, 32, &zcd, NULL);
 *
 * // Pkt I/O core polls packets from the NIC
 * if (n != 0) {
 *	nb_rx = rte_eth_rx_burst(portid, queueid, zcd->ptr1, zcd->n1);
 *	if (nb_rx == zcd->n1 && n != zcd->n1)
 *		nb_rx += rte_eth_rx_burst(portid, queueid,
 *			zcd->ptr2, n - zcd->n1);
 *
 *	// Provide packets to the packet processing cores
 *	rte_ring_enqueue_zc_finish(r, nb_rx);
 * }
 *
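 * 3)
 * // A minimal sketch of creating a ring with a sync mode these APIs
 * // support; the ring name and size below are illustrative only.
 * r = rte_ring_create("zc_ring", 1024, rte_socket_id(),
 *	RING_F_SP_ENQ | RING_F_SC_DEQ); // RTE_RING_SYNC_ST
 * // For RTE_RING_SYNC_MT_HTS pass instead:
 * //	RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ
 * if (r == NULL)
 *	rte_exit(EXIT_FAILURE, "cannot create the ring\n");
 *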
 * Note that between _start_ and _finish_ no other thread can proceed
 * with an enqueue/dequeue operation until _finish_ completes.
 */

#include <rte_ring_peek_elem_pvt.h>

#ifdef __cplusplus
extern "C" {
#endif

/**
 * Ring zero-copy information structure.
 *
 * This structure contains the pointers and length of the space
 * reserved on the ring storage.
 */
struct __rte_cache_aligned rte_ring_zc_data {
	/* Pointer to the first space in the ring */
	void *ptr1;
	/* Pointer to the second space in the ring if there is wrap-around.
	 * It contains a valid value only if wrap-around happens.
	 */
	void *ptr2;
	/* Number of elements in the first pointer. If this is equal to
	 * the number of elements requested, then ptr2 is NULL.
	 * Otherwise, subtracting n1 from the number of elements requested
	 * gives the number of elements available at ptr2.
	 */
	unsigned int n1;
};

static __rte_always_inline void
__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
{
	uint32_t idx, scale, nr_idx;
	uint32_t *ring = (uint32_t *)&r[1];

	/* Normalize to uint32_t */
	scale = esize / sizeof(uint32_t);
	idx = head & r->mask;
	nr_idx = idx * scale;

	*dst1 = ring + nr_idx;
	*n1 = num;

	if (idx + num > r->size) {
		*n1 = r->size - idx;
		*dst2 = ring;
	} else {
		*dst2 = NULL;
	}
}

/**
 * @internal This function moves prod head value.
 */
static __rte_always_inline unsigned int
__rte_ring_do_enqueue_zc_elem_start(struct rte_ring *r, unsigned int esize,
	uint32_t n, enum rte_ring_queue_behavior behavior,
	struct rte_ring_zc_data *zcd, unsigned int *free_space)
{
	uint32_t free, head, next;

	switch (r->prod.sync_type) {
	case RTE_RING_SYNC_ST:
		n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
			behavior, &head, &next, &free);
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
		break;
	case RTE_RING_SYNC_MT:
	case RTE_RING_SYNC_MT_RTS:
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		n = 0;
		free = 0;
		return n;
	}

	__rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
		&zcd->n1, &zcd->ptr2);

	if (free_space != NULL)
		*free_space = free - n;
	return n;
}

/**
 * Start to enqueue several objects on the ring.
 * Note that no actual objects are put in the queue by this function,
 * it just reserves space for the user on the ring.
 * User has to copy objects into the queue using the returned pointers.
 * User should call rte_ring_enqueue_zc_elem_finish to complete the
 * enqueue operation.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring.
 * @param zcd
 *   Structure containing the pointers and length of the space
 *   reserved on the ring storage.
 * @param free_space
 *   If non-NULL, returns the amount of space in the ring after the
 *   reservation operation has finished.
 * @return
 *   The number of objects that can be enqueued, either 0 or n.
 */
static __rte_always_inline unsigned int
rte_ring_enqueue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
		RTE_RING_QUEUE_FIXED, zcd, free_space);
}
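/*
 * Usage sketch (illustrative, not part of the API): reserve space for a
 * fixed-size array `objs` (a hypothetical name), copy the elements in
 * place handling a possible wrap-around via ptr2, then commit.
 *
 * struct rte_ring_zc_data zcd;
 * uint32_t n;
 *
 * n = rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(objs[0]),
 *	RTE_DIM(objs), &zcd, NULL);
 * if (n != 0) {
 *	// First chunk fits before the end of the ring storage
 *	memcpy(zcd.ptr1, objs, zcd.n1 * sizeof(objs[0]));
 *	// Remainder, if any, wraps to the beginning of the storage
 *	if (zcd.ptr2 != NULL)
 *		memcpy(zcd.ptr2, objs + zcd.n1,
 *			(n - zcd.n1) * sizeof(objs[0]));
 *	rte_ring_enqueue_zc_elem_finish(r, n);
 * }
 */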
/**
 * Start to enqueue several pointers to objects on the ring.
 * Note that no actual pointers are put in the queue by this function,
 * it just reserves space for the user on the ring.
 * User has to copy pointers to objects into the queue using the
 * returned pointers.
 * User should call rte_ring_enqueue_zc_finish to complete the
 * enqueue operation.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param n
 *   The number of objects to add in the ring.
 * @param zcd
 *   Structure containing the pointers and length of the space
 *   reserved on the ring storage.
 * @param free_space
 *   If non-NULL, returns the amount of space in the ring after the
 *   reservation operation has finished.
 * @return
 *   The number of objects that can be enqueued, either 0 or n.
 */
static __rte_always_inline unsigned int
rte_ring_enqueue_zc_bulk_start(struct rte_ring *r, unsigned int n,
	struct rte_ring_zc_data *zcd, unsigned int *free_space)
{
	return rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(uintptr_t), n,
		zcd, free_space);
}

/**
 * Start to enqueue several objects on the ring.
 * Note that no actual objects are put in the queue by this function,
 * it just reserves space for the user on the ring.
 * User has to copy objects into the queue using the returned pointers.
 * User should call rte_ring_enqueue_zc_elem_finish to complete the
 * enqueue operation.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring.
 * @param zcd
 *   Structure containing the pointers and length of the space
 *   reserved on the ring storage.
 * @param free_space
 *   If non-NULL, returns the amount of space in the ring after the
 *   reservation operation has finished.
 * @return
 *   The number of objects that can be enqueued, which may be less than n.
 */
static __rte_always_inline unsigned int
rte_ring_enqueue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
		RTE_RING_QUEUE_VARIABLE, zcd, free_space);
}

/**
 * Start to enqueue several pointers to objects on the ring.
 * Note that no actual pointers are put in the queue by this function,
 * it just reserves space for the user on the ring.
 * User has to copy pointers to objects into the queue using the
 * returned pointers.
 * User should call rte_ring_enqueue_zc_finish to complete the
 * enqueue operation.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param n
 *   The number of objects to add in the ring.
 * @param zcd
 *   Structure containing the pointers and length of the space
 *   reserved on the ring storage.
 * @param free_space
 *   If non-NULL, returns the amount of space in the ring after the
 *   reservation operation has finished.
 * @return
 *   The number of objects that can be enqueued, which may be less than n.
 */
static __rte_always_inline unsigned int
rte_ring_enqueue_zc_burst_start(struct rte_ring *r, unsigned int n,
	struct rte_ring_zc_data *zcd, unsigned int *free_space)
{
	return rte_ring_enqueue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
		zcd, free_space);
}
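/*
 * Usage sketch (illustrative): the burst variant may reserve fewer slots
 * than requested, and it is also valid to finish with fewer objects than
 * were reserved; the unused remainder of the reservation is released on
 * finish. fill_objs() below is a hypothetical producer callback.
 *
 * struct rte_ring_zc_data zcd;
 * uint32_t n, produced;
 *
 * n = rte_ring_enqueue_zc_burst_start(r, 32, &zcd, NULL);
 * if (n != 0) {
 *	produced = fill_objs(zcd.ptr1, zcd.n1);
 *	if (produced == zcd.n1 && n != zcd.n1)
 *		produced += fill_objs(zcd.ptr2, n - zcd.n1);
 *	rte_ring_enqueue_zc_finish(r, produced);
 * }
 */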
/**
 * Complete enqueuing several objects on the ring.
 * Note that the number of objects to enqueue should not exceed the
 * previous enqueue_start return value.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param n
 *   The number of objects to add to the ring.
 */
static __rte_always_inline void
rte_ring_enqueue_zc_elem_finish(struct rte_ring *r, unsigned int n)
{
	uint32_t tail;

	switch (r->prod.sync_type) {
	case RTE_RING_SYNC_ST:
		n = __rte_ring_st_get_tail(&r->prod, &tail, n);
		__rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
		__rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
		break;
	case RTE_RING_SYNC_MT:
	case RTE_RING_SYNC_MT_RTS:
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
	}
}

/**
 * Complete enqueuing several pointers to objects on the ring.
 * Note that the number of objects to enqueue should not exceed the
 * previous enqueue_start return value.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param n
 *   The number of pointers to objects to add to the ring.
 */
static __rte_always_inline void
rte_ring_enqueue_zc_finish(struct rte_ring *r, unsigned int n)
{
	rte_ring_enqueue_zc_elem_finish(r, n);
}

/**
 * @internal This function moves cons head value.
 */
static __rte_always_inline unsigned int
__rte_ring_do_dequeue_zc_elem_start(struct rte_ring *r,
	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
	struct rte_ring_zc_data *zcd, unsigned int *available)
{
	uint32_t avail, head, next;

	switch (r->cons.sync_type) {
	case RTE_RING_SYNC_ST:
		n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
			behavior, &head, &next, &avail);
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_move_cons_head(r, n, behavior,
			&head, &avail);
		break;
	case RTE_RING_SYNC_MT:
	case RTE_RING_SYNC_MT_RTS:
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		n = 0;
		avail = 0;
		return n;
	}

	__rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
		&zcd->n1, &zcd->ptr2);

	if (available != NULL)
		*available = avail - n;
	return n;
}

/**
 * Start to dequeue several objects from the ring.
 * Note that no actual objects are copied from the queue by this function.
 * User has to copy objects from the queue using the returned pointers.
 * User should call rte_ring_dequeue_zc_elem_finish to complete the
 * dequeue operation.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to remove from the ring.
 * @param zcd
 *   Structure containing the pointers and length of the space
 *   reserved on the ring storage.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   The number of objects that can be dequeued, either 0 or n.
 */
static __rte_always_inline unsigned int
rte_ring_dequeue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
{
	return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
		RTE_RING_QUEUE_FIXED, zcd, available);
}
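/*
 * Usage sketch (illustrative): consume fixed-size elements directly from
 * the ring storage, then release the slots. process_elem() is a
 * hypothetical consumer; struct elem_obj is from the example in the
 * file header.
 *
 * struct rte_ring_zc_data zcd;
 * struct elem_obj *e;
 * uint32_t i, n;
 *
 * n = rte_ring_dequeue_zc_bulk_elem_start(r, sizeof(struct elem_obj),
 *	16, &zcd, NULL);
 * if (n != 0) {
 *	e = (struct elem_obj *)zcd.ptr1;
 *	for (i = 0; i != zcd.n1; i++)
 *		process_elem(&e[i]);
 *	// Wrapped part, if any (n - zcd.n1 is 0 when ptr2 == NULL)
 *	e = (struct elem_obj *)zcd.ptr2;
 *	for (i = 0; i != n - zcd.n1; i++)
 *		process_elem(&e[i]);
 *	rte_ring_dequeue_zc_elem_finish(r, n);
 * }
 */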
/**
 * Start to dequeue several pointers to objects from the ring.
 * Note that no actual pointers are removed from the queue by this function.
 * User has to copy pointers to objects from the queue using the
 * returned pointers.
 * User should call rte_ring_dequeue_zc_finish to complete the
 * dequeue operation.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param n
 *   The number of objects to remove from the ring.
 * @param zcd
 *   Structure containing the pointers and length of the space
 *   reserved on the ring storage.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   The number of objects that can be dequeued, either 0 or n.
 */
static __rte_always_inline unsigned int
rte_ring_dequeue_zc_bulk_start(struct rte_ring *r, unsigned int n,
	struct rte_ring_zc_data *zcd, unsigned int *available)
{
	return rte_ring_dequeue_zc_bulk_elem_start(r, sizeof(uintptr_t),
		n, zcd, available);
}

/**
 * Start to dequeue several objects from the ring.
 * Note that no actual objects are copied from the queue by this function.
 * User has to copy objects from the queue using the returned pointers.
 * User should call rte_ring_dequeue_zc_elem_finish to complete the
 * dequeue operation.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to dequeue from the ring.
 * @param zcd
 *   Structure containing the pointers and length of the space
 *   reserved on the ring storage.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   The number of objects that can be dequeued, which may be less than n.
 */
static __rte_always_inline unsigned int
rte_ring_dequeue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
{
	return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
		RTE_RING_QUEUE_VARIABLE, zcd, available);
}

/**
 * Start to dequeue several pointers to objects from the ring.
 * Note that no actual pointers are removed from the queue by this function.
 * User has to copy pointers to objects from the queue using the
 * returned pointers.
 * User should call rte_ring_dequeue_zc_finish to complete the
 * dequeue operation.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param n
 *   The number of objects to remove from the ring.
 * @param zcd
 *   Structure containing the pointers and length of the space
 *   reserved on the ring storage.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   The number of objects that can be dequeued, which may be less than n.
 */
static __rte_always_inline unsigned int
rte_ring_dequeue_zc_burst_start(struct rte_ring *r, unsigned int n,
	struct rte_ring_zc_data *zcd, unsigned int *available)
{
	return rte_ring_dequeue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
		zcd, available);
}
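/*
 * Usage sketch (illustrative), mirroring example 2 in the file header:
 * dequeue packet pointers in place and hand them to a TX queue. Only the
 * slots actually transmitted are released; the rest stay on the ring.
 * portid and queueid are assumed to be set up elsewhere.
 *
 * struct rte_ring_zc_data zcd;
 * uint32_t n, nb_tx;
 *
 * n = rte_ring_dequeue_zc_burst_start(r, 32, &zcd, NULL);
 * if (n != 0) {
 *	nb_tx = rte_eth_tx_burst(portid, queueid,
 *		(struct rte_mbuf **)zcd.ptr1, zcd.n1);
 *	if (nb_tx == zcd.n1 && n != zcd.n1)
 *		nb_tx += rte_eth_tx_burst(portid, queueid,
 *			(struct rte_mbuf **)zcd.ptr2, n - zcd.n1);
 *	rte_ring_dequeue_zc_finish(r, nb_tx);
 * }
 */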
/**
 * Complete dequeuing several objects from the ring.
 * Note that the number of objects to dequeue should not exceed the
 * previous dequeue_start return value.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param n
 *   The number of objects to remove from the ring.
 */
static __rte_always_inline void
rte_ring_dequeue_zc_elem_finish(struct rte_ring *r, unsigned int n)
{
	uint32_t tail;

	switch (r->cons.sync_type) {
	case RTE_RING_SYNC_ST:
		n = __rte_ring_st_get_tail(&r->cons, &tail, n);
		__rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
		break;
	case RTE_RING_SYNC_MT_HTS:
		n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
		__rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
		break;
	case RTE_RING_SYNC_MT:
	case RTE_RING_SYNC_MT_RTS:
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
	}
}

/**
 * Complete dequeuing several pointers to objects from the ring.
 * Note that the number of objects to dequeue should not exceed the
 * previous dequeue_start return value.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param n
 *   The number of pointers to objects to remove from the ring.
 */
static __rte_always_inline void
rte_ring_dequeue_zc_finish(struct rte_ring *r, unsigned int n)
{
	rte_ring_dequeue_zc_elem_finish(r, n);
}

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RING_PEEK_ZC_H_ */