/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"
#include "event_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority (lowest numbered) IQ with packets available,
 * or SW_IQS_MAX if no packets are available. Doing the CTZ twice is faster
 * than caching the value due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)


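/* Schedule up to 'count' events from one IQ of an atomic QID into the CQ
 * buffers of its mapped ports. A flow is pinned to a single port while it
 * has events in flight, preserving atomic scheduling semantics; events that
 * cannot be placed (the chosen port's CQ ring or history list is full) are
 * put back onto the IQ. Returns the number of events scheduled.
 */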
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

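		/* fid->cq < 0 means this flow is not currently pinned to a
		 * port. Choose one: start from the round-robin pointer, then
		 * prefer the mapped CQ with the most free ring space.
		 */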
		if (cq < 0) {
			uint32_t cq_idx;
			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq_idx = qid->cq_next_tx++;

			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
		}

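		/* No room on the chosen port (CQ ring full or history list
		 * full): stash the event and return it to the IQ below, so
		 * per-flow ordering is preserved.
		 */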
		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}

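/* Schedule up to 'count' events from one IQ of a parallel or ordered QID to
 * its mapped ports in round-robin order. With keep_order set (ordered QIDs),
 * scheduling is limited to the number of free reorder-buffer entries and one
 * entry is attached to each event's history-list slot so that egress order
 * can be restored later. Returns the number of events scheduled.
 */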
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rob_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * For parallel QIDs, just send to the next available CQ in
		 * round-robin fashion, so scan for an available CQ. If all
		 * CQs are full, just return and move on to the next QID.
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			if (cq_idx >= qid->cq_num_mapped_cqs)
				cq_idx = 0;
			cq = qid->cq_map[cq_idx++];

		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
				rte_event_ring_free_count(
					sw->ports[cq].cq_worker_ring) == 0);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

		if (keep_order)
			rob_ring_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

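/* Schedule events from one IQ of a directed QID straight into the single
 * port it is mapped to (cq_map[0]). No flow pinning or history tracking is
 * required; the burst is limited only by the cached CQ ring space. Returns
 * the number of events moved.
 */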
static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID and Port TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

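/* IQ-to-CQ scheduling pass. QIDs are visited in priority order; for each,
 * the highest priority non-empty IQ is dispatched according to the QID type
 * (directed, atomic, or ordered/parallel), subject to the minimum burst
 * size, and the IQ's bit in iq_pkt_mask is cleared once it is fully drained.
 * Returns the total number of events moved to CQ buffers.
 */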
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* skip if no packets are available or no CQs are mapped */
		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

		if (count >= sw->sched_min_burst) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and unset it
		 * in the IQ mask if it is empty.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function performs the re-ordering of packets and injects them into
 * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		unsigned int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

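		/* The number of ROB entries currently in use equals the free
		 * count of the freelist ring: every entry handed out to a
		 * port leaves a free slot behind.
		 */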
		num_entries_in_use = rob_ring_free_count(
					qid->reorder_buffer_freelist);

		if (num_entries_in_use < sw->sched_min_burst)
			num_entries_in_use = 0;

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq  = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rob_ring_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

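/* Refill a port's pp ("pre-pull") buffer with a burst of up to
 * sched_deq_burst_size events dequeued from that port's RX worker ring.
 */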
static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			sw->sched_deq_burst_size, NULL);
}

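/* Pull events from a load-balanced port's pp buffer into the destination
 * QID IQs. Completed (forward/release) events retire the oldest history-list
 * entry: the port's inflight count drops, and the flow's pin count drops and
 * unpins the flow when it reaches zero. With allow_reorder set, forwarded
 * events belonging to an ordered QID are stored as fragments in their
 * reorder-buffer entry (to be injected later by sw_schedule_reorder())
 * instead of being enqueued directly. Returns the number of events enqueued
 * into IQs.
 */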
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* with no reordering, a PARTIAL event is treated as NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/* Work out the destination QID and the IQ within it (selected
		 * by event priority); these are used by the NEW/FORWARD
		 * handling below.
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

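			/* For ordered QIDs the history entry carries a
			 * pointer to its reorder-buffer entry, which is
			 * marked ready below. The bit trick redirects the
			 * store to a local dummy entry when no ROB pointer
			 * is present, avoiding a branch.
			 */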
			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open: How do we alert the user that
				 * they've exceeded max frags?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

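/* Thin wrappers around __pull_port_lb(): passing allow_reorder as a
 * compile-time constant lets the always-inline body drop the unused
 * reorder-handling path for ports with no ordered QIDs.
 */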
static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

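/* Pull events from a directed port's pp buffer straight into the destination
 * QID IQs. Directed ports keep no history list, so only events carrying the
 * VALID flag are enqueued; everything else is consumed as a no-op. Returns
 * the number of events enqueued.
 */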
static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

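/* Main scheduling entry point, run from the service core. Each call:
 * (1) pulls new and forwarded events from every port's RX worker ring into
 *     the QID IQs, acking any pending unlinks along the way,
 * (2) replays completed ordered events via sw_schedule_reorder(), and
 * (3) moves events from the IQs into the ports' CQ buffers,
 * repeating until the scheduling quanta is reached or no work remains, then
 * flushes the buffered CQ events to the worker rings in bursts.
 */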
void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (unlikely(!sw->started))
		return;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++) {
				/* ack the unlinks in progress as done */
				if (sw->ports[i].unlinks_in_progress)
					sw->ports[i].unlinks_in_progress = 0;

				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
			}

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

	/* push all the internal buffered QEs in port->cq_ring to the
	 * worker cores: i.e. do the ring transfers batched.
	 */
	int no_enq = 1;
	for (i = 0; i < sw->port_count; i++) {
		struct sw_port *port = &sw->ports[i];
		struct rte_event_ring *worker = port->cq_worker_ring;

		/* If shadow ring has 0 pkts, pull from worker ring */
		if (sw->refill_once_per_iter && port->pp_buf_count == 0)
			sw_refill_pp_buf(sw, port);

		if (port->cq_buf_count >= sw->sched_min_burst) {
			rte_event_ring_enqueue_burst(worker,
					port->cq_buf,
					port->cq_buf_count,
					&sw->cq_ring_space[i]);
			port->cq_buf_count = 0;
			no_enq = 0;
		} else {
			sw->cq_ring_space[i] =
					rte_event_ring_free_count(worker) -
					port->cq_buf_count;
		}
	}

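	/* Adaptive minimum burst: after more than SCHED_NO_ENQ_CYCLE_FLUSH
	 * consecutive calls with no CQ enqueues, drop sched_min_burst to 1 so
	 * trickling traffic is not held back; once enqueues resume and the
	 * flush counter drains, restore the configured minimum burst size.
	 */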
	if (no_enq) {
		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
			sw->sched_min_burst = 1;
		else
			sw->sched_flush_count++;
	} else {
		if (sw->sched_flush_count)
			sw->sched_flush_count--;
		else
			sw->sched_min_burst = sw->sched_min_burst_size;
	}

}