xref: /dpdk/examples/pipeline/thread.c (revision 7e06c0de1952d3109a5b0c4779d7e7d8059c9d78)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2020 Intel Corporation
3  */
4 
5 #include <stdlib.h>
6 #include <errno.h>
7 
8 #include <rte_atomic.h>
9 #include <rte_common.h>
10 #include <rte_lcore.h>
11 
12 #include "obj.h"
13 #include "thread.h"
14 
15 #ifndef THREAD_PIPELINES_MAX
16 #define THREAD_PIPELINES_MAX                               256
17 #endif
18 
19 #ifndef THREAD_BLOCKS_MAX
20 #define THREAD_BLOCKS_MAX                                  256
21 #endif
22 
23 /* Pipeline instruction quanta: Needs to be big enough to do some meaningful
24  * work, but not too big to avoid starving any other pipelines mapped to the
25  * same thread. For a pipeline that executes 10 instructions per packet, a
26  * quanta of 1000 instructions equates to processing 100 packets.
27  */
28 #ifndef PIPELINE_INSTR_QUANTA
29 #define PIPELINE_INSTR_QUANTA                              1000
30 #endif
31 
32 /**
33  * In this design, there is a single control plane (CP) thread and one or multiple data plane (DP)
34  * threads. Each DP thread can run up to THREAD_PIPELINES_MAX pipelines and up to THREAD_BLOCKS_MAX
35  * blocks.
36  *
37  * The pipelines and blocks are single threaded, meaning that a given pipeline/block can be run by a
38  * single thread at any given time, so the same pipeline/block cannot show up in the list of
39  * pipelines/blocks of more than one thread at any specific moment.
40  *
41  * Each DP thread has its own context (struct thread instance), which it shares with the CP thread:
42  *  - Read-write by the CP thread;
43  *  - Read-only by the DP thread.
44  */
/* A block is an arbitrary unit of work (a callback plus its opaque argument) that a DP thread
 * invokes once per dispatch loop iteration, in addition to running its pipelines.
 */
struct block {
	block_run_f block_func; /* Callback invoked by the DP thread dispatch loop. */
	void *block; /* Opaque argument passed to block_func on each invocation. */
};
49 
/* Per-DP-thread context: written by the CP thread, read-only for the owning DP thread (see the
 * design note above). Cache-aligned to avoid false sharing between adjacent thread contexts.
 */
