xref: /dpdk/doc/guides/prog_guide/ptr_compress_lib.rst (revision 2f1015d8d56d32465cc260faf469950ebb9cf73b)
..  SPDX-License-Identifier: BSD-3-Clause
    Copyright(c) 2024 Arm Limited.

Pointer Compression Library
===========================

Use ``rte_ptr_compress_16_shift()`` and ``rte_ptr_decompress_16_shift()``
to compress pointers into 16-bit offsets and to decompress them back into pointers.
Use ``rte_ptr_compress_32_shift()`` and ``rte_ptr_decompress_32_shift()``
to compress pointers into 32-bit offsets and to decompress them back into pointers.
11077596a4SPaul Szczepanek
Compression takes advantage of the fact that pointers usually point to objects located in a limited memory region (like a mempool).
By converting them to offsets from a base memory address they can be stored in fewer bytes.
The number of bytes needed to store the offset is dictated by the size of the memory region and the alignment of the objects the pointers point to.

For example, a pointer into a 4GB memory pool can be stored as a 32-bit offset.
If the pointer points to memory that is 8-byte aligned then the 3 least significant bits of the offset are always zero and can be dropped,
so a 32GB memory pool can now fit in 32 bits.
19077596a4SPaul Szczepanek
For performance reasons these requirements are not enforced programmatically.
The programmer is responsible for ensuring that the combination of distance from the base pointer and
memory alignment allows the offset to be stored in the number of bits indicated by the function name (16 or 32).
The start of the mempool memory is a good candidate for the base pointer.
Otherwise, any pointer that precedes all of the pointers being compressed, is close enough to them and
has the same alignment as them will work.
26077596a4SPaul Szczepanek
Macros present in the ``rte_ptr_compress.h`` header may be used to evaluate whether compression is possible:

*   ``RTE_PTR_COMPRESS_BITS_NEEDED_FOR_POINTER_WITHIN_RANGE``

*   ``RTE_PTR_COMPRESS_BIT_SHIFT_FROM_ALIGNMENT``

*   ``RTE_PTR_COMPRESS_CAN_COMPRESS_16_SHIFT``

*   ``RTE_PTR_COMPRESS_CAN_COMPRESS_32_SHIFT``

These will help you calculate the compression parameters and check whether they are legal for a particular compression function.

If using a mempool, you can get the parameters needed by the compression macros and functions
by using ``rte_mempool_get_mem_range()`` and ``rte_mempool_get_obj_alignment()``.
41*2f1015d8SPaul Szczepanek
.. note::

    Performance gains depend on the batch size of pointers and CPU capabilities such as vector extensions.
    It's important to measure the performance increase on the target hardware.
    A test called ``ring_perf_autotest`` in ``dpdk-test`` can provide the measurements.
47077596a4SPaul Szczepanek
Example usage
-------------

In this example we send pointers between two cores through a ring.
While this is a realistic use case, the code is simplified for demonstration purposes and does not have error handling.

