1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2020 Intel Corporation
3 * Copyright(c) 2020, Alan Liu <zaoxingliu@gmail.com>
4 */
5
6 #include <math.h>
7 #include <string.h>
8
9 #include <rte_malloc.h>
10 #include <rte_memory.h>
11 #include <rte_errno.h>
12 #include <rte_log.h>
13 #include <rte_random.h>
14 #include <rte_prefetch.h>
15 #include <rte_ring_elem.h>
16
17 #include "member.h"
18 #include "rte_member.h"
19 #include "rte_member_sketch.h"
20 #include "rte_member_heap.h"
21
22 #ifdef CC_AVX512_SUPPORT
23 #include "rte_member_sketch_avx512.h"
24 #endif /* CC_AVX512_SUPPORT */
25
/*
 * Mutable per-sketch runtime state, kept separate from the read-mostly
 * rte_member_setsum and allocated per socket in rte_member_create_sketch().
 */
struct __rte_cache_aligned sketch_runtime {
	uint64_t pkt_cnt;	/* packets counted during the convergence stage */
	uint32_t until_next;	/* updates to skip until the next sampled update */
	int converged;		/* set once pkt_cnt exceeds converge_thresh */
	struct minheap heap;	/* top-k min-heap of heavy-hitter candidates */
	struct node *report_array;	/* scratch array for sorted heap reports */
	void *key_slots;	/* backing storage for the heap's keys */
	struct rte_ring *free_key_slots;	/* ring of free indices into key_slots */
};
35
36 /*
37 * Geometric sampling to calculate how many packets needs to be
38 * skipped until next update. This method can mitigate the CPU
39 * overheads compared with coin-toss sampling.
40 */
41 static uint32_t
draw_geometric(const struct rte_member_setsum * ss)42 draw_geometric(const struct rte_member_setsum *ss)
43 {
44 double rand = 1;
45
46 if (ss->sample_rate == 1)
47 return 1;
48
49 while (rand == 1 || rand == 0)
50 rand = (double) rte_rand() / (double)(RTE_RAND_MAX);
51
52 return (uint32_t)ceil(log(1 - rand) / log(1 - ss->sample_rate));
53 }
54
/* Insertion sort of a small uint64_t array into ascending order. */
static void
isort(uint64_t *array, int n)
{
	int k;

	for (k = 1; k < n; k++) {
		uint64_t cur = array[k];
		int pos = k;

		/* shift larger elements one slot to the right */
		while (pos > 0 && array[pos - 1] > cur) {
			array[pos] = array[pos - 1];
			pos--;
		}
		array[pos] = cur;
	}
}
73
74 static __rte_always_inline void
swap(uint64_t * a,uint64_t * b)75 swap(uint64_t *a, uint64_t *b)
76 {
77 uint64_t tmp = *a;
78 *a = *b;
79 *b = tmp;
80 }
81
/*
 * Return the median of the five input values.
 * The inputs are copied into a scratch array which is insertion-sorted
 * in place; the middle element is the median.  Equivalent to the
 * original compare/swap network, but easier to follow.
 */
