xref: /dpdk/drivers/mempool/bucket/rte_mempool_bucket.c (revision cb77b060ebecb9a14c5f7c7622535a1f6e71e2fa)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2017-2018 Solarflare Communications Inc.
4  * All rights reserved.
5  *
6  * This software was jointly developed between OKTET Labs (under contract
7  * for Solarflare) and Solarflare Communications, Inc.
8  */
9 
10 #include <stdbool.h>
11 #include <stdio.h>
12 #include <string.h>
13 
14 #include <rte_errno.h>
15 #include <rte_ring.h>
16 #include <rte_mempool.h>
17 #include <rte_malloc.h>
18 
19 /*
20  * The general idea of the bucket mempool driver is as follows.
21  * We keep track of physically contiguous groups (buckets) of objects
22  * of a certain size. Every such group has a counter that is
23  * incremented every time an object from that group is enqueued.
24  * Until the bucket is full, no objects from it are eligible for allocation.
25  * If a request is made to dequeue a multiple of the bucket size, it is
26  * satisfied by returning whole buckets instead of separate objects.
27  */
28 
29 
/*
 * Header stored at the very beginning of every bucket.
 *
 * lcore_id is the lcore the bucket currently belongs to, or LCORE_ID_ANY
 * when it belongs to nobody (the state set when the pool is populated).
 * fill_cnt counts the objects that have been returned to a bucket that
 * is not complete yet; it is reset to 0 once the bucket becomes full.
 */
struct bucket_header {
	unsigned int lcore_id;
	/*
	 * Must be able to hold obj_per_bucket - 1. An 8-bit counter wraps
	 * around for buckets of more than 256 objects, so such buckets
	 * would never be detected as full. Using unsigned int does not
	 * grow the structure on ABIs where the 8-bit field was padded
	 * to the alignment of lcore_id anyway.
	 */
	unsigned int fill_cnt;
};
34 
/*
 * Fixed-capacity LIFO of pointers, allocated together with its storage.
 */
