1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2020 Intel Corporation 3 */ 4 5 #ifndef RTE_EXEC_ENV_WINDOWS 6 #include <unistd.h> 7 #include <pthread.h> 8 #include <sys/socket.h> 9 #include <sys/un.h> 10 #include <dlfcn.h> 11 #endif /* !RTE_EXEC_ENV_WINDOWS */ 12 13 /* we won't link against libbsd, so just always use DPDKs-specific strlcpy */ 14 #undef RTE_USE_LIBBSD 15 #include <rte_string_fns.h> 16 #include <rte_common.h> 17 #include <rte_spinlock.h> 18 #include <rte_log.h> 19 20 #include "rte_telemetry.h" 21 #include "telemetry_json.h" 22 #include "telemetry_data.h" 23 #include "telemetry_internal.h" 24 25 #define MAX_CMD_LEN 56 26 #define MAX_HELP_LEN 64 27 #define MAX_OUTPUT_LEN (1024 * 16) 28 #define MAX_CONNECTIONS 10 29 30 #ifndef RTE_EXEC_ENV_WINDOWS 31 static void * 32 client_handler(void *socket); 33 #endif /* !RTE_EXEC_ENV_WINDOWS */ 34 35 struct cmd_callback { 36 char cmd[MAX_CMD_LEN]; 37 telemetry_cb fn; 38 char help[MAX_HELP_LEN]; 39 }; 40 41 #ifndef RTE_EXEC_ENV_WINDOWS 42 struct socket { 43 int sock; 44 char path[sizeof(((struct sockaddr_un *)0)->sun_path)]; 45 handler fn; 46 uint16_t *num_clients; 47 }; 48 static struct socket v2_socket; /* socket for v2 telemetry */ 49 static struct socket v1_socket; /* socket for v1 telemetry */ 50 #endif /* !RTE_EXEC_ENV_WINDOWS */ 51 52 static const char *telemetry_version; /* save rte_version */ 53 static const char *socket_dir; /* runtime directory */ 54 static rte_cpuset_t *thread_cpuset; 55 static rte_log_fn rte_log_ptr; 56 static uint32_t logtype; 57 58 #define TMTY_LOG(l, ...) \ 59 rte_log_ptr(RTE_LOG_ ## l, logtype, "TELEMETRY: " __VA_ARGS__) 60 61 /* list of command callbacks, with one command registered by default */ 62 static struct cmd_callback callbacks[TELEMETRY_MAX_CALLBACKS]; 63 static int num_callbacks; /* How many commands are registered */ 64 /* Used when accessing or modifying list of command callbacks */ 65 static rte_spinlock_t callback_sl = RTE_SPINLOCK_INITIALIZER; 66 #ifndef RTE_EXEC_ENV_WINDOWS 67 static uint16_t v2_clients; 68 #endif /* !RTE_EXEC_ENV_WINDOWS */ 69 70 int 71 rte_telemetry_register_cmd(const char *cmd, telemetry_cb fn, const char *help) 72 { 73 int i = 0; 74 75 if (strlen(cmd) >= MAX_CMD_LEN || fn == NULL || cmd[0] != '/' 76 || strlen(help) >= MAX_HELP_LEN) 77 return -EINVAL; 78 if (num_callbacks >= TELEMETRY_MAX_CALLBACKS) 79 return -ENOENT; 80 81 rte_spinlock_lock(&callback_sl); 82 while (i < num_callbacks && strcmp(cmd, callbacks[i].cmd) > 0) 83 i++; 84 if (i != num_callbacks) 85 /* Move elements to keep the list alphabetical */ 86 memmove(callbacks + i + 1, callbacks + i, 87 sizeof(struct cmd_callback) * (num_callbacks - i)); 88 89 strlcpy(callbacks[i].cmd, cmd, MAX_CMD_LEN); 90 callbacks[i].fn = fn; 91 strlcpy(callbacks[i].help, help, MAX_HELP_LEN); 92 num_callbacks++; 93 rte_spinlock_unlock(&callback_sl); 94 95 return 0; 96 } 97 98 #ifndef RTE_EXEC_ENV_WINDOWS 99 100 static int 101 list_commands(const char *cmd __rte_unused, const char *params __rte_unused, 102 struct rte_tel_data *d) 103 { 104 int i; 105 106 rte_tel_data_start_array(d, RTE_TEL_STRING_VAL); 107 for (i = 0; i < num_callbacks; i++) 108 rte_tel_data_add_array_string(d, callbacks[i].cmd); 109 return 0; 110 } 111 112 static int 113 json_info(const char *cmd __rte_unused, const char *params __rte_unused, 114 struct rte_tel_data *d) 115 { 116 rte_tel_data_start_dict(d); 117 rte_tel_data_add_dict_string(d, "version", telemetry_version); 118 rte_tel_data_add_dict_int(d, "pid", getpid()); 119 rte_tel_data_add_dict_int(d, "max_output_len", MAX_OUTPUT_LEN); 120 return 0; 121 } 122 123 static int 124 command_help(const char *cmd __rte_unused, const char *params, 125 struct rte_tel_data *d) 126 { 127 int i; 128 129 if (!params) 130 return -1; 131 rte_tel_data_start_dict(d); 132 rte_spinlock_lock(&callback_sl); 133 for (i = 0; i < num_callbacks; i++) 134 if (strcmp(params, callbacks[i].cmd) == 0) { 135 rte_tel_data_add_dict_string(d, params, 136 callbacks[i].help); 137 break; 138 } 139 rte_spinlock_unlock(&callback_sl); 140 if (i == num_callbacks) 141 return -1; 142 return 0; 143 } 144 145 static int 146 container_to_json(const struct rte_tel_data *d, char *out_buf, size_t buf_len) 147 { 148 size_t used = 0; 149 unsigned int i; 150 151 if (d->type != RTE_TEL_ARRAY_U64 && d->type != RTE_TEL_ARRAY_INT 152 && d->type != RTE_TEL_ARRAY_STRING) 153 return snprintf(out_buf, buf_len, "null"); 154 155 used = rte_tel_json_empty_array(out_buf, buf_len, 0); 156 if (d->type == RTE_TEL_ARRAY_U64) 157 for (i = 0; i < d->data_len; i++) 158 used = rte_tel_json_add_array_u64(out_buf, 159 buf_len, used, 160 d->data.array[i].u64val); 161 if (d->type == RTE_TEL_ARRAY_INT) 162 for (i = 0; i < d->data_len; i++) 163 used = rte_tel_json_add_array_int(out_buf, 164 buf_len, used, 165 d->data.array[i].ival); 166 if (d->type == RTE_TEL_ARRAY_STRING) 167 for (i = 0; i < d->data_len; i++) 168 used = rte_tel_json_add_array_string(out_buf, 169 buf_len, used, 170 d->data.array[i].sval); 171 return used; 172 } 173 174 static void 175 output_json(const char *cmd, const struct rte_tel_data *d, int s) 176 { 177 char out_buf[MAX_OUTPUT_LEN]; 178 179 char *cb_data_buf; 180 size_t buf_len, prefix_used, used = 0; 181 unsigned int i; 182 183 RTE_BUILD_BUG_ON(sizeof(out_buf) < MAX_CMD_LEN + 184 RTE_TEL_MAX_SINGLE_STRING_LEN + 10); 185 switch (d->type) { 186 case RTE_TEL_NULL: 187 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 188 MAX_CMD_LEN, cmd ? cmd : "none"); 189 break; 190 case RTE_TEL_STRING: 191 used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":\"%.*s\"}", 192 MAX_CMD_LEN, cmd, 193 RTE_TEL_MAX_SINGLE_STRING_LEN, d->data.str); 194 break; 195 case RTE_TEL_DICT: 196 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 197 MAX_CMD_LEN, cmd); 198 cb_data_buf = &out_buf[prefix_used]; 199 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 200 201 used = rte_tel_json_empty_obj(cb_data_buf, buf_len, 0); 202 for (i = 0; i < d->data_len; i++) { 203 const struct tel_dict_entry *v = &d->data.dict[i]; 204 switch (v->type) { 205 case RTE_TEL_STRING_VAL: 206 used = rte_tel_json_add_obj_str(cb_data_buf, 207 buf_len, used, 208 v->name, v->value.sval); 209 break; 210 case RTE_TEL_INT_VAL: 211 used = rte_tel_json_add_obj_int(cb_data_buf, 212 buf_len, used, 213 v->name, v->value.ival); 214 break; 215 case RTE_TEL_U64_VAL: 216 used = rte_tel_json_add_obj_u64(cb_data_buf, 217 buf_len, used, 218 v->name, v->value.u64val); 219 break; 220 case RTE_TEL_CONTAINER: 221 { 222 char temp[buf_len]; 223 const struct container *cont = 224 &v->value.container; 225 if (container_to_json(cont->data, 226 temp, buf_len) != 0) 227 used = rte_tel_json_add_obj_json( 228 cb_data_buf, 229 buf_len, used, 230 v->name, temp); 231 if (!cont->keep) 232 rte_tel_data_free(cont->data); 233 } 234 } 235 } 236 used += prefix_used; 237 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 238 break; 239 case RTE_TEL_ARRAY_STRING: 240 case RTE_TEL_ARRAY_INT: 241 case RTE_TEL_ARRAY_U64: 242 case RTE_TEL_ARRAY_CONTAINER: 243 prefix_used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":", 244 MAX_CMD_LEN, cmd); 245 cb_data_buf = &out_buf[prefix_used]; 246 buf_len = sizeof(out_buf) - prefix_used - 1; /* space for '}' */ 247 248 used = rte_tel_json_empty_array(cb_data_buf, buf_len, 0); 249 for (i = 0; i < d->data_len; i++) 250 if (d->type == RTE_TEL_ARRAY_STRING) 251 used = rte_tel_json_add_array_string( 252 cb_data_buf, 253 buf_len, used, 254 d->data.array[i].sval); 255 else if (d->type == RTE_TEL_ARRAY_INT) 256 used = rte_tel_json_add_array_int(cb_data_buf, 257 buf_len, used, 258 d->data.array[i].ival); 259 else if (d->type == RTE_TEL_ARRAY_U64) 260 used = rte_tel_json_add_array_u64(cb_data_buf, 261 buf_len, used, 262 d->data.array[i].u64val); 263 else if (d->type == RTE_TEL_ARRAY_CONTAINER) { 264 char temp[buf_len]; 265 const struct container *rec_data = 266 &d->data.array[i].container; 267 if (container_to_json(rec_data->data, 268 temp, buf_len) != 0) 269 used = rte_tel_json_add_array_json( 270 cb_data_buf, 271 buf_len, used, temp); 272 if (!rec_data->keep) 273 rte_tel_data_free(rec_data->data); 274 } 275 used += prefix_used; 276 used += strlcat(out_buf + used, "}", sizeof(out_buf) - used); 277 break; 278 } 279 if (write(s, out_buf, used) < 0) 280 perror("Error writing to socket"); 281 } 282 283 static void 284 perform_command(telemetry_cb fn, const char *cmd, const char *param, int s) 285 { 286 struct rte_tel_data data; 287 288 int ret = fn(cmd, param, &data); 289 if (ret < 0) { 290 char out_buf[MAX_CMD_LEN + 10]; 291 int used = snprintf(out_buf, sizeof(out_buf), "{\"%.*s\":null}", 292 MAX_CMD_LEN, cmd ? cmd : "none"); 293 if (write(s, out_buf, used) < 0) 294 perror("Error writing to socket"); 295 return; 296 } 297 output_json(cmd, &data, s); 298 } 299 300 static int 301 unknown_command(const char *cmd __rte_unused, const char *params __rte_unused, 302 struct rte_tel_data *d) 303 { 304 return d->type = RTE_TEL_NULL; 305 } 306 307 static void * 308 client_handler(void *sock_id) 309 { 310 int s = (int)(uintptr_t)sock_id; 311 char buffer[1024]; 312 char info_str[1024]; 313 snprintf(info_str, sizeof(info_str), 314 "{\"version\":\"%s\",\"pid\":%d,\"max_output_len\":%d}", 315 telemetry_version, getpid(), MAX_OUTPUT_LEN); 316 if (write(s, info_str, strlen(info_str)) < 0) { 317 close(s); 318 return NULL; 319 } 320 321 /* receive data is not null terminated */ 322 int bytes = read(s, buffer, sizeof(buffer) - 1); 323 while (bytes > 0) { 324 buffer[bytes] = 0; 325 const char *cmd = strtok(buffer, ","); 326 const char *param = strtok(NULL, "\0"); 327 telemetry_cb fn = unknown_command; 328 int i; 329 330 if (cmd && strlen(cmd) < MAX_CMD_LEN) { 331 rte_spinlock_lock(&callback_sl); 332 for (i = 0; i < num_callbacks; i++) 333 if (strcmp(cmd, callbacks[i].cmd) == 0) { 334 fn = callbacks[i].fn; 335 break; 336 } 337 rte_spinlock_unlock(&callback_sl); 338 } 339 perform_command(fn, cmd, param, s); 340 341 bytes = read(s, buffer, sizeof(buffer) - 1); 342 } 343 close(s); 344 __atomic_sub_fetch(&v2_clients, 1, __ATOMIC_RELAXED); 345 return NULL; 346 } 347 348 static void * 349 socket_listener(void *socket) 350 { 351 while (1) { 352 pthread_t th; 353 int rc; 354 struct socket *s = (struct socket *)socket; 355 int s_accepted = accept(s->sock, NULL, NULL); 356 if (s_accepted < 0) { 357 TMTY_LOG(ERR, "Error with accept, telemetry thread quitting\n"); 358 return NULL; 359 } 360 if (s->num_clients != NULL) { 361 uint16_t conns = __atomic_load_n(s->num_clients, 362 __ATOMIC_RELAXED); 363 if (conns >= MAX_CONNECTIONS) { 364 close(s_accepted); 365 continue; 366 } 367 __atomic_add_fetch(s->num_clients, 1, 368 __ATOMIC_RELAXED); 369 } 370 rc = pthread_create(&th, NULL, s->fn, 371 (void *)(uintptr_t)s_accepted); 372 if (rc != 0) { 373 TMTY_LOG(ERR, "Error with create client thread: %s\n", 374 strerror(rc)); 375 close(s_accepted); 376 if (s->num_clients != NULL) 377 __atomic_sub_fetch(s->num_clients, 1, 378 __ATOMIC_RELAXED); 379 continue; 380 } 381 pthread_detach(th); 382 } 383 return NULL; 384 } 385 386 static inline char * 387 get_socket_path(const char *runtime_dir, const int version) 388 { 389 static char path[PATH_MAX]; 390 snprintf(path, sizeof(path), "%s/dpdk_telemetry.v%d", 391 strlen(runtime_dir) ? runtime_dir : "/tmp", version); 392 return path; 393 } 394 395 static void 396 unlink_sockets(void) 397 { 398 if (v2_socket.path[0]) 399 unlink(v2_socket.path); 400 if (v1_socket.path[0]) 401 unlink(v1_socket.path); 402 } 403 404 static int 405 create_socket(char *path) 406 { 407 int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0); 408 if (sock < 0) { 409 TMTY_LOG(ERR, "Error with socket creation, %s\n", strerror(errno)); 410 return -1; 411 } 412 413 struct sockaddr_un sun = {.sun_family = AF_UNIX}; 414 strlcpy(sun.sun_path, path, sizeof(sun.sun_path)); 415 unlink(sun.sun_path); 416 if (bind(sock, (void *) &sun, sizeof(sun)) < 0) { 417 TMTY_LOG(ERR, "Error binding socket: %s\n", strerror(errno)); 418 sun.sun_path[0] = 0; 419 goto error; 420 } 421 422 if (listen(sock, 1) < 0) { 423 TMTY_LOG(ERR, "Error calling listen for socket: %s\n", strerror(errno)); 424 goto error; 425 } 426 427 return sock; 428 429 error: 430 close(sock); 431 unlink_sockets(); 432 return -1; 433 } 434 435 static void 436 set_thread_name(pthread_t id __rte_unused, const char *name __rte_unused) 437 { 438 #if defined RTE_EXEC_ENV_LINUX && defined __GLIBC__ && defined __GLIBC_PREREQ 439 #if __GLIBC_PREREQ(2, 12) 440 pthread_setname_np(id, name); 441 #endif 442 #elif defined RTE_EXEC_ENV_FREEBSD 443 pthread_set_name_np(id, name); 444 #endif 445 } 446 447 static int 448 telemetry_legacy_init(void) 449 { 450 pthread_t t_old; 451 int rc; 452 453 if (num_legacy_callbacks == 1) { 454 TMTY_LOG(WARNING, "No legacy callbacks, legacy socket not created\n"); 455 return -1; 456 } 457 458 v1_socket.fn = legacy_client_handler; 459 if ((size_t) snprintf(v1_socket.path, sizeof(v1_socket.path), 460 "%s/telemetry", socket_dir) >= sizeof(v1_socket.path)) { 461 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 462 return -1; 463 } 464 v1_socket.sock = create_socket(v1_socket.path); 465 if (v1_socket.sock < 0) 466 return -1; 467 rc = pthread_create(&t_old, NULL, socket_listener, &v1_socket); 468 if (rc != 0) { 469 TMTY_LOG(ERR, "Error with create legcay socket thread: %s\n", 470 strerror(rc)); 471 close(v1_socket.sock); 472 v1_socket.sock = -1; 473 unlink(v1_socket.path); 474 v1_socket.path[0] = '\0'; 475 return -1; 476 } 477 pthread_setaffinity_np(t_old, sizeof(*thread_cpuset), thread_cpuset); 478 set_thread_name(t_old, "telemetry-v1"); 479 TMTY_LOG(DEBUG, "Legacy telemetry socket initialized ok\n"); 480 return 0; 481 } 482 483 static int 484 telemetry_v2_init(void) 485 { 486 pthread_t t_new; 487 int rc; 488 489 v2_socket.num_clients = &v2_clients; 490 rte_telemetry_register_cmd("/", list_commands, 491 "Returns list of available commands, Takes no parameters"); 492 rte_telemetry_register_cmd("/info", json_info, 493 "Returns DPDK Telemetry information. Takes no parameters"); 494 rte_telemetry_register_cmd("/help", command_help, 495 "Returns help text for a command. Parameters: string command"); 496 v2_socket.fn = client_handler; 497 if (strlcpy(v2_socket.path, get_socket_path(socket_dir, 2), 498 sizeof(v2_socket.path)) >= sizeof(v2_socket.path)) { 499 TMTY_LOG(ERR, "Error with socket binding, path too long\n"); 500 return -1; 501 } 502 503 v2_socket.sock = create_socket(v2_socket.path); 504 if (v2_socket.sock < 0) 505 return -1; 506 rc = pthread_create(&t_new, NULL, socket_listener, &v2_socket); 507 if (rc != 0) { 508 TMTY_LOG(ERR, "Error with create socket thread: %s\n", 509 strerror(rc)); 510 close(v2_socket.sock); 511 v2_socket.sock = -1; 512 unlink(v2_socket.path); 513 v2_socket.path[0] = '\0'; 514 return -1; 515 } 516 pthread_setaffinity_np(t_new, sizeof(*thread_cpuset), thread_cpuset); 517 set_thread_name(t_new, "telemetry-v2"); 518 atexit(unlink_sockets); 519 520 return 0; 521 } 522 523 #endif /* !RTE_EXEC_ENV_WINDOWS */ 524 525 int32_t 526 rte_telemetry_init(const char *runtime_dir, const char *rte_version, rte_cpuset_t *cpuset, 527 rte_log_fn log_fn, uint32_t registered_logtype) 528 { 529 telemetry_version = rte_version; 530 socket_dir = runtime_dir; 531 thread_cpuset = cpuset; 532 rte_log_ptr = log_fn; 533 logtype = registered_logtype; 534 535 #ifndef RTE_EXEC_ENV_WINDOWS 536 if (telemetry_v2_init() != 0) 537 return -1; 538 TMTY_LOG(DEBUG, "Telemetry initialized ok\n"); 539 telemetry_legacy_init(); 540 #endif /* RTE_EXEC_ENV_WINDOWS */ 541 542 return 0; 543 } 544