/* xref: /spdk/lib/idxd/idxd.c (revision 16d862d0380886f6fc765f68a87e240bb4295595) */
/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2020 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/memory.h"
#include "spdk/likely.h"

#include "spdk/log.h"
#include "spdk_internal/idxd.h"

#include "idxd_internal.h"

#define ALIGN_4K 0x1000
#define USERSPACE_DRIVER_NAME "user"
#define KERNEL_DRIVER_NAME "kernel"

/* The max number of completions processed per poll */
#define IDXD_MAX_COMPLETIONS      128

/* The minimum number of entries a batch must hold before it is flushed */
#define IDXD_MIN_BATCH_FLUSH      32

#define DATA_BLOCK_SIZE_512 512
#define DATA_BLOCK_SIZE_520 520
#define DATA_BLOCK_SIZE_4096 4096
#define DATA_BLOCK_SIZE_4104 4104

#define METADATA_SIZE_8 8
#define METADATA_SIZE_16 16

static STAILQ_HEAD(, spdk_idxd_impl) g_idxd_impls = STAILQ_HEAD_INITIALIZER(g_idxd_impls);
static struct spdk_idxd_impl *g_idxd_impl;

uint32_t
spdk_idxd_get_socket(struct spdk_idxd_device *idxd)
{
	return idxd->socket_id;
}

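/*
 * Editorial note (not part of the original source): descriptors are written to
 * the device portal with MOVDIR64B, and each write advances portal_offset by
 * chan_per_device * PORTAL_STRIDE so that channels sharing a device land on
 * disjoint portal slots. As a hypothetical example, assuming PORTAL_STRIDE is
 * 0x40 (one 64-byte descriptor) and two channels share the device, channel 0
 * cycles through offsets 0x00, 0x80, 0x100, ... and channel 1 through 0x40,
 * 0xc0, 0x140, ..., both wrapping at PORTAL_MASK.
 */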
static inline void
_submit_to_hw(struct spdk_idxd_io_channel *chan, struct idxd_ops *op)
{
	STAILQ_INSERT_TAIL(&chan->ops_outstanding, op, link);
	/*
	 * We must barrier before writing the descriptor to ensure that data
	 * has been correctly flushed from the associated data buffers before DMA
	 * operations begin.
	 */
	_spdk_wmb();
	movdir64b(chan->portal + chan->portal_offset, op->desc);
	chan->portal_offset = (chan->portal_offset + chan->idxd->chan_per_device * PORTAL_STRIDE) &
			      PORTAL_MASK;
}

static inline int
_vtophys(struct spdk_idxd_io_channel *chan, const void *buf, uint64_t *buf_addr, uint64_t size)
{
	uint64_t updated_size = size;

	if (chan->pasid_enabled) {
		/* We can just use virtual addresses */
		*buf_addr = (uint64_t)buf;
		return 0;
	}

	*buf_addr = spdk_vtophys(buf, &updated_size);

	if (*buf_addr == SPDK_VTOPHYS_ERROR) {
		SPDK_ERRLOG("Error translating address\n");
		return -EINVAL;
	}

	if (updated_size < size) {
		SPDK_ERRLOG("Error translating size (0x%lx), translated size (0x%lx)\n", size, updated_size);
		return -EINVAL;
	}

	return 0;
}

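/*
 * Editorial sketch: idxd_vtophys_iter walks a virtually contiguous src/dst
 * pair and, on each idxd_vtophys_iter_next() call, yields the largest run
 * that is physically contiguous in both buffers. As a hypothetical example
 * with 2 MiB hugepages, a 3 MiB copy whose source crosses a hugepage boundary
 * after 1 MiB yields a 1 MiB segment first and the remainder afterwards, so
 * one logical operation may fan out into several hardware descriptors.
 */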
struct idxd_vtophys_iter {
	const void	*src;
	void		*dst;
	uint64_t	len;

	uint64_t	offset;

	bool		pasid_enabled;
};

static void
idxd_vtophys_iter_init(struct spdk_idxd_io_channel *chan,
		       struct idxd_vtophys_iter *iter,
		       const void *src, void *dst, uint64_t len)
{
	iter->src = src;
	iter->dst = dst;
	iter->len = len;
	iter->offset = 0;
	iter->pasid_enabled = chan->pasid_enabled;
}

static uint64_t
idxd_vtophys_iter_next(struct idxd_vtophys_iter *iter,
		       uint64_t *src_phys, uint64_t *dst_phys)
{
	uint64_t src_off, dst_off, len;
	const void *src;
	void *dst;

	src = iter->src + iter->offset;
	dst = iter->dst + iter->offset;

	if (iter->offset == iter->len) {
		return 0;
	}

	if (iter->pasid_enabled) {
		*src_phys = (uint64_t)src;
		*dst_phys = (uint64_t)dst;
		return iter->len;
	}

	len = iter->len - iter->offset;

	src_off = len;
	*src_phys = spdk_vtophys(src, &src_off);
	if (*src_phys == SPDK_VTOPHYS_ERROR) {
		SPDK_ERRLOG("Error translating address\n");
		return SPDK_VTOPHYS_ERROR;
	}

	dst_off = len;
	*dst_phys = spdk_vtophys(dst, &dst_off);
	if (*dst_phys == SPDK_VTOPHYS_ERROR) {
		SPDK_ERRLOG("Error translating address\n");
		return SPDK_VTOPHYS_ERROR;
	}

	len = spdk_min(src_off, dst_off);
	iter->offset += len;

	return len;
}

/* helper function for DSA specific spdk_idxd_get_channel() stuff */
static int
_dsa_alloc_batches(struct spdk_idxd_io_channel *chan, int num_descriptors)
{
	struct idxd_batch *batch;
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	int i, j, num_batches, rc = -1;

	/* Allocate batches */
	num_batches = num_descriptors;
	chan->batch_base = calloc(num_batches, sizeof(struct idxd_batch));
	if (chan->batch_base == NULL) {
		SPDK_ERRLOG("Failed to allocate batch pool\n");
		return -ENOMEM;
	}
	batch = chan->batch_base;
	for (i = 0 ; i < num_batches ; i++) {
		batch->size = chan->idxd->batch_size;
		batch->user_desc = desc = spdk_zmalloc(batch->size * sizeof(struct idxd_hw_desc),
						       0x40, NULL,
						       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (batch->user_desc == NULL) {
			SPDK_ERRLOG("Failed to allocate batch descriptor memory\n");
			rc = -ENOMEM;
			goto error_user;
		}

		rc = _vtophys(chan, batch->user_desc, &batch->user_desc_addr,
			      batch->size * sizeof(struct idxd_hw_desc));
		if (rc) {
			SPDK_ERRLOG("Failed to translate batch descriptor memory\n");
			goto error_user;
		}

		batch->user_ops = op = spdk_zmalloc(batch->size * sizeof(struct idxd_ops),
						    0x40, NULL,
						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (batch->user_ops == NULL) {
			SPDK_ERRLOG("Failed to allocate user completion memory\n");
			rc = -ENOMEM;
			goto error_user;
		}

		for (j = 0; j < batch->size; j++) {
			rc = _vtophys(chan, &op->hw, &desc->completion_addr, sizeof(struct dsa_hw_comp_record));
			if (rc) {
				SPDK_ERRLOG("Failed to translate batch entry completion memory\n");
				goto error_user;
			}
			op++;
			desc++;
		}
		TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
		batch++;
	}
	return 0;

error_user:
	/* Free the buffers of the batch that failed mid-setup (it was never
	 * inserted into the pool), then those of every batch that was. */
	spdk_free(batch->user_ops);
	batch->user_ops = NULL;
	spdk_free(batch->user_desc);
	batch->user_desc = NULL;
	TAILQ_FOREACH(batch, &chan->batch_pool, link) {
		spdk_free(batch->user_ops);
		batch->user_ops = NULL;
		spdk_free(batch->user_desc);
		batch->user_desc = NULL;
	}
	return rc;
}

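/*
 * Usage sketch (illustrative only; error handling elided, done_cb and ctx are
 * hypothetical application callback/context names):
 *
 *	struct spdk_idxd_io_channel *chan = spdk_idxd_get_channel(idxd);
 *
 *	rc = spdk_idxd_submit_copy(chan, &diov, 1, &siov, 1, 0, done_cb, ctx);
 *	...
 *	spdk_idxd_process_events(chan);	// called from the owning thread's poller
 *	...
 *	spdk_idxd_put_channel(chan);
 */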
struct spdk_idxd_io_channel *
spdk_idxd_get_channel(struct spdk_idxd_device *idxd)
{
	struct spdk_idxd_io_channel *chan;
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	struct idxd_batch *batch;
	int i, num_descriptors, rc = -1;
	uint32_t comp_rec_size;

	assert(idxd != NULL);

	chan = calloc(1, sizeof(struct spdk_idxd_io_channel));
	if (chan == NULL) {
		SPDK_ERRLOG("Failed to allocate idxd chan\n");
		return NULL;
	}

	chan->idxd = idxd;
	chan->pasid_enabled = idxd->pasid_enabled;
	STAILQ_INIT(&chan->ops_pool);
	TAILQ_INIT(&chan->batch_pool);
	STAILQ_INIT(&chan->ops_outstanding);

	/* Assign WQ, portal */
	pthread_mutex_lock(&idxd->num_channels_lock);
	if (idxd->num_channels == idxd->chan_per_device) {
		/* too many channels sharing this device */
		pthread_mutex_unlock(&idxd->num_channels_lock);
		SPDK_ERRLOG("Too many channels sharing this device\n");
		goto error;
	}

	/* Have each channel start at a different offset. */
	chan->portal = idxd->impl->portal_get_addr(idxd);
	chan->portal_offset = (idxd->num_channels * PORTAL_STRIDE) & PORTAL_MASK;
	idxd->num_channels++;

	pthread_mutex_unlock(&idxd->num_channels_lock);

	/* Allocate descriptors and completions */
	num_descriptors = idxd->total_wq_size / idxd->chan_per_device;
	chan->desc_base = desc = spdk_zmalloc(num_descriptors * sizeof(struct idxd_hw_desc),
					      0x40, NULL,
					      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->desc_base == NULL) {
		SPDK_ERRLOG("Failed to allocate DSA descriptor memory\n");
		goto error;
	}

	chan->ops_base = op = spdk_zmalloc(num_descriptors * sizeof(struct idxd_ops),
					   0x40, NULL,
					   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->ops_base == NULL) {
		SPDK_ERRLOG("Failed to allocate idxd_ops memory\n");
		goto error;
	}

	if (idxd->type == IDXD_DEV_TYPE_DSA) {
		comp_rec_size = sizeof(struct dsa_hw_comp_record);
		if (_dsa_alloc_batches(chan, num_descriptors)) {
			goto error;
		}
	} else {
		comp_rec_size = sizeof(struct iaa_hw_comp_record);
	}

	for (i = 0; i < num_descriptors; i++) {
		STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
		op->desc = desc;
		rc = _vtophys(chan, &op->hw, &desc->completion_addr, comp_rec_size);
		if (rc) {
			SPDK_ERRLOG("Failed to translate completion memory\n");
			goto error;
		}
		op++;
		desc++;
	}

	return chan;

error:
	/* Release any batch resources allocated by _dsa_alloc_batches(). */
	while ((batch = TAILQ_FIRST(&chan->batch_pool))) {
		TAILQ_REMOVE(&chan->batch_pool, batch, link);
		spdk_free(batch->user_ops);
		spdk_free(batch->user_desc);
	}
	free(chan->batch_base);
	chan->batch_base = NULL;
	spdk_free(chan->ops_base);
	chan->ops_base = NULL;
	spdk_free(chan->desc_base);
	chan->desc_base = NULL;
	free(chan);
	return NULL;
}

static int idxd_batch_cancel(struct spdk_idxd_io_channel *chan, int status);

void
spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch;

	assert(chan != NULL);
	assert(chan->idxd != NULL);

	if (chan->batch) {
		idxd_batch_cancel(chan, -ECANCELED);
	}

	pthread_mutex_lock(&chan->idxd->num_channels_lock);
	assert(chan->idxd->num_channels > 0);
	chan->idxd->num_channels--;
	pthread_mutex_unlock(&chan->idxd->num_channels_lock);

	spdk_free(chan->ops_base);
	spdk_free(chan->desc_base);
	while ((batch = TAILQ_FIRST(&chan->batch_pool))) {
		TAILQ_REMOVE(&chan->batch_pool, batch, link);
		spdk_free(batch->user_ops);
		spdk_free(batch->user_desc);
	}
	free(chan->batch_base);
	free(chan);
}

static inline struct spdk_idxd_impl *
idxd_get_impl_by_name(const char *impl_name)
{
	struct spdk_idxd_impl *impl;

	assert(impl_name != NULL);
	STAILQ_FOREACH(impl, &g_idxd_impls, link) {
		if (0 == strcmp(impl_name, impl->name)) {
			return impl;
		}
	}

	return NULL;
}

int
spdk_idxd_set_config(bool kernel_mode)
{
	struct spdk_idxd_impl *tmp;

	if (kernel_mode) {
		tmp = idxd_get_impl_by_name(KERNEL_DRIVER_NAME);
	} else {
		tmp = idxd_get_impl_by_name(USERSPACE_DRIVER_NAME);
	}

	if (g_idxd_impl != NULL && g_idxd_impl != tmp) {
		SPDK_ERRLOG("Cannot change idxd implementation after devices are initialized\n");
		assert(false);
		return -EALREADY;
	}
	g_idxd_impl = tmp;

	if (g_idxd_impl == NULL) {
		SPDK_ERRLOG("Cannot set the idxd implementation with %s mode\n",
			    kernel_mode ? KERNEL_DRIVER_NAME : USERSPACE_DRIVER_NAME);
		return -EINVAL;
	}

	return 0;
}

static void
idxd_device_destruct(struct spdk_idxd_device *idxd)
{
	assert(idxd->impl != NULL);

	idxd->impl->destruct(idxd);
}

int
spdk_idxd_probe(void *cb_ctx, spdk_idxd_attach_cb attach_cb,
		spdk_idxd_probe_cb probe_cb)
{
	if (g_idxd_impl == NULL) {
		SPDK_ERRLOG("No idxd impl is selected\n");
		return -1;
	}

	return g_idxd_impl->probe(cb_ctx, attach_cb, probe_cb);
}

void
spdk_idxd_detach(struct spdk_idxd_device *idxd)
{
	assert(idxd != NULL);
	idxd_device_destruct(idxd);
}

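/*
 * Flow-control note (editorial): both descriptor preparation paths below
 * return -EBUSY when the per-channel pools are exhausted rather than queueing
 * internally. The caller is expected to back off and resubmit, e.g. (sketch):
 *
 *	rc = spdk_idxd_submit_fill(chan, &diov, 1, pattern, 0, done_cb, ctx);
 *	if (rc == -EBUSY) {
 *		// retry after spdk_idxd_process_events() returns some ops
 *	}
 */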
static int
_idxd_prep_command(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn, void *cb_arg,
		   int flags, struct idxd_hw_desc **_desc, struct idxd_ops **_op)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t comp_addr;

	if (!STAILQ_EMPTY(&chan->ops_pool)) {
		op = *_op = STAILQ_FIRST(&chan->ops_pool);
		desc = *_desc = op->desc;
		comp_addr = desc->completion_addr;
		memset(desc, 0, sizeof(*desc));
		desc->completion_addr = comp_addr;
		STAILQ_REMOVE_HEAD(&chan->ops_pool, link);
	} else {
		/* The application needs to handle this, violation of flow control */
		return -EBUSY;
	}

	flags |= IDXD_FLAG_COMPLETION_ADDR_VALID;
	flags |= IDXD_FLAG_REQUEST_COMPLETION;

	desc->flags = flags;
	op->cb_arg = cb_arg;
	op->cb_fn = cb_fn;
	op->batch = NULL;
	op->parent = NULL;
	op->count = 1;

	return 0;
}

static int
_idxd_prep_batch_cmd(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn,
		     void *cb_arg, int flags,
		     struct idxd_hw_desc **_desc, struct idxd_ops **_op)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t comp_addr;
	struct idxd_batch *batch;

	batch = chan->batch;

	assert(batch != NULL);
	if (batch->index == batch->size) {
		return -EBUSY;
	}

	desc = *_desc = &batch->user_desc[batch->index];
	op = *_op = &batch->user_ops[batch->index];

	op->desc = desc;
	SPDK_DEBUGLOG(idxd, "Prep batch %p index %u\n", batch, batch->index);

	batch->index++;

	comp_addr = desc->completion_addr;
	memset(desc, 0, sizeof(*desc));
	desc->completion_addr = comp_addr;
	flags |= IDXD_FLAG_COMPLETION_ADDR_VALID;
	flags |= IDXD_FLAG_REQUEST_COMPLETION;
	desc->flags = flags;
	op->cb_arg = cb_arg;
	op->cb_fn = cb_fn;
	op->batch = batch;
	op->parent = NULL;
	op->count = 1;
	op->crc_dst = NULL;

	return 0;
}

static struct idxd_batch *
idxd_batch_create(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch;

	assert(chan != NULL);
	assert(chan->batch == NULL);

	if (!TAILQ_EMPTY(&chan->batch_pool)) {
		batch = TAILQ_FIRST(&chan->batch_pool);
		batch->index = 0;
		batch->chan = chan;
		chan->batch = batch;
		TAILQ_REMOVE(&chan->batch_pool, batch, link);
	} else {
		/* The application needs to handle this. */
		return NULL;
	}

	return batch;
}

static void
_free_batch(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan)
{
	SPDK_DEBUGLOG(idxd, "Free batch %p\n", batch);
	assert(batch->refcnt == 0);
	batch->index = 0;
	batch->chan = NULL;
	TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
}

static int
idxd_batch_cancel(struct spdk_idxd_io_channel *chan, int status)
{
	struct idxd_ops *op;
	struct idxd_batch *batch;
	int i;

	assert(chan != NULL);

	batch = chan->batch;
	assert(batch != NULL);

	if (batch->index == UINT16_MAX) {
		SPDK_ERRLOG("Cannot cancel batch, already submitted to HW.\n");
		return -EINVAL;
	}

	chan->batch = NULL;

	for (i = 0; i < batch->index; i++) {
		op = &batch->user_ops[i];
		if (op->cb_fn) {
			op->cb_fn(op->cb_arg, status);
		}
	}

	_free_batch(batch, chan);

	return 0;
}

static int
idxd_batch_submit(struct spdk_idxd_io_channel *chan,
		  spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_batch *batch;
	struct idxd_ops *op;
	int i, rc, flags = 0;

	assert(chan != NULL);

	batch = chan->batch;
	assert(batch != NULL);

	if (batch->index == 0) {
		return idxd_batch_cancel(chan, 0);
	}

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	if (batch->index == 1) {
		uint64_t completion_addr;

		/* If there's only one command, convert it away from a batch. */
		completion_addr = desc->completion_addr;
		memcpy(desc, &batch->user_desc[0], sizeof(*desc));
		desc->completion_addr = completion_addr;
		op->cb_fn = batch->user_ops[0].cb_fn;
		op->cb_arg = batch->user_ops[0].cb_arg;
		op->crc_dst = batch->user_ops[0].crc_dst;
		_free_batch(batch, chan);
	} else {
		/* Command specific. */
		desc->opcode = IDXD_OPCODE_BATCH;
		desc->desc_list_addr = batch->user_desc_addr;
		desc->desc_count = batch->index;
		assert(batch->index <= batch->size);

		/* Add the batch elements completion contexts to the outstanding list to be polled. */
		for (i = 0 ; i < batch->index; i++) {
			batch->refcnt++;
			STAILQ_INSERT_TAIL(&chan->ops_outstanding, &batch->user_ops[i],
					   link);
		}
		batch->index = UINT16_MAX;
	}

	chan->batch = NULL;

	/* Submit operation. */
	_submit_to_hw(chan, op);
	SPDK_DEBUGLOG(idxd, "Submitted batch %p\n", batch);

	return 0;
}

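/*
 * Editorial note: the submission paths below batch transparently.
 * _idxd_setup_batch lazily opens a batch on first use, each
 * spdk_idxd_submit_*() call appends descriptors to it, and _idxd_flush_batch
 * closes it once it holds at least IDXD_MIN_BATCH_FLUSH entries; anything
 * smaller is left open and swept out by the next spdk_idxd_process_events()
 * call.
 */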
static int
_idxd_setup_batch(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch;

	if (chan->batch == NULL) {
		batch = idxd_batch_create(chan);
		if (batch == NULL) {
			return -EBUSY;
		}
	}

	return 0;
}

static int
_idxd_flush_batch(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch = chan->batch;
	int rc;

	if (batch != NULL && batch->index >= IDXD_MIN_BATCH_FLUSH) {
		/* Close out the full batch */
		rc = idxd_batch_submit(chan, NULL, NULL);
		if (rc) {
			assert(rc == -EBUSY);
			/*
			 * Return 0. This will get re-submitted within idxd_process_events where
			 * if it fails, it will get correctly aborted.
			 */
			return 0;
		}
	}

	return 0;
}

static inline void
_update_write_flags(struct spdk_idxd_io_channel *chan, struct idxd_hw_desc *desc)
{
	desc->flags ^= IDXD_FLAG_CACHE_CONTROL;
}

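/*
 * Editorial note: spdk_ioviter_first/next below pair off src and dst iovec
 * elements into common-length runs, and idxd_vtophys_iter then subdivides
 * each run at physical-page discontinuities, so a single call may emit many
 * MEMMOVE descriptors into the open batch. On any failure the 'count'
 * descriptors already claimed are returned to the batch by rewinding
 * batch->index.
 */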
int
spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan,
		      struct iovec *diov, uint32_t diovcnt,
		      struct iovec *siov, uint32_t siovcnt,
		      int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	void *src, *dst;
	uint64_t src_addr, dst_addr;
	int rc, count;
	uint64_t len, seg_len;
	struct spdk_ioviter iter;
	struct idxd_vtophys_iter vtophys_iter;

	assert(chan != NULL);
	assert(diov != NULL);
	assert(siov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	first_op = NULL;
	for (len = spdk_ioviter_first(&iter, siov, siovcnt, diov, diovcnt, &src, &dst);
	     len > 0;
	     len = spdk_ioviter_next(&iter, &src, &dst)) {

		idxd_vtophys_iter_init(chan, &vtophys_iter, src, dst, len);

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src_addr = 0;
			dst_addr = 0;
			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src_addr, &dst_addr);
			if (seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			desc->opcode = IDXD_OPCODE_MEMMOVE;
			desc->src_addr = src_addr;
			desc->dst_addr = dst_addr;
			desc->xfer_size = seg_len;
			_update_write_flags(chan, desc);

			len -= seg_len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

/* Dual-cast copies the same source to two separate destination buffers. */
int
spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan, void *dst1, void *dst2,
			  const void *src, uint64_t nbytes, int flags,
			  spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	uint64_t src_addr, dst1_addr, dst2_addr;
	int rc, count;
	uint64_t len;
	uint64_t outer_seg_len, inner_seg_len;
	struct idxd_vtophys_iter iter_outer, iter_inner;

	assert(chan != NULL);
	assert(dst1 != NULL);
	assert(dst2 != NULL);
	assert(src != NULL);

	if ((uintptr_t)dst1 & (ALIGN_4K - 1) || (uintptr_t)dst2 & (ALIGN_4K - 1)) {
		SPDK_ERRLOG("Dualcast requires 4K alignment on dst addresses\n");
		return -EINVAL;
	}

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	idxd_vtophys_iter_init(chan, &iter_outer, src, dst1, nbytes);

	first_op = NULL;
	count = 0;
	while (nbytes > 0) {
		src_addr = 0;
		dst1_addr = 0;
		outer_seg_len = idxd_vtophys_iter_next(&iter_outer, &src_addr, &dst1_addr);
		if (outer_seg_len == SPDK_VTOPHYS_ERROR) {
			rc = -EFAULT;
			goto error;
		}

		idxd_vtophys_iter_init(chan, &iter_inner, src, dst2, nbytes);

		src += outer_seg_len;
		nbytes -= outer_seg_len;

		while (outer_seg_len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src_addr = 0;
			dst2_addr = 0;
			inner_seg_len = idxd_vtophys_iter_next(&iter_inner, &src_addr, &dst2_addr);
			if (inner_seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			len = spdk_min(outer_seg_len, inner_seg_len);

			/* Command specific. */
			desc->opcode = IDXD_OPCODE_DUALCAST;
			desc->src_addr = src_addr;
			desc->dst_addr = dst1_addr;
			desc->dest2 = dst2_addr;
			desc->xfer_size = len;
			_update_write_flags(chan, desc);

			dst1_addr += len;
			outer_seg_len -= len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

int
spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan,
			 struct iovec *siov1, size_t siov1cnt,
			 struct iovec *siov2, size_t siov2cnt,
			 int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	void *src1, *src2;
	uint64_t src1_addr, src2_addr;
	int rc, count;
	uint64_t len, seg_len;
	struct spdk_ioviter iter;
	struct idxd_vtophys_iter vtophys_iter;

	assert(chan != NULL);
	assert(siov1 != NULL);
	assert(siov2 != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	first_op = NULL;
	for (len = spdk_ioviter_first(&iter, siov1, siov1cnt, siov2, siov2cnt, &src1, &src2);
	     len > 0;
	     len = spdk_ioviter_next(&iter, &src1, &src2)) {

		idxd_vtophys_iter_init(chan, &vtophys_iter, src1, src2, len);

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src1_addr = 0;
			src2_addr = 0;
			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src1_addr, &src2_addr);
			if (seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			desc->opcode = IDXD_OPCODE_COMPARE;
			desc->src_addr = src1_addr;
			desc->src2_addr = src2_addr;
			desc->xfer_size = seg_len;

			len -= seg_len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

int
spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan,
		      struct iovec *diov, size_t diovcnt,
		      uint64_t fill_pattern, int flags,
		      spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	uint64_t dst_addr;
	int rc, count;
	uint64_t len, seg_len;
	void *dst;
	size_t i;

	assert(chan != NULL);
	assert(diov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	first_op = NULL;
	for (i = 0; i < diovcnt; i++) {
		len = diov[i].iov_len;
		dst = diov[i].iov_base;

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			seg_len = len;
			if (chan->pasid_enabled) {
				dst_addr = (uint64_t)dst;
			} else {
				dst_addr = spdk_vtophys(dst, &seg_len);
				if (dst_addr == SPDK_VTOPHYS_ERROR) {
					SPDK_ERRLOG("Error translating address\n");
					rc = -EFAULT;
					goto error;
				}
			}

			seg_len = spdk_min(seg_len, len);

			desc->opcode = IDXD_OPCODE_MEMFILL;
			desc->pattern = fill_pattern;
			desc->dst_addr = dst_addr;
			desc->xfer_size = seg_len;
			_update_write_flags(chan, desc);

			len -= seg_len;
			dst += seg_len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

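/*
 * Editorial note on CRC chaining: only the first descriptor carries the
 * caller's seed; every later descriptor sets IDXD_FLAG_FENCE and
 * IDXD_FLAG_CRC_READ_CRC_SEED and points crc32c.addr at the crc32c_val field
 * of the previous descriptor's completion record, so the hardware serializes
 * the segments and threads the running CRC through them. Only the final op
 * gets crc_dst, and the value is bit-inverted when it is harvested in
 * spdk_idxd_process_events().
 */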
int
spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan,
			struct iovec *siov, size_t siovcnt,
			uint32_t seed, uint32_t *crc_dst, int flags,
			spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	uint64_t src_addr;
	int rc, count;
	uint64_t len, seg_len;
	void *src;
	size_t i;
	uint64_t prev_crc = 0;

	assert(chan != NULL);
	assert(siov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	op = NULL;
	first_op = NULL;
	for (i = 0; i < siovcnt; i++) {
		len = siov[i].iov_len;
		src = siov[i].iov_base;

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			seg_len = len;
			if (chan->pasid_enabled) {
				src_addr = (uint64_t)src;
			} else {
				src_addr = spdk_vtophys(src, &seg_len);
				if (src_addr == SPDK_VTOPHYS_ERROR) {
					SPDK_ERRLOG("Error translating address\n");
					rc = -EFAULT;
					goto error;
				}
			}

			seg_len = spdk_min(seg_len, len);

			desc->opcode = IDXD_OPCODE_CRC32C_GEN;
			desc->src_addr = src_addr;
			if (op == first_op) {
				desc->crc32c.seed = seed;
			} else {
				desc->flags |= IDXD_FLAG_FENCE | IDXD_FLAG_CRC_READ_CRC_SEED;
				desc->crc32c.addr = prev_crc;
			}

			desc->xfer_size = seg_len;
			prev_crc = desc->completion_addr + offsetof(struct dsa_hw_comp_record, crc32c_val);

			len -= seg_len;
			src += seg_len;
		}
	}

	/* Only the last op copies the crc to the destination */
	if (op) {
		op->crc_dst = crc_dst;
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

int
spdk_idxd_submit_copy_crc32c(struct spdk_idxd_io_channel *chan,
			     struct iovec *diov, size_t diovcnt,
			     struct iovec *siov, size_t siovcnt,
			     uint32_t seed, uint32_t *crc_dst, int flags,
			     spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	void *src, *dst;
	uint64_t src_addr, dst_addr;
	int rc, count;
	uint64_t len, seg_len;
	struct spdk_ioviter iter;
	struct idxd_vtophys_iter vtophys_iter;
	uint64_t prev_crc = 0;

	assert(chan != NULL);
	assert(diov != NULL);
	assert(siov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	op = NULL;
	first_op = NULL;
	for (len = spdk_ioviter_first(&iter, siov, siovcnt, diov, diovcnt, &src, &dst);
	     len > 0;
	     len = spdk_ioviter_next(&iter, &src, &dst)) {

		idxd_vtophys_iter_init(chan, &vtophys_iter, src, dst, len);

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src_addr = 0;
			dst_addr = 0;
			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src_addr, &dst_addr);
			if (seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			desc->opcode = IDXD_OPCODE_COPY_CRC;
			desc->dst_addr = dst_addr;
			desc->src_addr = src_addr;
			_update_write_flags(chan, desc);
			if (op == first_op) {
				desc->crc32c.seed = seed;
			} else {
				desc->flags |= IDXD_FLAG_FENCE | IDXD_FLAG_CRC_READ_CRC_SEED;
				desc->crc32c.addr = prev_crc;
			}

			desc->xfer_size = seg_len;
			prev_crc = desc->completion_addr + offsetof(struct dsa_hw_comp_record, crc32c_val);

			len -= seg_len;
		}
	}

	/* Only the last op copies the crc to the destination */
	if (op) {
		op->crc_dst = crc_dst;
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

static inline int
_idxd_submit_compress_single(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
			     uint64_t nbytes_dst, uint64_t nbytes_src, uint32_t *output_size,
			     int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t src_addr, dst_addr;
	int rc;

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	rc = _vtophys(chan, src, &src_addr, nbytes_src);
	if (rc) {
		goto error;
	}

	rc = _vtophys(chan, dst, &dst_addr, nbytes_dst);
	if (rc) {
		goto error;
	}

	/* Command specific. */
	desc->opcode = IDXD_OPCODE_COMPRESS;
	desc->src1_addr = src_addr;
	desc->dst_addr = dst_addr;
	desc->src1_size = nbytes_src;
	desc->iaa.max_dst_size = nbytes_dst;
	desc->iaa.src2_size = sizeof(struct iaa_aecs);
	desc->iaa.src2_addr = chan->idxd->aecs_addr;
	desc->flags |= IAA_FLAG_RD_SRC2_AECS;
	desc->compr_flags = IAA_COMP_FLAGS;
	op->output_size = output_size;

	_submit_to_hw(chan, op);
	return 0;
error:
	STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
	return rc;
}

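/*
 * Usage sketch (illustrative only; assumes an IAA device and elides error
 * handling; done_cb and ctx are hypothetical application names):
 *
 *	struct iovec siov = { .iov_base = src, .iov_len = src_len };
 *	uint32_t out_len;
 *
 *	rc = spdk_idxd_submit_compress(chan, dst, dst_len, &siov, 1, &out_len,
 *				       0, done_cb, ctx);
 *	// out_len is filled in from the completion record when done_cb fires
 */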
int
spdk_idxd_submit_compress(struct spdk_idxd_io_channel *chan,
			  void *dst, uint64_t nbytes,
			  struct iovec *siov, uint32_t siovcnt, uint32_t *output_size,
			  int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	assert(chan != NULL);
	assert(dst != NULL);
	assert(siov != NULL);

	if (siovcnt == 1) {
		/* Simple case - copying one buffer to another */
		if (nbytes < siov[0].iov_len) {
			return -EINVAL;
		}

		return _idxd_submit_compress_single(chan, dst, siov[0].iov_base,
						    nbytes, siov[0].iov_len,
						    output_size, flags, cb_fn, cb_arg);
	}
	/* TODO: vectored support */
	return -EINVAL;
}

static inline int
_idxd_submit_decompress_single(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
			       uint64_t nbytes_dst, uint64_t nbytes, int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t src_addr, dst_addr;
	int rc;

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	rc = _vtophys(chan, src, &src_addr, nbytes);
	if (rc) {
		goto error;
	}

	rc = _vtophys(chan, dst, &dst_addr, nbytes_dst);
	if (rc) {
		goto error;
	}

	/* Command specific. */
	desc->opcode = IDXD_OPCODE_DECOMPRESS;
	desc->src1_addr = src_addr;
	desc->dst_addr = dst_addr;
	desc->src1_size = nbytes;
	desc->iaa.max_dst_size = nbytes_dst;
	desc->decompr_flags = IAA_DECOMP_FLAGS;

	_submit_to_hw(chan, op);
	return 0;
error:
	STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
	return rc;
}

int
spdk_idxd_submit_decompress(struct spdk_idxd_io_channel *chan,
			    struct iovec *diov, uint32_t diovcnt,
			    struct iovec *siov, uint32_t siovcnt,
			    int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	assert(chan != NULL);
	assert(diov != NULL);
	assert(siov != NULL);

	if (diovcnt == 1 && siovcnt == 1) {
		/* Simple case - copying one buffer to another */
		if (diov[0].iov_len < siov[0].iov_len) {
			return -EINVAL;
		}

		return _idxd_submit_decompress_single(chan, diov[0].iov_base, siov[0].iov_base,
						      diov[0].iov_len, siov[0].iov_len,
						      flags, cb_fn, cb_arg);
	}
	/* TODO: vectored support */
	return -EINVAL;
}

static inline int
idxd_get_dif_flags(const struct spdk_dif_ctx *ctx, uint8_t *flags)
{
	uint32_t data_block_size = ctx->block_size - ctx->md_size;

	if (flags == NULL) {
		SPDK_ERRLOG("Flags pointer must be non-null\n");
		return -EINVAL;
	}

	switch (ctx->guard_interval) {
	case DATA_BLOCK_SIZE_512:
		*flags = IDXD_DIF_FLAG_DIF_BLOCK_SIZE_512;
		break;
	case DATA_BLOCK_SIZE_520:
		*flags = IDXD_DIF_FLAG_DIF_BLOCK_SIZE_520;
		break;
	case DATA_BLOCK_SIZE_4096:
		*flags = IDXD_DIF_FLAG_DIF_BLOCK_SIZE_4096;
		break;
	case DATA_BLOCK_SIZE_4104:
		*flags = IDXD_DIF_FLAG_DIF_BLOCK_SIZE_4104;
		break;
	default:
		SPDK_ERRLOG("Unsupported DIF guard interval %u (data block size %u)\n",
			    ctx->guard_interval, data_block_size);
		return -EINVAL;
	}

	return 0;
}

static inline int
idxd_get_source_dif_flags(const struct spdk_dif_ctx *ctx, uint8_t *flags)
{
	if (flags == NULL) {
		SPDK_ERRLOG("Flags pointer must be non-null\n");
		return -EINVAL;
	}

	*flags = 0;

	if (!(ctx->dif_flags & SPDK_DIF_FLAGS_GUARD_CHECK)) {
		*flags |= IDXD_DIF_SOURCE_FLAG_GUARD_CHECK_DISABLE;
	}

	if (!(ctx->dif_flags & SPDK_DIF_FLAGS_REFTAG_CHECK)) {
		*flags |= IDXD_DIF_SOURCE_FLAG_REF_TAG_CHECK_DISABLE;
	}

	switch (ctx->dif_type) {
	case SPDK_DIF_TYPE1:
	case SPDK_DIF_TYPE2:
		/* If Type 1 or 2 is used, then all DIF checks are disabled when
		 * the Application Tag is 0xFFFF.
		 */
		*flags |= IDXD_DIF_SOURCE_FLAG_APP_TAG_F_DETECT;
		break;
	case SPDK_DIF_TYPE3:
		/* If Type 3 is used, then all DIF checks are disabled when the
		 * Application Tag is 0xFFFF and the Reference Tag is 0xFFFFFFFF
		 * (for the 8-byte PI format).
		 */
		*flags |= IDXD_DIF_SOURCE_FLAG_APP_AND_REF_TAG_F_DETECT;
		break;
	default:
		SPDK_ERRLOG("Invalid DIF type %d\n", ctx->dif_type);
		return -EINVAL;
	}

	return 0;
}

static inline int
idxd_get_app_tag_mask(const struct spdk_dif_ctx *ctx, uint16_t *app_tag_mask)
{
	if (!(ctx->dif_flags & SPDK_DIF_FLAGS_APPTAG_CHECK)) {
		/* The Source Application Tag Mask may be set to 0xffff
		 * to disable application tag checking */
		*app_tag_mask = 0xFFFF;
	} else {
		*app_tag_mask = ~ctx->apptag_mask;
	}

	return 0;
}

static inline int
idxd_validate_dif_common_params(const struct spdk_dif_ctx *ctx)
{
	uint32_t data_block_size = ctx->block_size - ctx->md_size;

	/* Check byte offset from the start of the whole data buffer */
	if (ctx->data_offset != 0) {
		SPDK_ERRLOG("Byte offset from the start of the whole data buffer must be set to 0.\n");
		return -EINVAL;
	}

	/* Check seed value for guard computation */
	if (ctx->guard_seed != 0) {
		SPDK_ERRLOG("Seed value for guard computation must be set to 0.\n");
		return -EINVAL;
	}

	/* Check for supported metadata sizes */
	if (ctx->md_size != METADATA_SIZE_8 && ctx->md_size != METADATA_SIZE_16) {
		SPDK_ERRLOG("Metadata size %d is not supported.\n", ctx->md_size);
		return -EINVAL;
	}

	/* Check for supported DIF PI formats */
	if (ctx->dif_pi_format != SPDK_DIF_PI_FORMAT_16) {
		SPDK_ERRLOG("DIF PI format %d is not supported.\n", ctx->dif_pi_format);
		return -EINVAL;
	}

	/* Check for supported metadata locations */
	if (ctx->md_interleave == false) {
		SPDK_ERRLOG("Separated metadata location is not supported.\n");
		return -EINVAL;
	}

	/* Check for supported DIF alignments */
	if (ctx->md_size == METADATA_SIZE_16 &&
	    (ctx->guard_interval == DATA_BLOCK_SIZE_512 ||
	     ctx->guard_interval == DATA_BLOCK_SIZE_4096)) {
		SPDK_ERRLOG("DIF left alignment in metadata is not supported.\n");
		return -EINVAL;
	}

	/* Check for supported DIF block sizes */
	if (data_block_size != DATA_BLOCK_SIZE_512 &&
	    data_block_size != DATA_BLOCK_SIZE_4096) {
		SPDK_ERRLOG("DIF block size %d is not supported.\n", data_block_size);
		return -EINVAL;
	}

	return 0;
}

static inline int
idxd_validate_dif_check_params(const struct spdk_dif_ctx *ctx)
{
	return idxd_validate_dif_common_params(ctx);
}

static inline int
idxd_validate_dif_check_buf_align(const struct spdk_dif_ctx *ctx, const uint64_t len)
{
	/* DSA can only process contiguous memory buffers, a multiple of the block size */
	if (len % ctx->block_size != 0) {
		SPDK_ERRLOG("The memory buffer length (%ld) is not a multiple of block size with metadata (%d).\n",
			    len, ctx->block_size);
		return -EINVAL;
	}

	return 0;
}

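/*
 * Editorial summary of the constraints enforced by the validators above: the
 * DSA DIF opcodes only handle interleaved metadata of 8 or 16 bytes, the
 * SPDK_DIF_PI_FORMAT_16 PI format, 512- or 4096-byte data blocks, zero
 * data_offset and guard_seed, and (for 16-byte metadata) DIF placed at the
 * end of the metadata. Each iovec element must also cover a whole number of
 * blocks, because DSA processes each buffer independently and cannot resume a
 * DIF computation mid-block.
 */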
int
spdk_idxd_submit_dif_check(struct spdk_idxd_io_channel *chan,
			   struct iovec *siov, size_t siovcnt,
			   uint32_t num_blocks, const struct spdk_dif_ctx *ctx, int flags,
			   spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op = NULL, *op = NULL;
	uint64_t src_seg_addr, src_seg_len;
	uint32_t num_blocks_done = 0;
	uint8_t dif_flags = 0, src_dif_flags = 0;
	uint16_t app_tag_mask = 0;
	int rc, count = 0;
	size_t i;

	assert(ctx != NULL);
	assert(chan != NULL);
	assert(siov != NULL);

	rc = idxd_validate_dif_check_params(ctx);
	if (rc) {
		return rc;
	}

	rc = idxd_get_dif_flags(ctx, &dif_flags);
	if (rc) {
		return rc;
	}

	rc = idxd_get_source_dif_flags(ctx, &src_dif_flags);
	if (rc) {
		return rc;
	}

	rc = idxd_get_app_tag_mask(ctx, &app_tag_mask);
	if (rc) {
		return rc;
	}

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	for (i = 0; i < siovcnt; i++) {
		src_seg_addr = (uint64_t)siov[i].iov_base;
		src_seg_len = siov[i].iov_len;

		/* DSA processes the iovec buffers independently, so the buffers cannot
		 * be split (must be a multiple of the block size) */

		/* Validate the memory buffer alignment */
		rc = idxd_validate_dif_check_buf_align(ctx, src_seg_len);
		if (rc) {
			goto error;
		}

		if (first_op == NULL) {
			rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
			if (rc) {
				goto error;
			}

			first_op = op;
		} else {
			rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
			if (rc) {
				goto error;
			}

			first_op->count++;
			op->parent = first_op;
		}

		count++;

		desc->opcode = IDXD_OPCODE_DIF_CHECK;
		desc->src_addr = src_seg_addr;
		desc->xfer_size = src_seg_len;
		desc->dif_chk.flags = dif_flags;
		desc->dif_chk.src_flags = src_dif_flags;
		desc->dif_chk.app_tag_seed = ctx->app_tag;
		desc->dif_chk.app_tag_mask = app_tag_mask;
		desc->dif_chk.ref_tag_seed = (uint32_t)ctx->init_ref_tag + num_blocks_done;

		num_blocks_done += (src_seg_len / ctx->block_size);
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

static inline int
idxd_validate_dif_insert_params(const struct spdk_dif_ctx *ctx)
{
	int rc = idxd_validate_dif_common_params(ctx);

	if (rc) {
		return rc;
	}

	/* Check for required DIF flags */
	if (!(ctx->dif_flags & SPDK_DIF_FLAGS_GUARD_CHECK)) {
		SPDK_ERRLOG("Guard check flag must be set.\n");
		return -EINVAL;
	}

	if (!(ctx->dif_flags & SPDK_DIF_FLAGS_APPTAG_CHECK)) {
		SPDK_ERRLOG("Application Tag check flag must be set.\n");
		return -EINVAL;
	}

	if (!(ctx->dif_flags & SPDK_DIF_FLAGS_REFTAG_CHECK)) {
		SPDK_ERRLOG("Reference Tag check flag must be set.\n");
		return -EINVAL;
	}

	return 0;
}

static inline int
idxd_validate_dif_insert_iovecs(const struct spdk_dif_ctx *ctx,
				const struct iovec *diov, const size_t diovcnt,
				const struct iovec *siov, const size_t siovcnt)
{
	uint32_t data_block_size = ctx->block_size - ctx->md_size;
	size_t src_len, dst_len;
	uint32_t num_blocks;
	size_t i;

	if (diovcnt != siovcnt) {
		SPDK_ERRLOG("Invalid number of elements in src (%ld) and dst (%ld) iovecs.\n",
			    siovcnt, diovcnt);
		return -EINVAL;
	}

	for (i = 0; i < siovcnt; i++) {
		src_len = siov[i].iov_len;
		dst_len = diov[i].iov_len;
		num_blocks = src_len / data_block_size;
		if (src_len != dst_len - num_blocks * ctx->md_size) {
			SPDK_ERRLOG("Invalid length of data in src (%ld) and dst (%ld) in iovecs[%ld].\n",
				    src_len, dst_len, i);
			return -EINVAL;
		}
	}

	return 0;
}

static inline int
idxd_validate_dif_insert_buf_align(const struct spdk_dif_ctx *ctx,
				   const uint64_t src_len, const uint64_t dst_len)
{
	uint32_t data_block_size = ctx->block_size - ctx->md_size;

	/* DSA can only process contiguous memory buffers, a multiple of the block size */
	if (src_len % data_block_size != 0) {
		SPDK_ERRLOG("The memory source buffer length (%ld) is not a multiple of block size without metadata (%d).\n",
			    src_len, data_block_size);
		return -EINVAL;
	}

	if (dst_len % ctx->block_size != 0) {
		SPDK_ERRLOG("The memory destination buffer length (%ld) is not a multiple of block size with metadata (%d).\n",
			    dst_len, ctx->block_size);
		return -EINVAL;
	}

	/* The memory source and destination must hold the same number of blocks. */
	if (src_len / data_block_size != (dst_len / ctx->block_size)) {
		SPDK_ERRLOG("The memory source (%ld) and destination (%ld) must hold the same number of blocks.\n",
			    src_len / data_block_size, dst_len / ctx->block_size);
		return -EINVAL;
	}

	return 0;
}

int
spdk_idxd_submit_dif_insert(struct spdk_idxd_io_channel *chan,
			    struct iovec *diov, size_t diovcnt,
			    struct iovec *siov, size_t siovcnt,
			    uint32_t num_blocks, const struct spdk_dif_ctx *ctx, int flags,
			    spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op = NULL, *op = NULL;
	uint32_t data_block_size = ctx->block_size - ctx->md_size;
	uint64_t src_seg_addr, src_seg_len;
	uint64_t dst_seg_addr, dst_seg_len;
	uint32_t num_blocks_done = 0;
	uint8_t dif_flags = 0;
	int rc, count = 0;
	size_t i;

	assert(ctx != NULL);
	assert(chan != NULL);
	assert(siov != NULL);

	rc = idxd_validate_dif_insert_params(ctx);
	if (rc) {
		return rc;
	}

	rc = idxd_validate_dif_insert_iovecs(ctx, diov, diovcnt, siov, siovcnt);
	if (rc) {
		return rc;
	}

	rc = idxd_get_dif_flags(ctx, &dif_flags);
	if (rc) {
		return rc;
	}

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	for (i = 0; i < siovcnt; i++) {
		src_seg_addr = (uint64_t)siov[i].iov_base;
		src_seg_len = siov[i].iov_len;
		dst_seg_addr = (uint64_t)diov[i].iov_base;
		dst_seg_len = diov[i].iov_len;

		/* DSA processes the iovec buffers independently, so the buffers cannot
		 * be split (must be a multiple of the block size). The destination memory
		 * size needs to be the same as the source memory size + metadata size */

		rc = idxd_validate_dif_insert_buf_align(ctx, src_seg_len, dst_seg_len);
		if (rc) {
			goto error;
		}

		if (first_op == NULL) {
			rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
			if (rc) {
				goto error;
			}

			first_op = op;
		} else {
			rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
			if (rc) {
				goto error;
			}

			first_op->count++;
			op->parent = first_op;
		}

		count++;

		desc->opcode = IDXD_OPCODE_DIF_INS;
		desc->src_addr = src_seg_addr;
		desc->dst_addr = dst_seg_addr;
		desc->xfer_size = src_seg_len;
		desc->dif_ins.flags = dif_flags;
		desc->dif_ins.app_tag_seed = ctx->app_tag;
		desc->dif_ins.app_tag_mask = ~ctx->apptag_mask;
		desc->dif_ins.ref_tag_seed = (uint32_t)ctx->init_ref_tag + num_blocks_done;

		num_blocks_done += src_seg_len / data_block_size;
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

static inline int
idxd_validate_dif_strip_buf_align(const struct spdk_dif_ctx *ctx,
				  const uint64_t src_len, const uint64_t dst_len)
{
	uint32_t data_block_size = ctx->block_size - ctx->md_size;

	/* DSA can only process contiguous memory buffers, a multiple of the block size. */
	if (src_len % ctx->block_size != 0) {
		SPDK_ERRLOG("The src buffer length (%ld) is not a multiple of block size (%d).\n",
			    src_len, ctx->block_size);
		return -EINVAL;
	}
	if (dst_len % data_block_size != 0) {
		SPDK_ERRLOG("The dst buffer length (%ld) is not a multiple of block size without metadata (%d).\n",
			    dst_len, data_block_size);
		return -EINVAL;
	}
	/* The memory source and destination must hold the same number of blocks. */
	if (src_len / ctx->block_size != dst_len / data_block_size) {
		SPDK_ERRLOG("The memory source (%ld) and destination (%ld) must hold the same number of blocks.\n",
			    src_len / ctx->block_size, dst_len / data_block_size);
		return -EINVAL;
	}
	return 0;
}

int
spdk_idxd_submit_dif_strip(struct spdk_idxd_io_channel *chan,
			   struct iovec *diov, size_t diovcnt,
			   struct iovec *siov, size_t siovcnt,
			   uint32_t num_blocks, const struct spdk_dif_ctx *ctx, int flags,
			   spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op = NULL, *op = NULL;
	uint64_t src_seg_addr, src_seg_len;
	uint64_t dst_seg_addr, dst_seg_len;
	uint32_t num_blocks_done = 0;
	uint8_t dif_flags = 0, src_dif_flags = 0;
	uint16_t app_tag_mask = 0;
	int rc, count = 0;
	size_t i;

	rc = idxd_validate_dif_common_params(ctx);
	if (rc) {
		return rc;
	}

	rc = idxd_get_dif_flags(ctx, &dif_flags);
	if (rc) {
		return rc;
	}

	rc = idxd_get_source_dif_flags(ctx, &src_dif_flags);
	if (rc) {
		return rc;
	}

	rc = idxd_get_app_tag_mask(ctx, &app_tag_mask);
	if (rc) {
		return rc;
	}

	/* Validate the iovec counts before claiming a batch. */
	if (diovcnt != siovcnt) {
		SPDK_ERRLOG("Mismatched iovcnts: src=%ld, dst=%ld\n",
			    siovcnt, diovcnt);
		return -EINVAL;
	}

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	for (i = 0; i < siovcnt; i++) {
		src_seg_addr = (uint64_t)siov[i].iov_base;
		src_seg_len = siov[i].iov_len;
		dst_seg_addr = (uint64_t)diov[i].iov_base;
		dst_seg_len = diov[i].iov_len;

		/* DSA processes the iovec buffers independently, so the buffers cannot
		 * be split (must be a multiple of the block size). The source memory
		 * size needs to be the same as the destination memory size + metadata size */

		rc = idxd_validate_dif_strip_buf_align(ctx, src_seg_len, dst_seg_len);
		if (rc) {
			goto error;
		}

		if (first_op == NULL) {
			rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
			if (rc) {
				goto error;
			}

			first_op = op;
		} else {
			rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
			if (rc) {
				goto error;
			}

			first_op->count++;
			op->parent = first_op;
		}

		count++;

		desc->opcode = IDXD_OPCODE_DIF_STRP;
		desc->src_addr = src_seg_addr;
		desc->dst_addr = dst_seg_addr;
		desc->xfer_size = src_seg_len;
		desc->dif_strip.flags = dif_flags;
		desc->dif_strip.src_flags = src_dif_flags;
		desc->dif_strip.app_tag_seed = ctx->app_tag;
		desc->dif_strip.app_tag_mask = app_tag_mask;
		desc->dif_strip.ref_tag_seed = (uint32_t)ctx->init_ref_tag + num_blocks_done;

		/* Advance the Reference Tag seed across buffers, matching check/insert. */
		num_blocks_done += src_seg_len / ctx->block_size;
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

int
spdk_idxd_submit_raw_desc(struct spdk_idxd_io_channel *chan,
			  struct idxd_hw_desc *_desc,
			  spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	int rc, flags = 0;
	uint64_t comp_addr;

	assert(chan != NULL);
	assert(_desc != NULL);

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	/* Command specific. */
	flags = desc->flags;
	comp_addr = desc->completion_addr;
	memcpy(desc, _desc, sizeof(*desc));
	desc->flags |= flags;
	desc->completion_addr = comp_addr;

	/* Submit operation. */
	_submit_to_hw(chan, op);

	return 0;
}

static inline void
_dump_sw_error_reg(struct spdk_idxd_io_channel *chan)
{
	struct spdk_idxd_device *idxd = chan->idxd;

	assert(idxd != NULL);
	idxd->impl->dump_sw_error(idxd, chan->portal);
}

/* TODO: more performance experiments. */
#define IDXD_COMPLETION(x) ((x) > (0) ? (1) : (0))
#define IDXD_FAILURE(x) ((x) > (1) ? (1) : (0))
#define IDXD_SW_ERROR(x) (((x) & (0x1)) ? (1) : (0))
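/*
 * Poller sketch (illustrative only): spdk_idxd_process_events() is
 * non-blocking and returns the number of completions it reaped, so a typical
 * reactor poller looks like:
 *
 *	static int
 *	idxd_poll(void *arg)
 *	{
 *		struct spdk_idxd_io_channel *chan = arg;
 *
 *		return spdk_idxd_process_events(chan) > 0 ?
 *		       SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *	}
 */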
int
spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
{
	struct idxd_ops *op, *tmp, *parent_op;
	int status = 0;
	int rc2, rc = 0;
	void *cb_arg;
	spdk_idxd_req_cb cb_fn;

	assert(chan != NULL);

	STAILQ_FOREACH_SAFE(op, &chan->ops_outstanding, link, tmp) {
		if (!IDXD_COMPLETION(op->hw.status)) {
			/*
			 * oldest locations are at the head of the list so if
			 * we've polled a location that hasn't completed, bail
			 * now as there are unlikely to be any more completions.
			 */
			break;
		}

		STAILQ_REMOVE_HEAD(&chan->ops_outstanding, link);
		rc++;

		/* Status is in the same location for both IAA and DSA completion records. */
		if (spdk_unlikely(IDXD_FAILURE(op->hw.status))) {
			SPDK_ERRLOG("Completion status 0x%x\n", op->hw.status);
			status = -EINVAL;
			_dump_sw_error_reg(chan);
		}

		switch (op->desc->opcode) {
		case IDXD_OPCODE_BATCH:
			SPDK_DEBUGLOG(idxd, "Complete batch %p\n", op->batch);
			break;
		case IDXD_OPCODE_CRC32C_GEN:
		case IDXD_OPCODE_COPY_CRC:
			if (spdk_likely(status == 0 && op->crc_dst != NULL)) {
				*op->crc_dst = op->hw.crc32c_val;
				*op->crc_dst ^= ~0;
			}
			break;
		case IDXD_OPCODE_COMPARE:
			if (spdk_likely(status == 0)) {
				status = op->hw.result;
			}
			break;
		case IDXD_OPCODE_COMPRESS:
			if (spdk_likely(status == 0 && op->output_size != NULL)) {
				*op->output_size = op->iaa_hw.output_size;
			}
			break;
		case IDXD_OPCODE_DIF_CHECK:
		case IDXD_OPCODE_DIF_STRP:
			if (spdk_unlikely(op->hw.status == IDXD_DSA_STATUS_DIF_ERROR)) {
				status = -EIO;
			}
			break;
		}

		/* TODO: WHAT IF THIS FAILED!? */
		op->hw.status = 0;

		assert(op->count > 0);
		op->count--;

		parent_op = op->parent;
		if (parent_op != NULL) {
			assert(parent_op->count > 0);
			parent_op->count--;

			if (parent_op->count == 0) {
				cb_fn = parent_op->cb_fn;
				cb_arg = parent_op->cb_arg;

				assert(parent_op->batch != NULL);

				/*
				 * Now that parent_op count is 0, we can release its ref
				 * to its batch. We have not released the ref to the batch
				 * that the op is pointing to yet, which will be done below.
				 */
				parent_op->batch->refcnt--;
				if (parent_op->batch->refcnt == 0) {
					_free_batch(parent_op->batch, chan);
				}

				if (cb_fn) {
					cb_fn(cb_arg, status);
				}
			}
		}

		if (op->count == 0) {
			cb_fn = op->cb_fn;
			cb_arg = op->cb_arg;

			if (op->batch != NULL) {
				assert(op->batch->refcnt > 0);
				op->batch->refcnt--;

				if (op->batch->refcnt == 0) {
					_free_batch(op->batch, chan);
				}
			} else {
				STAILQ_INSERT_HEAD(&chan->ops_pool, op, link);
			}

			if (cb_fn) {
				cb_fn(cb_arg, status);
			}
		}

		/* reset the status */
		status = 0;
		/* break the processing loop to prevent starving the rest of the system */
		if (rc >= IDXD_MAX_COMPLETIONS) {
			break;
		}
	}

	/* Submit any built-up batch */
	if (chan->batch) {
		rc2 = idxd_batch_submit(chan, NULL, NULL);
		if (rc2) {
			assert(rc2 == -EBUSY);
		}
	}

	return rc;
}

void
idxd_impl_register(struct spdk_idxd_impl *impl)
{
	STAILQ_INSERT_HEAD(&g_idxd_impls, impl, link);
}

SPDK_LOG_REGISTER_COMPONENT(idxd)