1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #include <stdlib.h> 6 7 #include <rte_common.h> 8 #include <rte_cycles.h> 9 #include <rte_lcore.h> 10 #include <rte_ring.h> 11 12 #include <rte_table_acl.h> 13 #include <rte_table_array.h> 14 #include <rte_table_hash.h> 15 #include <rte_table_lpm.h> 16 #include <rte_table_lpm_ipv6.h> 17 18 #include "obj.h" 19 #include "thread.h" 20 21 #ifndef THREAD_PIPELINES_MAX 22 #define THREAD_PIPELINES_MAX 256 23 #endif 24 25 #ifndef THREAD_MSGQ_SIZE 26 #define THREAD_MSGQ_SIZE 64 27 #endif 28 29 #ifndef THREAD_TIMER_PERIOD_MS 30 #define THREAD_TIMER_PERIOD_MS 100 31 #endif 32 33 /* Pipeline instruction quanta: Needs to be big enough to do some meaningful 34 * work, but not too big to avoid starving any other pipelines mapped to the 35 * same thread. For a pipeline that executes 10 instructions per packet, a 36 * quanta of 1000 instructions equates to processing 100 packets. 37 */ 38 #ifndef PIPELINE_INSTR_QUANTA 39 #define PIPELINE_INSTR_QUANTA 1000 40 #endif 41 42 /** 43 * Control thread: data plane thread context 44 */ 45 struct thread { 46 struct rte_ring *msgq_req; 47 struct rte_ring *msgq_rsp; 48 49 uint32_t enabled; 50 }; 51 52 static struct thread thread[RTE_MAX_LCORE]; 53 54 /** 55 * Data plane threads: context 56 */ 57 struct pipeline_data { 58 struct rte_swx_pipeline *p; 59 uint64_t timer_period; /* Measured in CPU cycles. */ 60 uint64_t time_next; 61 }; 62 63 struct thread_data { 64 struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX]; 65 uint32_t n_pipelines; 66 67 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX]; 68 struct rte_ring *msgq_req; 69 struct rte_ring *msgq_rsp; 70 uint64_t timer_period; /* Measured in CPU cycles. */ 71 uint64_t time_next; 72 uint64_t time_next_min; 73 } __rte_cache_aligned; 74 75 static struct thread_data thread_data[RTE_MAX_LCORE]; 76 77 /** 78 * Control thread: data plane thread init 79 */ 80 static void 81 thread_free(void) 82 { 83 uint32_t i; 84 85 for (i = 0; i < RTE_MAX_LCORE; i++) { 86 struct thread *t = &thread[i]; 87 88 if (!rte_lcore_is_enabled(i)) 89 continue; 90 91 /* MSGQs */ 92 if (t->msgq_req) 93 rte_ring_free(t->msgq_req); 94 95 if (t->msgq_rsp) 96 rte_ring_free(t->msgq_rsp); 97 } 98 } 99 100 int 101 thread_init(void) 102 { 103 uint32_t i; 104 105 RTE_LCORE_FOREACH_WORKER(i) { 106 char name[NAME_MAX]; 107 struct rte_ring *msgq_req, *msgq_rsp; 108 struct thread *t = &thread[i]; 109 struct thread_data *t_data = &thread_data[i]; 110 uint32_t cpu_id = rte_lcore_to_socket_id(i); 111 112 /* MSGQs */ 113 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i); 114 115 msgq_req = rte_ring_create(name, 116 THREAD_MSGQ_SIZE, 117 cpu_id, 118 RING_F_SP_ENQ | RING_F_SC_DEQ); 119 120 if (msgq_req == NULL) { 121 thread_free(); 122 return -1; 123 } 124 125 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i); 126 127 msgq_rsp = rte_ring_create(name, 128 THREAD_MSGQ_SIZE, 129 cpu_id, 130 RING_F_SP_ENQ | RING_F_SC_DEQ); 131 132 if (msgq_rsp == NULL) { 133 thread_free(); 134 return -1; 135 } 136 137 /* Control thread records */ 138 t->msgq_req = msgq_req; 139 t->msgq_rsp = msgq_rsp; 140 t->enabled = 1; 141 142 /* Data plane thread records */ 143 t_data->n_pipelines = 0; 144 t_data->msgq_req = msgq_req; 145 t_data->msgq_rsp = msgq_rsp; 146 t_data->timer_period = 147 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000; 148 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period; 149 t_data->time_next_min = t_data->time_next; 150 } 151 152 return 0; 153 } 154 155 static inline int 156 thread_is_running(uint32_t thread_id) 157 { 158 enum rte_lcore_state_t thread_state; 159 160 thread_state = rte_eal_get_lcore_state(thread_id); 161 return (thread_state == RUNNING) ? 1 : 0; 162 } 163 164 /** 165 * Control thread & data plane threads: message passing 166 */ 167 enum thread_req_type { 168 THREAD_REQ_PIPELINE_ENABLE = 0, 169 THREAD_REQ_PIPELINE_DISABLE, 170 THREAD_REQ_MAX 171 }; 172 173 struct thread_msg_req { 174 enum thread_req_type type; 175 176 union { 177 struct { 178 struct rte_swx_pipeline *p; 179 uint32_t timer_period_ms; 180 } pipeline_enable; 181 182 struct { 183 struct rte_swx_pipeline *p; 184 } pipeline_disable; 185 }; 186 }; 187 188 struct thread_msg_rsp { 189 int status; 190 }; 191 192 /** 193 * Control thread 194 */ 195 static struct thread_msg_req * 196 thread_msg_alloc(void) 197 { 198 size_t size = RTE_MAX(sizeof(struct thread_msg_req), 199 sizeof(struct thread_msg_rsp)); 200 201 return calloc(1, size); 202 } 203 204 static void 205 thread_msg_free(struct thread_msg_rsp *rsp) 206 { 207 free(rsp); 208 } 209 210 static struct thread_msg_rsp * 211 thread_msg_send_recv(uint32_t thread_id, 212 struct thread_msg_req *req) 213 { 214 struct thread *t = &thread[thread_id]; 215 struct rte_ring *msgq_req = t->msgq_req; 216 struct rte_ring *msgq_rsp = t->msgq_rsp; 217 struct thread_msg_rsp *rsp; 218 int status; 219 220 /* send */ 221 do { 222 status = rte_ring_sp_enqueue(msgq_req, req); 223 } while (status == -ENOBUFS); 224 225 /* recv */ 226 do { 227 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp); 228 } while (status != 0); 229 230 return rsp; 231 } 232 233 int 234 thread_pipeline_enable(uint32_t thread_id, 235 struct obj *obj, 236 const char *pipeline_name) 237 { 238 struct pipeline *p = pipeline_find(obj, pipeline_name); 239 struct thread *t; 240 struct thread_msg_req *req; 241 struct thread_msg_rsp *rsp; 242 int status; 243 244 /* Check input params */ 245 if ((thread_id >= RTE_MAX_LCORE) || 246 (p == NULL)) 247 return -1; 248 249 t = &thread[thread_id]; 250 if (t->enabled == 0) 251 return -1; 252 253 if (!thread_is_running(thread_id)) { 254 struct thread_data *td = &thread_data[thread_id]; 255 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines]; 256 257 if (td->n_pipelines >= THREAD_PIPELINES_MAX) 258 return -1; 259 260 /* Data plane thread */ 261 td->p[td->n_pipelines] = p->p; 262 263 tdp->p = p->p; 264 tdp->timer_period = 265 (rte_get_tsc_hz() * p->timer_period_ms) / 1000; 266 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period; 267 268 td->n_pipelines++; 269 270 /* Pipeline */ 271 p->thread_id = thread_id; 272 p->enabled = 1; 273 274 return 0; 275 } 276 277 /* Allocate request */ 278 req = thread_msg_alloc(); 279 if (req == NULL) 280 return -1; 281 282 /* Write request */ 283 req->type = THREAD_REQ_PIPELINE_ENABLE; 284 req->pipeline_enable.p = p->p; 285 req->pipeline_enable.timer_period_ms = p->timer_period_ms; 286 287 /* Send request and wait for response */ 288 rsp = thread_msg_send_recv(thread_id, req); 289 290 /* Read response */ 291 status = rsp->status; 292 293 /* Free response */ 294 thread_msg_free(rsp); 295 296 /* Request completion */ 297 if (status) 298 return status; 299 300 p->thread_id = thread_id; 301 p->enabled = 1; 302 303 return 0; 304 } 305 306 int 307 thread_pipeline_disable(uint32_t thread_id, 308 struct obj *obj, 309 const char *pipeline_name) 310 { 311 struct pipeline *p = pipeline_find(obj, pipeline_name); 312 struct thread *t; 313 struct thread_msg_req *req; 314 struct thread_msg_rsp *rsp; 315 int status; 316 317 /* Check input params */ 318 if ((thread_id >= RTE_MAX_LCORE) || 319 (p == NULL)) 320 return -1; 321 322 t = &thread[thread_id]; 323 if (t->enabled == 0) 324 return -1; 325 326 if (p->enabled == 0) 327 return 0; 328 329 if (p->thread_id != thread_id) 330 return -1; 331 332 if (!thread_is_running(thread_id)) { 333 struct thread_data *td = &thread_data[thread_id]; 334 uint32_t i; 335 336 for (i = 0; i < td->n_pipelines; i++) { 337 struct pipeline_data *tdp = &td->pipeline_data[i]; 338 339 if (tdp->p != p->p) 340 continue; 341 342 /* Data plane thread */ 343 if (i < td->n_pipelines - 1) { 344 struct rte_swx_pipeline *pipeline_last = 345 td->p[td->n_pipelines - 1]; 346 struct pipeline_data *tdp_last = 347 &td->pipeline_data[td->n_pipelines - 1]; 348 349 td->p[i] = pipeline_last; 350 memcpy(tdp, tdp_last, sizeof(*tdp)); 351 } 352 353 td->n_pipelines--; 354 355 /* Pipeline */ 356 p->enabled = 0; 357 358 break; 359 } 360 361 return 0; 362 } 363 364 /* Allocate request */ 365 req = thread_msg_alloc(); 366 if (req == NULL) 367 return -1; 368 369 /* Write request */ 370 req->type = THREAD_REQ_PIPELINE_DISABLE; 371 req->pipeline_disable.p = p->p; 372 373 /* Send request and wait for response */ 374 rsp = thread_msg_send_recv(thread_id, req); 375 376 /* Read response */ 377 status = rsp->status; 378 379 /* Free response */ 380 thread_msg_free(rsp); 381 382 /* Request completion */ 383 if (status) 384 return status; 385 386 p->enabled = 0; 387 388 return 0; 389 } 390 391 /** 392 * Data plane threads: message handling 393 */ 394 static inline struct thread_msg_req * 395 thread_msg_recv(struct rte_ring *msgq_req) 396 { 397 struct thread_msg_req *req; 398 399 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req); 400 401 if (status != 0) 402 return NULL; 403 404 return req; 405 } 406 407 static inline void 408 thread_msg_send(struct rte_ring *msgq_rsp, 409 struct thread_msg_rsp *rsp) 410 { 411 int status; 412 413 do { 414 status = rte_ring_sp_enqueue(msgq_rsp, rsp); 415 } while (status == -ENOBUFS); 416 } 417 418 static struct thread_msg_rsp * 419 thread_msg_handle_pipeline_enable(struct thread_data *t, 420 struct thread_msg_req *req) 421 { 422 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; 423 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines]; 424 425 /* Request */ 426 if (t->n_pipelines >= THREAD_PIPELINES_MAX) { 427 rsp->status = -1; 428 return rsp; 429 } 430 431 t->p[t->n_pipelines] = req->pipeline_enable.p; 432 433 p->p = req->pipeline_enable.p; 434 p->timer_period = (rte_get_tsc_hz() * 435 req->pipeline_enable.timer_period_ms) / 1000; 436 p->time_next = rte_get_tsc_cycles() + p->timer_period; 437 438 t->n_pipelines++; 439 440 /* Response */ 441 rsp->status = 0; 442 return rsp; 443 } 444 445 static struct thread_msg_rsp * 446 thread_msg_handle_pipeline_disable(struct thread_data *t, 447 struct thread_msg_req *req) 448 { 449 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; 450 uint32_t n_pipelines = t->n_pipelines; 451 struct rte_swx_pipeline *pipeline = req->pipeline_disable.p; 452 uint32_t i; 453 454 /* find pipeline */ 455 for (i = 0; i < n_pipelines; i++) { 456 struct pipeline_data *p = &t->pipeline_data[i]; 457 458 if (p->p != pipeline) 459 continue; 460 461 if (i < n_pipelines - 1) { 462 struct rte_swx_pipeline *pipeline_last = 463 t->p[n_pipelines - 1]; 464 struct pipeline_data *p_last = 465 &t->pipeline_data[n_pipelines - 1]; 466 467 t->p[i] = pipeline_last; 468 memcpy(p, p_last, sizeof(*p)); 469 } 470 471 t->n_pipelines--; 472 473 rsp->status = 0; 474 return rsp; 475 } 476 477 /* should not get here */ 478 rsp->status = 0; 479 return rsp; 480 } 481 482 static void 483 thread_msg_handle(struct thread_data *t) 484 { 485 for ( ; ; ) { 486 struct thread_msg_req *req; 487 struct thread_msg_rsp *rsp; 488 489 req = thread_msg_recv(t->msgq_req); 490 if (req == NULL) 491 break; 492 493 switch (req->type) { 494 case THREAD_REQ_PIPELINE_ENABLE: 495 rsp = thread_msg_handle_pipeline_enable(t, req); 496 break; 497 498 case THREAD_REQ_PIPELINE_DISABLE: 499 rsp = thread_msg_handle_pipeline_disable(t, req); 500 break; 501 502 default: 503 rsp = (struct thread_msg_rsp *) req; 504 rsp->status = -1; 505 } 506 507 thread_msg_send(t->msgq_rsp, rsp); 508 } 509 } 510 511 /** 512 * Data plane threads: main 513 */ 514 int 515 thread_main(void *arg __rte_unused) 516 { 517 struct thread_data *t; 518 uint32_t thread_id, i; 519 520 thread_id = rte_lcore_id(); 521 t = &thread_data[thread_id]; 522 523 /* Dispatch loop */ 524 for (i = 0; ; i++) { 525 uint32_t j; 526 527 /* Data Plane */ 528 for (j = 0; j < t->n_pipelines; j++) 529 rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA); 530 531 /* Control Plane */ 532 if ((i & 0xF) == 0) { 533 uint64_t time = rte_get_tsc_cycles(); 534 uint64_t time_next_min = UINT64_MAX; 535 536 if (time < t->time_next_min) 537 continue; 538 539 /* Thread message queues */ 540 { 541 uint64_t time_next = t->time_next; 542 543 if (time_next <= time) { 544 thread_msg_handle(t); 545 time_next = time + t->timer_period; 546 t->time_next = time_next; 547 } 548 549 if (time_next < time_next_min) 550 time_next_min = time_next; 551 } 552 553 t->time_next_min = time_next_min; 554 } 555 } 556 557 return 0; 558 } 559