.. code-block:: c
55077596a4SPaul Szczepanek
56077596a4SPaul Szczepanek    #include <rte_launch.h>
57077596a4SPaul Szczepanek    #include <rte_ptr_compress.h>
58077596a4SPaul Szczepanek    #include <rte_ring.h>
59077596a4SPaul Szczepanek    #include <rte_ring_elem.h>
60077596a4SPaul Szczepanek
61077596a4SPaul Szczepanek    #define ITEMS_ARRAY_SIZE (1024)
62077596a4SPaul Szczepanek    #define BATCH_SIZE (128)
63077596a4SPaul Szczepanek    #define ALIGN_EXPONENT (3)
64077596a4SPaul Szczepanek    #define ITEM_ALIGN (1<<ALIGN_EXPONENT)
65077596a4SPaul Szczepanek    #define CORE_SEND (1)
66077596a4SPaul Szczepanek    #define CORE_RECV (2)
67077596a4SPaul Szczepanek
68077596a4SPaul Szczepanek    struct item {
69077596a4SPaul Szczepanek      alignas(ITEM_ALIGN) int a;
70077596a4SPaul Szczepanek    };
71077596a4SPaul Szczepanek
72077596a4SPaul Szczepanek    static struct item items[ITEMS_ARRAY_SIZE] = {0};
73077596a4SPaul Szczepanek    static struct rte_ring *ring = NULL;
74077596a4SPaul Szczepanek
75077596a4SPaul Szczepanek    static int
76077596a4SPaul Szczepanek    send_compressed(void *args)
77077596a4SPaul Szczepanek    {
78077596a4SPaul Szczepanek      struct item *ptrs_send[BATCH_SIZE] = {0};
79077596a4SPaul Szczepanek      unsigned int n_send = 0;
80077596a4SPaul Szczepanek      struct rte_ring_zc_data zcd = {0};
81077596a4SPaul Szczepanek
82077596a4SPaul Szczepanek      /* in this example we only fill the ptrs_send once and reuse */
83077596a4SPaul Szczepanek      for (;n_send < BATCH_SIZE; n_send++)
84077596a4SPaul Szczepanek        ptrs_send[n_send] = &items[n_send];
85077596a4SPaul Szczepanek
86077596a4SPaul Szczepanek      for(;;) {
87077596a4SPaul Szczepanek        n_send = rte_ring_enqueue_zc_burst_elem_start(
88077596a4SPaul Szczepanek          ring, sizeof(uint32_t), BATCH_SIZE, &zcd, NULL);
89077596a4SPaul Szczepanek
90077596a4SPaul Szczepanek        /* compress ptrs_send into offsets */
91077596a4SPaul Szczepanek        rte_ptr_compress_32_shift(items, /* base pointer */
92077596a4SPaul Szczepanek          ptrs_send, /* source array to be compressed */
93077596a4SPaul Szczepanek          zcd.ptr1, /* destination array to store offsets */
94077596a4SPaul Szczepanek          zcd.n1, /* how many pointers to compress */
95077596a4SPaul Szczepanek          ALIGN_EXPONENT /* how many bits can we drop from the offset */);
96077596a4SPaul Szczepanek
97077596a4SPaul Szczepanek        if (zcd.ptr2 != NULL)
98077596a4SPaul Szczepanek          rte_ptr_compress_32_shift(items, ptrs_send + zcd.n1,
99077596a4SPaul Szczepanek            zcd.ptr2, n_send - zcd.n1, ALIGN_EXPONENT);
100077596a4SPaul Szczepanek
101077596a4SPaul Szczepanek        rte_ring_enqueue_zc_finish(ring, n_send);
102077596a4SPaul Szczepanek      }
103077596a4SPaul Szczepanek      return 1;
104077596a4SPaul Szczepanek    }
105077596a4SPaul Szczepanek
106077596a4SPaul Szczepanek    static int
107077596a4SPaul Szczepanek    recv_compressed(void *args)
108077596a4SPaul Szczepanek    {
109077596a4SPaul Szczepanek      struct item *ptrs_recv[BATCH_SIZE] = {0};
110077596a4SPaul Szczepanek      unsigned int n_recv;
111077596a4SPaul Szczepanek      struct rte_ring_zc_data zcd = {0};
112077596a4SPaul Szczepanek
113077596a4SPaul Szczepanek      for(;;) {
114077596a4SPaul Szczepanek        /* receive compressed pointers from the ring */
115077596a4SPaul Szczepanek        n_recv = rte_ring_dequeue_zc_burst_elem_start(
116077596a4SPaul Szczepanek          ring, sizeof(uint32_t), BATCH_SIZE, &zcd, NULL);
117077596a4SPaul Szczepanek
118077596a4SPaul Szczepanek        rte_ptr_decompress_32_shift(items, /* base pointer */
119077596a4SPaul Szczepanek          zcd.ptr1, /* source array to decompress */
120077596a4SPaul Szczepanek          ptrs_recv, /* destination array to store pointers */
121077596a4SPaul Szczepanek          zcd.n1, /* how many pointers to decompress */
122077596a4SPaul Szczepanek          ALIGN_EXPONENT /* how many bits were dropped from the offset */);
123077596a4SPaul Szczepanek
124077596a4SPaul Szczepanek        /* handle the potential secondary buffer (caused by ring boundary) */
125077596a4SPaul Szczepanek        if (zcd.ptr2 != NULL)
126077596a4SPaul Szczepanek          rte_ptr_decompress_32_shift(items,
127077596a4SPaul Szczepanek            zcd.ptr2,
128077596a4SPaul Szczepanek            ptrs_recv + zcd.n1,
129077596a4SPaul Szczepanek            n_recv - zcd.n1,
130077596a4SPaul Szczepanek            ALIGN_EXPONENT);
131077596a4SPaul Szczepanek
132077596a4SPaul Szczepanek        rte_ring_dequeue_zc_finish(ring, n_recv);
133077596a4SPaul Szczepanek
134077596a4SPaul Szczepanek        /* ptrs_recv contains what ptrs_send contained in the other thread */
135077596a4SPaul Szczepanek        /* (...) */
136077596a4SPaul Szczepanek      }
137077596a4SPaul Szczepanek      return 1;
138077596a4SPaul Szczepanek    }
139077596a4SPaul Szczepanek
140077596a4SPaul Szczepanek    void
141077596a4SPaul Szczepanek    compression_example(void)
142077596a4SPaul Szczepanek    {
143077596a4SPaul Szczepanek      ring = rte_ring_create_elem(
144077596a4SPaul Szczepanek        "COMPR_PTRS", sizeof(uint32_t),
145077596a4SPaul Szczepanek        1024, rte_socket_id(),
146077596a4SPaul Szczepanek        RING_F_SP_ENQ | RING_F_SC_DEQ);
147077596a4SPaul Szczepanek
148077596a4SPaul Szczepanek      rte_eal_remote_launch(send_compressed, NULL, CORE_SEND);
149077596a4SPaul Szczepanek      rte_eal_remote_launch(recv_compressed, NULL, CORE_RECV);
150077596a4SPaul Szczepanek
151077596a4SPaul Szczepanek      for(;;) {}
152077596a4SPaul Szczepanek    }
153