/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2018 Intel Corporation
 */

#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <inttypes.h>
#include <libgen.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <rte_alarm.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
#include <rte_tailq.h>

#include "eal_memcfg.h"
#include "eal_private.h"
#include "eal_filesystem.h"
#include "eal_internal_cfg.h"

static int mp_fd = -1;
static pthread_t mp_handle_tid;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
static char peer_name[PATH_MAX];

struct action_entry {
	TAILQ_ENTRY(action_entry) next;
	char action_name[RTE_MP_MAX_NAME_LEN];
	rte_mp_t action;
};

/** Doubly-linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);

enum mp_type {
	MP_MSG, /* Share message with peers, will not block */
	MP_REQ, /* Request for information, will block for a reply */
	MP_REP, /* Response to previously-received request */
	MP_IGN, /* Response telling requester to ignore this response */
};

struct mp_msg_internal {
	int type;
	struct rte_mp_msg msg;
};

struct async_request_param {
	rte_mp_async_reply_t clb;
	struct rte_mp_reply user_reply;
	struct timespec end;
	int n_responses_processed;
};

struct pending_request {
	TAILQ_ENTRY(pending_request) next;
	enum {
		REQUEST_TYPE_SYNC,
		REQUEST_TYPE_ASYNC
	} type;
	char dst[PATH_MAX];
	struct rte_mp_msg *request;
	struct rte_mp_msg *reply;
	int reply_received;
	RTE_STD_C11
	union {
		struct {
			struct async_request_param *param;
		} async;
		struct {
			pthread_cond_t cond;
		} sync;
	};
};

TAILQ_HEAD(pending_request_list, pending_request);

static struct {
	struct pending_request_list requests;
	pthread_mutex_t lock;
} pending_requests = {
	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
	.lock = PTHREAD_MUTEX_INITIALIZER,
	/**< used in async requests only */
};

/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);

static struct pending_request *
find_pending_request(const char *dst, const char *act_name)
{
	struct pending_request *r;

	TAILQ_FOREACH(r, &pending_requests.requests, next) {
		if (!strcmp(r->dst, dst) &&
		    !strcmp(r->request->name, act_name))
			break;
	}

	return r;
}

static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *prefix = eal_mp_socket_path();

	if (strlen(name) > 0)
		snprintf(buf, len, "%s_%s", prefix, name);
	else
		strlcpy(buf, prefix, len);
}

int
rte_eal_primary_proc_alive(const char *config_file_path)
{
	int config_fd;

	if (config_file_path)
		config_fd = open(config_file_path, O_RDONLY);
	else {
		const char *path;

		path = eal_runtime_config_path();
		config_fd = open(path, O_RDONLY);
	}
	if (config_fd < 0)
		return 0;

	int ret = lockf(config_fd, F_TEST, 0);
	close(config_fd);

	return !!ret;
}

static struct action_entry *
find_action_entry_by_name(const char *name)
{
	struct action_entry *entry;

	TAILQ_FOREACH(entry, &action_entry_list, next) {
		if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
			break;
	}

	return entry;
}

static int
validate_action_name(const char *name)
{
	if (name == NULL) {
		RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
		RTE_LOG(ERR, EAL, "Length of action name is zero\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
		rte_errno = E2BIG;
		return -1;
	}
	return 0;
}

int
rte_mp_action_register(const char *name, rte_mp_t action)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	entry = malloc(sizeof(struct action_entry));
	if (entry == NULL) {
		rte_errno = ENOMEM;
		return -1;
	}
	strlcpy(entry->action_name, name, sizeof(entry->action_name));
	entry->action = action;

	pthread_mutex_lock(&mp_mutex_action);
	if (find_action_entry_by_name(name) != NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		rte_errno = EEXIST;
		free(entry);
		return -1;
	}
	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	return 0;
}

void
rte_mp_action_unregister(const char *name)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(name);
	if (entry == NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		return;
	}
	TAILQ_REMOVE(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	free(entry);
}

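/*
 * Usage sketch (illustrative only, not part of this file): an application
 * or driver hooks into the IPC channel by registering a named action. The
 * "app_ping" name and handler below are hypothetical.
 *
 *	static int
 *	handle_app_ping(const struct rte_mp_msg *msg, const void *peer)
 *	{
 *		struct rte_mp_msg reply;
 *
 *		memset(&reply, 0, sizeof(reply));
 *		strlcpy(reply.name, msg->name, sizeof(reply.name));
 *		return rte_mp_reply(&reply, peer);
 *	}
 *
 *	if (rte_mp_action_register("app_ping", handle_app_ping) < 0)
 *		printf("register failed: %s\n", rte_strerror(rte_errno));
 *	...
 *	rte_mp_action_unregister("app_ping");
 */
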
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	int msglen;
	struct iovec iov;
	struct msghdr msgh;
	char control[CMSG_SPACE(sizeof(m->msg.fds))];
	struct cmsghdr *cmsg;
	int buflen = sizeof(*m) - sizeof(m->msg.fds);

	memset(&msgh, 0, sizeof(msgh));
	iov.iov_base = m;
	iov.iov_len = buflen;

	msgh.msg_name = s;
	msgh.msg_namelen = sizeof(*s);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	msglen = recvmsg(mp_fd, &msgh, 0);
	if (msglen < 0) {
		RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
		return -1;
	}

	if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
		RTE_LOG(ERR, EAL, "truncated msg\n");
		return -1;
	}

	/* read auxiliary FDs if any */
	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
		if ((cmsg->cmsg_level == SOL_SOCKET) &&
			(cmsg->cmsg_type == SCM_RIGHTS)) {
			memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
			break;
		}
	}
	/* sanity-check the response */
	if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "invalid number of fd's received\n");
		return -1;
	}
	if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "invalid received data length\n");
		return -1;
	}
	return 0;
}

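/*
 * Receive-side note: read_msg() copies any SCM_RIGHTS payload straight into
 * m->msg.fds, so a registered action sees passed descriptors as plain fds.
 * A hedged sketch of a consumer (consume_fd is hypothetical):
 *
 *	static int
 *	handle_fd_msg(const struct rte_mp_msg *msg, const void *peer __rte_unused)
 *	{
 *		int i;
 *
 *		for (i = 0; i < msg->num_fds; i++)
 *			consume_fd(msg->fds[i]); // receiver owns, must close()
 *		return 0;
 *	}
 */
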
static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	struct pending_request *pending_req;
	struct action_entry *entry;
	struct rte_mp_msg *msg = &m->msg;
	rte_mp_t action = NULL;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

	if (m->type == MP_REP || m->type == MP_IGN) {
		struct pending_request *req = NULL;

		pthread_mutex_lock(&pending_requests.lock);
		pending_req = find_pending_request(s->sun_path, msg->name);
		if (pending_req) {
			memcpy(pending_req->reply, msg, sizeof(*msg));
			/* -1 indicates that we've been asked to ignore */
			pending_req->reply_received =
				m->type == MP_REP ? 1 : -1;

			if (pending_req->type == REQUEST_TYPE_SYNC)
				pthread_cond_signal(&pending_req->sync.cond);
			else if (pending_req->type == REQUEST_TYPE_ASYNC)
				req = async_reply_handle_thread_unsafe(
						pending_req);
		} else
			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
		pthread_mutex_unlock(&pending_requests.lock);

		if (req != NULL)
			trigger_async_action(req);
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(msg->name);
	if (entry != NULL)
		action = entry->action;
	pthread_mutex_unlock(&mp_mutex_action);

	if (!action) {
		if (m->type == MP_REQ && !internal_conf->init_complete) {
			/* if this is a request, and init is not yet complete,
			 * and callback wasn't registered, we should tell the
			 * requester to ignore our existence because we're not
			 * yet ready to process this request.
			 */
			struct rte_mp_msg dummy;

			memset(&dummy, 0, sizeof(dummy));
			strlcpy(dummy.name, msg->name, sizeof(dummy.name));
			mp_send(&dummy, s->sun_path, MP_IGN);
		} else {
			RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
				msg->name);
		}
	} else if (action(msg, s->sun_path) < 0) {
		RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
	}
}

static void *
mp_handle(void *arg __rte_unused)
{
	struct mp_msg_internal msg;
	struct sockaddr_un sa;

	while (mp_fd >= 0) {
		if (read_msg(&msg, &sa) == 0)
			process_msg(&msg, &sa);
	}

	return NULL;
}

static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec < b->tv_sec)
		return -1;
	if (a->tv_sec > b->tv_sec)
		return 1;
	if (a->tv_nsec < b->tv_nsec)
		return -1;
	if (a->tv_nsec > b->tv_nsec)
		return 1;
	return 0;
}

enum async_action {
	ACTION_FREE, /**< free the action entry, but don't trigger callback */
	ACTION_TRIGGER /**< trigger callback, then free action entry */
};

static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	bool timeout, last_msg;

	param = sr->async.param;
	reply = &param->user_reply;

	/* did we timeout? */
	timeout = timespec_cmp(&param->end, now) <= 0;

	/* if we received a response, adjust relevant data and copy message. */
	if (sr->reply_received == 1 && sr->reply) {
		struct rte_mp_msg *msg, *user_msgs, *tmp;

		msg = sr->reply;
		user_msgs = reply->msgs;

		tmp = realloc(user_msgs, sizeof(*msg) *
				(reply->nb_received + 1));
		if (!tmp) {
			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
				sr->dst, sr->request->name);
			/* this entry is going to be removed and its message
			 * dropped, but we don't want to leak memory, so
			 * continue.
			 */
		} else {
			user_msgs = tmp;
			reply->msgs = user_msgs;
			memcpy(&user_msgs[reply->nb_received],
					msg, sizeof(*msg));
			reply->nb_received++;
		}

		/* mark this request as processed */
		param->n_responses_processed++;
	} else if (sr->reply_received == -1) {
		/* we were asked to ignore this process */
		reply->nb_sent--;
	} else if (timeout) {
		/* count it as a processed response, but don't increment
		 * nb_received.
		 */
		param->n_responses_processed++;
	}

	free(sr->reply);

	last_msg = param->n_responses_processed == reply->nb_sent;

	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}

static void
trigger_async_action(struct pending_request *sr)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;

	param = sr->async.param;
	reply = &param->user_reply;

	param->clb(sr->request, reply);

	/* clean up */
	free(sr->async.param->user_reply.msgs);
	free(sr->async.param);
	free(sr->request);
	free(sr);
}

static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
	struct pending_request *req = (struct pending_request *)arg;
	enum async_action action;
	struct timespec ts_now;

	if (clock_gettime(CLOCK_MONOTONIC, &ts_now) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto no_trigger;
	}

	action = process_async_request(req, &ts_now);

	TAILQ_REMOVE(&pending_requests.requests, req, next);

	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
		/* if we failed to cancel the alarm because it's already in
		 * progress, don't proceed because otherwise we will end up
		 * handling the same message twice.
		 */
		if (rte_errno == EINPROGRESS) {
			RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
			goto no_trigger;
		}
		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
	}

	if (action == ACTION_TRIGGER)
		return req;
no_trigger:
	free(req);
	return NULL;
}

static void
async_reply_handle(void *arg)
{
	struct pending_request *req;

	pthread_mutex_lock(&pending_requests.lock);
	req = async_reply_handle_thread_unsafe(arg);
	pthread_mutex_unlock(&pending_requests.lock);

	if (req != NULL)
		trigger_async_action(req);
}

static int
open_socket_fd(void)
{
	struct sockaddr_un un;

	peer_name[0] = '\0';
	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
		snprintf(peer_name, sizeof(peer_name),
				"%d_%"PRIx64, getpid(), rte_rdtsc());

	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (mp_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to create unix socket\n");
		return -1;
	}

	memset(&un, 0, sizeof(un));
	un.sun_family = AF_UNIX;

	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));

	unlink(un.sun_path); /* may still exist from a previous run */

	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
		RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
			un.sun_path, strerror(errno));
		close(mp_fd);
		return -1;
	}

	RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
	return mp_fd;
}

static void
close_socket_fd(int fd)
{
	char path[PATH_MAX];

	close(fd);
	create_socket_path(peer_name, path, sizeof(path));
	unlink(path);
}

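/*
 * Naming note (assuming the default runtime directory): the primary process
 * binds the bare prefix returned by eal_mp_socket_path(), e.g.
 * "<runtime_dir>/mp_socket", while each secondary appends "_<pid>_<rdtsc>",
 * e.g. "<runtime_dir>/mp_socket_42_1a2b3c". The "mp_socket_*" filter built
 * in rte_mp_channel_init() below matches exactly these secondary sockets
 * when broadcasting.
 */
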
int
rte_mp_channel_init(void)
{
	char path[PATH_MAX];
	int dir_fd;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	/* in no shared files mode, we do not have secondary processes support,
	 * so no need to initialize IPC.
	 */
	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	/* create filter path */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_filter, basename(path), sizeof(mp_filter));

	/* path may have been modified, so recreate it */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

	/* lock the directory */
	dir_fd = open(mp_dir_path, O_RDONLY);
	if (dir_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
			mp_dir_path, strerror(errno));
		return -1;
	}

	if (flock(dir_fd, LOCK_EX)) {
		RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
			mp_dir_path, strerror(errno));
		close(dir_fd);
		return -1;
	}

	if (open_socket_fd() < 0) {
		close(dir_fd);
		return -1;
	}

	if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
			NULL, mp_handle, NULL) < 0) {
		RTE_LOG(ERR, EAL, "failed to create mp thread: %s\n",
			strerror(errno));
		close(mp_fd);
		close(dir_fd);
		mp_fd = -1;
		return -1;
	}

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);
	close(dir_fd);

	return 0;
}

void
rte_mp_channel_cleanup(void)
{
	int fd;

	if (mp_fd < 0)
		return;

	fd = mp_fd;
	mp_fd = -1;
	pthread_cancel(mp_handle_tid);
	pthread_join(mp_handle_tid, NULL);
	close_socket_fd(fd);
}

/**
 * Return -1 if the message failed to send due to an error on the local side.
 * Return 0 if the message failed to send due to the remote side
 * (e.g. the peer process has exited).
 * Return 1 if the message was sent successfully.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
	int snd;
	struct iovec iov;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	struct sockaddr_un dst;
	struct mp_msg_internal m;
	int fd_size = msg->num_fds * sizeof(int);
	char control[CMSG_SPACE(fd_size)];

	m.type = type;
	memcpy(&m.msg, msg, sizeof(*msg));

	memset(&dst, 0, sizeof(dst));
	dst.sun_family = AF_UNIX;
	strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

	memset(&msgh, 0, sizeof(msgh));
	memset(control, 0, sizeof(control));

	iov.iov_base = &m;
	iov.iov_len = sizeof(m) - sizeof(msg->fds);

	msgh.msg_name = &dst;
	msgh.msg_namelen = sizeof(dst);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	cmsg = CMSG_FIRSTHDR(&msgh);
	cmsg->cmsg_len = CMSG_LEN(fd_size);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

	do {
		snd = sendmsg(mp_fd, &msgh, 0);
	} while (snd < 0 && errno == EINTR);

	if (snd < 0) {
		rte_errno = errno;
		/* check if the failure was caused by the peer process exiting */
		if (errno == ECONNREFUSED &&
				rte_eal_process_type() == RTE_PROC_PRIMARY) {
			unlink(dst_path);
			return 0;
		}
		RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
			dst_path, strerror(errno));
		return -1;
	}

	return 1;
}

static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;

	if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
		peer = eal_mp_socket_path();

	if (peer) {
		if (send_msg(peer, msg, type) < 0)
			return -1;
		else
			return 0;
	}

	/* broadcast to all secondary processes */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
				mp_dir_path);
		rte_errno = errno;
		return -1;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		closedir(mp_dir);
		return -1;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);
		if (send_msg(path, msg, type) < 0)
			ret = -1;
	}
	/* unlock the dir */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);
	return ret;
}

static int
check_input(const struct rte_mp_msg *msg)
{
	if (msg == NULL) {
		RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (validate_action_name(msg->name) != 0)
		return -1;

	if (msg->len_param < 0) {
		RTE_LOG(ERR, EAL, "Message data length is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->num_fds < 0) {
		RTE_LOG(ERR, EAL, "Number of fd's is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "Message data is too long\n");
		rte_errno = E2BIG;
		return -1;
	}

	if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
			RTE_MP_MAX_FD_NUM);
		rte_errno = E2BIG;
		return -1;
	}

	return 0;
}

int
rte_mp_sendmsg(struct rte_mp_msg *msg)
{
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
	return mp_send(msg, NULL, MP_MSG);
}

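/*
 * Usage sketch (illustrative only): composing a fire-and-forget message for
 * rte_mp_sendmsg(). The "app_notify" name is hypothetical; fds/num_fds may
 * be left zeroed when no descriptors are passed.
 *
 *	struct rte_mp_msg msg;
 *
 *	memset(&msg, 0, sizeof(msg));
 *	strlcpy(msg.name, "app_notify", sizeof(msg.name));
 *	msg.len_param = snprintf((char *)msg.param, sizeof(msg.param),
 *			"hello") + 1;
 *	if (rte_mp_sendmsg(&msg) < 0)
 *		printf("sendmsg failed: %s\n", rte_strerror(rte_errno));
 */
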
static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
		struct async_request_param *param, const struct timespec *ts)
{
	struct rte_mp_msg *reply_msg;
	struct pending_request *pending_req, *exist;
	int ret = -1;

	pending_req = calloc(1, sizeof(*pending_req));
	reply_msg = calloc(1, sizeof(*reply_msg));
	if (pending_req == NULL || reply_msg == NULL) {
		RTE_LOG(ERR, EAL, "Could not allocate space for async request\n");
		rte_errno = ENOMEM;
		ret = -1;
		goto fail;
	}

	pending_req->type = REQUEST_TYPE_ASYNC;
	strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
	pending_req->request = req;
	pending_req->reply = reply_msg;
	pending_req->async.param = param;

	/* queue already locked by caller */

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		ret = -1;
		goto fail;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	} else if (ret == 0) {
		ret = 0;
		goto fail;
	}
	param->user_reply.nb_sent++;

	/* if we cannot set the alarm, the request is dropped here, and any
	 * late reply will simply be ignored
	 */
	if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
			async_reply_handle, pending_req) < 0) {
		RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	}
	TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);

	return 0;
fail:
	free(pending_req);
	free(reply_msg);
	return ret;
}

static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
		struct rte_mp_reply *reply, const struct timespec *ts)
{
	int ret;
	pthread_condattr_t attr;
	struct rte_mp_msg msg, *tmp;
	struct pending_request pending_req, *exist;

	pending_req.type = REQUEST_TYPE_SYNC;
	pending_req.reply_received = 0;
	strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
	pending_req.request = req;
	pending_req.reply = &msg;
	pthread_condattr_init(&attr);
	pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
	pthread_cond_init(&pending_req.sync.cond, &attr);

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		return -1;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		return -1;
	} else if (ret == 0)
		return 0;

	TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

	reply->nb_sent++;

	do {
		ret = pthread_cond_timedwait(&pending_req.sync.cond,
				&pending_requests.lock, ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

	if (pending_req.reply_received == 0) {
		RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ETIMEDOUT;
		return -1;
	}
	if (pending_req.reply_received == -1) {
		RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
		/* not receiving this message is not an error, so decrement
		 * number of sent messages
		 */
		reply->nb_sent--;
		return 0;
	}

	tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
	if (!tmp) {
		RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ENOMEM;
		return -1;
	}
	memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
	reply->msgs = tmp;
	reply->nb_received++;
	return 0;
}

int
rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
		const struct timespec *ts)
{
	int dir_fd, ret = -1;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now, end;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	if (check_input(req) != 0)
		goto end;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		goto end;
	}

	end.tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end.tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		pthread_mutex_lock(&pending_requests.lock);
		ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
		pthread_mutex_unlock(&pending_requests.lock);
		goto end;
	}

	/* for primary process, broadcast request and collect replies one by
	 * one
	 */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto end;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto close_end;
	}

	pthread_mutex_lock(&pending_requests.lock);
	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		/* mp_request_sync unlocks the mutex while waiting for a
		 * response, and re-locks it on wakeup
		 */
		if (mp_request_sync(path, req, reply, &end))
			goto unlock_end;
	}
	ret = 0;

unlock_end:
	pthread_mutex_unlock(&pending_requests.lock);
	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

close_end:
	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

end:
	if (ret) {
		free(reply->msgs);
		reply->nb_received = 0;
		reply->msgs = NULL;
	}
	return ret;
}

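/*
 * Usage sketch (illustrative only): a blocking request with a 5-second
 * timeout. The "app_query" name and handle_one() helper are hypothetical;
 * the caller owns reply.msgs and must free it.
 *
 *	struct rte_mp_msg req;
 *	struct rte_mp_reply reply;
 *	struct timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
 *	int i;
 *
 *	memset(&req, 0, sizeof(req));
 *	strlcpy(req.name, "app_query", sizeof(req.name));
 *	if (rte_mp_request_sync(&req, &reply, &ts) == 0) {
 *		for (i = 0; i < reply.nb_received; i++)
 *			handle_one(&reply.msgs[i]);
 *		free(reply.msgs);
 *	}
 */
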
int
rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
		rte_mp_async_reply_t clb)
{
	struct rte_mp_msg *copy;
	struct pending_request *dummy;
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now;
	struct timespec *end;
	bool dummy_used = false;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	if (check_input(req) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		return -1;
	}
	copy = calloc(1, sizeof(*copy));
	dummy = calloc(1, sizeof(*dummy));
	param = calloc(1, sizeof(*param));
	if (copy == NULL || dummy == NULL || param == NULL) {
		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
		rte_errno = ENOMEM;
		goto fail;
	}

	/* copy message */
	memcpy(copy, req, sizeof(*copy));

	param->n_responses_processed = 0;
	param->clb = clb;
	end = &param->end;
	reply = &param->user_reply;

	end->tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end->tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;
	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	/* we have to lock the request queue here, as we will be adding a bunch
	 * of requests to the queue at once, and some of the replies may arrive
	 * before we add all of the requests to the queue.
	 */
	pthread_mutex_lock(&pending_requests.lock);

	/* we have to ensure that the callback gets triggered even if we don't
	 * send anything, so we allocated a dummy request earlier. Fill it in,
	 * and put it on the queue if we end up not sending any requests.
	 */
	dummy->type = REQUEST_TYPE_ASYNC;
	dummy->request = copy;
	dummy->reply = NULL;
	dummy->async.param = param;
	dummy->reply_received = 1; /* short-circuit the timeout */

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);

		/* if we didn't send anything, put dummy request on the queue */
		if (ret == 0 && reply->nb_sent == 0) {
			TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
					next);
			dummy_used = true;
		}

		pthread_mutex_unlock(&pending_requests.lock);

		/* if we couldn't send anything, clean up */
		if (ret != 0)
			goto fail;
		return 0;
	}

	/* for primary process, broadcast request */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto unlock_fail;
	}
	dir_fd = dirfd(mp_dir);

	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto closedir_fail;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		if (mp_request_async(path, copy, param, ts))
			ret = -1;
	}
	/* if we didn't send anything, put dummy request on the queue */
	if (ret == 0 && reply->nb_sent == 0) {
		TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
		dummy_used = true;
	}

	/* finally, unlock the queue */
	pthread_mutex_unlock(&pending_requests.lock);

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

	/* if dummy was unused, free it */
	if (!dummy_used)
		free(dummy);

	return ret;
closedir_fail:
	closedir(mp_dir);
unlock_fail:
	pthread_mutex_unlock(&pending_requests.lock);
fail:
	free(dummy);
	free(param);
	free(copy);
	return -1;
}

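/*
 * Usage sketch (illustrative only): the same request issued asynchronously.
 * The callback runs later, from the alarm/IPC handling context, once all
 * peers have replied or the timeout has expired. query_done is hypothetical.
 *
 *	static void
 *	query_done(const struct rte_mp_msg *request __rte_unused,
 *			const struct rte_mp_reply *reply)
 *	{
 *		printf("%d of %d peers replied\n",
 *			reply->nb_received, reply->nb_sent);
 *	}
 *
 *	struct timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
 *
 *	if (rte_mp_request_async(&req, &ts, query_done) < 0)
 *		printf("async request failed: %s\n", rte_strerror(rte_errno));
 */
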
int
rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
{
	RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (peer == NULL) {
		RTE_LOG(ERR, EAL, "peer is not specified\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return 0;
	}

	return mp_send(msg, peer, MP_REP);
}

/* Internally, the status of the mp feature is represented as a three-state:
 * - "unknown" as long as no secondary process has attached to the primary
 *   process and there has been no call to rte_mp_disable yet,
 * - "enabled" as soon as a secondary process attaches to the primary process,
 * - "disabled" when the primary process successfully called rte_mp_disable.
 */
enum mp_status {
	MP_STATUS_UNKNOWN,
	MP_STATUS_DISABLED,
	MP_STATUS_ENABLED,
};

static bool
set_mp_status(enum mp_status status)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	uint8_t expected;
	uint8_t desired;

	expected = MP_STATUS_UNKNOWN;
	desired = status;
	if (__atomic_compare_exchange_n(&mcfg->mp_status, &expected, desired,
			false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
		return true;

	return __atomic_load_n(&mcfg->mp_status, __ATOMIC_RELAXED) == desired;
}

bool
rte_mp_disable(void)
{
	return set_mp_status(MP_STATUS_DISABLED);
}

bool
__rte_mp_enable(void)
{
	return set_mp_status(MP_STATUS_ENABLED);
}
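
/*
 * Semantics sketch (illustrative only): set_mp_status() implements a one-way
 * transition out of the "unknown" state, so the first caller wins and later
 * calls succeed only if they agree with the recorded state:
 *
 *	rte_mp_disable();    // true if no secondary has attached yet,
 *	                     // or if IPC was already disabled
 *	__rte_mp_enable();   // false afterwards: state is "disabled"
 */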