xref: /dpdk/examples/pipeline/thread.c (revision f5057be340e44f3edc0fe90fa875eb89a4c49b4f)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2020 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "obj.h"
19 #include "thread.h"
20 
21 #ifndef THREAD_PIPELINES_MAX
22 #define THREAD_PIPELINES_MAX                               256
23 #endif
24 
25 #ifndef THREAD_MSGQ_SIZE
26 #define THREAD_MSGQ_SIZE                                   64
27 #endif
28 
29 #ifndef THREAD_TIMER_PERIOD_MS
30 #define THREAD_TIMER_PERIOD_MS                             100
31 #endif
32 
/**
 * Control thread: data plane thread context (one entry per lcore), as
 * seen and owned by the control thread.
 */
struct thread {
	struct rte_ring *msgq_req; /* Request queue: control -> data plane thread. */
	struct rte_ring *msgq_rsp; /* Response queue: data plane thread -> control. */

	uint32_t enabled; /* Non-zero once thread_init() has set this lcore up. */
};

static struct thread thread[RTE_MAX_LCORE];
44 
/**
 * Data plane threads: context
 */
/* Per-pipeline state kept by the data plane thread that runs it. */
struct pipeline_data {
	struct rte_swx_pipeline *p; /* Pipeline instance to run. */
	uint64_t timer_period; /* Measured in CPU cycles. */
	uint64_t time_next; /* TSC time of the next timer expiry for this pipeline. */
};
53 
/* Per-lcore context owned by the data plane thread itself. */
struct thread_data {
	struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX]; /* Pipelines to run. */
	uint32_t n_pipelines; /* Number of valid entries in p[] and pipeline_data[]. */

	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX]; /* Indexed like p[]. */
	struct rte_ring *msgq_req; /* Requests from the control thread. */
	struct rte_ring *msgq_rsp; /* Responses back to the control thread. */
	uint64_t timer_period; /* Message queue poll period. Measured in CPU cycles. */
	uint64_t time_next; /* TSC time of the next message queue poll. */
	uint64_t time_next_min; /* Earliest pending timer; gates the control-plane work. */
} __rte_cache_aligned;

static struct thread_data thread_data[RTE_MAX_LCORE];
67 
68 /**
69  * Control thread: data plane thread init
70  */
71 static void
72 thread_free(void)
73 {
74 	uint32_t i;
75 
76 	for (i = 0; i < RTE_MAX_LCORE; i++) {
77 		struct thread *t = &thread[i];
78 
79 		if (!rte_lcore_is_enabled(i))
80 			continue;
81 
82 		/* MSGQs */
83 		if (t->msgq_req)
84 			rte_ring_free(t->msgq_req);
85 
86 		if (t->msgq_rsp)
87 			rte_ring_free(t->msgq_rsp);
88 	}
89 }
90 
91 int
92 thread_init(void)
93 {
94 	uint32_t i;
95 
96 	RTE_LCORE_FOREACH_SLAVE(i) {
97 		char name[NAME_MAX];
98 		struct rte_ring *msgq_req, *msgq_rsp;
99 		struct thread *t = &thread[i];
100 		struct thread_data *t_data = &thread_data[i];
101 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
102 
103 		/* MSGQs */
104 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
105 
106 		msgq_req = rte_ring_create(name,
107 			THREAD_MSGQ_SIZE,
108 			cpu_id,
109 			RING_F_SP_ENQ | RING_F_SC_DEQ);
110 
111 		if (msgq_req == NULL) {
112 			thread_free();
113 			return -1;
114 		}
115 
116 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
117 
118 		msgq_rsp = rte_ring_create(name,
119 			THREAD_MSGQ_SIZE,
120 			cpu_id,
121 			RING_F_SP_ENQ | RING_F_SC_DEQ);
122 
123 		if (msgq_rsp == NULL) {
124 			thread_free();
125 			return -1;
126 		}
127 
128 		/* Control thread records */
129 		t->msgq_req = msgq_req;
130 		t->msgq_rsp = msgq_rsp;
131 		t->enabled = 1;
132 
133 		/* Data plane thread records */
134 		t_data->n_pipelines = 0;
135 		t_data->msgq_req = msgq_req;
136 		t_data->msgq_rsp = msgq_rsp;
137 		t_data->timer_period =
138 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
139 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
140 		t_data->time_next_min = t_data->time_next;
141 	}
142 
143 	return 0;
144 }
145 
146 static inline int
147 thread_is_running(uint32_t thread_id)
148 {
149 	enum rte_lcore_state_t thread_state;
150 
151 	thread_state = rte_eal_get_lcore_state(thread_id);
152 	return (thread_state == RUNNING) ? 1 : 0;
153 }
154 
/**
 * Control thread & data plane threads: message passing
 */
