xref: /dpdk/examples/pipeline/thread.c (revision 72c00ae9dba7fc3f8c892b0354b956e885a25770)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2020 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "obj.h"
19 #include "thread.h"
20 
/* Capacity of the per-thread pipeline arrays kept in struct thread_data. */
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX 256
#endif

/* Element count handed to rte_ring_create() for each message queue. */
#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE 64
#endif

/* Interval, in milliseconds, between two polls of the request queue by a
 * data plane thread. Converted to TSC cycles in thread_init().
 */
#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS 100
#endif

/* Pipeline instruction quantum: the instruction budget handed to each
 * rte_swx_pipeline_run() call. It has to be large enough for the pipeline to
 * do some meaningful work, yet small enough not to starve the other
 * pipelines mapped to the same thread. As an example, for a pipeline that
 * executes 10 instructions per packet, a quantum of 1000 instructions
 * corresponds to processing 100 packets.
 */
#ifndef PIPELINE_INSTR_QUANTA
#define PIPELINE_INSTR_QUANTA 1000
#endif
41 
42 /**
43  * Control thread: data plane thread context
44  */
struct thread {
	/* Ring carrying requests from the control thread to the data plane
	 * thread.
	 */
	struct rte_ring *msgq_req;

	/* Ring carrying responses back from the data plane thread. */
	struct rte_ring *msgq_rsp;

	/* Set to 1 by thread_init() once both rings exist for this lcore. */
	uint32_t enabled;
};
51 
52 static struct thread thread[RTE_MAX_LCORE];
53 
54 /**
55  * Data plane threads: context
56  */
struct pipeline_data {
	/* Pipeline handle this record describes. */
	struct rte_swx_pipeline *p;

	/* Per-pipeline timer period, measured in CPU cycles. */
	uint64_t timer_period;

	/* TSC value of the next per-pipeline timer expiry. It is written when
	 * the pipeline gets enabled.
	 */
	uint64_t time_next;
};
62 
63 struct thread_data {
64 	struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
65 	uint32_t n_pipelines;
66 
67 	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
68 	struct rte_ring *msgq_req;
69 	struct rte_ring *msgq_rsp;
70 	uint64_t timer_period; /* Measured in CPU cycles. */
71 	uint64_t time_next;
72 	uint64_t time_next_min;
73 } __rte_cache_aligned;
74 
75 static struct thread_data thread_data[RTE_MAX_LCORE];
76 
77 /**
78  * Control thread: data plane thread init
79  */
80 static void
81 thread_free(void)
82 {
83 	uint32_t i;
84 
85 	for (i = 0; i < RTE_MAX_LCORE; i++) {
86 		struct thread *t = &thread[i];
87 
88 		if (!rte_lcore_is_enabled(i))
89 			continue;
90 
91 		/* MSGQs */
92 		if (t->msgq_req)
93 			rte_ring_free(t->msgq_req);
94 
95 		if (t->msgq_rsp)
96 			rte_ring_free(t->msgq_rsp);
97 	}
98 }
99 
100 int
101 thread_init(void)
102 {
103 	uint32_t i;
104 
105 	RTE_LCORE_FOREACH_WORKER(i) {
106 		char name[NAME_MAX];
107 		struct rte_ring *msgq_req, *msgq_rsp;
108 		struct thread *t = &thread[i];
109 		struct thread_data *t_data = &thread_data[i];
110 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
111 
112 		/* MSGQs */
113 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
114 
115 		msgq_req = rte_ring_create(name,
116 			THREAD_MSGQ_SIZE,
117 			cpu_id,
118 			RING_F_SP_ENQ | RING_F_SC_DEQ);
119 
120 		if (msgq_req == NULL) {
121 			thread_free();
122 			return -1;
123 		}
124 
125 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
126 
127 		msgq_rsp = rte_ring_create(name,
128 			THREAD_MSGQ_SIZE,
129 			cpu_id,
130 			RING_F_SP_ENQ | RING_F_SC_DEQ);
131 
132 		if (msgq_rsp == NULL) {
133 			thread_free();
134 			return -1;
135 		}
136 
137 		/* Control thread records */
138 		t->msgq_req = msgq_req;
139 		t->msgq_rsp = msgq_rsp;
140 		t->enabled = 1;
141 
142 		/* Data plane thread records */
143 		t_data->n_pipelines = 0;
144 		t_data->msgq_req = msgq_req;
145 		t_data->msgq_rsp = msgq_rsp;
146 		t_data->timer_period =
147 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
148 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
149 		t_data->time_next_min = t_data->time_next;
150 	}
151 
152 	return 0;
153 }
154 
155 static inline int
156 thread_is_running(uint32_t thread_id)
157 {
158 	enum rte_lcore_state_t thread_state;
159 
160 	thread_state = rte_eal_get_lcore_state(thread_id);
161 	return (thread_state == RUNNING) ? 1 : 0;
162 }
163 
164 /**
165  * Control thread & data plane threads: message passing
166  */
enum thread_req_type {
	/* Add a pipeline to the set run by the data plane thread. */
	THREAD_REQ_PIPELINE_ENABLE = 0,

