1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #ifndef RTE_EXEC_ENV_WINDOWS 6 #include <unistd.h> 7 #include <pthread.h> 8 #include <sys/socket.h> 9 #include <sys/un.h> 10 #include <dlfcn.h> 11 #endif /* !RTE_EXEC_ENV_WINDOWS */ 12 13 /* we won't link against libbsd, so just always use DPDKs-specific strlcpy */ 14 #undef RTE_USE_LIBBSD 15 #include <rte_string_fns.h> 16 #include <rte_common.h> 17 #include <rte_spinlock.h> 18 #include <rte_log.h> 19 20 #include "rte_telemetry.h" 21 #include "telemetry_json.h" 22 #include "telemetry_data.h" 23 #include "telemetry_internal.h" 24 25 #define MAX_CMD_LEN 56 26 #define MAX_HELP_LEN 64 27 #define MAX_OUTPUT_LEN (1024 * 16) 28 #define MAX_CONNECTIONS 10 29 30 /** Maximum number of telemetry callbacks. */ 31 #define TELEMETRY_MAX_CALLBACKS 64 32 33 #ifndef RTE_EXEC_ENV_WINDOWS 34 static void * 35 client_handler(void *socket); 36 #endif /* !RTE_EXEC_ENV_WINDOWS */ 37 38 struct cmd_callback { 39 char cmd[MAX_CMD_LEN]; 40 telemetry_cb fn; 41 char help[MAX_HELP_LEN]; 42 }; 43 44 #ifndef RTE_EXEC_ENV_WINDOWS 45 struct socket { 46 int sock; 47 char path[sizeof(((struct sockaddr_un *)0)->sun_path)]; 48 handler fn; 49 uint16_t *num_clients; 50 }; 51 static struct socket v2_socket; /* socket for v2 telemetry */ 52 static struct socket v1_socket; /* socket for v1 telemetry */ 53 #endif /* !RTE_EXEC_ENV_WINDOWS */ 54 55 static const char *telemetry_version; /* save rte_version */ 56 static const char *socket_dir; /* runtime directory */ 57 static rte_cpuset_t *thread_cpuset; 58 static rte_log_fn rte_log_ptr; 59 static uint32_t logtype; 60 61 #define TMTY_LOG(l, ...) \ 62 rte_log_ptr(RTE_LOG_ ## l, logtype, "TELEMETRY: " __VA_ARGS__) 63 64 /* list of command callbacks, with one command registered by default */ 65 static struct cmd_callback callbacks[TELEMETRY_MAX_CALLBACKS]; 66 static int num_callbacks; /* How many commands are registered */ 67 /* Used when accessing or modifying list of command callbacks */ 68 static rte_spinlock_t callback_sl = RTE_SPINLOCK_INITIALIZER; 69 #ifndef RTE_EXEC_ENV_WINDOWS 70 static uint16_t v2_clients; 71 #endif /* !RTE_EXEC_ENV_WINDOWS */ 72 73 int 74 rte_telemetry_register_cmd(const char *cmd, telemetry_cb fn, const char *help) 75 { 76 int i = 0; 77 78 if (strlen(cmd) >= MAX_CMD_LEN || fn == NULL || cmd[0] != '/' 79 || strlen(help) >= MAX_HELP_LEN) 80 return -EINVAL; 81 if (num_callbacks >= TELEMETRY_MAX_CALLBACKS) 82 return -ENOENT; 83 84 rte_spinlock_lock(&callback_sl); 85 while (i < num_callbacks && strcmp(cmd, callbacks[i].cmd) > 0) 86 i++; 87 if (i != num_callbacks) 88 /* Move elements to keep the list alphabetical */ 89 memmove(callbacks + i + 1, callbacks + i, 90 sizeof(struct cmd_callback) * (num_callbacks - i)); 91 92 strlcpy(callbacks[i].cmd, cmd, MAX_CMD_LEN); 93 callbacks[i].fn = fn; 94 strlcpy(callbacks[i].help, help, MAX_HELP_LEN); 95 num_callbacks++; 96 rte_spinlock_unlock(&callback_sl); 97 98 return 0; 99 } 100 101 #ifndef RTE_EXEC_ENV_WINDOWS 102 103 static int 104 list_commands(const char *cmd __rte_unused, const char *params __rte_unused, 105 struct rte_tel_data *d) 106 { 107 int i; 108 109 rte_tel_data_start_array(d, RTE_TEL_STRING_VAL); 110 rte_spinlock_lock(&callback_sl); 111 for (i = 0; i < num_callbacks; i++) 112 rte_tel_data_add_array_string(d, callbacks[i].cmd); 113 rte_spinlock_unlock(&callback_sl); 114 return 0; 115 } 116 117 static int 118 json_info(const char *cmd __rte_unused, const char *params __rte_unused, 119 struct rte_tel_data *d) 120 { 121 rte_tel_data_start_dict(d); 122 rte_tel_data_add_dict_string(d, "version", telemetry_version); 123 rte_tel_data_add_dict_int(d, "pid", getpid()); 124 rte_tel_data_add_dict_int(d, "max_output_len", MAX_OUTPUT_LEN); 125 return 0; 126 } 127 128 static int 129 command_help(const char *cmd __rte_unused, const char *params, 130 struct rte_tel_data *d) 131 { 132 int i; 133 134 if (!params) 135 return -1; 136 rte_tel_data_start_dict(d); 137 rte_spinlock_lock(&callback_sl); 138 for (i = 0; i < num_callbacks; i++) 139 if (strcmp(params, callbacks[i].cmd) == 0) { 140 rte_tel_data_add_dict_string(d, params, 141 callbacks[i].help); 142 break; 143 } 144 rte_spinlock_unlock(&callback_sl); 145 if (i == num_callbacks) 146 return -1; 147 return 0; 148 } 149 150 static int 151 container_to_json(const struct rte_tel_data *d, char *out_buf, size_t buf_len) 152 { 153 size_t used = 0; 154 unsigned int i; 155 156 if (d->type != RTE_TEL_ARRAY_U64 && d->type != RTE_TEL_ARRAY_INT 157 && d->type != RTE_TEL_ARRAY_STRING) 158 return snprintf(out_buf, buf_len, "null"); 159 160 used = rte_tel_json_empty_array(out_buf, buf_len, 0); 161 if (d->type == RTE_TEL_ARRAY_U64) 162 for (i = 0; i < d->data_len; i++) 163 used = rte_tel_json_add_array_u64(out_buf, 164 buf_len, used, 165 d->data.array[i].u64val); 166 if (d->type == RTE_TEL_ARRAY_INT) 167 for (i = 0; i < d->data_len; i++) 168 used = rte_tel_json_add_array_int(out_buf, 169 buf_len, used, 170 d->data.array[i].ival); 171 if (d->type == RTE_TEL_ARRAY_STRING) 172 for (i = 0; i < d->data_len; i++) 173 used = rte_tel_json_add_array_string(out_buf, 174 buf_len, used, 175 d->data.array[i].sval); 176 return used; 177 } 178 179 static void 180 output_json(const char *cmd, const struct rte_tel_data *d, int s) 181 { 182 char out_buf[MAX_OUTPUT_LEN]; 183 184 char *cb_data_buf; 185 size_t buf_len, prefix_used, used = 0; 186 unsigned int i; 187 188 RTE_BUILD_BUG_ON(sizeof(out_buf) < MAX_CMD_LEN + 189 RTE_TEL_MAX_SINGLE_STRING_LEN + 10); 190 switch (d->type) { 191 case RTE_TEL_NULL: 192 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 193 MAX_CMD_LEN, cmd ? cmd : "none"); 194 break; 195 case RTE_TEL_STRING: 196 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":\"%.*s\"}", 197 MAX_CMD_LEN, cmd, 198 RTE_TEL_MAX_SINGLE_STRING_LEN, d->data.str); 199 break; 200 case RTE_TEL_DICT: 201 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 202 MAX_CMD_LEN, cmd); 203 cb_data_buf = &out_buf[prefix_used]; 204 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 205 206 used = rte_tel_json_empty_obj(cb_data_buf, buf_len, 0); 207 for (i = 0; i < d->data_len; i++) { 208 const struct tel_dict_entry *v = &d->data.dict[i]; 209 switch (v->type) { 210 case RTE_TEL_STRING_VAL: 211 used = rte_tel_json_add_obj_str(cb_data_buf, 212 buf_len, used, 213 v->name, v->value.sval); 214 break; 215 case RTE_TEL_INT_VAL: 216 used = rte_tel_json_add_obj_int(cb_data_buf, 217 buf_len, used, 218 v->name, v->value.ival); 219 break; 220 case RTE_TEL_U64_VAL: 221 used = rte_tel_json_add_obj_u64(cb_data_buf, 222 buf_len, used, 223 v->name, v->value.u64val); 224 break; 225 case RTE_TEL_CONTAINER: 226 { 227 char temp[buf_len]; 228 const struct container *cont = 229 &v->value.container; 230 if (container_to_json(cont->data, 231 temp, buf_len) != 0) 232 used = rte_tel_json_add_obj_json( 233 cb_data_buf, 234 buf_len, used, 235 v->name, temp); 236 if (!cont->keep) 237 rte_tel_data_free(cont->data); 238 } 239 } 240 } 241 used += prefix_used; 242 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 243 break; 244 case RTE_TEL_ARRAY_STRING: 245 case RTE_TEL_ARRAY_INT: 246 case RTE_TEL_ARRAY_U64: 247 case RTE_TEL_ARRAY_CONTAINER: 248 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 249 MAX_CMD_LEN, cmd); 250 cb_data_buf = &out_buf[prefix_used]; 251 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 252 253 used = rte_tel_json_empty_array(cb_data_buf, buf_len, 0); 254 for (i = 0; i < d->data_len; i++) 255 if (d->type == RTE_TEL_ARRAY_STRING) 256 used = rte_tel_json_add_array_string( 257 cb_data_buf, 258 buf_len, used, 259 d->data.array[i].sval); 260 else if (d->type == RTE_TEL_ARRAY_INT) 261 used = rte_tel_json_add_array_int(cb_data_buf, 262 buf_len, used, 263 d->data.array[i].ival); 264 else if (d->type == RTE_TEL_ARRAY_U64) 265 used = rte_tel_json_add_array_u64(cb_data_buf, 266 buf_len, used, 267 d->data.array[i].u64val); 268 else if (d->type == RTE_TEL_ARRAY_CONTAINER) { 269 char temp[buf_len]; 270 const struct container *rec_data = 271 &d->data.array[i].container; 272 if (container_to_json(rec_data->data, 273 temp, buf_len) != 0) 274 used = rte_tel_json_add_array_json( 275 cb_data_buf, 276 buf_len, used, temp); 277 if (!rec_data->keep) 278 rte_tel_data_free(rec_data->data); 279 } 280 used += prefix_used; 281 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 282 break; 283 } 284 if (write(s, out_buf, used) < 0) 285 perror("Error writing to socket"); 286 } 287 288 static void 289 perform_command(telemetry_cb fn, const char *cmd, const char *param, int s) 290 { 291 struct rte_tel_data data; 292 293 int ret = fn(cmd, param, &data); 294 if (ret < 0) { 295 char out_buf[MAX_CMD_LEN + 10]; 296 int used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 297 MAX_CMD_LEN, cmd ? cmd : "none"); 298 if (write(s, out_buf, used) < 0) 299 perror("Error writing to socket"); 300 return; 301 } 302 output_json(cmd, &data, s); 303 } 304 305 static int 306 unknown_command(const char *cmd __rte_unused, const char *params __rte_unused, 307 struct rte_tel_data *d) 308 { 309 return d->type = RTE_TEL_NULL; 310 } 311 312 static void * 313 client_handler(void *sock_id) 314 { 315 int s = (int)(uintptr_t)sock_id; 316 char buffer[1024]; 317 char info_str[1024]; 318 snprintf(info_str, sizeof(info_str), 319 "{\"version\":\"%s\",\"pid\":%d,\"max_output_len\":%d}", 320 telemetry_version, getpid(), MAX_OUTPUT_LEN); 321 if (write(s, info_str, strlen(info_str)) < 0) { 322 close(s); 323 return NULL; 324 } 325 326 /* receive data is not null terminated */ 327 int bytes = read(s, buffer, sizeof(buffer) - 1); 328 while (bytes > 0) { 329 buffer[bytes] = 0; 330 const char *cmd = strtok(buffer, ","); 331 const char *param = strtok(NULL, "\0"); 332 telemetry_cb fn = unknown_command; 333 int i; 334 335 if (cmd && strlen(cmd) < MAX_CMD_LEN) { 336 rte_spinlock_lock(&callback_sl); 337 for (i = 0; i < num_callbacks; i++) 338 if (strcmp(cmd, callbacks[i].cmd) == 0) { 339 fn = callbacks[i].fn; 340 break; 341 } 342 rte_spinlock_unlock(&callback_sl); 343 } 344 perform_command(fn, cmd, param, s); 345 346 bytes = read(s, buffer, sizeof(buffer) - 1); 347 } 348 close(s); 349 __atomic_sub_fetch(&v2_clients, 1, __ATOMIC_RELAXED); 350 return NULL; 351 } 352 353 static void * 354 socket_listener(void *socket) 355 { 356 while (1) { 357 pthread_t th; 358 int rc; 359 struct socket *s = (struct socket *)socket; 360 int s_accepted = accept(s->sock, NULL, NULL); 361 if (s_accepted < 0) { 362 TMTY_LOG(ERR, "Error with accept, telemetry thread quitting\n"); 363 return NULL; 364 } 365 if (s->num_clients != NULL) { 366 uint16_t conns = __atomic_load_n(s->num_clients, 367 __ATOMIC_RELAXED); 368 if (conns >= MAX_CONNECTIONS) { 369 close(s_accepted); 370 continue; 371 } 372 __atomic_add_fetch(s->num_clients, 1, 373 __ATOMIC_RELAXED); 374 } 375 rc = pthread_create(&th, NULL, s->fn, 376 (void *)(uintptr_t)s_accepted); 377 if (rc != 0) { 378 TMTY_LOG(ERR, "Error with create client thread: %s\n", 379 strerror(rc)); 380 close(s_accepted); 381 if (s->num_clients != NULL) 382 __atomic_sub_fetch(s->num_clients, 1, 383 __ATOMIC_RELAXED); 384 continue; 385 } 386 pthread_detach(th); 387 } 388 return NULL; 389 } 390 391 static inline char * 392 get_socket_path(const char *runtime_dir, const int version) 393 { 394 static char path[PATH_MAX]; 395 snprintf(path, sizeof(path), "%s/dpdk_telemetry.v%d", 396 strlen(runtime_dir) ? runtime_dir : "/tmp", version); 397 return path; 398 } 399 400 static void 401 unlink_sockets(void) 402 { 403 if (v2_socket.path[0]) 404 unlink(v2_socket.path); 405 if (v1_socket.path[0]) 406 unlink(v1_socket.path); 407 } 408 409 static int 410 create_socket(char *path) 411 { 412 int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); 413 if (sock < 0) { 414 TMTY_LOG(ERR, "Error with socket creation, %s\n", strerror(errno)); 415 return -1; 416 } 417 418 struct sockaddr_un sun = {.sun_family = AF_UNIX}; 419 strlcpy(sun.sun_path, path, sizeof(sun.sun_path)); 420 unlink(sun.sun_path); 421 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 422 TMTY_LOG(ERR, "Error binding socket: %s\n", strerror(errno)); 423 sun.sun_path[0] = 0; 424 goto error; 425 } 426 427 if (listen(sock, 1) < 0) { 428 TMTY_LOG(ERR, "Error calling listen for socket: %s\n", strerror(errno)); 429 goto error; 430 } 431 432 return sock; 433 434 error: 435 close(sock); 436 unlink_sockets(); 437 return -1; 438 } 439 440 static void 441 set_thread_name(pthread_t id __rte_unused, const char *name __rte_unused) 442 { 443 #if defined RTE_EXEC_ENV_LINUX && defined __GLIBC__ && defined __GLIBC_PREREQ 444 #if __GLIBC_PREREQ(2, 12) 445 pthread_setname_np(id, name); 446 #endif 447 #elif defined RTE_EXEC_ENV_FREEBSD 448 pthread_set_name_np(id, name); 449 #endif 450 } 451 452 static int 453 telemetry_legacy_init(void) 454 { 455 pthread_t t_old; 456 int rc; 457 458 if (num_legacy_callbacks == 1) { 459 TMTY_LOG(WARNING, "No legacy callbacks, legacy socket not created\n"); 460 return -1; 461 } 462 463 v1_socket.fn = legacy_client_handler; 464 if ((size_t) snprintf(v1_socket.path, sizeof(v1_socket.path), 465 "%s/telemetry", socket_dir) >= sizeof(v1_socket.path)) { 466 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 467 return -1; 468 } 469 v1_socket.sock = create_socket(v1_socket.path); 470 if (v1_socket.sock < 0) 471 return -1; 472 rc = pthread_create(&t_old, NULL, socket_listener, &v1_socket); 473 if (rc != 0) { 474 TMTY_LOG(ERR, "Error with create legcay socket thread: %s\n", 475 strerror(rc)); 476 close(v1_socket.sock); 477 v1_socket.sock = -1; 478 unlink(v1_socket.path); 479 v1_socket.path[0] = '\0'; 480 return -1; 481 } 482 pthread_setaffinity_np(t_old, sizeof(*thread_cpuset), thread_cpuset); 483 set_thread_name(t_old, "telemetry-v1"); 484 TMTY_LOG(DEBUG, "Legacy telemetry socket initialized ok\n"); 485 return 0; 486 } 487 488 static int 489 telemetry_v2_init(void) 490 { 491 pthread_t t_new; 492 int rc; 493 494 v2_socket.num_clients = &v2_clients; 495 rte_telemetry_register_cmd("/", list_commands, 496 "Returns list of available commands, Takes no parameters"); 497 rte_telemetry_register_cmd("/info", json_info, 498 "Returns DPDK Telemetry information. Takes no parameters"); 499 rte_telemetry_register_cmd("/help", command_help, 500 "Returns help text for a command. Parameters: string command"); 501 v2_socket.fn = client_handler; 502 if (strlcpy(v2_socket.path, get_socket_path(socket_dir, 2), 503 sizeof(v2_socket.path)) >= sizeof(v2_socket.path)) { 504 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 505 return -1; 506 } 507 508 v2_socket.sock = create_socket(v2_socket.path); 509 if (v2_socket.sock < 0) 510 return -1; 511 rc = pthread_create(&t_new, NULL, socket_listener, &v2_socket); 512 if (rc != 0) { 513 TMTY_LOG(ERR, "Error with create socket thread: %s\n", 514 strerror(rc)); 515 close(v2_socket.sock); 516 v2_socket.sock = -1; 517 unlink(v2_socket.path); 518 v2_socket.path[0] = '\0'; 519 return -1; 520 } 521 pthread_setaffinity_np(t_new, sizeof(*thread_cpuset), thread_cpuset); 522 set_thread_name(t_new, "telemetry-v2"); 523 atexit(unlink_sockets); 524 525 return 0; 526 } 527 528 #endif /* !RTE_EXEC_ENV_WINDOWS */ 529 530 int32_t 531 rte_telemetry_init(const char *runtime_dir, const char *rte_version, rte_cpuset_t *cpuset, 532 rte_log_fn log_fn, uint32_t registered_logtype) 533 { 534 telemetry_version = rte_version; 535 socket_dir = runtime_dir; 536 thread_cpuset = cpuset; 537 rte_log_ptr = log_fn; 538 logtype = registered_logtype; 539 540 #ifndef RTE_EXEC_ENV_WINDOWS 541 if (telemetry_v2_init() != 0) 542 return -1; 543 TMTY_LOG(DEBUG, "Telemetry initialized ok\n"); 544 telemetry_legacy_init(); 545 #endif /* RTE_EXEC_ENV_WINDOWS */ 546 547 return 0; 548 } 549