1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2016-2018 Intel Corporation 3 */ 4 5 #include <dirent.h> 6 #include <errno.h> 7 #include <fcntl.h> 8 #include <fnmatch.h> 9 #include <inttypes.h> 10 #include <libgen.h> 11 #include <limits.h> 12 #include <pthread.h> 13 #include <stdio.h> 14 #include <stdlib.h> 15 #include <string.h> 16 #include <sys/file.h> 17 #include <sys/time.h> 18 #include <sys/types.h> 19 #include <sys/socket.h> 20 #include <sys/un.h> 21 #include <unistd.h> 22 23 #include <rte_alarm.h> 24 #include <rte_common.h> 25 #include <rte_cycles.h> 26 #include <rte_eal.h> 27 #include <rte_errno.h> 28 #include <rte_lcore.h> 29 #include <rte_log.h> 30 #include <rte_tailq.h> 31 32 #include "eal_memcfg.h" 33 #include "eal_private.h" 34 #include "eal_filesystem.h" 35 #include "eal_internal_cfg.h" 36 37 static int mp_fd = -1; 38 static char mp_filter[PATH_MAX]; /* Filter for secondary process sockets */ 39 static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */ 40 static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER; 41 static char peer_name[PATH_MAX]; 42 43 struct action_entry { 44 TAILQ_ENTRY(action_entry) next; 45 char action_name[RTE_MP_MAX_NAME_LEN]; 46 rte_mp_t action; 47 }; 48 49 /** Double linked list of actions. */ 50 TAILQ_HEAD(action_entry_list, action_entry); 51 52 static struct action_entry_list action_entry_list = 53 TAILQ_HEAD_INITIALIZER(action_entry_list); 54 55 enum mp_type { 56 MP_MSG, /* Share message with peers, will not block */ 57 MP_REQ, /* Request for information, Will block for a reply */ 58 MP_REP, /* Response to previously-received request */ 59 MP_IGN, /* Response telling requester to ignore this response */ 60 }; 61 62 struct mp_msg_internal { 63 int type; 64 struct rte_mp_msg msg; 65 }; 66 67 struct async_request_param { 68 rte_mp_async_reply_t clb; 69 struct rte_mp_reply user_reply; 70 struct timespec end; 71 int n_responses_processed; 72 }; 73 74 struct pending_request { 75 TAILQ_ENTRY(pending_request) next; 76 enum { 77 REQUEST_TYPE_SYNC, 78 REQUEST_TYPE_ASYNC 79 } type; 80 char dst[PATH_MAX]; 81 struct rte_mp_msg *request; 82 struct rte_mp_msg *reply; 83 int reply_received; 84 RTE_STD_C11 85 union { 86 struct { 87 struct async_request_param *param; 88 } async; 89 struct { 90 pthread_cond_t cond; 91 } sync; 92 }; 93 }; 94 95 TAILQ_HEAD(pending_request_list, pending_request); 96 97 static struct { 98 struct pending_request_list requests; 99 pthread_mutex_t lock; 100 } pending_requests = { 101 .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests), 102 .lock = PTHREAD_MUTEX_INITIALIZER, 103 /**< used in async requests only */ 104 }; 105 106 /* forward declarations */ 107 static int 108 mp_send(struct rte_mp_msg *msg, const char *peer, int type); 109 110 /* for use with alarm callback */ 111 static void 112 async_reply_handle(void *arg); 113 114 /* for use with process_msg */ 115 static struct pending_request * 116 async_reply_handle_thread_unsafe(void *arg); 117 118 static void 119 trigger_async_action(struct pending_request *req); 120 121 static struct pending_request * 122 find_pending_request(const char *dst, const char *act_name) 123 { 124 struct pending_request *r; 125 126 TAILQ_FOREACH(r, &pending_requests.requests, next) { 127 if (!strcmp(r->dst, dst) && 128 !strcmp(r->request->name, act_name)) 129 break; 130 } 131 132 return r; 133 } 134 135 static void 136 create_socket_path(const char *name, char *buf, int len) 137 { 138 const char *prefix = eal_mp_socket_path(); 139 140 if (strlen(name) > 0) 141 snprintf(buf, len, "%s_%s", prefix, name); 142 else 143 strlcpy(buf, prefix, len); 144 } 145 146 int 147 rte_eal_primary_proc_alive(const char *config_file_path) 148 { 149 int config_fd; 150 151 if (config_file_path) 152 config_fd = open(config_file_path, O_RDONLY); 153 else { 154 const char *path; 155 156 path = eal_runtime_config_path(); 157 config_fd = open(path, O_RDONLY); 158 } 159 if (config_fd < 0) 160 return 0; 161 162 int ret = lockf(config_fd, F_TEST, 0); 163 close(config_fd); 164 165 return !!ret; 166 } 167 168 static struct action_entry * 169 find_action_entry_by_name(const char *name) 170 { 171 struct action_entry *entry; 172 173 TAILQ_FOREACH(entry, &action_entry_list, next) { 174 if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0) 175 break; 176 } 177 178 return entry; 179 } 180 181 static int 182 validate_action_name(const char *name) 183 { 184 if (name == NULL) { 185 RTE_LOG(ERR, EAL, "Action name cannot be NULL\n"); 186 rte_errno = EINVAL; 187 return -1; 188 } 189 if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) { 190 RTE_LOG(ERR, EAL, "Length of action name is zero\n"); 191 rte_errno = EINVAL; 192 return -1; 193 } 194 if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) { 195 rte_errno = E2BIG; 196 return -1; 197 } 198 return 0; 199 } 200 201 int 202 rte_mp_action_register(const char *name, rte_mp_t action) 203 { 204 struct action_entry *entry; 205 const struct internal_config *internal_conf = 206 eal_get_internal_configuration(); 207 208 if (validate_action_name(name) != 0) 209 return -1; 210 211 if (internal_conf->no_shconf) { 212 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n"); 213 rte_errno = ENOTSUP; 214 return -1; 215 } 216 217 entry = malloc(sizeof(struct action_entry)); 218 if (entry == NULL) { 219 rte_errno = ENOMEM; 220 return -1; 221 } 222 strlcpy(entry->action_name, name, sizeof(entry->action_name)); 223 entry->action = action; 224 225 pthread_mutex_lock(&mp_mutex_action); 226 if (find_action_entry_by_name(name) != NULL) { 227 pthread_mutex_unlock(&mp_mutex_action); 228 rte_errno = EEXIST; 229 free(entry); 230 return -1; 231 } 232 TAILQ_INSERT_TAIL(&action_entry_list, entry, next); 233 pthread_mutex_unlock(&mp_mutex_action); 234 return 0; 235 } 236 237 void 238 rte_mp_action_unregister(const char *name) 239 { 240 struct action_entry *entry; 241 const struct internal_config *internal_conf = 242 eal_get_internal_configuration(); 243 244 if (validate_action_name(name) != 0) 245 return; 246 247 if (internal_conf->no_shconf) { 248 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n"); 249 return; 250 } 251 252 pthread_mutex_lock(&mp_mutex_action); 253 entry = find_action_entry_by_name(name); 254 if (entry == NULL) { 255 pthread_mutex_unlock(&mp_mutex_action); 256 return; 257 } 258 TAILQ_REMOVE(&action_entry_list, entry, next); 259 pthread_mutex_unlock(&mp_mutex_action); 260 free(entry); 261 } 262 263 static int 264 read_msg(struct mp_msg_internal *m, struct sockaddr_un *s) 265 { 266 int msglen; 267 struct iovec iov; 268 struct msghdr msgh; 269 char control[CMSG_SPACE(sizeof(m->msg.fds))]; 270 struct cmsghdr *cmsg; 271 int buflen = sizeof(*m) - sizeof(m->msg.fds); 272 273 memset(&msgh, 0, sizeof(msgh)); 274 iov.iov_base = m; 275 iov.iov_len = buflen; 276 277 msgh.msg_name = s; 278 msgh.msg_namelen = sizeof(*s); 279 msgh.msg_iov = &iov; 280 msgh.msg_iovlen = 1; 281 msgh.msg_control = control; 282 msgh.msg_controllen = sizeof(control); 283 284 msglen = recvmsg(mp_fd, &msgh, 0); 285 if (msglen < 0) { 286 RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno)); 287 return -1; 288 } 289 290 if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) { 291 RTE_LOG(ERR, EAL, "truncated msg\n"); 292 return -1; 293 } 294 295 /* read auxiliary FDs if any */ 296 for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; 297 cmsg = CMSG_NXTHDR(&msgh, cmsg)) { 298 if ((cmsg->cmsg_level == SOL_SOCKET) && 299 (cmsg->cmsg_type == SCM_RIGHTS)) { 300 memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds)); 301 break; 302 } 303 } 304 /* sanity-check the response */ 305 if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM) { 306 RTE_LOG(ERR, EAL, "invalid number of fd's received\n"); 307 return -1; 308 } 309 if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN) { 310 RTE_LOG(ERR, EAL, "invalid received data length\n"); 311 return -1; 312 } 313 return 0; 314 } 315 316 static void 317 process_msg(struct mp_msg_internal *m, struct sockaddr_un *s) 318 { 319 struct pending_request *pending_req; 320 struct action_entry *entry; 321 struct rte_mp_msg *msg = &m->msg; 322 rte_mp_t action = NULL; 323 const struct internal_config *internal_conf = 324 eal_get_internal_configuration(); 325 326 RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name); 327 328 if (m->type == MP_REP || m->type == MP_IGN) { 329 struct pending_request *req = NULL; 330 331 pthread_mutex_lock(&pending_requests.lock); 332 pending_req = find_pending_request(s->sun_path, msg->name); 333 if (pending_req) { 334 memcpy(pending_req->reply, msg, sizeof(*msg)); 335 /* -1 indicates that we've been asked to ignore */ 336 pending_req->reply_received = 337 m->type == MP_REP ? 1 : -1; 338 339 if (pending_req->type == REQUEST_TYPE_SYNC) 340 pthread_cond_signal(&pending_req->sync.cond); 341 else if (pending_req->type == REQUEST_TYPE_ASYNC) 342 req = async_reply_handle_thread_unsafe( 343 pending_req); 344 } else 345 RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name); 346 pthread_mutex_unlock(&pending_requests.lock); 347 348 if (req != NULL) 349 trigger_async_action(req); 350 return; 351 } 352 353 pthread_mutex_lock(&mp_mutex_action); 354 entry = find_action_entry_by_name(msg->name); 355 if (entry != NULL) 356 action = entry->action; 357 pthread_mutex_unlock(&mp_mutex_action); 358 359 if (!action) { 360 if (m->type == MP_REQ && !internal_conf->init_complete) { 361 /* if this is a request, and init is not yet complete, 362 * and callback wasn't registered, we should tell the 363 * requester to ignore our existence because we're not 364 * yet ready to process this request. 365 */ 366 struct rte_mp_msg dummy; 367 368 memset(&dummy, 0, sizeof(dummy)); 369 strlcpy(dummy.name, msg->name, sizeof(dummy.name)); 370 mp_send(&dummy, s->sun_path, MP_IGN); 371 } else { 372 RTE_LOG(ERR, EAL, "Cannot find action: %s\n", 373 msg->name); 374 } 375 } else if (action(msg, s->sun_path) < 0) { 376 RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name); 377 } 378 } 379 380 static void * 381 mp_handle(void *arg __rte_unused) 382 { 383 struct mp_msg_internal msg; 384 struct sockaddr_un sa; 385 386 while (1) { 387 if (read_msg(&msg, &sa) == 0) 388 process_msg(&msg, &sa); 389 } 390 391 return NULL; 392 } 393 394 static int 395 timespec_cmp(const struct timespec *a, const struct timespec *b) 396 { 397 if (a->tv_sec < b->tv_sec) 398 return -1; 399 if (a->tv_sec > b->tv_sec) 400 return 1; 401 if (a->tv_nsec < b->tv_nsec) 402 return -1; 403 if (a->tv_nsec > b->tv_nsec) 404 return 1; 405 return 0; 406 } 407 408 enum async_action { 409 ACTION_FREE, /**< free the action entry, but don't trigger callback */ 410 ACTION_TRIGGER /**< trigger callback, then free action entry */ 411 }; 412 413 static enum async_action 414 process_async_request(struct pending_request *sr, const struct timespec *now) 415 { 416 struct async_request_param *param; 417 struct rte_mp_reply *reply; 418 bool timeout, last_msg; 419 420 param = sr->async.param; 421 reply = ¶m->user_reply; 422 423 /* did we timeout? */ 424 timeout = timespec_cmp(¶m->end, now) <= 0; 425 426 /* if we received a response, adjust relevant data and copy message. */ 427 if (sr->reply_received == 1 && sr->reply) { 428 struct rte_mp_msg *msg, *user_msgs, *tmp; 429 430 msg = sr->reply; 431 user_msgs = reply->msgs; 432 433 tmp = realloc(user_msgs, sizeof(*msg) * 434 (reply->nb_received + 1)); 435 if (!tmp) { 436 RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n", 437 sr->dst, sr->request->name); 438 /* this entry is going to be removed and its message 439 * dropped, but we don't want to leak memory, so 440 * continue. 441 */ 442 } else { 443 user_msgs = tmp; 444 reply->msgs = user_msgs; 445 memcpy(&user_msgs[reply->nb_received], 446 msg, sizeof(*msg)); 447 reply->nb_received++; 448 } 449 450 /* mark this request as processed */ 451 param->n_responses_processed++; 452 } else if (sr->reply_received == -1) { 453 /* we were asked to ignore this process */ 454 reply->nb_sent--; 455 } else if (timeout) { 456 /* count it as processed response, but don't increment 457 * nb_received. 458 */ 459 param->n_responses_processed++; 460 } 461 462 free(sr->reply); 463 464 last_msg = param->n_responses_processed == reply->nb_sent; 465 466 return last_msg ? ACTION_TRIGGER : ACTION_FREE; 467 } 468 469 static void 470 trigger_async_action(struct pending_request *sr) 471 { 472 struct async_request_param *param; 473 struct rte_mp_reply *reply; 474 475 param = sr->async.param; 476 reply = ¶m->user_reply; 477 478 param->clb(sr->request, reply); 479 480 /* clean up */ 481 free(sr->async.param->user_reply.msgs); 482 free(sr->async.param); 483 free(sr->request); 484 free(sr); 485 } 486 487 static struct pending_request * 488 async_reply_handle_thread_unsafe(void *arg) 489 { 490 struct pending_request *req = (struct pending_request *)arg; 491 enum async_action action; 492 struct timespec ts_now; 493 494 if (clock_gettime(CLOCK_MONOTONIC, &ts_now) < 0) { 495 RTE_LOG(ERR, EAL, "Cannot get current time\n"); 496 goto no_trigger; 497 } 498 499 action = process_async_request(req, &ts_now); 500 501 TAILQ_REMOVE(&pending_requests.requests, req, next); 502 503 if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) { 504 /* if we failed to cancel the alarm because it's already in 505 * progress, don't proceed because otherwise we will end up 506 * handling the same message twice. 507 */ 508 if (rte_errno == EINPROGRESS) { 509 RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n"); 510 goto no_trigger; 511 } 512 RTE_LOG(ERR, EAL, "Failed to cancel alarm\n"); 513 } 514 515 if (action == ACTION_TRIGGER) 516 return req; 517 no_trigger: 518 free(req); 519 return NULL; 520 } 521 522 static void 523 async_reply_handle(void *arg) 524 { 525 struct pending_request *req; 526 527 pthread_mutex_lock(&pending_requests.lock); 528 req = async_reply_handle_thread_unsafe(arg); 529 pthread_mutex_unlock(&pending_requests.lock); 530 531 if (req != NULL) 532 trigger_async_action(req); 533 } 534 535 static int 536 open_socket_fd(void) 537 { 538 struct sockaddr_un un; 539 540 peer_name[0] = '\0'; 541 if (rte_eal_process_type() == RTE_PROC_SECONDARY) 542 snprintf(peer_name, sizeof(peer_name), 543 "%d_%"PRIx64, getpid(), rte_rdtsc()); 544 545 mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0); 546 if (mp_fd < 0) { 547 RTE_LOG(ERR, EAL, "failed to create unix socket\n"); 548 return -1; 549 } 550 551 memset(&un, 0, sizeof(un)); 552 un.sun_family = AF_UNIX; 553 554 create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path)); 555 556 unlink(un.sun_path); /* May still exist since last run */ 557 558 if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) { 559 RTE_LOG(ERR, EAL, "failed to bind %s: %s\n", 560 un.sun_path, strerror(errno)); 561 close(mp_fd); 562 return -1; 563 } 564 565 RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path); 566 return mp_fd; 567 } 568 569 static void 570 close_socket_fd(void) 571 { 572 char path[PATH_MAX]; 573 574 if (mp_fd < 0) 575 return; 576 577 close(mp_fd); 578 create_socket_path(peer_name, path, sizeof(path)); 579 unlink(path); 580 } 581 582 int 583 rte_mp_channel_init(void) 584 { 585 char path[PATH_MAX]; 586 int dir_fd; 587 pthread_t mp_handle_tid; 588 const struct internal_config *internal_conf = 589 eal_get_internal_configuration(); 590 591 /* in no shared files mode, we do not have secondary processes support, 592 * so no need to initialize IPC. 593 */ 594 if (internal_conf->no_shconf) { 595 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n"); 596 rte_errno = ENOTSUP; 597 return -1; 598 } 599 600 /* create filter path */ 601 create_socket_path("*", path, sizeof(path)); 602 strlcpy(mp_filter, basename(path), sizeof(mp_filter)); 603 604 /* path may have been modified, so recreate it */ 605 create_socket_path("*", path, sizeof(path)); 606 strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path)); 607 608 /* lock the directory */ 609 dir_fd = open(mp_dir_path, O_RDONLY); 610 if (dir_fd < 0) { 611 RTE_LOG(ERR, EAL, "failed to open %s: %s\n", 612 mp_dir_path, strerror(errno)); 613 return -1; 614 } 615 616 if (flock(dir_fd, LOCK_EX)) { 617 RTE_LOG(ERR, EAL, "failed to lock %s: %s\n", 618 mp_dir_path, strerror(errno)); 619 close(dir_fd); 620 return -1; 621 } 622 623 if (open_socket_fd() < 0) { 624 close(dir_fd); 625 return -1; 626 } 627 628 if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle", 629 NULL, mp_handle, NULL) < 0) { 630 RTE_LOG(ERR, EAL, "failed to create mp thread: %s\n", 631 strerror(errno)); 632 close(mp_fd); 633 close(dir_fd); 634 mp_fd = -1; 635 return -1; 636 } 637 638 /* unlock the directory */ 639 flock(dir_fd, LOCK_UN); 640 close(dir_fd); 641 642 return 0; 643 } 644 645 void 646 rte_mp_channel_cleanup(void) 647 { 648 close_socket_fd(); 649 } 650 651 /** 652 * Return -1, as fail to send message and it's caused by the local side. 653 * Return 0, as fail to send message and it's caused by the remote side. 654 * Return 1, as succeed to send message. 655 * 656 */ 657 static int 658 send_msg(const char *dst_path, struct rte_mp_msg *msg, int type) 659 { 660 int snd; 661 struct iovec iov; 662 struct msghdr msgh; 663 struct cmsghdr *cmsg; 664 struct sockaddr_un dst; 665 struct mp_msg_internal m; 666 int fd_size = msg->num_fds * sizeof(int); 667 char control[CMSG_SPACE(fd_size)]; 668 669 m.type = type; 670 memcpy(&m.msg, msg, sizeof(*msg)); 671 672 memset(&dst, 0, sizeof(dst)); 673 dst.sun_family = AF_UNIX; 674 strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path)); 675 676 memset(&msgh, 0, sizeof(msgh)); 677 memset(control, 0, sizeof(control)); 678 679 iov.iov_base = &m; 680 iov.iov_len = sizeof(m) - sizeof(msg->fds); 681 682 msgh.msg_name = &dst; 683 msgh.msg_namelen = sizeof(dst); 684 msgh.msg_iov = &iov; 685 msgh.msg_iovlen = 1; 686 msgh.msg_control = control; 687 msgh.msg_controllen = sizeof(control); 688 689 cmsg = CMSG_FIRSTHDR(&msgh); 690 cmsg->cmsg_len = CMSG_LEN(fd_size); 691 cmsg->cmsg_level = SOL_SOCKET; 692 cmsg->cmsg_type = SCM_RIGHTS; 693 memcpy(CMSG_DATA(cmsg), msg->fds, fd_size); 694 695 do { 696 snd = sendmsg(mp_fd, &msgh, 0); 697 } while (snd < 0 && errno == EINTR); 698 699 if (snd < 0) { 700 rte_errno = errno; 701 /* Check if it caused by peer process exits */ 702 if (errno == ECONNREFUSED && 703 rte_eal_process_type() == RTE_PROC_PRIMARY) { 704 unlink(dst_path); 705 return 0; 706 } 707 RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n", 708 dst_path, strerror(errno)); 709 return -1; 710 } 711 712 return 1; 713 } 714 715 static int 716 mp_send(struct rte_mp_msg *msg, const char *peer, int type) 717 { 718 int dir_fd, ret = 0; 719 DIR *mp_dir; 720 struct dirent *ent; 721 722 if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY)) 723 peer = eal_mp_socket_path(); 724 725 if (peer) { 726 if (send_msg(peer, msg, type) < 0) 727 return -1; 728 else 729 return 0; 730 } 731 732 /* broadcast to all secondary processes */ 733 mp_dir = opendir(mp_dir_path); 734 if (!mp_dir) { 735 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", 736 mp_dir_path); 737 rte_errno = errno; 738 return -1; 739 } 740 741 dir_fd = dirfd(mp_dir); 742 /* lock the directory to prevent processes spinning up while we send */ 743 if (flock(dir_fd, LOCK_SH)) { 744 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", 745 mp_dir_path); 746 rte_errno = errno; 747 closedir(mp_dir); 748 return -1; 749 } 750 751 while ((ent = readdir(mp_dir))) { 752 char path[PATH_MAX]; 753 754 if (fnmatch(mp_filter, ent->d_name, 0) != 0) 755 continue; 756 757 snprintf(path, sizeof(path), "%s/%s", mp_dir_path, 758 ent->d_name); 759 if (send_msg(path, msg, type) < 0) 760 ret = -1; 761 } 762 /* unlock the dir */ 763 flock(dir_fd, LOCK_UN); 764 765 /* dir_fd automatically closed on closedir */ 766 closedir(mp_dir); 767 return ret; 768 } 769 770 static int 771 check_input(const struct rte_mp_msg *msg) 772 { 773 if (msg == NULL) { 774 RTE_LOG(ERR, EAL, "Msg cannot be NULL\n"); 775 rte_errno = EINVAL; 776 return -1; 777 } 778 779 if (validate_action_name(msg->name) != 0) 780 return -1; 781 782 if (msg->len_param < 0) { 783 RTE_LOG(ERR, EAL, "Message data length is negative\n"); 784 rte_errno = EINVAL; 785 return -1; 786 } 787 788 if (msg->num_fds < 0) { 789 RTE_LOG(ERR, EAL, "Number of fd's is negative\n"); 790 rte_errno = EINVAL; 791 return -1; 792 } 793 794 if (msg->len_param > RTE_MP_MAX_PARAM_LEN) { 795 RTE_LOG(ERR, EAL, "Message data is too long\n"); 796 rte_errno = E2BIG; 797 return -1; 798 } 799 800 if (msg->num_fds > RTE_MP_MAX_FD_NUM) { 801 RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", 802 RTE_MP_MAX_FD_NUM); 803 rte_errno = E2BIG; 804 return -1; 805 } 806 807 return 0; 808 } 809 810 int 811 rte_mp_sendmsg(struct rte_mp_msg *msg) 812 { 813 const struct internal_config *internal_conf = 814 eal_get_internal_configuration(); 815 816 if (check_input(msg) != 0) 817 return -1; 818 819 if (internal_conf->no_shconf) { 820 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n"); 821 rte_errno = ENOTSUP; 822 return -1; 823 } 824 825 RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name); 826 return mp_send(msg, NULL, MP_MSG); 827 } 828 829 static int 830 mp_request_async(const char *dst, struct rte_mp_msg *req, 831 struct async_request_param *param, const struct timespec *ts) 832 { 833 struct rte_mp_msg *reply_msg; 834 struct pending_request *pending_req, *exist; 835 int ret = -1; 836 837 pending_req = calloc(1, sizeof(*pending_req)); 838 reply_msg = calloc(1, sizeof(*reply_msg)); 839 if (pending_req == NULL || reply_msg == NULL) { 840 RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n"); 841 rte_errno = ENOMEM; 842 ret = -1; 843 goto fail; 844 } 845 846 pending_req->type = REQUEST_TYPE_ASYNC; 847 strlcpy(pending_req->dst, dst, sizeof(pending_req->dst)); 848 pending_req->request = req; 849 pending_req->reply = reply_msg; 850 pending_req->async.param = param; 851 852 /* queue already locked by caller */ 853 854 exist = find_pending_request(dst, req->name); 855 if (exist) { 856 RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name); 857 rte_errno = EEXIST; 858 ret = -1; 859 goto fail; 860 } 861 862 ret = send_msg(dst, req, MP_REQ); 863 if (ret < 0) { 864 RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n", 865 dst, req->name); 866 ret = -1; 867 goto fail; 868 } else if (ret == 0) { 869 ret = 0; 870 goto fail; 871 } 872 param->user_reply.nb_sent++; 873 874 /* if alarm set fails, we simply ignore the reply */ 875 if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000, 876 async_reply_handle, pending_req) < 0) { 877 RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n", 878 dst, req->name); 879 ret = -1; 880 goto fail; 881 } 882 TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next); 883 884 return 0; 885 fail: 886 free(pending_req); 887 free(reply_msg); 888 return ret; 889 } 890 891 static int 892 mp_request_sync(const char *dst, struct rte_mp_msg *req, 893 struct rte_mp_reply *reply, const struct timespec *ts) 894 { 895 int ret; 896 pthread_condattr_t attr; 897 struct rte_mp_msg msg, *tmp; 898 struct pending_request pending_req, *exist; 899 900 pending_req.type = REQUEST_TYPE_SYNC; 901 pending_req.reply_received = 0; 902 strlcpy(pending_req.dst, dst, sizeof(pending_req.dst)); 903 pending_req.request = req; 904 pending_req.reply = &msg; 905 pthread_condattr_init(&attr); 906 pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); 907 pthread_cond_init(&pending_req.sync.cond, &attr); 908 909 exist = find_pending_request(dst, req->name); 910 if (exist) { 911 RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name); 912 rte_errno = EEXIST; 913 return -1; 914 } 915 916 ret = send_msg(dst, req, MP_REQ); 917 if (ret < 0) { 918 RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n", 919 dst, req->name); 920 return -1; 921 } else if (ret == 0) 922 return 0; 923 924 TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next); 925 926 reply->nb_sent++; 927 928 do { 929 ret = pthread_cond_timedwait(&pending_req.sync.cond, 930 &pending_requests.lock, ts); 931 } while (ret != 0 && ret != ETIMEDOUT); 932 933 TAILQ_REMOVE(&pending_requests.requests, &pending_req, next); 934 935 if (pending_req.reply_received == 0) { 936 RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n", 937 dst, req->name); 938 rte_errno = ETIMEDOUT; 939 return -1; 940 } 941 if (pending_req.reply_received == -1) { 942 RTE_LOG(DEBUG, EAL, "Asked to ignore response\n"); 943 /* not receiving this message is not an error, so decrement 944 * number of sent messages 945 */ 946 reply->nb_sent--; 947 return 0; 948 } 949 950 tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1)); 951 if (!tmp) { 952 RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n", 953 dst, req->name); 954 rte_errno = ENOMEM; 955 return -1; 956 } 957 memcpy(&tmp[reply->nb_received], &msg, sizeof(msg)); 958 reply->msgs = tmp; 959 reply->nb_received++; 960 return 0; 961 } 962 963 int 964 rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply, 965 const struct timespec *ts) 966 { 967 int dir_fd, ret = -1; 968 DIR *mp_dir; 969 struct dirent *ent; 970 struct timespec now, end; 971 const struct internal_config *internal_conf = 972 eal_get_internal_configuration(); 973 974 RTE_LOG(DEBUG, EAL, "request: %s\n", req->name); 975 976 reply->nb_sent = 0; 977 reply->nb_received = 0; 978 reply->msgs = NULL; 979 980 if (check_input(req) != 0) 981 goto end; 982 983 if (internal_conf->no_shconf) { 984 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n"); 985 rte_errno = ENOTSUP; 986 return -1; 987 } 988 989 if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) { 990 RTE_LOG(ERR, EAL, "Failed to get current time\n"); 991 rte_errno = errno; 992 goto end; 993 } 994 995 end.tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000; 996 end.tv_sec = now.tv_sec + ts->tv_sec + 997 (now.tv_nsec + ts->tv_nsec) / 1000000000; 998 999 /* for secondary process, send request to the primary process only */ 1000 if (rte_eal_process_type() == RTE_PROC_SECONDARY) { 1001 pthread_mutex_lock(&pending_requests.lock); 1002 ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end); 1003 pthread_mutex_unlock(&pending_requests.lock); 1004 goto end; 1005 } 1006 1007 /* for primary process, broadcast request, and collect reply 1 by 1 */ 1008 mp_dir = opendir(mp_dir_path); 1009 if (!mp_dir) { 1010 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path); 1011 rte_errno = errno; 1012 goto end; 1013 } 1014 1015 dir_fd = dirfd(mp_dir); 1016 /* lock the directory to prevent processes spinning up while we send */ 1017 if (flock(dir_fd, LOCK_SH)) { 1018 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", 1019 mp_dir_path); 1020 rte_errno = errno; 1021 goto close_end; 1022 } 1023 1024 pthread_mutex_lock(&pending_requests.lock); 1025 while ((ent = readdir(mp_dir))) { 1026 char path[PATH_MAX]; 1027 1028 if (fnmatch(mp_filter, ent->d_name, 0) != 0) 1029 continue; 1030 1031 snprintf(path, sizeof(path), "%s/%s", mp_dir_path, 1032 ent->d_name); 1033 1034 /* unlocks the mutex while waiting for response, 1035 * locks on receive 1036 */ 1037 if (mp_request_sync(path, req, reply, &end)) 1038 goto unlock_end; 1039 } 1040 ret = 0; 1041 1042 unlock_end: 1043 pthread_mutex_unlock(&pending_requests.lock); 1044 /* unlock the directory */ 1045 flock(dir_fd, LOCK_UN); 1046 1047 close_end: 1048 /* dir_fd automatically closed on closedir */ 1049 closedir(mp_dir); 1050 1051 end: 1052 if (ret) { 1053 free(reply->msgs); 1054 reply->nb_received = 0; 1055 reply->msgs = NULL; 1056 } 1057 return ret; 1058 } 1059 1060 int 1061 rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts, 1062 rte_mp_async_reply_t clb) 1063 { 1064 struct rte_mp_msg *copy; 1065 struct pending_request *dummy; 1066 struct async_request_param *param; 1067 struct rte_mp_reply *reply; 1068 int dir_fd, ret = 0; 1069 DIR *mp_dir; 1070 struct dirent *ent; 1071 struct timespec now; 1072 struct timespec *end; 1073 bool dummy_used = false; 1074 const struct internal_config *internal_conf = 1075 eal_get_internal_configuration(); 1076 1077 RTE_LOG(DEBUG, EAL, "request: %s\n", req->name); 1078 1079 if (check_input(req) != 0) 1080 return -1; 1081 1082 if (internal_conf->no_shconf) { 1083 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n"); 1084 rte_errno = ENOTSUP; 1085 return -1; 1086 } 1087 1088 if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) { 1089 RTE_LOG(ERR, EAL, "Failed to get current time\n"); 1090 rte_errno = errno; 1091 return -1; 1092 } 1093 copy = calloc(1, sizeof(*copy)); 1094 dummy = calloc(1, sizeof(*dummy)); 1095 param = calloc(1, sizeof(*param)); 1096 if (copy == NULL || dummy == NULL || param == NULL) { 1097 RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n"); 1098 rte_errno = ENOMEM; 1099 goto fail; 1100 } 1101 1102 /* copy message */ 1103 memcpy(copy, req, sizeof(*copy)); 1104 1105 param->n_responses_processed = 0; 1106 param->clb = clb; 1107 end = ¶m->end; 1108 reply = ¶m->user_reply; 1109 1110 end->tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000; 1111 end->tv_sec = now.tv_sec + ts->tv_sec + 1112 (now.tv_nsec + ts->tv_nsec) / 1000000000; 1113 reply->nb_sent = 0; 1114 reply->nb_received = 0; 1115 reply->msgs = NULL; 1116 1117 /* we have to lock the request queue here, as we will be adding a bunch 1118 * of requests to the queue at once, and some of the replies may arrive 1119 * before we add all of the requests to the queue. 1120 */ 1121 pthread_mutex_lock(&pending_requests.lock); 1122 1123 /* we have to ensure that callback gets triggered even if we don't send 1124 * anything, therefore earlier we have allocated a dummy request. fill 1125 * it, and put it on the queue if we don't send any requests. 1126 */ 1127 dummy->type = REQUEST_TYPE_ASYNC; 1128 dummy->request = copy; 1129 dummy->reply = NULL; 1130 dummy->async.param = param; 1131 dummy->reply_received = 1; /* short-circuit the timeout */ 1132 1133 /* for secondary process, send request to the primary process only */ 1134 if (rte_eal_process_type() == RTE_PROC_SECONDARY) { 1135 ret = mp_request_async(eal_mp_socket_path(), copy, param, ts); 1136 1137 /* if we didn't send anything, put dummy request on the queue */ 1138 if (ret == 0 && reply->nb_sent == 0) { 1139 TAILQ_INSERT_TAIL(&pending_requests.requests, dummy, 1140 next); 1141 dummy_used = true; 1142 } 1143 1144 pthread_mutex_unlock(&pending_requests.lock); 1145 1146 /* if we couldn't send anything, clean up */ 1147 if (ret != 0) 1148 goto fail; 1149 return 0; 1150 } 1151 1152 /* for primary process, broadcast request */ 1153 mp_dir = opendir(mp_dir_path); 1154 if (!mp_dir) { 1155 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path); 1156 rte_errno = errno; 1157 goto unlock_fail; 1158 } 1159 dir_fd = dirfd(mp_dir); 1160 1161 /* lock the directory to prevent processes spinning up while we send */ 1162 if (flock(dir_fd, LOCK_SH)) { 1163 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", 1164 mp_dir_path); 1165 rte_errno = errno; 1166 goto closedir_fail; 1167 } 1168 1169 while ((ent = readdir(mp_dir))) { 1170 char path[PATH_MAX]; 1171 1172 if (fnmatch(mp_filter, ent->d_name, 0) != 0) 1173 continue; 1174 1175 snprintf(path, sizeof(path), "%s/%s", mp_dir_path, 1176 ent->d_name); 1177 1178 if (mp_request_async(path, copy, param, ts)) 1179 ret = -1; 1180 } 1181 /* if we didn't send anything, put dummy request on the queue */ 1182 if (ret == 0 && reply->nb_sent == 0) { 1183 TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next); 1184 dummy_used = true; 1185 } 1186 1187 /* finally, unlock the queue */ 1188 pthread_mutex_unlock(&pending_requests.lock); 1189 1190 /* unlock the directory */ 1191 flock(dir_fd, LOCK_UN); 1192 1193 /* dir_fd automatically closed on closedir */ 1194 closedir(mp_dir); 1195 1196 /* if dummy was unused, free it */ 1197 if (!dummy_used) 1198 free(dummy); 1199 1200 return ret; 1201 closedir_fail: 1202 closedir(mp_dir); 1203 unlock_fail: 1204 pthread_mutex_unlock(&pending_requests.lock); 1205 fail: 1206 free(dummy); 1207 free(param); 1208 free(copy); 1209 return -1; 1210 } 1211 1212 int 1213 rte_mp_reply(struct rte_mp_msg *msg, const char *peer) 1214 { 1215 RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name); 1216 const struct internal_config *internal_conf = 1217 eal_get_internal_configuration(); 1218 1219 if (check_input(msg) != 0) 1220 return -1; 1221 1222 if (peer == NULL) { 1223 RTE_LOG(ERR, EAL, "peer is not specified\n"); 1224 rte_errno = EINVAL; 1225 return -1; 1226 } 1227 1228 if (internal_conf->no_shconf) { 1229 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n"); 1230 return 0; 1231 } 1232 1233 return mp_send(msg, peer, MP_REP); 1234 } 1235 1236 /* Internally, the status of the mp feature is represented as a three-state: 1237 * - "unknown" as long as no secondary process attached to a primary process 1238 * and there was no call to rte_mp_disable yet, 1239 * - "enabled" as soon as a secondary process attaches to a primary process, 1240 * - "disabled" when a primary process successfully called rte_mp_disable, 1241 */ 1242 enum mp_status { 1243 MP_STATUS_UNKNOWN, 1244 MP_STATUS_DISABLED, 1245 MP_STATUS_ENABLED, 1246 }; 1247 1248 static bool 1249 set_mp_status(enum mp_status status) 1250 { 1251 struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config; 1252 uint8_t expected; 1253 uint8_t desired; 1254 1255 expected = MP_STATUS_UNKNOWN; 1256 desired = status; 1257 if (__atomic_compare_exchange_n(&mcfg->mp_status, &expected, desired, 1258 false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) 1259 return true; 1260 1261 return __atomic_load_n(&mcfg->mp_status, __ATOMIC_RELAXED) == desired; 1262 } 1263 1264 bool 1265 rte_mp_disable(void) 1266 { 1267 return set_mp_status(MP_STATUS_DISABLED); 1268 } 1269 1270 bool 1271 __rte_mp_enable(void) 1272 { 1273 return set_mp_status(MP_STATUS_ENABLED); 1274 } 1275