struct bucket_stack {
	unsigned int top;	/* number of entries currently stored */
	unsigned int limit;	/* capacity of objects[] */
	void *objects[];	/* entries; objects[top - 1] is popped first */
};
40 
41 struct bucket_data {
42 	unsigned int header_size;
43 	unsigned int total_elt_size;
44 	unsigned int obj_per_bucket;
45 	unsigned int bucket_stack_thresh;
46 	uintptr_t bucket_page_mask;
47 	struct rte_ring *shared_bucket_ring;
48 	struct bucket_stack *buckets[RTE_MAX_LCORE];
49 	/*
50 	 * Multi-producer single-consumer ring to hold objects that are
51 	 * returned to the mempool at a different lcore than initially
52 	 * dequeued
53 	 */
54 	struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
55 	struct rte_ring *shared_orphan_ring;
56 	struct rte_mempool *pool;
57 	unsigned int bucket_mem_size;
58 	void *lcore_callback_handle;
59 };
60 
61 static struct bucket_stack *
bucket_stack_create(const struct rte_mempool * mp,unsigned int n_elts)62 bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
63 {
64 	struct bucket_stack *stack;
65 
66 	stack = rte_zmalloc_socket("bucket_stack",
67 				   sizeof(struct bucket_stack) +
68 				   n_elts * sizeof(void *),
69 				   RTE_CACHE_LINE_SIZE,
70 				   mp->socket_id);
71 	if (stack == NULL)
72 		return NULL;
73 	stack->limit = n_elts;
74 	stack->top = 0;
75 
76 	return stack;
77 }
78 
79 static void
bucket_stack_push(struct bucket_stack * stack,void * obj)80 bucket_stack_push(struct bucket_stack *stack, void *obj)
81 {
82 	RTE_ASSERT(stack->top < stack->limit);
83 	stack->objects[stack->top++] = obj;
84 }
85 
86 static void *
bucket_stack_pop_unsafe(struct bucket_stack * stack)87 bucket_stack_pop_unsafe(struct bucket_stack *stack)
88 {
89 	RTE_ASSERT(stack->top > 0);
90 	return stack->objects[--stack->top];
91 }
92 
93 static void *
bucket_stack_pop(struct bucket_stack * stack)94 bucket_stack_pop(struct bucket_stack *stack)
95 {
96 	if (stack->top == 0)
97 		return NULL;
98 	return bucket_stack_pop_unsafe(stack);
99 }
100 
101 static int
bucket_enqueue_single(struct bucket_data * bd,void * obj)102 bucket_enqueue_single(struct bucket_data *bd, void *obj)
103 {
104 	int rc = 0;
105 	uintptr_t addr = (uintptr_t)obj;
106 	struct bucket_header *hdr;
107 	unsigned int lcore_id = rte_lcore_id();
108 
109 	addr &= bd->bucket_page_mask;
110 	hdr = (struct bucket_header *)addr;
111 
112 	if (likely(hdr->lcore_id == lcore_id)) {
113 		if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
114 			hdr->fill_cnt++;
115 		} else {
116 			hdr->fill_cnt = 0;
117 			/* Stack is big enough to put all buckets */
118 			bucket_stack_push(bd->buckets[lcore_id], hdr);
119 		}
120 	} else if (hdr->lcore_id != LCORE_ID_ANY) {
121 		struct rte_ring *adopt_ring =
122 			bd->adoption_buffer_rings[hdr->lcore_id];
123 
124 		rc = rte_ring_enqueue(adopt_ring, obj);
125 		/* Ring is big enough to put all objects */
126 		RTE_ASSERT(rc == 0);
127 	} else if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
128 		hdr->fill_cnt++;
129 	} else {
130 		hdr->fill_cnt = 0;
131 		rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr);
132 		/* Ring is big enough to put all buckets */
133 		RTE_ASSERT(rc == 0);
134 	}
135 
136 	return rc;
137 }
138 
139 static int
bucket_enqueue(struct rte_mempool * mp,void * const * obj_table,unsigned int n)140 bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
141 	       unsigned int n)
142 {
143 	struct bucket_data *bd = mp->pool_data;
144 	struct bucket_stack *local_stack = bd->buckets[rte_lcore_id()];
145 	unsigned int i;
146 	int rc = 0;
147 
148 	for (i = 0; i < n; i++) {
149 		rc = bucket_enqueue_single(bd, obj_table[i]);
150 		RTE_ASSERT(rc == 0);
151 	}
152 	if (local_stack->top > bd->bucket_stack_thresh) {
153 		rte_ring_enqueue_bulk(bd->shared_bucket_ring,
154 				      &local_stack->objects
155 				      [bd->bucket_stack_thresh],
156 				      local_stack->top -
157 				      bd->bucket_stack_thresh,
158 				      NULL);
159 	    local_stack->top = bd->bucket_stack_thresh;
160 	}
161 	return rc;
162 }
163 
164 static void **
bucket_fill_obj_table(const struct bucket_data * bd,void ** pstart,void ** obj_table,unsigned int n)165 bucket_fill_obj_table(const struct bucket_data *bd, void **pstart,
166 		      void **obj_table, unsigned int n)
167 {
168 	unsigned int i;
169 	uint8_t *objptr = *pstart;
170 
171 	for (objptr += bd->header_size, i = 0; i < n;
172 	     i++, objptr += bd->total_elt_size)
173 		*obj_table++ = objptr;
174 	*pstart = objptr;
175 	return obj_table;
176 }
177 
178 static int
bucket_dequeue_orphans(struct bucket_data * bd,void ** obj_table,unsigned int n_orphans)179 bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table,
180 		       unsigned int n_orphans)
181 {
182 	unsigned int i;
183 	int rc;
184 	uint8_t *objptr;
185 
186 	rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table,
187 				   n_orphans, NULL);
188 	if (unlikely(rc != (int)n_orphans)) {
189 		struct bucket_header *hdr;
190 
191 		objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]);
192 		hdr = (struct bucket_header *)objptr;
193 
194 		if (objptr == NULL) {
195 			rc = rte_ring_dequeue(bd->shared_bucket_ring,
196 					      (void **)&objptr);
197 			if (rc != 0) {
198 				rte_errno = ENOBUFS;
199 				return -rte_errno;
200 			}
201 			hdr = (struct bucket_header *)objptr;
202 			hdr->lcore_id = rte_lcore_id();
203 		}
204 		hdr->fill_cnt = 0;
205 		bucket_fill_obj_table(bd, (void **)&objptr, obj_table,
206 				      n_orphans);
207 		for (i = n_orphans; i < bd->obj_per_bucket; i++,
208 			     objptr += bd->total_elt_size) {
209 			rc = rte_ring_enqueue(bd->shared_orphan_ring,
210 					      objptr);
211 			if (rc != 0) {
212 				RTE_ASSERT(0);
213 				rte_errno = -rc;
214 				return rc;
215 			}
216 		}
217 	}
218 
219 	return 0;
220 }
221 
222 static int
bucket_dequeue_buckets(struct bucket_data * bd,void ** obj_table,unsigned int n_buckets)223 bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table,
224 		       unsigned int n_buckets)
225 {
226 	struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
227 	unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
228 	void **obj_table_base = obj_table;
229 
230 	n_buckets -= n_buckets_from_stack;
231 	while (n_buckets_from_stack-- > 0) {
232 		void *obj = bucket_stack_pop_unsafe(cur_stack);
233 
234 		obj_table = bucket_fill_obj_table(bd, &obj, obj_table,
235 						  bd->obj_per_bucket);
236 	}
237 	while (n_buckets-- > 0) {
238 		struct bucket_header *hdr;
239 
240 		if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring,
241 					      (void **)&hdr) != 0)) {
242 			/*
243 			 * Return the already-dequeued buffers
244 			 * back to the mempool
245 			 */
246 			bucket_enqueue(bd->pool, obj_table_base,
247 				       obj_table - obj_table_base);
248 			rte_errno = ENOBUFS;
249 			return -rte_errno;
250 		}
251 		hdr->lcore_id = rte_lcore_id();
252 		obj_table = bucket_fill_obj_table(bd, (void **)&hdr,
253 						  obj_table,
254 						  bd->obj_per_bucket);
255 	}
256 
257 	return 0;
258 }
259 
260 static int
bucket_adopt_orphans(struct bucket_data * bd)261 bucket_adopt_orphans(struct bucket_data *bd)
262 {
263 	int rc = 0;
264 	struct rte_ring *adopt_ring =
265 		bd->adoption_buffer_rings[rte_lcore_id()];
266 
267 	if (unlikely(!rte_ring_empty(adopt_ring))) {
268 		void *orphan;
269 
270 		while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
271 			rc = bucket_enqueue_single(bd, orphan);
272 			RTE_ASSERT(rc == 0);
273 		}
274 	}
275 	return rc;
276 }
277 
278 static int
bucket_dequeue(struct rte_mempool * mp,void ** obj_table,unsigned int n)279 bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
280 {
281 	struct bucket_data *bd = mp->pool_data;
282 	unsigned int n_buckets = n / bd->obj_per_bucket;
283 	unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket;
284 	int rc = 0;
285 
286 	bucket_adopt_orphans(bd);
287 
288 	if (unlikely(n_orphans > 0)) {
289 		rc = bucket_dequeue_orphans(bd, obj_table +
290 					    (n_buckets * bd->obj_per_bucket),
291 					    n_orphans);
292 		if (rc != 0)
293 			return rc;
294 	}
295 
296 	if (likely(n_buckets > 0)) {
297 		rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
298 		if (unlikely(rc != 0) && n_orphans > 0) {
299 			rte_ring_enqueue_bulk(bd->shared_orphan_ring,
300 					      obj_table + (n_buckets *
301 							   bd->obj_per_bucket),
302 					      n_orphans, NULL);
303 		}
304 	}
305 
306 	return rc;
307 }
308 
309 static int
bucket_dequeue_contig_blocks(struct rte_mempool * mp,void ** first_obj_table,unsigned int n)310 bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
311 			     unsigned int n)
312 {
313 	struct bucket_data *bd = mp->pool_data;
314 	const uint32_t header_size = bd->header_size;
315 	struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
316 	unsigned int n_buckets_from_stack = RTE_MIN(n, cur_stack->top);
317 	struct bucket_header *hdr;
318 	void **first_objp = first_obj_table;
319 
320 	bucket_adopt_orphans(bd);
321 
322 	n -= n_buckets_from_stack;
323 	while (n_buckets_from_stack-- > 0) {
324 		hdr = bucket_stack_pop_unsafe(cur_stack);
325 		*first_objp++ = (uint8_t *)hdr + header_size;
326 	}
327 	if (n > 0) {
328 		if (unlikely(rte_ring_dequeue_bulk(bd->shared_bucket_ring,
329 						   first_objp, n, NULL) != n)) {
330 			/* Return the already dequeued buckets */
331 			while (first_objp-- != first_obj_table) {
332 				bucket_stack_push(cur_stack,
333 						  (uint8_t *)*first_objp -
334 						  header_size);
335 			}
336 			rte_errno = ENOBUFS;
337 			return -rte_errno;
338 		}
339 		while (n-- > 0) {
340 			hdr = (struct bucket_header *)*first_objp;
341 			hdr->lcore_id = rte_lcore_id();
342 			*first_objp++ = (uint8_t *)hdr + header_size;
343 		}
344 	}
345 
346 	return 0;
347 }
348 
/*
 * Argument passed to bucket_count_per_lcore() through rte_lcore_iterate().
 */
