1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #include <ctype.h> 6 #include <errno.h> 7 #include <stdlib.h> 8 #ifndef RTE_EXEC_ENV_WINDOWS 9 #include <unistd.h> 10 #include <pthread.h> 11 #include <sys/socket.h> 12 #include <sys/un.h> 13 #include <sys/stat.h> 14 #endif /* !RTE_EXEC_ENV_WINDOWS */ 15 16 /* we won't link against libbsd, so just always use DPDKs-specific strlcpy */ 17 #undef RTE_USE_LIBBSD 18 #include <rte_string_fns.h> 19 #include <rte_common.h> 20 #include <rte_spinlock.h> 21 #include <rte_log.h> 22 23 #include "rte_telemetry.h" 24 #include "telemetry_json.h" 25 #include "telemetry_data.h" 26 #include "telemetry_internal.h" 27 28 #define MAX_CMD_LEN 56 29 #define MAX_OUTPUT_LEN (1024 * 16) 30 #define MAX_CONNECTIONS 10 31 32 #ifndef RTE_EXEC_ENV_WINDOWS 33 static void * 34 client_handler(void *socket); 35 #endif /* !RTE_EXEC_ENV_WINDOWS */ 36 37 struct cmd_callback { 38 char cmd[MAX_CMD_LEN]; 39 telemetry_cb fn; 40 char help[RTE_TEL_MAX_STRING_LEN]; 41 }; 42 43 #ifndef RTE_EXEC_ENV_WINDOWS 44 struct socket { 45 int sock; 46 char path[sizeof(((struct sockaddr_un *)0)->sun_path)]; 47 handler fn; 48 uint16_t *num_clients; 49 }; 50 static struct socket v2_socket; /* socket for v2 telemetry */ 51 static struct socket v1_socket; /* socket for v1 telemetry */ 52 #endif /* !RTE_EXEC_ENV_WINDOWS */ 53 54 static const char *telemetry_version; /* save rte_version */ 55 static const char *socket_dir; /* runtime directory */ 56 static rte_cpuset_t *thread_cpuset; 57 static rte_log_fn rte_log_ptr; 58 static uint32_t logtype; 59 60 #define TMTY_LOG(l, ...) \ 61 rte_log_ptr(RTE_LOG_ ## l, logtype, "TELEMETRY: " __VA_ARGS__) 62 63 /* list of command callbacks, with one command registered by default */ 64 static struct cmd_callback *callbacks; 65 static int num_callbacks; /* How many commands are registered */ 66 /* Used when accessing or modifying list of command callbacks */ 67 static rte_spinlock_t callback_sl = RTE_SPINLOCK_INITIALIZER; 68 #ifndef RTE_EXEC_ENV_WINDOWS 69 static uint16_t v2_clients; 70 #endif /* !RTE_EXEC_ENV_WINDOWS */ 71 72 int 73 rte_telemetry_register_cmd(const char *cmd, telemetry_cb fn, const char *help) 74 { 75 struct cmd_callback *new_callbacks; 76 const char *cmdp = cmd; 77 int i = 0; 78 79 if (strlen(cmd) >= MAX_CMD_LEN || fn == NULL || cmd[0] != '/' 80 || strlen(help) >= RTE_TEL_MAX_STRING_LEN) 81 return -EINVAL; 82 83 while (*cmdp != '\0') { 84 if (!isalnum(*cmdp) && *cmdp != '_' && *cmdp != '/') 85 return -EINVAL; 86 cmdp++; 87 } 88 89 rte_spinlock_lock(&callback_sl); 90 new_callbacks = realloc(callbacks, sizeof(callbacks[0]) * (num_callbacks + 1)); 91 if (new_callbacks == NULL) { 92 rte_spinlock_unlock(&callback_sl); 93 return -ENOMEM; 94 } 95 callbacks = new_callbacks; 96 97 while (i < num_callbacks && strcmp(cmd, callbacks[i].cmd) > 0) 98 i++; 99 if (i != num_callbacks) 100 /* Move elements to keep the list alphabetical */ 101 memmove(callbacks + i + 1, callbacks + i, 102 sizeof(struct cmd_callback) * (num_callbacks - i)); 103 104 strlcpy(callbacks[i].cmd, cmd, MAX_CMD_LEN); 105 callbacks[i].fn = fn; 106 strlcpy(callbacks[i].help, help, RTE_TEL_MAX_STRING_LEN); 107 num_callbacks++; 108 rte_spinlock_unlock(&callback_sl); 109 110 return 0; 111 } 112 113 #ifndef RTE_EXEC_ENV_WINDOWS 114 115 static int 116 list_commands(const char *cmd __rte_unused, const char *params __rte_unused, 117 struct rte_tel_data *d) 118 { 119 int i; 120 121 rte_tel_data_start_array(d, RTE_TEL_STRING_VAL); 122 rte_spinlock_lock(&callback_sl); 123 for (i = 0; i < num_callbacks; i++) 124 rte_tel_data_add_array_string(d, callbacks[i].cmd); 125 rte_spinlock_unlock(&callback_sl); 126 return 0; 127 } 128 129 static int 130 json_info(const char *cmd __rte_unused, const char *params __rte_unused, 131 struct rte_tel_data *d) 132 { 133 rte_tel_data_start_dict(d); 134 rte_tel_data_add_dict_string(d, "version", telemetry_version); 135 rte_tel_data_add_dict_int(d, "pid", getpid()); 136 rte_tel_data_add_dict_int(d, "max_output_len", MAX_OUTPUT_LEN); 137 return 0; 138 } 139 140 static int 141 command_help(const char *cmd __rte_unused, const char *params, 142 struct rte_tel_data *d) 143 { 144 int i; 145 /* if no parameters return our own help text */ 146 const char *to_lookup = (params == NULL ? cmd : params); 147 148 rte_tel_data_start_dict(d); 149 rte_spinlock_lock(&callback_sl); 150 for (i = 0; i < num_callbacks; i++) 151 if (strcmp(to_lookup, callbacks[i].cmd) == 0) { 152 if (params == NULL) 153 rte_tel_data_string(d, callbacks[i].help); 154 else 155 rte_tel_data_add_dict_string(d, params, callbacks[i].help); 156 break; 157 } 158 rte_spinlock_unlock(&callback_sl); 159 if (i == num_callbacks) 160 return -1; 161 return 0; 162 } 163 164 static int 165 container_to_json(const struct rte_tel_data *d, char *out_buf, size_t buf_len) 166 { 167 size_t used = 0; 168 unsigned int i; 169 170 if (d->type != TEL_DICT && d->type != TEL_ARRAY_UINT && 171 d->type != TEL_ARRAY_INT && d->type != TEL_ARRAY_STRING) 172 return snprintf(out_buf, buf_len, "null"); 173 174 used = rte_tel_json_empty_array(out_buf, buf_len, 0); 175 if (d->type == TEL_ARRAY_UINT) 176 for (i = 0; i < d->data_len; i++) 177 used = rte_tel_json_add_array_uint(out_buf, 178 buf_len, used, 179 d->data.array[i].uval); 180 if (d->type == TEL_ARRAY_INT) 181 for (i = 0; i < d->data_len; i++) 182 used = rte_tel_json_add_array_int(out_buf, 183 buf_len, used, 184 d->data.array[i].ival); 185 if (d->type == TEL_ARRAY_STRING) 186 for (i = 0; i < d->data_len; i++) 187 used = rte_tel_json_add_array_string(out_buf, 188 buf_len, used, 189 d->data.array[i].sval); 190 if (d->type == TEL_DICT) 191 for (i = 0; i < d->data_len; i++) { 192 const struct tel_dict_entry *v = &d->data.dict[i]; 193 switch (v->type) { 194 case RTE_TEL_STRING_VAL: 195 used = rte_tel_json_add_obj_str(out_buf, 196 buf_len, used, 197 v->name, v->value.sval); 198 break; 199 case RTE_TEL_INT_VAL: 200 used = rte_tel_json_add_obj_int(out_buf, 201 buf_len, used, 202 v->name, v->value.ival); 203 break; 204 case RTE_TEL_UINT_VAL: 205 used = rte_tel_json_add_obj_uint(out_buf, 206 buf_len, used, 207 v->name, v->value.uval); 208 break; 209 case RTE_TEL_CONTAINER: 210 { 211 char temp[buf_len]; 212 const struct container *cont = 213 &v->value.container; 214 if (container_to_json(cont->data, 215 temp, buf_len) != 0) 216 used = rte_tel_json_add_obj_json( 217 out_buf, 218 buf_len, used, 219 v->name, temp); 220 if (!cont->keep) 221 rte_tel_data_free(cont->data); 222 break; 223 } 224 } 225 } 226 227 return used; 228 } 229 230 static void 231 output_json(const char *cmd, const struct rte_tel_data *d, int s) 232 { 233 char out_buf[MAX_OUTPUT_LEN]; 234 235 char *cb_data_buf; 236 size_t buf_len, prefix_used, used = 0; 237 unsigned int i; 238 239 RTE_BUILD_BUG_ON(sizeof(out_buf) < MAX_CMD_LEN + 240 RTE_TEL_MAX_SINGLE_STRING_LEN + 10); 241 242 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 243 MAX_CMD_LEN, cmd); 244 cb_data_buf = &out_buf[prefix_used]; 245 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 246 247 switch (d->type) { 248 case TEL_NULL: 249 used = strlcpy(cb_data_buf, "null", buf_len); 250 break; 251 252 case TEL_STRING: 253 used = rte_tel_json_str(cb_data_buf, buf_len, 0, d->data.str); 254 break; 255 256 case TEL_DICT: 257 used = rte_tel_json_empty_obj(cb_data_buf, buf_len, 0); 258 for (i = 0; i < d->data_len; i++) { 259 const struct tel_dict_entry *v = &d->data.dict[i]; 260 switch (v->type) { 261 case RTE_TEL_STRING_VAL: 262 used = rte_tel_json_add_obj_str(cb_data_buf, 263 buf_len, used, 264 v->name, v->value.sval); 265 break; 266 case RTE_TEL_INT_VAL: 267 used = rte_tel_json_add_obj_int(cb_data_buf, 268 buf_len, used, 269 v->name, v->value.ival); 270 break; 271 case RTE_TEL_UINT_VAL: 272 used = rte_tel_json_add_obj_uint(cb_data_buf, 273 buf_len, used, 274 v->name, v->value.uval); 275 break; 276 case RTE_TEL_CONTAINER: 277 { 278 char temp[buf_len]; 279 const struct container *cont = 280 &v->value.container; 281 if (container_to_json(cont->data, 282 temp, buf_len) != 0) 283 used = rte_tel_json_add_obj_json( 284 cb_data_buf, 285 buf_len, used, 286 v->name, temp); 287 if (!cont->keep) 288 rte_tel_data_free(cont->data); 289 } 290 } 291 } 292 break; 293 294 case TEL_ARRAY_STRING: 295 case TEL_ARRAY_INT: 296 case TEL_ARRAY_UINT: 297 case TEL_ARRAY_CONTAINER: 298 used = rte_tel_json_empty_array(cb_data_buf, buf_len, 0); 299 for (i = 0; i < d->data_len; i++) 300 if (d->type == TEL_ARRAY_STRING) 301 used = rte_tel_json_add_array_string( 302 cb_data_buf, 303 buf_len, used, 304 d->data.array[i].sval); 305 else if (d->type == TEL_ARRAY_INT) 306 used = rte_tel_json_add_array_int(cb_data_buf, 307 buf_len, used, 308 d->data.array[i].ival); 309 else if (d->type == TEL_ARRAY_UINT) 310 used = rte_tel_json_add_array_uint(cb_data_buf, 311 buf_len, used, 312 d->data.array[i].uval); 313 else if (d->type == TEL_ARRAY_CONTAINER) { 314 char temp[buf_len]; 315 const struct container *rec_data = 316 &d->data.array[i].container; 317 if (container_to_json(rec_data->data, 318 temp, buf_len) != 0) 319 used = rte_tel_json_add_array_json( 320 cb_data_buf, 321 buf_len, used, temp); 322 if (!rec_data->keep) 323 rte_tel_data_free(rec_data->data); 324 } 325 break; 326 } 327 used += prefix_used; 328 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 329 if (write(s, out_buf, used) < 0) 330 perror("Error writing to socket"); 331 } 332 333 static void 334 perform_command(telemetry_cb fn, const char *cmd, const char *param, int s) 335 { 336 struct rte_tel_data data = {0}; 337 338 int ret = fn(cmd, param, &data); 339 if (ret < 0) { 340 char out_buf[MAX_CMD_LEN + 10]; 341 int used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 342 MAX_CMD_LEN, cmd ? cmd : "none"); 343 if (write(s, out_buf, used) < 0) 344 perror("Error writing to socket"); 345 return; 346 } 347 output_json(cmd, &data, s); 348 } 349 350 static int 351 unknown_command(const char *cmd __rte_unused, const char *params __rte_unused, 352 struct rte_tel_data *d) 353 { 354 return d->type = TEL_NULL; 355 } 356 357 static void * 358 client_handler(void *sock_id) 359 { 360 int s = (int)(uintptr_t)sock_id; 361 char buffer[1024]; 362 char info_str[1024]; 363 snprintf(info_str, sizeof(info_str), 364 "{\"version\":\"%s\",\"pid\":%d,\"max_output_len\":%d}", 365 telemetry_version, getpid(), MAX_OUTPUT_LEN); 366 if (write(s, info_str, strlen(info_str)) < 0) { 367 close(s); 368 return NULL; 369 } 370 371 /* receive data is not null terminated */ 372 int bytes = read(s, buffer, sizeof(buffer) - 1); 373 while (bytes > 0) { 374 buffer[bytes] = 0; 375 const char *cmd = strtok(buffer, ","); 376 const char *param = strtok(NULL, "\0"); 377 telemetry_cb fn = unknown_command; 378 int i; 379 380 if (cmd && strlen(cmd) < MAX_CMD_LEN) { 381 rte_spinlock_lock(&callback_sl); 382 for (i = 0; i < num_callbacks; i++) 383 if (strcmp(cmd, callbacks[i].cmd) == 0) { 384 fn = callbacks[i].fn; 385 break; 386 } 387 rte_spinlock_unlock(&callback_sl); 388 } 389 perform_command(fn, cmd, param, s); 390 391 bytes = read(s, buffer, sizeof(buffer) - 1); 392 } 393 close(s); 394 __atomic_sub_fetch(&v2_clients, 1, __ATOMIC_RELAXED); 395 return NULL; 396 } 397 398 static void * 399 socket_listener(void *socket) 400 { 401 while (1) { 402 pthread_t th; 403 int rc; 404 struct socket *s = (struct socket *)socket; 405 int s_accepted = accept(s->sock, NULL, NULL); 406 if (s_accepted < 0) { 407 TMTY_LOG(ERR, "Error with accept, telemetry thread quitting\n"); 408 return NULL; 409 } 410 if (s->num_clients != NULL) { 411 uint16_t conns = __atomic_load_n(s->num_clients, 412 __ATOMIC_RELAXED); 413 if (conns >= MAX_CONNECTIONS) { 414 close(s_accepted); 415 continue; 416 } 417 __atomic_add_fetch(s->num_clients, 1, 418 __ATOMIC_RELAXED); 419 } 420 rc = pthread_create(&th, NULL, s->fn, 421 (void *)(uintptr_t)s_accepted); 422 if (rc != 0) { 423 TMTY_LOG(ERR, "Error with create client thread: %s\n", 424 strerror(rc)); 425 close(s_accepted); 426 if (s->num_clients != NULL) 427 __atomic_sub_fetch(s->num_clients, 1, 428 __ATOMIC_RELAXED); 429 continue; 430 } 431 pthread_detach(th); 432 } 433 return NULL; 434 } 435 436 static inline char * 437 get_socket_path(const char *runtime_dir, const int version) 438 { 439 static char path[PATH_MAX]; 440 snprintf(path, sizeof(path), "%s/dpdk_telemetry.v%d", 441 strlen(runtime_dir) ? runtime_dir : "/tmp", version); 442 return path; 443 } 444 445 static void 446 unlink_sockets(void) 447 { 448 if (v2_socket.path[0]) 449 unlink(v2_socket.path); 450 if (v1_socket.path[0]) 451 unlink(v1_socket.path); 452 } 453 454 static int 455 create_socket(char *path) 456 { 457 int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); 458 if (sock < 0) { 459 TMTY_LOG(ERR, "Error with socket creation, %s\n", strerror(errno)); 460 return -1; 461 } 462 463 struct sockaddr_un sun = {.sun_family = AF_UNIX}; 464 strlcpy(sun.sun_path, path, sizeof(sun.sun_path)); 465 TMTY_LOG(DEBUG, "Attempting socket bind to path '%s'\n", path); 466 467 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 468 struct stat st; 469 470 TMTY_LOG(DEBUG, "Initial bind to socket '%s' failed.\n", path); 471 472 /* first check if we have a runtime dir */ 473 if (stat(socket_dir, &st) < 0 || !S_ISDIR(st.st_mode)) { 474 TMTY_LOG(ERR, "Cannot access DPDK runtime directory: %s\n", socket_dir); 475 close(sock); 476 return -ENOENT; 477 } 478 479 /* check if current socket is active */ 480 if (connect(sock, (void *)&sun, sizeof(sun)) == 0) { 481 close(sock); 482 return -EADDRINUSE; 483 } 484 485 /* socket is not active, delete and attempt rebind */ 486 TMTY_LOG(DEBUG, "Attempting unlink and retrying bind\n"); 487 unlink(sun.sun_path); 488 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 489 TMTY_LOG(ERR, "Error binding socket: %s\n", strerror(errno)); 490 close(sock); 491 return -errno; /* if unlink failed, this will be -EADDRINUSE as above */ 492 } 493 } 494 495 if (listen(sock, 1) < 0) { 496 TMTY_LOG(ERR, "Error calling listen for socket: %s\n", strerror(errno)); 497 unlink(sun.sun_path); 498 close(sock); 499 return -errno; 500 } 501 TMTY_LOG(DEBUG, "Socket creation and binding ok\n"); 502 503 return sock; 504 } 505 506 static void 507 set_thread_name(pthread_t id __rte_unused, const char *name __rte_unused) 508 { 509 #if defined RTE_EXEC_ENV_LINUX && defined __GLIBC__ && defined __GLIBC_PREREQ 510 #if __GLIBC_PREREQ(2, 12) 511 pthread_setname_np(id, name); 512 #endif 513 #elif defined RTE_EXEC_ENV_FREEBSD 514 pthread_set_name_np(id, name); 515 #endif 516 } 517 518 static int 519 telemetry_legacy_init(void) 520 { 521 pthread_t t_old; 522 int rc; 523 524 if (num_legacy_callbacks == 1) { 525 TMTY_LOG(WARNING, "No legacy callbacks, legacy socket not created\n"); 526 return -1; 527 } 528 529 v1_socket.fn = legacy_client_handler; 530 if ((size_t) snprintf(v1_socket.path, sizeof(v1_socket.path), 531 "%s/telemetry", socket_dir) >= sizeof(v1_socket.path)) { 532 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 533 return -1; 534 } 535 v1_socket.sock = create_socket(v1_socket.path); 536 if (v1_socket.sock < 0) { 537 v1_socket.path[0] = '\0'; 538 return -1; 539 } 540 rc = pthread_create(&t_old, NULL, socket_listener, &v1_socket); 541 if (rc != 0) { 542 TMTY_LOG(ERR, "Error with create legacy socket thread: %s\n", 543 strerror(rc)); 544 close(v1_socket.sock); 545 v1_socket.sock = -1; 546 unlink(v1_socket.path); 547 v1_socket.path[0] = '\0'; 548 return -1; 549 } 550 pthread_setaffinity_np(t_old, sizeof(*thread_cpuset), thread_cpuset); 551 set_thread_name(t_old, "telemetry-v1"); 552 TMTY_LOG(DEBUG, "Legacy telemetry socket initialized ok\n"); 553 pthread_detach(t_old); 554 return 0; 555 } 556 557 static int 558 telemetry_v2_init(void) 559 { 560 char spath[sizeof(v2_socket.path)]; 561 pthread_t t_new; 562 short suffix = 0; 563 int rc; 564 565 v2_socket.num_clients = &v2_clients; 566 rte_telemetry_register_cmd("/", list_commands, 567 "Returns list of available commands, Takes no parameters"); 568 rte_telemetry_register_cmd("/info", json_info, 569 "Returns DPDK Telemetry information. Takes no parameters"); 570 rte_telemetry_register_cmd("/help", command_help, 571 "Returns help text for a command. Parameters: string command"); 572 v2_socket.fn = client_handler; 573 if (strlcpy(spath, get_socket_path(socket_dir, 2), sizeof(spath)) >= sizeof(spath)) { 574 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 575 return -1; 576 } 577 memcpy(v2_socket.path, spath, sizeof(v2_socket.path)); 578 579 v2_socket.sock = create_socket(v2_socket.path); 580 while (v2_socket.sock < 0) { 581 /* bail out on unexpected error, or suffix wrap-around */ 582 if (v2_socket.sock != -EADDRINUSE || suffix < 0) { 583 v2_socket.path[0] = '\0'; /* clear socket path */ 584 return -1; 585 } 586 /* add a suffix to the path if the basic version fails */ 587 if (snprintf(v2_socket.path, sizeof(v2_socket.path), "%s:%d", 588 spath, ++suffix) >= (int)sizeof(v2_socket.path)) { 589 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 590 return -1; 591 } 592 v2_socket.sock = create_socket(v2_socket.path); 593 } 594 rc = pthread_create(&t_new, NULL, socket_listener, &v2_socket); 595 if (rc != 0) { 596 TMTY_LOG(ERR, "Error with create socket thread: %s\n", 597 strerror(rc)); 598 close(v2_socket.sock); 599 v2_socket.sock = -1; 600 unlink(v2_socket.path); 601 v2_socket.path[0] = '\0'; 602 return -1; 603 } 604 pthread_setaffinity_np(t_new, sizeof(*thread_cpuset), thread_cpuset); 605 set_thread_name(t_new, "telemetry-v2"); 606 pthread_detach(t_new); 607 atexit(unlink_sockets); 608 609 return 0; 610 } 611 612 #endif /* !RTE_EXEC_ENV_WINDOWS */ 613 614 int32_t 615 rte_telemetry_init(const char *runtime_dir, const char *rte_version, rte_cpuset_t *cpuset, 616 rte_log_fn log_fn, uint32_t registered_logtype) 617 { 618 telemetry_version = rte_version; 619 socket_dir = runtime_dir; 620 thread_cpuset = cpuset; 621 rte_log_ptr = log_fn; 622 logtype = registered_logtype; 623 624 #ifndef RTE_EXEC_ENV_WINDOWS 625 if (telemetry_v2_init() != 0) 626 return -1; 627 TMTY_LOG(DEBUG, "Telemetry initialized ok\n"); 628 telemetry_legacy_init(); 629 #endif /* RTE_EXEC_ENV_WINDOWS */ 630 631 return 0; 632 } 633