xref: /dpdk/lib/member/rte_member_sketch.c (revision e9fd1ebf981f361844aea9ec94e17f4bda5e1479)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2020 Intel Corporation
3  * Copyright(c) 2020, Alan Liu <zaoxingliu@gmail.com>
4  */
5 
6 #include <math.h>
7 #include <string.h>
8 
9 #include <rte_malloc.h>
10 #include <rte_memory.h>
11 #include <rte_errno.h>
12 #include <rte_log.h>
13 #include <rte_random.h>
14 #include <rte_prefetch.h>
15 #include <rte_ring_elem.h>
16 
17 #include "member.h"
18 #include "rte_member.h"
19 #include "rte_member_sketch.h"
20 #include "rte_member_heap.h"
21 
22 #ifdef CC_AVX512_SUPPORT
23 #include "rte_member_sketch_avx512.h"
24 #endif /* CC_AVX512_SUPPORT */
25 
26 struct sketch_runtime {
27 	uint64_t pkt_cnt;
28 	uint32_t until_next;
29 	int converged;
30 	struct minheap heap;
31 	struct node *report_array;
32 	void *key_slots;
33 	struct rte_ring *free_key_slots;
34 } __rte_cache_aligned;
35 
36 /*
37  * Geometric sampling to calculate how many packets need to be
38  * skipped until the next update. This method reduces CPU overhead
39  * compared with per-packet coin-toss sampling.
40  */
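/*
 * Illustrative numbers (an added example, not from the original source):
 * with sample_rate = 0.25 and a uniform draw of 0.5, draw_geometric()
 * returns ceil(ln(0.5) / ln(0.75)) = 3; on average one counter update
 * happens per 1/sample_rate update opportunities.
 */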
41 static uint32_t
42 draw_geometric(const struct rte_member_setsum *ss)
43 {
44 	double rand = 1;
45 
46 	if (ss->sample_rate == 1)
47 		return 1;
48 
49 	while (rand == 1 || rand == 0)
50 		rand = (double) rte_rand() / (double)(RTE_RAND_MAX);
51 
52 	return (uint32_t)ceil(log(1 - rand) / log(1 - ss->sample_rate));
53 }
54 
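/* Simple insertion sort, used on the small per-row count array when taking the median. */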
55 static void
56 isort(uint64_t *array, int n)
57 {
58 	int i;
59 
60 	for (i = 1; i < n; i++) {
61 		uint64_t t = array[i];
62 		int j;
63 
64 		for (j = i - 1; j >= 0; j--) {
65 			if (t < array[j])
66 				array[j + 1] = array[j];
67 			else
68 				break;
69 		}
70 		array[j + 1] = t;
71 	}
72 }
73 
74 static __rte_always_inline void
75 swap(uint64_t *a, uint64_t *b)
76 {
77 	uint64_t tmp = *a;
78 	*a = *b;
79 	*b = tmp;
80 }
81 
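/*
 * Median of five values via a small fixed comparison network; used by
 * sketch_lookup_scalar() to avoid a full sort when the sketch has five rows.
 */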
82 static uint64_t
83 medianof5(uint64_t a, uint64_t b, uint64_t c, uint64_t d, uint64_t e)
84 {
85 	if (a > b)
86 		swap(&a, &b);
87 	if (c > d)
88 		swap(&c, &d);
89 	if (a > c) {
90 		if (d > e)
91 			swap(&c, &e);
92 		else {
93 			swap(&c, &d);
94 			swap(&d, &e);
95 		}
96 	} else {
97 		if (b > e)
98 			swap(&a, &e);
99 		else {
100 			swap(&a, &b);
101 			swap(&b, &e);
102 		}
103 	}
104 
105 	if (a > c)
106 		return a > d ? d : a;
107 	else
108 		return b > c ? c : b;
109 }
110 
111 int
112 rte_member_create_sketch(struct rte_member_setsum *ss,
113 			 const struct rte_member_parameters *params,
114 			 struct rte_ring *ring)
115 {
116 	struct sketch_runtime *runtime;
117 	uint32_t num_col;
118 	uint32_t i;
119 
120 	if (params->sample_rate == 0 || params->sample_rate > 1) {
121 		rte_errno = EINVAL;
122 		MEMBER_LOG(ERR,
123 			"Membership Sketch create failed: invalid sample rate");
124 		return -EINVAL;
125 	}
126 
127 	if (params->extra_flag & RTE_MEMBER_SKETCH_COUNT_BYTE)
128 		ss->count_byte = 1;
129 
130 #ifdef RTE_ARCH_X86
131 	if (ss->count_byte == 1 &&
132 		rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_512 &&
133 		rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX512F) == 1 &&
134 		rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX512IFMA) == 1) {
135 #ifdef CC_AVX512_SUPPORT
136 		ss->use_avx512 = true;
137 #else
138 		ss->use_avx512 = false;
139 #endif
140 	}
141 
142 	if (ss->use_avx512 == true) {
143 #ifdef CC_AVX512_SUPPORT
144 		ss->num_row = NUM_ROW_VEC;
145 		MEMBER_LOG(NOTICE,
146 			"Membership Sketch AVX512 update/lookup/delete ops are selected");
147 		ss->sketch_update = sketch_update_avx512;
148 		ss->sketch_lookup = sketch_lookup_avx512;
149 		ss->sketch_delete = sketch_delete_avx512;
150 #endif
151 	} else
152 #endif
153 	{
154 		ss->num_row = NUM_ROW_SCALAR;
155 		MEMBER_LOG(NOTICE,
156 			"Membership Sketch SCALAR update/lookup/delete ops are selected");
157 		ss->sketch_update = sketch_update_scalar;
158 		ss->sketch_lookup = sketch_lookup_scalar;
159 		ss->sketch_delete = sketch_delete_scalar;
160 	}
161 
162 	ss->socket_id = params->socket_id;
163 
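	/*
	 * Sketch width: roughly 4 / error_rate counters per row, additionally
	 * scaled by 1 / sample_rate in packet-count mode. Illustrative numbers:
	 * error_rate = 0.05 and sample_rate = 0.5 give 4 / 0.05 / 0.5 = 160
	 * columns per row.
	 */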
164 	if (ss->count_byte == 0)
165 		num_col = 4.0 / params->error_rate / params->sample_rate;
166 #ifdef RTE_ARCH_X86
167 	else if (ss->use_avx512 == true)
168 		num_col = rte_align32pow2(4.0 / params->error_rate);
169 #endif
170 	else
171 		num_col = 4.0 / params->error_rate;
172 
173 	ss->table = rte_zmalloc_socket(NULL,
174 			sizeof(uint64_t) * num_col * ss->num_row,
175 			RTE_CACHE_LINE_SIZE, ss->socket_id);
176 	if (ss->table == NULL) {
177 		MEMBER_LOG(ERR, "Sketch Table memory allocation failed");
178 		return -ENOMEM;
179 	}
180 
181 	ss->hash_seeds = rte_zmalloc_socket(NULL, sizeof(uint64_t) * ss->num_row,
182 			RTE_CACHE_LINE_SIZE, ss->socket_id);
183 	if (ss->hash_seeds == NULL) {
184 		MEMBER_LOG(ERR, "Sketch Hashseeds memory allocation failed");
185 		return -ENOMEM;
186 	}
187 
188 	ss->runtime_var = rte_zmalloc_socket(NULL, sizeof(struct sketch_runtime),
189 					RTE_CACHE_LINE_SIZE, ss->socket_id);
190 	if (ss->runtime_var == NULL) {
191 		MEMBER_LOG(ERR, "Sketch Runtime memory allocation failed");
192 		rte_free(ss);
193 		return -ENOMEM;
194 	}
195 	runtime = ss->runtime_var;
196 
197 	ss->num_col = num_col;
198 	ss->sample_rate = params->sample_rate;
199 	ss->prim_hash_seed = params->prim_hash_seed;
200 	ss->sec_hash_seed = params->sec_hash_seed;
201 	ss->error_rate = params->error_rate;
202 	ss->topk = params->top_k;
203 	ss->key_len = params->key_len;
204 	runtime->heap.key_len = ss->key_len;
205 
206 	runtime->key_slots = rte_zmalloc_socket(NULL, ss->key_len * ss->topk,
207 					RTE_CACHE_LINE_SIZE, ss->socket_id);
208 	if (runtime->key_slots == NULL) {
209 		MEMBER_LOG(ERR, "Sketch Key Slots allocation failed");
210 		goto error;
211 	}
212 
213 	runtime->free_key_slots = ring;
214 	for (i = 0; i < ss->topk; i++)
215 		rte_ring_sp_enqueue_elem(runtime->free_key_slots,
216 					&i, sizeof(uint32_t));
217 
218 	if (rte_member_minheap_init(&(runtime->heap), params->top_k,
219 			ss->socket_id, params->prim_hash_seed) < 0) {
220 		MEMBER_LOG(ERR, "Sketch Minheap allocation failed");
221 		goto error_runtime;
222 	}
223 
224 	runtime->report_array = rte_zmalloc_socket(NULL, sizeof(struct node) * ss->topk,
225 					RTE_CACHE_LINE_SIZE, ss->socket_id);
226 	if (runtime->report_array == NULL) {
227 		MEMBER_LOG(ERR, "Sketch Runtime Report Array allocation failed");
228 		goto error_runtime;
229 	}
230 
231 	for (i = 0; i < ss->num_row; i++)
232 		ss->hash_seeds[i] = rte_rand();
233 
234 	if (params->extra_flag & RTE_MEMBER_SKETCH_ALWAYS_BOUNDED)
235 		ss->always_bounded = 1;
236 
237 	if (ss->always_bounded) {
238 		double delta = 1.0 / (pow(2, ss->num_row));
239 
240 		ss->converge_thresh = 10 * pow(ss->error_rate, -2.0) * sqrt(log(1 / delta));
241 	}
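	/*
	 * Illustrative numbers: with num_row = 5 (delta = 1/32) and
	 * error_rate = 0.05, the threshold is 10 * 0.05^-2 * sqrt(ln 32),
	 * i.e. about 7447 packets before sampling starts.
	 */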
242 
243 	MEMBER_LOG(DEBUG, "Sketch created, "
244 		"the total memory required is %u bytes", ss->num_col * ss->num_row * 8);
245 
246 	return 0;
247 
248 error_runtime:
249 	rte_member_minheap_free(&runtime->heap);
250 	rte_ring_free(runtime->free_key_slots);
251 	rte_free(runtime->key_slots);
252 error:
253 	rte_free(runtime);
254 	rte_free(ss);
255 
256 	return -ENOMEM;
257 }
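/*
 * Illustrative usage (a minimal sketch under stated assumptions, not taken
 * from this file): applications normally fill struct rte_member_parameters
 * and go through the rte_member_create() wrapper rather than calling this
 * function directly, e.g.
 *
 *	params.sample_rate = 0.01;	fraction of packets updating counters
 *	params.error_rate  = 0.05;	relative error bound of the estimates
 *	params.top_k       = 100;	number of heavy hitters tracked
 *	setsum = rte_member_create(&params);
 *
 * The wrapper is expected to allocate the set summary and the free-key-slot
 * ring that is passed into rte_member_create_sketch().
 */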
258 
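/*
 * Scalar lookup: with sampling enabled the per-row counters are scaled
 * estimates, so the median across rows is reported; with sample_rate == 1
 * (or in byte-count mode) the sketch behaves as a plain count-min and the
 * minimum is reported.
 */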
259 uint64_t
260 sketch_lookup_scalar(const struct rte_member_setsum *ss, const void *key)
261 {
262 	uint64_t *count_array = ss->table;
263 	uint32_t col[ss->num_row];
264 	uint64_t count_row[ss->num_row];
265 	uint32_t cur_row;
266 	uint64_t count;
267 
268 	for (cur_row = 0; cur_row < ss->num_row; cur_row++) {
269 		col[cur_row] = MEMBER_HASH_FUNC(key, ss->key_len,
270 			ss->hash_seeds[cur_row]) % ss->num_col;
271 
272 		rte_prefetch0(&count_array[cur_row * ss->num_col + col[cur_row]]);
273 	}
274 
275 	/* with sample rate 1 (or in byte-count mode) this is a regular count-min, so report the min */
276 	if (ss->sample_rate == 1 || ss->count_byte == 1)
277 		return count_min(ss, col);
278 
279 	memset(count_row, 0, sizeof(uint64_t) * ss->num_row);
280 
281 	/* otherwise report the median value across the rows */
282 	for (cur_row = 0; cur_row < ss->num_row; cur_row++)
283 		count_row[cur_row] = count_array[cur_row * ss->num_col + col[cur_row]];
284 
285 	if (ss->num_row == 5)
286 		return medianof5(count_row[0], count_row[1],
287 				count_row[2], count_row[3], count_row[4]);
288 
289 	isort(count_row, ss->num_row);
290 
291 	if (ss->num_row % 2 == 0) {
292 		count = (count_row[ss->num_row / 2] + count_row[ss->num_row / 2 - 1]) / 2;
293 		return count;
294 	}
295 	/* ss->num_row % 2 != 0 */
296 	count = count_row[ss->num_row / 2];
297 
298 	return count;
299 }
300 
301 void
302 sketch_delete_scalar(const struct rte_member_setsum *ss, const void *key)
303 {
304 	uint32_t col[ss->num_row];
305 	uint64_t *count_array = ss->table;
306 	uint32_t cur_row;
307 
308 	for (cur_row = 0; cur_row < ss->num_row; cur_row++) {
309 		col[cur_row] = MEMBER_HASH_FUNC(key, ss->key_len,
310 			ss->hash_seeds[cur_row]) % ss->num_col;
311 
312 		/* set corresponding counter to 0 */
313 		count_array[cur_row * ss->num_col + col[cur_row]] = 0;
314 	}
315 }
316 
317 int
318 rte_member_query_sketch(const struct rte_member_setsum *ss,
319 			const void *key,
320 			uint64_t *output)
321 {
322 	uint64_t count = ss->sketch_lookup(ss, key);
323 	*output = count;
324 
325 	return 0;
326 }
327 
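/* Refresh the cached count of every key currently held in the top-k heap from the sketch. */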
328 void
329 rte_member_update_heap(const struct rte_member_setsum *ss)
330 {
331 	uint32_t i;
332 	struct sketch_runtime *runtime_var = ss->runtime_var;
333 
334 	for (i = 0; i < runtime_var->heap.size; i++) {
335 		uint64_t count = ss->sketch_lookup(ss, runtime_var->heap.elem[i].key);
336 
337 		runtime_var->heap.elem[i].count = count;
338 	}
339 }
340 
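/*
 * Sort the top-k heap into the report array and copy out key pointers and
 * counts; returns the number of heavy hitters reported.
 */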
341 int
342 rte_member_report_heavyhitter_sketch(const struct rte_member_setsum *setsum,
343 				     void **key,
344 				     uint64_t *count)
345 {
346 	uint32_t i;
347 	struct sketch_runtime *runtime_var = setsum->runtime_var;
348 
349 	rte_member_update_heap(setsum);
350 	rte_member_heapsort(&(runtime_var->heap), runtime_var->report_array);
351 
352 	for (i = 0; i < runtime_var->heap.size; i++) {
353 		key[i] = runtime_var->report_array[i].key;
354 		count[i] = runtime_var->report_array[i].count;
355 	}
356 
357 	return runtime_var->heap.size;
358 }
359 
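/*
 * Return 1 if the key has a non-zero estimated count, 0 otherwise; *set_id
 * is set to 1 when the estimate is at least the smallest count in the top-k
 * heap, i.e. the key qualifies as a heavy-hitter candidate.
 */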
360 int
361 rte_member_lookup_sketch(const struct rte_member_setsum *ss,
362 			 const void *key, member_set_t *set_id)
363 {
364 	uint64_t count = ss->sketch_lookup(ss, key);
365 	struct sketch_runtime *runtime_var = ss->runtime_var;
366 
367 	if (runtime_var->heap.size > 0 && count >= runtime_var->heap.elem[0].count)
368 		*set_id = 1;
369 	else
370 		*set_id = 0;
371 
372 	if (count == 0)
373 		return 0;
374 	else
375 		return 1;
376 }
377 
378 static void
379 should_converge(const struct rte_member_setsum *ss)
380 {
381 	struct sketch_runtime *runtime_var = ss->runtime_var;
382 
383 	/* convergence check for the count-min sketch, based on the L1 norm (total packet count) */
384 	if (runtime_var->pkt_cnt > ss->converge_thresh) {
385 		runtime_var->converged = 1;
386 		MEMBER_LOG(DEBUG, "Sketch converged, begin sampling "
387 					"from packet count %"PRIu64,
388 					runtime_var->pkt_cnt);
389 	}
390 }
391 
392 static void
393 sketch_update_row(const struct rte_member_setsum *ss, const void *key,
394 		  uint32_t count, uint32_t cur_row)
395 {
396 	uint64_t *count_array = ss->table;
397 	uint32_t col = MEMBER_HASH_FUNC(key, ss->key_len,
398 			ss->hash_seeds[cur_row]) % ss->num_col;
399 
400 	/* sketch counter update */
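	/*
	 * The increment is scaled by 1 / sample_rate to compensate for skipped
	 * packets: e.g. at sample_rate = 0.01 one sampled packet adds
	 * ceil(1 / 0.01) = 100 (illustrative value).
	 */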
401 	count_array[cur_row * ss->num_col + col] +=
402 			ceil(count / (ss->sample_rate));
403 }
404 
405 void
406 sketch_update_scalar(const struct rte_member_setsum *ss,
407 		     const void *key,
408 		     uint32_t count)
409 {
410 	uint64_t *count_array = ss->table;
411 	uint32_t col;
412 	uint32_t cur_row;
413 
414 	for (cur_row = 0; cur_row < ss->num_row; cur_row++) {
415 		col = MEMBER_HASH_FUNC(key, ss->key_len,
416 				ss->hash_seeds[cur_row]) % ss->num_col;
417 		count_array[cur_row * ss->num_col + col] += count;
418 	}
419 }
420 
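/*
 * Maintain the top-k min-heap: when the key's estimated count exceeds the
 * current minimum (the heap root), update its node if already present,
 * insert it if the heap is not yet full, or replace the root otherwise.
 * While the heap is not yet full, keys are inserted regardless of count.
 */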
421 static void
422 heap_update(const struct rte_member_setsum *ss, const void *key)
423 {
424 	struct sketch_runtime *runtime_var = ss->runtime_var;
425 	uint64_t key_cnt = 0;
426 	int found;
427 
428 	/* We also update the heap for this key */
429 	key_cnt = ss->sketch_lookup(ss, key);
430 	if (key_cnt > runtime_var->heap.elem[0].count) {
431 		found = rte_member_minheap_find(&runtime_var->heap, key);
432 		/* the key is found in the top-k heap */
433 		if (found >= 0) {
434 			if (runtime_var->heap.elem[found].count < key_cnt)
435 				rte_member_heapify(&runtime_var->heap, found, true);
436 
437 			runtime_var->heap.elem[found].count = key_cnt;
438 		} else if (runtime_var->heap.size < ss->topk) {
439 			rte_member_minheap_insert_node(&runtime_var->heap, key,
440 				key_cnt, runtime_var->key_slots, runtime_var->free_key_slots);
441 		} else {
442 			rte_member_minheap_replace_node(&runtime_var->heap, key, key_cnt);
443 		}
444 	} else if (runtime_var->heap.size < ss->topk) {
445 		found = rte_member_minheap_find(&runtime_var->heap, key);
446 		if (found >= 0) {
447 			if (runtime_var->heap.elem[found].count < key_cnt)
448 				rte_member_heapify(&runtime_var->heap, found, true);
449 
450 			runtime_var->heap.elem[found].count = key_cnt;
451 		} else
452 			rte_member_minheap_insert_node(&runtime_var->heap, key,
453 				key_cnt, runtime_var->key_slots, runtime_var->free_key_slots);
454 	}
455 }
456 
457 /*
458  * Add a single packet into the sketch.
459  * The sketch value is measured in packet counts in this mode.
460  */
461 int
462 rte_member_add_sketch(const struct rte_member_setsum *ss,
463 		      const void *key,
464 		      __rte_unused member_set_t set_id)
465 {
466 	uint32_t cur_row;
467 	struct sketch_runtime *runtime_var = ss->runtime_var;
468 	uint32_t *until_next = &(runtime_var->until_next);
469 
470 	/*
471 	 * If sketch is measured by byte count,
472 	 * the rte_member_add_sketch_byte_count routine should be used.
473 	 */
474 	if (ss->count_byte == 1) {
475 		MEMBER_LOG(ERR, "Sketch is Byte Mode, "
476 			"should use rte_member_add_byte_count()!");
477 		return -EINVAL;
478 	}
479 
480 	if (ss->sample_rate == 1) {
481 		ss->sketch_update(ss, key, 1);
482 		heap_update(ss, key);
483 		return 0;
484 	}
485 
486 	/* convergence stage, if enabled */
487 	if (ss->always_bounded && !runtime_var->converged) {
488 		ss->sketch_update(ss, key, 1);
489 
490 		if (!((++runtime_var->pkt_cnt) & (INTERVAL - 1)))
491 			should_converge(ss);
492 
493 		heap_update(ss, key);
494 		return 0;
495 	}
496 
497 	/* skip the whole packet if at least num_row update opportunities are still to be skipped */
498 	if (*until_next >= ss->num_row) {
499 		*until_next -= ss->num_row;
500 		return 0;
501 	}
502 	cur_row = *until_next;
503 	do {
504 		sketch_update_row(ss, key, 1, cur_row);
505 		*until_next = draw_geometric(ss);
506 		if (cur_row + *until_next >= ss->num_row)
507 			break;
508 		cur_row += *until_next;
509 	} while (1);
510 
511 	*until_next -= (ss->num_row - cur_row);
512 
513 	heap_update(ss, key);
514 
515 	return 0;
516 }
517 
518 /*
519  * Add the byte count of the packet into the sketch.
520  * The sketch value is measured in bytes in this mode.
521  */
522 int
523 rte_member_add_sketch_byte_count(const struct rte_member_setsum *ss,
524 				 const void *key,
525 				 uint32_t byte_count)
526 {
527 	struct sketch_runtime *runtime_var = ss->runtime_var;
528 	uint32_t *until_next = &(runtime_var->until_next);
529 
530 	/* should not call this API if not in count byte mode */
531 	if (ss->count_byte == 0) {
532 		MEMBER_LOG(ERR, "Sketch is Pkt Mode, "
533 			"should use rte_member_add()!");
534 		return -EINVAL;
535 	}
536 
537 	/* the sketch update has a dedicated (possibly vectorized) path and runs for every packet; only the heap update below is sampled */
538 	ss->sketch_update(ss, key, byte_count);
539 
540 	if (*until_next != 0) {
541 		*until_next = *until_next - 1;
542 		return 0;
543 	}
544 
545 	*until_next = draw_geometric(ss) - 1;
546 
547 	heap_update(ss, key);
548 
549 	return 0;
550 }
551 
552 int
553 rte_member_delete_sketch(const struct rte_member_setsum *ss,
554 			 const void *key)
555 {
556 	struct sketch_runtime *runtime_var = ss->runtime_var;
557 	int found;
558 
559 	found = rte_member_minheap_find(&runtime_var->heap, key);
560 	if (found < 0)
561 		return -1;
562 
563 	ss->sketch_delete(ss, key);
564 
565 	return rte_member_minheap_delete_node
566 		(&runtime_var->heap, key, runtime_var->key_slots, runtime_var->free_key_slots);
567 }
568 
569 void
570 rte_member_free_sketch(struct rte_member_setsum *ss)
571 {
572 	struct sketch_runtime *runtime_var = ss->runtime_var;
573 
574 	rte_free(ss->table);
575 	rte_member_minheap_free(&runtime_var->heap);
576 	rte_free(runtime_var->key_slots);
577 	rte_ring_free(runtime_var->free_key_slots);
578 	rte_free(runtime_var);
579 }
580 
581 void
582 rte_member_reset_sketch(const struct rte_member_setsum *ss)
583 {
584 	struct sketch_runtime *runtime_var = ss->runtime_var;
585 	uint64_t *sketch = ss->table;
586 	uint32_t i;
587 
588 	memset(sketch, 0, sizeof(uint64_t) * ss->num_col * ss->num_row);
589 	rte_member_minheap_reset(&runtime_var->heap);
590 	rte_ring_reset(runtime_var->free_key_slots);
591 
592 	for (i = 0; i < ss->topk; i++)
593 		rte_ring_sp_enqueue_elem(runtime_var->free_key_slots, &i, sizeof(uint32_t));
594 }
595