static uint64_t
medianof5(uint64_t a, uint64_t b, uint64_t c, uint64_t d, uint64_t e)
{
	uint64_t v[5] = {a, b, c, d, e};
	int i, j;

	for (i = 1; i < 5; i++) {
		uint64_t cur = v[i];

		for (j = i; j > 0 && v[j - 1] > cur; j--)
			v[j] = v[j - 1];
		v[j] = cur;
	}

	return v[2];
}
110
111 int
rte_member_create_sketch(struct rte_member_setsum * ss,const struct rte_member_parameters * params,struct rte_ring * ring)112 rte_member_create_sketch(struct rte_member_setsum *ss,
113 const struct rte_member_parameters *params,
114 struct rte_ring *ring)
115 {
116 struct sketch_runtime *runtime;
117 uint32_t num_col;
118 uint32_t i;
119
120 if (params->sample_rate == 0 || params->sample_rate > 1) {
121 rte_errno = EINVAL;
122 MEMBER_LOG(ERR,
123 "Membership Sketch created with invalid parameters");
124 return -EINVAL;
125 }
126
127 if (params->extra_flag & RTE_MEMBER_SKETCH_COUNT_BYTE)
128 ss->count_byte = 1;
129
130 #ifdef RTE_ARCH_X86
131 if (ss->count_byte == 1 &&
132 rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_512 &&
133 rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX512F) == 1 &&
134 rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX512IFMA) == 1) {
135 #ifdef CC_AVX512_SUPPORT
136 ss->use_avx512 = true;
137 #else
138 ss->use_avx512 = false;
139 #endif
140 }
141
142 if (ss->use_avx512 == true) {
143 #ifdef CC_AVX512_SUPPORT
144 ss->num_row = NUM_ROW_VEC;
145 MEMBER_LOG(NOTICE,
146 "Membership Sketch AVX512 update/lookup/delete ops is selected");
147 ss->sketch_update = sketch_update_avx512;
148 ss->sketch_lookup = sketch_lookup_avx512;
149 ss->sketch_delete = sketch_delete_avx512;
150 #endif
151 } else
152 #endif
153 {
154 ss->num_row = NUM_ROW_SCALAR;
155 MEMBER_LOG(NOTICE,
156 "Membership Sketch SCALAR update/lookup/delete ops is selected");
157 ss->sketch_update = sketch_update_scalar;
158 ss->sketch_lookup = sketch_lookup_scalar;
159 ss->sketch_delete = sketch_delete_scalar;
160 }
161
162 ss->socket_id = params->socket_id;
163
164 if (ss->count_byte == 0)
165 num_col = 4.0 / params->error_rate / params->sample_rate;
166 #ifdef RTE_ARCH_X86
167 else if (ss->use_avx512 == true)
168 num_col = rte_align32pow2(4.0 / params->error_rate);
169 #endif
170 else
171 num_col = 4.0 / params->error_rate;
172
173 ss->table = rte_zmalloc_socket(NULL,
174 sizeof(uint64_t) * num_col * ss->num_row,
175 RTE_CACHE_LINE_SIZE, ss->socket_id);
176 if (ss->table == NULL) {
177 MEMBER_LOG(ERR, "Sketch Table memory allocation failed");
178 return -ENOMEM;
179 }
180
181 ss->hash_seeds = rte_zmalloc_socket(NULL, sizeof(uint64_t) * ss->num_row,
182 RTE_CACHE_LINE_SIZE, ss->socket_id);
183 if (ss->hash_seeds == NULL) {
184 MEMBER_LOG(ERR, "Sketch Hashseeds memory allocation failed");
185 return -ENOMEM;
186 }
187
188 ss->runtime_var = rte_zmalloc_socket(NULL, sizeof(struct sketch_runtime),
189 RTE_CACHE_LINE_SIZE, ss->socket_id);
190 if (ss->runtime_var == NULL) {
191 MEMBER_LOG(ERR, "Sketch Runtime memory allocation failed");
192 rte_free(ss);
193 return -ENOMEM;
194 }
195 runtime = ss->runtime_var;
196
197 ss->num_col = num_col;
198 ss->sample_rate = params->sample_rate;
199 ss->prim_hash_seed = params->prim_hash_seed;
200 ss->sec_hash_seed = params->sec_hash_seed;
201 ss->error_rate = params->error_rate;
202 ss->topk = params->top_k;
203 ss->key_len = params->key_len;
204 runtime->heap.key_len = ss->key_len;
205
206 runtime->key_slots = rte_zmalloc_socket(NULL, ss->key_len * ss->topk,
207 RTE_CACHE_LINE_SIZE, ss->socket_id);
208 if (runtime->key_slots == NULL) {
209 MEMBER_LOG(ERR, "Sketch Key Slots allocation failed");
210 goto error;
211 }
212
213 runtime->free_key_slots = ring;
214 for (i = 0; i < ss->topk; i++)
215 rte_ring_sp_enqueue_elem(runtime->free_key_slots,
216 &i, sizeof(uint32_t));
217
218 if (rte_member_minheap_init(&(runtime->heap), params->top_k,
219 ss->socket_id, params->prim_hash_seed) < 0) {
220 MEMBER_LOG(ERR, "Sketch Minheap allocation failed");
221 goto error_runtime;
222 }
223
224 runtime->report_array = rte_zmalloc_socket(NULL, sizeof(struct node) * ss->topk,
225 RTE_CACHE_LINE_SIZE, ss->socket_id);
226 if (runtime->report_array == NULL) {
227 MEMBER_LOG(ERR, "Sketch Runtime Report Array allocation failed");
228 goto error_runtime;
229 }
230
231 for (i = 0; i < ss->num_row; i++)
232 ss->hash_seeds[i] = rte_rand();
233
234 if (params->extra_flag & RTE_MEMBER_SKETCH_ALWAYS_BOUNDED)
235 ss->always_bounded = 1;
236
237 if (ss->always_bounded) {
238 double delta = 1.0 / (pow(2, ss->num_row));
239
240 ss->converge_thresh = 10 * pow(ss->error_rate, -2.0) * sqrt(log(1 / delta));
241 }
242
243 MEMBER_LOG(DEBUG, "Sketch created, "
244 "the total memory required is %u Bytes", ss->num_col * ss->num_row * 8);
245
246 return 0;
247
248 error_runtime:
249 rte_member_minheap_free(&runtime->heap);
250 rte_ring_free(runtime->free_key_slots);
251 rte_free(runtime->key_slots);
252 error:
253 rte_free(runtime);
254 rte_free(ss);
255
256 return -ENOMEM;
257 }
258
259 uint64_t
sketch_lookup_scalar(const struct rte_member_setsum * ss,const void * key)260 sketch_lookup_scalar(const struct rte_member_setsum *ss, const void *key)
261 {
262 uint64_t *count_array = ss->table;
263 uint32_t col[ss->num_row];
264 uint64_t count_row[ss->num_row];
265 uint32_t cur_row;
266 uint64_t count;
267
268 for (cur_row = 0; cur_row < ss->num_row; cur_row++) {
269 col[cur_row] = MEMBER_HASH_FUNC(key, ss->key_len,
270 ss->hash_seeds[cur_row]) % ss->num_col;
271
272 rte_prefetch0(&count_array[cur_row * ss->num_col + col[cur_row]]);
273 }
274
275 /* if sample rate is 1, it is a regular count-min, we report the min */
276 if (ss->sample_rate == 1 || ss->count_byte == 1)
277 return count_min(ss, col);
278
279 memset(count_row, 0, sizeof(uint64_t) * ss->num_row);
280
281 /* otherwise we report the median number */
282 for (cur_row = 0; cur_row < ss->num_row; cur_row++)
283 count_row[cur_row] = count_array[cur_row * ss->num_col + col[cur_row]];
284
285 if (ss->num_row == 5)
286 return medianof5(count_row[0], count_row[1],
287 count_row[2], count_row[3], count_row[4]);
288
289 isort(count_row, ss->num_row);
290
291 if (ss->num_row % 2 == 0) {
292 count = (count_row[ss->num_row / 2] + count_row[ss->num_row / 2 - 1]) / 2;
293 return count;
294 }
295 /* ss->num_row % 2 != 0 */
296 count = count_row[ss->num_row / 2];
297
298 return count;
299 }
300
301 void
sketch_delete_scalar(const struct rte_member_setsum * ss,const void * key)302 sketch_delete_scalar(const struct rte_member_setsum *ss, const void *key)
303 {
304 uint32_t col[ss->num_row];
305 uint64_t *count_array = ss->table;
306 uint32_t cur_row;
307
308 for (cur_row = 0; cur_row < ss->num_row; cur_row++) {
309 col[cur_row] = MEMBER_HASH_FUNC(key, ss->key_len,
310 ss->hash_seeds[cur_row]) % ss->num_col;
311
312 /* set corresponding counter to 0 */
313 count_array[cur_row * ss->num_col + col[cur_row]] = 0;
314 }
315 }
316
317 int
rte_member_query_sketch(const struct rte_member_setsum * ss,const void * key,uint64_t * output)318 rte_member_query_sketch(const struct rte_member_setsum *ss,
319 const void *key,
320 uint64_t *output)
321 {
322 uint64_t count = ss->sketch_lookup(ss, key);
323 *output = count;
324
325 return 0;
326 }
327
328 void
rte_member_update_heap(const struct rte_member_setsum * ss)329 rte_member_update_heap(const struct rte_member_setsum *ss)
330 {
331 uint32_t i;
332 struct sketch_runtime *runtime_var = ss->runtime_var;
333
334 for (i = 0; i < runtime_var->heap.size; i++) {
335 uint64_t count = ss->sketch_lookup(ss, runtime_var->heap.elem[i].key);
336
337 runtime_var->heap.elem[i].count = count;
338 }
339 }
340
341 int
rte_member_report_heavyhitter_sketch(const struct rte_member_setsum * setsum,void ** key,uint64_t * count)342 rte_member_report_heavyhitter_sketch(const struct rte_member_setsum *setsum,
343 void **key,
344 uint64_t *count)
345 {
346 uint32_t i;
347 struct sketch_runtime *runtime_var = setsum->runtime_var;
348
349 rte_member_update_heap(setsum);
350 rte_member_heapsort(&(runtime_var->heap), runtime_var->report_array);
351
352 for (i = 0; i < runtime_var->heap.size; i++) {
353 key[i] = runtime_var->report_array[i].key;
354 count[i] = runtime_var->report_array[i].count;
355 }
356
357 return runtime_var->heap.size;
358 }
359
360 int
rte_member_lookup_sketch(const struct rte_member_setsum * ss,const void * key,member_set_t * set_id)361 rte_member_lookup_sketch(const struct rte_member_setsum *ss,
362 const void *key, member_set_t *set_id)
363 {
364 uint64_t count = ss->sketch_lookup(ss, key);
365 struct sketch_runtime *runtime_var = ss->runtime_var;
366
367 if (runtime_var->heap.size > 0 && count >= runtime_var->heap.elem[0].count)
368 *set_id = 1;
369 else
370 *set_id = 0;
371
372 if (count == 0)
373 return 0;
374 else
375 return 1;
376 }
377
378 static void
should_converge(const struct rte_member_setsum * ss)379 should_converge(const struct rte_member_setsum *ss)
380 {
381 struct sketch_runtime *runtime_var = ss->runtime_var;
382
383 /* For count min sketch - L1 norm */
384 if (runtime_var->pkt_cnt > ss->converge_thresh) {
385 runtime_var->converged = 1;
386 MEMBER_LOG(DEBUG, "Sketch converged, begin sampling "
387 "from key count %"PRIu64,
388 runtime_var->pkt_cnt);
389 }
390 }
391
392 static void
sketch_update_row(const struct rte_member_setsum * ss,const void * key,uint32_t count,uint32_t cur_row)393 sketch_update_row(const struct rte_member_setsum *ss, const void *key,
394 uint32_t count, uint32_t cur_row)
395 {
396 uint64_t *count_array = ss->table;
397 uint32_t col = MEMBER_HASH_FUNC(key, ss->key_len,
398 ss->hash_seeds[cur_row]) % ss->num_col;
399
400 /* sketch counter update */
401 count_array[cur_row * ss->num_col + col] +=
402 ceil(count / (ss->sample_rate));
403 }
404
405 void
sketch_update_scalar(const struct rte_member_setsum * ss,const void * key,uint32_t count)406 sketch_update_scalar(const struct rte_member_setsum *ss,
407 const void *key,
408 uint32_t count)
409 {
410 uint64_t *count_array = ss->table;
411 uint32_t col;
412 uint32_t cur_row;
413
414 for (cur_row = 0; cur_row < ss->num_row; cur_row++) {
415 col = MEMBER_HASH_FUNC(key, ss->key_len,
416 ss->hash_seeds[cur_row]) % ss->num_col;
417 count_array[cur_row * ss->num_col + col] += count;
418 }
419 }
420
/*
 * Maintain the top-k min-heap after the sketch counters of @key changed.
 * heap.elem[0] holds the smallest tracked count, so a key whose estimate
 * exceeds it either gets refreshed in place, inserted while the heap has
 * room, or replaces the current minimum.
 */
