xref: /dpdk/examples/pipeline/thread.c (revision f4eac3a09c51a1a2dab1f2fd3a10fe0619286a0d)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2020 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 
7 #include <rte_common.h>
8 #include <rte_cycles.h>
9 #include <rte_lcore.h>
10 #include <rte_ring.h>
11 
12 #include <rte_table_acl.h>
13 #include <rte_table_array.h>
14 #include <rte_table_hash.h>
15 #include <rte_table_lpm.h>
16 #include <rte_table_lpm_ipv6.h>
17 
18 #include "obj.h"
19 #include "thread.h"
20 
/* Capacity of the per-thread pipeline arrays (upper bound for n_pipelines). */
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX 256
#endif

/* Size argument passed to rte_ring_create() for each per-thread message queue. */
#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE 64
#endif

/* Minimum interval, in milliseconds, between two polls of the request queue. */
#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS 100
#endif

/* Pipeline instruction quantum: the instruction budget handed to every
 * rte_swx_pipeline_run() call. It needs to be big enough to do some meaningful
 * work, but not so big that it starves the other pipelines mapped to the same
 * thread. For a pipeline that executes 10 instructions per packet, a quantum
 * of 1000 instructions equates to processing 100 packets.
 */
#ifndef PIPELINE_INSTR_QUANTA
#define PIPELINE_INSTR_QUANTA 1000
#endif
41 
/**
 * Control thread: data plane thread context
 *
 * Control-side view of one data plane thread. The same ring pointers are also
 * stored in the matching thread_data[] entry by thread_init().
 */
struct thread {
	/* Ring the control thread enqueues requests on. */
	struct rte_ring *msgq_req;
	/* Ring the control thread dequeues responses from. */
	struct rte_ring *msgq_rsp;

	/* Set to 1 by thread_init() once both rings exist for this lcore. */
	uint32_t enabled;
};

/* One record per lcore, indexed by lcore ID. */
static struct thread thread[RTE_MAX_LCORE];
53 
/**
 * Data plane threads: context
 *
 * Per-pipeline record kept by the thread the pipeline is mapped to.
 */
struct pipeline_data {
	/* Pipeline handle; mirrored in thread_data.p[] at the same index. */
	struct rte_swx_pipeline *p;
	/* Derived from the pipeline's timer_period_ms when it is enabled. */
	uint64_t timer_period; /* Measured in CPU cycles. */
	/* TSC value at enable time plus timer_period; it is only written and
	 * copied in the code visible in this file.
	 */
	uint64_t time_next;
};
62 
/* State owned by one data plane thread: the pipelines it runs plus the
 * message queues and timer used to service control requests.
 */
struct thread_data {
	/* Pipelines run by the dispatch loop; only the first n_pipelines
	 * entries are valid.
	 */
	struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
	uint32_t n_pipelines;

	/* Per-pipeline records, kept in the same order as p[]. */
	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
	/* Requests are dequeued from msgq_req, responses enqueued on msgq_rsp. */
	struct rte_ring *msgq_req;
	struct rte_ring *msgq_rsp;
	/* Message queue polling period (THREAD_TIMER_PERIOD_MS converted). */
	uint64_t timer_period; /* Measured in CPU cycles. */
	/* TSC value at which the request queue is polled next. */
	uint64_t time_next;
	/* Earliest pending deadline; the dispatch loop skips all control plane
	 * work while the current TSC value is below it.
	 */
	uint64_t time_next_min;
} __rte_cache_aligned;

