1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #include <errno.h> 6 #include <stdlib.h> 7 #ifndef RTE_EXEC_ENV_WINDOWS 8 #include <unistd.h> 9 #include <pthread.h> 10 #include <sys/socket.h> 11 #include <sys/un.h> 12 #include <sys/stat.h> 13 #endif /* !RTE_EXEC_ENV_WINDOWS */ 14 15 /* we won't link against libbsd, so just always use DPDKs-specific strlcpy */ 16 #undef RTE_USE_LIBBSD 17 #include <rte_string_fns.h> 18 #include <rte_common.h> 19 #include <rte_spinlock.h> 20 #include <rte_log.h> 21 22 #include "rte_telemetry.h" 23 #include "telemetry_json.h" 24 #include "telemetry_data.h" 25 #include "telemetry_internal.h" 26 27 #define MAX_CMD_LEN 56 28 #define MAX_OUTPUT_LEN (1024 * 16) 29 #define MAX_CONNECTIONS 10 30 31 #ifndef RTE_EXEC_ENV_WINDOWS 32 static void * 33 client_handler(void *socket); 34 #endif /* !RTE_EXEC_ENV_WINDOWS */ 35 36 struct cmd_callback { 37 char cmd[MAX_CMD_LEN]; 38 telemetry_cb fn; 39 char help[RTE_TEL_MAX_STRING_LEN]; 40 }; 41 42 #ifndef RTE_EXEC_ENV_WINDOWS 43 struct socket { 44 int sock; 45 char path[sizeof(((struct sockaddr_un *)0)->sun_path)]; 46 handler fn; 47 uint16_t *num_clients; 48 }; 49 static struct socket v2_socket; /* socket for v2 telemetry */ 50 static struct socket v1_socket; /* socket for v1 telemetry */ 51 #endif /* !RTE_EXEC_ENV_WINDOWS */ 52 53 static const char *telemetry_version; /* save rte_version */ 54 static const char *socket_dir; /* runtime directory */ 55 static rte_cpuset_t *thread_cpuset; 56 static rte_log_fn rte_log_ptr; 57 static uint32_t logtype; 58 59 #define TMTY_LOG(l, ...) \ 60 rte_log_ptr(RTE_LOG_ ## l, logtype, "TELEMETRY: " __VA_ARGS__) 61 62 /* list of command callbacks, with one command registered by default */ 63 static struct cmd_callback *callbacks; 64 static int num_callbacks; /* How many commands are registered */ 65 /* Used when accessing or modifying list of command callbacks */ 66 static rte_spinlock_t callback_sl = RTE_SPINLOCK_INITIALIZER; 67 #ifndef RTE_EXEC_ENV_WINDOWS 68 static uint16_t v2_clients; 69 #endif /* !RTE_EXEC_ENV_WINDOWS */ 70 71 int 72 rte_telemetry_register_cmd(const char *cmd, telemetry_cb fn, const char *help) 73 { 74 struct cmd_callback *new_callbacks; 75 int i = 0; 76 77 if (strlen(cmd) >= MAX_CMD_LEN || fn == NULL || cmd[0] != '/' 78 || strlen(help) >= RTE_TEL_MAX_STRING_LEN) 79 return -EINVAL; 80 81 rte_spinlock_lock(&callback_sl); 82 new_callbacks = realloc(callbacks, sizeof(callbacks[0]) * (num_callbacks + 1)); 83 if (new_callbacks == NULL) { 84 rte_spinlock_unlock(&callback_sl); 85 return -ENOMEM; 86 } 87 callbacks = new_callbacks; 88 89 while (i < num_callbacks && strcmp(cmd, callbacks[i].cmd) > 0) 90 i++; 91 if (i != num_callbacks) 92 /* Move elements to keep the list alphabetical */ 93 memmove(callbacks + i + 1, callbacks + i, 94 sizeof(struct cmd_callback) * (num_callbacks - i)); 95 96 strlcpy(callbacks[i].cmd, cmd, MAX_CMD_LEN); 97 callbacks[i].fn = fn; 98 strlcpy(callbacks[i].help, help, RTE_TEL_MAX_STRING_LEN); 99 num_callbacks++; 100 rte_spinlock_unlock(&callback_sl); 101 102 return 0; 103 } 104 105 #ifndef RTE_EXEC_ENV_WINDOWS 106 107 static int 108 list_commands(const char *cmd __rte_unused, const char *params __rte_unused, 109 struct rte_tel_data *d) 110 { 111 int i; 112 113 rte_tel_data_start_array(d, RTE_TEL_STRING_VAL); 114 rte_spinlock_lock(&callback_sl); 115 for (i = 0; i < num_callbacks; i++) 116 rte_tel_data_add_array_string(d, callbacks[i].cmd); 117 rte_spinlock_unlock(&callback_sl); 118 return 0; 119 } 120 121 static int 122 json_info(const char *cmd __rte_unused, const char *params __rte_unused, 123 struct rte_tel_data *d) 124 { 125 rte_tel_data_start_dict(d); 126 rte_tel_data_add_dict_string(d, "version", telemetry_version); 127 rte_tel_data_add_dict_int(d, "pid", getpid()); 128 rte_tel_data_add_dict_int(d, "max_output_len", MAX_OUTPUT_LEN); 129 return 0; 130 } 131 132 static int 133 command_help(const char *cmd __rte_unused, const char *params, 134 struct rte_tel_data *d) 135 { 136 int i; 137 138 if (!params) 139 return -1; 140 rte_tel_data_start_dict(d); 141 rte_spinlock_lock(&callback_sl); 142 for (i = 0; i < num_callbacks; i++) 143 if (strcmp(params, callbacks[i].cmd) == 0) { 144 rte_tel_data_add_dict_string(d, params, 145 callbacks[i].help); 146 break; 147 } 148 rte_spinlock_unlock(&callback_sl); 149 if (i == num_callbacks) 150 return -1; 151 return 0; 152 } 153 154 static int 155 container_to_json(const struct rte_tel_data *d, char *out_buf, size_t buf_len) 156 { 157 size_t used = 0; 158 unsigned int i; 159 160 if (d->type != RTE_TEL_DICT && d->type != RTE_TEL_ARRAY_U64 && 161 d->type != RTE_TEL_ARRAY_INT && d->type != RTE_TEL_ARRAY_STRING) 162 return snprintf(out_buf, buf_len, "null"); 163 164 used = rte_tel_json_empty_array(out_buf, buf_len, 0); 165 if (d->type == RTE_TEL_ARRAY_U64) 166 for (i = 0; i < d->data_len; i++) 167 used = rte_tel_json_add_array_u64(out_buf, 168 buf_len, used, 169 d->data.array[i].u64val); 170 if (d->type == RTE_TEL_ARRAY_INT) 171 for (i = 0; i < d->data_len; i++) 172 used = rte_tel_json_add_array_int(out_buf, 173 buf_len, used, 174 d->data.array[i].ival); 175 if (d->type == RTE_TEL_ARRAY_STRING) 176 for (i = 0; i < d->data_len; i++) 177 used = rte_tel_json_add_array_string(out_buf, 178 buf_len, used, 179 d->data.array[i].sval); 180 if (d->type == RTE_TEL_DICT) 181 for (i = 0; i < d->data_len; i++) { 182 const struct tel_dict_entry *v = &d->data.dict[i]; 183 switch (v->type) { 184 case RTE_TEL_STRING_VAL: 185 used = rte_tel_json_add_obj_str(out_buf, 186 buf_len, used, 187 v->name, v->value.sval); 188 break; 189 case RTE_TEL_INT_VAL: 190 used = rte_tel_json_add_obj_int(out_buf, 191 buf_len, used, 192 v->name, v->value.ival); 193 break; 194 case RTE_TEL_U64_VAL: 195 used = rte_tel_json_add_obj_u64(out_buf, 196 buf_len, used, 197 v->name, v->value.u64val); 198 break; 199 case RTE_TEL_CONTAINER: 200 { 201 char temp[buf_len]; 202 const struct container *cont = 203 &v->value.container; 204 if (container_to_json(cont->data, 205 temp, buf_len) != 0) 206 used = rte_tel_json_add_obj_json( 207 out_buf, 208 buf_len, used, 209 v->name, temp); 210 if (!cont->keep) 211 rte_tel_data_free(cont->data); 212 break; 213 } 214 } 215 } 216 217 return used; 218 } 219 220 static void 221 output_json(const char *cmd, const struct rte_tel_data *d, int s) 222 { 223 char out_buf[MAX_OUTPUT_LEN]; 224 225 char *cb_data_buf; 226 size_t buf_len, prefix_used, used = 0; 227 unsigned int i; 228 229 RTE_BUILD_BUG_ON(sizeof(out_buf) < MAX_CMD_LEN + 230 RTE_TEL_MAX_SINGLE_STRING_LEN + 10); 231 switch (d->type) { 232 case RTE_TEL_NULL: 233 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 234 MAX_CMD_LEN, cmd ? cmd : "none"); 235 break; 236 case RTE_TEL_STRING: 237 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":\"%.*s\"}", 238 MAX_CMD_LEN, cmd, 239 RTE_TEL_MAX_SINGLE_STRING_LEN, d->data.str); 240 break; 241 case RTE_TEL_DICT: 242 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 243 MAX_CMD_LEN, cmd); 244 cb_data_buf = &out_buf[prefix_used]; 245 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 246 247 used = rte_tel_json_empty_obj(cb_data_buf, buf_len, 0); 248 for (i = 0; i < d->data_len; i++) { 249 const struct tel_dict_entry *v = &d->data.dict[i]; 250 switch (v->type) { 251 case RTE_TEL_STRING_VAL: 252 used = rte_tel_json_add_obj_str(cb_data_buf, 253 buf_len, used, 254 v->name, v->value.sval); 255 break; 256 case RTE_TEL_INT_VAL: 257 used = rte_tel_json_add_obj_int(cb_data_buf, 258 buf_len, used, 259 v->name, v->value.ival); 260 break; 261 case RTE_TEL_U64_VAL: 262 used = rte_tel_json_add_obj_u64(cb_data_buf, 263 buf_len, used, 264 v->name, v->value.u64val); 265 break; 266 case RTE_TEL_CONTAINER: 267 { 268 char temp[buf_len]; 269 const struct container *cont = 270 &v->value.container; 271 if (container_to_json(cont->data, 272 temp, buf_len) != 0) 273 used = rte_tel_json_add_obj_json( 274 cb_data_buf, 275 buf_len, used, 276 v->name, temp); 277 if (!cont->keep) 278 rte_tel_data_free(cont->data); 279 } 280 } 281 } 282 used += prefix_used; 283 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 284 break; 285 case RTE_TEL_ARRAY_STRING: 286 case RTE_TEL_ARRAY_INT: 287 case RTE_TEL_ARRAY_U64: 288 case RTE_TEL_ARRAY_CONTAINER: 289 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 290 MAX_CMD_LEN, cmd); 291 cb_data_buf = &out_buf[prefix_used]; 292 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 293 294 used = rte_tel_json_empty_array(cb_data_buf, buf_len, 0); 295 for (i = 0; i < d->data_len; i++) 296 if (d->type == RTE_TEL_ARRAY_STRING) 297 used = rte_tel_json_add_array_string( 298 cb_data_buf, 299 buf_len, used, 300 d->data.array[i].sval); 301 else if (d->type == RTE_TEL_ARRAY_INT) 302 used = rte_tel_json_add_array_int(cb_data_buf, 303 buf_len, used, 304 d->data.array[i].ival); 305 else if (d->type == RTE_TEL_ARRAY_U64) 306 used = rte_tel_json_add_array_u64(cb_data_buf, 307 buf_len, used, 308 d->data.array[i].u64val); 309 else if (d->type == RTE_TEL_ARRAY_CONTAINER) { 310 char temp[buf_len]; 311 const struct container *rec_data = 312 &d->data.array[i].container; 313 if (container_to_json(rec_data->data, 314 temp, buf_len) != 0) 315 used = rte_tel_json_add_array_json( 316 cb_data_buf, 317 buf_len, used, temp); 318 if (!rec_data->keep) 319 rte_tel_data_free(rec_data->data); 320 } 321 used += prefix_used; 322 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 323 break; 324 } 325 if (write(s, out_buf, used) < 0) 326 perror("Error writing to socket"); 327 } 328 329 static void 330 perform_command(telemetry_cb fn, const char *cmd, const char *param, int s) 331 { 332 struct rte_tel_data data; 333 334 int ret = fn(cmd, param, &data); 335 if (ret < 0) { 336 char out_buf[MAX_CMD_LEN + 10]; 337 int used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 338 MAX_CMD_LEN, cmd ? cmd : "none"); 339 if (write(s, out_buf, used) < 0) 340 perror("Error writing to socket"); 341 return; 342 } 343 output_json(cmd, &data, s); 344 } 345 346 static int 347 unknown_command(const char *cmd __rte_unused, const char *params __rte_unused, 348 struct rte_tel_data *d) 349 { 350 return d->type = RTE_TEL_NULL; 351 } 352 353 static void * 354 client_handler(void *sock_id) 355 { 356 int s = (int)(uintptr_t)sock_id; 357 char buffer[1024]; 358 char info_str[1024]; 359 snprintf(info_str, sizeof(info_str), 360 "{\"version\":\"%s\",\"pid\":%d,\"max_output_len\":%d}", 361 telemetry_version, getpid(), MAX_OUTPUT_LEN); 362 if (write(s, info_str, strlen(info_str)) < 0) { 363 close(s); 364 return NULL; 365 } 366 367 /* receive data is not null terminated */ 368 int bytes = read(s, buffer, sizeof(buffer) - 1); 369 while (bytes > 0) { 370 buffer[bytes] = 0; 371 const char *cmd = strtok(buffer, ","); 372 const char *param = strtok(NULL, "\0"); 373 telemetry_cb fn = unknown_command; 374 int i; 375 376 if (cmd && strlen(cmd) < MAX_CMD_LEN) { 377 rte_spinlock_lock(&callback_sl); 378 for (i = 0; i < num_callbacks; i++) 379 if (strcmp(cmd, callbacks[i].cmd) == 0) { 380 fn = callbacks[i].fn; 381 break; 382 } 383 rte_spinlock_unlock(&callback_sl); 384 } 385 perform_command(fn, cmd, param, s); 386 387 bytes = read(s, buffer, sizeof(buffer) - 1); 388 } 389 close(s); 390 __atomic_sub_fetch(&v2_clients, 1, __ATOMIC_RELAXED); 391 return NULL; 392 } 393 394 static void * 395 socket_listener(void *socket) 396 { 397 while (1) { 398 pthread_t th; 399 int rc; 400 struct socket *s = (struct socket *)socket; 401 int s_accepted = accept(s->sock, NULL, NULL); 402 if (s_accepted < 0) { 403 TMTY_LOG(ERR, "Error with accept, telemetry thread quitting\n"); 404 return NULL; 405 } 406 if (s->num_clients != NULL) { 407 uint16_t conns = __atomic_load_n(s->num_clients, 408 __ATOMIC_RELAXED); 409 if (conns >= MAX_CONNECTIONS) { 410 close(s_accepted); 411 continue; 412 } 413 __atomic_add_fetch(s->num_clients, 1, 414 __ATOMIC_RELAXED); 415 } 416 rc = pthread_create(&th, NULL, s->fn, 417 (void *)(uintptr_t)s_accepted); 418 if (rc != 0) { 419 TMTY_LOG(ERR, "Error with create client thread: %s\n", 420 strerror(rc)); 421 close(s_accepted); 422 if (s->num_clients != NULL) 423 __atomic_sub_fetch(s->num_clients, 1, 424 __ATOMIC_RELAXED); 425 continue; 426 } 427 pthread_detach(th); 428 } 429 return NULL; 430 } 431 432 static inline char * 433 get_socket_path(const char *runtime_dir, const int version) 434 { 435 static char path[PATH_MAX]; 436 snprintf(path, sizeof(path), "%s/dpdk_telemetry.v%d", 437 strlen(runtime_dir) ? runtime_dir : "/tmp", version); 438 return path; 439 } 440 441 static void 442 unlink_sockets(void) 443 { 444 if (v2_socket.path[0]) 445 unlink(v2_socket.path); 446 if (v1_socket.path[0]) 447 unlink(v1_socket.path); 448 } 449 450 static int 451 create_socket(char *path) 452 { 453 int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); 454 if (sock < 0) { 455 TMTY_LOG(ERR, "Error with socket creation, %s\n", strerror(errno)); 456 return -1; 457 } 458 459 struct sockaddr_un sun = {.sun_family = AF_UNIX}; 460 strlcpy(sun.sun_path, path, sizeof(sun.sun_path)); 461 TMTY_LOG(DEBUG, "Attempting socket bind to path '%s'\n", path); 462 463 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 464 struct stat st; 465 466 TMTY_LOG(DEBUG, "Initial bind to socket '%s' failed.\n", path); 467 468 /* first check if we have a runtime dir */ 469 if (stat(socket_dir, &st) < 0 || !S_ISDIR(st.st_mode)) { 470 TMTY_LOG(ERR, "Cannot access DPDK runtime directory: %s\n", socket_dir); 471 close(sock); 472 return -ENOENT; 473 } 474 475 /* check if current socket is active */ 476 if (connect(sock, (void *)&sun, sizeof(sun)) == 0) { 477 close(sock); 478 return -EADDRINUSE; 479 } 480 481 /* socket is not active, delete and attempt rebind */ 482 TMTY_LOG(DEBUG, "Attempting unlink and retrying bind\n"); 483 unlink(sun.sun_path); 484 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 485 TMTY_LOG(ERR, "Error binding socket: %s\n", strerror(errno)); 486 close(sock); 487 return -errno; /* if unlink failed, this will be -EADDRINUSE as above */ 488 } 489 } 490 491 if (listen(sock, 1) < 0) { 492 TMTY_LOG(ERR, "Error calling listen for socket: %s\n", strerror(errno)); 493 unlink(sun.sun_path); 494 close(sock); 495 return -errno; 496 } 497 TMTY_LOG(DEBUG, "Socket creation and binding ok\n"); 498 499 return sock; 500 } 501 502 static void 503 set_thread_name(pthread_t id __rte_unused, const char *name __rte_unused) 504 { 505 #if defined RTE_EXEC_ENV_LINUX && defined __GLIBC__ && defined __GLIBC_PREREQ 506 #if __GLIBC_PREREQ(2, 12) 507 pthread_setname_np(id, name); 508 #endif 509 #elif defined RTE_EXEC_ENV_FREEBSD 510 pthread_set_name_np(id, name); 511 #endif 512 } 513 514 static int 515 telemetry_legacy_init(void) 516 { 517 pthread_t t_old; 518 int rc; 519 520 if (num_legacy_callbacks == 1) { 521 TMTY_LOG(WARNING, "No legacy callbacks, legacy socket not created\n"); 522 return -1; 523 } 524 525 v1_socket.fn = legacy_client_handler; 526 if ((size_t) snprintf(v1_socket.path, sizeof(v1_socket.path), 527 "%s/telemetry", socket_dir) >= sizeof(v1_socket.path)) { 528 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 529 return -1; 530 } 531 v1_socket.sock = create_socket(v1_socket.path); 532 if (v1_socket.sock < 0) { 533 v1_socket.path[0] = '\0'; 534 return -1; 535 } 536 rc = pthread_create(&t_old, NULL, socket_listener, &v1_socket); 537 if (rc != 0) { 538 TMTY_LOG(ERR, "Error with create legacy socket thread: %s\n", 539 strerror(rc)); 540 close(v1_socket.sock); 541 v1_socket.sock = -1; 542 unlink(v1_socket.path); 543 v1_socket.path[0] = '\0'; 544 return -1; 545 } 546 pthread_setaffinity_np(t_old, sizeof(*thread_cpuset), thread_cpuset); 547 set_thread_name(t_old, "telemetry-v1"); 548 TMTY_LOG(DEBUG, "Legacy telemetry socket initialized ok\n"); 549 pthread_detach(t_old); 550 return 0; 551 } 552 553 static int 554 telemetry_v2_init(void) 555 { 556 char spath[sizeof(v2_socket.path)]; 557 pthread_t t_new; 558 short suffix = 0; 559 int rc; 560 561 v2_socket.num_clients = &v2_clients; 562 rte_telemetry_register_cmd("/", list_commands, 563 "Returns list of available commands, Takes no parameters"); 564 rte_telemetry_register_cmd("/info", json_info, 565 "Returns DPDK Telemetry information. Takes no parameters"); 566 rte_telemetry_register_cmd("/help", command_help, 567 "Returns help text for a command. Parameters: string command"); 568 v2_socket.fn = client_handler; 569 if (strlcpy(spath, get_socket_path(socket_dir, 2), sizeof(spath)) >= sizeof(spath)) { 570 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 571 return -1; 572 } 573 memcpy(v2_socket.path, spath, sizeof(v2_socket.path)); 574 575 v2_socket.sock = create_socket(v2_socket.path); 576 while (v2_socket.sock < 0) { 577 /* bail out on unexpected error, or suffix wrap-around */ 578 if (v2_socket.sock != -EADDRINUSE || suffix < 0) { 579 v2_socket.path[0] = '\0'; /* clear socket path */ 580 return -1; 581 } 582 /* add a suffix to the path if the basic version fails */ 583 if (snprintf(v2_socket.path, sizeof(v2_socket.path), "%s:%d", 584 spath, ++suffix) >= (int)sizeof(v2_socket.path)) { 585 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 586 return -1; 587 } 588 v2_socket.sock = create_socket(v2_socket.path); 589 } 590 rc = pthread_create(&t_new, NULL, socket_listener, &v2_socket); 591 if (rc != 0) { 592 TMTY_LOG(ERR, "Error with create socket thread: %s\n", 593 strerror(rc)); 594 close(v2_socket.sock); 595 v2_socket.sock = -1; 596 unlink(v2_socket.path); 597 v2_socket.path[0] = '\0'; 598 return -1; 599 } 600 pthread_setaffinity_np(t_new, sizeof(*thread_cpuset), thread_cpuset); 601 set_thread_name(t_new, "telemetry-v2"); 602 pthread_detach(t_new); 603 atexit(unlink_sockets); 604 605 return 0; 606 } 607 608 #endif /* !RTE_EXEC_ENV_WINDOWS */ 609 610 int32_t 611 rte_telemetry_init(const char *runtime_dir, const char *rte_version, rte_cpuset_t *cpuset, 612 rte_log_fn log_fn, uint32_t registered_logtype) 613 { 614 telemetry_version = rte_version; 615 socket_dir = runtime_dir; 616 thread_cpuset = cpuset; 617 rte_log_ptr = log_fn; 618 logtype = registered_logtype; 619 620 #ifndef RTE_EXEC_ENV_WINDOWS 621 if (telemetry_v2_init() != 0) 622 return -1; 623 TMTY_LOG(DEBUG, "Telemetry initialized ok\n"); 624 telemetry_legacy_init(); 625 #endif /* RTE_EXEC_ENV_WINDOWS */ 626 627 return 0; 628 } 629