xref: /dpdk/lib/eal/common/malloc_mp.c (revision 429219adab185909a8127e680d19f7628af62fb2)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/time.h>

#include <rte_errno.h>
#include <rte_string_fns.h>

#include "eal_memalloc.h"
#include "eal_memcfg.h"
#include "eal_private.h"

#include "malloc_elem.h"
#include "malloc_mp.h"

#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to ask secondaries to roll back changes
 * to the memory map. this is essentially a regular sync request, but we
 * cannot send a sync request while another one is in progress, and we might
 * have to - therefore, we do this as a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */

/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5 second timeout */

/* when we're allocating, we need to store some state to ensure that we can
 * roll back later
 */
struct primary_alloc_req_state {
	struct malloc_heap *heap;
	struct rte_memseg **ms;
	int ms_len;
	struct malloc_elem *elem;
	void *map_addr;
	size_t map_len;
};

enum req_state {
	REQ_STATE_INACTIVE = 0,
	REQ_STATE_ACTIVE,
	REQ_STATE_COMPLETE
};

struct mp_request {
	TAILQ_ENTRY(mp_request) next;
	struct malloc_mp_req user_req; /**< contents of request */
	pthread_cond_t cond; /**< variable we use to time out on this request */
	enum req_state state; /**< status of this request */
	struct primary_alloc_req_state alloc_state; /**< state for rollback */
};

/*
 * We could've used just a single request, but it may be possible for
 * secondaries to time out earlier than the primary, and send a new request
 * while the primary is still expecting replies to the old one. Therefore, each
 * new request will get assigned a new ID, which is how we will distinguish
 * between expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);
static struct {
	struct mp_request_list list;
	pthread_mutex_t lock;
} mp_request_list = {
	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
	.lock = PTHREAD_MUTEX_INITIALIZER
};

/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received rollback request result
 *    sendmsg rollback request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 *  - if allocation failed straight away
 *  - if allocation and sync request succeeded
 *  - if allocation succeeded, sync request failed, allocation rolled back and
 *    rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
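
/*
 * For illustration only: on the secondary side, the allocation flow above is
 * driven through request_to_primary(). A hypothetical caller (the real ones
 * live outside this file, e.g. in malloc_heap.c) would look roughly like:
 *
 *	struct malloc_mp_req req;
 *
 *	memset(&req, 0, sizeof(req));
 *	req.t = REQ_TYPE_ALLOC;
 *	req.alloc_req.page_sz = page_sz;
 *	req.alloc_req.elt_size = elt_size;
 *	(fill in socket, flags, align, bound, contig and malloc_heap_idx)
 *	if (request_to_primary(&req) != 0 ||
 *			req.result != REQ_RESULT_SUCCESS)
 *		(allocation failed or timed out)
 */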
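
/* look up an in-flight request by ID; the caller is expected to hold
 * mp_request_list.lock. returns NULL if no matching request exists.
 */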
static struct mp_request *
find_request_by_id(uint64_t id)
{
	struct mp_request *req;
	TAILQ_FOREACH(req, &mp_request_list.list, next) {
		if (req->user_req.id == id)
			break;
	}
	return req;
}

/* pick a random ID, and make sure it doesn't clash with an in-flight request */
static uint64_t
get_unique_id(void)
{
	uint64_t id;
	do {
		id = rte_rand();
	} while (find_request_by_id(id) != NULL);
	return id;
}

/* this is how secondaries respond to sync requests from the primary */
static int
handle_sync(const struct rte_mp_msg *msg, const void *peer)
{
	struct rte_mp_msg reply;
	const struct malloc_mp_req *req =
			(const struct malloc_mp_req *)msg->param;
	struct malloc_mp_req *resp =
			(struct malloc_mp_req *)reply.param;
	int ret;

	if (req->t != REQ_TYPE_SYNC) {
		EAL_LOG(ERR, "Unexpected request from primary");
		return -1;
	}

	memset(&reply, 0, sizeof(reply));

	reply.num_fds = 0;
	strlcpy(reply.name, msg->name, sizeof(reply.name));
	reply.len_param = sizeof(*resp);

	ret = eal_memalloc_sync_with_primary();

	resp->t = REQ_TYPE_SYNC;
	resp->id = req->id;
	resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;

	return rte_mp_reply(&reply, peer);
}
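
/* handle a free request from a secondary: validate that the address range
 * belongs to a single, internal memseg list, notify listeners, and release
 * the pages back to the system.
 */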
static int
handle_free_request(const struct malloc_mp_req *m)
{
	const struct rte_memseg_list *msl;
	void *start, *end;
	size_t len;

	len = m->free_req.len;
	start = m->free_req.addr;
	end = RTE_PTR_ADD(start, len - 1);

	/* check if the requested memory actually exists */
	msl = rte_mem_virt2memseg_list(start);
	if (msl == NULL) {
		EAL_LOG(ERR, "Requested to free unknown memory");
		return -1;
	}

	/* check if end is within the same memory region */
	if (rte_mem_virt2memseg_list(end) != msl) {
		EAL_LOG(ERR, "Requested to free memory spanning multiple regions");
		return -1;
	}

	/* we're supposed to only free memory that's not external */
	if (msl->external) {
		EAL_LOG(ERR, "Requested to free external memory");
		return -1;
	}

	/* now that we've validated the request, announce it */
	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
			m->free_req.addr, m->free_req.len);

	/* now, do the actual freeing */
	return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
}
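
/* handle an allocation request from a secondary: reserve pages on the
 * requested heap, and stash enough state in the request entry to be able to
 * roll the allocation back if the subsequent sync with secondaries fails.
 */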
static int
handle_alloc_request(const struct malloc_mp_req *m,
		struct mp_request *req)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	const struct malloc_req_alloc *ar = &m->alloc_req;
	struct malloc_heap *heap;
	struct malloc_elem *elem;
	struct rte_memseg **ms;
	size_t alloc_sz;
	int n_segs;
	void *map_addr;

	/* this is checked by the API, but we need to prevent divide by zero */
	if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
		EAL_LOG(ERR, "Attempting to allocate with invalid page size");
		return -1;
	}

	/* heap idx is index into the heap array, not socket ID */
	if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
		EAL_LOG(ERR, "Attempting to allocate from invalid heap");
		return -1;
	}

	heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];

	/*
	 * for allocations, we must only use internal heaps. however,
	 * rte_malloc_heap_socket_is_external() takes the memory config lock
	 * itself, and we are already read-locked here, so we cannot call it.
	 * instead, take advantage of the fact that internal socket ID's are
	 * always lower than RTE_MAX_NUMA_NODES.
	 */
	if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
		EAL_LOG(ERR, "Attempting to allocate from external heap");
		return -1;
	}
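
	/* worst-case mapping size: element size aligned up, plus the malloc
	 * element overhead, rounded up to a whole number of pages. alloc_sz
	 * is a multiple of page_sz, so the division below is exact.
	 */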
	alloc_sz = RTE_ALIGN_CEIL(RTE_ALIGN_CEIL(ar->elt_size, ar->align) +
			MALLOC_ELEM_OVERHEAD, ar->page_sz);
	n_segs = alloc_sz / ar->page_sz;

	/* we can't know in advance how many pages we'll need, so we malloc */
	ms = malloc(sizeof(*ms) * n_segs);
	if (ms == NULL) {
		EAL_LOG(ERR, "Couldn't allocate memory for request state");
		return -1;
	}
	memset(ms, 0, sizeof(*ms) * n_segs);

	elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
			ar->flags, ar->align, ar->bound, ar->contig, ms,
			n_segs);

	if (elem == NULL)
		goto fail;

	map_addr = ms[0]->addr;

	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);

	/* we have succeeded in allocating memory, but we still need to sync
	 * with other processes. however, since DPDK IPC is single-threaded, we
	 * send an asynchronous request and exit this callback.
	 */

	req->alloc_state.ms = ms;
	req->alloc_state.ms_len = n_segs;
	req->alloc_state.map_addr = map_addr;
	req->alloc_state.map_len = alloc_sz;
	req->alloc_state.elem = elem;
	req->alloc_state.heap = heap;

	return 0;
fail:
	free(ms);
	return -1;
}

/* first stage of the primary's handling of a request from a secondary */
static int
handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;
	int ret;

	/* lock access to request */
	pthread_mutex_lock(&mp_request_list.lock);

	/* make sure it's not a dupe */
	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		EAL_LOG(ERR, "Duplicate request id");
		/* don't free the existing in-flight entry on the fail path */
		entry = NULL;
		goto fail;
	}

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		EAL_LOG(ERR, "Unable to allocate memory for request");
		goto fail;
	}

	/* erase all data */
	memset(entry, 0, sizeof(*entry));

	if (m->t == REQ_TYPE_ALLOC) {
		ret = handle_alloc_request(m, entry);
	} else if (m->t == REQ_TYPE_FREE) {
		ret = handle_free_request(m);
	} else {
		EAL_LOG(ERR, "Unexpected request from secondary");
		goto fail;
	}

	if (ret != 0) {
		struct rte_mp_msg resp_msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)resp_msg.param;

		memset(&resp_msg, 0, sizeof(resp_msg));

		/* send failure message straight away */
		resp_msg.num_fds = 0;
		resp_msg.len_param = sizeof(*resp);
		strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
				sizeof(resp_msg.name));

		resp->t = m->t;
		resp->result = REQ_RESULT_FAIL;
		resp->id = m->id;

		if (rte_mp_sendmsg(&resp_msg)) {
			EAL_LOG(ERR, "Couldn't send response");
			goto fail;
		}
		/* this entry was never added to the list, so just free it */
		free(entry);
	} else {
		struct rte_mp_msg sr_msg;
		struct malloc_mp_req *sr =
				(struct malloc_mp_req *)sr_msg.param;
		struct timespec ts;

		memset(&sr_msg, 0, sizeof(sr_msg));

		/* we can do something, so send sync request asynchronously */
		sr_msg.num_fds = 0;
		sr_msg.len_param = sizeof(*sr);
		strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		sr->t = REQ_TYPE_SYNC;
		sr->id = m->id;

		/* there may be a stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&sr_msg, &ts,
					handle_sync_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			EAL_LOG(ERR, "Couldn't send sync request");
			if (m->t == REQ_TYPE_ALLOC)
				free(entry->alloc_state.ms);
			goto fail;
		}

		/* mark request as in progress */
		memcpy(&entry->user_req, m, sizeof(*m));
		entry->state = REQ_STATE_ACTIVE;

		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
	}
	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

/* callback for asynchronous sync requests in the primary. this will either
 * send the result back to the secondary with sendmsg, or trigger a rollback
 * request.
 */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply)
{
	enum malloc_req_result result;
	struct mp_request *entry;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	int i;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		EAL_LOG(ERR, "Wrong request ID");
		goto fail;
	}

	result = REQ_RESULT_SUCCESS;

	if (reply->nb_received != reply->nb_sent)
		result = REQ_RESULT_FAIL;

	for (i = 0; i < reply->nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply->msgs[i].param;

		if (resp->t != REQ_TYPE_SYNC) {
			EAL_LOG(ERR, "Unexpected response to sync request");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->id != entry->user_req.id) {
			EAL_LOG(ERR, "Response to wrong sync request");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->result == REQ_RESULT_FAIL) {
			result = REQ_RESULT_FAIL;
			break;
		}
	}

	if (entry->user_req.t == REQ_TYPE_FREE) {
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		/* this is a free request, just sendmsg result */
		resp->t = REQ_TYPE_FREE;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			EAL_LOG(ERR, "Could not send message to secondary process");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_SUCCESS) {
		struct malloc_heap *heap = entry->alloc_state.heap;
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		heap->total_size += entry->alloc_state.map_len;

		/* result is success, so just notify secondary about this */
		resp->t = REQ_TYPE_ALLOC;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			EAL_LOG(ERR, "Could not send message to secondary process");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry->alloc_state.ms);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_FAIL) {
		struct rte_mp_msg rb_msg;
		struct malloc_mp_req *rb =
				(struct malloc_mp_req *)rb_msg.param;
		struct timespec ts;
		struct primary_alloc_req_state *state =
				&entry->alloc_state;
		int ret;

		memset(&rb_msg, 0, sizeof(rb_msg));

		/* we've failed to sync, so do a rollback */
		eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
				state->map_addr, state->map_len);

		rollback_expand_heap(state->ms, state->ms_len, state->elem,
				state->map_addr, state->map_len);

		/* send rollback request */
		rb_msg.num_fds = 0;
		rb_msg.len_param = sizeof(*rb);
		strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* rollback requests are sync requests and carry no data */
		rb->t = REQ_TYPE_SYNC;
		rb->id = entry->user_req.id;

		/* there may be a stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&rb_msg, &ts,
					handle_rollback_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			EAL_LOG(ERR, "Could not send rollback request to secondary process");

			/* we couldn't send rollback request, but that's OK -
			 * secondary will time out, and memory has been removed
			 * from heap anyway.
			 */
			TAILQ_REMOVE(&mp_request_list.list, entry, next);
			free(state->ms);
			free(entry);
			goto fail;
		}
	} else {
		EAL_LOG(ERR, "Response to sync request of unknown type");
		goto fail;
	}

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}
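
/* callback for the asynchronous rollback request in the primary. whatever the
 * rollback outcome, the original allocation has failed, so report failure to
 * the secondary and clean up the request entry.
 */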
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply __rte_unused)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	struct mp_request *entry;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	memset(&msg, 0, sizeof(msg));

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		EAL_LOG(ERR, "Wrong request ID");
		goto fail;
	}

	if (entry->user_req.t != REQ_TYPE_ALLOC) {
		EAL_LOG(ERR, "Unexpected active request");
		goto fail;
	}

	/* we don't care if rollback succeeded, request still failed */
	resp->t = REQ_TYPE_ALLOC;
	resp->result = REQ_RESULT_FAIL;
	resp->id = mpreq->id;
	msg.num_fds = 0;
	msg.len_param = sizeof(*resp);
	strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

	if (rte_mp_sendmsg(&msg))
		EAL_LOG(ERR, "Could not send message to secondary process");

	/* clean up */
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry->alloc_state.ms);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

/* final stage of the request from secondary */
static int
handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		/* update request status */
		entry->user_req.result = m->result;

		entry->state = REQ_STATE_COMPLETE;

		/* trigger thread wakeup */
		pthread_cond_signal(&entry->cond);
	}

	pthread_mutex_unlock(&mp_request_list.lock);

	return 0;
}

/* synchronously request a memory map sync; this is only called when the
 * primary process itself initiates an allocation.
 */
int
request_sync(void)
{
	struct rte_mp_msg msg;
	struct rte_mp_reply reply;
	struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
	struct timespec ts;
	int i, ret = -1;

	memset(&msg, 0, sizeof(msg));
	memset(&reply, 0, sizeof(reply));

	/* no need to create tailq entries as this is entirely synchronous */

	msg.num_fds = 0;
	msg.len_param = sizeof(*req);
	strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));

	/* sync request carries no data */
	req->t = REQ_TYPE_SYNC;
	req->id = get_unique_id();

	ts.tv_nsec = 0;
	ts.tv_sec = MP_TIMEOUT_S;

	/* there may be a stray timeout still waiting */
	do {
		ret = rte_mp_request_sync(&msg, &reply, &ts);
	} while (ret != 0 && rte_errno == EEXIST);
	if (ret != 0) {
		/* if IPC is unsupported, behave as if the call succeeded */
		if (rte_errno != ENOTSUP)
			EAL_LOG(ERR, "Could not send sync request to secondary process");
		else
			ret = 0;
		goto out;
	}

	if (reply.nb_received != reply.nb_sent) {
		EAL_LOG(ERR, "Not all secondaries have responded");
		goto out;
	}

	for (i = 0; i < reply.nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply.msgs[i].param;
		if (resp->t != REQ_TYPE_SYNC) {
			EAL_LOG(ERR, "Unexpected response from secondary");
			goto out;
		}
		if (resp->id != req->id) {
			EAL_LOG(ERR, "Wrong request ID");
			goto out;
		}
		if (resp->result != REQ_RESULT_SUCCESS) {
			EAL_LOG(ERR, "Secondary process failed to synchronize");
			goto out;
		}
	}

	ret = 0;
out:
	free(reply.msgs);
	return ret;
}

/* synchronous wrapper around the asynchronous request machinery above: send a
 * request to the primary process, then block until the response arrives or
 * the request times out.
 */
int
request_to_primary(struct malloc_mp_req *user_req)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
	struct mp_request *entry;
	struct timespec ts;
	struct timeval now;
	int ret;

	memset(&msg, 0, sizeof(msg));
	memset(&ts, 0, sizeof(ts));

	pthread_mutex_lock(&mp_request_list.lock);

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		EAL_LOG(ERR, "Cannot allocate memory for request");
		goto fail;
	}

	memset(entry, 0, sizeof(*entry));

	if (gettimeofday(&now, NULL) < 0) {
		EAL_LOG(ERR, "Cannot get current time");
		goto fail;
	}
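
	/* pthread_cond_timedwait() takes an absolute CLOCK_REALTIME deadline,
	 * so convert "now + MP_TIMEOUT_S" into a timespec
	 */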
	ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
	ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
			(now.tv_usec * 1000) / 1000000000;

	/* initialize the request */
	pthread_cond_init(&entry->cond, NULL);

	msg.num_fds = 0;
	msg.len_param = sizeof(*msg_req);
	strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));

	/* (attempt to) get a unique id */
	user_req->id = get_unique_id();

	/* copy contents of user request into the message */
	memcpy(msg_req, user_req, sizeof(*msg_req));

	if (rte_mp_sendmsg(&msg)) {
		EAL_LOG(ERR, "Cannot send message to primary");
		goto fail;
	}

	/* copy contents of user request into active request */
	memcpy(&entry->user_req, user_req, sizeof(*user_req));

	/* mark request as in progress */
	entry->state = REQ_STATE_ACTIVE;

	TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);

	/* finally, wait until the request completes or the timeout is hit.
	 * pthread_cond_timedwait() may wake up spuriously, so re-check the
	 * request state before giving up on waiting.
	 */
	do {
		ret = pthread_cond_timedwait(&entry->cond,
				&mp_request_list.lock, &ts);
	} while (ret == 0 && entry->state == REQ_STATE_ACTIVE);

	if (entry->state != REQ_STATE_COMPLETE) {
		EAL_LOG(ERR, "Request timed out");
		ret = -1;
	} else {
		ret = 0;
		user_req->result = entry->user_req.result;
	}
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}
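
/* register IPC callbacks according to process role: the primary handles
 * allocation/deallocation requests, while secondaries handle sync/rollback
 * requests and final responses.
 */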
int
register_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		/* it's OK for primary to not support IPC */
		if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
				rte_errno != ENOTSUP) {
			EAL_LOG(ERR, "Couldn't register '%s' action",
				MP_ACTION_REQUEST);
			return -1;
		}
	} else {
		if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
			EAL_LOG(ERR, "Couldn't register '%s' action",
				MP_ACTION_SYNC);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
			EAL_LOG(ERR, "Couldn't register '%s' action",
				MP_ACTION_ROLLBACK);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_RESPONSE,
				handle_response)) {
			EAL_LOG(ERR, "Couldn't register '%s' action",
				MP_ACTION_RESPONSE);
			return -1;
		}
	}
	return 0;
}

void
unregister_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		rte_mp_action_unregister(MP_ACTION_REQUEST);
	} else {
		rte_mp_action_unregister(MP_ACTION_SYNC);
		rte_mp_action_unregister(MP_ACTION_ROLLBACK);
		rte_mp_action_unregister(MP_ACTION_RESPONSE);
	}
}