1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #ifndef RTE_EXEC_ENV_WINDOWS 6 #include <unistd.h> 7 #include <pthread.h> 8 #include <sys/socket.h> 9 #include <sys/un.h> 10 #include <sys/stat.h> 11 #include <dlfcn.h> 12 #endif /* !RTE_EXEC_ENV_WINDOWS */ 13 14 /* we won't link against libbsd, so just always use DPDKs-specific strlcpy */ 15 #undef RTE_USE_LIBBSD 16 #include <rte_string_fns.h> 17 #include <rte_common.h> 18 #include <rte_spinlock.h> 19 #include <rte_log.h> 20 21 #include "rte_telemetry.h" 22 #include "telemetry_json.h" 23 #include "telemetry_data.h" 24 #include "telemetry_internal.h" 25 26 #define MAX_CMD_LEN 56 27 #define MAX_OUTPUT_LEN (1024 * 16) 28 #define MAX_CONNECTIONS 10 29 30 #ifndef RTE_EXEC_ENV_WINDOWS 31 static void * 32 client_handler(void *socket); 33 #endif /* !RTE_EXEC_ENV_WINDOWS */ 34 35 struct cmd_callback { 36 char cmd[MAX_CMD_LEN]; 37 telemetry_cb fn; 38 char help[RTE_TEL_MAX_STRING_LEN]; 39 }; 40 41 #ifndef RTE_EXEC_ENV_WINDOWS 42 struct socket { 43 int sock; 44 char path[sizeof(((struct sockaddr_un *)0)->sun_path)]; 45 handler fn; 46 uint16_t *num_clients; 47 }; 48 static struct socket v2_socket; /* socket for v2 telemetry */ 49 static struct socket v1_socket; /* socket for v1 telemetry */ 50 #endif /* !RTE_EXEC_ENV_WINDOWS */ 51 52 static const char *telemetry_version; /* save rte_version */ 53 static const char *socket_dir; /* runtime directory */ 54 static rte_cpuset_t *thread_cpuset; 55 static rte_log_fn rte_log_ptr; 56 static uint32_t logtype; 57 58 #define TMTY_LOG(l, ...) \ 59 rte_log_ptr(RTE_LOG_ ## l, logtype, "TELEMETRY: " __VA_ARGS__) 60 61 /* list of command callbacks, with one command registered by default */ 62 static struct cmd_callback *callbacks; 63 static int num_callbacks; /* How many commands are registered */ 64 /* Used when accessing or modifying list of command callbacks */ 65 static rte_spinlock_t callback_sl = RTE_SPINLOCK_INITIALIZER; 66 #ifndef RTE_EXEC_ENV_WINDOWS 67 static uint16_t v2_clients; 68 #endif /* !RTE_EXEC_ENV_WINDOWS */ 69 70 int 71 rte_telemetry_register_cmd(const char *cmd, telemetry_cb fn, const char *help) 72 { 73 struct cmd_callback *new_callbacks; 74 int i = 0; 75 76 if (strlen(cmd) >= MAX_CMD_LEN || fn == NULL || cmd[0] != '/' 77 || strlen(help) >= RTE_TEL_MAX_STRING_LEN) 78 return -EINVAL; 79 80 rte_spinlock_lock(&callback_sl); 81 new_callbacks = realloc(callbacks, sizeof(callbacks[0]) * (num_callbacks + 1)); 82 if (new_callbacks == NULL) { 83 rte_spinlock_unlock(&callback_sl); 84 return -ENOMEM; 85 } 86 callbacks = new_callbacks; 87 88 while (i < num_callbacks && strcmp(cmd, callbacks[i].cmd) > 0) 89 i++; 90 if (i != num_callbacks) 91 /* Move elements to keep the list alphabetical */ 92 memmove(callbacks + i + 1, callbacks + i, 93 sizeof(struct cmd_callback) * (num_callbacks - i)); 94 95 strlcpy(callbacks[i].cmd, cmd, MAX_CMD_LEN); 96 callbacks[i].fn = fn; 97 strlcpy(callbacks[i].help, help, RTE_TEL_MAX_STRING_LEN); 98 num_callbacks++; 99 rte_spinlock_unlock(&callback_sl); 100 101 return 0; 102 } 103 104 #ifndef RTE_EXEC_ENV_WINDOWS 105 106 static int 107 list_commands(const char *cmd __rte_unused, const char *params __rte_unused, 108 struct rte_tel_data *d) 109 { 110 int i; 111 112 rte_tel_data_start_array(d, RTE_TEL_STRING_VAL); 113 rte_spinlock_lock(&callback_sl); 114 for (i = 0; i < num_callbacks; i++) 115 rte_tel_data_add_array_string(d, callbacks[i].cmd); 116 rte_spinlock_unlock(&callback_sl); 117 return 0; 118 } 119 120 static int 121 json_info(const char *cmd __rte_unused, const char *params __rte_unused, 122 struct rte_tel_data *d) 123 { 124 rte_tel_data_start_dict(d); 125 rte_tel_data_add_dict_string(d, "version", telemetry_version); 126 rte_tel_data_add_dict_int(d, "pid", getpid()); 127 rte_tel_data_add_dict_int(d, "max_output_len", MAX_OUTPUT_LEN); 128 return 0; 129 } 130 131 static int 132 command_help(const char *cmd __rte_unused, const char *params, 133 struct rte_tel_data *d) 134 { 135 int i; 136 137 if (!params) 138 return -1; 139 rte_tel_data_start_dict(d); 140 rte_spinlock_lock(&callback_sl); 141 for (i = 0; i < num_callbacks; i++) 142 if (strcmp(params, callbacks[i].cmd) == 0) { 143 rte_tel_data_add_dict_string(d, params, 144 callbacks[i].help); 145 break; 146 } 147 rte_spinlock_unlock(&callback_sl); 148 if (i == num_callbacks) 149 return -1; 150 return 0; 151 } 152 153 static int 154 container_to_json(const struct rte_tel_data *d, char *out_buf, size_t buf_len) 155 { 156 size_t used = 0; 157 unsigned int i; 158 159 if (d->type != RTE_TEL_DICT && d->type != RTE_TEL_ARRAY_U64 && 160 d->type != RTE_TEL_ARRAY_INT && d->type != RTE_TEL_ARRAY_STRING) 161 return snprintf(out_buf, buf_len, "null"); 162 163 used = rte_tel_json_empty_array(out_buf, buf_len, 0); 164 if (d->type == RTE_TEL_ARRAY_U64) 165 for (i = 0; i < d->data_len; i++) 166 used = rte_tel_json_add_array_u64(out_buf, 167 buf_len, used, 168 d->data.array[i].u64val); 169 if (d->type == RTE_TEL_ARRAY_INT) 170 for (i = 0; i < d->data_len; i++) 171 used = rte_tel_json_add_array_int(out_buf, 172 buf_len, used, 173 d->data.array[i].ival); 174 if (d->type == RTE_TEL_ARRAY_STRING) 175 for (i = 0; i < d->data_len; i++) 176 used = rte_tel_json_add_array_string(out_buf, 177 buf_len, used, 178 d->data.array[i].sval); 179 if (d->type == RTE_TEL_DICT) 180 for (i = 0; i < d->data_len; i++) { 181 const struct tel_dict_entry *v = &d->data.dict[i]; 182 switch (v->type) { 183 case RTE_TEL_STRING_VAL: 184 used = rte_tel_json_add_obj_str(out_buf, 185 buf_len, used, 186 v->name, v->value.sval); 187 break; 188 case RTE_TEL_INT_VAL: 189 used = rte_tel_json_add_obj_int(out_buf, 190 buf_len, used, 191 v->name, v->value.ival); 192 break; 193 case RTE_TEL_U64_VAL: 194 used = rte_tel_json_add_obj_u64(out_buf, 195 buf_len, used, 196 v->name, v->value.u64val); 197 break; 198 case RTE_TEL_CONTAINER: 199 { 200 char temp[buf_len]; 201 const struct container *cont = 202 &v->value.container; 203 if (container_to_json(cont->data, 204 temp, buf_len) != 0) 205 used = rte_tel_json_add_obj_json( 206 out_buf, 207 buf_len, used, 208 v->name, temp); 209 if (!cont->keep) 210 rte_tel_data_free(cont->data); 211 break; 212 } 213 } 214 } 215 216 return used; 217 } 218 219 static void 220 output_json(const char *cmd, const struct rte_tel_data *d, int s) 221 { 222 char out_buf[MAX_OUTPUT_LEN]; 223 224 char *cb_data_buf; 225 size_t buf_len, prefix_used, used = 0; 226 unsigned int i; 227 228 RTE_BUILD_BUG_ON(sizeof(out_buf) < MAX_CMD_LEN + 229 RTE_TEL_MAX_SINGLE_STRING_LEN + 10); 230 switch (d->type) { 231 case RTE_TEL_NULL: 232 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 233 MAX_CMD_LEN, cmd ? cmd : "none"); 234 break; 235 case RTE_TEL_STRING: 236 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":\"%.*s\"}", 237 MAX_CMD_LEN, cmd, 238 RTE_TEL_MAX_SINGLE_STRING_LEN, d->data.str); 239 break; 240 case RTE_TEL_DICT: 241 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 242 MAX_CMD_LEN, cmd); 243 cb_data_buf = &out_buf[prefix_used]; 244 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 245 246 used = rte_tel_json_empty_obj(cb_data_buf, buf_len, 0); 247 for (i = 0; i < d->data_len; i++) { 248 const struct tel_dict_entry *v = &d->data.dict[i]; 249 switch (v->type) { 250 case RTE_TEL_STRING_VAL: 251 used = rte_tel_json_add_obj_str(cb_data_buf, 252 buf_len, used, 253 v->name, v->value.sval); 254 break; 255 case RTE_TEL_INT_VAL: 256 used = rte_tel_json_add_obj_int(cb_data_buf, 257 buf_len, used, 258 v->name, v->value.ival); 259 break; 260 case RTE_TEL_U64_VAL: 261 used = rte_tel_json_add_obj_u64(cb_data_buf, 262 buf_len, used, 263 v->name, v->value.u64val); 264 break; 265 case RTE_TEL_CONTAINER: 266 { 267 char temp[buf_len]; 268 const struct container *cont = 269 &v->value.container; 270 if (container_to_json(cont->data, 271 temp, buf_len) != 0) 272 used = rte_tel_json_add_obj_json( 273 cb_data_buf, 274 buf_len, used, 275 v->name, temp); 276 if (!cont->keep) 277 rte_tel_data_free(cont->data); 278 } 279 } 280 } 281 used += prefix_used; 282 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 283 break; 284 case RTE_TEL_ARRAY_STRING: 285 case RTE_TEL_ARRAY_INT: 286 case RTE_TEL_ARRAY_U64: 287 case RTE_TEL_ARRAY_CONTAINER: 288 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 289 MAX_CMD_LEN, cmd); 290 cb_data_buf = &out_buf[prefix_used]; 291 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 292 293 used = rte_tel_json_empty_array(cb_data_buf, buf_len, 0); 294 for (i = 0; i < d->data_len; i++) 295 if (d->type == RTE_TEL_ARRAY_STRING) 296 used = rte_tel_json_add_array_string( 297 cb_data_buf, 298 buf_len, used, 299 d->data.array[i].sval); 300 else if (d->type == RTE_TEL_ARRAY_INT) 301 used = rte_tel_json_add_array_int(cb_data_buf, 302 buf_len, used, 303 d->data.array[i].ival); 304 else if (d->type == RTE_TEL_ARRAY_U64) 305 used = rte_tel_json_add_array_u64(cb_data_buf, 306 buf_len, used, 307 d->data.array[i].u64val); 308 else if (d->type == RTE_TEL_ARRAY_CONTAINER) { 309 char temp[buf_len]; 310 const struct container *rec_data = 311 &d->data.array[i].container; 312 if (container_to_json(rec_data->data, 313 temp, buf_len) != 0) 314 used = rte_tel_json_add_array_json( 315 cb_data_buf, 316 buf_len, used, temp); 317 if (!rec_data->keep) 318 rte_tel_data_free(rec_data->data); 319 } 320 used += prefix_used; 321 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 322 break; 323 } 324 if (write(s, out_buf, used) < 0) 325 perror("Error writing to socket"); 326 } 327 328 static void 329 perform_command(telemetry_cb fn, const char *cmd, const char *param, int s) 330 { 331 struct rte_tel_data data; 332 333 int ret = fn(cmd, param, &data); 334 if (ret < 0) { 335 char out_buf[MAX_CMD_LEN + 10]; 336 int used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 337 MAX_CMD_LEN, cmd ? cmd : "none"); 338 if (write(s, out_buf, used) < 0) 339 perror("Error writing to socket"); 340 return; 341 } 342 output_json(cmd, &data, s); 343 } 344 345 static int 346 unknown_command(const char *cmd __rte_unused, const char *params __rte_unused, 347 struct rte_tel_data *d) 348 { 349 return d->type = RTE_TEL_NULL; 350 } 351 352 static void * 353 client_handler(void *sock_id) 354 { 355 int s = (int)(uintptr_t)sock_id; 356 char buffer[1024]; 357 char info_str[1024]; 358 snprintf(info_str, sizeof(info_str), 359 "{\"version\":\"%s\",\"pid\":%d,\"max_output_len\":%d}", 360 telemetry_version, getpid(), MAX_OUTPUT_LEN); 361 if (write(s, info_str, strlen(info_str)) < 0) { 362 close(s); 363 return NULL; 364 } 365 366 /* receive data is not null terminated */ 367 int bytes = read(s, buffer, sizeof(buffer) - 1); 368 while (bytes > 0) { 369 buffer[bytes] = 0; 370 const char *cmd = strtok(buffer, ","); 371 const char *param = strtok(NULL, "\0"); 372 telemetry_cb fn = unknown_command; 373 int i; 374 375 if (cmd && strlen(cmd) < MAX_CMD_LEN) { 376 rte_spinlock_lock(&callback_sl); 377 for (i = 0; i < num_callbacks; i++) 378 if (strcmp(cmd, callbacks[i].cmd) == 0) { 379 fn = callbacks[i].fn; 380 break; 381 } 382 rte_spinlock_unlock(&callback_sl); 383 } 384 perform_command(fn, cmd, param, s); 385 386 bytes = read(s, buffer, sizeof(buffer) - 1); 387 } 388 close(s); 389 __atomic_sub_fetch(&v2_clients, 1, __ATOMIC_RELAXED); 390 return NULL; 391 } 392 393 static void * 394 socket_listener(void *socket) 395 { 396 while (1) { 397 pthread_t th; 398 int rc; 399 struct socket *s = (struct socket *)socket; 400 int s_accepted = accept(s->sock, NULL, NULL); 401 if (s_accepted < 0) { 402 TMTY_LOG(ERR, "Error with accept, telemetry thread quitting\n"); 403 return NULL; 404 } 405 if (s->num_clients != NULL) { 406 uint16_t conns = __atomic_load_n(s->num_clients, 407 __ATOMIC_RELAXED); 408 if (conns >= MAX_CONNECTIONS) { 409 close(s_accepted); 410 continue; 411 } 412 __atomic_add_fetch(s->num_clients, 1, 413 __ATOMIC_RELAXED); 414 } 415 rc = pthread_create(&th, NULL, s->fn, 416 (void *)(uintptr_t)s_accepted); 417 if (rc != 0) { 418 TMTY_LOG(ERR, "Error with create client thread: %s\n", 419 strerror(rc)); 420 close(s_accepted); 421 if (s->num_clients != NULL) 422 __atomic_sub_fetch(s->num_clients, 1, 423 __ATOMIC_RELAXED); 424 continue; 425 } 426 pthread_detach(th); 427 } 428 return NULL; 429 } 430 431 static inline char * 432 get_socket_path(const char *runtime_dir, const int version) 433 { 434 static char path[PATH_MAX]; 435 snprintf(path, sizeof(path), "%s/dpdk_telemetry.v%d", 436 strlen(runtime_dir) ? runtime_dir : "/tmp", version); 437 return path; 438 } 439 440 static void 441 unlink_sockets(void) 442 { 443 if (v2_socket.path[0]) 444 unlink(v2_socket.path); 445 if (v1_socket.path[0]) 446 unlink(v1_socket.path); 447 } 448 449 static int 450 create_socket(char *path) 451 { 452 int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); 453 if (sock < 0) { 454 TMTY_LOG(ERR, "Error with socket creation, %s\n", strerror(errno)); 455 return -1; 456 } 457 458 struct sockaddr_un sun = {.sun_family = AF_UNIX}; 459 strlcpy(sun.sun_path, path, sizeof(sun.sun_path)); 460 TMTY_LOG(DEBUG, "Attempting socket bind to path '%s'\n", path); 461 462 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 463 struct stat st; 464 465 TMTY_LOG(DEBUG, "Initial bind to socket '%s' failed.\n", path); 466 467 /* first check if we have a runtime dir */ 468 if (stat(socket_dir, &st) < 0 || !S_ISDIR(st.st_mode)) { 469 TMTY_LOG(ERR, "Cannot access DPDK runtime directory: %s\n", socket_dir); 470 close(sock); 471 return -ENOENT; 472 } 473 474 /* check if current socket is active */ 475 if (connect(sock, (void *)&sun, sizeof(sun)) == 0) { 476 close(sock); 477 return -EADDRINUSE; 478 } 479 480 /* socket is not active, delete and attempt rebind */ 481 TMTY_LOG(DEBUG, "Attempting unlink and retrying bind\n"); 482 unlink(sun.sun_path); 483 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 484 TMTY_LOG(ERR, "Error binding socket: %s\n", strerror(errno)); 485 close(sock); 486 return -errno; /* if unlink failed, this will be -EADDRINUSE as above */ 487 } 488 } 489 490 if (listen(sock, 1) < 0) { 491 TMTY_LOG(ERR, "Error calling listen for socket: %s\n", strerror(errno)); 492 unlink(sun.sun_path); 493 close(sock); 494 return -errno; 495 } 496 TMTY_LOG(DEBUG, "Socket creation and binding ok\n"); 497 498 return sock; 499 } 500 501 static void 502 set_thread_name(pthread_t id __rte_unused, const char *name __rte_unused) 503 { 504 #if defined RTE_EXEC_ENV_LINUX && defined __GLIBC__ && defined __GLIBC_PREREQ 505 #if __GLIBC_PREREQ(2, 12) 506 pthread_setname_np(id, name); 507 #endif 508 #elif defined RTE_EXEC_ENV_FREEBSD 509 pthread_set_name_np(id, name); 510 #endif 511 } 512 513 static int 514 telemetry_legacy_init(void) 515 { 516 pthread_t t_old; 517 int rc; 518 519 if (num_legacy_callbacks == 1) { 520 TMTY_LOG(WARNING, "No legacy callbacks, legacy socket not created\n"); 521 return -1; 522 } 523 524 v1_socket.fn = legacy_client_handler; 525 if ((size_t) snprintf(v1_socket.path, sizeof(v1_socket.path), 526 "%s/telemetry", socket_dir) >= sizeof(v1_socket.path)) { 527 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 528 return -1; 529 } 530 v1_socket.sock = create_socket(v1_socket.path); 531 if (v1_socket.sock < 0) { 532 v1_socket.path[0] = '\0'; 533 return -1; 534 } 535 rc = pthread_create(&t_old, NULL, socket_listener, &v1_socket); 536 if (rc != 0) { 537 TMTY_LOG(ERR, "Error with create legcay socket thread: %s\n", 538 strerror(rc)); 539 close(v1_socket.sock); 540 v1_socket.sock = -1; 541 unlink(v1_socket.path); 542 v1_socket.path[0] = '\0'; 543 return -1; 544 } 545 pthread_setaffinity_np(t_old, sizeof(*thread_cpuset), thread_cpuset); 546 set_thread_name(t_old, "telemetry-v1"); 547 TMTY_LOG(DEBUG, "Legacy telemetry socket initialized ok\n"); 548 pthread_detach(t_old); 549 return 0; 550 } 551 552 static int 553 telemetry_v2_init(void) 554 { 555 char spath[sizeof(v2_socket.path)]; 556 pthread_t t_new; 557 short suffix = 0; 558 int rc; 559 560 v2_socket.num_clients = &v2_clients; 561 rte_telemetry_register_cmd("/", list_commands, 562 "Returns list of available commands, Takes no parameters"); 563 rte_telemetry_register_cmd("/info", json_info, 564 "Returns DPDK Telemetry information. Takes no parameters"); 565 rte_telemetry_register_cmd("/help", command_help, 566 "Returns help text for a command. Parameters: string command"); 567 v2_socket.fn = client_handler; 568 if (strlcpy(spath, get_socket_path(socket_dir, 2), sizeof(spath)) >= sizeof(spath)) { 569 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 570 return -1; 571 } 572 memcpy(v2_socket.path, spath, sizeof(v2_socket.path)); 573 574 v2_socket.sock = create_socket(v2_socket.path); 575 while (v2_socket.sock < 0) { 576 /* bail out on unexpected error, or suffix wrap-around */ 577 if (v2_socket.sock != -EADDRINUSE || suffix < 0) { 578 v2_socket.path[0] = '\0'; /* clear socket path */ 579 return -1; 580 } 581 /* add a suffix to the path if the basic version fails */ 582 if (snprintf(v2_socket.path, sizeof(v2_socket.path), "%s:%d", 583 spath, ++suffix) >= (int)sizeof(v2_socket.path)) { 584 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 585 return -1; 586 } 587 v2_socket.sock = create_socket(v2_socket.path); 588 } 589 rc = pthread_create(&t_new, NULL, socket_listener, &v2_socket); 590 if (rc != 0) { 591 TMTY_LOG(ERR, "Error with create socket thread: %s\n", 592 strerror(rc)); 593 close(v2_socket.sock); 594 v2_socket.sock = -1; 595 unlink(v2_socket.path); 596 v2_socket.path[0] = '\0'; 597 return -1; 598 } 599 pthread_setaffinity_np(t_new, sizeof(*thread_cpuset), thread_cpuset); 600 set_thread_name(t_new, "telemetry-v2"); 601 pthread_detach(t_new); 602 atexit(unlink_sockets); 603 604 return 0; 605 } 606 607 #endif /* !RTE_EXEC_ENV_WINDOWS */ 608 609 int32_t 610 rte_telemetry_init(const char *runtime_dir, const char *rte_version, rte_cpuset_t *cpuset, 611 rte_log_fn log_fn, uint32_t registered_logtype) 612 { 613 telemetry_version = rte_version; 614 socket_dir = runtime_dir; 615 thread_cpuset = cpuset; 616 rte_log_ptr = log_fn; 617 logtype = registered_logtype; 618 619 #ifndef RTE_EXEC_ENV_WINDOWS 620 if (telemetry_v2_init() != 0) 621 return -1; 622 TMTY_LOG(DEBUG, "Telemetry initialized ok\n"); 623 telemetry_legacy_init(); 624 #endif /* RTE_EXEC_ENV_WINDOWS */ 625 626 return 0; 627 } 628