1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #include <stdlib.h> 6 #include <errno.h> 7 8 #include <rte_atomic.h> 9 #include <rte_common.h> 10 #include <rte_lcore.h> 11 12 #include "obj.h" 13 #include "thread.h" 14 15 #ifndef THREAD_PIPELINES_MAX 16 #define THREAD_PIPELINES_MAX 256 17 #endif 18 19 #ifndef THREAD_BLOCKS_MAX 20 #define THREAD_BLOCKS_MAX 256 21 #endif 22 23 /* Pipeline instruction quanta: Needs to be big enough to do some meaningful 24 * work, but not too big to avoid starving any other pipelines mapped to the 25 * same thread. For a pipeline that executes 10 instructions per packet, a 26 * quanta of 1000 instructions equates to processing 100 packets. 27 */ 28 #ifndef PIPELINE_INSTR_QUANTA 29 #define PIPELINE_INSTR_QUANTA 1000 30 #endif 31 32 /** 33 * In this design, there is a single control plane (CP) thread and one or multiple data plane (DP) 34 * threads. Each DP thread can run up to THREAD_PIPELINES_MAX pipelines and up to THREAD_BLOCKS_MAX 35 * blocks. 36 * 37 * The pipelines and blocks are single threaded, meaning that a given pipeline/block can be run by a 38 * single thread at any given time, so the same pipeline/block cannot show up in the list of 39 * pipelines/blocks of more than one thread at any specific moment. 40 * 41 * Each DP thread has its own context (struct thread instance), which it shares with the CP thread: 42 * - Read-write by the CP thread; 43 * - Read-only by the DP thread. 44 */ 45 struct block { 46 block_run_f block_func; 47 void *block; 48 }; 49 50 struct __rte_cache_aligned thread { 51 struct rte_swx_pipeline *pipelines[THREAD_PIPELINES_MAX]; 52 struct block *blocks[THREAD_BLOCKS_MAX]; 53 volatile uint64_t n_pipelines; 54 volatile uint64_t n_blocks; 55 int enabled; 56 }; 57 58 static struct thread threads[RTE_MAX_LCORE]; 59 60 /** 61 * Control plane (CP) thread. 62 */ 63 int 64 thread_init(void) 65 { 66 uint32_t thread_id; 67 int status = 0; 68 69 RTE_LCORE_FOREACH_WORKER(thread_id) { 70 struct thread *t = &threads[thread_id]; 71 uint32_t i; 72 73 t->enabled = 1; 74 75 for (i = 0; i < THREAD_BLOCKS_MAX; i++) { 76 struct block *b; 77 78 b = calloc(1, sizeof(struct block)); 79 if (!b) { 80 status = -ENOMEM; 81 goto error; 82 } 83 84 t->blocks[i] = b; 85 } 86 } 87 88 return 0; 89 90 error: 91 RTE_LCORE_FOREACH_WORKER(thread_id) { 92 struct thread *t = &threads[thread_id]; 93 uint32_t i; 94 95 t->enabled = 0; 96 97 for (i = 0; i < THREAD_BLOCKS_MAX; i++) { 98 free(t->blocks[i]); 99 t->blocks[i] = NULL; 100 } 101 } 102 103 return status; 104 } 105 106 static uint32_t 107 pipeline_find(struct rte_swx_pipeline *p) 108 { 109 uint32_t thread_id; 110 111 for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) { 112 struct thread *t = &threads[thread_id]; 113 uint32_t i; 114 115 if (!t->enabled) 116 continue; 117 118 for (i = 0; i < t->n_pipelines; i++) 119 if (t->pipelines[i] == p) 120 break; 121 } 122 123 return thread_id; 124 } 125 126 static uint32_t 127 block_find(void *b) 128 { 129 uint32_t thread_id; 130 131 for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) { 132 struct thread *t = &threads[thread_id]; 133 uint32_t i; 134 135 if (!t->enabled) 136 continue; 137 138 for (i = 0; i < t->n_blocks; i++) 139 if (t->blocks[i]->block == b) 140 break; 141 } 142 143 return thread_id; 144 } 145 146 /** 147 * Enable a given pipeline to run on a specific DP thread. 148 * 149 * CP thread: 150 * - Adds a new pipeline to the end of the DP thread pipeline list (t->pipelines[]); 151 * - Increments the DP thread number of pipelines (t->n_pipelines). It is important to make sure 152 * that t->pipelines[] update is completed BEFORE the t->n_pipelines update, hence the memory 153 * write barrier used below. 154 * 155 * DP thread: 156 * - Reads t->n_pipelines before starting every new iteration through t->pipelines[]. It detects 157 * the new pipeline when it sees the updated t->n_pipelines value; 158 * - If somehow the above condition is not met, so t->n_pipelines update is incorrectly taking 159 * place before the t->pipelines[] update is completed, then the DP thread will use an incorrect 160 * handle for the new pipeline, which can result in memory corruption or segmentation fault. 161 */ 162 int 163 pipeline_enable(struct rte_swx_pipeline *p, uint32_t thread_id) 164 { 165 struct thread *t; 166 uint64_t n_pipelines; 167 168 /* Check input params */ 169 if (!p || thread_id >= RTE_MAX_LCORE) 170 return -EINVAL; 171 172 if (pipeline_find(p) < RTE_MAX_LCORE) 173 return -EEXIST; 174 175 t = &threads[thread_id]; 176 if (!t->enabled) 177 return -EINVAL; 178 179 n_pipelines = t->n_pipelines; 180 181 /* Check there is room for at least one more pipeline. */ 182 if (n_pipelines >= THREAD_PIPELINES_MAX) 183 return -ENOSPC; 184 185 /* Install the new pipeline. */ 186 t->pipelines[n_pipelines] = p; 187 rte_wmb(); 188 t->n_pipelines = n_pipelines + 1; 189 190 return 0; 191 } 192 193 /** 194 * Disable a given pipeline from running on any DP thread. 195 * 196 * CP thread: 197 * - Detects the thread that is running the given pipeline, if any; 198 * - Writes the last pipeline handle (pipeline_last = t->pipelines[t->n_pipelines - 1]) on the 199 * position of the pipeline to be disabled (t->pipelines[i] = pipeline_last) and decrements the 200 * number of pipelines running on the current thread (t->n_pipelines--). This approach makes sure 201 * that no holes with invalid locations are ever developed within the t->pipelines[] array. 202 * - If the memory barrier below is present, then t->n_pipelines update is guaranteed to take place 203 * after the t->pipelines[] update is completed. The possible DP thread behaviors are detailed 204 * below, which are all valid: 205 * - Not run the removed pipeline at all, run all the other pipelines (including pipeline_last) 206 * exactly one time during the current dispatch loop iteration. This takes place when the DP 207 * thread sees the final value of t->n_pipelines; 208 * - Not run the removed pipeline at all, run all the other pipelines, except pipeline_last, 209 * exactly one time and the pipeline_last exactly two times during the current dispatch loop 210 * iteration. This takes place when the DP thread sees the initial value of t->n_pipelines. 211 * - If the memory barrier below is not present, then the t->n_pipelines update may be reordered by 212 * the CPU, so that it takes place before the t->pipelines[] update. The possible DP thread 213 * behaviors are detailed below, which are all valid: 214 * - Not run the removed pipeline at all, run all the other pipelines (including pipeline_last) 215 * exactly one time during the current dispatch loop iteration. This takes place when the DP 216 * thread sees the final values of the t->pipeline[] array; 217 * - Run the removed pipeline one last time, run all the other pipelines exactly one time, with 218 * the exception of the pipeline_last, which is not run during the current dispatch loop 219 * iteration. This takes place when the DP thread sees the initial values of t->pipeline[]. 220 * 221 * DP thread: 222 * - Reads t->n_pipelines before starting every new iteration through t->pipelines[]. 223 */ 224 void 225 pipeline_disable(struct rte_swx_pipeline *p) 226 { 227 struct thread *t; 228 uint64_t n_pipelines; 229 uint32_t thread_id, i; 230 231 /* Check input params */ 232 if (!p) 233 return; 234 235 /* Find the thread that runs this pipeline. */ 236 thread_id = pipeline_find(p); 237 if (thread_id == RTE_MAX_LCORE) 238 return; 239 240 t = &threads[thread_id]; 241 n_pipelines = t->n_pipelines; 242 243 for (i = 0; i < n_pipelines; i++) { 244 struct rte_swx_pipeline *pipeline = t->pipelines[i]; 245 246 if (pipeline != p) 247 continue; 248 249 if (i < n_pipelines - 1) { 250 struct rte_swx_pipeline *pipeline_last = t->pipelines[n_pipelines - 1]; 251 252 t->pipelines[i] = pipeline_last; 253 } 254 255 rte_wmb(); 256 t->n_pipelines = n_pipelines - 1; 257 258 return; 259 } 260 261 return; 262 } 263 264 int 265 block_enable(block_run_f block_func, void *block, uint32_t thread_id) 266 { 267 struct thread *t; 268 uint64_t n_blocks; 269 270 /* Check input params */ 271 if (!block_func || !block || thread_id >= RTE_MAX_LCORE) 272 return -EINVAL; 273 274 if (block_find(block) < RTE_MAX_LCORE) 275 return -EEXIST; 276 277 t = &threads[thread_id]; 278 if (!t->enabled) 279 return -EINVAL; 280 281 n_blocks = t->n_blocks; 282 283 /* Check there is room for at least one more block. */ 284 if (n_blocks >= THREAD_BLOCKS_MAX) 285 return -ENOSPC; 286 287 /* Install the new block. */ 288 t->blocks[n_blocks]->block_func = block_func; 289 t->blocks[n_blocks]->block = block; 290 291 rte_wmb(); 292 t->n_blocks = n_blocks + 1; 293 294 return 0; 295 } 296 297 void 298 block_disable(void *block) 299 { 300 struct thread *t; 301 uint64_t n_blocks; 302 uint32_t thread_id, i; 303 304 /* Check input params */ 305 if (!block) 306 return; 307 308 /* Find the thread that runs this block. */ 309 thread_id = block_find(block); 310 if (thread_id == RTE_MAX_LCORE) 311 return; 312 313 t = &threads[thread_id]; 314 n_blocks = t->n_blocks; 315 316 for (i = 0; i < n_blocks; i++) { 317 struct block *b = t->blocks[i]; 318 319 if (block != b->block) 320 continue; 321 322 if (i < n_blocks - 1) { 323 struct block *block_last = t->blocks[n_blocks - 1]; 324 325 t->blocks[i] = block_last; 326 } 327 328 rte_wmb(); 329 t->n_blocks = n_blocks - 1; 330 331 rte_wmb(); 332 t->blocks[n_blocks - 1] = b; 333 334 return; 335 } 336 } 337 338 /** 339 * Data plane (DP) threads. 340 * 341 342 343 * The t->n_pipelines variable is modified by the CP thread every time changes to the t->pipeline[] 344 * array are operated, so it is therefore very important that the latest value of t->n_pipelines is 345 * read by the DP thread at the beginning of every new dispatch loop iteration, otherwise a stale 346 * t->n_pipelines value may result in new pipelines not being detected, running pipelines that have 347 * been removed and are possibly no longer valid (e.g. when the pipeline_last is removed), running 348 * one pipeline (pipeline_last) twice as frequently than the rest of the pipelines (e.g. when a 349 * pipeline other than pipeline_last is removed), etc. This is the reason why t->n_pipelines is 350 * marked as volatile. 351 */ 352 int 353 thread_main(void *arg __rte_unused) 354 { 355 struct thread *t; 356 uint32_t thread_id; 357 358 thread_id = rte_lcore_id(); 359 t = &threads[thread_id]; 360 361 /* Dispatch loop. */ 362 for ( ; ; ) { 363 uint32_t i; 364 365 /* Pipelines. */ 366 for (i = 0; i < t->n_pipelines; i++) 367 rte_swx_pipeline_run(t->pipelines[i], PIPELINE_INSTR_QUANTA); 368 369 /* Blocks. */ 370 for (i = 0; i < t->n_blocks; i++) { 371 struct block *b = t->blocks[i]; 372 373 b->block_func(b->block); 374 } 375 } 376 377 return 0; 378 } 379