static void
heap_update(const struct rte_member_setsum *ss, const void *key)
{
	struct sketch_runtime *runtime_var = ss->runtime_var;
	uint64_t key_cnt = 0;
	int found;

	/* We also update the heap for this key */
	key_cnt = ss->sketch_lookup(ss, key);
	if (key_cnt > runtime_var->heap.elem[0].count) {
		found = rte_member_minheap_find(&runtime_var->heap, key);
		/* the key is found in the top-k heap */
		if (found >= 0) {
			/* count grew: re-heapify before storing the new value */
			if (runtime_var->heap.elem[found].count < key_cnt)
				rte_member_heapify(&runtime_var->heap, found, true);

			runtime_var->heap.elem[found].count = key_cnt;
		} else if (runtime_var->heap.size < ss->topk) {
			/* heap not full yet: track this new key */
			rte_member_minheap_insert_node(&runtime_var->heap, key,
				key_cnt, runtime_var->key_slots, runtime_var->free_key_slots);
		} else {
			/* heap full: evict the minimum in favor of this key */
			rte_member_minheap_replace_node(&runtime_var->heap, key, key_cnt);
		}
	} else if (runtime_var->heap.size < ss->topk) {
		/*
		 * Estimate does not beat the heap minimum, but the heap still
		 * has room, so the key is refreshed or inserted anyway.
		 */
		found = rte_member_minheap_find(&runtime_var->heap, key);
		if (found >= 0) {
			if (runtime_var->heap.elem[found].count < key_cnt)
				rte_member_heapify(&runtime_var->heap, found, true);

			runtime_var->heap.elem[found].count = key_cnt;
		} else
			rte_member_minheap_insert_node(&runtime_var->heap, key,
				key_cnt, runtime_var->key_slots, runtime_var->free_key_slots);
	}
}
456
/*
 * Add a single packet into the sketch.
 * Sketch value is measured by packet numbers in this mode.
 */