enum thread_req_type {
	THREAD_REQ_PIPELINE_ENABLE = 0, /* Attach a pipeline to a data plane thread. */
	THREAD_REQ_PIPELINE_DISABLE, /* Detach a pipeline from a data plane thread. */
	THREAD_REQ_MAX /* Number of valid request types. */
};
163 
/* Request message sent from the control thread to a data plane thread.
 * The payload union member is selected by the type field.
 */
struct thread_msg_req {
	enum thread_req_type type;

	union {
		struct {
			struct rte_swx_pipeline *p; /* Pipeline to attach. */
			uint32_t timer_period_ms; /* Pipeline timer period. */
		} pipeline_enable;

		struct {
			struct rte_swx_pipeline *p; /* Pipeline to detach. */
		} pipeline_disable;
	};
};
178 
/* Response message; written in place over the request buffer. */
struct thread_msg_rsp {
	int status; /* 0 on success, negative on failure. */
};
182 
183 /**
184  * Control thread
185  */
186 static struct thread_msg_req *
187 thread_msg_alloc(void)
188 {
189 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
190 		sizeof(struct thread_msg_rsp));
191 
192 	return calloc(1, size);
193 }
194 
/* Release a message buffer obtained from thread_msg_alloc(). Called with
 * the response pointer, which aliases the original request allocation.
 */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}
200 
201 static struct thread_msg_rsp *
202 thread_msg_send_recv(uint32_t thread_id,
203 	struct thread_msg_req *req)
204 {
205 	struct thread *t = &thread[thread_id];
206 	struct rte_ring *msgq_req = t->msgq_req;
207 	struct rte_ring *msgq_rsp = t->msgq_rsp;
208 	struct thread_msg_rsp *rsp;
209 	int status;
210 
211 	/* send */
212 	do {
213 		status = rte_ring_sp_enqueue(msgq_req, req);
214 	} while (status == -ENOBUFS);
215 
216 	/* recv */
217 	do {
218 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
219 	} while (status != 0);
220 
221 	return rsp;
222 }
223 
224 int
225 thread_pipeline_enable(uint32_t thread_id,
226 	struct obj *obj,
227 	const char *pipeline_name)
228 {
229 	struct pipeline *p = pipeline_find(obj, pipeline_name);
230 	struct thread *t;
231 	struct thread_msg_req *req;
232 	struct thread_msg_rsp *rsp;
233 	int status;
234 
235 	/* Check input params */
236 	if ((thread_id >= RTE_MAX_LCORE) ||
237 		(p == NULL))
238 		return -1;
239 
240 	t = &thread[thread_id];
241 	if (t->enabled == 0)
242 		return -1;
243 
244 	if (!thread_is_running(thread_id)) {
245 		struct thread_data *td = &thread_data[thread_id];
246 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
247 
248 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
249 			return -1;
250 
251 		/* Data plane thread */
252 		td->p[td->n_pipelines] = p->p;
253 
254 		tdp->p = p->p;
255 		tdp->timer_period =
256 			(rte_get_tsc_hz() * p->timer_period_ms) / 1000;
257 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
258 
259 		td->n_pipelines++;
260 
261 		/* Pipeline */
262 		p->thread_id = thread_id;
263 		p->enabled = 1;
264 
265 		return 0;
266 	}
267 
268 	/* Allocate request */
269 	req = thread_msg_alloc();
270 	if (req == NULL)
271 		return -1;
272 
273 	/* Write request */
274 	req->type = THREAD_REQ_PIPELINE_ENABLE;
275 	req->pipeline_enable.p = p->p;
276 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
277 
278 	/* Send request and wait for response */
279 	rsp = thread_msg_send_recv(thread_id, req);
280 
281 	/* Read response */
282 	status = rsp->status;
283 
284 	/* Free response */
285 	thread_msg_free(rsp);
286 
287 	/* Request completion */
288 	if (status)
289 		return status;
290 
291 	p->thread_id = thread_id;
292 	p->enabled = 1;
293 
294 	return 0;
295 }
296 
/* Detach the named pipeline from the given data plane thread.
 *
 * If the target thread is not running, its records are updated directly;
 * otherwise a disable request is sent over the message queue and this
 * call blocks until the thread responds.
 *
 * Returns 0 on success (including when the pipeline is already disabled),
 * non-zero on failure (bad parameters, thread not initialized, pipeline
 * attached to a different thread, allocation failure).
 */
