xref: /dpdk/examples/qos_sched/app_thread.c (revision 9a710863decb1cdb98efbdd5e11df3ebcfcc37b6)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

#include <stdint.h>

#include <rte_log.h>
#include <rte_mbuf.h>
#include <rte_malloc.h>
#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_memcpy.h>
#include <rte_byteorder.h>
#include <rte_branch_prediction.h>
#include <rte_sched.h>

#include "main.h"

/*
 * QoS parameters are encoded as follows:
 *		Outer VLAN ID defines subport
 *		Inner VLAN ID defines pipe
 *		Destination IP host (0.0.0.XXX) defines queue
 * Values below define offset to each field from start of frame
 */
#define SUBPORT_OFFSET	7
#define PIPE_OFFSET		9
#define QUEUE_OFFSET	20
#define COLOR_OFFSET	19

static inline int
get_pkt_sched(struct rte_mbuf *m, uint32_t *subport, uint32_t *pipe,
			uint32_t *traffic_class, uint32_t *queue, uint32_t *color)
{
	uint16_t *pdata = rte_pktmbuf_mtod(m, uint16_t *);
	uint16_t pipe_queue;

	*subport = (rte_be_to_cpu_16(pdata[SUBPORT_OFFSET]) & 0x0FFF) &
			(port_params.n_subports_per_port - 1); /* Outer VLAN ID */
	*pipe = (rte_be_to_cpu_16(pdata[PIPE_OFFSET]) & 0x0FFF) &
			(port_params.n_pipes_per_subport - 1); /* Inner VLAN ID */
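	/* The destination IP host byte indexes the table of active queues;
	 * the resulting pipe-level queue id is split below into a traffic
	 * class and a queue within the best-effort class.
	 */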
	pipe_queue = active_queues[(pdata[QUEUE_OFFSET] >> 8) % n_active_queues];
	*traffic_class = pipe_queue > RTE_SCHED_TRAFFIC_CLASS_BE ?
			RTE_SCHED_TRAFFIC_CLASS_BE : pipe_queue; /* TC capped at best-effort */
	*queue = pipe_queue - *traffic_class; /* queue within the best-effort TC */
	*color = pdata[COLOR_OFFSET] & 0x03; /* color from destination IP */

	return 0;
}

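/*
 * RX thread: read bursts of packets from each configured RX port/queue,
 * classify every packet (subport, pipe, traffic class, queue, color) with
 * rte_sched_port_pkt_write() and enqueue the burst on the worker RX ring.
 * If the ring enqueue fails, the whole burst is dropped.
 */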
void
app_rx_thread(struct thread_conf **confs)
{
	uint32_t i, nb_rx;
	struct rte_mbuf *rx_mbufs[burst_conf.rx_burst] __rte_cache_aligned;
	struct thread_conf *conf;
	int conf_idx = 0;

	uint32_t subport;
	uint32_t pipe;
	uint32_t traffic_class;
	uint32_t queue;
	uint32_t color;

	while ((conf = confs[conf_idx])) {
		nb_rx = rte_eth_rx_burst(conf->rx_port, conf->rx_queue, rx_mbufs,
				burst_conf.rx_burst);

		if (likely(nb_rx != 0)) {
			APP_STATS_ADD(conf->stat.nb_rx, nb_rx);

			for (i = 0; i < nb_rx; i++) {
				get_pkt_sched(rx_mbufs[i],
						&subport, &pipe, &traffic_class, &queue, &color);
				rte_sched_port_pkt_write(conf->sched_port,
						rx_mbufs[i],
						subport, pipe,
						traffic_class, queue,
						(enum rte_color) color);
			}

			if (unlikely(rte_ring_sp_enqueue_bulk(conf->rx_ring,
					(void **)rx_mbufs, nb_rx, NULL) == 0)) {
				for (i = 0; i < nb_rx; i++) {
					rte_pktmbuf_free(rx_mbufs[i]);

					APP_STATS_ADD(conf->stat.nb_drop, 1);
				}
			}
		}
		conf_idx++;
		if (confs[conf_idx] == NULL)
			conf_idx = 0;
	}
}


/* Send the buffered packets to the output interface.
 * The packets cannot be dropped at this stage, so any packets not
 * accepted by the TX queue are retransmitted until the whole burst
 * has been sent.
 */
static inline void
app_send_burst(struct thread_conf *qconf)
{
	struct rte_mbuf **mbufs;
	uint32_t n, ret;

	mbufs = (struct rte_mbuf **)qconf->m_table;
	n = qconf->n_mbufs;

	do {
		ret = rte_eth_tx_burst(qconf->tx_port, qconf->tx_queue, mbufs, (uint16_t)n);
		/* we cannot drop the packets, so re-send */
		/* update number of packets to be sent */
		n -= ret;
		mbufs = (struct rte_mbuf **)&mbufs[ret];
	} while (n);
}


/* Buffer packets for the output interface, flushing full bursts */
static void
app_send_packets(struct thread_conf *qconf, struct rte_mbuf **mbufs, uint32_t nb_pkt)
{
	uint32_t i, len;

	len = qconf->n_mbufs;
	for (i = 0; i < nb_pkt; i++) {
		qconf->m_table[len] = mbufs[i];
		len++;
		/* enough pkts to be sent */
		if (unlikely(len == burst_conf.tx_burst)) {
			qconf->n_mbufs = len;
			app_send_burst(qconf);
			len = 0;
		}
	}

	qconf->n_mbufs = len;
}

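/*
 * TX thread: dequeue bursts of scheduled packets from the QoS TX ring and
 * buffer them for transmission. conf->counter counts dequeue iterations
 * since the last successful read; once it exceeds the drain threshold,
 * any partially filled TX buffer is flushed.
 */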
void
app_tx_thread(struct thread_conf **confs)
{
	struct rte_mbuf *mbufs[burst_conf.qos_dequeue];
	struct thread_conf *conf;
	int conf_idx = 0;
	int retval;
	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US;

	while ((conf = confs[conf_idx])) {
		retval = rte_ring_sc_dequeue_bulk(conf->tx_ring, (void **)mbufs,
					burst_conf.qos_dequeue, NULL);
		if (likely(retval != 0)) {
			app_send_packets(conf, mbufs, burst_conf.qos_dequeue);

			conf->counter = 0; /* reset empty read loop counter */
		}

		conf->counter++;

		/* drain ring and TX queues */
		if (unlikely(conf->counter > drain_tsc)) {
			/* check whether any packets are left to be transmitted */
			if (conf->n_mbufs != 0) {
				app_send_burst(conf);

				conf->n_mbufs = 0;
			}
			conf->counter = 0;
		}

		conf_idx++;
		if (confs[conf_idx] == NULL)
			conf_idx = 0;
	}
}


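/*
 * Worker thread: move packets from the RX ring into the QoS scheduler and
 * from the scheduler to the TX ring. The TX ring enqueue is retried until
 * it succeeds, so scheduled packets are not dropped at this stage.
 */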
void
app_worker_thread(struct thread_conf **confs)
{
	struct rte_mbuf *mbufs[burst_conf.ring_burst];
	struct thread_conf *conf;
	int conf_idx = 0;

	while ((conf = confs[conf_idx])) {
		uint32_t nb_pkt;

		/* Read a burst of packets from the RX ring */
		nb_pkt = rte_ring_sc_dequeue_burst(conf->rx_ring, (void **)mbufs,
					burst_conf.ring_burst, NULL);
		if (likely(nb_pkt)) {
			int nb_sent = rte_sched_port_enqueue(conf->sched_port, mbufs,
					nb_pkt);

			APP_STATS_ADD(conf->stat.nb_drop, nb_pkt - nb_sent);
			APP_STATS_ADD(conf->stat.nb_rx, nb_pkt);
		}

		nb_pkt = rte_sched_port_dequeue(conf->sched_port, mbufs,
					burst_conf.qos_dequeue);
		if (likely(nb_pkt > 0))
			while (rte_ring_sp_enqueue_bulk(conf->tx_ring,
					(void **)mbufs, nb_pkt, NULL) == 0)
				; /* empty body */

		conf_idx++;
		if (confs[conf_idx] == NULL)
			conf_idx = 0;
	}
}


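/*
 * Mixed thread: combine the worker and TX roles on a single lcore. Packets
 * are moved from the RX ring into the scheduler, dequeued from the
 * scheduler and transmitted directly, with the same periodic drain of the
 * TX buffer as in app_tx_thread().
 */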
void
app_mixed_thread(struct thread_conf **confs)
{
	struct rte_mbuf *mbufs[burst_conf.ring_burst];
	struct thread_conf *conf;
	int conf_idx = 0;
	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US;

	while ((conf = confs[conf_idx])) {
		uint32_t nb_pkt;

		/* Read a burst of packets from the RX ring */
		nb_pkt = rte_ring_sc_dequeue_burst(conf->rx_ring, (void **)mbufs,
					burst_conf.ring_burst, NULL);
		if (likely(nb_pkt)) {
			int nb_sent = rte_sched_port_enqueue(conf->sched_port, mbufs,
					nb_pkt);

			APP_STATS_ADD(conf->stat.nb_drop, nb_pkt - nb_sent);
			APP_STATS_ADD(conf->stat.nb_rx, nb_pkt);
		}


		nb_pkt = rte_sched_port_dequeue(conf->sched_port, mbufs,
					burst_conf.qos_dequeue);
		if (likely(nb_pkt > 0)) {
			app_send_packets(conf, mbufs, nb_pkt);

			conf->counter = 0; /* reset empty read loop counter */
		}

		conf->counter++;

		/* drain ring and TX queues */
		if (unlikely(conf->counter > drain_tsc)) {
			/* check whether any packets are left to be transmitted */
			if (conf->n_mbufs != 0) {
				app_send_burst(conf);

				conf->n_mbufs = 0;
			}
			conf->counter = 0;
		}

		conf_idx++;
		if (confs[conf_idx] == NULL)
			conf_idx = 0;
	}
}