/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include <stdbool.h>

#include <rte_cycles.h>
#include <eventdev_pmd.h>
#include <eventdev_pmd_vdev.h>
#include <rte_random.h>
#include <rte_ring_elem.h>

#include "dsw_evdev.h"

#define EVENTDEV_NAME_DSW_PMD event_dsw

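/* Set up an event port: initialize the per-port state, create the
 * port's input event ring and control message ring, and translate the
 * load update and migration intervals from microseconds to TSC cycles.
 */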
static int
dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
	       const struct rte_event_port_conf *conf)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	struct dsw_port *port;
	struct rte_event_ring *in_ring;
	struct rte_ring *ctl_in_ring;
	char ring_name[RTE_RING_NAMESIZE];
	bool implicit_release;

	port = &dsw->ports[port_id];

	implicit_release =
	    !(conf->event_port_cfg & RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);

	*port = (struct dsw_port) {
		.id = port_id,
		.dsw = dsw,
		.dequeue_depth = conf->dequeue_depth,
		.enqueue_depth = conf->enqueue_depth,
		.new_event_threshold = conf->new_event_threshold,
		.implicit_release = implicit_release
	};

	snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
		 port_id);

	in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
					dev->data->socket_id,
					RING_F_SC_DEQ|RING_F_EXACT_SZ);

	if (in_ring == NULL)
		return -ENOMEM;

	snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u",
		 dev->data->dev_id, port_id);

	ctl_in_ring = rte_ring_create_elem(ring_name,
					   sizeof(struct dsw_ctl_msg),
					   DSW_CTL_IN_RING_SIZE,
					   dev->data->socket_id,
					   RING_F_SC_DEQ|RING_F_EXACT_SZ);

	if (ctl_in_ring == NULL) {
		rte_event_ring_free(in_ring);
		return -ENOMEM;
	}

	port->in_ring = in_ring;
	port->ctl_in_ring = ctl_in_ring;

	port->load_update_interval =
		(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;

	port->migration_interval =
		(DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;

	dev->data->ports[port_id] = port;

	return 0;
}

static void
dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
		  uint8_t port_id __rte_unused,
		  struct rte_event_port_conf *port_conf)
{
	*port_conf = (struct rte_event_port_conf) {
		.new_event_threshold = 1024,
		.dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
		.enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
	};
}

static void
dsw_port_release(void *p)
{
	struct dsw_port *port = p;

	rte_event_ring_free(port->in_ring);
	rte_ring_free(port->ctl_in_ring);
}

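/* Set up an event queue. All-types queues and ordered scheduling are
 * not supported; single-link queues are handled as atomic (see the
 * comment below).
 */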
static int
dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
		const struct rte_event_queue_conf *conf)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	struct dsw_queue *queue = &dsw->queues[queue_id];

	if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
		return -ENOTSUP;

	/* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
	 * avoids the "fake" TYPE_PARALLEL flow_id assignment. Since
	 * the queue will only have a single serving port, no
	 * migration will ever happen, so the extra TYPE_ATOMIC
	 * migration overhead is avoided.
	 */
	if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
		queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
	else {
		if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
			return -ENOTSUP;
		/* atomic or parallel */
		queue->schedule_type = conf->schedule_type;
	}

	rte_bitset_init(queue->serving_ports, DSW_MAX_PORTS);
	queue->num_serving_ports = 0;

	return 0;
}

static void
dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
		   uint8_t queue_id __rte_unused,
		   struct rte_event_queue_conf *queue_conf)
{
	*queue_conf = (struct rte_event_queue_conf) {
		.nb_atomic_flows = 4096,
		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL
	};
}

static void
dsw_queue_release(struct rte_eventdev *dev __rte_unused,
		  uint8_t queue_id __rte_unused)
{
}

static void
queue_add_port(struct dsw_queue *queue, uint16_t port_id)
{
	rte_bitset_set(queue->serving_ports, port_id);
	queue->num_serving_ports++;
}

static bool
queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
{
	if (rte_bitset_test(queue->serving_ports, port_id)) {
		queue->num_serving_ports--;
		rte_bitset_clear(queue->serving_ports, port_id);
		return true;
	}

	return false;
}

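/* Add or remove the port from the set of ports serving each of the
 * listed queues, and return the number of links (or unlinks) actually
 * performed.
 */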
static int
dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
		     const uint8_t queues[], uint16_t num, bool link)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	struct dsw_port *p = port;
	uint16_t i;
	uint16_t count = 0;

	for (i = 0; i < num; i++) {
		uint8_t qid = queues[i];
		struct dsw_queue *q = &dsw->queues[qid];
		if (link) {
			queue_add_port(q, p->id);
			count++;
		} else {
			bool removed = queue_remove_port(q, p->id);
			if (removed)
				count++;
		}
	}

	return count;
}

static int
dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
	      const uint8_t priorities[] __rte_unused, uint16_t num)
{
	return dsw_port_link_unlink(dev, port, queues, num, true);
}

static int
dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
		uint16_t num)
{
	return dsw_port_link_unlink(dev, port, queues, num, false);
}

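/* Report the capabilities and limits of this software event device. */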
static void
dsw_info_get(struct rte_eventdev *dev __rte_unused,
	     struct rte_event_dev_info *info)
{
	*info = (struct rte_event_dev_info) {
		.driver_name = DSW_PMD_NAME,
		.max_event_queues = DSW_MAX_QUEUES,
		.max_event_queue_flows = DSW_MAX_FLOWS,
		.max_event_queue_priority_levels = 1,
		.max_event_priority_levels = 1,
		.max_event_ports = DSW_MAX_PORTS,
		.max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
		.max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
		.max_num_events = DSW_MAX_EVENTS,
		.max_profiles_per_port = 1,
		.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
		RTE_EVENT_DEV_CAP_ATOMIC|
		RTE_EVENT_DEV_CAP_PARALLEL|
		RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED|
		RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE|
		RTE_EVENT_DEV_CAP_NONSEQ_MODE|
		RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT|
		RTE_EVENT_DEV_CAP_CARRY_FLOW_ID|
		RTE_EVENT_DEV_CAP_INDEPENDENT_ENQ
	};
}

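/* Record the port and queue counts, and size the in-flight event limit
 * so it is never smaller than the total number of credits the ports
 * themselves may hold.
 */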
static int
dsw_configure(const struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	const struct rte_event_dev_config *conf = &dev->data->dev_conf;
	int32_t min_max_in_flight;

	dsw->num_ports = conf->nb_event_ports;
	dsw->num_queues = conf->nb_event_queues;

	/* Avoid a situation where consumer ports are holding all the
	 * credits, without making use of them.
	 */
	min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;

	dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);

	return 0;
}

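/* For every flow hash of every queue, pick one of the queue's serving
 * ports uniformly at random as the flow's initial destination.
 */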
static void
initial_flow_to_port_assignment(struct dsw_evdev *dsw)
{
	uint8_t queue_id;
	for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
		struct dsw_queue *queue = &dsw->queues[queue_id];
		uint16_t flow_hash;
		for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
			uint8_t skip = rte_rand_max(queue->num_serving_ports);
			uint8_t port_id;

			for (port_id = 0;; port_id++) {
				if (rte_bitset_test(queue->serving_ports,
						    port_id)) {
					if (skip == 0)
						break;
					skip--;
				}
			}

			dsw->queues[queue_id].flow_to_port_map[flow_hash] =
				port_id;
		}
	}
}

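/* Start the event device: reset the credit pool, assign flows to ports,
 * and record the timestamps used as the starting point for load
 * measurements.
 */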
static int
dsw_start(struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	uint16_t i;
	uint64_t now;

	dsw->credits_on_loan = 0;

	initial_flow_to_port_assignment(dsw);

	now = rte_get_timer_cycles();
	for (i = 0; i < dsw->num_ports; i++) {
		dsw->ports[i].measurement_start = now;
		dsw->ports[i].busy_start = now;
	}

	return 0;
}

static void
dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
		   eventdev_stop_flush_t flush, void *flush_arg)
{
	uint16_t i;

	for (i = 0; i < buf_len; i++)
		flush(dev_id, buf[i], flush_arg);
}

static void
dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port,
		      eventdev_stop_flush_t flush, void *flush_arg)
{
	dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len,
			   flush, flush_arg);
}

static void
dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
		   eventdev_stop_flush_t flush, void *flush_arg)
{
	uint16_t dport_id;

	for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
		if (dport_id != port->id)
			dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
					   port->out_buffer_len[dport_id],
					   flush, flush_arg);
}

static void
dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
		       eventdev_stop_flush_t flush, void *flush_arg)
{
	struct rte_event ev;

	while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
		flush(dev_id, ev, flush_arg);
}

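/* On stop, hand any events still held in output buffers, paused-event
 * buffers or input rings over to the application's stop-flush callback,
 * if one is registered.
 */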
static void
dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
	  eventdev_stop_flush_t flush, void *flush_arg)
{
	uint16_t port_id;

	if (flush == NULL)
		return;

	for (port_id = 0; port_id < dsw->num_ports; port_id++) {
		struct dsw_port *port = &dsw->ports[port_id];

		dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
		dsw_port_drain_paused(dev_id, port, flush, flush_arg);
		dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
	}
}

static void
dsw_stop(struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	uint8_t dev_id;
	eventdev_stop_flush_t flush;
	void *flush_arg;

	dev_id = dev->data->dev_id;
	flush = dev->dev_ops->dev_stop_flush;
	flush_arg = dev->data->dev_stop_flush_arg;

	dsw_drain(dev_id, dsw, flush, flush_arg);
}

static int
dsw_close(struct rte_eventdev *dev)
{
	struct dsw_evdev *dsw = dsw_pmd_priv(dev);
	uint16_t port_id;

	for (port_id = 0; port_id < dsw->num_ports; port_id++)
		dsw_port_release(&dsw->ports[port_id]);

	dsw->num_ports = 0;
	dsw->num_queues = 0;

	return 0;
}

static int
dsw_eth_rx_adapter_caps_get(const struct rte_eventdev *dev __rte_unused,
			    const struct rte_eth_dev *eth_dev __rte_unused,
			    uint32_t *caps)
{
	*caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
	return 0;
}

static int
dsw_timer_adapter_caps_get(const struct rte_eventdev *dev __rte_unused,
			   uint64_t flags __rte_unused, uint32_t *caps,
			   const struct event_timer_adapter_ops **ops)
{
	*caps = 0;
	*ops = NULL;
	return 0;
}

static int
dsw_crypto_adapter_caps_get(const struct rte_eventdev *dev __rte_unused,
			    const struct rte_cryptodev *cdev __rte_unused,
			    uint32_t *caps)
{
	*caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP;
	return 0;
}

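/* Eventdev PMD operations implemented by the DSW driver. */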
static struct eventdev_ops dsw_evdev_ops = {
	.port_setup = dsw_port_setup,
	.port_def_conf = dsw_port_def_conf,
	.port_release = dsw_port_release,
	.queue_setup = dsw_queue_setup,
	.queue_def_conf = dsw_queue_def_conf,
	.queue_release = dsw_queue_release,
	.port_link = dsw_port_link,
	.port_unlink = dsw_port_unlink,
	.dev_infos_get = dsw_info_get,
	.dev_configure = dsw_configure,
	.dev_start = dsw_start,
	.dev_stop = dsw_stop,
	.dev_close = dsw_close,
	.eth_rx_adapter_caps_get = dsw_eth_rx_adapter_caps_get,
	.timer_adapter_caps_get = dsw_timer_adapter_caps_get,
	.crypto_adapter_caps_get = dsw_crypto_adapter_caps_get,
	.xstats_get = dsw_xstats_get,
	.xstats_get_names = dsw_xstats_get_names,
	.xstats_get_by_name = dsw_xstats_get_by_name
};

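/* Create a DSW event device for the virtual device. Secondary processes
 * only attach the ops and fast-path function pointers.
 */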
static int
dsw_probe(struct rte_vdev_device *vdev)
{
	const char *name;
	struct rte_eventdev *dev;
	struct dsw_evdev *dsw;

	name = rte_vdev_device_name(vdev);

	dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
				      rte_socket_id(), vdev);
	if (dev == NULL)
		return -EFAULT;

	dev->dev_ops = &dsw_evdev_ops;
	dev->enqueue_burst = dsw_event_enqueue_burst;
	dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
	dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
	dev->dequeue_burst = dsw_event_dequeue_burst;
	dev->maintain = dsw_event_maintain;

	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;

	dsw = dev->data->dev_private;
	dsw->data = dev->data;

	event_dev_probing_finish(dev);
	return 0;
}

static int
dsw_remove(struct rte_vdev_device *vdev)
{
	const char *name;

	name = rte_vdev_device_name(vdev);
	if (name == NULL)
		return -EINVAL;

	return rte_event_pmd_vdev_uninit(name);
}

static struct rte_vdev_driver evdev_dsw_pmd_drv = {
	.probe = dsw_probe,
	.remove = dsw_remove
};

RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);
RTE_LOG_REGISTER_DEFAULT(event_dsw_logtype, NOTICE);