/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 */

#include "spdk/stdinc.h"

#include "spdk/env.h"
#include "spdk/util.h"
#include "spdk/memory.h"
#include "spdk/likely.h"

#include "spdk/log.h"
#include "spdk_internal/idxd.h"

#include "idxd_internal.h"

#define ALIGN_4K 0x1000
#define USERSPACE_DRIVER_NAME "user"
#define KERNEL_DRIVER_NAME "kernel"

static STAILQ_HEAD(, spdk_idxd_impl) g_idxd_impls = STAILQ_HEAD_INITIALIZER(g_idxd_impls);
static struct spdk_idxd_impl *g_idxd_impl;

uint32_t
spdk_idxd_get_socket(struct spdk_idxd_device *idxd)
{
	return idxd->socket_id;
}

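/*
 * Submission path: each descriptor is posted to the device with a single
 * MOVDIR64B 64-byte store to the WQ portal. Advancing the portal offset by
 * chan_per_device strides on every submission spreads channels that share a
 * device across portal addresses instead of contending on a single one.
 */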
static inline void
_submit_to_hw(struct spdk_idxd_io_channel *chan, struct idxd_ops *op)
{
	STAILQ_INSERT_TAIL(&chan->ops_outstanding, op, link);
	movdir64b(chan->portal + chan->portal_offset, op->desc);
	chan->portal_offset = (chan->portal_offset + chan->idxd->chan_per_device * PORTAL_STRIDE) &
			      PORTAL_MASK;
}

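/*
 * Translate a virtual buffer to a physical address, requiring the entire
 * buffer to be physically contiguous; fails if the translation covers less
 * than the requested size.
 */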
static inline int
_vtophys(const void *buf, uint64_t *buf_addr, uint64_t size)
{
	uint64_t updated_size = size;

	*buf_addr = spdk_vtophys(buf, &updated_size);

	if (*buf_addr == SPDK_VTOPHYS_ERROR) {
		SPDK_ERRLOG("Error translating address\n");
		return -EINVAL;
	}

	if (updated_size < size) {
		SPDK_ERRLOG("Error translating size (0x%" PRIx64 "), return size (0x%" PRIx64 ")\n",
			    size, updated_size);
		return -EINVAL;
	}

	return 0;
}

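/*
 * Iterator that walks a source and a destination buffer in lockstep, yielding
 * the largest span for which both sides are physically contiguous. Callers
 * issue one descriptor per returned segment.
 */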
struct idxd_vtophys_iter {
	const void	*src;
	void		*dst;
	uint64_t	len;

	uint64_t	offset;
};

static void
idxd_vtophys_iter_init(struct idxd_vtophys_iter *iter,
		       const void *src, void *dst, uint64_t len)
{
	iter->src = src;
	iter->dst = dst;
	iter->len = len;
	iter->offset = 0;
}

static uint64_t
idxd_vtophys_iter_next(struct idxd_vtophys_iter *iter,
		       uint64_t *src_phys, uint64_t *dst_phys)
{
	uint64_t src_off, dst_off, len;
	const void *src;
	void *dst;

	src = iter->src + iter->offset;
	dst = iter->dst + iter->offset;

	if (iter->offset == iter->len) {
		return 0;
	}

	len = iter->len - iter->offset;

	src_off = len;
	*src_phys = spdk_vtophys(src, &src_off);
	if (*src_phys == SPDK_VTOPHYS_ERROR) {
		SPDK_ERRLOG("Error translating address\n");
		return SPDK_VTOPHYS_ERROR;
	}

	dst_off = len;
	*dst_phys = spdk_vtophys(dst, &dst_off);
	if (*dst_phys == SPDK_VTOPHYS_ERROR) {
		SPDK_ERRLOG("Error translating address\n");
		return SPDK_VTOPHYS_ERROR;
	}

	len = spdk_min(src_off, dst_off);
	iter->offset += len;

	return len;
}

/* Helper for the DSA-specific portion of spdk_idxd_get_channel(). */
static int
_dsa_alloc_batches(struct spdk_idxd_io_channel *chan, int num_descriptors)
{
	struct idxd_batch *batch;
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	int i, j, num_batches, rc = -1;

	/* Allocate batches */
	num_batches = num_descriptors;
	chan->batch_base = calloc(num_batches, sizeof(struct idxd_batch));
	if (chan->batch_base == NULL) {
		SPDK_ERRLOG("Failed to allocate batch pool\n");
		rc = -ENOMEM;
		goto error_desc;
	}
	batch = chan->batch_base;
	for (i = 0; i < num_batches; i++) {
		batch->user_desc = desc = spdk_zmalloc(DESC_PER_BATCH * sizeof(struct idxd_hw_desc),
						       0x40, NULL,
						       SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (batch->user_desc == NULL) {
			SPDK_ERRLOG("Failed to allocate batch descriptor memory\n");
			rc = -ENOMEM;
			goto error_user;
		}

		rc = _vtophys(batch->user_desc, &batch->user_desc_addr,
			      DESC_PER_BATCH * sizeof(struct idxd_hw_desc));
		if (rc) {
			SPDK_ERRLOG("Failed to translate batch descriptor memory\n");
			goto error_user;
		}

		batch->user_ops = op = spdk_zmalloc(DESC_PER_BATCH * sizeof(struct idxd_ops),
						    0x40, NULL,
						    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
		if (batch->user_ops == NULL) {
			SPDK_ERRLOG("Failed to allocate user completion memory\n");
			rc = -ENOMEM;
			goto error_user;
		}

		for (j = 0; j < DESC_PER_BATCH; j++) {
			rc = _vtophys(&op->hw, &desc->completion_addr, sizeof(struct dsa_hw_comp_record));
			if (rc) {
				SPDK_ERRLOG("Failed to translate batch entry completion memory\n");
				goto error_user;
			}
			op++;
			desc++;
		}
		TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
		batch++;
	}
	return 0;

error_user:
	/* Free the partially initialized batch that failed above; it has not
	 * been inserted into the pool yet. spdk_free() tolerates NULL. */
	spdk_free(batch->user_ops);
	batch->user_ops = NULL;
	spdk_free(batch->user_desc);
	batch->user_desc = NULL;
	TAILQ_FOREACH(batch, &chan->batch_pool, link) {
		spdk_free(batch->user_ops);
		batch->user_ops = NULL;
		spdk_free(batch->user_desc);
		batch->user_desc = NULL;
	}
	spdk_free(chan->ops_base);
	chan->ops_base = NULL;
error_desc:
	STAILQ_INIT(&chan->ops_pool);
	spdk_free(chan->desc_base);
	chan->desc_base = NULL;
	return rc;
}

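/*
 * Allocate a per-thread I/O channel on a device. Channels split the device's
 * total WQ size evenly, so at most chan_per_device channels may be open at
 * once. A minimal usage sketch (error handling omitted; assumes a device
 * already attached via spdk_idxd_probe()):
 *
 *	struct spdk_idxd_io_channel *chan = spdk_idxd_get_channel(idxd);
 *
 *	... submit operations, poll with spdk_idxd_process_events(chan) ...
 *
 *	spdk_idxd_put_channel(chan);
 */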
struct spdk_idxd_io_channel *
spdk_idxd_get_channel(struct spdk_idxd_device *idxd)
{
	struct spdk_idxd_io_channel *chan;
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	int i, num_descriptors, rc = -1;
	uint32_t comp_rec_size;

	assert(idxd != NULL);

	chan = calloc(1, sizeof(struct spdk_idxd_io_channel));
	if (chan == NULL) {
		SPDK_ERRLOG("Failed to allocate idxd chan\n");
		return NULL;
	}

	chan->idxd = idxd;
	STAILQ_INIT(&chan->ops_pool);
	TAILQ_INIT(&chan->batch_pool);
	STAILQ_INIT(&chan->ops_outstanding);

	/* Assign WQ, portal */
	pthread_mutex_lock(&idxd->num_channels_lock);
	if (idxd->num_channels == idxd->chan_per_device) {
		/* too many channels sharing this device */
		pthread_mutex_unlock(&idxd->num_channels_lock);
		SPDK_ERRLOG("Too many channels sharing this device\n");
		goto error;
	}

	/* Have each channel start at a different offset. */
	chan->portal = idxd->impl->portal_get_addr(idxd);
	chan->portal_offset = (idxd->num_channels * PORTAL_STRIDE) & PORTAL_MASK;
	idxd->num_channels++;

	pthread_mutex_unlock(&idxd->num_channels_lock);

	/* Allocate descriptors and completions */
	num_descriptors = idxd->total_wq_size / idxd->chan_per_device;
	chan->desc_base = desc = spdk_zmalloc(num_descriptors * sizeof(struct idxd_hw_desc),
					      0x40, NULL,
					      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->desc_base == NULL) {
		SPDK_ERRLOG("Failed to allocate DSA descriptor memory\n");
		goto error_chan;
	}

	chan->ops_base = op = spdk_zmalloc(num_descriptors * sizeof(struct idxd_ops),
					   0x40, NULL,
					   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
	if (chan->ops_base == NULL) {
		SPDK_ERRLOG("Failed to allocate idxd_ops memory\n");
		goto error_chan;
	}

	if (idxd->type == IDXD_DEV_TYPE_DSA) {
		comp_rec_size = sizeof(struct dsa_hw_comp_record);
		if (_dsa_alloc_batches(chan, num_descriptors)) {
			goto error_chan;
		}
	} else {
		comp_rec_size = sizeof(struct iaa_hw_comp_record);
	}

	for (i = 0; i < num_descriptors; i++) {
		STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
		op->desc = desc;
		rc = _vtophys(&op->hw, &desc->completion_addr, comp_rec_size);
		if (rc) {
			SPDK_ERRLOG("Failed to translate completion memory\n");
			goto error_chan;
		}
		op++;
		desc++;
	}

	return chan;

error_chan:
	/* Release the channel slot claimed above so the device's channel
	 * count stays accurate on failure. */
	pthread_mutex_lock(&idxd->num_channels_lock);
	assert(idxd->num_channels > 0);
	idxd->num_channels--;
	pthread_mutex_unlock(&idxd->num_channels_lock);
error:
	spdk_free(chan->ops_base);
	chan->ops_base = NULL;
	spdk_free(chan->desc_base);
	chan->desc_base = NULL;
	free(chan->batch_base);
	free(chan);
	return NULL;
}

static int idxd_batch_cancel(struct spdk_idxd_io_channel *chan, int status);

void
spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch;

	assert(chan != NULL);
	assert(chan->idxd != NULL);

	if (chan->batch) {
		idxd_batch_cancel(chan, -ECANCELED);
	}

	pthread_mutex_lock(&chan->idxd->num_channels_lock);
	assert(chan->idxd->num_channels > 0);
	chan->idxd->num_channels--;
	pthread_mutex_unlock(&chan->idxd->num_channels_lock);

	spdk_free(chan->ops_base);
	spdk_free(chan->desc_base);
	while ((batch = TAILQ_FIRST(&chan->batch_pool))) {
		TAILQ_REMOVE(&chan->batch_pool, batch, link);
		spdk_free(batch->user_ops);
		spdk_free(batch->user_desc);
	}
	free(chan->batch_base);
	free(chan);
}

static inline struct spdk_idxd_impl *
idxd_get_impl_by_name(const char *impl_name)
{
	struct spdk_idxd_impl *impl;

	assert(impl_name != NULL);
	STAILQ_FOREACH(impl, &g_idxd_impls, link) {
		if (0 == strcmp(impl_name, impl->name)) {
			return impl;
		}
	}

	return NULL;
}

void
spdk_idxd_set_config(bool kernel_mode)
{
	struct spdk_idxd_impl *tmp;

	if (kernel_mode) {
		tmp = idxd_get_impl_by_name(KERNEL_DRIVER_NAME);
	} else {
		tmp = idxd_get_impl_by_name(USERSPACE_DRIVER_NAME);
	}

	if (g_idxd_impl != NULL && g_idxd_impl != tmp) {
		SPDK_ERRLOG("Cannot change idxd implementation after devices are initialized\n");
		assert(false);
		return;
	}
	g_idxd_impl = tmp;

	if (g_idxd_impl == NULL) {
		SPDK_ERRLOG("Cannot set the idxd implementation with %s mode\n",
			    kernel_mode ? KERNEL_DRIVER_NAME : USERSPACE_DRIVER_NAME);
		return;
	}
}

static void
idxd_device_destruct(struct spdk_idxd_device *idxd)
{
	assert(idxd->impl != NULL);

	idxd->impl->destruct(idxd);
}

int
spdk_idxd_probe(void *cb_ctx, spdk_idxd_attach_cb attach_cb,
		spdk_idxd_probe_cb probe_cb)
{
	if (g_idxd_impl == NULL) {
		SPDK_ERRLOG("No idxd impl is selected\n");
		return -1;
	}

	return g_idxd_impl->probe(cb_ctx, attach_cb, probe_cb);
}

void
spdk_idxd_detach(struct spdk_idxd_device *idxd)
{
	assert(idxd != NULL);
	idxd_device_destruct(idxd);
}

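/*
 * Pull a descriptor/op pair from the channel's free pool for a standalone
 * (non-batched) command. The completion address is the only descriptor field
 * that must survive reuse, so it is saved across the memset.
 */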
static int
_idxd_prep_command(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn, void *cb_arg,
		   int flags, struct idxd_hw_desc **_desc, struct idxd_ops **_op)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t comp_addr;

	if (!STAILQ_EMPTY(&chan->ops_pool)) {
		op = *_op = STAILQ_FIRST(&chan->ops_pool);
		desc = *_desc = op->desc;
		comp_addr = desc->completion_addr;
		memset(desc, 0, sizeof(*desc));
		desc->completion_addr = comp_addr;
		STAILQ_REMOVE_HEAD(&chan->ops_pool, link);
	} else {
		/* No free ops: the application violated flow control and must
		 * handle the -EBUSY. */
		return -EBUSY;
	}

	flags |= IDXD_FLAG_COMPLETION_ADDR_VALID;
	flags |= IDXD_FLAG_REQUEST_COMPLETION;

	desc->flags = flags;
	op->cb_arg = cb_arg;
	op->cb_fn = cb_fn;
	op->batch = NULL;
	op->parent = NULL;
	op->count = 1;

	return 0;
}

static int
_idxd_prep_batch_cmd(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn,
		     void *cb_arg, int flags,
		     struct idxd_hw_desc **_desc, struct idxd_ops **_op)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t comp_addr;
	struct idxd_batch *batch;

	batch = chan->batch;

	assert(batch != NULL);
	if (batch->index == DESC_PER_BATCH) {
		return -EBUSY;
	}

	desc = *_desc = &batch->user_desc[batch->index];
	op = *_op = &batch->user_ops[batch->index];

	op->desc = desc;
	SPDK_DEBUGLOG(idxd, "Prep batch %p index %u\n", batch, batch->index);

	batch->index++;

	comp_addr = desc->completion_addr;
	memset(desc, 0, sizeof(*desc));
	desc->completion_addr = comp_addr;
	flags |= IDXD_FLAG_COMPLETION_ADDR_VALID;
	flags |= IDXD_FLAG_REQUEST_COMPLETION;
	desc->flags = flags;
	op->cb_arg = cb_arg;
	op->cb_fn = cb_fn;
	op->batch = batch;
	op->parent = NULL;
	op->count = 1;

	return 0;
}

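/*
 * Batch lifecycle: _idxd_setup_batch() creates a batch on first use, the
 * submit paths append descriptors to it via _idxd_prep_batch_cmd(), and it is
 * flushed either when full (_idxd_flush_batch()) or from
 * spdk_idxd_process_events(). Setting index to UINT8_MAX marks a batch as
 * submitted to hardware, after which it can no longer be cancelled.
 */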
static struct idxd_batch *
idxd_batch_create(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch;

	assert(chan != NULL);
	assert(chan->batch == NULL);

	if (!TAILQ_EMPTY(&chan->batch_pool)) {
		batch = TAILQ_FIRST(&chan->batch_pool);
		batch->index = 0;
		batch->chan = chan;
		chan->batch = batch;
		TAILQ_REMOVE(&chan->batch_pool, batch, link);
	} else {
		/* The application needs to handle this. */
		return NULL;
	}

	return batch;
}

static void
_free_batch(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan)
{
	SPDK_DEBUGLOG(idxd, "Free batch %p\n", batch);
	assert(batch->refcnt == 0);
	batch->index = 0;
	batch->chan = NULL;
	TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
}

static int
idxd_batch_cancel(struct spdk_idxd_io_channel *chan, int status)
{
	struct idxd_ops *op;
	struct idxd_batch *batch;
	int i;

	assert(chan != NULL);

	batch = chan->batch;
	assert(batch != NULL);

	if (batch->index == UINT8_MAX) {
		SPDK_ERRLOG("Cannot cancel batch, already submitted to HW.\n");
		return -EINVAL;
	}

	chan->batch = NULL;

	for (i = 0; i < batch->index; i++) {
		op = &batch->user_ops[i];
		if (op->cb_fn) {
			op->cb_fn(op->cb_arg, status);
		}
	}

	_free_batch(batch, chan);

	return 0;
}

static int
idxd_batch_submit(struct spdk_idxd_io_channel *chan,
		  spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_batch *batch;
	struct idxd_ops *op;
	int i, rc, flags = 0;

	assert(chan != NULL);

	batch = chan->batch;
	assert(batch != NULL);

	if (batch->index == 0) {
		return idxd_batch_cancel(chan, 0);
	}

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	if (batch->index == 1) {
		uint64_t completion_addr;

		/* If there's only one command, convert it away from a batch. */
		completion_addr = desc->completion_addr;
		memcpy(desc, &batch->user_desc[0], sizeof(*desc));
		desc->completion_addr = completion_addr;
		op->cb_fn = batch->user_ops[0].cb_fn;
		op->cb_arg = batch->user_ops[0].cb_arg;
		op->crc_dst = batch->user_ops[0].crc_dst;
		_free_batch(batch, chan);
	} else {
		/* Command specific. */
		desc->opcode = IDXD_OPCODE_BATCH;
		desc->desc_list_addr = batch->user_desc_addr;
		desc->desc_count = batch->index;
		assert(batch->index <= DESC_PER_BATCH);

		/* Add the batch elements' completion contexts to the outstanding list to be polled. */
		for (i = 0; i < batch->index; i++) {
			batch->refcnt++;
			STAILQ_INSERT_TAIL(&chan->ops_outstanding, &batch->user_ops[i],
					   link);
		}
		batch->index = UINT8_MAX;
	}

	chan->batch = NULL;

	/* Submit operation. */
	_submit_to_hw(chan, op);
	SPDK_DEBUGLOG(idxd, "Submitted batch %p\n", batch);

	return 0;
}

static int
_idxd_setup_batch(struct spdk_idxd_io_channel *chan)
{
	struct idxd_batch *batch;

	if (chan->batch == NULL) {
		batch = idxd_batch_create(chan);
		if (batch == NULL) {
			return -EBUSY;
		}
	}

	return 0;
}

static int
_idxd_flush_batch(struct spdk_idxd_io_channel *chan)
{
	int rc;

	if (chan->batch != NULL && chan->batch->index >= DESC_PER_BATCH) {
		/* Close out the full batch */
		rc = idxd_batch_submit(chan, NULL, NULL);
		if (rc) {
			assert(rc == -EBUSY);
			/*
			 * Return 0. This will get re-submitted within
			 * spdk_idxd_process_events where, if it fails, it will
			 * get correctly aborted.
			 */
			return 0;
		}
	}

	return 0;
}

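/*
 * The submit paths below translate each iovec pair into one or more
 * physically contiguous descriptors inside the channel's current batch.
 * A minimal submit-and-poll sketch (hypothetical callback and context names;
 * error handling omitted):
 *
 *	static void copy_done(void *ctx, int status) { ... }
 *
 *	rc = spdk_idxd_submit_copy(chan, &dst_iov, 1, &src_iov, 1, 0,
 *				   copy_done, ctx);
 *	do {
 *		rc = spdk_idxd_process_events(chan);
 *	} while (rc == 0);
 */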
int
spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan,
		      struct iovec *diov, uint32_t diovcnt,
		      struct iovec *siov, uint32_t siovcnt,
		      int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	void *src, *dst;
	uint64_t src_addr, dst_addr;
	int rc, count;
	uint64_t len, seg_len;
	struct spdk_ioviter iter;
	struct idxd_vtophys_iter vtophys_iter;

	assert(chan != NULL);
	assert(diov != NULL);
	assert(siov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	first_op = NULL;
	for (len = spdk_ioviter_first(&iter, siov, siovcnt, diov, diovcnt, &src, &dst);
	     len > 0;
	     len = spdk_ioviter_next(&iter, &src, &dst)) {

		idxd_vtophys_iter_init(&vtophys_iter, src, dst, len);

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src_addr = 0;
			dst_addr = 0;
			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src_addr, &dst_addr);
			if (seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			desc->opcode = IDXD_OPCODE_MEMMOVE;
			desc->src_addr = src_addr;
			desc->dst_addr = dst_addr;
			desc->xfer_size = seg_len;
			desc->flags ^= IDXD_FLAG_CACHE_CONTROL;

			len -= seg_len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

/* Dual-cast copies the same source to two separate destination buffers. */
int
spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan, void *dst1, void *dst2,
			  const void *src, uint64_t nbytes, int flags,
			  spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	uint64_t src_addr, dst1_addr, dst2_addr;
	int rc, count;
	uint64_t len;
	uint64_t outer_seg_len, inner_seg_len;
	struct idxd_vtophys_iter iter_outer, iter_inner;

	assert(chan != NULL);
	assert(dst1 != NULL);
	assert(dst2 != NULL);
	assert(src != NULL);

	if ((uintptr_t)dst1 & (ALIGN_4K - 1) || (uintptr_t)dst2 & (ALIGN_4K - 1)) {
		SPDK_ERRLOG("Dualcast requires 4K alignment on dst addresses\n");
		return -EINVAL;
	}

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	idxd_vtophys_iter_init(&iter_outer, src, dst1, nbytes);

	first_op = NULL;
	count = 0;
	while (nbytes > 0) {
		src_addr = 0;
		dst1_addr = 0;
		outer_seg_len = idxd_vtophys_iter_next(&iter_outer, &src_addr, &dst1_addr);
		if (outer_seg_len == SPDK_VTOPHYS_ERROR) {
			rc = -EFAULT;
			goto error;
		}

		idxd_vtophys_iter_init(&iter_inner, src, dst2, nbytes);

		src += outer_seg_len;
		nbytes -= outer_seg_len;

		while (outer_seg_len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src_addr = 0;
			dst2_addr = 0;
			inner_seg_len = idxd_vtophys_iter_next(&iter_inner, &src_addr, &dst2_addr);
			if (inner_seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			len = spdk_min(outer_seg_len, inner_seg_len);

			/* Command specific. */
			desc->opcode = IDXD_OPCODE_DUALCAST;
			desc->src_addr = src_addr;
			desc->dst_addr = dst1_addr;
			desc->dest2 = dst2_addr;
			desc->xfer_size = len;
			desc->flags ^= IDXD_FLAG_CACHE_CONTROL;

			dst1_addr += len;
			outer_seg_len -= len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

int
spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan,
			 struct iovec *siov1, size_t siov1cnt,
			 struct iovec *siov2, size_t siov2cnt,
			 int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	void *src1, *src2;
	uint64_t src1_addr, src2_addr;
	int rc, count;
	uint64_t len, seg_len;
	struct spdk_ioviter iter;
	struct idxd_vtophys_iter vtophys_iter;

	assert(chan != NULL);
	assert(siov1 != NULL);
	assert(siov2 != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	first_op = NULL;
	for (len = spdk_ioviter_first(&iter, siov1, siov1cnt, siov2, siov2cnt, &src1, &src2);
	     len > 0;
	     len = spdk_ioviter_next(&iter, &src1, &src2)) {

		idxd_vtophys_iter_init(&vtophys_iter, src1, src2, len);

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src1_addr = 0;
			src2_addr = 0;
			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src1_addr, &src2_addr);
			if (seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			desc->opcode = IDXD_OPCODE_COMPARE;
			desc->src_addr = src1_addr;
			desc->src2_addr = src2_addr;
			desc->xfer_size = seg_len;

			len -= seg_len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

int
spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan,
		      struct iovec *diov, size_t diovcnt,
		      uint64_t fill_pattern, int flags,
		      spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	uint64_t dst_addr;
	int rc, count;
	uint64_t len, seg_len;
	void *dst;
	size_t i;

	assert(chan != NULL);
	assert(diov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	first_op = NULL;
	for (i = 0; i < diovcnt; i++) {
		len = diov[i].iov_len;
		dst = diov[i].iov_base;

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			seg_len = len;
			dst_addr = spdk_vtophys(dst, &seg_len);
			if (dst_addr == SPDK_VTOPHYS_ERROR) {
				SPDK_ERRLOG("Error translating address\n");
				rc = -EFAULT;
				goto error;
			}

			seg_len = spdk_min(seg_len, len);

			desc->opcode = IDXD_OPCODE_MEMFILL;
			desc->pattern = fill_pattern;
			desc->dst_addr = dst_addr;
			desc->xfer_size = seg_len;
			desc->flags ^= IDXD_FLAG_CACHE_CONTROL;

			len -= seg_len;
			dst += seg_len;
		}
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

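/*
 * Multi-segment CRC: descriptors after the first carry IDXD_FLAG_FENCE and
 * IDXD_FLAG_CRC_READ_CRC_SEED, so the hardware waits for the previous segment
 * and seeds itself from that segment's completion record (prev_crc), chaining
 * a single CRC value across all segments.
 */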
int
spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan,
			struct iovec *siov, size_t siovcnt,
			uint32_t seed, uint32_t *crc_dst, int flags,
			spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	uint64_t src_addr;
	int rc, count;
	uint64_t len, seg_len;
	void *src;
	size_t i;
	void *prev_crc;

	assert(chan != NULL);
	assert(siov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	op = NULL;
	first_op = NULL;
	for (i = 0; i < siovcnt; i++) {
		len = siov[i].iov_len;
		src = siov[i].iov_base;

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			seg_len = len;
			src_addr = spdk_vtophys(src, &seg_len);
			if (src_addr == SPDK_VTOPHYS_ERROR) {
				SPDK_ERRLOG("Error translating address\n");
				rc = -EFAULT;
				goto error;
			}

			seg_len = spdk_min(seg_len, len);

			desc->opcode = IDXD_OPCODE_CRC32C_GEN;
			desc->src_addr = src_addr;
			if (op == first_op) {
				desc->crc32c.seed = seed;
			} else {
				desc->flags |= IDXD_FLAG_FENCE | IDXD_FLAG_CRC_READ_CRC_SEED;
				desc->crc32c.addr = (uint64_t)prev_crc;
			}

			desc->xfer_size = seg_len;
			prev_crc = &op->hw.crc32c_val;

			len -= seg_len;
			src += seg_len;
		}
	}

	/* Only the last op copies the crc to the destination */
	if (op) {
		op->crc_dst = crc_dst;
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

int
spdk_idxd_submit_copy_crc32c(struct spdk_idxd_io_channel *chan,
			     struct iovec *diov, size_t diovcnt,
			     struct iovec *siov, size_t siovcnt,
			     uint32_t seed, uint32_t *crc_dst, int flags,
			     spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *first_op, *op;
	void *src, *dst;
	uint64_t src_addr, dst_addr;
	int rc, count;
	uint64_t len, seg_len;
	struct spdk_ioviter iter;
	struct idxd_vtophys_iter vtophys_iter;
	void *prev_crc;

	assert(chan != NULL);
	assert(diov != NULL);
	assert(siov != NULL);

	rc = _idxd_setup_batch(chan);
	if (rc) {
		return rc;
	}

	count = 0;
	op = NULL;
	first_op = NULL;
	for (len = spdk_ioviter_first(&iter, siov, siovcnt, diov, diovcnt, &src, &dst);
	     len > 0;
	     len = spdk_ioviter_next(&iter, &src, &dst)) {

		idxd_vtophys_iter_init(&vtophys_iter, src, dst, len);

		while (len > 0) {
			if (first_op == NULL) {
				rc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op = op;
			} else {
				rc = _idxd_prep_batch_cmd(chan, NULL, NULL, flags, &desc, &op);
				if (rc) {
					goto error;
				}

				first_op->count++;
				op->parent = first_op;
			}

			count++;

			src_addr = 0;
			dst_addr = 0;
			seg_len = idxd_vtophys_iter_next(&vtophys_iter, &src_addr, &dst_addr);
			if (seg_len == SPDK_VTOPHYS_ERROR) {
				rc = -EFAULT;
				goto error;
			}

			desc->opcode = IDXD_OPCODE_COPY_CRC;
			desc->dst_addr = dst_addr;
			desc->src_addr = src_addr;
			desc->flags ^= IDXD_FLAG_CACHE_CONTROL;
			if (op == first_op) {
				desc->crc32c.seed = seed;
			} else {
				desc->flags |= IDXD_FLAG_FENCE | IDXD_FLAG_CRC_READ_CRC_SEED;
				desc->crc32c.addr = (uint64_t)prev_crc;
			}

			desc->xfer_size = seg_len;
			prev_crc = &op->hw.crc32c_val;

			len -= seg_len;
		}
	}

	/* Only the last op copies the crc to the destination */
	if (op) {
		op->crc_dst = crc_dst;
	}

	return _idxd_flush_batch(chan);

error:
	chan->batch->index -= count;
	return rc;
}

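/*
 * IAA compression reads its AECS (analytics engine configuration and state,
 * which carries the compression tables) as a second source, so the descriptor
 * sets IAA_FLAG_RD_SRC2_AECS and points src2 at the device's shared AECS.
 */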
static inline int
_idxd_submit_compress_single(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
			     uint64_t nbytes_dst, uint64_t nbytes_src, uint32_t *output_size,
			     int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
{
	struct idxd_hw_desc *desc;
	struct idxd_ops *op;
	uint64_t src_addr, dst_addr;
	int rc;

	/* Common prep. */
	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
	if (rc) {
		return rc;
	}

	rc = _vtophys(src, &src_addr, nbytes_src);
	if (rc) {
		goto error;
	}

	rc = _vtophys(dst, &dst_addr, nbytes_dst);
	if (rc) {
		goto error;
	}

	/* Command specific. */
	desc->opcode = IDXD_OPCODE_COMPRESS;
	desc->src1_addr = src_addr;
	desc->dst_addr = dst_addr;
	desc->src1_size = nbytes_src;
	desc->iaa.max_dst_size = nbytes_dst;
	desc->iaa.src2_size = sizeof(struct iaa_aecs);
	desc->iaa.src2_addr = (uint64_t)chan->idxd->aecs;
	desc->flags |= IAA_FLAG_RD_SRC2_AECS;
	desc->compr_flags = IAA_COMP_FLAGS;
	op->output_size = output_size;

	_submit_to_hw(chan, op);
	return 0;
error:
	STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
	return rc;
}

1157 
1158 int
1159 spdk_idxd_submit_compress(struct spdk_idxd_io_channel *chan,
1160 			  struct iovec *diov, uint32_t diovcnt,
1161 			  struct iovec *siov, uint32_t siovcnt, uint32_t *output_size,
1162 			  int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
1163 {
1164 	assert(chan != NULL);
1165 	assert(diov != NULL);
1166 	assert(siov != NULL);
1167 
1168 	if (diovcnt == 1 && siovcnt == 1) {
1169 		/* Simple case - copying one buffer to another */
1170 		if (diov[0].iov_len < siov[0].iov_len) {
1171 			return -EINVAL;
1172 		}
1173 
1174 		return _idxd_submit_compress_single(chan, diov[0].iov_base, siov[0].iov_base,
1175 						    diov[0].iov_len, siov[0].iov_len,
1176 						    output_size, flags, cb_fn, cb_arg);
1177 	}
1178 	/* TODO: vectored support */
1179 	return -EINVAL;
1180 }
1181 
1182 static inline int
1183 _idxd_submit_decompress_single(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
1184 			       uint64_t nbytes_dst, uint64_t nbytes, int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
1185 {
1186 	struct idxd_hw_desc *desc;
1187 	struct idxd_ops *op;
1188 	uint64_t src_addr, dst_addr;
1189 	int rc;
1190 
1191 	/* Common prep. */
1192 	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
1193 	if (rc) {
1194 		return rc;
1195 	}
1196 
1197 	rc = _vtophys(src, &src_addr, nbytes);
1198 	if (rc) {
1199 		goto error;
1200 	}
1201 
1202 	rc = _vtophys(dst, &dst_addr, nbytes_dst);
1203 	if (rc) {
1204 		goto error;
1205 	}
1206 
1207 	/* Command specific. */
1208 	desc->opcode = IDXD_OPCODE_DECOMPRESS;
1209 	desc->src1_addr = src_addr;
1210 	desc->dst_addr = dst_addr;
1211 	desc->src1_size = nbytes;
1212 	desc->iaa.max_dst_size = nbytes_dst;
1213 	desc->decompr_flags = IAA_DECOMP_FLAGS;
1214 
1215 	_submit_to_hw(chan, op);
1216 	return 0;
1217 error:
1218 	STAILQ_INSERT_TAIL(&chan->ops_pool, op, link);
1219 	return rc;
1220 }
1221 
1222 int
1223 spdk_idxd_submit_decompress(struct spdk_idxd_io_channel *chan,
1224 			    struct iovec *diov, uint32_t diovcnt,
1225 			    struct iovec *siov, uint32_t siovcnt,
1226 			    int flags, spdk_idxd_req_cb cb_fn, void *cb_arg)
1227 {
1228 	assert(chan != NULL);
1229 	assert(diov != NULL);
1230 	assert(siov != NULL);
1231 
1232 	if (diovcnt == 1 && siovcnt == 1) {
1233 		/* Simple case - copying one buffer to another */
1234 		if (diov[0].iov_len < siov[0].iov_len) {
1235 			return -EINVAL;
1236 		}
1237 
1238 		return _idxd_submit_decompress_single(chan, diov[0].iov_base, siov[0].iov_base,
1239 						      diov[0].iov_len, siov[0].iov_len,
1240 						      flags, cb_fn, cb_arg);
1241 	}
1242 	/* TODO: vectored support */
1243 	return -EINVAL;
1244 }
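/*
 * Raw pass-through: the caller supplies a fully formed hardware descriptor.
 * Only the completion address and the completion/request flags are managed
 * here; everything else is submitted as given.
 */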
1245 
1246 int
1247 spdk_idxd_submit_raw_desc(struct spdk_idxd_io_channel *chan,
1248 			  struct idxd_hw_desc *_desc,
1249 			  spdk_idxd_req_cb cb_fn, void *cb_arg)
1250 {
1251 	struct idxd_hw_desc *desc;
1252 	struct idxd_ops *op;
1253 	int rc, flags = 0;
1254 	uint64_t comp_addr;
1255 
1256 	assert(chan != NULL);
1257 	assert(_desc != NULL);
1258 
1259 	/* Common prep. */
1260 	rc = _idxd_prep_command(chan, cb_fn, cb_arg, flags, &desc, &op);
1261 	if (rc) {
1262 		return rc;
1263 	}
1264 
1265 	/* Command specific. */
1266 	flags = desc->flags;
1267 	comp_addr = desc->completion_addr;
1268 	memcpy(desc, _desc, sizeof(*desc));
1269 	desc->flags |= flags;
1270 	desc->completion_addr = comp_addr;
1271 
1272 	/* Submit operation. */
1273 	_submit_to_hw(chan, op);
1274 
1275 	return 0;
1276 }
1277 
1278 static inline void
1279 _dump_sw_error_reg(struct spdk_idxd_io_channel *chan)
1280 {
1281 	struct spdk_idxd_device *idxd = chan->idxd;
1282 
1283 	assert(idxd != NULL);
1284 	idxd->impl->dump_sw_error(idxd, chan->portal);
1285 }
1286 
/* TODO: more performance experiments. */
#define IDXD_COMPLETION(x) ((x) > (0) ? (1) : (0))
#define IDXD_FAILURE(x) ((x) > (1) ? (1) : (0))
#define IDXD_SW_ERROR(x) (((x) & (0x1)) ? (1) : (0))
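/*
 * Poll the channel's outstanding ops in submission order, invoking callbacks
 * for everything the hardware has completed. Returns the number of
 * completions processed; intended to be called repeatedly from the thread
 * that owns the channel.
 */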
int
spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
{
	struct idxd_ops *op, *tmp, *parent_op;
	int status = 0;
	int rc2, rc = 0;
	void *cb_arg;
	spdk_idxd_req_cb cb_fn;

	assert(chan != NULL);

	STAILQ_FOREACH_SAFE(op, &chan->ops_outstanding, link, tmp) {
		if (!IDXD_COMPLETION(op->hw.status)) {
			/*
			 * Oldest locations are at the head of the list, so if
			 * we've polled a location that hasn't completed, bail
			 * now as there are unlikely to be any more completions.
			 */
			break;
		}

		STAILQ_REMOVE_HEAD(&chan->ops_outstanding, link);
		rc++;

		/* Status is in the same location for both IAA and DSA completion records. */
		if (spdk_unlikely(IDXD_FAILURE(op->hw.status))) {
			SPDK_ERRLOG("Completion status 0x%x\n", op->hw.status);
			status = -EINVAL;
			_dump_sw_error_reg(chan);
		}

		switch (op->desc->opcode) {
		case IDXD_OPCODE_BATCH:
			SPDK_DEBUGLOG(idxd, "Complete batch %p\n", op->batch);
			break;
		case IDXD_OPCODE_CRC32C_GEN:
		case IDXD_OPCODE_COPY_CRC:
			if (spdk_likely(status == 0 && op->crc_dst != NULL)) {
				*op->crc_dst = op->hw.crc32c_val;
				/* Apply the standard final CRC inversion. */
				*op->crc_dst ^= ~0;
			}
			break;
		case IDXD_OPCODE_COMPARE:
			if (spdk_likely(status == 0)) {
				/* A non-zero hw.result means the buffers differed. */
				status = op->hw.result;
			}
			break;
		case IDXD_OPCODE_COMPRESS:
			if (spdk_likely(status == 0 && op->output_size != NULL)) {
				*op->output_size = op->iaa_hw.output_size;
			}
			break;
		}

		/* TODO: WHAT IF THIS FAILED!? */
		op->hw.status = 0;

		assert(op->count > 0);
		op->count--;

		parent_op = op->parent;
		if (parent_op != NULL) {
			assert(parent_op->count > 0);
			parent_op->count--;

			if (parent_op->count == 0) {
				cb_fn = parent_op->cb_fn;
				cb_arg = parent_op->cb_arg;

				assert(parent_op->batch != NULL);

				/*
				 * Now that parent_op count is 0, we can release its ref
				 * to its batch. We have not released the ref to the batch
				 * that the op is pointing to yet, which will be done below.
				 */
				parent_op->batch->refcnt--;
				if (parent_op->batch->refcnt == 0) {
					_free_batch(parent_op->batch, chan);
				}

				if (cb_fn) {
					cb_fn(cb_arg, status);
				}
			}
		}

		if (op->count == 0) {
			cb_fn = op->cb_fn;
			cb_arg = op->cb_arg;

			if (op->batch != NULL) {
				assert(op->batch->refcnt > 0);
				op->batch->refcnt--;

				if (op->batch->refcnt == 0) {
					_free_batch(op->batch, chan);
				}
			} else {
				STAILQ_INSERT_HEAD(&chan->ops_pool, op, link);
			}

			if (cb_fn) {
				cb_fn(cb_arg, status);
			}
		}

		/* Reset the status for the next completion. */
		status = 0;
	}

	/* Submit any built-up batch. */
	if (chan->batch) {
		rc2 = idxd_batch_submit(chan, NULL, NULL);
		if (rc2) {
			assert(rc2 == -EBUSY);
		}
	}

	return rc;
}

1412 
1413 void
1414 idxd_impl_register(struct spdk_idxd_impl *impl)
1415 {
1416 	STAILQ_INSERT_HEAD(&g_idxd_impls, impl, link);
1417 }
1418 
1419 SPDK_LOG_REGISTER_COMPONENT(idxd)
1420