1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2017 Intel Corporation 3 */ 4 5 #include <stdbool.h> 6 #include <stddef.h> 7 #include <stdint.h> 8 #include <stdio.h> 9 10 #include <rte_string_fns.h> 11 #include <rte_branch_prediction.h> 12 #include <rte_debug.h> 13 #include <rte_lcore.h> 14 #include <rte_log.h> 15 #include <rte_malloc.h> 16 #include <rte_memcpy.h> 17 #include <rte_memory.h> 18 #include <rte_memzone.h> 19 #include <rte_atomic.h> 20 21 #include "opdl_ring.h" 22 #include "opdl_log.h" 23 24 #define LIB_NAME "opdl_ring" 25 26 #define OPDL_NAME_SIZE 64 27 28 29 #define OPDL_EVENT_MASK (0x00000000000FFFFFULL) 30 #define OPDL_FLOWID_MASK (0xFFFFF) 31 #define OPDL_OPA_MASK (0xFF) 32 #define OPDL_OPA_OFFSET (0x38) 33 34 /* Types of dependency between stages */ 35 enum dep_type { 36 DEP_NONE = 0, /* no dependency */ 37 DEP_DIRECT, /* stage has direct dependency */ 38 DEP_INDIRECT, /* in-direct dependency through other stage(s) */ 39 DEP_SELF, /* stage dependency on itself, used to detect loops */ 40 }; 41 42 /* Shared section of stage state. 43 * Care is needed when accessing and the layout is important, especially to 44 * limit the adjacent cache-line HW prefetcher from impacting performance. 45 */ 46 struct __rte_cache_aligned shared_state { 47 /* Last known minimum sequence number of dependencies, used for multi 48 * thread operation 49 */ 50 RTE_ATOMIC(uint32_t) available_seq; 51 char _pad1[RTE_CACHE_LINE_SIZE * 3]; 52 RTE_ATOMIC(uint32_t) head; /* Head sequence number (for multi thread operation) */ 53 char _pad2[RTE_CACHE_LINE_SIZE * 3]; 54 struct opdl_stage *stage; /* back pointer */ 55 RTE_ATOMIC(uint32_t) tail; /* Tail sequence number */ 56 char _pad3[RTE_CACHE_LINE_SIZE * 2]; 57 }; 58 59 /* A structure to keep track of "unfinished" claims. This is only used for 60 * stages that are threadsafe. Each lcore accesses its own instance of this 61 * structure to record the entries it has claimed. This allows one lcore to make 62 * multiple claims without being blocked by another. When disclaiming it moves 63 * forward the shared tail when the shared tail matches the tail value recorded 64 * here. 65 */ 66 struct __rte_cache_aligned claim_manager { 67 uint32_t num_to_disclaim; 68 uint32_t num_claimed; 69 uint32_t mgr_head; 70 uint32_t mgr_tail; 71 struct { 72 uint32_t head; 73 uint32_t tail; 74 } claims[OPDL_DISCLAIMS_PER_LCORE]; 75 }; 76 77 /* Context for each stage of opdl_ring. 78 * Calculations on sequence numbers need to be done with other uint32_t values 79 * so that results are modulus 2^32, and not undefined. 80 */ 81 struct __rte_cache_aligned opdl_stage { 82 struct opdl_ring *t; /* back pointer, set at init */ 83 uint32_t num_slots; /* Number of slots for entries, set at init */ 84 uint32_t index; /* ID for this stage, set at init */ 85 bool threadsafe; /* Set to 1 if this stage supports threadsafe use */ 86 /* Last known min seq number of dependencies for used for single thread 87 * operation 88 */ 89 uint32_t available_seq; 90 uint32_t head; /* Current head for single-thread operation */ 91 uint32_t nb_instance; /* Number of instances */ 92 uint32_t instance_id; /* ID of this stage instance */ 93 uint16_t num_claimed; /* Number of slots claimed */ 94 uint16_t num_event; /* Number of events */ 95 uint32_t seq; /* sequence number */ 96 uint32_t num_deps; /* Number of direct dependencies */ 97 /* Keep track of all dependencies, used during init only */ 98 enum dep_type *dep_tracking; 99 /* Direct dependencies of this stage */ 100 struct shared_state **deps; 101 /* Other stages read this! */ 102 alignas(RTE_CACHE_LINE_SIZE) struct shared_state shared; 103 /* For managing disclaims in multi-threaded processing stages */ 104 alignas(RTE_CACHE_LINE_SIZE) struct claim_manager pending_disclaims[RTE_MAX_LCORE]; 105 uint32_t shadow_head; /* Shadow head for single-thread operation */ 106 uint32_t queue_id; /* ID of Queue which is assigned to this stage */ 107 uint32_t pos; /* Atomic scan position */ 108 }; 109 110 /* Context for opdl_ring */ 111 struct opdl_ring { 112 char name[OPDL_NAME_SIZE]; /* OPDL queue instance name */ 113 int socket; /* NUMA socket that memory is allocated on */ 114 uint32_t num_slots; /* Number of slots for entries */ 115 uint32_t mask; /* Mask for sequence numbers (num_slots - 1) */ 116 uint32_t slot_size; /* Size of each slot in bytes */ 117 uint32_t num_stages; /* Number of stages that have been added */ 118 uint32_t max_num_stages; /* Max number of stages */ 119 /* Stages indexed by ID */ 120 struct opdl_stage *stages; 121 /* Memory for storing slot data */ 122 alignas(RTE_CACHE_LINE_SIZE) uint8_t slots[]; 123 }; 124 125 126 /* Return input stage of a opdl_ring */ 127 static __rte_always_inline struct opdl_stage * 128 input_stage(const struct opdl_ring *t) 129 { 130 return &t->stages[0]; 131 } 132 133 /* Check if a stage is the input stage */ 134 static __rte_always_inline bool 135 is_input_stage(const struct opdl_stage *s) 136 { 137 return s->index == 0; 138 } 139 140 /* Get slot pointer from sequence number */ 141 static __rte_always_inline void * 142 get_slot(const struct opdl_ring *t, uint32_t n) 143 { 144 return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size]; 145 } 146 147 /* Find how many entries are available for processing */ 148 static __rte_always_inline uint32_t 149 available(const struct opdl_stage *s) 150 { 151 if (s->threadsafe == true) { 152 uint32_t n = rte_atomic_load_explicit(&s->shared.available_seq, 153 rte_memory_order_acquire) - 154 rte_atomic_load_explicit(&s->shared.head, 155 rte_memory_order_acquire); 156 157 /* Return 0 if available_seq needs to be updated */ 158 return (n <= s->num_slots) ? n : 0; 159 } 160 161 /* Single threaded */ 162 return s->available_seq - s->head; 163 } 164 165 /* Read sequence number of dependencies and find minimum */ 166 static __rte_always_inline void 167 update_available_seq(struct opdl_stage *s) 168 { 169 uint32_t i; 170 uint32_t this_tail = s->shared.tail; 171 uint32_t min_seq = rte_atomic_load_explicit(&s->deps[0]->tail, rte_memory_order_acquire); 172 /* Input stage sequence numbers are greater than the sequence numbers of 173 * its dependencies so an offset of t->num_slots is needed when 174 * calculating available slots and also the condition which is used to 175 * determine the dependencies minimum sequence number must be reverted. 176 */ 177 uint32_t wrap; 178 179 if (is_input_stage(s)) { 180 wrap = s->num_slots; 181 for (i = 1; i < s->num_deps; i++) { 182 uint32_t seq = rte_atomic_load_explicit(&s->deps[i]->tail, 183 rte_memory_order_acquire); 184 if ((this_tail - seq) > (this_tail - min_seq)) 185 min_seq = seq; 186 } 187 } else { 188 wrap = 0; 189 for (i = 1; i < s->num_deps; i++) { 190 uint32_t seq = rte_atomic_load_explicit(&s->deps[i]->tail, 191 rte_memory_order_acquire); 192 if ((seq - this_tail) < (min_seq - this_tail)) 193 min_seq = seq; 194 } 195 } 196 197 if (s->threadsafe == false) 198 s->available_seq = min_seq + wrap; 199 else 200 rte_atomic_store_explicit(&s->shared.available_seq, min_seq + wrap, 201 rte_memory_order_release); 202 } 203 204 /* Wait until the number of available slots reaches number requested */ 205 static __rte_always_inline void 206 wait_for_available(struct opdl_stage *s, uint32_t n) 207 { 208 while (available(s) < n) { 209 rte_pause(); 210 update_available_seq(s); 211 } 212 } 213 214 /* Return number of slots to process based on number requested and mode */ 215 static __rte_always_inline uint32_t 216 num_to_process(struct opdl_stage *s, uint32_t n, bool block) 217 { 218 /* Don't read tail sequences of dependencies if not needed */ 219 if (available(s) >= n) 220 return n; 221 222 update_available_seq(s); 223 224 if (block == false) { 225 uint32_t avail = available(s); 226 227 if (avail == 0) { 228 rte_pause(); 229 return 0; 230 } 231 return (avail <= n) ? avail : n; 232 } 233 234 if (unlikely(n > s->num_slots)) { 235 PMD_DRV_LOG(ERR, "%u entries is more than max (%u)", 236 n, s->num_slots); 237 return 0; /* Avoid infinite loop */ 238 } 239 /* blocking */ 240 wait_for_available(s, n); 241 return n; 242 } 243 244 /* Copy entries in to slots with wrap-around */ 245 static __rte_always_inline void 246 copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries, 247 uint32_t num_entries) 248 { 249 uint32_t slot_size = t->slot_size; 250 uint32_t slot_index = start & t->mask; 251 252 if (slot_index + num_entries <= t->num_slots) { 253 rte_memcpy(get_slot(t, start), entries, 254 num_entries * slot_size); 255 } else { 256 uint32_t split = t->num_slots - slot_index; 257 258 rte_memcpy(get_slot(t, start), entries, split * slot_size); 259 rte_memcpy(get_slot(t, 0), 260 RTE_PTR_ADD(entries, split * slot_size), 261 (num_entries - split) * slot_size); 262 } 263 } 264 265 /* Copy entries out from slots with wrap-around */ 266 static __rte_always_inline void 267 copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries, 268 uint32_t num_entries) 269 { 270 uint32_t slot_size = t->slot_size; 271 uint32_t slot_index = start & t->mask; 272 273 if (slot_index + num_entries <= t->num_slots) { 274 rte_memcpy(entries, get_slot(t, start), 275 num_entries * slot_size); 276 } else { 277 uint32_t split = t->num_slots - slot_index; 278 279 rte_memcpy(entries, get_slot(t, start), split * slot_size); 280 rte_memcpy(RTE_PTR_ADD(entries, split * slot_size), 281 get_slot(t, 0), 282 (num_entries - split) * slot_size); 283 } 284 } 285 286 /* Input function optimised for single thread */ 287 static __rte_always_inline uint32_t 288 opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries, 289 uint32_t num_entries, bool block) 290 { 291 struct opdl_stage *s = input_stage(t); 292 uint32_t head = s->head; 293 294 num_entries = num_to_process(s, num_entries, block); 295 if (num_entries == 0) 296 return 0; 297 298 copy_entries_in(t, head, entries, num_entries); 299 300 s->head += num_entries; 301 rte_atomic_store_explicit(&s->shared.tail, s->head, rte_memory_order_release); 302 303 return num_entries; 304 } 305 306 /* Convert head and tail of claim_manager into valid index */ 307 static __rte_always_inline uint32_t 308 claim_mgr_index(uint32_t n) 309 { 310 return n & (OPDL_DISCLAIMS_PER_LCORE - 1); 311 } 312 313 /* Check if there are available slots in claim_manager */ 314 static __rte_always_inline bool 315 claim_mgr_available(struct claim_manager *mgr) 316 { 317 return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ? 318 true : false; 319 } 320 321 /* Record a new claim. Only use after first checking an entry is available */ 322 static __rte_always_inline void 323 claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head) 324 { 325 if ((mgr->mgr_head != mgr->mgr_tail) && 326 (mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head == 327 tail)) { 328 /* Combine with previous claim */ 329 mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head; 330 } else { 331 mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head; 332 mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail; 333 mgr->mgr_head++; 334 } 335 336 mgr->num_claimed += (head - tail); 337 } 338 339 /* Read the oldest recorded claim */ 340 static __rte_always_inline bool 341 claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head) 342 { 343 if (mgr->mgr_head == mgr->mgr_tail) 344 return false; 345 346 *head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head; 347 *tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail; 348 return true; 349 } 350 351 /* Remove the oldest recorded claim. Only use after first reading the entry */ 352 static __rte_always_inline void 353 claim_mgr_remove(struct claim_manager *mgr) 354 { 355 mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head - 356 mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail); 357 mgr->mgr_tail++; 358 } 359 360 /* Update tail in the oldest claim. Only use after first reading the entry */ 361 static __rte_always_inline void 362 claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries) 363 { 364 mgr->num_claimed -= num_entries; 365 mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries; 366 } 367 368 static __rte_always_inline void 369 opdl_stage_disclaim_multithread_n(struct opdl_stage *s, 370 uint32_t num_entries, bool block) 371 { 372 struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()]; 373 uint32_t head; 374 uint32_t tail; 375 376 while (num_entries) { 377 bool ret = claim_mgr_read(disclaims, &tail, &head); 378 379 if (ret == false) 380 break; /* nothing is claimed */ 381 /* There should be no race condition here. If shared.tail 382 * matches, no other core can update it until this one does. 383 */ 384 if (rte_atomic_load_explicit(&s->shared.tail, rte_memory_order_acquire) == 385 tail) { 386 if (num_entries >= (head - tail)) { 387 claim_mgr_remove(disclaims); 388 rte_atomic_store_explicit(&s->shared.tail, head, 389 rte_memory_order_release); 390 num_entries -= (head - tail); 391 } else { 392 claim_mgr_move_tail(disclaims, num_entries); 393 rte_atomic_store_explicit(&s->shared.tail, 394 num_entries + tail, 395 rte_memory_order_release); 396 num_entries = 0; 397 } 398 } else if (block == false) 399 break; /* blocked by other thread */ 400 /* Keep going until num_entries are disclaimed. */ 401 rte_pause(); 402 } 403 404 disclaims->num_to_disclaim = num_entries; 405 } 406 407 /* Move head atomically, returning number of entries available to process and 408 * the original value of head. For non-input stages, the claim is recorded 409 * so that the tail can be updated later by opdl_stage_disclaim(). 410 */ 411 static __rte_always_inline void 412 move_head_atomically(struct opdl_stage *s, uint32_t *num_entries, 413 uint32_t *old_head, bool block, bool claim_func) 414 { 415 uint32_t orig_num_entries = *num_entries; 416 uint32_t ret; 417 struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()]; 418 419 /* Attempt to disclaim any outstanding claims */ 420 opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim, 421 false); 422 423 *old_head = rte_atomic_load_explicit(&s->shared.head, rte_memory_order_acquire); 424 while (true) { 425 bool success; 426 /* If called by opdl_ring_input(), claim does not need to be 427 * recorded, as there will be no disclaim. 428 */ 429 if (claim_func) { 430 /* Check that the claim can be recorded */ 431 ret = claim_mgr_available(disclaims); 432 if (ret == false) { 433 /* exit out if claim can't be recorded */ 434 *num_entries = 0; 435 return; 436 } 437 } 438 439 *num_entries = num_to_process(s, orig_num_entries, block); 440 if (*num_entries == 0) 441 return; 442 443 success = rte_atomic_compare_exchange_weak_explicit(&s->shared.head, old_head, 444 *old_head + *num_entries, 445 rte_memory_order_release, /* memory order on success */ 446 rte_memory_order_acquire); /* memory order on fail */ 447 if (likely(success)) 448 break; 449 rte_pause(); 450 } 451 452 if (claim_func) 453 /* Store the claim record */ 454 claim_mgr_add(disclaims, *old_head, *old_head + *num_entries); 455 } 456 457 /* Input function that supports multiple threads */ 458 static __rte_always_inline uint32_t 459 opdl_ring_input_multithread(struct opdl_ring *t, const void *entries, 460 uint32_t num_entries, bool block) 461 { 462 struct opdl_stage *s = input_stage(t); 463 uint32_t old_head; 464 465 move_head_atomically(s, &num_entries, &old_head, block, false); 466 if (num_entries == 0) 467 return 0; 468 469 copy_entries_in(t, old_head, entries, num_entries); 470 471 /* If another thread started inputting before this one, but hasn't 472 * finished, we need to wait for it to complete to update the tail. 473 */ 474 rte_wait_until_equal_32((uint32_t *)(uintptr_t)&s->shared.tail, old_head, 475 rte_memory_order_acquire); 476 477 rte_atomic_store_explicit(&s->shared.tail, old_head + num_entries, 478 rte_memory_order_release); 479 480 return num_entries; 481 } 482 483 static __rte_always_inline uint32_t 484 opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores, 485 uint8_t this_lcore) 486 { 487 return ((nb_p_lcores <= 1) ? 0 : 488 (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) % 489 nb_p_lcores); 490 } 491 492 /* Claim slots to process, optimised for single-thread operation */ 493 static __rte_always_inline uint32_t 494 opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries, 495 uint32_t num_entries, uint32_t *seq, bool block, bool atomic) 496 { 497 uint32_t i = 0, j = 0, offset; 498 uint32_t opa_id = 0; 499 uint32_t flow_id = 0; 500 uint64_t event = 0; 501 void *get_slots; 502 struct rte_event *ev; 503 RTE_SET_USED(seq); 504 struct opdl_ring *t = s->t; 505 uint8_t *entries_offset = (uint8_t *)entries; 506 507 if (!atomic) { 508 509 offset = opdl_first_entry_id(s->seq, s->nb_instance, 510 s->instance_id); 511 512 num_entries = s->nb_instance * num_entries; 513 514 num_entries = num_to_process(s, num_entries, block); 515 516 for (; offset < num_entries; offset += s->nb_instance) { 517 get_slots = get_slot(t, s->head + offset); 518 memcpy(entries_offset, get_slots, t->slot_size); 519 entries_offset += t->slot_size; 520 i++; 521 } 522 } else { 523 num_entries = num_to_process(s, num_entries, block); 524 525 for (j = 0; j < num_entries; j++) { 526 ev = (struct rte_event *)get_slot(t, s->head+j); 527 528 event = rte_atomic_load_explicit((uint64_t __rte_atomic *)&ev->event, 529 rte_memory_order_acquire); 530 531 opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET); 532 flow_id = OPDL_FLOWID_MASK & event; 533 534 if (opa_id >= s->queue_id) 535 continue; 536 537 if ((flow_id % s->nb_instance) == s->instance_id) { 538 memcpy(entries_offset, ev, t->slot_size); 539 entries_offset += t->slot_size; 540 i++; 541 } 542 } 543 } 544 s->shadow_head = s->head; 545 s->head += num_entries; 546 s->num_claimed = num_entries; 547 s->num_event = i; 548 s->pos = 0; 549 550 /* automatically disclaim entries if number of rte_events is zero */ 551 if (unlikely(i == 0)) 552 opdl_stage_disclaim(s, 0, false); 553 554 return i; 555 } 556 557 /* Thread-safe version of function to claim slots for processing */ 558 static __rte_always_inline uint32_t 559 opdl_stage_claim_multithread(struct opdl_stage *s, void *entries, 560 uint32_t num_entries, uint32_t *seq, bool block) 561 { 562 uint32_t old_head; 563 struct opdl_ring *t = s->t; 564 uint32_t i = 0, offset; 565 uint8_t *entries_offset = (uint8_t *)entries; 566 567 if (seq == NULL) { 568 PMD_DRV_LOG(ERR, "Invalid seq PTR"); 569 return 0; 570 } 571 offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id); 572 num_entries = offset + (s->nb_instance * num_entries); 573 574 move_head_atomically(s, &num_entries, &old_head, block, true); 575 576 for (; offset < num_entries; offset += s->nb_instance) { 577 memcpy(entries_offset, get_slot(t, s->head + offset), 578 t->slot_size); 579 entries_offset += t->slot_size; 580 i++; 581 } 582 583 *seq = old_head; 584 585 return i; 586 } 587 588 /* Claim and copy slot pointers, optimised for single-thread operation */ 589 static __rte_always_inline uint32_t 590 opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries, 591 uint32_t num_entries, uint32_t *seq, bool block) 592 { 593 num_entries = num_to_process(s, num_entries, block); 594 if (num_entries == 0) 595 return 0; 596 copy_entries_out(s->t, s->head, entries, num_entries); 597 if (seq != NULL) 598 *seq = s->head; 599 s->head += num_entries; 600 return num_entries; 601 } 602 603 /* Thread-safe version of function to claim and copy pointers to slots */ 604 static __rte_always_inline uint32_t 605 opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries, 606 uint32_t num_entries, uint32_t *seq, bool block) 607 { 608 uint32_t old_head; 609 610 move_head_atomically(s, &num_entries, &old_head, block, true); 611 if (num_entries == 0) 612 return 0; 613 copy_entries_out(s->t, old_head, entries, num_entries); 614 if (seq != NULL) 615 *seq = old_head; 616 return num_entries; 617 } 618 619 static __rte_always_inline void 620 opdl_stage_disclaim_singlethread_n(struct opdl_stage *s, 621 uint32_t num_entries) 622 { 623 uint32_t old_tail = s->shared.tail; 624 625 if (unlikely(num_entries > (s->head - old_tail))) { 626 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)", 627 num_entries, s->head - old_tail); 628 num_entries = s->head - old_tail; 629 } 630 rte_atomic_store_explicit(&s->shared.tail, num_entries + old_tail, 631 rte_memory_order_release); 632 } 633 634 uint32_t 635 opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries, 636 bool block) 637 { 638 if (input_stage(t)->threadsafe == false) 639 return opdl_ring_input_singlethread(t, entries, num_entries, 640 block); 641 else 642 return opdl_ring_input_multithread(t, entries, num_entries, 643 block); 644 } 645 646 uint32_t 647 opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s, 648 const void *entries, uint32_t num_entries, bool block) 649 { 650 uint32_t head = s->head; 651 652 num_entries = num_to_process(s, num_entries, block); 653 654 if (num_entries == 0) 655 return 0; 656 657 copy_entries_in(t, head, entries, num_entries); 658 659 s->head += num_entries; 660 rte_atomic_store_explicit(&s->shared.tail, s->head, rte_memory_order_release); 661 662 return num_entries; 663 664 } 665 666 uint32_t 667 opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s, 668 void *entries, uint32_t num_entries, bool block) 669 { 670 uint32_t head = s->head; 671 672 num_entries = num_to_process(s, num_entries, block); 673 if (num_entries == 0) 674 return 0; 675 676 copy_entries_out(t, head, entries, num_entries); 677 678 s->head += num_entries; 679 rte_atomic_store_explicit(&s->shared.tail, s->head, rte_memory_order_release); 680 681 return num_entries; 682 } 683 684 uint32_t 685 opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries) 686 { 687 /* return (num_to_process(s, num_entries, false)); */ 688 689 if (available(s) >= num_entries) 690 return num_entries; 691 692 update_available_seq(s); 693 694 uint32_t avail = available(s); 695 696 if (avail == 0) { 697 rte_pause(); 698 return 0; 699 } 700 return (avail <= num_entries) ? avail : num_entries; 701 } 702 703 uint32_t 704 opdl_stage_claim(struct opdl_stage *s, void *entries, 705 uint32_t num_entries, uint32_t *seq, bool block, bool atomic) 706 { 707 if (s->threadsafe == false) 708 return opdl_stage_claim_singlethread(s, entries, num_entries, 709 seq, block, atomic); 710 else 711 return opdl_stage_claim_multithread(s, entries, num_entries, 712 seq, block); 713 } 714 715 uint32_t 716 opdl_stage_claim_copy(struct opdl_stage *s, void *entries, 717 uint32_t num_entries, uint32_t *seq, bool block) 718 { 719 if (s->threadsafe == false) 720 return opdl_stage_claim_copy_singlethread(s, entries, 721 num_entries, seq, block); 722 else 723 return opdl_stage_claim_copy_multithread(s, entries, 724 num_entries, seq, block); 725 } 726 727 void 728 opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries, 729 bool block) 730 { 731 732 if (s->threadsafe == false) { 733 opdl_stage_disclaim_singlethread_n(s, s->num_claimed); 734 } else { 735 struct claim_manager *disclaims = 736 &s->pending_disclaims[rte_lcore_id()]; 737 738 if (unlikely(num_entries > s->num_slots)) { 739 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)", 740 num_entries, disclaims->num_claimed); 741 num_entries = disclaims->num_claimed; 742 } 743 744 num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim, 745 disclaims->num_claimed); 746 opdl_stage_disclaim_multithread_n(s, num_entries, block); 747 } 748 } 749 750 int 751 opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block) 752 { 753 if (num_entries != s->num_event) { 754 rte_errno = EINVAL; 755 return 0; 756 } 757 if (s->threadsafe == false) { 758 rte_atomic_store_explicit(&s->shared.tail, s->head, rte_memory_order_release); 759 s->seq += s->num_claimed; 760 s->shadow_head = s->head; 761 s->num_claimed = 0; 762 } else { 763 struct claim_manager *disclaims = 764 &s->pending_disclaims[rte_lcore_id()]; 765 opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed, 766 block); 767 } 768 return num_entries; 769 } 770 771 uint32_t 772 opdl_ring_available(struct opdl_ring *t) 773 { 774 return opdl_stage_available(&t->stages[0]); 775 } 776 777 uint32_t 778 opdl_stage_available(struct opdl_stage *s) 779 { 780 update_available_seq(s); 781 return available(s); 782 } 783 784 void 785 opdl_ring_flush(struct opdl_ring *t) 786 { 787 struct opdl_stage *s = input_stage(t); 788 789 wait_for_available(s, s->num_slots); 790 } 791 792 /******************** Non performance sensitive functions ********************/ 793 794 /* Initial setup of a new stage's context */ 795 static int 796 init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe, 797 bool is_input) 798 { 799 uint32_t available = (is_input) ? t->num_slots : 0; 800 801 s->t = t; 802 s->num_slots = t->num_slots; 803 s->index = t->num_stages; 804 s->threadsafe = threadsafe; 805 s->shared.stage = s; 806 807 /* Alloc memory for deps */ 808 s->dep_tracking = rte_zmalloc_socket(LIB_NAME, 809 t->max_num_stages * sizeof(enum dep_type), 810 0, t->socket); 811 if (s->dep_tracking == NULL) 812 return -ENOMEM; 813 814 s->deps = rte_zmalloc_socket(LIB_NAME, 815 t->max_num_stages * sizeof(struct shared_state *), 816 0, t->socket); 817 if (s->deps == NULL) { 818 rte_free(s->dep_tracking); 819 return -ENOMEM; 820 } 821 822 s->dep_tracking[s->index] = DEP_SELF; 823 824 if (threadsafe == true) 825 s->shared.available_seq = available; 826 else 827 s->available_seq = available; 828 829 return 0; 830 } 831 832 /* Add direct or indirect dependencies between stages */ 833 static int 834 add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency, 835 enum dep_type type) 836 { 837 struct opdl_ring *t = dependent->t; 838 uint32_t i; 839 840 /* Add new direct dependency */ 841 if ((type == DEP_DIRECT) && 842 (dependent->dep_tracking[dependency->index] == 843 DEP_NONE)) { 844 PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u", 845 t->name, dependent->index, dependency->index); 846 dependent->dep_tracking[dependency->index] = DEP_DIRECT; 847 } 848 849 /* Add new indirect dependency or change direct to indirect */ 850 if ((type == DEP_INDIRECT) && 851 ((dependent->dep_tracking[dependency->index] == 852 DEP_NONE) || 853 (dependent->dep_tracking[dependency->index] == 854 DEP_DIRECT))) { 855 PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u", 856 t->name, dependent->index, dependency->index); 857 dependent->dep_tracking[dependency->index] = DEP_INDIRECT; 858 } 859 860 /* Shouldn't happen... */ 861 if ((dependent->dep_tracking[dependency->index] == DEP_SELF) && 862 (dependent != input_stage(t))) { 863 PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u", 864 t->name, dependent->index); 865 return -EINVAL; 866 } 867 868 /* Keep going to dependencies of the dependency, until input stage */ 869 if (dependency != input_stage(t)) 870 for (i = 0; i < dependency->num_deps; i++) { 871 int ret = add_dep(dependent, dependency->deps[i]->stage, 872 DEP_INDIRECT); 873 874 if (ret < 0) 875 return ret; 876 } 877 878 /* Make list of sequence numbers for direct dependencies only */ 879 if (type == DEP_DIRECT) 880 for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++) 881 if (dependent->dep_tracking[i] == DEP_DIRECT) { 882 if ((i == 0) && (dependent->num_deps > 1)) 883 rte_panic("%s:%u depends on > input", 884 t->name, 885 dependent->index); 886 dependent->deps[dependent->num_deps++] = 887 &t->stages[i].shared; 888 } 889 890 return 0; 891 } 892 893 struct opdl_ring * 894 opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size, 895 uint32_t max_num_stages, int socket) 896 { 897 struct opdl_ring *t; 898 char mz_name[RTE_MEMZONE_NAMESIZE]; 899 int mz_flags = 0; 900 struct opdl_stage *st = NULL; 901 const struct rte_memzone *mz = NULL; 902 size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) + 903 (num_slots * slot_size)); 904 905 /* Compile time checking */ 906 RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) != 907 0); 908 RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) & 909 RTE_CACHE_LINE_MASK) != 0); 910 RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) & 911 RTE_CACHE_LINE_MASK) != 0); 912 RTE_BUILD_BUG_ON(!RTE_IS_POWER_OF_2(OPDL_DISCLAIMS_PER_LCORE)); 913 914 /* Parameter checking */ 915 if (name == NULL) { 916 PMD_DRV_LOG(ERR, "name param is NULL"); 917 return NULL; 918 } 919 if (!rte_is_power_of_2(num_slots)) { 920 PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2", 921 num_slots, name); 922 return NULL; 923 } 924 925 /* Alloc memory for stages */ 926 st = rte_zmalloc_socket(LIB_NAME, 927 max_num_stages * sizeof(struct opdl_stage), 928 RTE_CACHE_LINE_SIZE, socket); 929 if (st == NULL) 930 goto exit_fail; 931 932 snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name); 933 934 /* Alloc memory for memzone */ 935 mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags); 936 if (mz == NULL) 937 goto exit_fail; 938 939 t = mz->addr; 940 941 /* Initialise opdl_ring queue */ 942 memset(t, 0, sizeof(*t)); 943 strlcpy(t->name, name, sizeof(t->name)); 944 t->socket = socket; 945 t->num_slots = num_slots; 946 t->mask = num_slots - 1; 947 t->slot_size = slot_size; 948 t->max_num_stages = max_num_stages; 949 t->stages = st; 950 951 PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)", 952 t->name, t, num_slots, socket, slot_size); 953 954 return t; 955 956 exit_fail: 957 PMD_DRV_LOG(ERR, "Cannot reserve memory"); 958 rte_free(st); 959 rte_memzone_free(mz); 960 961 return NULL; 962 } 963 964 void * 965 opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index) 966 { 967 return get_slot(t, index); 968 } 969 970 bool 971 opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev, 972 uint32_t index, bool atomic) 973 { 974 uint32_t i = 0, offset; 975 struct opdl_ring *t = s->t; 976 struct rte_event *ev_orig = NULL; 977 bool ev_updated = false; 978 uint64_t ev_temp = 0; 979 uint64_t ev_update = 0; 980 981 uint32_t opa_id = 0; 982 uint32_t flow_id = 0; 983 uint64_t event = 0; 984 985 if (index > s->num_event) { 986 PMD_DRV_LOG(ERR, "index is overflow"); 987 return ev_updated; 988 } 989 990 ev_temp = ev->event & OPDL_EVENT_MASK; 991 992 if (!atomic) { 993 offset = opdl_first_entry_id(s->seq, s->nb_instance, 994 s->instance_id); 995 offset += index*s->nb_instance; 996 ev_orig = get_slot(t, s->shadow_head+offset); 997 if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) { 998 ev_orig->event = ev->event; 999 ev_updated = true; 1000 } 1001 if (ev_orig->u64 != ev->u64) { 1002 ev_orig->u64 = ev->u64; 1003 ev_updated = true; 1004 } 1005 1006 } else { 1007 for (i = s->pos; i < s->num_claimed; i++) { 1008 ev_orig = (struct rte_event *) 1009 get_slot(t, s->shadow_head+i); 1010 1011 event = rte_atomic_load_explicit((uint64_t __rte_atomic *)&ev_orig->event, 1012 rte_memory_order_acquire); 1013 1014 opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET); 1015 flow_id = OPDL_FLOWID_MASK & event; 1016 1017 if (opa_id >= s->queue_id) 1018 continue; 1019 1020 if ((flow_id % s->nb_instance) == s->instance_id) { 1021 ev_update = s->queue_id; 1022 ev_update = (ev_update << OPDL_OPA_OFFSET) 1023 | ev->event; 1024 1025 s->pos = i + 1; 1026 1027 if ((event & OPDL_EVENT_MASK) != 1028 ev_temp) { 1029 rte_atomic_store_explicit( 1030 (uint64_t __rte_atomic *)&ev_orig->event, 1031 ev_update, rte_memory_order_release); 1032 ev_updated = true; 1033 } 1034 if (ev_orig->u64 != ev->u64) { 1035 ev_orig->u64 = ev->u64; 1036 ev_updated = true; 1037 } 1038 1039 break; 1040 } 1041 } 1042 1043 } 1044 1045 return ev_updated; 1046 } 1047 1048 int 1049 opdl_ring_get_socket(const struct opdl_ring *t) 1050 { 1051 return t->socket; 1052 } 1053 1054 uint32_t 1055 opdl_ring_get_num_slots(const struct opdl_ring *t) 1056 { 1057 return t->num_slots; 1058 } 1059 1060 const char * 1061 opdl_ring_get_name(const struct opdl_ring *t) 1062 { 1063 return t->name; 1064 } 1065 1066 /* Check dependency list is valid for a given opdl_ring */ 1067 static int 1068 check_deps(struct opdl_ring *t, struct opdl_stage *deps[], 1069 uint32_t num_deps) 1070 { 1071 unsigned int i; 1072 1073 for (i = 0; i < num_deps; ++i) { 1074 if (!deps[i]) { 1075 PMD_DRV_LOG(ERR, "deps[%u] is NULL", i); 1076 return -EINVAL; 1077 } 1078 if (t != deps[i]->t) { 1079 PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s", 1080 i, deps[i]->t->name, t->name); 1081 return -EINVAL; 1082 } 1083 } 1084 1085 return 0; 1086 } 1087 1088 struct opdl_stage * 1089 opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input) 1090 { 1091 struct opdl_stage *s; 1092 1093 /* Parameter checking */ 1094 if (!t) { 1095 PMD_DRV_LOG(ERR, "opdl_ring is NULL"); 1096 return NULL; 1097 } 1098 if (t->num_stages == t->max_num_stages) { 1099 PMD_DRV_LOG(ERR, "%s has max number of stages (%u)", 1100 t->name, t->max_num_stages); 1101 return NULL; 1102 } 1103 1104 s = &t->stages[t->num_stages]; 1105 1106 if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0) 1107 PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned", 1108 &s->shared, t->name); 1109 1110 if (init_stage(t, s, threadsafe, is_input) < 0) { 1111 PMD_DRV_LOG(ERR, "Cannot reserve memory"); 1112 return NULL; 1113 } 1114 t->num_stages++; 1115 1116 return s; 1117 } 1118 1119 uint32_t 1120 opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s, 1121 uint32_t nb_instance, uint32_t instance_id, 1122 struct opdl_stage *deps[], 1123 uint32_t num_deps) 1124 { 1125 uint32_t i; 1126 int ret = 0; 1127 1128 if ((num_deps > 0) && (!deps)) { 1129 PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name); 1130 return -1; 1131 } 1132 ret = check_deps(t, deps, num_deps); 1133 if (ret < 0) 1134 return ret; 1135 1136 for (i = 0; i < num_deps; i++) { 1137 ret = add_dep(s, deps[i], DEP_DIRECT); 1138 if (ret < 0) 1139 return ret; 1140 } 1141 1142 s->nb_instance = nb_instance; 1143 s->instance_id = instance_id; 1144 1145 return ret; 1146 } 1147 1148 struct opdl_stage * 1149 opdl_ring_get_input_stage(const struct opdl_ring *t) 1150 { 1151 return input_stage(t); 1152 } 1153 1154 int 1155 opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[], 1156 uint32_t num_deps) 1157 { 1158 unsigned int i; 1159 int ret; 1160 1161 if ((num_deps == 0) || (!deps)) { 1162 PMD_DRV_LOG(ERR, "cannot set NULL dependencies"); 1163 return -EINVAL; 1164 } 1165 1166 ret = check_deps(s->t, deps, num_deps); 1167 if (ret < 0) 1168 return ret; 1169 1170 /* Update deps */ 1171 for (i = 0; i < num_deps; i++) 1172 s->deps[i] = &deps[i]->shared; 1173 s->num_deps = num_deps; 1174 1175 return 0; 1176 } 1177 1178 struct opdl_ring * 1179 opdl_stage_get_opdl_ring(const struct opdl_stage *s) 1180 { 1181 return s->t; 1182 } 1183 1184 void 1185 opdl_stage_set_queue_id(struct opdl_stage *s, 1186 uint32_t queue_id) 1187 { 1188 s->queue_id = queue_id; 1189 } 1190 1191 void 1192 opdl_ring_dump(const struct opdl_ring *t, FILE *f) 1193 { 1194 uint32_t i; 1195 1196 if (t == NULL) { 1197 fprintf(f, "NULL OPDL!\n"); 1198 return; 1199 } 1200 fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n", 1201 t->name, t->num_slots, t->mask, t->slot_size, 1202 t->num_stages, t->socket); 1203 for (i = 0; i < t->num_stages; i++) { 1204 uint32_t j; 1205 const struct opdl_stage *s = &t->stages[i]; 1206 1207 fprintf(f, " %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u", 1208 t->name, i, (s->threadsafe) ? "true" : "false", 1209 (s->threadsafe) ? s->shared.head : s->head, 1210 (s->threadsafe) ? s->shared.available_seq : 1211 s->available_seq, 1212 s->shared.tail, (s->num_deps > 0) ? 1213 s->deps[0]->stage->index : 0); 1214 for (j = 1; j < s->num_deps; j++) 1215 fprintf(f, ",%u", s->deps[j]->stage->index); 1216 fprintf(f, "\n"); 1217 } 1218 fflush(f); 1219 } 1220 1221 void 1222 opdl_ring_free(struct opdl_ring *t) 1223 { 1224 uint32_t i; 1225 const struct rte_memzone *mz; 1226 char mz_name[RTE_MEMZONE_NAMESIZE]; 1227 1228 if (t == NULL) { 1229 PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!"); 1230 return; 1231 } 1232 1233 PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t); 1234 1235 for (i = 0; i < t->num_stages; ++i) { 1236 rte_free(t->stages[i].deps); 1237 rte_free(t->stages[i].dep_tracking); 1238 } 1239 1240 rte_free(t->stages); 1241 1242 snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name); 1243 mz = rte_memzone_lookup(mz_name); 1244 if (rte_memzone_free(mz) != 0) 1245 PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name); 1246 } 1247 1248 /* search a opdl_ring from its name */ 1249 struct opdl_ring * 1250 opdl_ring_lookup(const char *name) 1251 { 1252 const struct rte_memzone *mz; 1253 char mz_name[RTE_MEMZONE_NAMESIZE]; 1254 1255 snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name); 1256 1257 mz = rte_memzone_lookup(mz_name); 1258 if (mz == NULL) 1259 return NULL; 1260 1261 return mz->addr; 1262 } 1263 1264 void 1265 opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe) 1266 { 1267 s->threadsafe = threadsafe; 1268 } 1269