xref: /dpdk/drivers/event/opdl/opdl_evdev.c (revision 8b565b3445b67567a459d48e64ba5700320fc852)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4 
5 #include <inttypes.h>
6 #include <stdlib.h>
7 #include <string.h>
8 
9 #include <bus_vdev_driver.h>
10 #include <rte_lcore.h>
11 #include <rte_memzone.h>
12 #include <rte_kvargs.h>
13 #include <rte_errno.h>
14 #include <rte_cycles.h>
15 
16 #include "opdl_evdev.h"
17 #include "opdl_ring.h"
18 #include "opdl_log.h"
19 
/* Name token under which this PMD is registered on the vdev bus. */
#define EVENTDEV_NAME_OPDL_PMD	event_opdl

/* Keys recognised in the vdev argument string (parsed in opdl_probe()). */
#define NUMA_NODE_ARG		"numa_node"
#define DO_VALIDATION_ARG	"do_validation"
#define DO_TEST_ARG		"self_test"
24 
25 
/* Forward declaration; the definition appears further down in this file. */
static void opdl_info_get(struct rte_eventdev *dev,
			  struct rte_event_dev_info *info);
28 
29 uint16_t
30 opdl_event_enqueue_burst(void *port,
31 			 const struct rte_event ev[],
32 			 uint16_t num)
33 {
34 	struct opdl_port *p = port;
35 
36 	if (unlikely(!p->opdl->data->dev_started))
37 		return 0;
38 
39 
40 	/* either rx_enqueue or disclaim*/
41 	return p->enq(p, ev, num);
42 }
43 
44 uint16_t
45 opdl_event_enqueue(void *port, const struct rte_event *ev)
46 {
47 	struct opdl_port *p = port;
48 
49 	if (unlikely(!p->opdl->data->dev_started))
50 		return 0;
51 
52 
53 	return p->enq(p, ev, 1);
54 }
55 
56 uint16_t
57 opdl_event_dequeue_burst(void *port,
58 			 struct rte_event *ev,
59 			 uint16_t num,
60 			 uint64_t wait)
61 {
62 	struct opdl_port *p = (void *)port;
63 
64 	RTE_SET_USED(wait);
65 
66 	if (unlikely(!p->opdl->data->dev_started))
67 		return 0;
68 
69 	/* This function pointer can point to tx_dequeue or claim*/
70 	return p->deq(p, ev, num);
71 }
72 
73 uint16_t
74 opdl_event_dequeue(void *port,
75 		   struct rte_event *ev,
76 		   uint64_t wait)
77 {
78 	struct opdl_port *p = (void *)port;
79 
80 	if (unlikely(!p->opdl->data->dev_started))
81 		return 0;
82 
83 	RTE_SET_USED(wait);
84 
85 	return p->deq(p, ev, 1);
86 }
87 
88 static int
89 opdl_port_link(struct rte_eventdev *dev,
90 	       void *port,
91 	       const uint8_t queues[],
92 	       const uint8_t priorities[],
93 	       uint16_t num)
94 {
95 	struct opdl_port *p = port;
96 
97 	RTE_SET_USED(priorities);
98 	RTE_SET_USED(dev);
99 
100 	if (unlikely(dev->data->dev_started)) {
101 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
102 			     "Attempt to link queue (%u) to port %d while device started",
103 			     dev->data->dev_id,
104 				queues[0],
105 				p->id);
106 		rte_errno = EINVAL;
107 		return 0;
108 	}
109 
110 	/* Max of 1 queue per port */
111 	if (num > 1) {
112 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
113 			     "Attempt to link more than one queue (%u) to port %d requested",
114 			     dev->data->dev_id,
115 				num,
116 				p->id);
117 		rte_errno = EDQUOT;
118 		return 0;
119 	}
120 
121 	if (!p->configured) {
122 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
123 			     "port %d not configured, cannot link to %u",
124 			     dev->data->dev_id,
125 				p->id,
126 				queues[0]);
127 		rte_errno = EINVAL;
128 		return 0;
129 	}
130 
131 	if (p->external_qid != OPDL_INVALID_QID) {
132 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
133 			     "port %d already linked to queue %u, cannot link to %u",
134 			     dev->data->dev_id,
135 				p->id,
136 				p->external_qid,
137 				queues[0]);
138 		rte_errno = EINVAL;
139 		return 0;
140 	}
141 
142 	p->external_qid = queues[0];
143 
144 	return 1;
145 }
146 
147 static int
148 opdl_port_unlink(struct rte_eventdev *dev,
149 		 void *port,
150 		 uint8_t queues[],
151 		 uint16_t nb_unlinks)
152 {
153 	struct opdl_port *p = port;
154 
155 	RTE_SET_USED(queues);
156 	RTE_SET_USED(nb_unlinks);
157 
158 	if (unlikely(dev->data->dev_started)) {
159 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
160 			     "Attempt to unlink queue (%u) to port %d while device started",
161 			     dev->data->dev_id,
162 			     queues[0],
163 			     p->id);
164 		rte_errno = EINVAL;
165 		return 0;
166 	}
167 	RTE_SET_USED(nb_unlinks);
168 
169 	/* Port Stuff */
170 	p->queue_id = OPDL_INVALID_QID;
171 	p->p_type = OPDL_INVALID_PORT;
172 	p->external_qid = OPDL_INVALID_QID;
173 
174 	/* always unlink 0 queue due to statice pipeline */
175 	return 0;
176 }
177 
178 static int
179 opdl_port_setup(struct rte_eventdev *dev,
180 		uint8_t port_id,
181 		const struct rte_event_port_conf *conf)
182 {
183 	struct opdl_evdev *device = opdl_pmd_priv(dev);
184 	struct opdl_port *p = &device->ports[port_id];
185 
186 	RTE_SET_USED(conf);
187 
188 	/* Check if port already configured */
189 	if (p->configured) {
190 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
191 			     "Attempt to setup port %d which is already setup",
192 			     dev->data->dev_id,
193 			     p->id);
194 		return -EDQUOT;
195 	}
196 
197 	*p = (struct opdl_port){0}; /* zero entire structure */
198 	p->id = port_id;
199 	p->opdl = device;
200 	p->queue_id = OPDL_INVALID_QID;
201 	p->external_qid = OPDL_INVALID_QID;
202 	dev->data->ports[port_id] = p;
203 	rte_smp_wmb();
204 	p->configured = 1;
205 	device->nb_ports++;
206 	return 0;
207 }
208 
209 static void
210 opdl_port_release(void *port)
211 {
212 	struct opdl_port *p = (void *)port;
213 
214 	if (p == NULL ||
215 	    p->opdl->data->dev_started) {
216 		return;
217 	}
218 
219 	p->configured = 0;
220 	p->initialized = 0;
221 }
222 
223 static void
224 opdl_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
225 		struct rte_event_port_conf *port_conf)
226 {
227 	RTE_SET_USED(dev);
228 	RTE_SET_USED(port_id);
229 
230 	port_conf->new_event_threshold = MAX_OPDL_CONS_Q_DEPTH;
231 	port_conf->dequeue_depth = MAX_OPDL_CONS_Q_DEPTH;
232 	port_conf->enqueue_depth = MAX_OPDL_CONS_Q_DEPTH;
233 }
234 
235 static int
236 opdl_queue_setup(struct rte_eventdev *dev,
237 		 uint8_t queue_id,
238 		 const struct rte_event_queue_conf *conf)
239 {
240 	enum queue_type type;
241 
242 	struct opdl_evdev *device = opdl_pmd_priv(dev);
243 
244 	/* Extra sanity check, probably not needed */
245 	if (queue_id == OPDL_INVALID_QID) {
246 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
247 			     "Invalid queue id %u requested",
248 			     dev->data->dev_id,
249 			     queue_id);
250 		return -EINVAL;
251 	}
252 
253 	if (device->nb_q_md > device->max_queue_nb) {
254 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
255 			     "Max number of queues %u exceeded by request %u",
256 			     dev->data->dev_id,
257 			     device->max_queue_nb,
258 			     device->nb_q_md);
259 		return -EINVAL;
260 	}
261 
262 	if (RTE_EVENT_QUEUE_CFG_ALL_TYPES
263 	    & conf->event_queue_cfg) {
264 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
265 			     "QUEUE_CFG_ALL_TYPES not supported",
266 			     dev->data->dev_id);
267 		return -ENOTSUP;
268 	} else if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK
269 		   & conf->event_queue_cfg) {
270 		type = OPDL_Q_TYPE_SINGLE_LINK;
271 	} else {
272 		switch (conf->schedule_type) {
273 		case RTE_SCHED_TYPE_ORDERED:
274 			type = OPDL_Q_TYPE_ORDERED;
275 			break;
276 		case RTE_SCHED_TYPE_ATOMIC:
277 			type = OPDL_Q_TYPE_ATOMIC;
278 			break;
279 		case RTE_SCHED_TYPE_PARALLEL:
280 			type = OPDL_Q_TYPE_ORDERED;
281 			break;
282 		default:
283 			PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
284 				     "Unknown queue type %d requested",
285 				     dev->data->dev_id,
286 				     conf->event_queue_cfg);
287 			return -EINVAL;
288 		}
289 	}
290 	/* Check if queue id has been setup already */
291 	uint32_t i;
292 	for (i = 0; i < device->nb_q_md; i++) {
293 		if (device->q_md[i].ext_id == queue_id) {
294 			PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
295 				     "queue id %u already setup",
296 				     dev->data->dev_id,
297 				     queue_id);
298 			return -EINVAL;
299 		}
300 	}
301 
302 	device->q_md[device->nb_q_md].ext_id = queue_id;
303 	device->q_md[device->nb_q_md].type = type;
304 	device->q_md[device->nb_q_md].setup = 1;
305 	device->nb_q_md++;
306 
307 	return 1;
308 }
309 
310 static void
311 opdl_queue_release(struct rte_eventdev *dev, uint8_t queue_id)
312 {
313 	struct opdl_evdev *device = opdl_pmd_priv(dev);
314 
315 	RTE_SET_USED(queue_id);
316 
317 	if (device->data->dev_started)
318 		return;
319 
320 }
321 
322 static void
323 opdl_queue_def_conf(struct rte_eventdev *dev,
324 		    uint8_t queue_id,
325 		    struct rte_event_queue_conf *conf)
326 {
327 	RTE_SET_USED(dev);
328 	RTE_SET_USED(queue_id);
329 
330 	static const struct rte_event_queue_conf default_conf = {
331 		.nb_atomic_flows = 1024,
332 		.nb_atomic_order_sequences = 1,
333 		.event_queue_cfg = 0,
334 		.schedule_type = RTE_SCHED_TYPE_ORDERED,
335 		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
336 	};
337 
338 	*conf = default_conf;
339 }
340 
341 
342 static int
343 opdl_dev_configure(const struct rte_eventdev *dev)
344 {
345 	struct opdl_evdev *opdl = opdl_pmd_priv(dev);
346 	const struct rte_eventdev_data *data = dev->data;
347 	const struct rte_event_dev_config *conf = &data->dev_conf;
348 
349 	opdl->max_queue_nb = conf->nb_event_queues;
350 	opdl->max_port_nb = conf->nb_event_ports;
351 	opdl->nb_events_limit = conf->nb_events_limit;
352 
353 	if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) {
354 		PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
355 			     "DEQUEUE_TIMEOUT not supported",
356 			     dev->data->dev_id);
357 		return -ENOTSUP;
358 	}
359 
360 	return 0;
361 }
362 
363 static void
364 opdl_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info)
365 {
366 	RTE_SET_USED(dev);
367 
368 	static const struct rte_event_dev_info evdev_opdl_info = {
369 		.driver_name = OPDL_PMD_NAME,
370 		.max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
371 		.max_event_queue_flows = OPDL_QID_NUM_FIDS,
372 		.max_event_queue_priority_levels = OPDL_Q_PRIORITY_MAX,
373 		.max_event_priority_levels = OPDL_IQS_MAX,
374 		.max_event_ports = OPDL_PORTS_MAX,
375 		.max_event_port_dequeue_depth = MAX_OPDL_CONS_Q_DEPTH,
376 		.max_event_port_enqueue_depth = MAX_OPDL_CONS_Q_DEPTH,
377 		.max_num_events = OPDL_INFLIGHT_EVENTS_TOTAL,
378 		.event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE |
379 				 RTE_EVENT_DEV_CAP_ORDERED |
380 				 RTE_EVENT_DEV_CAP_ATOMIC |
381 				 RTE_EVENT_DEV_CAP_PARALLEL |
382 				 RTE_EVENT_DEV_CAP_CARRY_FLOW_ID |
383 				 RTE_EVENT_DEV_CAP_MAINTENANCE_FREE,
384 		.max_profiles_per_port = 1,
385 	};
386 
387 	*info = evdev_opdl_info;
388 }
389 
390 static void
391 opdl_dump(struct rte_eventdev *dev, FILE *f)
392 {
393 	struct opdl_evdev *device = opdl_pmd_priv(dev);
394 
395 	if (!device->do_validation)
396 		return;
397 
398 	fprintf(f,
399 		"\n\n -- RING STATISTICS --\n");
400 	uint32_t i;
401 	for (i = 0; i < device->nb_opdls; i++)
402 		opdl_ring_dump(device->opdl[i], f);
403 
404 	fprintf(f,
405 		"\n\n -- PORT STATISTICS --\n"
406 		"Type Port Index  Port Id  Queue Id     Av. Req Size  "
407 		"Av. Grant Size     Av. Cycles PP"
408 		"      Empty DEQs   Non Empty DEQs   Pkts Processed\n");
409 
410 	for (i = 0; i < device->max_port_nb; i++) {
411 		char queue_id[64];
412 		char total_cyc[64];
413 		const char *p_type;
414 
415 		uint64_t cne, cpg;
416 		struct opdl_port *port = &device->ports[i];
417 
418 		if (port->initialized) {
419 			cne = port->port_stat[claim_non_empty];
420 			cpg = port->port_stat[claim_pkts_granted];
421 			if (port->p_type == OPDL_REGULAR_PORT)
422 				p_type = "REG";
423 			else if (port->p_type == OPDL_PURE_RX_PORT)
424 				p_type = "  RX";
425 			else if (port->p_type == OPDL_PURE_TX_PORT)
426 				p_type = "  TX";
427 			else if (port->p_type == OPDL_ASYNC_PORT)
428 				p_type = "SYNC";
429 			else
430 				p_type = "????";
431 
432 			snprintf(queue_id, sizeof(queue_id), "%02u",
433 					port->external_qid);
434 			if (port->p_type == OPDL_REGULAR_PORT ||
435 					port->p_type == OPDL_ASYNC_PORT)
436 				snprintf(total_cyc, sizeof(total_cyc),
437 					" %'16"PRIu64"",
438 					(cpg != 0 ?
439 					 port->port_stat[total_cycles] / cpg
440 					 : 0));
441 			else
442 				snprintf(total_cyc, sizeof(total_cyc),
443 					"             ----");
444 			fprintf(f,
445 				"%4s %10u %8u %9s %'16"PRIu64" %'16"PRIu64" %s "
446 				"%'16"PRIu64" %'16"PRIu64" %'16"PRIu64"\n",
447 				p_type,
448 				i,
449 				port->id,
450 				(port->external_qid == OPDL_INVALID_QID ? "---"
451 				 : queue_id),
452 				(cne != 0 ?
453 				 port->port_stat[claim_pkts_requested] / cne
454 				 : 0),
455 				(cne != 0 ?
456 				 port->port_stat[claim_pkts_granted] / cne
457 				 : 0),
458 				total_cyc,
459 				port->port_stat[claim_empty],
460 				port->port_stat[claim_non_empty],
461 				port->port_stat[claim_pkts_granted]);
462 		}
463 	}
464 	fprintf(f, "\n");
465 }
466 
467 
468 static void
469 opdl_stop(struct rte_eventdev *dev)
470 {
471 	struct opdl_evdev *device = opdl_pmd_priv(dev);
472 
473 	opdl_xstats_uninit(dev);
474 
475 	destroy_queues_and_rings(dev);
476 
477 
478 	device->started = 0;
479 
480 	rte_smp_wmb();
481 }
482 
483 static int
484 opdl_start(struct rte_eventdev *dev)
485 {
486 	int err = 0;
487 
488 	if (!err)
489 		err = create_queues_and_rings(dev);
490 
491 
492 	if (!err)
493 		err = assign_internal_queue_ids(dev);
494 
495 
496 	if (!err)
497 		err = initialise_queue_zero_ports(dev);
498 
499 
500 	if (!err)
501 		err = initialise_all_other_ports(dev);
502 
503 
504 	if (!err)
505 		err = check_queues_linked(dev);
506 
507 
508 	if (!err)
509 		err = opdl_add_event_handlers(dev);
510 
511 
512 	if (!err)
513 		err = build_all_dependencies(dev);
514 
515 	if (!err) {
516 		opdl_xstats_init(dev);
517 
518 		struct opdl_evdev *device = opdl_pmd_priv(dev);
519 
520 		PMD_DRV_LOG(INFO, "DEV_ID:[%02d] : "
521 			      "SUCCESS : Created %u total queues (%u ex, %u in),"
522 			      " %u opdls, %u event_dev ports, %u input ports",
523 			      opdl_pmd_dev_id(device),
524 			      device->nb_queues,
525 			      (device->nb_queues - device->nb_opdls),
526 			      device->nb_opdls,
527 			      device->nb_opdls,
528 			      device->nb_ports,
529 			      device->queue[0].nb_ports);
530 	} else
531 		opdl_stop(dev);
532 
533 	return err;
534 }
535 
536 static int
537 opdl_close(struct rte_eventdev *dev)
538 {
539 	struct opdl_evdev *device = opdl_pmd_priv(dev);
540 	uint32_t i;
541 
542 	for (i = 0; i < device->max_port_nb; i++) {
543 		memset(&device->ports[i],
544 		       0,
545 		       sizeof(struct opdl_port));
546 	}
547 
548 	memset(&device->s_md,
549 			0x0,
550 			sizeof(struct opdl_stage_meta_data)*OPDL_PORTS_MAX);
551 
552 	memset(&device->q_md,
553 			0xFF,
554 			sizeof(struct opdl_queue_meta_data)*OPDL_MAX_QUEUES);
555 
556 
557 	memset(device->q_map_ex_to_in,
558 			0,
559 			sizeof(uint8_t)*OPDL_INVALID_QID);
560 
561 	opdl_xstats_uninit(dev);
562 
563 	device->max_port_nb = 0;
564 
565 	device->max_queue_nb = 0;
566 
567 	device->nb_opdls = 0;
568 
569 	device->nb_queues   = 0;
570 
571 	device->nb_ports    = 0;
572 
573 	device->nb_q_md     = 0;
574 
575 	dev->data->nb_queues = 0;
576 
577 	dev->data->nb_ports = 0;
578 
579 
580 	return 0;
581 }
582 
583 static int
584 assign_numa_node(const char *key __rte_unused, const char *value, void *opaque)
585 {
586 	int *socket_id = opaque;
587 	*socket_id = atoi(value);
588 	if (*socket_id >= RTE_MAX_NUMA_NODES)
589 		return -1;
590 	return 0;
591 }
592 
593 static int
594 set_do_validation(const char *key __rte_unused, const char *value, void *opaque)
595 {
596 	int *do_val = opaque;
597 	*do_val = atoi(value);
598 	if (*do_val != 0)
599 		*do_val = 1;
600 
601 	return 0;
602 }
603 static int
604 set_do_test(const char *key __rte_unused, const char *value, void *opaque)
605 {
606 	int *do_test = opaque;
607 
608 	*do_test = atoi(value);
609 
610 	if (*do_test != 0)
611 		*do_test = 1;
612 	return 0;
613 }
614 
615 static int
616 opdl_probe(struct rte_vdev_device *vdev)
617 {
618 	static struct eventdev_ops evdev_opdl_ops = {
619 		.dev_configure = opdl_dev_configure,
620 		.dev_infos_get = opdl_info_get,
621 		.dev_close = opdl_close,
622 		.dev_start = opdl_start,
623 		.dev_stop = opdl_stop,
624 		.dump = opdl_dump,
625 
626 		.queue_def_conf = opdl_queue_def_conf,
627 		.queue_setup = opdl_queue_setup,
628 		.queue_release = opdl_queue_release,
629 		.port_def_conf = opdl_port_def_conf,
630 		.port_setup = opdl_port_setup,
631 		.port_release = opdl_port_release,
632 		.port_link = opdl_port_link,
633 		.port_unlink = opdl_port_unlink,
634 
635 
636 		.xstats_get = opdl_xstats_get,
637 		.xstats_get_names = opdl_xstats_get_names,
638 		.xstats_get_by_name = opdl_xstats_get_by_name,
639 		.xstats_reset = opdl_xstats_reset,
640 	};
641 
642 	static const char *const args[] = {
643 		NUMA_NODE_ARG,
644 		DO_VALIDATION_ARG,
645 		DO_TEST_ARG,
646 		NULL
647 	};
648 	const char *name;
649 	const char *params;
650 	struct rte_eventdev *dev;
651 	struct opdl_evdev *opdl;
652 	int socket_id = rte_socket_id();
653 	int do_validation = 0;
654 	int do_test = 0;
655 	int str_len;
656 	int test_result = 0;
657 
658 	name = rte_vdev_device_name(vdev);
659 	params = rte_vdev_device_args(vdev);
660 	if (params != NULL && params[0] != '\0') {
661 		struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
662 
663 		if (!kvlist) {
664 			PMD_DRV_LOG(INFO,
665 					"Ignoring unsupported parameters when creating device '%s'",
666 					name);
667 		} else {
668 			int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
669 					assign_numa_node, &socket_id);
670 			if (ret != 0) {
671 				PMD_DRV_LOG(ERR,
672 						"%s: Error parsing numa node parameter",
673 						name);
674 
675 				rte_kvargs_free(kvlist);
676 				return ret;
677 			}
678 
679 			ret = rte_kvargs_process(kvlist, DO_VALIDATION_ARG,
680 					set_do_validation, &do_validation);
681 			if (ret != 0) {
682 				PMD_DRV_LOG(ERR,
683 					"%s: Error parsing do validation parameter",
684 					name);
685 				rte_kvargs_free(kvlist);
686 				return ret;
687 			}
688 
689 			ret = rte_kvargs_process(kvlist, DO_TEST_ARG,
690 					set_do_test, &do_test);
691 			if (ret != 0) {
692 				PMD_DRV_LOG(ERR,
693 					"%s: Error parsing do test parameter",
694 					name);
695 				rte_kvargs_free(kvlist);
696 				return ret;
697 			}
698 
699 			rte_kvargs_free(kvlist);
700 		}
701 	}
702 	dev = rte_event_pmd_vdev_init(name,
703 			sizeof(struct opdl_evdev), socket_id, vdev);
704 
705 	if (dev == NULL) {
706 		PMD_DRV_LOG(ERR, "eventdev vdev init() failed");
707 		return -EFAULT;
708 	}
709 
710 	PMD_DRV_LOG(INFO, "DEV_ID:[%02d] : "
711 		      "Success - creating eventdev device %s, numa_node:[%d], do_validation:[%s]"
712 			  " , self_test:[%s]",
713 		      dev->data->dev_id,
714 		      name,
715 		      socket_id,
716 		      (do_validation ? "true" : "false"),
717 			  (do_test ? "true" : "false"));
718 
719 	dev->dev_ops = &evdev_opdl_ops;
720 
721 	dev->enqueue_burst = opdl_event_enqueue_burst;
722 	dev->enqueue_new_burst = opdl_event_enqueue_burst;
723 	dev->enqueue_forward_burst = opdl_event_enqueue_burst;
724 	dev->dequeue_burst = opdl_event_dequeue_burst;
725 
726 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
727 		goto done;
728 
729 	opdl = dev->data->dev_private;
730 	opdl->data = dev->data;
731 	opdl->socket = socket_id;
732 	opdl->do_validation = do_validation;
733 	opdl->do_test = do_test;
734 	str_len = strlen(name);
735 	memcpy(opdl->service_name, name, str_len);
736 
737 	if (do_test == 1)
738 		test_result =  opdl_selftest();
739 
740 done:
741 	event_dev_probing_finish(dev);
742 	return test_result;
743 }
744 
745 static int
746 opdl_remove(struct rte_vdev_device *vdev)
747 {
748 	const char *name;
749 
750 	name = rte_vdev_device_name(vdev);
751 	if (name == NULL)
752 		return -EINVAL;
753 
754 	PMD_DRV_LOG(INFO, "Closing eventdev opdl device %s", name);
755 
756 	return rte_event_pmd_vdev_uninit(name);
757 }
758 
759 static struct rte_vdev_driver evdev_opdl_pmd_drv = {
760 	.probe = opdl_probe,
761 	.remove = opdl_remove
762 };
763 
764 RTE_LOG_REGISTER_SUFFIX(opdl_logtype_driver, driver, INFO);
765 
766 RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_OPDL_PMD, evdev_opdl_pmd_drv);
767 RTE_PMD_REGISTER_PARAM_STRING(event_opdl, NUMA_NODE_ARG "=<int>"
768 			      DO_VALIDATION_ARG "=<int>" DO_TEST_ARG "=<int>");
769