/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2018 Intel Corporation
 */

#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <inttypes.h>
#include <libgen.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <rte_alarm.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
#include <rte_thread.h>

#include "eal_memcfg.h"
#include "eal_private.h"
#include "eal_filesystem.h"
#include "eal_internal_cfg.h"

static RTE_ATOMIC(int) mp_fd = -1;
static rte_thread_t mp_handle_tid;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
static char peer_name[PATH_MAX];

struct action_entry {
	TAILQ_ENTRY(action_entry) next;
	char action_name[RTE_MP_MAX_NAME_LEN];
	rte_mp_t action;
};

/** Doubly-linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);

enum mp_type {
	MP_MSG, /* Share message with peers, will not block */
	MP_REQ, /* Request for information, will block for a reply */
	MP_REP, /* Response to previously-received request */
	MP_IGN, /* Response telling requester to ignore this response */
};

struct mp_msg_internal {
	int type;
	struct rte_mp_msg msg;
};

struct async_request_param {
	rte_mp_async_reply_t clb;
	struct rte_mp_reply user_reply;
	struct timespec end;
	int n_responses_processed;
};

struct pending_request {
	TAILQ_ENTRY(pending_request) next;
	enum {
		REQUEST_TYPE_SYNC,
		REQUEST_TYPE_ASYNC
	} type;
	char dst[PATH_MAX];
	struct rte_mp_msg *request;
	struct rte_mp_msg *reply;
	int reply_received;
	union {
		struct {
			struct async_request_param *param;
		} async;
		struct {
			pthread_cond_t cond;
		} sync;
	};
};

TAILQ_HEAD(pending_request_list, pending_request);

static struct {
	struct pending_request_list requests;
	pthread_mutex_t lock;
} pending_requests = {
	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
	.lock = PTHREAD_MUTEX_INITIALIZER,
	/**< used in async requests only */
};

/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);

static struct pending_request *
find_pending_request(const char *dst, const char *act_name)
{
	struct pending_request *r;

	TAILQ_FOREACH(r, &pending_requests.requests, next) {
		if (!strcmp(r->dst, dst) &&
		    !strcmp(r->request->name, act_name))
			break;
	}

	return r;
}

static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *prefix = eal_mp_socket_path();

	if (strlen(name) > 0)
		snprintf(buf, len, "%s_%s", prefix, name);
	else
		strlcpy(buf, prefix, len);
}
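
/*
 * Illustrative note (not normative): with an empty name the path is
 * eal_mp_socket_path() itself, which is what the primary process binds to;
 * secondaries append "_<name>". Typical resulting paths (the actual prefix
 * depends on the runtime directory configuration) would look like:
 *
 *	<runtime_dir>/mp_socket              - primary process
 *	<runtime_dir>/mp_socket_<pid>_<tsc>  - secondary process (see open_socket_fd())
 */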

int
rte_eal_primary_proc_alive(const char *config_file_path)
{
	int config_fd;

	if (config_file_path)
		config_fd = open(config_file_path, O_RDONLY);
	else {
		const char *path;

		path = eal_runtime_config_path();
		config_fd = open(path, O_RDONLY);
	}
	if (config_fd < 0)
		return 0;

	int ret = lockf(config_fd, F_TEST, 0);
	close(config_fd);

	return !!ret;
}

static struct action_entry *
find_action_entry_by_name(const char *name)
{
	struct action_entry *entry;

	TAILQ_FOREACH(entry, &action_entry_list, next) {
		if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
			break;
	}

	return entry;
}

static int
validate_action_name(const char *name)
{
	if (name == NULL) {
		EAL_LOG(ERR, "Action name cannot be NULL");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
		EAL_LOG(ERR, "Length of action name is zero");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
		rte_errno = E2BIG;
		return -1;
	}
	return 0;
}

int
rte_mp_action_register(const char *name, rte_mp_t action)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		EAL_LOG(DEBUG, "No shared files mode enabled, IPC is disabled");
		rte_errno = ENOTSUP;
		return -1;
	}

	entry = malloc(sizeof(struct action_entry));
	if (entry == NULL) {
		rte_errno = ENOMEM;
		return -1;
	}
	strlcpy(entry->action_name, name, sizeof(entry->action_name));
	entry->action = action;

	pthread_mutex_lock(&mp_mutex_action);
	if (find_action_entry_by_name(name) != NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		rte_errno = EEXIST;
		free(entry);
		return -1;
	}
	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	return 0;
}
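
/*
 * Illustrative sketch (not part of this file): a component would typically
 * register a handler that services requests and answers via rte_mp_reply().
 * The action name "my_component_req" below is purely hypothetical:
 *
 *	static int
 *	handle_my_request(const struct rte_mp_msg *msg, const void *peer)
 *	{
 *		struct rte_mp_msg reply;
 *
 *		memset(&reply, 0, sizeof(reply));
 *		strlcpy(reply.name, msg->name, sizeof(reply.name));
 *		return rte_mp_reply(&reply, peer);
 *	}
 *
 *	rte_mp_action_register("my_component_req", handle_my_request);
 *
 * The name must match on both sides, as requests are dispatched by name in
 * process_msg() below.
 */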

void
rte_mp_action_unregister(const char *name)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return;

	if (internal_conf->no_shconf) {
		EAL_LOG(DEBUG, "No shared files mode enabled, IPC is disabled");
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(name);
	if (entry == NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		return;
	}
	TAILQ_REMOVE(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	free(entry);
}

static int
read_msg(int fd, struct mp_msg_internal *m, struct sockaddr_un *s)
{
	int msglen;
	struct iovec iov;
	struct msghdr msgh;
	char control[CMSG_SPACE(sizeof(m->msg.fds))];
	struct cmsghdr *cmsg;
	int buflen = sizeof(*m) - sizeof(m->msg.fds);

	memset(&msgh, 0, sizeof(msgh));
	iov.iov_base = m;
	iov.iov_len = buflen;

	msgh.msg_name = s;
	msgh.msg_namelen = sizeof(*s);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

retry:
	msglen = recvmsg(fd, &msgh, 0);

	/* zero length message means socket was closed */
	if (msglen == 0)
		return 0;

	if (msglen < 0) {
		if (errno == EINTR)
			goto retry;

		EAL_LOG(ERR, "recvmsg failed, %s", strerror(errno));
		return -1;
	}

	if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
		EAL_LOG(ERR, "truncated msg");
		return -1;
	}

	/* read auxiliary FDs if any */
	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
			cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
		if ((cmsg->cmsg_level == SOL_SOCKET) &&
				(cmsg->cmsg_type == SCM_RIGHTS)) {
			memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
			break;
		}
	}
	/* sanity-check the response */
	if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM) {
		EAL_LOG(ERR, "invalid number of fd's received");
		return -1;
	}
	if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN) {
		EAL_LOG(ERR, "invalid received data length");
		return -1;
	}
	return msglen;
}

static void
cleanup_msg_fds(const struct rte_mp_msg *msg)
{
	int i;

	for (i = 0; i < msg->num_fds; i++)
		close(msg->fds[i]);
}
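
/*
 * Dispatch a single incoming message: MP_REP/MP_IGN complete a matching
 * pending request (waking a synchronous waiter or finishing an async one),
 * while MP_MSG/MP_REQ are routed by name to the callback registered via
 * rte_mp_action_register(). File descriptors attached to an undeliverable
 * message are closed to avoid leaking them.
 */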

static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	struct pending_request *pending_req;
	struct action_entry *entry;
	struct rte_mp_msg *msg = &m->msg;
	rte_mp_t action = NULL;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	EAL_LOG(DEBUG, "msg: %s", msg->name);

	if (m->type == MP_REP || m->type == MP_IGN) {
		struct pending_request *req = NULL;

		pthread_mutex_lock(&pending_requests.lock);
		pending_req = find_pending_request(s->sun_path, msg->name);
		if (pending_req) {
			memcpy(pending_req->reply, msg, sizeof(*msg));
			/* -1 indicates that we've been asked to ignore */
			pending_req->reply_received =
				m->type == MP_REP ? 1 : -1;

			if (pending_req->type == REQUEST_TYPE_SYNC)
				pthread_cond_signal(&pending_req->sync.cond);
			else if (pending_req->type == REQUEST_TYPE_ASYNC)
				req = async_reply_handle_thread_unsafe(
						pending_req);
		} else {
			EAL_LOG(ERR, "Drop mp reply: %s", msg->name);
			cleanup_msg_fds(msg);
		}
		pthread_mutex_unlock(&pending_requests.lock);

		if (req != NULL)
			trigger_async_action(req);
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(msg->name);
	if (entry != NULL)
		action = entry->action;
	pthread_mutex_unlock(&mp_mutex_action);

	if (!action) {
		if (m->type == MP_REQ && !internal_conf->init_complete) {
			/* if this is a request, and init is not yet complete,
			 * and callback wasn't registered, we should tell the
			 * requester to ignore our existence because we're not
			 * yet ready to process this request.
			 */
			struct rte_mp_msg dummy;

			memset(&dummy, 0, sizeof(dummy));
			strlcpy(dummy.name, msg->name, sizeof(dummy.name));
			mp_send(&dummy, s->sun_path, MP_IGN);
		} else {
			EAL_LOG(ERR, "Cannot find action: %s",
				msg->name);
		}
		cleanup_msg_fds(msg);
	} else if (action(msg, s->sun_path) < 0) {
		EAL_LOG(ERR, "Fail to handle message: %s", msg->name);
	}
}

static uint32_t
mp_handle(void *arg __rte_unused)
{
	struct mp_msg_internal msg;
	struct sockaddr_un sa;
	int fd;

	while ((fd = rte_atomic_load_explicit(&mp_fd, rte_memory_order_relaxed)) >= 0) {
		int ret;

		ret = read_msg(fd, &msg, &sa);
		if (ret <= 0)
			break;

		process_msg(&msg, &sa);
	}

	return 0;
}

static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec < b->tv_sec)
		return -1;
	if (a->tv_sec > b->tv_sec)
		return 1;
	if (a->tv_nsec < b->tv_nsec)
		return -1;
	if (a->tv_nsec > b->tv_nsec)
		return 1;
	return 0;
}

enum async_action {
	ACTION_FREE, /**< free the action entry, but don't trigger callback */
	ACTION_TRIGGER /**< trigger callback, then free action entry */
};
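
/*
 * Async request bookkeeping: every peer a request was sent to is eventually
 * accounted for exactly once: either its reply is appended to the user reply
 * array, the peer asked to be ignored (in which case nb_sent is decremented),
 * or the request timed out. Once n_responses_processed catches up with
 * nb_sent, the user callback is triggered.
 */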

static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	bool timeout, last_msg;

	param = sr->async.param;
	reply = &param->user_reply;

	/* did we timeout? */
	timeout = timespec_cmp(&param->end, now) <= 0;

	/* if we received a response, adjust relevant data and copy message. */
	if (sr->reply_received == 1 && sr->reply) {
		struct rte_mp_msg *msg, *user_msgs, *tmp;

		msg = sr->reply;
		user_msgs = reply->msgs;

		tmp = realloc(user_msgs, sizeof(*msg) *
				(reply->nb_received + 1));
		if (!tmp) {
			EAL_LOG(ERR, "Fail to alloc reply for request %s:%s",
				sr->dst, sr->request->name);
			/* this entry is going to be removed and its message
			 * dropped, but we don't want to leak memory, so
			 * continue.
			 */
		} else {
			user_msgs = tmp;
			reply->msgs = user_msgs;
			memcpy(&user_msgs[reply->nb_received],
					msg, sizeof(*msg));
			reply->nb_received++;
		}

		/* mark this request as processed */
		param->n_responses_processed++;
	} else if (sr->reply_received == -1) {
		/* we were asked to ignore this process */
		reply->nb_sent--;
	} else if (timeout) {
		/* count it as processed response, but don't increment
		 * nb_received.
		 */
		param->n_responses_processed++;
	}

	free(sr->reply);

	last_msg = param->n_responses_processed == reply->nb_sent;

	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}

static void
trigger_async_action(struct pending_request *sr)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;

	param = sr->async.param;
	reply = &param->user_reply;

	param->clb(sr->request, reply);

	/* clean up */
	free(sr->async.param->user_reply.msgs);
	free(sr->async.param);
	free(sr->request);
	free(sr);
}

static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
	struct pending_request *req = (struct pending_request *)arg;
	enum async_action action;
	struct timespec ts_now;

	if (clock_gettime(CLOCK_MONOTONIC, &ts_now) < 0) {
		EAL_LOG(ERR, "Cannot get current time");
		goto no_trigger;
	}

	action = process_async_request(req, &ts_now);

	TAILQ_REMOVE(&pending_requests.requests, req, next);

	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
		/* if we failed to cancel the alarm because it's already in
		 * progress, don't proceed because otherwise we will end up
		 * handling the same message twice.
		 */
		if (rte_errno == EINPROGRESS) {
			EAL_LOG(DEBUG, "Request handling is already in progress");
			goto no_trigger;
		}
		EAL_LOG(ERR, "Failed to cancel alarm");
	}

	if (action == ACTION_TRIGGER)
		return req;
no_trigger:
	free(req);
	return NULL;
}

static void
async_reply_handle(void *arg)
{
	struct pending_request *req;

	pthread_mutex_lock(&pending_requests.lock);
	req = async_reply_handle_thread_unsafe(arg);
	pthread_mutex_unlock(&pending_requests.lock);

	if (req != NULL)
		trigger_async_action(req);
}

static int
open_socket_fd(void)
{
	struct sockaddr_un un;

	peer_name[0] = '\0';
	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
		snprintf(peer_name, sizeof(peer_name),
				"%d_%"PRIx64, getpid(), rte_rdtsc());

	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (mp_fd < 0) {
		EAL_LOG(ERR, "failed to create unix socket");
		return -1;
	}

	memset(&un, 0, sizeof(un));
	un.sun_family = AF_UNIX;

	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));

	unlink(un.sun_path); /* May still exist since last run */

	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
		EAL_LOG(ERR, "failed to bind %s: %s",
			un.sun_path, strerror(errno));
		close(mp_fd);
		return -1;
	}

	EAL_LOG(INFO, "Multi-process socket %s", un.sun_path);
	return mp_fd;
}

static void
close_socket_fd(int fd)
{
	char path[PATH_MAX];

	close(fd);
	create_socket_path(peer_name, path, sizeof(path));
	unlink(path);
}
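
/*
 * IPC channel setup, called early in EAL initialization: derive the socket
 * filter and directory from the runtime dir, take an exclusive lock on that
 * directory while binding our own socket, and start the "mp-msg" control
 * thread that services incoming messages.
 */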

int
rte_mp_channel_init(void)
{
	char path[PATH_MAX];
	int dir_fd;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	/* in no shared files mode, secondary processes are not supported,
	 * so there is no need to initialize IPC.
	 */
	if (internal_conf->no_shconf) {
		EAL_LOG(DEBUG, "No shared files mode enabled, IPC will be disabled");
		rte_errno = ENOTSUP;
		return -1;
	}

	/* create filter path */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_filter, basename(path), sizeof(mp_filter));

	/* path may have been modified, so recreate it */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

	/* lock the directory */
	dir_fd = open(mp_dir_path, O_RDONLY);
	if (dir_fd < 0) {
		EAL_LOG(ERR, "failed to open %s: %s",
			mp_dir_path, strerror(errno));
		return -1;
	}

	if (flock(dir_fd, LOCK_EX)) {
		EAL_LOG(ERR, "failed to lock %s: %s",
			mp_dir_path, strerror(errno));
		close(dir_fd);
		return -1;
	}

	if (open_socket_fd() < 0) {
		close(dir_fd);
		return -1;
	}

	if (rte_thread_create_internal_control(&mp_handle_tid, "mp-msg",
			mp_handle, NULL) < 0) {
		EAL_LOG(ERR, "failed to create mp thread: %s",
			strerror(errno));
		close(dir_fd);
		close(rte_atomic_exchange_explicit(&mp_fd, -1, rte_memory_order_relaxed));
		return -1;
	}

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);
	close(dir_fd);

	return 0;
}
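
/*
 * Tear down the IPC channel. The handler thread normally sits blocked in
 * recvmsg(), so it is cancelled explicitly (recvmsg() is a POSIX cancellation
 * point) before the socket is closed and its file removed.
 */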

void
rte_mp_channel_cleanup(void)
{
	int fd;

	fd = rte_atomic_exchange_explicit(&mp_fd, -1, rte_memory_order_relaxed);
	if (fd < 0)
		return;

	pthread_cancel((pthread_t)mp_handle_tid.opaque_id);
	rte_thread_join(mp_handle_tid, NULL);
	close_socket_fd(fd);
}

/**
 * Return -1 if sending failed due to the local side,
 * 0 if sending failed due to the remote side,
 * and 1 if the message was sent successfully.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
	int snd;
	struct iovec iov;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	struct sockaddr_un dst;
	struct mp_msg_internal m;
	int fd_size = msg->num_fds * sizeof(int);
	char control[CMSG_SPACE(fd_size)];

	m.type = type;
	memcpy(&m.msg, msg, sizeof(*msg));

	memset(&dst, 0, sizeof(dst));
	dst.sun_family = AF_UNIX;
	strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

	memset(&msgh, 0, sizeof(msgh));
	memset(control, 0, sizeof(control));

	iov.iov_base = &m;
	iov.iov_len = sizeof(m) - sizeof(msg->fds);

	msgh.msg_name = &dst;
	msgh.msg_namelen = sizeof(dst);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	cmsg = CMSG_FIRSTHDR(&msgh);
	cmsg->cmsg_len = CMSG_LEN(fd_size);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

	do {
		snd = sendmsg(mp_fd, &msgh, 0);
	} while (snd < 0 && errno == EINTR);

	if (snd < 0) {
		rte_errno = errno;
		/* Check if it was caused by the peer process exiting */
		if (errno == ECONNREFUSED &&
				rte_eal_process_type() == RTE_PROC_PRIMARY) {
			unlink(dst_path);
			return 0;
		}
		EAL_LOG(ERR, "failed to send to (%s) due to %s",
			dst_path, strerror(errno));
		return -1;
	}

	return 1;
}

static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;

	if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
		peer = eal_mp_socket_path();

	if (peer) {
		if (send_msg(peer, msg, type) < 0)
			return -1;
		else
			return 0;
	}

	/* broadcast to all secondary processes */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		EAL_LOG(ERR, "Unable to open directory %s",
				mp_dir_path);
		rte_errno = errno;
		return -1;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		EAL_LOG(ERR, "Unable to lock directory %s",
			mp_dir_path);
		rte_errno = errno;
		closedir(mp_dir);
		return -1;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);
		if (send_msg(path, msg, type) < 0)
			ret = -1;
	}
	/* unlock the dir */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);
	return ret;
}

static int
check_input(const struct rte_mp_msg *msg)
{
	if (msg == NULL) {
		EAL_LOG(ERR, "Msg cannot be NULL");
		rte_errno = EINVAL;
		return -1;
	}

	if (validate_action_name(msg->name) != 0)
		return -1;

	if (msg->len_param < 0) {
		EAL_LOG(ERR, "Message data length is negative");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->num_fds < 0) {
		EAL_LOG(ERR, "Number of fd's is negative");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
		EAL_LOG(ERR, "Message data is too long");
		rte_errno = E2BIG;
		return -1;
	}

	if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
		EAL_LOG(ERR, "Cannot send more than %d FDs",
			RTE_MP_MAX_FD_NUM);
		rte_errno = E2BIG;
		return -1;
	}

	return 0;
}
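
/*
 * Fire-and-forget send: a primary process broadcasts to every secondary
 * socket matching mp_filter, a secondary process sends to the primary only.
 * No reply is expected or waited for (see MP_MSG above).
 */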

int
rte_mp_sendmsg(struct rte_mp_msg *msg)
{
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		EAL_LOG(DEBUG, "No shared files mode enabled, IPC is disabled");
		rte_errno = ENOTSUP;
		return -1;
	}

	EAL_LOG(DEBUG, "sendmsg: %s", msg->name);
	return mp_send(msg, NULL, MP_MSG);
}

static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
		struct async_request_param *param, const struct timespec *ts)
{
	struct rte_mp_msg *reply_msg;
	struct pending_request *pending_req, *exist;
	int ret = -1;

	pending_req = calloc(1, sizeof(*pending_req));
	reply_msg = calloc(1, sizeof(*reply_msg));
	if (pending_req == NULL || reply_msg == NULL) {
		EAL_LOG(ERR, "Could not allocate space for async request");
		rte_errno = ENOMEM;
		ret = -1;
		goto fail;
	}

	pending_req->type = REQUEST_TYPE_ASYNC;
	strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
	pending_req->request = req;
	pending_req->reply = reply_msg;
	pending_req->async.param = param;

	/* queue already locked by caller */

	exist = find_pending_request(dst, req->name);
	if (exist) {
		EAL_LOG(ERR, "A pending request %s:%s", dst, req->name);
		rte_errno = EEXIST;
		ret = -1;
		goto fail;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		EAL_LOG(ERR, "Fail to send request %s:%s",
			dst, req->name);
		ret = -1;
		goto fail;
	} else if (ret == 0) {
		ret = 0;
		goto fail;
	}
	param->user_reply.nb_sent++;

	/* if alarm set fails, we simply ignore the reply */
	if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
			      async_reply_handle, pending_req) < 0) {
		EAL_LOG(ERR, "Fail to set alarm for request %s:%s",
			dst, req->name);
		ret = -1;
		goto fail;
	}
	TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);

	return 0;
fail:
	free(pending_req);
	free(reply_msg);
	return ret;
}
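
/*
 * Synchronous request to a single peer: the pending request lives on the
 * caller's stack and the calling thread waits on a CLOCK_MONOTONIC condition
 * variable (with pending_requests.lock held by the caller) until the IPC
 * thread delivers a reply in process_msg() or the absolute timeout expires.
 */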

static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
		struct rte_mp_reply *reply, const struct timespec *ts)
{
	int ret;
	pthread_condattr_t attr;
	struct rte_mp_msg msg, *tmp;
	struct pending_request pending_req, *exist;

	pending_req.type = REQUEST_TYPE_SYNC;
	pending_req.reply_received = 0;
	strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
	pending_req.request = req;
	pending_req.reply = &msg;
	pthread_condattr_init(&attr);
	pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
	pthread_cond_init(&pending_req.sync.cond, &attr);

	exist = find_pending_request(dst, req->name);
	if (exist) {
		EAL_LOG(ERR, "A pending request %s:%s", dst, req->name);
		rte_errno = EEXIST;
		return -1;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		EAL_LOG(ERR, "Fail to send request %s:%s",
			dst, req->name);
		return -1;
	} else if (ret == 0)
		return 0;

	TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

	reply->nb_sent++;

	do {
		ret = pthread_cond_timedwait(&pending_req.sync.cond,
				&pending_requests.lock, ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

	if (pending_req.reply_received == 0) {
		EAL_LOG(ERR, "Fail to recv reply for request %s:%s",
			dst, req->name);
		rte_errno = ETIMEDOUT;
		return -1;
	}
	if (pending_req.reply_received == -1) {
		EAL_LOG(DEBUG, "Asked to ignore response");
		/* not receiving this message is not an error, so decrement
		 * number of sent messages
		 */
		reply->nb_sent--;
		return 0;
	}

	tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
	if (!tmp) {
		EAL_LOG(ERR, "Fail to alloc reply for request %s:%s",
			dst, req->name);
		rte_errno = ENOMEM;
		return -1;
	}
	memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
	reply->msgs = tmp;
	reply->nb_received++;
	return 0;
}

int
rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
		const struct timespec *ts)
{
	int dir_fd, ret = -1;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now, end;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	EAL_LOG(DEBUG, "request: %s", req->name);

	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	if (check_input(req) != 0)
		goto end;

	if (internal_conf->no_shconf) {
		EAL_LOG(DEBUG, "No shared files mode enabled, IPC is disabled");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		EAL_LOG(ERR, "Failed to get current time");
		rte_errno = errno;
		goto end;
	}

	end.tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end.tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		pthread_mutex_lock(&pending_requests.lock);
		ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
		pthread_mutex_unlock(&pending_requests.lock);
		goto end;
	}

	/* for primary process, broadcast request, and collect replies one by one */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		EAL_LOG(ERR, "Unable to open directory %s", mp_dir_path);
		rte_errno = errno;
		goto end;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		EAL_LOG(ERR, "Unable to lock directory %s",
			mp_dir_path);
		rte_errno = errno;
		goto close_end;
	}

	pthread_mutex_lock(&pending_requests.lock);
	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		/* unlocks the mutex while waiting for response,
		 * locks on receive
		 */
		if (mp_request_sync(path, req, reply, &end))
			goto unlock_end;
	}
	ret = 0;

unlock_end:
	pthread_mutex_unlock(&pending_requests.lock);
	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

close_end:
	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

end:
	if (ret) {
		free(reply->msgs);
		reply->nb_received = 0;
		reply->msgs = NULL;
	}
	return ret;
}
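
/*
 * Illustrative use of rte_mp_request_sync() above (a sketch, not part of this
 * file; the request name is hypothetical). On success the caller is expected
 * to free reply.msgs:
 *
 *	struct rte_mp_msg req;
 *	struct rte_mp_reply reply;
 *	struct timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
 *
 *	memset(&req, 0, sizeof(req));
 *	strlcpy(req.name, "my_component_req", sizeof(req.name));
 *	if (rte_mp_request_sync(&req, &reply, &ts) == 0) {
 *		// inspect reply.nb_received messages in reply.msgs
 *		free(reply.msgs);
 *	}
 */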

int
rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
		rte_mp_async_reply_t clb)
{
	struct rte_mp_msg *copy;
	struct pending_request *dummy;
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now;
	struct timespec *end;
	bool dummy_used = false;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	EAL_LOG(DEBUG, "request: %s", req->name);

	if (check_input(req) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		EAL_LOG(DEBUG, "No shared files mode enabled, IPC is disabled");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		EAL_LOG(ERR, "Failed to get current time");
		rte_errno = errno;
		return -1;
	}
	copy = calloc(1, sizeof(*copy));
	dummy = calloc(1, sizeof(*dummy));
	param = calloc(1, sizeof(*param));
	if (copy == NULL || dummy == NULL || param == NULL) {
		EAL_LOG(ERR, "Failed to allocate memory for async reply");
		rte_errno = ENOMEM;
		goto fail;
	}

	/* copy message */
	memcpy(copy, req, sizeof(*copy));

	param->n_responses_processed = 0;
	param->clb = clb;
	end = &param->end;
	reply = &param->user_reply;

	end->tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end->tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;
	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	/* we have to lock the request queue here, as we will be adding a bunch
	 * of requests to the queue at once, and some of the replies may arrive
	 * before we add all of the requests to the queue.
	 */
	pthread_mutex_lock(&pending_requests.lock);

	/* we have to ensure that the callback gets triggered even if we don't
	 * send anything, so we have allocated a dummy request earlier. Fill it
	 * in, and put it on the queue if we don't send any requests.
	 */
	dummy->type = REQUEST_TYPE_ASYNC;
	dummy->request = copy;
	dummy->reply = NULL;
	dummy->async.param = param;
	dummy->reply_received = 1; /* short-circuit the timeout */

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);

		/* if we didn't send anything, put dummy request on the queue */
		if (ret == 0 && reply->nb_sent == 0) {
			TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
					next);
			dummy_used = true;
		}

		pthread_mutex_unlock(&pending_requests.lock);

		/* if we couldn't send anything, clean up */
		if (ret != 0)
			goto fail;
		return 0;
	}

	/* for primary process, broadcast request */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		EAL_LOG(ERR, "Unable to open directory %s", mp_dir_path);
		rte_errno = errno;
		goto unlock_fail;
	}
	dir_fd = dirfd(mp_dir);

	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		EAL_LOG(ERR, "Unable to lock directory %s",
			mp_dir_path);
		rte_errno = errno;
		goto closedir_fail;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		if (mp_request_async(path, copy, param, ts))
			ret = -1;
	}
	/* if we didn't send anything, put dummy request on the queue */
	if (ret == 0 && reply->nb_sent == 0) {
		TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
		dummy_used = true;
	}

	/* finally, unlock the queue */
	pthread_mutex_unlock(&pending_requests.lock);

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

	/* if dummy was unused, free it */
	if (!dummy_used)
		free(dummy);

	return ret;
closedir_fail:
	closedir(mp_dir);
unlock_fail:
	pthread_mutex_unlock(&pending_requests.lock);
fail:
	free(dummy);
	free(param);
	free(copy);
	return -1;
}
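
/*
 * Send a reply to a previously received request; meant to be called from an
 * rte_mp_t action callback, with the peer pointer that callback received and
 * with msg->name matching the request name, so the waiting side can pair the
 * reply with its pending request.
 */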

int
rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
{
	EAL_LOG(DEBUG, "reply: %s", msg->name);
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (peer == NULL) {
		EAL_LOG(ERR, "peer is not specified");
		rte_errno = EINVAL;
		return -1;
	}

	if (internal_conf->no_shconf) {
		EAL_LOG(DEBUG, "No shared files mode enabled, IPC is disabled");
		return 0;
	}

	return mp_send(msg, peer, MP_REP);
}

/* Internally, the status of the mp feature is represented as a three-state value:
 * - "unknown" as long as no secondary process has attached to the primary
 *   process and rte_mp_disable has not been called yet,
 * - "enabled" as soon as a secondary process attaches to the primary process,
 * - "disabled" when the primary process has successfully called rte_mp_disable.
 */
enum mp_status {
	MP_STATUS_UNKNOWN,
	MP_STATUS_DISABLED,
	MP_STATUS_ENABLED,
};
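
/*
 * Move the shared mp_status from "unknown" to the requested state; returns
 * true if the status is (or already was) the requested one, false if the
 * opposite transition happened first.
 */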

static bool
set_mp_status(enum mp_status status)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	uint8_t expected;
	uint8_t desired;

	expected = MP_STATUS_UNKNOWN;
	desired = status;
	if (rte_atomic_compare_exchange_strong_explicit(&mcfg->mp_status, &expected, desired,
			rte_memory_order_relaxed, rte_memory_order_relaxed))
		return true;

	return rte_atomic_load_explicit(&mcfg->mp_status, rte_memory_order_relaxed) == desired;
}

bool
rte_mp_disable(void)
{
	return set_mp_status(MP_STATUS_DISABLED);
}

bool
__rte_mp_enable(void)
{
	return set_mp_status(MP_STATUS_ENABLED);
}