/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2020 Intel Corporation
 */

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_ring.h>

#include <rte_table_acl.h>
#include <rte_table_array.h>
#include <rte_table_hash.h>
#include <rte_table_lpm.h>
#include <rte_table_lpm_ipv6.h>

#include "obj.h"
#include "thread.h"

#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX                               256
#endif

#ifndef THREAD_MSGQ_SIZE
#define THREAD_MSGQ_SIZE                                   64
#endif

#ifndef THREAD_TIMER_PERIOD_MS
#define THREAD_TIMER_PERIOD_MS                             100
#endif
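
/* The timer period bounds the latency of the control/data plane message
 * handshake: each data plane thread polls its message queue roughly once
 * per timer period.
 */
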
/* Pipeline instruction quanta: needs to be big enough to do some meaningful
 * work, but small enough to avoid starving any other pipelines mapped to the
 * same thread. For a pipeline that executes 10 instructions per packet, a
 * quanta of 1000 instructions equates to processing 100 packets.
 */
#ifndef PIPELINE_INSTR_QUANTA
#define PIPELINE_INSTR_QUANTA                              1000
#endif

/**
 * Control thread: data plane thread context
 */
struct thread {
	struct rte_ring *msgq_req;
	struct rte_ring *msgq_rsp;

	uint32_t enabled;
};

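/* Per data plane thread record, owned by the control thread and indexed by
 * lcore id.
 */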
static struct thread thread[RTE_MAX_LCORE];

/**
 * Data plane threads: context
 */
struct pipeline_data {
	struct rte_swx_pipeline *p;
	uint64_t timer_period; /* Measured in CPU cycles. */
	uint64_t time_next;
};

struct thread_data {
	struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX];
	uint32_t n_pipelines;

	struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX];
	struct rte_ring *msgq_req;
	struct rte_ring *msgq_rsp;
	uint64_t timer_period; /* Measured in CPU cycles. */
	uint64_t time_next;
	uint64_t time_next_min;
} __rte_cache_aligned;

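/* Per data plane thread context, indexed by lcore id. The cache line
 * alignment of struct thread_data keeps the entries of different lcores
 * from false sharing a cache line.
 */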
static struct thread_data thread_data[RTE_MAX_LCORE];

/**
 * Control thread: data plane thread init and free
 */
static void
thread_free(void)
{
	uint32_t i;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		struct thread *t = &thread[i];

		if (!rte_lcore_is_enabled(i))
			continue;

		/* MSGQs */
		rte_ring_free(t->msgq_req);
		rte_ring_free(t->msgq_rsp);
	}
}

int
thread_init(void)
{
	uint32_t i;

	RTE_LCORE_FOREACH_WORKER(i) {
		char name[NAME_MAX];
		struct rte_ring *msgq_req, *msgq_rsp;
		struct thread *t = &thread[i];
		struct thread_data *t_data = &thread_data[i];
		uint32_t socket_id = rte_lcore_to_socket_id(i);

		/* MSGQs */
		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i);

		msgq_req = rte_ring_create(name,
			THREAD_MSGQ_SIZE,
			socket_id,
			RING_F_SP_ENQ | RING_F_SC_DEQ);

		if (msgq_req == NULL) {
			thread_free();
			return -1;
		}

		snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i);

		msgq_rsp = rte_ring_create(name,
			THREAD_MSGQ_SIZE,
			socket_id,
			RING_F_SP_ENQ | RING_F_SC_DEQ);

		if (msgq_rsp == NULL) {
			thread_free();
			return -1;
		}

		/* Control thread records */
		t->msgq_req = msgq_req;
		t->msgq_rsp = msgq_rsp;
		t->enabled = 1;

		/* Data plane thread records */
		t_data->n_pipelines = 0;
		t_data->msgq_req = msgq_req;
		t_data->msgq_rsp = msgq_rsp;
		t_data->timer_period =
			(rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000;
		t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period;
		t_data->time_next_min = t_data->time_next;
	}

	return 0;
}

static inline int
thread_is_running(uint32_t thread_id)
{
	enum rte_lcore_state_t thread_state;

	thread_state = rte_eal_get_lcore_state(thread_id);
	return (thread_state == RUNNING) ? 1 : 0;
}

/**
 * Control thread & data plane threads: message passing
 */
enum thread_req_type {
	THREAD_REQ_PIPELINE_ENABLE = 0,
	THREAD_REQ_PIPELINE_DISABLE,
	THREAD_REQ_MAX
};

struct thread_msg_req {
	enum thread_req_type type;

	union {
		struct {
			struct rte_swx_pipeline *p;
			uint32_t timer_period_ms;
		} pipeline_enable;

		struct {
			struct rte_swx_pipeline *p;
		} pipeline_disable;
	};
};

struct thread_msg_rsp {
	int status;
};

/**
 * Control thread
 */
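/* The request buffer is reused in place for the response, so it is sized to
 * fit the larger of the two message types.
 */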
static struct thread_msg_req *
thread_msg_alloc(void)
{
	size_t size = RTE_MAX(sizeof(struct thread_msg_req),
		sizeof(struct thread_msg_rsp));

	return calloc(1, size);
}

static void
thread_msg_free(struct thread_msg_rsp *rsp)
{
	free(rsp);
}

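/* Synchronous request/response exchange with a data plane thread. Both rings
 * are single producer/single consumer, as created by thread_init().
 */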
static struct thread_msg_rsp *
thread_msg_send_recv(uint32_t thread_id,
	struct thread_msg_req *req)
{
	struct thread *t = &thread[thread_id];
	struct rte_ring *msgq_req = t->msgq_req;
	struct rte_ring *msgq_rsp = t->msgq_rsp;
	struct thread_msg_rsp *rsp;
	int status;

	/* Send: retry until the request fits into the request ring. */
	do {
		status = rte_ring_sp_enqueue(msgq_req, req);
	} while (status == -ENOBUFS);

	/* Receive: busy-wait until the data plane thread posts the response. */
	do {
		status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp);
	} while (status != 0);

	return rsp;
}

static int
thread_is_pipeline_enabled(uint32_t thread_id, struct rte_swx_pipeline *p)
{
	struct thread *t = &thread[thread_id];
	struct thread_data *td = &thread_data[thread_id];
	uint32_t i;

	if (!t->enabled)
		return 0; /* Pipeline NOT enabled on this thread. */

	for (i = 0; i < td->n_pipelines; i++)
		if (td->p[i] == p)
			return 1; /* Pipeline enabled on this thread. */

	return 0; /* Pipeline NOT enabled on this thread. */
}

int
thread_pipeline_enable(uint32_t thread_id, struct rte_swx_pipeline *p, uint32_t timer_period_ms)
{
	struct thread *t;
	struct thread_msg_req *req;
	struct thread_msg_rsp *rsp;
	int status;

	/* Check input params */
	if ((thread_id >= RTE_MAX_LCORE) || !p || !timer_period_ms)
		return -1;

	t = &thread[thread_id];
	if (t->enabled == 0)
		return -1;

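	/* The lcore is not launched yet, so its context can be updated
	 * directly, with no risk of racing against the data plane thread.
	 */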
	if (!thread_is_running(thread_id)) {
		struct thread_data *td = &thread_data[thread_id];
		struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines];

		if (td->n_pipelines >= THREAD_PIPELINES_MAX)
			return -1;

		/* Data plane thread */
		td->p[td->n_pipelines] = p;

		tdp->p = p;
		tdp->timer_period = (rte_get_tsc_hz() * timer_period_ms) / 1000;
		tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period;

		td->n_pipelines++;

		return 0;
	}

	/* Allocate request */
	req = thread_msg_alloc();
	if (req == NULL)
		return -1;

	/* Write request */
	req->type = THREAD_REQ_PIPELINE_ENABLE;
	req->pipeline_enable.p = p;
	req->pipeline_enable.timer_period_ms = timer_period_ms;

	/* Send request and wait for response */
	rsp = thread_msg_send_recv(thread_id, req);

	/* Read response */
	status = rsp->status;

	/* Free response */
	thread_msg_free(rsp);

	/* Request completion */
	return status;
}
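
/* Example usage (a sketch, not part of this file): from the application's
 * control thread, once a data plane lcore has been launched with
 * rte_eal_remote_launch(thread_main, NULL, lcore_id), a pipeline can be
 * mapped to that lcore as follows; the 10 ms timer period is picked purely
 * for illustration:
 *
 *	if (thread_pipeline_enable(lcore_id, p, 10))
 *		printf("Failed to enable pipeline on lcore %u\n", lcore_id);
 */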

int
thread_pipeline_disable(uint32_t thread_id, struct rte_swx_pipeline *p)
{
	struct thread *t;
	struct thread_msg_req *req;
	struct thread_msg_rsp *rsp;
	int status;

	/* Check input params */
	if ((thread_id >= RTE_MAX_LCORE) || !p)
		return -1;

	t = &thread[thread_id];
	if (t->enabled == 0)
		return -1;

	if (!thread_is_pipeline_enabled(thread_id, p))
		return 0;

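	/* The lcore is not launched yet, so its context can be updated
	 * directly.
	 */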
	if (!thread_is_running(thread_id)) {
		struct thread_data *td = &thread_data[thread_id];
		uint32_t i;

		for (i = 0; i < td->n_pipelines; i++) {
			struct pipeline_data *tdp = &td->pipeline_data[i];

			if (tdp->p != p)
				continue;

			/* Data plane thread */
			if (i < td->n_pipelines - 1) {
				struct rte_swx_pipeline *pipeline_last =
					td->p[td->n_pipelines - 1];
				struct pipeline_data *tdp_last =
					&td->pipeline_data[td->n_pipelines - 1];

				td->p[i] = pipeline_last;
				memcpy(tdp, tdp_last, sizeof(*tdp));
			}

			td->n_pipelines--;

			break;
		}

		return 0;
	}

	/* Allocate request */
	req = thread_msg_alloc();
	if (req == NULL)
		return -1;

	/* Write request */
	req->type = THREAD_REQ_PIPELINE_DISABLE;
	req->pipeline_disable.p = p;

	/* Send request and wait for response */
	rsp = thread_msg_send_recv(thread_id, req);

	/* Read response */
	status = rsp->status;

	/* Free response */
	thread_msg_free(rsp);

	/* Request completion */
	return status;
}

/**
 * Data plane threads: message handling
 */
static inline struct thread_msg_req *
thread_msg_recv(struct rte_ring *msgq_req)
{
	struct thread_msg_req *req;

	int status = rte_ring_sc_dequeue(msgq_req, (void **) &req);

	if (status != 0)
		return NULL;

	return req;
}

static inline void
thread_msg_send(struct rte_ring *msgq_rsp,
	struct thread_msg_rsp *rsp)
{
	int status;

	do {
		status = rte_ring_sp_enqueue(msgq_rsp, rsp);
	} while (status == -ENOBUFS);
}

static struct thread_msg_rsp *
thread_msg_handle_pipeline_enable(struct thread_data *t,
	struct thread_msg_req *req)
{
	/* The response overwrites the request in the same buffer. */
	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
	struct pipeline_data *p = &t->pipeline_data[t->n_pipelines];

	/* Request */
	if (t->n_pipelines >= THREAD_PIPELINES_MAX) {
		rsp->status = -1;
		return rsp;
	}

	t->p[t->n_pipelines] = req->pipeline_enable.p;

	p->p = req->pipeline_enable.p;
	p->timer_period = (rte_get_tsc_hz() *
		req->pipeline_enable.timer_period_ms) / 1000;
	p->time_next = rte_get_tsc_cycles() + p->timer_period;

	t->n_pipelines++;

	/* Response */
	rsp->status = 0;
	return rsp;
}

static struct thread_msg_rsp *
thread_msg_handle_pipeline_disable(struct thread_data *t,
	struct thread_msg_req *req)
{
	/* The response overwrites the request in the same buffer. */
	struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req;
	uint32_t n_pipelines = t->n_pipelines;
	struct rte_swx_pipeline *pipeline = req->pipeline_disable.p;
	uint32_t i;

	/* Find the pipeline to be disabled. */
	for (i = 0; i < n_pipelines; i++) {
		struct pipeline_data *p = &t->pipeline_data[i];

		if (p->p != pipeline)
			continue;

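		/* Remove the pipeline by moving the last pipeline into its
		 * slot; the ordering of pipelines on the thread is not
		 * preserved.
		 */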
		if (i < n_pipelines - 1) {
			struct rte_swx_pipeline *pipeline_last =
				t->p[n_pipelines - 1];
			struct pipeline_data *p_last =
				&t->pipeline_data[n_pipelines - 1];

			t->p[i] = pipeline_last;
			memcpy(p, p_last, sizeof(*p));
		}

		t->n_pipelines--;

		rsp->status = 0;
		return rsp;
	}

	/* Should not get here: the control thread only requests the disable
	 * of pipelines it knows to be enabled. Report success anyway.
	 */
	rsp->status = 0;
	return rsp;
}

static void
thread_msg_handle(struct thread_data *t)
{
	for ( ; ; ) {
		struct thread_msg_req *req;
		struct thread_msg_rsp *rsp;

		req = thread_msg_recv(t->msgq_req);
		if (req == NULL)
			break;

		switch (req->type) {
		case THREAD_REQ_PIPELINE_ENABLE:
			rsp = thread_msg_handle_pipeline_enable(t, req);
			break;

		case THREAD_REQ_PIPELINE_DISABLE:
			rsp = thread_msg_handle_pipeline_disable(t, req);
			break;

		default:
			rsp = (struct thread_msg_rsp *) req;
			rsp->status = -1;
		}

		thread_msg_send(t->msgq_rsp, rsp);
	}
}

/**
 * Data plane threads: main
 */
int
thread_main(void *arg __rte_unused)
{
	struct thread_data *t;
	uint32_t thread_id, i;

	thread_id = rte_lcore_id();
	t = &thread_data[thread_id];

	/* Dispatch loop */
	for (i = 0; ; i++) {
		uint32_t j;

		/* Data Plane */
		for (j = 0; j < t->n_pipelines; j++)
			rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA);

		/* Control Plane: check the timers only once every 16
		 * iterations, to keep this cost off the packet processing
		 * fast path.
		 */
		if ((i & 0xF) == 0) {
			uint64_t time = rte_get_tsc_cycles();
			uint64_t time_next_min = UINT64_MAX;

			if (time < t->time_next_min)
				continue;

			/* Thread message queues */
			{
				uint64_t time_next = t->time_next;

				if (time_next <= time) {
					thread_msg_handle(t);
					time_next = time + t->timer_period;
					t->time_next = time_next;
				}

				if (time_next < time_next_min)
					time_next_min = time_next;
			}

			t->time_next_min = time_next_min;
		}
	}

	return 0;
}
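
/* Launch sketch (an assumption about typical usage, mirroring the way the
 * pipeline example application drives this file):
 *
 *	thread_init();
 *	rte_eal_mp_remote_launch(thread_main, NULL, SKIP_MAIN);
 */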