/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2018 Intel Corporation
 */

#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <inttypes.h>
#include <libgen.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <rte_alarm.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
#include <rte_tailq.h>

#include "eal_memcfg.h"
#include "eal_private.h"
#include "eal_filesystem.h"
#include "eal_internal_cfg.h"

static int mp_fd = -1;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
static char peer_name[PATH_MAX];

struct action_entry {
	TAILQ_ENTRY(action_entry) next;
	char action_name[RTE_MP_MAX_NAME_LEN];
	rte_mp_t action;
};

/** Double linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);

enum mp_type {
	MP_MSG, /* Share message with peers, will not block */
	MP_REQ, /* Request for information, will block for a reply */
	MP_REP, /* Response to previously-received request */
	MP_IGN, /* Response telling requester to ignore this response */
};

struct mp_msg_internal {
	int type;
	struct rte_mp_msg msg;
};

struct async_request_param {
	rte_mp_async_reply_t clb;
	struct rte_mp_reply user_reply;
	struct timespec end;
	int n_responses_processed;
};

struct pending_request {
	TAILQ_ENTRY(pending_request) next;
	enum {
		REQUEST_TYPE_SYNC,
		REQUEST_TYPE_ASYNC
	} type;
	char dst[PATH_MAX];
	struct rte_mp_msg *request;
	struct rte_mp_msg *reply;
	int reply_received;
	RTE_STD_C11
	union {
		struct {
			struct async_request_param *param;
		} async;
		struct {
			pthread_cond_t cond;
		} sync;
	};
};

TAILQ_HEAD(pending_request_list, pending_request);

static struct {
	struct pending_request_list requests;
	pthread_mutex_t lock;
} pending_requests = {
	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
	.lock = PTHREAD_MUTEX_INITIALIZER,
	/**< used in async requests only */
};

/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);

static struct pending_request *
find_pending_request(const char *dst, const char *act_name)
{
	struct pending_request *r;

	TAILQ_FOREACH(r, &pending_requests.requests, next) {
		if (!strcmp(r->dst, dst) &&
		    !strcmp(r->request->name, act_name))
			break;
	}

	return r;
}

static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *prefix = eal_mp_socket_path();

	if (strlen(name) > 0)
		snprintf(buf, len, "%s_%s", prefix, name);
	else
		strlcpy(buf, prefix, len);
}

int
rte_eal_primary_proc_alive(const char *config_file_path)
{
	int config_fd;

	if (config_file_path)
		config_fd = open(config_file_path, O_RDONLY);
	else {
		const char *path;

		path = eal_runtime_config_path();
		config_fd = open(path, O_RDONLY);
	}
	if (config_fd < 0)
		return 0;

	int ret = lockf(config_fd, F_TEST, 0);
	close(config_fd);

	return !!ret;
}
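
/*
 * Usage sketch (illustrative only, not part of the IPC implementation):
 * a secondary process can use rte_eal_primary_proc_alive() to detect that
 * the primary has exited, since the primary holds a write lock on its
 * runtime config file for as long as it runs. The poll interval below is
 * arbitrary; NULL selects the default runtime config path.
 *
 *	while (rte_eal_primary_proc_alive(NULL))
 *		sleep(1);
 *	// primary has exited, shut down gracefully
 */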

static struct action_entry *
find_action_entry_by_name(const char *name)
{
	struct action_entry *entry;

	TAILQ_FOREACH(entry, &action_entry_list, next) {
		if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
			break;
	}

	return entry;
}

static int
validate_action_name(const char *name)
{
	if (name == NULL) {
		RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
		RTE_LOG(ERR, EAL, "Length of action name is zero\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
		rte_errno = E2BIG;
		return -1;
	}
	return 0;
}

int
rte_mp_action_register(const char *name, rte_mp_t action)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	entry = malloc(sizeof(struct action_entry));
	if (entry == NULL) {
		rte_errno = ENOMEM;
		return -1;
	}
	strlcpy(entry->action_name, name, sizeof(entry->action_name));
	entry->action = action;

	pthread_mutex_lock(&mp_mutex_action);
	if (find_action_entry_by_name(name) != NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		rte_errno = EEXIST;
		free(entry);
		return -1;
	}
	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	return 0;
}

void
rte_mp_action_unregister(const char *name)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(name);
	if (entry == NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		return;
	}
	TAILQ_REMOVE(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	free(entry);
}
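
/*
 * Usage sketch (illustrative): registering an IPC action and answering
 * requests from peer processes. The action name "app_example" and the
 * handler body are hypothetical; note that the reply must reuse the
 * request's name so the requester can match it to its pending request.
 *
 *	static int
 *	app_example_handler(const struct rte_mp_msg *msg, const void *peer)
 *	{
 *		struct rte_mp_msg reply;
 *
 *		memset(&reply, 0, sizeof(reply));
 *		strlcpy(reply.name, msg->name, sizeof(reply.name));
 *		// fill reply.param/len_param (and fds/num_fds) as needed
 *		return rte_mp_reply(&reply, peer);
 *	}
 *
 *	// after rte_eal_init():
 *	if (rte_mp_action_register("app_example", app_example_handler) < 0)
 *		// handle rte_errno (EEXIST, ENOTSUP, ...)
 */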

static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	int msglen;
	struct iovec iov;
	struct msghdr msgh;
	char control[CMSG_SPACE(sizeof(m->msg.fds))];
	struct cmsghdr *cmsg;
	int buflen = sizeof(*m) - sizeof(m->msg.fds);

	memset(&msgh, 0, sizeof(msgh));
	iov.iov_base = m;
	iov.iov_len = buflen;

	msgh.msg_name = s;
	msgh.msg_namelen = sizeof(*s);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	msglen = recvmsg(mp_fd, &msgh, 0);
	if (msglen < 0) {
		RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
		return -1;
	}

	if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
		RTE_LOG(ERR, EAL, "truncated msg\n");
		return -1;
	}

	/* read auxiliary FDs if any */
	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
		if ((cmsg->cmsg_level == SOL_SOCKET) &&
			(cmsg->cmsg_type == SCM_RIGHTS)) {
			memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
			break;
		}
	}
	/* sanity-check the response */
	if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "invalid number of fd's received\n");
		return -1;
	}
	if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "invalid received data length\n");
		return -1;
	}
	return 0;
}

static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	struct pending_request *pending_req;
	struct action_entry *entry;
	struct rte_mp_msg *msg = &m->msg;
	rte_mp_t action = NULL;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

	if (m->type == MP_REP || m->type == MP_IGN) {
		struct pending_request *req = NULL;

		pthread_mutex_lock(&pending_requests.lock);
		pending_req = find_pending_request(s->sun_path, msg->name);
		if (pending_req) {
			memcpy(pending_req->reply, msg, sizeof(*msg));
			/* -1 indicates that we've been asked to ignore */
			pending_req->reply_received =
				m->type == MP_REP ? 1 : -1;

			if (pending_req->type == REQUEST_TYPE_SYNC)
				pthread_cond_signal(&pending_req->sync.cond);
			else if (pending_req->type == REQUEST_TYPE_ASYNC)
				req = async_reply_handle_thread_unsafe(
						pending_req);
		} else
			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
		pthread_mutex_unlock(&pending_requests.lock);

		if (req != NULL)
			trigger_async_action(req);
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(msg->name);
	if (entry != NULL)
		action = entry->action;
	pthread_mutex_unlock(&mp_mutex_action);

	if (!action) {
		if (m->type == MP_REQ && !internal_conf->init_complete) {
			/* if this is a request, and init is not yet complete,
			 * and callback wasn't registered, we should tell the
			 * requester to ignore our existence because we're not
			 * yet ready to process this request.
			 */
			struct rte_mp_msg dummy;

			memset(&dummy, 0, sizeof(dummy));
			strlcpy(dummy.name, msg->name, sizeof(dummy.name));
			mp_send(&dummy, s->sun_path, MP_IGN);
		} else {
			RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
				msg->name);
		}
	} else if (action(msg, s->sun_path) < 0) {
		RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
	}
}

static void *
mp_handle(void *arg __rte_unused)
{
	struct mp_msg_internal msg;
	struct sockaddr_un sa;

	while (1) {
		if (read_msg(&msg, &sa) == 0)
			process_msg(&msg, &sa);
	}

	return NULL;
}

static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec < b->tv_sec)
		return -1;
	if (a->tv_sec > b->tv_sec)
		return 1;
	if (a->tv_nsec < b->tv_nsec)
		return -1;
	if (a->tv_nsec > b->tv_nsec)
		return 1;
	return 0;
}

enum async_action {
	ACTION_FREE, /**< free the action entry, but don't trigger callback */
	ACTION_TRIGGER /**< trigger callback, then free action entry */
};

static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	bool timeout, last_msg;

	param = sr->async.param;
	reply = &param->user_reply;

	/* did we timeout? */
	timeout = timespec_cmp(&param->end, now) <= 0;

	/* if we received a response, adjust relevant data and copy message. */
	if (sr->reply_received == 1 && sr->reply) {
		struct rte_mp_msg *msg, *user_msgs, *tmp;

		msg = sr->reply;
		user_msgs = reply->msgs;

		tmp = realloc(user_msgs, sizeof(*msg) *
				(reply->nb_received + 1));
		if (!tmp) {
			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
				sr->dst, sr->request->name);
			/* this entry is going to be removed and its message
			 * dropped, but we don't want to leak memory, so
			 * continue.
			 */
		} else {
			user_msgs = tmp;
			reply->msgs = user_msgs;
			memcpy(&user_msgs[reply->nb_received],
					msg, sizeof(*msg));
			reply->nb_received++;
		}

		/* mark this request as processed */
		param->n_responses_processed++;
	} else if (sr->reply_received == -1) {
		/* we were asked to ignore this process */
		reply->nb_sent--;
	} else if (timeout) {
		/* count it as processed response, but don't increment
		 * nb_received.
		 */
		param->n_responses_processed++;
	}

	free(sr->reply);

	last_msg = param->n_responses_processed == reply->nb_sent;

	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}
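
/*
 * Accounting note (worked example): suppose a request was broadcast to
 * three secondaries, so nb_sent is 3. A normal reply increments both
 * nb_received and n_responses_processed; an "ignore" reply decrements
 * nb_sent instead; a timeout increments only n_responses_processed.
 * The user callback is triggered once n_responses_processed catches up
 * with nb_sent, i.e. once every peer has answered, opted out, or
 * timed out.
 */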

static void
trigger_async_action(struct pending_request *sr)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;

	param = sr->async.param;
	reply = &param->user_reply;

	param->clb(sr->request, reply);

	/* clean up */
	free(sr->async.param->user_reply.msgs);
	free(sr->async.param);
	free(sr->request);
	free(sr);
}

static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
	struct pending_request *req = (struct pending_request *)arg;
	enum async_action action;
	struct timespec ts_now;
	struct timeval now;

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto no_trigger;
	}
	ts_now.tv_nsec = now.tv_usec * 1000;
	ts_now.tv_sec = now.tv_sec;

	action = process_async_request(req, &ts_now);

	TAILQ_REMOVE(&pending_requests.requests, req, next);

	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
		/* if we failed to cancel the alarm because it's already in
		 * progress, don't proceed because otherwise we will end up
		 * handling the same message twice.
		 */
		if (rte_errno == EINPROGRESS) {
			RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
			goto no_trigger;
		}
		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
	}

	if (action == ACTION_TRIGGER)
		return req;
no_trigger:
	free(req);
	return NULL;
}

static void
async_reply_handle(void *arg)
{
	struct pending_request *req;

	pthread_mutex_lock(&pending_requests.lock);
	req = async_reply_handle_thread_unsafe(arg);
	pthread_mutex_unlock(&pending_requests.lock);

	if (req != NULL)
		trigger_async_action(req);
}

static int
open_socket_fd(void)
{
	struct sockaddr_un un;

	peer_name[0] = '\0';
	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
		snprintf(peer_name, sizeof(peer_name),
				"%d_%"PRIx64, getpid(), rte_rdtsc());

	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (mp_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to create unix socket\n");
		return -1;
	}

	memset(&un, 0, sizeof(un));
	un.sun_family = AF_UNIX;

	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));

	unlink(un.sun_path); /* May still exist since last run */

	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
		RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
			un.sun_path, strerror(errno));
		close(mp_fd);
		return -1;
	}

	RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
	return mp_fd;
}

static void
close_socket_fd(void)
{
	char path[PATH_MAX];

	if (mp_fd < 0)
		return;

	close(mp_fd);
	create_socket_path(peer_name, path, sizeof(path));
	unlink(path);
}
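
/*
 * Locking scheme used below: rte_mp_channel_init() holds an exclusive
 * flock() on the socket directory while creating and binding its socket,
 * while senders take a shared lock when walking the directory. A
 * broadcast therefore never races with a process that is halfway through
 * creating its socket.
 */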

int
rte_mp_channel_init(void)
{
	char path[PATH_MAX];
	int dir_fd;
	pthread_t mp_handle_tid;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	/* in no shared files mode, we do not have secondary processes support,
	 * so no need to initialize IPC.
	 */
	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	/* create filter path */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_filter, basename(path), sizeof(mp_filter));

	/* path may have been modified, so recreate it */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

	/* lock the directory */
	dir_fd = open(mp_dir_path, O_RDONLY);
	if (dir_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
			mp_dir_path, strerror(errno));
		return -1;
	}

	if (flock(dir_fd, LOCK_EX)) {
		RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
			mp_dir_path, strerror(errno));
		close(dir_fd);
		return -1;
	}

	if (open_socket_fd() < 0) {
		close(dir_fd);
		return -1;
	}

	if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
			NULL, mp_handle, NULL) < 0) {
		RTE_LOG(ERR, EAL, "failed to create mp thread: %s\n",
			strerror(errno));
		close(mp_fd);
		close(dir_fd);
		mp_fd = -1;
		return -1;
	}

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);
	close(dir_fd);

	return 0;
}

void
rte_mp_channel_cleanup(void)
{
	close_socket_fd();
}

/**
 * Return -1 if the message failed to send, due to an error on the local side.
 * Return 0 if the message failed to send, due to the remote side having gone
 * away.
 * Return 1 if the message was sent successfully.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
	int snd;
	struct iovec iov;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	struct sockaddr_un dst;
	struct mp_msg_internal m;
	int fd_size = msg->num_fds * sizeof(int);
	char control[CMSG_SPACE(fd_size)];

	m.type = type;
	memcpy(&m.msg, msg, sizeof(*msg));

	memset(&dst, 0, sizeof(dst));
	dst.sun_family = AF_UNIX;
	strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

	memset(&msgh, 0, sizeof(msgh));
	memset(control, 0, sizeof(control));

	iov.iov_base = &m;
	iov.iov_len = sizeof(m) - sizeof(msg->fds);

	msgh.msg_name = &dst;
	msgh.msg_namelen = sizeof(dst);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	cmsg = CMSG_FIRSTHDR(&msgh);
	cmsg->cmsg_len = CMSG_LEN(fd_size);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

	do {
		snd = sendmsg(mp_fd, &msgh, 0);
	} while (snd < 0 && errno == EINTR);

	if (snd < 0) {
		rte_errno = errno;
		/* Check if it is caused by the peer process exiting */
		if (errno == ECONNREFUSED &&
				rte_eal_process_type() == RTE_PROC_PRIMARY) {
			unlink(dst_path);
			return 0;
		}
		RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
			dst_path, strerror(errno));
		return -1;
	}

	return 1;
}

static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;

	if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
		peer = eal_mp_socket_path();

	if (peer) {
		if (send_msg(peer, msg, type) < 0)
			return -1;
		else
			return 0;
	}

	/* broadcast to all secondary processes */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
				mp_dir_path);
		rte_errno = errno;
		return -1;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		closedir(mp_dir);
		return -1;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);
		if (send_msg(path, msg, type) < 0)
			ret = -1;
	}
	/* unlock the dir */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);
	return ret;
}

static int
check_input(const struct rte_mp_msg *msg)
{
	if (msg == NULL) {
		RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (validate_action_name(msg->name) != 0)
		return -1;

	if (msg->len_param < 0) {
		RTE_LOG(ERR, EAL, "Message data length is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->num_fds < 0) {
		RTE_LOG(ERR, EAL, "Number of fd's is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "Message data is too long\n");
		rte_errno = E2BIG;
		return -1;
	}

	if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
			RTE_MP_MAX_FD_NUM);
		rte_errno = E2BIG;
		return -1;
	}

	return 0;
}

int
rte_mp_sendmsg(struct rte_mp_msg *msg)
{
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
	return mp_send(msg, NULL, MP_MSG);
}
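
/*
 * Usage sketch (illustrative): a fire-and-forget notification via
 * rte_mp_sendmsg(). The message name is hypothetical; with no payload
 * and no file descriptors attached, only the name identifies the message.
 *
 *	struct rte_mp_msg msg;
 *
 *	memset(&msg, 0, sizeof(msg));
 *	strlcpy(msg.name, "app_notify", sizeof(msg.name));
 *	msg.len_param = 0;
 *	msg.num_fds = 0;
 *	if (rte_mp_sendmsg(&msg) < 0)
 *		// handle rte_errno
 */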

static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
		struct async_request_param *param, const struct timespec *ts)
{
	struct rte_mp_msg *reply_msg;
	struct pending_request *pending_req, *exist;
	int ret = -1;

	pending_req = calloc(1, sizeof(*pending_req));
	reply_msg = calloc(1, sizeof(*reply_msg));
	if (pending_req == NULL || reply_msg == NULL) {
		RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
		rte_errno = ENOMEM;
		ret = -1;
		goto fail;
	}

	pending_req->type = REQUEST_TYPE_ASYNC;
	strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
	pending_req->request = req;
	pending_req->reply = reply_msg;
	pending_req->async.param = param;

	/* queue already locked by caller */

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		ret = -1;
		goto fail;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	} else if (ret == 0) {
		ret = 0;
		goto fail;
	}
	param->user_reply.nb_sent++;

	/* if alarm set fails, we simply ignore the reply */
	if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
			async_reply_handle, pending_req) < 0) {
		RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	}
	TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);

	return 0;
fail:
	free(pending_req);
	free(reply_msg);
	return ret;
}

static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
		struct rte_mp_reply *reply, const struct timespec *ts)
{
	int ret;
	struct rte_mp_msg msg, *tmp;
	struct pending_request pending_req, *exist;

	pending_req.type = REQUEST_TYPE_SYNC;
	pending_req.reply_received = 0;
	strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
	pending_req.request = req;
	pending_req.reply = &msg;
	pthread_cond_init(&pending_req.sync.cond, NULL);

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		return -1;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		return -1;
	} else if (ret == 0)
		return 0;

	TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

	reply->nb_sent++;

	do {
		ret = pthread_cond_timedwait(&pending_req.sync.cond,
				&pending_requests.lock, ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

	if (pending_req.reply_received == 0) {
		RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ETIMEDOUT;
		return -1;
	}
	if (pending_req.reply_received == -1) {
		RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
		/* not receiving this message is not an error, so decrement
		 * number of sent messages
		 */
		reply->nb_sent--;
		return 0;
	}

	tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
	if (!tmp) {
		RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ENOMEM;
		return -1;
	}
	memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
	reply->msgs = tmp;
	reply->nb_received++;
	return 0;
}
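
/*
 * Usage sketch (illustrative) for the synchronous request API defined
 * below; the request name and handle_reply() helper are hypothetical.
 * On success, reply.msgs is allocated by the IPC layer and must be
 * freed by the caller.
 *
 *	struct rte_mp_msg req;
 *	struct rte_mp_reply reply;
 *	struct timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
 *	int i;
 *
 *	memset(&req, 0, sizeof(req));
 *	strlcpy(req.name, "app_query", sizeof(req.name));
 *	if (rte_mp_request_sync(&req, &reply, &ts) == 0) {
 *		for (i = 0; i < reply.nb_received; i++)
 *			handle_reply(&reply.msgs[i]);
 *		free(reply.msgs);
 *	}
 */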

int
rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
		const struct timespec *ts)
{
	int dir_fd, ret = -1;
	DIR *mp_dir;
	struct dirent *ent;
	struct timeval now;
	struct timespec end;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	if (check_input(req) != 0)
		goto end;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		goto end;
	}

	end.tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
	end.tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		pthread_mutex_lock(&pending_requests.lock);
		ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
		pthread_mutex_unlock(&pending_requests.lock);
		goto end;
	}

	/* for primary process, broadcast request, and collect reply 1 by 1 */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto end;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto close_end;
	}

	pthread_mutex_lock(&pending_requests.lock);
	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		/* unlocks the mutex while waiting for response,
		 * locks on receive
		 */
		if (mp_request_sync(path, req, reply, &end))
			goto unlock_end;
	}
	ret = 0;

unlock_end:
	pthread_mutex_unlock(&pending_requests.lock);
	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

close_end:
	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

end:
	if (ret) {
		free(reply->msgs);
		reply->nb_received = 0;
		reply->msgs = NULL;
	}
	return ret;
}
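
/*
 * Usage sketch (illustrative) for the asynchronous request API defined
 * below. The callback runs later, from the IPC or alarm control thread,
 * once every peer has replied, opted out, or timed out; the callback
 * name is hypothetical, and req is filled in as for rte_mp_request_sync().
 *
 *	static int
 *	app_reply_cb(const struct rte_mp_msg *request,
 *			const struct rte_mp_reply *reply)
 *	{
 *		// reply->nb_received messages available in reply->msgs
 *		return 0;
 *	}
 *
 *	struct timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
 *
 *	if (rte_mp_request_async(&req, &ts, app_reply_cb) < 0)
 *		// handle rte_errno
 */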

int
rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
		rte_mp_async_reply_t clb)
{
	struct rte_mp_msg *copy;
	struct pending_request *dummy;
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;
	struct timeval now;
	struct timespec *end;
	bool dummy_used = false;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	if (check_input(req) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		return -1;
	}
	copy = calloc(1, sizeof(*copy));
	dummy = calloc(1, sizeof(*dummy));
	param = calloc(1, sizeof(*param));
	if (copy == NULL || dummy == NULL || param == NULL) {
		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
		rte_errno = ENOMEM;
		goto fail;
	}

	/* copy message */
	memcpy(copy, req, sizeof(*copy));

	param->n_responses_processed = 0;
	param->clb = clb;
	end = &param->end;
	reply = &param->user_reply;

	end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
	end->tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	/* we have to lock the request queue here, as we will be adding a bunch
	 * of requests to the queue at once, and some of the replies may arrive
	 * before we add all of the requests to the queue.
	 */
	pthread_mutex_lock(&pending_requests.lock);

	/* we have to ensure that callback gets triggered even if we don't send
	 * anything, therefore earlier we have allocated a dummy request. fill
	 * it, and put it on the queue if we don't send any requests.
	 */
	dummy->type = REQUEST_TYPE_ASYNC;
	dummy->request = copy;
	dummy->reply = NULL;
	dummy->async.param = param;
	dummy->reply_received = 1; /* short-circuit the timeout */

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);

		/* if we didn't send anything, put dummy request on the queue */
		if (ret == 0 && reply->nb_sent == 0) {
			TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
					next);
			dummy_used = true;
		}

		pthread_mutex_unlock(&pending_requests.lock);

		/* if we couldn't send anything, clean up */
		if (ret != 0)
			goto fail;
		return 0;
	}

	/* for primary process, broadcast request */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto unlock_fail;
	}
	dir_fd = dirfd(mp_dir);

	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto closedir_fail;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		if (mp_request_async(path, copy, param, ts))
			ret = -1;
	}
	/* if we didn't send anything, put dummy request on the queue */
	if (ret == 0 && reply->nb_sent == 0) {
		TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
		dummy_used = true;
	}

	/* finally, unlock the queue */
	pthread_mutex_unlock(&pending_requests.lock);

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

	/* if dummy was unused, free it */
	if (!dummy_used)
		free(dummy);

	return ret;
closedir_fail:
	closedir(mp_dir);
unlock_fail:
	pthread_mutex_unlock(&pending_requests.lock);
fail:
	free(dummy);
	free(param);
	free(copy);
	return -1;
}

int
rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
{
	RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (peer == NULL) {
		RTE_LOG(ERR, EAL, "peer is not specified\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return 0;
	}

	return mp_send(msg, peer, MP_REP);
}

/* Internally, the status of the mp feature is represented as a three-state:
 * - "unknown" as long as no secondary process has attached to the primary
 *   process and there has been no call to rte_mp_disable yet,
 * - "enabled" as soon as a secondary process attaches to the primary process,
 * - "disabled" when the primary process has successfully called
 *   rte_mp_disable.
 */
enum mp_status {
	MP_STATUS_UNKNOWN,
	MP_STATUS_DISABLED,
	MP_STATUS_ENABLED,
};

static bool
set_mp_status(enum mp_status status)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	uint8_t expected;
	uint8_t desired;

	expected = MP_STATUS_UNKNOWN;
	desired = status;
	if (__atomic_compare_exchange_n(&mcfg->mp_status, &expected, desired,
			false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
		return true;

	return __atomic_load_n(&mcfg->mp_status, __ATOMIC_RELAXED) == desired;
}

bool
rte_mp_disable(void)
{
	return set_mp_status(MP_STATUS_DISABLED);
}

bool
__rte_mp_enable(void)
{
	return set_mp_status(MP_STATUS_ENABLED);
}
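
/*
 * Usage note (illustrative): a primary process that never wants secondary
 * processes can call rte_mp_disable() early; the compare-and-swap above
 * makes whichever transition happens first (disabling, or a secondary
 * attaching) permanent.
 *
 *	if (!rte_mp_disable())
 *		// too late: a secondary process has already attached
 */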