1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #include <ctype.h> 6 #include <errno.h> 7 #include <stdlib.h> 8 #ifndef RTE_EXEC_ENV_WINDOWS 9 #include <unistd.h> 10 #include <pthread.h> 11 #include <sys/socket.h> 12 #include <sys/un.h> 13 #include <sys/stat.h> 14 #endif /* !RTE_EXEC_ENV_WINDOWS */ 15 16 /* we won't link against libbsd, so just always use DPDKs-specific strlcpy */ 17 #undef RTE_USE_LIBBSD 18 #include <rte_string_fns.h> 19 #include <rte_common.h> 20 #include <rte_spinlock.h> 21 #include <rte_log.h> 22 23 #include "rte_telemetry.h" 24 #include "telemetry_json.h" 25 #include "telemetry_data.h" 26 #include "telemetry_internal.h" 27 28 #define MAX_CMD_LEN 56 29 #define MAX_OUTPUT_LEN (1024 * 16) 30 #define MAX_CONNECTIONS 10 31 32 #ifndef RTE_EXEC_ENV_WINDOWS 33 static void * 34 client_handler(void *socket); 35 #endif /* !RTE_EXEC_ENV_WINDOWS */ 36 37 struct cmd_callback { 38 char cmd[MAX_CMD_LEN]; 39 telemetry_cb fn; 40 char help[RTE_TEL_MAX_STRING_LEN]; 41 }; 42 43 #ifndef RTE_EXEC_ENV_WINDOWS 44 struct socket { 45 int sock; 46 char path[sizeof(((struct sockaddr_un *)0)->sun_path)]; 47 handler fn; 48 RTE_ATOMIC(uint16_t) *num_clients; 49 }; 50 static struct socket v2_socket; /* socket for v2 telemetry */ 51 static struct socket v1_socket; /* socket for v1 telemetry */ 52 #endif /* !RTE_EXEC_ENV_WINDOWS */ 53 54 static const char *telemetry_version; /* save rte_version */ 55 static const char *socket_dir; /* runtime directory */ 56 static rte_cpuset_t *thread_cpuset; 57 58 RTE_LOG_REGISTER_DEFAULT(logtype, WARNING); 59 #define RTE_LOGTYPE_TELEMETRY logtype 60 #define TMTY_LOG_LINE(l, ...) RTE_LOG_LINE(l, TELEMETRY, "" __VA_ARGS__) 61 62 /* list of command callbacks, with one command registered by default */ 63 static struct cmd_callback *callbacks; 64 static int num_callbacks; /* How many commands are registered */ 65 /* Used when accessing or modifying list of command callbacks */ 66 static rte_spinlock_t callback_sl = RTE_SPINLOCK_INITIALIZER; 67 #ifndef RTE_EXEC_ENV_WINDOWS 68 static RTE_ATOMIC(uint16_t) v2_clients; 69 #endif /* !RTE_EXEC_ENV_WINDOWS */ 70 71 int 72 rte_telemetry_register_cmd(const char *cmd, telemetry_cb fn, const char *help) 73 { 74 struct cmd_callback *new_callbacks; 75 const char *cmdp = cmd; 76 int i = 0; 77 78 if (strlen(cmd) >= MAX_CMD_LEN || fn == NULL || cmd[0] != '/' 79 || strlen(help) >= RTE_TEL_MAX_STRING_LEN) 80 return -EINVAL; 81 82 while (*cmdp != '\0') { 83 if (!isalnum(*cmdp) && *cmdp != '_' && *cmdp != '/') 84 return -EINVAL; 85 cmdp++; 86 } 87 88 rte_spinlock_lock(&callback_sl); 89 new_callbacks = realloc(callbacks, sizeof(callbacks[0]) * (num_callbacks + 1)); 90 if (new_callbacks == NULL) { 91 rte_spinlock_unlock(&callback_sl); 92 return -ENOMEM; 93 } 94 callbacks = new_callbacks; 95 96 while (i < num_callbacks && strcmp(cmd, callbacks[i].cmd) > 0) 97 i++; 98 if (i != num_callbacks) 99 /* Move elements to keep the list alphabetical */ 100 memmove(callbacks + i + 1, callbacks + i, 101 sizeof(struct cmd_callback) * (num_callbacks - i)); 102 103 strlcpy(callbacks[i].cmd, cmd, MAX_CMD_LEN); 104 callbacks[i].fn = fn; 105 strlcpy(callbacks[i].help, help, RTE_TEL_MAX_STRING_LEN); 106 num_callbacks++; 107 rte_spinlock_unlock(&callback_sl); 108 109 return 0; 110 } 111 112 #ifndef RTE_EXEC_ENV_WINDOWS 113 114 static int 115 list_commands(const char *cmd __rte_unused, const char *params __rte_unused, 116 struct rte_tel_data *d) 117 { 118 int i; 119 120 rte_tel_data_start_array(d, RTE_TEL_STRING_VAL); 121 rte_spinlock_lock(&callback_sl); 122 for (i = 0; i < num_callbacks; i++) 123 rte_tel_data_add_array_string(d, callbacks[i].cmd); 124 rte_spinlock_unlock(&callback_sl); 125 return 0; 126 } 127 128 static int 129 json_info(const char *cmd __rte_unused, const char *params __rte_unused, 130 struct rte_tel_data *d) 131 { 132 rte_tel_data_start_dict(d); 133 rte_tel_data_add_dict_string(d, "version", telemetry_version); 134 rte_tel_data_add_dict_int(d, "pid", getpid()); 135 rte_tel_data_add_dict_int(d, "max_output_len", MAX_OUTPUT_LEN); 136 return 0; 137 } 138 139 static int 140 command_help(const char *cmd __rte_unused, const char *params, 141 struct rte_tel_data *d) 142 { 143 int i; 144 /* if no parameters return our own help text */ 145 const char *to_lookup = (params == NULL ? cmd : params); 146 147 rte_tel_data_start_dict(d); 148 rte_spinlock_lock(&callback_sl); 149 for (i = 0; i < num_callbacks; i++) 150 if (strcmp(to_lookup, callbacks[i].cmd) == 0) { 151 if (params == NULL) 152 rte_tel_data_string(d, callbacks[i].help); 153 else 154 rte_tel_data_add_dict_string(d, params, callbacks[i].help); 155 break; 156 } 157 rte_spinlock_unlock(&callback_sl); 158 if (i == num_callbacks) 159 return -1; 160 return 0; 161 } 162 163 static int 164 container_to_json(const struct rte_tel_data *d, char *out_buf, size_t buf_len) 165 { 166 size_t used = 0; 167 unsigned int i; 168 169 if (d->type != TEL_DICT && d->type != TEL_ARRAY_UINT && 170 d->type != TEL_ARRAY_INT && d->type != TEL_ARRAY_STRING) 171 return snprintf(out_buf, buf_len, "null"); 172 173 if (d->type == TEL_DICT) 174 used = rte_tel_json_empty_obj(out_buf, buf_len, 0); 175 else 176 used = rte_tel_json_empty_array(out_buf, buf_len, 0); 177 178 if (d->type == TEL_ARRAY_UINT) 179 for (i = 0; i < d->data_len; i++) 180 used = rte_tel_json_add_array_uint(out_buf, 181 buf_len, used, 182 d->data.array[i].uval); 183 if (d->type == TEL_ARRAY_INT) 184 for (i = 0; i < d->data_len; i++) 185 used = rte_tel_json_add_array_int(out_buf, 186 buf_len, used, 187 d->data.array[i].ival); 188 if (d->type == TEL_ARRAY_STRING) 189 for (i = 0; i < d->data_len; i++) 190 used = rte_tel_json_add_array_string(out_buf, 191 buf_len, used, 192 d->data.array[i].sval); 193 if (d->type == TEL_DICT) 194 for (i = 0; i < d->data_len; i++) { 195 const struct tel_dict_entry *v = &d->data.dict[i]; 196 switch (v->type) { 197 case RTE_TEL_STRING_VAL: 198 used = rte_tel_json_add_obj_str(out_buf, 199 buf_len, used, 200 v->name, v->value.sval); 201 break; 202 case RTE_TEL_INT_VAL: 203 used = rte_tel_json_add_obj_int(out_buf, 204 buf_len, used, 205 v->name, v->value.ival); 206 break; 207 case RTE_TEL_UINT_VAL: 208 used = rte_tel_json_add_obj_uint(out_buf, 209 buf_len, used, 210 v->name, v->value.uval); 211 break; 212 case RTE_TEL_CONTAINER: 213 { 214 char *temp = malloc(buf_len); 215 if (temp == NULL) 216 break; 217 *temp = '\0'; /* ensure valid string */ 218 219 const struct container *cont = 220 &v->value.container; 221 if (container_to_json(cont->data, 222 temp, buf_len) != 0) 223 used = rte_tel_json_add_obj_json( 224 out_buf, 225 buf_len, used, 226 v->name, temp); 227 if (!cont->keep) 228 rte_tel_data_free(cont->data); 229 free(temp); 230 break; 231 } 232 } 233 } 234 235 return used; 236 } 237 238 static void 239 output_json(const char *cmd, const struct rte_tel_data *d, int s) 240 { 241 char out_buf[MAX_OUTPUT_LEN]; 242 243 char *cb_data_buf; 244 size_t buf_len, prefix_used, used = 0; 245 unsigned int i; 246 247 RTE_BUILD_BUG_ON(sizeof(out_buf) < MAX_CMD_LEN + 248 RTE_TEL_MAX_SINGLE_STRING_LEN + 10); 249 250 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 251 MAX_CMD_LEN, cmd); 252 cb_data_buf = &out_buf[prefix_used]; 253 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 254 255 switch (d->type) { 256 case TEL_NULL: 257 used = strlcpy(cb_data_buf, "null", buf_len); 258 break; 259 260 case TEL_STRING: 261 used = rte_tel_json_str(cb_data_buf, buf_len, 0, d->data.str); 262 break; 263 264 case TEL_DICT: 265 used = rte_tel_json_empty_obj(cb_data_buf, buf_len, 0); 266 for (i = 0; i < d->data_len; i++) { 267 const struct tel_dict_entry *v = &d->data.dict[i]; 268 switch (v->type) { 269 case RTE_TEL_STRING_VAL: 270 used = rte_tel_json_add_obj_str(cb_data_buf, 271 buf_len, used, 272 v->name, v->value.sval); 273 break; 274 case RTE_TEL_INT_VAL: 275 used = rte_tel_json_add_obj_int(cb_data_buf, 276 buf_len, used, 277 v->name, v->value.ival); 278 break; 279 case RTE_TEL_UINT_VAL: 280 used = rte_tel_json_add_obj_uint(cb_data_buf, 281 buf_len, used, 282 v->name, v->value.uval); 283 break; 284 case RTE_TEL_CONTAINER: 285 { 286 char *temp = malloc(buf_len); 287 if (temp == NULL) 288 break; 289 *temp = '\0'; /* ensure valid string */ 290 291 const struct container *cont = 292 &v->value.container; 293 if (container_to_json(cont->data, 294 temp, buf_len) != 0) 295 used = rte_tel_json_add_obj_json( 296 cb_data_buf, 297 buf_len, used, 298 v->name, temp); 299 if (!cont->keep) 300 rte_tel_data_free(cont->data); 301 free(temp); 302 } 303 } 304 } 305 break; 306 307 case TEL_ARRAY_STRING: 308 case TEL_ARRAY_INT: 309 case TEL_ARRAY_UINT: 310 case TEL_ARRAY_CONTAINER: 311 used = rte_tel_json_empty_array(cb_data_buf, buf_len, 0); 312 for (i = 0; i < d->data_len; i++) 313 if (d->type == TEL_ARRAY_STRING) 314 used = rte_tel_json_add_array_string( 315 cb_data_buf, 316 buf_len, used, 317 d->data.array[i].sval); 318 else if (d->type == TEL_ARRAY_INT) 319 used = rte_tel_json_add_array_int(cb_data_buf, 320 buf_len, used, 321 d->data.array[i].ival); 322 else if (d->type == TEL_ARRAY_UINT) 323 used = rte_tel_json_add_array_uint(cb_data_buf, 324 buf_len, used, 325 d->data.array[i].uval); 326 else if (d->type == TEL_ARRAY_CONTAINER) { 327 char *temp = malloc(buf_len); 328 if (temp == NULL) 329 break; 330 *temp = '\0'; /* ensure valid string */ 331 332 const struct container *rec_data = 333 &d->data.array[i].container; 334 if (container_to_json(rec_data->data, 335 temp, buf_len) != 0) 336 used = rte_tel_json_add_array_json( 337 cb_data_buf, 338 buf_len, used, temp); 339 if (!rec_data->keep) 340 rte_tel_data_free(rec_data->data); 341 free(temp); 342 } 343 break; 344 } 345 used += prefix_used; 346 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 347 if (write(s, out_buf, used) < 0) 348 perror("Error writing to socket"); 349 } 350 351 static void 352 perform_command(telemetry_cb fn, const char *cmd, const char *param, int s) 353 { 354 struct rte_tel_data data = {0}; 355 356 int ret = fn(cmd, param, &data); 357 if (ret < 0) { 358 char out_buf[MAX_CMD_LEN + 10]; 359 int used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 360 MAX_CMD_LEN, cmd ? cmd : "none"); 361 if (write(s, out_buf, used) < 0) 362 perror("Error writing to socket"); 363 return; 364 } 365 output_json(cmd, &data, s); 366 } 367 368 static int 369 unknown_command(const char *cmd __rte_unused, const char *params __rte_unused, 370 struct rte_tel_data *d) 371 { 372 return d->type = TEL_NULL; 373 } 374 375 static void * 376 client_handler(void *sock_id) 377 { 378 int s = (int)(uintptr_t)sock_id; 379 char buffer[1024]; 380 char info_str[1024]; 381 snprintf(info_str, sizeof(info_str), 382 "{\"version\":\"%s\",\"pid\":%d,\"max_output_len\":%d}", 383 telemetry_version, getpid(), MAX_OUTPUT_LEN); 384 if (write(s, info_str, strlen(info_str)) < 0) { 385 TMTY_LOG_LINE(ERR, "Socket write base info to client failed"); 386 goto exit; 387 } 388 389 /* receive data is not null terminated */ 390 int bytes = read(s, buffer, sizeof(buffer) - 1); 391 while (bytes > 0) { 392 buffer[bytes] = 0; 393 const char *cmd = strtok(buffer, ","); 394 const char *param = strtok(NULL, "\0"); 395 telemetry_cb fn = unknown_command; 396 int i; 397 398 if (cmd && strlen(cmd) < MAX_CMD_LEN) { 399 rte_spinlock_lock(&callback_sl); 400 for (i = 0; i < num_callbacks; i++) 401 if (strcmp(cmd, callbacks[i].cmd) == 0) { 402 fn = callbacks[i].fn; 403 break; 404 } 405 rte_spinlock_unlock(&callback_sl); 406 } 407 perform_command(fn, cmd, param, s); 408 409 bytes = read(s, buffer, sizeof(buffer) - 1); 410 } 411 exit: 412 close(s); 413 rte_atomic_fetch_sub_explicit(&v2_clients, 1, rte_memory_order_relaxed); 414 return NULL; 415 } 416 417 static void * 418 socket_listener(void *socket) 419 { 420 while (1) { 421 pthread_t th; 422 int rc; 423 struct socket *s = (struct socket *)socket; 424 int s_accepted = accept(s->sock, NULL, NULL); 425 if (s_accepted < 0) { 426 TMTY_LOG_LINE(ERR, "Error with accept, telemetry thread quitting"); 427 return NULL; 428 } 429 if (s->num_clients != NULL) { 430 uint16_t conns = rte_atomic_load_explicit(s->num_clients, 431 rte_memory_order_relaxed); 432 if (conns >= MAX_CONNECTIONS) { 433 close(s_accepted); 434 continue; 435 } 436 rte_atomic_fetch_add_explicit(s->num_clients, 1, 437 rte_memory_order_relaxed); 438 } 439 rc = pthread_create(&th, NULL, s->fn, 440 (void *)(uintptr_t)s_accepted); 441 if (rc != 0) { 442 TMTY_LOG_LINE(ERR, "Error with create client thread: %s", 443 strerror(rc)); 444 close(s_accepted); 445 if (s->num_clients != NULL) 446 rte_atomic_fetch_sub_explicit(s->num_clients, 1, 447 rte_memory_order_relaxed); 448 continue; 449 } 450 pthread_detach(th); 451 } 452 return NULL; 453 } 454 455 static inline char * 456 get_socket_path(const char *runtime_dir, const int version) 457 { 458 static char path[PATH_MAX]; 459 snprintf(path, sizeof(path), "%s/dpdk_telemetry.v%d", 460 strlen(runtime_dir) ? runtime_dir : "/tmp", version); 461 return path; 462 } 463 464 static void 465 unlink_sockets(void) 466 { 467 if (v2_socket.path[0]) 468 unlink(v2_socket.path); 469 if (v1_socket.path[0]) 470 unlink(v1_socket.path); 471 } 472 473 static int 474 create_socket(char *path) 475 { 476 int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); 477 if (sock < 0) { 478 TMTY_LOG_LINE(ERR, "Error with socket creation, %s", strerror(errno)); 479 return -1; 480 } 481 482 struct sockaddr_un sun = {.sun_family = AF_UNIX}; 483 strlcpy(sun.sun_path, path, sizeof(sun.sun_path)); 484 TMTY_LOG_LINE(DEBUG, "Attempting socket bind to path '%s'", path); 485 486 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 487 struct stat st; 488 489 TMTY_LOG_LINE(DEBUG, "Initial bind to socket '%s' failed.", path); 490 491 /* first check if we have a runtime dir */ 492 if (stat(socket_dir, &st) < 0 || !S_ISDIR(st.st_mode)) { 493 TMTY_LOG_LINE(ERR, "Cannot access DPDK runtime directory: %s", socket_dir); 494 close(sock); 495 return -ENOENT; 496 } 497 498 /* check if current socket is active */ 499 if (connect(sock, (void *)&sun, sizeof(sun)) == 0) { 500 close(sock); 501 return -EADDRINUSE; 502 } 503 504 /* socket is not active, delete and attempt rebind */ 505 TMTY_LOG_LINE(DEBUG, "Attempting unlink and retrying bind"); 506 unlink(sun.sun_path); 507 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 508 TMTY_LOG_LINE(ERR, "Error binding socket: %s", strerror(errno)); 509 close(sock); 510 return -errno; /* if unlink failed, this will be -EADDRINUSE as above */ 511 } 512 } 513 514 if (listen(sock, 1) < 0) { 515 TMTY_LOG_LINE(ERR, "Error calling listen for socket: %s", strerror(errno)); 516 unlink(sun.sun_path); 517 close(sock); 518 return -errno; 519 } 520 TMTY_LOG_LINE(DEBUG, "Socket creation and binding ok"); 521 522 return sock; 523 } 524 525 static void 526 set_thread_name(pthread_t id __rte_unused, const char *name __rte_unused) 527 { 528 #if defined RTE_EXEC_ENV_LINUX && defined __GLIBC__ && defined __GLIBC_PREREQ 529 #if __GLIBC_PREREQ(2, 12) 530 pthread_setname_np(id, name); 531 #endif 532 #elif defined RTE_EXEC_ENV_FREEBSD 533 pthread_set_name_np(id, name); 534 #endif 535 } 536 537 static int 538 telemetry_legacy_init(void) 539 { 540 pthread_t t_old; 541 int rc; 542 543 if (num_legacy_callbacks == 1) { 544 TMTY_LOG_LINE(WARNING, "No legacy callbacks, legacy socket not created"); 545 return -1; 546 } 547 548 v1_socket.fn = legacy_client_handler; 549 if ((size_t) snprintf(v1_socket.path, sizeof(v1_socket.path), 550 "%s/telemetry", socket_dir) >= sizeof(v1_socket.path)) { 551 TMTY_LOG_LINE(ERR, "Error with socket binding, path too long"); 552 return -1; 553 } 554 v1_socket.sock = create_socket(v1_socket.path); 555 if (v1_socket.sock < 0) { 556 v1_socket.path[0] = '\0'; 557 return -1; 558 } 559 rc = pthread_create(&t_old, NULL, socket_listener, &v1_socket); 560 if (rc != 0) { 561 TMTY_LOG_LINE(ERR, "Error with create legacy socket thread: %s", 562 strerror(rc)); 563 close(v1_socket.sock); 564 v1_socket.sock = -1; 565 unlink(v1_socket.path); 566 v1_socket.path[0] = '\0'; 567 return -1; 568 } 569 pthread_setaffinity_np(t_old, sizeof(*thread_cpuset), thread_cpuset); 570 set_thread_name(t_old, "dpdk-telemet-v1"); 571 TMTY_LOG_LINE(DEBUG, "Legacy telemetry socket initialized ok"); 572 pthread_detach(t_old); 573 return 0; 574 } 575 576 static int 577 telemetry_v2_init(void) 578 { 579 char spath[sizeof(v2_socket.path)]; 580 pthread_t t_new; 581 short suffix = 0; 582 int rc; 583 584 v2_socket.num_clients = &v2_clients; 585 rte_telemetry_register_cmd("/", list_commands, 586 "Returns list of available commands, Takes no parameters"); 587 rte_telemetry_register_cmd("/info", json_info, 588 "Returns DPDK Telemetry information. Takes no parameters"); 589 rte_telemetry_register_cmd("/help", command_help, 590 "Returns help text for a command. Parameters: string command"); 591 v2_socket.fn = client_handler; 592 if (strlcpy(spath, get_socket_path(socket_dir, 2), sizeof(spath)) >= sizeof(spath)) { 593 TMTY_LOG_LINE(ERR, "Error with socket binding, path too long"); 594 return -1; 595 } 596 memcpy(v2_socket.path, spath, sizeof(v2_socket.path)); 597 598 v2_socket.sock = create_socket(v2_socket.path); 599 while (v2_socket.sock < 0) { 600 /* bail out on unexpected error, or suffix wrap-around */ 601 if (v2_socket.sock != -EADDRINUSE || suffix < 0) { 602 v2_socket.path[0] = '\0'; /* clear socket path */ 603 return -1; 604 } 605 /* add a suffix to the path if the basic version fails */ 606 if (snprintf(v2_socket.path, sizeof(v2_socket.path), "%s:%d", 607 spath, ++suffix) >= (int)sizeof(v2_socket.path)) { 608 TMTY_LOG_LINE(ERR, "Error with socket binding, path too long"); 609 return -1; 610 } 611 v2_socket.sock = create_socket(v2_socket.path); 612 } 613 rc = pthread_create(&t_new, NULL, socket_listener, &v2_socket); 614 if (rc != 0) { 615 TMTY_LOG_LINE(ERR, "Error with create socket thread: %s", 616 strerror(rc)); 617 close(v2_socket.sock); 618 v2_socket.sock = -1; 619 unlink(v2_socket.path); 620 v2_socket.path[0] = '\0'; 621 return -1; 622 } 623 pthread_setaffinity_np(t_new, sizeof(*thread_cpuset), thread_cpuset); 624 set_thread_name(t_new, "dpdk-telemet-v2"); 625 pthread_detach(t_new); 626 atexit(unlink_sockets); 627 628 return 0; 629 } 630 631 #endif /* !RTE_EXEC_ENV_WINDOWS */ 632 633 int32_t 634 rte_telemetry_init(const char *runtime_dir, const char *rte_version, rte_cpuset_t *cpuset) 635 { 636 telemetry_version = rte_version; 637 socket_dir = runtime_dir; 638 thread_cpuset = cpuset; 639 640 #ifndef RTE_EXEC_ENV_WINDOWS 641 if (telemetry_v2_init() != 0) 642 return -1; 643 TMTY_LOG_LINE(DEBUG, "Telemetry initialized ok"); 644 telemetry_legacy_init(); 645 #endif /* RTE_EXEC_ENV_WINDOWS */ 646 647 return 0; 648 } 649