xref: /dpdk/drivers/event/sw/sw_evdev_scheduler.c (revision 5078a8f37d82f3b54cf373389b6754e2bafd584f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2017 Intel Corporation
3  */
4 
5 #include <rte_ring.h>
6 #include <rte_hash_crc.h>
7 #include <rte_event_ring.h>
8 #include "sw_evdev.h"
9 #include "iq_chunk.h"
10 #include "event_ring.h"
11 
12 #define SW_IQS_MASK (SW_IQS_MAX-1)
13 
14 /* Retrieve the highest priority IQ or -1 if no pkts available. Doing the
15  * CLZ twice is faster than caching the value due to data dependencies
16  */
17 #define PKT_MASK_TO_IQ(pkts) \
18 	(rte_ctz32(pkts | (1 << SW_IQS_MAX)))
19 
20 #if SW_IQS_MAX != 4
21 #error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
22 #endif
23 #define PRIO_TO_IQ(prio) (prio >> 6)
24 
25 #define MAX_PER_IQ_DEQUEUE 48
26 #define FLOWID_MASK (SW_QID_NUM_FIDS-1)
27 /* use cheap bit mixing, we only need to lose a few bits */
28 #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
29 
30 
31 static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev * sw,struct sw_qid * const qid,uint32_t iq_num,unsigned int count)32 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
33 		uint32_t iq_num, unsigned int count)
34 {
35 	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
36 	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
37 	uint32_t nb_blocked = 0;
38 	uint32_t i;
39 
40 	if (count > MAX_PER_IQ_DEQUEUE)
41 		count = MAX_PER_IQ_DEQUEUE;
42 
43 	/* This is the QID ID. The QID ID is static, hence it can be
44 	 * used to identify the stage of processing in history lists etc
45 	 */
46 	uint32_t qid_id = qid->id;
47 
48 	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
49 	for (i = 0; i < count; i++) {
50 		const struct rte_event *qe = &qes[i];
51 		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
52 		struct sw_fid_t *fid = &qid->fids[flow_id];
53 		int cq = fid->cq;
54 
55 		if (cq < 0) {
56 			uint32_t cq_idx;
57 			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
58 				qid->cq_next_tx = 0;
59 			cq_idx = qid->cq_next_tx++;
60 
61 			cq = qid->cq_map[cq_idx];
62 
63 			/* find least used */
64 			int cq_free_cnt = sw->cq_ring_space[cq];
65 			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
66 					cq_idx++) {
67 				int test_cq = qid->cq_map[cq_idx];
68 				int test_cq_free = sw->cq_ring_space[test_cq];
69 				if (test_cq_free > cq_free_cnt) {
70 					cq = test_cq;
71 					cq_free_cnt = test_cq_free;
72 				}
73 			}
74 
75 			fid->cq = cq; /* this pins early */
76 		}
77 
78 		if (sw->cq_ring_space[cq] == 0 ||
79 				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
80 			blocked_qes[nb_blocked++] = *qe;
81 			continue;
82 		}
83 
84 		struct sw_port *p = &sw->ports[cq];
85 
86 		/* at this point we can queue up the packet on the cq_buf */
87 		fid->pcount++;
88 		p->cq_buf[p->cq_buf_count++] = *qe;
89 		p->inflights++;
90 		sw->cq_ring_space[cq]--;
91 
92 		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
93 		p->hist_list[head] = (struct sw_hist_list_entry) {
94 			.qid = qid_id,
95 			.fid = flow_id,
96 		};
97 
98 		p->stats.tx_pkts++;
99 		qid->stats.tx_pkts++;
100 		qid->to_port[cq]++;
101 
102 		/* if we just filled in the last slot, flush the buffer */
103 		if (sw->cq_ring_space[cq] == 0) {
104 			struct rte_event_ring *worker = p->cq_worker_ring;
105 			rte_event_ring_enqueue_burst(worker, p->cq_buf,
106 					p->cq_buf_count,
107 					&sw->cq_ring_space[cq]);
108 			p->cq_buf_count = 0;
109 		}
110 	}
111 	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);
112 
113 	return count - nb_blocked;
114 }
115 
116 static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev * sw,struct sw_qid * const qid,uint32_t iq_num,unsigned int count,int keep_order)117 sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
118 		uint32_t iq_num, unsigned int count, int keep_order)
119 {
120 	uint32_t i;
121 	uint32_t cq_idx = qid->cq_next_tx;
122 
123 	/* This is the QID ID. The QID ID is static, hence it can be
124 	 * used to identify the stage of processing in history lists etc
125 	 */
126 	uint32_t qid_id = qid->id;
127 
128 	if (count > MAX_PER_IQ_DEQUEUE)
129 		count = MAX_PER_IQ_DEQUEUE;
130 
131 	if (keep_order)
132 		/* only schedule as many as we have reorder buffer entries */
133 		count = RTE_MIN(count,
134 				rob_ring_count(qid->reorder_buffer_freelist));
135 
136 	for (i = 0; i < count; i++) {
137 		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
138 		uint32_t cq_check_count = 0;
139 		uint32_t cq;
140 
141 		/*
142 		 *  for parallel, just send to next available CQ in round-robin
143 		 * fashion. So scan for an available CQ. If all CQs are full
144 		 * just return and move on to next QID
145 		 */
146 		do {
147 			if (++cq_check_count > qid->cq_num_mapped_cqs)
148 				goto exit;
149 			if (cq_idx >= qid->cq_num_mapped_cqs)
150 				cq_idx = 0;
151 			cq = qid->cq_map[cq_idx++];
152 
153 		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
154 				rte_event_ring_free_count(
155 					sw->ports[cq].cq_worker_ring) == 0);
156 
157 		struct sw_port *p = &sw->ports[cq];
158 		if (sw->cq_ring_space[cq] == 0 ||
159 				p->inflights == SW_PORT_HIST_LIST)
160 			break;
161 
162 		sw->cq_ring_space[cq]--;
163 
164 		qid->stats.tx_pkts++;
165 
166 		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
167 		p->hist_list[head] = (struct sw_hist_list_entry) {
168 			.qid = qid_id,
169 			.fid = SW_HASH_FLOWID(qe->flow_id),
170 		};
171 
172 		if (keep_order)
173 			rob_ring_dequeue(qid->reorder_buffer_freelist,
174 					(void *)&p->hist_list[head].rob_entry);
175 
176 		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
177 		iq_pop(sw, &qid->iq[iq_num]);
178 
179 		rte_compiler_barrier();
180 		p->inflights++;
181 		p->stats.tx_pkts++;
182 		p->hist_head++;
183 	}
184 exit:
185 	qid->cq_next_tx = cq_idx;
186 	return i;
187 }
188 
189 static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev * sw,struct sw_qid * const qid,uint32_t iq_num,unsigned int count __rte_unused)190 sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
191 		uint32_t iq_num, unsigned int count __rte_unused)
192 {
193 	uint32_t cq_id = qid->cq_map[0];
194 	struct sw_port *port = &sw->ports[cq_id];
195 
196 	/* get max burst enq size for cq_ring */
197 	uint32_t count_free = sw->cq_ring_space[cq_id];
198 	if (count_free == 0)
199 		return 0;
200 
201 	/* burst dequeue from the QID IQ ring */
202 	struct sw_iq *iq = &qid->iq[iq_num];
203 	uint32_t ret = iq_dequeue_burst(sw, iq,
204 			&port->cq_buf[port->cq_buf_count], count_free);
205 	port->cq_buf_count += ret;
206 
207 	/* Update QID, Port and Total TX stats */
208 	qid->stats.tx_pkts += ret;
209 	port->stats.tx_pkts += ret;
210 
211 	/* Subtract credits from cached value */
212 	sw->cq_ring_space[cq_id] -= ret;
213 
214 	return ret;
215 }
216 
217 static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev * sw)218 sw_schedule_qid_to_cq(struct sw_evdev *sw)
219 {
220 	uint32_t pkts = 0;
221 	uint32_t qid_idx;
222 
223 	sw->sched_cq_qid_called++;
224 
225 	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
226 		struct sw_qid *qid = sw->qids_prioritized[qid_idx];
227 
228 		int type = qid->type;
229 		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
230 
231 		/* zero mapped CQs indicates directed */
232 		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
233 			continue;
234 
235 		uint32_t pkts_done = 0;
236 		uint32_t count = iq_count(&qid->iq[iq_num]);
237 
238 		if (count >= sw->sched_min_burst) {
239 			if (type == SW_SCHED_TYPE_DIRECT)
240 				pkts_done += sw_schedule_dir_to_cq(sw, qid,
241 						iq_num, count);
242 			else if (type == RTE_SCHED_TYPE_ATOMIC)
243 				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
244 						iq_num, count);
245 			else
246 				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
247 						iq_num, count,
248 						type == RTE_SCHED_TYPE_ORDERED);
249 		}
250 
251 		/* Check if the IQ that was polled is now empty, and unset it
252 		 * in the IQ mask if its empty.
253 		 */
254 		int all_done = (pkts_done == count);
255 
256 		qid->iq_pkt_mask &= ~(all_done << (iq_num));
257 		pkts += pkts_done;
258 	}
259 
260 	return pkts;
261 }
262 
263 /* This function will perform re-ordering of packets, and injecting into
264  * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
265  * contiguous in that array, this function accepts a "range" of QIDs to scan.
266  */
267 static uint16_t
sw_schedule_reorder(struct sw_evdev * sw,int qid_start,int qid_end)268 sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
269 {
270 	/* Perform egress reordering */
271 	struct rte_event *qe;
272 	uint32_t pkts_iter = 0;
273 
274 	for (; qid_start < qid_end; qid_start++) {
275 		struct sw_qid *qid = &sw->qids[qid_start];
276 		unsigned int i, num_entries_in_use;
277 
278 		if (qid->type != RTE_SCHED_TYPE_ORDERED)
279 			continue;
280 
281 		num_entries_in_use = rob_ring_free_count(
282 					qid->reorder_buffer_freelist);
283 
284 		if (num_entries_in_use < sw->sched_min_burst)
285 			num_entries_in_use = 0;
286 
287 		for (i = 0; i < num_entries_in_use; i++) {
288 			struct reorder_buffer_entry *entry;
289 			int j;
290 
291 			entry = &qid->reorder_buffer[qid->reorder_buffer_index];
292 
293 			if (!entry->ready)
294 				break;
295 
296 			for (j = 0; j < entry->num_fragments; j++) {
297 				uint16_t dest_qid;
298 				uint16_t dest_iq;
299 
300 				int idx = entry->fragment_index + j;
301 				qe = &entry->fragments[idx];
302 
303 				dest_qid = qe->queue_id;
304 				dest_iq  = PRIO_TO_IQ(qe->priority);
305 
306 				if (dest_qid >= sw->qid_count) {
307 					sw->stats.rx_dropped++;
308 					continue;
309 				}
310 
311 				pkts_iter++;
312 
313 				struct sw_qid *q = &sw->qids[dest_qid];
314 				struct sw_iq *iq = &q->iq[dest_iq];
315 
316 				/* we checked for space above, so enqueue must
317 				 * succeed
318 				 */
319 				iq_enqueue(sw, iq, qe);
320 				q->iq_pkt_mask |= (1 << (dest_iq));
321 				q->iq_pkt_count[dest_iq]++;
322 				q->stats.rx_pkts++;
323 			}
324 
325 			entry->ready = (j != entry->num_fragments);
326 			entry->num_fragments -= j;
327 			entry->fragment_index += j;
328 
329 			if (!entry->ready) {
330 				entry->fragment_index = 0;
331 
332 				rob_ring_enqueue(
333 						qid->reorder_buffer_freelist,
334 						entry);
335 
336 				qid->reorder_buffer_index++;
337 				qid->reorder_buffer_index %= qid->window_size;
338 			}
339 		}
340 	}
341 	return pkts_iter;
342 }
343 
344 static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev * sw,struct sw_port * port)345 sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
346 {
347 	RTE_SET_USED(sw);
348 	struct rte_event_ring *worker = port->rx_worker_ring;
349 	port->pp_buf_start = 0;
350 	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
351 			sw->sched_deq_burst_size, NULL);
352 }
353 
354 static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev * sw,uint32_t port_id,int allow_reorder)355 __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
356 {
357 	static struct reorder_buffer_entry dummy_rob;
358 	uint32_t pkts_iter = 0;
359 	struct sw_port *port = &sw->ports[port_id];
360 
361 	/* If shadow ring has 0 pkts, pull from worker ring */
362 	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
363 		sw_refill_pp_buf(sw, port);
364 
365 	while (port->pp_buf_count) {
366 		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
367 		struct sw_hist_list_entry *hist_entry = NULL;
368 		uint8_t flags = qe->op;
369 		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
370 		int needs_reorder = 0;
371 		/* if no-reordering, having PARTIAL == NEW */
372 		if (!allow_reorder && !eop)
373 			flags = QE_FLAG_VALID;
374 
375 		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
376 		struct sw_qid *qid = &sw->qids[qe->queue_id];
377 
378 		/* now process based on flags. Note that for directed
379 		 * queues, the enqueue_flush masks off all but the
380 		 * valid flag. This makes FWD and PARTIAL enqueues just
381 		 * NEW type, and makes DROPS no-op calls.
382 		 */
383 		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
384 			const uint32_t hist_tail = port->hist_tail &
385 					(SW_PORT_HIST_LIST - 1);
386 
387 			hist_entry = &port->hist_list[hist_tail];
388 			const uint32_t hist_qid = hist_entry->qid;
389 			const uint32_t hist_fid = hist_entry->fid;
390 
391 			struct sw_fid_t *fid =
392 				&sw->qids[hist_qid].fids[hist_fid];
393 			fid->pcount -= eop;
394 			if (fid->pcount == 0)
395 				fid->cq = -1;
396 
397 			if (allow_reorder) {
398 				/* set reorder ready if an ordered QID */
399 				uintptr_t rob_ptr =
400 					(uintptr_t)hist_entry->rob_entry;
401 				const uintptr_t valid = (rob_ptr != 0);
402 				needs_reorder = valid;
403 				rob_ptr |=
404 					((valid - 1) & (uintptr_t)&dummy_rob);
405 				struct reorder_buffer_entry *tmp_rob_ptr =
406 					(struct reorder_buffer_entry *)rob_ptr;
407 				tmp_rob_ptr->ready = eop * needs_reorder;
408 			}
409 
410 			port->inflights -= eop;
411 			port->hist_tail += eop;
412 		}
413 		if (flags & QE_FLAG_VALID) {
414 			port->stats.rx_pkts++;
415 
416 			if (allow_reorder && needs_reorder) {
417 				struct reorder_buffer_entry *rob_entry =
418 						hist_entry->rob_entry;
419 
420 				/* Although fragmentation not currently
421 				 * supported by eventdev API, we support it
422 				 * here. Open: How do we alert the user that
423 				 * they've exceeded max frags?
424 				 */
425 				int num_frag = rob_entry->num_fragments;
426 				if (num_frag == SW_FRAGMENTS_MAX)
427 					sw->stats.rx_dropped++;
428 				else {
429 					int idx = rob_entry->num_fragments++;
430 					rob_entry->fragments[idx] = *qe;
431 				}
432 				goto end_qe;
433 			}
434 
435 			/* Use the iq_num from above to push the QE
436 			 * into the qid at the right priority
437 			 */
438 
439 			qid->iq_pkt_mask |= (1 << (iq_num));
440 			iq_enqueue(sw, &qid->iq[iq_num], qe);
441 			qid->iq_pkt_count[iq_num]++;
442 			qid->stats.rx_pkts++;
443 			pkts_iter++;
444 		}
445 
446 end_qe:
447 		port->pp_buf_start++;
448 		port->pp_buf_count--;
449 	} /* while (avail_qes) */
450 
451 	return pkts_iter;
452 }
453 
/* Pull events from a load-balanced port with reorder handling enabled.
 * Returns the value produced by __pull_port_lb().
 */
