/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2020 Intel Corporation
 * Copyright(c) 2020, Alan Liu <zaoxingliu@gmail.com>
 */

#include <math.h>
#include <string.h>

#include <rte_malloc.h>
#include <rte_memory.h>
#include <rte_errno.h>
#include <rte_log.h>
#include <rte_random.h>
#include <rte_prefetch.h>
#include <rte_ring_elem.h>

#include "member.h"
#include "rte_member.h"
#include "rte_member_sketch.h"
#include "rte_member_heap.h"

#ifdef CC_AVX512_SUPPORT
#include "rte_member_sketch_avx512.h"
#endif /* CC_AVX512_SUPPORT */

/*
 * Mutable per-sketch state, kept separate from the const setsum so the
 * datapath APIs can take a const rte_member_setsum pointer.
 */
struct sketch_runtime {
	uint64_t pkt_cnt;		/* packets seen during the convergence stage */
	uint32_t until_next;		/* rows/packets to skip before the next sampled update */
	int converged;			/* nonzero once should_converge() fires */
	struct minheap heap;		/* top-k min-heap of heavy-hitter candidates */
	struct node *report_array;	/* sorted snapshot buffer for heavy-hitter reports */
	void *key_slots;		/* backing storage for heap keys (top_k * key_len bytes) */
	struct rte_ring *free_key_slots; /* ring of free indices into key_slots */
} __rte_cache_aligned;

