xref: /dpdk/lib/hash/rte_cuckoo_hash.c (revision bf26e6f4019b7ab29e2ff8263effd3695d415ef4)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2016 Intel Corporation
3  * Copyright(c) 2018 Arm Limited
4  */
5 
6 #include <string.h>
7 #include <stdint.h>
8 #include <errno.h>
9 #include <stdio.h>
10 #include <sys/queue.h>
11 
12 #include <rte_common.h>
13 #include <rte_log.h>
14 #include <rte_prefetch.h>
15 #include <rte_branch_prediction.h>
16 #include <rte_malloc.h>
17 #include <rte_eal_memconfig.h>
18 #include <rte_errno.h>
19 #include <rte_string_fns.h>
20 #include <rte_cpuflags.h>
21 #include <rte_rwlock.h>
22 #include <rte_ring_elem.h>
23 #include <rte_vect.h>
24 #include <rte_tailq.h>
25 
26 #include "rte_hash.h"
27 
28 /* needs to be before rte_cuckoo_hash.h */
29 RTE_LOG_REGISTER_DEFAULT(hash_logtype, INFO);
30 #define RTE_LOGTYPE_HASH hash_logtype
31 #define HASH_LOG(level, ...) \
32 	RTE_LOG_LINE(level, HASH, "" __VA_ARGS__)
33 
34 #include "rte_cuckoo_hash.h"
35 
36 /* Enum used to select the implementation of the signature comparison function,
37  * e.g. a system supporting SVE might still want to use a NEON or scalar implementation.
38  */
39 enum rte_hash_sig_compare_function {
40 	RTE_HASH_COMPARE_SCALAR = 0,
41 	RTE_HASH_COMPARE_SSE,
42 	RTE_HASH_COMPARE_NEON,
43 	RTE_HASH_COMPARE_SVE,
44 };
45 
46 #if defined(__ARM_NEON)
47 #include "compare_signatures_arm.h"
48 #elif defined(__SSE2__)
49 #include "compare_signatures_x86.h"
50 #else
51 #include "compare_signatures_generic.h"
52 #endif
53 
54 /* Mask of all flags supported by this version */
55 #define RTE_HASH_EXTRA_FLAGS_MASK (RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT | \
56 				   RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD | \
57 				   RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY | \
58 				   RTE_HASH_EXTRA_FLAGS_EXT_TABLE |	\
59 				   RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL | \
60 				   RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF)
61 
62 #define FOR_EACH_BUCKET(CURRENT_BKT, START_BUCKET)                            \
63 	for (CURRENT_BKT = START_BUCKET;                                      \
64 		CURRENT_BKT != NULL;                                          \
65 		CURRENT_BKT = CURRENT_BKT->next)
66 
67 TAILQ_HEAD(rte_hash_list, rte_tailq_entry);
68 
69 static struct rte_tailq_elem rte_hash_tailq = {
70 	.name = "RTE_HASH",
71 };
72 EAL_REGISTER_TAILQ(rte_hash_tailq)
73 
74 struct __rte_hash_rcu_dq_entry {
75 	uint32_t key_idx;
76 	uint32_t ext_bkt_idx;
77 };
78 
79 struct rte_hash *
80 rte_hash_find_existing(const char *name)
81 {
82 	struct rte_hash *h = NULL;
83 	struct rte_tailq_entry *te;
84 	struct rte_hash_list *hash_list;
85 
86 	hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);
87 
88 	rte_mcfg_tailq_read_lock();
89 	TAILQ_FOREACH(te, hash_list, next) {
90 		h = (struct rte_hash *) te->data;
91 		if (strncmp(name, h->name, RTE_HASH_NAMESIZE) == 0)
92 			break;
93 	}
94 	rte_mcfg_tailq_read_unlock();
95 
96 	if (te == NULL) {
97 		rte_errno = ENOENT;
98 		return NULL;
99 	}
100 	return h;
101 }
102 
103 static inline struct rte_hash_bucket *
104 rte_hash_get_last_bkt(struct rte_hash_bucket *lst_bkt)
105 {
106 	while (lst_bkt->next != NULL)
107 		lst_bkt = lst_bkt->next;
108 	return lst_bkt;
109 }
110 
111 void rte_hash_set_cmp_func(struct rte_hash *h, rte_hash_cmp_eq_t func)
112 {
113 	h->cmp_jump_table_idx = KEY_CUSTOM;
114 	h->rte_hash_custom_cmp_eq = func;
115 }
116 
117 static inline int
118 rte_hash_cmp_eq(const void *key1, const void *key2, const struct rte_hash *h)
119 {
120 	if (h->cmp_jump_table_idx == KEY_CUSTOM)
121 		return h->rte_hash_custom_cmp_eq(key1, key2, h->key_len);
122 	else
123 		return cmp_jump_table[h->cmp_jump_table_idx](key1, key2, h->key_len);
124 }
125 
126 /*
127  * We use the upper 16 bits of the hash as the signature value stored in the
128  * table. The lower bits are used for the primary bucket
129  * location. We then XOR the primary bucket location with the signature
130  * to get the secondary bucket location. This is the same scheme
131  * proposed in Bin Fan, et al's paper
132  * "MemC3: Compact and Concurrent MemCache with Dumber Caching and
133  * Smarter Hashing". The benefit of using
134  * XOR is that the alternative bucket location can be derived
135  * from only the current bucket location and the signature.
136  */
137 static inline uint16_t
138 get_short_sig(const hash_sig_t hash)
139 {
140 	return hash >> 16;
141 }
142 
143 static inline uint32_t
144 get_prim_bucket_index(const struct rte_hash *h, const hash_sig_t hash)
145 {
146 	return hash & h->bucket_bitmask;
147 }
148 
149 static inline uint32_t
150 get_alt_bucket_index(const struct rte_hash *h,
151 			uint32_t cur_bkt_idx, uint16_t sig)
152 {
153 	return (cur_bkt_idx ^ sig) & h->bucket_bitmask;
154 }
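
/*
 * Worked example (editor's illustration, not part of the original source):
 * assuming a 256-bucket table (bucket_bitmask = 0xff) and hash = 0xabcd1234,
 * the helpers above give
 *	short signature  = 0xabcd1234 >> 16        = 0xabcd
 *	primary bucket   = 0xabcd1234 & 0xff       = 0x34
 *	alternate bucket = (0x34 ^ 0xabcd) & 0xff  = 0xf9
 * XORing the alternate bucket with the same signature, (0xf9 ^ 0xabcd) & 0xff,
 * yields 0x34 again, so either bucket location can be recovered from the
 * other one plus the stored signature.
 */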
155 
156 struct rte_hash *
157 rte_hash_create(const struct rte_hash_parameters *params)
158 {
159 	struct rte_hash *h = NULL;
160 	struct rte_tailq_entry *te = NULL;
161 	struct rte_hash_list *hash_list;
162 	struct rte_ring *r = NULL;
163 	struct rte_ring *r_ext = NULL;
164 	char hash_name[RTE_HASH_NAMESIZE];
165 	void *k = NULL;
166 	void *buckets = NULL;
167 	void *buckets_ext = NULL;
168 	char ring_name[RTE_RING_NAMESIZE];
169 	char ext_ring_name[RTE_RING_NAMESIZE];
170 	unsigned num_key_slots;
171 	unsigned int hw_trans_mem_support = 0, use_local_cache = 0;
172 	unsigned int ext_table_support = 0;
173 	unsigned int readwrite_concur_support = 0;
174 	unsigned int writer_takes_lock = 0;
175 	unsigned int no_free_on_del = 0;
176 	uint32_t *ext_bkt_to_free = NULL;
177 	RTE_ATOMIC(uint32_t) *tbl_chng_cnt = NULL;
178 	struct lcore_cache *local_free_slots = NULL;
179 	unsigned int readwrite_concur_lf_support = 0;
180 	uint32_t i;
181 
182 	rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
183 
184 	hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);
185 
186 	if (params == NULL) {
187 		rte_errno = EINVAL;
188 		HASH_LOG(ERR, "%s has no parameters", __func__);
189 		return NULL;
190 	}
191 
192 	/* Check for valid parameters */
193 	if ((params->entries > RTE_HASH_ENTRIES_MAX) ||
194 			(params->entries < RTE_HASH_BUCKET_ENTRIES)) {
195 		rte_errno = EINVAL;
196 		HASH_LOG(ERR, "%s() entries (%u) must be in range [%d, %d] inclusive",
197 			__func__, params->entries, RTE_HASH_BUCKET_ENTRIES,
198 			RTE_HASH_ENTRIES_MAX);
199 		return NULL;
200 	}
201 
202 	if (params->key_len == 0) {
203 		rte_errno = EINVAL;
204 		HASH_LOG(ERR, "%s() key_len must be greater than 0", __func__);
205 		return NULL;
206 	}
207 
208 	if (params->extra_flag & ~RTE_HASH_EXTRA_FLAGS_MASK) {
209 		rte_errno = EINVAL;
210 		HASH_LOG(ERR, "%s: unsupported extra flags", __func__);
211 		return NULL;
212 	}
213 
214 	if (params->name == NULL) {
215 		rte_errno = EINVAL;
216 		HASH_LOG(ERR, "%s() has invalid parameters, name can't be NULL",
217 			__func__);
218 		return NULL;
219 	}
220 
221 	/* Validate correct usage of extra options */
222 	if ((params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) &&
223 	    (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF)) {
224 		rte_errno = EINVAL;
225 		HASH_LOG(ERR, "%s: choose rw concurrency or rw concurrency lock free",
226 			__func__);
227 		return NULL;
228 	}
229 
230 	/* Check the extra flags field for extra options. */
231 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
232 		hw_trans_mem_support = 1;
233 
234 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD) {
235 		use_local_cache = 1;
236 		writer_takes_lock = 1;
237 	}
238 
239 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) {
240 		readwrite_concur_support = 1;
241 		writer_takes_lock = 1;
242 	}
243 
244 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_EXT_TABLE)
245 		ext_table_support = 1;
246 
247 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL)
248 		no_free_on_del = 1;
249 
250 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
251 		readwrite_concur_lf_support = 1;
252 		/* Do not free the internal memory/index on delete.
253 		 * If internal RCU is enabled, the internal memory/index
254 		 * is freed on delete.
255 		 */
256 		no_free_on_del = 1;
257 	}
258 
259 	/* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
260 	if (use_local_cache)
261 		/*
262 		 * Increase number of slots by total number of indices
263 		 * that can be stored in the lcore caches
264 		 * except for the first cache
265 		 */
266 		num_key_slots = params->entries + (RTE_MAX_LCORE - 1) *
267 					(LCORE_CACHE_SIZE - 1) + 1;
268 	else
269 		num_key_slots = params->entries + 1;
270 
271 	snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name);
272 	/* Create ring (Dummy slot index is not enqueued) */
273 	r = rte_ring_create_elem(ring_name, sizeof(uint32_t),
274 			rte_align32pow2(num_key_slots), params->socket_id, 0);
275 	if (r == NULL) {
276 		HASH_LOG(ERR, "memory allocation failed");
277 		goto err;
278 	}
279 
280 	const uint32_t num_buckets = rte_align32pow2(params->entries) /
281 						RTE_HASH_BUCKET_ENTRIES;
282 
283 	/* Create ring for extendable buckets. */
284 	if (ext_table_support) {
285 		snprintf(ext_ring_name, sizeof(ext_ring_name), "HT_EXT_%s",
286 								params->name);
287 		r_ext = rte_ring_create_elem(ext_ring_name, sizeof(uint32_t),
288 				rte_align32pow2(num_buckets + 1),
289 				params->socket_id, 0);
290 
291 		if (r_ext == NULL) {
292 			HASH_LOG(ERR, "ext buckets memory allocation "
293 								"failed");
294 			goto err;
295 		}
296 	}
297 
298 	snprintf(hash_name, sizeof(hash_name), "HT_%s", params->name);
299 
300 	rte_mcfg_tailq_write_lock();
301 
302 	/* Guarantee there is no existing entry with this name; this is normally
303 	 * already checked by the ring creation above. */
304 	TAILQ_FOREACH(te, hash_list, next) {
305 		h = (struct rte_hash *) te->data;
306 		if (strncmp(params->name, h->name, RTE_HASH_NAMESIZE) == 0)
307 			break;
308 	}
309 	h = NULL;
310 	if (te != NULL) {
311 		rte_errno = EEXIST;
312 		te = NULL;
313 		goto err_unlock;
314 	}
315 
316 	te = rte_zmalloc("HASH_TAILQ_ENTRY", sizeof(*te), 0);
317 	if (te == NULL) {
318 		HASH_LOG(ERR, "tailq entry allocation failed");
319 		goto err_unlock;
320 	}
321 
322 	h = (struct rte_hash *)rte_zmalloc_socket(hash_name, sizeof(struct rte_hash),
323 					RTE_CACHE_LINE_SIZE, params->socket_id);
324 
325 	if (h == NULL) {
326 		HASH_LOG(ERR, "memory allocation failed");
327 		goto err_unlock;
328 	}
329 
330 	buckets = rte_zmalloc_socket(NULL,
331 				num_buckets * sizeof(struct rte_hash_bucket),
332 				RTE_CACHE_LINE_SIZE, params->socket_id);
333 
334 	if (buckets == NULL) {
335 		HASH_LOG(ERR, "buckets memory allocation failed");
336 		goto err_unlock;
337 	}
338 
339 	/* Allocate same number of extendable buckets */
340 	if (ext_table_support) {
341 		buckets_ext = rte_zmalloc_socket(NULL,
342 				num_buckets * sizeof(struct rte_hash_bucket),
343 				RTE_CACHE_LINE_SIZE, params->socket_id);
344 		if (buckets_ext == NULL) {
345 			HASH_LOG(ERR, "ext buckets memory allocation "
346 							"failed");
347 			goto err_unlock;
348 		}
349 		/* Populate the ext bkt ring. We reserve index 0, similar to the
350 		 * key-data slot, in case we later want to use the bucket
351 		 * index for the linked list, where 0 would mean NULL
352 		 * for the next bucket.
353 		 */
354 		for (i = 1; i <= num_buckets; i++)
355 			rte_ring_sp_enqueue_elem(r_ext, &i, sizeof(uint32_t));
356 
357 		if (readwrite_concur_lf_support) {
358 			ext_bkt_to_free = rte_zmalloc(NULL, sizeof(uint32_t) *
359 								num_key_slots, 0);
360 			if (ext_bkt_to_free == NULL) {
361 				HASH_LOG(ERR, "ext bkt to free memory allocation "
362 								"failed");
363 				goto err_unlock;
364 			}
365 		}
366 	}
367 
368 	const uint32_t key_entry_size =
369 		RTE_ALIGN(sizeof(struct rte_hash_key) + params->key_len,
370 			  KEY_ALIGNMENT);
371 	const uint64_t key_tbl_size = (uint64_t) key_entry_size * num_key_slots;
372 
373 	k = rte_zmalloc_socket(NULL, key_tbl_size,
374 			RTE_CACHE_LINE_SIZE, params->socket_id);
375 
376 	if (k == NULL) {
377 		HASH_LOG(ERR, "memory allocation failed");
378 		goto err_unlock;
379 	}
380 
381 	tbl_chng_cnt = rte_zmalloc_socket(NULL, sizeof(uint32_t),
382 			RTE_CACHE_LINE_SIZE, params->socket_id);
383 
384 	if (tbl_chng_cnt == NULL) {
385 		HASH_LOG(ERR, "memory allocation failed");
386 		goto err_unlock;
387 	}
388 
389 /*
390  * On x86 or Arm64, select an appropriate key compare function,
391  * which may use architecture-specific intrinsics; otherwise use memcmp.
392  */
393 #if defined(RTE_ARCH_X86) || defined(RTE_ARCH_ARM64)
394 	/* Select function to compare keys */
395 	switch (params->key_len) {
396 	case 16:
397 		h->cmp_jump_table_idx = KEY_16_BYTES;
398 		break;
399 	case 32:
400 		h->cmp_jump_table_idx = KEY_32_BYTES;
401 		break;
402 	case 48:
403 		h->cmp_jump_table_idx = KEY_48_BYTES;
404 		break;
405 	case 64:
406 		h->cmp_jump_table_idx = KEY_64_BYTES;
407 		break;
408 	case 80:
409 		h->cmp_jump_table_idx = KEY_80_BYTES;
410 		break;
411 	case 96:
412 		h->cmp_jump_table_idx = KEY_96_BYTES;
413 		break;
414 	case 112:
415 		h->cmp_jump_table_idx = KEY_112_BYTES;
416 		break;
417 	case 128:
418 		h->cmp_jump_table_idx = KEY_128_BYTES;
419 		break;
420 	default:
421 		/* If key length is not a multiple of 16, use generic memcmp */
422 		h->cmp_jump_table_idx = KEY_OTHER_BYTES;
423 	}
424 #else
425 	h->cmp_jump_table_idx = KEY_OTHER_BYTES;
426 #endif
427 
428 	if (use_local_cache) {
429 		local_free_slots = rte_zmalloc_socket(NULL,
430 				sizeof(struct lcore_cache) * RTE_MAX_LCORE,
431 				RTE_CACHE_LINE_SIZE, params->socket_id);
432 		if (local_free_slots == NULL) {
433 			HASH_LOG(ERR, "local free slots memory allocation failed");
434 			goto err_unlock;
435 		}
436 	}
437 
438 	/* Default hash function */
439 #if defined(RTE_ARCH_X86)
440 	default_hash_func = (rte_hash_function)rte_hash_crc;
441 #elif defined(RTE_ARCH_ARM64)
442 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_CRC32))
443 		default_hash_func = (rte_hash_function)rte_hash_crc;
444 #endif
445 	/* Setup hash context */
446 	strlcpy(h->name, params->name, sizeof(h->name));
447 	h->entries = params->entries;
448 	h->key_len = params->key_len;
449 	h->key_entry_size = key_entry_size;
450 	h->hash_func_init_val = params->hash_func_init_val;
451 
452 	h->num_buckets = num_buckets;
453 	h->bucket_bitmask = h->num_buckets - 1;
454 	h->buckets = buckets;
455 	h->buckets_ext = buckets_ext;
456 	h->free_ext_bkts = r_ext;
457 	h->hash_func = (params->hash_func == NULL) ?
458 		default_hash_func : params->hash_func;
459 	h->key_store = k;
460 	h->free_slots = r;
461 	h->ext_bkt_to_free = ext_bkt_to_free;
462 	h->tbl_chng_cnt = tbl_chng_cnt;
463 	*h->tbl_chng_cnt = 0;
464 	h->hw_trans_mem_support = hw_trans_mem_support;
465 	h->use_local_cache = use_local_cache;
466 	h->local_free_slots = local_free_slots;
467 	h->readwrite_concur_support = readwrite_concur_support;
468 	h->ext_table_support = ext_table_support;
469 	h->writer_takes_lock = writer_takes_lock;
470 	h->no_free_on_del = no_free_on_del;
471 	h->readwrite_concur_lf_support = readwrite_concur_lf_support;
472 
473 #if defined(RTE_ARCH_X86)
474 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_SSE2))
475 		h->sig_cmp_fn = RTE_HASH_COMPARE_SSE;
476 	else
477 #elif defined(RTE_ARCH_ARM64)
478 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_NEON)) {
479 		h->sig_cmp_fn = RTE_HASH_COMPARE_NEON;
480 #if defined(RTE_HAS_SVE_ACLE)
481 		if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_SVE))
482 			h->sig_cmp_fn = RTE_HASH_COMPARE_SVE;
483 #endif
484 	}
485 	else
486 #endif
487 		h->sig_cmp_fn = RTE_HASH_COMPARE_SCALAR;
488 
489 	/* Writer threads need to take the lock when:
490 	 * 1) RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY is enabled OR
491 	 * 2) RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD is enabled
492 	 */
493 	if (h->writer_takes_lock) {
494 		h->readwrite_lock = rte_malloc(NULL, sizeof(rte_rwlock_t),
495 						RTE_CACHE_LINE_SIZE);
496 		if (h->readwrite_lock == NULL)
497 			goto err_unlock;
498 
499 		rte_rwlock_init(h->readwrite_lock);
500 	}
501 
502 	/* Populate free slots ring. Entry zero is reserved for key misses. */
503 	for (i = 1; i < num_key_slots; i++)
504 		rte_ring_sp_enqueue_elem(r, &i, sizeof(uint32_t));
505 
506 	te->data = (void *) h;
507 	TAILQ_INSERT_TAIL(hash_list, te, next);
508 	rte_mcfg_tailq_write_unlock();
509 
510 	return h;
511 err_unlock:
512 	rte_mcfg_tailq_write_unlock();
513 err:
514 	rte_ring_free(r);
515 	rte_ring_free(r_ext);
516 	rte_free(te);
517 	rte_free(local_free_slots);
518 	rte_free(h);
519 	rte_free(buckets);
520 	rte_free(buckets_ext);
521 	rte_free(k);
522 	rte_free((void *)(uintptr_t)tbl_chng_cnt);
523 	rte_free(ext_bkt_to_free);
524 	return NULL;
525 }
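
/*
 * Minimal creation sketch (editor's illustration under assumed defaults; the
 * table name "example_ht" and the sizes are hypothetical, error handling is
 * trimmed):
 *
 *	struct rte_hash_parameters params = {
 *		.name = "example_ht",
 *		.entries = 1024,		// bucket count derived from the next power of two (see above)
 *		.key_len = sizeof(uint32_t),
 *		.hash_func = rte_jhash,		// NULL would select the default chosen above
 *		.hash_func_init_val = 0,
 *		.socket_id = (int)rte_socket_id(),
 *	};
 *	struct rte_hash *ht = rte_hash_create(&params);
 *	if (ht == NULL)
 *		handle_error(rte_errno);	// hypothetical handler; rte_errno e.g. EEXIST, EINVAL
 */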
526 
527 void
528 rte_hash_free(struct rte_hash *h)
529 {
530 	struct rte_tailq_entry *te;
531 	struct rte_hash_list *hash_list;
532 
533 	if (h == NULL)
534 		return;
535 
536 	hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);
537 
538 	rte_mcfg_tailq_write_lock();
539 
540 	/* find out tailq entry */
541 	TAILQ_FOREACH(te, hash_list, next) {
542 		if (te->data == (void *) h)
543 			break;
544 	}
545 
546 	if (te == NULL) {
547 		rte_mcfg_tailq_write_unlock();
548 		return;
549 	}
550 
551 	TAILQ_REMOVE(hash_list, te, next);
552 
553 	rte_mcfg_tailq_write_unlock();
554 
555 	if (h->dq)
556 		rte_rcu_qsbr_dq_delete(h->dq);
557 
558 	if (h->use_local_cache)
559 		rte_free(h->local_free_slots);
560 	if (h->writer_takes_lock)
561 		rte_free(h->readwrite_lock);
562 	rte_ring_free(h->free_slots);
563 	rte_ring_free(h->free_ext_bkts);
564 	rte_free(h->key_store);
565 	rte_free(h->buckets);
566 	rte_free(h->buckets_ext);
567 	rte_free((void *)(uintptr_t)h->tbl_chng_cnt);
568 	rte_free(h->ext_bkt_to_free);
569 	rte_free(h->hash_rcu_cfg);
570 	rte_free(h);
571 	rte_free(te);
572 }
573 
574 hash_sig_t
575 rte_hash_hash(const struct rte_hash *h, const void *key)
576 {
577 	/* calc hash result by key */
578 	return h->hash_func(key, h->key_len, h->hash_func_init_val);
579 }
580 
581 int32_t
582 rte_hash_max_key_id(const struct rte_hash *h)
583 {
584 	RETURN_IF_TRUE((h == NULL), -EINVAL);
585 	if (h->use_local_cache)
586 		/*
587 		 * Increase number of slots by total number of indices
588 		 * that can be stored in the lcore caches
589 		 */
590 		return (h->entries + ((RTE_MAX_LCORE - 1) *
591 					(LCORE_CACHE_SIZE - 1)));
592 	else
593 		return h->entries;
594 }
595 
596 int32_t
597 rte_hash_count(const struct rte_hash *h)
598 {
599 	uint32_t tot_ring_cnt, cached_cnt = 0;
600 	uint32_t i, ret;
601 
602 	if (h == NULL)
603 		return -EINVAL;
604 
605 	if (h->use_local_cache) {
606 		tot_ring_cnt = h->entries + (RTE_MAX_LCORE - 1) *
607 					(LCORE_CACHE_SIZE - 1);
608 		for (i = 0; i < RTE_MAX_LCORE; i++)
609 			cached_cnt += h->local_free_slots[i].len;
610 
611 		ret = tot_ring_cnt - rte_ring_count(h->free_slots) -
612 								cached_cnt;
613 	} else {
614 		tot_ring_cnt = h->entries;
615 		ret = tot_ring_cnt - rte_ring_count(h->free_slots);
616 	}
617 	return ret;
618 }
619 
620 /* Read write locks implemented using rte_rwlock */
621 static inline void
622 __hash_rw_writer_lock(const struct rte_hash *h)
623 	__rte_exclusive_lock_function(&h->readwrite_lock)
624 	__rte_no_thread_safety_analysis
625 {
626 	if (h->writer_takes_lock && h->hw_trans_mem_support)
627 		rte_rwlock_write_lock_tm(h->readwrite_lock);
628 	else if (h->writer_takes_lock)
629 		rte_rwlock_write_lock(h->readwrite_lock);
630 }
631 
632 static inline void
633 __hash_rw_reader_lock(const struct rte_hash *h)
634 	__rte_shared_lock_function(&h->readwrite_lock)
635 	__rte_no_thread_safety_analysis
636 {
637 	if (h->readwrite_concur_support && h->hw_trans_mem_support)
638 		rte_rwlock_read_lock_tm(h->readwrite_lock);
639 	else if (h->readwrite_concur_support)
640 		rte_rwlock_read_lock(h->readwrite_lock);
641 }
642 
643 static inline void
644 __hash_rw_writer_unlock(const struct rte_hash *h)
645 	__rte_unlock_function(&h->readwrite_lock)
646 	__rte_no_thread_safety_analysis
647 {
648 	if (h->writer_takes_lock && h->hw_trans_mem_support)
649 		rte_rwlock_write_unlock_tm(h->readwrite_lock);
650 	else if (h->writer_takes_lock)
651 		rte_rwlock_write_unlock(h->readwrite_lock);
652 }
653 
654 static inline void
655 __hash_rw_reader_unlock(const struct rte_hash *h)
656 	__rte_unlock_function(&h->readwrite_lock)
657 	__rte_no_thread_safety_analysis
658 {
659 	if (h->readwrite_concur_support && h->hw_trans_mem_support)
660 		rte_rwlock_read_unlock_tm(h->readwrite_lock);
661 	else if (h->readwrite_concur_support)
662 		rte_rwlock_read_unlock(h->readwrite_lock);
663 }
664 
665 void
666 rte_hash_reset(struct rte_hash *h)
667 {
668 	uint32_t tot_ring_cnt, i;
669 	unsigned int pending;
670 
671 	if (h == NULL)
672 		return;
673 
674 	__hash_rw_writer_lock(h);
675 
676 	if (h->dq) {
677 		/* Reclaim all the resources */
678 		rte_rcu_qsbr_dq_reclaim(h->dq, ~0, NULL, &pending, NULL);
679 		if (pending != 0)
680 			HASH_LOG(ERR, "RCU reclaim all resources failed");
681 	}
682 
683 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
684 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
685 	*h->tbl_chng_cnt = 0;
686 
687 	/* reset the free ring */
688 	rte_ring_reset(h->free_slots);
689 
690 	/* flush free extendable bucket ring and memory */
691 	if (h->ext_table_support) {
692 		memset(h->buckets_ext, 0, h->num_buckets *
693 						sizeof(struct rte_hash_bucket));
694 		rte_ring_reset(h->free_ext_bkts);
695 	}
696 
697 	/* Repopulate the free slots ring. Entry zero is reserved for key misses */
698 	if (h->use_local_cache)
699 		tot_ring_cnt = h->entries + (RTE_MAX_LCORE - 1) *
700 					(LCORE_CACHE_SIZE - 1);
701 	else
702 		tot_ring_cnt = h->entries;
703 
704 	for (i = 1; i < tot_ring_cnt + 1; i++)
705 		rte_ring_sp_enqueue_elem(h->free_slots, &i, sizeof(uint32_t));
706 
707 	/* Repopulate the free ext bkt ring. */
708 	if (h->ext_table_support) {
709 		for (i = 1; i <= h->num_buckets; i++)
710 			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &i,
711 							sizeof(uint32_t));
712 	}
713 
714 	if (h->use_local_cache) {
715 		/* Reset local caches per lcore */
716 		for (i = 0; i < RTE_MAX_LCORE; i++)
717 			h->local_free_slots[i].len = 0;
718 	}
719 	__hash_rw_writer_unlock(h);
720 }
721 
722 /*
723  * Function called to enqueue an index back into the cache/ring,
724  * as the slot has not been used and can be reused in the
725  * next addition attempt.
726  */
727 static inline void
728 enqueue_slot_back(const struct rte_hash *h,
729 		struct lcore_cache *cached_free_slots,
730 		uint32_t slot_id)
731 {
732 	if (h->use_local_cache) {
733 		cached_free_slots->objs[cached_free_slots->len] = slot_id;
734 		cached_free_slots->len++;
735 	} else
736 		rte_ring_sp_enqueue_elem(h->free_slots, &slot_id,
737 						sizeof(uint32_t));
738 }
739 
740 /* Search for a key in the bucket and update its data.
741  * The writer must hold the lock before calling this.
742  */
743 static inline int32_t
744 search_and_update(const struct rte_hash *h, void *data, const void *key,
745 	struct rte_hash_bucket *bkt, uint16_t sig)
746 {
747 	int i;
748 	struct rte_hash_key *k, *keys = h->key_store;
749 
750 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
751 		if (bkt->sig_current[i] == sig) {
752 			k = (struct rte_hash_key *) ((char *)keys +
753 					bkt->key_idx[i] * h->key_entry_size);
754 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
755 				/* The store to application data at *data
756 				 * should not leak after the store to pdata
757 				 * in the key store. i.e. pdata is the guard
758 				 * variable. Release the application data
759 				 * to the readers.
760 				 */
761 				rte_atomic_store_explicit(&k->pdata,
762 					data,
763 					rte_memory_order_release);
764 				/*
765 				 * Return index where key is stored,
766 				 * subtracting the first dummy index
767 				 */
768 				return bkt->key_idx[i] - 1;
769 			}
770 		}
771 	}
772 	return -1;
773 }
774 
775 /* Only tries to insert at one bucket (@prim_bkt) without trying to push
776  * other entries around.
777  * Returns 1 if a matching key already exists, 0 on success, and -1 if no
778  * empty entry is available.
779  */
780 static inline int32_t
781 rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
782 		struct rte_hash_bucket *prim_bkt,
783 		struct rte_hash_bucket *sec_bkt,
784 		const struct rte_hash_key *key, void *data,
785 		uint16_t sig, uint32_t new_idx,
786 		int32_t *ret_val)
787 {
788 	unsigned int i;
789 	struct rte_hash_bucket *cur_bkt;
790 	int32_t ret;
791 
792 	__hash_rw_writer_lock(h);
793 	/* Check if the key was inserted after the last check but before this
794 	 * protected region, to avoid inserting duplicate keys.
795 	 */
796 	ret = search_and_update(h, data, key, prim_bkt, sig);
797 	if (ret != -1) {
798 		__hash_rw_writer_unlock(h);
799 		*ret_val = ret;
800 		return 1;
801 	}
802 
803 	FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
804 		ret = search_and_update(h, data, key, cur_bkt, sig);
805 		if (ret != -1) {
806 			__hash_rw_writer_unlock(h);
807 			*ret_val = ret;
808 			return 1;
809 		}
810 	}
811 
812 	/* Insert new entry if there is room in the primary
813 	 * bucket.
814 	 */
815 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
816 		/* Check if slot is available */
817 		if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
818 			prim_bkt->sig_current[i] = sig;
819 			/* Store to signature and key should not
820 			 * leak after the store to key_idx. i.e.
821 			 * key_idx is the guard variable for signature
822 			 * and key.
823 			 */
824 			rte_atomic_store_explicit(&prim_bkt->key_idx[i],
825 					 new_idx,
826 					 rte_memory_order_release);
827 			break;
828 		}
829 	}
830 	__hash_rw_writer_unlock(h);
831 
832 	if (i != RTE_HASH_BUCKET_ENTRIES)
833 		return 0;
834 
835 	/* no empty entry */
836 	return -1;
837 }
838 
839 /* Shift entries along the provided cuckoo_path (@leaf and @leaf_slot) and fill
840  * the path head with the new entry (sig, alt_hash, new_idx).
841  * Returns 1 if a matching key was found, -1 if the cuckoo path was invalidated
842  * and the insert failed, and 0 on success.
843  */
844 static inline int
845 rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
846 			struct rte_hash_bucket *bkt,
847 			struct rte_hash_bucket *alt_bkt,
848 			const struct rte_hash_key *key, void *data,
849 			struct queue_node *leaf, uint32_t leaf_slot,
850 			uint16_t sig, uint32_t new_idx,
851 			int32_t *ret_val)
852 {
853 	uint32_t prev_alt_bkt_idx;
854 	struct rte_hash_bucket *cur_bkt;
855 	struct queue_node *prev_node, *curr_node = leaf;
856 	struct rte_hash_bucket *prev_bkt, *curr_bkt = leaf->bkt;
857 	uint32_t prev_slot, curr_slot = leaf_slot;
858 	int32_t ret;
859 
860 	__hash_rw_writer_lock(h);
861 
862 	/* In case the empty slot was taken before entering the protected region */
863 	if (curr_bkt->key_idx[curr_slot] != EMPTY_SLOT) {
864 		__hash_rw_writer_unlock(h);
865 		return -1;
866 	}
867 
868 	/* Check if key was inserted after last check but before this
869 	 * protected region.
870 	 */
871 	ret = search_and_update(h, data, key, bkt, sig);
872 	if (ret != -1) {
873 		__hash_rw_writer_unlock(h);
874 		*ret_val = ret;
875 		return 1;
876 	}
877 
878 	FOR_EACH_BUCKET(cur_bkt, alt_bkt) {
879 		ret = search_and_update(h, data, key, cur_bkt, sig);
880 		if (ret != -1) {
881 			__hash_rw_writer_unlock(h);
882 			*ret_val = ret;
883 			return 1;
884 		}
885 	}
886 
887 	while (likely(curr_node->prev != NULL)) {
888 		prev_node = curr_node->prev;
889 		prev_bkt = prev_node->bkt;
890 		prev_slot = curr_node->prev_slot;
891 
892 		prev_alt_bkt_idx = get_alt_bucket_index(h,
893 					prev_node->cur_bkt_idx,
894 					prev_bkt->sig_current[prev_slot]);
895 
896 		if (unlikely(&h->buckets[prev_alt_bkt_idx]
897 				!= curr_bkt)) {
898 			/* revert it to empty, otherwise duplicated keys */
899 			rte_atomic_store_explicit(&curr_bkt->key_idx[curr_slot],
900 				EMPTY_SLOT,
901 				rte_memory_order_release);
902 			__hash_rw_writer_unlock(h);
903 			return -1;
904 		}
905 
906 		if (h->readwrite_concur_lf_support) {
907 			/* Inform readers of the previous move. The current move
908 			 * need not be announced yet, as the current bucket entry
909 			 * is present in both the primary and secondary buckets.
910 			 * Since there is a single writer, load-acquires on
911 			 * tbl_chng_cnt are not required.
912 			 */
913 			rte_atomic_store_explicit(h->tbl_chng_cnt,
914 					 *h->tbl_chng_cnt + 1,
915 					 rte_memory_order_release);
916 			/* The store to sig_current should not
917 			 * move above the store to tbl_chng_cnt.
918 			 */
919 			rte_atomic_thread_fence(rte_memory_order_release);
920 		}
921 
922 		/* Need to swap current/alt sig to allow a later
923 		 * cuckoo insert to move elements back to their
924 		 * primary bucket if available.
925 		 */
926 		curr_bkt->sig_current[curr_slot] =
927 			prev_bkt->sig_current[prev_slot];
928 		/* Release the updated bucket entry */
929 		rte_atomic_store_explicit(&curr_bkt->key_idx[curr_slot],
930 			prev_bkt->key_idx[prev_slot],
931 			rte_memory_order_release);
932 
933 		curr_slot = prev_slot;
934 		curr_node = prev_node;
935 		curr_bkt = curr_node->bkt;
936 	}
937 
938 	if (h->readwrite_concur_lf_support) {
939 		/* Inform readers of the previous move. The current move
940 		 * need not be announced yet, as the current bucket entry
941 		 * is present in both the primary and secondary buckets.
942 		 * Since there is a single writer, load-acquires on
943 		 * tbl_chng_cnt are not required.
944 		 */
945 		rte_atomic_store_explicit(h->tbl_chng_cnt,
946 				 *h->tbl_chng_cnt + 1,
947 				 rte_memory_order_release);
948 		/* The store to sig_current should not
949 		 * move above the store to tbl_chng_cnt.
950 		 */
951 		rte_atomic_thread_fence(rte_memory_order_release);
952 	}
953 
954 	curr_bkt->sig_current[curr_slot] = sig;
955 	/* Release the new bucket entry */
956 	rte_atomic_store_explicit(&curr_bkt->key_idx[curr_slot],
957 			 new_idx,
958 			 rte_memory_order_release);
959 
960 	__hash_rw_writer_unlock(h);
961 
962 	return 0;
963 
964 }
965 
966 /*
967  * Make space for the new key, using a BFS cuckoo search and multi-writer-safe
968  * cuckoo moves.
969  */
970 static inline int
971 rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
972 			struct rte_hash_bucket *bkt,
973 			struct rte_hash_bucket *sec_bkt,
974 			const struct rte_hash_key *key, void *data,
975 			uint16_t sig, uint32_t bucket_idx,
976 			uint32_t new_idx, int32_t *ret_val)
977 {
978 	unsigned int i;
979 	struct queue_node queue[RTE_HASH_BFS_QUEUE_MAX_LEN];
980 	struct queue_node *tail, *head;
981 	struct rte_hash_bucket *curr_bkt, *alt_bkt;
982 	uint32_t cur_idx, alt_idx;
983 
984 	tail = queue;
985 	head = queue + 1;
986 	tail->bkt = bkt;
987 	tail->prev = NULL;
988 	tail->prev_slot = -1;
989 	tail->cur_bkt_idx = bucket_idx;
990 
991 	/* Cuckoo bfs Search */
992 	while (likely(tail != head && head <
993 					queue + RTE_HASH_BFS_QUEUE_MAX_LEN -
994 					RTE_HASH_BUCKET_ENTRIES)) {
995 		curr_bkt = tail->bkt;
996 		cur_idx = tail->cur_bkt_idx;
997 		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
998 			if (curr_bkt->key_idx[i] == EMPTY_SLOT) {
999 				int32_t ret = rte_hash_cuckoo_move_insert_mw(h,
1000 						bkt, sec_bkt, key, data,
1001 						tail, i, sig,
1002 						new_idx, ret_val);
1003 				if (likely(ret != -1))
1004 					return ret;
1005 			}
1006 
1007 			/* Enqueue new node and keep prev node info */
1008 			alt_idx = get_alt_bucket_index(h, cur_idx,
1009 						curr_bkt->sig_current[i]);
1010 			alt_bkt = &(h->buckets[alt_idx]);
1011 			head->bkt = alt_bkt;
1012 			head->cur_bkt_idx = alt_idx;
1013 			head->prev = tail;
1014 			head->prev_slot = i;
1015 			head++;
1016 		}
1017 		tail++;
1018 	}
1019 
1020 	return -ENOSPC;
1021 }
1022 
1023 static inline uint32_t
1024 alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots)
1025 {
1026 	unsigned int n_slots;
1027 	uint32_t slot_id;
1028 
1029 	if (h->use_local_cache) {
1030 		/* Try to get a free slot from the local cache */
1031 		if (cached_free_slots->len == 0) {
1032 			/* Need to get another burst of free slots from global ring */
1033 			n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
1034 					cached_free_slots->objs,
1035 					sizeof(uint32_t),
1036 					LCORE_CACHE_SIZE, NULL);
1037 			if (n_slots == 0)
1038 				return EMPTY_SLOT;
1039 
1040 			cached_free_slots->len += n_slots;
1041 		}
1042 
1043 		/* Get a free slot from the local cache */
1044 		cached_free_slots->len--;
1045 		slot_id = cached_free_slots->objs[cached_free_slots->len];
1046 	} else {
1047 		if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
1048 						sizeof(uint32_t)) != 0)
1049 			return EMPTY_SLOT;
1050 	}
1051 
1052 	return slot_id;
1053 }
1054 
1055 static inline int32_t
1056 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
1057 						hash_sig_t sig, void *data)
1058 {
1059 	uint16_t short_sig;
1060 	uint32_t prim_bucket_idx, sec_bucket_idx;
1061 	struct rte_hash_bucket *prim_bkt, *sec_bkt, *cur_bkt;
1062 	struct rte_hash_key *new_k, *keys = h->key_store;
1063 	uint32_t ext_bkt_id = 0;
1064 	uint32_t slot_id;
1065 	int ret;
1066 	unsigned lcore_id;
1067 	unsigned int i;
1068 	struct lcore_cache *cached_free_slots = NULL;
1069 	int32_t ret_val;
1070 	struct rte_hash_bucket *last;
1071 
1072 	short_sig = get_short_sig(sig);
1073 	prim_bucket_idx = get_prim_bucket_index(h, sig);
1074 	sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig);
1075 	prim_bkt = &h->buckets[prim_bucket_idx];
1076 	sec_bkt = &h->buckets[sec_bucket_idx];
1077 	rte_prefetch0(prim_bkt);
1078 	rte_prefetch0(sec_bkt);
1079 
1080 	/* Check if key is already inserted in primary location */
1081 	__hash_rw_writer_lock(h);
1082 	ret = search_and_update(h, data, key, prim_bkt, short_sig);
1083 	if (ret != -1) {
1084 		__hash_rw_writer_unlock(h);
1085 		return ret;
1086 	}
1087 
1088 	/* Check if key is already inserted in secondary location */
1089 	FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
1090 		ret = search_and_update(h, data, key, cur_bkt, short_sig);
1091 		if (ret != -1) {
1092 			__hash_rw_writer_unlock(h);
1093 			return ret;
1094 		}
1095 	}
1096 
1097 	__hash_rw_writer_unlock(h);
1098 
1099 	/* Did not find a match, so get a new slot for storing the new key */
1100 	if (h->use_local_cache) {
1101 		lcore_id = rte_lcore_id();
1102 		cached_free_slots = &h->local_free_slots[lcore_id];
1103 	}
1104 	slot_id = alloc_slot(h, cached_free_slots);
1105 	if (slot_id == EMPTY_SLOT) {
1106 		if (h->dq) {
1107 			__hash_rw_writer_lock(h);
1108 			ret = rte_rcu_qsbr_dq_reclaim(h->dq,
1109 					h->hash_rcu_cfg->max_reclaim_size,
1110 					NULL, NULL, NULL);
1111 			__hash_rw_writer_unlock(h);
1112 			if (ret == 0)
1113 				slot_id = alloc_slot(h, cached_free_slots);
1114 		}
1115 		if (slot_id == EMPTY_SLOT)
1116 			return -ENOSPC;
1117 	}
1118 
1119 	new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size);
1120 	/* The store to application data (by the application) at *data should
1121 	 * not leak after the store of pdata in the key store. i.e. pdata is
1122 	 * the guard variable. Release the application data to the readers.
1123 	 */
1124 	rte_atomic_store_explicit(&new_k->pdata,
1125 		data,
1126 		rte_memory_order_release);
1127 	/* Copy key */
1128 	memcpy(new_k->key, key, h->key_len);
1129 
1130 	/* Find an empty slot and insert */
1131 	ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
1132 					short_sig, slot_id, &ret_val);
1133 	if (ret == 0)
1134 		return slot_id - 1;
1135 	else if (ret == 1) {
1136 		enqueue_slot_back(h, cached_free_slots, slot_id);
1137 		return ret_val;
1138 	}
1139 
1140 	/* Primary bucket full, need to make space for new entry */
1141 	ret = rte_hash_cuckoo_make_space_mw(h, prim_bkt, sec_bkt, key, data,
1142 				short_sig, prim_bucket_idx, slot_id, &ret_val);
1143 	if (ret == 0)
1144 		return slot_id - 1;
1145 	else if (ret == 1) {
1146 		enqueue_slot_back(h, cached_free_slots, slot_id);
1147 		return ret_val;
1148 	}
1149 
1150 	/* Also search secondary bucket to get better occupancy */
1151 	ret = rte_hash_cuckoo_make_space_mw(h, sec_bkt, prim_bkt, key, data,
1152 				short_sig, sec_bucket_idx, slot_id, &ret_val);
1153 
1154 	if (ret == 0)
1155 		return slot_id - 1;
1156 	else if (ret == 1) {
1157 		enqueue_slot_back(h, cached_free_slots, slot_id);
1158 		return ret_val;
1159 	}
1160 
1161 	/* If the ext table is not enabled, the insertion has failed */
1162 	if (!h->ext_table_support) {
1163 		enqueue_slot_back(h, cached_free_slots, slot_id);
1164 		return ret;
1165 	}
1166 
1167 	/* Now we need to go through the extendable buckets. The lock is needed
1168 	 * to protect all extendable bucket operations.
1169 	 */
1170 	__hash_rw_writer_lock(h);
1171 	/* Check for duplicates again, since the key could have been inserted before the lock was taken */
1172 	ret = search_and_update(h, data, key, prim_bkt, short_sig);
1173 	if (ret != -1) {
1174 		enqueue_slot_back(h, cached_free_slots, slot_id);
1175 		goto failure;
1176 	}
1177 
1178 	FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
1179 		ret = search_and_update(h, data, key, cur_bkt, short_sig);
1180 		if (ret != -1) {
1181 			enqueue_slot_back(h, cached_free_slots, slot_id);
1182 			goto failure;
1183 		}
1184 	}
1185 
1186 	/* Search sec and ext buckets to find an empty entry to insert. */
1187 	FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
1188 		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
1189 			/* Check if slot is available */
1190 			if (likely(cur_bkt->key_idx[i] == EMPTY_SLOT)) {
1191 				cur_bkt->sig_current[i] = short_sig;
1192 				/* Store to signature and key should not
1193 				 * leak after the store to key_idx. i.e.
1194 				 * key_idx is the guard variable for signature
1195 				 * and key.
1196 				 */
1197 				rte_atomic_store_explicit(&cur_bkt->key_idx[i],
1198 						 slot_id,
1199 						 rte_memory_order_release);
1200 				__hash_rw_writer_unlock(h);
1201 				return slot_id - 1;
1202 			}
1203 		}
1204 	}
1205 
1206 	/* Failed to get an empty entry from extendable buckets. Link a new
1207 	 * extendable bucket. We first get a free bucket from ring.
1208 	 */
1209 	if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id,
1210 						sizeof(uint32_t)) != 0 ||
1211 					ext_bkt_id == 0) {
1212 		if (h->dq) {
1213 			if (rte_rcu_qsbr_dq_reclaim(h->dq,
1214 					h->hash_rcu_cfg->max_reclaim_size,
1215 					NULL, NULL, NULL) == 0) {
1216 				rte_ring_sc_dequeue_elem(h->free_ext_bkts,
1217 							 &ext_bkt_id,
1218 							 sizeof(uint32_t));
1219 			}
1220 		}
1221 		if (ext_bkt_id == 0) {
1222 			ret = -ENOSPC;
1223 			goto failure;
1224 		}
1225 	}
1226 
1227 	/* Use the first location of the new bucket */
1228 	(h->buckets_ext[ext_bkt_id - 1]).sig_current[0] = short_sig;
1229 	/* Store to signature and key should not leak after
1230 	 * the store to key_idx. i.e. key_idx is the guard variable
1231 	 * for signature and key.
1232 	 */
1233 	rte_atomic_store_explicit(&(h->buckets_ext[ext_bkt_id - 1]).key_idx[0],
1234 			 slot_id,
1235 			 rte_memory_order_release);
1236 	/* Link the new bucket to sec bucket linked list */
1237 	last = rte_hash_get_last_bkt(sec_bkt);
1238 	last->next = &h->buckets_ext[ext_bkt_id - 1];
1239 	__hash_rw_writer_unlock(h);
1240 	return slot_id - 1;
1241 
1242 failure:
1243 	__hash_rw_writer_unlock(h);
1244 	return ret;
1245 
1246 }
1247 
1248 int32_t
1249 rte_hash_add_key_with_hash(const struct rte_hash *h,
1250 			const void *key, hash_sig_t sig)
1251 {
1252 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1253 	return __rte_hash_add_key_with_hash(h, key, sig, 0);
1254 }
1255 
1256 int32_t
1257 rte_hash_add_key(const struct rte_hash *h, const void *key)
1258 {
1259 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1260 	return __rte_hash_add_key_with_hash(h, key, rte_hash_hash(h, key), 0);
1261 }
1262 
1263 int
1264 rte_hash_add_key_with_hash_data(const struct rte_hash *h,
1265 			const void *key, hash_sig_t sig, void *data)
1266 {
1267 	int ret;
1268 
1269 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1270 	ret = __rte_hash_add_key_with_hash(h, key, sig, data);
1271 	if (ret >= 0)
1272 		return 0;
1273 	else
1274 		return ret;
1275 }
1276 
1277 int
1278 rte_hash_add_key_data(const struct rte_hash *h, const void *key, void *data)
1279 {
1280 	int ret;
1281 
1282 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1283 
1284 	ret = __rte_hash_add_key_with_hash(h, key, rte_hash_hash(h, key), data);
1285 	if (ret >= 0)
1286 		return 0;
1287 	else
1288 		return ret;
1289 }
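
/*
 * Add/lookup usage sketch (editor's illustration; "ht", "flow_key" and
 * "flow_ctx" are hypothetical application objects):
 *
 *	struct flow_ctx *ctx = ...;	// data to associate with the key
 *	if (rte_hash_add_key_data(ht, &flow_key, ctx) < 0)
 *		...;			// e.g. -ENOSPC when the table is full
 *
 *	void *found;
 *	if (rte_hash_lookup_data(ht, &flow_key, &found) >= 0)
 *		ctx = found;		// the same pointer stored on add
 */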
1290 
1291 /* Search one bucket to find the matching key - uses rw lock */
1292 static inline int32_t
1293 search_one_bucket_l(const struct rte_hash *h, const void *key,
1294 		uint16_t sig, void **data,
1295 		const struct rte_hash_bucket *bkt)
1296 {
1297 	int i;
1298 	struct rte_hash_key *k, *keys = h->key_store;
1299 
1300 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
1301 		if (bkt->sig_current[i] == sig &&
1302 				bkt->key_idx[i] != EMPTY_SLOT) {
1303 			k = (struct rte_hash_key *) ((char *)keys +
1304 					bkt->key_idx[i] * h->key_entry_size);
1305 
1306 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
1307 				if (data != NULL)
1308 					*data = k->pdata;
1309 				/*
1310 				 * Return index where key is stored,
1311 				 * subtracting the first dummy index
1312 				 */
1313 				return bkt->key_idx[i] - 1;
1314 			}
1315 		}
1316 	}
1317 	return -1;
1318 }
1319 
1320 /* Search one bucket to find the matching key */
1321 static inline int32_t
1322 search_one_bucket_lf(const struct rte_hash *h, const void *key, uint16_t sig,
1323 			void **data, const struct rte_hash_bucket *bkt)
1324 {
1325 	int i;
1326 	uint32_t key_idx;
1327 	struct rte_hash_key *k, *keys = h->key_store;
1328 
1329 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
1330 		/* Signature comparison is done before the acquire-load
1331 		 * of the key index to achieve better performance.
1332 		 * This can result in the reader loading old signature
1333 		 * (which matches), while the key_idx is updated to a
1334 		 * value that belongs to a new key. However, the full
1335 		 * key comparison will ensure that the lookup fails.
1336 		 */
1337 		if (bkt->sig_current[i] == sig) {
1338 			key_idx = rte_atomic_load_explicit(&bkt->key_idx[i],
1339 					  rte_memory_order_acquire);
1340 			if (key_idx != EMPTY_SLOT) {
1341 				k = (struct rte_hash_key *) ((char *)keys +
1342 						key_idx * h->key_entry_size);
1343 
1344 				if (rte_hash_cmp_eq(key, k->key, h) == 0) {
1345 					if (data != NULL) {
1346 						*data = rte_atomic_load_explicit(
1347 							&k->pdata,
1348 							rte_memory_order_acquire);
1349 					}
1350 					/*
1351 					 * Return index where key is stored,
1352 					 * subtracting the first dummy index
1353 					 */
1354 					return key_idx - 1;
1355 				}
1356 			}
1357 		}
1358 	}
1359 	return -1;
1360 }
1361 
1362 static inline int32_t
1363 __rte_hash_lookup_with_hash_l(const struct rte_hash *h, const void *key,
1364 				hash_sig_t sig, void **data)
1365 {
1366 	uint32_t prim_bucket_idx, sec_bucket_idx;
1367 	struct rte_hash_bucket *bkt, *cur_bkt;
1368 	int ret;
1369 	uint16_t short_sig;
1370 
1371 	short_sig = get_short_sig(sig);
1372 	prim_bucket_idx = get_prim_bucket_index(h, sig);
1373 	sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig);
1374 
1375 	bkt = &h->buckets[prim_bucket_idx];
1376 
1377 	__hash_rw_reader_lock(h);
1378 
1379 	/* Check if key is in primary location */
1380 	ret = search_one_bucket_l(h, key, short_sig, data, bkt);
1381 	if (ret != -1) {
1382 		__hash_rw_reader_unlock(h);
1383 		return ret;
1384 	}
1385 	/* Calculate secondary hash */
1386 	bkt = &h->buckets[sec_bucket_idx];
1387 
1388 	/* Check if key is in secondary location */
1389 	FOR_EACH_BUCKET(cur_bkt, bkt) {
1390 		ret = search_one_bucket_l(h, key, short_sig,
1391 					data, cur_bkt);
1392 		if (ret != -1) {
1393 			__hash_rw_reader_unlock(h);
1394 			return ret;
1395 		}
1396 	}
1397 
1398 	__hash_rw_reader_unlock(h);
1399 
1400 	return -ENOENT;
1401 }
1402 
1403 static inline int32_t
1404 __rte_hash_lookup_with_hash_lf(const struct rte_hash *h, const void *key,
1405 					hash_sig_t sig, void **data)
1406 {
1407 	uint32_t prim_bucket_idx, sec_bucket_idx;
1408 	struct rte_hash_bucket *bkt, *cur_bkt;
1409 	uint32_t cnt_b, cnt_a;
1410 	int ret;
1411 	uint16_t short_sig;
1412 
1413 	short_sig = get_short_sig(sig);
1414 	prim_bucket_idx = get_prim_bucket_index(h, sig);
1415 	sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig);
1416 
1417 	do {
1418 		/* Load the table change counter before the lookup
1419 		 * starts. Acquire semantics will make sure that
1420 		 * loads in search_one_bucket are not hoisted.
1421 		 */
1422 		cnt_b = rte_atomic_load_explicit(h->tbl_chng_cnt,
1423 				rte_memory_order_acquire);
1424 
1425 		/* Check if key is in primary location */
1426 		bkt = &h->buckets[prim_bucket_idx];
1427 		ret = search_one_bucket_lf(h, key, short_sig, data, bkt);
1428 		if (ret != -1)
1429 			return ret;
1430 		/* Calculate secondary hash */
1431 		bkt = &h->buckets[sec_bucket_idx];
1432 
1433 		/* Check if key is in secondary location */
1434 		FOR_EACH_BUCKET(cur_bkt, bkt) {
1435 			ret = search_one_bucket_lf(h, key, short_sig,
1436 						data, cur_bkt);
1437 			if (ret != -1)
1438 				return ret;
1439 		}
1440 
1441 		/* The loads of sig_current in search_one_bucket
1442 		 * should not move below the load from tbl_chng_cnt.
1443 		 */
1444 		rte_atomic_thread_fence(rte_memory_order_acquire);
1445 		/* Re-read the table change counter to check if the
1446 		 * table has changed during search. If yes, re-do
1447 		 * the search.
1448 		 * This load should not get hoisted. The load
1449 		 * acquires on cnt_b, key index in primary bucket
1450 		 * and key index in secondary bucket will make sure
1451 		 * that it does not get hoisted.
1452 		 */
1453 		cnt_a = rte_atomic_load_explicit(h->tbl_chng_cnt,
1454 					rte_memory_order_acquire);
1455 	} while (cnt_b != cnt_a);
1456 
1457 	return -ENOENT;
1458 }
1459 
1460 static inline int32_t
1461 __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
1462 					hash_sig_t sig, void **data)
1463 {
1464 	if (h->readwrite_concur_lf_support)
1465 		return __rte_hash_lookup_with_hash_lf(h, key, sig, data);
1466 	else
1467 		return __rte_hash_lookup_with_hash_l(h, key, sig, data);
1468 }
1469 
1470 int32_t
1471 rte_hash_lookup_with_hash(const struct rte_hash *h,
1472 			const void *key, hash_sig_t sig)
1473 {
1474 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1475 	return __rte_hash_lookup_with_hash(h, key, sig, NULL);
1476 }
1477 
1478 int32_t
1479 rte_hash_lookup(const struct rte_hash *h, const void *key)
1480 {
1481 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1482 	return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), NULL);
1483 }
1484 
1485 int
1486 rte_hash_lookup_with_hash_data(const struct rte_hash *h,
1487 			const void *key, hash_sig_t sig, void **data)
1488 {
1489 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1490 	return __rte_hash_lookup_with_hash(h, key, sig, data);
1491 }
1492 
1493 int
1494 rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data)
1495 {
1496 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1497 	return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data);
1498 }
1499 
1500 static int
1501 free_slot(const struct rte_hash *h, uint32_t slot_id)
1502 {
1503 	unsigned lcore_id, n_slots;
1504 	struct lcore_cache *cached_free_slots = NULL;
1505 
1506 	/* Return key indexes to free slot ring */
1507 	if (h->use_local_cache) {
1508 		lcore_id = rte_lcore_id();
1509 		cached_free_slots = &h->local_free_slots[lcore_id];
1510 		/* Cache full, flush part of it back to the global free ring. */
1511 		if (cached_free_slots->len == LCORE_CACHE_SIZE) {
1512 			/* Need to enqueue the free slots in global ring. */
1513 			n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots,
1514 						cached_free_slots->objs,
1515 						sizeof(uint32_t),
1516 						LCORE_CACHE_SIZE, NULL);
1517 			RETURN_IF_TRUE((n_slots == 0), -EFAULT);
1518 			cached_free_slots->len -= n_slots;
1519 		}
1520 	}
1521 
1522 	enqueue_slot_back(h, cached_free_slots, slot_id);
1523 	return 0;
1524 }
1525 
1526 static void
1527 __hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n)
1528 {
1529 	void *key_data = NULL;
1530 	int ret;
1531 	struct rte_hash_key *keys, *k;
1532 	struct rte_hash *h = (struct rte_hash *)p;
1533 	struct __rte_hash_rcu_dq_entry rcu_dq_entry =
1534 			*((struct __rte_hash_rcu_dq_entry *)e);
1535 
1536 	RTE_SET_USED(n);
1537 	keys = h->key_store;
1538 
1539 	k = (struct rte_hash_key *) ((char *)keys +
1540 				rcu_dq_entry.key_idx * h->key_entry_size);
1541 	key_data = k->pdata;
1542 	if (h->hash_rcu_cfg->free_key_data_func)
1543 		h->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr,
1544 						    key_data);
1545 
1546 	if (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT)
1547 		/* Recycle empty ext bkt to free list. */
1548 		rte_ring_sp_enqueue_elem(h->free_ext_bkts,
1549 			&rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t));
1550 
1551 	/* Return key indexes to free slot ring */
1552 	ret = free_slot(h, rcu_dq_entry.key_idx);
1553 	if (ret < 0) {
1554 		HASH_LOG(ERR,
1555 			"%s: could not enqueue free slots in global ring",
1556 				__func__);
1557 	}
1558 }
1559 
1560 int
1561 rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_hash_rcu_config *cfg)
1562 {
1563 	struct rte_rcu_qsbr_dq_parameters params = {0};
1564 	char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
1565 	struct rte_hash_rcu_config *hash_rcu_cfg = NULL;
1566 
1567 	if (h == NULL || cfg == NULL || cfg->v == NULL) {
1568 		rte_errno = EINVAL;
1569 		return 1;
1570 	}
1571 
1572 	const uint32_t total_entries = h->use_local_cache ?
1573 		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
1574 							: h->entries + 1;
1575 
1576 	if (h->hash_rcu_cfg) {
1577 		rte_errno = EEXIST;
1578 		return 1;
1579 	}
1580 
1581 	hash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0);
1582 	if (hash_rcu_cfg == NULL) {
1583 		HASH_LOG(ERR, "memory allocation failed");
1584 		return 1;
1585 	}
1586 
1587 	if (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {
1588 		/* No other things to do. */
1589 	} else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) {
1590 		/* Init QSBR defer queue. */
1591 		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
1592 					"HASH_RCU_%s", h->name);
1593 		params.name = rcu_dq_name;
1594 		params.size = cfg->dq_size;
1595 		if (params.size == 0)
1596 			params.size = total_entries;
1597 		params.trigger_reclaim_limit = cfg->trigger_reclaim_limit;
1598 		params.max_reclaim_size = cfg->max_reclaim_size;
1599 		if (params.max_reclaim_size == 0)
1600 			params.max_reclaim_size = RTE_HASH_RCU_DQ_RECLAIM_MAX;
1601 		params.esize = sizeof(struct __rte_hash_rcu_dq_entry);
1602 		params.free_fn = __hash_rcu_qsbr_free_resource;
1603 		params.p = h;
1604 		params.v = cfg->v;
1605 		h->dq = rte_rcu_qsbr_dq_create(&params);
1606 		if (h->dq == NULL) {
1607 			rte_free(hash_rcu_cfg);
1608 			HASH_LOG(ERR, "HASH defer queue creation failed");
1609 			return 1;
1610 		}
1611 	} else {
1612 		rte_free(hash_rcu_cfg);
1613 		rte_errno = EINVAL;
1614 		return 1;
1615 	}
1616 
1617 	hash_rcu_cfg->v = cfg->v;
1618 	hash_rcu_cfg->mode = cfg->mode;
1619 	hash_rcu_cfg->dq_size = params.size;
1620 	hash_rcu_cfg->trigger_reclaim_limit = params.trigger_reclaim_limit;
1621 	hash_rcu_cfg->max_reclaim_size = params.max_reclaim_size;
1622 	hash_rcu_cfg->free_key_data_func = cfg->free_key_data_func;
1623 	hash_rcu_cfg->key_data_ptr = cfg->key_data_ptr;
1624 
1625 	h->hash_rcu_cfg = hash_rcu_cfg;
1626 
1627 	return 0;
1628 }
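
/*
 * RCU configuration sketch (editor's illustration, assuming one reporting
 * thread and the deferred-queue mode; "ht" is a hypothetical existing table):
 *
 *	size_t sz = rte_rcu_qsbr_get_memsize(1);
 *	struct rte_rcu_qsbr *qsv = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *	rte_rcu_qsbr_init(qsv, 1);
 *
 *	struct rte_hash_rcu_config rcu_cfg = {
 *		.v = qsv,
 *		.mode = RTE_HASH_QSBR_MODE_DQ,	// reclaim via the defer queue
 *	};
 *	if (rte_hash_rcu_qsbr_add(ht, &rcu_cfg) != 0)
 *		...;	// rte_errno is set (EINVAL, EEXIST, ...)
 */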
1629 
1630 int rte_hash_rcu_qsbr_dq_reclaim(struct rte_hash *h, unsigned int *freed, unsigned int *pending,
1631 				 unsigned int *available)
1632 {
1633 	int ret;
1634 
1635 	if (h == NULL || h->hash_rcu_cfg == NULL) {
1636 		HASH_LOG(ERR, "Invalid input parameter");
1637 		rte_errno = EINVAL;
1638 		return 1;
1639 	}
1640 
1641 	ret = rte_rcu_qsbr_dq_reclaim(h->dq, h->hash_rcu_cfg->max_reclaim_size, freed, pending,
1642 				      available);
1643 	if (ret != 0) {
1644 		HASH_LOG(ERR, "%s: could not reclaim the defer queue in hash table", __func__);
1645 		return 1;
1646 	}
1647 
1648 	return 0;
1649 }
1650 
1651 static inline void
1652 remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt,
1653 		unsigned int i)
1654 {
1655 	int ret = free_slot(h, bkt->key_idx[i]);
1656 
1657 	if (ret < 0) {
1658 		HASH_LOG(ERR,
1659 			"%s: could not enqueue free slots in global ring",
1660 				__func__);
1661 	}
1662 }
1663 
1664 /* Compact the linked list by moving the key from the last entry in the
1665  * linked list to the empty slot.
1666  */
1667 static inline void
1668 __rte_hash_compact_ll(const struct rte_hash *h,
1669 			struct rte_hash_bucket *cur_bkt, int pos) {
1670 	int i;
1671 	struct rte_hash_bucket *last_bkt;
1672 
1673 	if (!cur_bkt->next)
1674 		return;
1675 
1676 	last_bkt = rte_hash_get_last_bkt(cur_bkt);
1677 
1678 	for (i = RTE_HASH_BUCKET_ENTRIES - 1; i >= 0; i--) {
1679 		if (last_bkt->key_idx[i] != EMPTY_SLOT) {
1680 			cur_bkt->sig_current[pos] = last_bkt->sig_current[i];
1681 			rte_atomic_store_explicit(&cur_bkt->key_idx[pos],
1682 					 last_bkt->key_idx[i],
1683 					 rte_memory_order_release);
1684 			if (h->readwrite_concur_lf_support) {
1685 				/* Inform the readers that the table has changed
1686 				 * Since there is one writer, load acquire on
1687 				 * tbl_chng_cnt is not required.
1688 				 */
1689 				rte_atomic_store_explicit(h->tbl_chng_cnt,
1690 					 *h->tbl_chng_cnt + 1,
1691 					 rte_memory_order_release);
1692 				/* The store to sig_current should
1693 				 * not move above the store to tbl_chng_cnt.
1694 				 */
1695 				rte_atomic_thread_fence(rte_memory_order_release);
1696 			}
1697 			last_bkt->sig_current[i] = NULL_SIGNATURE;
1698 			rte_atomic_store_explicit(&last_bkt->key_idx[i],
1699 					 EMPTY_SLOT,
1700 					 rte_memory_order_release);
1701 			return;
1702 		}
1703 	}
1704 }
1705 
1706 /* Search one bucket and remove the matched key.
1707  * Writer is expected to hold the lock while calling this
1708  * function.
1709  */
1710 static inline int32_t
1711 search_and_remove(const struct rte_hash *h, const void *key,
1712 			struct rte_hash_bucket *bkt, uint16_t sig, int *pos)
1713 {
1714 	struct rte_hash_key *k, *keys = h->key_store;
1715 	unsigned int i;
1716 	uint32_t key_idx;
1717 
1718 	/* Check if key is in bucket */
1719 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
1720 		key_idx = rte_atomic_load_explicit(&bkt->key_idx[i],
1721 					  rte_memory_order_acquire);
1722 		if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) {
1723 			k = (struct rte_hash_key *) ((char *)keys +
1724 					key_idx * h->key_entry_size);
1725 			if (rte_hash_cmp_eq(key, k->key, h) == 0) {
1726 				bkt->sig_current[i] = NULL_SIGNATURE;
1727 				/* Free the key store index if
1728 				 * no_free_on_del is disabled.
1729 				 */
1730 				if (!h->no_free_on_del)
1731 					remove_entry(h, bkt, i);
1732 
1733 				rte_atomic_store_explicit(&bkt->key_idx[i],
1734 						 EMPTY_SLOT,
1735 						 rte_memory_order_release);
1736 
1737 				*pos = i;
1738 				/*
1739 				 * Return index where key is stored,
1740 				 * subtracting the first dummy index
1741 				 */
1742 				return key_idx - 1;
1743 			}
1744 		}
1745 	}
1746 	return -1;
1747 }
1748 
1749 static inline int32_t
1750 __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
1751 						hash_sig_t sig)
1752 {
1753 	uint32_t prim_bucket_idx, sec_bucket_idx;
1754 	struct rte_hash_bucket *prim_bkt, *sec_bkt, *prev_bkt, *last_bkt;
1755 	struct rte_hash_bucket *cur_bkt;
1756 	int pos;
1757 	int32_t ret, i;
1758 	uint16_t short_sig;
1759 	uint32_t index = EMPTY_SLOT;
1760 	struct __rte_hash_rcu_dq_entry rcu_dq_entry;
1761 
1762 	short_sig = get_short_sig(sig);
1763 	prim_bucket_idx = get_prim_bucket_index(h, sig);
1764 	sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig);
1765 	prim_bkt = &h->buckets[prim_bucket_idx];
1766 
1767 	__hash_rw_writer_lock(h);
1768 	/* look for key in primary bucket */
1769 	ret = search_and_remove(h, key, prim_bkt, short_sig, &pos);
1770 	if (ret != -1) {
1771 		__rte_hash_compact_ll(h, prim_bkt, pos);
1772 		last_bkt = prim_bkt->next;
1773 		prev_bkt = prim_bkt;
1774 		goto return_bkt;
1775 	}
1776 
1777 	/* Calculate secondary hash */
1778 	sec_bkt = &h->buckets[sec_bucket_idx];
1779 
1780 	FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
1781 		ret = search_and_remove(h, key, cur_bkt, short_sig, &pos);
1782 		if (ret != -1) {
1783 			__rte_hash_compact_ll(h, cur_bkt, pos);
1784 			last_bkt = sec_bkt->next;
1785 			prev_bkt = sec_bkt;
1786 			goto return_bkt;
1787 		}
1788 	}
1789 
1790 	__hash_rw_writer_unlock(h);
1791 	return -ENOENT;
1792 
1793 /* Search the last bucket to see if it is empty and can be recycled */
1794 return_bkt:
1795 	if (!last_bkt)
1796 		goto return_key;
1797 
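	/* Walk to the tail of the ext bucket chain; prev_bkt tracks the
	 * second-to-last bucket so an empty tail can be unlinked below.
	 */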
1798 	while (last_bkt->next) {
1799 		prev_bkt = last_bkt;
1800 		last_bkt = last_bkt->next;
1801 	}
1802 
1803 	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
1804 		if (last_bkt->key_idx[i] != EMPTY_SLOT)
1805 			break;
1806 	}
1807 	/* Found an empty last bucket; unlink and recycle it */
1808 	if (i == RTE_HASH_BUCKET_ENTRIES) {
1809 		prev_bkt->next = NULL;
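		/* Ext bucket indexes are stored and enqueued 1-based so that
		 * 0 can mean "no ext bucket pending free".
		 */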
1810 		index = last_bkt - h->buckets_ext + 1;
1811 		/* Recycle the empty bkt immediately if
1812 		 * no_free_on_del is disabled; otherwise defer it.
1813 		 */
1814 		if (h->no_free_on_del) {
1815 			/* Store the index of the empty ext bkt so it can be
1816 			 * recycled when the key index itself is freed
1817 			 * (rte_hash_free_key_with_position() or internal RCU).
1818 			 * When lock-free read-write concurrency is enabled,
1819 			 * an empty ext bkt cannot be put back on the free list
1820 			 * immediately, as readers might still be using it, so
1821 			 * its freeing is piggy-backed onto freeing of the key
1822 			 * index; without internal RCU it is kept in an array.
1823 			 */
1824 			if (h->hash_rcu_cfg == NULL)
1825 				h->ext_bkt_to_free[ret] = index;
1826 		} else
1827 			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,
1828 							sizeof(uint32_t));
1829 	}
1830 
1831 return_key:
1832 	/* Using internal RCU QSBR */
1833 	if (h->hash_rcu_cfg) {
1834 		/* Key index where key is stored, adding the first dummy index */
1835 		rcu_dq_entry.key_idx = ret + 1;
1836 		rcu_dq_entry.ext_bkt_idx = index;
1837 		if (h->dq == NULL) {
1838 			/* Wait for quiescent state change if using
1839 			 * RTE_HASH_QSBR_MODE_SYNC
1840 			 */
1841 			rte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v,
1842 						 RTE_QSBR_THRID_INVALID);
1843 			__hash_rcu_qsbr_free_resource((void *)((uintptr_t)h),
1844 						      &rcu_dq_entry, 1);
1845 		} else if (h->dq)
1846 			/* Push into QSBR FIFO if using RTE_HASH_QSBR_MODE_DQ */
1847 			if (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0)
1848 				HASH_LOG(ERR, "Failed to push QSBR FIFO");
1849 	}
1850 	__hash_rw_writer_unlock(h);
1851 	return ret;
1852 }
1853 
1854 int32_t
1855 rte_hash_del_key_with_hash(const struct rte_hash *h,
1856 			const void *key, hash_sig_t sig)
1857 {
1858 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1859 	return __rte_hash_del_key_with_hash(h, key, sig);
1860 }
1861 
1862 int32_t
1863 rte_hash_del_key(const struct rte_hash *h, const void *key)
1864 {
1865 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1866 	return __rte_hash_del_key_with_hash(h, key, rte_hash_hash(h, key));
1867 }
1868 
1869 int
1870 rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,
1871 			       void **key)
1872 {
1873 	RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
1874 
1875 	struct rte_hash_key *k, *keys = h->key_store;
1876 	k = (struct rte_hash_key *) ((char *) keys + (position + 1) *
1877 				     h->key_entry_size);
1878 	*key = k->key;
1879 
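	/* Verify that this position still holds this key: a stale or already
	 * freed position will not look up back to the same index.
	 */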
1880 	if (position !=
1881 	    __rte_hash_lookup_with_hash(h, *key, rte_hash_hash(h, *key),
1882 					NULL)) {
1883 		return -ENOENT;
1884 	}
1885 
1886 	return 0;
1887 }
1888 
1889 int
1890 rte_hash_free_key_with_position(const struct rte_hash *h,
1891 				const int32_t position)
1892 {
1893 	/* Key index where key is stored, adding the first dummy index */
1894 	uint32_t key_idx = position + 1;
1895 
1896 	RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL);
1897 
1898 	const uint32_t total_entries = h->use_local_cache ?
1899 		h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
1900 							: h->entries + 1;
1901 
1902 	/* Out of bounds */
1903 	if (key_idx >= total_entries)
1904 		return -EINVAL;
1905 	if (h->ext_table_support && h->readwrite_concur_lf_support) {
1906 		uint32_t index = h->ext_bkt_to_free[position];
1907 		if (index) {
1908 			/* Recycle empty ext bkt to free list. */
1909 			rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,
1910 							sizeof(uint32_t));
1911 			h->ext_bkt_to_free[position] = 0;
1912 		}
1913 	}
1914 
1915 	/* Enqueue slot to cache/ring of free slots. */
1916 	return free_slot(h, key_idx);
1917 
1918 }
1919 
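/* Illustrative usage sketch (not part of the library): with
 * RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL (which lock-free RW concurrency
 * implies when no internal RCU is configured), deletion only unlinks the
 * key; the application returns the slot once no reader can still be
 * referencing it, e.g.:
 *
 *	int32_t pos = rte_hash_del_key(h, &key);
 *	if (pos >= 0) {
 *		wait_for_readers();	// application-specific quiescence
 *		rte_hash_free_key_with_position(h, pos);
 *	}
 *
 * wait_for_readers() is a placeholder for whatever reclamation scheme the
 * application uses (e.g. rte_rcu_qsbr).
 */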
1920 static inline void
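/* Bulk lookup helper for the locked path: compares the short signatures of
 * all keys against their primary and secondary buckets while prefetching
 * the first candidate key slot, then compares the full keys, and finally
 * falls back to the ext bucket chains for any keys still missing.
 */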
1921 __bulk_lookup_l(const struct rte_hash *h, const void **keys,
1922 		const struct rte_hash_bucket **primary_bkt,
1923 		const struct rte_hash_bucket **secondary_bkt,
1924 		uint16_t *sig, int32_t num_keys, int32_t *positions,
1925 		uint64_t *hit_mask, void *data[])
1926 {
1927 	uint64_t hits = 0;
1928 	int32_t i;
1929 	int32_t ret;
1930 	struct rte_hash_bucket *cur_bkt, *next_bkt;
1931 
1932 #if DENSE_HASH_BULK_LOOKUP
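	/* With DENSE_HASH_BULK_LOOKUP a single 16-bit entry holds both
	 * buckets' hitmasks, one bit per bucket entry (primary bucket in the
	 * low byte, secondary in the high byte). In the sparse layout each
	 * bucket gets its own mask with the hit bits spaced two positions
	 * apart, hence the hitmask_padding shift when converting a bit
	 * position into a slot index.
	 */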
1933 	const int hitmask_padding = 0;
1934 	uint16_t hitmask_buffer[RTE_HASH_LOOKUP_BULK_MAX] = {0};
1935 #else
1936 	const int hitmask_padding = 1;
1937 	uint32_t prim_hitmask_buffer[RTE_HASH_LOOKUP_BULK_MAX] = {0};
1938 	uint32_t sec_hitmask_buffer[RTE_HASH_LOOKUP_BULK_MAX] = {0};
1939 #endif
1940 
1941 	__hash_rw_reader_lock(h);
1942 
1943 	/* Compare signatures and prefetch key slot of first hit */
1944 	for (i = 0; i < num_keys; i++) {
1945 #if DENSE_HASH_BULK_LOOKUP
1946 		uint16_t *hitmask = &hitmask_buffer[i];
1947 		compare_signatures_dense(hitmask,
1948 			primary_bkt[i]->sig_current,
1949 			secondary_bkt[i]->sig_current,
1950 			sig[i], h->sig_cmp_fn);
1951 		const unsigned int prim_hitmask = *(uint8_t *)(hitmask);
1952 		const unsigned int sec_hitmask = *((uint8_t *)(hitmask)+1);
1953 #else
1954 		compare_signatures_sparse(&prim_hitmask_buffer[i], &sec_hitmask_buffer[i],
1955 			primary_bkt[i], secondary_bkt[i],
1956 			sig[i], h->sig_cmp_fn);
1957 		const unsigned int prim_hitmask = prim_hitmask_buffer[i];
1958 		const unsigned int sec_hitmask = sec_hitmask_buffer[i];
1959 #endif
1960 
1961 		if (prim_hitmask) {
1962 			uint32_t first_hit =
1963 					rte_ctz32(prim_hitmask)
1964 					>> hitmask_padding;
1965 			uint32_t key_idx =
1966 				primary_bkt[i]->key_idx[first_hit];
1967 			const struct rte_hash_key *key_slot =
1968 				(const struct rte_hash_key *)(
1969 				(const char *)h->key_store +
1970 				key_idx * h->key_entry_size);
1971 			rte_prefetch0(key_slot);
1972 			continue;
1973 		}
1974 
1975 		if (sec_hitmask) {
1976 			uint32_t first_hit =
1977 					rte_ctz32(sec_hitmask)
1978 					>> hitmask_padding;
1979 			uint32_t key_idx =
1980 				secondary_bkt[i]->key_idx[first_hit];
1981 			const struct rte_hash_key *key_slot =
1982 				(const struct rte_hash_key *)(
1983 				(const char *)h->key_store +
1984 				key_idx * h->key_entry_size);
1985 			rte_prefetch0(key_slot);
1986 		}
1987 	}
1988 
1989 	/* Compare keys, checking hits in the primary bucket first */
1990 	for (i = 0; i < num_keys; i++) {
1991 		positions[i] = -ENOENT;
1992 #if DENSE_HASH_BULK_LOOKUP
1993 		uint16_t *hitmask = &hitmask_buffer[i];
1994 		unsigned int prim_hitmask = *(uint8_t *)(hitmask);
1995 		unsigned int sec_hitmask = *((uint8_t *)(hitmask)+1);
1996 #else
1997 		unsigned int prim_hitmask = prim_hitmask_buffer[i];
1998 		unsigned int sec_hitmask = sec_hitmask_buffer[i];
1999 #endif
2000 		while (prim_hitmask) {
2001 			uint32_t hit_index =
2002 					rte_ctz32(prim_hitmask)
2003 					>> hitmask_padding;
2004 			uint32_t key_idx =
2005 				primary_bkt[i]->key_idx[hit_index];
2006 			const struct rte_hash_key *key_slot =
2007 				(const struct rte_hash_key *)(
2008 				(const char *)h->key_store +
2009 				key_idx * h->key_entry_size);
2010 
2011 			/*
2012 			 * If key index is 0, do not compare key,
2013 			 * as it is checking the dummy slot
2014 			 */
2015 			if (!!key_idx &
2016 				!rte_hash_cmp_eq(
2017 					key_slot->key, keys[i], h)) {
2018 				if (data != NULL)
2019 					data[i] = key_slot->pdata;
2020 
2021 				hits |= 1ULL << i;
2022 				positions[i] = key_idx - 1;
2023 				goto next_key;
2024 			}
2025 			prim_hitmask &= ~(1 << (hit_index << hitmask_padding));
2026 		}
2027 
2028 		while (sec_hitmask) {
2029 			uint32_t hit_index =
2030 					rte_ctz32(sec_hitmask)
2031 					>> hitmask_padding;
2032 			uint32_t key_idx =
2033 				secondary_bkt[i]->key_idx[hit_index];
2034 			const struct rte_hash_key *key_slot =
2035 				(const struct rte_hash_key *)(
2036 				(const char *)h->key_store +
2037 				key_idx * h->key_entry_size);
2038 
2039 			/*
2040 			 * If key index is 0, do not compare key,
2041 			 * as it is checking the dummy slot
2042 			 */
2043 
2044 			if (!!key_idx &
2045 				!rte_hash_cmp_eq(
2046 					key_slot->key, keys[i], h)) {
2047 				if (data != NULL)
2048 					data[i] = key_slot->pdata;
2049 
2050 				hits |= 1ULL << i;
2051 				positions[i] = key_idx - 1;
2052 				goto next_key;
2053 			}
2054 			sec_hitmask &= ~(1 << (hit_index << hitmask_padding));
2055 		}
2056 next_key:
2057 		continue;
2058 	}
2059 
2060 	/* All found, or no ext table support: no need to go through ext buckets */
2061 	if ((hits == ((1ULL << num_keys) - 1)) || !h->ext_table_support) {
2062 		if (hit_mask != NULL)
2063 			*hit_mask = hits;
2064 		__hash_rw_reader_unlock(h);
2065 		return;
2066 	}
2067 
2068 	/* need to check ext buckets for match */
2069 	for (i = 0; i < num_keys; i++) {
2070 		if ((hits & (1ULL << i)) != 0)
2071 			continue;
2072 		next_bkt = secondary_bkt[i]->next;
2073 		FOR_EACH_BUCKET(cur_bkt, next_bkt) {
2074 			if (data != NULL)
2075 				ret = search_one_bucket_l(h, keys[i],
2076 						sig[i], &data[i], cur_bkt);
2077 			else
2078 				ret = search_one_bucket_l(h, keys[i],
2079 						sig[i], NULL, cur_bkt);
2080 			if (ret != -1) {
2081 				positions[i] = ret;
2082 				hits |= 1ULL << i;
2083 				break;
2084 			}
2085 		}
2086 	}
2087 
2088 	__hash_rw_reader_unlock(h);
2089 
2090 	if (hit_mask != NULL)
2091 		*hit_mask = hits;
2092 }
2093 
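/* Lock-free bulk lookup helper: same flow as __bulk_lookup_l, but key
 * indexes and data are loaded with acquire semantics and the whole lookup
 * is retried if the table change counter (tbl_chng_cnt) changed while the
 * search was in progress, i.e. a writer moved entries concurrently.
 */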
2094 static inline void
2095 __bulk_lookup_lf(const struct rte_hash *h, const void **keys,
2096 		const struct rte_hash_bucket **primary_bkt,
2097 		const struct rte_hash_bucket **secondary_bkt,
2098 		uint16_t *sig, int32_t num_keys, int32_t *positions,
2099 		uint64_t *hit_mask, void *data[])
2100 {
2101 	uint64_t hits = 0;
2102 	int32_t i;
2103 	int32_t ret;
2104 	struct rte_hash_bucket *cur_bkt, *next_bkt;
2105 	uint32_t cnt_b, cnt_a;
2106 
2107 #if DENSE_HASH_BULK_LOOKUP
2108 	const int hitmask_padding = 0;
2109 	uint16_t hitmask_buffer[RTE_HASH_LOOKUP_BULK_MAX] = {0};
2110 	static_assert(sizeof(*hitmask_buffer)*8/2 == RTE_HASH_BUCKET_ENTRIES,
2111 	"Each half of a hitmask_buffer entry must be exactly wide enough to hold a bucket's dense hitmask");
2112 #else
2113 	const int hitmask_padding = 1;
2114 	uint32_t prim_hitmask_buffer[RTE_HASH_LOOKUP_BULK_MAX] = {0};
2115 	uint32_t sec_hitmask_buffer[RTE_HASH_LOOKUP_BULK_MAX] = {0};
2116 #endif
2117 
2118 	for (i = 0; i < num_keys; i++)
2119 		positions[i] = -ENOENT;
2120 
2121 	do {
2122 		/* Load the table change counter before the lookup
2123 		 * starts. Acquire semantics will make sure that
2124 		 * loads in compare_signatures are not hoisted.
2125 		 */
2126 		cnt_b = rte_atomic_load_explicit(h->tbl_chng_cnt,
2127 					rte_memory_order_acquire);
2128 
2129 		/* Compare signatures and prefetch key slot of first hit */
2130 		for (i = 0; i < num_keys; i++) {
2131 #if DENSE_HASH_BULK_LOOKUP
2132 			uint16_t *hitmask = &hitmask_buffer[i];
2133 			compare_signatures_dense(hitmask,
2134 				primary_bkt[i]->sig_current,
2135 				secondary_bkt[i]->sig_current,
2136 				sig[i], h->sig_cmp_fn);
2137 			const unsigned int prim_hitmask = *(uint8_t *)(hitmask);
2138 			const unsigned int sec_hitmask = *((uint8_t *)(hitmask)+1);
2139 #else
2140 			compare_signatures_sparse(&prim_hitmask_buffer[i], &sec_hitmask_buffer[i],
2141 				primary_bkt[i], secondary_bkt[i],
2142 				sig[i], h->sig_cmp_fn);
2143 			const unsigned int prim_hitmask = prim_hitmask_buffer[i];
2144 			const unsigned int sec_hitmask = sec_hitmask_buffer[i];
2145 #endif
2146 
2147 			if (prim_hitmask) {
2148 				uint32_t first_hit =
2149 						rte_ctz32(prim_hitmask)
2150 						>> hitmask_padding;
2151 				uint32_t key_idx =
2152 					primary_bkt[i]->key_idx[first_hit];
2153 				const struct rte_hash_key *key_slot =
2154 					(const struct rte_hash_key *)(
2155 					(const char *)h->key_store +
2156 					key_idx * h->key_entry_size);
2157 				rte_prefetch0(key_slot);
2158 				continue;
2159 			}
2160 
2161 			if (sec_hitmask) {
2162 				uint32_t first_hit =
2163 						rte_ctz32(sec_hitmask)
2164 						>> hitmask_padding;
2165 				uint32_t key_idx =
2166 					secondary_bkt[i]->key_idx[first_hit];
2167 				const struct rte_hash_key *key_slot =
2168 					(const struct rte_hash_key *)(
2169 					(const char *)h->key_store +
2170 					key_idx * h->key_entry_size);
2171 				rte_prefetch0(key_slot);
2172 			}
2173 		}
2174 
2175 		/* Compare keys, checking hits in the primary bucket first */
2176 		for (i = 0; i < num_keys; i++) {
2177 #if DENSE_HASH_BULK_LOOKUP
2178 			uint16_t *hitmask = &hitmask_buffer[i];
2179 			unsigned int prim_hitmask = *(uint8_t *)(hitmask);
2180 			unsigned int sec_hitmask = *((uint8_t *)(hitmask)+1);
2181 #else
2182 			unsigned int prim_hitmask = prim_hitmask_buffer[i];
2183 			unsigned int sec_hitmask = sec_hitmask_buffer[i];
2184 #endif
2185 			while (prim_hitmask) {
2186 				uint32_t hit_index =
2187 						rte_ctz32(prim_hitmask)
2188 						>> hitmask_padding;
2189 				uint32_t key_idx =
2190 				rte_atomic_load_explicit(
2191 					&primary_bkt[i]->key_idx[hit_index],
2192 					rte_memory_order_acquire);
2193 				const struct rte_hash_key *key_slot =
2194 					(const struct rte_hash_key *)(
2195 					(const char *)h->key_store +
2196 					key_idx * h->key_entry_size);
2197 
2198 				/*
2199 				 * If key index is 0, do not compare key,
2200 				 * as it is checking the dummy slot
2201 				 */
2202 				if (!!key_idx &
2203 					!rte_hash_cmp_eq(
2204 						key_slot->key, keys[i], h)) {
2205 					if (data != NULL)
2206 						data[i] = rte_atomic_load_explicit(
2207 							&key_slot->pdata,
2208 							rte_memory_order_acquire);
2209 
2210 					hits |= 1ULL << i;
2211 					positions[i] = key_idx - 1;
2212 					goto next_key;
2213 				}
2214 				prim_hitmask &= ~(1 << (hit_index << hitmask_padding));
2215 			}
2216 
2217 			while (sec_hitmask) {
2218 				uint32_t hit_index =
2219 						rte_ctz32(sec_hitmask)
2220 						>> hitmask_padding;
2221 				uint32_t key_idx =
2222 				rte_atomic_load_explicit(
2223 					&secondary_bkt[i]->key_idx[hit_index],
2224 					rte_memory_order_acquire);
2225 				const struct rte_hash_key *key_slot =
2226 					(const struct rte_hash_key *)(
2227 					(const char *)h->key_store +
2228 					key_idx * h->key_entry_size);
2229 
2230 				/*
2231 				 * If key index is 0, do not compare key,
2232 				 * as it is checking the dummy slot
2233 				 */
2234 
2235 				if (!!key_idx &
2236 					!rte_hash_cmp_eq(
2237 						key_slot->key, keys[i], h)) {
2238 					if (data != NULL)
2239 						data[i] = rte_atomic_load_explicit(
2240 							&key_slot->pdata,
2241 							rte_memory_order_acquire);
2242 
2243 					hits |= 1ULL << i;
2244 					positions[i] = key_idx - 1;
2245 					goto next_key;
2246 				}
2247 				sec_hitmask &= ~(1 << (hit_index << hitmask_padding));
2248 			}
2249 next_key:
2250 			continue;
2251 		}
2252 
2253 		/* all found, do not need to go through ext bkt */
2254 		if (hits == ((1ULL << num_keys) - 1)) {
2255 			if (hit_mask != NULL)
2256 				*hit_mask = hits;
2257 			return;
2258 		}
2259 		/* need to check ext buckets for match */
2260 		if (h->ext_table_support) {
2261 			for (i = 0; i < num_keys; i++) {
2262 				if ((hits & (1ULL << i)) != 0)
2263 					continue;
2264 				next_bkt = secondary_bkt[i]->next;
2265 				FOR_EACH_BUCKET(cur_bkt, next_bkt) {
2266 					if (data != NULL)
2267 						ret = search_one_bucket_lf(h,
2268 							keys[i], sig[i],
2269 							&data[i], cur_bkt);
2270 					else
2271 						ret = search_one_bucket_lf(h,
2272 								keys[i], sig[i],
2273 								NULL, cur_bkt);
2274 					if (ret != -1) {
2275 						positions[i] = ret;
2276 						hits |= 1ULL << i;
2277 						break;
2278 					}
2279 				}
2280 			}
2281 		}
2282 		/* The loads of sig_current in compare_signatures
2283 		 * should not move below the load from tbl_chng_cnt.
2284 		 */
2285 		rte_atomic_thread_fence(rte_memory_order_acquire);
2286 		/* Re-read the table change counter to check if the
2287 		 * table has changed during search. If yes, re-do
2288 		 * the search.
2289 		 * This load should not get hoisted; the load-acquires
2290 		 * on cnt_b and on the primary and secondary key
2291 		 * indexes make sure that it is not reordered
2292 		 * earlier.
2293 		 */
2294 		cnt_a = rte_atomic_load_explicit(h->tbl_chng_cnt,
2295 					rte_memory_order_acquire);
2296 	} while (cnt_b != cnt_a);
2297 
2298 	if (hit_mask != NULL)
2299 		*hit_mask = hits;
2300 }
2301 
2302 #define PREFETCH_OFFSET 4
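/* Hash all keys and resolve their primary and secondary buckets, keeping
 * the key prefetches PREFETCH_OFFSET iterations ahead of the hash
 * computation and prefetching each pair of buckets as soon as it is known.
 */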
2303 static inline void
2304 __bulk_lookup_prefetching_loop(const struct rte_hash *h,
2305 	const void **keys, int32_t num_keys,
2306 	uint16_t *sig,
2307 	const struct rte_hash_bucket **primary_bkt,
2308 	const struct rte_hash_bucket **secondary_bkt)
2309 {
2310 	int32_t i;
2311 	uint32_t prim_hash[RTE_HASH_LOOKUP_BULK_MAX];
2312 	uint32_t prim_index[RTE_HASH_LOOKUP_BULK_MAX];
2313 	uint32_t sec_index[RTE_HASH_LOOKUP_BULK_MAX];
2314 
2315 	/* Prefetch first keys */
2316 	for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
2317 		rte_prefetch0(keys[i]);
2318 
2319 	/*
2320 	 * Prefetch the rest of the keys, calculate their primary and
2321 	 * secondary buckets, and prefetch those buckets
2322 	 */
2323 	for (i = 0; i < (num_keys - PREFETCH_OFFSET); i++) {
2324 		rte_prefetch0(keys[i + PREFETCH_OFFSET]);
2325 
2326 		prim_hash[i] = rte_hash_hash(h, keys[i]);
2327 
2328 		sig[i] = get_short_sig(prim_hash[i]);
2329 		prim_index[i] = get_prim_bucket_index(h, prim_hash[i]);
2330 		sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]);
2331 
2332 		primary_bkt[i] = &h->buckets[prim_index[i]];
2333 		secondary_bkt[i] = &h->buckets[sec_index[i]];
2334 
2335 		rte_prefetch0(primary_bkt[i]);
2336 		rte_prefetch0(secondary_bkt[i]);
2337 	}
2338 
2339 	/* Calculate and prefetch rest of the buckets */
2340 	for (; i < num_keys; i++) {
2341 		prim_hash[i] = rte_hash_hash(h, keys[i]);
2342 
2343 		sig[i] = get_short_sig(prim_hash[i]);
2344 		prim_index[i] = get_prim_bucket_index(h, prim_hash[i]);
2345 		sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]);
2346 
2347 		primary_bkt[i] = &h->buckets[prim_index[i]];
2348 		secondary_bkt[i] = &h->buckets[sec_index[i]];
2349 
2350 		rte_prefetch0(primary_bkt[i]);
2351 		rte_prefetch0(secondary_bkt[i]);
2352 	}
2353 }
2354 
2355 
2356 static inline void
2357 __rte_hash_lookup_bulk_l(const struct rte_hash *h, const void **keys,
2358 			int32_t num_keys, int32_t *positions,
2359 			uint64_t *hit_mask, void *data[])
2360 {
2361 	uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX];
2362 	const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
2363 	const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
2364 
2365 	__bulk_lookup_prefetching_loop(h, keys, num_keys, sig,
2366 		primary_bkt, secondary_bkt);
2367 
2368 	__bulk_lookup_l(h, keys, primary_bkt, secondary_bkt, sig, num_keys,
2369 		positions, hit_mask, data);
2370 }
2371 
2372 static inline void
2373 __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys,
2374 			int32_t num_keys, int32_t *positions,
2375 			uint64_t *hit_mask, void *data[])
2376 {
2377 	uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX];
2378 	const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
2379 	const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
2380 
2381 	__bulk_lookup_prefetching_loop(h, keys, num_keys, sig,
2382 		primary_bkt, secondary_bkt);
2383 
2384 	__bulk_lookup_lf(h, keys, primary_bkt, secondary_bkt, sig, num_keys,
2385 		positions, hit_mask, data);
2386 }
2387 
2388 static inline void
2389 __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
2390 			int32_t num_keys, int32_t *positions,
2391 			uint64_t *hit_mask, void *data[])
2392 {
2393 	if (h->readwrite_concur_lf_support)
2394 		__rte_hash_lookup_bulk_lf(h, keys, num_keys, positions,
2395 					  hit_mask, data);
2396 	else
2397 		__rte_hash_lookup_bulk_l(h, keys, num_keys, positions,
2398 					 hit_mask, data);
2399 }
2400 
2401 int
2402 rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
2403 		      uint32_t num_keys, int32_t *positions)
2404 {
2405 	RETURN_IF_TRUE(((h == NULL) || (keys == NULL) || (num_keys == 0) ||
2406 			(num_keys > RTE_HASH_LOOKUP_BULK_MAX) ||
2407 			(positions == NULL)), -EINVAL);
2408 
2409 	__rte_hash_lookup_bulk(h, keys, num_keys, positions, NULL, NULL);
2410 	return 0;
2411 }
2412 
2413 int
2414 rte_hash_lookup_bulk_data(const struct rte_hash *h, const void **keys,
2415 		      uint32_t num_keys, uint64_t *hit_mask, void *data[])
2416 {
2417 	RETURN_IF_TRUE(((h == NULL) || (keys == NULL) || (num_keys == 0) ||
2418 			(num_keys > RTE_HASH_LOOKUP_BULK_MAX) ||
2419 			(hit_mask == NULL)), -EINVAL);
2420 
2421 	int32_t positions[num_keys];
2422 
2423 	__rte_hash_lookup_bulk(h, keys, num_keys, positions, hit_mask, data);
2424 
2425 	/* Return number of hits */
2426 	return rte_popcount64(*hit_mask);
2427 }
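/* Illustrative usage sketch (not part of the library): bulk lookup with
 * per-key data, where hit_mask reports which of the keys were found:
 *
 *	const void *keys[4] = { &k0, &k1, &k2, &k3 };	// k0..k3: caller's keys
 *	void *data[4];
 *	uint64_t hit_mask;
 *	int nb_hits = rte_hash_lookup_bulk_data(h, keys, 4, &hit_mask, data);
 *	for (unsigned int i = 0; i < 4; i++)
 *		if (hit_mask & (1ULL << i))
 *			use(data[i]);	// use() is a placeholder
 *
 * nb_hits equals the number of bits set in hit_mask.
 */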
2428 
2429 
2430 static inline void
2431 __rte_hash_lookup_with_hash_bulk_l(const struct rte_hash *h,
2432 			const void **keys, hash_sig_t *prim_hash,
2433 			int32_t num_keys, int32_t *positions,
2434 			uint64_t *hit_mask, void *data[])
2435 {
2436 	int32_t i;
2437 	uint32_t prim_index[RTE_HASH_LOOKUP_BULK_MAX];
2438 	uint32_t sec_index[RTE_HASH_LOOKUP_BULK_MAX];
2439 	uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX];
2440 	const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
2441 	const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
2442 
2443 	/*
2444 	 * Prefetch the keys, calculate their primary and
2445 	 * secondary buckets, and prefetch those buckets
2446 	 */
2447 	for (i = 0; i < num_keys; i++) {
2448 		rte_prefetch0(keys[i]);
2449 
2450 		sig[i] = get_short_sig(prim_hash[i]);
2451 		prim_index[i] = get_prim_bucket_index(h, prim_hash[i]);
2452 		sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]);
2453 
2454 		primary_bkt[i] = &h->buckets[prim_index[i]];
2455 		secondary_bkt[i] = &h->buckets[sec_index[i]];
2456 
2457 		rte_prefetch0(primary_bkt[i]);
2458 		rte_prefetch0(secondary_bkt[i]);
2459 	}
2460 
2461 	__bulk_lookup_l(h, keys, primary_bkt, secondary_bkt, sig, num_keys,
2462 		positions, hit_mask, data);
2463 }
2464 
2465 static inline void
2466 __rte_hash_lookup_with_hash_bulk_lf(const struct rte_hash *h,
2467 			const void **keys, hash_sig_t *prim_hash,
2468 			int32_t num_keys, int32_t *positions,
2469 			uint64_t *hit_mask, void *data[])
2470 {
2471 	int32_t i;
2472 	uint32_t prim_index[RTE_HASH_LOOKUP_BULK_MAX];
2473 	uint32_t sec_index[RTE_HASH_LOOKUP_BULK_MAX];
2474 	uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX];
2475 	const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
2476 	const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
2477 
2478 	/*
2479 	 * Prefetch the keys, calculate their primary and
2480 	 * secondary buckets, and prefetch those buckets
2481 	 */
2482 	for (i = 0; i < num_keys; i++) {
2483 		rte_prefetch0(keys[i]);
2484 
2485 		sig[i] = get_short_sig(prim_hash[i]);
2486 		prim_index[i] = get_prim_bucket_index(h, prim_hash[i]);
2487 		sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]);
2488 
2489 		primary_bkt[i] = &h->buckets[prim_index[i]];
2490 		secondary_bkt[i] = &h->buckets[sec_index[i]];
2491 
2492 		rte_prefetch0(primary_bkt[i]);
2493 		rte_prefetch0(secondary_bkt[i]);
2494 	}
2495 
2496 	__bulk_lookup_lf(h, keys, primary_bkt, secondary_bkt, sig, num_keys,
2497 		positions, hit_mask, data);
2498 }
2499 
2500 static inline void
2501 __rte_hash_lookup_with_hash_bulk(const struct rte_hash *h, const void **keys,
2502 			hash_sig_t *prim_hash, int32_t num_keys,
2503 			int32_t *positions, uint64_t *hit_mask, void *data[])
2504 {
2505 	if (h->readwrite_concur_lf_support)
2506 		__rte_hash_lookup_with_hash_bulk_lf(h, keys, prim_hash,
2507 				num_keys, positions, hit_mask, data);
2508 	else
2509 		__rte_hash_lookup_with_hash_bulk_l(h, keys, prim_hash,
2510 				num_keys, positions, hit_mask, data);
2511 }
2512 
2513 int
2514 rte_hash_lookup_with_hash_bulk(const struct rte_hash *h, const void **keys,
2515 		hash_sig_t *sig, uint32_t num_keys, int32_t *positions)
2516 {
2517 	RETURN_IF_TRUE(((h == NULL) || (keys == NULL) ||
2518 			(sig == NULL) || (num_keys == 0) ||
2519 			(num_keys > RTE_HASH_LOOKUP_BULK_MAX) ||
2520 			(positions == NULL)), -EINVAL);
2521 
2522 	__rte_hash_lookup_with_hash_bulk(h, keys, sig, num_keys,
2523 		positions, NULL, NULL);
2524 	return 0;
2525 }
2526 
2527 int
2528 rte_hash_lookup_with_hash_bulk_data(const struct rte_hash *h,
2529 		const void **keys, hash_sig_t *sig,
2530 		uint32_t num_keys, uint64_t *hit_mask, void *data[])
2531 {
2532 	RETURN_IF_TRUE(((h == NULL) || (keys == NULL) ||
2533 			(sig == NULL) || (num_keys == 0) ||
2534 			(num_keys > RTE_HASH_LOOKUP_BULK_MAX) ||
2535 			(hit_mask == NULL)), -EINVAL);
2536 
2537 	int32_t positions[num_keys];
2538 
2539 	__rte_hash_lookup_with_hash_bulk(h, keys, sig, num_keys,
2540 			positions, hit_mask, data);
2541 
2542 	/* Return number of hits */
2543 	return rte_popcount64(*hit_mask);
2544 }
2545 
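/* Iterate over the occupied slots of the table: the cursor *next first
 * walks the main-table buckets, then the ext buckets when the ext table is
 * enabled. *next must be set to 0 before the first call. Returns the
 * position of the returned entry, or -ENOENT once the iteration is done.
 */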
2546 int32_t
2547 rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next)
2548 {
2549 	uint32_t bucket_idx, idx, position;
2550 	struct rte_hash_key *next_key;
2551 
2552 	RETURN_IF_TRUE(((h == NULL) || (next == NULL)), -EINVAL);
2553 
2554 	const uint32_t total_entries_main = h->num_buckets *
2555 							RTE_HASH_BUCKET_ENTRIES;
2556 	const uint32_t total_entries = total_entries_main << 1;
2557 
2558 	/* Past the main-table buckets, continue in the ext table */
2559 	if (*next >= total_entries_main)
2560 		goto extend_table;
2561 
2562 	/* Calculate bucket and index of current iterator */
2563 	bucket_idx = *next / RTE_HASH_BUCKET_ENTRIES;
2564 	idx = *next % RTE_HASH_BUCKET_ENTRIES;
2565 
2566 	/* If current position is empty, go to the next one */
2567 	while ((position = rte_atomic_load_explicit(&h->buckets[bucket_idx].key_idx[idx],
2568 					rte_memory_order_acquire)) == EMPTY_SLOT) {
2569 		(*next)++;
2570 		/* End of table */
2571 		if (*next == total_entries_main)
2572 			goto extend_table;
2573 		bucket_idx = *next / RTE_HASH_BUCKET_ENTRIES;
2574 		idx = *next % RTE_HASH_BUCKET_ENTRIES;
2575 	}
2576 
2577 	__hash_rw_reader_lock(h);
2578 	next_key = (struct rte_hash_key *) ((char *)h->key_store +
2579 				position * h->key_entry_size);
2580 	/* Return key and data */
2581 	*key = next_key->key;
2582 	*data = next_key->pdata;
2583 
2584 	__hash_rw_reader_unlock(h);
2585 
2586 	/* Increment iterator */
2587 	(*next)++;
2588 
2589 	return position - 1;
2590 
2591 /* Begin to iterate extendable buckets */
2592 extend_table:
2593 	/* Out of the total bounds, or the ext bucket feature is not enabled */
2594 	if (*next >= total_entries || !h->ext_table_support)
2595 		return -ENOENT;
2596 
2597 	bucket_idx = (*next - total_entries_main) / RTE_HASH_BUCKET_ENTRIES;
2598 	idx = (*next - total_entries_main) % RTE_HASH_BUCKET_ENTRIES;
2599 
2600 	while ((position = h->buckets_ext[bucket_idx].key_idx[idx]) == EMPTY_SLOT) {
2601 		(*next)++;
2602 		if (*next == total_entries)
2603 			return -ENOENT;
2604 		bucket_idx = (*next - total_entries_main) /
2605 						RTE_HASH_BUCKET_ENTRIES;
2606 		idx = (*next - total_entries_main) % RTE_HASH_BUCKET_ENTRIES;
2607 	}
2608 	__hash_rw_reader_lock(h);
2609 	next_key = (struct rte_hash_key *) ((char *)h->key_store +
2610 				position * h->key_entry_size);
2611 	/* Return key and data */
2612 	*key = next_key->key;
2613 	*data = next_key->pdata;
2614 
2615 	__hash_rw_reader_unlock(h);
2616 
2617 	/* Increment iterator */
2618 	(*next)++;
2619 	return position - 1;
2620 }
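/* Illustrative usage sketch (not part of the library): iterate over every
 * entry currently stored in the table:
 *
 *	uint32_t iter = 0;
 *	const void *key;
 *	void *data;
 *	while (rte_hash_iterate(h, &key, &data, &iter) >= 0)
 *		handle(key, data);	// handle() is a placeholder
 */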
2621