int
thread_pipeline_disable(uint32_t thread_id,
	struct obj *obj,
	const char *pipeline_name)
{
	struct pipeline *p = pipeline_find(obj, pipeline_name);
	struct thread *t;
	struct thread_msg_req *req;
	struct thread_msg_rsp *rsp;
	int status;

	/* Check input params */
	if ((thread_id >= RTE_MAX_LCORE) ||
		(p == NULL))
		return -1;

	t = &thread[thread_id];
	if (t->enabled == 0)
		return -1;

	/* Nothing to do if the pipeline is not currently enabled. */
	if (p->enabled == 0)
		return 0;

	/* The pipeline must be detached from the thread it was attached to. */
	if (p->thread_id != thread_id)
		return -1;

	if (!thread_is_running(thread_id)) {
		struct thread_data *td = &thread_data[thread_id];
		uint32_t i;

		for (i = 0; i < td->n_pipelines; i++) {
			struct pipeline_data *tdp = &td->pipeline_data[i];

			if (tdp->p != p->p)
				continue;

			/* Data plane thread: remove entry i by moving the
			 * last entry into its slot (order is not preserved).
			 */
			if (i < td->n_pipelines - 1) {
				struct rte_swx_pipeline *pipeline_last =
					td->p[td->n_pipelines - 1];
				struct pipeline_data *tdp_last =
					&td->pipeline_data[td->n_pipelines - 1];

				td->p[i] = pipeline_last;
				memcpy(tdp, tdp_last, sizeof(*tdp));
			}

			td->n_pipelines--;

			/* Pipeline */
			p->enabled = 0;

			break;
		}

		return 0;
	}

	/* Allocate request */
	req = thread_msg_alloc();
	if (req == NULL)
		return -1;

	/* Write request */
	req->type = THREAD_REQ_PIPELINE_DISABLE;
	req->pipeline_disable.p = p->p;

	/* Send request and wait for response */
	rsp = thread_msg_send_recv(thread_id, req);

	/* Read response */
	status = rsp->status;

	/* Free response */
	thread_msg_free(rsp);

	/* Request completion */
	if (status)
		return status;

	p->enabled = 0;

	return 0;
}
381 
382 /**
383  * Data plane threads: message handling
384  */
/* Non-blocking poll of the request queue. Returns the next pending
 * request, or NULL when the queue is empty.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	struct thread_msg_req *req = NULL;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &req) != 0)
		return NULL;

	return req;
}
397 
/* Post a response to the control thread, spinning until the response
 * ring has room.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		;
}
408 
409 static struct thread_msg_rsp *
410 thread_msg_handle_pipeline_enable(struct thread_data *t,
411 	struct thread_msg_req *req)
412 {
413 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
414 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
415 
416 	/* Request */
417 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
418 		rsp->status = -1;
419 		return rsp;
420 	}
421 
422 	t->p[t->n_pipelines] = req->pipeline_enable.p;
423 
424 	p->p = req->pipeline_enable.p;
425 	p->timer_period = (rte_get_tsc_hz() *
426 		req->pipeline_enable.timer_period_ms) / 1000;
427 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
428 
429 	t->n_pipelines++;
430 
431 	/* Response */
432 	rsp->status = 0;
433 	return rsp;
434 }
435 
/* Data plane thread handler for THREAD_REQ_PIPELINE_DISABLE: remove the
 * pipeline from this thread's run list by moving the last entry into its
 * slot (order is not preserved). The request buffer is recycled in place
 * as the response buffer; rsp->status is always set to 0, including when
 * the pipeline is not found (treated as already disabled).
 */