int
rte_member_add_sketch(const struct rte_member_setsum *ss,
		const void *key,
		__rte_unused member_set_t set_id)
{
	uint32_t cur_row;
	struct sketch_runtime *runtime_var = ss->runtime_var;
	uint32_t *until_next = &(runtime_var->until_next);

	/*
	 * If sketch is measured by byte count,
	 * the rte_member_add_sketch_byte_count routine should be used.
	 */
	if (ss->count_byte == 1) {
		MEMBER_LOG(ERR, "Sketch is Byte Mode, "
			"should use rte_member_add_byte_count()!");
		return -EINVAL;
	}

	/* no sampling: update all rows and the heap for every packet */
	if (ss->sample_rate == 1) {
		ss->sketch_update(ss, key, 1);
		heap_update(ss, key);
		return 0;
	}

	/* convergence stage if it's needed */
	if (ss->always_bounded && !runtime_var->converged) {
		ss->sketch_update(ss, key, 1);

		/* check convergence once every INTERVAL packets
		 * (INTERVAL is assumed to be a power of two)
		 */
		if (!((++runtime_var->pkt_cnt) & (INTERVAL - 1)))
			should_converge(ss);

		heap_update(ss, key);
		return 0;
	}

	/* should we skip this packet */
	if (*until_next >= ss->num_row) {
		*until_next -= ss->num_row;
		return 0;
	}
	/*
	 * Geometric sampling over the flattened sequence of row updates:
	 * starting at the pending offset, update a row, then draw the
	 * distance to the next sampled row until we run past this packet's
	 * rows; the overshoot is carried into the next packet.
	 */
	cur_row = *until_next;
	do {
		sketch_update_row(ss, key, 1, cur_row);
		*until_next = draw_geometric(ss);
		if (cur_row + *until_next >= ss->num_row)
			break;
		cur_row += *until_next;
	} while (1);

	*until_next -= (ss->num_row - cur_row);

	heap_update(ss, key);

	return 0;
}
517
518 /*
519 * Add the byte count of the packet into the sketch.
520 * Sketch value is meatured by byte count numbers in this mode.
521 */
522 int
rte_member_add_sketch_byte_count(const struct rte_member_setsum * ss,const void * key,uint32_t byte_count)523 rte_member_add_sketch_byte_count(const struct rte_member_setsum *ss,
524 const void *key,
525 uint32_t byte_count)
526 {
527 struct sketch_runtime *runtime_var = ss->runtime_var;
528 uint32_t *until_next = &(runtime_var->until_next);
529
530 /* should not call this API if not in count byte mode */
531 if (ss->count_byte == 0) {
532 MEMBER_LOG(ERR, "Sketch is Pkt Mode, "
533 "should use rte_member_add()!");
534 return -EINVAL;
535 }
536
537 /* there's specific optimization for the sketch update */
538 ss->sketch_update(ss, key, byte_count);
539
540 if (*until_next != 0) {
541 *until_next = *until_next - 1;
542 return 0;
543 }
544
545 *until_next = draw_geometric(ss) - 1;
546
547 heap_update(ss, key);
548
549 return 0;
550 }
551
552 int
rte_member_delete_sketch(const struct rte_member_setsum * ss,const void * key)553 rte_member_delete_sketch(const struct rte_member_setsum *ss,
554 const void *key)
555 {
556 struct sketch_runtime *runtime_var = ss->runtime_var;
557 int found;
558
559 found = rte_member_minheap_find(&runtime_var->heap, key);
560 if (found < 0)
561 return -1;
562
563 ss->sketch_delete(ss, key);
564
565 return rte_member_minheap_delete_node
566 (&runtime_var->heap, key, runtime_var->key_slots, runtime_var->free_key_slots);
567 }
568
569 void
rte_member_free_sketch(struct rte_member_setsum * ss)570 rte_member_free_sketch(struct rte_member_setsum *ss)
571 {
572 struct sketch_runtime *runtime_var = ss->runtime_var;
573
574 rte_free(ss->table);
575 rte_member_minheap_free(&runtime_var->heap);
576 rte_free(runtime_var->key_slots);
577 rte_ring_free(runtime_var->free_key_slots);
578 rte_free(runtime_var);
579 }
580
581 void
rte_member_reset_sketch(const struct rte_member_setsum * ss)582 rte_member_reset_sketch(const struct rte_member_setsum *ss)
583 {
584 struct sketch_runtime *runtime_var = ss->runtime_var;
585 uint64_t *sketch = ss->table;
586 uint32_t i;
587
588 memset(sketch, 0, sizeof(uint64_t) * ss->num_col * ss->num_row);
589 rte_member_minheap_reset(&runtime_var->heap);
590 rte_ring_reset(runtime_var->free_key_slots);
591
592 for (i = 0; i < ss->topk; i++)
593 rte_ring_sp_enqueue_elem(runtime_var->free_key_slots, &i, sizeof(uint32_t));
594 }
595