xref: /dpdk/lib/eal/common/malloc_mp.c (revision 30a1de105a5f40d77b344a891c4a68f79e815c43)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <string.h>
#include <sys/time.h>

#include <rte_errno.h>
#include <rte_string_fns.h>

#include "eal_memalloc.h"
#include "eal_memcfg.h"
#include "eal_private.h"

#include "malloc_elem.h"
#include "malloc_mp.h"

#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to notify of an allocation rollback. this
 * is essentially a regular sync request, but we cannot send sync requests
 * while another one is in progress, and we might have to; therefore, this is
 * done as a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */

/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5 second timeout */

/* when we're allocating, we need to store some state to ensure that we can
 * roll back later
 */
struct primary_alloc_req_state {
	struct malloc_heap *heap;
	struct rte_memseg **ms;
	int ms_len;
	struct malloc_elem *elem;
	void *map_addr;
	size_t map_len;
};

enum req_state {
	REQ_STATE_INACTIVE = 0,
	REQ_STATE_ACTIVE,
	REQ_STATE_COMPLETE
};

struct mp_request {
	TAILQ_ENTRY(mp_request) next;
	struct malloc_mp_req user_req; /**< contents of request */
	pthread_cond_t cond; /**< variable we use to time out on this request */
	enum req_state state; /**< indicate status of this request */
	struct primary_alloc_req_state alloc_state;
};

/*
 * We could've used just a single request, but it may be possible for
 * secondaries to time out earlier than the primary, and send a new request
 * while the primary is still expecting replies to the old one. Therefore,
 * each new request is assigned a new ID, which is how we distinguish between
 * expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);
static struct {
	struct mp_request_list list;
	pthread_mutex_t lock;
} mp_request_list = {
	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
	.lock = PTHREAD_MUTEX_INITIALIZER
};

/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 *  - if allocation failed straight away
 *  - if allocation and sync request succeeded
 *  - if allocation succeeded, sync request failed, allocation rolled back and
 *    rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
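
/*
 * The steps above map onto this file roughly as follows: the secondary's
 * "send request and wait" is request_to_primary(); the primary's first stage
 * is handle_request(), which calls handle_alloc_request() or
 * handle_free_request(); replies to sync and rollback requests are collected
 * in handle_sync_response() and handle_rollback_response(); secondaries
 * answer sync/rollback requests in handle_sync(); and the final result is
 * delivered to the waiting secondary via handle_response().
 */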

static struct mp_request *
find_request_by_id(uint64_t id)
{
	struct mp_request *req;
	TAILQ_FOREACH(req, &mp_request_list.list, next) {
		if (req->user_req.id == id)
			break;
	}
	return req;
}

/* generate an ID that doesn't collide with any outstanding request */
static uint64_t
get_unique_id(void)
{
	uint64_t id;
	do {
		id = rte_rand();
	} while (find_request_by_id(id) != NULL);
	return id;
}

/* secondary will respond to sync requests as follows */
static int
handle_sync(const struct rte_mp_msg *msg, const void *peer)
{
	struct rte_mp_msg reply;
	const struct malloc_mp_req *req =
			(const struct malloc_mp_req *)msg->param;
	struct malloc_mp_req *resp =
			(struct malloc_mp_req *)reply.param;
	int ret;

	if (req->t != REQ_TYPE_SYNC) {
		RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
		return -1;
	}

	memset(&reply, 0, sizeof(reply));

	reply.num_fds = 0;
	strlcpy(reply.name, msg->name, sizeof(reply.name));
	reply.len_param = sizeof(*resp);

	ret = eal_memalloc_sync_with_primary();

	resp->t = REQ_TYPE_SYNC;
	resp->id = req->id;
	resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;

	return rte_mp_reply(&reply, peer);
}

static int
handle_free_request(const struct malloc_mp_req *m)
{
	const struct rte_memseg_list *msl;
	void *start, *end;
	size_t len;

	len = m->free_req.len;
	start = m->free_req.addr;
	end = RTE_PTR_ADD(start, len - 1);
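	/* end is the last byte of the range (hence len - 1), so a range that
	 * ends exactly on a region boundary is not mistaken for one that
	 * spans two regions
	 */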

	/* check if the requested memory actually exists */
	msl = rte_mem_virt2memseg_list(start);
	if (msl == NULL) {
		RTE_LOG(ERR, EAL, "Requested to free unknown memory\n");
		return -1;
	}

	/* check if end is within the same memory region */
	if (rte_mem_virt2memseg_list(end) != msl) {
		RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n");
		return -1;
	}

	/* we're supposed to only free memory that's not external */
	if (msl->external) {
		RTE_LOG(ERR, EAL, "Requested to free external memory\n");
		return -1;
	}

	/* now that we've validated the request, announce it */
	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
			m->free_req.addr, m->free_req.len);

	/* now, do the actual freeing */
	return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
}

static int
handle_alloc_request(const struct malloc_mp_req *m,
		struct mp_request *req)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	const struct malloc_req_alloc *ar = &m->alloc_req;
	struct malloc_heap *heap;
	struct malloc_elem *elem;
	struct rte_memseg **ms;
	size_t alloc_sz;
	int n_segs;
	void *map_addr;

	/* this is checked by the API, but we need to prevent divide by zero */
	if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
		RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n");
		return -1;
	}

	/* heap idx is index into the heap array, not socket ID */
	if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
		RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n");
		return -1;
	}

	heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];

	/*
	 * for allocations, we must only use internal heaps. however,
	 * rte_malloc_heap_socket_is_external() takes a lock that we are
	 * already holding read-locked, so instead we take advantage of the
	 * fact that internal socket ID's are always lower than
	 * RTE_MAX_NUMA_NODES.
	 */
	if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
		RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n");
		return -1;
	}

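	/* worst case: the element may need up to 'align' bytes of leading
	 * padding plus a trailer, all rounded up to a whole number of pages
	 */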
	alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
			MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
	n_segs = alloc_sz / ar->page_sz;

	/* we can't know in advance how many pages we'll need, so we malloc */
	ms = malloc(sizeof(*ms) * n_segs);
	if (ms == NULL) {
		RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
		return -1;
	}
	memset(ms, 0, sizeof(*ms) * n_segs);

	elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
			ar->flags, ar->align, ar->bound, ar->contig, ms,
			n_segs);

	if (elem == NULL)
		goto fail;

	map_addr = ms[0]->addr;

	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);

	/* we have succeeded in allocating memory, but we still need to sync
	 * with other processes. however, since DPDK IPC is single-threaded, we
	 * send an asynchronous request and exit this callback.
	 */

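	/* stash rollback state: handle_sync_response() frees it on success,
	 * or passes it to rollback_expand_heap() if the sync fails
	 */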
	req->alloc_state.ms = ms;
	req->alloc_state.ms_len = n_segs;
	req->alloc_state.map_addr = map_addr;
	req->alloc_state.map_len = alloc_sz;
	req->alloc_state.elem = elem;
	req->alloc_state.heap = heap;

	return 0;
fail:
	free(ms);
	return -1;
}

/* first stage of primary handling requests from secondary */
static int
handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;
	int ret;

	/* lock access to request list */
	pthread_mutex_lock(&mp_request_list.lock);

	/* make sure it's not a duplicate */
	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		RTE_LOG(ERR, EAL, "Duplicate request id\n");
		/* entry is a live list member, don't free it below */
		entry = NULL;
		goto fail;
	}

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
		goto fail;
	}

	/* erase all data */
	memset(entry, 0, sizeof(*entry));

	if (m->t == REQ_TYPE_ALLOC) {
		ret = handle_alloc_request(m, entry);
	} else if (m->t == REQ_TYPE_FREE) {
		ret = handle_free_request(m);
	} else {
		RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
		goto fail;
	}

	if (ret != 0) {
		struct rte_mp_msg resp_msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)resp_msg.param;

		/* send failure message straight away */
		resp_msg.num_fds = 0;
		resp_msg.len_param = sizeof(*resp);
		strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
				sizeof(resp_msg.name));

		resp->t = m->t;
		resp->result = REQ_RESULT_FAIL;
		resp->id = m->id;

		if (rte_mp_sendmsg(&resp_msg)) {
			RTE_LOG(ERR, EAL, "Couldn't send response\n");
			goto fail;
		}
		/* the request was never added to the list, so just free it */
		free(entry);
	} else {
		struct rte_mp_msg sr_msg;
		struct malloc_mp_req *sr =
				(struct malloc_mp_req *)sr_msg.param;
		struct timespec ts;

		memset(&sr_msg, 0, sizeof(sr_msg));

		/* the operation succeeded locally, so send the sync request
		 * asynchronously
		 */
		sr_msg.num_fds = 0;
		sr_msg.len_param = sizeof(*sr);
		strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		sr->t = REQ_TYPE_SYNC;
		sr->id = m->id;

		/* a stray reply to an earlier, timed-out request may still be
		 * pending, so retry until the IPC layer accepts the request
		 */
		do {
			ret = rte_mp_request_async(&sr_msg, &ts,
					handle_sync_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
			if (m->t == REQ_TYPE_ALLOC)
				free(entry->alloc_state.ms);
			goto fail;
		}

		/* mark request as in progress */
		memcpy(&entry->user_req, m, sizeof(*m));
		entry->state = REQ_STATE_ACTIVE;

		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
	}
	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

/* callback for asynchronous sync requests for primary. this will either do a
 * sendmsg with results, or trigger rollback request.
 */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply)
{
	enum malloc_req_result result;
	struct mp_request *entry;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	int i;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	result = REQ_RESULT_SUCCESS;

	if (reply->nb_received != reply->nb_sent)
		result = REQ_RESULT_FAIL;

	for (i = 0; i < reply->nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply->msgs[i].param;

		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->id != entry->user_req.id) {
			RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->result == REQ_RESULT_FAIL) {
			result = REQ_RESULT_FAIL;
			break;
		}
	}

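	/* at this point, result is SUCCESS only if every secondary replied
	 * and synchronized; what happens next depends on the request type
	 */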
	if (entry->user_req.t == REQ_TYPE_FREE) {
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		/* this is a free request, just sendmsg result */
		resp->t = REQ_TYPE_FREE;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_SUCCESS) {
		struct malloc_heap *heap = entry->alloc_state.heap;
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		heap->total_size += entry->alloc_state.map_len;

		/* result is success, so just notify secondary about this */
		resp->t = REQ_TYPE_ALLOC;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry->alloc_state.ms);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_FAIL) {
		struct rte_mp_msg rb_msg;
		struct malloc_mp_req *rb =
				(struct malloc_mp_req *)rb_msg.param;
		struct timespec ts;
		struct primary_alloc_req_state *state =
				&entry->alloc_state;
		int ret;

		memset(&rb_msg, 0, sizeof(rb_msg));

		/* we've failed to sync, so do a rollback */
		eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
				state->map_addr, state->map_len);

		rollback_expand_heap(state->ms, state->ms_len, state->elem,
				state->map_addr, state->map_len);

		/* send rollback request */
		rb_msg.num_fds = 0;
		rb_msg.len_param = sizeof(*rb);
		strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* rollback requests are sync requests by another name, and
		 * carry no data
		 */
		rb->t = REQ_TYPE_SYNC;
		rb->id = entry->user_req.id;

		/* a stray reply to an earlier, timed-out request may still be
		 * pending, so retry until the IPC layer accepts the request
		 */
		do {
			ret = rte_mp_request_async(&rb_msg, &ts,
					handle_rollback_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");

			/* we couldn't send the rollback request, but that's
			 * OK - the secondary will time out, and the memory
			 * has been removed from the heap anyway.
			 */
			TAILQ_REMOVE(&mp_request_list.list, entry, next);
			free(state->ms);
			free(entry);
			goto fail;
		}
	} else {
		RTE_LOG(ERR, EAL, "Response to sync request of unknown type\n");
		goto fail;
	}

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply __rte_unused)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	struct mp_request *entry;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	memset(&msg, 0, sizeof(msg));

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	if (entry->user_req.t != REQ_TYPE_ALLOC) {
		RTE_LOG(ERR, EAL, "Unexpected active request\n");
		goto fail;
	}

	/* we don't care if rollback succeeded, request still failed */
	resp->t = REQ_TYPE_ALLOC;
	resp->result = REQ_RESULT_FAIL;
	resp->id = mpreq->id;
	msg.num_fds = 0;
	msg.len_param = sizeof(*resp);
	strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

	if (rte_mp_sendmsg(&msg))
		RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

	/* clean up */
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry->alloc_state.ms);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}

/* final stage of the request: the secondary receives the result */
static int
handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(m->id);
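	/* an unknown ID means the request has already timed out and been
	 * removed from the list, so stray responses are silently ignored
	 */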
	if (entry != NULL) {
		/* update request status */
		entry->user_req.result = m->result;

		entry->state = REQ_STATE_COMPLETE;

		/* trigger thread wakeup */
		pthread_cond_signal(&entry->cond);
	}

	pthread_mutex_unlock(&mp_request_list.lock);

	return 0;
}

/* synchronously request a memory map sync; this is only called when the
 * primary process initiates an allocation.
 */
int
request_sync(void)
{
	struct rte_mp_msg msg;
	struct rte_mp_reply reply;
	struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
	struct timespec ts;
	int i, ret = -1;

	memset(&msg, 0, sizeof(msg));
	memset(&reply, 0, sizeof(reply));

	/* no need to create tailq entries as this is entirely synchronous */

	msg.num_fds = 0;
	msg.len_param = sizeof(*req);
	strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));

	/* sync request carries no data */
	req->t = REQ_TYPE_SYNC;
	req->id = get_unique_id();

	ts.tv_nsec = 0;
	ts.tv_sec = MP_TIMEOUT_S;

	/* a stray reply to an earlier, timed-out request may still be
	 * pending, so retry until the IPC layer accepts the request
	 */
	do {
		ret = rte_mp_request_sync(&msg, &reply, &ts);
	} while (ret != 0 && rte_errno == EEXIST);
	if (ret != 0) {
		/* if IPC is unsupported, behave as if the call succeeded */
		if (rte_errno != ENOTSUP)
			RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
		else
			ret = 0;
		goto out;
	}

	if (reply.nb_received != reply.nb_sent) {
		RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
		goto out;
	}

	for (i = 0; i < reply.nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply.msgs[i].param;
		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
			goto out;
		}
		if (resp->id != req->id) {
			RTE_LOG(ERR, EAL, "Wrong request ID\n");
			goto out;
		}
		if (resp->result != REQ_RESULT_SUCCESS) {
			RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
			goto out;
		}
	}

	ret = 0;
out:
	free(reply.msgs);
	return ret;
}

/* this is a synchronous wrapper around a set of asynchronous requests to the
 * primary process. it initiates a request and waits until responses arrive.
 */
int
request_to_primary(struct malloc_mp_req *user_req)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
	struct mp_request *entry;
	struct timespec ts;
	struct timeval now;
	int ret;

	memset(&msg, 0, sizeof(msg));
	memset(&ts, 0, sizeof(ts));

	pthread_mutex_lock(&mp_request_list.lock);

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
		goto fail;
	}

	memset(entry, 0, sizeof(*entry));

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto fail;
	}

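	/* pthread_cond_timedwait() takes an absolute deadline, so convert the
	 * current time of day to a timespec and add the timeout (tv_usec is
	 * always below one second, so the modulo and carry are safeguards)
	 */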
	ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
	ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
			(now.tv_usec * 1000) / 1000000000;

	/* initialize the request */
	pthread_cond_init(&entry->cond, NULL);

	msg.num_fds = 0;
	msg.len_param = sizeof(*msg_req);
	strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));

	/* (attempt to) get a unique id */
	user_req->id = get_unique_id();

	/* copy contents of user request into the message */
	memcpy(msg_req, user_req, sizeof(*msg_req));

	if (rte_mp_sendmsg(&msg)) {
		RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
		goto fail;
	}

	/* copy contents of user request into active request */
	memcpy(&entry->user_req, user_req, sizeof(*user_req));

	/* mark request as in progress */
	entry->state = REQ_STATE_ACTIVE;

	TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);

	/* finally, wait on timeout */
	do {
		ret = pthread_cond_timedwait(&entry->cond,
				&mp_request_list.lock, &ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	if (entry->state != REQ_STATE_COMPLETE) {
		RTE_LOG(ERR, EAL, "Request timed out\n");
		ret = -1;
	} else {
		ret = 0;
		user_req->result = entry->user_req.result;
	}
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}
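
/*
 * A minimal sketch of how a secondary might drive this (hypothetical caller;
 * the real one lives in the heap code). Field names follow what
 * handle_alloc_request() reads above:
 *
 *	struct malloc_mp_req req;
 *
 *	memset(&req, 0, sizeof(req));
 *	req.t = REQ_TYPE_ALLOC;
 *	req.alloc_req.malloc_heap_idx = heap_idx;
 *	req.alloc_req.page_sz = pg_sz;
 *	req.alloc_req.elt_size = size;
 *	req.alloc_req.socket = socket;
 *	req.alloc_req.flags = flags;
 *	req.alloc_req.align = align;
 *	req.alloc_req.bound = bound;
 *	req.alloc_req.contig = contig;
 *	if (request_to_primary(&req) != 0 ||
 *			req.result != REQ_RESULT_SUCCESS)
 *		goto fail;	// allocation failed or timed out
 */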

int
register_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		/* it's OK for primary to not support IPC */
		if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
				rte_errno != ENOTSUP) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_REQUEST);
			return -1;
		}
	} else {
		if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_SYNC);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_ROLLBACK);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_RESPONSE,
				handle_response)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
				MP_ACTION_RESPONSE);
			return -1;
		}
	}
	return 0;
}

void
unregister_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		rte_mp_action_unregister(MP_ACTION_REQUEST);
	} else {
		rte_mp_action_unregister(MP_ACTION_SYNC);
		rte_mp_action_unregister(MP_ACTION_ROLLBACK);
		rte_mp_action_unregister(MP_ACTION_RESPONSE);
	}
}