static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	const int allow_reorder = 1;

	return __pull_port_lb(sw, port_id, allow_reorder);
}
459 
/* Pull events from a load-balanced port with reorder handling disabled.
 * Returns the value produced by __pull_port_lb().
 */
static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	const int allow_reorder = 0;

	return __pull_port_lb(sw, port_id, allow_reorder);
}
465 
466 static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev * sw,uint32_t port_id)467 sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
468 {
469 	uint32_t pkts_iter = 0;
470 	struct sw_port *port = &sw->ports[port_id];
471 
472 	/* If shadow ring has 0 pkts, pull from worker ring */
473 	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
474 		sw_refill_pp_buf(sw, port);
475 
476 	while (port->pp_buf_count) {
477 		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
478 		uint8_t flags = qe->op;
479 
480 		if ((flags & QE_FLAG_VALID) == 0)
481 			goto end_qe;
482 
483 		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
484 		struct sw_qid *qid = &sw->qids[qe->queue_id];
485 		struct sw_iq *iq = &qid->iq[iq_num];
486 
487 		port->stats.rx_pkts++;
488 
489 		/* Use the iq_num from above to push the QE
490 		 * into the qid at the right priority
491 		 */
492 		qid->iq_pkt_mask |= (1 << (iq_num));
493 		iq_enqueue(sw, iq, qe);
494 		qid->iq_pkt_count[iq_num]++;
495 		qid->stats.rx_pkts++;
496 		pkts_iter++;
497 
498 end_qe:
499 		port->pp_buf_start++;
500 		port->pp_buf_count--;
501 	} /* while port->pp_buf_count */
502 
503 	return pkts_iter;
504 }
505 
506 int32_t
sw_event_schedule(struct rte_eventdev * dev)507 sw_event_schedule(struct rte_eventdev *dev)
508 {
509 	struct sw_evdev *sw = sw_pmd_priv(dev);
510 	uint32_t in_pkts, out_pkts;
511 	uint32_t out_pkts_total = 0, in_pkts_total = 0;
512 	int32_t sched_quanta = sw->sched_quanta;
513 	uint32_t i;
514 
515 	sw->sched_called++;
516 	if (unlikely(!sw->started))
517 		return -EAGAIN;
518 
519 	do {
520 		uint32_t in_pkts_this_iteration = 0;
521 
522 		/* Pull from rx_ring for ports */
523 		do {
524 			in_pkts = 0;
525 			for (i = 0; i < sw->port_count; i++) {
526 				/* ack the unlinks in progress as done */
527 				if (sw->ports[i].unlinks_in_progress)
528 					sw->ports[i].unlinks_in_progress = 0;
529 
530 				if (sw->ports[i].is_directed)
531 					in_pkts += sw_schedule_pull_port_dir(sw, i);
532 				else if (sw->ports[i].num_ordered_qids > 0)
533 					in_pkts += sw_schedule_pull_port_lb(sw, i);
534 				else
535 					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
536 			}
537 
538 			/* QID scan for re-ordered */
539 			in_pkts += sw_schedule_reorder(sw, 0,
540 					sw->qid_count);
541 			in_pkts_this_iteration += in_pkts;
542 		} while (in_pkts > 4 &&
543 				(int)in_pkts_this_iteration < sched_quanta);
544 
545 		out_pkts = sw_schedule_qid_to_cq(sw);
546 		out_pkts_total += out_pkts;
547 		in_pkts_total += in_pkts_this_iteration;
548 
549 		if (in_pkts == 0 && out_pkts == 0)
550 			break;
551 	} while ((int)out_pkts_total < sched_quanta);
552 
553 	sw->stats.tx_pkts += out_pkts_total;
554 	sw->stats.rx_pkts += in_pkts_total;
555 
556 	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
557 	sw->sched_no_cq_enqueues += (out_pkts_total == 0);
558 
559 	uint64_t work_done = (in_pkts_total + out_pkts_total) != 0;
560 	sw->sched_progress_last_iter = work_done;
561 
562 	uint64_t cqs_scheds_last_iter = 0;
563 
564 	/* push all the internal buffered QEs in port->cq_ring to the
565 	 * worker cores: aka, do the ring transfers batched.
566 	 */
567 	int no_enq = 1;
568 	for (i = 0; i < sw->port_count; i++) {
569 		struct sw_port *port = &sw->ports[i];
570 		struct rte_event_ring *worker = port->cq_worker_ring;
571 
572 		/* If shadow ring has 0 pkts, pull from worker ring */
573 		if (sw->refill_once_per_iter && port->pp_buf_count == 0)
574 			sw_refill_pp_buf(sw, port);
575 
576 		if (port->cq_buf_count >= sw->sched_min_burst) {
577 			rte_event_ring_enqueue_burst(worker,
578 					port->cq_buf,
579 					port->cq_buf_count,
580 					&sw->cq_ring_space[i]);
581 			port->cq_buf_count = 0;
582 			no_enq = 0;
583 			cqs_scheds_last_iter |= (1ULL << i);
584 		} else {
585 			sw->cq_ring_space[i] =
586 					rte_event_ring_free_count(worker) -
587 					port->cq_buf_count;
588 		}
589 	}
590 
591 	if (no_enq) {
592 		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
593 			sw->sched_min_burst = 1;
594 		else
595 			sw->sched_flush_count++;
596 	} else {
597 		if (sw->sched_flush_count)
598 			sw->sched_flush_count--;
599 		else
600 			sw->sched_min_burst = sw->sched_min_burst_size;
601 	}
602 
603 	/* Provide stats on what eventdev ports were scheduled to this
604 	 * iteration. If more than 64 ports are active, always report that
605 	 * all Eventdev ports have been scheduled events.
606 	 */
607 	sw->sched_last_iter_bitmask = cqs_scheds_last_iter;
608 	if (unlikely(sw->port_count >= 64))
609 		sw->sched_last_iter_bitmask = UINT64_MAX;
610 
611 	return work_done ? 0 : -EAGAIN;
612 }
613