/*
 * Geometric sampling to calculate how many packets needs to be
 * skipped until next update. This method can mitigate the CPU
 * overheads compared with coin-toss sampling.
40 */ 41 static uint32_t 42 draw_geometric(const struct rte_member_setsum *ss) 43 { 44 double rand = 1; 45 46 if (ss->sample_rate == 1) 47 return 1; 48 49 while (rand == 1 || rand == 0) 50 rand = (double) rte_rand() / (double)(RTE_RAND_MAX); 51 52 return (uint32_t)ceil(log(1 - rand) / log(1 - ss->sample_rate)); 53 } 54 55 static void 56 isort(uint64_t *array, int n) 57 { 58 int i; 59 60 for (i = 1; i < n; i++) { 61 uint64_t t = array[i]; 62 int j; 63 64 for (j = i - 1; j >= 0; j--) { 65 if (t < array[j]) 66 array[j + 1] = array[j]; 67 else 68 break; 69 } 70 array[j + 1] = t; 71 } 72 } 73 74 static __rte_always_inline void 75 swap(uint64_t *a, uint64_t *b) 76 { 77 uint64_t tmp = *a; 78 *a = *b; 79 *b = tmp; 80 } 81 82 static uint64_t 83 medianof5(uint64_t a, uint64_t b, uint64_t c, uint64_t d, uint64_t e) 84 { 85 if (a > b) 86 swap(&a, &b); 87 if (c > d) 88 swap(&c, &d); 89 if (a > c) { 90 if (d > e) 91 swap(&c, &e); 92 else { 93 swap(&c, &d); 94 swap(&d, &e); 95 } 96 } else { 97 if (b > e) 98 swap(&a, &e); 99 else { 100 swap(&a, &b); 101 swap(&b, &e); 102 } 103 } 104 105 if (a > c) 106 return a > d ? d : a; 107 else 108 return b > c ? 
c : b; 109 } 110 111 int 112 rte_member_create_sketch(struct rte_member_setsum *ss, 113 const struct rte_member_parameters *params, 114 struct rte_ring *ring) 115 { 116 struct sketch_runtime *runtime; 117 uint32_t num_col; 118 uint32_t i; 119 120 if (params->sample_rate == 0 || params->sample_rate > 1) { 121 rte_errno = EINVAL; 122 MEMBER_LOG(ERR, 123 "Membership Sketch created with invalid parameters"); 124 return -EINVAL; 125 } 126 127 if (params->extra_flag & RTE_MEMBER_SKETCH_COUNT_BYTE) 128 ss->count_byte = 1; 129 130 #ifdef RTE_ARCH_X86 131 if (ss->count_byte == 1 && 132 rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_512 && 133 rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX512F) == 1 && 134 rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX512IFMA) == 1) { 135 #ifdef CC_AVX512_SUPPORT 136 ss->use_avx512 = true; 137 #else 138 ss->use_avx512 = false; 139 #endif 140 } 141 142 if (ss->use_avx512 == true) { 143 #ifdef CC_AVX512_SUPPORT 144 ss->num_row = NUM_ROW_VEC; 145 MEMBER_LOG(NOTICE, 146 "Membership Sketch AVX512 update/lookup/delete ops is selected"); 147 ss->sketch_update = sketch_update_avx512; 148 ss->sketch_lookup = sketch_lookup_avx512; 149 ss->sketch_delete = sketch_delete_avx512; 150 #endif 151 } else 152 #endif 153 { 154 ss->num_row = NUM_ROW_SCALAR; 155 MEMBER_LOG(NOTICE, 156 "Membership Sketch SCALAR update/lookup/delete ops is selected"); 157 ss->sketch_update = sketch_update_scalar; 158 ss->sketch_lookup = sketch_lookup_scalar; 159 ss->sketch_delete = sketch_delete_scalar; 160 } 161 162 ss->socket_id = params->socket_id; 163 164 if (ss->count_byte == 0) 165 num_col = 4.0 / params->error_rate / params->sample_rate; 166 #ifdef RTE_ARCH_X86 167 else if (ss->use_avx512 == true) 168 num_col = rte_align32pow2(4.0 / params->error_rate); 169 #endif 170 else 171 num_col = 4.0 / params->error_rate; 172 173 ss->table = rte_zmalloc_socket(NULL, 174 sizeof(uint64_t) * num_col * ss->num_row, 175 RTE_CACHE_LINE_SIZE, ss->socket_id); 176 if (ss->table == NULL) { 177 
MEMBER_LOG(ERR, "Sketch Table memory allocation failed"); 178 return -ENOMEM; 179 } 180 181 ss->hash_seeds = rte_zmalloc_socket(NULL, sizeof(uint64_t) * ss->num_row, 182 RTE_CACHE_LINE_SIZE, ss->socket_id); 183 if (ss->hash_seeds == NULL) { 184 MEMBER_LOG(ERR, "Sketch Hashseeds memory allocation failed"); 185 return -ENOMEM; 186 } 187 188 ss->runtime_var = rte_zmalloc_socket(NULL, sizeof(struct sketch_runtime), 189 RTE_CACHE_LINE_SIZE, ss->socket_id); 190 if (ss->runtime_var == NULL) { 191 MEMBER_LOG(ERR, "Sketch Runtime memory allocation failed"); 192 rte_free(ss); 193 return -ENOMEM; 194 } 195 runtime = ss->runtime_var; 196 197 ss->num_col = num_col; 198 ss->sample_rate = params->sample_rate; 199 ss->prim_hash_seed = params->prim_hash_seed; 200 ss->sec_hash_seed = params->sec_hash_seed; 201 ss->error_rate = params->error_rate; 202 ss->topk = params->top_k; 203 ss->key_len = params->key_len; 204 runtime->heap.key_len = ss->key_len; 205 206 runtime->key_slots = rte_zmalloc_socket(NULL, ss->key_len * ss->topk, 207 RTE_CACHE_LINE_SIZE, ss->socket_id); 208 if (runtime->key_slots == NULL) { 209 MEMBER_LOG(ERR, "Sketch Key Slots allocation failed"); 210 goto error; 211 } 212 213 runtime->free_key_slots = ring; 214 for (i = 0; i < ss->topk; i++) 215 rte_ring_sp_enqueue_elem(runtime->free_key_slots, 216 &i, sizeof(uint32_t)); 217 218 if (rte_member_minheap_init(&(runtime->heap), params->top_k, 219 ss->socket_id, params->prim_hash_seed) < 0) { 220 MEMBER_LOG(ERR, "Sketch Minheap allocation failed"); 221 goto error_runtime; 222 } 223 224 runtime->report_array = rte_zmalloc_socket(NULL, sizeof(struct node) * ss->topk, 225 RTE_CACHE_LINE_SIZE, ss->socket_id); 226 if (runtime->report_array == NULL) { 227 MEMBER_LOG(ERR, "Sketch Runtime Report Array allocation failed"); 228 goto error_runtime; 229 } 230 231 for (i = 0; i < ss->num_row; i++) 232 ss->hash_seeds[i] = rte_rand(); 233 234 if (params->extra_flag & RTE_MEMBER_SKETCH_ALWAYS_BOUNDED) 235 ss->always_bounded = 1; 236 237 
if (ss->always_bounded) { 238 double delta = 1.0 / (pow(2, ss->num_row)); 239 240 ss->converge_thresh = 10 * pow(ss->error_rate, -2.0) * sqrt(log(1 / delta)); 241 } 242 243 MEMBER_LOG(DEBUG, "Sketch created, " 244 "the total memory required is %u Bytes", ss->num_col * ss->num_row * 8); 245 246 return 0; 247 248 error_runtime: 249 rte_member_minheap_free(&runtime->heap); 250 rte_ring_free(runtime->free_key_slots); 251 rte_free(runtime->key_slots); 252 error: 253 rte_free(runtime); 254 rte_free(ss); 255 256 return -ENOMEM; 257 } 258 259 uint64_t 260 sketch_lookup_scalar(const struct rte_member_setsum *ss, const void *key) 261 { 262 uint64_t *count_array = ss->table; 263 uint32_t col[ss->num_row]; 264 uint64_t count_row[ss->num_row]; 265 uint32_t cur_row; 266 uint64_t count; 267 268 for (cur_row = 0; cur_row < ss->num_row; cur_row++) { 269 col[cur_row] = MEMBER_HASH_FUNC(key, ss->key_len, 270 ss->hash_seeds[cur_row]) % ss->num_col; 271 272 rte_prefetch0(&count_array[cur_row * ss->num_col + col[cur_row]]); 273 } 274 275 /* if sample rate is 1, it is a regular count-min, we report the min */ 276 if (ss->sample_rate == 1 || ss->count_byte == 1) 277 return count_min(ss, col); 278 279 memset(count_row, 0, sizeof(uint64_t) * ss->num_row); 280 281 /* otherwise we report the median number */ 282 for (cur_row = 0; cur_row < ss->num_row; cur_row++) 283 count_row[cur_row] = count_array[cur_row * ss->num_col + col[cur_row]]; 284 285 if (ss->num_row == 5) 286 return medianof5(count_row[0], count_row[1], 287 count_row[2], count_row[3], count_row[4]); 288 289 isort(count_row, ss->num_row); 290 291 if (ss->num_row % 2 == 0) { 292 count = (count_row[ss->num_row / 2] + count_row[ss->num_row / 2 - 1]) / 2; 293 return count; 294 } 295 /* ss->num_row % 2 != 0 */ 296 count = count_row[ss->num_row / 2]; 297 298 return count; 299 } 300 301 void 302 sketch_delete_scalar(const struct rte_member_setsum *ss, const void *key) 303 { 304 uint32_t col[ss->num_row]; 305 uint64_t *count_array = 
ss->table; 306 uint32_t cur_row; 307 308 for (cur_row = 0; cur_row < ss->num_row; cur_row++) { 309 col[cur_row] = MEMBER_HASH_FUNC(key, ss->key_len, 310 ss->hash_seeds[cur_row]) % ss->num_col; 311 312 /* set corresponding counter to 0 */ 313 count_array[cur_row * ss->num_col + col[cur_row]] = 0; 314 } 315 } 316 317 int 318 rte_member_query_sketch(const struct rte_member_setsum *ss, 319 const void *key, 320 uint64_t *output) 321 { 322 uint64_t count = ss->sketch_lookup(ss, key); 323 *output = count; 324 325 return 0; 326 } 327 328 void 329 rte_member_update_heap(const struct rte_member_setsum *ss) 330 { 331 uint32_t i; 332 struct sketch_runtime *runtime_var = ss->runtime_var; 333 334 for (i = 0; i < runtime_var->heap.size; i++) { 335 uint64_t count = ss->sketch_lookup(ss, runtime_var->heap.elem[i].key); 336 337 runtime_var->heap.elem[i].count = count; 338 } 339 } 340 341 int 342 rte_member_report_heavyhitter_sketch(const struct rte_member_setsum *setsum, 343 void **key, 344 uint64_t *count) 345 { 346 uint32_t i; 347 struct sketch_runtime *runtime_var = setsum->runtime_var; 348 349 rte_member_update_heap(setsum); 350 rte_member_heapsort(&(runtime_var->heap), runtime_var->report_array); 351 352 for (i = 0; i < runtime_var->heap.size; i++) { 353 key[i] = runtime_var->report_array[i].key; 354 count[i] = runtime_var->report_array[i].count; 355 } 356 357 return runtime_var->heap.size; 358 } 359 360 int 361 rte_member_lookup_sketch(const struct rte_member_setsum *ss, 362 const void *key, member_set_t *set_id) 363 { 364 uint64_t count = ss->sketch_lookup(ss, key); 365 struct sketch_runtime *runtime_var = ss->runtime_var; 366 367 if (runtime_var->heap.size > 0 && count >= runtime_var->heap.elem[0].count) 368 *set_id = 1; 369 else 370 *set_id = 0; 371 372 if (count == 0) 373 return 0; 374 else 375 return 1; 376 } 377 378 static void 379 should_converge(const struct rte_member_setsum *ss) 380 { 381 struct sketch_runtime *runtime_var = ss->runtime_var; 382 383 /* For count min 
sketch - L1 norm */ 384 if (runtime_var->pkt_cnt > ss->converge_thresh) { 385 runtime_var->converged = 1; 386 MEMBER_LOG(DEBUG, "Sketch converged, begin sampling " 387 "from key count %"PRIu64, 388 runtime_var->pkt_cnt); 389 } 390 } 391 392 static void 393 sketch_update_row(const struct rte_member_setsum *ss, const void *key, 394 uint32_t count, uint32_t cur_row) 395 { 396 uint64_t *count_array = ss->table; 397 uint32_t col = MEMBER_HASH_FUNC(key, ss->key_len, 398 ss->hash_seeds[cur_row]) % ss->num_col; 399 400 /* sketch counter update */ 401 count_array[cur_row * ss->num_col + col] += 402 ceil(count / (ss->sample_rate)); 403 } 404 405 void 406 sketch_update_scalar(const struct rte_member_setsum *ss, 407 const void *key, 408 uint32_t count) 409 { 410 uint64_t *count_array = ss->table; 411 uint32_t col; 412 uint32_t cur_row; 413 414 for (cur_row = 0; cur_row < ss->num_row; cur_row++) { 415 col = MEMBER_HASH_FUNC(key, ss->key_len, 416 ss->hash_seeds[cur_row]) % ss->num_col; 417 count_array[cur_row * ss->num_col + col] += count; 418 } 419 } 420 421 static void 422 heap_update(const struct rte_member_setsum *ss, const void *key) 423 { 424 struct sketch_runtime *runtime_var = ss->runtime_var; 425 uint64_t key_cnt = 0; 426 int found; 427 428 /* We also update the heap for this key */ 429 key_cnt = ss->sketch_lookup(ss, key); 430 if (key_cnt > runtime_var->heap.elem[0].count) { 431 found = rte_member_minheap_find(&runtime_var->heap, key); 432 /* the key is found in the top-k heap */ 433 if (found >= 0) { 434 if (runtime_var->heap.elem[found].count < key_cnt) 435 rte_member_heapify(&runtime_var->heap, found, true); 436 437 runtime_var->heap.elem[found].count = key_cnt; 438 } else if (runtime_var->heap.size < ss->topk) { 439 rte_member_minheap_insert_node(&runtime_var->heap, key, 440 key_cnt, runtime_var->key_slots, runtime_var->free_key_slots); 441 } else { 442 rte_member_minheap_replace_node(&runtime_var->heap, key, key_cnt); 443 } 444 } else if (runtime_var->heap.size < 
ss->topk) { 445 found = rte_member_minheap_find(&runtime_var->heap, key); 446 if (found >= 0) { 447 if (runtime_var->heap.elem[found].count < key_cnt) 448 rte_member_heapify(&runtime_var->heap, found, true); 449 450 runtime_var->heap.elem[found].count = key_cnt; 451 } else 452 rte_member_minheap_insert_node(&runtime_var->heap, key, 453 key_cnt, runtime_var->key_slots, runtime_var->free_key_slots); 454 } 455 } 456 457 /* 458 * Add a single packet into the sketch. 459 * Sketch value is meatured by packet numbers in this mode. 460 */ 461 int 462 rte_member_add_sketch(const struct rte_member_setsum *ss, 463 const void *key, 464 __rte_unused member_set_t set_id) 465 { 466 uint32_t cur_row; 467 struct sketch_runtime *runtime_var = ss->runtime_var; 468 uint32_t *until_next = &(runtime_var->until_next); 469 470 /* 471 * If sketch is measured by byte count, 472 * the rte_member_add_sketch_byte_count routine should be used. 473 */ 474 if (ss->count_byte == 1) { 475 MEMBER_LOG(ERR, "Sketch is Byte Mode, " 476 "should use rte_member_add_byte_count()!"); 477 return -EINVAL; 478 } 479 480 if (ss->sample_rate == 1) { 481 ss->sketch_update(ss, key, 1); 482 heap_update(ss, key); 483 return 0; 484 } 485 486 /* convergence stage if it's needed */ 487 if (ss->always_bounded && !runtime_var->converged) { 488 ss->sketch_update(ss, key, 1); 489 490 if (!((++runtime_var->pkt_cnt) & (INTERVAL - 1))) 491 should_converge(ss); 492 493 heap_update(ss, key); 494 return 0; 495 } 496 497 /* should we skip this packet */ 498 if (*until_next >= ss->num_row) { 499 *until_next -= ss->num_row; 500 return 0; 501 } 502 cur_row = *until_next; 503 do { 504 sketch_update_row(ss, key, 1, cur_row); 505 *until_next = draw_geometric(ss); 506 if (cur_row + *until_next >= ss->num_row) 507 break; 508 cur_row += *until_next; 509 } while (1); 510 511 *until_next -= (ss->num_row - cur_row); 512 513 heap_update(ss, key); 514 515 return 0; 516 } 517 518 /* 519 * Add the byte count of the packet into the sketch. 
520 * Sketch value is meatured by byte count numbers in this mode. 521 */ 522 int 523 rte_member_add_sketch_byte_count(const struct rte_member_setsum *ss, 524 const void *key, 525 uint32_t byte_count) 526 { 527 struct sketch_runtime *runtime_var = ss->runtime_var; 528 uint32_t *until_next = &(runtime_var->until_next); 529 530 /* should not call this API if not in count byte mode */ 531 if (ss->count_byte == 0) { 532 MEMBER_LOG(ERR, "Sketch is Pkt Mode, " 533 "should use rte_member_add()!"); 534 return -EINVAL; 535 } 536 537 /* there's specific optimization for the sketch update */ 538 ss->sketch_update(ss, key, byte_count); 539 540 if (*until_next != 0) { 541 *until_next = *until_next - 1; 542 return 0; 543 } 544 545 *until_next = draw_geometric(ss) - 1; 546 547 heap_update(ss, key); 548 549 return 0; 550 } 551 552 int 553 rte_member_delete_sketch(const struct rte_member_setsum *ss, 554 const void *key) 555 { 556 struct sketch_runtime *runtime_var = ss->runtime_var; 557 int found; 558 559 found = rte_member_minheap_find(&runtime_var->heap, key); 560 if (found < 0) 561 return -1; 562 563 ss->sketch_delete(ss, key); 564 565 return rte_member_minheap_delete_node 566 (&runtime_var->heap, key, runtime_var->key_slots, runtime_var->free_key_slots); 567 } 568 569 void 570 rte_member_free_sketch(struct rte_member_setsum *ss) 571 { 572 struct sketch_runtime *runtime_var = ss->runtime_var; 573 574 rte_free(ss->table); 575 rte_member_minheap_free(&runtime_var->heap); 576 rte_free(runtime_var->key_slots); 577 rte_ring_free(runtime_var->free_key_slots); 578 rte_free(runtime_var); 579 } 580 581 void 582 rte_member_reset_sketch(const struct rte_member_setsum *ss) 583 { 584 struct sketch_runtime *runtime_var = ss->runtime_var; 585 uint64_t *sketch = ss->table; 586 uint32_t i; 587 588 memset(sketch, 0, sizeof(uint64_t) * ss->num_col * ss->num_row); 589 rte_member_minheap_reset(&runtime_var->heap); 590 rte_ring_reset(runtime_var->free_key_slots); 591 592 for (i = 0; i < ss->topk; 
i++) 593 rte_ring_sp_enqueue_elem(runtime_var->free_key_slots, &i, sizeof(uint32_t)); 594 } 595