	/* Remove a pipeline from the set run by the data plane thread. */
	THREAD_REQ_PIPELINE_DISABLE,

	/* Number of request types; not a valid request by itself. */
	THREAD_REQ_MAX
};
172 
173 struct thread_msg_req {
174 	enum thread_req_type type;
175 
176 	union {
177 		struct {
178 			struct rte_swx_pipeline *p;
179 			uint32_t timer_period_ms;
180 		} pipeline_enable;
181 
182 		struct {
183 			struct rte_swx_pipeline *p;
184 		} pipeline_disable;
185 	};
186 };
187 
struct thread_msg_rsp {
	/* Outcome of the request: 0 on success, non-zero on failure. */
	int status;
};
191 
192 /**
193  * Control thread
194  */
195 static struct thread_msg_req *
196 thread_msg_alloc(void)
197 {
198 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
199 		sizeof(struct thread_msg_rsp));
200 
201 	return calloc(1, size);
202 }
203 
/**
 * Release a message buffer obtained from thread_msg_alloc().
 *
 * @param rsp
 *   Buffer to release; it is handed straight to free().
 */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}
209 
210 static struct thread_msg_rsp *
211 thread_msg_send_recv(uint32_t thread_id,
212 	struct thread_msg_req *req)
213 {
214 	struct thread *t = &thread[thread_id];
215 	struct rte_ring *msgq_req = t->msgq_req;
216 	struct rte_ring *msgq_rsp = t->msgq_rsp;
217 	struct thread_msg_rsp *rsp;
218 	int status;
219 
220 	/* send */
221 	do {
222 		status = rte_ring_sp_enqueue(msgq_req, req);
223 	} while (status == -ENOBUFS);
224 
225 	/* recv */
226 	do {
227 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
228 	} while (status != 0);
229 
230 	return rsp;
231 }
232 
233 int
234 thread_pipeline_enable(uint32_t thread_id,
235 	struct obj *obj,
236 	const char *pipeline_name)
237 {
238 	struct pipeline *p = pipeline_find(obj, pipeline_name);
239 	struct thread *t;
240 	struct thread_msg_req *req;
241 	struct thread_msg_rsp *rsp;
242 	int status;
243 
244 	/* Check input params */
245 	if ((thread_id >= RTE_MAX_LCORE) ||
246 		(p == NULL))
247 		return -1;
248 
249 	t = &thread[thread_id];
250 	if (t->enabled == 0)
251 		return -1;
252 
253 	if (!thread_is_running(thread_id)) {
254 		struct thread_data *td = &thread_data[thread_id];
255 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
256 
257 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
258 			return -1;
259 
260 		/* Data plane thread */
261 		td->p[td->n_pipelines] = p->p;
262 
263 		tdp->p = p->p;
264 		tdp->timer_period =
265 			(rte_get_tsc_hz() * p->timer_period_ms) / 1000;
266 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
267 
268 		td->n_pipelines++;
269 
270 		/* Pipeline */
271 		p->thread_id = thread_id;
272 		p->enabled = 1;
273 
274 		return 0;
275 	}
276 
277 	/* Allocate request */
278 	req = thread_msg_alloc();
279 	if (req == NULL)
280 		return -1;
281 
282 	/* Write request */
283 	req->type = THREAD_REQ_PIPELINE_ENABLE;
284 	req->pipeline_enable.p = p->p;
285 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
286 
287 	/* Send request and wait for response */
288 	rsp = thread_msg_send_recv(thread_id, req);
289 
290 	/* Read response */
291 	status = rsp->status;
292 
293 	/* Free response */
294 	thread_msg_free(rsp);
295 
296 	/* Request completion */
297 	if (status)
298 		return status;
299 
300 	p->thread_id = thread_id;
301 	p->enabled = 1;
302 
303 	return 0;
304 }
305 
306 int
307 thread_pipeline_disable(uint32_t thread_id,
308 	struct obj *obj,
309 	const char *pipeline_name)
310 {
311 	struct pipeline *p = pipeline_find(obj, pipeline_name);
312 	struct thread *t;
313 	struct thread_msg_req *req;
314 	struct thread_msg_rsp *rsp;
315 	int status;
316 
317 	/* Check input params */
318 	if ((thread_id >= RTE_MAX_LCORE) ||
319 		(p == NULL))
320 		return -1;
321 
322 	t = &thread[thread_id];
323 	if (t->enabled == 0)
324 		return -1;
325 
326 	if (p->enabled == 0)
327 		return 0;
328 
329 	if (p->thread_id != thread_id)
330 		return -1;
331 
332 	if (!thread_is_running(thread_id)) {
333 		struct thread_data *td = &thread_data[thread_id];
334 		uint32_t i;
335 
336 		for (i = 0; i < td->n_pipelines; i++) {
337 			struct pipeline_data *tdp = &td->pipeline_data[i];
338 
339 			if (tdp->p != p->p)
340 				continue;
341 
342 			/* Data plane thread */
343 			if (i < td->n_pipelines - 1) {
344 				struct rte_swx_pipeline *pipeline_last =
345 					td->p[td->n_pipelines - 1];
346 				struct pipeline_data *tdp_last =
347 					&td->pipeline_data[td->n_pipelines - 1];
348 
349 				td->p[i] = pipeline_last;
350 				memcpy(tdp, tdp_last, sizeof(*tdp));
351 			}
352 
353 			td->n_pipelines--;
354 
355 			/* Pipeline */
356 			p->enabled = 0;
357 
358 			break;
359 		}
360 
361 		return 0;
362 	}
363 
364 	/* Allocate request */
365 	req = thread_msg_alloc();
366 	if (req == NULL)
367 		return -1;
368 
369 	/* Write request */
370 	req->type = THREAD_REQ_PIPELINE_DISABLE;
371 	req->pipeline_disable.p = p->p;
372 
373 	/* Send request and wait for response */
374 	rsp = thread_msg_send_recv(thread_id, req);
375 
376 	/* Read response */
377 	status = rsp->status;
378 
379 	/* Free response */
380 	thread_msg_free(rsp);
381 
382 	/* Request completion */
383 	if (status)
384 		return status;
385 
386 	p->enabled = 0;
387 
388 	return 0;
389 }
390 
391 /**
392  * Data plane threads: message handling
393  */
/**
 * Poll the request ring once.
 *
 * @return
 *   The dequeued request, or NULL when the dequeue does not succeed.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	void *msg;

	if (rte_ring_sc_dequeue(msgq_req, &msg) != 0)
		return NULL;

	return msg;
}
406 
/**
 * Post a response on the response ring, retrying for as long as the enqueue
 * reports -ENOBUFS.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		;
}
417 
418 static struct thread_msg_rsp *
419 thread_msg_handle_pipeline_enable(struct thread_data *t,
420 	struct thread_msg_req *req)
421 {
422 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
423 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
424 
425 	/* Request */
426 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
427 		rsp->status = -1;
428 		return rsp;
429 	}
430 
431 	t->p[t->n_pipelines] = req->pipeline_enable.p;
432 
433 	p->p = req->pipeline_enable.p;
434 	p->timer_period = (rte_get_tsc_hz() *
435 		req->pipeline_enable.timer_period_ms) / 1000;
436 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
437 
438 	t->n_pipelines++;
439 
440 	/* Response */
441 	rsp->status = 0;
442 	return rsp;
443 }
444 
445 static struct thread_msg_rsp *
446 thread_msg_handle_pipeline_disable(struct thread_data *t,
447 	struct thread_msg_req *req)
448 {
449 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
450 	uint32_t n_pipelines = t->n_pipelines;
451 	struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
452 	uint32_t i;
453 
454 	/* find pipeline */
455 	for (i = 0; i < n_pipelines; i++) {
456 		struct pipeline_data *p = &t->pipeline_data[i];
457 
458 		if (p->p != pipeline)
459 			continue;
460 
461 		if (i < n_pipelines - 1) {
462 			struct rte_swx_pipeline *pipeline_last =
463 				t->p[n_pipelines - 1];
464 			struct pipeline_data *p_last =
465 				&t->pipeline_data[n_pipelines - 1];
466 
467 			t->p[i] = pipeline_last;
468 			memcpy(p, p_last, sizeof(*p));
469 		}
470 
471 		t->n_pipelines--;
472 
473 		rsp->status = 0;
474 		return rsp;
475 	}
476 
477 	/* should not get here */
478 	rsp->status = 0;
479 	return rsp;
480 }
481 
482 static void
483 thread_msg_handle(struct thread_data *t)
484 {
485 	for ( ; ; ) {
486 		struct thread_msg_req *req;
487 		struct thread_msg_rsp *rsp;
488 
489 		req = thread_msg_recv(t->msgq_req);
490 		if (req == NULL)
491 			break;
492 
493 		switch (req->type) {
494 		case THREAD_REQ_PIPELINE_ENABLE:
495 			rsp = thread_msg_handle_pipeline_enable(t, req);
496 			break;
497 
498 		case THREAD_REQ_PIPELINE_DISABLE:
499 			rsp = thread_msg_handle_pipeline_disable(t, req);
500 			break;
501 
502 		default:
503 			rsp = (struct thread_msg_rsp *) req;
504 			rsp->status = -1;
505 		}
506 
507 		thread_msg_send(t->msgq_rsp, rsp);
508 	}
509 }
510 
511 /**
512  * Data plane threads: main
513  */
514 int
515 thread_main(void *arg __rte_unused)
516 {
517 	struct thread_data *t;
518 	uint32_t thread_id, i;
519 
520 	thread_id = rte_lcore_id();
521 	t = &thread_data[thread_id];
522 
523 	/* Dispatch loop */
524 	for (i = 0; ; i++) {
525 		uint32_t j;
526 
527 		/* Data Plane */
528 		for (j = 0; j < t->n_pipelines; j++)
529 			rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
530 
531 		/* Control Plane */
532 		if ((i & 0xF) == 0) {
533 			uint64_t time = rte_get_tsc_cycles();
534 			uint64_t time_next_min = UINT64_MAX;
535 
536 			if (time < t->time_next_min)
537 				continue;
538 
539 			/* Thread message queues */
540 			{
541 				uint64_t time_next = t->time_next;
542 
543 				if (time_next <= time) {
544 					thread_msg_handle(t);
545 					time_next = time + t->timer_period;
546 					t->time_next = time_next;
547 				}
548 
549 				if (time_next < time_next_min)
550 					time_next_min = time_next;
551 			}
552 
553 			t->time_next_min = time_next_min;
554 		}
555 	}
556 
557 	return 0;
558 }
559