xref: /spdk/lib/idxd/idxd.c (revision 344bb69312056bf7ea1f5590bdff6186d4bc917e)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 #include "spdk/stdinc.h"
36 
37 #include "spdk/env.h"
38 #include "spdk/util.h"
39 #include "spdk/memory.h"
40 
41 #include "spdk/log.h"
42 #include "spdk_internal/idxd.h"
43 
44 #include "idxd.h"
45 
/* Mask base used to check that dualcast destinations are 4 KiB aligned. */
#define ALIGN_4K 0x1000

/* Taken by spdk_idxd_probe() around PCI enumeration. */
pthread_mutex_t g_driver_lock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Currently selected device configuration. It is pointed at one of the
 * pre-built configurations by spdk_idxd_set_config(), which is invoked
 * via RPC.
 */
struct device_config *g_dev_cfg = NULL;
55 
56 /*
57  * Pre-built configurations. Variations depend on various factors
58  * including how many different types of target latency profiles there
59  * are, how many different QOS requirements there might be, etc.
60  */
61 struct device_config g_dev_cfg0 = {
62 	.config_num = 0,
63 	.num_groups = 4,
64 	.num_wqs_per_group = 1,
65 	.num_engines_per_group = 1,
66 	.total_wqs = 4,
67 	.total_engines = 4,
68 };
69 
70 struct device_config g_dev_cfg1 = {
71 	.config_num = 1,
72 	.num_groups = 2,
73 	.num_wqs_per_group = 2,
74 	.num_engines_per_group = 2,
75 	.total_wqs = 4,
76 	.total_engines = 4,
77 };
78 
79 static uint32_t
80 _idxd_read_4(struct spdk_idxd_device *idxd, uint32_t offset)
81 {
82 	return spdk_mmio_read_4((uint32_t *)(idxd->reg_base + offset));
83 }
84 
85 static void
86 _idxd_write_4(struct spdk_idxd_device *idxd, uint32_t offset, uint32_t value)
87 {
88 	spdk_mmio_write_4((uint32_t *)(idxd->reg_base + offset), value);
89 }
90 
91 static uint64_t
92 _idxd_read_8(struct spdk_idxd_device *idxd, uint32_t offset)
93 {
94 	return spdk_mmio_read_8((uint64_t *)(idxd->reg_base + offset));
95 }
96 
97 static void
98 _idxd_write_8(struct spdk_idxd_device *idxd, uint32_t offset, uint64_t value)
99 {
100 	spdk_mmio_write_8((uint64_t *)(idxd->reg_base + offset), value);
101 }
102 
103 struct spdk_idxd_io_channel *
104 spdk_idxd_get_channel(struct spdk_idxd_device *idxd)
105 {
106 	struct spdk_idxd_io_channel *chan;
107 	struct idxd_batch *batch;
108 	int i;
109 
110 	chan = calloc(1, sizeof(struct spdk_idxd_io_channel));
111 	if (chan == NULL) {
112 		SPDK_ERRLOG("Failed to allocate idxd chan\n");
113 		return NULL;
114 	}
115 	chan->idxd = idxd;
116 
117 	TAILQ_INIT(&chan->batches);
118 
119 	TAILQ_INIT(&chan->batch_pool);
120 	for (i = 0 ; i < NUM_BATCHES ; i++) {
121 		batch = calloc(1, sizeof(struct idxd_batch));
122 		if (batch == NULL) {
123 			SPDK_ERRLOG("Failed to allocate batch\n");
124 			while ((batch = TAILQ_FIRST(&chan->batch_pool))) {
125 				TAILQ_REMOVE(&chan->batch_pool, batch, link);
126 				free(batch);
127 			}
128 			return NULL;
129 		}
130 		TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
131 	}
132 
133 	return chan;
134 }
135 
/*
 * Release the channel structure itself. Rings, bitmaps and pooled batches
 * are not touched here; spdk_idxd_reconfigure_chan() with num_channels == 0
 * is what frees those.
 */
