/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

#include <stdio.h>
#include <sys/queue.h>
#include <rte_mbuf.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>

#include "rte_distributor_single.h"
#include "distributor_private.h"

TAILQ_HEAD(rte_distributor_list, rte_distributor_single);

static struct rte_tailq_elem rte_distributor_tailq = {
	.name = "RTE_DISTRIBUTOR",
};
EAL_REGISTER_TAILQ(rte_distributor_tailq)

/**** APIs called by workers ****/

void
rte_distributor_request_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
			| RTE_DISTRIB_GET_BUF;
	RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
		==, 0, rte_memory_order_relaxed);

	/* Sync with distributor on GET_BUF flag. */
	rte_atomic_store_explicit(&buf->bufptr64, req, rte_memory_order_release);
}
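
/*
 * A note on the encoding above: bufptr64 carries the mbuf pointer in its
 * upper bits (shifted left by RTE_DISTRIB_FLAG_BITS) and the handshake
 * flags (GET_BUF/RETURN_BUF) in the low bits, so a worker and the
 * distributor can exchange both with a single atomic 64-bit store.
 * Illustrative example with assumed values (an mbuf at address 0x1000,
 * RTE_DISTRIB_FLAG_BITS == 2 and GET_BUF at bit 0): the request word
 * would be (0x1000 << 2) | RTE_DISTRIB_GET_BUF == 0x4001.
 */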

struct rte_mbuf *
rte_distributor_poll_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id)
{
	union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
	/* Sync with distributor. Acquire bufptr64. */
	if (rte_atomic_load_explicit(&buf->bufptr64, rte_memory_order_acquire)
		& RTE_DISTRIB_GET_BUF)
		return NULL;

	/* since bufptr64 is signed, this should be an arithmetic shift */
	int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
	return (struct rte_mbuf *)((uintptr_t)ret);
}

struct rte_mbuf *
rte_distributor_get_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	struct rte_mbuf *ret;
	rte_distributor_request_pkt_single(d, worker_id, oldpkt);
	while ((ret = rte_distributor_poll_pkt_single(d, worker_id)) == NULL)
		rte_pause();
	return ret;
}

int
rte_distributor_return_pkt_single(struct rte_distributor_single *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
			| RTE_DISTRIB_RETURN_BUF;
	RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
		==, 0, rte_memory_order_relaxed);

	/* Sync with distributor on RETURN_BUF flag. */
	rte_atomic_store_explicit(&buf->bufptr64, req, rte_memory_order_release);
	return 0;
}
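
/*
 * Usage sketch (illustrative only, not compiled here): a worker lcore
 * typically calls the API above in a run-to-completion loop. The
 * handle_packet() callback and quit_signal flag below are hypothetical
 * application code; everything else is the public worker API.
 *
 *	struct rte_mbuf *pkt = NULL;
 *
 *	while (!quit_signal) {
 *		// hand back the previous mbuf and block for a new one
 *		pkt = rte_distributor_get_pkt_single(d, worker_id, pkt);
 *		handle_packet(pkt);
 *	}
 *	// on shutdown, return the last mbuf without requesting another
 *	rte_distributor_return_pkt_single(d, worker_id, pkt);
 */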

/**** APIs called on distributor core ****/

/* as the name suggests, adds a packet to the backlog for a particular worker */
static int
add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
{
	if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
		return -1;

	bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
			= item;
	return 0;
}

/* takes the next packet for a worker off the backlog */
static int64_t
backlog_pop(struct rte_distributor_backlog *bl)
{
	bl->count--;
	return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
}

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor_single *d,
		unsigned *ret_start, unsigned *ret_count)
{
	/* store returns in a circular buffer - code is branch-free */
	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
			= (void *)oldbuf;
	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
}
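
/*
 * Worked explanation of the branch-free update above, case by case:
 *
 *	oldbuf == 0 (no packet): both "& !!(oldbuf)" terms are 0, so start
 *	    and count are unchanged; the NULL written to the next free slot
 *	    is harmless because count does not advance over it.
 *	oldbuf != 0, count < RTE_DISTRIB_RETURNS_MASK: count increments and
 *	    start is unchanged, appending the entry.
 *	oldbuf != 0, count == RTE_DISTRIB_RETURNS_MASK (ring full): start
 *	    increments instead, dropping the oldest entry to make room while
 *	    count stays at its maximum.
 */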

static inline void
handle_worker_shutdown(struct rte_distributor_single *d, unsigned int wkr)
{
	d->in_flight_tags[wkr] = 0;
	d->in_flight_bitmask &= ~(1UL << wkr);
	/* Sync with worker. Release bufptr64. */
	rte_atomic_store_explicit(&d->bufs[wkr].bufptr64, 0, rte_memory_order_release);
	if (unlikely(d->backlog[wkr].count != 0)) {
		/* On return of a packet, we need to move the
		 * queued packets for this core elsewhere.
		 * The easiest solution is to set things up for
		 * a recursive call. That will cause those
		 * packets to be queued up for the next free
		 * core, i.e. it will return as soon as a
		 * core becomes free to accept the first
		 * packet, as subsequent ones will be added to
		 * the backlog for that core.
		 */
		struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
		unsigned i;
		struct rte_distributor_backlog *bl = &d->backlog[wkr];

		for (i = 0; i < bl->count; i++) {
			unsigned idx = (bl->start + i) &
					RTE_DISTRIB_BACKLOG_MASK;
			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
					RTE_DISTRIB_FLAG_BITS));
		}
		/* recursive call.
		 * Note that the tags were set before the first-level call
		 * to rte_distributor_process_single.
		 */
		rte_distributor_process_single(d, pkts, i);
		bl->count = bl->start = 0;
	}
}

/* this function is called when the process() function is called without any
 * new packets. It goes through all the workers and collects any returned
 * packets to do a partial flush.
 */
static int
process_returns(struct rte_distributor_single *d)
{
	unsigned wkr;
	unsigned flushed = 0;
	unsigned ret_start = d->returns.start,
			ret_count = d->returns.count;

	for (wkr = 0; wkr < d->num_workers; wkr++) {
		uintptr_t oldbuf = 0;
		/* Sync with worker. Acquire bufptr64. */
		const int64_t data = rte_atomic_load_explicit(&d->bufs[wkr].bufptr64,
							rte_memory_order_acquire);

		if (data & RTE_DISTRIB_GET_BUF) {
			flushed++;
			if (d->backlog[wkr].count)
				/* Sync with worker. Release bufptr64. */
				rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
					backlog_pop(&d->backlog[wkr]),
					rte_memory_order_release);
			else {
				/* Sync with worker on GET_BUF flag. */
				rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
					RTE_DISTRIB_GET_BUF,
					rte_memory_order_release);
				d->in_flight_tags[wkr] = 0;
				d->in_flight_bitmask &= ~(1UL << wkr);
			}
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		} else if (data & RTE_DISTRIB_RETURN_BUF) {
			handle_worker_shutdown(d, wkr);
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		}

		store_return(oldbuf, d, &ret_start, &ret_count);
	}

	d->returns.start = ret_start;
	d->returns.count = ret_count;

	return flushed;
}

/* process a set of packets to distribute them to workers */
int
rte_distributor_process_single(struct rte_distributor_single *d,
		struct rte_mbuf **mbufs, unsigned num_mbufs)
{
	unsigned next_idx = 0;
	unsigned wkr = 0;
	struct rte_mbuf *next_mb = NULL;
	int64_t next_value = 0;
	uint32_t new_tag = 0;
	unsigned ret_start = d->returns.start,
			ret_count = d->returns.count;

	if (unlikely(num_mbufs == 0))
		return process_returns(d);

	while (next_idx < num_mbufs || next_mb != NULL) {
		uintptr_t oldbuf = 0;
		/* Sync with worker. Acquire bufptr64. */
		int64_t data = rte_atomic_load_explicit(&(d->bufs[wkr].bufptr64),
						rte_memory_order_acquire);

		if (!next_mb) {
			next_mb = mbufs[next_idx++];
			next_value = (((int64_t)(uintptr_t)next_mb)
					<< RTE_DISTRIB_FLAG_BITS);
			/*
			 * The user is advised to set the tag value for each
			 * mbuf before calling rte_distributor_process_single.
			 * User-defined tags are used to identify flows or
			 * sessions.
			 */
			new_tag = next_mb->hash.usr;

			/*
			 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
			 * then the size of match has to be expanded.
			 */
			uint64_t match = 0;
			unsigned i;
			/*
			 * To scan for a match, use "xor" and "not" to get a
			 * 0/1 value, then use shifting to merge into a single
			 * "match" variable, where a set bit indicates a match
			 * for the worker given by the bit position.
			 */
			for (i = 0; i < d->num_workers; i++)
				match |= ((uint64_t)!(d->in_flight_tags[i] ^ new_tag) << i);

			/* Only turned-on bits are considered as match */
			match &= d->in_flight_bitmask;
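
			/*
			 * Worked example (assumed values): with three workers
			 * whose in-flight tags are {5, 9, 5} and new_tag == 5,
			 * the loop yields match = 0b101. After masking with
			 * in_flight_bitmask, a set bit means that worker is
			 * active and already handling this tag, so the packet
			 * must be queued behind it to preserve flow ordering.
			 */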

			if (match) {
				next_mb = NULL;
				unsigned int worker = rte_ctz64(match);
				if (add_to_backlog(&d->backlog[worker],
						next_value) < 0)
					next_idx--;
			}
		}

		if ((data & RTE_DISTRIB_GET_BUF) &&
				(d->backlog[wkr].count || next_mb)) {

			if (d->backlog[wkr].count)
				/* Sync with worker. Release bufptr64. */
				rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
						backlog_pop(&d->backlog[wkr]),
						rte_memory_order_release);

			else {
				/* Sync with worker. Release bufptr64. */
				rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
						next_value,
						rte_memory_order_release);
				d->in_flight_tags[wkr] = new_tag;
				d->in_flight_bitmask |= (1UL << wkr);
				next_mb = NULL;
			}
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		} else if (data & RTE_DISTRIB_RETURN_BUF) {
			handle_worker_shutdown(d, wkr);
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		}

		/* store returns in a circular buffer */
		store_return(oldbuf, d, &ret_start, &ret_count);

		if (++wkr == d->num_workers)
			wkr = 0;
	}
	/* to finish, check all workers for backlog and schedule work for them
	 * if they are ready */
	for (wkr = 0; wkr < d->num_workers; wkr++)
		if (d->backlog[wkr].count &&
				/* Sync with worker. Acquire bufptr64. */
				(rte_atomic_load_explicit(&d->bufs[wkr].bufptr64,
				rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) {

			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
					RTE_DISTRIB_FLAG_BITS;

			store_return(oldbuf, d, &ret_start, &ret_count);

			/* Sync with worker. Release bufptr64. */
			rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
				backlog_pop(&d->backlog[wkr]),
				rte_memory_order_release);
		}

	d->returns.start = ret_start;
	d->returns.count = ret_count;
	return num_mbufs;
}
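
/*
 * Usage sketch (illustrative only, not compiled here): the distributor
 * core usually sits between an rx core and the workers, alternating
 * between distributing fresh packets and draining completed ones. The
 * burst size of 32, the port variable and the quit_signal flag are
 * assumptions for the example, not part of this API.
 *
 *	struct rte_mbuf *bufs[32];
 *	struct rte_mbuf *done[32];
 *
 *	while (!quit_signal) {
 *		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, 32);
 *		// tag each mbuf (hash.usr) before handing it over
 *		rte_distributor_process_single(d, bufs, nb_rx);
 *		const int nb_ret =
 *			rte_distributor_returned_pkts_single(d, done, 32);
 *		// forward or free the nb_ret completed packets here
 *	}
 */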

/* return to the caller packets returned from workers */
int
rte_distributor_returned_pkts_single(struct rte_distributor_single *d,
		struct rte_mbuf **mbufs, unsigned max_mbufs)
{
	struct rte_distributor_returned_pkts *returns = &d->returns;
	unsigned retval = (max_mbufs < returns->count) ?
			max_mbufs : returns->count;
	unsigned i;

	for (i = 0; i < retval; i++) {
		unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
		mbufs[i] = returns->mbufs[idx];
	}
	returns->start += i;
	returns->count -= i;

	return retval;
}

/* return the number of packets in-flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned
total_outstanding(const struct rte_distributor_single *d)
{
	unsigned wkr, total_outstanding;

	total_outstanding = rte_popcount64(d->in_flight_bitmask);

	for (wkr = 0; wkr < d->num_workers; wkr++)
		total_outstanding += d->backlog[wkr].count;

	return total_outstanding;
}

/* flush the distributor, so that there are no outstanding packets in flight or
 * queued up. */
int
rte_distributor_flush_single(struct rte_distributor_single *d)
{
	const unsigned flushed = total_outstanding(d);

	while (total_outstanding(d) > 0)
		rte_distributor_process_single(d, NULL, 0);

	return flushed;
}

/* clears the internal returns array in the distributor */
void
rte_distributor_clear_returns_single(struct rte_distributor_single *d)
{
	d->returns.start = d->returns.count = 0;
#ifndef __OPTIMIZE__
	memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
#endif
}

/* creates a distributor instance */
struct rte_distributor_single *
rte_distributor_create_single(const char *name,
		unsigned socket_id,
		unsigned num_workers)
{
	struct rte_distributor_single *d;
	struct rte_distributor_list *distributor_list;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	const struct rte_memzone *mz;

	/* compilation-time checks */
	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
	RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
				sizeof(d->in_flight_bitmask) * CHAR_BIT);

	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
		rte_errno = EINVAL;
		return NULL;
	}

	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
	if (mz == NULL) {
		rte_errno = ENOMEM;
		return NULL;
	}

	d = mz->addr;
	strlcpy(d->name, name, sizeof(d->name));
	d->num_workers = num_workers;

	distributor_list = RTE_TAILQ_CAST(rte_distributor_tailq.head,
					  rte_distributor_list);

	rte_mcfg_tailq_write_lock();
	TAILQ_INSERT_TAIL(distributor_list, d, next);
	rte_mcfg_tailq_write_unlock();

	return d;
}
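
/*
 * Usage sketch (illustrative only, not compiled here): creating an
 * instance and shutting it down cleanly. The name "pkt_dist" and the
 * worker count of 4 are assumptions for the example.
 *
 *	struct rte_distributor_single *d =
 *		rte_distributor_create_single("pkt_dist", rte_socket_id(), 4);
 *	if (d == NULL)
 *		rte_exit(EXIT_FAILURE, "cannot create distributor: %s\n",
 *			rte_strerror(rte_errno));
 *	...
 *	// before tearing down, drain anything still in flight
 *	rte_distributor_flush_single(d);
 *	rte_distributor_clear_returns_single(d);
 */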