/* One record per lcore, indexed by lcore ID. */
static struct thread_data thread_data[RTE_MAX_LCORE];
76 
77 /**
78  * Control thread: data plane thread init
79  */
80 static void
81 thread_free(void)
82 {
83 	uint32_t i;
84 
85 	for (i = 0; i < RTE_MAX_LCORE; i++) {
86 		struct thread *t = &thread[i];
87 
88 		if (!rte_lcore_is_enabled(i))
89 			continue;
90 
91 		/* MSGQs */
92 		rte_ring_free(t->msgq_req);
93 
94 		rte_ring_free(t->msgq_rsp);
95 	}
96 }
97 
98 int
99 thread_init(void)
100 {
101 	uint32_t i;
102 
103 	RTE_LCORE_FOREACH_WORKER(i) {
104 		char name[NAME_MAX];
105 		struct rte_ring *msgq_req, *msgq_rsp;
106 		struct thread *t = &thread[i];
107 		struct thread_data *t_data = &thread_data[i];
108 		uint32_t cpu_id = rte_lcore_to_socket_id(i);
109 
110 		/* MSGQs */
111 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);
112 
113 		msgq_req = rte_ring_create(name,
114 			THREAD_MSGQ_SIZE,
115 			cpu_id,
116 			RING_F_SP_ENQ | RING_F_SC_DEQ);
117 
118 		if (msgq_req == NULL) {
119 			thread_free();
120 			return -1;
121 		}
122 
123 		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);
124 
125 		msgq_rsp = rte_ring_create(name,
126 			THREAD_MSGQ_SIZE,
127 			cpu_id,
128 			RING_F_SP_ENQ | RING_F_SC_DEQ);
129 
130 		if (msgq_rsp == NULL) {
131 			thread_free();
132 			return -1;
133 		}
134 
135 		/* Control thread records */
136 		t->msgq_req = msgq_req;
137 		t->msgq_rsp = msgq_rsp;
138 		t->enabled = 1;
139 
140 		/* Data plane thread records */
141 		t_data->n_pipelines = 0;
142 		t_data->msgq_req = msgq_req;
143 		t_data->msgq_rsp = msgq_rsp;
144 		t_data->timer_period =
145 			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
146 		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
147 		t_data->time_next_min = t_data->time_next;
148 	}
149 
150 	return 0;
151 }
152 
153 static inline int
154 thread_is_running(uint32_t thread_id)
155 {
156 	enum rte_lcore_state_t thread_state;
157 
158 	thread_state = rte_eal_get_lcore_state(thread_id);
159 	return (thread_state == RUNNING) ? 1 : 0;
160 }
161 
/**
 * Control thread & data plane threads: message passing
 */

/* Request kinds dispatched by thread_msg_handle(); any other value is
 * answered with status -1.
 */
enum thread_req_type {
	/* Map a pipeline to the receiving thread. */
	THREAD_REQ_PIPELINE_ENABLE = 0,
	/* Remove a pipeline from the receiving thread. */
	THREAD_REQ_PIPELINE_DISABLE,
	THREAD_REQ_MAX
};
170 
/* Request sent by the control thread. The data plane handlers reuse the same
 * buffer for the response by casting it to struct thread_msg_rsp.
 */
struct thread_msg_req {
	/* Selects which member of the union below is meaningful. */
	enum thread_req_type type;

	union {
		/* THREAD_REQ_PIPELINE_ENABLE */
		struct {
			struct rte_swx_pipeline *p;
			/* Converted to TSC cycles by the handler. */
			uint32_t timer_period_ms;
		} pipeline_enable;

		/* THREAD_REQ_PIPELINE_DISABLE */
		struct {
			struct rte_swx_pipeline *p;
		} pipeline_disable;
	};
};
185 
/* Response written by the data plane thread over the request buffer. */
struct thread_msg_rsp {
	/* 0 on success, non-zero on failure. */
	int status;
};
189 
190 /**
191  * Control thread
192  */
193 static struct thread_msg_req *
194 thread_msg_alloc(void)
195 {
196 	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
197 		sizeof(struct thread_msg_rsp));
198 
199 	return calloc(1, size);
200 }
201 
/*
 * Release a message buffer once its response has been read. The response
 * occupies the buffer returned by thread_msg_alloc(), so a single free()
 * disposes of both request and response.
 */
