xref: /dpdk/lib/eal/common/malloc_mp.c (revision 99a2dd955fba6e4cc23b77d590a033650ced9c45)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation
3  */
4 
5 #include <string.h>
6 #include <sys/time.h>
7 
8 #include <rte_alarm.h>
9 #include <rte_errno.h>
10 #include <rte_string_fns.h>
11 
12 #include "eal_memalloc.h"
13 #include "eal_memcfg.h"
14 #include "eal_private.h"
15 
16 #include "malloc_elem.h"
17 #include "malloc_mp.h"
18 
19 #define MP_ACTION_SYNC "mp_malloc_sync"
20 /**< request sent by primary process to notify of changes in memory map */
21 #define MP_ACTION_ROLLBACK "mp_malloc_rollback"
22 /**< request sent by primary process to ask secondaries to synchronize their
23  * memory maps after a rollback. this is essentially a regular sync request,
24  * but since we cannot send a sync request while another one is in progress
25  * and we may need to do so here, it is registered as a separate callback.
26  */
27 #define MP_ACTION_REQUEST "mp_malloc_request"
28 /**< request sent by secondary process to ask for allocation/deallocation */
29 #define MP_ACTION_RESPONSE "mp_malloc_response"
30 /**< response sent to secondary process to indicate result of request */
31 
32 /* forward declarations */
33 static int
34 handle_sync_response(const struct rte_mp_msg *request,
35 		const struct rte_mp_reply *reply);
36 static int
37 handle_rollback_response(const struct rte_mp_msg *request,
38 		const struct rte_mp_reply *reply);
39 
40 #define MP_TIMEOUT_S 5 /**< 5 second timeout */
41 
42 /* when we're allocating, we need to store some state to ensure that we can
43  * roll back later
44  */
45 struct primary_alloc_req_state {
46 	struct malloc_heap *heap; /**< heap the allocation was made on */
47 	struct rte_memseg **ms; /**< memsegs backing the allocation */
48 	int ms_len; /**< number of memsegs */
49 	struct malloc_elem *elem; /**< heap element created for the allocation */
50 	void *map_addr; /**< start address of the new mapping */
51 	size_t map_len; /**< length of the new mapping */
52 };
53 
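/* lifecycle of a request kept on the request list */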
54 enum req_state {
55 	REQ_STATE_INACTIVE = 0,
56 	REQ_STATE_ACTIVE,
57 	REQ_STATE_COMPLETE
58 };
59 
60 struct mp_request {
61 	TAILQ_ENTRY(mp_request) next;
62 	struct malloc_mp_req user_req; /**< contents of request */
63 	pthread_cond_t cond; /**< condition variable used to wait (with timeout) on this request */
64 	enum req_state state; /**< status of this request */
65 	struct primary_alloc_req_state alloc_state; /**< allocation state, kept by the primary for rollback */
66 };
67 
68 /*
69  * We could have used just a single request, but a secondary may time out
70  * earlier than the primary and send a new request while the primary is still
71  * expecting replies to the old one. Therefore, each new request gets assigned
72  * a new ID, which is how we distinguish between expected and unexpected
73  * messages.
74  */
75 TAILQ_HEAD(mp_request_list, mp_request);
76 static struct {
77 	struct mp_request_list list;
78 	pthread_mutex_t lock;
79 } mp_request_list = {
80 	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
81 	.lock = PTHREAD_MUTEX_INITIALIZER
82 };
83 
84 /**
85  * General workflow is the following:
86  *
87  * Allocation:
88  * S: send request to primary
89  * P: attempt to allocate memory
90  *    if failed, sendmsg failure
91  *    if success, send sync request
92  * S: if received msg of failure, quit
93  *    if received sync request, synchronize memory map and reply with result
94  * P: if received sync request result
95  *    if success, sendmsg success
96  *    if failure, roll back allocation and send a rollback request
97  * S: if received msg of success, quit
98  *    if received rollback request, synchronize memory map and reply with result
99  * P: if received sync request result
100  *    sendmsg sync request result
101  * S: if received msg, quit
102  *
103  * Aside from timeouts, there are three points where we can quit:
104  *  - if allocation failed straight away
105  *  - if allocation and sync request succeeded
106  *  - if allocation succeeded, sync request failed, allocation rolled back and
107  *    rollback request received (irrespective of whether it succeeded or failed)
108  *
109  * Deallocation:
110  * S: send request to primary
111  * P: attempt to deallocate memory
112  *    if failed, sendmsg failure
113  *    if success, send sync request
114  * S: if received msg of failure, quit
115  *    if received sync request, synchronize memory map and reply with result
116  * P: if received sync request result
117  *    sendmsg sync request result
118  * S: if received msg, quit
119  *
120  * There is no "rollback" from deallocation, as it's safe to have some memory
121  * mapped in some processes - it's absent from the heap, so it won't get used.
122  */
123 
124 static struct mp_request *
125 find_request_by_id(uint64_t id)
126 {
127 	struct mp_request *req;
128 	TAILQ_FOREACH(req, &mp_request_list.list, next) {
129 		if (req->user_req.id == id)
130 			break;
131 	}
132 	return req;
133 }
134 
135 /* generate a request ID that is not currently used by any pending request */
136 static uint64_t
137 get_unique_id(void)
138 {
139 	uint64_t id;
140 	do {
141 		id = rte_rand();
142 	} while (find_request_by_id(id) != NULL);
143 	return id;
144 }
145 
146 /* secondary responds to sync requests as follows */
147 static int
148 handle_sync(const struct rte_mp_msg *msg, const void *peer)
149 {
150 	struct rte_mp_msg reply;
151 	const struct malloc_mp_req *req =
152 			(const struct malloc_mp_req *)msg->param;
153 	struct malloc_mp_req *resp =
154 			(struct malloc_mp_req *)reply.param;
155 	int ret;
156 
157 	if (req->t != REQ_TYPE_SYNC) {
158 		RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
159 		return -1;
160 	}
161 
162 	memset(&reply, 0, sizeof(reply));
163 
164 	reply.num_fds = 0;
165 	strlcpy(reply.name, msg->name, sizeof(reply.name));
166 	reply.len_param = sizeof(*resp);
167 
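	/* synchronize this secondary's memory map with the primary's */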
168 	ret = eal_memalloc_sync_with_primary();
169 
170 	resp->t = REQ_TYPE_SYNC;
171 	resp->id = req->id;
172 	resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
173 
174 	rte_mp_reply(&reply, peer);
175 
176 	return 0;
177 }
178 
179 static int
180 handle_free_request(const struct malloc_mp_req *m)
181 {
182 	const struct rte_memseg_list *msl;
183 	void *start, *end;
184 	size_t len;
185 
186 	len = m->free_req.len;
187 	start = m->free_req.addr;
188 	end = RTE_PTR_ADD(start, len - 1);
189 
190 	/* check if the requested memory actually exists */
191 	msl = rte_mem_virt2memseg_list(start);
192 	if (msl == NULL) {
193 		RTE_LOG(ERR, EAL, "Requested to free unknown memory\n");
194 		return -1;
195 	}
196 
197 	/* check if end is within the same memory region */
198 	if (rte_mem_virt2memseg_list(end) != msl) {
199 		RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n");
200 		return -1;
201 	}
202 
203 	/* we're supposed to only free memory that's not external */
204 	if (msl->external) {
205 		RTE_LOG(ERR, EAL, "Requested to free external memory\n");
206 		return -1;
207 	}
208 
209 	/* now that we've validated the request, announce it */
210 	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
211 			m->free_req.addr, m->free_req.len);
212 
213 	/* now, do the actual freeing */
214 	return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
215 }
216 
217 static int
218 handle_alloc_request(const struct malloc_mp_req *m,
219 		struct mp_request *req)
220 {
221 	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
222 	const struct malloc_req_alloc *ar = &m->alloc_req;
223 	struct malloc_heap *heap;
224 	struct malloc_elem *elem;
225 	struct rte_memseg **ms;
226 	size_t alloc_sz;
227 	int n_segs;
228 	void *map_addr;
229 
230 	/* this is checked by the API, but we need to prevent divide by zero */
231 	if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
232 		RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n");
233 		return -1;
234 	}
235 
236 	/* heap idx is index into the heap array, not socket ID */
237 	if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
238 		RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n");
239 		return -1;
240 	}
241 
242 	heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];
243 
244 	/*
245 	 * for allocations, we must only use internal heaps. Since
246 	 * rte_malloc_heap_socket_is_external() takes the lock we already hold
247 	 * (we are read-locked here), we cannot call it and instead rely on the
248 	 * fact that internal socket IDs are always lower than RTE_MAX_NUMA_NODES.
249 	 */
250 	if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
251 		RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n");
252 		return -1;
253 	}
254 
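	/* worst case: element size plus alignment and trailer, rounded up to full pages */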
255 	alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
256 			MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
257 	n_segs = alloc_sz / ar->page_sz;
258 
259 	/* the number of segments isn't known at compile time, so allocate the array dynamically */
260 	ms = malloc(sizeof(*ms) * n_segs);
261 	if (ms == NULL) {
262 		RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
263 		return -1;
264 	}
265 	memset(ms, 0, sizeof(*ms) * n_segs);
266 
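	/* map new pages and add the resulting element to the heap */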
267 	elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
268 			ar->flags, ar->align, ar->bound, ar->contig, ms,
269 			n_segs);
270 
271 	if (elem == NULL)
272 		goto fail;
273 
274 	map_addr = ms[0]->addr;
275 
276 	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);
277 
278 	/* we have succeeded in allocating memory, but we still need to sync
279 	 * with other processes. however, since DPDK IPC is single-threaded, we
280 	 * send an asynchronous request and exit this callback.
281 	 */
282 
283 	req->alloc_state.ms = ms;
284 	req->alloc_state.ms_len = n_segs;
285 	req->alloc_state.map_addr = map_addr;
286 	req->alloc_state.map_len = alloc_sz;
287 	req->alloc_state.elem = elem;
288 	req->alloc_state.heap = heap;
289 
290 	return 0;
291 fail:
292 	free(ms);
293 	return -1;
294 }
295 
296 /* first stage of primary handling requests from secondary */
297 static int
298 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
299 {
300 	const struct malloc_mp_req *m =
301 			(const struct malloc_mp_req *)msg->param;
302 	struct mp_request *entry;
303 	int ret;
304 
305 	/* lock access to request */
306 	pthread_mutex_lock(&mp_request_list.lock);
307 
308 	/* make sure it's not a duplicate */
309 	entry = find_request_by_id(m->id);
310 	if (entry != NULL) {
311 		RTE_LOG(ERR, EAL, "Duplicate request id\n");
		entry = NULL; /* don't free the existing in-flight entry on the fail path */
312 		goto fail;
313 	}
314 
315 	entry = malloc(sizeof(*entry));
316 	if (entry == NULL) {
317 		RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
318 		goto fail;
319 	}
320 
321 	/* erase all data */
322 	memset(entry, 0, sizeof(*entry));
323 
324 	if (m->t == REQ_TYPE_ALLOC) {
325 		ret = handle_alloc_request(m, entry);
326 	} else if (m->t == REQ_TYPE_FREE) {
327 		ret = handle_free_request(m);
328 	} else {
329 		RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
330 		goto fail;
331 	}
332 
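	/* on failure, reply to the secondary right away; on success, start an async sync with all secondaries */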
333 	if (ret != 0) {
334 		struct rte_mp_msg resp_msg;
335 		struct malloc_mp_req *resp =
336 				(struct malloc_mp_req *)resp_msg.param;
337 
338 		/* send failure message straight away */
339 		resp_msg.num_fds = 0;
340 		resp_msg.len_param = sizeof(*resp);
341 		strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
342 				sizeof(resp_msg.name));
343 
344 		resp->t = m->t;
345 		resp->result = REQ_RESULT_FAIL;
346 		resp->id = m->id;
347 
348 		if (rte_mp_sendmsg(&resp_msg)) {
349 			RTE_LOG(ERR, EAL, "Couldn't send response\n");
350 			goto fail;
351 		}
352 		/* the entry was never added to the request list, so just free it */
353 		free(entry);
354 	} else {
355 		struct rte_mp_msg sr_msg;
356 		struct malloc_mp_req *sr =
357 				(struct malloc_mp_req *)sr_msg.param;
358 		struct timespec ts;
359 
360 		memset(&sr_msg, 0, sizeof(sr_msg));
361 
362 		/* the request was handled successfully, so send a sync request asynchronously */
363 		sr_msg.num_fds = 0;
364 		sr_msg.len_param = sizeof(*sr);
365 		strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));
366 
367 		ts.tv_nsec = 0;
368 		ts.tv_sec = MP_TIMEOUT_S;
369 
370 		/* sync requests carry no data */
371 		sr->t = REQ_TYPE_SYNC;
372 		sr->id = m->id;
373 
374 		/* a previous request may still be timing out, so retry on EEXIST */
375 		do {
376 			ret = rte_mp_request_async(&sr_msg, &ts,
377 					handle_sync_response);
378 		} while (ret != 0 && rte_errno == EEXIST);
379 		if (ret != 0) {
380 			RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
381 			if (m->t == REQ_TYPE_ALLOC)
382 				free(entry->alloc_state.ms);
383 			goto fail;
384 		}
385 
386 		/* mark request as in progress */
387 		memcpy(&entry->user_req, m, sizeof(*m));
388 		entry->state = REQ_STATE_ACTIVE;
389 
390 		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
391 	}
392 	pthread_mutex_unlock(&mp_request_list.lock);
393 	return 0;
394 fail:
395 	pthread_mutex_unlock(&mp_request_list.lock);
396 	free(entry);
397 	return -1;
398 }
399 
400 /* callback for asynchronous sync requests in the primary. this will either
401  * send the result back to the secondary, or trigger a rollback request.
402  */
403 static int
404 handle_sync_response(const struct rte_mp_msg *request,
405 		const struct rte_mp_reply *reply)
406 {
407 	enum malloc_req_result result;
408 	struct mp_request *entry;
409 	const struct malloc_mp_req *mpreq =
410 			(const struct malloc_mp_req *)request->param;
411 	int i;
412 
413 	/* lock the request */
414 	pthread_mutex_lock(&mp_request_list.lock);
415 
416 	entry = find_request_by_id(mpreq->id);
417 	if (entry == NULL) {
418 		RTE_LOG(ERR, EAL, "Wrong request ID\n");
419 		goto fail;
420 	}
421 
422 	result = REQ_RESULT_SUCCESS;
423 
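	/* the sync succeeds only if every secondary replied, and replied successfully */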
424 	if (reply->nb_received != reply->nb_sent)
425 		result = REQ_RESULT_FAIL;
426 
427 	for (i = 0; i < reply->nb_received; i++) {
428 		struct malloc_mp_req *resp =
429 				(struct malloc_mp_req *)reply->msgs[i].param;
430 
431 		if (resp->t != REQ_TYPE_SYNC) {
432 			RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
433 			result = REQ_RESULT_FAIL;
434 			break;
435 		}
436 		if (resp->id != entry->user_req.id) {
437 			RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
438 			result = REQ_RESULT_FAIL;
439 			break;
440 		}
441 		if (resp->result == REQ_RESULT_FAIL) {
442 			result = REQ_RESULT_FAIL;
443 			break;
444 		}
445 	}
446 
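	/* free requests have nothing to roll back, so just report the result */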
447 	if (entry->user_req.t == REQ_TYPE_FREE) {
448 		struct rte_mp_msg msg;
449 		struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
450 
451 		memset(&msg, 0, sizeof(msg));
452 
453 		/* this is a free request, just sendmsg result */
454 		resp->t = REQ_TYPE_FREE;
455 		resp->result = result;
456 		resp->id = entry->user_req.id;
457 		msg.num_fds = 0;
458 		msg.len_param = sizeof(*resp);
459 		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
460 
461 		if (rte_mp_sendmsg(&msg))
462 			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
463 
464 		TAILQ_REMOVE(&mp_request_list.list, entry, next);
465 		free(entry);
466 	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
467 			result == REQ_RESULT_SUCCESS) {
468 		struct malloc_heap *heap = entry->alloc_state.heap;
469 		struct rte_mp_msg msg;
470 		struct malloc_mp_req *resp =
471 				(struct malloc_mp_req *)msg.param;
472 
473 		memset(&msg, 0, sizeof(msg));
474 
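		/* all secondaries have mapped the new memory, so account for it in the heap */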
475 		heap->total_size += entry->alloc_state.map_len;
476 
477 		/* result is success, so just notify secondary about this */
478 		resp->t = REQ_TYPE_ALLOC;
479 		resp->result = result;
480 		resp->id = entry->user_req.id;
481 		msg.num_fds = 0;
482 		msg.len_param = sizeof(*resp);
483 		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
484 
485 		if (rte_mp_sendmsg(&msg))
486 			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
487 
488 		TAILQ_REMOVE(&mp_request_list.list, entry, next);
489 		free(entry->alloc_state.ms);
490 		free(entry);
491 	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
492 			result == REQ_RESULT_FAIL) {
493 		struct rte_mp_msg rb_msg;
494 		struct malloc_mp_req *rb =
495 				(struct malloc_mp_req *)rb_msg.param;
496 		struct timespec ts;
497 		struct primary_alloc_req_state *state =
498 				&entry->alloc_state;
499 		int ret;
500 
501 		memset(&rb_msg, 0, sizeof(rb_msg));
502 
503 		/* we've failed to sync, so do a rollback */
504 		eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
505 				state->map_addr, state->map_len);
506 
507 		rollback_expand_heap(state->ms, state->ms_len, state->elem,
508 				state->map_addr, state->map_len);
509 
510 		/* send rollback request */
511 		rb_msg.num_fds = 0;
512 		rb_msg.len_param = sizeof(*rb);
513 		strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));
514 
515 		ts.tv_nsec = 0;
516 		ts.tv_sec = MP_TIMEOUT_S;
517 
518 		/* rollback is sent as a sync-type request and carries no data */
519 		rb->t = REQ_TYPE_SYNC;
520 		rb->id = entry->user_req.id;
521 
522 		/* a previous request may still be timing out, so retry on EEXIST */
523 		do {
524 			ret = rte_mp_request_async(&rb_msg, &ts,
525 					handle_rollback_response);
526 		} while (ret != 0 && rte_errno == EEXIST);
527 		if (ret != 0) {
528 			RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
529 
530 			/* we couldn't send rollback request, but that's OK -
531 			 * secondary will time out, and memory has been removed
532 			 * from heap anyway.
533 			 */
534 			TAILQ_REMOVE(&mp_request_list.list, entry, next);
535 			free(state->ms);
536 			free(entry);
537 			goto fail;
538 		}
539 	} else {
540 		RTE_LOG(ERR, EAL, "Response to sync request of unknown type\n");
541 		goto fail;
542 	}
543 
544 	pthread_mutex_unlock(&mp_request_list.lock);
545 	return 0;
546 fail:
547 	pthread_mutex_unlock(&mp_request_list.lock);
548 	return -1;
549 }
550 
551 static int
552 handle_rollback_response(const struct rte_mp_msg *request,
553 		const struct rte_mp_reply *reply __rte_unused)
554 {
555 	struct rte_mp_msg msg;
556 	struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
557 	const struct malloc_mp_req *mpreq =
558 			(const struct malloc_mp_req *)request->param;
559 	struct mp_request *entry;
560 
561 	/* lock the request */
562 	pthread_mutex_lock(&mp_request_list.lock);
563 
564 	memset(&msg, 0, sizeof(msg));
565 
566 	entry = find_request_by_id(mpreq->id);
567 	if (entry == NULL) {
568 		RTE_LOG(ERR, EAL, "Wrong request ID\n");
569 		goto fail;
570 	}
571 
572 	if (entry->user_req.t != REQ_TYPE_ALLOC) {
573 		RTE_LOG(ERR, EAL, "Unexpected active request\n");
574 		goto fail;
575 	}
576 
577 	/* we don't care if rollback succeeded, request still failed */
578 	resp->t = REQ_TYPE_ALLOC;
579 	resp->result = REQ_RESULT_FAIL;
580 	resp->id = mpreq->id;
581 	msg.num_fds = 0;
582 	msg.len_param = sizeof(*resp);
583 	strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
584 
585 	if (rte_mp_sendmsg(&msg))
586 		RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
587 
588 	/* clean up */
589 	TAILQ_REMOVE(&mp_request_list.list, entry, next);
590 	free(entry->alloc_state.ms);
591 	free(entry);
592 
593 	pthread_mutex_unlock(&mp_request_list.lock);
594 	return 0;
595 fail:
596 	pthread_mutex_unlock(&mp_request_list.lock);
597 	return -1;
598 }
599 
600 /* final stage of the request from secondary */
601 static int
602 handle_response(const struct rte_mp_msg *msg, const void *peer  __rte_unused)
603 {
604 	const struct malloc_mp_req *m =
605 			(const struct malloc_mp_req *)msg->param;
606 	struct mp_request *entry;
607 
608 	pthread_mutex_lock(&mp_request_list.lock);
609 
610 	entry = find_request_by_id(m->id);
611 	if (entry != NULL) {
612 		/* update request status */
613 		entry->user_req.result = m->result;
614 
615 		entry->state = REQ_STATE_COMPLETE;
616 
617 		/* trigger thread wakeup */
618 		pthread_cond_signal(&entry->cond);
619 	}
620 
621 	pthread_mutex_unlock(&mp_request_list.lock);
622 
623 	return 0;
624 }
625 
626 /* synchronously request a memory map sync. this is only called when the
627  * primary process initiates an allocation.
628  */
629 int
630 request_sync(void)
631 {
632 	struct rte_mp_msg msg;
633 	struct rte_mp_reply reply;
634 	struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
635 	struct timespec ts;
636 	int i, ret = -1;
637 
638 	memset(&msg, 0, sizeof(msg));
639 	memset(&reply, 0, sizeof(reply));
640 
641 	/* no need to create tailq entries as this is entirely synchronous */
642 
643 	msg.num_fds = 0;
644 	msg.len_param = sizeof(*req);
645 	strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));
646 
647 	/* sync request carries no data */
648 	req->t = REQ_TYPE_SYNC;
649 	req->id = get_unique_id();
650 
651 	ts.tv_nsec = 0;
652 	ts.tv_sec = MP_TIMEOUT_S;
653 
654 	/* a previous request may still be timing out, so retry on EEXIST */
655 	do {
656 		ret = rte_mp_request_sync(&msg, &reply, &ts);
657 	} while (ret != 0 && rte_errno == EEXIST);
658 	if (ret != 0) {
659 		/* if IPC is unsupported, behave as if the call succeeded */
660 		if (rte_errno != ENOTSUP)
661 			RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
662 		else
663 			ret = 0;
664 		goto out;
665 	}
666 
667 	if (reply.nb_received != reply.nb_sent) {
668 		RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
669 		goto out;
670 	}
671 
672 	for (i = 0; i < reply.nb_received; i++) {
673 		struct malloc_mp_req *resp =
674 				(struct malloc_mp_req *)reply.msgs[i].param;
675 		if (resp->t != REQ_TYPE_SYNC) {
676 			RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
677 			goto out;
678 		}
679 		if (resp->id != req->id) {
680 			RTE_LOG(ERR, EAL, "Wrong request ID\n");
681 			goto out;
682 		}
683 		if (resp->result != REQ_RESULT_SUCCESS) {
684 			RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
685 			goto out;
686 		}
687 	}
688 
689 	ret = 0;
690 out:
691 	free(reply.msgs);
692 	return ret;
693 }
694 
695 /* this is a synchronous wrapper around the multi-step asynchronous exchange
696  * with the primary process. it initiates a request and waits for the response.
697  */
698 int
699 request_to_primary(struct malloc_mp_req *user_req)
700 {
701 	struct rte_mp_msg msg;
702 	struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
703 	struct mp_request *entry;
704 	struct timespec ts;
705 	struct timeval now;
706 	int ret;
707 
708 	memset(&msg, 0, sizeof(msg));
709 	memset(&ts, 0, sizeof(ts));
710 
711 	pthread_mutex_lock(&mp_request_list.lock);
712 
713 	entry = malloc(sizeof(*entry));
714 	if (entry == NULL) {
715 		RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
716 		goto fail;
717 	}
718 
719 	memset(entry, 0, sizeof(*entry));
720 
721 	if (gettimeofday(&now, NULL) < 0) {
722 		RTE_LOG(ERR, EAL, "Cannot get current time\n");
723 		goto fail;
724 	}
725 
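	/* absolute deadline for pthread_cond_timedwait: now + MP_TIMEOUT_S */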
726 	ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
727 	ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
728 			(now.tv_usec * 1000) / 1000000000;
729 
730 	/* initialize the request */
731 	pthread_cond_init(&entry->cond, NULL);
732 
733 	msg.num_fds = 0;
734 	msg.len_param = sizeof(*msg_req);
735 	strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));
736 
737 	/* (attempt to) get a unique id */
738 	user_req->id = get_unique_id();
739 
740 	/* copy contents of user request into the message */
741 	memcpy(msg_req, user_req, sizeof(*msg_req));
742 
743 	if (rte_mp_sendmsg(&msg)) {
744 		RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
745 		goto fail;
746 	}
747 
748 	/* copy contents of user request into active request */
749 	memcpy(&entry->user_req, user_req, sizeof(*user_req));
750 
751 	/* mark request as in progress */
752 	entry->state = REQ_STATE_ACTIVE;
753 
754 	TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
755 
756 	/* finally, wait on timeout */
757 	do {
758 		ret = pthread_cond_timedwait(&entry->cond,
759 				&mp_request_list.lock, &ts);
760 	} while (ret != 0 && ret != ETIMEDOUT);
761 
762 	if (entry->state != REQ_STATE_COMPLETE) {
763 		RTE_LOG(ERR, EAL, "Request timed out\n");
764 		ret = -1;
765 	} else {
766 		ret = 0;
767 		user_req->result = entry->user_req.result;
768 	}
769 	TAILQ_REMOVE(&mp_request_list.list, entry, next);
770 	free(entry);
771 
772 	pthread_mutex_unlock(&mp_request_list.lock);
773 	return ret;
774 fail:
775 	pthread_mutex_unlock(&mp_request_list.lock);
776 	free(entry);
777 	return -1;
778 }
779 
780 int
781 register_mp_requests(void)
782 {
783 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
784 		/* it's OK for primary to not support IPC */
785 		if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
786 				rte_errno != ENOTSUP) {
787 			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
788 				MP_ACTION_REQUEST);
789 			return -1;
790 		}
791 	} else {
792 		if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
793 			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
794 				MP_ACTION_SYNC);
795 			return -1;
796 		}
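		/* rollback requests are handled exactly like sync requests */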
797 		if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
798 			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
799 				MP_ACTION_ROLLBACK);
800 			return -1;
801 		}
802 		if (rte_mp_action_register(MP_ACTION_RESPONSE,
803 				handle_response)) {
804 			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
805 				MP_ACTION_RESPONSE);
806 			return -1;
807 		}
808 	}
809 	return 0;
810 }
811