xref: /dpdk/examples/ip_pipeline/thread.c (revision 57ccb278088fae9edc664893037e35cec8dea181)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include <rte_common.h>
35 #include <rte_cycles.h>
36 #include <rte_pipeline.h>
37 
38 #include "pipeline_common_be.h"
39 #include "app.h"
40 #include "thread.h"
41 
42 static inline void *
43 thread_msg_recv(struct rte_ring *r)
44 {
45 	void *msg;
46 	int status = rte_ring_sc_dequeue(r, &msg);
47 
48 	if (status != 0)
49 		return NULL;
50 
51 	return msg;
52 }
53 
54 static inline void
55 thread_msg_send(struct rte_ring *r,
56 	void *msg)
57 {
58 	int status;
59 
60 	do {
61 		status = rte_ring_sp_enqueue(r, msg);
62 	} while (status == -ENOBUFS);
63 }
64 
65 static int
66 thread_pipeline_enable(struct app_thread_data *t,
67 		struct thread_pipeline_enable_msg_req *req)
68 {
69 	struct app_thread_pipeline_data *p;
70 
71 	if (req->f_run == NULL) {
72 		if (t->n_regular >= APP_MAX_THREAD_PIPELINES)
73 			return -1;
74 	} else {
75 		if (t->n_custom >= APP_MAX_THREAD_PIPELINES)
76 			return -1;
77 	}
78 
79 	p = (req->f_run == NULL) ?
80 		&t->regular[t->n_regular] :
81 		&t->custom[t->n_custom];
82 
83 	p->pipeline_id = req->pipeline_id;
84 	p->be = req->be;
85 	p->f_run = req->f_run;
86 	p->f_timer = req->f_timer;
87 	p->timer_period = req->timer_period;
88 	p->deadline = 0;
89 
90 	if (req->f_run == NULL)
91 		t->n_regular++;
92 	else
93 		t->n_custom++;
94 
95 	return 0;
96 }
97 
98 static int
99 thread_pipeline_disable(struct app_thread_data *t,
100 		struct thread_pipeline_disable_msg_req *req)
101 {
102 	uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular));
103 	uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom));
104 	uint32_t i;
105 
106 	/* search regular pipelines of current thread */
107 	for (i = 0; i < n_regular; i++) {
108 		if (t->regular[i].pipeline_id != req->pipeline_id)
109 			continue;
110 
111 		if (i < n_regular - 1)
112 			memcpy(&t->regular[i],
113 			  &t->regular[i+1],
114 			  (n_regular - 1 - i) * sizeof(struct app_thread_pipeline_data));
115 
116 		n_regular--;
117 		t->n_regular = n_regular;
118 
119 		return 0;
120 	}
121 
122 	/* search custom pipelines of current thread */
123 	for (i = 0; i < n_custom; i++) {
124 		if (t->custom[i].pipeline_id != req->pipeline_id)
125 			continue;
126 
127 		if (i < n_custom - 1)
128 			memcpy(&t->custom[i],
129 			  &t->custom[i+1],
130 			  (n_custom - 1 - i) * sizeof(struct app_thread_pipeline_data));
131 
132 		n_custom--;
133 		t->n_custom = n_custom;
134 
135 		return 0;
136 	}
137 
138 	/* return if pipeline not found */
139 	return -1;
140 }
141 
142 static int
143 thread_msg_req_handle(struct app_thread_data *t)
144 {
145 	void *msg_ptr;
146 	struct thread_msg_req *req;
147 	struct thread_msg_rsp *rsp;
148 
149 	msg_ptr = thread_msg_recv(t->msgq_in);
150 	req = msg_ptr;
151 	rsp = msg_ptr;
152 
153 	if (req != NULL)
154 		switch (req->type) {
155 		case THREAD_MSG_REQ_PIPELINE_ENABLE: {
156 			rsp->status = thread_pipeline_enable(t,
157 					(struct thread_pipeline_enable_msg_req *) req);
158 			thread_msg_send(t->msgq_out, rsp);
159 			break;
160 		}
161 
162 		case THREAD_MSG_REQ_PIPELINE_DISABLE: {
163 			rsp->status = thread_pipeline_disable(t,
164 					(struct thread_pipeline_disable_msg_req *) req);
165 			thread_msg_send(t->msgq_out, rsp);
166 			break;
167 		}
168 		default:
169 			break;
170 		}
171 
172 	return 0;
173 }
174 
175 int
176 app_thread(void *arg)
177 {
178 	struct app_params *app = (struct app_params *) arg;
179 	uint32_t core_id = rte_lcore_id(), i, j;
180 	struct app_thread_data *t = &app->thread_data[core_id];
181 
182 	for (i = 0; ; i++) {
183 		uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular));
184 		uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom));
185 
186 		/* Run regular pipelines */
187 		for (j = 0; j < n_regular; j++) {
188 			struct app_thread_pipeline_data *data = &t->regular[j];
189 			struct pipeline *p = data->be;
190 
191 			rte_pipeline_run(p->p);
192 		}
193 
194 		/* Run custom pipelines */
195 		for (j = 0; j < n_custom; j++) {
196 			struct app_thread_pipeline_data *data = &t->custom[j];
197 
198 			data->f_run(data->be);
199 		}
200 
201 		/* Timer */
202 		if ((i & 0xF) == 0) {
203 			uint64_t time = rte_get_tsc_cycles();
204 			uint64_t t_deadline = UINT64_MAX;
205 
206 			if (time < t->deadline)
207 				continue;
208 
209 			/* Timer for regular pipelines */
210 			for (j = 0; j < n_regular; j++) {
211 				struct app_thread_pipeline_data *data =
212 					&t->regular[j];
213 				uint64_t p_deadline = data->deadline;
214 
215 				if (p_deadline <= time) {
216 					data->f_timer(data->be);
217 					p_deadline = time + data->timer_period;
218 					data->deadline = p_deadline;
219 				}
220 
221 				if (p_deadline < t_deadline)
222 					t_deadline = p_deadline;
223 			}
224 
225 			/* Timer for custom pipelines */
226 			for (j = 0; j < n_custom; j++) {
227 				struct app_thread_pipeline_data *data =
228 					&t->custom[j];
229 				uint64_t p_deadline = data->deadline;
230 
231 				if (p_deadline <= time) {
232 					data->f_timer(data->be);
233 					p_deadline = time + data->timer_period;
234 					data->deadline = p_deadline;
235 				}
236 
237 				if (p_deadline < t_deadline)
238 					t_deadline = p_deadline;
239 			}
240 
241 			/* Timer for thread message request */
242 			{
243 				uint64_t deadline = t->thread_req_deadline;
244 
245 				if (deadline <= time) {
246 					thread_msg_req_handle(t);
247 					deadline = time + t->timer_period;
248 					t->thread_req_deadline = deadline;
249 				}
250 
251 				if (deadline < t_deadline)
252 					t_deadline = deadline;
253 			}
254 
255 			t->deadline = t_deadline;
256 		}
257 	}
258 
259 	return 0;
260 }
261