1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #ifndef RTE_EXEC_ENV_WINDOWS 6 #include <unistd.h> 7 #include <pthread.h> 8 #include <sys/socket.h> 9 #include <sys/un.h> 10 #include <sys/stat.h> 11 #include <dlfcn.h> 12 #endif /* !RTE_EXEC_ENV_WINDOWS */ 13 14 /* we won't link against libbsd, so just always use DPDKs-specific strlcpy */ 15 #undef RTE_USE_LIBBSD 16 #include <rte_string_fns.h> 17 #include <rte_common.h> 18 #include <rte_spinlock.h> 19 #include <rte_log.h> 20 21 #include "rte_telemetry.h" 22 #include "telemetry_json.h" 23 #include "telemetry_data.h" 24 #include "telemetry_internal.h" 25 26 #define MAX_CMD_LEN 56 27 #define MAX_HELP_LEN 64 28 #define MAX_OUTPUT_LEN (1024 * 16) 29 #define MAX_CONNECTIONS 10 30 31 #ifndef RTE_EXEC_ENV_WINDOWS 32 static void * 33 client_handler(void *socket); 34 #endif /* !RTE_EXEC_ENV_WINDOWS */ 35 36 struct cmd_callback { 37 char cmd[MAX_CMD_LEN]; 38 telemetry_cb fn; 39 char help[MAX_HELP_LEN]; 40 }; 41 42 #ifndef RTE_EXEC_ENV_WINDOWS 43 struct socket { 44 int sock; 45 char path[sizeof(((struct sockaddr_un *)0)->sun_path)]; 46 handler fn; 47 uint16_t *num_clients; 48 }; 49 static struct socket v2_socket; /* socket for v2 telemetry */ 50 static struct socket v1_socket; /* socket for v1 telemetry */ 51 #endif /* !RTE_EXEC_ENV_WINDOWS */ 52 53 static const char *telemetry_version; /* save rte_version */ 54 static const char *socket_dir; /* runtime directory */ 55 static rte_cpuset_t *thread_cpuset; 56 static rte_log_fn rte_log_ptr; 57 static uint32_t logtype; 58 59 #define TMTY_LOG(l, ...) \ 60 rte_log_ptr(RTE_LOG_ ## l, logtype, "TELEMETRY: " __VA_ARGS__) 61 62 /* list of command callbacks, with one command registered by default */ 63 static struct cmd_callback *callbacks; 64 static int num_callbacks; /* How many commands are registered */ 65 /* Used when accessing or modifying list of command callbacks */ 66 static rte_spinlock_t callback_sl = RTE_SPINLOCK_INITIALIZER; 67 #ifndef RTE_EXEC_ENV_WINDOWS 68 static uint16_t v2_clients; 69 #endif /* !RTE_EXEC_ENV_WINDOWS */ 70 71 int 72 rte_telemetry_register_cmd(const char *cmd, telemetry_cb fn, const char *help) 73 { 74 struct cmd_callback *new_callbacks; 75 int i = 0; 76 77 if (strlen(cmd) >= MAX_CMD_LEN || fn == NULL || cmd[0] != '/' 78 || strlen(help) >= MAX_HELP_LEN) 79 return -EINVAL; 80 81 rte_spinlock_lock(&callback_sl); 82 new_callbacks = realloc(callbacks, sizeof(callbacks[0]) * (num_callbacks + 1)); 83 if (new_callbacks == NULL) { 84 rte_spinlock_unlock(&callback_sl); 85 return -ENOMEM; 86 } 87 callbacks = new_callbacks; 88 89 while (i < num_callbacks && strcmp(cmd, callbacks[i].cmd) > 0) 90 i++; 91 if (i != num_callbacks) 92 /* Move elements to keep the list alphabetical */ 93 memmove(callbacks + i + 1, callbacks + i, 94 sizeof(struct cmd_callback) * (num_callbacks - i)); 95 96 strlcpy(callbacks[i].cmd, cmd, MAX_CMD_LEN); 97 callbacks[i].fn = fn; 98 strlcpy(callbacks[i].help, help, MAX_HELP_LEN); 99 num_callbacks++; 100 rte_spinlock_unlock(&callback_sl); 101 102 return 0; 103 } 104 105 #ifndef RTE_EXEC_ENV_WINDOWS 106 107 static int 108 list_commands(const char *cmd __rte_unused, const char *params __rte_unused, 109 struct rte_tel_data *d) 110 { 111 int i; 112 113 rte_tel_data_start_array(d, RTE_TEL_STRING_VAL); 114 rte_spinlock_lock(&callback_sl); 115 for (i = 0; i < num_callbacks; i++) 116 rte_tel_data_add_array_string(d, callbacks[i].cmd); 117 rte_spinlock_unlock(&callback_sl); 118 return 0; 119 } 120 121 static int 122 json_info(const char *cmd __rte_unused, const char *params __rte_unused, 123 struct rte_tel_data *d) 124 { 125 rte_tel_data_start_dict(d); 126 rte_tel_data_add_dict_string(d, "version", telemetry_version); 127 rte_tel_data_add_dict_int(d, "pid", getpid()); 128 rte_tel_data_add_dict_int(d, "max_output_len", MAX_OUTPUT_LEN); 129 return 0; 130 } 131 132 static int 133 command_help(const char *cmd __rte_unused, const char *params, 134 struct rte_tel_data *d) 135 { 136 int i; 137 138 if (!params) 139 return -1; 140 rte_tel_data_start_dict(d); 141 rte_spinlock_lock(&callback_sl); 142 for (i = 0; i < num_callbacks; i++) 143 if (strcmp(params, callbacks[i].cmd) == 0) { 144 rte_tel_data_add_dict_string(d, params, 145 callbacks[i].help); 146 break; 147 } 148 rte_spinlock_unlock(&callback_sl); 149 if (i == num_callbacks) 150 return -1; 151 return 0; 152 } 153 154 static int 155 container_to_json(const struct rte_tel_data *d, char *out_buf, size_t buf_len) 156 { 157 size_t used = 0; 158 unsigned int i; 159 160 if (d->type != RTE_TEL_ARRAY_U64 && d->type != RTE_TEL_ARRAY_INT 161 && d->type != RTE_TEL_ARRAY_STRING) 162 return snprintf(out_buf, buf_len, "null"); 163 164 used = rte_tel_json_empty_array(out_buf, buf_len, 0); 165 if (d->type == RTE_TEL_ARRAY_U64) 166 for (i = 0; i < d->data_len; i++) 167 used = rte_tel_json_add_array_u64(out_buf, 168 buf_len, used, 169 d->data.array[i].u64val); 170 if (d->type == RTE_TEL_ARRAY_INT) 171 for (i = 0; i < d->data_len; i++) 172 used = rte_tel_json_add_array_int(out_buf, 173 buf_len, used, 174 d->data.array[i].ival); 175 if (d->type == RTE_TEL_ARRAY_STRING) 176 for (i = 0; i < d->data_len; i++) 177 used = rte_tel_json_add_array_string(out_buf, 178 buf_len, used, 179 d->data.array[i].sval); 180 return used; 181 } 182 183 static void 184 output_json(const char *cmd, const struct rte_tel_data *d, int s) 185 { 186 char out_buf[MAX_OUTPUT_LEN]; 187 188 char *cb_data_buf; 189 size_t buf_len, prefix_used, used = 0; 190 unsigned int i; 191 192 RTE_BUILD_BUG_ON(sizeof(out_buf) < MAX_CMD_LEN + 193 RTE_TEL_MAX_SINGLE_STRING_LEN + 10); 194 switch (d->type) { 195 case RTE_TEL_NULL: 196 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 197 MAX_CMD_LEN, cmd ? cmd : "none"); 198 break; 199 case RTE_TEL_STRING: 200 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":\"%.*s\"}", 201 MAX_CMD_LEN, cmd, 202 RTE_TEL_MAX_SINGLE_STRING_LEN, d->data.str); 203 break; 204 case RTE_TEL_DICT: 205 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 206 MAX_CMD_LEN, cmd); 207 cb_data_buf = &out_buf[prefix_used]; 208 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 209 210 used = rte_tel_json_empty_obj(cb_data_buf, buf_len, 0); 211 for (i = 0; i < d->data_len; i++) { 212 const struct tel_dict_entry *v = &d->data.dict[i]; 213 switch (v->type) { 214 case RTE_TEL_STRING_VAL: 215 used = rte_tel_json_add_obj_str(cb_data_buf, 216 buf_len, used, 217 v->name, v->value.sval); 218 break; 219 case RTE_TEL_INT_VAL: 220 used = rte_tel_json_add_obj_int(cb_data_buf, 221 buf_len, used, 222 v->name, v->value.ival); 223 break; 224 case RTE_TEL_U64_VAL: 225 used = rte_tel_json_add_obj_u64(cb_data_buf, 226 buf_len, used, 227 v->name, v->value.u64val); 228 break; 229 case RTE_TEL_CONTAINER: 230 { 231 char temp[buf_len]; 232 const struct container *cont = 233 &v->value.container; 234 if (container_to_json(cont->data, 235 temp, buf_len) != 0) 236 used = rte_tel_json_add_obj_json( 237 cb_data_buf, 238 buf_len, used, 239 v->name, temp); 240 if (!cont->keep) 241 rte_tel_data_free(cont->data); 242 } 243 } 244 } 245 used += prefix_used; 246 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 247 break; 248 case RTE_TEL_ARRAY_STRING: 249 case RTE_TEL_ARRAY_INT: 250 case RTE_TEL_ARRAY_U64: 251 case RTE_TEL_ARRAY_CONTAINER: 252 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 253 MAX_CMD_LEN, cmd); 254 cb_data_buf = &out_buf[prefix_used]; 255 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 256 257 used = rte_tel_json_empty_array(cb_data_buf, buf_len, 0); 258 for (i = 0; i < d->data_len; i++) 259 if (d->type == RTE_TEL_ARRAY_STRING) 260 used = rte_tel_json_add_array_string( 261 cb_data_buf, 262 buf_len, used, 263 d->data.array[i].sval); 264 else if (d->type == RTE_TEL_ARRAY_INT) 265 used = rte_tel_json_add_array_int(cb_data_buf, 266 buf_len, used, 267 d->data.array[i].ival); 268 else if (d->type == RTE_TEL_ARRAY_U64) 269 used = rte_tel_json_add_array_u64(cb_data_buf, 270 buf_len, used, 271 d->data.array[i].u64val); 272 else if (d->type == RTE_TEL_ARRAY_CONTAINER) { 273 char temp[buf_len]; 274 const struct container *rec_data = 275 &d->data.array[i].container; 276 if (container_to_json(rec_data->data, 277 temp, buf_len) != 0) 278 used = rte_tel_json_add_array_json( 279 cb_data_buf, 280 buf_len, used, temp); 281 if (!rec_data->keep) 282 rte_tel_data_free(rec_data->data); 283 } 284 used += prefix_used; 285 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 286 break; 287 } 288 if (write(s, out_buf, used) < 0) 289 perror("Error writing to socket"); 290 } 291 292 static void 293 perform_command(telemetry_cb fn, const char *cmd, const char *param, int s) 294 { 295 struct rte_tel_data data; 296 297 int ret = fn(cmd, param, &data); 298 if (ret < 0) { 299 char out_buf[MAX_CMD_LEN + 10]; 300 int used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 301 MAX_CMD_LEN, cmd ? cmd : "none"); 302 if (write(s, out_buf, used) < 0) 303 perror("Error writing to socket"); 304 return; 305 } 306 output_json(cmd, &data, s); 307 } 308 309 static int 310 unknown_command(const char *cmd __rte_unused, const char *params __rte_unused, 311 struct rte_tel_data *d) 312 { 313 return d->type = RTE_TEL_NULL; 314 } 315 316 static void * 317 client_handler(void *sock_id) 318 { 319 int s = (int)(uintptr_t)sock_id; 320 char buffer[1024]; 321 char info_str[1024]; 322 snprintf(info_str, sizeof(info_str), 323 "{\"version\":\"%s\",\"pid\":%d,\"max_output_len\":%d}", 324 telemetry_version, getpid(), MAX_OUTPUT_LEN); 325 if (write(s, info_str, strlen(info_str)) < 0) { 326 close(s); 327 return NULL; 328 } 329 330 /* receive data is not null terminated */ 331 int bytes = read(s, buffer, sizeof(buffer) - 1); 332 while (bytes > 0) { 333 buffer[bytes] = 0; 334 const char *cmd = strtok(buffer, ","); 335 const char *param = strtok(NULL, "\0"); 336 telemetry_cb fn = unknown_command; 337 int i; 338 339 if (cmd && strlen(cmd) < MAX_CMD_LEN) { 340 rte_spinlock_lock(&callback_sl); 341 for (i = 0; i < num_callbacks; i++) 342 if (strcmp(cmd, callbacks[i].cmd) == 0) { 343 fn = callbacks[i].fn; 344 break; 345 } 346 rte_spinlock_unlock(&callback_sl); 347 } 348 perform_command(fn, cmd, param, s); 349 350 bytes = read(s, buffer, sizeof(buffer) - 1); 351 } 352 close(s); 353 __atomic_sub_fetch(&v2_clients, 1, __ATOMIC_RELAXED); 354 return NULL; 355 } 356 357 static void * 358 socket_listener(void *socket) 359 { 360 while (1) { 361 pthread_t th; 362 int rc; 363 struct socket *s = (struct socket *)socket; 364 int s_accepted = accept(s->sock, NULL, NULL); 365 if (s_accepted < 0) { 366 TMTY_LOG(ERR, "Error with accept, telemetry thread quitting\n"); 367 return NULL; 368 } 369 if (s->num_clients != NULL) { 370 uint16_t conns = __atomic_load_n(s->num_clients, 371 __ATOMIC_RELAXED); 372 if (conns >= MAX_CONNECTIONS) { 373 close(s_accepted); 374 continue; 375 } 376 __atomic_add_fetch(s->num_clients, 1, 377 __ATOMIC_RELAXED); 378 } 379 rc = pthread_create(&th, NULL, s->fn, 380 (void *)(uintptr_t)s_accepted); 381 if (rc != 0) { 382 TMTY_LOG(ERR, "Error with create client thread: %s\n", 383 strerror(rc)); 384 close(s_accepted); 385 if (s->num_clients != NULL) 386 __atomic_sub_fetch(s->num_clients, 1, 387 __ATOMIC_RELAXED); 388 continue; 389 } 390 pthread_detach(th); 391 } 392 return NULL; 393 } 394 395 static inline char * 396 get_socket_path(const char *runtime_dir, const int version) 397 { 398 static char path[PATH_MAX]; 399 snprintf(path, sizeof(path), "%s/dpdk_telemetry.v%d", 400 strlen(runtime_dir) ? runtime_dir : "/tmp", version); 401 return path; 402 } 403 404 static void 405 unlink_sockets(void) 406 { 407 if (v2_socket.path[0]) 408 unlink(v2_socket.path); 409 if (v1_socket.path[0]) 410 unlink(v1_socket.path); 411 } 412 413 static int 414 create_socket(char *path) 415 { 416 int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); 417 if (sock < 0) { 418 TMTY_LOG(ERR, "Error with socket creation, %s\n", strerror(errno)); 419 return -1; 420 } 421 422 struct sockaddr_un sun = {.sun_family = AF_UNIX}; 423 strlcpy(sun.sun_path, path, sizeof(sun.sun_path)); 424 unlink(sun.sun_path); 425 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 426 struct stat st; 427 428 TMTY_LOG(ERR, "Error binding socket: %s\n", strerror(errno)); 429 if (stat(socket_dir, &st) < 0 || !S_ISDIR(st.st_mode)) 430 TMTY_LOG(ERR, "Cannot access DPDK runtime directory: %s\n", socket_dir); 431 sun.sun_path[0] = 0; 432 goto error; 433 } 434 435 if (listen(sock, 1) < 0) { 436 TMTY_LOG(ERR, "Error calling listen for socket: %s\n", strerror(errno)); 437 goto error; 438 } 439 440 return sock; 441 442 error: 443 close(sock); 444 unlink_sockets(); 445 return -1; 446 } 447 448 static void 449 set_thread_name(pthread_t id __rte_unused, const char *name __rte_unused) 450 { 451 #if defined RTE_EXEC_ENV_LINUX && defined __GLIBC__ && defined __GLIBC_PREREQ 452 #if __GLIBC_PREREQ(2, 12) 453 pthread_setname_np(id, name); 454 #endif 455 #elif defined RTE_EXEC_ENV_FREEBSD 456 pthread_set_name_np(id, name); 457 #endif 458 } 459 460 static int 461 telemetry_legacy_init(void) 462 { 463 pthread_t t_old; 464 int rc; 465 466 if (num_legacy_callbacks == 1) { 467 TMTY_LOG(WARNING, "No legacy callbacks, legacy socket not created\n"); 468 return -1; 469 } 470 471 v1_socket.fn = legacy_client_handler; 472 if ((size_t) snprintf(v1_socket.path, sizeof(v1_socket.path), 473 "%s/telemetry", socket_dir) >= sizeof(v1_socket.path)) { 474 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 475 return -1; 476 } 477 v1_socket.sock = create_socket(v1_socket.path); 478 if (v1_socket.sock < 0) 479 return -1; 480 rc = pthread_create(&t_old, NULL, socket_listener, &v1_socket); 481 if (rc != 0) { 482 TMTY_LOG(ERR, "Error with create legcay socket thread: %s\n", 483 strerror(rc)); 484 close(v1_socket.sock); 485 v1_socket.sock = -1; 486 unlink(v1_socket.path); 487 v1_socket.path[0] = '\0'; 488 return -1; 489 } 490 pthread_setaffinity_np(t_old, sizeof(*thread_cpuset), thread_cpuset); 491 set_thread_name(t_old, "telemetry-v1"); 492 TMTY_LOG(DEBUG, "Legacy telemetry socket initialized ok\n"); 493 return 0; 494 } 495 496 static int 497 telemetry_v2_init(void) 498 { 499 pthread_t t_new; 500 int rc; 501 502 v2_socket.num_clients = &v2_clients; 503 rte_telemetry_register_cmd("/", list_commands, 504 "Returns list of available commands, Takes no parameters"); 505 rte_telemetry_register_cmd("/info", json_info, 506 "Returns DPDK Telemetry information. Takes no parameters"); 507 rte_telemetry_register_cmd("/help", command_help, 508 "Returns help text for a command. Parameters: string command"); 509 v2_socket.fn = client_handler; 510 if (strlcpy(v2_socket.path, get_socket_path(socket_dir, 2), 511 sizeof(v2_socket.path)) >= sizeof(v2_socket.path)) { 512 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 513 return -1; 514 } 515 516 v2_socket.sock = create_socket(v2_socket.path); 517 if (v2_socket.sock < 0) 518 return -1; 519 rc = pthread_create(&t_new, NULL, socket_listener, &v2_socket); 520 if (rc != 0) { 521 TMTY_LOG(ERR, "Error with create socket thread: %s\n", 522 strerror(rc)); 523 close(v2_socket.sock); 524 v2_socket.sock = -1; 525 unlink(v2_socket.path); 526 v2_socket.path[0] = '\0'; 527 return -1; 528 } 529 pthread_setaffinity_np(t_new, sizeof(*thread_cpuset), thread_cpuset); 530 set_thread_name(t_new, "telemetry-v2"); 531 atexit(unlink_sockets); 532 533 return 0; 534 } 535 536 #endif /* !RTE_EXEC_ENV_WINDOWS */ 537 538 int32_t 539 rte_telemetry_init(const char *runtime_dir, const char *rte_version, rte_cpuset_t *cpuset, 540 rte_log_fn log_fn, uint32_t registered_logtype) 541 { 542 telemetry_version = rte_version; 543 socket_dir = runtime_dir; 544 thread_cpuset = cpuset; 545 rte_log_ptr = log_fn; 546 logtype = registered_logtype; 547 548 #ifndef RTE_EXEC_ENV_WINDOWS 549 if (telemetry_v2_init() != 0) 550 return -1; 551 TMTY_LOG(DEBUG, "Telemetry initialized ok\n"); 552 telemetry_legacy_init(); 553 #endif /* RTE_EXEC_ENV_WINDOWS */ 554 555 return 0; 556 } 557