/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <string.h>
#include <sys/time.h>

#include <rte_errno.h>
#include <rte_string_fns.h>

#include "eal_memalloc.h"
#include "eal_memcfg.h"
#include "eal_private.h"

#include "malloc_elem.h"
#include "malloc_mp.h"

#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to notify of changes in memory map. this is
 * essentially a regular sync request, but we cannot send sync requests while
 * another one is in progress, and we might have to - therefore, we do this as
 * a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */

/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5 second timeout */

/* when we're allocating, we need to store some state to ensure that we can
 * roll back later
 */
struct primary_alloc_req_state {
	struct malloc_heap *heap;
	struct rte_memseg **ms;
	int ms_len;
	struct malloc_elem *elem;
	void *map_addr;
	size_t map_len;
};

enum req_state {
	REQ_STATE_INACTIVE = 0,
	REQ_STATE_ACTIVE,
	REQ_STATE_COMPLETE
};

struct mp_request {
	TAILQ_ENTRY(mp_request) next;
	struct malloc_mp_req user_req; /**< contents of request */
	pthread_cond_t cond; /**< variable we use to time out on this request */
	enum req_state state; /**< indicate status of this request */
	struct primary_alloc_req_state alloc_state;
};
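/*
 * Lifecycle of an mp_request entry (summary of the handlers below, added for
 * orientation): on the primary, handle_request() creates the entry and marks
 * it REQ_STATE_ACTIVE; handle_sync_response() then either reports the result
 * and frees it, or, if an allocation could not be synchronized, rolls the
 * allocation back and hands the entry over to handle_rollback_response(),
 * which reports the failure and frees it. On the secondary,
 * request_to_primary() creates the entry and sleeps on its condition variable
 * until handle_response() marks it REQ_STATE_COMPLETE or the timeout expires.
 */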
/*
 * We could've used just a single request, but it may be possible for
 * secondaries to timeout earlier than the primary, and send a new request while
 * primary is still expecting replies to the old one. Therefore, each new
 * request will get assigned a new ID, which is how we will distinguish between
 * expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);
static struct {
	struct mp_request_list list;
	pthread_mutex_t lock;
} mp_request_list = {
	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
	.lock = PTHREAD_MUTEX_INITIALIZER
};

/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 * - if allocation failed straight away
 * - if allocation and sync request succeeded
 * - if allocation succeeded, sync request failed, allocation rolled back and
 *   rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
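/*
 * Illustrative sketch of the secondary ("S") side of the allocation workflow
 * above. The actual caller lives elsewhere in the malloc code, so treat this
 * as an assumption; the field usage mirrors what handle_alloc_request() below
 * consumes, and heap_idx, pg_sz, elt_size, socket, flags, align, bound and
 * contig are placeholders for the caller's parameters:
 *
 *	struct malloc_mp_req req;
 *
 *	memset(&req, 0, sizeof(req));
 *	req.t = REQ_TYPE_ALLOC;
 *	req.alloc_req.malloc_heap_idx = heap_idx;
 *	req.alloc_req.page_sz = pg_sz;
 *	req.alloc_req.elt_size = elt_size;
 *	req.alloc_req.socket = socket;
 *	req.alloc_req.flags = flags;
 *	req.alloc_req.align = align;
 *	req.alloc_req.bound = bound;
 *	req.alloc_req.contig = contig;
 *
 *	if (request_to_primary(&req) != 0 || req.result != REQ_RESULT_SUCCESS)
 *		return -1; // allocation failed on the primary, or timed out
 */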
static struct mp_request *
find_request_by_id(uint64_t id)
{
	struct mp_request *req;
	TAILQ_FOREACH(req, &mp_request_list.list, next) {
		if (req->user_req.id == id)
			break;
	}
	return req;
}

/* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
static uint64_t
get_unique_id(void)
{
	uint64_t id;
	do {
		id = rte_rand();
	} while (find_request_by_id(id) != NULL);
	return id;
}

/* secondary will respond to sync requests thusly */
static int
handle_sync(const struct rte_mp_msg *msg, const void *peer)
{
	struct rte_mp_msg reply;
	const struct malloc_mp_req *req =
			(const struct malloc_mp_req *)msg->param;
	struct malloc_mp_req *resp =
			(struct malloc_mp_req *)reply.param;
	int ret;

	if (req->t != REQ_TYPE_SYNC) {
		RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
		return -1;
	}

	memset(&reply, 0, sizeof(reply));

	reply.num_fds = 0;
	strlcpy(reply.name, msg->name, sizeof(reply.name));
	reply.len_param = sizeof(*resp);

	ret = eal_memalloc_sync_with_primary();

	resp->t = REQ_TYPE_SYNC;
	resp->id = req->id;
	resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;

	return rte_mp_reply(&reply, peer);
}

static int
handle_free_request(const struct malloc_mp_req *m)
{
	const struct rte_memseg_list *msl;
	void *start, *end;
	size_t len;

	len = m->free_req.len;
	start = m->free_req.addr;
	end = RTE_PTR_ADD(start, len - 1);

	/* check if the requested memory actually exists */
	msl = rte_mem_virt2memseg_list(start);
	if (msl == NULL) {
		RTE_LOG(ERR, EAL, "Requested to free unknown memory\n");
		return -1;
	}

	/* check if end is within the same memory region */
	if (rte_mem_virt2memseg_list(end) != msl) {
		RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n");
		return -1;
	}

	/* we're supposed to only free memory that's not external */
	if (msl->external) {
		RTE_LOG(ERR, EAL, "Requested to free external memory\n");
		return -1;
	}

	/* now that we've validated the request, announce it */
	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
			m->free_req.addr, m->free_req.len);

	/* now, do the actual freeing */
	return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
}

static int
handle_alloc_request(const struct malloc_mp_req *m,
		struct mp_request *req)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	const struct malloc_req_alloc *ar = &m->alloc_req;
	struct malloc_heap *heap;
	struct malloc_elem *elem;
	struct rte_memseg **ms;
	size_t alloc_sz;
	int n_segs;
	void *map_addr;

	/* this is checked by the API, but we need to prevent divide by zero */
	if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
		RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n");
		return -1;
	}

	/* heap idx is index into the heap array, not socket ID */
	if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
		RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n");
		return -1;
	}

	heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];

	/*
	 * for allocations, we must only use internal heaps, but since
	 * rte_malloc_heap_socket_is_external() takes the shared memory config
	 * lock and we're already read-locked here, we instead take advantage
	 * of the fact that internal socket ID's are always lower than
	 * RTE_MAX_NUMA_NODES.
	 */
	if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
		RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n");
		return -1;
	}

	alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
			MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
	n_segs = alloc_sz / ar->page_sz;

	/* we can't know in advance how many pages we'll need, so we malloc */
	ms = malloc(sizeof(*ms) * n_segs);
	if (ms == NULL) {
		RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
		return -1;
	}
	memset(ms, 0, sizeof(*ms) * n_segs);

	elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
			ar->flags, ar->align, ar->bound, ar->contig, ms,
			n_segs);

	if (elem == NULL)
		goto fail;

	map_addr = ms[0]->addr;

	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);

	/* we have succeeded in allocating memory, but we still need to sync
	 * with other processes. however, since DPDK IPC is single-threaded, we
	 * send an asynchronous request and exit this callback.
	 */

	req->alloc_state.ms = ms;
	req->alloc_state.ms_len = n_segs;
	req->alloc_state.map_addr = map_addr;
	req->alloc_state.map_len = alloc_sz;
	req->alloc_state.elem = elem;
	req->alloc_state.heap = heap;

	return 0;
fail:
	free(ms);
	return -1;
}
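/*
 * Worked example for the sizing logic in handle_alloc_request() above, using
 * illustrative numbers (a 64-byte MALLOC_ELEM_TRAILER_LEN is assumed here
 * purely for the arithmetic): with elt_size = 3MB, align = 64 and
 * page_sz = 2MB,
 *
 *	alloc_sz = RTE_ALIGN_CEIL(64 + 3MB + 64, 2MB) = 4MB
 *	n_segs   = 4MB / 2MB = 2
 *
 * so two struct rte_memseg pointers are reserved in 'ms', and at most two
 * pages end up being mapped for the request.
 */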
/* first stage of primary handling requests from secondary */
static int
handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;
	int ret;

	/* lock access to request */
	pthread_mutex_lock(&mp_request_list.lock);

	/* make sure it's not a dupe */
	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		RTE_LOG(ERR, EAL, "Duplicate request id\n");
		/* the existing entry is still owned by the list, don't free it */
		entry = NULL;
		goto fail;
	}

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
		goto fail;
	}

	/* erase all data */
	memset(entry, 0, sizeof(*entry));

	if (m->t == REQ_TYPE_ALLOC) {
		ret = handle_alloc_request(m, entry);
	} else if (m->t == REQ_TYPE_FREE) {
		ret = handle_free_request(m);
	} else {
		RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
		goto fail;
	}

	if (ret != 0) {
		struct rte_mp_msg resp_msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)resp_msg.param;

		/* send failure message straight away */
		resp_msg.num_fds = 0;
		resp_msg.len_param = sizeof(*resp);
		strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
				sizeof(resp_msg.name));

		resp->t = m->t;
		resp->result = REQ_RESULT_FAIL;
		resp->id = m->id;

		if (rte_mp_sendmsg(&resp_msg)) {
			RTE_LOG(ERR, EAL, "Couldn't send response\n");
			goto fail;
		}
		/* we did not modify the request */
		free(entry);
	} else {
		struct rte_mp_msg sr_msg;
		struct malloc_mp_req *sr =
				(struct malloc_mp_req *)sr_msg.param;
		struct timespec ts;

		memset(&sr_msg, 0, sizeof(sr_msg));

		/* we can do something, so send sync request asynchronously */
		sr_msg.num_fds = 0;
		sr_msg.len_param = sizeof(*sr);
		strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		sr->t = REQ_TYPE_SYNC;
		sr->id = m->id;

		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&sr_msg, &ts,
					handle_sync_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
			if (m->t == REQ_TYPE_ALLOC)
				free(entry->alloc_state.ms);
			goto fail;
		}

		/* mark request as in progress */
		memcpy(&entry->user_req, m, sizeof(*m));
		entry->state = REQ_STATE_ACTIVE;

		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
	}
	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}
/* callback for asynchronous sync requests for primary. this will either do a
 * sendmsg with results, or trigger rollback request.
 */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply)
{
	enum malloc_req_result result;
	struct mp_request *entry;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	int i;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	result = REQ_RESULT_SUCCESS;

	if (reply->nb_received != reply->nb_sent)
		result = REQ_RESULT_FAIL;

	for (i = 0; i < reply->nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply->msgs[i].param;

		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->id != entry->user_req.id) {
			RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->result == REQ_RESULT_FAIL) {
			result = REQ_RESULT_FAIL;
			break;
		}
	}

	if (entry->user_req.t == REQ_TYPE_FREE) {
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		/* this is a free request, just sendmsg result */
		resp->t = REQ_TYPE_FREE;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_SUCCESS) {
		struct malloc_heap *heap = entry->alloc_state.heap;
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		heap->total_size += entry->alloc_state.map_len;

		/* result is success, so just notify secondary about this */
		resp->t = REQ_TYPE_ALLOC;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry->alloc_state.ms);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_FAIL) {
		struct rte_mp_msg rb_msg;
		struct malloc_mp_req *rb =
				(struct malloc_mp_req *)rb_msg.param;
		struct timespec ts;
		struct primary_alloc_req_state *state =
				&entry->alloc_state;
		int ret;

		memset(&rb_msg, 0, sizeof(rb_msg));

		/* we've failed to sync, so do a rollback */
		eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
				state->map_addr, state->map_len);

		rollback_expand_heap(state->ms, state->ms_len, state->elem,
				state->map_addr, state->map_len);

		/* send rollback request */
		rb_msg.num_fds = 0;
		rb_msg.len_param = sizeof(*rb);
		strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		rb->t = REQ_TYPE_SYNC;
		rb->id = entry->user_req.id;
		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&rb_msg, &ts,
					handle_rollback_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");

			/* we couldn't send rollback request, but that's OK -
			 * secondary will time out, and memory has been removed
			 * from heap anyway.
			 */
			TAILQ_REMOVE(&mp_request_list.list, entry, next);
			free(state->ms);
			free(entry);
			goto fail;
		}
	} else {
		RTE_LOG(ERR, EAL, "Unexpected response to sync request of unknown type\n");
		goto fail;
	}

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply __rte_unused)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	struct mp_request *entry;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	memset(&msg, 0, sizeof(msg));

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	if (entry->user_req.t != REQ_TYPE_ALLOC) {
		RTE_LOG(ERR, EAL, "Unexpected active request\n");
		goto fail;
	}

	/* we don't care if rollback succeeded, request still failed */
	resp->t = REQ_TYPE_ALLOC;
	resp->result = REQ_RESULT_FAIL;
	resp->id = mpreq->id;
	msg.num_fds = 0;
	msg.len_param = sizeof(*resp);
	strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

	if (rte_mp_sendmsg(&msg))
		RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

	/* clean up */
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry->alloc_state.ms);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

/* final stage of the request from secondary */
static int
handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		/* update request status */
		entry->user_req.result = m->result;

		entry->state = REQ_STATE_COMPLETE;

		/* trigger thread wakeup */
		pthread_cond_signal(&entry->cond);
	}

	pthread_mutex_unlock(&mp_request_list.lock);

	return 0;
}
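/*
 * Note (added summary, no new behaviour): handle_response() above is the
 * producer half of a condition-variable handshake; the consumer half is the
 * pthread_cond_timedwait() loop in request_to_primary() further down. Both
 * sides run under mp_request_list.lock, roughly:
 *
 *	requesting thread:                      IPC handler thread:
 *	  lock(mp_request_list.lock)
 *	  ...send request, queue entry...
 *	  cond_timedwait(&entry->cond, &lock)     lock(mp_request_list.lock)
 *	    (lock dropped while sleeping)         entry->user_req.result = ...;
 *	                                          entry->state = REQ_STATE_COMPLETE;
 *	                                          cond_signal(&entry->cond);
 *	                                          unlock(mp_request_list.lock)
 *	  check entry->state, unlock
 */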
/* synchronously request memory map sync, this is only called whenever primary
 * process initiates the allocation.
 */
int
request_sync(void)
{
	struct rte_mp_msg msg;
	struct rte_mp_reply reply;
	struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
	struct timespec ts;
	int i, ret = -1;

	memset(&msg, 0, sizeof(msg));
	memset(&reply, 0, sizeof(reply));

	/* no need to create tailq entries as this is entirely synchronous */

	msg.num_fds = 0;
	msg.len_param = sizeof(*req);
	strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));

	/* sync request carries no data */
	req->t = REQ_TYPE_SYNC;
	req->id = get_unique_id();

	ts.tv_nsec = 0;
	ts.tv_sec = MP_TIMEOUT_S;

	/* there may be stray timeout still waiting */
	do {
		ret = rte_mp_request_sync(&msg, &reply, &ts);
	} while (ret != 0 && rte_errno == EEXIST);
	if (ret != 0) {
		/* if IPC is unsupported, behave as if the call succeeded */
		if (rte_errno != ENOTSUP)
			RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
		else
			ret = 0;
		goto out;
	}

	if (reply.nb_received != reply.nb_sent) {
		RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
		goto out;
	}

	for (i = 0; i < reply.nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply.msgs[i].param;
		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
			goto out;
		}
		if (resp->id != req->id) {
			RTE_LOG(ERR, EAL, "Wrong request ID\n");
			goto out;
		}
		if (resp->result != REQ_RESULT_SUCCESS) {
			RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
			goto out;
		}
	}

	ret = 0;
out:
	free(reply.msgs);
	return ret;
}
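/*
 * Illustrative sketch (an assumption about the caller, which lives outside
 * this file): the primary is expected to call request_sync() right after
 * expanding its own heap, and to undo the local mapping if the secondaries
 * could not replicate it. 'ms', 'n_segs', 'elem', 'map_addr' and 'map_len'
 * below are placeholders for the caller's allocation state:
 *
 *	if (request_sync() != 0) {
 *		rollback_expand_heap(ms, n_segs, elem, map_addr, map_len);
 *		return -1;
 *	}
 */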
/* this is a synchronous wrapper around a bunch of asynchronous requests to
 * primary process. this will initiate a request and wait until responses come.
 */
int
request_to_primary(struct malloc_mp_req *user_req)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
	struct mp_request *entry;
	struct timespec ts;
	struct timeval now;
	int ret;

	memset(&msg, 0, sizeof(msg));
	memset(&ts, 0, sizeof(ts));

	pthread_mutex_lock(&mp_request_list.lock);

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
		goto fail;
	}

	memset(entry, 0, sizeof(*entry));

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto fail;
	}

	ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
	ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
			(now.tv_usec * 1000) / 1000000000;

	/* initialize the request */
	pthread_cond_init(&entry->cond, NULL);

	msg.num_fds = 0;
	msg.len_param = sizeof(*msg_req);
	strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));

	/* (attempt to) get a unique id */
	user_req->id = get_unique_id();

	/* copy contents of user request into the message */
	memcpy(msg_req, user_req, sizeof(*msg_req));

	if (rte_mp_sendmsg(&msg)) {
		RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
		goto fail;
	}

	/* copy contents of user request into active request */
	memcpy(&entry->user_req, user_req, sizeof(*user_req));

	/* mark request as in progress */
	entry->state = REQ_STATE_ACTIVE;

	TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);

	/* finally, wait on timeout */
	do {
		ret = pthread_cond_timedwait(&entry->cond,
				&mp_request_list.lock, &ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	if (entry->state != REQ_STATE_COMPLETE) {
		RTE_LOG(ERR, EAL, "Request timed out\n");
		ret = -1;
	} else {
		ret = 0;
		user_req->result = entry->user_req.result;
	}
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

int
register_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		/* it's OK for primary to not support IPC */
		if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
				rte_errno != ENOTSUP) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_REQUEST);
			return -1;
		}
	} else {
		if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_SYNC);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_ROLLBACK);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_RESPONSE,
				handle_response)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_RESPONSE);
			return -1;
		}
	}
	return 0;
}

void
unregister_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		rte_mp_action_unregister(MP_ACTION_REQUEST);
	} else {
		rte_mp_action_unregister(MP_ACTION_SYNC);
		rte_mp_action_unregister(MP_ACTION_ROLLBACK);
		rte_mp_action_unregister(MP_ACTION_RESPONSE);
	}
}
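/*
 * Illustrative sketch (not part of this file): a secondary freeing
 * multiprocess-shared memory is assumed to go through request_to_primary()
 * with a REQ_TYPE_FREE request, mirroring the fields handle_free_request()
 * above consumes. 'addr' and 'len' are placeholders for a page-aligned region
 * that belongs to an internal heap:
 *
 *	struct malloc_mp_req req;
 *
 *	memset(&req, 0, sizeof(req));
 *	req.t = REQ_TYPE_FREE;
 *	req.free_req.addr = addr;
 *	req.free_req.len = len;
 *
 *	if (request_to_primary(&req) != 0 || req.result != REQ_RESULT_SUCCESS)
 *		return -1; // the primary failed to free, or the request timed out
 */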