1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2018 Intel Corporation 3 */ 4 5 #include <string.h> 6 #include <sys/time.h> 7 8 #include <rte_alarm.h> 9 #include <rte_errno.h> 10 #include <rte_string_fns.h> 11 12 #include "eal_memalloc.h" 13 #include "eal_memcfg.h" 14 #include "eal_private.h" 15 16 #include "malloc_elem.h" 17 #include "malloc_mp.h" 18 19 #define MP_ACTION_SYNC "mp_malloc_sync" 20 /**< request sent by primary process to notify of changes in memory map */ 21 #define MP_ACTION_ROLLBACK "mp_malloc_rollback" 22 /**< request sent by primary process to notify of changes in memory map. this is 23 * essentially a regular sync request, but we cannot send sync requests while 24 * another one is in progress, and we might have to - therefore, we do this as 25 * a separate callback. 26 */ 27 #define MP_ACTION_REQUEST "mp_malloc_request" 28 /**< request sent by secondary process to ask for allocation/deallocation */ 29 #define MP_ACTION_RESPONSE "mp_malloc_response" 30 /**< response sent to secondary process to indicate result of request */ 31 32 /* forward declarations */ 33 static int 34 handle_sync_response(const struct rte_mp_msg *request, 35 const struct rte_mp_reply *reply); 36 static int 37 handle_rollback_response(const struct rte_mp_msg *request, 38 const struct rte_mp_reply *reply); 39 40 #define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */ 41 42 /* when we're allocating, we need to store some state to ensure that we can 43 * roll back later 44 */ 45 struct primary_alloc_req_state { 46 struct malloc_heap *heap; 47 struct rte_memseg **ms; 48 int ms_len; 49 struct malloc_elem *elem; 50 void *map_addr; 51 size_t map_len; 52 }; 53 54 enum req_state { 55 REQ_STATE_INACTIVE = 0, 56 REQ_STATE_ACTIVE, 57 REQ_STATE_COMPLETE 58 }; 59 60 struct mp_request { 61 TAILQ_ENTRY(mp_request) next; 62 struct malloc_mp_req user_req; /**< contents of request */ 63 pthread_cond_t cond; /**< variable we use to time out on this request */ 64 enum req_state state; /**< indicate status of this request */ 65 struct primary_alloc_req_state alloc_state; 66 }; 67 68 /* 69 * We could've used just a single request, but it may be possible for 70 * secondaries to timeout earlier than the primary, and send a new request while 71 * primary is still expecting replies to the old one. Therefore, each new 72 * request will get assigned a new ID, which is how we will distinguish between 73 * expected and unexpected messages. 74 */ 75 TAILQ_HEAD(mp_request_list, mp_request); 76 static struct { 77 struct mp_request_list list; 78 pthread_mutex_t lock; 79 } mp_request_list = { 80 .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list), 81 .lock = PTHREAD_MUTEX_INITIALIZER 82 }; 83 84 /** 85 * General workflow is the following: 86 * 87 * Allocation: 88 * S: send request to primary 89 * P: attempt to allocate memory 90 * if failed, sendmsg failure 91 * if success, send sync request 92 * S: if received msg of failure, quit 93 * if received sync request, synchronize memory map and reply with result 94 * P: if received sync request result 95 * if success, sendmsg success 96 * if failure, roll back allocation and send a rollback request 97 * S: if received msg of success, quit 98 * if received rollback request, synchronize memory map and reply with result 99 * P: if received sync request result 100 * sendmsg sync request result 101 * S: if received msg, quit 102 * 103 * Aside from timeouts, there are three points where we can quit: 104 * - if allocation failed straight away 105 * - if allocation and sync request succeeded 106 * - if allocation succeeded, sync request failed, allocation rolled back and 107 * rollback request received (irrespective of whether it succeeded or failed) 108 * 109 * Deallocation: 110 * S: send request to primary 111 * P: attempt to deallocate memory 112 * if failed, sendmsg failure 113 * if success, send sync request 114 * S: if received msg of failure, quit 115 * if received sync request, synchronize memory map and reply with result 116 * P: if received sync request result 117 * sendmsg sync request result 118 * S: if received msg, quit 119 * 120 * There is no "rollback" from deallocation, as it's safe to have some memory 121 * mapped in some processes - it's absent from the heap, so it won't get used. 122 */ 123 124 static struct mp_request * 125 find_request_by_id(uint64_t id) 126 { 127 struct mp_request *req; 128 TAILQ_FOREACH(req, &mp_request_list.list, next) { 129 if (req->user_req.id == id) 130 break; 131 } 132 return req; 133 } 134 135 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */ 136 static uint64_t 137 get_unique_id(void) 138 { 139 uint64_t id; 140 do { 141 id = rte_rand(); 142 } while (find_request_by_id(id) != NULL); 143 return id; 144 } 145 146 /* secondary will respond to sync requests thusly */ 147 static int 148 handle_sync(const struct rte_mp_msg *msg, const void *peer) 149 { 150 struct rte_mp_msg reply; 151 const struct malloc_mp_req *req = 152 (const struct malloc_mp_req *)msg->param; 153 struct malloc_mp_req *resp = 154 (struct malloc_mp_req *)reply.param; 155 int ret; 156 157 if (req->t != REQ_TYPE_SYNC) { 158 RTE_LOG(ERR, EAL, "Unexpected request from primary\n"); 159 return -1; 160 } 161 162 memset(&reply, 0, sizeof(reply)); 163 164 reply.num_fds = 0; 165 strlcpy(reply.name, msg->name, sizeof(reply.name)); 166 reply.len_param = sizeof(*resp); 167 168 ret = eal_memalloc_sync_with_primary(); 169 170 resp->t = REQ_TYPE_SYNC; 171 resp->id = req->id; 172 resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL; 173 174 rte_mp_reply(&reply, peer); 175 176 return 0; 177 } 178 179 static int 180 handle_free_request(const struct malloc_mp_req *m) 181 { 182 const struct rte_memseg_list *msl; 183 void *start, *end; 184 size_t len; 185 186 len = m->free_req.len; 187 start = m->free_req.addr; 188 end = RTE_PTR_ADD(start, len - 1); 189 190 /* check if the requested memory actually exists */ 191 msl = rte_mem_virt2memseg_list(start); 192 if (msl == NULL) { 193 RTE_LOG(ERR, EAL, "Requested to free unknown memory\n"); 194 return -1; 195 } 196 197 /* check if end is within the same memory region */ 198 if (rte_mem_virt2memseg_list(end) != msl) { 199 RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n"); 200 return -1; 201 } 202 203 /* we're supposed to only free memory that's not external */ 204 if (msl->external) { 205 RTE_LOG(ERR, EAL, "Requested to free external memory\n"); 206 return -1; 207 } 208 209 /* now that we've validated the request, announce it */ 210 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE, 211 m->free_req.addr, m->free_req.len); 212 213 /* now, do the actual freeing */ 214 return malloc_heap_free_pages(m->free_req.addr, m->free_req.len); 215 } 216 217 static int 218 handle_alloc_request(const struct malloc_mp_req *m, 219 struct mp_request *req) 220 { 221 struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config; 222 const struct malloc_req_alloc *ar = &m->alloc_req; 223 struct malloc_heap *heap; 224 struct malloc_elem *elem; 225 struct rte_memseg **ms; 226 size_t alloc_sz; 227 int n_segs; 228 void *map_addr; 229 230 /* this is checked by the API, but we need to prevent divide by zero */ 231 if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) { 232 RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n"); 233 return -1; 234 } 235 236 /* heap idx is index into the heap array, not socket ID */ 237 if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) { 238 RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n"); 239 return -1; 240 } 241 242 heap = &mcfg->malloc_heaps[ar->malloc_heap_idx]; 243 244 /* 245 * for allocations, we must only use internal heaps, but since the 246 * rte_malloc_heap_socket_is_external() is thread-safe and we're already 247 * read-locked, we'll have to take advantage of the fact that internal 248 * socket ID's are always lower than RTE_MAX_NUMA_NODES. 249 */ 250 if (heap->socket_id >= RTE_MAX_NUMA_NODES) { 251 RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n"); 252 return -1; 253 } 254 255 alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size + 256 MALLOC_ELEM_TRAILER_LEN, ar->page_sz); 257 n_segs = alloc_sz / ar->page_sz; 258 259 /* we can't know in advance how many pages we'll need, so we malloc */ 260 ms = malloc(sizeof(*ms) * n_segs); 261 if (ms == NULL) { 262 RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n"); 263 return -1; 264 } 265 memset(ms, 0, sizeof(*ms) * n_segs); 266 267 elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket, 268 ar->flags, ar->align, ar->bound, ar->contig, ms, 269 n_segs); 270 271 if (elem == NULL) 272 goto fail; 273 274 map_addr = ms[0]->addr; 275 276 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz); 277 278 /* we have succeeded in allocating memory, but we still need to sync 279 * with other processes. however, since DPDK IPC is single-threaded, we 280 * send an asynchronous request and exit this callback. 281 */ 282 283 req->alloc_state.ms = ms; 284 req->alloc_state.ms_len = n_segs; 285 req->alloc_state.map_addr = map_addr; 286 req->alloc_state.map_len = alloc_sz; 287 req->alloc_state.elem = elem; 288 req->alloc_state.heap = heap; 289 290 return 0; 291 fail: 292 free(ms); 293 return -1; 294 } 295 296 /* first stage of primary handling requests from secondary */ 297 static int 298 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused) 299 { 300 const struct malloc_mp_req *m = 301 (const struct malloc_mp_req *)msg->param; 302 struct mp_request *entry; 303 int ret; 304 305 /* lock access to request */ 306 pthread_mutex_lock(&mp_request_list.lock); 307 308 /* make sure it's not a dupe */ 309 entry = find_request_by_id(m->id); 310 if (entry != NULL) { 311 RTE_LOG(ERR, EAL, "Duplicate request id\n"); 312 goto fail; 313 } 314 315 entry = malloc(sizeof(*entry)); 316 if (entry == NULL) { 317 RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n"); 318 goto fail; 319 } 320 321 /* erase all data */ 322 memset(entry, 0, sizeof(*entry)); 323 324 if (m->t == REQ_TYPE_ALLOC) { 325 ret = handle_alloc_request(m, entry); 326 } else if (m->t == REQ_TYPE_FREE) { 327 ret = handle_free_request(m); 328 } else { 329 RTE_LOG(ERR, EAL, "Unexpected request from secondary\n"); 330 goto fail; 331 } 332 333 if (ret != 0) { 334 struct rte_mp_msg resp_msg; 335 struct malloc_mp_req *resp = 336 (struct malloc_mp_req *)resp_msg.param; 337 338 /* send failure message straight away */ 339 resp_msg.num_fds = 0; 340 resp_msg.len_param = sizeof(*resp); 341 strlcpy(resp_msg.name, MP_ACTION_RESPONSE, 342 sizeof(resp_msg.name)); 343 344 resp->t = m->t; 345 resp->result = REQ_RESULT_FAIL; 346 resp->id = m->id; 347 348 if (rte_mp_sendmsg(&resp_msg)) { 349 RTE_LOG(ERR, EAL, "Couldn't send response\n"); 350 goto fail; 351 } 352 /* we did not modify the request */ 353 free(entry); 354 } else { 355 struct rte_mp_msg sr_msg; 356 struct malloc_mp_req *sr = 357 (struct malloc_mp_req *)sr_msg.param; 358 struct timespec ts; 359 360 memset(&sr_msg, 0, sizeof(sr_msg)); 361 362 /* we can do something, so send sync request asynchronously */ 363 sr_msg.num_fds = 0; 364 sr_msg.len_param = sizeof(*sr); 365 strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name)); 366 367 ts.tv_nsec = 0; 368 ts.tv_sec = MP_TIMEOUT_S; 369 370 /* sync requests carry no data */ 371 sr->t = REQ_TYPE_SYNC; 372 sr->id = m->id; 373 374 /* there may be stray timeout still waiting */ 375 do { 376 ret = rte_mp_request_async(&sr_msg, &ts, 377 handle_sync_response); 378 } while (ret != 0 && rte_errno == EEXIST); 379 if (ret != 0) { 380 RTE_LOG(ERR, EAL, "Couldn't send sync request\n"); 381 if (m->t == REQ_TYPE_ALLOC) 382 free(entry->alloc_state.ms); 383 goto fail; 384 } 385 386 /* mark request as in progress */ 387 memcpy(&entry->user_req, m, sizeof(*m)); 388 entry->state = REQ_STATE_ACTIVE; 389 390 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next); 391 } 392 pthread_mutex_unlock(&mp_request_list.lock); 393 return 0; 394 fail: 395 pthread_mutex_unlock(&mp_request_list.lock); 396 free(entry); 397 return -1; 398 } 399 400 /* callback for asynchronous sync requests for primary. this will either do a 401 * sendmsg with results, or trigger rollback request. 402 */ 403 static int 404 handle_sync_response(const struct rte_mp_msg *request, 405 const struct rte_mp_reply *reply) 406 { 407 enum malloc_req_result result; 408 struct mp_request *entry; 409 const struct malloc_mp_req *mpreq = 410 (const struct malloc_mp_req *)request->param; 411 int i; 412 413 /* lock the request */ 414 pthread_mutex_lock(&mp_request_list.lock); 415 416 entry = find_request_by_id(mpreq->id); 417 if (entry == NULL) { 418 RTE_LOG(ERR, EAL, "Wrong request ID\n"); 419 goto fail; 420 } 421 422 result = REQ_RESULT_SUCCESS; 423 424 if (reply->nb_received != reply->nb_sent) 425 result = REQ_RESULT_FAIL; 426 427 for (i = 0; i < reply->nb_received; i++) { 428 struct malloc_mp_req *resp = 429 (struct malloc_mp_req *)reply->msgs[i].param; 430 431 if (resp->t != REQ_TYPE_SYNC) { 432 RTE_LOG(ERR, EAL, "Unexpected response to sync request\n"); 433 result = REQ_RESULT_FAIL; 434 break; 435 } 436 if (resp->id != entry->user_req.id) { 437 RTE_LOG(ERR, EAL, "Response to wrong sync request\n"); 438 result = REQ_RESULT_FAIL; 439 break; 440 } 441 if (resp->result == REQ_RESULT_FAIL) { 442 result = REQ_RESULT_FAIL; 443 break; 444 } 445 } 446 447 if (entry->user_req.t == REQ_TYPE_FREE) { 448 struct rte_mp_msg msg; 449 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param; 450 451 memset(&msg, 0, sizeof(msg)); 452 453 /* this is a free request, just sendmsg result */ 454 resp->t = REQ_TYPE_FREE; 455 resp->result = result; 456 resp->id = entry->user_req.id; 457 msg.num_fds = 0; 458 msg.len_param = sizeof(*resp); 459 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name)); 460 461 if (rte_mp_sendmsg(&msg)) 462 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n"); 463 464 TAILQ_REMOVE(&mp_request_list.list, entry, next); 465 free(entry); 466 } else if (entry->user_req.t == REQ_TYPE_ALLOC && 467 result == REQ_RESULT_SUCCESS) { 468 struct malloc_heap *heap = entry->alloc_state.heap; 469 struct rte_mp_msg msg; 470 struct malloc_mp_req *resp = 471 (struct malloc_mp_req *)msg.param; 472 473 memset(&msg, 0, sizeof(msg)); 474 475 heap->total_size += entry->alloc_state.map_len; 476 477 /* result is success, so just notify secondary about this */ 478 resp->t = REQ_TYPE_ALLOC; 479 resp->result = result; 480 resp->id = entry->user_req.id; 481 msg.num_fds = 0; 482 msg.len_param = sizeof(*resp); 483 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name)); 484 485 if (rte_mp_sendmsg(&msg)) 486 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n"); 487 488 TAILQ_REMOVE(&mp_request_list.list, entry, next); 489 free(entry->alloc_state.ms); 490 free(entry); 491 } else if (entry->user_req.t == REQ_TYPE_ALLOC && 492 result == REQ_RESULT_FAIL) { 493 struct rte_mp_msg rb_msg; 494 struct malloc_mp_req *rb = 495 (struct malloc_mp_req *)rb_msg.param; 496 struct timespec ts; 497 struct primary_alloc_req_state *state = 498 &entry->alloc_state; 499 int ret; 500 501 memset(&rb_msg, 0, sizeof(rb_msg)); 502 503 /* we've failed to sync, so do a rollback */ 504 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE, 505 state->map_addr, state->map_len); 506 507 rollback_expand_heap(state->ms, state->ms_len, state->elem, 508 state->map_addr, state->map_len); 509 510 /* send rollback request */ 511 rb_msg.num_fds = 0; 512 rb_msg.len_param = sizeof(*rb); 513 strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name)); 514 515 ts.tv_nsec = 0; 516 ts.tv_sec = MP_TIMEOUT_S; 517 518 /* sync requests carry no data */ 519 rb->t = REQ_TYPE_SYNC; 520 rb->id = entry->user_req.id; 521 522 /* there may be stray timeout still waiting */ 523 do { 524 ret = rte_mp_request_async(&rb_msg, &ts, 525 handle_rollback_response); 526 } while (ret != 0 && rte_errno == EEXIST); 527 if (ret != 0) { 528 RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n"); 529 530 /* we couldn't send rollback request, but that's OK - 531 * secondary will time out, and memory has been removed 532 * from heap anyway. 533 */ 534 TAILQ_REMOVE(&mp_request_list.list, entry, next); 535 free(state->ms); 536 free(entry); 537 goto fail; 538 } 539 } else { 540 RTE_LOG(ERR, EAL, " to sync request of unknown type\n"); 541 goto fail; 542 } 543 544 pthread_mutex_unlock(&mp_request_list.lock); 545 return 0; 546 fail: 547 pthread_mutex_unlock(&mp_request_list.lock); 548 return -1; 549 } 550 551 static int 552 handle_rollback_response(const struct rte_mp_msg *request, 553 const struct rte_mp_reply *reply __rte_unused) 554 { 555 struct rte_mp_msg msg; 556 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param; 557 const struct malloc_mp_req *mpreq = 558 (const struct malloc_mp_req *)request->param; 559 struct mp_request *entry; 560 561 /* lock the request */ 562 pthread_mutex_lock(&mp_request_list.lock); 563 564 memset(&msg, 0, sizeof(msg)); 565 566 entry = find_request_by_id(mpreq->id); 567 if (entry == NULL) { 568 RTE_LOG(ERR, EAL, "Wrong request ID\n"); 569 goto fail; 570 } 571 572 if (entry->user_req.t != REQ_TYPE_ALLOC) { 573 RTE_LOG(ERR, EAL, "Unexpected active request\n"); 574 goto fail; 575 } 576 577 /* we don't care if rollback succeeded, request still failed */ 578 resp->t = REQ_TYPE_ALLOC; 579 resp->result = REQ_RESULT_FAIL; 580 resp->id = mpreq->id; 581 msg.num_fds = 0; 582 msg.len_param = sizeof(*resp); 583 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name)); 584 585 if (rte_mp_sendmsg(&msg)) 586 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n"); 587 588 /* clean up */ 589 TAILQ_REMOVE(&mp_request_list.list, entry, next); 590 free(entry->alloc_state.ms); 591 free(entry); 592 593 pthread_mutex_unlock(&mp_request_list.lock); 594 return 0; 595 fail: 596 pthread_mutex_unlock(&mp_request_list.lock); 597 return -1; 598 } 599 600 /* final stage of the request from secondary */ 601 static int 602 handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused) 603 { 604 const struct malloc_mp_req *m = 605 (const struct malloc_mp_req *)msg->param; 606 struct mp_request *entry; 607 608 pthread_mutex_lock(&mp_request_list.lock); 609 610 entry = find_request_by_id(m->id); 611 if (entry != NULL) { 612 /* update request status */ 613 entry->user_req.result = m->result; 614 615 entry->state = REQ_STATE_COMPLETE; 616 617 /* trigger thread wakeup */ 618 pthread_cond_signal(&entry->cond); 619 } 620 621 pthread_mutex_unlock(&mp_request_list.lock); 622 623 return 0; 624 } 625 626 /* synchronously request memory map sync, this is only called whenever primary 627 * process initiates the allocation. 628 */ 629 int 630 request_sync(void) 631 { 632 struct rte_mp_msg msg; 633 struct rte_mp_reply reply; 634 struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param; 635 struct timespec ts; 636 int i, ret = -1; 637 638 memset(&msg, 0, sizeof(msg)); 639 memset(&reply, 0, sizeof(reply)); 640 641 /* no need to create tailq entries as this is entirely synchronous */ 642 643 msg.num_fds = 0; 644 msg.len_param = sizeof(*req); 645 strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name)); 646 647 /* sync request carries no data */ 648 req->t = REQ_TYPE_SYNC; 649 req->id = get_unique_id(); 650 651 ts.tv_nsec = 0; 652 ts.tv_sec = MP_TIMEOUT_S; 653 654 /* there may be stray timeout still waiting */ 655 do { 656 ret = rte_mp_request_sync(&msg, &reply, &ts); 657 } while (ret != 0 && rte_errno == EEXIST); 658 if (ret != 0) { 659 /* if IPC is unsupported, behave as if the call succeeded */ 660 if (rte_errno != ENOTSUP) 661 RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n"); 662 else 663 ret = 0; 664 goto out; 665 } 666 667 if (reply.nb_received != reply.nb_sent) { 668 RTE_LOG(ERR, EAL, "Not all secondaries have responded\n"); 669 goto out; 670 } 671 672 for (i = 0; i < reply.nb_received; i++) { 673 struct malloc_mp_req *resp = 674 (struct malloc_mp_req *)reply.msgs[i].param; 675 if (resp->t != REQ_TYPE_SYNC) { 676 RTE_LOG(ERR, EAL, "Unexpected response from secondary\n"); 677 goto out; 678 } 679 if (resp->id != req->id) { 680 RTE_LOG(ERR, EAL, "Wrong request ID\n"); 681 goto out; 682 } 683 if (resp->result != REQ_RESULT_SUCCESS) { 684 RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n"); 685 goto out; 686 } 687 } 688 689 ret = 0; 690 out: 691 free(reply.msgs); 692 return ret; 693 } 694 695 /* this is a synchronous wrapper around a bunch of asynchronous requests to 696 * primary process. this will initiate a request and wait until responses come. 697 */ 698 int 699 request_to_primary(struct malloc_mp_req *user_req) 700 { 701 struct rte_mp_msg msg; 702 struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param; 703 struct mp_request *entry; 704 struct timespec ts; 705 struct timeval now; 706 int ret; 707 708 memset(&msg, 0, sizeof(msg)); 709 memset(&ts, 0, sizeof(ts)); 710 711 pthread_mutex_lock(&mp_request_list.lock); 712 713 entry = malloc(sizeof(*entry)); 714 if (entry == NULL) { 715 RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n"); 716 goto fail; 717 } 718 719 memset(entry, 0, sizeof(*entry)); 720 721 if (gettimeofday(&now, NULL) < 0) { 722 RTE_LOG(ERR, EAL, "Cannot get current time\n"); 723 goto fail; 724 } 725 726 ts.tv_nsec = (now.tv_usec * 1000) % 1000000000; 727 ts.tv_sec = now.tv_sec + MP_TIMEOUT_S + 728 (now.tv_usec * 1000) / 1000000000; 729 730 /* initialize the request */ 731 pthread_cond_init(&entry->cond, NULL); 732 733 msg.num_fds = 0; 734 msg.len_param = sizeof(*msg_req); 735 strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name)); 736 737 /* (attempt to) get a unique id */ 738 user_req->id = get_unique_id(); 739 740 /* copy contents of user request into the message */ 741 memcpy(msg_req, user_req, sizeof(*msg_req)); 742 743 if (rte_mp_sendmsg(&msg)) { 744 RTE_LOG(ERR, EAL, "Cannot send message to primary\n"); 745 goto fail; 746 } 747 748 /* copy contents of user request into active request */ 749 memcpy(&entry->user_req, user_req, sizeof(*user_req)); 750 751 /* mark request as in progress */ 752 entry->state = REQ_STATE_ACTIVE; 753 754 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next); 755 756 /* finally, wait on timeout */ 757 do { 758 ret = pthread_cond_timedwait(&entry->cond, 759 &mp_request_list.lock, &ts); 760 } while (ret != 0 && ret != ETIMEDOUT); 761 762 if (entry->state != REQ_STATE_COMPLETE) { 763 RTE_LOG(ERR, EAL, "Request timed out\n"); 764 ret = -1; 765 } else { 766 ret = 0; 767 user_req->result = entry->user_req.result; 768 } 769 TAILQ_REMOVE(&mp_request_list.list, entry, next); 770 free(entry); 771 772 pthread_mutex_unlock(&mp_request_list.lock); 773 return ret; 774 fail: 775 pthread_mutex_unlock(&mp_request_list.lock); 776 free(entry); 777 return -1; 778 } 779 780 int 781 register_mp_requests(void) 782 { 783 if (rte_eal_process_type() == RTE_PROC_PRIMARY) { 784 /* it's OK for primary to not support IPC */ 785 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) && 786 rte_errno != ENOTSUP) { 787 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n", 788 MP_ACTION_REQUEST); 789 return -1; 790 } 791 } else { 792 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) { 793 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n", 794 MP_ACTION_SYNC); 795 return -1; 796 } 797 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) { 798 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n", 799 MP_ACTION_SYNC); 800 return -1; 801 } 802 if (rte_mp_action_register(MP_ACTION_RESPONSE, 803 handle_response)) { 804 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n", 805 MP_ACTION_RESPONSE); 806 return -1; 807 } 808 } 809 return 0; 810 } 811