xref: /spdk/lib/idxd/idxd.c (revision a5f87f39127c7e0da8d9c4fcd042a27e350be84e)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2020 Intel Corporation.
3  *   All rights reserved.
4  */
5 
6 #include "spdk/stdinc.h"
7 
8 #include "spdk/env.h"
9 #include "spdk/util.h"
10 #include "spdk/memory.h"
11 #include "spdk/likely.h"
12 
13 #include "spdk/log.h"
14 #include "spdk_internal/idxd.h"
15 
16 #include "idxd_internal.h"
17 
/* Alignment (4 KiB) that both dualcast destination addresses are checked against. */
#define ALIGN_4K 0x1000
/* Names used by spdk_idxd_set_config() to look up a registered implementation. */
#define USERSPACE_DRIVER_NAME "user"
#define KERNEL_DRIVER_NAME "kernel"

/* The max number of completions processed per poll */
#define IDXD_MAX_COMPLETIONS      128

/* The minimum number of entries in batch per flush */
#define IDXD_MIN_BATCH_FLUSH      32

/* List of available implementations; searched by name in idxd_get_impl_by_name(). */
static STAILQ_HEAD(, spdk_idxd_impl) g_idxd_impls = STAILQ_HEAD_INITIALIZER(g_idxd_impls);
/* Implementation chosen by spdk_idxd_set_config(); NULL until one has been selected. */
static struct spdk_idxd_impl *g_idxd_impl;
30 
31 uint32_t
32 spdk_idxd_get_socket(struct spdk_idxd_device *idxd)
33 {
34 	return idxd->socket_id;
35 }
36 
37 static inline void
38 _submit_to_hw(struct spdk_idxd_io_channel *chan, struct idxd_ops *op)
39 {
40 	STAILQ_INSERT_TAIL(&chan->ops_outstanding, op, link);
41 	/*
42 	 * We must barrier before writing the descriptor to ensure that data
43 	 * has been correctly flushed from the associated data buffers before DMA
44 	 * operations begin.
45 	 */
46 	_spdk_wmb();
47 	movdir64b(chan->portal + chan->portal_offset, op->desc);
48 	chan->portal_offset = (chan->portal_offset + chan->idxd->chan_per_device * PORTAL_STRIDE) &
49 			      PORTAL_MASK;
50 }
51 
52 inline static int
53 _vtophys(struct spdk_idxd_io_channel *chan, const void *buf, uint64_t *buf_addr, uint64_t size)
54 {
55 	uint64_t updated_size = size;
56 
57 	if (chan->pasid_enabled) {
58 		/* We can just use virtual addresses */
59 		*buf_addr = (uint64_t)buf;
60 		return 0;
61 	}
62 
63 	*buf_addr = spdk_vtophys(buf, &updated_size);
64 
65 	if (*buf_addr == SPDK_VTOPHYS_ERROR) {
66 		SPDK_ERRLOG("Error translating address\n");
67 		return -EINVAL;
68 	}
69 
70 	if (updated_size < size) {
71 		SPDK_ERRLOG("Error translating size (0x%lx), return size (0x%lx)\n", size, updated_size);
72 		return -EINVAL;
73 	}
74 
75 	return 0;
76 }
77 
/* State for walking a (src, dst) buffer pair one translated segment at a time. */
struct idxd_vtophys_iter {
	/* Start of the two buffers and their common length, as passed to idxd_vtophys_iter_init(). */
	const void	*src;
	void		*dst;
	uint64_t	len;

	/* Bytes already handed out by idxd_vtophys_iter_next(); not advanced when pasid_enabled is set. */
	uint64_t	offset;

