/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2021 Intel Corporation
 */

#include <x86intrin.h>

#include <rte_malloc.h>
#include <rte_common.h>
#include <rte_log.h>
#include <rte_prefetch.h>

#include "idxd_internal.h"

#define IDXD_PMD_NAME_STR "dmadev_idxd"

/* systems with DSA all support AVX2 so allow our data-path functions to
 * always use at least that instruction set
 */
#ifndef __AVX2__
#define __use_avx2 __attribute__((target("avx2")))
#else
#define __use_avx2
#endif

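/* Return the IOVA of descriptor slot 'n' in the descriptor ring, computed as an
 * offset from the ring base IOVA captured at vchan setup time.
 */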
__use_avx2
static __rte_always_inline rte_iova_t
__desc_idx_to_iova(struct idxd_dmadev *idxd, uint16_t n)
{
	return idxd->desc_iova + (n * sizeof(struct idxd_hw_desc));
}

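/* Issue a MOVDIR64B instruction: the raw opcode bytes encode the destination
 * portal address in rax ("a") and the source descriptor in rdx ("d"), giving a
 * single atomic 64-byte write of the descriptor to the device portal.
 */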
__use_avx2
static __rte_always_inline void
__idxd_movdir64b(volatile void *dst, const struct idxd_hw_desc *src)
{
	asm volatile (".byte 0x66, 0x0f, 0x38, 0xf8, 0x02"
			:
			: "a" (dst), "d" (src)
			: "memory");
}

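/* Flush the jobs accumulated since the last submission to the hardware.
 * A single job is written to the portal directly; larger accumulations are
 * submitted via a batch descriptor pointing at the contiguous run of
 * descriptors starting at batch_start. The next slot in the batch index and
 * completion rings is then prepared for the following batch.
 */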
__use_avx2
static __rte_always_inline void
__submit(struct idxd_dmadev *idxd)
{
	rte_prefetch1(&idxd->batch_comp_ring[idxd->batch_idx_read]);

	if (idxd->batch_size == 0)
		return;

	/* write completion to batch comp ring */
	rte_iova_t comp_addr = idxd->batch_iova +
			(idxd->batch_idx_write * sizeof(struct idxd_completion));

	if (idxd->batch_size == 1) {
		/* submit the single descriptor directly, no batch descriptor needed */
		struct idxd_hw_desc desc =
				idxd->desc_ring[idxd->batch_start & idxd->desc_ring_mask];
		desc.completion = comp_addr;
		desc.op_flags |= IDXD_FLAG_REQUEST_COMPLETION;
		_mm_sfence(); /* fence before writing desc to device */
		__idxd_movdir64b(idxd->portal, &desc);
	} else {
		const struct idxd_hw_desc batch_desc = {
				.op_flags = (idxd_op_batch << IDXD_CMD_OP_SHIFT) |
				IDXD_FLAG_COMPLETION_ADDR_VALID |
				IDXD_FLAG_REQUEST_COMPLETION,
				.desc_addr = __desc_idx_to_iova(idxd,
						idxd->batch_start & idxd->desc_ring_mask),
				.completion = comp_addr,
				.size = idxd->batch_size,
		};
		_mm_sfence(); /* fence before writing desc to device */
		__idxd_movdir64b(idxd->portal, &batch_desc);
	}

	if (++idxd->batch_idx_write > idxd->max_batches)
		idxd->batch_idx_write = 0;

	idxd->stats.submitted += idxd->batch_size;

	idxd->batch_start += idxd->batch_size;
	idxd->batch_size = 0;
	idxd->batch_idx_ring[idxd->batch_idx_write] = idxd->batch_start;
	_mm256_store_si256((void *)&idxd->batch_comp_ring[idxd->batch_idx_write],
			_mm256_setzero_si256());
}

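/* Common descriptor-write path for copy and fill operations. Checks for space
 * in both the batch index ring and the descriptor ring, writes the 64-byte
 * descriptor with two 32-byte stores, and returns the job id (or -ENOSPC).
 * The work is only pushed to hardware here if RTE_DMA_OP_FLAG_SUBMIT is set.
 */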
__use_avx2
static __rte_always_inline int
__idxd_write_desc(struct idxd_dmadev *idxd,
		const uint32_t op_flags,
		const rte_iova_t src,
		const rte_iova_t dst,
		const uint32_t size,
		const uint32_t flags)
{
	uint16_t mask = idxd->desc_ring_mask;
	uint16_t job_id = idxd->batch_start + idxd->batch_size;
	/* we never wrap batches, so we only mask the start and allow start+size to overflow */
	uint16_t write_idx = (idxd->batch_start & mask) + idxd->batch_size;

	/* first check batch ring space then desc ring space */
	if ((idxd->batch_idx_read == 0 && idxd->batch_idx_write == idxd->max_batches) ||
			idxd->batch_idx_write + 1 == idxd->batch_idx_read)
		return -ENOSPC;
	if (((write_idx + 1) & mask) == (idxd->ids_returned & mask))
		return -ENOSPC;

	/* write desc. Note: descriptors don't wrap, but the completion address does */
	const uint64_t op_flags64 = (uint64_t)(op_flags | IDXD_FLAG_COMPLETION_ADDR_VALID) << 32;
	const uint64_t comp_addr = __desc_idx_to_iova(idxd, write_idx & mask);
	_mm256_store_si256((void *)&idxd->desc_ring[write_idx],
			_mm256_set_epi64x(dst, src, comp_addr, op_flags64));
	_mm256_store_si256((void *)&idxd->desc_ring[write_idx].size,
			_mm256_set_epi64x(0, 0, 0, size));

	idxd->batch_size++;

	rte_prefetch0_write(&idxd->desc_ring[write_idx + 1]);

	if (flags & RTE_DMA_OP_FLAG_SUBMIT)
		__submit(idxd);

	return job_id;
}

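/* Dataplane enqueue callbacks registered in fp_obj: build the DSA opcode and
 * flags for the requested operation and hand off to __idxd_write_desc().
 */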
__use_avx2
int
idxd_enqueue_copy(void *dev_private, uint16_t qid __rte_unused, rte_iova_t src,
		rte_iova_t dst, unsigned int length, uint64_t flags)
{
	/* we can take advantage of the fact that the fence flag in dmadev and DSA is the same,
	 * but check it at compile time to be sure.
	 */
	RTE_BUILD_BUG_ON(RTE_DMA_OP_FLAG_FENCE != IDXD_FLAG_FENCE);
	uint32_t memmove = (idxd_op_memmove << IDXD_CMD_OP_SHIFT) |
			IDXD_FLAG_CACHE_CONTROL | (flags & IDXD_FLAG_FENCE);
	return __idxd_write_desc(dev_private, memmove, src, dst, length,
			flags);
}

__use_avx2
int
idxd_enqueue_fill(void *dev_private, uint16_t qid __rte_unused, uint64_t pattern,
		rte_iova_t dst, unsigned int length, uint64_t flags)
{
	uint32_t fill = (idxd_op_fill << IDXD_CMD_OP_SHIFT) |
			IDXD_FLAG_CACHE_CONTROL | (flags & IDXD_FLAG_FENCE);
	return __idxd_write_desc(dev_private, fill, pattern, dst, length,
			flags);
}

__use_avx2
int
idxd_submit(void *dev_private, uint16_t qid __rte_unused)
{
	__submit(dev_private);
	return 0;
}

__use_avx2
static enum rte_dma_status_code
get_comp_status(struct idxd_completion *c)
{
	uint8_t st = c->status;
	switch (st) {
	/* successful descriptors are not written back normally */
	case IDXD_COMP_STATUS_INCOMPLETE:
	case IDXD_COMP_STATUS_SUCCESS:
		return RTE_DMA_STATUS_SUCCESSFUL;
	case IDXD_COMP_STATUS_PAGE_FAULT:
		return RTE_DMA_STATUS_PAGE_FAULT;
	case IDXD_COMP_STATUS_INVALID_OPCODE:
		return RTE_DMA_STATUS_INVALID_OPCODE;
	case IDXD_COMP_STATUS_INVALID_SIZE:
		return RTE_DMA_STATUS_INVALID_LENGTH;
	case IDXD_COMP_STATUS_SKIPPED:
		return RTE_DMA_STATUS_NOT_ATTEMPTED;
	default:
		return RTE_DMA_STATUS_ERROR_UNKNOWN;
	}
}

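/* Report whether the queue is idle or still processing work, judged by whether
 * the most recently submitted batch has had its completion record written back.
 */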
__use_avx2
int
idxd_vchan_status(const struct rte_dma_dev *dev, uint16_t vchan __rte_unused,
		enum rte_dma_vchan_status *status)
{
	struct idxd_dmadev *idxd = dev->fp_obj->dev_private;
	uint16_t last_batch_write = idxd->batch_idx_write == 0 ? idxd->max_batches :
			idxd->batch_idx_write - 1;
	uint8_t bstatus = (idxd->batch_comp_ring[last_batch_write].status != 0);

	/* An IDXD device will always be either active or idle.
	 * RTE_DMA_VCHAN_HALTED_ERROR is therefore not supported by IDXD.
	 */
	*status = bstatus ? RTE_DMA_VCHAN_IDLE : RTE_DMA_VCHAN_ACTIVE;

	return 0;
}

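/* Fast path for gathering completions: return any job ids already known to be
 * done, or, if the batch at batch_idx_read completed successfully, return its
 * jobs and advance the read index. Returns 0 if nothing new is complete yet and
 * -1 if the next batch finished with an error, leaving the slower per-descriptor
 * handling to the callers below.
 */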
__use_avx2
static __rte_always_inline int
batch_ok(struct idxd_dmadev *idxd, uint16_t max_ops, enum rte_dma_status_code *status)
{
	uint16_t ret;
	uint8_t bstatus;

	if (max_ops == 0)
		return 0;

	/* first check if there are any unreturned handles from last time */
	if (idxd->ids_avail != idxd->ids_returned) {
		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
		idxd->ids_returned += ret;
		if (status)
			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
		return ret;
	}

	if (idxd->batch_idx_read == idxd->batch_idx_write)
		return 0;

	bstatus = idxd->batch_comp_ring[idxd->batch_idx_read].status;
	/* now check if next batch is complete and successful */
	if (bstatus == IDXD_COMP_STATUS_SUCCESS) {
		/* since the batch idx ring stores the start of each batch, pre-increment to lookup
		 * start of next batch.
		 */
		if (++idxd->batch_idx_read > idxd->max_batches)
			idxd->batch_idx_read = 0;
		idxd->ids_avail = idxd->batch_idx_ring[idxd->batch_idx_read];

		ret = RTE_MIN((uint16_t)(idxd->ids_avail - idxd->ids_returned), max_ops);
		idxd->ids_returned += ret;
		if (status)
			memset(status, RTE_DMA_STATUS_SUCCESSFUL, ret * sizeof(*status));
		return ret;
	}
	/* check if batch is incomplete */
	else if (bstatus == IDXD_COMP_STATUS_INCOMPLETE)
		return 0;

	return -1; /* error case */
}

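/* Used by idxd_completed(): on a batch error, walk the individual completion
 * records (written into the descriptor ring slots) to return the jobs that did
 * succeed, then rewrite the batch ring entries so the failure is reported first
 * on the next call and must be cleared via completed_status().
 */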
__use_avx2
static inline uint16_t
batch_completed(struct idxd_dmadev *idxd, uint16_t max_ops, bool *has_error)
{
	uint16_t i;
	uint16_t b_start, b_end, next_batch;

	int ret = batch_ok(idxd, max_ops, NULL);
	if (ret >= 0)
		return ret;

	/* ERROR case, not successful, not incomplete */
	/* Get the batch size, and special case size 1.
	 * Once we identify the actual failure job, return other jobs, then update
	 * the batch ring indexes to make it look like the first job of the batch has failed.
	 * Subsequent calls here will always return zero packets, and the error must be cleared by
	 * calling the completed_status() function.
	 */
	next_batch = (idxd->batch_idx_read + 1);
	if (next_batch > idxd->max_batches)
		next_batch = 0;
	b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
	b_end = idxd->batch_idx_ring[next_batch];

	if (b_end - b_start == 1) { /* not a batch */
		*has_error = true;
		return 0;
	}

	for (i = b_start; i < b_end; i++) {
		struct idxd_completion *c = (void *)&idxd->desc_ring[i & idxd->desc_ring_mask];
		if (c->status > IDXD_COMP_STATUS_SUCCESS) /* ignore incomplete(0) and success(1) */
			break;
	}
	ret = RTE_MIN((uint16_t)(i - idxd->ids_returned), max_ops);
	if (ret < max_ops)
		*has_error = true; /* we got up to the point of error */
	idxd->ids_avail = idxd->ids_returned += ret;

	/* to ensure we can call twice and just return 0, set start of batch to where we finished */
	idxd->batch_comp_ring[idxd->batch_idx_read].completed_size -= ret;
	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
	if (idxd->batch_idx_ring[next_batch] - idxd->batch_idx_ring[idxd->batch_idx_read] == 1) {
		/* copy over the descriptor status to the batch ring as if no batch */
		uint16_t d_idx = idxd->batch_idx_ring[idxd->batch_idx_read] & idxd->desc_ring_mask;
		struct idxd_completion *desc_comp = (void *)&idxd->desc_ring[d_idx];
		idxd->batch_comp_ring[idxd->batch_idx_read].status = desc_comp->status;
	}

	return ret;
}

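/* Used by idxd_completed_status(): like batch_completed(), but fills in a
 * per-job status array, counting errors and not-attempted jobs, and trims the
 * batch bookkeeping when max_ops is smaller than the failed batch.
 */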
299 
300 __use_avx2
301 static uint16_t
302 batch_completed_status(struct idxd_dmadev *idxd, uint16_t max_ops, enum rte_dma_status_code *status)
303 {
304 	uint16_t next_batch;
305 
306 	int ret = batch_ok(idxd, max_ops, status);
307 	if (ret >= 0)
308 		return ret;
309 
310 	/* ERROR case, not successful, not incomplete */
311 	/* Get the batch size, and special case size 1.
312 	 */
313 	next_batch = (idxd->batch_idx_read + 1);
314 	if (next_batch > idxd->max_batches)
315 		next_batch = 0;
316 	const uint16_t b_start = idxd->batch_idx_ring[idxd->batch_idx_read];
317 	const uint16_t b_end = idxd->batch_idx_ring[next_batch];
318 	const uint16_t b_len = b_end - b_start;
319 	if (b_len == 1) {/* not a batch */
320 		*status = get_comp_status(&idxd->batch_comp_ring[idxd->batch_idx_read]);
321 		if (status != RTE_DMA_STATUS_SUCCESSFUL)
322 			idxd->stats.errors++;
323 		idxd->ids_avail++;
324 		idxd->ids_returned++;
325 		idxd->batch_idx_read = next_batch;
326 		return 1;
327 	}
328 
329 	/* not a single-element batch, need to process more.
330 	 * Scenarios:
331 	 * 1. max_ops >= batch_size - can fit everything, simple case
332 	 *   - loop through completed ops and then add on any not-attempted ones
333 	 * 2. max_ops < batch_size - can't fit everything, more complex case
334 	 *   - loop through completed/incomplete and stop when hit max_ops
335 	 *   - adjust the batch descriptor to update where we stopped, with appropriate bcount
336 	 *   - if bcount is to be exactly 1, update the batch descriptor as it will be treated as
337 	 *     non-batch next time.
338 	 */
339 	const uint16_t bcount = idxd->batch_comp_ring[idxd->batch_idx_read].completed_size;
340 	for (ret = 0; ret < b_len && ret < max_ops; ret++) {
341 		struct idxd_completion *c = (void *)
342 				&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
343 		status[ret] = (ret < bcount) ? get_comp_status(c) : RTE_DMA_STATUS_NOT_ATTEMPTED;
344 		if (status[ret] != RTE_DMA_STATUS_SUCCESSFUL)
345 			idxd->stats.errors++;
346 	}
347 	idxd->ids_avail = idxd->ids_returned += ret;
348 
349 	/* everything fit */
350 	if (ret == b_len) {
351 		idxd->batch_idx_read = next_batch;
352 		return ret;
353 	}
354 
355 	/* set up for next time, update existing batch descriptor & start idx at batch_idx_read */
356 	idxd->batch_idx_ring[idxd->batch_idx_read] += ret;
357 	if (ret > bcount) {
358 		/* we have only incomplete ones - set batch completed size to 0 */
359 		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
360 		comp->completed_size = 0;
361 		/* if there is only one descriptor left, job skipped so set flag appropriately */
362 		if (b_len - ret == 1)
363 			comp->status = IDXD_COMP_STATUS_SKIPPED;
364 	} else {
365 		struct idxd_completion *comp = &idxd->batch_comp_ring[idxd->batch_idx_read];
366 		comp->completed_size -= ret;
367 		/* if there is only one descriptor left, copy status info straight to desc */
368 		if (comp->completed_size == 1) {
369 			struct idxd_completion *c = (void *)
370 					&idxd->desc_ring[(b_start + ret) & idxd->desc_ring_mask];
371 			comp->status = c->status;
372 			/* individual descs can be ok without writeback, but not batches */
373 			if (comp->status == IDXD_COMP_STATUS_INCOMPLETE)
374 				comp->status = IDXD_COMP_STATUS_SUCCESS;
375 		} else if (bcount == b_len) {
376 			/* check if we still have an error, and clear flag if not */
377 			uint16_t i;
378 			for (i = b_start + ret; i < b_end; i++) {
379 				struct idxd_completion *c = (void *)
380 						&idxd->desc_ring[i & idxd->desc_ring_mask];
381 				if (c->status > IDXD_COMP_STATUS_SUCCESS)
382 					break;
383 			}
384 			if (i == b_end) /* no errors */
385 				comp->status = IDXD_COMP_STATUS_SUCCESS;
386 		}
387 	}
388 
389 	return ret;
390 }
391 
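/* Completion-gathering callbacks behind rte_dma_completed() and
 * rte_dma_completed_status(). Both loop over whole batches until max_ops is
 * reached, no more batches are complete, or (for the plain variant) an error is
 * encountered. A rough application-level sketch, with dev_id/BURST/statuses as
 * placeholder names:
 *
 *	bool error = false;
 *	uint16_t last_idx;
 *	uint16_t n = rte_dma_completed(dev_id, 0, BURST, &last_idx, &error);
 *	if (error)
 *		n += rte_dma_completed_status(dev_id, 0, BURST - n, &last_idx,
 *				&statuses[n]);
 */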
__use_avx2
uint16_t
idxd_completed(void *dev_private, uint16_t qid __rte_unused, uint16_t max_ops,
		uint16_t *last_idx, bool *has_error)
{
	struct idxd_dmadev *idxd = dev_private;
	uint16_t batch, ret = 0;

	do {
		batch = batch_completed(idxd, max_ops - ret, has_error);
		ret += batch;
	} while (batch > 0 && *has_error == false);

	idxd->stats.completed += ret;
	*last_idx = idxd->ids_returned - 1;
	return ret;
}

__use_avx2
uint16_t
idxd_completed_status(void *dev_private, uint16_t qid __rte_unused, uint16_t max_ops,
		uint16_t *last_idx, enum rte_dma_status_code *status)
{
	struct idxd_dmadev *idxd = dev_private;
	uint16_t batch, ret = 0;

	do {
		batch = batch_completed_status(idxd, max_ops - ret, &status[ret]);
		ret += batch;
	} while (batch > 0);

	idxd->stats.completed += ret;
	*last_idx = idxd->ids_returned - 1;
	return ret;
}

int
idxd_dump(const struct rte_dma_dev *dev, FILE *f)
{
	struct idxd_dmadev *idxd = dev->fp_obj->dev_private;
	unsigned int i;

	fprintf(f, "== IDXD Private Data ==\n");
	fprintf(f, "  Portal: %p\n", idxd->portal);
	fprintf(f, "  Config: { ring_size: %u }\n",
			idxd->qcfg.nb_desc);
	fprintf(f, "  Batch ring (sz = %u, max_batches = %u):\n\t",
			idxd->max_batches + 1, idxd->max_batches);
	for (i = 0; i <= idxd->max_batches; i++) {
		fprintf(f, " %u ", idxd->batch_idx_ring[i]);
		if (i == idxd->batch_idx_read && i == idxd->batch_idx_write)
			fprintf(f, "[rd ptr, wr ptr] ");
		else if (i == idxd->batch_idx_read)
			fprintf(f, "[rd ptr] ");
		else if (i == idxd->batch_idx_write)
			fprintf(f, "[wr ptr] ");
		if (i == idxd->max_batches)
			fprintf(f, "\n");
	}

	fprintf(f, "  Curr batch: start = %u, size = %u\n", idxd->batch_start, idxd->batch_size);
	fprintf(f, "  IDS: avail = %u, returned: %u\n", idxd->ids_avail, idxd->ids_returned);
	return 0;
}

int
idxd_stats_get(const struct rte_dma_dev *dev, uint16_t vchan __rte_unused,
		struct rte_dma_stats *stats, uint32_t stats_sz)
{
	struct idxd_dmadev *idxd = dev->fp_obj->dev_private;
	if (stats_sz < sizeof(*stats))
		return -EINVAL;
	*stats = idxd->stats;
	return 0;
}

int
idxd_stats_reset(struct rte_dma_dev *dev, uint16_t vchan __rte_unused)
{
	struct idxd_dmadev *idxd = dev->fp_obj->dev_private;
	idxd->stats = (struct rte_dma_stats){0};
	return 0;
}

int
idxd_info_get(const struct rte_dma_dev *dev, struct rte_dma_info *info, uint32_t size)
{
	struct idxd_dmadev *idxd = dev->fp_obj->dev_private;

	if (size < sizeof(*info))
		return -EINVAL;

	*info = (struct rte_dma_info) {
			.dev_capa = RTE_DMA_CAPA_MEM_TO_MEM | RTE_DMA_CAPA_HANDLES_ERRORS |
				RTE_DMA_CAPA_OPS_COPY | RTE_DMA_CAPA_OPS_FILL,
			.max_vchans = 1,
			.max_desc = 4096,
			.min_desc = 64,
	};
	if (idxd->sva_support)
		info->dev_capa |= RTE_DMA_CAPA_SVA;
	return 0;
}

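/* Report how many more operations can be enqueued before enqueue would fail:
 * limited by free descriptor-ring slots and by the remaining room in the
 * current batch (max_batch_size), and zero if the batch index ring is full.
 */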
uint16_t
idxd_burst_capacity(const void *dev_private, uint16_t vchan __rte_unused)
{
	const struct idxd_dmadev *idxd = dev_private;
	uint16_t write_idx = idxd->batch_start + idxd->batch_size;
	uint16_t used_space;

	/* Check for space in the batch ring */
	if ((idxd->batch_idx_read == 0 && idxd->batch_idx_write == idxd->max_batches) ||
			idxd->batch_idx_write + 1 == idxd->batch_idx_read)
		return 0;

	/* Subtract and mask to get in correct range */
	used_space = (write_idx - idxd->ids_returned) & idxd->desc_ring_mask;

	const int ret = RTE_MIN((idxd->desc_ring_mask - used_space),
			(idxd->max_batch_size - idxd->batch_size));
	return ret < 0 ? 0 : (uint16_t)ret;
}

int
idxd_configure(struct rte_dma_dev *dev __rte_unused, const struct rte_dma_conf *dev_conf,
		uint32_t conf_sz)
{
	if (sizeof(struct rte_dma_conf) != conf_sz)
		return -EINVAL;

	if (dev_conf->nb_vchans != 1)
		return -EINVAL;
	return 0;
}

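/* Set up the single virtual channel: nb_desc is rounded up to a power of two
 * (e.g. a request for 3000 descriptors gives 4096, i.e. a ring mask of 0xfff),
 * and the descriptor ring is allocated at twice that size since batches are
 * written contiguously and never wrap.
 */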
int
idxd_vchan_setup(struct rte_dma_dev *dev, uint16_t vchan __rte_unused,
		const struct rte_dma_vchan_conf *qconf, uint32_t qconf_sz)
{
	struct idxd_dmadev *idxd = dev->fp_obj->dev_private;
	uint16_t max_desc = qconf->nb_desc;

	if (sizeof(struct rte_dma_vchan_conf) != qconf_sz)
		return -EINVAL;

	idxd->qcfg = *qconf;

	if (!rte_is_power_of_2(max_desc))
		max_desc = rte_align32pow2(max_desc);
	IDXD_PMD_DEBUG("DMA dev %u using %u descriptors", dev->data->dev_id, max_desc);
	idxd->desc_ring_mask = max_desc - 1;
	idxd->qcfg.nb_desc = max_desc;

	/* in case we are reconfiguring a device, free any existing memory */
	rte_free(idxd->desc_ring);

	/* allocate the descriptor ring at 2x size as batches can't wrap */
	idxd->desc_ring = rte_zmalloc(NULL, sizeof(*idxd->desc_ring) * max_desc * 2, 0);
	if (idxd->desc_ring == NULL)
		return -ENOMEM;
	idxd->desc_iova = rte_mem_virt2iova(idxd->desc_ring);

	idxd->batch_idx_read = 0;
	idxd->batch_idx_write = 0;
	idxd->batch_start = 0;
	idxd->batch_size = 0;
	idxd->ids_returned = 0;
	idxd->ids_avail = 0;

	memset(idxd->batch_comp_ring, 0, sizeof(*idxd->batch_comp_ring) *
			(idxd->max_batches + 1));
	return 0;
}

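/* Create and register a dmadev instance for one DSA work queue, wiring up the
 * fast-path function pointers and, in the primary process, copying the
 * bus-specific fields from base_idxd and allocating the batch index/completion
 * rings. A rough sketch of a caller from a bus probe routine (names and values
 * are illustrative only):
 *
 *	struct idxd_dmadev base = *configured_idxd;   // portal, max_batches, etc.
 *	ret = idxd_dmadev_create(qname, &pci_dev->device, &base, &idxd_ops);
 *	if (ret)
 *		IDXD_PMD_ERR("Failed to create dmadev %s", qname);
 */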
int
idxd_dmadev_create(const char *name, struct rte_device *dev,
		   const struct idxd_dmadev *base_idxd,
		   const struct rte_dma_dev_ops *ops)
{
	struct idxd_dmadev *idxd = NULL;
	struct rte_dma_dev *dmadev = NULL;
	int ret = 0;

	RTE_BUILD_BUG_ON(sizeof(struct idxd_hw_desc) != 64);
	RTE_BUILD_BUG_ON(offsetof(struct idxd_hw_desc, size) != 32);
	RTE_BUILD_BUG_ON(sizeof(struct idxd_completion) != 32);

	if (!name) {
		IDXD_PMD_ERR("Invalid name of the device!");
		ret = -EINVAL;
		goto cleanup;
	}

	/* Allocate device structure */
	dmadev = rte_dma_pmd_allocate(name, dev->numa_node, sizeof(struct idxd_dmadev));
	if (dmadev == NULL) {
		IDXD_PMD_ERR("Unable to allocate dma device");
		ret = -ENOMEM;
		goto cleanup;
	}
	dmadev->dev_ops = ops;
	dmadev->device = dev;

	dmadev->fp_obj->copy = idxd_enqueue_copy;
	dmadev->fp_obj->fill = idxd_enqueue_fill;
	dmadev->fp_obj->submit = idxd_submit;
	dmadev->fp_obj->completed = idxd_completed;
	dmadev->fp_obj->completed_status = idxd_completed_status;
	dmadev->fp_obj->burst_capacity = idxd_burst_capacity;
	dmadev->fp_obj->dev_private = dmadev->data->dev_private;

	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;

	idxd = dmadev->data->dev_private;
	*idxd = *base_idxd; /* copy over the main fields already passed in */
	idxd->dmadev = dmadev;

	/* allocate batch index ring and completion ring.
	 * The +1 is because we can never fully use
	 * the ring, otherwise read == write means both full and empty.
	 */
	idxd->batch_comp_ring = rte_zmalloc_socket(NULL, (sizeof(idxd->batch_idx_ring[0]) +
			sizeof(idxd->batch_comp_ring[0])) * (idxd->max_batches + 1),
			sizeof(idxd->batch_comp_ring[0]), dev->numa_node);
	if (idxd->batch_comp_ring == NULL) {
		IDXD_PMD_ERR("Unable to reserve memory for batch data");
		ret = -ENOMEM;
		goto cleanup;
	}
	idxd->batch_idx_ring = (void *)&idxd->batch_comp_ring[idxd->max_batches + 1];
	idxd->batch_iova = rte_mem_virt2iova(idxd->batch_comp_ring);

	idxd->dmadev->state = RTE_DMA_DEV_READY;

	return 0;

cleanup:
	if (dmadev)
		rte_dma_pmd_release(name);

	return ret;
}

int idxd_pmd_logtype;

RTE_LOG_REGISTER_DEFAULT(idxd_pmd_logtype, WARNING);