static struct thread_msg_rsp *
thread_msg_handle_pipeline_disable(struct thread_data *t,
	struct thread_msg_req *req)
{
	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
	uint32_t n_pipelines = t->n_pipelines;
	struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
	uint32_t i;

	/* find pipeline */
	for (i = 0; i < n_pipelines; i++) {
		struct pipeline_data *p = &t->pipeline_data[i];

		if (p->p != pipeline)
			continue;

		/* Not the last entry: overwrite slot i with the last entry. */
		if (i < n_pipelines - 1) {
			struct rte_swx_pipeline *pipeline_last =
				t->p[n_pipelines - 1];
			struct pipeline_data *p_last =
				&t->pipeline_data[n_pipelines - 1];

			t->p[i] = pipeline_last;
			memcpy(p, p_last, sizeof(*p));
		}

		t->n_pipelines--;

		rsp->status = 0;
		return rsp;
	}

	/* should not get here */
	rsp->status = 0;
	return rsp;
}
472 
473 static void
474 thread_msg_handle(struct thread_data *t)
475 {
476 	for ( ; ; ) {
477 		struct thread_msg_req *req;
478 		struct thread_msg_rsp *rsp;
479 
480 		req = thread_msg_recv(t->msgq_req);
481 		if (req == NULL)
482 			break;
483 
484 		switch (req->type) {
485 		case THREAD_REQ_PIPELINE_ENABLE:
486 			rsp = thread_msg_handle_pipeline_enable(t, req);
487 			break;
488 
489 		case THREAD_REQ_PIPELINE_DISABLE:
490 			rsp = thread_msg_handle_pipeline_disable(t, req);
491 			break;
492 
493 		default:
494 			rsp = (struct thread_msg_rsp *) req;
495 			rsp->status = -1;
496 		}
497 
498 		thread_msg_send(t->msgq_rsp, rsp);
499 	}
500 }
501 
502 /**
503  * Data plane threads: main
504  */
/* Data plane thread entry point (launched on each worker lcore).
 *
 * Runs forever: every iteration runs all attached pipelines, and every
 * 16th iteration performs the control-plane work (polling the message
 * queue) when its timer has expired. Never returns in practice.
 */
int
thread_main(void *arg __rte_unused)
{
	struct thread_data *t;
	uint32_t thread_id, i;

	thread_id = rte_lcore_id();
	t = &thread_data[thread_id];

	/* Dispatch loop */
	for (i = 0; ; i++) {
		uint32_t j;

		/* Data Plane */
		for (j = 0; j < t->n_pipelines; j++)
			rte_swx_pipeline_run(t->p[j], 1000000);

		/* Control Plane: amortize the TSC read and timer checks over
		 * 16 data plane iterations.
		 */
		if ((i & 0xF) == 0) {
			uint64_t time = rte_get_tsc_cycles();
			uint64_t time_next_min = UINT64_MAX;

			/* Skip the control-plane work until the earliest
			 * pending timer has expired.
			 */
			if (time < t->time_next_min)
				continue;

			/* Thread message queues */
			{
				uint64_t time_next = t->time_next;

				if (time_next <= time) {
					thread_msg_handle(t);
					time_next = time + t->timer_period;
					t->time_next = time_next;
				}

				if (time_next < time_next_min)
					time_next_min = time_next;
			}

			t->time_next_min = time_next_min;
		}
	}

	return 0;
}
550