/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include "sw_evdev.h"
#include "iq_ring.h"
#include "event_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ, or SW_IQS_MAX if no packets are
 * available. Recomputing the CTZ each time is faster than caching the value
 * due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
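/* Example: PKT_MASK_TO_IQ(0x6) is 1 (IQs 1 and 2 non-empty, so IQ 1 wins);
 * PKT_MASK_TO_IQ(0) is SW_IQS_MAX, which callers treat as "no packets".
 */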

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)
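/* Example: the 8-bit event priority maps as 0-63 -> IQ 0 (highest priority),
 * 64-127 -> IQ 1, 128-191 -> IQ 2 and 192-255 -> IQ 3.
 */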

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing; we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)

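/* Schedule up to "count" events from one IQ of an atomic QID to its mapped
 * CQs. A flow with nothing in flight is assigned to the least-loaded mapped
 * CQ (starting from a round-robin candidate) and stays pinned to that CQ
 * until its inflight count drops back to zero. Events that cannot be placed
 * (CQ full, or the port's history list full) are put back into the IQ.
 * Returns the number of events actually scheduled.
 */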
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* The QID ID is static, so it can be used to identify the stage of
	 * processing in history lists, etc.
	 */
	uint32_t qid_id = qid->id;

	iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx = qid->cq_next_tx++;
			if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* pin this flow to the chosen CQ */
		}

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct qe_ring *worker = p->cq_worker_ring;
			qe_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}

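/* Schedule up to "count" events from one IQ of a parallel QID (or an ordered
 * QID when keep_order is set), spreading them across the mapped CQs in
 * round-robin fashion. For ordered QIDs a reorder-buffer entry is taken from
 * the freelist and recorded in the port's history list so egress order can
 * be restored later. Returns the number of events scheduled.
 */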
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* The QID ID is static, so it can be used to identify the stage of
	 * processing in history lists, etc.
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rte_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * For parallel QIDs, just send to the next available CQ in
		 * round-robin fashion, so scan for an available CQ. If all
		 * CQs are full, just return and move on to the next QID.
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			cq = qid->cq_map[cq_idx];
			if (++cq_idx == qid->cq_num_mapped_cqs)
				cq_idx = 0;
		} while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

		if (keep_order)
			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_ring_pop(qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

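/* Schedule events from the IQ of a directed QID straight into its single
 * mapped CQ (cq_map[0]), limited by the CQ space cached in cq_ring_space.
 * Returns the number of events moved.
 */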
static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct iq_ring *ring = qid->iq[iq_num];
	uint32_t ret = iq_ring_dequeue_burst(ring,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

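/* Walk the QIDs in priority order and schedule the contents of each QID's
 * highest-priority non-empty IQ out to the consumer CQs, using the scheme
 * (directed, atomic or parallel/ordered) that matches the QID type. Returns
 * the total number of events scheduled.
 */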
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* no packets in any IQ of this QID; move on to the next one */
		if (iq_num >= SW_IQS_MAX)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_ring_count(qid->iq[iq_num]);

		if (count > 0) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and unset it
		 * in the IQ mask if it's empty.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function performs re-ordering of packets and injects them into the
 * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rte_ring_free_count(
					qid->reorder_buffer_freelist);

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq  = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				struct sw_qid *dest_qid_ptr =
					&sw->qids[dest_qid];
				const struct iq_ring *dest_iq_ptr =
					dest_qid_ptr->iq[dest_iq];
				if (iq_ring_free_count(dest_iq_ptr) == 0)
					break;

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct iq_ring *r = q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_ring_enqueue(r, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rte_ring_sp_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

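/* Refill a port's shadow buffer (pp_buf) with a burst of new events dequeued
 * from the port's rx worker ring.
 */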
static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct qe_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
			RTE_DIM(port->pp_buf));
}

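/* Pull events from a load-balanced port's shadow buffer and enqueue them
 * into the destination QID IQs, completing the history-list entries of any
 * events the worker has released. The two wrappers below pass allow_reorder
 * as a constant 1 or 0, so the compiler can drop the reorder handling for
 * ports with no ordered QIDs.
 */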
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* with no reordering, a partial (non-EOP) event is just NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * if we don't have space for this packet in an IQ,
		 * then move on to next queue. Technically, for a
		 * packet that needs reordering, we don't need to check
		 * here, but it simplifies things not to special-case
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		if ((flags & QE_FLAG_VALID) &&
				iq_ring_free_count(qid->iq[iq_num]) == 0)
			break;

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
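				/* branch-free select: when there is no ROB
				 * entry (valid == 0), (valid - 1) is all-ones
				 * and dummy_rob is substituted, so the store
				 * below never dereferences NULL
				 */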
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open: How do we alert the user that
				 * they've exceeded max frags?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_ring_enqueue(qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

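/* Pull events from a directed port's shadow buffer and enqueue them into the
 * destination QID IQs. No flow pinning or reorder state is involved, so only
 * IQ space needs to be checked.
 */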
static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct iq_ring *iq_ring = qid->iq[iq_num];

		if (iq_ring_free_count(iq_ring) == 0)
			break; /* move to next port */

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_ring_enqueue(iq_ring, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

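/* Main scheduling routine of the software eventdev. Each call pulls new
 * events from the producer ports into the QID IQs (releasing reordered
 * events as they become ready), schedules IQ contents out to the consumer
 * CQs, and finally flushes each port's CQ buffer to its worker ring in one
 * burst. The amount of work per call is bounded by sched_quanta.
 */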
void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (!sw->started)
		return;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++)
				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = 0;
		out_pkts += sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	/* push all the internally buffered QEs in each port's cq_buf to the
	 * worker cores, i.e. do the ring transfers batched.
	 */
	for (i = 0; i < sw->port_count; i++) {
		struct qe_ring *worker = sw->ports[i].cq_worker_ring;
		qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
				sw->ports[i].cq_buf_count,
				&sw->cq_ring_space[i]);
		sw->ports[i].cq_buf_count = 0;
	}

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

}
603