/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2020 Intel Corporation
 */

#include <stdlib.h>
#include <errno.h>

#include <rte_atomic.h>
#include <rte_common.h>
#include <rte_lcore.h>

#include "obj.h"
#include "thread.h"

#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX 256
#endif

#ifndef THREAD_BLOCKS_MAX
#define THREAD_BLOCKS_MAX 256
#endif

/* Pipeline instruction quanta: needs to be big enough to do some meaningful
 * work, but small enough to avoid starving the other pipelines mapped to the
 * same thread. For a pipeline that executes 10 instructions per packet, a
 * quanta of 1000 instructions equates to processing 100 packets.
 */
#ifndef PIPELINE_INSTR_QUANTA
#define PIPELINE_INSTR_QUANTA 1000
#endif

/**
 * In this design, there is a single control plane (CP) thread and one or multiple data plane (DP)
 * threads. Each DP thread can run up to THREAD_PIPELINES_MAX pipelines and up to THREAD_BLOCKS_MAX
 * blocks.
 *
 * The pipelines and blocks are single-threaded: a given pipeline/block can be run by at most one
 * thread at any given time, so the same pipeline/block cannot show up in the list of
 * pipelines/blocks of more than one thread at any given moment.
 *
 * Each DP thread has its own context (struct thread instance), which it shares with the CP thread:
 * - Read-write by the CP thread;
 * - Read-only by the DP thread.
 */
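/*
 * Typical application wiring (a sketch, not mandated by this file): the CP
 * thread initializes the EAL, calls thread_init(), launches thread_main() on
 * every worker lcore, and then maps pipelines/blocks to the DP threads at
 * run-time through pipeline_enable()/block_enable():
 *
 *	if (rte_eal_init(argc, argv) < 0)
 *		rte_panic("EAL initialization failed\n");
 *
 *	if (thread_init())
 *		rte_panic("DP thread initialization failed\n");
 *
 *	rte_eal_mp_remote_launch(thread_main, NULL, SKIP_MAIN);
 */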
struct block {
	block_run_f block_func;
	void *block;
};

struct __rte_cache_aligned thread {
	struct rte_swx_pipeline *pipelines[THREAD_PIPELINES_MAX];
	struct block *blocks[THREAD_BLOCKS_MAX];
	volatile uint64_t n_pipelines;
	volatile uint64_t n_blocks;
	int enabled;
};

static struct thread threads[RTE_MAX_LCORE];

/**
 * Control plane (CP) thread.
 */
int
thread_init(void)
{
	uint32_t thread_id;
	int status = 0;

	RTE_LCORE_FOREACH_WORKER(thread_id) {
		struct thread *t = &threads[thread_id];
		uint32_t i;

		t->enabled = 1;

		for (i = 0; i < THREAD_BLOCKS_MAX; i++) {
			struct block *b;

			b = calloc(1, sizeof(struct block));
			if (!b) {
				status = -ENOMEM;
				goto error;
			}

			t->blocks[i] = b;
		}
	}

	return 0;

error:
	RTE_LCORE_FOREACH_WORKER(thread_id) {
		struct thread *t = &threads[thread_id];
		uint32_t i;

		t->enabled = 0;

		for (i = 0; i < THREAD_BLOCKS_MAX; i++) {
			free(t->blocks[i]);
			t->blocks[i] = NULL;
		}
	}

	return status;
}
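/*
 * Note (illustration, assuming the default EAL lcore assignment): with the
 * EAL option "-l 0-3", lcore 0 is the main (CP) lcore, while lcores 1..3 are
 * the worker lcores visited by RTE_LCORE_FOREACH_WORKER() above, so only
 * those get t->enabled set and can later be passed as the thread_id argument
 * to pipeline_enable()/block_enable().
 */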

static uint32_t
pipeline_find(struct rte_swx_pipeline *p)
{
	uint32_t thread_id;

	for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) {
		struct thread *t = &threads[thread_id];
		uint32_t i;

		if (!t->enabled)
			continue;

		for (i = 0; i < t->n_pipelines; i++)
			if (t->pipelines[i] == p)
				break;

		/* Stop the search once the pipeline is found on this thread;
		 * otherwise the outer loop would run to RTE_MAX_LCORE even on
		 * a hit.
		 */
		if (i < t->n_pipelines)
			break;
	}

	return thread_id;
}

static uint32_t
block_find(void *b)
{
	uint32_t thread_id;

	for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) {
		struct thread *t = &threads[thread_id];
		uint32_t i;

		if (!t->enabled)
			continue;

		for (i = 0; i < t->n_blocks; i++)
			if (t->blocks[i]->block == b)
				break;

		/* Stop the search once the block is found on this thread. */
		if (i < t->n_blocks)
			break;
	}

	return thread_id;
}

/**
 * Enable a given pipeline to run on a specific DP thread.
 *
 * CP thread:
 * - Adds the new pipeline to the end of the DP thread pipeline list (t->pipelines[]);
 * - Increments the DP thread number of pipelines (t->n_pipelines). It is important to make sure
 *   that the t->pipelines[] update is completed BEFORE the t->n_pipelines update, hence the memory
 *   write barrier used below.
 *
 * DP thread:
 * - Reads t->n_pipelines before starting every new iteration through t->pipelines[]. It detects
 *   the new pipeline when it sees the updated t->n_pipelines value;
 * - If the above ordering were not enforced, i.e. if the t->n_pipelines update could take place
 *   before the t->pipelines[] update is completed, then the DP thread could read an invalid
 *   handle for the new pipeline, which can result in memory corruption or a segmentation fault.
 */
int
pipeline_enable(struct rte_swx_pipeline *p, uint32_t thread_id)
{
	struct thread *t;
	uint64_t n_pipelines;

	/* Check input params */
	if (!p || thread_id >= RTE_MAX_LCORE)
		return -EINVAL;

	if (pipeline_find(p) < RTE_MAX_LCORE)
		return -EEXIST;

	t = &threads[thread_id];
	if (!t->enabled)
		return -EINVAL;

	n_pipelines = t->n_pipelines;

	/* Check there is room for at least one more pipeline. */
	if (n_pipelines >= THREAD_PIPELINES_MAX)
		return -ENOSPC;

	/* Install the new pipeline. */
	t->pipelines[n_pipelines] = p;
	rte_wmb();
	t->n_pipelines = n_pipelines + 1;

	return 0;
}
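/*
 * Example CP-side usage (a sketch; the pipeline handle p is assumed to have
 * been built elsewhere with the rte_swx_pipeline API, and worker_id must be
 * an lcore that thread_init() enabled):
 *
 *	status = pipeline_enable(p, worker_id);
 *	if (status)
 *		printf("Failed to enable pipeline (error %d)\n", -status);
 */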

/**
 * Disable a given pipeline from running on any DP thread.
 *
 * CP thread:
 * - Detects the thread that is running the given pipeline, if any;
 * - Writes the last pipeline handle (pipeline_last = t->pipelines[t->n_pipelines - 1]) over the
 *   position of the pipeline to be disabled (t->pipelines[i] = pipeline_last) and decrements the
 *   number of pipelines running on the current thread (t->n_pipelines--). This approach makes sure
 *   that no holes with invalid handles are ever developed within the t->pipelines[] array.
 * - If the memory barrier below is present, then the t->n_pipelines update is guaranteed to take
 *   place after the t->pipelines[] update is completed. The possible DP thread behaviors are
 *   detailed below, and all of them are valid:
 *   - Not run the removed pipeline at all, and run all the other pipelines (including
 *     pipeline_last) exactly one time during the current dispatch loop iteration. This takes place
 *     when the DP thread sees the final value of t->n_pipelines;
 *   - Not run the removed pipeline at all, run all the other pipelines except pipeline_last
 *     exactly one time, and run pipeline_last exactly two times during the current dispatch loop
 *     iteration. This takes place when the DP thread sees the initial value of t->n_pipelines.
 * - If the memory barrier below is not present, then the t->n_pipelines update may be reordered by
 *   the CPU, so that it takes place before the t->pipelines[] update. The possible DP thread
 *   behaviors are detailed below, and all of them are valid:
 *   - Not run the removed pipeline at all, and run all the other pipelines (including
 *     pipeline_last) exactly one time during the current dispatch loop iteration. This takes place
 *     when the DP thread sees the final values of the t->pipelines[] array;
 *   - Run the removed pipeline one last time and run all the other pipelines exactly one time,
 *     with the exception of pipeline_last, which is not run during the current dispatch loop
 *     iteration. This takes place when the DP thread sees the initial values of t->pipelines[].
 *
 * DP thread:
 * - Reads t->n_pipelines before starting every new iteration through t->pipelines[].
 */
void
pipeline_disable(struct rte_swx_pipeline *p)
{
	struct thread *t;
	uint64_t n_pipelines;
	uint32_t thread_id, i;

	/* Check input params */
	if (!p)
		return;

	/* Find the thread that runs this pipeline. */
	thread_id = pipeline_find(p);
	if (thread_id == RTE_MAX_LCORE)
		return;

	t = &threads[thread_id];
	n_pipelines = t->n_pipelines;

	for (i = 0; i < n_pipelines; i++) {
		struct rte_swx_pipeline *pipeline = t->pipelines[i];

		if (pipeline != p)
			continue;

		if (i < n_pipelines - 1) {
			struct rte_swx_pipeline *pipeline_last = t->pipelines[n_pipelines - 1];

			t->pipelines[i] = pipeline_last;
		}

		rte_wmb();
		t->n_pipelines = n_pipelines - 1;

		return;
	}
}
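/*
 * Worked example of the swap-with-last removal above (hypothetical pipeline
 * handles A..D): with t->pipelines[] = [A, B, C, D] and n_pipelines = 4,
 * disabling B first copies D over B (the array becomes [A, D, C, D]), then
 * drops n_pipelines to 3, so from this point on the DP thread dispatches
 * [A, D, C] and never reads the stale fourth slot.
 */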

int
block_enable(block_run_f block_func, void *block, uint32_t thread_id)
{
	struct thread *t;
	uint64_t n_blocks;

	/* Check input params */
	if (!block_func || !block || thread_id >= RTE_MAX_LCORE)
		return -EINVAL;

	if (block_find(block) < RTE_MAX_LCORE)
		return -EEXIST;

	t = &threads[thread_id];
	if (!t->enabled)
		return -EINVAL;

	n_blocks = t->n_blocks;

	/* Check there is room for at least one more block. */
	if (n_blocks >= THREAD_BLOCKS_MAX)
		return -ENOSPC;

	/* Install the new block. */
	t->blocks[n_blocks]->block_func = block_func;
	t->blocks[n_blocks]->block = block;

	rte_wmb();
	t->n_blocks = n_blocks + 1;

	return 0;
}
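/*
 * Example block usage (a sketch; my_block_run() and my_block_ctx are
 * hypothetical names, and block_run_f is assumed to be the "void (*)(void *)"
 * run callback type declared in thread.h):
 *
 *	static void
 *	my_block_run(void *block)
 *	{
 *		struct my_block_ctx *ctx = block;
 *
 *		// Do one quantum of work, then return to the dispatch loop.
 *	}
 *
 *	status = block_enable(my_block_run, &ctx, worker_id);
 */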

void
block_disable(void *block)
{
	struct thread *t;
	uint64_t n_blocks;
	uint32_t thread_id, i;

	/* Check input params */
	if (!block)
		return;

	/* Find the thread that runs this block. */
	thread_id = block_find(block);
	if (thread_id == RTE_MAX_LCORE)
		return;

	t = &threads[thread_id];
	n_blocks = t->n_blocks;

	for (i = 0; i < n_blocks; i++) {
		struct block *b = t->blocks[i];

		if (block != b->block)
			continue;

		if (i < n_blocks - 1) {
			struct block *block_last = t->blocks[n_blocks - 1];

			t->blocks[i] = block_last;
		}

		rte_wmb();
		t->n_blocks = n_blocks - 1;

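		/* Park the pre-allocated descriptor of the removed block back at the end of the
		 * array, so that it can be reused by a future block_enable() call.
		 */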
		rte_wmb();
		t->blocks[n_blocks - 1] = b;

		return;
	}
}

/**
 * Data plane (DP) threads.
 *
 * The t->n_pipelines variable is modified by the CP thread every time changes to the
 * t->pipelines[] array are operated, so it is very important that the latest value of
 * t->n_pipelines is read by the DP thread at the beginning of every new dispatch loop iteration.
 * Otherwise, a stale t->n_pipelines value may result in new pipelines not being detected, in
 * running pipelines that have been removed and are possibly no longer valid (e.g. when
 * pipeline_last is removed), or in running one pipeline (pipeline_last) twice as frequently as
 * the rest of the pipelines (e.g. when a pipeline other than pipeline_last is removed). This is
 * the reason why t->n_pipelines is marked as volatile.
 */
int
thread_main(void *arg __rte_unused)
{
	struct thread *t;
	uint32_t thread_id;

	thread_id = rte_lcore_id();
	t = &threads[thread_id];

	/* Dispatch loop. */
	for ( ; ; ) {
		uint32_t i;

		/* Pipelines. */
		for (i = 0; i < t->n_pipelines; i++)
			rte_swx_pipeline_run(t->pipelines[i], PIPELINE_INSTR_QUANTA);

		/* Blocks. */
		for (i = 0; i < t->n_blocks; i++) {
			struct block *b = t->blocks[i];

			b->block_func(b->block);
		}
	}

	return 0;
}