/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2018 Intel Corporation
 */

#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <inttypes.h>
#include <libgen.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <rte_alarm.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>

#include "eal_memcfg.h"
#include "eal_private.h"
#include "eal_filesystem.h"
#include "eal_internal_cfg.h"

static int mp_fd = -1;
static pthread_t mp_handle_tid;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
static char peer_name[PATH_MAX];

struct action_entry {
	TAILQ_ENTRY(action_entry) next;
	char action_name[RTE_MP_MAX_NAME_LEN];
	rte_mp_t action;
};

/** Doubly-linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);

enum mp_type {
	MP_MSG, /* Share message with peers, will not block */
	MP_REQ, /* Request for information, will block for a reply */
	MP_REP, /* Response to previously-received request */
	MP_IGN, /* Response telling requester to ignore this response */
};

struct mp_msg_internal {
	int type;
	struct rte_mp_msg msg;
};

struct async_request_param {
	rte_mp_async_reply_t clb;
	struct rte_mp_reply user_reply;
	struct timespec end;
	int n_responses_processed;
};

struct pending_request {
	TAILQ_ENTRY(pending_request) next;
	enum {
		REQUEST_TYPE_SYNC,
		REQUEST_TYPE_ASYNC
	} type;
	char dst[PATH_MAX];
	struct rte_mp_msg *request;
	struct rte_mp_msg *reply;
	int reply_received;
	RTE_STD_C11
	union {
		struct {
			struct async_request_param *param;
		} async;
		struct {
			pthread_cond_t cond;
		} sync;
	};
};

TAILQ_HEAD(pending_request_list, pending_request);

static struct {
	struct pending_request_list requests;
	pthread_mutex_t lock;
} pending_requests = {
	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
	.lock = PTHREAD_MUTEX_INITIALIZER,
	/**< used in async requests only */
};

/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);

static struct pending_request *
find_pending_request(const char *dst, const char *act_name)
{
	struct pending_request *r;

	TAILQ_FOREACH(r, &pending_requests.requests, next) {
		if (!strcmp(r->dst, dst) &&
		    !strcmp(r->request->name, act_name))
			break;
	}

	return r;
}

static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *prefix = eal_mp_socket_path();

	if (strlen(name) > 0)
		snprintf(buf, len, "%s_%s", prefix, name);
	else
		strlcpy(buf, prefix, len);
}
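/*
 * Path layout sketch: with the default runtime directory this typically
 * yields something like "/var/run/dpdk/rte/mp_socket" (the exact location
 * depends on the EAL runtime dir and privileges). The primary binds the
 * bare prefix (empty name); open_socket_fd() below has each secondary
 * append "<pid>_<rdtsc in hex>", e.g. "mp_socket_1234_1a2b3c", which is
 * the pattern mp_filter later matches with fnmatch().
 */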
int
rte_eal_primary_proc_alive(const char *config_file_path)
{
	int config_fd;

	if (config_file_path)
		config_fd = open(config_file_path, O_RDONLY);
	else {
		const char *path;

		path = eal_runtime_config_path();
		config_fd = open(path, O_RDONLY);
	}
	if (config_fd < 0)
		return 0;

	int ret = lockf(config_fd, F_TEST, 0);
	close(config_fd);

	return !!ret;
}

static struct action_entry *
find_action_entry_by_name(const char *name)
{
	struct action_entry *entry;

	TAILQ_FOREACH(entry, &action_entry_list, next) {
		if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
			break;
	}

	return entry;
}

static int
validate_action_name(const char *name)
{
	if (name == NULL) {
		RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
		RTE_LOG(ERR, EAL, "Length of action name is zero\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
		rte_errno = E2BIG;
		return -1;
	}
	return 0;
}

int
rte_mp_action_register(const char *name, rte_mp_t action)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	entry = malloc(sizeof(struct action_entry));
	if (entry == NULL) {
		rte_errno = ENOMEM;
		return -1;
	}
	strlcpy(entry->action_name, name, sizeof(entry->action_name));
	entry->action = action;

	pthread_mutex_lock(&mp_mutex_action);
	if (find_action_entry_by_name(name) != NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		rte_errno = EEXIST;
		free(entry);
		return -1;
	}
	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	return 0;
}

void
rte_mp_action_unregister(const char *name)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(name);
	if (entry == NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		return;
	}
	TAILQ_REMOVE(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	free(entry);
}
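/*
 * Wire-format note for read_msg()/send_msg(): each datagram carries a
 * struct mp_msg_internal with the trailing fds[] array deliberately cut
 * off the payload (hence "sizeof(*m) - sizeof(m->msg.fds)" below); any
 * file descriptors travel out-of-band as SCM_RIGHTS ancillary data and
 * are copied back into msg.fds on receipt.
 */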
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	int msglen;
	struct iovec iov;
	struct msghdr msgh;
	char control[CMSG_SPACE(sizeof(m->msg.fds))];
	struct cmsghdr *cmsg;
	int buflen = sizeof(*m) - sizeof(m->msg.fds);

	memset(&msgh, 0, sizeof(msgh));
	iov.iov_base = m;
	iov.iov_len = buflen;

	msgh.msg_name = s;
	msgh.msg_namelen = sizeof(*s);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

retry:
	msglen = recvmsg(mp_fd, &msgh, 0);

	/* zero length message means socket was closed */
	if (msglen == 0)
		return 0;

	if (msglen < 0) {
		if (errno == EINTR)
			goto retry;

		RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
		return -1;
	}

	if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
		RTE_LOG(ERR, EAL, "truncated msg\n");
		return -1;
	}

	/* read auxiliary FDs if any */
	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
	     cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
		if ((cmsg->cmsg_level == SOL_SOCKET) &&
		    (cmsg->cmsg_type == SCM_RIGHTS)) {
			memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
			break;
		}
	}
	/* sanity-check the response */
	if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "invalid number of fd's received\n");
		return -1;
	}
	if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "invalid received data length\n");
		return -1;
	}
	return msglen;
}

static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	struct pending_request *pending_req;
	struct action_entry *entry;
	struct rte_mp_msg *msg = &m->msg;
	rte_mp_t action = NULL;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

	if (m->type == MP_REP || m->type == MP_IGN) {
		struct pending_request *req = NULL;

		pthread_mutex_lock(&pending_requests.lock);
		pending_req = find_pending_request(s->sun_path, msg->name);
		if (pending_req) {
			memcpy(pending_req->reply, msg, sizeof(*msg));
			/* -1 indicates that we've been asked to ignore */
			pending_req->reply_received =
				m->type == MP_REP ? 1 : -1;

			if (pending_req->type == REQUEST_TYPE_SYNC)
				pthread_cond_signal(&pending_req->sync.cond);
			else if (pending_req->type == REQUEST_TYPE_ASYNC)
				req = async_reply_handle_thread_unsafe(
						pending_req);
		} else
			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
		pthread_mutex_unlock(&pending_requests.lock);

		if (req != NULL)
			trigger_async_action(req);
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(msg->name);
	if (entry != NULL)
		action = entry->action;
	pthread_mutex_unlock(&mp_mutex_action);

	if (!action) {
		if (m->type == MP_REQ && !internal_conf->init_complete) {
			/* if this is a request, and init is not yet complete,
			 * and callback wasn't registered, we should tell the
			 * requester to ignore our existence because we're not
			 * yet ready to process this request.
			 */
			struct rte_mp_msg dummy;

			memset(&dummy, 0, sizeof(dummy));
			strlcpy(dummy.name, msg->name, sizeof(dummy.name));
			mp_send(&dummy, s->sun_path, MP_IGN);
		} else {
			RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
				msg->name);
		}
	} else if (action(msg, s->sun_path) < 0) {
		RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
	}
}
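/*
 * Usage sketch for the dispatch path above (action name and handler are
 * hypothetical, shown for illustration only): a registered callback
 * receives the request plus the peer socket path, and answers MP_REQ
 * messages via rte_mp_reply().
 *
 *	static int
 *	handle_example(const struct rte_mp_msg *msg, const void *peer)
 *	{
 *		struct rte_mp_msg reply;
 *
 *		memset(&reply, 0, sizeof(reply));
 *		strlcpy(reply.name, msg->name, sizeof(reply.name));
 *		return rte_mp_reply(&reply, peer);
 *	}
 *
 *	rte_mp_action_register("example_action", handle_example);
 */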
static void *
mp_handle(void *arg __rte_unused)
{
	struct mp_msg_internal msg;
	struct sockaddr_un sa;

	while (mp_fd >= 0) {
		int ret;

		ret = read_msg(&msg, &sa);
		if (ret <= 0)
			break;

		process_msg(&msg, &sa);
	}

	return NULL;
}

static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec < b->tv_sec)
		return -1;
	if (a->tv_sec > b->tv_sec)
		return 1;
	if (a->tv_nsec < b->tv_nsec)
		return -1;
	if (a->tv_nsec > b->tv_nsec)
		return 1;
	return 0;
}

enum async_action {
	ACTION_FREE, /**< free the action entry, but don't trigger callback */
	ACTION_TRIGGER /**< trigger callback, then free action entry */
};

static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	bool timeout, last_msg;

	param = sr->async.param;
	reply = &param->user_reply;

	/* did we timeout? */
	timeout = timespec_cmp(&param->end, now) <= 0;

	/* if we received a response, adjust relevant data and copy message. */
	if (sr->reply_received == 1 && sr->reply) {
		struct rte_mp_msg *msg, *user_msgs, *tmp;

		msg = sr->reply;
		user_msgs = reply->msgs;

		tmp = realloc(user_msgs, sizeof(*msg) *
				(reply->nb_received + 1));
		if (!tmp) {
			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
				sr->dst, sr->request->name);
			/* this entry is going to be removed and its message
			 * dropped, but we don't want to leak memory, so
			 * continue.
			 */
		} else {
			user_msgs = tmp;
			reply->msgs = user_msgs;
			memcpy(&user_msgs[reply->nb_received],
					msg, sizeof(*msg));
			reply->nb_received++;
		}

		/* mark this request as processed */
		param->n_responses_processed++;
	} else if (sr->reply_received == -1) {
		/* we were asked to ignore this process */
		reply->nb_sent--;
	} else if (timeout) {
		/* count it as processed response, but don't increment
		 * nb_received.
		 */
		param->n_responses_processed++;
	}

	free(sr->reply);

	last_msg = param->n_responses_processed == reply->nb_sent;

	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}
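/*
 * Accounting note: the user callback fires exactly once per async
 * request, when n_responses_processed catches up with nb_sent. Replies
 * and timeouts both advance n_responses_processed, while an MP_IGN
 * ("ignore me") answer shrinks nb_sent instead, so a peer that opts out
 * never blocks completion.
 */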
static void
trigger_async_action(struct pending_request *sr)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;

	param = sr->async.param;
	reply = &param->user_reply;

	param->clb(sr->request, reply);

	/* clean up */
	free(sr->async.param->user_reply.msgs);
	free(sr->async.param);
	free(sr->request);
	free(sr);
}

static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
	struct pending_request *req = (struct pending_request *)arg;
	enum async_action action;
	struct timespec ts_now;

	if (clock_gettime(CLOCK_MONOTONIC, &ts_now) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto no_trigger;
	}

	action = process_async_request(req, &ts_now);

	TAILQ_REMOVE(&pending_requests.requests, req, next);

	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
		/* if we failed to cancel the alarm because it's already in
		 * progress, don't proceed because otherwise we will end up
		 * handling the same message twice.
		 */
		if (rte_errno == EINPROGRESS) {
			RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
			goto no_trigger;
		}
		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
	}

	if (action == ACTION_TRIGGER)
		return req;
no_trigger:
	free(req);
	return NULL;
}

static void
async_reply_handle(void *arg)
{
	struct pending_request *req;

	pthread_mutex_lock(&pending_requests.lock);
	req = async_reply_handle_thread_unsafe(arg);
	pthread_mutex_unlock(&pending_requests.lock);

	if (req != NULL)
		trigger_async_action(req);
}

static int
open_socket_fd(void)
{
	struct sockaddr_un un;

	peer_name[0] = '\0';
	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
		snprintf(peer_name, sizeof(peer_name),
				"%d_%"PRIx64, getpid(), rte_rdtsc());

	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (mp_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to create unix socket\n");
		return -1;
	}

	memset(&un, 0, sizeof(un));
	un.sun_family = AF_UNIX;

	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));

	unlink(un.sun_path); /* May still exist since last run */

	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
		RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
			un.sun_path, strerror(errno));
		close(mp_fd);
		return -1;
	}

	RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
	return mp_fd;
}

static void
close_socket_fd(int fd)
{
	char path[PATH_MAX];

	close(fd);
	create_socket_path(peer_name, path, sizeof(path));
	unlink(path);
}
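/*
 * Locking note: rte_mp_channel_init() below holds an exclusive flock()
 * on the socket directory while it binds the socket and starts the
 * handler thread; all senders take a shared lock while walking the
 * directory. A process is therefore never half-visible: it is either
 * not enumerable yet, or already able to receive.
 */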
int
rte_mp_channel_init(void)
{
	char path[PATH_MAX];
	int dir_fd;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	/* in no shared files mode, we do not have secondary processes support,
	 * so no need to initialize IPC.
	 */
	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	/* create filter path */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_filter, basename(path), sizeof(mp_filter));

	/* path may have been modified, so recreate it */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

	/* lock the directory */
	dir_fd = open(mp_dir_path, O_RDONLY);
	if (dir_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
			mp_dir_path, strerror(errno));
		return -1;
	}

	if (flock(dir_fd, LOCK_EX)) {
		RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
			mp_dir_path, strerror(errno));
		close(dir_fd);
		return -1;
	}

	if (open_socket_fd() < 0) {
		close(dir_fd);
		return -1;
	}

	if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
			NULL, mp_handle, NULL) < 0) {
		RTE_LOG(ERR, EAL, "failed to create mp thread: %s\n",
			strerror(errno));
		close(mp_fd);
		close(dir_fd);
		mp_fd = -1;
		return -1;
	}

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);
	close(dir_fd);

	return 0;
}

void
rte_mp_channel_cleanup(void)
{
	int fd;

	if (mp_fd < 0)
		return;

	fd = mp_fd;
	mp_fd = -1;
	pthread_cancel(mp_handle_tid);
	pthread_join(mp_handle_tid, NULL);
	close_socket_fd(fd);
}

/**
 * Return -1 if sending failed due to the local side,
 * return 0 if sending failed due to the remote side, and
 * return 1 if the message was sent successfully.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
	int snd;
	struct iovec iov;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	struct sockaddr_un dst;
	struct mp_msg_internal m;
	int fd_size = msg->num_fds * sizeof(int);
	char control[CMSG_SPACE(fd_size)];

	m.type = type;
	memcpy(&m.msg, msg, sizeof(*msg));

	memset(&dst, 0, sizeof(dst));
	dst.sun_family = AF_UNIX;
	strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

	memset(&msgh, 0, sizeof(msgh));
	memset(control, 0, sizeof(control));

	iov.iov_base = &m;
	iov.iov_len = sizeof(m) - sizeof(msg->fds);

	msgh.msg_name = &dst;
	msgh.msg_namelen = sizeof(dst);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	cmsg = CMSG_FIRSTHDR(&msgh);
	cmsg->cmsg_len = CMSG_LEN(fd_size);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

	do {
		snd = sendmsg(mp_fd, &msgh, 0);
	} while (snd < 0 && errno == EINTR);

	if (snd < 0) {
		rte_errno = errno;
		/* Check if it is caused by the peer process exiting */
		if (errno == ECONNREFUSED &&
				rte_eal_process_type() == RTE_PROC_PRIMARY) {
			unlink(dst_path);
			return 0;
		}
		RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
			dst_path, strerror(errno));
		return -1;
	}

	return 1;
}
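/*
 * Stale-socket note: on a unix datagram socket, sendmsg() fails with
 * ECONNREFUSED once the receiver has exited. send_msg() reports that as
 * a remote-side failure (return 0) and, in the primary, unlinks the
 * dead secondary's socket file so later broadcasts skip it.
 */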
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;

	if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
		peer = eal_mp_socket_path();

	if (peer) {
		if (send_msg(peer, msg, type) < 0)
			return -1;
		else
			return 0;
	}

	/* broadcast to all secondary processes */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
				mp_dir_path);
		rte_errno = errno;
		return -1;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		closedir(mp_dir);
		return -1;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);
		if (send_msg(path, msg, type) < 0)
			ret = -1;
	}
	/* unlock the dir */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);
	return ret;
}

static int
check_input(const struct rte_mp_msg *msg)
{
	if (msg == NULL) {
		RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (validate_action_name(msg->name) != 0)
		return -1;

	if (msg->len_param < 0) {
		RTE_LOG(ERR, EAL, "Message data length is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->num_fds < 0) {
		RTE_LOG(ERR, EAL, "Number of fd's is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "Message data is too long\n");
		rte_errno = E2BIG;
		return -1;
	}

	if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
			RTE_MP_MAX_FD_NUM);
		rte_errno = E2BIG;
		return -1;
	}

	return 0;
}

int
rte_mp_sendmsg(struct rte_mp_msg *msg)
{
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
	return mp_send(msg, NULL, MP_MSG);
}
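/*
 * Usage sketch (message name and descriptor are hypothetical, shown for
 * illustration only): fire-and-forget broadcast of a message carrying
 * one fd.
 *
 *	struct rte_mp_msg msg;
 *
 *	memset(&msg, 0, sizeof(msg));
 *	strlcpy(msg.name, "example_notify", sizeof(msg.name));
 *	msg.num_fds = 1;
 *	msg.fds[0] = some_fd;
 *	if (rte_mp_sendmsg(&msg) < 0)
 *		RTE_LOG(ERR, EAL, "notify failed: %s\n",
 *			rte_strerror(rte_errno));
 */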
static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
		struct async_request_param *param, const struct timespec *ts)
{
	struct rte_mp_msg *reply_msg;
	struct pending_request *pending_req, *exist;
	int ret = -1;

	pending_req = calloc(1, sizeof(*pending_req));
	reply_msg = calloc(1, sizeof(*reply_msg));
	if (pending_req == NULL || reply_msg == NULL) {
		RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
		rte_errno = ENOMEM;
		ret = -1;
		goto fail;
	}

	pending_req->type = REQUEST_TYPE_ASYNC;
	strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
	pending_req->request = req;
	pending_req->reply = reply_msg;
	pending_req->async.param = param;

	/* queue already locked by caller */

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		ret = -1;
		goto fail;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	} else if (ret == 0) {
		ret = 0;
		goto fail;
	}
	param->user_reply.nb_sent++;

	/* if alarm set fails, we simply ignore the reply */
	if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
			async_reply_handle, pending_req) < 0) {
		RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	}
	TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);

	return 0;
fail:
	free(pending_req);
	free(reply_msg);
	return ret;
}

static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
		struct rte_mp_reply *reply, const struct timespec *ts)
{
	int ret;
	pthread_condattr_t attr;
	struct rte_mp_msg msg, *tmp;
	struct pending_request pending_req, *exist;

	pending_req.type = REQUEST_TYPE_SYNC;
	pending_req.reply_received = 0;
	strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
	pending_req.request = req;
	pending_req.reply = &msg;
	pthread_condattr_init(&attr);
	pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
	pthread_cond_init(&pending_req.sync.cond, &attr);

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		return -1;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		return -1;
	} else if (ret == 0)
		return 0;

	TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

	reply->nb_sent++;

	do {
		ret = pthread_cond_timedwait(&pending_req.sync.cond,
				&pending_requests.lock, ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

	if (pending_req.reply_received == 0) {
		RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ETIMEDOUT;
		return -1;
	}
	if (pending_req.reply_received == -1) {
		RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
		/* not receiving this message is not an error, so decrement
		 * number of sent messages
		 */
		reply->nb_sent--;
		return 0;
	}

	tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
	if (!tmp) {
		RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ENOMEM;
		return -1;
	}
	memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
	reply->msgs = tmp;
	reply->nb_received++;
	return 0;
}
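/*
 * Timeout note: callers pass a relative timeout; rte_mp_request_sync()
 * below converts it to an absolute CLOCK_MONOTONIC deadline ("end") so
 * that pthread_cond_timedwait() on the condvar configured above can use
 * it directly, and so that one deadline covers the whole broadcast
 * instead of restarting per peer.
 */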
int
rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
		const struct timespec *ts)
{
	int dir_fd, ret = -1;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now, end;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	if (check_input(req) != 0)
		goto end;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		goto end;
	}

	end.tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end.tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		pthread_mutex_lock(&pending_requests.lock);
		ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
		pthread_mutex_unlock(&pending_requests.lock);
		goto end;
	}

	/* for primary process, broadcast request, and collect reply 1 by 1 */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto end;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto close_end;
	}

	pthread_mutex_lock(&pending_requests.lock);
	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		/* unlocks the mutex while waiting for response,
		 * locks on receive
		 */
		if (mp_request_sync(path, req, reply, &end))
			goto unlock_end;
	}
	ret = 0;

unlock_end:
	pthread_mutex_unlock(&pending_requests.lock);
	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

close_end:
	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

end:
	if (ret) {
		free(reply->msgs);
		reply->nb_received = 0;
		reply->msgs = NULL;
	}
	return ret;
}
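/*
 * Usage sketch (request name and handler are hypothetical): synchronous
 * request with a 5-second timeout. On success the caller owns
 * reply.msgs and must free() it.
 *
 *	struct rte_mp_msg req;
 *	struct rte_mp_reply reply;
 *	struct timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
 *	int i;
 *
 *	memset(&req, 0, sizeof(req));
 *	strlcpy(req.name, "example_query", sizeof(req.name));
 *	if (rte_mp_request_sync(&req, &reply, &ts) == 0) {
 *		for (i = 0; i < reply.nb_received; i++)
 *			handle_one(&reply.msgs[i]);
 *		free(reply.msgs);
 *	}
 */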
int
rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
		rte_mp_async_reply_t clb)
{
	struct rte_mp_msg *copy;
	struct pending_request *dummy;
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now;
	struct timespec *end;
	bool dummy_used = false;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	if (check_input(req) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		return -1;
	}
	copy = calloc(1, sizeof(*copy));
	dummy = calloc(1, sizeof(*dummy));
	param = calloc(1, sizeof(*param));
	if (copy == NULL || dummy == NULL || param == NULL) {
		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
		rte_errno = ENOMEM;
		goto fail;
	}

	/* copy message */
	memcpy(copy, req, sizeof(*copy));

	param->n_responses_processed = 0;
	param->clb = clb;
	end = &param->end;
	reply = &param->user_reply;

	end->tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end->tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;
	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	/* we have to lock the request queue here, as we will be adding a bunch
	 * of requests to the queue at once, and some of the replies may arrive
	 * before we add all of the requests to the queue.
	 */
	pthread_mutex_lock(&pending_requests.lock);

	/* we have to ensure that callback gets triggered even if we don't send
	 * anything, therefore earlier we have allocated a dummy request. fill
	 * it, and put it on the queue if we don't send any requests.
	 */
	dummy->type = REQUEST_TYPE_ASYNC;
	dummy->request = copy;
	dummy->reply = NULL;
	dummy->async.param = param;
	dummy->reply_received = 1; /* short-circuit the timeout */

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);

		/* if we didn't send anything, put dummy request on the queue */
		if (ret == 0 && reply->nb_sent == 0) {
			TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
					next);
			dummy_used = true;
		}

		pthread_mutex_unlock(&pending_requests.lock);

		/* if we couldn't send anything, clean up */
		if (ret != 0)
			goto fail;
		return 0;
	}

	/* for primary process, broadcast request */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto unlock_fail;
	}
	dir_fd = dirfd(mp_dir);

	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto closedir_fail;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		if (mp_request_async(path, copy, param, ts))
			ret = -1;
	}
	/* if we didn't send anything, put dummy request on the queue */
	if (ret == 0 && reply->nb_sent == 0) {
		TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
		dummy_used = true;
	}

	/* finally, unlock the queue */
	pthread_mutex_unlock(&pending_requests.lock);

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

	/* if dummy was unused, free it */
	if (!dummy_used)
		free(dummy);

	return ret;
closedir_fail:
	closedir(mp_dir);
unlock_fail:
	pthread_mutex_unlock(&pending_requests.lock);
fail:
	free(dummy);
	free(param);
	free(copy);
	return -1;
}

int
rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
{
	RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (peer == NULL) {
		RTE_LOG(ERR, EAL, "peer is not specified\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return 0;
	}

	return mp_send(msg, peer, MP_REP);
}
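/*
 * Usage sketch (callback name is hypothetical): asynchronous variant of
 * the request above. The callback runs later from the IPC/alarm thread;
 * the reply array is freed by the library right after the callback
 * returns, so anything worth keeping must be copied out.
 *
 *	static void
 *	example_reply_cb(const struct rte_mp_msg *request,
 *			const struct rte_mp_reply *reply)
 *	{
 *		RTE_LOG(INFO, EAL, "%s: got %d of %d replies\n",
 *			request->name, reply->nb_received, reply->nb_sent);
 *	}
 *
 *	struct rte_mp_msg req;	(prepared as in the synchronous example)
 *	struct timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
 *
 *	rte_mp_request_async(&req, &ts, example_reply_cb);
 */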
/* Internally, the status of the mp feature is represented as a three-state:
 * - "unknown" as long as no secondary process attached to a primary process
 *   and there was no call to rte_mp_disable yet,
 * - "enabled" as soon as a secondary process attaches to a primary process,
 * - "disabled" when a primary process successfully called rte_mp_disable.
 */
enum mp_status {
	MP_STATUS_UNKNOWN,
	MP_STATUS_DISABLED,
	MP_STATUS_ENABLED,
};

static bool
set_mp_status(enum mp_status status)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	uint8_t expected;
	uint8_t desired;

	expected = MP_STATUS_UNKNOWN;
	desired = status;
	if (__atomic_compare_exchange_n(&mcfg->mp_status, &expected, desired,
			false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
		return true;

	return __atomic_load_n(&mcfg->mp_status, __ATOMIC_RELAXED) == desired;
}

bool
rte_mp_disable(void)
{
	return set_mp_status(MP_STATUS_DISABLED);
}

bool
__rte_mp_enable(void)
{
	return set_mp_status(MP_STATUS_ENABLED);
}
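/*
 * Semantics note for set_mp_status() above: the status is a one-way
 * latch. The compare-and-swap only succeeds from MP_STATUS_UNKNOWN;
 * afterwards the function merely reports whether the recorded state
 * matches the requested one. Hence rte_mp_disable() returns false once
 * a secondary has attached, and __rte_mp_enable() returns false once
 * IPC has been disabled.
 */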