1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4 
5 #include <stdbool.h>
6 #include <stddef.h>
7 #include <stdint.h>
8 #include <stdio.h>
9 
10 #include <rte_string_fns.h>
11 #include <rte_branch_prediction.h>
12 #include <rte_debug.h>
13 #include <rte_lcore.h>
14 #include <rte_log.h>
15 #include <rte_malloc.h>
16 #include <rte_memcpy.h>
17 #include <rte_memory.h>
18 #include <rte_memzone.h>
19 #include <rte_atomic.h>
20 
21 #include "opdl_ring.h"
22 #include "opdl_log.h"
23 
24 #define LIB_NAME "opdl_ring"
25 
26 #define OPDL_NAME_SIZE 64
27 
28 
29 #define OPDL_EVENT_MASK  (0x00000000000FFFFFULL)
30 #define OPDL_FLOWID_MASK (0xFFFFF)
31 #define OPDL_OPA_MASK    (0xFF)
32 #define OPDL_OPA_OFFSET  (0x38)
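
/* Layout implied by the masks above: the low 20 bits of the 64-bit event word
 * carry the flow id (OPDL_FLOWID_MASK), and bits 56-63 (OPDL_OPA_OFFSET is
 * 0x38 == 56) carry the 8-bit "OPA" id that is compared against a stage's
 * queue_id. OPDL_EVENT_MASK selects the same low 20 bits when checking
 * whether an event word has been updated in opdl_ring_cas_slot(). E.g.:
 *
 *	uint32_t flow_id = OPDL_FLOWID_MASK & ev_word;
 *	uint32_t opa_id  = OPDL_OPA_MASK & (ev_word >> OPDL_OPA_OFFSET);
 */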
33 
34 /* Types of dependency between stages */
35 enum dep_type {
36 	DEP_NONE = 0,  /* no dependency */
37 	DEP_DIRECT,  /* stage has direct dependency */
38 	DEP_INDIRECT,  /* indirect dependency through other stage(s) */
39 	DEP_SELF,  /* stage dependency on itself, used to detect loops */
40 };
41 
42 /* Shared section of stage state.
43  * Care is needed when accessing it, and the layout is important, especially
44  * to stop the adjacent cache-line HW prefetcher from impacting performance.
45  */
46 struct __rte_cache_aligned shared_state {
47 	/* Last known minimum sequence number of dependencies, used for
48 	 * multi-thread operation
49 	 */
50 	RTE_ATOMIC(uint32_t) available_seq;
51 	char _pad1[RTE_CACHE_LINE_SIZE * 3];
52 	RTE_ATOMIC(uint32_t) head;  /* Head sequence number (for multi-thread operation) */
53 	char _pad2[RTE_CACHE_LINE_SIZE * 3];
54 	struct opdl_stage *stage;  /* back pointer */
55 	RTE_ATOMIC(uint32_t) tail;  /* Tail sequence number */
56 	char _pad3[RTE_CACHE_LINE_SIZE * 2];
57 };
58 
59 /* A structure to keep track of "unfinished" claims. This is only used for
60  * stages that are threadsafe. Each lcore accesses its own instance of this
61  * structure to record the entries it has claimed. This allows one lcore to make
62  * multiple claims without being blocked by another. When disclaiming, the
63  * shared tail is moved forward once it matches the tail value recorded
64  * here.
65  */
66 struct __rte_cache_aligned claim_manager {
67 	uint32_t num_to_disclaim;
68 	uint32_t num_claimed;
69 	uint32_t mgr_head;
70 	uint32_t mgr_tail;
71 	struct {
72 		uint32_t head;
73 		uint32_t tail;
74 	} claims[OPDL_DISCLAIMS_PER_LCORE];
75 };
76 
77 /* Context for each stage of opdl_ring.
78  * Calculations on sequence numbers need to be done with other uint32_t values
79  * so that results are modulo 2^32, and not undefined.
80  */
81 struct __rte_cache_aligned opdl_stage {
82 	struct opdl_ring *t;  /* back pointer, set at init */
83 	uint32_t num_slots;  /* Number of slots for entries, set at init */
84 	uint32_t index;  /* ID for this stage, set at init */
85 	bool threadsafe;  /* Set to true if this stage supports threadsafe use */
86 	/* Last known min seq number of dependencies, used for single-thread
87 	 * operation
88 	 */
89 	uint32_t available_seq;
90 	uint32_t head;  /* Current head for single-thread operation */
91 	uint32_t nb_instance;  /* Number of instances */
92 	uint32_t instance_id;  /* ID of this stage instance */
93 	uint16_t num_claimed;  /* Number of slots claimed */
94 	uint16_t num_event;		/* Number of events */
95 	uint32_t seq;			/* sequence number  */
96 	uint32_t num_deps;  /* Number of direct dependencies */
97 	/* Keep track of all dependencies, used during init only */
98 	enum dep_type *dep_tracking;
99 	/* Direct dependencies of this stage */
100 	struct shared_state **deps;
101 	/* Other stages read this! */
102 	alignas(RTE_CACHE_LINE_SIZE) struct shared_state shared;
103 	/* For managing disclaims in multi-threaded processing stages */
104 	alignas(RTE_CACHE_LINE_SIZE) struct claim_manager pending_disclaims[RTE_MAX_LCORE];
105 	uint32_t shadow_head;  /* Shadow head for single-thread operation */
106 	uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
107 	uint32_t pos;		/* Atomic scan position */
108 };
109 
110 /* Context for opdl_ring */
111 struct opdl_ring {
112 	char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
113 	int socket;  /* NUMA socket that memory is allocated on */
114 	uint32_t num_slots;  /* Number of slots for entries */
115 	uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
116 	uint32_t slot_size;  /* Size of each slot in bytes */
117 	uint32_t num_stages;  /* Number of stages that have been added */
118 	uint32_t max_num_stages;  /* Max number of stages */
119 	/* Stages indexed by ID */
120 	struct opdl_stage *stages;
121 	/* Memory for storing slot data */
122 	alignas(RTE_CACHE_LINE_SIZE) uint8_t slots[];
123 };
124 
125 
126 /* Return input stage of an opdl_ring */
127 static __rte_always_inline struct opdl_stage *
128 input_stage(const struct opdl_ring *t)
129 {
130 	return &t->stages[0];
131 }
132 
133 /* Check if a stage is the input stage */
134 static __rte_always_inline bool
135 is_input_stage(const struct opdl_stage *s)
136 {
137 	return s->index == 0;
138 }
139 
140 /* Get slot pointer from sequence number */
141 static __rte_always_inline void *
142 get_slot(const struct opdl_ring *t, uint32_t n)
143 {
144 	return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
145 }
146 
147 /* Find how many entries are available for processing */
148 static __rte_always_inline uint32_t
149 available(const struct opdl_stage *s)
150 {
151 	if (s->threadsafe == true) {
152 		uint32_t n = rte_atomic_load_explicit(&s->shared.available_seq,
153 				rte_memory_order_acquire) -
154 				rte_atomic_load_explicit(&s->shared.head,
155 				rte_memory_order_acquire);
156 
157 		/* Return 0 if available_seq needs to be updated */
158 		return (n <= s->num_slots) ? n : 0;
159 	}
160 
161 	/* Single threaded */
162 	return s->available_seq - s->head;
163 }
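
/* Worked example of the arithmetic in available(): sequence numbers are
 * free-running uint32_t counters, so the subtraction is modulo 2^32. If
 * head == 0xFFFFFFF0 and available_seq == 0x00000010, then
 * available_seq - head == 0x20, i.e. 32 slots, even though both counters
 * have wrapped. In the threadsafe path the result is treated as 0 whenever
 * it exceeds num_slots, which indicates that the cached available_seq is
 * stale and update_available_seq() needs to run.
 */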
164 
165 /* Read sequence number of dependencies and find minimum */
166 static __rte_always_inline void
167 update_available_seq(struct opdl_stage *s)
168 {
169 	uint32_t i;
170 	uint32_t this_tail = s->shared.tail;
171 	uint32_t min_seq = rte_atomic_load_explicit(&s->deps[0]->tail, rte_memory_order_acquire);
172 	/* The input stage's sequence numbers are greater than those of its
173 	 * dependencies, so an offset of t->num_slots is needed when calculating
174 	 * available slots, and the condition used to determine the dependencies'
175 	 * minimum sequence number must be reversed.
176 	 */
177 	uint32_t wrap;
178 
179 	if (is_input_stage(s)) {
180 		wrap = s->num_slots;
181 		for (i = 1; i < s->num_deps; i++) {
182 			uint32_t seq = rte_atomic_load_explicit(&s->deps[i]->tail,
183 					rte_memory_order_acquire);
184 			if ((this_tail - seq) > (this_tail - min_seq))
185 				min_seq = seq;
186 		}
187 	} else {
188 		wrap = 0;
189 		for (i = 1; i < s->num_deps; i++) {
190 			uint32_t seq = rte_atomic_load_explicit(&s->deps[i]->tail,
191 					rte_memory_order_acquire);
192 			if ((seq - this_tail) < (min_seq - this_tail))
193 				min_seq = seq;
194 		}
195 	}
196 
197 	if (s->threadsafe == false)
198 		s->available_seq = min_seq + wrap;
199 	else
200 		rte_atomic_store_explicit(&s->shared.available_seq, min_seq + wrap,
201 				rte_memory_order_release);
202 }
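
/* Illustration of the wrap offset above (values are made up): with
 * num_slots == 8, an input-stage head of 10 and a slowest dependency tail
 * of 6, available_seq becomes 6 + 8 == 14, so available() reports
 * 14 - 10 == 4 free slots, matching the 4 entries still being processed
 * downstream. For non-input stages wrap is 0 and the available count is
 * simply the minimum dependency tail minus this stage's head.
 */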
203 
204 /* Wait until the number of available slots reaches number requested */
205 static __rte_always_inline void
206 wait_for_available(struct opdl_stage *s, uint32_t n)
207 {
208 	while (available(s) < n) {
209 		rte_pause();
210 		update_available_seq(s);
211 	}
212 }
213 
214 /* Return number of slots to process based on number requested and mode */
215 static __rte_always_inline uint32_t
216 num_to_process(struct opdl_stage *s, uint32_t n, bool block)
217 {
218 	/* Don't read tail sequences of dependencies if not needed */
219 	if (available(s) >= n)
220 		return n;
221 
222 	update_available_seq(s);
223 
224 	if (block == false) {
225 		uint32_t avail = available(s);
226 
227 		if (avail == 0) {
228 			rte_pause();
229 			return 0;
230 		}
231 		return (avail <= n) ? avail : n;
232 	}
233 
234 	if (unlikely(n > s->num_slots)) {
235 		PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
236 				n, s->num_slots);
237 		return 0;  /* Avoid infinite loop */
238 	}
239 	/* blocking */
240 	wait_for_available(s, n);
241 	return n;
242 }
243 
244 /* Copy entries in to slots with wrap-around */
245 static __rte_always_inline void
246 copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
247 		uint32_t num_entries)
248 {
249 	uint32_t slot_size = t->slot_size;
250 	uint32_t slot_index = start & t->mask;
251 
252 	if (slot_index + num_entries <= t->num_slots) {
253 		rte_memcpy(get_slot(t, start), entries,
254 				num_entries * slot_size);
255 	} else {
256 		uint32_t split = t->num_slots - slot_index;
257 
258 		rte_memcpy(get_slot(t, start), entries, split * slot_size);
259 		rte_memcpy(get_slot(t, 0),
260 				RTE_PTR_ADD(entries, split * slot_size),
261 				(num_entries - split) * slot_size);
262 	}
263 }
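
/* Example of the wrap-around split above (values are illustrative): with
 * num_slots == 8 (mask == 7), start == 6 and num_entries == 5, slot_index
 * is 6 and split == 8 - 6 == 2, so two entries are copied into slots 6-7
 * and the remaining three entries are copied starting at slot 0.
 */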
264 
265 /* Copy entries out from slots with wrap-around */
266 static __rte_always_inline void
267 copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
268 		uint32_t num_entries)
269 {
270 	uint32_t slot_size = t->slot_size;
271 	uint32_t slot_index = start & t->mask;
272 
273 	if (slot_index + num_entries <= t->num_slots) {
274 		rte_memcpy(entries, get_slot(t, start),
275 				num_entries * slot_size);
276 	} else {
277 		uint32_t split = t->num_slots - slot_index;
278 
279 		rte_memcpy(entries, get_slot(t, start), split * slot_size);
280 		rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
281 				get_slot(t, 0),
282 				(num_entries - split) * slot_size);
283 	}
284 }
285 
286 /* Input function optimised for single thread */
287 static __rte_always_inline uint32_t
288 opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
289 		uint32_t num_entries, bool block)
290 {
291 	struct opdl_stage *s = input_stage(t);
292 	uint32_t head = s->head;
293 
294 	num_entries = num_to_process(s, num_entries, block);
295 	if (num_entries == 0)
296 		return 0;
297 
298 	copy_entries_in(t, head, entries, num_entries);
299 
300 	s->head += num_entries;
301 	rte_atomic_store_explicit(&s->shared.tail, s->head, rte_memory_order_release);
302 
303 	return num_entries;
304 }
305 
306 /* Convert head and tail of claim_manager into a valid index */
307 static __rte_always_inline uint32_t
308 claim_mgr_index(uint32_t n)
309 {
310 	return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
311 }
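
/* This relies on OPDL_DISCLAIMS_PER_LCORE being a power of two, which is
 * checked by RTE_BUILD_BUG_ON() in opdl_ring_create(). Assuming a value of
 * 8 purely for illustration, claim_mgr_index(9) == (9 & 7) == 1, so mgr_head
 * and mgr_tail can be free-running counters while claims[] is indexed
 * circularly.
 */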
312 
313 /* Check if there are available slots in claim_manager */
314 static __rte_always_inline bool
315 claim_mgr_available(struct claim_manager *mgr)
316 {
317 	return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
318 			true : false;
319 }
320 
321 /* Record a new claim. Only use after first checking an entry is available */
322 static __rte_always_inline void
323 claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
324 {
325 	if ((mgr->mgr_head != mgr->mgr_tail) &&
326 			(mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
327 			tail)) {
328 		/* Combine with previous claim */
329 		mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
330 	} else {
331 		mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
332 		mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
333 		mgr->mgr_head++;
334 	}
335 
336 	mgr->num_claimed += (head - tail);
337 }
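
/* Illustration of the merge rule above: if the newest recorded claim is
 * [tail=100, head=110] and a new claim arrives with tail == 110 and
 * head == 115, the two ranges are contiguous, so the existing entry is
 * extended to [tail=100, head=115] instead of consuming another claims[]
 * slot; num_claimed grows by 5 either way.
 */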
338 
339 /* Read the oldest recorded claim */
340 static __rte_always_inline bool
341 claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
342 {
343 	if (mgr->mgr_head == mgr->mgr_tail)
344 		return false;
345 
346 	*head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
347 	*tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
348 	return true;
349 }
350 
351 /* Remove the oldest recorded claim. Only use after first reading the entry */
352 static __rte_always_inline void
353 claim_mgr_remove(struct claim_manager *mgr)
354 {
355 	mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
356 			mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
357 	mgr->mgr_tail++;
358 }
359 
360 /* Update tail in the oldest claim. Only use after first reading the entry */
361 static __rte_always_inline void
362 claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
363 {
364 	mgr->num_claimed -= num_entries;
365 	mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
366 }
367 
368 static __rte_always_inline void
369 opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
370 		uint32_t num_entries, bool block)
371 {
372 	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
373 	uint32_t head;
374 	uint32_t tail;
375 
376 	while (num_entries) {
377 		bool ret = claim_mgr_read(disclaims, &tail, &head);
378 
379 		if (ret == false)
380 			break;  /* nothing is claimed */
381 		/* There should be no race condition here. If shared.tail
382 		 * matches, no other core can update it until this one does.
383 		 */
384 		if (rte_atomic_load_explicit(&s->shared.tail, rte_memory_order_acquire) ==
385 				tail) {
386 			if (num_entries >= (head - tail)) {
387 				claim_mgr_remove(disclaims);
388 				rte_atomic_store_explicit(&s->shared.tail, head,
389 						rte_memory_order_release);
390 				num_entries -= (head - tail);
391 			} else {
392 				claim_mgr_move_tail(disclaims, num_entries);
393 				rte_atomic_store_explicit(&s->shared.tail,
394 						num_entries + tail,
395 						rte_memory_order_release);
396 				num_entries = 0;
397 			}
398 		} else if (block == false)
399 			break;  /* blocked by other thread */
400 		/* Keep going until num_entries are disclaimed. */
401 		rte_pause();
402 	}
403 
404 	disclaims->num_to_disclaim = num_entries;
405 }
406 
407 /* Move head atomically, returning number of entries available to process and
408  * the original value of head. For non-input stages, the claim is recorded
409  * so that the tail can be updated later by opdl_stage_disclaim().
410  */
411 static __rte_always_inline void
412 move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
413 		uint32_t *old_head, bool block, bool claim_func)
414 {
415 	uint32_t orig_num_entries = *num_entries;
416 	uint32_t ret;
417 	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
418 
419 	/* Attempt to disclaim any outstanding claims */
420 	opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
421 			false);
422 
423 	*old_head = rte_atomic_load_explicit(&s->shared.head, rte_memory_order_acquire);
424 	while (true) {
425 		bool success;
426 		/* If called by opdl_ring_input(), claim does not need to be
427 		 * recorded, as there will be no disclaim.
428 		 */
429 		if (claim_func) {
430 			/* Check that the claim can be recorded */
431 			ret = claim_mgr_available(disclaims);
432 			if (ret == false) {
433 				/* exit out if claim can't be recorded */
434 				*num_entries = 0;
435 				return;
436 			}
437 		}
438 
439 		*num_entries = num_to_process(s, orig_num_entries, block);
440 		if (*num_entries == 0)
441 			return;
442 
443 		success = rte_atomic_compare_exchange_weak_explicit(&s->shared.head, old_head,
444 				*old_head + *num_entries,
445 				rte_memory_order_release,  /* memory order on success */
446 				rte_memory_order_acquire);  /* memory order on fail */
447 		if (likely(success))
448 			break;
449 		rte_pause();
450 	}
451 
452 	if (claim_func)
453 		/* Store the claim record */
454 		claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
455 }
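
/* Note on the loop above: when the weak compare-exchange fails, *old_head is
 * reloaded with the current value of shared.head, so the next iteration
 * recomputes how many of orig_num_entries can be processed from the new head
 * before retrying. Concurrent lcores therefore end up claiming disjoint
 * [old_head, old_head + num_entries) ranges of slots.
 */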
456 
457 /* Input function that supports multiple threads */
458 static __rte_always_inline uint32_t
459 opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
460 		uint32_t num_entries, bool block)
461 {
462 	struct opdl_stage *s = input_stage(t);
463 	uint32_t old_head;
464 
465 	move_head_atomically(s, &num_entries, &old_head, block, false);
466 	if (num_entries == 0)
467 		return 0;
468 
469 	copy_entries_in(t, old_head, entries, num_entries);
470 
471 	/* If another thread started inputting before this one, but hasn't
472 	 * finished, we need to wait for it to complete to update the tail.
473 	 */
474 	rte_wait_until_equal_32((uint32_t *)(uintptr_t)&s->shared.tail, old_head,
475 			rte_memory_order_acquire);
476 
477 	rte_atomic_store_explicit(&s->shared.tail, old_head + num_entries,
478 			rte_memory_order_release);
479 
480 	return num_entries;
481 }
482 
483 static __rte_always_inline uint32_t
484 opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
485 		uint8_t this_lcore)
486 {
487 	return ((nb_p_lcores <= 1) ? 0 :
488 			(nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
489 			nb_p_lcores);
490 }
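
/* Worked example: with nb_p_lcores == 3, start_seq == 10 and this_lcore == 1,
 * the result is (3 - (10 % 3) + 1) % 3 == 0, so this instance starts at
 * offset 0 of the claimed burst; callers then stride through the burst in
 * steps of nb_instance. With a single lcore the offset is always 0.
 */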
491 
492 /* Claim slots to process, optimised for single-thread operation */
493 static __rte_always_inline uint32_t
494 opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
495 		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
496 {
497 	uint32_t i = 0, j = 0,  offset;
498 	uint32_t opa_id   = 0;
499 	uint32_t flow_id  = 0;
500 	uint64_t event    = 0;
501 	void *get_slots;
502 	struct rte_event *ev;
503 	RTE_SET_USED(seq);
504 	struct opdl_ring *t = s->t;
505 	uint8_t *entries_offset = (uint8_t *)entries;
506 
507 	if (!atomic) {
508 
509 		offset = opdl_first_entry_id(s->seq, s->nb_instance,
510 				s->instance_id);
511 
512 		num_entries = s->nb_instance * num_entries;
513 
514 		num_entries = num_to_process(s, num_entries, block);
515 
516 		for (; offset < num_entries; offset += s->nb_instance) {
517 			get_slots = get_slot(t, s->head + offset);
518 			memcpy(entries_offset, get_slots, t->slot_size);
519 			entries_offset += t->slot_size;
520 			i++;
521 		}
522 	} else {
523 		num_entries = num_to_process(s, num_entries, block);
524 
525 		for (j = 0; j < num_entries; j++) {
526 			ev = (struct rte_event *)get_slot(t, s->head+j);
527 
528 			event  = rte_atomic_load_explicit((uint64_t __rte_atomic *)&ev->event,
529 					rte_memory_order_acquire);
530 
531 			opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
532 			flow_id  = OPDL_FLOWID_MASK & event;
533 
534 			if (opa_id >= s->queue_id)
535 				continue;
536 
537 			if ((flow_id % s->nb_instance) == s->instance_id) {
538 				memcpy(entries_offset, ev, t->slot_size);
539 				entries_offset += t->slot_size;
540 				i++;
541 			}
542 		}
543 	}
544 	s->shadow_head = s->head;
545 	s->head += num_entries;
546 	s->num_claimed = num_entries;
547 	s->num_event = i;
548 	s->pos = 0;
549 
550 	/* automatically disclaim entries if number of rte_events is zero */
551 	if (unlikely(i == 0))
552 		opdl_stage_disclaim(s, 0, false);
553 
554 	return i;
555 }
556 
557 /* Thread-safe version of function to claim slots for processing */
558 static __rte_always_inline uint32_t
559 opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
560 		uint32_t num_entries, uint32_t *seq, bool block)
561 {
562 	uint32_t old_head;
563 	struct opdl_ring *t = s->t;
564 	uint32_t i = 0, offset;
565 	uint8_t *entries_offset = (uint8_t *)entries;
566 
567 	if (seq == NULL) {
568 		PMD_DRV_LOG(ERR, "Invalid seq PTR");
569 		return 0;
570 	}
571 	offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
572 	num_entries = offset + (s->nb_instance * num_entries);
573 
574 	move_head_atomically(s, &num_entries, &old_head, block, true);
575 
576 	for (; offset < num_entries; offset += s->nb_instance) {
577 		memcpy(entries_offset, get_slot(t, s->head + offset),
578 			t->slot_size);
579 		entries_offset += t->slot_size;
580 		i++;
581 	}
582 
583 	*seq = old_head;
584 
585 	return i;
586 }
587 
588 /* Claim and copy slot pointers, optimised for single-thread operation */
589 static __rte_always_inline uint32_t
590 opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
591 		uint32_t num_entries, uint32_t *seq, bool block)
592 {
593 	num_entries = num_to_process(s, num_entries, block);
594 	if (num_entries == 0)
595 		return 0;
596 	copy_entries_out(s->t, s->head, entries, num_entries);
597 	if (seq != NULL)
598 		*seq = s->head;
599 	s->head += num_entries;
600 	return num_entries;
601 }
602 
603 /* Thread-safe version of function to claim and copy pointers to slots */
604 static __rte_always_inline uint32_t
605 opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
606 		uint32_t num_entries, uint32_t *seq, bool block)
607 {
608 	uint32_t old_head;
609 
610 	move_head_atomically(s, &num_entries, &old_head, block, true);
611 	if (num_entries == 0)
612 		return 0;
613 	copy_entries_out(s->t, old_head, entries, num_entries);
614 	if (seq != NULL)
615 		*seq = old_head;
616 	return num_entries;
617 }
618 
619 static __rte_always_inline void
620 opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
621 		uint32_t num_entries)
622 {
623 	uint32_t old_tail = s->shared.tail;
624 
625 	if (unlikely(num_entries > (s->head - old_tail))) {
626 		PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
627 				num_entries, s->head - old_tail);
628 		num_entries = s->head - old_tail;
629 	}
630 	rte_atomic_store_explicit(&s->shared.tail, num_entries + old_tail,
631 			rte_memory_order_release);
632 }
633 
634 uint32_t
635 opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
636 		bool block)
637 {
638 	if (input_stage(t)->threadsafe == false)
639 		return opdl_ring_input_singlethread(t, entries, num_entries,
640 				block);
641 	else
642 		return opdl_ring_input_multithread(t, entries, num_entries,
643 				block);
644 }
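
/* A minimal producer-side sketch (illustrative only: "my_ring", "evs" and
 * BURST are assumptions, not part of this driver). It enqueues a burst of
 * rte_event-sized entries through the input stage, retrying in non-blocking
 * mode until everything has been accepted:
 *
 *	uint32_t done = 0;
 *
 *	while (done < BURST)
 *		done += opdl_ring_input(my_ring, &evs[done],
 *				BURST - done, false);
 */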
645 
646 uint32_t
647 opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
648 		const void *entries, uint32_t num_entries, bool block)
649 {
650 	uint32_t head = s->head;
651 
652 	num_entries = num_to_process(s, num_entries, block);
653 
654 	if (num_entries == 0)
655 		return 0;
656 
657 	copy_entries_in(t, head, entries, num_entries);
658 
659 	s->head += num_entries;
660 	rte_atomic_store_explicit(&s->shared.tail, s->head, rte_memory_order_release);
661 
662 	return num_entries;
663 
664 }
665 
666 uint32_t
667 opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
668 		void *entries, uint32_t num_entries, bool block)
669 {
670 	uint32_t head = s->head;
671 
672 	num_entries = num_to_process(s, num_entries, block);
673 	if (num_entries == 0)
674 		return 0;
675 
676 	copy_entries_out(t, head, entries, num_entries);
677 
678 	s->head += num_entries;
679 	rte_atomic_store_explicit(&s->shared.tail, s->head, rte_memory_order_release);
680 
681 	return num_entries;
682 }
683 
684 uint32_t
685 opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
686 {
687 	/* return (num_to_process(s, num_entries, false)); */
688 
689 	if (available(s) >= num_entries)
690 		return num_entries;
691 
692 	update_available_seq(s);
693 
694 	uint32_t avail = available(s);
695 
696 	if (avail == 0) {
697 		rte_pause();
698 		return 0;
699 	}
700 	return (avail <= num_entries) ? avail : num_entries;
701 }
702 
703 uint32_t
704 opdl_stage_claim(struct opdl_stage *s, void *entries,
705 		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
706 {
707 	if (s->threadsafe == false)
708 		return opdl_stage_claim_singlethread(s, entries, num_entries,
709 				seq, block, atomic);
710 	else
711 		return opdl_stage_claim_multithread(s, entries, num_entries,
712 				seq, block);
713 }
714 
715 uint32_t
716 opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
717 		uint32_t num_entries, uint32_t *seq, bool block)
718 {
719 	if (s->threadsafe == false)
720 		return opdl_stage_claim_copy_singlethread(s, entries,
721 				num_entries, seq, block);
722 	else
723 		return opdl_stage_claim_copy_multithread(s, entries,
724 				num_entries, seq, block);
725 }
726 
727 void
728 opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
729 		bool block)
730 {
731 
732 	if (s->threadsafe == false) {
733 		opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
734 	} else {
735 		struct claim_manager *disclaims =
736 			&s->pending_disclaims[rte_lcore_id()];
737 
738 		if (unlikely(num_entries > s->num_slots)) {
739 			PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
740 					num_entries, disclaims->num_claimed);
741 			num_entries = disclaims->num_claimed;
742 		}
743 
744 		num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
745 				disclaims->num_claimed);
746 		opdl_stage_disclaim_multithread_n(s, num_entries, block);
747 	}
748 }
749 
750 int
751 opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
752 {
753 	if (num_entries != s->num_event) {
754 		rte_errno = EINVAL;
755 		return 0;
756 	}
757 	if (s->threadsafe == false) {
758 		rte_atomic_store_explicit(&s->shared.tail, s->head, rte_memory_order_release);
759 		s->seq += s->num_claimed;
760 		s->shadow_head = s->head;
761 		s->num_claimed = 0;
762 	} else {
763 		struct claim_manager *disclaims =
764 				&s->pending_disclaims[rte_lcore_id()];
765 		opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
766 				block);
767 	}
768 	return num_entries;
769 }
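
/* A minimal worker-side sketch for a non-threadsafe stage (names are
 * assumptions, not part of this driver): claim a burst, process it, then
 * disclaim exactly the number of events returned. Note that
 * opdl_stage_disclaim() requires num_entries to equal the number of events
 * from the preceding claim, otherwise it sets rte_errno to EINVAL and
 * returns 0.
 *
 *	struct rte_event evs[BURST];
 *	uint32_t seq;
 *	uint32_t n = opdl_stage_claim(my_stage, evs, BURST, &seq,
 *			false, false);
 *	process_events(evs, n);
 *	opdl_stage_disclaim(my_stage, n, false);
 */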
770 
771 uint32_t
772 opdl_ring_available(struct opdl_ring *t)
773 {
774 	return opdl_stage_available(&t->stages[0]);
775 }
776 
777 uint32_t
778 opdl_stage_available(struct opdl_stage *s)
779 {
780 	update_available_seq(s);
781 	return available(s);
782 }
783 
784 void
785 opdl_ring_flush(struct opdl_ring *t)
786 {
787 	struct opdl_stage *s = input_stage(t);
788 
789 	wait_for_available(s, s->num_slots);
790 }
791 
792 /******************** Non performance sensitive functions ********************/
793 
794 /* Initial setup of a new stage's context */
795 static int
796 init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
797 		bool is_input)
798 {
799 	uint32_t available = (is_input) ? t->num_slots : 0;
800 
801 	s->t = t;
802 	s->num_slots = t->num_slots;
803 	s->index = t->num_stages;
804 	s->threadsafe = threadsafe;
805 	s->shared.stage = s;
806 
807 	/* Alloc memory for deps */
808 	s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
809 			t->max_num_stages * sizeof(enum dep_type),
810 			0, t->socket);
811 	if (s->dep_tracking == NULL)
812 		return -ENOMEM;
813 
814 	s->deps = rte_zmalloc_socket(LIB_NAME,
815 			t->max_num_stages * sizeof(struct shared_state *),
816 			0, t->socket);
817 	if (s->deps == NULL) {
818 		rte_free(s->dep_tracking);
819 		return -ENOMEM;
820 	}
821 
822 	s->dep_tracking[s->index] = DEP_SELF;
823 
824 	if (threadsafe == true)
825 		s->shared.available_seq = available;
826 	else
827 		s->available_seq = available;
828 
829 	return 0;
830 }
831 
832 /* Add direct or indirect dependencies between stages */
833 static int
834 add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
835 		enum dep_type type)
836 {
837 	struct opdl_ring *t = dependent->t;
838 	uint32_t i;
839 
840 	/* Add new direct dependency */
841 	if ((type == DEP_DIRECT) &&
842 			(dependent->dep_tracking[dependency->index] ==
843 					DEP_NONE)) {
844 		PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
845 				t->name, dependent->index, dependency->index);
846 		dependent->dep_tracking[dependency->index] = DEP_DIRECT;
847 	}
848 
849 	/* Add new indirect dependency or change direct to indirect */
850 	if ((type == DEP_INDIRECT) &&
851 			((dependent->dep_tracking[dependency->index] ==
852 			DEP_NONE) ||
853 			(dependent->dep_tracking[dependency->index] ==
854 			DEP_DIRECT))) {
855 		PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
856 				t->name, dependent->index, dependency->index);
857 		dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
858 	}
859 
860 	/* Shouldn't happen... */
861 	if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
862 			(dependent != input_stage(t))) {
863 		PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
864 				t->name, dependent->index);
865 		return -EINVAL;
866 	}
867 
868 	/* Keep following the dependencies of the dependency, until the input stage */
869 	if (dependency != input_stage(t))
870 		for (i = 0; i < dependency->num_deps; i++) {
871 			int ret = add_dep(dependent, dependency->deps[i]->stage,
872 					DEP_INDIRECT);
873 
874 			if (ret < 0)
875 				return ret;
876 		}
877 
878 	/* Make list of sequence numbers for direct dependencies only */
879 	if (type == DEP_DIRECT)
880 		for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
881 			if (dependent->dep_tracking[i] == DEP_DIRECT) {
882 				if ((i == 0) && (dependent->num_deps > 1))
883 					rte_panic("%s:%u depends on > input",
884 							t->name,
885 							dependent->index);
886 				dependent->deps[dependent->num_deps++] =
887 						&t->stages[i].shared;
888 			}
889 
890 	return 0;
891 }
892 
893 struct opdl_ring *
894 opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
895 		uint32_t max_num_stages, int socket)
896 {
897 	struct opdl_ring *t;
898 	char mz_name[RTE_MEMZONE_NAMESIZE];
899 	int mz_flags = 0;
900 	struct opdl_stage *st = NULL;
901 	const struct rte_memzone *mz = NULL;
902 	size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
903 			(num_slots * slot_size));
904 
905 	/* Compile time checking */
906 	RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
907 			0);
908 	RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
909 			RTE_CACHE_LINE_MASK) != 0);
910 	RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
911 			RTE_CACHE_LINE_MASK) != 0);
912 	RTE_BUILD_BUG_ON(!RTE_IS_POWER_OF_2(OPDL_DISCLAIMS_PER_LCORE));
913 
914 	/* Parameter checking */
915 	if (name == NULL) {
916 		PMD_DRV_LOG(ERR, "name param is NULL");
917 		return NULL;
918 	}
919 	if (!rte_is_power_of_2(num_slots)) {
920 		PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
921 				num_slots, name);
922 		return NULL;
923 	}
924 
925 	/* Alloc memory for stages */
926 	st = rte_zmalloc_socket(LIB_NAME,
927 		max_num_stages * sizeof(struct opdl_stage),
928 		RTE_CACHE_LINE_SIZE, socket);
929 	if (st == NULL)
930 		goto exit_fail;
931 
932 	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
933 
934 	/* Alloc memory for memzone */
935 	mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
936 	if (mz == NULL)
937 		goto exit_fail;
938 
939 	t = mz->addr;
940 
941 	/* Initialise opdl_ring queue */
942 	memset(t, 0, sizeof(*t));
943 	strlcpy(t->name, name, sizeof(t->name));
944 	t->socket = socket;
945 	t->num_slots = num_slots;
946 	t->mask = num_slots - 1;
947 	t->slot_size = slot_size;
948 	t->max_num_stages = max_num_stages;
949 	t->stages = st;
950 
951 	PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
952 			t->name, t, num_slots, socket, slot_size);
953 
954 	return t;
955 
956 exit_fail:
957 	PMD_DRV_LOG(ERR, "Cannot reserve memory");
958 	rte_free(st);
959 	rte_memzone_free(mz);
960 
961 	return NULL;
962 }
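
/* A minimal setup sketch using only functions from this file (the sizes,
 * names and two-stage topology are illustrative assumptions):
 *
 *	struct opdl_ring *r = opdl_ring_create("pipe", 1024,
 *			sizeof(struct rte_event), 4, rte_socket_id());
 *	struct opdl_stage *in = opdl_stage_add(r, false, true);
 *	struct opdl_stage *work = opdl_stage_add(r, false, false);
 *
 *	opdl_stage_deps_add(r, work, 1, 0, &in, 1);
 *	opdl_stage_deps_add(r, in, 1, 0, &work, 1);
 *
 * Making the final stage a dependency of the input stage means input slots
 * are only reused once that stage's tail has passed them (see
 * update_available_seq()). The ring is released with opdl_ring_free(r).
 */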
963 
964 void *
965 opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
966 {
967 	return get_slot(t, index);
968 }
969 
970 bool
971 opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
972 		uint32_t index, bool atomic)
973 {
974 	uint32_t i = 0, offset;
975 	struct opdl_ring *t = s->t;
976 	struct rte_event *ev_orig = NULL;
977 	bool ev_updated = false;
978 	uint64_t ev_temp    = 0;
979 	uint64_t ev_update  = 0;
980 
981 	uint32_t opa_id   = 0;
982 	uint32_t flow_id  = 0;
983 	uint64_t event    = 0;
984 
985 	if (index > s->num_event) {
986 		PMD_DRV_LOG(ERR, "index is out of range");
987 		return ev_updated;
988 	}
989 
990 	ev_temp = ev->event & OPDL_EVENT_MASK;
991 
992 	if (!atomic) {
993 		offset = opdl_first_entry_id(s->seq, s->nb_instance,
994 				s->instance_id);
995 		offset += index*s->nb_instance;
996 		ev_orig = get_slot(t, s->shadow_head+offset);
997 		if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
998 			ev_orig->event = ev->event;
999 			ev_updated = true;
1000 		}
1001 		if (ev_orig->u64 != ev->u64) {
1002 			ev_orig->u64 = ev->u64;
1003 			ev_updated = true;
1004 		}
1005 
1006 	} else {
1007 		for (i = s->pos; i < s->num_claimed; i++) {
1008 			ev_orig = (struct rte_event *)
1009 				get_slot(t, s->shadow_head+i);
1010 
1011 			event  = rte_atomic_load_explicit((uint64_t __rte_atomic *)&ev_orig->event,
1012 					rte_memory_order_acquire);
1013 
1014 			opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
1015 			flow_id  = OPDL_FLOWID_MASK & event;
1016 
1017 			if (opa_id >= s->queue_id)
1018 				continue;
1019 
1020 			if ((flow_id % s->nb_instance) == s->instance_id) {
1021 				ev_update = s->queue_id;
1022 				ev_update = (ev_update << OPDL_OPA_OFFSET)
1023 					| ev->event;
1024 
1025 				s->pos = i + 1;
1026 
1027 				if ((event & OPDL_EVENT_MASK) !=
1028 						ev_temp) {
1029 					rte_atomic_store_explicit(
1030 						(uint64_t __rte_atomic *)&ev_orig->event,
1031 						ev_update, rte_memory_order_release);
1032 					ev_updated = true;
1033 				}
1034 				if (ev_orig->u64 != ev->u64) {
1035 					ev_orig->u64 = ev->u64;
1036 					ev_updated = true;
1037 				}
1038 
1039 				break;
1040 			}
1041 		}
1042 
1043 	}
1044 
1045 	return ev_updated;
1046 }
1047 
1048 int
1049 opdl_ring_get_socket(const struct opdl_ring *t)
1050 {
1051 	return t->socket;
1052 }
1053 
1054 uint32_t
1055 opdl_ring_get_num_slots(const struct opdl_ring *t)
1056 {
1057 	return t->num_slots;
1058 }
1059 
1060 const char *
1061 opdl_ring_get_name(const struct opdl_ring *t)
1062 {
1063 	return t->name;
1064 }
1065 
1066 /* Check dependency list is valid for a given opdl_ring */
1067 static int
1068 check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
1069 		uint32_t num_deps)
1070 {
1071 	unsigned int i;
1072 
1073 	for (i = 0; i < num_deps; ++i) {
1074 		if (!deps[i]) {
1075 			PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
1076 			return -EINVAL;
1077 		}
1078 		if (t != deps[i]->t) {
1079 			PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
1080 					i, deps[i]->t->name, t->name);
1081 			return -EINVAL;
1082 		}
1083 	}
1084 
1085 	return 0;
1086 }
1087 
1088 struct opdl_stage *
1089 opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
1090 {
1091 	struct opdl_stage *s;
1092 
1093 	/* Parameter checking */
1094 	if (!t) {
1095 		PMD_DRV_LOG(ERR, "opdl_ring is NULL");
1096 		return NULL;
1097 	}
1098 	if (t->num_stages == t->max_num_stages) {
1099 		PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
1100 				t->name, t->max_num_stages);
1101 		return NULL;
1102 	}
1103 
1104 	s = &t->stages[t->num_stages];
1105 
1106 	if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
1107 		PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
1108 				&s->shared, t->name);
1109 
1110 	if (init_stage(t, s, threadsafe, is_input) < 0) {
1111 		PMD_DRV_LOG(ERR, "Cannot reserve memory");
1112 		return NULL;
1113 	}
1114 	t->num_stages++;
1115 
1116 	return s;
1117 }
1118 
1119 uint32_t
1120 opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
1121 		uint32_t nb_instance, uint32_t instance_id,
1122 		struct opdl_stage *deps[],
1123 		uint32_t num_deps)
1124 {
1125 	uint32_t i;
1126 	int ret = 0;
1127 
1128 	if ((num_deps > 0) && (!deps)) {
1129 		PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
1130 		return -1;
1131 	}
1132 	ret = check_deps(t, deps, num_deps);
1133 	if (ret < 0)
1134 		return ret;
1135 
1136 	for (i = 0; i < num_deps; i++) {
1137 		ret = add_dep(s, deps[i], DEP_DIRECT);
1138 		if (ret < 0)
1139 			return ret;
1140 	}
1141 
1142 	s->nb_instance = nb_instance;
1143 	s->instance_id = instance_id;
1144 
1145 	return ret;
1146 }
1147 
1148 struct opdl_stage *
1149 opdl_ring_get_input_stage(const struct opdl_ring *t)
1150 {
1151 	return input_stage(t);
1152 }
1153 
1154 int
1155 opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
1156 		uint32_t num_deps)
1157 {
1158 	unsigned int i;
1159 	int ret;
1160 
1161 	if ((num_deps == 0) || (!deps)) {
1162 		PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
1163 		return -EINVAL;
1164 	}
1165 
1166 	ret = check_deps(s->t, deps, num_deps);
1167 	if (ret < 0)
1168 		return ret;
1169 
1170 	/* Update deps */
1171 	for (i = 0; i < num_deps; i++)
1172 		s->deps[i] = &deps[i]->shared;
1173 	s->num_deps = num_deps;
1174 
1175 	return 0;
1176 }
1177 
1178 struct opdl_ring *
1179 opdl_stage_get_opdl_ring(const struct opdl_stage *s)
1180 {
1181 	return s->t;
1182 }
1183 
1184 void
1185 opdl_stage_set_queue_id(struct opdl_stage *s,
1186 		uint32_t queue_id)
1187 {
1188 	s->queue_id = queue_id;
1189 }
1190 
1191 void
1192 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
1193 {
1194 	uint32_t i;
1195 
1196 	if (t == NULL) {
1197 		fprintf(f, "NULL OPDL!\n");
1198 		return;
1199 	}
1200 	fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
1201 			t->name, t->num_slots, t->mask, t->slot_size,
1202 			t->num_stages, t->socket);
1203 	for (i = 0; i < t->num_stages; i++) {
1204 		uint32_t j;
1205 		const struct opdl_stage *s = &t->stages[i];
1206 
1207 		fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
1208 				t->name, i, (s->threadsafe) ? "true" : "false",
1209 				(s->threadsafe) ? s->shared.head : s->head,
1210 				(s->threadsafe) ? s->shared.available_seq :
1211 				s->available_seq,
1212 				s->shared.tail, (s->num_deps > 0) ?
1213 				s->deps[0]->stage->index : 0);
1214 		for (j = 1; j < s->num_deps; j++)
1215 			fprintf(f, ",%u", s->deps[j]->stage->index);
1216 		fprintf(f, "\n");
1217 	}
1218 	fflush(f);
1219 }
1220 
1221 void
1222 opdl_ring_free(struct opdl_ring *t)
1223 {
1224 	uint32_t i;
1225 	const struct rte_memzone *mz;
1226 	char mz_name[RTE_MEMZONE_NAMESIZE];
1227 
1228 	if (t == NULL) {
1229 		PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
1230 		return;
1231 	}
1232 
1233 	PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
1234 
1235 	for (i = 0; i < t->num_stages; ++i) {
1236 		rte_free(t->stages[i].deps);
1237 		rte_free(t->stages[i].dep_tracking);
1238 	}
1239 
1240 	rte_free(t->stages);
1241 
1242 	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
1243 	mz = rte_memzone_lookup(mz_name);
1244 	if (rte_memzone_free(mz) != 0)
1245 		PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
1246 }
1247 
1248 /* Search an opdl_ring by its name */
1249 struct opdl_ring *
1250 opdl_ring_lookup(const char *name)
1251 {
1252 	const struct rte_memzone *mz;
1253 	char mz_name[RTE_MEMZONE_NAMESIZE];
1254 
1255 	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
1256 
1257 	mz = rte_memzone_lookup(mz_name);
1258 	if (mz == NULL)
1259 		return NULL;
1260 
1261 	return mz->addr;
1262 }
1263 
1264 void
1265 opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
1266 {
1267 	s->threadsafe = threadsafe;
1268 }
1269