/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <string.h>
#include <sys/time.h>

#include <rte_alarm.h>
#include <rte_errno.h>
#include <rte_string_fns.h>

#include "eal_memalloc.h"
#include "eal_memcfg.h"
#include "eal_private.h"

#include "malloc_elem.h"
#include "malloc_mp.h"

#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to notify of changes in memory map. this is
 * essentially a regular sync request, but we cannot send sync requests while
 * another one is in progress, and we might have to - therefore, we do this as
 * a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */

/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5-second request timeout */

/* when we're allocating, we need to store some state to ensure that we can
 * roll back later
 */
struct primary_alloc_req_state {
	struct malloc_heap *heap;
	struct rte_memseg **ms;
	int ms_len;
	struct malloc_elem *elem;
	void *map_addr;
	size_t map_len;
};

enum req_state {
	REQ_STATE_INACTIVE = 0,
	REQ_STATE_ACTIVE,
	REQ_STATE_COMPLETE
};

struct mp_request {
	TAILQ_ENTRY(mp_request) next;
	struct malloc_mp_req user_req; /**< contents of request */
	pthread_cond_t cond; /**< variable we use to time out on this request */
	enum req_state state; /**< indicate status of this request */
	struct primary_alloc_req_state alloc_state;
};

/*
 * We could've used just a single request, but it may be possible for
 * secondaries to timeout earlier than the primary, and send a new request while
 * primary is still expecting replies to the old one. Therefore, each new
 * request will get assigned a new ID, which is how we will distinguish between
 * expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);
static struct {
	struct mp_request_list list;
	pthread_mutex_t lock;
} mp_request_list = {
	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
	.lock = PTHREAD_MUTEX_INITIALIZER
};

/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 *  - if allocation failed straight away
 *  - if allocation and sync request succeeded
 *  - if allocation succeeded, sync request failed, allocation rolled back and
 *    rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
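
/*
 * A minimal usage sketch (not compiled) of how a secondary process would drive
 * the deallocation workflow above via request_to_primary(). The wrapper name
 * example_secondary_free() is hypothetical; the request fields are the ones
 * this file itself uses.
 */
#if 0
static int
example_secondary_free(void *addr, size_t len)
{
	struct malloc_mp_req req;

	memset(&req, 0, sizeof(req));

	/* describe what we want the primary to do */
	req.t = REQ_TYPE_FREE;
	req.free_req.addr = addr;
	req.free_req.len = len;

	/* blocks until the primary responds or the request times out */
	if (request_to_primary(&req) != 0)
		return -1;

	return req.result == REQ_RESULT_SUCCESS ? 0 : -1;
}
#endif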

static struct mp_request *
find_request_by_id(uint64_t id)
{
	struct mp_request *req;
	TAILQ_FOREACH(req, &mp_request_list.list, next) {
		if (req->user_req.id == id)
			break;
	}
	return req;
}

/* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
static uint64_t
get_unique_id(void)
{
	uint64_t id;
	do {
		id = rte_rand();
	} while (find_request_by_id(id) != NULL);
	return id;
}

/* secondary will respond to sync requests thusly */
static int
handle_sync(const struct rte_mp_msg *msg, const void *peer)
{
	struct rte_mp_msg reply;
	const struct malloc_mp_req *req =
			(const struct malloc_mp_req *)msg->param;
	struct malloc_mp_req *resp =
			(struct malloc_mp_req *)reply.param;
	int ret;

	if (req->t != REQ_TYPE_SYNC) {
		RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
		return -1;
	}

	memset(&reply, 0, sizeof(reply));

	reply.num_fds = 0;
	strlcpy(reply.name, msg->name, sizeof(reply.name));
	reply.len_param = sizeof(*resp);

	ret = eal_memalloc_sync_with_primary();

	resp->t = REQ_TYPE_SYNC;
	resp->id = req->id;
	resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;

	return rte_mp_reply(&reply, peer);
}

static int
handle_free_request(const struct malloc_mp_req *m)
{
	const struct rte_memseg_list *msl;
	void *start, *end;
	size_t len;

	len = m->free_req.len;
	start = m->free_req.addr;
	end = RTE_PTR_ADD(start, len - 1);
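
	/*
	 * Worked example (assumed numbers): addr = 0x100200000 and len = 2 MiB
	 * give end = 0x1003fffff, the last byte of the span; the checks below
	 * require both ends to resolve to the same memseg list.
	 */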

	/* check if the requested memory actually exists */
	msl = rte_mem_virt2memseg_list(start);
	if (msl == NULL) {
		RTE_LOG(ERR, EAL, "Requested to free unknown memory\n");
		return -1;
	}

	/* check if end is within the same memory region */
	if (rte_mem_virt2memseg_list(end) != msl) {
		RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n");
		return -1;
	}

	/* we're supposed to only free memory that's not external */
	if (msl->external) {
		RTE_LOG(ERR, EAL, "Requested to free external memory\n");
		return -1;
	}

	/* now that we've validated the request, announce it */
	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
			m->free_req.addr, m->free_req.len);

	/* now, do the actual freeing */
	return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
}

static int
handle_alloc_request(const struct malloc_mp_req *m,
		struct mp_request *req)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	const struct malloc_req_alloc *ar = &m->alloc_req;
	struct malloc_heap *heap;
	struct malloc_elem *elem;
	struct rte_memseg **ms;
	size_t alloc_sz;
	int n_segs;
	void *map_addr;

	/* this is checked by the API, but we need to prevent divide by zero */
	if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
		RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n");
		return -1;
	}

	/* heap idx is index into the heap array, not socket ID */
	if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
		RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n");
		return -1;
	}

	heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];

	/*
	 * for allocations, we must only use internal heaps. since
	 * rte_malloc_heap_socket_is_external() takes the memory lock itself
	 * and we're already read-locked, we can't call it here - instead, take
	 * advantage of the fact that internal socket IDs are always lower than
	 * RTE_MAX_NUMA_NODES.
	 */
	if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
		RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n");
		return -1;
	}

	alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
			MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
	n_segs = alloc_sz / ar->page_sz;
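
	/*
	 * Worked example (assumed numbers): elt_size = 5 MiB, align = 2 MiB
	 * and page_sz = 2 MiB give alloc_sz = RTE_ALIGN_CEIL(7 MiB + trailer,
	 * 2 MiB) = 8 MiB, so n_segs = 4 pages are reserved for the worst-case
	 * placement.
	 */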

	/* we can't know in advance how many pages we'll need, so we malloc */
	ms = malloc(sizeof(*ms) * n_segs);
	if (ms == NULL) {
		RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
		return -1;
	}
	memset(ms, 0, sizeof(*ms) * n_segs);

	elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
			ar->flags, ar->align, ar->bound, ar->contig, ms,
			n_segs);

	if (elem == NULL)
		goto fail;

	map_addr = ms[0]->addr;

	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);

	/* we have succeeded in allocating memory, but we still need to sync
	 * with other processes. however, since DPDK IPC is single-threaded, we
	 * send an asynchronous request and exit this callback.
	 */

	req->alloc_state.ms = ms;
	req->alloc_state.ms_len = n_segs;
	req->alloc_state.map_addr = map_addr;
	req->alloc_state.map_len = alloc_sz;
	req->alloc_state.elem = elem;
	req->alloc_state.heap = heap;

	return 0;
fail:
	free(ms);
	return -1;
}

/* first stage of primary handling requests from secondary */
static int
handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;
	int ret;

	/* lock access to request */
	pthread_mutex_lock(&mp_request_list.lock);

	/* make sure it's not a dupe */
	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		RTE_LOG(ERR, EAL, "Duplicate request id\n");
		goto fail;
	}

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
		goto fail;
	}

	/* erase all data */
	memset(entry, 0, sizeof(*entry));

	if (m->t == REQ_TYPE_ALLOC) {
		ret = handle_alloc_request(m, entry);
	} else if (m->t == REQ_TYPE_FREE) {
		ret = handle_free_request(m);
	} else {
		RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
		goto fail;
	}

	if (ret != 0) {
		struct rte_mp_msg resp_msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)resp_msg.param;

		/* send failure message straight away */
		resp_msg.num_fds = 0;
		resp_msg.len_param = sizeof(*resp);
		strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
				sizeof(resp_msg.name));

		resp->t = m->t;
		resp->result = REQ_RESULT_FAIL;
		resp->id = m->id;

		if (rte_mp_sendmsg(&resp_msg)) {
			RTE_LOG(ERR, EAL, "Couldn't send response\n");
			goto fail;
		}
		/* we did not modify the request */
		free(entry);
	} else {
		struct rte_mp_msg sr_msg;
		struct malloc_mp_req *sr =
				(struct malloc_mp_req *)sr_msg.param;
		struct timespec ts;

		memset(&sr_msg, 0, sizeof(sr_msg));

		/* we can do something, so send sync request asynchronously */
		sr_msg.num_fds = 0;
		sr_msg.len_param = sizeof(*sr);
		strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		sr->t = REQ_TYPE_SYNC;
		sr->id = m->id;

		/* there may be a stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&sr_msg, &ts,
					handle_sync_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
			if (m->t == REQ_TYPE_ALLOC)
				free(entry->alloc_state.ms);
			goto fail;
		}

		/* mark request as in progress */
		memcpy(&entry->user_req, m, sizeof(*m));
		entry->state = REQ_STATE_ACTIVE;

		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
	}
	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

/* callback for asynchronous sync requests for primary. this will either do a
 * sendmsg with results, or trigger a rollback request.
 */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply)
{
	enum malloc_req_result result;
	struct mp_request *entry;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	int i;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	result = REQ_RESULT_SUCCESS;

	if (reply->nb_received != reply->nb_sent)
		result = REQ_RESULT_FAIL;

	for (i = 0; i < reply->nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply->msgs[i].param;

		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->id != entry->user_req.id) {
			RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->result == REQ_RESULT_FAIL) {
			result = REQ_RESULT_FAIL;
			break;
		}
	}

	if (entry->user_req.t == REQ_TYPE_FREE) {
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		/* this is a free request, just sendmsg result */
		resp->t = REQ_TYPE_FREE;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_SUCCESS) {
		struct malloc_heap *heap = entry->alloc_state.heap;
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		heap->total_size += entry->alloc_state.map_len;

		/* result is success, so just notify secondary about this */
		resp->t = REQ_TYPE_ALLOC;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry->alloc_state.ms);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_FAIL) {
		struct rte_mp_msg rb_msg;
		struct malloc_mp_req *rb =
				(struct malloc_mp_req *)rb_msg.param;
		struct timespec ts;
		struct primary_alloc_req_state *state =
				&entry->alloc_state;
		int ret;

		memset(&rb_msg, 0, sizeof(rb_msg));

		/* we've failed to sync, so do a rollback */
		eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
				state->map_addr, state->map_len);

		rollback_expand_heap(state->ms, state->ms_len, state->elem,
				state->map_addr, state->map_len);

		/* send rollback request */
		rb_msg.num_fds = 0;
		rb_msg.len_param = sizeof(*rb);
		strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		rb->t = REQ_TYPE_SYNC;
		rb->id = entry->user_req.id;

		/* there may be a stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&rb_msg, &ts,
					handle_rollback_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");

			/* we couldn't send rollback request, but that's OK -
			 * secondary will time out, and memory has been removed
			 * from heap anyway.
			 */
			TAILQ_REMOVE(&mp_request_list.list, entry, next);
			free(state->ms);
			free(entry);
			goto fail;
		}
	} else {
		RTE_LOG(ERR, EAL, "Unexpected response to sync request of unknown type\n");
		goto fail;
	}

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply __rte_unused)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	struct mp_request *entry;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	memset(&msg, 0, sizeof(msg));

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	if (entry->user_req.t != REQ_TYPE_ALLOC) {
		RTE_LOG(ERR, EAL, "Unexpected active request\n");
		goto fail;
	}

	/* we don't care if rollback succeeded, request still failed */
	resp->t = REQ_TYPE_ALLOC;
	resp->result = REQ_RESULT_FAIL;
	resp->id = mpreq->id;
	msg.num_fds = 0;
	msg.len_param = sizeof(*resp);
	strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

	if (rte_mp_sendmsg(&msg))
		RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

	/* clean up */
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry->alloc_state.ms);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

/* final stage of the request from secondary */
static int
handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		/* update request status */
		entry->user_req.result = m->result;

		entry->state = REQ_STATE_COMPLETE;

		/* trigger thread wakeup */
		pthread_cond_signal(&entry->cond);
	}

	pthread_mutex_unlock(&mp_request_list.lock);

	return 0;
}

/* synchronously request a memory map sync; this is only called when the
 * primary process initiates an allocation.
 */
int
request_sync(void)
{
	struct rte_mp_msg msg;
	struct rte_mp_reply reply;
	struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
	struct timespec ts;
	int i, ret = -1;

	memset(&msg, 0, sizeof(msg));
	memset(&reply, 0, sizeof(reply));

	/* no need to create tailq entries as this is entirely synchronous */

	msg.num_fds = 0;
	msg.len_param = sizeof(*req);
	strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));

	/* sync request carries no data */
	req->t = REQ_TYPE_SYNC;
	req->id = get_unique_id();

	ts.tv_nsec = 0;
	ts.tv_sec = MP_TIMEOUT_S;

	/* there may be a stray timeout still waiting */
	do {
		ret = rte_mp_request_sync(&msg, &reply, &ts);
	} while (ret != 0 && rte_errno == EEXIST);
	if (ret != 0) {
		/* if IPC is unsupported, behave as if the call succeeded */
		if (rte_errno != ENOTSUP)
			RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
		else
			ret = 0;
		goto out;
	}

	if (reply.nb_received != reply.nb_sent) {
		RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
		goto out;
	}

	for (i = 0; i < reply.nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply.msgs[i].param;
		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
			goto out;
		}
		if (resp->id != req->id) {
			RTE_LOG(ERR, EAL, "Wrong request ID\n");
			goto out;
		}
		if (resp->result != REQ_RESULT_SUCCESS) {
			RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
			goto out;
		}
	}

	ret = 0;
out:
	free(reply.msgs);
	return ret;
}
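
/*
 * A hedged caller sketch (not compiled): the primary's own allocation path in
 * malloc_heap.c is expected to call request_sync() right after growing the
 * heap, and to undo the expansion if any secondary failed to map the new
 * pages. The variables (ms, n_segs, elem, map_addr, map_len) are assumed
 * caller context, not part of this file.
 */
#if 0
	if (request_sync() != 0) {
		/* not all secondaries mapped the new memory - roll it back */
		rollback_expand_heap(ms, n_segs, elem, map_addr, map_len);
		return -1;
	}
#endif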

/* this is a synchronous wrapper around a series of asynchronous requests to
 * the primary process: it initiates a request and waits until a response
 * arrives.
 */
int
request_to_primary(struct malloc_mp_req *user_req)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
	struct mp_request *entry;
	struct timespec ts;
	struct timeval now;
	int ret;

	memset(&msg, 0, sizeof(msg));
	memset(&ts, 0, sizeof(ts));

	pthread_mutex_lock(&mp_request_list.lock);

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
		goto fail;
	}

	memset(entry, 0, sizeof(*entry));

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto fail;
	}

	ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
	ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
			(now.tv_usec * 1000) / 1000000000;
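
	/*
	 * Example (assumed values): now = {.tv_sec = 100, .tv_usec = 250000}
	 * and MP_TIMEOUT_S = 5 yield ts = {.tv_sec = 105, .tv_nsec = 250000000},
	 * i.e. an absolute deadline five seconds from now for
	 * pthread_cond_timedwait() below.
	 */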

	/* initialize the request */
	pthread_cond_init(&entry->cond, NULL);

	msg.num_fds = 0;
	msg.len_param = sizeof(*msg_req);
	strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));

	/* (attempt to) get a unique id */
	user_req->id = get_unique_id();

	/* copy contents of user request into the message */
	memcpy(msg_req, user_req, sizeof(*msg_req));

	if (rte_mp_sendmsg(&msg)) {
		RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
		goto fail;
	}

	/* copy contents of user request into active request */
	memcpy(&entry->user_req, user_req, sizeof(*user_req));

	/* mark request as in progress */
	entry->state = REQ_STATE_ACTIVE;

	TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);

	/* finally, wait on timeout */
	do {
		ret = pthread_cond_timedwait(&entry->cond,
				&mp_request_list.lock, &ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	if (entry->state != REQ_STATE_COMPLETE) {
		RTE_LOG(ERR, EAL, "Request timed out\n");
		ret = -1;
	} else {
		ret = 0;
		user_req->result = entry->user_req.result;
	}
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

int
register_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		/* it's OK for primary to not support IPC */
		if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
				rte_errno != ENOTSUP) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_REQUEST);
			return -1;
		}
	} else {
		if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_SYNC);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_ROLLBACK);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_RESPONSE,
				handle_response)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_RESPONSE);
			return -1;
		}
	}
	return 0;
}