static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}
207 
208 static struct thread_msg_rsp *
209 thread_msg_send_recv(uint32_t thread_id,
210 	struct thread_msg_req *req)
211 {
212 	struct thread *t = &thread[thread_id];
213 	struct rte_ring *msgq_req = t->msgq_req;
214 	struct rte_ring *msgq_rsp = t->msgq_rsp;
215 	struct thread_msg_rsp *rsp;
216 	int status;
217 
218 	/* send */
219 	do {
220 		status = rte_ring_sp_enqueue(msgq_req, req);
221 	} while (status == -ENOBUFS);
222 
223 	/* recv */
224 	do {
225 		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
226 	} while (status != 0);
227 
228 	return rsp;
229 }
230 
231 int
232 thread_pipeline_enable(uint32_t thread_id,
233 	struct obj *obj,
234 	const char *pipeline_name)
235 {
236 	struct pipeline *p = pipeline_find(obj, pipeline_name);
237 	struct thread *t;
238 	struct thread_msg_req *req;
239 	struct thread_msg_rsp *rsp;
240 	int status;
241 
242 	/* Check input params */
243 	if ((thread_id >= RTE_MAX_LCORE) ||
244 		(p == NULL))
245 		return -1;
246 
247 	t = &thread[thread_id];
248 	if (t->enabled == 0)
249 		return -1;
250 
251 	if (!thread_is_running(thread_id)) {
252 		struct thread_data *td = &thread_data[thread_id];
253 		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];
254 
255 		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
256 			return -1;
257 
258 		/* Data plane thread */
259 		td->p[td->n_pipelines] = p->p;
260 
261 		tdp->p = p->p;
262 		tdp->timer_period =
263 			(rte_get_tsc_hz() * p->timer_period_ms) / 1000;
264 		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;
265 
266 		td->n_pipelines++;
267 
268 		/* Pipeline */
269 		p->thread_id = thread_id;
270 		p->enabled = 1;
271 
272 		return 0;
273 	}
274 
275 	/* Allocate request */
276 	req = thread_msg_alloc();
277 	if (req == NULL)
278 		return -1;
279 
280 	/* Write request */
281 	req->type = THREAD_REQ_PIPELINE_ENABLE;
282 	req->pipeline_enable.p = p->p;
283 	req->pipeline_enable.timer_period_ms = p->timer_period_ms;
284 
285 	/* Send request and wait for response */
286 	rsp = thread_msg_send_recv(thread_id, req);
287 
288 	/* Read response */
289 	status = rsp->status;
290 
291 	/* Free response */
292 	thread_msg_free(rsp);
293 
294 	/* Request completion */
295 	if (status)
296 		return status;
297 
298 	p->thread_id = thread_id;
299 	p->enabled = 1;
300 
301 	return 0;
302 }
303 
304 int
305 thread_pipeline_disable(uint32_t thread_id,
306 	struct obj *obj,
307 	const char *pipeline_name)
308 {
309 	struct pipeline *p = pipeline_find(obj, pipeline_name);
310 	struct thread *t;
311 	struct thread_msg_req *req;
312 	struct thread_msg_rsp *rsp;
313 	int status;
314 
315 	/* Check input params */
316 	if ((thread_id >= RTE_MAX_LCORE) ||
317 		(p == NULL))
318 		return -1;
319 
320 	t = &thread[thread_id];
321 	if (t->enabled == 0)
322 		return -1;
323 
324 	if (p->enabled == 0)
325 		return 0;
326 
327 	if (p->thread_id != thread_id)
328 		return -1;
329 
330 	if (!thread_is_running(thread_id)) {
331 		struct thread_data *td = &thread_data[thread_id];
332 		uint32_t i;
333 
334 		for (i = 0; i < td->n_pipelines; i++) {
335 			struct pipeline_data *tdp = &td->pipeline_data[i];
336 
337 			if (tdp->p != p->p)
338 				continue;
339 
340 			/* Data plane thread */
341 			if (i < td->n_pipelines - 1) {
342 				struct rte_swx_pipeline *pipeline_last =
343 					td->p[td->n_pipelines - 1];
344 				struct pipeline_data *tdp_last =
345 					&td->pipeline_data[td->n_pipelines - 1];
346 
347 				td->p[i] = pipeline_last;
348 				memcpy(tdp, tdp_last, sizeof(*tdp));
349 			}
350 
351 			td->n_pipelines--;
352 
353 			/* Pipeline */
354 			p->enabled = 0;
355 
356 			break;
357 		}
358 
359 		return 0;
360 	}
361 
362 	/* Allocate request */
363 	req = thread_msg_alloc();
364 	if (req == NULL)
365 		return -1;
366 
367 	/* Write request */
368 	req->type = THREAD_REQ_PIPELINE_DISABLE;
369 	req->pipeline_disable.p = p->p;
370 
371 	/* Send request and wait for response */
372 	rsp = thread_msg_send_recv(thread_id, req);
373 
374 	/* Read response */
375 	status = rsp->status;
376 
377 	/* Free response */
378 	thread_msg_free(rsp);
379 
380 	/* Request completion */
381 	if (status)
382 		return status;
383 
384 	p->enabled = 0;
385 
386 	return 0;
387 }
388 
/**
 * Data plane threads: message handling
 */

/*
 * Take one request off msgq_req without blocking.
 *
 * Returns the request, or NULL when the dequeue does not succeed.
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	struct thread_msg_req *req;

	if (rte_ring_sc_dequeue(msgq_req, (void **) &req) != 0)
		return NULL;

	return req;
}
404 
/*
 * Post a response on msgq_rsp, retrying for as long as the ring reports
 * -ENOBUFS.
 */
