/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2020 Intel Corporation.
 *   All rights reserved.
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/memory.h"
#include "spdk/likely.h"

#include "spdk/log.h"
#include "spdk_internal/idxd.h"

#include "idxd_internal.h"

#define ALIGN_4K 0x1000
#define USERSPACE_DRIVER_NAME "user"
#define KERNEL_DRIVER_NAME "kernel"

/* The max number of completions processed per poll */
#define IDXD_MAX_COMPLETIONS      128

static STAILQ_HEAD(, spdk_idxd_impl) g_idxd_impls = STAILQ_HEAD_INITIALIZER(g_idxd_impls);
static struct spdk_idxd_impl *g_idxd_impl;

uint32_t
spdk_idxd_get_socket(struct spdk_idxd_device *idxd)
{
	return idxd->socket_id;
}

static inline void
_submit_to_hw(struct spdk_idxd_io_channel *chan, struct idxd_ops *op)
{
	STAILQ_INSERT_TAIL(&chan->ops_outstanding, op, link);
	/*
	 * We must barrier before writing the descriptor to ensure that data
	 * has been correctly flushed from the associated data buffers before DMA
	 * operations begin.
	 */
	_spdk_wmb();
	movdir64b(chan->portal + chan->portal_offset, op->desc);
	chan->portal_offset = (chan->portal_offset + chan->idxd->chan_per_device * PORTAL_STRIDE) &
			      PORTAL_MASK;
}

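/*
 * Illustration (not normative): each channel begins at a distinct
 * portal_offset (see spdk_idxd_get_channel()) and advances by
 * chan_per_device * PORTAL_STRIDE per submission. Assuming, say, 4 channels
 * per device, channel 0 cycles through stride slots 0, 4, 8, ... while
 * channel 1 cycles through 1, 5, 9, ..., wrapping at PORTAL_MASK, so
 * channels sharing a device never target the same portal line back-to-back.
 */
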
static inline int
_vtophys(struct spdk_idxd_io_channel *chan, const void *buf, uint64_t *buf_addr, uint64_t size)
{
	uint64_t updated_size = size;

	if (chan->pasid_enabled) {
		/* We can just use virtual addresses */
		*buf_addr = (uint64_t)buf;
		return 0;
	}

	*buf_addr = spdk_vtophys(buf, &updated_size);

	if (*buf_addr == SPDK_VTOPHYS_ERROR) {
		SPDK_ERRLOG("Error translating address\n");
		return -EINVAL;
	}

	if (updated_size < size) {
		SPDK_ERRLOG("Error translating size (0x%lx), return size (0x%lx)\n", size, updated_size);
		return -EINVAL;
	}

	return 0;
}

struct idxd_vtophys_iter {
	const void	*src;
	void		*dst;
	uint64_t	len;

	uint64_t	offset;

	bool		pasid_enabled;
};

static void
idxd_vtophys_iter_init(struct spdk_idxd_io_channel *chan,
		       struct idxd_vtophys_iter *iter,
		       const void *src, void *dst, uint64_t len)
{
	iter->src = src;
	iter->dst = dst;
	iter->len = len;
	iter->offset = 0;
	iter->pasid_enabled = chan->pasid_enabled;
}

static uint64_t
idxd_vtophys_iter_next(struct idxd_vtophys_iter *iter,
		       uint64_t *src_phys, uint64_t *dst_phys)
{
	uint64_t src_off, dst_off, len;
	const void *src;
	void *dst;

	src = iter->src + iter->offset;
	dst = iter->dst + iter->offset;

	if (iter->offset == iter->len) {
		return 0;
	}

	if (iter->pasid_enabled) {
		*src_phys = (uint64_t)src;
		*dst_phys = (uint64_t)dst;
		return iter->len;
	}

	len = iter->len - iter->offset;

	src_off = len;
	*src_phys = spdk_vtophys(src, &src_off);
	if (*src_phys == SPDK_VTOPHYS_ERROR) {
		SPDK_ERRLOG("Error translating address\n");
		return SPDK_VTOPHYS_ERROR;
	}

	dst_off = len;
	*dst_phys = spdk_vtophys(dst, &dst_off);
	if (*dst_phys == SPDK_VTOPHYS_ERROR) {
		SPDK_ERRLOG("Error translating address\n");
		return SPDK_VTOPHYS_ERROR;
	}

	len = spdk_min(src_off, dst_off);
	iter->offset += len;

	return len;
}

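/*
 * Typical use of the iterator (sketch, derived from the callers below):
 * translate a virtually contiguous src/dst pair into one or more physically
 * contiguous segments and program one descriptor per segment.
 *
 *	idxd_vtophys_iter_init(chan, &iter, src, dst, len);
 *	while (len > 0) {
 *		seg_len = idxd_vtophys_iter_next(&iter, &src_addr, &dst_addr);
 *		if (seg_len == SPDK_VTOPHYS_ERROR) { ... }
 *		// program one descriptor covering seg_len bytes
 *		len -= seg_len;
 *	}
 *
 * When PASID is enabled the iterator returns the whole remaining range in
 * one shot, since the device can use virtual addresses directly.
 */
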
/* Helper for the DSA-specific portion of spdk_idxd_get_channel(). */
static int
_dsa_alloc_batches(struct spdk_idxd_io_channel *chan, int num_descriptors)
{
	struct idxd_batch *batch;
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	int i, j, num_batches, rc = -1;

	/* Allocate batches */
	num_batches = num_descriptors;
	chan->batch_base = calloc(num_batches, sizeof(struct idxd_batch));
	if (chan->batch_base == NULL) {
		SPDK_ERRLOG("Failed to allocate batch pool\n");
		goto error_desc;
	}
	batch = chan->batch_base;
	for (i = 0; i < num_batches; i++) {
		batch->user_desc = desc = spdk_zmalloc(DESC_PER_BATCH * sizeof(struct idxd_hw_desc),
						       0x40, NULL,
						       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (batch->user_desc == NULL) {
			SPDK_ERRLOG("Failed to allocate batch descriptor memory\n");
			goto error_user;
		}

		rc = _vtophys(chan, batch->user_desc, &batch->user_desc_addr,
			      DESC_PER_BATCH * sizeof(struct idxd_hw_desc));
		if (rc) {
			SPDK_ERRLOG("Failed to translate batch descriptor memory\n");
			goto error_user;
		}

		batch->user_ops = op = spdk_zmalloc(DESC_PER_BATCH * sizeof(struct idxd_ops),
						    0x40, NULL,
						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (batch->user_ops == NULL) {
			SPDK_ERRLOG("Failed to allocate user completion memory\n");
			goto error_user;
		}

		for (j = 0; j < DESC_PER_BATCH; j++) {
			rc = _vtophys(chan, &op->hw, &desc->completion_addr, sizeof(struct dsa_hw_comp_record));
			if (rc) {
				SPDK_ERRLOG("Failed to translate batch entry completion memory\n");
				goto error_user;
			}
			op++;
			desc++;
		}
		TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
		batch++;
	}
	return 0;

error_user:
	TAILQ_FOREACH(batch, &chan->batch_pool, link) {
		spdk_free(batch->user_ops);
		batch->user_ops = NULL;
		spdk_free(batch->user_desc);
		batch->user_desc = NULL;
	}
	spdk_free(chan->ops_base);
	chan->ops_base = NULL;
error_desc:
	STAILQ_INIT(&chan->ops_pool);
	spdk_free(chan->desc_base);
	chan->desc_base = NULL;
	return rc;
}

struct spdk_idxd_io_channel *
spdk_idxd_get_channel(struct spdk_idxd_device *idxd)
{
	struct spdk_idxd_io_channel *chan;
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	int i, num_descriptors, rc = -1;
	uint32_t comp_rec_size;

	assert(idxd != NULL);

	chan = calloc(1, sizeof(struct spdk_idxd_io_channel));
	if (chan == NULL) {
		SPDK_ERRLOG("Failed to allocate idxd chan\n");
		return NULL;
	}

	chan->idxd = idxd;
	chan->pasid_enabled = idxd->pasid_enabled;
	STAILQ_INIT(&chan->ops_pool);
	TAILQ_INIT(&chan->batch_pool);
	STAILQ_INIT(&chan->ops_outstanding);

	/* Assign WQ, portal */
	pthread_mutex_lock(&idxd->num_channels_lock);
	if (idxd->num_channels == idxd->chan_per_device) {
		/* too many channels sharing this device */
		pthread_mutex_unlock(&idxd->num_channels_lock);
		SPDK_ERRLOG("Too many channels sharing this device\n");
		goto error;
	}

	/* Have each channel start at a different offset. */
	chan->portal = idxd->impl->portal_get_addr(idxd);
	chan->portal_offset = (idxd->num_channels * PORTAL_STRIDE) & PORTAL_MASK;
	idxd->num_channels++;

	pthread_mutex_unlock(&idxd->num_channels_lock);

	/* Allocate descriptors and completions */
	num_descriptors = idxd->total_wq_size / idxd->chan_per_device;
	chan->desc_base = desc = spdk_zmalloc(num_descriptors * sizeof(struct idxd_hw_desc),
					      0x40, NULL,
					      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->desc_base == NULL) {
		SPDK_ERRLOG("Failed to allocate DSA descriptor memory\n");
		goto error;
	}

	chan->ops_base = op = spdk_zmalloc(num_descriptors * sizeof(struct idxd_ops),
					   0x40, NULL,
					   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->ops_base == NULL) {
		SPDK_ERRLOG("Failed to allocate idxd_ops memory\n");
		goto error;
	}

	if (idxd->type == IDXD_DEV_TYPE_DSA) {
		comp_rec_size = sizeof(struct dsa_hw_comp_record);
		if (_dsa_alloc_batches(chan, num_descriptors)) {
			goto error;
		}
	} else {
		comp_rec_size = sizeof(struct iaa_hw_comp_record);
	}

	for (i = 0; i < num_descriptors; i++) {
		STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
		op->desc = desc;
		rc = _vtophys(chan, &op->hw, &desc->completion_addr, comp_rec_size);
		if (rc) {
			SPDK_ERRLOG("Failed to translate completion memory\n");
			goto error;
		}
		op++;
		desc++;
	}

	return chan;

error:
	spdk_free(chan->ops_base);
	chan->ops_base = NULL;
	spdk_free(chan->desc_base);
	chan->desc_base = NULL;
	free(chan);
	return NULL;
}

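/*
 * Usage sketch (illustrative only, error handling omitted): a thread takes a
 * channel on a device, submits work with completion callbacks, polls for
 * completions, and releases the channel when done.
 *
 *	struct spdk_idxd_io_channel *chan = spdk_idxd_get_channel(idxd);
 *
 *	spdk_idxd_submit_copy(chan, diov, diovcnt, siov, siovcnt, 0, cb_fn, cb_arg);
 *	spdk_idxd_process_events(chan);	// typically called from a poller
 *	spdk_idxd_put_channel(chan);
 */
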
static int idxd_batch_cancel(struct spdk_idxd_io_channel *chan, int status);

void
spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch;

	assert(chan != NULL);
	assert(chan->idxd != NULL);

	if (chan->batch) {
		idxd_batch_cancel(chan, -ECANCELED);
	}

	pthread_mutex_lock(&chan->idxd->num_channels_lock);
	assert(chan->idxd->num_channels > 0);
	chan->idxd->num_channels--;
	pthread_mutex_unlock(&chan->idxd->num_channels_lock);

	spdk_free(chan->ops_base);
	spdk_free(chan->desc_base);
	while ((batch = TAILQ_FIRST(&chan->batch_pool))) {
		TAILQ_REMOVE(&chan->batch_pool, batch, link);
		spdk_free(batch->user_ops);
		spdk_free(batch->user_desc);
	}
	free(chan->batch_base);
	free(chan);
}

static inline struct spdk_idxd_impl *
idxd_get_impl_by_name(const char *impl_name)
{
	struct spdk_idxd_impl *impl;

	assert(impl_name != NULL);
	STAILQ_FOREACH(impl, &g_idxd_impls, link) {
		if (0 == strcmp(impl_name, impl->name)) {
			return impl;
		}
	}

	return NULL;
}

void
spdk_idxd_set_config(bool kernel_mode)
{
	struct spdk_idxd_impl *tmp;

	if (kernel_mode) {
		tmp = idxd_get_impl_by_name(KERNEL_DRIVER_NAME);
	} else {
		tmp = idxd_get_impl_by_name(USERSPACE_DRIVER_NAME);
	}

	if (g_idxd_impl != NULL && g_idxd_impl != tmp) {
		SPDK_ERRLOG("Cannot change idxd implementation after devices are initialized\n");
		assert(false);
		return;
	}
	g_idxd_impl = tmp;

	if (g_idxd_impl == NULL) {
		SPDK_ERRLOG("Cannot set the idxd implementation with %s mode\n",
			    kernel_mode ? KERNEL_DRIVER_NAME : USERSPACE_DRIVER_NAME);
		return;
	}
}

static void
idxd_device_destruct(struct spdk_idxd_device *idxd)
{
	assert(idxd->impl != NULL);

	idxd->impl->destruct(idxd);
}

int
spdk_idxd_probe(void *cb_ctx, spdk_idxd_attach_cb attach_cb,
		spdk_idxd_probe_cb probe_cb)
{
	if (g_idxd_impl == NULL) {
		SPDK_ERRLOG("No idxd impl is selected\n");
		return -1;
	}

	return g_idxd_impl->probe(cb_ctx, attach_cb, probe_cb);
}

void
spdk_idxd_detach(struct spdk_idxd_device *idxd)
{
	assert(idxd != NULL);
	idxd_device_destruct(idxd);
}

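/*
 * Probe/attach sketch (illustrative): spdk_idxd_set_config() must run first
 * so an implementation is selected, then probe() walks matching devices and
 * invokes attach_cb for each device accepted by probe_cb.
 *
 *	spdk_idxd_set_config(false);	// select the userspace driver
 *	rc = spdk_idxd_probe(NULL, attach_cb, probe_cb);
 *
 * where attach_cb(cb_ctx, idxd) records the new spdk_idxd_device and a later
 * spdk_idxd_detach(idxd) releases it.
 */
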
static int
_idxd_prep_command(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn, void *cb_arg,
		   int flags, struct idxd_hw_desc **_desc, struct idxd_ops **_op)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t comp_addr;

	if (!STAILQ_EMPTY(&chan->ops_pool)) {
		op = *_op = STAILQ_FIRST(&chan->ops_pool);
		desc = *_desc = op->desc;
		comp_addr = desc->completion_addr;
		memset(desc, 0, sizeof(*desc));
		desc->completion_addr = comp_addr;
		STAILQ_REMOVE_HEAD(&chan->ops_pool, link);
	} else {
		/* The application needs to handle this, violation of flow control */
		return -EBUSY;
	}

	flags |= IDXD_FLAG_COMPLETION_ADDR_VALID;
	flags |= IDXD_FLAG_REQUEST_COMPLETION;

	desc->flags = flags;
	op->cb_arg = cb_arg;
	op->cb_fn = cb_fn;
	op->batch = NULL;
	op->parent = NULL;
	op->count = 1;

	return 0;
}

static int
_idxd_prep_batch_cmd(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn,
		     void *cb_arg, int flags,
		     struct idxd_hw_desc **_desc, struct idxd_ops **_op)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t comp_addr;
	struct idxd_batch *batch;

	batch = chan->batch;

	assert(batch != NULL);
	if (batch->index == DESC_PER_BATCH) {
		return -EBUSY;
	}

	desc = *_desc = &batch->user_desc[batch->index];
	op = *_op = &batch->user_ops[batch->index];

	op->desc = desc;
	SPDK_DEBUGLOG(idxd, "Prep batch %p index %u\n", batch, batch->index);

	batch->index++;

	comp_addr = desc->completion_addr;
	memset(desc, 0, sizeof(*desc));
	desc->completion_addr = comp_addr;
	flags |= IDXD_FLAG_COMPLETION_ADDR_VALID;
	flags |= IDXD_FLAG_REQUEST_COMPLETION;
	desc->flags = flags;
	op->cb_arg = cb_arg;
	op->cb_fn = cb_fn;
	op->batch = batch;
	op->parent = NULL;
	op->count = 1;
	op->crc_dst = NULL;

	return 0;
}

static struct idxd_batch *
idxd_batch_create(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch;

	assert(chan != NULL);
	assert(chan->batch == NULL);

	if (!TAILQ_EMPTY(&chan->batch_pool)) {
		batch = TAILQ_FIRST(&chan->batch_pool);
		batch->index = 0;
		batch->chan = chan;
		chan->batch = batch;
		TAILQ_REMOVE(&chan->batch_pool, batch, link);
	} else {
		/* The application needs to handle this. */
		return NULL;
	}

	return batch;
}

static void
_free_batch(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan)
{
	SPDK_DEBUGLOG(idxd, "Free batch %p\n", batch);
	assert(batch->refcnt == 0);
	batch->index = 0;
	batch->chan = NULL;
	TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
}

static int
idxd_batch_cancel(struct spdk_idxd_io_channel *chan, int status)
{
	struct idxd_ops *op;
	struct idxd_batch *batch;
	int i;

	assert(chan != NULL);

	batch = chan->batch;
	assert(batch != NULL);

	if (batch->index == UINT8_MAX) {
		SPDK_ERRLOG("Cannot cancel batch, already submitted to HW.\n");
		return -EINVAL;
	}

	chan->batch = NULL;

	for (i = 0; i < batch->index; i++) {
		op = &batch->user_ops[i];
		if (op->cb_fn) {
			op->cb_fn(op->cb_arg, status);
		}
	}

	_free_batch(batch, chan);

	return 0;
}

static int
idxd_batch_submit(struct spdk_idxd_io_channel *chan,
		  spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_batch *batch;
	struct idxd_ops *op;
	int i, rc, flags = 0;

	assert(chan != NULL);

	batch = chan->batch;
	assert(batch != NULL);

	if (batch->index == 0) {
		return idxd_batch_cancel(chan, 0);
	}

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	if (batch->index == 1) {
		uint64_t completion_addr;

		/* If there's only one command, convert it away from a batch. */
		completion_addr = desc->completion_addr;
		memcpy(desc, &batch->user_desc[0], sizeof(*desc));
		desc->completion_addr = completion_addr;
		op->cb_fn = batch->user_ops[0].cb_fn;
		op->cb_arg = batch->user_ops[0].cb_arg;
		op->crc_dst = batch->user_ops[0].crc_dst;
		_free_batch(batch, chan);
	} else {
		/* Command specific. */
		desc->opcode = IDXD_OPCODE_BATCH;
		desc->desc_list_addr = batch->user_desc_addr;
		desc->desc_count = batch->index;
		assert(batch->index <= DESC_PER_BATCH);

		/* Add the batch elements' completion contexts to the outstanding list to be polled. */
		for (i = 0; i < batch->index; i++) {
			batch->refcnt++;
			STAILQ_INSERT_TAIL(&chan->ops_outstanding, (struct idxd_ops *)&batch->user_ops[i],
					   link);
		}
		batch->index = UINT8_MAX;
	}

	chan->batch = NULL;

	/* Submit operation. */
	_submit_to_hw(chan, op);
	SPDK_DEBUGLOG(idxd, "Submitted batch %p\n", batch);

	return 0;
}

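/*
 * Note on the single-descriptor fast path above: DSA batch descriptors must
 * reference at least two descriptors, so idxd_batch_submit() unwraps a batch
 * of one into a plain descriptor instead. For real batches, the N user
 * descriptors are chained behind one IDXD_OPCODE_BATCH descriptor, and every
 * element's op is queued on ops_outstanding so each completion record is
 * polled individually.
 */
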
static int
_idxd_setup_batch(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch;

	if (chan->batch == NULL) {
		batch = idxd_batch_create(chan);
		if (batch == NULL) {
			return -EBUSY;
		}
	}

	return 0;
}

static int
_idxd_flush_batch(struct spdk_idxd_io_channel *chan)
{
	int rc;

	if (chan->batch != NULL && chan->batch->index >= DESC_PER_BATCH) {
		/* Close out the full batch */
		rc = idxd_batch_submit(chan, NULL, NULL);
		if (rc) {
			assert(rc == -EBUSY);
			/*
			 * Return 0. The batch will be re-submitted from within
			 * spdk_idxd_process_events() where, if it fails again,
			 * it will be correctly aborted.
			 */
			return 0;
		}
	}

	return 0;
}

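/*
 * Note (interpretation, not from the original source): IDXD_FLAG_CACHE_CONTROL
 * directs the device's writes toward the CPU cache rather than memory. The XOR
 * below toggles the flag: it sets it when the caller's flags did not include
 * it, and clears it when they did, so a caller-supplied flag inverts the
 * default behavior.
 */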
static inline void
_update_write_flags(struct spdk_idxd_io_channel *chan, struct idxd_hw_desc *desc)
{
	desc->flags ^= IDXD_FLAG_CACHE_CONTROL;
}

int
spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan,
		      struct iovec *diov, uint32_t diovcnt,
		      struct iovec *siov, uint32_t siovcnt,
		      int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	void *src, *dst;
	uint64_t src_addr, dst_addr;
	int rc, count;
	uint64_t len, seg_len;
	struct spdk_ioviter iter;
	struct idxd_vtophys_iter vtophys_iter;

	assert(chan != NULL);
	assert(diov != NULL);
	assert(siov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	first_op = NULL;
	for (len = spdk_ioviter_first(&iter, siov, siovcnt, diov, diovcnt, &src, &dst);
	     len > 0;
	     len = spdk_ioviter_next(&iter, &src, &dst)) {

		idxd_vtophys_iter_init(chan, &vtophys_iter, src, dst, len);

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src_addr = 0;
			dst_addr = 0;
			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src_addr, &dst_addr);
			if (seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			desc->opcode = IDXD_OPCODE_MEMMOVE;
			desc->src_addr = src_addr;
			desc->dst_addr = dst_addr;
			desc->xfer_size = seg_len;
			_update_write_flags(chan, desc);

			len -= seg_len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

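/*
 * Example (illustrative): copy a single buffer pair.
 *
 *	struct iovec siov = { .iov_base = src, .iov_len = len };
 *	struct iovec diov = { .iov_base = dst, .iov_len = len };
 *
 *	rc = spdk_idxd_submit_copy(chan, &diov, 1, &siov, 1, 0, cb_fn, cb_arg);
 *
 * Each virtually contiguous range may still be split into several MEMMOVE
 * descriptors when its physical pages are discontiguous.
 */
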
/* Dual-cast copies the same source to two separate destination buffers. */
int
spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan, void *dst1, void *dst2,
			  const void *src, uint64_t nbytes, int flags,
			  spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	uint64_t src_addr, dst1_addr, dst2_addr;
	int rc, count;
	uint64_t len;
	uint64_t outer_seg_len, inner_seg_len;
	struct idxd_vtophys_iter iter_outer, iter_inner;

	assert(chan != NULL);
	assert(dst1 != NULL);
	assert(dst2 != NULL);
	assert(src != NULL);

	if ((uintptr_t)dst1 & (ALIGN_4K - 1) || (uintptr_t)dst2 & (ALIGN_4K - 1)) {
		SPDK_ERRLOG("Dualcast requires 4K alignment on dst addresses\n");
		return -EINVAL;
	}

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	idxd_vtophys_iter_init(chan, &iter_outer, src, dst1, nbytes);

	first_op = NULL;
	count = 0;
	while (nbytes > 0) {
		src_addr = 0;
		dst1_addr = 0;
		outer_seg_len = idxd_vtophys_iter_next(&iter_outer, &src_addr, &dst1_addr);
		if (outer_seg_len == SPDK_VTOPHYS_ERROR) {
			rc = -EFAULT;
			goto error;
		}

		idxd_vtophys_iter_init(chan, &iter_inner, src, dst2, nbytes);

		src += outer_seg_len;
		nbytes -= outer_seg_len;

		while (outer_seg_len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src_addr = 0;
			dst2_addr = 0;
			inner_seg_len = idxd_vtophys_iter_next(&iter_inner, &src_addr, &dst2_addr);
			if (inner_seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			len = spdk_min(outer_seg_len, inner_seg_len);

			/* Command specific. */
			desc->opcode = IDXD_OPCODE_DUALCAST;
			desc->src_addr = src_addr;
			desc->dst_addr = dst1_addr;
			desc->dest2 = dst2_addr;
			desc->xfer_size = len;
			_update_write_flags(chan, desc);

			dst1_addr += len;
			outer_seg_len -= len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

int
spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan,
			 struct iovec *siov1, size_t siov1cnt,
			 struct iovec *siov2, size_t siov2cnt,
			 int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	void *src1, *src2;
	uint64_t src1_addr, src2_addr;
	int rc, count;
	uint64_t len, seg_len;
	struct spdk_ioviter iter;
	struct idxd_vtophys_iter vtophys_iter;

	assert(chan != NULL);
	assert(siov1 != NULL);
	assert(siov2 != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	first_op = NULL;
	for (len = spdk_ioviter_first(&iter, siov1, siov1cnt, siov2, siov2cnt, &src1, &src2);
	     len > 0;
	     len = spdk_ioviter_next(&iter, &src1, &src2)) {

		idxd_vtophys_iter_init(chan, &vtophys_iter, src1, src2, len);

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src1_addr = 0;
			src2_addr = 0;
			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src1_addr, &src2_addr);
			if (seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			desc->opcode = IDXD_OPCODE_COMPARE;
			desc->src_addr = src1_addr;
			desc->src2_addr = src2_addr;
			desc->xfer_size = seg_len;

			len -= seg_len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

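/*
 * Note: for COMPARE, the hardware result (0 on match, non-zero on mismatch)
 * is surfaced through the callback's status argument by
 * spdk_idxd_process_events(), not through a separate output parameter.
 */
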
int
spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan,
		      struct iovec *diov, size_t diovcnt,
		      uint64_t fill_pattern, int flags,
		      spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	uint64_t dst_addr;
	int rc, count;
	uint64_t len, seg_len;
	void *dst;
	size_t i;

	assert(chan != NULL);
	assert(diov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	first_op = NULL;
	for (i = 0; i < diovcnt; i++) {
		len = diov[i].iov_len;
		dst = diov[i].iov_base;

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			seg_len = len;
			if (chan->pasid_enabled) {
				dst_addr = (uint64_t)dst;
			} else {
				dst_addr = spdk_vtophys(dst, &seg_len);
				if (dst_addr == SPDK_VTOPHYS_ERROR) {
					SPDK_ERRLOG("Error translating address\n");
					rc = -EFAULT;
					goto error;
				}
			}

			seg_len = spdk_min(seg_len, len);

			desc->opcode = IDXD_OPCODE_MEMFILL;
			desc->pattern = fill_pattern;
			desc->dst_addr = dst_addr;
			desc->xfer_size = seg_len;
			_update_write_flags(chan, desc);

			len -= seg_len;
			dst += seg_len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

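/*
 * The fill pattern is a full 64-bit value repeated across the destination.
 * To fill with a single byte (illustrative):
 *
 *	uint64_t pattern;
 *
 *	memset(&pattern, 0xa5, sizeof(pattern));
 *	rc = spdk_idxd_submit_fill(chan, &diov, 1, pattern, 0, cb_fn, cb_arg);
 */
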
int
spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan,
			struct iovec *siov, size_t siovcnt,
			uint32_t seed, uint32_t *crc_dst, int flags,
			spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	uint64_t src_addr;
	int rc, count;
	uint64_t len, seg_len;
	void *src;
	size_t i;
	uint64_t prev_crc = 0;

	assert(chan != NULL);
	assert(siov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	op = NULL;
	first_op = NULL;
	for (i = 0; i < siovcnt; i++) {
		len = siov[i].iov_len;
		src = siov[i].iov_base;

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			seg_len = len;
			if (chan->pasid_enabled) {
				src_addr = (uint64_t)src;
			} else {
				src_addr = spdk_vtophys(src, &seg_len);
				if (src_addr == SPDK_VTOPHYS_ERROR) {
					SPDK_ERRLOG("Error translating address\n");
					rc = -EFAULT;
					goto error;
				}
			}

			seg_len = spdk_min(seg_len, len);

			desc->opcode = IDXD_OPCODE_CRC32C_GEN;
			desc->src_addr = src_addr;
			if (op == first_op) {
				desc->crc32c.seed = seed;
			} else {
				desc->flags |= IDXD_FLAG_FENCE | IDXD_FLAG_CRC_READ_CRC_SEED;
				desc->crc32c.addr = prev_crc;
			}

			desc->xfer_size = seg_len;
			prev_crc = desc->completion_addr + offsetof(struct dsa_hw_comp_record, crc32c_val);

			len -= seg_len;
			src += seg_len;
		}
	}

	/* Only the last op copies the crc to the destination */
	if (op) {
		op->crc_dst = crc_dst;
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

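/*
 * Note on seeding above: only the first descriptor uses the caller's seed;
 * each subsequent segment sets IDXD_FLAG_FENCE | IDXD_FLAG_CRC_READ_CRC_SEED
 * so the hardware fetches its seed from the previous segment's completion
 * record (prev_crc), serializing the segments into one running CRC. The
 * final value is copied to crc_dst and bit-inverted in
 * spdk_idxd_process_events().
 */
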
int
spdk_idxd_submit_copy_crc32c(struct spdk_idxd_io_channel *chan,
			     struct iovec *diov, size_t diovcnt,
			     struct iovec *siov, size_t siovcnt,
			     uint32_t seed, uint32_t *crc_dst, int flags,
			     spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	void *src, *dst;
	uint64_t src_addr, dst_addr;
	int rc, count;
	uint64_t len, seg_len;
	struct spdk_ioviter iter;
	struct idxd_vtophys_iter vtophys_iter;
	uint64_t prev_crc = 0;

	assert(chan != NULL);
	assert(diov != NULL);
	assert(siov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	op = NULL;
	first_op = NULL;
	for (len = spdk_ioviter_first(&iter, siov, siovcnt, diov, diovcnt, &src, &dst);
	     len > 0;
	     len = spdk_ioviter_next(&iter, &src, &dst)) {

		idxd_vtophys_iter_init(chan, &vtophys_iter, src, dst, len);

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src_addr = 0;
			dst_addr = 0;
			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src_addr, &dst_addr);
			if (seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			desc->opcode = IDXD_OPCODE_COPY_CRC;
			desc->dst_addr = dst_addr;
			desc->src_addr = src_addr;
			_update_write_flags(chan, desc);
			if (op == first_op) {
				desc->crc32c.seed = seed;
			} else {
				desc->flags |= IDXD_FLAG_FENCE | IDXD_FLAG_CRC_READ_CRC_SEED;
				desc->crc32c.addr = prev_crc;
			}

			desc->xfer_size = seg_len;
			prev_crc = desc->completion_addr + offsetof(struct dsa_hw_comp_record, crc32c_val);

			len -= seg_len;
		}
	}

	/* Only the last op copies the crc to the destination */
	if (op) {
		op->crc_dst = crc_dst;
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

static inline int
_idxd_submit_compress_single(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
			     uint64_t nbytes_dst, uint64_t nbytes_src, uint32_t *output_size,
			     int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t src_addr, dst_addr;
	int rc;

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	rc = _vtophys(chan, src, &src_addr, nbytes_src);
	if (rc) {
		goto error;
	}

	rc = _vtophys(chan, dst, &dst_addr, nbytes_dst);
	if (rc) {
		goto error;
	}

	/* Command specific. */
	desc->opcode = IDXD_OPCODE_COMPRESS;
	desc->src1_addr = src_addr;
	desc->dst_addr = dst_addr;
	desc->src1_size = nbytes_src;
	desc->iaa.max_dst_size = nbytes_dst;
	desc->iaa.src2_size = sizeof(struct iaa_aecs);
	desc->iaa.src2_addr = chan->idxd->aecs_addr;
	desc->flags |= IAA_FLAG_RD_SRC2_AECS;
	desc->compr_flags = IAA_COMP_FLAGS;
	op->output_size = output_size;

	_submit_to_hw(chan, op);
	return 0;
error:
	STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
	return rc;
}

int
spdk_idxd_submit_compress(struct spdk_idxd_io_channel *chan,
			  void *dst, uint64_t nbytes,
			  struct iovec *siov, uint32_t siovcnt, uint32_t *output_size,
			  int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	assert(chan != NULL);
	assert(dst != NULL);
	assert(siov != NULL);

	if (siovcnt == 1) {
		/* Simple case - compressing a single source buffer */
		if (nbytes < siov[0].iov_len) {
			return -EINVAL;
		}

		return _idxd_submit_compress_single(chan, dst, siov[0].iov_base,
						    nbytes, siov[0].iov_len,
						    output_size, flags, cb_fn, cb_arg);
	}
	/* TODO: vectored support */
	return -EINVAL;
}

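/*
 * Example (illustrative): compress one buffer with IAA.
 *
 *	struct iovec siov = { .iov_base = src, .iov_len = src_len };
 *	uint32_t compressed_len;
 *
 *	rc = spdk_idxd_submit_compress(chan, dst, dst_len, &siov, 1,
 *				       &compressed_len, 0, cb_fn, cb_arg);
 *
 * compressed_len is written when the operation completes (see the
 * IDXD_OPCODE_COMPRESS case in spdk_idxd_process_events()).
 */
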
static inline int
_idxd_submit_decompress_single(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
			       uint64_t nbytes_dst, uint64_t nbytes, int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t src_addr, dst_addr;
	int rc;

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	rc = _vtophys(chan, src, &src_addr, nbytes);
	if (rc) {
		goto error;
	}

	rc = _vtophys(chan, dst, &dst_addr, nbytes_dst);
	if (rc) {
		goto error;
	}

	/* Command specific. */
	desc->opcode = IDXD_OPCODE_DECOMPRESS;
	desc->src1_addr = src_addr;
	desc->dst_addr = dst_addr;
	desc->src1_size = nbytes;
	desc->iaa.max_dst_size = nbytes_dst;
	desc->decompr_flags = IAA_DECOMP_FLAGS;

	_submit_to_hw(chan, op);
	return 0;
error:
	STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
	return rc;
}

int
spdk_idxd_submit_decompress(struct spdk_idxd_io_channel *chan,
			    struct iovec *diov, uint32_t diovcnt,
			    struct iovec *siov, uint32_t siovcnt,
			    int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	assert(chan != NULL);
	assert(diov != NULL);
	assert(siov != NULL);

	if (diovcnt == 1 && siovcnt == 1) {
		/* Simple case - decompressing one buffer into another */
		if (diov[0].iov_len < siov[0].iov_len) {
			return -EINVAL;
		}

		return _idxd_submit_decompress_single(chan, diov[0].iov_base, siov[0].iov_base,
						      diov[0].iov_len, siov[0].iov_len,
						      flags, cb_fn, cb_arg);
	}
	/* TODO: vectored support */
	return -EINVAL;
}

int
spdk_idxd_submit_raw_desc(struct spdk_idxd_io_channel *chan,
			  struct idxd_hw_desc *_desc,
			  spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	int rc, flags = 0;
	uint64_t comp_addr;

	assert(chan != NULL);
	assert(_desc != NULL);

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	/* Command specific. */
	flags = desc->flags;
	comp_addr = desc->completion_addr;
	memcpy(desc, _desc, sizeof(*desc));
	desc->flags |= flags;
	desc->completion_addr = comp_addr;

	/* Submit operation. */
	_submit_to_hw(chan, op);

	return 0;
}

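/*
 * Example (illustrative): hand-build a MEMMOVE descriptor and submit it raw.
 * The completion address and completion-request flags are filled in by the
 * common prep code and OR'd with the caller's flags, so only the
 * operation-specific fields need to be supplied; addresses must already be
 * translated (or PASID must be enabled).
 *
 *	struct idxd_hw_desc raw = {0};
 *
 *	raw.opcode = IDXD_OPCODE_MEMMOVE;
 *	raw.src_addr = src_addr;
 *	raw.dst_addr = dst_addr;
 *	raw.xfer_size = len;
 *	rc = spdk_idxd_submit_raw_desc(chan, &raw, cb_fn, cb_arg);
 */
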
static inline void
_dump_sw_error_reg(struct spdk_idxd_io_channel *chan)
{
	struct spdk_idxd_device *idxd = chan->idxd;

	assert(idxd != NULL);
	idxd->impl->dump_sw_error(idxd, chan->portal);
}

/* TODO: more performance experiments. */
#define IDXD_COMPLETION(x) ((x) > (0) ? (1) : (0))
#define IDXD_FAILURE(x) ((x) > (1) ? (1) : (0))
#define IDXD_SW_ERROR(x) (((x) & (0x1)) ? (1) : (0))
int
spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
{
	struct idxd_ops *op, *tmp, *parent_op;
	int status = 0;
	int rc2, rc = 0;
	void *cb_arg;
	spdk_idxd_req_cb cb_fn;

	assert(chan != NULL);

	STAILQ_FOREACH_SAFE(op, &chan->ops_outstanding, link, tmp) {
		if (!IDXD_COMPLETION(op->hw.status)) {
			/*
			 * oldest locations are at the head of the list so if
			 * we've polled a location that hasn't completed, bail
			 * now as there are unlikely to be any more completions.
			 */
			break;
		}

		STAILQ_REMOVE_HEAD(&chan->ops_outstanding, link);
		rc++;

		/* Status is in the same location for both IAA and DSA completion records. */
		if (spdk_unlikely(IDXD_FAILURE(op->hw.status))) {
			SPDK_ERRLOG("Completion status 0x%x\n", op->hw.status);
			status = -EINVAL;
			_dump_sw_error_reg(chan);
		}

		switch (op->desc->opcode) {
		case IDXD_OPCODE_BATCH:
			SPDK_DEBUGLOG(idxd, "Complete batch %p\n", op->batch);
			break;
		case IDXD_OPCODE_CRC32C_GEN:
		case IDXD_OPCODE_COPY_CRC:
			if (spdk_likely(status == 0 && op->crc_dst != NULL)) {
				*op->crc_dst = op->hw.crc32c_val;
				*op->crc_dst ^= ~0;
			}
			break;
		case IDXD_OPCODE_COMPARE:
			if (spdk_likely(status == 0)) {
				status = op->hw.result;
			}
			break;
		case IDXD_OPCODE_COMPRESS:
			if (spdk_likely(status == 0 && op->output_size != NULL)) {
				*op->output_size = op->iaa_hw.output_size;
			}
			break;
		}

		/* TODO: WHAT IF THIS FAILED!? */
		op->hw.status = 0;

		assert(op->count > 0);
		op->count--;

		parent_op = op->parent;
		if (parent_op != NULL) {
			assert(parent_op->count > 0);
			parent_op->count--;

			if (parent_op->count == 0) {
				cb_fn = parent_op->cb_fn;
				cb_arg = parent_op->cb_arg;

				assert(parent_op->batch != NULL);

				/*
				 * Now that parent_op count is 0, we can release its ref
				 * to its batch. We have not released the ref to the batch
				 * that the op is pointing to yet, which will be done below.
				 */
				parent_op->batch->refcnt--;
				if (parent_op->batch->refcnt == 0) {
					_free_batch(parent_op->batch, chan);
				}

				if (cb_fn) {
					cb_fn(cb_arg, status);
				}
			}
		}

		if (op->count == 0) {
			cb_fn = op->cb_fn;
			cb_arg = op->cb_arg;

			if (op->batch != NULL) {
				assert(op->batch->refcnt > 0);
				op->batch->refcnt--;

				if (op->batch->refcnt == 0) {
					_free_batch(op->batch, chan);
				}
			} else {
				STAILQ_INSERT_HEAD(&chan->ops_pool, op, link);
			}

			if (cb_fn) {
				cb_fn(cb_arg, status);
			}
		}

		/* reset the status */
		status = 0;
		/* Break the processing loop to avoid starving the rest of the system. */
		if (rc >= IDXD_MAX_COMPLETIONS) {
			break;
		}
	}

	/* Submit any built-up batch */
	if (chan->batch) {
		rc2 = idxd_batch_submit(chan, NULL, NULL);
		if (rc2) {
			assert(rc2 == -EBUSY);
		}
	}

	return rc;
}

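/*
 * Usage sketch (illustrative): drive completions from a poller. The return
 * value is the number of completions processed, capped at
 * IDXD_MAX_COMPLETIONS per call; any partially built batch is flushed on
 * each pass.
 *
 *	static int
 *	idxd_poll(void *arg)
 *	{
 *		struct spdk_idxd_io_channel *chan = arg;
 *
 *		return spdk_idxd_process_events(chan) > 0 ?
 *		       SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
 *	}
 */
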
void
idxd_impl_register(struct spdk_idxd_impl *impl)
{
	STAILQ_INSERT_HEAD(&g_idxd_impls, impl, link);
}

SPDK_LOG_REGISTER_COMPONENT(idxd)