/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include "sw_evdev.h"
#include "iq_ring.h"
#include "event_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest-priority (lowest-numbered) IQ that has packets, or
 * SW_IQS_MAX if no IQ has packets. Doing the CTZ twice is faster than
 * caching the value due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
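/* For example, an iq_pkt_mask of 0x4 (only IQ 2 non-empty) yields 2, while an
 * empty mask yields SW_IQS_MAX, which callers treat as "nothing to schedule".
 */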

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)
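/* The 8-bit event priority is folded into the 4 IQs: priorities 0-63 (the
 * highest) map to IQ 0, while 192-255 (the lowest) map to IQ 3.
 */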

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)

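/* Schedule up to "count" events from one IQ of an atomic QID to the CQs of the
 * ports mapped to it. Each flow is pinned to a single CQ while it has events
 * in flight; events whose destination CQ (or port history list) is full are
 * put back on the IQ. Returns the number of events scheduled.
 */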
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		/* use cheap bit mixing, we only need to lose a few bits */
		uint32_t flow_id32 = (qes[i].flow_id) ^ (qes[i].flow_id >> 10);
		const uint16_t flow_id = FLOWID_MASK & flow_id32;
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

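		/* A negative fid->cq means this flow is not currently pinned
		 * to a CQ: pick the mapped CQ with the most free ring space
		 * (starting from the round-robin position) and pin the flow to
		 * it until its inflight count (pcount) drops back to zero.
		 */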
		if (cq < 0) {
			uint32_t cq_idx = qid->cq_next_tx++;
			if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
		}

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct qe_ring *worker = p->cq_worker_ring;
			qe_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}

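/* Schedule up to "count" events from one IQ of an ordered or parallel QID,
 * distributing them across the mapped CQs in round-robin order. For ordered
 * QIDs (keep_order set), each event is also assigned an entry from the QID's
 * reorder buffer freelist so it can be restored to order on release. Returns
 * the number of events scheduled.
 */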
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rte_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * For parallel scheduling, send to the next available CQ in
		 * round-robin fashion: scan for an available CQ, and if all
		 * CQs are full, return and move on to the next QID.
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			cq = qid->cq_map[cq_idx];
			if (++cq_idx == qid->cq_num_mapped_cqs)
				cq_idx = 0;
		} while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));

		p->hist_list[head].fid = qe->flow_id;
		p->hist_list[head].qid = qid_id;

		if (keep_order)
			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_ring_pop(qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

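/* Schedule events from a directed (SW_SCHED_TYPE_DIRECT) QID straight into its
 * single mapped CQ, limited by the free space cached in sw->cq_ring_space.
 * Returns the number of events moved.
 */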
static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct iq_ring *ring = qid->iq[iq_num];
	uint32_t ret = iq_ring_dequeue_burst(ring,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

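/* Walk the QIDs in prioritized order and, for each, schedule events from its
 * highest-priority non-empty IQ to the mapped CQs, using the scheduler that
 * matches the QID type (directed, atomic, or ordered/parallel). Returns the
 * total number of events scheduled in this pass.
 */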
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* skip this QID if none of its IQs contain packets */
		if (iq_num >= SW_IQS_MAX)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_ring_count(qid->iq[iq_num]);

		if (count > 0) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and if so,
		 * clear its bit in the IQ mask.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function performs the re-ordering of packets and injects them into the
 * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
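/* Entries are drained from the reorder buffer strictly in window order
 * (reorder_buffer_index); an entry whose fragments could not all be placed
 * (destination IQ full) stays ready and stalls the window until the rest are
 * sent, after which the entry is returned to the freelist.
 */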
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rte_ring_free_count(
					qid->reorder_buffer_freelist);

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq  = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				struct sw_qid *dest_qid_ptr =
					&sw->qids[dest_qid];
				const struct iq_ring *dest_iq_ptr =
					dest_qid_ptr->iq[dest_iq];
				if (iq_ring_free_count(dest_iq_ptr) == 0)
					break;

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct iq_ring *r = q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_ring_enqueue(r, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rte_ring_sp_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

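/* Refill the port's software prefetch buffer (pp_buf) with a burst of events
 * dequeued from the port's rx worker ring, and reset the read index.
 */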
static inline void __attribute__((always_inline))
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct qe_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
			RTE_DIM(port->pp_buf));
}

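/* Drain a load-balanced port's worker ring: for each event carrying the
 * COMPLETE flag, retire the port's oldest history-list entry (decrementing the
 * flow's pin count and, when allow_reorder is set, marking its reorder buffer
 * entry ready), then enqueue valid events into the destination QID's IQ
 * selected by priority. Returns the number of events enqueued to IQs.
 */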
static inline uint32_t __attribute__((always_inline))
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static const struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* if reordering is disabled, treat PARTIAL the same as NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * If we don't have space for this packet in its destination
		 * IQ, stop processing this port and move on to the next one.
		 * Technically, for a packet that needs reordering, we don't
		 * need to check here, but it simplifies things not to
		 * special-case it.
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		if ((flags & QE_FLAG_VALID) &&
				iq_ring_free_count(qid->iq[iq_num]) == 0)
			break;

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

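			/* If reordering is enabled, mark the event's reorder
			 * buffer entry ready on end-of-packet. The update is
			 * branchless: history entries with no rob_entry
			 * (events from non-ordered QIDs) are redirected to a
			 * local dummy entry instead.
			 */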
			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open: how do we alert the user that
				 * they've exceeded max frags?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_ring_enqueue(qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

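/* Pull events from a directed port's worker ring and enqueue them straight
 * into the destination QID's IQ selected by priority. No history list or
 * reorder handling is needed for directed traffic. Returns the number of
 * events enqueued.
 */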
static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct iq_ring *iq_ring = qid->iq[iq_num];

		if (iq_ring_free_count(iq_ring) == 0)
			break; /* move to next port */

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_ring_enqueue(iq_ring, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

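/* Main scheduling entry point: pull new and released events from each port's
 * worker ring into the QID IQs (including re-ordered traffic), schedule IQ
 * contents to CQs, then flush the per-port CQ buffers to the worker rings in
 * batches. The amount of work per call is bounded by sw->sched_quanta.
 */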
void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (!sw->started)
		return;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++)
				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = 0;
		out_pkts += sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	/* Push all the internally buffered QEs in port->cq_buf out to the
	 * worker cores, i.e. do the ring transfers in batches.
	 */
	for (i = 0; i < sw->port_count; i++) {
		struct qe_ring *worker = sw->ports[i].cq_worker_ring;
		qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
				sw->ports[i].cq_buf_count,
				&sw->cq_ring_space[i]);
		sw->ports[i].cq_buf_count = 0;
	}

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

}