xref: /dpdk/drivers/event/dsw/dsw_evdev.c (revision 68a03efeed657e6e05f281479b33b51102797e15)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4 
5 #include <stdbool.h>
6 
7 #include <rte_cycles.h>
8 #include <eventdev_pmd.h>
9 #include <eventdev_pmd_vdev.h>
10 #include <rte_random.h>
11 #include <rte_ring_elem.h>
12 
13 #include "dsw_evdev.h"
14 
15 #define EVENTDEV_NAME_DSW_PMD event_dsw
16 
17 static int
18 dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
19 	       const struct rte_event_port_conf *conf)
20 {
21 	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
22 	struct dsw_port *port;
23 	struct rte_event_ring *in_ring;
24 	struct rte_ring *ctl_in_ring;
25 	char ring_name[RTE_RING_NAMESIZE];
26 
27 	port = &dsw->ports[port_id];
28 
29 	*port = (struct dsw_port) {
30 		.id = port_id,
31 		.dsw = dsw,
32 		.dequeue_depth = conf->dequeue_depth,
33 		.enqueue_depth = conf->enqueue_depth,
34 		.new_event_threshold = conf->new_event_threshold
35 	};
36 
37 	snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
38 		 port_id);
39 
40 	in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
41 					dev->data->socket_id,
42 					RING_F_SC_DEQ|RING_F_EXACT_SZ);
43 
44 	if (in_ring == NULL)
45 		return -ENOMEM;
46 
47 	snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u",
48 		 dev->data->dev_id, port_id);
49 
50 	ctl_in_ring = rte_ring_create_elem(ring_name,
51 					   sizeof(struct dsw_ctl_msg),
52 					   DSW_CTL_IN_RING_SIZE,
53 					   dev->data->socket_id,
54 					   RING_F_SC_DEQ|RING_F_EXACT_SZ);
55 
56 	if (ctl_in_ring == NULL) {
57 		rte_event_ring_free(in_ring);
58 		return -ENOMEM;
59 	}
60 
61 	port->in_ring = in_ring;
62 	port->ctl_in_ring = ctl_in_ring;
63 
64 	rte_atomic16_init(&port->load);
65 	rte_atomic32_init(&port->immigration_load);
66 
67 	port->load_update_interval =
68 		(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;
69 
70 	port->migration_interval =
71 		(DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;
72 
73 	dev->data->ports[port_id] = port;
74 
75 	return 0;
76 }
77 
78 static void
79 dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
80 		  uint8_t port_id __rte_unused,
81 		  struct rte_event_port_conf *port_conf)
82 {
83 	*port_conf = (struct rte_event_port_conf) {
84 		.new_event_threshold = 1024,
85 		.dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
86 		.enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
87 	};
88 }
89 
90 static void
91 dsw_port_release(void *p)
92 {
93 	struct dsw_port *port = p;
94 
95 	rte_event_ring_free(port->in_ring);
96 	rte_ring_free(port->ctl_in_ring);
97 }
98 
99 static int
100 dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
101 		const struct rte_event_queue_conf *conf)
102 {
103 	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
104 	struct dsw_queue *queue = &dsw->queues[queue_id];
105 
106 	if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
107 		return -ENOTSUP;
108 
109 	/* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
110 	 * avoid the "fake" TYPE_PARALLEL flow_id assignment. Since
111 	 * the queue will only have a single serving port, no
112 	 * migration will ever happen, so the extra TYPE_ATOMIC
113 	 * migration overhead is avoided.
114 	 */
115 	if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
116 		queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
117 	else {
118 		if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
119 			return -ENOTSUP;
120 		/* atomic or parallel */
121 		queue->schedule_type = conf->schedule_type;
122 	}
123 
124 	queue->num_serving_ports = 0;
125 
126 	return 0;
127 }
128 
129 static void
130 dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
131 		   uint8_t queue_id __rte_unused,
132 		   struct rte_event_queue_conf *queue_conf)
133 {
134 	*queue_conf = (struct rte_event_queue_conf) {
135 		.nb_atomic_flows = 4096,
136 		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
137 		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL
138 	};
139 }
140 
141 static void
142 dsw_queue_release(struct rte_eventdev *dev __rte_unused,
143 		  uint8_t queue_id __rte_unused)
144 {
145 }
146 
147 static void
148 queue_add_port(struct dsw_queue *queue, uint16_t port_id)
149 {
150 	queue->serving_ports[queue->num_serving_ports] = port_id;
151 	queue->num_serving_ports++;
152 }
153 
154 static bool
155 queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
156 {
157 	uint16_t i;
158 
159 	for (i = 0; i < queue->num_serving_ports; i++)
160 		if (queue->serving_ports[i] == port_id) {
161 			uint16_t last_idx = queue->num_serving_ports - 1;
162 			if (i != last_idx)
163 				queue->serving_ports[i] =
164 					queue->serving_ports[last_idx];
165 			queue->num_serving_ports--;
166 			return true;
167 		}
168 	return false;
169 }
170 
171 static int
172 dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
173 		     const uint8_t queues[], uint16_t num, bool link)
174 {
175 	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
176 	struct dsw_port *p = port;
177 	uint16_t i;
178 	uint16_t count = 0;
179 
180 	for (i = 0; i < num; i++) {
181 		uint8_t qid = queues[i];
182 		struct dsw_queue *q = &dsw->queues[qid];
183 		if (link) {
184 			queue_add_port(q, p->id);
185 			count++;
186 		} else {
187 			bool removed = queue_remove_port(q, p->id);
188 			if (removed)
189 				count++;
190 		}
191 	}
192 
193 	return count;
194 }
195 
196 static int
197 dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
198 	      const uint8_t priorities[] __rte_unused, uint16_t num)
199 {
200 	return dsw_port_link_unlink(dev, port, queues, num, true);
201 }
202 
203 static int
204 dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
205 		uint16_t num)
206 {
207 	return dsw_port_link_unlink(dev, port, queues, num, false);
208 }
209 
210 static void
211 dsw_info_get(struct rte_eventdev *dev __rte_unused,
212 	     struct rte_event_dev_info *info)
213 {
214 	*info = (struct rte_event_dev_info) {
215 		.driver_name = DSW_PMD_NAME,
216 		.max_event_queues = DSW_MAX_QUEUES,
217 		.max_event_queue_flows = DSW_MAX_FLOWS,
218 		.max_event_queue_priority_levels = 1,
219 		.max_event_priority_levels = 1,
220 		.max_event_ports = DSW_MAX_PORTS,
221 		.max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
222 		.max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
223 		.max_num_events = DSW_MAX_EVENTS,
224 		.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
225 		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED|
226 		RTE_EVENT_DEV_CAP_NONSEQ_MODE|
227 		RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT|
228 		RTE_EVENT_DEV_CAP_CARRY_FLOW_ID
229 	};
230 }
231 
232 static int
233 dsw_configure(const struct rte_eventdev *dev)
234 {
235 	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
236 	const struct rte_event_dev_config *conf = &dev->data->dev_conf;
237 	int32_t min_max_in_flight;
238 
239 	dsw->num_ports = conf->nb_event_ports;
240 	dsw->num_queues = conf->nb_event_queues;
241 
242 	/* Avoid a situation where consumer ports are holding all the
243 	 * credits, without making use of them.
244 	 */
245 	min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;
246 
247 	dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);
248 
249 	return 0;
250 }
251 
252 
253 static void
254 initial_flow_to_port_assignment(struct dsw_evdev *dsw)
255 {
256 	uint8_t queue_id;
257 	for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
258 		struct dsw_queue *queue = &dsw->queues[queue_id];
259 		uint16_t flow_hash;
260 		for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
261 			uint8_t port_idx =
262 				rte_rand() % queue->num_serving_ports;
263 			uint8_t port_id =
264 				queue->serving_ports[port_idx];
265 			dsw->queues[queue_id].flow_to_port_map[flow_hash] =
266 				port_id;
267 		}
268 	}
269 }
270 
271 static int
272 dsw_start(struct rte_eventdev *dev)
273 {
274 	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
275 	uint16_t i;
276 	uint64_t now;
277 
278 	rte_atomic32_init(&dsw->credits_on_loan);
279 
280 	initial_flow_to_port_assignment(dsw);
281 
282 	now = rte_get_timer_cycles();
283 	for (i = 0; i < dsw->num_ports; i++) {
284 		dsw->ports[i].measurement_start = now;
285 		dsw->ports[i].busy_start = now;
286 	}
287 
288 	return 0;
289 }
290 
291 static void
292 dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
293 		   eventdev_stop_flush_t flush, void *flush_arg)
294 {
295 	uint16_t i;
296 
297 	for (i = 0; i < buf_len; i++)
298 		flush(dev_id, buf[i], flush_arg);
299 }
300 
301 static void
302 dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port,
303 		      eventdev_stop_flush_t flush, void *flush_arg)
304 {
305 	dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len,
306 			   flush, flush_arg);
307 }
308 
309 static void
310 dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
311 		   eventdev_stop_flush_t flush, void *flush_arg)
312 {
313 	uint16_t dport_id;
314 
315 	for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
316 		if (dport_id != port->id)
317 			dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
318 					   port->out_buffer_len[dport_id],
319 					   flush, flush_arg);
320 }
321 
322 static void
323 dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
324 		       eventdev_stop_flush_t flush, void *flush_arg)
325 {
326 	struct rte_event ev;
327 
328 	while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
329 		flush(dev_id, ev, flush_arg);
330 }
331 
332 static void
333 dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
334 	  eventdev_stop_flush_t flush, void *flush_arg)
335 {
336 	uint16_t port_id;
337 
338 	if (flush == NULL)
339 		return;
340 
341 	for (port_id = 0; port_id < dsw->num_ports; port_id++) {
342 		struct dsw_port *port = &dsw->ports[port_id];
343 
344 		dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
345 		dsw_port_drain_paused(dev_id, port, flush, flush_arg);
346 		dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
347 	}
348 }
349 
350 static void
351 dsw_stop(struct rte_eventdev *dev)
352 {
353 	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
354 	uint8_t dev_id;
355 	eventdev_stop_flush_t flush;
356 	void *flush_arg;
357 
358 	dev_id = dev->data->dev_id;
359 	flush = dev->dev_ops->dev_stop_flush;
360 	flush_arg = dev->data->dev_stop_flush_arg;
361 
362 	dsw_drain(dev_id, dsw, flush, flush_arg);
363 }
364 
365 static int
366 dsw_close(struct rte_eventdev *dev)
367 {
368 	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
369 
370 	dsw->num_ports = 0;
371 	dsw->num_queues = 0;
372 
373 	return 0;
374 }
375 
376 static struct rte_eventdev_ops dsw_evdev_ops = {
377 	.port_setup = dsw_port_setup,
378 	.port_def_conf = dsw_port_def_conf,
379 	.port_release = dsw_port_release,
380 	.queue_setup = dsw_queue_setup,
381 	.queue_def_conf = dsw_queue_def_conf,
382 	.queue_release = dsw_queue_release,
383 	.port_link = dsw_port_link,
384 	.port_unlink = dsw_port_unlink,
385 	.dev_infos_get = dsw_info_get,
386 	.dev_configure = dsw_configure,
387 	.dev_start = dsw_start,
388 	.dev_stop = dsw_stop,
389 	.dev_close = dsw_close,
390 	.xstats_get = dsw_xstats_get,
391 	.xstats_get_names = dsw_xstats_get_names,
392 	.xstats_get_by_name = dsw_xstats_get_by_name
393 };
394 
395 static int
396 dsw_probe(struct rte_vdev_device *vdev)
397 {
398 	const char *name;
399 	struct rte_eventdev *dev;
400 	struct dsw_evdev *dsw;
401 
402 	name = rte_vdev_device_name(vdev);
403 
404 	dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
405 				      rte_socket_id());
406 	if (dev == NULL)
407 		return -EFAULT;
408 
409 	dev->dev_ops = &dsw_evdev_ops;
410 	dev->enqueue = dsw_event_enqueue;
411 	dev->enqueue_burst = dsw_event_enqueue_burst;
412 	dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
413 	dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
414 	dev->dequeue = dsw_event_dequeue;
415 	dev->dequeue_burst = dsw_event_dequeue_burst;
416 
417 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
418 		return 0;
419 
420 	dsw = dev->data->dev_private;
421 	dsw->data = dev->data;
422 
423 	return 0;
424 }
425 
426 static int
427 dsw_remove(struct rte_vdev_device *vdev)
428 {
429 	const char *name;
430 
431 	name = rte_vdev_device_name(vdev);
432 	if (name == NULL)
433 		return -EINVAL;
434 
435 	return rte_event_pmd_vdev_uninit(name);
436 }
437 
438 static struct rte_vdev_driver evdev_dsw_pmd_drv = {
439 	.probe = dsw_probe,
440 	.remove = dsw_remove
441 };
442 
443 RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);
444