struct __rte_cache_aligned thread {
	struct rte_swx_pipeline *pipelines[THREAD_PIPELINES_MAX]; /* Pipelines mapped to this thread. */
	struct block *blocks[THREAD_BLOCKS_MAX]; /* Block contexts preallocated by thread_init(). */
	volatile uint64_t n_pipelines; /* Volatile: DP thread re-reads it on every dispatch iteration. */
	volatile uint64_t n_blocks; /* Volatile: DP thread re-reads it on every dispatch iteration. */
	int enabled; /* Non-zero when this lcore is a worker managed by this module. */
};
57 
58 static struct thread threads[RTE_MAX_LCORE];
59 
60 /**
61  * Control plane (CP) thread.
62  */
63 int
thread_init(void)64 thread_init(void)
65 {
66 	uint32_t thread_id;
67 	int status = 0;
68 
69 	RTE_LCORE_FOREACH_WORKER(thread_id) {
70 		struct thread *t = &threads[thread_id];
71 		uint32_t i;
72 
73 		t->enabled = 1;
74 
75 		for (i = 0; i < THREAD_BLOCKS_MAX; i++) {
76 			struct block *b;
77 
78 			b = calloc(1, sizeof(struct block));
79 			if (!b) {
80 				status = -ENOMEM;
81 				goto error;
82 			}
83 
84 			t->blocks[i] = b;
85 		}
86 	}
87 
88 	return 0;
89 
90 error:
91 	RTE_LCORE_FOREACH_WORKER(thread_id) {
92 		struct thread *t = &threads[thread_id];
93 		uint32_t i;
94 
95 		t->enabled = 0;
96 
97 		for (i = 0; i < THREAD_BLOCKS_MAX; i++) {
98 			free(t->blocks[i]);
99 			t->blocks[i] = NULL;
100 		}
101 	}
102 
103 	return status;
104 }
105 
106 static uint32_t
pipeline_find(struct rte_swx_pipeline * p)107 pipeline_find(struct rte_swx_pipeline *p)
108 {
109 	uint32_t thread_id;
110 
111 	for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) {
112 		struct thread *t = &threads[thread_id];
113 		uint32_t i;
114 
115 		if (!t->enabled)
116 			continue;
117 
118 		for (i = 0; i < t->n_pipelines; i++)
119 			if (t->pipelines[i] == p)
120 				break;
121 	}
122 
123 	return thread_id;
124 }
125 
126 static uint32_t
block_find(void * b)127 block_find(void *b)
128 {
129 	uint32_t thread_id;
130 
131 	for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) {
132 		struct thread *t = &threads[thread_id];
133 		uint32_t i;
134 
135 		if (!t->enabled)
136 			continue;
137 
138 		for (i = 0; i < t->n_blocks; i++)
139 			if (t->blocks[i]->block == b)
140 				break;
141 	}
142 
143 	return thread_id;
144 }
145 
146 /**
147  * Enable a given pipeline to run on a specific DP thread.
148  *
149  * CP thread:
150  *  - Adds a new pipeline to the end of the DP thread pipeline list (t->pipelines[]);
151  *  - Increments the DP thread number of pipelines (t->n_pipelines). It is important to make sure
152  *    that t->pipelines[] update is completed BEFORE the t->n_pipelines update, hence the memory
153  *    write barrier used below.
154  *
155  * DP thread:
156  *  - Reads t->n_pipelines before starting every new iteration through t->pipelines[]. It detects
157  *    the new pipeline when it sees the updated t->n_pipelines value;
158  *  - If somehow the above condition is not met, so t->n_pipelines update is incorrectly taking
159  *    place before the t->pipelines[] update is completed, then the DP thread will use an incorrect
160  *    handle for the new pipeline, which can result in memory corruption or segmentation fault.
161  */
162 int
pipeline_enable(struct rte_swx_pipeline * p,uint32_t thread_id)163 pipeline_enable(struct rte_swx_pipeline *p, uint32_t thread_id)
164 {
165 	struct thread *t;
166 	uint64_t n_pipelines;
167 
168 	/* Check input params */
169 	if (!p || thread_id >= RTE_MAX_LCORE)
170 		return -EINVAL;
171 
172 	if (pipeline_find(p) < RTE_MAX_LCORE)
173 		return -EEXIST;
174 
175 	t = &threads[thread_id];
176 	if (!t->enabled)
177 		return -EINVAL;
178 
179 	n_pipelines = t->n_pipelines;
180 
181 	/* Check there is room for at least one more pipeline. */
182 	if (n_pipelines >= THREAD_PIPELINES_MAX)
183 		return -ENOSPC;
184 
185 	/* Install the new pipeline. */
186 	t->pipelines[n_pipelines] = p;
187 	rte_wmb();
188 	t->n_pipelines = n_pipelines + 1;
189 
190 	return 0;
191 }
192 
193 /**
194  * Disable a given pipeline from running on any DP thread.
195  *
196  * CP thread:
197  *  - Detects the thread that is running the given pipeline, if any;
198  *  - Writes the last pipeline handle (pipeline_last = t->pipelines[t->n_pipelines - 1]) on the
199  *    position of the pipeline to be disabled (t->pipelines[i] = pipeline_last) and decrements the
200  *    number of pipelines running on the current thread (t->n_pipelines--). This approach makes sure
201  *    that no holes with invalid locations are ever developed within the t->pipelines[] array.
202  *  - If the memory barrier below is present, then t->n_pipelines update is guaranteed to take place
203  *    after the t->pipelines[] update is completed. The possible DP thread behaviors are detailed
204  *    below, which are all valid:
205  *     - Not run the removed pipeline at all, run all the other pipelines (including pipeline_last)
206  *       exactly one time during the current dispatch loop iteration. This takes place when the DP
207  *       thread sees the final value of t->n_pipelines;
208  *     - Not run the removed pipeline at all, run all the other pipelines, except pipeline_last,
209  *       exactly one time and the pipeline_last exactly two times during the current dispatch loop
210  *       iteration. This takes place when the DP thread sees the initial value of t->n_pipelines.
211  *  - If the memory barrier below is not present, then the t->n_pipelines update may be reordered by
212  *    the CPU, so that it takes place before the t->pipelines[] update. The possible DP thread
213  *    behaviors are detailed below, which are all valid:
214  *     - Not run the removed pipeline at all, run all the other pipelines (including pipeline_last)
215  *       exactly one time during the current dispatch loop iteration. This takes place when the DP
216  *       thread sees the final values of the t->pipeline[] array;
217  *     - Run the removed pipeline one last time, run all the other pipelines exactly one time, with
218  *       the exception of the pipeline_last, which is not run during the current dispatch loop
219  *       iteration. This takes place when the DP thread sees the initial values of t->pipeline[].
220  *
221  * DP thread:
222  *  - Reads t->n_pipelines before starting every new iteration through t->pipelines[].
223  */
224 void
pipeline_disable(struct rte_swx_pipeline * p)225 pipeline_disable(struct rte_swx_pipeline *p)
226 {
227 	struct thread *t;
228 	uint64_t n_pipelines;
229 	uint32_t thread_id, i;
230 
231 	/* Check input params */
232 	if (!p)
233 		return;
234 
235 	/* Find the thread that runs this pipeline. */
236 	thread_id = pipeline_find(p);
237 	if (thread_id == RTE_MAX_LCORE)
238 		return;
239 
240 	t = &threads[thread_id];
241 	n_pipelines = t->n_pipelines;
242 
243 	for (i = 0; i < n_pipelines; i++) {
244 		struct rte_swx_pipeline *pipeline = t->pipelines[i];
245 
246 		if (pipeline != p)
247 			continue;
248 
249 		if (i < n_pipelines - 1) {
250 			struct rte_swx_pipeline *pipeline_last = t->pipelines[n_pipelines - 1];
251 
252 			t->pipelines[i] = pipeline_last;
253 		}
254 
255 		rte_wmb();
256 		t->n_pipelines = n_pipelines - 1;
257 
258 		return;
259 	}
260 
261 	return;
262 }
263 
264 int
block_enable(block_run_f block_func,void * block,uint32_t thread_id)265 block_enable(block_run_f block_func, void *block, uint32_t thread_id)
266 {
267 	struct thread *t;
268 	uint64_t n_blocks;
269 
270 	/* Check input params */
271 	if (!block_func || !block || thread_id >= RTE_MAX_LCORE)
272 		return -EINVAL;
273 
274 	if (block_find(block) < RTE_MAX_LCORE)
275 		return -EEXIST;
276 
277 	t = &threads[thread_id];
278 	if (!t->enabled)
279 		return -EINVAL;
280 
281 	n_blocks = t->n_blocks;
282 
283 	/* Check there is room for at least one more block. */
284 	if (n_blocks >= THREAD_BLOCKS_MAX)
285 		return -ENOSPC;
286 
287 	/* Install the new block. */
288 	t->blocks[n_blocks]->block_func = block_func;
289 	t->blocks[n_blocks]->block = block;
290 
291 	rte_wmb();
292 	t->n_blocks = n_blocks + 1;
293 
294 	return 0;
295 }
296 
/**
 * Disable a given block from running on any DP thread (CP thread only).
 *
 * Mirrors pipeline_disable(): the hole left by the removed entry is plugged with the last entry,
 * then the counter is decremented. Additionally, the struct block context of the removed entry is
 * parked back at the end of the array (now just past the active range), so that all
 * THREAD_BLOCKS_MAX contexts preallocated by thread_init() remain owned by t->blocks[] — no leak,
 * and the slot is ready for reuse by a future block_enable().
 *
 * @param block Opaque block argument; silently ignored when NULL or not mapped to any thread.
 */