struct bucket_count_per_lcore_ctx {
	const struct bucket_data *bd;	/* pool being counted */
	unsigned int count;		/* running total of objects */
};
353 
354 static int
bucket_count_per_lcore(unsigned int lcore_id,void * arg)355 bucket_count_per_lcore(unsigned int lcore_id, void *arg)
356 {
357 	struct bucket_count_per_lcore_ctx *bplc = arg;
358 
359 	bplc->count += bplc->bd->obj_per_bucket *
360 		bplc->bd->buckets[lcore_id]->top;
361 	bplc->count +=
362 		rte_ring_count(bplc->bd->adoption_buffer_rings[lcore_id]);
363 	return 0;
364 }
365 
366 static void
count_underfilled_buckets(struct rte_mempool * mp,void * opaque,struct rte_mempool_memhdr * memhdr,__rte_unused unsigned int mem_idx)367 count_underfilled_buckets(struct rte_mempool *mp,
368 			  void *opaque,
369 			  struct rte_mempool_memhdr *memhdr,
370 			  __rte_unused unsigned int mem_idx)
371 {
372 	unsigned int *pcount = opaque;
373 	const struct bucket_data *bd = mp->pool_data;
374 	unsigned int bucket_page_sz =
375 		(unsigned int)(~bd->bucket_page_mask + 1);
376 	uintptr_t align;
377 	uint8_t *iter;
378 
379 	align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) -
380 		(uintptr_t)memhdr->addr;
381 
382 	for (iter = (uint8_t *)memhdr->addr + align;
383 	     iter < (uint8_t *)memhdr->addr + memhdr->len;
384 	     iter += bucket_page_sz) {
385 		struct bucket_header *hdr = (struct bucket_header *)iter;
386 
387 		*pcount += hdr->fill_cnt;
388 	}
389 }
390 
391 static unsigned int
bucket_get_count(const struct rte_mempool * mp)392 bucket_get_count(const struct rte_mempool *mp)
393 {
394 	struct bucket_count_per_lcore_ctx bplc;
395 
396 	bplc.bd = mp->pool_data;
397 	bplc.count = bplc.bd->obj_per_bucket *
398 		rte_ring_count(bplc.bd->shared_bucket_ring);
399 	bplc.count += rte_ring_count(bplc.bd->shared_orphan_ring);
400 
401 	rte_lcore_iterate(bucket_count_per_lcore, &bplc);
402 	rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
403 			     count_underfilled_buckets, &bplc.count);
404 
405 	return bplc.count;
406 }
407 
408 static int
bucket_init_per_lcore(unsigned int lcore_id,void * arg)409 bucket_init_per_lcore(unsigned int lcore_id, void *arg)
410 {
411 	char rg_name[RTE_RING_NAMESIZE];
412 	struct bucket_data *bd = arg;
413 	struct rte_mempool *mp;
414 	int rg_flags;
415 	int rc;
416 
417 	mp = bd->pool;
418 	bd->buckets[lcore_id] = bucket_stack_create(mp,
419 		mp->size / bd->obj_per_bucket);
420 	if (bd->buckets[lcore_id] == NULL)
421 		goto error;
422 
423 	rc = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
424 		mp->name, lcore_id);
425 	if (rc < 0 || rc >= (int)sizeof(rg_name))
426 		goto error;
427 
428 	rg_flags = RING_F_SC_DEQ;
429 	if (mp->flags & RTE_MEMPOOL_F_SP_PUT)
430 		rg_flags |= RING_F_SP_ENQ;
431 	bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
432 		rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
433 	if (bd->adoption_buffer_rings[lcore_id] == NULL)
434 		goto error;
435 
436 	return 0;
437 error:
438 	rte_free(bd->buckets[lcore_id]);
439 	bd->buckets[lcore_id] = NULL;
440 	return -1;
441 }
442 
443 static void
bucket_uninit_per_lcore(unsigned int lcore_id,void * arg)444 bucket_uninit_per_lcore(unsigned int lcore_id, void *arg)
445 {
446 	struct bucket_data *bd = arg;
447 
448 	rte_ring_free(bd->adoption_buffer_rings[lcore_id]);
449 	bd->adoption_buffer_rings[lcore_id] = NULL;
450 	rte_free(bd->buckets[lcore_id]);
451 	bd->buckets[lcore_id] = NULL;
452 }
453 
454 static int
bucket_alloc(struct rte_mempool * mp)455 bucket_alloc(struct rte_mempool *mp)
456 {
457 	int rg_flags = 0;
458 	int rc = 0;
459 	char rg_name[RTE_RING_NAMESIZE];
460 	struct bucket_data *bd;
461 	unsigned int bucket_header_size;
462 	size_t pg_sz;
463 
464 	rc = rte_mempool_get_page_size(mp, &pg_sz);
465 	if (rc < 0)
466 		return rc;
467 
468 	bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
469 				RTE_CACHE_LINE_SIZE, mp->socket_id);
470 	if (bd == NULL) {
471 		rc = -ENOMEM;
472 		goto no_mem_for_data;
473 	}
474 	bd->pool = mp;
475 	if (mp->flags & RTE_MEMPOOL_F_NO_CACHE_ALIGN)
476 		bucket_header_size = sizeof(struct bucket_header);
477 	else
478 		bucket_header_size = RTE_CACHE_LINE_SIZE;
479 	RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
480 	bd->header_size = mp->header_size + bucket_header_size;
481 	bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
482 	bd->bucket_mem_size = RTE_MIN(pg_sz,
483 			(size_t)(RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024));
484 	bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
485 		bd->total_elt_size;
486 	bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);
487 	/* eventually this should be a tunable parameter */
488 	bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3;
489 
490 	bd->lcore_callback_handle = rte_lcore_callback_register("bucket",
491 		bucket_init_per_lcore, bucket_uninit_per_lcore, bd);
492 	if (bd->lcore_callback_handle == NULL) {
493 		rc = -ENOMEM;
494 		goto no_mem_for_stacks;
495 	}
496 
497 	if (mp->flags & RTE_MEMPOOL_F_SP_PUT)
498 		rg_flags |= RING_F_SP_ENQ;
499 	if (mp->flags & RTE_MEMPOOL_F_SC_GET)
500 		rg_flags |= RING_F_SC_DEQ;
501 	rc = snprintf(rg_name, sizeof(rg_name),
502 		      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
503 	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
504 		rc = -ENAMETOOLONG;
505 		goto invalid_shared_orphan_ring;
506 	}
507 	bd->shared_orphan_ring =
508 		rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
509 				mp->socket_id, rg_flags);
510 	if (bd->shared_orphan_ring == NULL) {
511 		rc = -rte_errno;
512 		goto cannot_create_shared_orphan_ring;
513 	}
514 
515 	rc = snprintf(rg_name, sizeof(rg_name),
516 		       RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
517 	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
518 		rc = -ENAMETOOLONG;
519 		goto invalid_shared_bucket_ring;
520 	}
521 	bd->shared_bucket_ring =
522 		rte_ring_create(rg_name,
523 				rte_align32pow2((mp->size + 1) /
524 						bd->obj_per_bucket),
525 				mp->socket_id, rg_flags);
526 	if (bd->shared_bucket_ring == NULL) {
527 		rc = -rte_errno;
528 		goto cannot_create_shared_bucket_ring;
529 	}
530 
531 	mp->pool_data = bd;
532 
533 	return 0;
534 
535 cannot_create_shared_bucket_ring:
536 invalid_shared_bucket_ring:
537 	rte_ring_free(bd->shared_orphan_ring);
538 cannot_create_shared_orphan_ring:
539 invalid_shared_orphan_ring:
540 	rte_lcore_callback_unregister(bd->lcore_callback_handle);
541 no_mem_for_stacks:
542 	rte_free(bd);
543 no_mem_for_data:
544 	rte_errno = -rc;
545 	return rc;
546 }
547 
548 static void
bucket_free(struct rte_mempool * mp)549 bucket_free(struct rte_mempool *mp)
550 {
551 	struct bucket_data *bd = mp->pool_data;
552 
553 	if (bd == NULL)
554 		return;
555 
556 	rte_lcore_callback_unregister(bd->lcore_callback_handle);
557 
558 	rte_ring_free(bd->shared_orphan_ring);
559 	rte_ring_free(bd->shared_bucket_ring);
560 
561 	rte_free(bd);
562 }
563 
564 static ssize_t
bucket_calc_mem_size(const struct rte_mempool * mp,uint32_t obj_num,__rte_unused uint32_t pg_shift,size_t * min_total_elt_size,size_t * align)565 bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num,
566 		     __rte_unused uint32_t pg_shift, size_t *min_total_elt_size,
567 		     size_t *align)
568 {
569 	struct bucket_data *bd = mp->pool_data;
570 	unsigned int bucket_page_sz;
571 
572 	if (bd == NULL)
573 		return -EINVAL;
574 
575 	bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
576 	*align = bucket_page_sz;
577 	*min_total_elt_size = bucket_page_sz;
578 	/*
579 	 * Each bucket occupies its own block aligned to
580 	 * bucket_page_sz, so the required amount of memory is
581 	 * a multiple of bucket_page_sz.
582 	 * We also need extra space for a bucket header
583 	 */
584 	return ((obj_num + bd->obj_per_bucket - 1) /
585 		bd->obj_per_bucket) * bucket_page_sz;
586 }
587 
588 static int
bucket_populate(struct rte_mempool * mp,unsigned int max_objs,void * vaddr,rte_iova_t iova,size_t len,rte_mempool_populate_obj_cb_t * obj_cb,void * obj_cb_arg)589 bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
590 		void *vaddr, rte_iova_t iova, size_t len,
591 		rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
592 {
593 	struct bucket_data *bd = mp->pool_data;
594 	unsigned int bucket_page_sz;
595 	unsigned int bucket_header_sz;
596 	unsigned int n_objs;
597 	uintptr_t align;
598 	uint8_t *iter;
599 	int rc;
600 
601 	if (bd == NULL)
602 		return -EINVAL;
603 
604 	bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
605 	align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) -
606 		(uintptr_t)vaddr;
607 
608 	bucket_header_sz = bd->header_size - mp->header_size;
609 	if (iova != RTE_BAD_IOVA)
610 		iova += align + bucket_header_sz;
611 
612 	for (iter = (uint8_t *)vaddr + align, n_objs = 0;
613 	     iter < (uint8_t *)vaddr + len && n_objs < max_objs;
614 	     iter += bucket_page_sz) {
615 		struct bucket_header *hdr = (struct bucket_header *)iter;
616 		unsigned int chunk_len = bd->bucket_mem_size;
617 
618 		if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len)
619 			chunk_len = len - (iter - (uint8_t *)vaddr);
620 		if (chunk_len <= bucket_header_sz)
621 			break;
622 		chunk_len -= bucket_header_sz;
623 
624 		hdr->fill_cnt = 0;
625 		hdr->lcore_id = LCORE_ID_ANY;
626 		rc = rte_mempool_op_populate_helper(mp, 0,
627 						     RTE_MIN(bd->obj_per_bucket,
628 							     max_objs - n_objs),
629 						     iter + bucket_header_sz,
630 						     iova, chunk_len,
631 						     obj_cb, obj_cb_arg);
632 		if (rc < 0)
633 			return rc;
634 		n_objs += rc;
635 		if (iova != RTE_BAD_IOVA)
636 			iova += bucket_page_sz;
637 	}
638 
639 	return n_objs;
640 }
641 
642 static int
bucket_get_info(const struct rte_mempool * mp,struct rte_mempool_info * info)643 bucket_get_info(const struct rte_mempool *mp, struct rte_mempool_info *info)
644 {
645 	struct bucket_data *bd = mp->pool_data;
646 
647 	info->contig_block_size = bd->obj_per_bucket;
648 	return 0;
649 }
650 
651 
652 static const struct rte_mempool_ops ops_bucket = {
653 	.name = "bucket",
654 	.alloc = bucket_alloc,
655 	.free = bucket_free,
656 	.enqueue = bucket_enqueue,
657 	.dequeue = bucket_dequeue,
658 	.get_count = bucket_get_count,
659 	.calc_mem_size = bucket_calc_mem_size,
660 	.populate = bucket_populate,
661 	.get_info = bucket_get_info,
662 	.dequeue_contig_blocks = bucket_dequeue_contig_blocks,
663 };
664 
665 
666 RTE_MEMPOOL_REGISTER_OPS(ops_bucket);
667