..  SPDX-License-Identifier: BSD-3-Clause
    Copyright(c) 2024 Arm Limited.

Pointer Compression Library
===========================

Use ``rte_ptr_compress_16_shift()`` and ``rte_ptr_decompress_16_shift()``
to compress and decompress pointers into 16-bit offsets.
Use ``rte_ptr_compress_32_shift()`` and ``rte_ptr_decompress_32_shift()``
to compress and decompress pointers into 32-bit offsets.

Compression takes advantage of the fact that pointers are usually located in a limited memory region (like a mempool).
By converting them to offsets from a base memory address, they can be stored in fewer bytes.
How many bytes are needed to store the offset is dictated by the memory region size and the alignment of the objects the pointers point to.

For example, a pointer which is part of a 4GB memory pool can be stored as a 32-bit offset.
If the pointer points to memory that is 8-byte aligned, then 3 bits can be dropped from the offset and
a 32GB memory pool can now fit in 32 bits.

For performance reasons these requirements are not enforced programmatically.
The programmer is responsible for ensuring that the combination of distance from the base pointer and
memory alignment allows for storing the offset in the number of bits indicated by the function name (16 or 32).
The start of the mempool memory would be a good candidate for the base pointer.
Otherwise, any pointer that precedes all pointers, is close enough and
has the same alignment as the pointers being compressed will work.

Macros present in the ``rte_ptr_compress.h`` header may be used to evaluate whether compression is possible:

* ``RTE_PTR_COMPRESS_BITS_NEEDED_FOR_POINTER_WITHIN_RANGE``

* ``RTE_PTR_COMPRESS_BIT_SHIFT_FROM_ALIGNMENT``

* ``RTE_PTR_COMPRESS_CAN_COMPRESS_16_SHIFT``

* ``RTE_PTR_COMPRESS_CAN_COMPRESS_32_SHIFT``

These will help you calculate compression parameters and check whether they are legal for a particular compression function.

If using a mempool, you can get the parameters you need to use in the compression macros and functions
by using ``rte_mempool_get_mem_range()`` and ``rte_mempool_get_obj_alignment()``.

.. note::

   Performance gains depend on the batch size of pointers and CPU capabilities such as vector extensions.
   It's important to measure the performance increase on target hardware.
   A test called ``ring_perf_autotest`` in ``dpdk-test`` can provide the measurements.

Example usage
-------------

In this example we send pointers between two cores through a ring.
While this is a realistic use case, the code is simplified for demonstration purposes and does not have error handling.

.. code-block:: c

    #include <stdalign.h>
    #include <stdint.h>

    #include <rte_launch.h>
    #include <rte_lcore.h>
    #include <rte_ptr_compress.h>
    #include <rte_ring.h>
    #include <rte_ring_elem.h>

    #define ITEMS_ARRAY_SIZE (1024)
    #define RING_SIZE (1024)
    #define BATCH_SIZE (128)
    #define ALIGN_EXPONENT (3)
    #define ITEM_ALIGN (1 << ALIGN_EXPONENT)
    #define CORE_SEND (1)
    #define CORE_RECV (2)

    /*
     * Objects whose addresses are sent through the ring.
     * They are aligned to ITEM_ALIGN bytes, so the ALIGN_EXPONENT lowest bits
     * of every offset from the base pointer are zero and can be dropped.
     */
    struct item {
        alignas(ITEM_ALIGN) int a;
    };

    /* the array doubles as the base pointer for compression */
    static struct item items[ITEMS_ARRAY_SIZE] = {0};
    static struct rte_ring *ring = NULL;

    /*
     * Sender: compresses pointers to items straight into ring memory
     * using the zero-copy enqueue API. Runs forever; args is unused.
     */
    static int
    send_compressed(void *args)
    {
        struct item *ptrs_send[BATCH_SIZE] = {0};
        unsigned int n_send = 0;
        struct rte_ring_zc_data zcd = {0};

        (void)args;

        /* in this example we only fill the ptrs_send once and reuse */
        for (; n_send < BATCH_SIZE; n_send++)
            ptrs_send[n_send] = &items[n_send];

        for (;;) {
            /* reserve room for up to BATCH_SIZE 32-bit offsets in the ring */
            n_send = rte_ring_enqueue_zc_burst_elem_start(
                ring, sizeof(uint32_t), BATCH_SIZE, &zcd, NULL);

            /* ring is full: nothing was reserved, so there is nothing to compress (the API expects n > 0) */
            if (n_send == 0)
                continue;

            /* compress ptrs_send into offsets */
            rte_ptr_compress_32_shift(items, /* base pointer */
                (void **)ptrs_send, /* source array to be compressed */
                zcd.ptr1, /* destination array to store offsets */
                zcd.n1, /* how many pointers to compress */
                ALIGN_EXPONENT /* how many bits can we drop from the offset */);

            /* handle the potential secondary buffer (caused by ring boundary) */
            if (zcd.ptr2 != NULL)
                rte_ptr_compress_32_shift(items,
                    (void **)(ptrs_send + zcd.n1),
                    zcd.ptr2,
                    n_send - zcd.n1,
                    ALIGN_EXPONENT);

            /* make the n_send offsets written above visible to the consumer */
            rte_ring_enqueue_zc_finish(ring, n_send);
        }
        /* not reached */
        return 1;
    }

    /*
     * Receiver: decompresses offsets straight from ring memory back into
     * pointers using the zero-copy dequeue API. Runs forever; args is unused.
     */
    static int
    recv_compressed(void *args)
    {
        struct item *ptrs_recv[BATCH_SIZE] = {0};
        unsigned int n_recv;
        struct rte_ring_zc_data zcd = {0};

        (void)args;

        for (;;) {
            /* receive compressed pointers from the ring */
            n_recv = rte_ring_dequeue_zc_burst_elem_start(
                ring, sizeof(uint32_t), BATCH_SIZE, &zcd, NULL);

            /* ring is empty: nothing to decompress (the API expects n > 0) */
            if (n_recv == 0)
                continue;

            rte_ptr_decompress_32_shift(items, /* base pointer */
                zcd.ptr1, /* source array to decompress */
                (void **)ptrs_recv, /* destination array to store pointers */
                zcd.n1, /* how many pointers to decompress */
                ALIGN_EXPONENT /* how many bits were dropped from the offset */);

            /* handle the potential secondary buffer (caused by ring boundary) */
            if (zcd.ptr2 != NULL)
                rte_ptr_decompress_32_shift(items,
                    zcd.ptr2,
                    (void **)(ptrs_recv + zcd.n1),
                    n_recv - zcd.n1,
                    ALIGN_EXPONENT);

            /* release the n_recv consumed slots back to the producer */
            rte_ring_dequeue_zc_finish(ring, n_recv);

            /* ptrs_recv contains what ptrs_send contained in the other thread */
            /* (...) */
        }
        /* not reached */
        return 1;
    }

    /*
     * Creates a single-producer/single-consumer ring of 32-bit elements and
     * launches the sender and receiver on lcores CORE_SEND and CORE_RECV.
     */
    void
    compression_example(void)
    {
        ring = rte_ring_create_elem(
            "COMPR_PTRS", sizeof(uint32_t),
            RING_SIZE, rte_socket_id(),
            RING_F_SP_ENQ | RING_F_SC_DEQ);

        rte_eal_remote_launch(send_compressed, NULL, CORE_SEND);
        rte_eal_remote_launch(recv_compressed, NULL, CORE_RECV);

        /* wait for the worker lcores instead of spinning in an empty loop */
        rte_eal_mp_wait_lcore();
    }