static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	while (rte_ring_sp_enqueue(msgq_rsp, rsp) == -ENOBUFS)
		;
}
415 
416 static struct thread_msg_rsp *
417 thread_msg_handle_pipeline_enable(struct thread_data *t,
418 	struct thread_msg_req *req)
419 {
420 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
421 	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];
422 
423 	/* Request */
424 	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
425 		rsp->status = -1;
426 		return rsp;
427 	}
428 
429 	t->p[t->n_pipelines] = req->pipeline_enable.p;
430 
431 	p->p = req->pipeline_enable.p;
432 	p->timer_period = (rte_get_tsc_hz() *
433 		req->pipeline_enable.timer_period_ms) / 1000;
434 	p->time_next = rte_get_tsc_cycles() + p->timer_period;
435 
436 	t->n_pipelines++;
437 
438 	/* Response */
439 	rsp->status = 0;
440 	return rsp;
441 }
442 
443 static struct thread_msg_rsp *
444 thread_msg_handle_pipeline_disable(struct thread_data *t,
445 	struct thread_msg_req *req)
446 {
447 	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
448 	uint32_t n_pipelines = t->n_pipelines;
449 	struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
450 	uint32_t i;
451 
452 	/* find pipeline */
453 	for (i = 0; i < n_pipelines; i++) {
454 		struct pipeline_data *p = &t->pipeline_data[i];
455 
456 		if (p->p != pipeline)
457 			continue;
458 
459 		if (i < n_pipelines - 1) {
460 			struct rte_swx_pipeline *pipeline_last =
461 				t->p[n_pipelines - 1];
462 			struct pipeline_data *p_last =
463 				&t->pipeline_data[n_pipelines - 1];
464 
465 			t->p[i] = pipeline_last;
466 			memcpy(p, p_last, sizeof(*p));
467 		}
468 
469 		t->n_pipelines--;
470 
471 		rsp->status = 0;
472 		return rsp;
473 	}
474 
475 	/* should not get here */
476 	rsp->status = 0;
477 	return rsp;
478 }
479 
480 static void
481 thread_msg_handle(struct thread_data *t)
482 {
483 	for ( ; ; ) {
484 		struct thread_msg_req *req;
485 		struct thread_msg_rsp *rsp;
486 
487 		req = thread_msg_recv(t->msgq_req);
488 		if (req == NULL)
489 			break;
490 
491 		switch (req->type) {
492 		case THREAD_REQ_PIPELINE_ENABLE:
493 			rsp = thread_msg_handle_pipeline_enable(t, req);
494 			break;
495 
496 		case THREAD_REQ_PIPELINE_DISABLE:
497 			rsp = thread_msg_handle_pipeline_disable(t, req);
498 			break;
499 
500 		default:
501 			rsp = (struct thread_msg_rsp *) req;
502 			rsp->status = -1;
503 		}
504 
505 		thread_msg_send(t->msgq_rsp, rsp);
506 	}
507 }
508 
509 /**
510  * Data plane threads: main
511  */
512 int
513 thread_main(void *arg __rte_unused)
514 {
515 	struct thread_data *t;
516 	uint32_t thread_id, i;
517 
518 	thread_id = rte_lcore_id();
519 	t = &thread_data[thread_id];
520 
521 	/* Dispatch loop */
522 	for (i = 0; ; i++) {
523 		uint32_t j;
524 
525 		/* Data Plane */
526 		for (j = 0; j < t->n_pipelines; j++)
527 			rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);
528 
529 		/* Control Plane */
530 		if ((i & 0xF) == 0) {
531 			uint64_t time = rte_get_tsc_cycles();
532 			uint64_t time_next_min = UINT64_MAX;
533 
534 			if (time < t->time_next_min)
535 				continue;
536 
537 			/* Thread message queues */
538 			{
539 				uint64_t time_next = t->time_next;
540 
541 				if (time_next <= time) {
542 					thread_msg_handle(t);
543 					time_next = time + t->timer_period;
544 					t->time_next = time_next;
545 				}
546 
547 				if (time_next < time_next_min)
548 					time_next_min = time_next;
549 			}
550 
551 			t->time_next_min = time_next_min;
552 		}
553 	}
554 
555 	return 0;
556 }
557