void
block_disable(void *block)
{
	struct thread *t;
	uint64_t n_blocks;
	uint32_t thread_id, i;

	/* Check input params */
	if (!block)
		return;

	/* Find the thread that runs this block. */
	thread_id = block_find(block);
	if (thread_id == RTE_MAX_LCORE)
		return;

	t = &threads[thread_id];
	n_blocks = t->n_blocks;

	for (i = 0; i < n_blocks; i++) {
		struct block *b = t->blocks[i];

		if (block != b->block)
			continue;

		/* Plug the hole with the last block context so the active range stays contiguous. */
		if (i < n_blocks - 1) {
			struct block *block_last = t->blocks[n_blocks - 1];

			t->blocks[i] = block_last;
		}

		/* Order the slot update before the counter update for the DP thread. */
		rte_wmb();
		t->n_blocks = n_blocks - 1;

		/* Park the removed context in the now-inactive last slot, after the DP thread can
		 * no longer observe it as active.
		 */
		rte_wmb();
		t->blocks[n_blocks - 1] = b;

		return;
	}
}
337 
/**
 * Data plane (DP) threads.
 *
 * The t->n_pipelines variable is modified by the CP thread every time changes to the t->pipeline[]
 * array are operated, so it is therefore very important that the latest value of t->n_pipelines is
 * read by the DP thread at the beginning of every new dispatch loop iteration, otherwise a stale
 * t->n_pipelines value may result in new pipelines not being detected, running pipelines that have
 * been removed and are possibly no longer valid (e.g. when the pipeline_last is removed), running
 * one pipeline (pipeline_last) twice as frequently as the rest of the pipelines (e.g. when a
 * pipeline other than pipeline_last is removed), etc. This is the reason why t->n_pipelines is
 * marked as volatile.
 */
352 int
thread_main(void * arg __rte_unused)353 thread_main(void *arg __rte_unused)
354 {
355 	struct thread *t;
356 	uint32_t thread_id;
357 
358 	thread_id = rte_lcore_id();
359 	t = &threads[thread_id];
360 
361 	/* Dispatch loop. */
362 	for ( ; ; ) {
363 		uint32_t i;
364 
365 		/* Pipelines. */
366 		for (i = 0; i < t->n_pipelines; i++)
367 			rte_swx_pipeline_run(t->pipelines[i], PIPELINE_INSTR_QUANTA);
368 
369 		/* Blocks. */
370 		for (i = 0; i < t->n_blocks; i++) {
371 			struct block *b = t->blocks[i];
372 
373 			b->block_func(b->block);
374 		}
375 	}
376 
377 	return 0;
378 }
379