xref: /dpdk/drivers/event/sw/sw_evdev_scheduler.c (revision c7f5dba7d4bb7971fac51755aad09b71b10cef90)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest-priority (lowest-numbered) IQ with packets available,
 * or SW_IQS_MAX if no packets are available. Doing the CTZ twice is faster
 * than caching the value due to data dependencies
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)
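/* With SW_IQS_MAX == 4, the 8-bit event priority maps onto the IQs as:
 * 0..63 -> IQ 0 (serviced first by PKT_MASK_TO_IQ), 64..127 -> IQ 1,
 * 128..191 -> IQ 2, 192..255 -> IQ 3.
 */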

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
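/* Example: SW_HASH_FLOWID() folds bits [19:10] of the 20-bit event flow_id
 * down onto the low bits before masking to the FID table size, so flows that
 * differ only in their upper bits do not all land in the same FID slot.
 */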

static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* The QID ID is static, so it can be used to identify the stage of
	 * processing in history lists, etc.
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx;
			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq_idx = qid->cq_next_tx++;

			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
		}
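		/* A flow arriving with no CQ assigned is pinned here: the
		 * round-robin cursor picks the initial candidate (which wins
		 * ties), then all mapped CQs are scanned and the one with the
		 * most free ring space is chosen. The pin is released on the
		 * completion path once the flow's pcount drops back to zero.
		 */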

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;
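		/* The history entry records which (QID, flow) this event was
		 * scheduled from, so the completion path in __pull_port_lb()
		 * can decrement the flow's pcount and release the atomic pin
		 * when the worker completes the event.
		 */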

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
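		/* Note: the free_space output of the enqueue above refreshes
		 * the cached credits in cq_ring_space[], so scheduling to this
		 * CQ can continue in the same pass if the worker has since
		 * drained its ring.
		 */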
	}
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}

static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* The QID ID is static, so it can be used to identify the stage of
	 * processing in history lists, etc.
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rte_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/* For parallel, send to the next available CQ in round-robin
		 * fashion: scan for an available CQ, and if all CQs are full,
		 * return and move on to the next QID.
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			if (cq_idx >= qid->cq_num_mapped_cqs)
				cq_idx = 0;
			cq = qid->cq_map[cq_idx++];

		} while (rte_event_ring_free_count(
				sw->ports[cq].cq_worker_ring) == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

		if (keep_order)
			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);
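		/* For ordered QIDs each scheduled event is given a reorder
		 * buffer entry (hence count was clamped to the freelist
		 * occupancy above); the entry is marked ready when the event
		 * completes and is drained back into the destination IQ in
		 * order by sw_schedule_reorder().
		 */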

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
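	/* Only cq_map[0] is used here: a directed QID is single-link, so
	 * events go straight into that port's cq_buf with no flow pinning or
	 * history-list tracking.
	 */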
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

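/* Walk the QIDs in priority order and, for each, drain its highest-priority
 * non-empty IQ into the mapped CQs, dispatching on the QID's schedule type
 * (directed, atomic, or ordered/parallel).
 */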
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* skip if no IQ has packets, or if no CQs are mapped to this
		 * QID yet (nothing to schedule to)
		 */
		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

		if (count > 0) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* If the IQ that was polled is now empty, clear its bit in the
		 * IQ mask.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function performs re-ordering of packets, and injects them into the
 * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rte_ring_free_count(
					qid->reorder_buffer_freelist);
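		/* The free space in the freelist is an upper bound on the
		 * number of ROB entries currently checked out to in-flight
		 * events; the loop below stops at the first entry that is not
		 * yet ready, which preserves the original order.
		 */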

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq  = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rte_ring_sp_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			RTE_DIM(port->pp_buf), NULL);
}

static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* if not reordering, treat PARTIAL the same as NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * if we don't have space for this packet in an IQ,
		 * then move on to next queue. Technically, for a
		 * packet that needs reordering, we don't need to check
		 * here, but it simplifies things not to special-case
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}
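			/* The (valid - 1) mask above redirects rob_ptr to
			 * dummy_rob when this history entry has no ROB entry
			 * (non-ordered QIDs), so the ready store can be done
			 * unconditionally instead of branching per event.
			 */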

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open question: how do we alert the
				 * user that they've exceeded the maximum
				 * number of fragments?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}
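/* The two wrappers below pass a compile-time constant for allow_reorder, so
 * __pull_port_lb() (marked __rte_always_inline) can be specialized into
 * separate reorder and no-reorder fast paths.
 */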

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (unlikely(!sw->started))
		return;
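	/* Pacing: the inner loop below keeps pulling from producer rings while
	 * each pass makes reasonable progress (more than 4 events) and stays
	 * under the scheduling quanta; the outer loop then moves events from
	 * the IQs to the CQs and repeats until the quanta of CQ enqueues is
	 * reached or no work remains.
	 */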

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++) {
				/* ack the unlinks in progress as done */
				if (sw->ports[i].unlinks_in_progress)
					sw->ports[i].unlinks_in_progress = 0;
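				/* The scheduler is the only thread that moves
				 * events to CQs, so clearing the counter here
				 * lets callers polling the unlinks-in-progress
				 * count (rte_event_port_unlinks_in_progress())
				 * see that their unlinks have taken effect.
				 */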

				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
			}

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

	/* Push all the internally buffered QEs from each port's cq_buf out to
	 * the worker cores' CQ rings, i.e., do the ring transfers in batches.
	 */
	for (i = 0; i < sw->port_count; i++) {
		struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
		rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
				sw->ports[i].cq_buf_count,
				&sw->cq_ring_space[i]);
		sw->ports[i].cq_buf_count = 0;
	}

}
569