1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2018 Intel Corporation 3 */ 4 5 #include <stdlib.h> 6 #include <string.h> 7 #include <pthread.h> 8 #include <sys/time.h> 9 10 #include <rte_errno.h> 11 #include <rte_string_fns.h> 12 13 #include "eal_memalloc.h" 14 #include "eal_memcfg.h" 15 #include "eal_private.h" 16 17 #include "malloc_elem.h" 18 #include "malloc_mp.h" 19 20 #define MP_ACTION_SYNC "mp_malloc_sync" 21 /**< request sent by primary process to notify of changes in memory map */ 22 #define MP_ACTION_ROLLBACK "mp_malloc_rollback" 23 /**< request sent by primary process to notify of changes in memory map. this is 24 * essentially a regular sync request, but we cannot send sync requests while 25 * another one is in progress, and we might have to - therefore, we do this as 26 * a separate callback. 27 */ 28 #define MP_ACTION_REQUEST "mp_malloc_request" 29 /**< request sent by secondary process to ask for allocation/deallocation */ 30 #define MP_ACTION_RESPONSE "mp_malloc_response" 31 /**< response sent to secondary process to indicate result of request */ 32 33 /* forward declarations */ 34 static int 35 handle_sync_response(const struct rte_mp_msg *request, 36 const struct rte_mp_reply *reply); 37 static int 38 handle_rollback_response(const struct rte_mp_msg *request, 39 const struct rte_mp_reply *reply); 40 41 #define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */ 42 43 /* when we're allocating, we need to store some state to ensure that we can 44 * roll back later 45 */ 46 struct primary_alloc_req_state { 47 struct malloc_heap *heap; 48 struct rte_memseg **ms; 49 int ms_len; 50 struct malloc_elem *elem; 51 void *map_addr; 52 size_t map_len; 53 }; 54 55 enum req_state { 56 REQ_STATE_INACTIVE = 0, 57 REQ_STATE_ACTIVE, 58 REQ_STATE_COMPLETE 59 }; 60 61 struct mp_request { 62 TAILQ_ENTRY(mp_request) next; 63 struct malloc_mp_req user_req; /**< contents of request */ 64 pthread_cond_t cond; /**< variable we use to time out on this request */ 65 enum req_state state; /**< indicate status of this request */ 66 struct primary_alloc_req_state alloc_state; 67 }; 68 69 /* 70 * We could've used just a single request, but it may be possible for 71 * secondaries to timeout earlier than the primary, and send a new request while 72 * primary is still expecting replies to the old one. Therefore, each new 73 * request will get assigned a new ID, which is how we will distinguish between 74 * expected and unexpected messages. 75 */ 76 TAILQ_HEAD(mp_request_list, mp_request); 77 static struct { 78 struct mp_request_list list; 79 pthread_mutex_t lock; 80 } mp_request_list = { 81 .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list), 82 .lock = PTHREAD_MUTEX_INITIALIZER 83 }; 84 85 /** 86 * General workflow is the following: 87 * 88 * Allocation: 89 * S: send request to primary 90 * P: attempt to allocate memory 91 * if failed, sendmsg failure 92 * if success, send sync request 93 * S: if received msg of failure, quit 94 * if received sync request, synchronize memory map and reply with result 95 * P: if received sync request result 96 * if success, sendmsg success 97 * if failure, roll back allocation and send a rollback request 98 * S: if received msg of success, quit 99 * if received rollback request, synchronize memory map and reply with result 100 * P: if received sync request result 101 * sendmsg sync request result 102 * S: if received msg, quit 103 * 104 * Aside from timeouts, there are three points where we can quit: 105 * - if allocation failed straight away 106 * - if allocation and sync request succeeded 107 * - if allocation succeeded, sync request failed, allocation rolled back and 108 * rollback request received (irrespective of whether it succeeded or failed) 109 * 110 * Deallocation: 111 * S: send request to primary 112 * P: attempt to deallocate memory 113 * if failed, sendmsg failure 114 * if success, send sync request 115 * S: if received msg of failure, quit 116 * if received sync request, synchronize memory map and reply with result 117 * P: if received sync request result 118 * sendmsg sync request result 119 * S: if received msg, quit 120 * 121 * There is no "rollback" from deallocation, as it's safe to have some memory 122 * mapped in some processes - it's absent from the heap, so it won't get used. 123 */ 124 125 static struct mp_request * 126 find_request_by_id(uint64_t id) 127 { 128 struct mp_request *req; 129 TAILQ_FOREACH(req, &mp_request_list.list, next) { 130 if (req->user_req.id == id) 131 break; 132 } 133 return req; 134 } 135 136 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */ 137 static uint64_t 138 get_unique_id(void) 139 { 140 uint64_t id; 141 do { 142 id = rte_rand(); 143 } while (find_request_by_id(id) != NULL); 144 return id; 145 } 146 147 /* secondary will respond to sync requests thusly */ 148 static int 149 handle_sync(const struct rte_mp_msg *msg, const void *peer) 150 { 151 struct rte_mp_msg reply; 152 const struct malloc_mp_req *req = 153 (const struct malloc_mp_req *)msg->param; 154 struct malloc_mp_req *resp = 155 (struct malloc_mp_req *)reply.param; 156 int ret; 157 158 if (req->t != REQ_TYPE_SYNC) { 159 EAL_LOG(ERR, "Unexpected request from primary"); 160 return -1; 161 } 162 163 memset(&reply, 0, sizeof(reply)); 164 165 reply.num_fds = 0; 166 strlcpy(reply.name, msg->name, sizeof(reply.name)); 167 reply.len_param = sizeof(*resp); 168 169 ret = eal_memalloc_sync_with_primary(); 170 171 resp->t = REQ_TYPE_SYNC; 172 resp->id = req->id; 173 resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL; 174 175 return rte_mp_reply(&reply, peer); 176 } 177 178 static int 179 handle_free_request(const struct malloc_mp_req *m) 180 { 181 const struct rte_memseg_list *msl; 182 void *start, *end; 183 size_t len; 184 185 len = m->free_req.len; 186 start = m->free_req.addr; 187 end = RTE_PTR_ADD(start, len - 1); 188 189 /* check if the requested memory actually exists */ 190 msl = rte_mem_virt2memseg_list(start); 191 if (msl == NULL) { 192 EAL_LOG(ERR, "Requested to free unknown memory"); 193 return -1; 194 } 195 196 /* check if end is within the same memory region */ 197 if (rte_mem_virt2memseg_list(end) != msl) { 198 EAL_LOG(ERR, "Requested to free memory spanning multiple regions"); 199 return -1; 200 } 201 202 /* we're supposed to only free memory that's not external */ 203 if (msl->external) { 204 EAL_LOG(ERR, "Requested to free external memory"); 205 return -1; 206 } 207 208 /* now that we've validated the request, announce it */ 209 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE, 210 m->free_req.addr, m->free_req.len); 211 212 /* now, do the actual freeing */ 213 return malloc_heap_free_pages(m->free_req.addr, m->free_req.len); 214 } 215 216 static int 217 handle_alloc_request(const struct malloc_mp_req *m, 218 struct mp_request *req) 219 { 220 struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config; 221 const struct malloc_req_alloc *ar = &m->alloc_req; 222 struct malloc_heap *heap; 223 struct malloc_elem *elem; 224 struct rte_memseg **ms; 225 size_t alloc_sz; 226 int n_segs; 227 void *map_addr; 228 229 /* this is checked by the API, but we need to prevent divide by zero */ 230 if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) { 231 EAL_LOG(ERR, "Attempting to allocate with invalid page size"); 232 return -1; 233 } 234 235 /* heap idx is index into the heap array, not socket ID */ 236 if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) { 237 EAL_LOG(ERR, "Attempting to allocate from invalid heap"); 238 return -1; 239 } 240 241 heap = &mcfg->malloc_heaps[ar->malloc_heap_idx]; 242 243 /* 244 * for allocations, we must only use internal heaps, but since the 245 * rte_malloc_heap_socket_is_external() is thread-safe and we're already 246 * read-locked, we'll have to take advantage of the fact that internal 247 * socket ID's are always lower than RTE_MAX_NUMA_NODES. 248 */ 249 if (heap->socket_id >= RTE_MAX_NUMA_NODES) { 250 EAL_LOG(ERR, "Attempting to allocate from external heap"); 251 return -1; 252 } 253 254 alloc_sz = RTE_ALIGN_CEIL(RTE_ALIGN_CEIL(ar->elt_size, ar->align) + 255 MALLOC_ELEM_OVERHEAD, ar->page_sz); 256 n_segs = alloc_sz / ar->page_sz; 257 258 /* we can't know in advance how many pages we'll need, so we malloc */ 259 ms = malloc(sizeof(*ms) * n_segs); 260 if (ms == NULL) { 261 EAL_LOG(ERR, "Couldn't allocate memory for request state"); 262 return -1; 263 } 264 memset(ms, 0, sizeof(*ms) * n_segs); 265 266 elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket, 267 ar->flags, ar->align, ar->bound, ar->contig, ms, 268 n_segs); 269 270 if (elem == NULL) 271 goto fail; 272 273 map_addr = ms[0]->addr; 274 275 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz); 276 277 /* we have succeeded in allocating memory, but we still need to sync 278 * with other processes. however, since DPDK IPC is single-threaded, we 279 * send an asynchronous request and exit this callback. 280 */ 281 282 req->alloc_state.ms = ms; 283 req->alloc_state.ms_len = n_segs; 284 req->alloc_state.map_addr = map_addr; 285 req->alloc_state.map_len = alloc_sz; 286 req->alloc_state.elem = elem; 287 req->alloc_state.heap = heap; 288 289 return 0; 290 fail: 291 free(ms); 292 return -1; 293 } 294 295 /* first stage of primary handling requests from secondary */ 296 static int 297 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused) 298 { 299 const struct malloc_mp_req *m = 300 (const struct malloc_mp_req *)msg->param; 301 struct mp_request *entry; 302 int ret; 303 304 /* lock access to request */ 305 pthread_mutex_lock(&mp_request_list.lock); 306 307 /* make sure it's not a dupe */ 308 entry = find_request_by_id(m->id); 309 if (entry != NULL) { 310 EAL_LOG(ERR, "Duplicate request id"); 311 goto fail; 312 } 313 314 entry = malloc(sizeof(*entry)); 315 if (entry == NULL) { 316 EAL_LOG(ERR, "Unable to allocate memory for request"); 317 goto fail; 318 } 319 320 /* erase all data */ 321 memset(entry, 0, sizeof(*entry)); 322 323 if (m->t == REQ_TYPE_ALLOC) { 324 ret = handle_alloc_request(m, entry); 325 } else if (m->t == REQ_TYPE_FREE) { 326 ret = handle_free_request(m); 327 } else { 328 EAL_LOG(ERR, "Unexpected request from secondary"); 329 goto fail; 330 } 331 332 if (ret != 0) { 333 struct rte_mp_msg resp_msg; 334 struct malloc_mp_req *resp = 335 (struct malloc_mp_req *)resp_msg.param; 336 337 /* send failure message straight away */ 338 resp_msg.num_fds = 0; 339 resp_msg.len_param = sizeof(*resp); 340 strlcpy(resp_msg.name, MP_ACTION_RESPONSE, 341 sizeof(resp_msg.name)); 342 343 resp->t = m->t; 344 resp->result = REQ_RESULT_FAIL; 345 resp->id = m->id; 346 347 if (rte_mp_sendmsg(&resp_msg)) { 348 EAL_LOG(ERR, "Couldn't send response"); 349 goto fail; 350 } 351 /* we did not modify the request */ 352 free(entry); 353 } else { 354 struct rte_mp_msg sr_msg; 355 struct malloc_mp_req *sr = 356 (struct malloc_mp_req *)sr_msg.param; 357 struct timespec ts; 358 359 memset(&sr_msg, 0, sizeof(sr_msg)); 360 361 /* we can do something, so send sync request asynchronously */ 362 sr_msg.num_fds = 0; 363 sr_msg.len_param = sizeof(*sr); 364 strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name)); 365 366 ts.tv_nsec = 0; 367 ts.tv_sec = MP_TIMEOUT_S; 368 369 /* sync requests carry no data */ 370 sr->t = REQ_TYPE_SYNC; 371 sr->id = m->id; 372 373 /* there may be stray timeout still waiting */ 374 do { 375 ret = rte_mp_request_async(&sr_msg, &ts, 376 handle_sync_response); 377 } while (ret != 0 && rte_errno == EEXIST); 378 if (ret != 0) { 379 EAL_LOG(ERR, "Couldn't send sync request"); 380 if (m->t == REQ_TYPE_ALLOC) 381 free(entry->alloc_state.ms); 382 goto fail; 383 } 384 385 /* mark request as in progress */ 386 memcpy(&entry->user_req, m, sizeof(*m)); 387 entry->state = REQ_STATE_ACTIVE; 388 389 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next); 390 } 391 pthread_mutex_unlock(&mp_request_list.lock); 392 return 0; 393 fail: 394 pthread_mutex_unlock(&mp_request_list.lock); 395 free(entry); 396 return -1; 397 } 398 399 /* callback for asynchronous sync requests for primary. this will either do a 400 * sendmsg with results, or trigger rollback request. 401 */ 402 static int 403 handle_sync_response(const struct rte_mp_msg *request, 404 const struct rte_mp_reply *reply) 405 { 406 enum malloc_req_result result; 407 struct mp_request *entry; 408 const struct malloc_mp_req *mpreq = 409 (const struct malloc_mp_req *)request->param; 410 int i; 411 412 /* lock the request */ 413 pthread_mutex_lock(&mp_request_list.lock); 414 415 entry = find_request_by_id(mpreq->id); 416 if (entry == NULL) { 417 EAL_LOG(ERR, "Wrong request ID"); 418 goto fail; 419 } 420 421 result = REQ_RESULT_SUCCESS; 422 423 if (reply->nb_received != reply->nb_sent) 424 result = REQ_RESULT_FAIL; 425 426 for (i = 0; i < reply->nb_received; i++) { 427 struct malloc_mp_req *resp = 428 (struct malloc_mp_req *)reply->msgs[i].param; 429 430 if (resp->t != REQ_TYPE_SYNC) { 431 EAL_LOG(ERR, "Unexpected response to sync request"); 432 result = REQ_RESULT_FAIL; 433 break; 434 } 435 if (resp->id != entry->user_req.id) { 436 EAL_LOG(ERR, "Response to wrong sync request"); 437 result = REQ_RESULT_FAIL; 438 break; 439 } 440 if (resp->result == REQ_RESULT_FAIL) { 441 result = REQ_RESULT_FAIL; 442 break; 443 } 444 } 445 446 if (entry->user_req.t == REQ_TYPE_FREE) { 447 struct rte_mp_msg msg; 448 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param; 449 450 memset(&msg, 0, sizeof(msg)); 451 452 /* this is a free request, just sendmsg result */ 453 resp->t = REQ_TYPE_FREE; 454 resp->result = result; 455 resp->id = entry->user_req.id; 456 msg.num_fds = 0; 457 msg.len_param = sizeof(*resp); 458 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name)); 459 460 if (rte_mp_sendmsg(&msg)) 461 EAL_LOG(ERR, "Could not send message to secondary process"); 462 463 TAILQ_REMOVE(&mp_request_list.list, entry, next); 464 free(entry); 465 } else if (entry->user_req.t == REQ_TYPE_ALLOC && 466 result == REQ_RESULT_SUCCESS) { 467 struct malloc_heap *heap = entry->alloc_state.heap; 468 struct rte_mp_msg msg; 469 struct malloc_mp_req *resp = 470 (struct malloc_mp_req *)msg.param; 471 472 memset(&msg, 0, sizeof(msg)); 473 474 heap->total_size += entry->alloc_state.map_len; 475 476 /* result is success, so just notify secondary about this */ 477 resp->t = REQ_TYPE_ALLOC; 478 resp->result = result; 479 resp->id = entry->user_req.id; 480 msg.num_fds = 0; 481 msg.len_param = sizeof(*resp); 482 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name)); 483 484 if (rte_mp_sendmsg(&msg)) 485 EAL_LOG(ERR, "Could not send message to secondary process"); 486 487 TAILQ_REMOVE(&mp_request_list.list, entry, next); 488 free(entry->alloc_state.ms); 489 free(entry); 490 } else if (entry->user_req.t == REQ_TYPE_ALLOC && 491 result == REQ_RESULT_FAIL) { 492 struct rte_mp_msg rb_msg; 493 struct malloc_mp_req *rb = 494 (struct malloc_mp_req *)rb_msg.param; 495 struct timespec ts; 496 struct primary_alloc_req_state *state = 497 &entry->alloc_state; 498 int ret; 499 500 memset(&rb_msg, 0, sizeof(rb_msg)); 501 502 /* we've failed to sync, so do a rollback */ 503 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE, 504 state->map_addr, state->map_len); 505 506 rollback_expand_heap(state->ms, state->ms_len, state->elem, 507 state->map_addr, state->map_len); 508 509 /* send rollback request */ 510 rb_msg.num_fds = 0; 511 rb_msg.len_param = sizeof(*rb); 512 strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name)); 513 514 ts.tv_nsec = 0; 515 ts.tv_sec = MP_TIMEOUT_S; 516 517 /* sync requests carry no data */ 518 rb->t = REQ_TYPE_SYNC; 519 rb->id = entry->user_req.id; 520 521 /* there may be stray timeout still waiting */ 522 do { 523 ret = rte_mp_request_async(&rb_msg, &ts, 524 handle_rollback_response); 525 } while (ret != 0 && rte_errno == EEXIST); 526 if (ret != 0) { 527 EAL_LOG(ERR, "Could not send rollback request to secondary process"); 528 529 /* we couldn't send rollback request, but that's OK - 530 * secondary will time out, and memory has been removed 531 * from heap anyway. 532 */ 533 TAILQ_REMOVE(&mp_request_list.list, entry, next); 534 free(state->ms); 535 free(entry); 536 goto fail; 537 } 538 } else { 539 EAL_LOG(ERR, " to sync request of unknown type"); 540 goto fail; 541 } 542 543 pthread_mutex_unlock(&mp_request_list.lock); 544 return 0; 545 fail: 546 pthread_mutex_unlock(&mp_request_list.lock); 547 return -1; 548 } 549 550 static int 551 handle_rollback_response(const struct rte_mp_msg *request, 552 const struct rte_mp_reply *reply __rte_unused) 553 { 554 struct rte_mp_msg msg; 555 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param; 556 const struct malloc_mp_req *mpreq = 557 (const struct malloc_mp_req *)request->param; 558 struct mp_request *entry; 559 560 /* lock the request */ 561 pthread_mutex_lock(&mp_request_list.lock); 562 563 memset(&msg, 0, sizeof(msg)); 564 565 entry = find_request_by_id(mpreq->id); 566 if (entry == NULL) { 567 EAL_LOG(ERR, "Wrong request ID"); 568 goto fail; 569 } 570 571 if (entry->user_req.t != REQ_TYPE_ALLOC) { 572 EAL_LOG(ERR, "Unexpected active request"); 573 goto fail; 574 } 575 576 /* we don't care if rollback succeeded, request still failed */ 577 resp->t = REQ_TYPE_ALLOC; 578 resp->result = REQ_RESULT_FAIL; 579 resp->id = mpreq->id; 580 msg.num_fds = 0; 581 msg.len_param = sizeof(*resp); 582 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name)); 583 584 if (rte_mp_sendmsg(&msg)) 585 EAL_LOG(ERR, "Could not send message to secondary process"); 586 587 /* clean up */ 588 TAILQ_REMOVE(&mp_request_list.list, entry, next); 589 free(entry->alloc_state.ms); 590 free(entry); 591 592 pthread_mutex_unlock(&mp_request_list.lock); 593 return 0; 594 fail: 595 pthread_mutex_unlock(&mp_request_list.lock); 596 return -1; 597 } 598 599 /* final stage of the request from secondary */ 600 static int 601 handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused) 602 { 603 const struct malloc_mp_req *m = 604 (const struct malloc_mp_req *)msg->param; 605 struct mp_request *entry; 606 607 pthread_mutex_lock(&mp_request_list.lock); 608 609 entry = find_request_by_id(m->id); 610 if (entry != NULL) { 611 /* update request status */ 612 entry->user_req.result = m->result; 613 614 entry->state = REQ_STATE_COMPLETE; 615 616 /* trigger thread wakeup */ 617 pthread_cond_signal(&entry->cond); 618 } 619 620 pthread_mutex_unlock(&mp_request_list.lock); 621 622 return 0; 623 } 624 625 /* synchronously request memory map sync, this is only called whenever primary 626 * process initiates the allocation. 627 */ 628 int 629 request_sync(void) 630 { 631 struct rte_mp_msg msg; 632 struct rte_mp_reply reply; 633 struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param; 634 struct timespec ts; 635 int i, ret = -1; 636 637 memset(&msg, 0, sizeof(msg)); 638 memset(&reply, 0, sizeof(reply)); 639 640 /* no need to create tailq entries as this is entirely synchronous */ 641 642 msg.num_fds = 0; 643 msg.len_param = sizeof(*req); 644 strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name)); 645 646 /* sync request carries no data */ 647 req->t = REQ_TYPE_SYNC; 648 req->id = get_unique_id(); 649 650 ts.tv_nsec = 0; 651 ts.tv_sec = MP_TIMEOUT_S; 652 653 /* there may be stray timeout still waiting */ 654 do { 655 ret = rte_mp_request_sync(&msg, &reply, &ts); 656 } while (ret != 0 && rte_errno == EEXIST); 657 if (ret != 0) { 658 /* if IPC is unsupported, behave as if the call succeeded */ 659 if (rte_errno != ENOTSUP) 660 EAL_LOG(ERR, "Could not send sync request to secondary process"); 661 else 662 ret = 0; 663 goto out; 664 } 665 666 if (reply.nb_received != reply.nb_sent) { 667 EAL_LOG(ERR, "Not all secondaries have responded"); 668 goto out; 669 } 670 671 for (i = 0; i < reply.nb_received; i++) { 672 struct malloc_mp_req *resp = 673 (struct malloc_mp_req *)reply.msgs[i].param; 674 if (resp->t != REQ_TYPE_SYNC) { 675 EAL_LOG(ERR, "Unexpected response from secondary"); 676 goto out; 677 } 678 if (resp->id != req->id) { 679 EAL_LOG(ERR, "Wrong request ID"); 680 goto out; 681 } 682 if (resp->result != REQ_RESULT_SUCCESS) { 683 EAL_LOG(ERR, "Secondary process failed to synchronize"); 684 goto out; 685 } 686 } 687 688 ret = 0; 689 out: 690 free(reply.msgs); 691 return ret; 692 } 693 694 /* this is a synchronous wrapper around a bunch of asynchronous requests to 695 * primary process. this will initiate a request and wait until responses come. 696 */ 697 int 698 request_to_primary(struct malloc_mp_req *user_req) 699 { 700 struct rte_mp_msg msg; 701 struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param; 702 struct mp_request *entry; 703 struct timespec ts; 704 struct timeval now; 705 int ret; 706 707 memset(&msg, 0, sizeof(msg)); 708 memset(&ts, 0, sizeof(ts)); 709 710 pthread_mutex_lock(&mp_request_list.lock); 711 712 entry = malloc(sizeof(*entry)); 713 if (entry == NULL) { 714 EAL_LOG(ERR, "Cannot allocate memory for request"); 715 goto fail; 716 } 717 718 memset(entry, 0, sizeof(*entry)); 719 720 if (gettimeofday(&now, NULL) < 0) { 721 EAL_LOG(ERR, "Cannot get current time"); 722 goto fail; 723 } 724 725 ts.tv_nsec = (now.tv_usec * 1000) % 1000000000; 726 ts.tv_sec = now.tv_sec + MP_TIMEOUT_S + 727 (now.tv_usec * 1000) / 1000000000; 728 729 /* initialize the request */ 730 pthread_cond_init(&entry->cond, NULL); 731 732 msg.num_fds = 0; 733 msg.len_param = sizeof(*msg_req); 734 strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name)); 735 736 /* (attempt to) get a unique id */ 737 user_req->id = get_unique_id(); 738 739 /* copy contents of user request into the message */ 740 memcpy(msg_req, user_req, sizeof(*msg_req)); 741 742 if (rte_mp_sendmsg(&msg)) { 743 EAL_LOG(ERR, "Cannot send message to primary"); 744 goto fail; 745 } 746 747 /* copy contents of user request into active request */ 748 memcpy(&entry->user_req, user_req, sizeof(*user_req)); 749 750 /* mark request as in progress */ 751 entry->state = REQ_STATE_ACTIVE; 752 753 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next); 754 755 /* finally, wait on timeout */ 756 do { 757 ret = pthread_cond_timedwait(&entry->cond, 758 &mp_request_list.lock, &ts); 759 } while ((ret != 0 && ret != ETIMEDOUT) && 760 entry->state == REQ_STATE_ACTIVE); 761 762 if (entry->state != REQ_STATE_COMPLETE) { 763 EAL_LOG(ERR, "Request timed out"); 764 ret = -1; 765 } else { 766 ret = 0; 767 user_req->result = entry->user_req.result; 768 } 769 TAILQ_REMOVE(&mp_request_list.list, entry, next); 770 free(entry); 771 772 pthread_mutex_unlock(&mp_request_list.lock); 773 return ret; 774 fail: 775 pthread_mutex_unlock(&mp_request_list.lock); 776 free(entry); 777 return -1; 778 } 779 780 int 781 register_mp_requests(void) 782 { 783 if (rte_eal_process_type() == RTE_PROC_PRIMARY) { 784 /* it's OK for primary to not support IPC */ 785 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) && 786 rte_errno != ENOTSUP) { 787 EAL_LOG(ERR, "Couldn't register '%s' action", 788 MP_ACTION_REQUEST); 789 return -1; 790 } 791 } else { 792 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) { 793 EAL_LOG(ERR, "Couldn't register '%s' action", 794 MP_ACTION_SYNC); 795 return -1; 796 } 797 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) { 798 EAL_LOG(ERR, "Couldn't register '%s' action", 799 MP_ACTION_SYNC); 800 return -1; 801 } 802 if (rte_mp_action_register(MP_ACTION_RESPONSE, 803 handle_response)) { 804 EAL_LOG(ERR, "Couldn't register '%s' action", 805 MP_ACTION_RESPONSE); 806 return -1; 807 } 808 } 809 return 0; 810 } 811 812 void 813 unregister_mp_requests(void) 814 { 815 if (rte_eal_process_type() == RTE_PROC_PRIMARY) { 816 rte_mp_action_unregister(MP_ACTION_REQUEST); 817 } else { 818 rte_mp_action_unregister(MP_ACTION_SYNC); 819 rte_mp_action_unregister(MP_ACTION_ROLLBACK); 820 rte_mp_action_unregister(MP_ACTION_RESPONSE); 821 } 822 } 823