1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #include <stdlib.h> 6 7 #include <rte_common.h> 8 #include <rte_cycles.h> 9 #include <rte_lcore.h> 10 #include <rte_ring.h> 11 12 #include <rte_table_acl.h> 13 #include <rte_table_array.h> 14 #include <rte_table_hash.h> 15 #include <rte_table_lpm.h> 16 #include <rte_table_lpm_ipv6.h> 17 18 #include "obj.h" 19 #include "thread.h" 20 21 #ifndef THREAD_PIPELINES_MAX 22 #define THREAD_PIPELINES_MAX 256 23 #endif 24 25 #ifndef THREAD_MSGQ_SIZE 26 #define THREAD_MSGQ_SIZE 64 27 #endif 28 29 #ifndef THREAD_TIMER_PERIOD_MS 30 #define THREAD_TIMER_PERIOD_MS 100 31 #endif 32 33 /* Pipeline instruction quanta: Needs to be big enough to do some meaningful 34 * work, but not too big to avoid starving any other pipelines mapped to the 35 * same thread. For a pipeline that executes 10 instructions per packet, a 36 * quanta of 1000 instructions equates to processing 100 packets. 37 */ 38 #ifndef PIPELINE_INSTR_QUANTA 39 #define PIPELINE_INSTR_QUANTA 1000 40 #endif 41 42 /** 43 * Control thread: data plane thread context 44 */ 45 struct thread { 46 struct rte_ring *msgq_req; 47 struct rte_ring *msgq_rsp; 48 49 uint32_t enabled; 50 }; 51 52 static struct thread thread[RTE_MAX_LCORE]; 53 54 /** 55 * Data plane threads: context 56 */ 57 struct pipeline_data { 58 struct rte_swx_pipeline *p; 59 uint64_t timer_period; /* Measured in CPU cycles. */ 60 uint64_t time_next; 61 }; 62 63 struct thread_data { 64 struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX]; 65 uint32_t n_pipelines; 66 67 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX]; 68 struct rte_ring *msgq_req; 69 struct rte_ring *msgq_rsp; 70 uint64_t timer_period; /* Measured in CPU cycles. */ 71 uint64_t time_next; 72 uint64_t time_next_min; 73 } __rte_cache_aligned; 74 75 static struct thread_data thread_data[RTE_MAX_LCORE]; 76 77 /** 78 * Control thread: data plane thread init 79 */ 80 static void 81 thread_free(void) 82 { 83 uint32_t i; 84 85 for (i = 0; i < RTE_MAX_LCORE; i++) { 86 struct thread *t = &thread[i]; 87 88 if (!rte_lcore_is_enabled(i)) 89 continue; 90 91 /* MSGQs */ 92 rte_ring_free(t->msgq_req); 93 94 rte_ring_free(t->msgq_rsp); 95 } 96 } 97 98 int 99 thread_init(void) 100 { 101 uint32_t i; 102 103 RTE_LCORE_FOREACH_WORKER(i) { 104 char name[NAME_MAX]; 105 struct rte_ring *msgq_req, *msgq_rsp; 106 struct thread *t = &thread[i]; 107 struct thread_data *t_data = &thread_data[i]; 108 uint32_t cpu_id = rte_lcore_to_socket_id(i); 109 110 /* MSGQs */ 111 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i); 112 113 msgq_req = rte_ring_create(name, 114 THREAD_MSGQ_SIZE, 115 cpu_id, 116 RING_F_SP_ENQ | RING_F_SC_DEQ); 117 118 if (msgq_req == NULL) { 119 thread_free(); 120 return -1; 121 } 122 123 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i); 124 125 msgq_rsp = rte_ring_create(name, 126 THREAD_MSGQ_SIZE, 127 cpu_id, 128 RING_F_SP_ENQ | RING_F_SC_DEQ); 129 130 if (msgq_rsp == NULL) { 131 thread_free(); 132 return -1; 133 } 134 135 /* Control thread records */ 136 t->msgq_req = msgq_req; 137 t->msgq_rsp = msgq_rsp; 138 t->enabled = 1; 139 140 /* Data plane thread records */ 141 t_data->n_pipelines = 0; 142 t_data->msgq_req = msgq_req; 143 t_data->msgq_rsp = msgq_rsp; 144 t_data->timer_period = 145 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000; 146 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period; 147 t_data->time_next_min = t_data->time_next; 148 } 149 150 return 0; 151 } 152 153 static inline int 154 thread_is_running(uint32_t thread_id) 155 { 156 enum rte_lcore_state_t thread_state; 157 158 thread_state = rte_eal_get_lcore_state(thread_id); 159 return (thread_state == RUNNING) ? 1 : 0; 160 } 161 162 /** 163 * Control thread & data plane threads: message passing 164 */ 165 enum thread_req_type { 166 THREAD_REQ_PIPELINE_ENABLE = 0, 167 THREAD_REQ_PIPELINE_DISABLE, 168 THREAD_REQ_MAX 169 }; 170 171 struct thread_msg_req { 172 enum thread_req_type type; 173 174 union { 175 struct { 176 struct rte_swx_pipeline *p; 177 uint32_t timer_period_ms; 178 } pipeline_enable; 179 180 struct { 181 struct rte_swx_pipeline *p; 182 } pipeline_disable; 183 }; 184 }; 185 186 struct thread_msg_rsp { 187 int status; 188 }; 189 190 /** 191 * Control thread 192 */ 193 static struct thread_msg_req * 194 thread_msg_alloc(void) 195 { 196 size_t size = RTE_MAX(sizeof(struct thread_msg_req), 197 sizeof(struct thread_msg_rsp)); 198 199 return calloc(1, size); 200 } 201 202 static void 203 thread_msg_free(struct thread_msg_rsp *rsp) 204 { 205 free(rsp); 206 } 207 208 static struct thread_msg_rsp * 209 thread_msg_send_recv(uint32_t thread_id, 210 struct thread_msg_req *req) 211 { 212 struct thread *t = &thread[thread_id]; 213 struct rte_ring *msgq_req = t->msgq_req; 214 struct rte_ring *msgq_rsp = t->msgq_rsp; 215 struct thread_msg_rsp *rsp; 216 int status; 217 218 /* send */ 219 do { 220 status = rte_ring_sp_enqueue(msgq_req, req); 221 } while (status == -ENOBUFS); 222 223 /* recv */ 224 do { 225 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp); 226 } while (status != 0); 227 228 return rsp; 229 } 230 231 static int 232 thread_is_pipeline_enabled(uint32_t thread_id, struct rte_swx_pipeline *p) 233 { 234 struct thread *t = &thread[thread_id]; 235 struct thread_data *td = &thread_data[thread_id]; 236 uint32_t i; 237 238 if (!t->enabled) 239 return 0; /* Pipeline NOT enabled on this thread. */ 240 241 for (i = 0; i < td->n_pipelines; i++) 242 if (td->p[i] == p) 243 return 1; /* Pipeline enabled on this thread. */ 244 245 return 0 /* Pipeline NOT enabled on this thread. */; 246 } 247 248 int 249 thread_pipeline_enable(uint32_t thread_id, struct rte_swx_pipeline *p, uint32_t timer_period_ms) 250 { 251 struct thread *t; 252 struct thread_msg_req *req; 253 struct thread_msg_rsp *rsp; 254 int status; 255 256 /* Check input params */ 257 if ((thread_id >= RTE_MAX_LCORE) || !p || !timer_period_ms) 258 return -1; 259 260 t = &thread[thread_id]; 261 if (t->enabled == 0) 262 return -1; 263 264 if (!thread_is_running(thread_id)) { 265 struct thread_data *td = &thread_data[thread_id]; 266 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines]; 267 268 if (td->n_pipelines >= THREAD_PIPELINES_MAX) 269 return -1; 270 271 /* Data plane thread */ 272 td->p[td->n_pipelines] = p; 273 274 tdp->p = p; 275 tdp->timer_period = (rte_get_tsc_hz() * timer_period_ms) / 1000; 276 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period; 277 278 td->n_pipelines++; 279 280 return 0; 281 } 282 283 /* Allocate request */ 284 req = thread_msg_alloc(); 285 if (req == NULL) 286 return -1; 287 288 /* Write request */ 289 req->type = THREAD_REQ_PIPELINE_ENABLE; 290 req->pipeline_enable.p = p; 291 req->pipeline_enable.timer_period_ms = timer_period_ms; 292 293 /* Send request and wait for response */ 294 rsp = thread_msg_send_recv(thread_id, req); 295 296 /* Read response */ 297 status = rsp->status; 298 299 /* Free response */ 300 thread_msg_free(rsp); 301 302 /* Request completion */ 303 if (status) 304 return status; 305 306 return 0; 307 } 308 309 int 310 thread_pipeline_disable(uint32_t thread_id, struct rte_swx_pipeline *p) 311 { 312 struct thread *t; 313 struct thread_msg_req *req; 314 struct thread_msg_rsp *rsp; 315 int status; 316 317 /* Check input params */ 318 if ((thread_id >= RTE_MAX_LCORE) || !p) 319 return -1; 320 321 t = &thread[thread_id]; 322 if (t->enabled == 0) 323 return -1; 324 325 if (!thread_is_pipeline_enabled(thread_id, p)) 326 return 0; 327 328 if (!thread_is_running(thread_id)) { 329 struct thread_data *td = &thread_data[thread_id]; 330 uint32_t i; 331 332 for (i = 0; i < td->n_pipelines; i++) { 333 struct pipeline_data *tdp = &td->pipeline_data[i]; 334 335 if (tdp->p != p) 336 continue; 337 338 /* Data plane thread */ 339 if (i < td->n_pipelines - 1) { 340 struct rte_swx_pipeline *pipeline_last = 341 td->p[td->n_pipelines - 1]; 342 struct pipeline_data *tdp_last = 343 &td->pipeline_data[td->n_pipelines - 1]; 344 345 td->p[i] = pipeline_last; 346 memcpy(tdp, tdp_last, sizeof(*tdp)); 347 } 348 349 td->n_pipelines--; 350 351 break; 352 } 353 354 return 0; 355 } 356 357 /* Allocate request */ 358 req = thread_msg_alloc(); 359 if (req == NULL) 360 return -1; 361 362 /* Write request */ 363 req->type = THREAD_REQ_PIPELINE_DISABLE; 364 req->pipeline_disable.p = p; 365 366 /* Send request and wait for response */ 367 rsp = thread_msg_send_recv(thread_id, req); 368 369 /* Read response */ 370 status = rsp->status; 371 372 /* Free response */ 373 thread_msg_free(rsp); 374 375 /* Request completion */ 376 if (status) 377 return status; 378 379 return 0; 380 } 381 382 /** 383 * Data plane threads: message handling 384 */ 385 static inline struct thread_msg_req * 386 thread_msg_recv(struct rte_ring *msgq_req) 387 { 388 struct thread_msg_req *req; 389 390 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req); 391 392 if (status != 0) 393 return NULL; 394 395 return req; 396 } 397 398 static inline void 399 thread_msg_send(struct rte_ring *msgq_rsp, 400 struct thread_msg_rsp *rsp) 401 { 402 int status; 403 404 do { 405 status = rte_ring_sp_enqueue(msgq_rsp, rsp); 406 } while (status == -ENOBUFS); 407 } 408 409 static struct thread_msg_rsp * 410 thread_msg_handle_pipeline_enable(struct thread_data *t, 411 struct thread_msg_req *req) 412 { 413 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; 414 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines]; 415 416 /* Request */ 417 if (t->n_pipelines >= THREAD_PIPELINES_MAX) { 418 rsp->status = -1; 419 return rsp; 420 } 421 422 t->p[t->n_pipelines] = req->pipeline_enable.p; 423 424 p->p = req->pipeline_enable.p; 425 p->timer_period = (rte_get_tsc_hz() * 426 req->pipeline_enable.timer_period_ms) / 1000; 427 p->time_next = rte_get_tsc_cycles() + p->timer_period; 428 429 t->n_pipelines++; 430 431 /* Response */ 432 rsp->status = 0; 433 return rsp; 434 } 435 436 static struct thread_msg_rsp * 437 thread_msg_handle_pipeline_disable(struct thread_data *t, 438 struct thread_msg_req *req) 439 { 440 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; 441 uint32_t n_pipelines = t->n_pipelines; 442 struct rte_swx_pipeline *pipeline = req->pipeline_disable.p; 443 uint32_t i; 444 445 /* find pipeline */ 446 for (i = 0; i < n_pipelines; i++) { 447 struct pipeline_data *p = &t->pipeline_data[i]; 448 449 if (p->p != pipeline) 450 continue; 451 452 if (i < n_pipelines - 1) { 453 struct rte_swx_pipeline *pipeline_last = 454 t->p[n_pipelines - 1]; 455 struct pipeline_data *p_last = 456 &t->pipeline_data[n_pipelines - 1]; 457 458 t->p[i] = pipeline_last; 459 memcpy(p, p_last, sizeof(*p)); 460 } 461 462 t->n_pipelines--; 463 464 rsp->status = 0; 465 return rsp; 466 } 467 468 /* should not get here */ 469 rsp->status = 0; 470 return rsp; 471 } 472 473 static void 474 thread_msg_handle(struct thread_data *t) 475 { 476 for ( ; ; ) { 477 struct thread_msg_req *req; 478 struct thread_msg_rsp *rsp; 479 480 req = thread_msg_recv(t->msgq_req); 481 if (req == NULL) 482 break; 483 484 switch (req->type) { 485 case THREAD_REQ_PIPELINE_ENABLE: 486 rsp = thread_msg_handle_pipeline_enable(t, req); 487 break; 488 489 case THREAD_REQ_PIPELINE_DISABLE: 490 rsp = thread_msg_handle_pipeline_disable(t, req); 491 break; 492 493 default: 494 rsp = (struct thread_msg_rsp *) req; 495 rsp->status = -1; 496 } 497 498 thread_msg_send(t->msgq_rsp, rsp); 499 } 500 } 501 502 /** 503 * Data plane threads: main 504 */ 505 int 506 thread_main(void *arg __rte_unused) 507 { 508 struct thread_data *t; 509 uint32_t thread_id, i; 510 511 thread_id = rte_lcore_id(); 512 t = &thread_data[thread_id]; 513 514 /* Dispatch loop */ 515 for (i = 0; ; i++) { 516 uint32_t j; 517 518 /* Data Plane */ 519 for (j = 0; j < t->n_pipelines; j++) 520 rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA); 521 522 /* Control Plane */ 523 if ((i & 0xF) == 0) { 524 uint64_t time = rte_get_tsc_cycles(); 525 uint64_t time_next_min = UINT64_MAX; 526 527 if (time < t->time_next_min) 528 continue; 529 530 /* Thread message queues */ 531 { 532 uint64_t time_next = t->time_next; 533 534 if (time_next <= time) { 535 thread_msg_handle(t); 536 time_next = time + t->timer_period; 537 t->time_next = time_next; 538 } 539 540 if (time_next < time_next_min) 541 time_next_min = time_next; 542 } 543 544 t->time_next_min = time_next_min; 545 } 546 } 547 548 return 0; 549 } 550