/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ (lowest set bit in the packet mask), or
 * SW_IQS_MAX if no pkts are available: the OR with (1 << SW_IQS_MAX) acts as
 * a sentinel bit for the CTZ. Doing the CTZ twice is faster than caching the
 * value due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
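/* Map the 8-bit event priority into one of the four IQs: priority 0 (the
 * eventdev "highest" priority) lands in IQ 0, which is serviced first, and
 * each IQ covers a 64-value band of the priority space.
 */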
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)

static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists, etc.
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

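		/* A negative cq means this atomic flow is not currently pinned
		 * to a CQ: pick the least loaded mapped CQ below and pin the
		 * flow to it, so all events of the flow go to the same port
		 * until the flow drains.
		 */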
		if (cq < 0) {
			uint32_t cq_idx = qid->cq_next_tx++;
			if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
		}

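		/* If the chosen CQ's ring is full, or the port's history list
		 * has no free slot, the event cannot be scheduled right now:
		 * stash it and return it to the IQ after the loop.
		 */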
		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

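		/* Record (flow, QID) in the port's history list; the entry is
		 * consumed when the worker completes the event, which is what
		 * eventually releases the flow's CQ pinning.
		 */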
		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}

static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists, etc.
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rte_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * For parallel, just send to the next available CQ in
		 * round-robin fashion: scan for an available CQ. If all CQs
		 * are full, just return and move on to the next QID.
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			cq = qid->cq_map[cq_idx];
			if (++cq_idx == qid->cq_num_mapped_cqs)
				cq_idx = 0;
		} while (rte_event_ring_free_count(
				sw->ports[cq].cq_worker_ring) == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

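		/* For an ordered QID, attach a reorder buffer entry to this
		 * history entry; it is marked ready on completion and drained
		 * back into the destination IQs by sw_schedule_reorder().
		 */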
		if (keep_order)
			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
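	/* Directed QIDs use only their first mapped CQ (cq_map[0]); events go
	 * straight to that port's CQ buffer.
	 */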
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

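	/* Walk the prioritized QID list; for each QID, service its
	 * highest-priority IQ that currently holds packets.
	 */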
	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* no packets in any IQ of this QID: nothing to schedule */
		if (iq_num >= SW_IQS_MAX)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

		if (count > 0) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and unset it
		 * in the IQ mask if it is empty: all_done is 0 or 1, so the
		 * shift below clears the IQ's bit only when it was fully
		 * drained.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function performs re-ordering of packets, injecting them into the
 * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

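		/* The freelist holds the unused ROB entries, so its free count
		 * equals the number of reorder buffer entries currently handed
		 * out to in-flight ordered events.
		 */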
		num_entries_in_use = rte_ring_free_count(
					qid->reorder_buffer_freelist);

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq  = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rte_ring_sp_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			RTE_DIM(port->pp_buf), NULL);
}

static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* if not reordering, treat a PARTIAL event the same as NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * if we don't have space for this packet in an IQ,
		 * then move on to next queue. Technically, for a
		 * packet that needs reordering, we don't need to check
		 * here, but it simplifies things not to special-case
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
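		/* A COMPLETE flag means a previously scheduled event has been
		 * released by the worker: retire the oldest history entry,
		 * decrement the flow's inflight count and, for atomic flows,
		 * un-pin the flow from its CQ once that count hits zero.
		 */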
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

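			/* Branchless update of the reorder state: valid is 0
			 * or 1, so (valid - 1) is either 0 or all-ones. When
			 * the history entry carries no ROB pointer, the write
			 * below is redirected to the static dummy_rob rather
			 * than taking a branch.
			 */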
			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open question: how do we alert the
				 * user that they've exceeded the max number of
				 * fragments?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (!sw->started)
		return;

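	/* Two-phase scheduling loop: first pull new and forwarded events from
	 * the ports' rx rings into the QID IQs (plus any events released from
	 * the reorder buffers), then schedule the IQs out to the ports' CQs.
	 * sched_quanta bounds how much work is done per call.
	 */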
	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++)
				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = 0;
		out_pkts += sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	/* Push all the internal buffered QEs in port->cq_ring to the
	 * worker cores, i.e. do the ring transfers batched.
	 */
	for (i = 0; i < sw->port_count; i++) {
		struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
		rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
				sw->ports[i].cq_buf_count,
				&sw->cq_ring_space[i]);
		sw->ports[i].cq_buf_count = 0;
	}

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

}