/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2018 Intel Corporation
 */

#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <inttypes.h>
#include <libgen.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <rte_alarm.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>

#include "eal_memcfg.h"
#include "eal_private.h"
#include "eal_filesystem.h"
#include "eal_internal_cfg.h"

static int mp_fd = -1;
static pthread_t mp_handle_tid;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
static char peer_name[PATH_MAX];

struct action_entry {
	TAILQ_ENTRY(action_entry) next;
	char action_name[RTE_MP_MAX_NAME_LEN];
	rte_mp_t action;
};

/** Doubly-linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);

enum mp_type {
	MP_MSG, /* Share message with peers, will not block */
	MP_REQ, /* Request for information, will block for a reply */
	MP_REP, /* Response to previously-received request */
	MP_IGN, /* Response telling requester to ignore this response */
};

struct mp_msg_internal {
	int type;
	struct rte_mp_msg msg;
};

struct async_request_param {
	rte_mp_async_reply_t clb;
	struct rte_mp_reply user_reply;
	struct timespec end;
	int n_responses_processed;
};

struct pending_request {
	TAILQ_ENTRY(pending_request) next;
	enum {
		REQUEST_TYPE_SYNC,
		REQUEST_TYPE_ASYNC
	} type;
	char dst[PATH_MAX];
	struct rte_mp_msg *request;
	struct rte_mp_msg *reply;
	int reply_received;
	RTE_STD_C11
	union {
		struct {
			struct async_request_param *param;
		} async;
		struct {
			pthread_cond_t cond;
		} sync;
	};
};

TAILQ_HEAD(pending_request_list, pending_request);

static struct {
	struct pending_request_list requests;
	pthread_mutex_t lock;
} pending_requests = {
	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
	.lock = PTHREAD_MUTEX_INITIALIZER,
	/**< used in async requests only */
};

/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);

static struct pending_request *
find_pending_request(const char *dst, const char *act_name)
{
	struct pending_request *r;

	TAILQ_FOREACH(r, &pending_requests.requests, next) {
		if (!strcmp(r->dst, dst) &&
		    !strcmp(r->request->name, act_name))
			break;
	}

	return r;
}

static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *prefix = eal_mp_socket_path();

	if (strlen(name) > 0)
		snprintf(buf, len, "%s_%s", prefix, name);
	else
		strlcpy(buf, prefix, len);
}

int
rte_eal_primary_proc_alive(const char *config_file_path)
{
	int config_fd;

	if (config_file_path)
		config_fd = open(config_file_path, O_RDONLY);
	else {
		const char *path;

		path = eal_runtime_config_path();
		config_fd = open(path, O_RDONLY);
	}
	if (config_fd < 0)
		return 0;

	int ret = lockf(config_fd, F_TEST, 0);
	close(config_fd);

	return !!ret;
}

static struct action_entry *
find_action_entry_by_name(const char *name)
{
	struct action_entry *entry;

	TAILQ_FOREACH(entry, &action_entry_list, next) {
		if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
			break;
	}

	return entry;
}

static int
validate_action_name(const char *name)
{
	if (name == NULL) {
		RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
		RTE_LOG(ERR, EAL, "Length of action name is zero\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
		rte_errno = E2BIG;
		return -1;
	}
	return 0;
}

int
rte_mp_action_register(const char *name, rte_mp_t action)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	entry = malloc(sizeof(struct action_entry));
	if (entry == NULL) {
		rte_errno = ENOMEM;
		return -1;
	}
	strlcpy(entry->action_name, name, sizeof(entry->action_name));
	entry->action = action;

	pthread_mutex_lock(&mp_mutex_action);
	if (find_action_entry_by_name(name) != NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		rte_errno = EEXIST;
		free(entry);
		return -1;
	}
	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	return 0;
}

void
rte_mp_action_unregister(const char *name)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(name);
	if (entry == NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		return;
	}
	TAILQ_REMOVE(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	free(entry);
}

static int
read_msg(int fd, struct mp_msg_internal *m, struct sockaddr_un *s)
{
	int msglen;
	struct iovec iov;
	struct msghdr msgh;
	char control[CMSG_SPACE(sizeof(m->msg.fds))];
	struct cmsghdr *cmsg;
	int buflen = sizeof(*m) - sizeof(m->msg.fds);

	memset(&msgh, 0, sizeof(msgh));
	iov.iov_base = m;
	iov.iov_len = buflen;

	msgh.msg_name = s;
	msgh.msg_namelen = sizeof(*s);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

retry:
	msglen = recvmsg(fd, &msgh, 0);

	/* zero length message means socket was closed */
	if (msglen == 0)
		return 0;

	if (msglen < 0) {
		if (errno == EINTR)
			goto retry;

		RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
		return -1;
	}

	if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
		RTE_LOG(ERR, EAL, "truncated msg\n");
		return -1;
	}

	/* read auxiliary FDs if any */
	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
		if ((cmsg->cmsg_level == SOL_SOCKET) &&
			(cmsg->cmsg_type == SCM_RIGHTS)) {
			memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
			break;
		}
	}
	/* sanity-check the response */
	if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "invalid number of fd's received\n");
		return -1;
	}
	if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "invalid received data length\n");
		return -1;
	}
	return msglen;
}

static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	struct pending_request *pending_req;
	struct action_entry *entry;
	struct rte_mp_msg *msg = &m->msg;
	rte_mp_t action = NULL;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

	if (m->type == MP_REP || m->type == MP_IGN) {
		struct pending_request *req = NULL;

		pthread_mutex_lock(&pending_requests.lock);
		pending_req = find_pending_request(s->sun_path, msg->name);
		if (pending_req) {
			memcpy(pending_req->reply, msg, sizeof(*msg));
			/* -1 indicates that we've been asked to ignore */
			pending_req->reply_received =
				m->type == MP_REP ? 1 : -1;

			if (pending_req->type == REQUEST_TYPE_SYNC)
				pthread_cond_signal(&pending_req->sync.cond);
			else if (pending_req->type == REQUEST_TYPE_ASYNC)
				req = async_reply_handle_thread_unsafe(
						pending_req);
		} else
			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
		pthread_mutex_unlock(&pending_requests.lock);

		if (req != NULL)
			trigger_async_action(req);
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(msg->name);
	if (entry != NULL)
		action = entry->action;
	pthread_mutex_unlock(&mp_mutex_action);

	if (!action) {
		if (m->type == MP_REQ && !internal_conf->init_complete) {
			/* if this is a request, and init is not yet complete,
			 * and callback wasn't registered, we should tell the
			 * requester to ignore our existence because we're not
			 * yet ready to process this request.
			 */
			struct rte_mp_msg dummy;

			memset(&dummy, 0, sizeof(dummy));
			strlcpy(dummy.name, msg->name, sizeof(dummy.name));
			mp_send(&dummy, s->sun_path, MP_IGN);
		} else {
			RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
				msg->name);
		}
	} else if (action(msg, s->sun_path) < 0) {
		RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
	}
}

static void *
mp_handle(void *arg __rte_unused)
{
	struct mp_msg_internal msg;
	struct sockaddr_un sa;
	int fd;

	while ((fd = __atomic_load_n(&mp_fd, __ATOMIC_RELAXED)) >= 0) {
		int ret;

		ret = read_msg(fd, &msg, &sa);
		if (ret <= 0)
			break;

		process_msg(&msg, &sa);
	}

	return NULL;
}

static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec < b->tv_sec)
		return -1;
	if (a->tv_sec > b->tv_sec)
		return 1;
	if (a->tv_nsec < b->tv_nsec)
		return -1;
	if (a->tv_nsec > b->tv_nsec)
		return 1;
	return 0;
}

enum async_action {
	ACTION_FREE, /**< free the action entry, but don't trigger callback */
	ACTION_TRIGGER /**< trigger callback, then free action entry */
};

static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	bool timeout, last_msg;

	param = sr->async.param;
	reply = &param->user_reply;

	/* did we timeout? */
	timeout = timespec_cmp(&param->end, now) <= 0;

	/* if we received a response, adjust relevant data and copy message. */
	if (sr->reply_received == 1 && sr->reply) {
		struct rte_mp_msg *msg, *user_msgs, *tmp;

		msg = sr->reply;
		user_msgs = reply->msgs;

		tmp = realloc(user_msgs, sizeof(*msg) *
				(reply->nb_received + 1));
		if (!tmp) {
			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
				sr->dst, sr->request->name);
			/* this entry is going to be removed and its message
			 * dropped, but we don't want to leak memory, so
			 * continue.
			 */
		} else {
			user_msgs = tmp;
			reply->msgs = user_msgs;
			memcpy(&user_msgs[reply->nb_received],
					msg, sizeof(*msg));
			reply->nb_received++;
		}

		/* mark this request as processed */
		param->n_responses_processed++;
	} else if (sr->reply_received == -1) {
		/* we were asked to ignore this process */
		reply->nb_sent--;
	} else if (timeout) {
		/* count it as processed response, but don't increment
		 * nb_received.
		 */
		param->n_responses_processed++;
	}

	free(sr->reply);

	last_msg = param->n_responses_processed == reply->nb_sent;

	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}

static void
trigger_async_action(struct pending_request *sr)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;

	param = sr->async.param;
	reply = &param->user_reply;

	param->clb(sr->request, reply);

	/* clean up */
	free(sr->async.param->user_reply.msgs);
	free(sr->async.param);
	free(sr->request);
	free(sr);
}

static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
	struct pending_request *req = (struct pending_request *)arg;
	enum async_action action;
	struct timespec ts_now;

	if (clock_gettime(CLOCK_MONOTONIC, &ts_now) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto no_trigger;
	}

	action = process_async_request(req, &ts_now);

	TAILQ_REMOVE(&pending_requests.requests, req, next);

	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
		/* if we failed to cancel the alarm because it's already in
		 * progress, don't proceed because otherwise we will end up
		 * handling the same message twice.
		 */
		if (rte_errno == EINPROGRESS) {
			RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
			goto no_trigger;
		}
		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
	}

	if (action == ACTION_TRIGGER)
		return req;
no_trigger:
	free(req);
	return NULL;
}

static void
async_reply_handle(void *arg)
{
	struct pending_request *req;

	pthread_mutex_lock(&pending_requests.lock);
	req = async_reply_handle_thread_unsafe(arg);
	pthread_mutex_unlock(&pending_requests.lock);

	if (req != NULL)
		trigger_async_action(req);
}

static int
open_socket_fd(void)
{
	struct sockaddr_un un;

	peer_name[0] = '\0';
	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
		snprintf(peer_name, sizeof(peer_name),
				"%d_%"PRIx64, getpid(), rte_rdtsc());

	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (mp_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to create unix socket\n");
		return -1;
	}

	memset(&un, 0, sizeof(un));
	un.sun_family = AF_UNIX;

	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));

	unlink(un.sun_path); /* May still exist since last run */

	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
		RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
			un.sun_path, strerror(errno));
		close(mp_fd);
		return -1;
	}

	RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
	return mp_fd;
}

static void
close_socket_fd(int fd)
{
	char path[PATH_MAX];

	close(fd);
	create_socket_path(peer_name, path, sizeof(path));
	unlink(path);
}

int
rte_mp_channel_init(void)
{
	char path[PATH_MAX];
	int dir_fd;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	/* in no shared files mode, we do not have secondary processes support,
	 * so no need to initialize IPC.
	 */
	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	/* create filter path */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_filter, basename(path), sizeof(mp_filter));

	/* path may have been modified, so recreate it */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

	/* lock the directory */
	dir_fd = open(mp_dir_path, O_RDONLY);
	if (dir_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
			mp_dir_path, strerror(errno));
		return -1;
	}

	if (flock(dir_fd, LOCK_EX)) {
		RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
			mp_dir_path, strerror(errno));
		close(dir_fd);
		return -1;
	}

	if (open_socket_fd() < 0) {
		close(dir_fd);
		return -1;
	}

	if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
			NULL, mp_handle, NULL) < 0) {
		RTE_LOG(ERR, EAL, "failed to create mp thread: %s\n",
			strerror(errno));
		close(dir_fd);
		close(__atomic_exchange_n(&mp_fd, -1, __ATOMIC_RELAXED));
		return -1;
	}

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);
	close(dir_fd);

	return 0;
}

void
rte_mp_channel_cleanup(void)
{
	int fd;

	fd = __atomic_exchange_n(&mp_fd, -1, __ATOMIC_RELAXED);
	if (fd < 0)
		return;

	pthread_cancel(mp_handle_tid);
	pthread_join(mp_handle_tid, NULL);
	close_socket_fd(fd);
}

/**
 * Return -1 if we failed to send the message and the failure was caused by
 * the local side, return 0 if the failure was caused by the remote side
 * (e.g. the peer process no longer exists), and return 1 on success.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
	int snd;
	struct iovec iov;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	struct sockaddr_un dst;
	struct mp_msg_internal m;
	int fd_size = msg->num_fds * sizeof(int);
	char control[CMSG_SPACE(fd_size)];

	m.type = type;
	memcpy(&m.msg, msg, sizeof(*msg));

	memset(&dst, 0, sizeof(dst));
	dst.sun_family = AF_UNIX;
	strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

	memset(&msgh, 0, sizeof(msgh));
	memset(control, 0, sizeof(control));

	iov.iov_base = &m;
	iov.iov_len = sizeof(m) - sizeof(msg->fds);

	msgh.msg_name = &dst;
	msgh.msg_namelen = sizeof(dst);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	cmsg = CMSG_FIRSTHDR(&msgh);
	cmsg->cmsg_len = CMSG_LEN(fd_size);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

	do {
		snd = sendmsg(mp_fd, &msgh, 0);
	} while (snd < 0 && errno == EINTR);

	if (snd < 0) {
		rte_errno = errno;
		/* Check if it was caused by the peer process exiting */
		if (errno == ECONNREFUSED &&
				rte_eal_process_type() == RTE_PROC_PRIMARY) {
			unlink(dst_path);
			return 0;
		}
		RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
			dst_path, strerror(errno));
		return -1;
	}

	return 1;
}

static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;

	if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
		peer = eal_mp_socket_path();

	if (peer) {
		if (send_msg(peer, msg, type) < 0)
			return -1;
		else
			return 0;
	}

	/* broadcast to all secondary processes */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
				mp_dir_path);
		rte_errno = errno;
		return -1;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		closedir(mp_dir);
		return -1;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);
		if (send_msg(path, msg, type) < 0)
			ret = -1;
	}
	/* unlock the dir */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);
	return ret;
}

static int
check_input(const struct rte_mp_msg *msg)
{
	if (msg == NULL) {
		RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (validate_action_name(msg->name) != 0)
		return -1;

	if (msg->len_param < 0) {
		RTE_LOG(ERR, EAL, "Message data length is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->num_fds < 0) {
		RTE_LOG(ERR, EAL, "Number of fd's is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "Message data is too long\n");
		rte_errno = E2BIG;
		return -1;
	}

	if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
			RTE_MP_MAX_FD_NUM);
		rte_errno = E2BIG;
		return -1;
	}

	return 0;
}

int
rte_mp_sendmsg(struct rte_mp_msg *msg)
{
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
	return mp_send(msg, NULL, MP_MSG);
}

static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
		struct async_request_param *param, const struct timespec *ts)
{
	struct rte_mp_msg *reply_msg;
	struct pending_request *pending_req, *exist;
	int ret = -1;

	pending_req = calloc(1, sizeof(*pending_req));
	reply_msg = calloc(1, sizeof(*reply_msg));
	if (pending_req == NULL || reply_msg == NULL) {
		RTE_LOG(ERR, EAL, "Could not allocate space for async request\n");
		rte_errno = ENOMEM;
		ret = -1;
		goto fail;
	}

	pending_req->type = REQUEST_TYPE_ASYNC;
	strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
	pending_req->request = req;
	pending_req->reply = reply_msg;
	pending_req->async.param = param;

	/* queue already locked by caller */

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		ret = -1;
		goto fail;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	} else if (ret == 0) {
		ret = 0;
		goto fail;
	}
	param->user_reply.nb_sent++;

	/* if alarm set fails, we simply ignore the reply */
	if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
			      async_reply_handle, pending_req) < 0) {
		RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	}
	TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);

	return 0;
fail:
	free(pending_req);
	free(reply_msg);
	return ret;
}

static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
		struct rte_mp_reply *reply, const struct timespec *ts)
{
	int ret;
	pthread_condattr_t attr;
	struct rte_mp_msg msg, *tmp;
	struct pending_request pending_req, *exist;

	pending_req.type = REQUEST_TYPE_SYNC;
	pending_req.reply_received = 0;
	strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
	pending_req.request = req;
	pending_req.reply = &msg;
	pthread_condattr_init(&attr);
	pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
	pthread_cond_init(&pending_req.sync.cond, &attr);

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		return -1;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		return -1;
	} else if (ret == 0)
		return 0;

	TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

	reply->nb_sent++;

	do {
		ret = pthread_cond_timedwait(&pending_req.sync.cond,
				&pending_requests.lock, ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

	if (pending_req.reply_received == 0) {
		RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ETIMEDOUT;
		return -1;
	}
	if (pending_req.reply_received == -1) {
		RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
		/* not receiving this message is not an error, so decrement
		 * number of sent messages
		 */
		reply->nb_sent--;
		return 0;
	}

	tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
	if (!tmp) {
		RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ENOMEM;
		return -1;
	}
	memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
	reply->msgs = tmp;
	reply->nb_received++;
	return 0;
}

int
rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
		const struct timespec *ts)
{
	int dir_fd, ret = -1;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now, end;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	if (check_input(req) != 0)
		goto end;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		goto end;
	}

	end.tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end.tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		pthread_mutex_lock(&pending_requests.lock);
		ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
		pthread_mutex_unlock(&pending_requests.lock);
		goto end;
	}

	/* for primary process, broadcast request, and collect reply 1 by 1 */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto end;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto close_end;
	}

	pthread_mutex_lock(&pending_requests.lock);
	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		/* unlocks the mutex while waiting for response,
		 * locks on receive
		 */
		if (mp_request_sync(path, req, reply, &end))
			goto unlock_end;
	}
	ret = 0;

unlock_end:
	pthread_mutex_unlock(&pending_requests.lock);
	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

close_end:
	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

end:
	if (ret) {
		free(reply->msgs);
		reply->nb_received = 0;
		reply->msgs = NULL;
	}
	return ret;
}

int
rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
		rte_mp_async_reply_t clb)
{
	struct rte_mp_msg *copy;
	struct pending_request *dummy;
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now;
	struct timespec *end;
	bool dummy_used = false;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	if (check_input(req) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		return -1;
	}
	copy = calloc(1, sizeof(*copy));
	dummy = calloc(1, sizeof(*dummy));
	param = calloc(1, sizeof(*param));
	if (copy == NULL || dummy == NULL || param == NULL) {
		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
		rte_errno = ENOMEM;
		goto fail;
	}

	/* copy message */
	memcpy(copy, req, sizeof(*copy));

	param->n_responses_processed = 0;
	param->clb = clb;
	end = &param->end;
	reply = &param->user_reply;

	end->tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end->tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;
	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	/* we have to lock the request queue here, as we will be adding a bunch
	 * of requests to the queue at once, and some of the replies may arrive
	 * before we add all of the requests to the queue.
	 */
	pthread_mutex_lock(&pending_requests.lock);

	/* we have to ensure that the callback gets triggered even if we don't
	 * send anything, so we have allocated a dummy request earlier. Fill
	 * it, and put it on the queue if we don't send any requests.
	 */
	dummy->type = REQUEST_TYPE_ASYNC;
	dummy->request = copy;
	dummy->reply = NULL;
	dummy->async.param = param;
	dummy->reply_received = 1; /* short-circuit the timeout */

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);

		/* if we didn't send anything, put dummy request on the queue */
		if (ret == 0 && reply->nb_sent == 0) {
			TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
					next);
			dummy_used = true;
		}

		pthread_mutex_unlock(&pending_requests.lock);

		/* if we couldn't send anything, clean up */
		if (ret != 0)
			goto fail;
		return 0;
	}

	/* for primary process, broadcast request */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto unlock_fail;
	}
	dir_fd = dirfd(mp_dir);

	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto closedir_fail;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		if (mp_request_async(path, copy, param, ts))
			ret = -1;
	}
	/* if we didn't send anything, put dummy request on the queue */
	if (ret == 0 && reply->nb_sent == 0) {
		TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
		dummy_used = true;
	}

	/* finally, unlock the queue */
	pthread_mutex_unlock(&pending_requests.lock);

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

	/* if dummy was unused, free it */
	if (!dummy_used)
		free(dummy);

	return ret;
closedir_fail:
	closedir(mp_dir);
unlock_fail:
	pthread_mutex_unlock(&pending_requests.lock);
fail:
	free(dummy);
	free(param);
	free(copy);
	return -1;
}

int
rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
{
	RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (peer == NULL) {
		RTE_LOG(ERR, EAL, "peer is not specified\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return 0;
	}

	return mp_send(msg, peer, MP_REP);
}

/* Internally, the status of the mp feature is represented as a three-state:
 * - "unknown" as long as no secondary process attached to a primary process
 *   and there was no call to rte_mp_disable yet,
 * - "enabled" as soon as a secondary process attaches to a primary process,
 * - "disabled" when a primary process successfully called rte_mp_disable,
 */
enum mp_status {
	MP_STATUS_UNKNOWN,
	MP_STATUS_DISABLED,
	MP_STATUS_ENABLED,
};

static bool
set_mp_status(enum mp_status status)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	uint8_t expected;
	uint8_t desired;

	expected = MP_STATUS_UNKNOWN;
	desired = status;
	if (__atomic_compare_exchange_n(&mcfg->mp_status, &expected, desired,
			false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
		return true;

	return __atomic_load_n(&mcfg->mp_status, __ATOMIC_RELAXED) == desired;
}

bool
rte_mp_disable(void)
{
	return set_mp_status(MP_STATUS_DISABLED);
}

bool
__rte_mp_enable(void)
{
	return set_mp_status(MP_STATUS_ENABLED);
}
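
/*
 * Usage sketch (illustrative only, not part of the EAL): one way an
 * application could exercise the IPC primitives implemented above. The
 * action name "example_action" and the handler below are hypothetical;
 * payload data and error handling are omitted.
 *
 *	static int
 *	example_handler(const struct rte_mp_msg *msg, const void *peer)
 *	{
 *		struct rte_mp_msg reply;
 *
 *		memset(&reply, 0, sizeof(reply));
 *		strlcpy(reply.name, msg->name, sizeof(reply.name));
 *		return rte_mp_reply(&reply, peer);
 *	}
 *
 *	// in the responding process:
 *	rte_mp_action_register("example_action", example_handler);
 *
 *	// in the requesting process:
 *	struct rte_mp_msg req;
 *	struct rte_mp_reply replies;
 *	struct timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
 *
 *	memset(&req, 0, sizeof(req));
 *	strlcpy(req.name, "example_action", sizeof(req.name));
 *	if (rte_mp_request_sync(&req, &replies, &ts) == 0)
 *		free(replies.msgs);
 */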