void
spdk_idxd_put_channel(struct spdk_idxd_io_channel *chan)
{
	void *mem = chan;

	free(mem);
}
141 
142 int
143 spdk_idxd_configure_chan(struct spdk_idxd_io_channel *chan)
144 {
145 	uint32_t num_ring_slots;
146 	int rc;
147 
148 	/* Round robin the WQ selection for the chan on this IDXD device. */
149 	chan->idxd->wq_id++;
150 	if (chan->idxd->wq_id == g_dev_cfg->total_wqs) {
151 		chan->idxd->wq_id = 0;
152 	}
153 
154 	num_ring_slots = chan->idxd->queues[chan->idxd->wq_id].wqcfg.wq_size;
155 
156 	chan->ring_ctrl.ring_slots = spdk_bit_array_create(num_ring_slots);
157 	if (chan->ring_ctrl.ring_slots == NULL) {
158 		SPDK_ERRLOG("Failed to allocate bit array for ring\n");
159 		return -ENOMEM;
160 	}
161 
162 	/*
163 	 * max ring slots can change as channels come and go but we
164 	 * start off getting all of the slots for this work queue.
165 	 */
166 	chan->ring_ctrl.max_ring_slots = num_ring_slots;
167 
168 	/* Store the original size of the ring. */
169 	chan->ring_ctrl.ring_size = num_ring_slots;
170 
171 	chan->ring_ctrl.desc = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_hw_desc),
172 					    0x40, NULL,
173 					    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
174 	if (chan->ring_ctrl.desc == NULL) {
175 		SPDK_ERRLOG("Failed to allocate descriptor memory\n");
176 		rc = -ENOMEM;
177 		goto err_desc;
178 	}
179 
180 	chan->ring_ctrl.completions = spdk_zmalloc(num_ring_slots * sizeof(struct idxd_comp),
181 				      0x40, NULL,
182 				      SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
183 	if (chan->ring_ctrl.completions == NULL) {
184 		SPDK_ERRLOG("Failed to allocate completion memory\n");
185 		rc = -ENOMEM;
186 		goto err_comp;
187 	}
188 
189 	chan->ring_ctrl.user_desc = spdk_zmalloc(TOTAL_USER_DESC * sizeof(struct idxd_hw_desc),
190 				    0x40, NULL,
191 				    SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
192 	if (chan->ring_ctrl.user_desc == NULL) {
193 		SPDK_ERRLOG("Failed to allocate batch descriptor memory\n");
194 		rc = -ENOMEM;
195 		goto err_user_desc;
196 	}
197 
198 	/* Each slot on the ring reserves DESC_PER_BATCH elemnts in user_desc. */
199 	chan->ring_ctrl.user_ring_slots = spdk_bit_array_create(NUM_BATCHES);
200 	if (chan->ring_ctrl.user_ring_slots == NULL) {
201 		SPDK_ERRLOG("Failed to allocate bit array for user ring\n");
202 		rc = -ENOMEM;
203 		goto err_user_ring;
204 	}
205 
206 	chan->ring_ctrl.user_completions = spdk_zmalloc(TOTAL_USER_DESC * sizeof(struct idxd_comp),
207 					   0x40, NULL,
208 					   SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA);
209 	if (chan->ring_ctrl.user_completions == NULL) {
210 		SPDK_ERRLOG("Failed to allocate user completion memory\n");
211 		rc = -ENOMEM;
212 		goto err_user_comp;
213 	}
214 
215 	chan->ring_ctrl.portal = (char *)chan->idxd->portals + chan->idxd->wq_id * PORTAL_SIZE;
216 
217 	return 0;
218 
219 err_user_comp:
220 	spdk_bit_array_free(&chan->ring_ctrl.user_ring_slots);
221 err_user_ring:
222 	spdk_free(chan->ring_ctrl.user_desc);
223 err_user_desc:
224 	spdk_free(chan->ring_ctrl.completions);
225 err_comp:
226 	spdk_free(chan->ring_ctrl.desc);
227 err_desc:
228 	spdk_bit_array_free(&chan->ring_ctrl.ring_slots);
229 
230 	return rc;
231 }
232 
233 /* Used for control commands, not for descriptor submission. */
234 static int
235 idxd_wait_cmd(struct spdk_idxd_device *idxd, int _timeout)
236 {
237 	uint32_t timeout = _timeout;
238 	union idxd_cmdsts_reg cmd_status = {};
239 
240 	cmd_status.raw = _idxd_read_4(idxd, IDXD_CMDSTS_OFFSET);
241 	while (cmd_status.active && --timeout) {
242 		usleep(1);
243 		cmd_status.raw = _idxd_read_4(idxd, IDXD_CMDSTS_OFFSET);
244 	}
245 
246 	/* Check for timeout */
247 	if (timeout == 0 && cmd_status.active) {
248 		SPDK_ERRLOG("Command timeout, waited %u\n", _timeout);
249 		return -EBUSY;
250 	}
251 
252 	/* Check for error */
253 	if (cmd_status.err) {
254 		SPDK_ERRLOG("Command status reg reports error 0x%x\n", cmd_status.err);
255 		return -EINVAL;
256 	}
257 
258 	return 0;
259 }
260 
261 static void
262 _idxd_drain(struct spdk_idxd_io_channel *chan)
263 {
264 	uint32_t index;
265 	int set = 0;
266 
267 	do {
268 		spdk_idxd_process_events(chan);
269 		set = 0;
270 		for (index = 0; index < chan->ring_ctrl.max_ring_slots; index++) {
271 			set |= spdk_bit_array_get(chan->ring_ctrl.ring_slots, index);
272 		}
273 	} while (set);
274 }
275 
276 int
277 spdk_idxd_reconfigure_chan(struct spdk_idxd_io_channel *chan, uint32_t num_channels)
278 {
279 	uint32_t num_ring_slots;
280 	int rc;
281 	struct idxd_batch *batch;
282 
283 	_idxd_drain(chan);
284 
285 	assert(spdk_bit_array_count_set(chan->ring_ctrl.ring_slots) == 0);
286 
287 	if (num_channels == 0) {
288 		spdk_free(chan->ring_ctrl.completions);
289 		spdk_free(chan->ring_ctrl.desc);
290 		spdk_bit_array_free(&chan->ring_ctrl.ring_slots);
291 		spdk_free(chan->ring_ctrl.user_completions);
292 		spdk_free(chan->ring_ctrl.user_desc);
293 		spdk_bit_array_free(&chan->ring_ctrl.user_ring_slots);
294 		while ((batch = TAILQ_FIRST(&chan->batch_pool))) {
295 			TAILQ_REMOVE(&chan->batch_pool, batch, link);
296 			free(batch);
297 		}
298 		return 0;
299 	}
300 
301 	num_ring_slots = chan->ring_ctrl.ring_size / num_channels;
302 
303 	/* re-allocate our descriptor ring for hw flow control. */
304 	rc = spdk_bit_array_resize(&chan->ring_ctrl.ring_slots, num_ring_slots);
305 	if (rc < 0) {
306 		SPDK_ERRLOG("Unable to resize channel bit array\n");
307 		return -ENOMEM;
308 	}
309 
310 	chan->ring_ctrl.max_ring_slots = num_ring_slots;
311 
312 	/*
313 	 * Note: The batch descriptor ring does not change with the
314 	 * number of channels as descriptors on this ring do not
315 	 * "count" for flow control.
316 	 */
317 
318 	return rc;
319 }
320 
321 /* Called via RPC to select a pre-defined configuration. */
322 void
323 spdk_idxd_set_config(uint32_t config_num)
324 {
325 	switch (config_num) {
326 	case 0:
327 		g_dev_cfg = &g_dev_cfg0;
328 		break;
329 	case 1:
330 		g_dev_cfg = &g_dev_cfg1;
331 		break;
332 	default:
333 		g_dev_cfg = &g_dev_cfg0;
334 		SPDK_ERRLOG("Invalid config, using default\n");
335 		break;
336 	}
337 }
338 
339 static int
340 idxd_unmap_pci_bar(struct spdk_idxd_device *idxd, int bar)
341 {
342 	int rc = 0;
343 	void *addr = NULL;
344 
345 	if (bar == IDXD_MMIO_BAR) {
346 		addr = (void *)idxd->reg_base;
347 	} else if (bar == IDXD_WQ_BAR) {
348 		addr = (void *)idxd->portals;
349 	}
350 
351 	if (addr) {
352 		rc = spdk_pci_device_unmap_bar(idxd->device, 0, addr);
353 	}
354 	return rc;
355 }
356 
357 static int
358 idxd_map_pci_bars(struct spdk_idxd_device *idxd)
359 {
360 	int rc;
361 	void *addr;
362 	uint64_t phys_addr, size;
363 
364 	rc = spdk_pci_device_map_bar(idxd->device, IDXD_MMIO_BAR, &addr, &phys_addr, &size);
365 	if (rc != 0 || addr == NULL) {
366 		SPDK_ERRLOG("pci_device_map_range failed with error code %d\n", rc);
367 		return -1;
368 	}
369 	idxd->reg_base = addr;
370 
371 	rc = spdk_pci_device_map_bar(idxd->device, IDXD_WQ_BAR, &addr, &phys_addr, &size);
372 	if (rc != 0 || addr == NULL) {
373 		SPDK_ERRLOG("pci_device_map_range failed with error code %d\n", rc);
374 		rc = idxd_unmap_pci_bar(idxd, IDXD_MMIO_BAR);
375 		if (rc) {
376 			SPDK_ERRLOG("unable to unmap MMIO bar\n");
377 		}
378 		return -EINVAL;
379 	}
380 	idxd->portals = addr;
381 
382 	return 0;
383 }
384 
385 static int
386 idxd_reset_dev(struct spdk_idxd_device *idxd)
387 {
388 	int rc;
389 
390 	_idxd_write_4(idxd, IDXD_CMD_OFFSET, IDXD_RESET_DEVICE << IDXD_CMD_SHIFT);
391 	rc = idxd_wait_cmd(idxd, IDXD_REGISTER_TIMEOUT_US);
392 	if (rc < 0) {
393 		SPDK_ERRLOG("Error resetting device %u\n", rc);
394 	}
395 
396 	return rc;
397 }
398 
399 /*
400  * Build group config based on getting info from the device combined
401  * with the defined configuration. Once built, it is written to the
402  * device.
403  */
404 static int
405 idxd_group_config(struct spdk_idxd_device *idxd)
406 {
407 	int i;
408 	uint64_t base_offset;
409 
410 	assert(g_dev_cfg->num_groups <= idxd->registers.groupcap.num_groups);
411 	idxd->groups = calloc(idxd->registers.groupcap.num_groups, sizeof(struct idxd_group));
412 	if (idxd->groups == NULL) {
413 		SPDK_ERRLOG("Failed to allocate group memory\n");
414 		return -ENOMEM;
415 	}
416 
417 	assert(g_dev_cfg->total_engines <= idxd->registers.enginecap.num_engines);
418 	for (i = 0; i < g_dev_cfg->total_engines; i++) {
419 		idxd->groups[i % g_dev_cfg->num_groups].grpcfg.engines |= (1 << i);
420 	}
421 
422 	assert(g_dev_cfg->total_wqs <= idxd->registers.wqcap.num_wqs);
423 	for (i = 0; i < g_dev_cfg->total_wqs; i++) {
424 		idxd->groups[i % g_dev_cfg->num_groups].grpcfg.wqs[0] |= (1 << i);
425 	}
426 
427 	for (i = 0; i < g_dev_cfg->num_groups; i++) {
428 		idxd->groups[i].idxd = idxd;
429 		idxd->groups[i].id = i;
430 
431 		/* Divide BW tokens evenly */
432 		idxd->groups[i].grpcfg.flags.tokens_allowed =
433 			idxd->registers.groupcap.total_tokens / g_dev_cfg->num_groups;
434 	}
435 
436 	/*
437 	 * Now write the group config to the device for all groups. We write
438 	 * to the max number of groups in order to 0 out the ones we didn't
439 	 * configure.
440 	 */
441 	for (i = 0 ; i < idxd->registers.groupcap.num_groups; i++) {
442 
443 		base_offset = idxd->grpcfg_offset + i * 64;
444 
445 		/* GRPWQCFG, work queues config */
446 		_idxd_write_8(idxd, base_offset, idxd->groups[i].grpcfg.wqs[0]);
447 
448 		/* GRPENGCFG, engine config */
449 		_idxd_write_8(idxd, base_offset + CFG_ENGINE_OFFSET, idxd->groups[i].grpcfg.engines);
450 
451 		/* GRPFLAGS, flags config */
452 		_idxd_write_8(idxd, base_offset + CFG_FLAG_OFFSET, idxd->groups[i].grpcfg.flags.raw);
453 	}
454 
455 	return 0;
456 }
457 
458 /*
459  * Build work queue (WQ) config based on getting info from the device combined
460  * with the defined configuration. Once built, it is written to the device.
461  */
462 static int
463 idxd_wq_config(struct spdk_idxd_device *idxd)
464 {
465 	int i, j;
466 	struct idxd_wq *queue;
467 	u_int32_t wq_size = idxd->registers.wqcap.total_wq_size / g_dev_cfg->total_wqs;
468 
469 	SPDK_NOTICELOG("Total ring slots available space 0x%x, so per work queue is 0x%x\n",
470 		       idxd->registers.wqcap.total_wq_size, wq_size);
471 	assert(g_dev_cfg->total_wqs <= IDXD_MAX_QUEUES);
472 	assert(g_dev_cfg->total_wqs <= idxd->registers.wqcap.num_wqs);
473 	assert(LOG2_WQ_MAX_BATCH <= idxd->registers.gencap.max_batch_shift);
474 	assert(LOG2_WQ_MAX_XFER <= idxd->registers.gencap.max_xfer_shift);
475 
476 	idxd->queues = calloc(1, idxd->registers.wqcap.num_wqs * sizeof(struct idxd_wq));
477 	if (idxd->queues == NULL) {
478 		SPDK_ERRLOG("Failed to allocate queue memory\n");
479 		return -ENOMEM;
480 	}
481 
482 	for (i = 0; i < g_dev_cfg->total_wqs; i++) {
483 		queue = &idxd->queues[i];
484 		queue->wqcfg.wq_size = wq_size;
485 		queue->wqcfg.mode = WQ_MODE_DEDICATED;
486 		queue->wqcfg.max_batch_shift = LOG2_WQ_MAX_BATCH;
487 		queue->wqcfg.max_xfer_shift = LOG2_WQ_MAX_XFER;
488 		queue->wqcfg.wq_state = WQ_ENABLED;
489 		queue->wqcfg.priority = WQ_PRIORITY_1;
490 
491 		/* Not part of the config struct */
492 		queue->idxd = idxd;
493 		queue->group = &idxd->groups[i % g_dev_cfg->num_groups];
494 	}
495 
496 	/*
497 	 * Now write the work queue config to the device for all wq space
498 	 */
499 	for (i = 0 ; i < idxd->registers.wqcap.num_wqs; i++) {
500 		queue = &idxd->queues[i];
501 		for (j = 0 ; j < WQCFG_NUM_DWORDS; j++) {
502 			_idxd_write_4(idxd, idxd->wqcfg_offset + i * 32 + j * 4,
503 				      queue->wqcfg.raw[j]);
504 		}
505 	}
506 
507 	return 0;
508 }
509 
510 static int
511 idxd_device_configure(struct spdk_idxd_device *idxd)
512 {
513 	int i, rc = 0;
514 	union idxd_offsets_register offsets_reg;
515 	union idxd_genstatus_register genstatus_reg;
516 
517 	/*
518 	 * Map BAR0 and BAR2
519 	 */
520 	rc = idxd_map_pci_bars(idxd);
521 	if (rc) {
522 		return rc;
523 	}
524 
525 	/*
526 	 * Reset the device
527 	 */
528 	rc = idxd_reset_dev(idxd);
529 	if (rc) {
530 		goto err_reset;
531 	}
532 
533 	/*
534 	 * Read in config registers
535 	 */
536 	idxd->registers.version = _idxd_read_4(idxd, IDXD_VERSION_OFFSET);
537 	idxd->registers.gencap.raw = _idxd_read_8(idxd, IDXD_GENCAP_OFFSET);
538 	idxd->registers.wqcap.raw = _idxd_read_8(idxd, IDXD_WQCAP_OFFSET);
539 	idxd->registers.groupcap.raw = _idxd_read_8(idxd, IDXD_GRPCAP_OFFSET);
540 	idxd->registers.enginecap.raw = _idxd_read_8(idxd, IDXD_ENGCAP_OFFSET);
541 	for (i = 0; i < IDXD_OPCAP_WORDS; i++) {
542 		idxd->registers.opcap.raw[i] =
543 			_idxd_read_8(idxd, i * sizeof(uint64_t) + IDXD_OPCAP_OFFSET);
544 	}
545 	offsets_reg.raw[0] = _idxd_read_8(idxd, IDXD_TABLE_OFFSET);
546 	offsets_reg.raw[1] = _idxd_read_8(idxd, IDXD_TABLE_OFFSET + sizeof(uint64_t));
547 	idxd->grpcfg_offset = offsets_reg.grpcfg * IDXD_TABLE_OFFSET_MULT;
548 	idxd->wqcfg_offset = offsets_reg.wqcfg * IDXD_TABLE_OFFSET_MULT;
549 	idxd->ims_offset = offsets_reg.ims * IDXD_TABLE_OFFSET_MULT;
550 	idxd->msix_perm_offset = offsets_reg.msix_perm  * IDXD_TABLE_OFFSET_MULT;
551 	idxd->perfmon_offset = offsets_reg.perfmon * IDXD_TABLE_OFFSET_MULT;
552 
553 	/*
554 	 * Configure groups and work queues.
555 	 */
556 	rc = idxd_group_config(idxd);
557 	if (rc) {
558 		goto err_group_cfg;
559 	}
560 
561 	rc = idxd_wq_config(idxd);
562 	if (rc) {
563 		goto err_wq_cfg;
564 	}
565 
566 	/*
567 	 * Enable the device
568 	 */
569 	genstatus_reg.raw = _idxd_read_4(idxd, IDXD_GENSTATUS_OFFSET);
570 	assert(genstatus_reg.state == IDXD_DEVICE_STATE_DISABLED);
571 
572 	_idxd_write_4(idxd, IDXD_CMD_OFFSET, IDXD_ENABLE_DEV << IDXD_CMD_SHIFT);
573 	rc = idxd_wait_cmd(idxd, IDXD_REGISTER_TIMEOUT_US);
574 	genstatus_reg.raw = _idxd_read_4(idxd, IDXD_GENSTATUS_OFFSET);
575 	if ((rc < 0) || (genstatus_reg.state != IDXD_DEVICE_STATE_ENABLED)) {
576 		rc = -EINVAL;
577 		SPDK_ERRLOG("Error enabling device %u\n", rc);
578 		goto err_device_enable;
579 	}
580 
581 	genstatus_reg.raw = spdk_mmio_read_4((uint32_t *)(idxd->reg_base + IDXD_GENSTATUS_OFFSET));
582 	assert(genstatus_reg.state == IDXD_DEVICE_STATE_ENABLED);
583 
584 	/*
585 	 * Enable the work queues that we've configured
586 	 */
587 	for (i = 0; i < g_dev_cfg->total_wqs; i++) {
588 		_idxd_write_4(idxd, IDXD_CMD_OFFSET,
589 			      (IDXD_ENABLE_WQ << IDXD_CMD_SHIFT) | i);
590 		rc = idxd_wait_cmd(idxd, IDXD_REGISTER_TIMEOUT_US);
591 		if (rc < 0) {
592 			SPDK_ERRLOG("Error enabling work queues 0x%x\n", rc);
593 			goto err_wq_enable;
594 		}
595 	}
596 
597 	if ((rc == 0) && (genstatus_reg.state == IDXD_DEVICE_STATE_ENABLED)) {
598 		SPDK_NOTICELOG("Device enabled, version 0x%x gencap: 0x%lx\n",
599 			       idxd->registers.version,
600 			       idxd->registers.gencap.raw);
601 
602 	}
603 
604 	return rc;
605 err_wq_enable:
606 err_device_enable:
607 	free(idxd->queues);
608 err_wq_cfg:
609 	free(idxd->groups);
610 err_group_cfg:
611 err_reset:
612 	idxd_unmap_pci_bar(idxd, IDXD_MMIO_BAR);
613 	idxd_unmap_pci_bar(idxd, IDXD_MMIO_BAR);
614 
615 	return rc;
616 }
617 
618 static void
619 idxd_device_destruct(struct spdk_idxd_device *idxd)
620 {
621 	idxd_unmap_pci_bar(idxd, IDXD_MMIO_BAR);
622 	idxd_unmap_pci_bar(idxd, IDXD_WQ_BAR);
623 	free(idxd->groups);
624 	free(idxd->queues);
625 	free(idxd);
626 }
627 
628 /* Caller must hold g_driver_lock */
629 static struct spdk_idxd_device *
630 idxd_attach(struct spdk_pci_device *device)
631 {
632 	struct spdk_idxd_device *idxd;
633 	uint32_t cmd_reg;
634 	int rc;
635 
636 	idxd = calloc(1, sizeof(struct spdk_idxd_device));
637 	if (idxd == NULL) {
638 		SPDK_ERRLOG("Failed to allocate memory for idxd device.\n");
639 		return NULL;
640 	}
641 
642 	idxd->device = device;
643 
644 	/* Enable PCI busmaster. */
645 	spdk_pci_device_cfg_read32(device, &cmd_reg, 4);
646 	cmd_reg |= 0x4;
647 	spdk_pci_device_cfg_write32(device, cmd_reg, 4);
648 
649 	rc = idxd_device_configure(idxd);
650 	if (rc) {
651 		goto err;
652 	}
653 
654 	return idxd;
655 err:
656 	idxd_device_destruct(idxd);
657 	return NULL;
658 }
659 
660 struct idxd_enum_ctx {
661 	spdk_idxd_probe_cb probe_cb;
662 	spdk_idxd_attach_cb attach_cb;
663 	void *cb_ctx;
664 };
665 
666 /* This function must only be called while holding g_driver_lock */
667 static int
668 idxd_enum_cb(void *ctx, struct spdk_pci_device *pci_dev)
669 {
670 	struct idxd_enum_ctx *enum_ctx = ctx;
671 	struct spdk_idxd_device *idxd;
672 
673 	if (enum_ctx->probe_cb(enum_ctx->cb_ctx, pci_dev)) {
674 		idxd = idxd_attach(pci_dev);
675 		if (idxd == NULL) {
676 			SPDK_ERRLOG("idxd_attach() failed\n");
677 			return -EINVAL;
678 		}
679 
680 		enum_ctx->attach_cb(enum_ctx->cb_ctx, pci_dev, idxd);
681 	}
682 
683 	return 0;
684 }
685 
686 int
687 spdk_idxd_probe(void *cb_ctx, spdk_idxd_probe_cb probe_cb, spdk_idxd_attach_cb attach_cb)
688 {
689 	int rc;
690 	struct idxd_enum_ctx enum_ctx;
691 
692 	enum_ctx.probe_cb = probe_cb;
693 	enum_ctx.attach_cb = attach_cb;
694 	enum_ctx.cb_ctx = cb_ctx;
695 
696 	pthread_mutex_lock(&g_driver_lock);
697 	rc = spdk_pci_enumerate(spdk_pci_idxd_get_driver(), idxd_enum_cb, &enum_ctx);
698 	pthread_mutex_unlock(&g_driver_lock);
699 
700 	return rc;
701 }
702 
/*
 * Detach a device: unmaps its BARs and frees the device object by
 * delegating to idxd_device_destruct(). idxd must not be used afterwards.
 */
void
spdk_idxd_detach(struct spdk_idxd_device *idxd)
{
	struct spdk_idxd_device *dev = idxd;

	idxd_device_destruct(dev);
}
708 
709 static struct idxd_hw_desc *
710 _idxd_prep_command(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn,
711 		   void *cb_arg, struct idxd_batch *batch)
712 {
713 	uint32_t index;
714 	struct idxd_hw_desc *desc;
715 	struct idxd_comp *comp;
716 
717 	index = spdk_bit_array_find_first_clear(chan->ring_ctrl.ring_slots, 0);
718 	if (index == UINT32_MAX) {
719 		/* ran out of ring slots */
720 		return NULL;
721 	}
722 
723 	spdk_bit_array_set(chan->ring_ctrl.ring_slots, index);
724 
725 	desc = &chan->ring_ctrl.desc[index];
726 	comp = &chan->ring_ctrl.completions[index];
727 
728 	desc->flags = IDXD_FLAG_COMPLETION_ADDR_VALID | IDXD_FLAG_REQUEST_COMPLETION;
729 	desc->completion_addr = (uintptr_t)&comp->hw;
730 	comp->cb_arg = cb_arg;
731 	comp->cb_fn = cb_fn;
732 	if (batch) {
733 		comp->batch = batch;
734 		batch->batch_desc_index = index;
735 	}
736 
737 	return desc;
738 }
739 
740 int
741 spdk_idxd_submit_copy(struct spdk_idxd_io_channel *chan, void *dst, const void *src,
742 		      uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg)
743 {
744 	struct idxd_hw_desc *desc;
745 	uint64_t src_addr, dst_addr;
746 	uint64_t src_nbytes = 0, dst_nbytes = 0;
747 
748 	/* Common prep. */
749 	desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
750 	if (desc == NULL) {
751 		return -EBUSY;
752 	}
753 
754 	src_addr = spdk_vtophys(src, &src_nbytes);
755 	dst_addr = spdk_vtophys(dst, &dst_nbytes);
756 
757 	if (src_addr == SPDK_VTOPHYS_ERROR || dst_addr == SPDK_VTOPHYS_ERROR) {
758 		return -EINVAL;
759 	}
760 
761 	if (src_nbytes < nbytes || dst_nbytes < nbytes) {
762 		return -EINVAL;
763 	}
764 
765 	/* Command specific. */
766 	desc->opcode = IDXD_OPCODE_MEMMOVE;
767 	desc->src_addr = src_addr;
768 	desc->dst_addr = dst_addr;
769 	desc->xfer_size = nbytes;
770 
771 	/* Submit operation. */
772 	movdir64b(chan->ring_ctrl.portal, desc);
773 
774 	return 0;
775 }
776 
777 /* Dual-cast copies the same source to two separate destination buffers. */
778 int
779 spdk_idxd_submit_dualcast(struct spdk_idxd_io_channel *chan, void *dst1, void *dst2,
780 			  const void *src, uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg)
781 {
782 	struct idxd_hw_desc *desc;
783 	uint64_t src_addr, dst1_addr, dst2_addr;
784 	uint64_t src_nbytes = 0, dst1_nbytes = 0, dst2_nbytes = 0;
785 
786 	if ((uintptr_t)dst1 & (ALIGN_4K - 1) || (uintptr_t)dst2 & (ALIGN_4K - 1)) {
787 		SPDK_ERRLOG("Dualcast requires 4K alignment on dst addresses\n");
788 		return -EINVAL;
789 	}
790 
791 	/* Common prep. */
792 	desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
793 	if (desc == NULL) {
794 		return -EBUSY;
795 	}
796 
797 	src_addr = spdk_vtophys(src, &src_nbytes);
798 	dst1_addr = spdk_vtophys(dst1, &dst1_nbytes);
799 	dst2_addr = spdk_vtophys(dst2, &dst2_nbytes);
800 
801 	if (src_addr == SPDK_VTOPHYS_ERROR || dst1_addr == SPDK_VTOPHYS_ERROR ||
802 	    dst2_addr == SPDK_VTOPHYS_ERROR) {
803 		return -EINVAL;
804 	}
805 
806 	if (src_nbytes < nbytes || dst1_nbytes < nbytes || dst2_nbytes < nbytes) {
807 		return -EINVAL;
808 	}
809 
810 	/* Command specific. */
811 	desc->opcode = IDXD_OPCODE_DUALCAST;
812 	desc->src_addr = src_addr;
813 	desc->dst_addr = dst1_addr;
814 	desc->dest2 = dst2_addr;
815 	desc->xfer_size = nbytes;
816 
817 	/* Submit operation. */
818 	movdir64b(chan->ring_ctrl.portal, desc);
819 
820 	return 0;
821 }
822 
823 int
824 spdk_idxd_submit_compare(struct spdk_idxd_io_channel *chan, void *src1, const void *src2,
825 			 uint64_t nbytes,
826 			 spdk_idxd_req_cb cb_fn, void *cb_arg)
827 {
828 	struct idxd_hw_desc *desc;
829 	uint64_t src1_addr, src2_addr;
830 	uint64_t src1_nbytes = 0, src2_nbytes = 0;
831 
832 	/* Common prep. */
833 	desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
834 	if (desc == NULL) {
835 		return -EBUSY;
836 	}
837 
838 	src1_addr = spdk_vtophys(src1, &src1_nbytes);
839 	src2_addr = spdk_vtophys(src2, &src2_nbytes);
840 
841 	if (src1_addr == SPDK_VTOPHYS_ERROR || src2_addr == SPDK_VTOPHYS_ERROR) {
842 		return -EINVAL;
843 	}
844 
845 	if (src1_nbytes < nbytes || src2_nbytes < nbytes) {
846 		return -EINVAL;
847 	}
848 
849 	/* Command specific. */
850 	desc->opcode = IDXD_OPCODE_COMPARE;
851 	desc->src_addr = src1_addr;
852 	desc->src2_addr = src2_addr;
853 	desc->xfer_size = nbytes;
854 
855 	/* Submit operation. */
856 	movdir64b(chan->ring_ctrl.portal, desc);
857 
858 	return 0;
859 }
860 
861 int
862 spdk_idxd_submit_fill(struct spdk_idxd_io_channel *chan, void *dst, uint64_t fill_pattern,
863 		      uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg)
864 {
865 	struct idxd_hw_desc *desc;
866 	uint64_t dst_addr;
867 	uint64_t dst_nbytes = 0;
868 
869 	/* Common prep. */
870 	desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
871 	if (desc == NULL) {
872 		return -EBUSY;
873 	}
874 
875 	dst_addr = spdk_vtophys(dst, &dst_nbytes);
876 
877 	if (dst_addr == SPDK_VTOPHYS_ERROR) {
878 		return -EINVAL;
879 	}
880 
881 	if (dst_nbytes < nbytes) {
882 		return -EINVAL;
883 	}
884 
885 	/* Command specific. */
886 	desc->opcode = IDXD_OPCODE_MEMFILL;
887 	desc->pattern = fill_pattern;
888 	desc->dst_addr = dst_addr;
889 	desc->xfer_size = nbytes;
890 
891 	/* Submit operation. */
892 	movdir64b(chan->ring_ctrl.portal, desc);
893 
894 	return 0;
895 }
896 
897 int
898 spdk_idxd_submit_crc32c(struct spdk_idxd_io_channel *chan, uint32_t *dst, void *src,
899 			uint32_t seed, uint64_t nbytes,
900 			spdk_idxd_req_cb cb_fn, void *cb_arg)
901 {
902 	struct idxd_hw_desc *desc;
903 	uint64_t src_addr, dst_addr;
904 	uint64_t src_nbytes = 0, dst_nbytes = 0;
905 
906 	/* Common prep. */
907 	desc = _idxd_prep_command(chan, cb_fn, cb_arg, NULL);
908 	if (desc == NULL) {
909 		return -EBUSY;
910 	}
911 
912 	src_addr = spdk_vtophys(src, &src_nbytes);
913 	dst_addr = spdk_vtophys(dst, &dst_nbytes);
914 
915 	if (src_addr == SPDK_VTOPHYS_ERROR || dst_addr == SPDK_VTOPHYS_ERROR) {
916 		return -EINVAL;
917 	}
918 
919 	if (src_nbytes < nbytes || dst_nbytes < nbytes) {
920 		return -EINVAL;
921 	}
922 
923 	/* Command specific. */
924 	desc->opcode = IDXD_OPCODE_CRC32C_GEN;
925 	desc->dst_addr = dst_addr;
926 	desc->src_addr = src_addr;
927 	desc->flags &= IDXD_CLEAR_CRC_FLAGS;
928 	desc->crc32c.seed = seed;
929 	desc->xfer_size = nbytes;
930 
931 	/* Submit operation. */
932 	movdir64b(chan->ring_ctrl.portal, desc);
933 
934 	return 0;
935 }
936 
937 uint32_t
938 spdk_idxd_batch_get_max(void)
939 {
940 	return DESC_PER_BATCH; /* TODO maybe add startup RPC to set this */
941 }
942 
943 struct idxd_batch *
944 spdk_idxd_batch_create(struct spdk_idxd_io_channel *chan)
945 {
946 	struct idxd_batch *batch = NULL;
947 
948 	if (!TAILQ_EMPTY(&chan->batch_pool)) {
949 		batch = TAILQ_FIRST(&chan->batch_pool);
950 		TAILQ_REMOVE(&chan->batch_pool, batch, link);
951 	} else {
952 		/* The application needs to handle this. */
953 		return NULL;
954 	}
955 
956 	batch->batch_num = spdk_bit_array_find_first_clear(chan->ring_ctrl.user_ring_slots, 0);
957 	if (batch->batch_num == UINT32_MAX) {
958 		/* ran out of ring slots, the application needs to handle this. */
959 		TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
960 		return NULL;
961 	}
962 
963 	spdk_bit_array_set(chan->ring_ctrl.user_ring_slots, batch->batch_num);
964 
965 	/*
966 	 * Find the first descriptor address for the given batch. The
967 	 * descriptor ring used for user desctipors is allocated in
968 	 * units of DESC_PER_BATCH.  The actual index is in units of
969 	 * one descriptor.
970 	 */
971 	batch->start_index = batch->cur_index = batch->batch_num * DESC_PER_BATCH;
972 
973 	TAILQ_INSERT_TAIL(&chan->batches, batch, link);
974 	SPDK_DEBUGLOG(idxd, "New batch %p num %u\n", batch, batch->batch_num);
975 
976 	return batch;
977 }
978 
979 static bool
980 _does_batch_exist(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan)
981 {
982 	bool found = false;
983 	struct idxd_batch *cur_batch;
984 
985 	TAILQ_FOREACH(cur_batch, &chan->batches, link) {
986 		if (cur_batch == batch) {
987 			found = true;
988 			break;
989 		}
990 	}
991 
992 	return found;
993 }
994 
995 int
996 spdk_idxd_batch_cancel(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch)
997 {
998 	if (_does_batch_exist(batch, chan) == false) {
999 		SPDK_ERRLOG("Attempt to cancel a batch that doesn't exist\n.");
1000 		return -EINVAL;
1001 	}
1002 
1003 	if (batch->remaining > 0) {
1004 		SPDK_ERRLOG("Cannot cancel batch, already submitted to HW\n.");
1005 		return -EINVAL;
1006 	}
1007 
1008 	TAILQ_REMOVE(&chan->batches, batch, link);
1009 	spdk_bit_array_clear(chan->ring_ctrl.user_ring_slots, batch->batch_num);
1010 	TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
1011 
1012 	return 0;
1013 }
1014 
1015 int
1016 spdk_idxd_batch_submit(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
1017 		       spdk_idxd_req_cb cb_fn, void *cb_arg)
1018 {
1019 	struct idxd_hw_desc *desc;
1020 
1021 	if (_does_batch_exist(batch, chan) == false) {
1022 		SPDK_ERRLOG("Attempt to submit a batch that doesn't exist\n.");
1023 		return -EINVAL;
1024 	}
1025 
1026 	/* Common prep. */
1027 	desc = _idxd_prep_command(chan, cb_fn, cb_arg, batch);
1028 	if (desc == NULL) {
1029 		SPDK_DEBUGLOG(idxd, "Can't submit batch %p busy batch num %u\n", batch, batch->batch_num);
1030 		return -EBUSY;
1031 	}
1032 
1033 	/* Command specific. */
1034 	desc->opcode = IDXD_OPCODE_BATCH;
1035 	desc->desc_list_addr = (uintptr_t)&chan->ring_ctrl.user_desc[batch->start_index];
1036 	desc->desc_count = batch->cur_index - batch->start_index;
1037 	assert(desc->desc_count <= DESC_PER_BATCH);
1038 
1039 	if (desc->desc_count < MIN_USER_DESC_COUNT) {
1040 		SPDK_ERRLOG("Attempt to submit a batch without at least %u operations.\n",
1041 			    MIN_USER_DESC_COUNT);
1042 		return -EINVAL;
1043 	}
1044 
1045 	/* Total completions for the batch = num desc plus 1 for the batch desc itself. */
1046 	batch->remaining = desc->desc_count + 1;
1047 
1048 	/* Submit operation. */
1049 	movdir64b(chan->ring_ctrl.portal, desc);
1050 
1051 	return 0;
1052 }
1053 
1054 static struct idxd_hw_desc *
1055 _idxd_prep_batch_cmd(struct spdk_idxd_io_channel *chan, spdk_idxd_req_cb cb_fn,
1056 		     void *cb_arg, struct idxd_batch *batch)
1057 {
1058 	struct idxd_hw_desc *desc;
1059 	struct idxd_comp *comp;
1060 
1061 	if (_does_batch_exist(batch, chan) == false) {
1062 		SPDK_ERRLOG("Attempt to add to a batch that doesn't exist\n.");
1063 		return NULL;
1064 	}
1065 
1066 	if ((batch->cur_index - batch->start_index) == DESC_PER_BATCH) {
1067 		SPDK_ERRLOG("Attempt to add to a batch that is already full\n.");
1068 		return NULL;
1069 	}
1070 
1071 	desc = &chan->ring_ctrl.user_desc[batch->cur_index];
1072 	comp = &chan->ring_ctrl.user_completions[batch->cur_index];
1073 	SPDK_DEBUGLOG(idxd, "Prep batch %p index %u\n", batch, batch->cur_index);
1074 
1075 	batch->cur_index++;
1076 	assert(batch->cur_index > batch->start_index);
1077 
1078 	desc->flags = IDXD_FLAG_COMPLETION_ADDR_VALID | IDXD_FLAG_REQUEST_COMPLETION;
1079 	desc->completion_addr = (uintptr_t)&comp->hw;
1080 	comp->cb_arg = cb_arg;
1081 	comp->cb_fn = cb_fn;
1082 	comp->batch = batch;
1083 
1084 	return desc;
1085 }
1086 
1087 int
1088 spdk_idxd_batch_prep_copy(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
1089 			  void *dst, const void *src, uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg)
1090 {
1091 	struct idxd_hw_desc *desc;
1092 	uint64_t src_addr, dst_addr;
1093 	uint64_t src_nbytes = 0, dst_nbytes = 0;
1094 
1095 	/* Common prep. */
1096 	desc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, batch);
1097 	if (desc == NULL) {
1098 		return -EINVAL;
1099 	}
1100 
1101 	src_addr = spdk_vtophys(src, &src_nbytes);
1102 	dst_addr = spdk_vtophys(dst, &dst_nbytes);
1103 
1104 	if (src_addr == SPDK_VTOPHYS_ERROR || dst_addr == SPDK_VTOPHYS_ERROR) {
1105 		return -EINVAL;
1106 	}
1107 
1108 	if (src_nbytes < nbytes || dst_nbytes < nbytes) {
1109 		return -EINVAL;
1110 	}
1111 
1112 	/* Command specific. */
1113 	desc->opcode = IDXD_OPCODE_MEMMOVE;
1114 	desc->src_addr = src_addr;
1115 	desc->dst_addr = dst_addr;
1116 	desc->xfer_size = nbytes;
1117 
1118 	return 0;
1119 }
1120 
1121 int
1122 spdk_idxd_batch_prep_fill(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
1123 			  void *dst, uint64_t fill_pattern, uint64_t nbytes,
1124 			  spdk_idxd_req_cb cb_fn, void *cb_arg)
1125 {
1126 	struct idxd_hw_desc *desc;
1127 	uint64_t dst_addr;
1128 	uint64_t dst_nbytes = 0;
1129 
1130 	/* Common prep. */
1131 	desc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, batch);
1132 	if (desc == NULL) {
1133 		return -EINVAL;
1134 	}
1135 
1136 	dst_addr = spdk_vtophys(dst, &dst_nbytes);
1137 
1138 	if (dst_addr == SPDK_VTOPHYS_ERROR) {
1139 		return -EINVAL;
1140 	}
1141 
1142 	if (dst_nbytes < nbytes) {
1143 		return -EINVAL;
1144 	}
1145 
1146 	/* Command specific. */
1147 	desc->opcode = IDXD_OPCODE_MEMFILL;
1148 	desc->pattern = fill_pattern;
1149 	desc->dst_addr = dst_addr;
1150 	desc->xfer_size = nbytes;
1151 
1152 	return 0;
1153 }
1154 
1155 int
1156 spdk_idxd_batch_prep_dualcast(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
1157 			      void *dst1, void *dst2, const void *src, uint64_t nbytes, spdk_idxd_req_cb cb_fn, void *cb_arg)
1158 {
1159 	struct idxd_hw_desc *desc;
1160 	uint64_t src_addr, dst1_addr, dst2_addr;
1161 	uint64_t src_nbytes = 0, dst1_nbytes = 0, dst2_nbytes = 0;
1162 
1163 	if ((uintptr_t)dst1 & (ALIGN_4K - 1) || (uintptr_t)dst2 & (ALIGN_4K - 1)) {
1164 		SPDK_ERRLOG("Dualcast requires 4K alignment on dst addresses\n");
1165 		return -EINVAL;
1166 	}
1167 
1168 	/* Common prep. */
1169 	desc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, batch);
1170 	if (desc == NULL) {
1171 		return -EINVAL;
1172 	}
1173 
1174 	src_addr = spdk_vtophys(src, &src_nbytes);
1175 	dst1_addr = spdk_vtophys(dst1, &dst1_nbytes);
1176 	dst2_addr = spdk_vtophys(dst2, &dst2_nbytes);
1177 
1178 	if (src_addr == SPDK_VTOPHYS_ERROR || dst1_addr == SPDK_VTOPHYS_ERROR ||
1179 	    dst2_addr == SPDK_VTOPHYS_ERROR) {
1180 		return -EINVAL;
1181 	}
1182 
1183 	if (src_nbytes < nbytes || dst1_nbytes < nbytes || dst2_nbytes < nbytes) {
1184 		return -EINVAL;
1185 	}
1186 
1187 	desc->opcode = IDXD_OPCODE_DUALCAST;
1188 	desc->src_addr = src_addr;
1189 	desc->dst_addr = dst1_addr;
1190 	desc->dest2 = dst2_addr;
1191 	desc->xfer_size = nbytes;
1192 
1193 	return 0;
1194 }
1195 
1196 int
1197 spdk_idxd_batch_prep_crc32c(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
1198 			    uint32_t *dst, void *src, uint32_t seed, uint64_t nbytes,
1199 			    spdk_idxd_req_cb cb_fn, void *cb_arg)
1200 {
1201 	struct idxd_hw_desc *desc;
1202 	uint64_t src_addr, dst_addr;
1203 	uint64_t src_nbytes = 0, dst_nbytes = 0;
1204 
1205 	/* Common prep. */
1206 	desc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, batch);
1207 	if (desc == NULL) {
1208 		return -EINVAL;
1209 	}
1210 
1211 	src_addr = spdk_vtophys(src, &src_nbytes);
1212 	dst_addr = spdk_vtophys(dst, &dst_nbytes);
1213 
1214 	if (src_addr == SPDK_VTOPHYS_ERROR || dst_addr == SPDK_VTOPHYS_ERROR) {
1215 		return -EINVAL;
1216 	}
1217 
1218 	if (src_nbytes < nbytes || dst_nbytes < nbytes) {
1219 		return -EINVAL;
1220 	}
1221 
1222 	/* Command specific. */
1223 	desc->opcode = IDXD_OPCODE_CRC32C_GEN;
1224 	desc->dst_addr = dst_addr;
1225 	desc->src_addr = src_addr;
1226 	desc->flags &= IDXD_CLEAR_CRC_FLAGS;
1227 	desc->crc32c.seed = seed;
1228 	desc->xfer_size = nbytes;
1229 
1230 	return 0;
1231 }
1232 
1233 int
1234 spdk_idxd_batch_prep_compare(struct spdk_idxd_io_channel *chan, struct idxd_batch *batch,
1235 			     void *src1, void *src2, uint64_t nbytes, spdk_idxd_req_cb cb_fn,
1236 			     void *cb_arg)
1237 {
1238 	struct idxd_hw_desc *desc;
1239 	uint64_t src1_addr, src2_addr;
1240 	uint64_t src1_nbytes = 0, src2_nbytes = 0;
1241 
1242 	/* Common prep. */
1243 	desc = _idxd_prep_batch_cmd(chan, cb_fn, cb_arg, batch);
1244 	if (desc == NULL) {
1245 		return -EINVAL;
1246 	}
1247 
1248 	src1_addr = spdk_vtophys(src1, &src1_nbytes);
1249 	src2_addr = spdk_vtophys(src2, &src2_nbytes);
1250 
1251 	if (src1_addr == SPDK_VTOPHYS_ERROR || src2_addr == SPDK_VTOPHYS_ERROR) {
1252 		return -EINVAL;
1253 	}
1254 
1255 	if (src1_nbytes < nbytes || src2_nbytes < nbytes) {
1256 		return -EINVAL;
1257 	}
1258 
1259 	/* Command specific. */
1260 	desc->opcode = IDXD_OPCODE_COMPARE;
1261 	desc->src_addr = src1_addr;
1262 	desc->src2_addr = src2_addr;
1263 	desc->xfer_size = nbytes;
1264 
1265 	return 0;
1266 }
1267 
1268 static void
1269 _dump_error_reg(struct spdk_idxd_io_channel *chan)
1270 {
1271 	uint64_t sw_error_0;
1272 	uint16_t i;
1273 
1274 	sw_error_0 = _idxd_read_8(chan->idxd, IDXD_SWERR_OFFSET);
1275 
1276 	SPDK_NOTICELOG("SW Error bits set:");
1277 	for (i = 0; i < CHAR_BIT; i++) {
1278 		if ((1ULL << i) & sw_error_0) {
1279 			SPDK_NOTICELOG("    %d\n", i);
1280 		}
1281 	}
1282 	SPDK_NOTICELOG("SW Error error code: %#x\n", (uint8_t)(sw_error_0 >> 8));
1283 	SPDK_NOTICELOG("SW Error WQ index: %u\n", (uint8_t)(sw_error_0 >> 16));
1284 	SPDK_NOTICELOG("SW Error Operation: %u\n", (uint8_t)(sw_error_0 >> 32));
1285 }
1286 
1287 static void
1288 _free_batch(struct idxd_batch *batch, struct spdk_idxd_io_channel *chan,
1289 	    struct idxd_comp *comp)
1290 {
1291 	TAILQ_REMOVE(&chan->batches, batch, link);
1292 	TAILQ_INSERT_TAIL(&chan->batch_pool, batch, link);
1293 	comp->batch = NULL;
1294 	spdk_bit_array_clear(chan->ring_ctrl.user_ring_slots, batch->batch_num);
1295 	spdk_bit_array_clear(chan->ring_ctrl.ring_slots, batch->batch_desc_index);
1296 }
1297 
1298 static void
1299 _spdk_idxd_process_batch_events(struct spdk_idxd_io_channel *chan)
1300 {
1301 	uint16_t index;
1302 	struct idxd_comp *comp;
1303 	uint64_t sw_error_0;
1304 	int status = 0;
1305 	struct idxd_batch *batch;
1306 
1307 	/*
1308 	 * We don't check the bit array for user completions as there's only
1309 	 * one bit per per batch.
1310 	 */
1311 	for (index = 0; index < TOTAL_USER_DESC; index++) {
1312 		comp = &chan->ring_ctrl.user_completions[index];
1313 		if (comp->hw.status == 1) {
1314 			struct idxd_hw_desc *desc;
1315 
1316 			sw_error_0 = _idxd_read_8(chan->idxd, IDXD_SWERR_OFFSET);
1317 			if (sw_error_0 & 0x1) {
1318 				_dump_error_reg(chan);
1319 				status = -EINVAL;
1320 			}
1321 
1322 			desc = &chan->ring_ctrl.user_desc[index];
1323 			switch (desc->opcode) {
1324 			case IDXD_OPCODE_CRC32C_GEN:
1325 				*(uint32_t *)desc->dst_addr = comp->hw.crc32c_val;
1326 				*(uint32_t *)desc->dst_addr ^= ~0;
1327 				break;
1328 			case IDXD_OPCODE_COMPARE:
1329 				if (status == 0) {
1330 					status = comp->hw.result;
1331 				}
1332 				break;
1333 			case IDXD_OPCODE_MEMFILL:
1334 			case IDXD_OPCODE_DUALCAST:
1335 			case IDXD_OPCODE_MEMMOVE:
1336 				break;
1337 			default:
1338 				assert(false);
1339 				break;
1340 			}
1341 
1342 			/* The hw will complete all user desc first before the batch
1343 			 * desc (see spec for configuration exceptions) however
1344 			 * because of the order that we check for comps in the poller
1345 			 * we may "see" them in a different order than they actually
1346 			 * completed in.
1347 			 */
1348 			batch = comp->batch;
1349 			assert(batch->remaining > 0);
1350 			if (--batch->remaining == 0) {
1351 				_free_batch(batch, chan, comp);
1352 			}
1353 
1354 			comp->cb_fn((void *)comp->cb_arg, status);
1355 			comp->hw.status = status = 0;
1356 		}
1357 	}
1358 }
1359 
1360 /*
1361  * TODO: Experiment with different methods of reaping completions for performance
1362  * once we have real silicon.
1363  */
1364 void
1365 spdk_idxd_process_events(struct spdk_idxd_io_channel *chan)
1366 {
1367 	uint16_t index;
1368 	struct idxd_comp *comp;
1369 	uint64_t sw_error_0;
1370 	int status = 0;
1371 	struct idxd_batch *batch;
1372 
1373 	if (!TAILQ_EMPTY(&chan->batches)) {
1374 		_spdk_idxd_process_batch_events(chan);
1375 	}
1376 
1377 	for (index = 0; index < chan->ring_ctrl.max_ring_slots; index++) {
1378 		if (spdk_bit_array_get(chan->ring_ctrl.ring_slots, index)) {
1379 			comp = &chan->ring_ctrl.completions[index];
1380 			if (comp->hw.status == 1) {
1381 				struct idxd_hw_desc *desc;
1382 
1383 				sw_error_0 = _idxd_read_8(chan->idxd, IDXD_SWERR_OFFSET);
1384 				if (sw_error_0 & 0x1) {
1385 					_dump_error_reg(chan);
1386 					status = -EINVAL;
1387 				}
1388 
1389 				desc = &chan->ring_ctrl.desc[index];
1390 				switch (desc->opcode) {
1391 				case IDXD_OPCODE_BATCH:
1392 					/* The hw will complete all user desc first before the batch
1393 					 * desc (see spec for configuration exceptions) however
1394 					 * because of the order that we check for comps in the poller
1395 					 * we may "see" them in a different order than they actually
1396 					 * completed in.
1397 					 */
1398 					batch = comp->batch;
1399 					assert(batch->remaining > 0);
1400 					if (--batch->remaining == 0) {
1401 						_free_batch(batch, chan, comp);
1402 					}
1403 					break;
1404 				case IDXD_OPCODE_CRC32C_GEN:
1405 					*(uint32_t *)desc->dst_addr = comp->hw.crc32c_val;
1406 					*(uint32_t *)desc->dst_addr ^= ~0;
1407 					break;
1408 				case IDXD_OPCODE_COMPARE:
1409 					if (status == 0) {
1410 						status = comp->hw.result;
1411 					}
1412 					break;
1413 				}
1414 
1415 				comp->cb_fn(comp->cb_arg, status);
1416 				comp->hw.status = status = 0;
1417 				if (desc->opcode != IDXD_OPCODE_BATCH) {
1418 					spdk_bit_array_clear(chan->ring_ctrl.ring_slots, index);
1419 				}
1420 			}
1421 		}
1422 	}
1423 }
1424 
1425 SPDK_LOG_REGISTER_COMPONENT(idxd)
1426