1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #include <stdlib.h> 6 7 #include <rte_common.h> 8 #include <rte_cycles.h> 9 #include <rte_lcore.h> 10 #include <rte_ring.h> 11 12 #include <rte_table_acl.h> 13 #include <rte_table_array.h> 14 #include <rte_table_hash.h> 15 #include <rte_table_lpm.h> 16 #include <rte_table_lpm_ipv6.h> 17 18 #include "obj.h" 19 #include "thread.h" 20 21 #ifndef THREAD_PIPELINES_MAX 22 #define THREAD_PIPELINES_MAX 256 23 #endif 24 25 #ifndef THREAD_MSGQ_SIZE 26 #define THREAD_MSGQ_SIZE 64 27 #endif 28 29 #ifndef THREAD_TIMER_PERIOD_MS 30 #define THREAD_TIMER_PERIOD_MS 100 31 #endif 32 33 /** 34 * Control thread: data plane thread context 35 */ 36 struct thread { 37 struct rte_ring *msgq_req; 38 struct rte_ring *msgq_rsp; 39 40 uint32_t enabled; 41 }; 42 43 static struct thread thread[RTE_MAX_LCORE]; 44 45 /** 46 * Data plane threads: context 47 */ 48 struct pipeline_data { 49 struct rte_swx_pipeline *p; 50 uint64_t timer_period; /* Measured in CPU cycles. */ 51 uint64_t time_next; 52 }; 53 54 struct thread_data { 55 struct rte_swx_pipeline *p[THREAD_PIPELINES_MAX]; 56 uint32_t n_pipelines; 57 58 struct pipeline_data pipeline_data[THREAD_PIPELINES_MAX]; 59 struct rte_ring *msgq_req; 60 struct rte_ring *msgq_rsp; 61 uint64_t timer_period; /* Measured in CPU cycles. */ 62 uint64_t time_next; 63 uint64_t time_next_min; 64 } __rte_cache_aligned; 65 66 static struct thread_data thread_data[RTE_MAX_LCORE]; 67 68 /** 69 * Control thread: data plane thread init 70 */ 71 static void 72 thread_free(void) 73 { 74 uint32_t i; 75 76 for (i = 0; i < RTE_MAX_LCORE; i++) { 77 struct thread *t = &thread[i]; 78 79 if (!rte_lcore_is_enabled(i)) 80 continue; 81 82 /* MSGQs */ 83 if (t->msgq_req) 84 rte_ring_free(t->msgq_req); 85 86 if (t->msgq_rsp) 87 rte_ring_free(t->msgq_rsp); 88 } 89 } 90 91 int 92 thread_init(void) 93 { 94 uint32_t i; 95 96 RTE_LCORE_FOREACH_SLAVE(i) { 97 char name[NAME_MAX]; 98 struct rte_ring *msgq_req, *msgq_rsp; 99 struct thread *t = &thread[i]; 100 struct thread_data *t_data = &thread_data[i]; 101 uint32_t cpu_id = rte_lcore_to_socket_id(i); 102 103 /* MSGQs */ 104 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-REQ", i); 105 106 msgq_req = rte_ring_create(name, 107 THREAD_MSGQ_SIZE, 108 cpu_id, 109 RING_F_SP_ENQ | RING_F_SC_DEQ); 110 111 if (msgq_req == NULL) { 112 thread_free(); 113 return -1; 114 } 115 116 snprintf(name, sizeof(name), "THREAD-%04x-MSGQ-RSP", i); 117 118 msgq_rsp = rte_ring_create(name, 119 THREAD_MSGQ_SIZE, 120 cpu_id, 121 RING_F_SP_ENQ | RING_F_SC_DEQ); 122 123 if (msgq_rsp == NULL) { 124 thread_free(); 125 return -1; 126 } 127 128 /* Control thread records */ 129 t->msgq_req = msgq_req; 130 t->msgq_rsp = msgq_rsp; 131 t->enabled = 1; 132 133 /* Data plane thread records */ 134 t_data->n_pipelines = 0; 135 t_data->msgq_req = msgq_req; 136 t_data->msgq_rsp = msgq_rsp; 137 t_data->timer_period = 138 (rte_get_tsc_hz() * THREAD_TIMER_PERIOD_MS) / 1000; 139 t_data->time_next = rte_get_tsc_cycles() + t_data->timer_period; 140 t_data->time_next_min = t_data->time_next; 141 } 142 143 return 0; 144 } 145 146 static inline int 147 thread_is_running(uint32_t thread_id) 148 { 149 enum rte_lcore_state_t thread_state; 150 151 thread_state = rte_eal_get_lcore_state(thread_id); 152 return (thread_state == RUNNING) ? 1 : 0; 153 } 154 155 /** 156 * Control thread & data plane threads: message passing 157 */ 158 enum thread_req_type { 159 THREAD_REQ_PIPELINE_ENABLE = 0, 160 THREAD_REQ_PIPELINE_DISABLE, 161 THREAD_REQ_MAX 162 }; 163 164 struct thread_msg_req { 165 enum thread_req_type type; 166 167 union { 168 struct { 169 struct rte_swx_pipeline *p; 170 uint32_t timer_period_ms; 171 } pipeline_enable; 172 173 struct { 174 struct rte_swx_pipeline *p; 175 } pipeline_disable; 176 }; 177 }; 178 179 struct thread_msg_rsp { 180 int status; 181 }; 182 183 /** 184 * Control thread 185 */ 186 static struct thread_msg_req * 187 thread_msg_alloc(void) 188 { 189 size_t size = RTE_MAX(sizeof(struct thread_msg_req), 190 sizeof(struct thread_msg_rsp)); 191 192 return calloc(1, size); 193 } 194 195 static void 196 thread_msg_free(struct thread_msg_rsp *rsp) 197 { 198 free(rsp); 199 } 200 201 static struct thread_msg_rsp * 202 thread_msg_send_recv(uint32_t thread_id, 203 struct thread_msg_req *req) 204 { 205 struct thread *t = &thread[thread_id]; 206 struct rte_ring *msgq_req = t->msgq_req; 207 struct rte_ring *msgq_rsp = t->msgq_rsp; 208 struct thread_msg_rsp *rsp; 209 int status; 210 211 /* send */ 212 do { 213 status = rte_ring_sp_enqueue(msgq_req, req); 214 } while (status == -ENOBUFS); 215 216 /* recv */ 217 do { 218 status = rte_ring_sc_dequeue(msgq_rsp, (void **) &rsp); 219 } while (status != 0); 220 221 return rsp; 222 } 223 224 int 225 thread_pipeline_enable(uint32_t thread_id, 226 struct obj *obj, 227 const char *pipeline_name) 228 { 229 struct pipeline *p = pipeline_find(obj, pipeline_name); 230 struct thread *t; 231 struct thread_msg_req *req; 232 struct thread_msg_rsp *rsp; 233 int status; 234 235 /* Check input params */ 236 if ((thread_id >= RTE_MAX_LCORE) || 237 (p == NULL)) 238 return -1; 239 240 t = &thread[thread_id]; 241 if (t->enabled == 0) 242 return -1; 243 244 if (!thread_is_running(thread_id)) { 245 struct thread_data *td = &thread_data[thread_id]; 246 struct pipeline_data *tdp = &td->pipeline_data[td->n_pipelines]; 247 248 if (td->n_pipelines >= THREAD_PIPELINES_MAX) 249 return -1; 250 251 /* Data plane thread */ 252 td->p[td->n_pipelines] = p->p; 253 254 tdp->p = p->p; 255 tdp->timer_period = 256 (rte_get_tsc_hz() * p->timer_period_ms) / 1000; 257 tdp->time_next = rte_get_tsc_cycles() + tdp->timer_period; 258 259 td->n_pipelines++; 260 261 /* Pipeline */ 262 p->thread_id = thread_id; 263 p->enabled = 1; 264 265 return 0; 266 } 267 268 /* Allocate request */ 269 req = thread_msg_alloc(); 270 if (req == NULL) 271 return -1; 272 273 /* Write request */ 274 req->type = THREAD_REQ_PIPELINE_ENABLE; 275 req->pipeline_enable.p = p->p; 276 req->pipeline_enable.timer_period_ms = p->timer_period_ms; 277 278 /* Send request and wait for response */ 279 rsp = thread_msg_send_recv(thread_id, req); 280 281 /* Read response */ 282 status = rsp->status; 283 284 /* Free response */ 285 thread_msg_free(rsp); 286 287 /* Request completion */ 288 if (status) 289 return status; 290 291 p->thread_id = thread_id; 292 p->enabled = 1; 293 294 return 0; 295 } 296 297 int 298 thread_pipeline_disable(uint32_t thread_id, 299 struct obj *obj, 300 const char *pipeline_name) 301 { 302 struct pipeline *p = pipeline_find(obj, pipeline_name); 303 struct thread *t; 304 struct thread_msg_req *req; 305 struct thread_msg_rsp *rsp; 306 int status; 307 308 /* Check input params */ 309 if ((thread_id >= RTE_MAX_LCORE) || 310 (p == NULL)) 311 return -1; 312 313 t = &thread[thread_id]; 314 if (t->enabled == 0) 315 return -1; 316 317 if (p->enabled == 0) 318 return 0; 319 320 if (p->thread_id != thread_id) 321 return -1; 322 323 if (!thread_is_running(thread_id)) { 324 struct thread_data *td = &thread_data[thread_id]; 325 uint32_t i; 326 327 for (i = 0; i < td->n_pipelines; i++) { 328 struct pipeline_data *tdp = &td->pipeline_data[i]; 329 330 if (tdp->p != p->p) 331 continue; 332 333 /* Data plane thread */ 334 if (i < td->n_pipelines - 1) { 335 struct rte_swx_pipeline *pipeline_last = 336 td->p[td->n_pipelines - 1]; 337 struct pipeline_data *tdp_last = 338 &td->pipeline_data[td->n_pipelines - 1]; 339 340 td->p[i] = pipeline_last; 341 memcpy(tdp, tdp_last, sizeof(*tdp)); 342 } 343 344 td->n_pipelines--; 345 346 /* Pipeline */ 347 p->enabled = 0; 348 349 break; 350 } 351 352 return 0; 353 } 354 355 /* Allocate request */ 356 req = thread_msg_alloc(); 357 if (req == NULL) 358 return -1; 359 360 /* Write request */ 361 req->type = THREAD_REQ_PIPELINE_DISABLE; 362 req->pipeline_disable.p = p->p; 363 364 /* Send request and wait for response */ 365 rsp = thread_msg_send_recv(thread_id, req); 366 367 /* Read response */ 368 status = rsp->status; 369 370 /* Free response */ 371 thread_msg_free(rsp); 372 373 /* Request completion */ 374 if (status) 375 return status; 376 377 p->enabled = 0; 378 379 return 0; 380 } 381 382 /** 383 * Data plane threads: message handling 384 */ 385 static inline struct thread_msg_req * 386 thread_msg_recv(struct rte_ring *msgq_req) 387 { 388 struct thread_msg_req *req; 389 390 int status = rte_ring_sc_dequeue(msgq_req, (void **) &req); 391 392 if (status != 0) 393 return NULL; 394 395 return req; 396 } 397 398 static inline void 399 thread_msg_send(struct rte_ring *msgq_rsp, 400 struct thread_msg_rsp *rsp) 401 { 402 int status; 403 404 do { 405 status = rte_ring_sp_enqueue(msgq_rsp, rsp); 406 } while (status == -ENOBUFS); 407 } 408 409 static struct thread_msg_rsp * 410 thread_msg_handle_pipeline_enable(struct thread_data *t, 411 struct thread_msg_req *req) 412 { 413 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; 414 struct pipeline_data *p = &t->pipeline_data[t->n_pipelines]; 415 416 /* Request */ 417 if (t->n_pipelines >= THREAD_PIPELINES_MAX) { 418 rsp->status = -1; 419 return rsp; 420 } 421 422 t->p[t->n_pipelines] = req->pipeline_enable.p; 423 424 p->p = req->pipeline_enable.p; 425 p->timer_period = (rte_get_tsc_hz() * 426 req->pipeline_enable.timer_period_ms) / 1000; 427 p->time_next = rte_get_tsc_cycles() + p->timer_period; 428 429 t->n_pipelines++; 430 431 /* Response */ 432 rsp->status = 0; 433 return rsp; 434 } 435 436 static struct thread_msg_rsp * 437 thread_msg_handle_pipeline_disable(struct thread_data *t, 438 struct thread_msg_req *req) 439 { 440 struct thread_msg_rsp *rsp = (struct thread_msg_rsp *) req; 441 uint32_t n_pipelines = t->n_pipelines; 442 struct rte_swx_pipeline *pipeline = req->pipeline_disable.p; 443 uint32_t i; 444 445 /* find pipeline */ 446 for (i = 0; i < n_pipelines; i++) { 447 struct pipeline_data *p = &t->pipeline_data[i]; 448 449 if (p->p != pipeline) 450 continue; 451 452 if (i < n_pipelines - 1) { 453 struct rte_swx_pipeline *pipeline_last = 454 t->p[n_pipelines - 1]; 455 struct pipeline_data *p_last = 456 &t->pipeline_data[n_pipelines - 1]; 457 458 t->p[i] = pipeline_last; 459 memcpy(p, p_last, sizeof(*p)); 460 } 461 462 t->n_pipelines--; 463 464 rsp->status = 0; 465 return rsp; 466 } 467 468 /* should not get here */ 469 rsp->status = 0; 470 return rsp; 471 } 472 473 static void 474 thread_msg_handle(struct thread_data *t) 475 { 476 for ( ; ; ) { 477 struct thread_msg_req *req; 478 struct thread_msg_rsp *rsp; 479 480 req = thread_msg_recv(t->msgq_req); 481 if (req == NULL) 482 break; 483 484 switch (req->type) { 485 case THREAD_REQ_PIPELINE_ENABLE: 486 rsp = thread_msg_handle_pipeline_enable(t, req); 487 break; 488 489 case THREAD_REQ_PIPELINE_DISABLE: 490 rsp = thread_msg_handle_pipeline_disable(t, req); 491 break; 492 493 default: 494 rsp = (struct thread_msg_rsp *) req; 495 rsp->status = -1; 496 } 497 498 thread_msg_send(t->msgq_rsp, rsp); 499 } 500 } 501 502 /** 503 * Data plane threads: main 504 */ 505 int 506 thread_main(void *arg __rte_unused) 507 { 508 struct thread_data *t; 509 uint32_t thread_id, i; 510 511 thread_id = rte_lcore_id(); 512 t = &thread_data[thread_id]; 513 514 /* Dispatch loop */ 515 for (i = 0; ; i++) { 516 uint32_t j; 517 518 /* Data Plane */ 519 for (j = 0; j < t->n_pipelines; j++) 520 rte_swx_pipeline_run(t->p[j], 1000000); 521 522 /* Control Plane */ 523 if ((i & 0xF) == 0) { 524 uint64_t time = rte_get_tsc_cycles(); 525 uint64_t time_next_min = UINT64_MAX; 526 527 if (time < t->time_next_min) 528 continue; 529 530 /* Thread message queues */ 531 { 532 uint64_t time_next = t->time_next; 533 534 if (time_next <= time) { 535 thread_msg_handle(t); 536 time_next = time + t->timer_period; 537 t->time_next = time_next; 538 } 539 540 if (time_next < time_next_min) 541 time_next_min = time_next; 542 } 543 544 t->time_next_min = time_next_min; 545 } 546 } 547 548 return 0; 549 } 550