	/* Copied from the channel; when true, addresses are returned without translation. */
	bool		pasid_enabled;
};
87 
88 static void
89 idxd_vtophys_iter_init(struct spdk_idxd_io_channel *chan,
90 		       struct idxd_vtophys_iter *iter,
91 		       const void *src, void *dst, uint64_t len)
92 {
93 	iter->src = src;
94 	iter->dst = dst;
95 	iter->len = len;
96 	iter->offset = 0;
97 	iter->pasid_enabled = chan->pasid_enabled;
98 }
99 
100 static uint64_t
101 idxd_vtophys_iter_next(struct idxd_vtophys_iter *iter,
102 		       uint64_t *src_phys, uint64_t *dst_phys)
103 {
104 	uint64_t src_off, dst_off, len;
105 	const void *src;
106 	void *dst;
107 
108 	src = iter->src + iter->offset;
109 	dst = iter->dst + iter->offset;
110 
111 	if (iter->offset == iter->len) {
112 		return 0;
113 	}
114 
115 	if (iter->pasid_enabled) {
116 		*src_phys = (uint64_t)src;
117 		*dst_phys = (uint64_t)dst;
118 		return iter->len;
119 	}
120 
121 	len = iter->len - iter->offset;
122 
123 	src_off = len;
124 	*src_phys = spdk_vtophys(src, &src_off);
125 	if (*src_phys == SPDK_VTOPHYS_ERROR) {
126 		SPDK_ERRLOG("Error translating address\n");
127 		return SPDK_VTOPHYS_ERROR;
128 	}
129 
130 	dst_off = len;
131 	*dst_phys = spdk_vtophys(dst, &dst_off);
132 	if (*dst_phys == SPDK_VTOPHYS_ERROR) {
133 		SPDK_ERRLOG("Error translating address\n");
134 		return SPDK_VTOPHYS_ERROR;
135 	}
136 
137 	len = spdk_min(src_off, dst_off);
138 	iter->offset += len;
139 
140 	return len;
141 }
142 
143 /* helper function for DSA specific spdk_idxd_get_channel() stuff */
144 static int
145 _dsa_alloc_batches(struct spdk_idxd_io_channel *chan, int num_descriptors)
146 {
147 	struct idxd_batch *batch;
148 	struct idxd_hw_desc *desc;
149 	struct idxd_ops *op;
150 	int i, j, num_batches, rc = -1;
151 
152 	/* Allocate batches */
153 	num_batches = num_descriptors;
154 	chan->batch_base = calloc(num_batches, sizeof(struct idxd_batch));
155 	if (chan->batch_base == NULL) {
156 		SPDK_ERRLOG("Failed to allocate batch pool\n");
157 		goto error_desc;
158 	}
159 	batch = chan->batch_base;
160 	for (i = 0 ; i < num_batches ; i++) {
161 		batch->size = chan->idxd->batch_size;
162 		batch->user_desc = desc = spdk_zmalloc(batch->size * sizeof(struct idxd_hw_desc),
163 						       0x40, NULL,
164 						       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
165 		if (batch->user_desc == NULL) {
166 			SPDK_ERRLOG("Failed to allocate batch descriptor memory\n");
167 			goto error_user;
168 		}
169 
170 		rc = _vtophys(chan, batch->user_desc, &batch->user_desc_addr,
171 			      batch->size * sizeof(struct idxd_hw_desc));
172 		if (rc) {
173 			SPDK_ERRLOG("Failed to translate batch descriptor memory\n");
174 			goto error_user;
175 		}
176 
177 		batch->user_ops = op = spdk_zmalloc(batch->size * sizeof(struct idxd_ops),
178 						    0x40, NULL,
179 						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
180 		if (batch->user_ops == NULL) {
181 			SPDK_ERRLOG("Failed to allocate user completion memory\n");
182 			goto error_user;
183 		}
184 
185 		for (j = 0; j < batch->size; j++) {
186 			rc = _vtophys(chan, &op->hw, &desc->completion_addr, sizeof(struct dsa_hw_comp_record));
187 			if (rc) {
188 				SPDK_ERRLOG("Failed to translate batch entry completion memory\n");
189 				goto error_user;
190 			}
191 			op++;
192 			desc++;
193 		}
194 		TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
195 		batch++;
196 	}
197 	return 0;
198 
199 error_user:
200 	TAILQ_FOREACH(batch, &chan->batch_pool, link) {
201 		spdk_free(batch->user_ops);
202 		batch->user_ops = NULL;
203 		spdk_free(batch->user_desc);
204 		batch->user_desc = NULL;
205 	}
206 	spdk_free(chan->ops_base);
207 	chan->ops_base = NULL;
208 error_desc:
209 	STAILQ_INIT(&chan->ops_pool);
210 	spdk_free(chan->desc_base);
211 	chan->desc_base = NULL;
212 	return rc;
213 }
214 
215 struct spdk_idxd_io_channel *
216 spdk_idxd_get_channel(struct spdk_idxd_device *idxd)
217 {
218 	struct spdk_idxd_io_channel *chan;
219 	struct idxd_hw_desc *desc;
220 	struct idxd_ops *op;
221 	int i, num_descriptors, rc = -1;
222 	uint32_t comp_rec_size;
223 
224 	assert(idxd != NULL);
225 
226 	chan = calloc(1, sizeof(struct spdk_idxd_io_channel));
227 	if (chan == NULL) {
228 		SPDK_ERRLOG("Failed to allocate idxd chan\n");
229 		return NULL;
230 	}
231 
232 	chan->idxd = idxd;
233 	chan->pasid_enabled = idxd->pasid_enabled;
234 	STAILQ_INIT(&chan->ops_pool);
235 	TAILQ_INIT(&chan->batch_pool);
236 	STAILQ_INIT(&chan->ops_outstanding);
237 
238 	/* Assign WQ, portal */
239 	pthread_mutex_lock(&idxd->num_channels_lock);
240 	if (idxd->num_channels == idxd->chan_per_device) {
241 		/* too many channels sharing this device */
242 		pthread_mutex_unlock(&idxd->num_channels_lock);
243 		SPDK_ERRLOG("Too many channels sharing this device\n");
244 		goto error;
245 	}
246 
247 	/* Have each channel start at a different offset. */
248 	chan->portal = idxd->impl->portal_get_addr(idxd);
249 	chan->portal_offset = (idxd->num_channels * PORTAL_STRIDE) & PORTAL_MASK;
250 	idxd->num_channels++;
251 
252 	pthread_mutex_unlock(&idxd->num_channels_lock);
253 
254 	/* Allocate descriptors and completions */
255 	num_descriptors = idxd->total_wq_size / idxd->chan_per_device;
256 	chan->desc_base = desc = spdk_zmalloc(num_descriptors * sizeof(struct idxd_hw_desc),
257 					      0x40, NULL,
258 					      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
259 	if (chan->desc_base == NULL) {
260 		SPDK_ERRLOG("Failed to allocate DSA descriptor memory\n");
261 		goto error;
262 	}
263 
264 	chan->ops_base = op = spdk_zmalloc(num_descriptors * sizeof(struct idxd_ops),
265 					   0x40, NULL,
266 					   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
267 	if (chan->ops_base == NULL) {
268 		SPDK_ERRLOG("Failed to allocate idxd_ops memory\n");
269 		goto error;
270 	}
271 
272 	if (idxd->type == IDXD_DEV_TYPE_DSA) {
273 		comp_rec_size = sizeof(struct dsa_hw_comp_record);
274 		if (_dsa_alloc_batches(chan, num_descriptors)) {
275 			goto error;
276 		}
277 	} else {
278 		comp_rec_size = sizeof(struct iaa_hw_comp_record);
279 	}
280 
281 	for (i = 0; i < num_descriptors; i++) {
282 		STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
283 		op->desc = desc;
284 		rc = _vtophys(chan, &op->hw, &desc->completion_addr, comp_rec_size);
285 		if (rc) {
286 			SPDK_ERRLOG("Failed to translate completion memory\n");
287 			goto error;
288 		}
289 		op++;
290 		desc++;
291 	}
292 
293 	return chan;
294 
295 error:
296 	spdk_free(chan->ops_base);
297 	chan->ops_base = NULL;
298 	spdk_free(chan->desc_base);
299 	chan->desc_base = NULL;
300 	free(chan);
301 	return NULL;
302 }
303 
304 static int idxd_batch_cancel(struct spdk_idxd_io_channel *chan, int status);
305 
306 void
307 spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan)
308 {
309 	struct idxd_batch *batch;
310 
311 	assert(chan != NULL);
312 	assert(chan->idxd != NULL);
313 
314 	if (chan->batch) {
315 		idxd_batch_cancel(chan, -ECANCELED);
316 	}
317 
318 	pthread_mutex_lock(&chan->idxd->num_channels_lock);
319 	assert(chan->idxd->num_channels > 0);
320 	chan->idxd->num_channels--;
321 	pthread_mutex_unlock(&chan->idxd->num_channels_lock);
322 
323 	spdk_free(chan->ops_base);
324 	spdk_free(chan->desc_base);
325 	while ((batch = TAILQ_FIRST(&chan->batch_pool))) {
326 		TAILQ_REMOVE(&chan->batch_pool, batch, link);
327 		spdk_free(batch->user_ops);
328 		spdk_free(batch->user_desc);
329 	}
330 	free(chan->batch_base);
331 	free(chan);
332 }
333 
334 static inline struct spdk_idxd_impl *
335 idxd_get_impl_by_name(const char *impl_name)
336 {
337 	struct spdk_idxd_impl *impl;
338 
339 	assert(impl_name != NULL);
340 	STAILQ_FOREACH(impl, &g_idxd_impls, link) {
341 		if (0 == strcmp(impl_name, impl->name)) {
342 			return impl;
343 		}
344 	}
345 
346 	return NULL;
347 }
348 
349 int
350 spdk_idxd_set_config(bool kernel_mode)
351 {
352 	struct spdk_idxd_impl *tmp;
353 
354 	if (kernel_mode) {
355 		tmp = idxd_get_impl_by_name(KERNEL_DRIVER_NAME);
356 	} else {
357 		tmp = idxd_get_impl_by_name(USERSPACE_DRIVER_NAME);
358 	}
359 
360 	if (g_idxd_impl != NULL && g_idxd_impl != tmp) {
361 		SPDK_ERRLOG("Cannot change idxd implementation after devices are initialized\n");
362 		assert(false);
363 		return -EALREADY;
364 	}
365 	g_idxd_impl = tmp;
366 
367 	if (g_idxd_impl == NULL) {
368 		SPDK_ERRLOG("Cannot set the idxd implementation with %s mode\n",
369 			    kernel_mode ? KERNEL_DRIVER_NAME : USERSPACE_DRIVER_NAME);
370 		return -EINVAL;
371 	}
372 
373 	return 0;
374 }
375 
376 static void
377 idxd_device_destruct(struct spdk_idxd_device *idxd)
378 {
379 	assert(idxd->impl != NULL);
380 
381 	idxd->impl->destruct(idxd);
382 }
383 
384 int
385 spdk_idxd_probe(void *cb_ctx, spdk_idxd_attach_cb attach_cb,
386 		spdk_idxd_probe_cb probe_cb)
387 {
388 	if (g_idxd_impl == NULL) {
389 		SPDK_ERRLOG("No idxd impl is selected\n");
390 		return -1;
391 	}
392 
393 	return g_idxd_impl->probe(cb_ctx, attach_cb, probe_cb);
394 }
395 
/* Detach a device by running its implementation's destruct path. */
void
spdk_idxd_detach(struct spdk_idxd_device *idxd)
{
	/* Callers must pass a valid device. */
	assert(idxd != NULL);

	idxd_device_destruct(idxd);
}
402 
403 static int
404 _idxd_prep_command(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn, void *cb_arg,
405 		   int flags, struct idxd_hw_desc **_desc, struct idxd_ops **_op)
406 {
407 	struct idxd_hw_desc *desc;
408 	struct idxd_ops *op;
409 	uint64_t comp_addr;
410 
411 	if (!STAILQ_EMPTY(&chan->ops_pool)) {
412 		op = *_op = STAILQ_FIRST(&chan->ops_pool);
413 		desc = *_desc = op->desc;
414 		comp_addr = desc->completion_addr;
415 		memset(desc, 0, sizeof(*desc));
416 		desc->completion_addr = comp_addr;
417 		STAILQ_REMOVE_HEAD(&chan->ops_pool, link);
418 	} else {
419 		/* The application needs to handle this, violation of flow control */
420 		return -EBUSY;
421 	}
422 
423 	flags |= IDXD_FLAG_COMPLETION_ADDR_VALID;
424 	flags |= IDXD_FLAG_REQUEST_COMPLETION;
425 
426 	desc->flags = flags;
427 	op->cb_arg = cb_arg;
428 	op->cb_fn = cb_fn;
429 	op->batch = NULL;
430 	op->parent = NULL;
431 	op->count = 1;
432 
433 	return 0;
434 }
435 
436 static int
437 _idxd_prep_batch_cmd(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn,
438 		     void *cb_arg, int flags,
439 		     struct idxd_hw_desc **_desc, struct idxd_ops **_op)
440 {
441 	struct idxd_hw_desc *desc;
442 	struct idxd_ops *op;
443 	uint64_t comp_addr;
444 	struct idxd_batch *batch;
445 
446 	batch = chan->batch;
447 
448 	assert(batch != NULL);
449 	if (batch->index == batch->size) {
450 		return -EBUSY;
451 	}
452 
453 	desc = *_desc = &batch->user_desc[batch->index];
454 	op = *_op = &batch->user_ops[batch->index];
455 
456 	op->desc = desc;
457 	SPDK_DEBUGLOG(idxd, "Prep batch %p index %u\n", batch, batch->index);
458 
459 	batch->index++;
460 
461 	comp_addr = desc->completion_addr;
462 	memset(desc, 0, sizeof(*desc));
463 	desc->completion_addr = comp_addr;
464 	flags |= IDXD_FLAG_COMPLETION_ADDR_VALID;
465 	flags |= IDXD_FLAG_REQUEST_COMPLETION;
466 	desc->flags = flags;
467 	op->cb_arg = cb_arg;
468 	op->cb_fn = cb_fn;
469 	op->batch = batch;
470 	op->parent = NULL;
471 	op->count = 1;
472 	op->crc_dst = NULL;
473 
474 	return 0;
475 }
476 
477 static struct idxd_batch *
478 idxd_batch_create(struct spdk_idxd_io_channel *chan)
479 {
480 	struct idxd_batch *batch;
481 
482 	assert(chan != NULL);
483 	assert(chan->batch == NULL);
484 
485 	if (!TAILQ_EMPTY(&chan->batch_pool)) {
486 		batch = TAILQ_FIRST(&chan->batch_pool);
487 		batch->index = 0;
488 		batch->chan = chan;
489 		chan->batch = batch;
490 		TAILQ_REMOVE(&chan->batch_pool, batch, link);
491 	} else {
492 		/* The application needs to handle this. */
493 		return NULL;
494 	}
495 
496 	return batch;
497 }
498 
499 static void
500 _free_batch(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan)
501 {
502 	SPDK_DEBUGLOG(idxd, "Free batch %p\n", batch);
503 	assert(batch->refcnt == 0);
504 	batch->index = 0;
505 	batch->chan = NULL;
506 	TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
507 }
508 
509 static int
510 idxd_batch_cancel(struct spdk_idxd_io_channel *chan, int status)
511 {
512 	struct idxd_ops *op;
513 	struct idxd_batch *batch;
514 	int i;
515 
516 	assert(chan != NULL);
517 
518 	batch = chan->batch;
519 	assert(batch != NULL);
520 
521 	if (batch->index == UINT16_MAX) {
522 		SPDK_ERRLOG("Cannot cancel batch, already submitted to HW.\n");
523 		return -EINVAL;
524 	}
525 
526 	chan->batch = NULL;
527 
528 	for (i = 0; i < batch->index; i++) {
529 		op = &batch->user_ops[i];
530 		if (op->cb_fn) {
531 			op->cb_fn(op->cb_arg, status);
532 		}
533 	}
534 
535 	_free_batch(batch, chan);
536 
537 	return 0;
538 }
539 
540 static int
541 idxd_batch_submit(struct spdk_idxd_io_channel *chan,
542 		  spdk_idxd_req_cb cb_fn, void *cb_arg)
543 {
544 	struct idxd_hw_desc *desc;
545 	struct idxd_batch *batch;
546 	struct idxd_ops *op;
547 	int i, rc, flags = 0;
548 
549 	assert(chan != NULL);
550 
551 	batch = chan->batch;
552 	assert(batch != NULL);
553 
554 	if (batch->index == 0) {
555 		return idxd_batch_cancel(chan, 0);
556 	}
557 
558 	/* Common prep. */
559 	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
560 	if (rc) {
561 		return rc;
562 	}
563 
564 	if (batch->index == 1) {
565 		uint64_t completion_addr;
566 
567 		/* If there's only one command, convert it away from a batch. */
568 		completion_addr = desc->completion_addr;
569 		memcpy(desc, &batch->user_desc[0], sizeof(*desc));
570 		desc->completion_addr = completion_addr;
571 		op->cb_fn = batch->user_ops[0].cb_fn;
572 		op->cb_arg = batch->user_ops[0].cb_arg;
573 		op->crc_dst = batch->user_ops[0].crc_dst;
574 		_free_batch(batch, chan);
575 	} else {
576 		/* Command specific. */
577 		desc->opcode = IDXD_OPCODE_BATCH;
578 		desc->desc_list_addr = batch->user_desc_addr;
579 		desc->desc_count = batch->index;
580 		assert(batch->index <= batch->size);
581 
582 		/* Add the batch elements completion contexts to the outstanding list to be polled. */
583 		for (i = 0 ; i < batch->index; i++) {
584 			batch->refcnt++;
585 			STAILQ_INSERT_TAIL(&chan->ops_outstanding, (struct idxd_ops *)&batch->user_ops[i],
586 					   link);
587 		}
588 		batch->index = UINT16_MAX;
589 	}
590 
591 	chan->batch = NULL;
592 
593 	/* Submit operation. */
594 	_submit_to_hw(chan, op);
595 	SPDK_DEBUGLOG(idxd, "Submitted batch %p\n", batch);
596 
597 	return 0;
598 }
599 
600 static int
601 _idxd_setup_batch(struct spdk_idxd_io_channel *chan)
602 {
603 	struct idxd_batch *batch;
604 
605 	if (chan->batch == NULL) {
606 		batch = idxd_batch_create(chan);
607 		if (batch == NULL) {
608 			return -EBUSY;
609 		}
610 	}
611 
612 	return 0;
613 }
614 
615 static int
616 _idxd_flush_batch(struct spdk_idxd_io_channel *chan)
617 {
618 	struct idxd_batch *batch = chan->batch;
619 	int rc;
620 
621 	if (batch != NULL && batch->index >= IDXD_MIN_BATCH_FLUSH) {
622 		/* Close out the full batch */
623 		rc = idxd_batch_submit(chan, NULL, NULL);
624 		if (rc) {
625 			assert(rc == -EBUSY);
626 			/*
627 			 * Return 0. This will get re-submitted within idxd_process_events where
628 			 * if it fails, it will get correctly aborted.
629 			 */
630 			return 0;
631 		}
632 	}
633 
634 	return 0;
635 }
636 
637 static inline void
638 _update_write_flags(struct spdk_idxd_io_channel *chan, struct idxd_hw_desc *desc)
639 {
640 	desc->flags ^= IDXD_FLAG_CACHE_CONTROL;
641 }
642 
643 int
644 spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan,
645 		      struct iovec *diov, uint32_t diovcnt,
646 		      struct iovec *siov, uint32_t siovcnt,
647 		      int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
648 {
649 	struct idxd_hw_desc *desc;
650 	struct idxd_ops *first_op, *op;
651 	void *src, *dst;
652 	uint64_t src_addr, dst_addr;
653 	int rc, count;
654 	uint64_t len, seg_len;
655 	struct spdk_ioviter iter;
656 	struct idxd_vtophys_iter vtophys_iter;
657 
658 	assert(chan != NULL);
659 	assert(diov != NULL);
660 	assert(siov != NULL);
661 
662 	rc = _idxd_setup_batch(chan);
663 	if (rc) {
664 		return rc;
665 	}
666 
667 	count = 0;
668 	first_op = NULL;
669 	for (len = spdk_ioviter_first(&iter, siov, siovcnt, diov, diovcnt, &src, &dst);
670 	     len > 0;
671 	     len = spdk_ioviter_next(&iter, &src, &dst)) {
672 
673 		idxd_vtophys_iter_init(chan, &vtophys_iter, src, dst, len);
674 
675 		while (len > 0) {
676 			if (first_op == NULL) {
677 				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
678 				if (rc) {
679 					goto error;
680 				}
681 
682 				first_op = op;
683 			} else {
684 				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
685 				if (rc) {
686 					goto error;
687 				}
688 
689 				first_op->count++;
690 				op->parent = first_op;
691 			}
692 
693 			count++;
694 
695 			src_addr = 0;
696 			dst_addr = 0;
697 			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src_addr, &dst_addr);
698 			if (seg_len == SPDK_VTOPHYS_ERROR) {
699 				rc = -EFAULT;
700 				goto error;
701 			}
702 
703 			desc->opcode = IDXD_OPCODE_MEMMOVE;
704 			desc->src_addr = src_addr;
705 			desc->dst_addr = dst_addr;
706 			desc->xfer_size = seg_len;
707 			_update_write_flags(chan, desc);
708 
709 			len -= seg_len;
710 		}
711 	}
712 
713 	return _idxd_flush_batch(chan);
714 
715 error:
716 	chan->batch->index -= count;
717 	return rc;
718 }
719 
720 /* Dual-cast copies the same source to two separate destination buffers. */
721 int
722 spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan, void *dst1, void *dst2,
723 			  const void *src, uint64_t nbytes, int flags,
724 			  spdk_idxd_req_cb cb_fn, void *cb_arg)
725 {
726 	struct idxd_hw_desc *desc;
727 	struct idxd_ops *first_op, *op;
728 	uint64_t src_addr, dst1_addr, dst2_addr;
729 	int rc, count;
730 	uint64_t len;
731 	uint64_t outer_seg_len, inner_seg_len;
732 	struct idxd_vtophys_iter iter_outer, iter_inner;
733 
734 	assert(chan != NULL);
735 	assert(dst1 != NULL);
736 	assert(dst2 != NULL);
737 	assert(src != NULL);
738 
739 	if ((uintptr_t)dst1 & (ALIGN_4K - 1) || (uintptr_t)dst2 & (ALIGN_4K - 1)) {
740 		SPDK_ERRLOG("Dualcast requires 4K alignment on dst addresses\n");
741 		return -EINVAL;
742 	}
743 
744 	rc = _idxd_setup_batch(chan);
745 	if (rc) {
746 		return rc;
747 	}
748 
749 	idxd_vtophys_iter_init(chan, &iter_outer, src, dst1, nbytes);
750 
751 	first_op = NULL;
752 	count = 0;
753 	while (nbytes > 0) {
754 		src_addr = 0;
755 		dst1_addr = 0;
756 		outer_seg_len = idxd_vtophys_iter_next(&iter_outer, &src_addr, &dst1_addr);
757 		if (outer_seg_len == SPDK_VTOPHYS_ERROR) {
758 			goto error;
759 		}
760 
761 		idxd_vtophys_iter_init(chan, &iter_inner, src, dst2, nbytes);
762 
763 		src += outer_seg_len;
764 		nbytes -= outer_seg_len;
765 
766 		while (outer_seg_len > 0) {
767 			if (first_op == NULL) {
768 				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
769 				if (rc) {
770 					goto error;
771 				}
772 
773 				first_op = op;
774 			} else {
775 				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
776 				if (rc) {
777 					goto error;
778 				}
779 
780 				first_op->count++;
781 				op->parent = first_op;
782 			}
783 
784 			count++;
785 
786 			src_addr = 0;
787 			dst2_addr = 0;
788 			inner_seg_len = idxd_vtophys_iter_next(&iter_inner, &src_addr, &dst2_addr);
789 			if (inner_seg_len == SPDK_VTOPHYS_ERROR) {
790 				rc = -EFAULT;
791 				goto error;
792 			}
793 
794 			len = spdk_min(outer_seg_len, inner_seg_len);
795 
796 			/* Command specific. */
797 			desc->opcode = IDXD_OPCODE_DUALCAST;
798 			desc->src_addr = src_addr;
799 			desc->dst_addr = dst1_addr;
800 			desc->dest2 = dst2_addr;
801 			desc->xfer_size = len;
802 			_update_write_flags(chan, desc);
803 
804 			dst1_addr += len;
805 			outer_seg_len -= len;
806 		}
807 	}
808 
809 	return _idxd_flush_batch(chan);
810 
811 error:
812 	chan->batch->index -= count;
813 	return rc;
814 }
815 
816 int
817 spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan,
818 			 struct iovec *siov1, size_t siov1cnt,
819 			 struct iovec *siov2, size_t siov2cnt,
820 			 int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
821 {
822 
823 	struct idxd_hw_desc *desc;
824 	struct idxd_ops *first_op, *op;
825 	void *src1, *src2;
826 	uint64_t src1_addr, src2_addr;
827 	int rc, count;
828 	uint64_t len, seg_len;
829 	struct spdk_ioviter iter;
830 	struct idxd_vtophys_iter vtophys_iter;
831 
832 	assert(chan != NULL);
833 	assert(siov1 != NULL);
834 	assert(siov2 != NULL);
835 
836 	rc = _idxd_setup_batch(chan);
837 	if (rc) {
838 		return rc;
839 	}
840 
841 	count = 0;
842 	first_op = NULL;
843 	for (len = spdk_ioviter_first(&iter, siov1, siov1cnt, siov2, siov2cnt, &src1, &src2);
844 	     len > 0;
845 	     len = spdk_ioviter_next(&iter, &src1, &src2)) {
846 
847 		idxd_vtophys_iter_init(chan, &vtophys_iter, src1, src2, len);
848 
849 		while (len > 0) {
850 			if (first_op == NULL) {
851 				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
852 				if (rc) {
853 					goto error;
854 				}
855 
856 				first_op = op;
857 			} else {
858 				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
859 				if (rc) {
860 					goto error;
861 				}
862 
863 				first_op->count++;
864 				op->parent = first_op;
865 			}
866 
867 			count++;
868 
869 			src1_addr = 0;
870 			src2_addr = 0;
871 			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src1_addr, &src2_addr);
872 			if (seg_len == SPDK_VTOPHYS_ERROR) {
873 				rc = -EFAULT;
874 				goto error;
875 			}
876 
877 			desc->opcode = IDXD_OPCODE_COMPARE;
878 			desc->src_addr = src1_addr;
879 			desc->src2_addr = src2_addr;
880 			desc->xfer_size = seg_len;
881 
882 			len -= seg_len;
883 		}
884 	}
885 
886 	return _idxd_flush_batch(chan);
887 
888 error:
889 	chan->batch->index -= count;
890 	return rc;
891 }
892 
893 int
894 spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan,
895 		      struct iovec *diov, size_t diovcnt,
896 		      uint64_t fill_pattern, int flags,
897 		      spdk_idxd_req_cb cb_fn, void *cb_arg)
898 {
899 	struct idxd_hw_desc *desc;
900 	struct idxd_ops *first_op, *op;
901 	uint64_t dst_addr;
902 	int rc, count;
903 	uint64_t len, seg_len;
904 	void *dst;
905 	size_t i;
906 
907 	assert(chan != NULL);
908 	assert(diov != NULL);
909 
910 	rc = _idxd_setup_batch(chan);
911 	if (rc) {
912 		return rc;
913 	}
914 
915 	count = 0;
916 	first_op = NULL;
917 	for (i = 0; i < diovcnt; i++) {
918 		len = diov[i].iov_len;
919 		dst = diov[i].iov_base;
920 
921 		while (len > 0) {
922 			if (first_op == NULL) {
923 				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
924 				if (rc) {
925 					goto error;
926 				}
927 
928 				first_op = op;
929 			} else {
930 				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
931 				if (rc) {
932 					goto error;
933 				}
934 
935 				first_op->count++;
936 				op->parent = first_op;
937 			}
938 
939 			count++;
940 
941 			seg_len = len;
942 			if (chan->pasid_enabled) {
943 				dst_addr = (uint64_t)dst;
944 			} else {
945 				dst_addr = spdk_vtophys(dst, &seg_len);
946 				if (dst_addr == SPDK_VTOPHYS_ERROR) {
947 					SPDK_ERRLOG("Error translating address\n");
948 					rc = -EFAULT;
949 					goto error;
950 				}
951 			}
952 
953 			seg_len = spdk_min(seg_len, len);
954 
955 			desc->opcode = IDXD_OPCODE_MEMFILL;
956 			desc->pattern = fill_pattern;
957 			desc->dst_addr = dst_addr;
958 			desc->xfer_size = seg_len;
959 			_update_write_flags(chan, desc);
960 
961 			len -= seg_len;
962 			dst += seg_len;
963 		}
964 	}
965 
966 	return _idxd_flush_batch(chan);
967 
968 error:
969 	chan->batch->index -= count;
970 	return rc;
971 }
972 
973 int
974 spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan,
975 			struct iovec *siov, size_t siovcnt,
976 			uint32_t seed, uint32_t *crc_dst, int flags,
977 			spdk_idxd_req_cb cb_fn, void *cb_arg)
978 {
979 	struct idxd_hw_desc *desc;
980 	struct idxd_ops *first_op, *op;
981 	uint64_t src_addr;
982 	int rc, count;
983 	uint64_t len, seg_len;
984 	void *src;
985 	size_t i;
986 	uint64_t prev_crc = 0;
987 
988 	assert(chan != NULL);
989 	assert(siov != NULL);
990 
991 	rc = _idxd_setup_batch(chan);
992 	if (rc) {
993 		return rc;
994 	}
995 
996 	count = 0;
997 	op = NULL;
998 	first_op = NULL;
999 	for (i = 0; i < siovcnt; i++) {
1000 		len = siov[i].iov_len;
1001 		src = siov[i].iov_base;
1002 
1003 		while (len > 0) {
1004 			if (first_op == NULL) {
1005 				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
1006 				if (rc) {
1007 					goto error;
1008 				}
1009 
1010 				first_op = op;
1011 			} else {
1012 				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
1013 				if (rc) {
1014 					goto error;
1015 				}
1016 
1017 				first_op->count++;
1018 				op->parent = first_op;
1019 			}
1020 
1021 			count++;
1022 
1023 			seg_len = len;
1024 			if (chan->pasid_enabled) {
1025 				src_addr = (uint64_t)src;
1026 			} else {
1027 				src_addr = spdk_vtophys(src, &seg_len);
1028 				if (src_addr == SPDK_VTOPHYS_ERROR) {
1029 					SPDK_ERRLOG("Error translating address\n");
1030 					rc = -EFAULT;
1031 					goto error;
1032 				}
1033 			}
1034 
1035 			seg_len = spdk_min(seg_len, len);
1036 
1037 			desc->opcode = IDXD_OPCODE_CRC32C_GEN;
1038 			desc->src_addr = src_addr;
1039 			if (op == first_op) {
1040 				desc->crc32c.seed = seed;
1041 			} else {
1042 				desc->flags |= IDXD_FLAG_FENCE | IDXD_FLAG_CRC_READ_CRC_SEED;
1043 				desc->crc32c.addr = prev_crc;
1044 			}
1045 
1046 			desc->xfer_size = seg_len;
1047 			prev_crc = desc->completion_addr + offsetof(struct dsa_hw_comp_record, crc32c_val);
1048 
1049 			len -= seg_len;
1050 			src += seg_len;
1051 		}
1052 	}
1053 
1054 	/* Only the last op copies the crc to the destination */
1055 	if (op) {
1056 		op->crc_dst = crc_dst;
1057 	}
1058 
1059 	return _idxd_flush_batch(chan);
1060 
1061 error:
1062 	chan->batch->index -= count;
1063 	return rc;
1064 }
1065 
1066 int
1067 spdk_idxd_submit_copy_crc32c(struct spdk_idxd_io_channel *chan,
1068 			     struct iovec *diov, size_t diovcnt,
1069 			     struct iovec *siov, size_t siovcnt,
1070 			     uint32_t seed, uint32_t *crc_dst, int flags,
1071 			     spdk_idxd_req_cb cb_fn, void *cb_arg)
1072 {
1073 	struct idxd_hw_desc *desc;
1074 	struct idxd_ops *first_op, *op;
1075 	void *src, *dst;
1076 	uint64_t src_addr, dst_addr;
1077 	int rc, count;
1078 	uint64_t len, seg_len;
1079 	struct spdk_ioviter iter;
1080 	struct idxd_vtophys_iter vtophys_iter;
1081 	uint64_t prev_crc = 0;
1082 
1083 	assert(chan != NULL);
1084 	assert(diov != NULL);
1085 	assert(siov != NULL);
1086 
1087 	rc = _idxd_setup_batch(chan);
1088 	if (rc) {
1089 		return rc;
1090 	}
1091 
1092 	count = 0;
1093 	op = NULL;
1094 	first_op = NULL;
1095 	for (len = spdk_ioviter_first(&iter, siov, siovcnt, diov, diovcnt, &src, &dst);
1096 	     len > 0;
1097 	     len = spdk_ioviter_next(&iter, &src, &dst)) {
1098 
1099 
1100 		idxd_vtophys_iter_init(chan, &vtophys_iter, src, dst, len);
1101 
1102 		while (len > 0) {
1103 			if (first_op == NULL) {
1104 				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
1105 				if (rc) {
1106 					goto error;
1107 				}
1108 
1109 				first_op = op;
1110 			} else {
1111 				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
1112 				if (rc) {
1113 					goto error;
1114 				}
1115 
1116 				first_op->count++;
1117 				op->parent = first_op;
1118 			}
1119 
1120 			count++;
1121 
1122 			src_addr = 0;
1123 			dst_addr = 0;
1124 			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src_addr, &dst_addr);
1125 			if (seg_len == SPDK_VTOPHYS_ERROR) {
1126 				rc = -EFAULT;
1127 				goto error;
1128 			}
1129 
1130 			desc->opcode = IDXD_OPCODE_COPY_CRC;
1131 			desc->dst_addr = dst_addr;
1132 			desc->src_addr = src_addr;
1133 			_update_write_flags(chan, desc);
1134 			if (op == first_op) {
1135 				desc->crc32c.seed = seed;
1136 			} else {
1137 				desc->flags |= IDXD_FLAG_FENCE | IDXD_FLAG_CRC_READ_CRC_SEED;
1138 				desc->crc32c.addr = prev_crc;
1139 			}
1140 
1141 			desc->xfer_size = seg_len;
1142 			prev_crc = desc->completion_addr + offsetof(struct dsa_hw_comp_record, crc32c_val);
1143 
1144 			len -= seg_len;
1145 		}
1146 	}
1147 
1148 	/* Only the last op copies the crc to the destination */
1149 	if (op) {
1150 		op->crc_dst = crc_dst;
1151 	}
1152 
1153 	return _idxd_flush_batch(chan);
1154 
1155 error:
1156 	chan->batch->index -= count;
1157 	return rc;
1158 }
1159 
1160 static inline int
1161 _idxd_submit_compress_single(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
1162 			     uint64_t nbytes_dst, uint64_t nbytes_src, uint32_t *output_size,
1163 			     int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
1164 {
1165 	struct idxd_hw_desc *desc;
1166 	struct idxd_ops *op;
1167 	uint64_t src_addr, dst_addr;
1168 	int rc;
1169 
1170 	/* Common prep. */
1171 	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
1172 	if (rc) {
1173 		return rc;
1174 	}
1175 
1176 	rc = _vtophys(chan, src, &src_addr, nbytes_src);
1177 	if (rc) {
1178 		goto error;
1179 	}
1180 
1181 	rc = _vtophys(chan, dst, &dst_addr, nbytes_dst);
1182 	if (rc) {
1183 		goto error;
1184 	}
1185 
1186 	/* Command specific. */
1187 	desc->opcode = IDXD_OPCODE_COMPRESS;
1188 	desc->src1_addr = src_addr;
1189 	desc->dst_addr = dst_addr;
1190 	desc->src1_size = nbytes_src;
1191 	desc->iaa.max_dst_size = nbytes_dst;
1192 	desc->iaa.src2_size = sizeof(struct iaa_aecs);
1193 	desc->iaa.src2_addr = chan->idxd->aecs_addr;
1194 	desc->flags |= IAA_FLAG_RD_SRC2_AECS;
1195 	desc->compr_flags = IAA_COMP_FLAGS;
1196 	op->output_size = output_size;
1197 
1198 	_submit_to_hw(chan, op);
1199 	return 0;
1200 error:
1201 	STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
1202 	return rc;
1203 }
1204 
1205 int
1206 spdk_idxd_submit_compress(struct spdk_idxd_io_channel *chan,
1207 			  void *dst, uint64_t nbytes,
1208 			  struct iovec *siov, uint32_t siovcnt, uint32_t *output_size,
1209 			  int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
1210 {
1211 	assert(chan != NULL);
1212 	assert(dst != NULL);
1213 	assert(siov != NULL);
1214 
1215 	if (siovcnt == 1) {
1216 		/* Simple case - copying one buffer to another */
1217 		if (nbytes < siov[0].iov_len) {
1218 			return -EINVAL;
1219 		}
1220 
1221 		return _idxd_submit_compress_single(chan, dst, siov[0].iov_base,
1222 						    nbytes, siov[0].iov_len,
1223 						    output_size, flags, cb_fn, cb_arg);
1224 	}
1225 	/* TODO: vectored support */
1226 	return -EINVAL;
1227 }
1228 
1229 static inline int
1230 _idxd_submit_decompress_single(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
1231 			       uint64_t nbytes_dst, uint64_t nbytes, int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
1232 {
1233 	struct idxd_hw_desc *desc;
1234 	struct idxd_ops *op;
1235 	uint64_t src_addr, dst_addr;
1236 	int rc;
1237 
1238 	/* Common prep. */
1239 	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
1240 	if (rc) {
1241 		return rc;
1242 	}
1243 
1244 	rc = _vtophys(chan, src, &src_addr, nbytes);
1245 	if (rc) {
1246 		goto error;
1247 	}
1248 
1249 	rc = _vtophys(chan, dst, &dst_addr, nbytes_dst);
1250 	if (rc) {
1251 		goto error;
1252 	}
1253 
1254 	/* Command specific. */
1255 	desc->opcode = IDXD_OPCODE_DECOMPRESS;
1256 	desc->src1_addr = src_addr;
1257 	desc->dst_addr = dst_addr;
1258 	desc->src1_size = nbytes;
1259 	desc->iaa.max_dst_size = nbytes_dst;
1260 	desc->decompr_flags = IAA_DECOMP_FLAGS;
1261 
1262 	_submit_to_hw(chan, op);
1263 	return 0;
1264 error:
1265 	STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
1266 	return rc;
1267 }
1268 
1269 int
1270 spdk_idxd_submit_decompress(struct spdk_idxd_io_channel *chan,
1271 			    struct iovec *diov, uint32_t diovcnt,
1272 			    struct iovec *siov, uint32_t siovcnt,
1273 			    int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
1274 {
1275 	assert(chan != NULL);
1276 	assert(diov != NULL);
1277 	assert(siov != NULL);
1278 
1279 	if (diovcnt == 1 && siovcnt == 1) {
1280 		/* Simple case - copying one buffer to another */
1281 		if (diov[0].iov_len < siov[0].iov_len) {
1282 			return -EINVAL;
1283 		}
1284 
1285 		return _idxd_submit_decompress_single(chan, diov[0].iov_base, siov[0].iov_base,
1286 						      diov[0].iov_len, siov[0].iov_len,
1287 						      flags, cb_fn, cb_arg);
1288 	}
1289 	/* TODO: vectored support */
1290 	return -EINVAL;
1291 }
1292 
1293 int
1294 spdk_idxd_submit_raw_desc(struct spdk_idxd_io_channel *chan,
1295 			  struct idxd_hw_desc *_desc,
1296 			  spdk_idxd_req_cb cb_fn, void *cb_arg)
1297 {
1298 	struct idxd_hw_desc *desc;
1299 	struct idxd_ops *op;
1300 	int rc, flags = 0;
1301 	uint64_t comp_addr;
1302 
1303 	assert(chan != NULL);
1304 	assert(_desc != NULL);
1305 
1306 	/* Common prep. */
1307 	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
1308 	if (rc) {
1309 		return rc;
1310 	}
1311 
1312 	/* Command specific. */
1313 	flags = desc->flags;
1314 	comp_addr = desc->completion_addr;
1315 	memcpy(desc, _desc, sizeof(*desc));
1316 	desc->flags |= flags;
1317 	desc->completion_addr = comp_addr;
1318 
1319 	/* Submit operation. */
1320 	_submit_to_hw(chan, op);
1321 
1322 	return 0;
1323 }
1324 
1325 static inline void
1326 _dump_sw_error_reg(struct spdk_idxd_io_channel *chan)
1327 {
1328 	struct spdk_idxd_device *idxd = chan->idxd;
1329 
1330 	assert(idxd != NULL);
1331 	idxd->impl->dump_sw_error(idxd, chan->portal);
1332 }
1333 
/* TODO: more performance experiments. */
/* Evaluates to 1 when the status value is greater than 0, otherwise 0. */
#define IDXD_COMPLETION(x) ((x) > (0) ? (1) : (0))
/* Evaluates to 1 when the status value is greater than 1, otherwise 0. */
#define IDXD_FAILURE(x) ((x) > (1) ? (1) : (0))
/*
 * Evaluates to 1 when bit 0 of x is set, otherwise 0. This is a pure test:
 * the argument is not modified and may be any integer expression.
 */
#define IDXD_SW_ERROR(x) (((x) & (0x1)) ? (1) : (0))
1338 int
1339 spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
1340 {
1341 	struct idxd_ops *op, *tmp, *parent_op;
1342 	int status = 0;
1343 	int rc2, rc = 0;
1344 	void *cb_arg;
1345 	spdk_idxd_req_cb cb_fn;
1346 
1347 	assert(chan != NULL);
1348 
1349 	STAILQ_FOREACH_SAFE(op, &chan->ops_outstanding, link, tmp) {
1350 		if (!IDXD_COMPLETION(op->hw.status)) {
1351 			/*
1352 			 * oldest locations are at the head of the list so if
1353 			 * we've polled a location that hasn't completed, bail
1354 			 * now as there are unlikely to be any more completions.
1355 			 */
1356 			break;
1357 		}
1358 
1359 		STAILQ_REMOVE_HEAD(&chan->ops_outstanding, link);
1360 		rc++;
1361 
1362 		/* Status is in the same location for both IAA and DSA completion records. */
1363 		if (spdk_unlikely(IDXD_FAILURE(op->hw.status))) {
1364 			SPDK_ERRLOG("Completion status 0x%x\n", op->hw.status);
1365 			status = -EINVAL;
1366 			_dump_sw_error_reg(chan);
1367 		}
1368 
1369 		switch (op->desc->opcode) {
1370 		case IDXD_OPCODE_BATCH:
1371 			SPDK_DEBUGLOG(idxd, "Complete batch %p\n", op->batch);
1372 			break;
1373 		case IDXD_OPCODE_CRC32C_GEN:
1374 		case IDXD_OPCODE_COPY_CRC:
1375 			if (spdk_likely(status == 0 && op->crc_dst != NULL)) {
1376 				*op->crc_dst = op->hw.crc32c_val;
1377 				*op->crc_dst ^= ~0;
1378 			}
1379 			break;
1380 		case IDXD_OPCODE_COMPARE:
1381 			if (spdk_likely(status == 0)) {
1382 				status = op->hw.result;
1383 			}
1384 			break;
1385 		case IDXD_OPCODE_COMPRESS:
1386 			if (spdk_likely(status == 0 && op->output_size != NULL)) {
1387 				*op->output_size = op->iaa_hw.output_size;
1388 			}
1389 			break;
1390 		}
1391 
1392 		/* TODO: WHAT IF THIS FAILED!? */
1393 		op->hw.status = 0;
1394 
1395 		assert(op->count > 0);
1396 		op->count--;
1397 
1398 		parent_op = op->parent;
1399 		if (parent_op != NULL) {
1400 			assert(parent_op->count > 0);
1401 			parent_op->count--;
1402 
1403 			if (parent_op->count == 0) {
1404 				cb_fn = parent_op->cb_fn;
1405 				cb_arg = parent_op->cb_arg;
1406 
1407 				assert(parent_op->batch != NULL);
1408 
1409 				/*
1410 				 * Now that parent_op count is 0, we can release its ref
1411 				 * to its batch. We have not released the ref to the batch
1412 				 * that the op is pointing to yet, which will be done below.
1413 				 */
1414 				parent_op->batch->refcnt--;
1415 				if (parent_op->batch->refcnt == 0) {
1416 					_free_batch(parent_op->batch, chan);
1417 				}
1418 
1419 				if (cb_fn) {
1420 					cb_fn(cb_arg, status);
1421 				}
1422 			}
1423 		}
1424 
1425 		if (op->count == 0) {
1426 			cb_fn = op->cb_fn;
1427 			cb_arg = op->cb_arg;
1428 
1429 			if (op->batch != NULL) {
1430 				assert(op->batch->refcnt > 0);
1431 				op->batch->refcnt--;
1432 
1433 				if (op->batch->refcnt == 0) {
1434 					_free_batch(op->batch, chan);
1435 				}
1436 			} else {
1437 				STAILQ_INSERT_HEAD(&chan->ops_pool, op, link);
1438 			}
1439 
1440 			if (cb_fn) {
1441 				cb_fn(cb_arg, status);
1442 			}
1443 		}
1444 
1445 		/* reset the status */
1446 		status = 0;
1447 		/* break the processing loop to prevent from starving the rest of the system */
1448 		if (rc > IDXD_MAX_COMPLETIONS) {
1449 			break;
1450 		}
1451 	}
1452 
1453 	/* Submit any built-up batch */
1454 	if (chan->batch) {
1455 		rc2 = idxd_batch_submit(chan, NULL, NULL);
1456 		if (rc2) {
1457 			assert(rc2 == -EBUSY);
1458 		}
1459 	}
1460 
1461 	return rc;
1462 }
1463 
1464 void
1465 idxd_impl_register(struct spdk_idxd_impl *impl)
1466 {
1467 	STAILQ_INSERT_HEAD(&g_idxd_impls, impl, link);
1468 }
1469 
/* Presumably registers the "idxd" log flag named by SPDK_DEBUGLOG(idxd, ...) in this file - confirm against spdk/log.h. */
SPDK_LOG_REGISTER_COMPONENT(idxd)
1471