/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <string.h>
#include <sys/time.h>

#include <rte_alarm.h>
#include <rte_errno.h>
#include <rte_string_fns.h>

#include "eal_memalloc.h"
#include "eal_memcfg.h"
#include "eal_private.h"

#include "malloc_elem.h"
#include "malloc_mp.h"

#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to notify of changes in memory map. this is
 * essentially a regular sync request, but we cannot send sync requests while
 * another one is in progress, and we might have to - therefore, we do this as
 * a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */

/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */

/* when we're allocating, we need to store some state to ensure that we can
 * roll back later
 */
struct primary_alloc_req_state {
	struct malloc_heap *heap;
	struct rte_memseg **ms;
	int ms_len;
	struct malloc_elem *elem;
	void *map_addr;
	size_t map_len;
};

enum req_state {
	REQ_STATE_INACTIVE = 0,
	REQ_STATE_ACTIVE,
	REQ_STATE_COMPLETE
};

struct mp_request {
	TAILQ_ENTRY(mp_request) next;
	struct malloc_mp_req user_req; /**< contents of request */
	pthread_cond_t cond; /**< variable we use to time out on this request */
	enum req_state state; /**< indicate status of this request */
	struct primary_alloc_req_state alloc_state;
};

/*
 * We could've used just a single request, but it may be possible for
 * secondaries to timeout earlier than the primary, and send a new request while
 * primary is still expecting replies to the old one. Therefore, each new
 * request will get assigned a new ID, which is how we will distinguish between
 * expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);
static struct {
	struct mp_request_list list;
	pthread_mutex_t lock;
} mp_request_list = {
	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
	.lock = PTHREAD_MUTEX_INITIALIZER
};

/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 * - if allocation failed straight away
 * - if allocation and sync request succeeded
 * - if allocation succeeded, sync request failed, allocation rolled back and
 *   rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
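
/*
 * Illustrative secondary-side call sequence for the allocation workflow above.
 * This is a sketch only - variable names are placeholders, field names follow
 * their use elsewhere in this file:
 *
 *	struct malloc_mp_req req;
 *
 *	memset(&req, 0, sizeof(req));
 *	req.t = REQ_TYPE_ALLOC;
 *	req.alloc_req.page_sz = pg_sz;
 *	req.alloc_req.elt_size = elt_size;
 *	req.alloc_req.malloc_heap_idx = heap_idx;
 *	... (socket, flags, align, bound, contig likewise)
 *
 *	if (request_to_primary(&req) == 0 &&
 *			req.result == REQ_RESULT_SUCCESS)
 *		... the new memory is now mapped in this process ...
 */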

static struct mp_request *
find_request_by_id(uint64_t id)
{
	struct mp_request *req;
	TAILQ_FOREACH(req, &mp_request_list.list, next) {
		if (req->user_req.id == id)
			break;
	}
	return req;
}

/* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
static uint64_t
get_unique_id(void)
{
	uint64_t id;
	do {
		id = rte_rand();
	} while (find_request_by_id(id) != NULL);
	return id;
}

/* secondary will respond to sync requests thusly */
static int
handle_sync(const struct rte_mp_msg *msg, const void *peer)
{
	struct rte_mp_msg reply;
	const struct malloc_mp_req *req =
			(const struct malloc_mp_req *)msg->param;
	struct malloc_mp_req *resp =
			(struct malloc_mp_req *)reply.param;
	int ret;

	if (req->t != REQ_TYPE_SYNC) {
		RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
		return -1;
	}

	memset(&reply, 0, sizeof(reply));

	reply.num_fds = 0;
	strlcpy(reply.name, msg->name, sizeof(reply.name));
	reply.len_param = sizeof(*resp);

	ret = eal_memalloc_sync_with_primary();

	resp->t = REQ_TYPE_SYNC;
	resp->id = req->id;
	resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;

	return rte_mp_reply(&reply, peer);
}

static int
handle_free_request(const struct malloc_mp_req *m)
{
	const struct rte_memseg_list *msl;
	void *start, *end;
	size_t len;

	len = m->free_req.len;
	start = m->free_req.addr;
	end = RTE_PTR_ADD(start, len - 1);

	/* check if the requested memory actually exists */
	msl = rte_mem_virt2memseg_list(start);
	if (msl == NULL) {
		RTE_LOG(ERR, EAL, "Requested to free unknown memory\n");
		return -1;
	}

	/* check if end is within the same memory region */
	if (rte_mem_virt2memseg_list(end) != msl) {
		RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n");
		return -1;
	}

	/* we're supposed to only free memory that's not external */
	if (msl->external) {
		RTE_LOG(ERR, EAL, "Requested to free external memory\n");
		return -1;
	}

	/* now that we've validated the request, announce it */
	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
			m->free_req.addr, m->free_req.len);

	/* now, do the actual freeing */
	return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
}

static int
handle_alloc_request(const struct malloc_mp_req *m,
		struct mp_request *req)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	const struct malloc_req_alloc *ar = &m->alloc_req;
	struct malloc_heap *heap;
	struct malloc_elem *elem;
	struct rte_memseg **ms;
	size_t alloc_sz;
	int n_segs;
	void *map_addr;

	/* this is checked by the API, but we need to prevent divide by zero */
	if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
		RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n");
		return -1;
	}

	/* heap idx is index into the heap array, not socket ID */
	if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
		RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n");
		return -1;
	}

	heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];

	/*
	 * for allocations, we must only use internal heaps, but since the
	 * rte_malloc_heap_socket_is_external() is thread-safe and we're already
	 * read-locked, we'll have to take advantage of the fact that internal
	 * socket ID's are always lower than RTE_MAX_NUMA_NODES.
	 */
	if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
		RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n");
		return -1;
	}

	alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
			MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
	n_segs = alloc_sz / ar->page_sz;
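	/* for example, a 1 MB element with 64-byte alignment on 2 MB pages
	 * rounds up to alloc_sz == 2 MB, so n_segs == 1 (the element trailer
	 * is at most a cache line and does not change the result).
	 */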

	/* we can't know in advance how many pages we'll need, so we malloc */
	ms = malloc(sizeof(*ms) * n_segs);
	if (ms == NULL) {
		RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
		return -1;
	}
	memset(ms, 0, sizeof(*ms) * n_segs);

	elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
			ar->flags, ar->align, ar->bound, ar->contig, ms,
			n_segs);

	if (elem == NULL)
		goto fail;

	map_addr = ms[0]->addr;

	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);

	/* we have succeeded in allocating memory, but we still need to sync
	 * with other processes. however, since DPDK IPC is single-threaded, we
	 * send an asynchronous request and exit this callback.
	 */

	req->alloc_state.ms = ms;
	req->alloc_state.ms_len = n_segs;
	req->alloc_state.map_addr = map_addr;
	req->alloc_state.map_len = alloc_sz;
	req->alloc_state.elem = elem;
	req->alloc_state.heap = heap;

	return 0;
fail:
	free(ms);
	return -1;
}

/* first stage of primary handling requests from secondary */
static int
handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;
	int ret;

	/* lock access to request */
	pthread_mutex_lock(&mp_request_list.lock);

	/* make sure it's not a dupe */
	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		RTE_LOG(ERR, EAL, "Duplicate request id\n");
		goto fail;
	}

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
		goto fail;
	}

	/* erase all data */
	memset(entry, 0, sizeof(*entry));

	if (m->t == REQ_TYPE_ALLOC) {
		ret = handle_alloc_request(m, entry);
	} else if (m->t == REQ_TYPE_FREE) {
		ret = handle_free_request(m);
	} else {
		RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
		goto fail;
	}

	if (ret != 0) {
		struct rte_mp_msg resp_msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)resp_msg.param;

		/* send failure message straight away */
		resp_msg.num_fds = 0;
		resp_msg.len_param = sizeof(*resp);
		strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
				sizeof(resp_msg.name));

		resp->t = m->t;
		resp->result = REQ_RESULT_FAIL;
		resp->id = m->id;

		if (rte_mp_sendmsg(&resp_msg)) {
			RTE_LOG(ERR, EAL, "Couldn't send response\n");
			goto fail;
		}
		/* we did not modify the request */
		free(entry);
	} else {
		struct rte_mp_msg sr_msg;
		struct malloc_mp_req *sr =
				(struct malloc_mp_req *)sr_msg.param;
		struct timespec ts;

		memset(&sr_msg, 0, sizeof(sr_msg));

		/* we can do something, so send sync request asynchronously */
		sr_msg.num_fds = 0;
		sr_msg.len_param = sizeof(*sr);
		strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		sr->t = REQ_TYPE_SYNC;
		sr->id = m->id;

		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&sr_msg, &ts,
					handle_sync_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
			if (m->t == REQ_TYPE_ALLOC)
				free(entry->alloc_state.ms);
			goto fail;
		}

		/* mark request as in progress */
		memcpy(&entry->user_req, m, sizeof(*m));
		entry->state = REQ_STATE_ACTIVE;

		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
	}
	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

/* callback for asynchronous sync requests for primary. this will either do a
 * sendmsg with results, or trigger rollback request.
 */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply)
{
	enum malloc_req_result result;
	struct mp_request *entry;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	int i;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	result = REQ_RESULT_SUCCESS;

	if (reply->nb_received != reply->nb_sent)
		result = REQ_RESULT_FAIL;

	for (i = 0; i < reply->nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply->msgs[i].param;

		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->id != entry->user_req.id) {
			RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->result == REQ_RESULT_FAIL) {
			result = REQ_RESULT_FAIL;
			break;
		}
	}

	if (entry->user_req.t == REQ_TYPE_FREE) {
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		/* this is a free request, just sendmsg result */
		resp->t = REQ_TYPE_FREE;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_SUCCESS) {
		struct malloc_heap *heap = entry->alloc_state.heap;
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		heap->total_size += entry->alloc_state.map_len;

		/* result is success, so just notify secondary about this */
		resp->t = REQ_TYPE_ALLOC;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry->alloc_state.ms);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_FAIL) {
		struct rte_mp_msg rb_msg;
		struct malloc_mp_req *rb =
				(struct malloc_mp_req *)rb_msg.param;
		struct timespec ts;
		struct primary_alloc_req_state *state =
				&entry->alloc_state;
		int ret;

		memset(&rb_msg, 0, sizeof(rb_msg));

		/* we've failed to sync, so do a rollback */
		eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
				state->map_addr, state->map_len);

		rollback_expand_heap(state->ms, state->ms_len, state->elem,
				state->map_addr, state->map_len);

		/* send rollback request */
		rb_msg.num_fds = 0;
		rb_msg.len_param = sizeof(*rb);
		strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		rb->t = REQ_TYPE_SYNC;
		rb->id = entry->user_req.id;

		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&rb_msg, &ts,
					handle_rollback_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");

			/* we couldn't send rollback request, but that's OK -
			 * secondary will time out, and memory has been removed
			 * from heap anyway.
			 */
			TAILQ_REMOVE(&mp_request_list.list, entry, next);
			free(state->ms);
			free(entry);
			goto fail;
		}
	} else {
		RTE_LOG(ERR, EAL, "Response to sync request of unknown type\n");
		goto fail;
	}

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply __rte_unused)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	struct mp_request *entry;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	memset(&msg, 0, sizeof(msg));

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	if (entry->user_req.t != REQ_TYPE_ALLOC) {
		RTE_LOG(ERR, EAL, "Unexpected active request\n");
		goto fail;
	}

	/* we don't care if rollback succeeded, request still failed */
	resp->t = REQ_TYPE_ALLOC;
	resp->result = REQ_RESULT_FAIL;
	resp->id = mpreq->id;
	msg.num_fds = 0;
	msg.len_param = sizeof(*resp);
	strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

	if (rte_mp_sendmsg(&msg))
		RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

	/* clean up */
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry->alloc_state.ms);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

/* final stage of the request from secondary */
static int
handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		/* update request status */
		entry->user_req.result = m->result;

		entry->state = REQ_STATE_COMPLETE;

		/* trigger thread wakeup */
		pthread_cond_signal(&entry->cond);
	}

	pthread_mutex_unlock(&mp_request_list.lock);

	return 0;
}

/* synchronously request memory map sync, this is only called whenever primary
 * process initiates the allocation.
 */
int
request_sync(void)
{
	struct rte_mp_msg msg;
	struct rte_mp_reply reply;
	struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
	struct timespec ts;
	int i, ret = -1;

	memset(&msg, 0, sizeof(msg));
	memset(&reply, 0, sizeof(reply));

	/* no need to create tailq entries as this is entirely synchronous */

	msg.num_fds = 0;
	msg.len_param = sizeof(*req);
	strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));

	/* sync request carries no data */
	req->t = REQ_TYPE_SYNC;
	req->id = get_unique_id();

	ts.tv_nsec = 0;
	ts.tv_sec = MP_TIMEOUT_S;

	/* there may be stray timeout still waiting */
	do {
		ret = rte_mp_request_sync(&msg, &reply, &ts);
	} while (ret != 0 && rte_errno == EEXIST);
	if (ret != 0) {
		/* if IPC is unsupported, behave as if the call succeeded */
		if (rte_errno != ENOTSUP)
			RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
		else
			ret = 0;
		goto out;
	}

	if (reply.nb_received != reply.nb_sent) {
		RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
		goto out;
	}

	for (i = 0; i < reply.nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply.msgs[i].param;
		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
			goto out;
		}
		if (resp->id != req->id) {
			RTE_LOG(ERR, EAL, "Wrong request ID\n");
			goto out;
		}
		if (resp->result != REQ_RESULT_SUCCESS) {
			RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
			goto out;
		}
	}

	ret = 0;
out:
	free(reply.msgs);
	return ret;
}
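
/* note: request_sync() above is driven from the primary's own heap expansion
 * path (see malloc_heap.c), after the new pages have been mapped locally, so
 * that all secondaries pick up the new memory before the allocation completes.
 */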

/* this is a synchronous wrapper around a bunch of asynchronous requests to
 * primary process. this will initiate a request and wait until responses come.
 */
int
request_to_primary(struct malloc_mp_req *user_req)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
	struct mp_request *entry;
	struct timespec ts;
	struct timeval now;
	int ret;

	memset(&msg, 0, sizeof(msg));
	memset(&ts, 0, sizeof(ts));

	pthread_mutex_lock(&mp_request_list.lock);

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
		goto fail;
	}

	memset(entry, 0, sizeof(*entry));

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto fail;
	}

	ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
	ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
			(now.tv_usec * 1000) / 1000000000;

	/* initialize the request */
	pthread_cond_init(&entry->cond, NULL);

	msg.num_fds = 0;
	msg.len_param = sizeof(*msg_req);
	strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));

	/* (attempt to) get a unique id */
	user_req->id = get_unique_id();

	/* copy contents of user request into the message */
	memcpy(msg_req, user_req, sizeof(*msg_req));

	if (rte_mp_sendmsg(&msg)) {
		RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
		goto fail;
	}

	/* copy contents of user request into active request */
	memcpy(&entry->user_req, user_req, sizeof(*user_req));

	/* mark request as in progress */
	entry->state = REQ_STATE_ACTIVE;

	TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);

	/* finally, wait on timeout */
	do {
		ret = pthread_cond_timedwait(&entry->cond,
				&mp_request_list.lock, &ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	if (entry->state != REQ_STATE_COMPLETE) {
		RTE_LOG(ERR, EAL, "Request timed out\n");
		ret = -1;
	} else {
		ret = 0;
		user_req->result = entry->user_req.result;
	}
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

int
register_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		/* it's OK for primary to not support IPC */
		if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
				rte_errno != ENOTSUP) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_REQUEST);
			return -1;
		}
	} else {
		if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_SYNC);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_ROLLBACK);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_RESPONSE,
				handle_response)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_RESPONSE);
			return -1;
		}
	}
	return 0;
}
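
/* register_mp_requests() is expected to be called once, from the EAL's malloc
 * heap initialization, before any allocation requests can be exchanged between
 * the primary and secondary processes.
 */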