1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #ifndef RTE_EXEC_ENV_WINDOWS 6 #include <unistd.h> 7 #include <pthread.h> 8 #include <sys/socket.h> 9 #include <sys/un.h> 10 #include <sys/stat.h> 11 #endif /* !RTE_EXEC_ENV_WINDOWS */ 12 13 /* we won't link against libbsd, so just always use DPDKs-specific strlcpy */ 14 #undef RTE_USE_LIBBSD 15 #include <rte_string_fns.h> 16 #include <rte_common.h> 17 #include <rte_spinlock.h> 18 #include <rte_log.h> 19 20 #include "rte_telemetry.h" 21 #include "telemetry_json.h" 22 #include "telemetry_data.h" 23 #include "telemetry_internal.h" 24 25 #define MAX_CMD_LEN 56 26 #define MAX_OUTPUT_LEN (1024 * 16) 27 #define MAX_CONNECTIONS 10 28 29 #ifndef RTE_EXEC_ENV_WINDOWS 30 static void * 31 client_handler(void *socket); 32 #endif /* !RTE_EXEC_ENV_WINDOWS */ 33 34 struct cmd_callback { 35 char cmd[MAX_CMD_LEN]; 36 telemetry_cb fn; 37 char help[RTE_TEL_MAX_STRING_LEN]; 38 }; 39 40 #ifndef RTE_EXEC_ENV_WINDOWS 41 struct socket { 42 int sock; 43 char path[sizeof(((struct sockaddr_un *)0)->sun_path)]; 44 handler fn; 45 uint16_t *num_clients; 46 }; 47 static struct socket v2_socket; /* socket for v2 telemetry */ 48 static struct socket v1_socket; /* socket for v1 telemetry */ 49 #endif /* !RTE_EXEC_ENV_WINDOWS */ 50 51 static const char *telemetry_version; /* save rte_version */ 52 static const char *socket_dir; /* runtime directory */ 53 static rte_cpuset_t *thread_cpuset; 54 static rte_log_fn rte_log_ptr; 55 static uint32_t logtype; 56 57 #define TMTY_LOG(l, ...) \ 58 rte_log_ptr(RTE_LOG_ ## l, logtype, "TELEMETRY: " __VA_ARGS__) 59 60 /* list of command callbacks, with one command registered by default */ 61 static struct cmd_callback *callbacks; 62 static int num_callbacks; /* How many commands are registered */ 63 /* Used when accessing or modifying list of command callbacks */ 64 static rte_spinlock_t callback_sl = RTE_SPINLOCK_INITIALIZER; 65 #ifndef RTE_EXEC_ENV_WINDOWS 66 static uint16_t v2_clients; 67 #endif /* !RTE_EXEC_ENV_WINDOWS */ 68 69 int 70 rte_telemetry_register_cmd(const char *cmd, telemetry_cb fn, const char *help) 71 { 72 struct cmd_callback *new_callbacks; 73 int i = 0; 74 75 if (strlen(cmd) >= MAX_CMD_LEN || fn == NULL || cmd[0] != '/' 76 || strlen(help) >= RTE_TEL_MAX_STRING_LEN) 77 return -EINVAL; 78 79 rte_spinlock_lock(&callback_sl); 80 new_callbacks = realloc(callbacks, sizeof(callbacks[0]) * (num_callbacks + 1)); 81 if (new_callbacks == NULL) { 82 rte_spinlock_unlock(&callback_sl); 83 return -ENOMEM; 84 } 85 callbacks = new_callbacks; 86 87 while (i < num_callbacks && strcmp(cmd, callbacks[i].cmd) > 0) 88 i++; 89 if (i != num_callbacks) 90 /* Move elements to keep the list alphabetical */ 91 memmove(callbacks + i + 1, callbacks + i, 92 sizeof(struct cmd_callback) * (num_callbacks - i)); 93 94 strlcpy(callbacks[i].cmd, cmd, MAX_CMD_LEN); 95 callbacks[i].fn = fn; 96 strlcpy(callbacks[i].help, help, RTE_TEL_MAX_STRING_LEN); 97 num_callbacks++; 98 rte_spinlock_unlock(&callback_sl); 99 100 return 0; 101 } 102 103 #ifndef RTE_EXEC_ENV_WINDOWS 104 105 static int 106 list_commands(const char *cmd __rte_unused, const char *params __rte_unused, 107 struct rte_tel_data *d) 108 { 109 int i; 110 111 rte_tel_data_start_array(d, RTE_TEL_STRING_VAL); 112 rte_spinlock_lock(&callback_sl); 113 for (i = 0; i < num_callbacks; i++) 114 rte_tel_data_add_array_string(d, callbacks[i].cmd); 115 rte_spinlock_unlock(&callback_sl); 116 return 0; 117 } 118 119 static int 120 json_info(const char *cmd __rte_unused, const char *params __rte_unused, 121 struct rte_tel_data *d) 122 { 123 rte_tel_data_start_dict(d); 124 rte_tel_data_add_dict_string(d, "version", telemetry_version); 125 rte_tel_data_add_dict_int(d, "pid", getpid()); 126 rte_tel_data_add_dict_int(d, "max_output_len", MAX_OUTPUT_LEN); 127 return 0; 128 } 129 130 static int 131 command_help(const char *cmd __rte_unused, const char *params, 132 struct rte_tel_data *d) 133 { 134 int i; 135 136 if (!params) 137 return -1; 138 rte_tel_data_start_dict(d); 139 rte_spinlock_lock(&callback_sl); 140 for (i = 0; i < num_callbacks; i++) 141 if (strcmp(params, callbacks[i].cmd) == 0) { 142 rte_tel_data_add_dict_string(d, params, 143 callbacks[i].help); 144 break; 145 } 146 rte_spinlock_unlock(&callback_sl); 147 if (i == num_callbacks) 148 return -1; 149 return 0; 150 } 151 152 static int 153 container_to_json(const struct rte_tel_data *d, char *out_buf, size_t buf_len) 154 { 155 size_t used = 0; 156 unsigned int i; 157 158 if (d->type != RTE_TEL_DICT && d->type != RTE_TEL_ARRAY_U64 && 159 d->type != RTE_TEL_ARRAY_INT && d->type != RTE_TEL_ARRAY_STRING) 160 return snprintf(out_buf, buf_len, "null"); 161 162 used = rte_tel_json_empty_array(out_buf, buf_len, 0); 163 if (d->type == RTE_TEL_ARRAY_U64) 164 for (i = 0; i < d->data_len; i++) 165 used = rte_tel_json_add_array_u64(out_buf, 166 buf_len, used, 167 d->data.array[i].u64val); 168 if (d->type == RTE_TEL_ARRAY_INT) 169 for (i = 0; i < d->data_len; i++) 170 used = rte_tel_json_add_array_int(out_buf, 171 buf_len, used, 172 d->data.array[i].ival); 173 if (d->type == RTE_TEL_ARRAY_STRING) 174 for (i = 0; i < d->data_len; i++) 175 used = rte_tel_json_add_array_string(out_buf, 176 buf_len, used, 177 d->data.array[i].sval); 178 if (d->type == RTE_TEL_DICT) 179 for (i = 0; i < d->data_len; i++) { 180 const struct tel_dict_entry *v = &d->data.dict[i]; 181 switch (v->type) { 182 case RTE_TEL_STRING_VAL: 183 used = rte_tel_json_add_obj_str(out_buf, 184 buf_len, used, 185 v->name, v->value.sval); 186 break; 187 case RTE_TEL_INT_VAL: 188 used = rte_tel_json_add_obj_int(out_buf, 189 buf_len, used, 190 v->name, v->value.ival); 191 break; 192 case RTE_TEL_U64_VAL: 193 used = rte_tel_json_add_obj_u64(out_buf, 194 buf_len, used, 195 v->name, v->value.u64val); 196 break; 197 case RTE_TEL_CONTAINER: 198 { 199 char temp[buf_len]; 200 const struct container *cont = 201 &v->value.container; 202 if (container_to_json(cont->data, 203 temp, buf_len) != 0) 204 used = rte_tel_json_add_obj_json( 205 out_buf, 206 buf_len, used, 207 v->name, temp); 208 if (!cont->keep) 209 rte_tel_data_free(cont->data); 210 break; 211 } 212 } 213 } 214 215 return used; 216 } 217 218 static void 219 output_json(const char *cmd, const struct rte_tel_data *d, int s) 220 { 221 char out_buf[MAX_OUTPUT_LEN]; 222 223 char *cb_data_buf; 224 size_t buf_len, prefix_used, used = 0; 225 unsigned int i; 226 227 RTE_BUILD_BUG_ON(sizeof(out_buf) < MAX_CMD_LEN + 228 RTE_TEL_MAX_SINGLE_STRING_LEN + 10); 229 switch (d->type) { 230 case RTE_TEL_NULL: 231 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 232 MAX_CMD_LEN, cmd ? cmd : "none"); 233 break; 234 case RTE_TEL_STRING: 235 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":\"%.*s\"}", 236 MAX_CMD_LEN, cmd, 237 RTE_TEL_MAX_SINGLE_STRING_LEN, d->data.str); 238 break; 239 case RTE_TEL_DICT: 240 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 241 MAX_CMD_LEN, cmd); 242 cb_data_buf = &out_buf[prefix_used]; 243 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 244 245 used = rte_tel_json_empty_obj(cb_data_buf, buf_len, 0); 246 for (i = 0; i < d->data_len; i++) { 247 const struct tel_dict_entry *v = &d->data.dict[i]; 248 switch (v->type) { 249 case RTE_TEL_STRING_VAL: 250 used = rte_tel_json_add_obj_str(cb_data_buf, 251 buf_len, used, 252 v->name, v->value.sval); 253 break; 254 case RTE_TEL_INT_VAL: 255 used = rte_tel_json_add_obj_int(cb_data_buf, 256 buf_len, used, 257 v->name, v->value.ival); 258 break; 259 case RTE_TEL_U64_VAL: 260 used = rte_tel_json_add_obj_u64(cb_data_buf, 261 buf_len, used, 262 v->name, v->value.u64val); 263 break; 264 case RTE_TEL_CONTAINER: 265 { 266 char temp[buf_len]; 267 const struct container *cont = 268 &v->value.container; 269 if (container_to_json(cont->data, 270 temp, buf_len) != 0) 271 used = rte_tel_json_add_obj_json( 272 cb_data_buf, 273 buf_len, used, 274 v->name, temp); 275 if (!cont->keep) 276 rte_tel_data_free(cont->data); 277 } 278 } 279 } 280 used += prefix_used; 281 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 282 break; 283 case RTE_TEL_ARRAY_STRING: 284 case RTE_TEL_ARRAY_INT: 285 case RTE_TEL_ARRAY_U64: 286 case RTE_TEL_ARRAY_CONTAINER: 287 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 288 MAX_CMD_LEN, cmd); 289 cb_data_buf = &out_buf[prefix_used]; 290 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 291 292 used = rte_tel_json_empty_array(cb_data_buf, buf_len, 0); 293 for (i = 0; i < d->data_len; i++) 294 if (d->type == RTE_TEL_ARRAY_STRING) 295 used = rte_tel_json_add_array_string( 296 cb_data_buf, 297 buf_len, used, 298 d->data.array[i].sval); 299 else if (d->type == RTE_TEL_ARRAY_INT) 300 used = rte_tel_json_add_array_int(cb_data_buf, 301 buf_len, used, 302 d->data.array[i].ival); 303 else if (d->type == RTE_TEL_ARRAY_U64) 304 used = rte_tel_json_add_array_u64(cb_data_buf, 305 buf_len, used, 306 d->data.array[i].u64val); 307 else if (d->type == RTE_TEL_ARRAY_CONTAINER) { 308 char temp[buf_len]; 309 const struct container *rec_data = 310 &d->data.array[i].container; 311 if (container_to_json(rec_data->data, 312 temp, buf_len) != 0) 313 used = rte_tel_json_add_array_json( 314 cb_data_buf, 315 buf_len, used, temp); 316 if (!rec_data->keep) 317 rte_tel_data_free(rec_data->data); 318 } 319 used += prefix_used; 320 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 321 break; 322 } 323 if (write(s, out_buf, used) < 0) 324 perror("Error writing to socket"); 325 } 326 327 static void 328 perform_command(telemetry_cb fn, const char *cmd, const char *param, int s) 329 { 330 struct rte_tel_data data; 331 332 int ret = fn(cmd, param, &data); 333 if (ret < 0) { 334 char out_buf[MAX_CMD_LEN + 10]; 335 int used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 336 MAX_CMD_LEN, cmd ? cmd : "none"); 337 if (write(s, out_buf, used) < 0) 338 perror("Error writing to socket"); 339 return; 340 } 341 output_json(cmd, &data, s); 342 } 343 344 static int 345 unknown_command(const char *cmd __rte_unused, const char *params __rte_unused, 346 struct rte_tel_data *d) 347 { 348 return d->type = RTE_TEL_NULL; 349 } 350 351 static void * 352 client_handler(void *sock_id) 353 { 354 int s = (int)(uintptr_t)sock_id; 355 char buffer[1024]; 356 char info_str[1024]; 357 snprintf(info_str, sizeof(info_str), 358 "{\"version\":\"%s\",\"pid\":%d,\"max_output_len\":%d}", 359 telemetry_version, getpid(), MAX_OUTPUT_LEN); 360 if (write(s, info_str, strlen(info_str)) < 0) { 361 close(s); 362 return NULL; 363 } 364 365 /* receive data is not null terminated */ 366 int bytes = read(s, buffer, sizeof(buffer) - 1); 367 while (bytes > 0) { 368 buffer[bytes] = 0; 369 const char *cmd = strtok(buffer, ","); 370 const char *param = strtok(NULL, "\0"); 371 telemetry_cb fn = unknown_command; 372 int i; 373 374 if (cmd && strlen(cmd) < MAX_CMD_LEN) { 375 rte_spinlock_lock(&callback_sl); 376 for (i = 0; i < num_callbacks; i++) 377 if (strcmp(cmd, callbacks[i].cmd) == 0) { 378 fn = callbacks[i].fn; 379 break; 380 } 381 rte_spinlock_unlock(&callback_sl); 382 } 383 perform_command(fn, cmd, param, s); 384 385 bytes = read(s, buffer, sizeof(buffer) - 1); 386 } 387 close(s); 388 __atomic_sub_fetch(&v2_clients, 1, __ATOMIC_RELAXED); 389 return NULL; 390 } 391 392 static void * 393 socket_listener(void *socket) 394 { 395 while (1) { 396 pthread_t th; 397 int rc; 398 struct socket *s = (struct socket *)socket; 399 int s_accepted = accept(s->sock, NULL, NULL); 400 if (s_accepted < 0) { 401 TMTY_LOG(ERR, "Error with accept, telemetry thread quitting\n"); 402 return NULL; 403 } 404 if (s->num_clients != NULL) { 405 uint16_t conns = __atomic_load_n(s->num_clients, 406 __ATOMIC_RELAXED); 407 if (conns >= MAX_CONNECTIONS) { 408 close(s_accepted); 409 continue; 410 } 411 __atomic_add_fetch(s->num_clients, 1, 412 __ATOMIC_RELAXED); 413 } 414 rc = pthread_create(&th, NULL, s->fn, 415 (void *)(uintptr_t)s_accepted); 416 if (rc != 0) { 417 TMTY_LOG(ERR, "Error with create client thread: %s\n", 418 strerror(rc)); 419 close(s_accepted); 420 if (s->num_clients != NULL) 421 __atomic_sub_fetch(s->num_clients, 1, 422 __ATOMIC_RELAXED); 423 continue; 424 } 425 pthread_detach(th); 426 } 427 return NULL; 428 } 429 430 static inline char * 431 get_socket_path(const char *runtime_dir, const int version) 432 { 433 static char path[PATH_MAX]; 434 snprintf(path, sizeof(path), "%s/dpdk_telemetry.v%d", 435 strlen(runtime_dir) ? runtime_dir : "/tmp", version); 436 return path; 437 } 438 439 static void 440 unlink_sockets(void) 441 { 442 if (v2_socket.path[0]) 443 unlink(v2_socket.path); 444 if (v1_socket.path[0]) 445 unlink(v1_socket.path); 446 } 447 448 static int 449 create_socket(char *path) 450 { 451 int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); 452 if (sock < 0) { 453 TMTY_LOG(ERR, "Error with socket creation, %s\n", strerror(errno)); 454 return -1; 455 } 456 457 struct sockaddr_un sun = {.sun_family = AF_UNIX}; 458 strlcpy(sun.sun_path, path, sizeof(sun.sun_path)); 459 TMTY_LOG(DEBUG, "Attempting socket bind to path '%s'\n", path); 460 461 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 462 struct stat st; 463 464 TMTY_LOG(DEBUG, "Initial bind to socket '%s' failed.\n", path); 465 466 /* first check if we have a runtime dir */ 467 if (stat(socket_dir, &st) < 0 || !S_ISDIR(st.st_mode)) { 468 TMTY_LOG(ERR, "Cannot access DPDK runtime directory: %s\n", socket_dir); 469 close(sock); 470 return -ENOENT; 471 } 472 473 /* check if current socket is active */ 474 if (connect(sock, (void *)&sun, sizeof(sun)) == 0) { 475 close(sock); 476 return -EADDRINUSE; 477 } 478 479 /* socket is not active, delete and attempt rebind */ 480 TMTY_LOG(DEBUG, "Attempting unlink and retrying bind\n"); 481 unlink(sun.sun_path); 482 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 483 TMTY_LOG(ERR, "Error binding socket: %s\n", strerror(errno)); 484 close(sock); 485 return -errno; /* if unlink failed, this will be -EADDRINUSE as above */ 486 } 487 } 488 489 if (listen(sock, 1) < 0) { 490 TMTY_LOG(ERR, "Error calling listen for socket: %s\n", strerror(errno)); 491 unlink(sun.sun_path); 492 close(sock); 493 return -errno; 494 } 495 TMTY_LOG(DEBUG, "Socket creation and binding ok\n"); 496 497 return sock; 498 } 499 500 static void 501 set_thread_name(pthread_t id __rte_unused, const char *name __rte_unused) 502 { 503 #if defined RTE_EXEC_ENV_LINUX && defined __GLIBC__ && defined __GLIBC_PREREQ 504 #if __GLIBC_PREREQ(2, 12) 505 pthread_setname_np(id, name); 506 #endif 507 #elif defined RTE_EXEC_ENV_FREEBSD 508 pthread_set_name_np(id, name); 509 #endif 510 } 511 512 static int 513 telemetry_legacy_init(void) 514 { 515 pthread_t t_old; 516 int rc; 517 518 if (num_legacy_callbacks == 1) { 519 TMTY_LOG(WARNING, "No legacy callbacks, legacy socket not created\n"); 520 return -1; 521 } 522 523 v1_socket.fn = legacy_client_handler; 524 if ((size_t) snprintf(v1_socket.path, sizeof(v1_socket.path), 525 "%s/telemetry", socket_dir) >= sizeof(v1_socket.path)) { 526 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 527 return -1; 528 } 529 v1_socket.sock = create_socket(v1_socket.path); 530 if (v1_socket.sock < 0) { 531 v1_socket.path[0] = '\0'; 532 return -1; 533 } 534 rc = pthread_create(&t_old, NULL, socket_listener, &v1_socket); 535 if (rc != 0) { 536 TMTY_LOG(ERR, "Error with create legacy socket thread: %s\n", 537 strerror(rc)); 538 close(v1_socket.sock); 539 v1_socket.sock = -1; 540 unlink(v1_socket.path); 541 v1_socket.path[0] = '\0'; 542 return -1; 543 } 544 pthread_setaffinity_np(t_old, sizeof(*thread_cpuset), thread_cpuset); 545 set_thread_name(t_old, "telemetry-v1"); 546 TMTY_LOG(DEBUG, "Legacy telemetry socket initialized ok\n"); 547 pthread_detach(t_old); 548 return 0; 549 } 550 551 static int 552 telemetry_v2_init(void) 553 { 554 char spath[sizeof(v2_socket.path)]; 555 pthread_t t_new; 556 short suffix = 0; 557 int rc; 558 559 v2_socket.num_clients = &v2_clients; 560 rte_telemetry_register_cmd("/", list_commands, 561 "Returns list of available commands, Takes no parameters"); 562 rte_telemetry_register_cmd("/info", json_info, 563 "Returns DPDK Telemetry information. Takes no parameters"); 564 rte_telemetry_register_cmd("/help", command_help, 565 "Returns help text for a command. Parameters: string command"); 566 v2_socket.fn = client_handler; 567 if (strlcpy(spath, get_socket_path(socket_dir, 2), sizeof(spath)) >= sizeof(spath)) { 568 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 569 return -1; 570 } 571 memcpy(v2_socket.path, spath, sizeof(v2_socket.path)); 572 573 v2_socket.sock = create_socket(v2_socket.path); 574 while (v2_socket.sock < 0) { 575 /* bail out on unexpected error, or suffix wrap-around */ 576 if (v2_socket.sock != -EADDRINUSE || suffix < 0) { 577 v2_socket.path[0] = '\0'; /* clear socket path */ 578 return -1; 579 } 580 /* add a suffix to the path if the basic version fails */ 581 if (snprintf(v2_socket.path, sizeof(v2_socket.path), "%s:%d", 582 spath, ++suffix) >= (int)sizeof(v2_socket.path)) { 583 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 584 return -1; 585 } 586 v2_socket.sock = create_socket(v2_socket.path); 587 } 588 rc = pthread_create(&t_new, NULL, socket_listener, &v2_socket); 589 if (rc != 0) { 590 TMTY_LOG(ERR, "Error with create socket thread: %s\n", 591 strerror(rc)); 592 close(v2_socket.sock); 593 v2_socket.sock = -1; 594 unlink(v2_socket.path); 595 v2_socket.path[0] = '\0'; 596 return -1; 597 } 598 pthread_setaffinity_np(t_new, sizeof(*thread_cpuset), thread_cpuset); 599 set_thread_name(t_new, "telemetry-v2"); 600 pthread_detach(t_new); 601 atexit(unlink_sockets); 602 603 return 0; 604 } 605 606 #endif /* !RTE_EXEC_ENV_WINDOWS */ 607 608 int32_t 609 rte_telemetry_init(const char *runtime_dir, const char *rte_version, rte_cpuset_t *cpuset, 610 rte_log_fn log_fn, uint32_t registered_logtype) 611 { 612 telemetry_version = rte_version; 613 socket_dir = runtime_dir; 614 thread_cpuset = cpuset; 615 rte_log_ptr = log_fn; 616 logtype = registered_logtype; 617 618 #ifndef RTE_EXEC_ENV_WINDOWS 619 if (telemetry_v2_init() != 0) 620 return -1; 621 TMTY_LOG(DEBUG, "Telemetry initialized ok\n"); 622 telemetry_legacy_init(); 623 #endif /* RTE_EXEC_ENV_WINDOWS */ 624 625 return 0; 626 } 627