1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #include <stdlib.h> 6 7 #include <rte_common.h> 8 #include <rte_cycles.h> 9 #include <rte_lcore.h> 10 #include <rte_ring.h> 11 12 #include <rte_table_acl.h> 13 #include <rte_table_array.h> 14 #include <rte_table_hash.h> 15 #include <rte_table_lpm.h> 16 #include <rte_table_lpm_ipv6.h> 17 18 #include "obj.h" 19 #include "thread.h" 20 21 #ifndef THREAD_PIPELINES_MAX 22 #define THREAD_PIPELINES_MAX 256 23 #endif 24 25 #ifndef THREAD_MSGQ_SIZE 26 #define THREAD_MSGQ_SIZE 64 27 #endif 28 29 #ifndef THREAD_TIMER_PERIOD_MS 30 #define THREAD_TIMER_PERIOD_MS 100 31 #endif 32 33 /* Pipeline instruction quanta: Needs to be big enough to do some meaningful 34 * work, but not too big to avoid starving any other pipelines mapped to the 35 * same thread. For a pipeline that executes 10 instructions per packet, a 36 * quanta of 1000 instructions equates to processing 100 packets. 37 */ 38 #ifndef PIPELINE_INSTR_QUANTA 39 #define PIPELINE_INSTR_QUANTA 1000 40 #endif 41 42 /** 43 * Control thread: data plane thread context 44 */ 45 struct thread { 46 struct rte_ring *msgq_req; 47 struct rte_ring *msgq_rsp; 48 49 uint32_t enabled; 50 }; 51 52 static struct thread thread[RTE_MAX_LCORE]; 53 54 /** 55 * Data plane threads: context 56 */ 57 struct pipeline_data { 58 struct rte_swx_pipeline *p; 59 uint64_t timer_period; /* Measured in CPU cycles. */ 60 uint64_t time_next; 61 }; 62 63 struct thread_data { 64 struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX]; 65 uint32_t n_pipelines; 66 67 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX]; 68 struct rte_ring *msgq_req; 69 struct rte_ring *msgq_rsp; 70 uint64_t timer_period; /* Measured in CPU cycles. */ 71 uint64_t time_next; 72 uint64_t time_next_min; 73 } __rte_cache_aligned; 74 75 static struct thread_data thread_data[RTE_MAX_LCORE]; 76 77 /** 78 * Control thread: data plane thread init 79 */ 80 static void 81 thread_free(void) 82 { 83 uint32_t i; 84 85 for (i = 0; i < RTE_MAX_LCORE; i++) { 86 struct thread *t = &thread[i]; 87 88 if (!rte_lcore_is_enabled(i)) 89 continue; 90 91 /* MSGQs */ 92 rte_ring_free(t->msgq_req); 93 94 rte_ring_free(t->msgq_rsp); 95 } 96 } 97 98 int 99 thread_init(void) 100 { 101 uint32_t i; 102 103 RTE_LCORE_FOREACH_WORKER(i) { 104 char name[NAME_MAX]; 105 struct rte_ring *msgq_req, *msgq_rsp; 106 struct thread *t = &thread[i]; 107 struct thread_data *t_data = &thread_data[i]; 108 uint32_t cpu_id = rte_lcore_to_socket_id(i); 109 110 /* MSGQs */ 111 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i); 112 113 msgq_req = rte_ring_create(name, 114 THREAD_MSGQ_SIZE, 115 cpu_id, 116 RING_F_SP_ENQ | RING_F_SC_DEQ); 117 118 if (msgq_req == NULL) { 119 thread_free(); 120 return -1; 121 } 122 123 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i); 124 125 msgq_rsp = rte_ring_create(name, 126 THREAD_MSGQ_SIZE, 127 cpu_id, 128 RING_F_SP_ENQ | RING_F_SC_DEQ); 129 130 if (msgq_rsp == NULL) { 131 thread_free(); 132 return -1; 133 } 134 135 /* Control thread records */ 136 t->msgq_req = msgq_req; 137 t->msgq_rsp = msgq_rsp; 138 t->enabled = 1; 139 140 /* Data plane thread records */ 141 t_data->n_pipelines = 0; 142 t_data->msgq_req = msgq_req; 143 t_data->msgq_rsp = msgq_rsp; 144 t_data->timer_period = 145 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000; 146 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period; 147 t_data->time_next_min = t_data->time_next; 148 } 149 150 return 0; 151 } 152 153 static inline int 154 thread_is_running(uint32_t thread_id) 155 { 156 enum rte_lcore_state_t thread_state; 157 158 thread_state = rte_eal_get_lcore_state(thread_id); 159 return (thread_state == RUNNING) ? 1 : 0; 160 } 161 162 /** 163 * Control thread & data plane threads: message passing 164 */ 165 enum thread_req_type { 166 THREAD_REQ_PIPELINE_ENABLE = 0, 167 THREAD_REQ_PIPELINE_DISABLE, 168 THREAD_REQ_MAX 169 }; 170 171 struct thread_msg_req { 172 enum thread_req_type type; 173 174 union { 175 struct { 176 struct rte_swx_pipeline *p; 177 uint32_t timer_period_ms; 178 } pipeline_enable; 179 180 struct { 181 struct rte_swx_pipeline *p; 182 } pipeline_disable; 183 }; 184 }; 185 186 struct thread_msg_rsp { 187 int status; 188 }; 189 190 /** 191 * Control thread 192 */ 193 static struct thread_msg_req * 194 thread_msg_alloc(void) 195 { 196 size_t size = RTE_MAX(sizeof(struct thread_msg_req), 197 sizeof(struct thread_msg_rsp)); 198 199 return calloc(1, size); 200 } 201 202 static void 203 thread_msg_free(struct thread_msg_rsp *rsp) 204 { 205 free(rsp); 206 } 207 208 static struct thread_msg_rsp * 209 thread_msg_send_recv(uint32_t thread_id, 210 struct thread_msg_req *req) 211 { 212 struct thread *t = &thread[thread_id]; 213 struct rte_ring *msgq_req = t->msgq_req; 214 struct rte_ring *msgq_rsp = t->msgq_rsp; 215 struct thread_msg_rsp *rsp; 216 int status; 217 218 /* send */ 219 do { 220 status = rte_ring_sp_enqueue(msgq_req, req); 221 } while (status == -ENOBUFS); 222 223 /* recv */ 224 do { 225 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp); 226 } while (status != 0); 227 228 return rsp; 229 } 230 231 int 232 thread_pipeline_enable(uint32_t thread_id, 233 struct obj *obj, 234 const char *pipeline_name) 235 { 236 struct pipeline *p = pipeline_find(obj, pipeline_name); 237 struct thread *t; 238 struct thread_msg_req *req; 239 struct thread_msg_rsp *rsp; 240 int status; 241 242 /* Check input params */ 243 if ((thread_id >= RTE_MAX_LCORE) || 244 (p == NULL)) 245 return -1; 246 247 t = &thread[thread_id]; 248 if (t->enabled == 0) 249 return -1; 250 251 if (!thread_is_running(thread_id)) { 252 struct thread_data *td = &thread_data[thread_id]; 253 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines]; 254 255 if (td->n_pipelines >= THREAD_PIPELINES_MAX) 256 return -1; 257 258 /* Data plane thread */ 259 td->p[td->n_pipelines] = p->p; 260 261 tdp->p = p->p; 262 tdp->timer_period = 263 (rte_get_tsc_hz() * p->timer_period_ms) / 1000; 264 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period; 265 266 td->n_pipelines++; 267 268 /* Pipeline */ 269 p->thread_id = thread_id; 270 p->enabled = 1; 271 272 return 0; 273 } 274 275 /* Allocate request */ 276 req = thread_msg_alloc(); 277 if (req == NULL) 278 return -1; 279 280 /* Write request */ 281 req->type = THREAD_REQ_PIPELINE_ENABLE; 282 req->pipeline_enable.p = p->p; 283 req->pipeline_enable.timer_period_ms = p->timer_period_ms; 284 285 /* Send request and wait for response */ 286 rsp = thread_msg_send_recv(thread_id, req); 287 288 /* Read response */ 289 status = rsp->status; 290 291 /* Free response */ 292 thread_msg_free(rsp); 293 294 /* Request completion */ 295 if (status) 296 return status; 297 298 p->thread_id = thread_id; 299 p->enabled = 1; 300 301 return 0; 302 } 303 304 int 305 thread_pipeline_disable(uint32_t thread_id, 306 struct obj *obj, 307 const char *pipeline_name) 308 { 309 struct pipeline *p = pipeline_find(obj, pipeline_name); 310 struct thread *t; 311 struct thread_msg_req *req; 312 struct thread_msg_rsp *rsp; 313 int status; 314 315 /* Check input params */ 316 if ((thread_id >= RTE_MAX_LCORE) || 317 (p == NULL)) 318 return -1; 319 320 t = &thread[thread_id]; 321 if (t->enabled == 0) 322 return -1; 323 324 if (p->enabled == 0) 325 return 0; 326 327 if (p->thread_id != thread_id) 328 return -1; 329 330 if (!thread_is_running(thread_id)) { 331 struct thread_data *td = &thread_data[thread_id]; 332 uint32_t i; 333 334 for (i = 0; i < td->n_pipelines; i++) { 335 struct pipeline_data *tdp = &td->pipeline_data[i]; 336 337 if (tdp->p != p->p) 338 continue; 339 340 /* Data plane thread */ 341 if (i < td->n_pipelines - 1) { 342 struct rte_swx_pipeline *pipeline_last = 343 td->p[td->n_pipelines - 1]; 344 struct pipeline_data *tdp_last = 345 &td->pipeline_data[td->n_pipelines - 1]; 346 347 td->p[i] = pipeline_last; 348 memcpy(tdp, tdp_last, sizeof(*tdp)); 349 } 350 351 td->n_pipelines--; 352 353 /* Pipeline */ 354 p->enabled = 0; 355 356 break; 357 } 358 359 return 0; 360 } 361 362 /* Allocate request */ 363 req = thread_msg_alloc(); 364 if (req == NULL) 365 return -1; 366 367 /* Write request */ 368 req->type = THREAD_REQ_PIPELINE_DISABLE; 369 req->pipeline_disable.p = p->p; 370 371 /* Send request and wait for response */ 372 rsp = thread_msg_send_recv(thread_id, req); 373 374 /* Read response */ 375 status = rsp->status; 376 377 /* Free response */ 378 thread_msg_free(rsp); 379 380 /* Request completion */ 381 if (status) 382 return status; 383 384 p->enabled = 0; 385 386 return 0; 387 } 388 389 /** 390 * Data plane threads: message handling 391 */ 392 static inline struct thread_msg_req * 393 thread_msg_recv(struct rte_ring *msgq_req) 394 { 395 struct thread_msg_req *req; 396 397 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req); 398 399 if (status != 0) 400 return NULL; 401 402 return req; 403 } 404 405 static inline void 406 thread_msg_send(struct rte_ring *msgq_rsp, 407 struct thread_msg_rsp *rsp) 408 { 409 int status; 410 411 do { 412 status = rte_ring_sp_enqueue(msgq_rsp, rsp); 413 } while (status == -ENOBUFS); 414 } 415 416 static struct thread_msg_rsp * 417 thread_msg_handle_pipeline_enable(struct thread_data *t, 418 struct thread_msg_req *req) 419 { 420 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; 421 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines]; 422 423 /* Request */ 424 if (t->n_pipelines >= THREAD_PIPELINES_MAX) { 425 rsp->status = -1; 426 return rsp; 427 } 428 429 t->p[t->n_pipelines] = req->pipeline_enable.p; 430 431 p->p = req->pipeline_enable.p; 432 p->timer_period = (rte_get_tsc_hz() * 433 req->pipeline_enable.timer_period_ms) / 1000; 434 p->time_next = rte_get_tsc_cycles() + p->timer_period; 435 436 t->n_pipelines++; 437 438 /* Response */ 439 rsp->status = 0; 440 return rsp; 441 } 442 443 static struct thread_msg_rsp * 444 thread_msg_handle_pipeline_disable(struct thread_data *t, 445 struct thread_msg_req *req) 446 { 447 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; 448 uint32_t n_pipelines = t->n_pipelines; 449 struct rte_swx_pipeline *pipeline = req->pipeline_disable.p; 450 uint32_t i; 451 452 /* find pipeline */ 453 for (i = 0; i < n_pipelines; i++) { 454 struct pipeline_data *p = &t->pipeline_data[i]; 455 456 if (p->p != pipeline) 457 continue; 458 459 if (i < n_pipelines - 1) { 460 struct rte_swx_pipeline *pipeline_last = 461 t->p[n_pipelines - 1]; 462 struct pipeline_data *p_last = 463 &t->pipeline_data[n_pipelines - 1]; 464 465 t->p[i] = pipeline_last; 466 memcpy(p, p_last, sizeof(*p)); 467 } 468 469 t->n_pipelines--; 470 471 rsp->status = 0; 472 return rsp; 473 } 474 475 /* should not get here */ 476 rsp->status = 0; 477 return rsp; 478 } 479 480 static void 481 thread_msg_handle(struct thread_data *t) 482 { 483 for ( ; ; ) { 484 struct thread_msg_req *req; 485 struct thread_msg_rsp *rsp; 486 487 req = thread_msg_recv(t->msgq_req); 488 if (req == NULL) 489 break; 490 491 switch (req->type) { 492 case THREAD_REQ_PIPELINE_ENABLE: 493 rsp = thread_msg_handle_pipeline_enable(t, req); 494 break; 495 496 case THREAD_REQ_PIPELINE_DISABLE: 497 rsp = thread_msg_handle_pipeline_disable(t, req); 498 break; 499 500 default: 501 rsp = (struct thread_msg_rsp *) req; 502 rsp->status = -1; 503 } 504 505 thread_msg_send(t->msgq_rsp, rsp); 506 } 507 } 508 509 /** 510 * Data plane threads: main 511 */ 512 int 513 thread_main(void *arg __rte_unused) 514 { 515 struct thread_data *t; 516 uint32_t thread_id, i; 517 518 thread_id = rte_lcore_id(); 519 t = &thread_data[thread_id]; 520 521 /* Dispatch loop */ 522 for (i = 0; ; i++) { 523 uint32_t j; 524 525 /* Data Plane */ 526 for (j = 0; j < t->n_pipelines; j++) 527 rte_swx_pipeline_run(t->p[j], PIPELINE_INSTR_QUANTA); 528 529 /* Control Plane */ 530 if ((i & 0xF) == 0) { 531 uint64_t time = rte_get_tsc_cycles(); 532 uint64_t time_next_min = UINT64_MAX; 533 534 if (time < t->time_next_min) 535 continue; 536 537 /* Thread message queues */ 538 { 539 uint64_t time_next = t->time_next; 540 541 if (time_next <= time) { 542 thread_msg_handle(t); 543 time_next = time + t->timer_period; 544 t->time_next = time_next; 545 } 546 547 if (time_next < time_next_min) 548 time_next_min = time_next; 549 } 550 551 t->time_next_min = time_next_min; 552 } 553 } 554 555 return 0; 556 } 557