199a2dd95SBruce Richardson /* SPDX-License-Identifier: BSD-3-Clause
299a2dd95SBruce Richardson * Copyright(c) 2010-2014 Intel Corporation
399a2dd95SBruce Richardson */
499a2dd95SBruce Richardson
599a2dd95SBruce Richardson #include <stdio.h>
699a2dd95SBruce Richardson #include <sys/queue.h>
799a2dd95SBruce Richardson #include <rte_mbuf.h>
899a2dd95SBruce Richardson #include <rte_memzone.h>
999a2dd95SBruce Richardson #include <rte_errno.h>
1099a2dd95SBruce Richardson #include <rte_string_fns.h>
1199a2dd95SBruce Richardson #include <rte_eal_memconfig.h>
1299a2dd95SBruce Richardson #include <rte_pause.h>
1399a2dd95SBruce Richardson #include <rte_tailq.h>
1499a2dd95SBruce Richardson
1599a2dd95SBruce Richardson #include "rte_distributor_single.h"
1699a2dd95SBruce Richardson #include "distributor_private.h"
1799a2dd95SBruce Richardson
1899a2dd95SBruce Richardson TAILQ_HEAD(rte_distributor_list, rte_distributor_single);
1999a2dd95SBruce Richardson
2099a2dd95SBruce Richardson static struct rte_tailq_elem rte_distributor_tailq = {
2199a2dd95SBruce Richardson .name = "RTE_DISTRIBUTOR",
2299a2dd95SBruce Richardson };
EAL_REGISTER_TAILQ(rte_distributor_tailq)2399a2dd95SBruce Richardson EAL_REGISTER_TAILQ(rte_distributor_tailq)
2499a2dd95SBruce Richardson
2599a2dd95SBruce Richardson /**** APIs called by workers ****/
2699a2dd95SBruce Richardson
2799a2dd95SBruce Richardson void
2899a2dd95SBruce Richardson rte_distributor_request_pkt_single(struct rte_distributor_single *d,
2999a2dd95SBruce Richardson unsigned worker_id, struct rte_mbuf *oldpkt)
3099a2dd95SBruce Richardson {
3199a2dd95SBruce Richardson union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
3299a2dd95SBruce Richardson int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
3399a2dd95SBruce Richardson | RTE_DISTRIB_GET_BUF;
346b70c6b3SFeifei Wang RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
35c2a363a3STyler Retzlaff ==, 0, rte_memory_order_relaxed);
3699a2dd95SBruce Richardson
3799a2dd95SBruce Richardson /* Sync with distributor on GET_BUF flag. */
38c2a363a3STyler Retzlaff rte_atomic_store_explicit(&buf->bufptr64, req, rte_memory_order_release);
3999a2dd95SBruce Richardson }
4099a2dd95SBruce Richardson
4199a2dd95SBruce Richardson struct rte_mbuf *
rte_distributor_poll_pkt_single(struct rte_distributor_single * d,unsigned worker_id)4299a2dd95SBruce Richardson rte_distributor_poll_pkt_single(struct rte_distributor_single *d,
4399a2dd95SBruce Richardson unsigned worker_id)
4499a2dd95SBruce Richardson {
4599a2dd95SBruce Richardson union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
4699a2dd95SBruce Richardson /* Sync with distributor. Acquire bufptr64. */
47c2a363a3STyler Retzlaff if (rte_atomic_load_explicit(&buf->bufptr64, rte_memory_order_acquire)
4899a2dd95SBruce Richardson & RTE_DISTRIB_GET_BUF)
4999a2dd95SBruce Richardson return NULL;
5099a2dd95SBruce Richardson
5199a2dd95SBruce Richardson /* since bufptr64 is signed, this should be an arithmetic shift */
5299a2dd95SBruce Richardson int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
5399a2dd95SBruce Richardson return (struct rte_mbuf *)((uintptr_t)ret);
5499a2dd95SBruce Richardson }
5599a2dd95SBruce Richardson
5699a2dd95SBruce Richardson struct rte_mbuf *
rte_distributor_get_pkt_single(struct rte_distributor_single * d,unsigned worker_id,struct rte_mbuf * oldpkt)5799a2dd95SBruce Richardson rte_distributor_get_pkt_single(struct rte_distributor_single *d,
5899a2dd95SBruce Richardson unsigned worker_id, struct rte_mbuf *oldpkt)
5999a2dd95SBruce Richardson {
6099a2dd95SBruce Richardson struct rte_mbuf *ret;
6199a2dd95SBruce Richardson rte_distributor_request_pkt_single(d, worker_id, oldpkt);
6299a2dd95SBruce Richardson while ((ret = rte_distributor_poll_pkt_single(d, worker_id)) == NULL)
6399a2dd95SBruce Richardson rte_pause();
6499a2dd95SBruce Richardson return ret;
6599a2dd95SBruce Richardson }
6699a2dd95SBruce Richardson
6799a2dd95SBruce Richardson int
rte_distributor_return_pkt_single(struct rte_distributor_single * d,unsigned worker_id,struct rte_mbuf * oldpkt)6899a2dd95SBruce Richardson rte_distributor_return_pkt_single(struct rte_distributor_single *d,
6999a2dd95SBruce Richardson unsigned worker_id, struct rte_mbuf *oldpkt)
7099a2dd95SBruce Richardson {
7199a2dd95SBruce Richardson union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
7299a2dd95SBruce Richardson uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
7399a2dd95SBruce Richardson | RTE_DISTRIB_RETURN_BUF;
746b70c6b3SFeifei Wang RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
75c2a363a3STyler Retzlaff ==, 0, rte_memory_order_relaxed);
7699a2dd95SBruce Richardson
7799a2dd95SBruce Richardson /* Sync with distributor on RETURN_BUF flag. */
78c2a363a3STyler Retzlaff rte_atomic_store_explicit(&buf->bufptr64, req, rte_memory_order_release);
7999a2dd95SBruce Richardson return 0;
8099a2dd95SBruce Richardson }
8199a2dd95SBruce Richardson
8299a2dd95SBruce Richardson /**** APIs called on distributor core ***/
8399a2dd95SBruce Richardson
8499a2dd95SBruce Richardson /* as name suggests, adds a packet to the backlog for a particular worker */
8599a2dd95SBruce Richardson static int
add_to_backlog(struct rte_distributor_backlog * bl,int64_t item)8699a2dd95SBruce Richardson add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
8799a2dd95SBruce Richardson {
8899a2dd95SBruce Richardson if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
8999a2dd95SBruce Richardson return -1;
9099a2dd95SBruce Richardson
9199a2dd95SBruce Richardson bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
9299a2dd95SBruce Richardson = item;
9399a2dd95SBruce Richardson return 0;
9499a2dd95SBruce Richardson }
9599a2dd95SBruce Richardson
9699a2dd95SBruce Richardson /* takes the next packet for a worker off the backlog */
9799a2dd95SBruce Richardson static int64_t
backlog_pop(struct rte_distributor_backlog * bl)9899a2dd95SBruce Richardson backlog_pop(struct rte_distributor_backlog *bl)
9999a2dd95SBruce Richardson {
10099a2dd95SBruce Richardson bl->count--;
10199a2dd95SBruce Richardson return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
10299a2dd95SBruce Richardson }
10399a2dd95SBruce Richardson
10499a2dd95SBruce Richardson /* stores a packet returned from a worker inside the returns array */
10599a2dd95SBruce Richardson static inline void
store_return(uintptr_t oldbuf,struct rte_distributor_single * d,unsigned * ret_start,unsigned * ret_count)10699a2dd95SBruce Richardson store_return(uintptr_t oldbuf, struct rte_distributor_single *d,
10799a2dd95SBruce Richardson unsigned *ret_start, unsigned *ret_count)
10899a2dd95SBruce Richardson {
10999a2dd95SBruce Richardson /* store returns in a circular buffer - code is branch-free */
11099a2dd95SBruce Richardson d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
11199a2dd95SBruce Richardson = (void *)oldbuf;
11299a2dd95SBruce Richardson *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
11399a2dd95SBruce Richardson *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
11499a2dd95SBruce Richardson }
11599a2dd95SBruce Richardson
11699a2dd95SBruce Richardson static inline void
handle_worker_shutdown(struct rte_distributor_single * d,unsigned int wkr)11799a2dd95SBruce Richardson handle_worker_shutdown(struct rte_distributor_single *d, unsigned int wkr)
11899a2dd95SBruce Richardson {
11999a2dd95SBruce Richardson d->in_flight_tags[wkr] = 0;
12099a2dd95SBruce Richardson d->in_flight_bitmask &= ~(1UL << wkr);
12199a2dd95SBruce Richardson /* Sync with worker. Release bufptr64. */
122c2a363a3STyler Retzlaff rte_atomic_store_explicit(&d->bufs[wkr].bufptr64, 0, rte_memory_order_release);
12399a2dd95SBruce Richardson if (unlikely(d->backlog[wkr].count != 0)) {
12499a2dd95SBruce Richardson /* On return of a packet, we need to move the
12599a2dd95SBruce Richardson * queued packets for this core elsewhere.
12699a2dd95SBruce Richardson * Easiest solution is to set things up for
12799a2dd95SBruce Richardson * a recursive call. That will cause those
12899a2dd95SBruce Richardson * packets to be queued up for the next free
12999a2dd95SBruce Richardson * core, i.e. it will return as soon as a
13099a2dd95SBruce Richardson * core becomes free to accept the first
13199a2dd95SBruce Richardson * packet, as subsequent ones will be added to
13299a2dd95SBruce Richardson * the backlog for that core.
13399a2dd95SBruce Richardson */
13499a2dd95SBruce Richardson struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
13599a2dd95SBruce Richardson unsigned i;
13699a2dd95SBruce Richardson struct rte_distributor_backlog *bl = &d->backlog[wkr];
13799a2dd95SBruce Richardson
13899a2dd95SBruce Richardson for (i = 0; i < bl->count; i++) {
13999a2dd95SBruce Richardson unsigned idx = (bl->start + i) &
14099a2dd95SBruce Richardson RTE_DISTRIB_BACKLOG_MASK;
14199a2dd95SBruce Richardson pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
14299a2dd95SBruce Richardson RTE_DISTRIB_FLAG_BITS));
14399a2dd95SBruce Richardson }
14499a2dd95SBruce Richardson /* recursive call.
14599a2dd95SBruce Richardson * Note that the tags were set before first level call
14699a2dd95SBruce Richardson * to rte_distributor_process.
14799a2dd95SBruce Richardson */
14899a2dd95SBruce Richardson rte_distributor_process_single(d, pkts, i);
14999a2dd95SBruce Richardson bl->count = bl->start = 0;
15099a2dd95SBruce Richardson }
15199a2dd95SBruce Richardson }
15299a2dd95SBruce Richardson
15399a2dd95SBruce Richardson /* this function is called when process() fn is called without any new
15499a2dd95SBruce Richardson * packets. It goes through all the workers and clears any returned packets
15599a2dd95SBruce Richardson * to do a partial flush.
15699a2dd95SBruce Richardson */
15799a2dd95SBruce Richardson static int
process_returns(struct rte_distributor_single * d)15899a2dd95SBruce Richardson process_returns(struct rte_distributor_single *d)
15999a2dd95SBruce Richardson {
16099a2dd95SBruce Richardson unsigned wkr;
16199a2dd95SBruce Richardson unsigned flushed = 0;
16299a2dd95SBruce Richardson unsigned ret_start = d->returns.start,
16399a2dd95SBruce Richardson ret_count = d->returns.count;
16499a2dd95SBruce Richardson
16599a2dd95SBruce Richardson for (wkr = 0; wkr < d->num_workers; wkr++) {
16699a2dd95SBruce Richardson uintptr_t oldbuf = 0;
16799a2dd95SBruce Richardson /* Sync with worker. Acquire bufptr64. */
168c2a363a3STyler Retzlaff const int64_t data = rte_atomic_load_explicit(&d->bufs[wkr].bufptr64,
169c2a363a3STyler Retzlaff rte_memory_order_acquire);
17099a2dd95SBruce Richardson
17199a2dd95SBruce Richardson if (data & RTE_DISTRIB_GET_BUF) {
17299a2dd95SBruce Richardson flushed++;
17399a2dd95SBruce Richardson if (d->backlog[wkr].count)
17499a2dd95SBruce Richardson /* Sync with worker. Release bufptr64. */
175c2a363a3STyler Retzlaff rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
17699a2dd95SBruce Richardson backlog_pop(&d->backlog[wkr]),
177c2a363a3STyler Retzlaff rte_memory_order_release);
17899a2dd95SBruce Richardson else {
17999a2dd95SBruce Richardson /* Sync with worker on GET_BUF flag. */
180c2a363a3STyler Retzlaff rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
18199a2dd95SBruce Richardson RTE_DISTRIB_GET_BUF,
182c2a363a3STyler Retzlaff rte_memory_order_release);
18399a2dd95SBruce Richardson d->in_flight_tags[wkr] = 0;
18499a2dd95SBruce Richardson d->in_flight_bitmask &= ~(1UL << wkr);
18599a2dd95SBruce Richardson }
18699a2dd95SBruce Richardson oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
18799a2dd95SBruce Richardson } else if (data & RTE_DISTRIB_RETURN_BUF) {
18899a2dd95SBruce Richardson handle_worker_shutdown(d, wkr);
18999a2dd95SBruce Richardson oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
19099a2dd95SBruce Richardson }
19199a2dd95SBruce Richardson
19299a2dd95SBruce Richardson store_return(oldbuf, d, &ret_start, &ret_count);
19399a2dd95SBruce Richardson }
19499a2dd95SBruce Richardson
19599a2dd95SBruce Richardson d->returns.start = ret_start;
19699a2dd95SBruce Richardson d->returns.count = ret_count;
19799a2dd95SBruce Richardson
19899a2dd95SBruce Richardson return flushed;
19999a2dd95SBruce Richardson }
20099a2dd95SBruce Richardson
20199a2dd95SBruce Richardson /* process a set of packets to distribute them to workers */
20299a2dd95SBruce Richardson int
rte_distributor_process_single(struct rte_distributor_single * d,struct rte_mbuf ** mbufs,unsigned num_mbufs)20399a2dd95SBruce Richardson rte_distributor_process_single(struct rte_distributor_single *d,
20499a2dd95SBruce Richardson struct rte_mbuf **mbufs, unsigned num_mbufs)
20599a2dd95SBruce Richardson {
20699a2dd95SBruce Richardson unsigned next_idx = 0;
20799a2dd95SBruce Richardson unsigned wkr = 0;
20899a2dd95SBruce Richardson struct rte_mbuf *next_mb = NULL;
20999a2dd95SBruce Richardson int64_t next_value = 0;
21099a2dd95SBruce Richardson uint32_t new_tag = 0;
21199a2dd95SBruce Richardson unsigned ret_start = d->returns.start,
21299a2dd95SBruce Richardson ret_count = d->returns.count;
21399a2dd95SBruce Richardson
21499a2dd95SBruce Richardson if (unlikely(num_mbufs == 0))
21599a2dd95SBruce Richardson return process_returns(d);
21699a2dd95SBruce Richardson
21799a2dd95SBruce Richardson while (next_idx < num_mbufs || next_mb != NULL) {
21899a2dd95SBruce Richardson uintptr_t oldbuf = 0;
21999a2dd95SBruce Richardson /* Sync with worker. Acquire bufptr64. */
220c2a363a3STyler Retzlaff int64_t data = rte_atomic_load_explicit(&(d->bufs[wkr].bufptr64),
221c2a363a3STyler Retzlaff rte_memory_order_acquire);
22299a2dd95SBruce Richardson
22399a2dd95SBruce Richardson if (!next_mb) {
22499a2dd95SBruce Richardson next_mb = mbufs[next_idx++];
22599a2dd95SBruce Richardson next_value = (((int64_t)(uintptr_t)next_mb)
22699a2dd95SBruce Richardson << RTE_DISTRIB_FLAG_BITS);
22799a2dd95SBruce Richardson /*
22899a2dd95SBruce Richardson * User is advocated to set tag value for each
22999a2dd95SBruce Richardson * mbuf before calling rte_distributor_process.
23099a2dd95SBruce Richardson * User defined tags are used to identify flows,
23199a2dd95SBruce Richardson * or sessions.
23299a2dd95SBruce Richardson */
23399a2dd95SBruce Richardson new_tag = next_mb->hash.usr;
23499a2dd95SBruce Richardson
23599a2dd95SBruce Richardson /*
23699a2dd95SBruce Richardson * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
23799a2dd95SBruce Richardson * then the size of match has to be expanded.
23899a2dd95SBruce Richardson */
23999a2dd95SBruce Richardson uint64_t match = 0;
24099a2dd95SBruce Richardson unsigned i;
24199a2dd95SBruce Richardson /*
24299a2dd95SBruce Richardson * to scan for a match use "xor" and "not" to get a 0/1
24399a2dd95SBruce Richardson * value, then use shifting to merge to single "match"
24499a2dd95SBruce Richardson * variable, where a one-bit indicates a match for the
24599a2dd95SBruce Richardson * worker given by the bit-position
24699a2dd95SBruce Richardson */
24799a2dd95SBruce Richardson for (i = 0; i < d->num_workers; i++)
2489699b098SBruce Richardson match |= ((uint64_t)!(d->in_flight_tags[i] ^ new_tag) << i);
24999a2dd95SBruce Richardson
25099a2dd95SBruce Richardson /* Only turned-on bits are considered as match */
25199a2dd95SBruce Richardson match &= d->in_flight_bitmask;
25299a2dd95SBruce Richardson
25399a2dd95SBruce Richardson if (match) {
25499a2dd95SBruce Richardson next_mb = NULL;
255*de0ec3c2STyler Retzlaff unsigned int worker = rte_ctz64(match);
25699a2dd95SBruce Richardson if (add_to_backlog(&d->backlog[worker],
25799a2dd95SBruce Richardson next_value) < 0)
25899a2dd95SBruce Richardson next_idx--;
25999a2dd95SBruce Richardson }
26099a2dd95SBruce Richardson }
26199a2dd95SBruce Richardson
26299a2dd95SBruce Richardson if ((data & RTE_DISTRIB_GET_BUF) &&
26399a2dd95SBruce Richardson (d->backlog[wkr].count || next_mb)) {
26499a2dd95SBruce Richardson
26599a2dd95SBruce Richardson if (d->backlog[wkr].count)
26699a2dd95SBruce Richardson /* Sync with worker. Release bufptr64. */
267c2a363a3STyler Retzlaff rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
26899a2dd95SBruce Richardson backlog_pop(&d->backlog[wkr]),
269c2a363a3STyler Retzlaff rte_memory_order_release);
27099a2dd95SBruce Richardson
27199a2dd95SBruce Richardson else {
27299a2dd95SBruce Richardson /* Sync with worker. Release bufptr64. */
273c2a363a3STyler Retzlaff rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
27499a2dd95SBruce Richardson next_value,
275c2a363a3STyler Retzlaff rte_memory_order_release);
27699a2dd95SBruce Richardson d->in_flight_tags[wkr] = new_tag;
27799a2dd95SBruce Richardson d->in_flight_bitmask |= (1UL << wkr);
27899a2dd95SBruce Richardson next_mb = NULL;
27999a2dd95SBruce Richardson }
28099a2dd95SBruce Richardson oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
28199a2dd95SBruce Richardson } else if (data & RTE_DISTRIB_RETURN_BUF) {
28299a2dd95SBruce Richardson handle_worker_shutdown(d, wkr);
28399a2dd95SBruce Richardson oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
28499a2dd95SBruce Richardson }
28599a2dd95SBruce Richardson
28699a2dd95SBruce Richardson /* store returns in a circular buffer */
28799a2dd95SBruce Richardson store_return(oldbuf, d, &ret_start, &ret_count);
28899a2dd95SBruce Richardson
28999a2dd95SBruce Richardson if (++wkr == d->num_workers)
29099a2dd95SBruce Richardson wkr = 0;
29199a2dd95SBruce Richardson }
29299a2dd95SBruce Richardson /* to finish, check all workers for backlog and schedule work for them
29399a2dd95SBruce Richardson * if they are ready */
29499a2dd95SBruce Richardson for (wkr = 0; wkr < d->num_workers; wkr++)
29599a2dd95SBruce Richardson if (d->backlog[wkr].count &&
29699a2dd95SBruce Richardson /* Sync with worker. Acquire bufptr64. */
297c2a363a3STyler Retzlaff (rte_atomic_load_explicit(&d->bufs[wkr].bufptr64,
298c2a363a3STyler Retzlaff rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) {
29999a2dd95SBruce Richardson
30099a2dd95SBruce Richardson int64_t oldbuf = d->bufs[wkr].bufptr64 >>
30199a2dd95SBruce Richardson RTE_DISTRIB_FLAG_BITS;
30299a2dd95SBruce Richardson
30399a2dd95SBruce Richardson store_return(oldbuf, d, &ret_start, &ret_count);
30499a2dd95SBruce Richardson
30599a2dd95SBruce Richardson /* Sync with worker. Release bufptr64. */
306c2a363a3STyler Retzlaff rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
30799a2dd95SBruce Richardson backlog_pop(&d->backlog[wkr]),
308c2a363a3STyler Retzlaff rte_memory_order_release);
30999a2dd95SBruce Richardson }
31099a2dd95SBruce Richardson
31199a2dd95SBruce Richardson d->returns.start = ret_start;
31299a2dd95SBruce Richardson d->returns.count = ret_count;
31399a2dd95SBruce Richardson return num_mbufs;
31499a2dd95SBruce Richardson }
31599a2dd95SBruce Richardson
31699a2dd95SBruce Richardson /* return to the caller, packets returned from workers */
31799a2dd95SBruce Richardson int
rte_distributor_returned_pkts_single(struct rte_distributor_single * d,struct rte_mbuf ** mbufs,unsigned max_mbufs)31899a2dd95SBruce Richardson rte_distributor_returned_pkts_single(struct rte_distributor_single *d,
31999a2dd95SBruce Richardson struct rte_mbuf **mbufs, unsigned max_mbufs)
32099a2dd95SBruce Richardson {
32199a2dd95SBruce Richardson struct rte_distributor_returned_pkts *returns = &d->returns;
32299a2dd95SBruce Richardson unsigned retval = (max_mbufs < returns->count) ?
32399a2dd95SBruce Richardson max_mbufs : returns->count;
32499a2dd95SBruce Richardson unsigned i;
32599a2dd95SBruce Richardson
32699a2dd95SBruce Richardson for (i = 0; i < retval; i++) {
32799a2dd95SBruce Richardson unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
32899a2dd95SBruce Richardson mbufs[i] = returns->mbufs[idx];
32999a2dd95SBruce Richardson }
33099a2dd95SBruce Richardson returns->start += i;
33199a2dd95SBruce Richardson returns->count -= i;
33299a2dd95SBruce Richardson
33399a2dd95SBruce Richardson return retval;
33499a2dd95SBruce Richardson }
33599a2dd95SBruce Richardson
33699a2dd95SBruce Richardson /* return the number of packets in-flight in a distributor, i.e. packets
33799a2dd95SBruce Richardson * being worked on or queued up in a backlog.
33899a2dd95SBruce Richardson */
33999a2dd95SBruce Richardson static inline unsigned
total_outstanding(const struct rte_distributor_single * d)34099a2dd95SBruce Richardson total_outstanding(const struct rte_distributor_single *d)
34199a2dd95SBruce Richardson {
34299a2dd95SBruce Richardson unsigned wkr, total_outstanding;
34399a2dd95SBruce Richardson
344*de0ec3c2STyler Retzlaff total_outstanding = rte_popcount64(d->in_flight_bitmask);
34599a2dd95SBruce Richardson
34699a2dd95SBruce Richardson for (wkr = 0; wkr < d->num_workers; wkr++)
34799a2dd95SBruce Richardson total_outstanding += d->backlog[wkr].count;
34899a2dd95SBruce Richardson
34999a2dd95SBruce Richardson return total_outstanding;
35099a2dd95SBruce Richardson }
35199a2dd95SBruce Richardson
35299a2dd95SBruce Richardson /* flush the distributor, so that there are no outstanding packets in flight or
35399a2dd95SBruce Richardson * queued up. */
35499a2dd95SBruce Richardson int
rte_distributor_flush_single(struct rte_distributor_single * d)35599a2dd95SBruce Richardson rte_distributor_flush_single(struct rte_distributor_single *d)
35699a2dd95SBruce Richardson {
35799a2dd95SBruce Richardson const unsigned flushed = total_outstanding(d);
35899a2dd95SBruce Richardson
35999a2dd95SBruce Richardson while (total_outstanding(d) > 0)
36099a2dd95SBruce Richardson rte_distributor_process_single(d, NULL, 0);
36199a2dd95SBruce Richardson
36299a2dd95SBruce Richardson return flushed;
36399a2dd95SBruce Richardson }
36499a2dd95SBruce Richardson
36599a2dd95SBruce Richardson /* clears the internal returns array in the distributor */
36699a2dd95SBruce Richardson void
rte_distributor_clear_returns_single(struct rte_distributor_single * d)36799a2dd95SBruce Richardson rte_distributor_clear_returns_single(struct rte_distributor_single *d)
36899a2dd95SBruce Richardson {
36999a2dd95SBruce Richardson d->returns.start = d->returns.count = 0;
37099a2dd95SBruce Richardson #ifndef __OPTIMIZE__
37199a2dd95SBruce Richardson memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
37299a2dd95SBruce Richardson #endif
37399a2dd95SBruce Richardson }
37499a2dd95SBruce Richardson
37599a2dd95SBruce Richardson /* creates a distributor instance */
37699a2dd95SBruce Richardson struct rte_distributor_single *
rte_distributor_create_single(const char * name,unsigned socket_id,unsigned num_workers)37799a2dd95SBruce Richardson rte_distributor_create_single(const char *name,
37899a2dd95SBruce Richardson unsigned socket_id,
37999a2dd95SBruce Richardson unsigned num_workers)
38099a2dd95SBruce Richardson {
38199a2dd95SBruce Richardson struct rte_distributor_single *d;
38299a2dd95SBruce Richardson struct rte_distributor_list *distributor_list;
38399a2dd95SBruce Richardson char mz_name[RTE_MEMZONE_NAMESIZE];
38499a2dd95SBruce Richardson const struct rte_memzone *mz;
38599a2dd95SBruce Richardson
38699a2dd95SBruce Richardson /* compilation-time checks */
38799a2dd95SBruce Richardson RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
38899a2dd95SBruce Richardson RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
38999a2dd95SBruce Richardson RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
39099a2dd95SBruce Richardson sizeof(d->in_flight_bitmask) * CHAR_BIT);
39199a2dd95SBruce Richardson
39299a2dd95SBruce Richardson if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
39399a2dd95SBruce Richardson rte_errno = EINVAL;
39499a2dd95SBruce Richardson return NULL;
39599a2dd95SBruce Richardson }
39699a2dd95SBruce Richardson
39799a2dd95SBruce Richardson snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
39899a2dd95SBruce Richardson mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
39999a2dd95SBruce Richardson if (mz == NULL) {
40099a2dd95SBruce Richardson rte_errno = ENOMEM;
40199a2dd95SBruce Richardson return NULL;
40299a2dd95SBruce Richardson }
40399a2dd95SBruce Richardson
40499a2dd95SBruce Richardson d = mz->addr;
40599a2dd95SBruce Richardson strlcpy(d->name, name, sizeof(d->name));
40699a2dd95SBruce Richardson d->num_workers = num_workers;
40799a2dd95SBruce Richardson
40899a2dd95SBruce Richardson distributor_list = RTE_TAILQ_CAST(rte_distributor_tailq.head,
40999a2dd95SBruce Richardson rte_distributor_list);
41099a2dd95SBruce Richardson
41199a2dd95SBruce Richardson rte_mcfg_tailq_write_lock();
41299a2dd95SBruce Richardson TAILQ_INSERT_TAIL(distributor_list, d, next);
41399a2dd95SBruce Richardson rte_mcfg_tailq_write_unlock();
41499a2dd95SBruce Richardson
41599a2dd95SBruce Richardson return d;
41699a2dd95SBruce Richardson }
417