xref: /spdk/module/accel/dpdk_cryptodev/accel_dpdk_cryptodev.c (revision 186b109dd3a723612e3df79bb3d97699173d39e3)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   Copyright (c) 2022, 2023 NVIDIA CORPORATION & AFFILIATES.
4  *   All rights reserved.
5  */
6 
7 #include "accel_dpdk_cryptodev.h"
8 
9 #include "spdk/accel.h"
10 #include "spdk/accel_module.h"
11 #include "spdk/env.h"
12 #include "spdk/likely.h"
13 #include "spdk/thread.h"
14 #include "spdk/util.h"
15 #include "spdk/log.h"
16 #include "spdk/json.h"
17 #include "spdk_internal/sgl.h"
18 
19 #include <rte_bus_vdev.h>
20 #include <rte_crypto.h>
21 #include <rte_cryptodev.h>
22 #include <rte_mbuf_dyn.h>
23 #include <rte_version.h>
24 
25 /* The VF spread is the number of queue pairs between virtual functions; we use it to
26  * load balance across the QAT device's virtual functions.
27  */
28 #define ACCEL_DPDK_CRYPTODEV_QAT_VF_SPREAD		32
29 
30 /* This controls how many ops will be dequeued from the crypto driver in one run
31  * of the poller. It is mainly a performance knob as it effectively determines how
32  * much work the poller has to do. However, even that can vary between crypto drivers:
33  * the ACCEL_DPDK_CRYPTODEV_AESNI_MB driver, for example, does all the crypto work on dequeue, whereas the
34  * QAT driver just dequeues what has already been completed.
35  */
36 #define ACCEL_DPDK_CRYPTODEV_MAX_DEQUEUE_BURST_SIZE	64
37 
38 #define ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE (128)
39 
40 /* The number of MBUFS we need must be a power of two, and to support other small IOs
41  * in addition to the limits mentioned above, we go to the next power of two. It is a
42  * big number because a single mempool serves both source and destination mbufs. It may
43  * need to be bigger to support multiple crypto drivers at once.
44  */
45 #define ACCEL_DPDK_CRYPTODEV_NUM_MBUFS			32768
46 #define ACCEL_DPDK_CRYPTODEV_POOL_CACHE_SIZE		256
47 #define ACCEL_DPDK_CRYPTODEV_MAX_CRYPTO_VOLUMES		128
48 #define ACCEL_DPDK_CRYPTODEV_NUM_SESSIONS		(2 * ACCEL_DPDK_CRYPTODEV_MAX_CRYPTO_VOLUMES)
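/* Two sessions (one for encrypt, one for decrypt) are created per crypto key, hence twice
 * the maximum number of crypto volumes. */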
49 #define ACCEL_DPDK_CRYPTODEV_SESS_MEMPOOL_CACHE_SIZE	0
50 
51 /* This is the max number of IOs we can supply to any crypto device QP at one time.
52  * It can vary between drivers.
53  */
54 #define ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS		2048
55 
56 /* At this moment, DPDK descriptor allocation for mlx5 has some issues. We use 512
57  * as a compromise between performance and the time spent on initialization. */
58 #define ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS_MLX5	512
59 
60 #define ACCEL_DPDK_CRYPTODEV_AESNI_MB_NUM_QP		64
61 
62 /* Common for supported devices. */
63 #define ACCEL_DPDK_CRYPTODEV_DEFAULT_NUM_XFORMS		2
64 #define ACCEL_DPDK_CRYPTODEV_IV_OFFSET (sizeof(struct rte_crypto_op) + \
65                 sizeof(struct rte_crypto_sym_op) + \
66                 (ACCEL_DPDK_CRYPTODEV_DEFAULT_NUM_XFORMS * \
67                  sizeof(struct rte_crypto_sym_xform)))
68 #define ACCEL_DPDK_CRYPTODEV_IV_LENGTH			16
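/* Layout of each crypto op element implied by the offset above (elements come from g_crypto_op_mp):
 *
 *   [rte_crypto_op][rte_crypto_sym_op][2 x rte_crypto_sym_xform][16-byte IV]
 *                                                               ^-- ACCEL_DPDK_CRYPTODEV_IV_OFFSET
 *
 * The per-op private data requested from rte_crypto_op_pool_create() below covers the two
 * xforms plus the IV, so the IV offset points at per-op IV storage. */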
69 
70 /* Driver names */
71 #define ACCEL_DPDK_CRYPTODEV_AESNI_MB	"crypto_aesni_mb"
72 #define ACCEL_DPDK_CRYPTODEV_QAT	"crypto_qat"
73 #define ACCEL_DPDK_CRYPTODEV_QAT_ASYM	"crypto_qat_asym"
74 #define ACCEL_DPDK_CRYPTODEV_MLX5	"mlx5_pci"
75 #define ACCEL_DPDK_CRYPTODEV_UADK	"crypto_uadk"
76 
77 /* Supported ciphers */
78 #define ACCEL_DPDK_CRYPTODEV_AES_CBC	"AES_CBC" /* QAT and ACCEL_DPDK_CRYPTODEV_AESNI_MB */
79 #define ACCEL_DPDK_CRYPTODEV_AES_XTS	"AES_XTS" /* QAT and MLX5 */
80 
81 /* Specific to AES_CBC. */
82 #define ACCEL_DPDK_CRYPTODEV_AES_CBC_128_KEY_SIZE			16
83 #define ACCEL_DPDK_CRYPTODEV_AES_CBC_256_KEY_SIZE			32
84 
85 /* Limit on the max memory length attached to an mbuf - rte_pktmbuf_attach_extbuf() takes a uint16_t `buf_len`
86  * parameter, so we use the closest aligned value, 32768, for better performance */
87 #define ACCEL_DPDK_CRYPTODEV_MAX_MBUF_LEN			32768
88 
89 /* Used to store IO context in mbuf */
90 static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
91 	.name = "context_accel_dpdk_cryptodev",
92 	.size = sizeof(uint64_t),
93 	.align = __alignof__(uint64_t),
94 	.flags = 0,
95 };
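/* The offset of this field within an mbuf is returned by rte_mbuf_dynfield_register() during
 * module init (g_mbuf_offset). Each mbuf used by this module stores a pointer to its owning
 * task there, which the poller uses (via m_src) to attribute dequeued crypto ops back to
 * their accel task. */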
96 
97 struct accel_dpdk_cryptodev_device;
98 
99 enum accel_dpdk_cryptodev_driver_type {
100 	ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB = 0,
101 	ACCEL_DPDK_CRYPTODEV_DRIVER_QAT,
102 	ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI,
103 	ACCEL_DPDK_CRYPTODEV_DRIVER_UADK,
104 	ACCEL_DPDK_CRYPTODEV_DRIVER_LAST
105 };
106 
107 struct accel_dpdk_cryptodev_qp {
108 	struct accel_dpdk_cryptodev_device *device;	/* ptr to crypto device */
109 	uint32_t num_enqueued_ops;	/* Used to decide whether to poll the qp or not */
110 	uint8_t qp; /* queue identifier */
111 	bool in_use; /* whether this node is in use or not */
112 	uint8_t index; /* used by QAT to load balance placement of qpairs */
113 	TAILQ_ENTRY(accel_dpdk_cryptodev_qp) link;
114 };
115 
116 struct accel_dpdk_cryptodev_device {
117 	enum accel_dpdk_cryptodev_driver_type type;
118 	struct rte_cryptodev_info cdev_info; /* includes DPDK device friendly name */
119 	uint32_t qp_desc_nr; /* max number of qp descriptors to be enqueued in burst */
120 	uint8_t cdev_id; /* identifier for the device */
121 	TAILQ_HEAD(, accel_dpdk_cryptodev_qp) qpairs;
122 	TAILQ_ENTRY(accel_dpdk_cryptodev_device) link;
123 };
124 
125 struct accel_dpdk_cryptodev_key_handle {
126 	struct accel_dpdk_cryptodev_device *device;
127 	TAILQ_ENTRY(accel_dpdk_cryptodev_key_handle) link;
128 	void *session_encrypt;	/* encryption session for this key */
129 	void *session_decrypt;	/* decryption session for this key */
130 	struct rte_crypto_sym_xform cipher_xform;		/* crypto control struct for this key */
131 };
132 
133 struct accel_dpdk_cryptodev_key_priv {
134 	enum accel_dpdk_cryptodev_driver_type driver;
135 	enum spdk_accel_cipher cipher;
136 	char *xts_key;
137 	TAILQ_HEAD(, accel_dpdk_cryptodev_key_handle) dev_keys;
138 };
139 
140 /* The crypto channel struct. It is allocated and freed on our behalf by the io channel code.
141  * We store things in here that are needed on a per-thread basis, like the qpairs assigned to
142  * this thread and the poller for this thread.
143  */
144 struct accel_dpdk_cryptodev_io_channel {
145 	/* completion poller */
146 	struct spdk_poller *poller;
147 	/* Array of qpairs for each available device. The specific device will be selected depending on the crypto key */
148 	struct accel_dpdk_cryptodev_qp *device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_LAST];
149 	/* Used to queue tasks when the qpair is full or only some of the crypto ops were submitted to the PMD */
150 	TAILQ_HEAD(, accel_dpdk_cryptodev_task) queued_tasks;
151 	/* Used to queue tasks that were completed in the submission path - to avoid calling the completion
152 	 * callback there and possibly overflowing the call stack */
153 	TAILQ_HEAD(, accel_dpdk_cryptodev_task) completed_tasks;
154 };
155 
156 struct accel_dpdk_cryptodev_task {
157 	struct spdk_accel_task base;
158 	uint32_t cryop_completed;	/* The number of crypto operations completed by HW */
159 	uint32_t cryop_submitted;	/* The number of crypto operations submitted to HW */
160 	uint32_t cryop_total;		/* Total number of crypto operations in this task */
161 	bool is_failed;
162 	bool inplace;
163 	TAILQ_ENTRY(accel_dpdk_cryptodev_task) link;
164 };
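/* A task is finished when cryop_completed == cryop_total. cryop_submitted can lag behind
 * cryop_total when the enqueue array or the qpair capacity limits a batch; the poller
 * resubmits the remaining blocks once the current batch completes. */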
165 
166 /* Shared mempools between all devices on this system */
167 static struct rte_mempool *g_session_mp = NULL;
168 static struct rte_mempool *g_session_mp_priv = NULL;
169 static struct rte_mempool *g_mbuf_mp = NULL;            /* mbuf mempool */
170 static int g_mbuf_offset;
171 static struct rte_mempool *g_crypto_op_mp = NULL;	/* crypto operations, must be rte* mempool */
172 
173 static struct rte_mbuf_ext_shared_info g_shinfo = {};   /* used by DPDK mbuf macro */
174 
175 static uint8_t g_qat_total_qp = 0;
176 static uint8_t g_next_qat_index;
177 
178 static const char *g_driver_names[] = {
179 	[ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB]	= ACCEL_DPDK_CRYPTODEV_AESNI_MB,
180 	[ACCEL_DPDK_CRYPTODEV_DRIVER_QAT]	= ACCEL_DPDK_CRYPTODEV_QAT,
181 	[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI]	= ACCEL_DPDK_CRYPTODEV_MLX5,
182 	[ACCEL_DPDK_CRYPTODEV_DRIVER_UADK]	= ACCEL_DPDK_CRYPTODEV_UADK
183 };
184 static const char *g_cipher_names[] = {
185 	[SPDK_ACCEL_CIPHER_AES_CBC]	= ACCEL_DPDK_CRYPTODEV_AES_CBC,
186 	[SPDK_ACCEL_CIPHER_AES_XTS]	= ACCEL_DPDK_CRYPTODEV_AES_XTS,
187 };
188 
189 static enum accel_dpdk_cryptodev_driver_type g_dpdk_cryptodev_driver =
190 	ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB;
191 
192 /* Global list of all crypto devices */
193 static TAILQ_HEAD(, accel_dpdk_cryptodev_device) g_crypto_devices = TAILQ_HEAD_INITIALIZER(
194 			g_crypto_devices);
195 static pthread_mutex_t g_device_lock = PTHREAD_MUTEX_INITIALIZER;
196 
197 static struct spdk_accel_module_if g_accel_dpdk_cryptodev_module;
198 
199 static int accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto_ch,
200 		struct accel_dpdk_cryptodev_task *task);
201 
202 void
203 accel_dpdk_cryptodev_enable(void)
204 {
205 	spdk_accel_module_list_add(&g_accel_dpdk_cryptodev_module);
206 }
207 
208 int
209 accel_dpdk_cryptodev_set_driver(const char *driver_name)
210 {
211 	if (strcmp(driver_name, ACCEL_DPDK_CRYPTODEV_QAT) == 0) {
212 		g_dpdk_cryptodev_driver = ACCEL_DPDK_CRYPTODEV_DRIVER_QAT;
213 	} else if (strcmp(driver_name, ACCEL_DPDK_CRYPTODEV_AESNI_MB) == 0) {
214 		g_dpdk_cryptodev_driver = ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB;
215 	} else if (strcmp(driver_name, ACCEL_DPDK_CRYPTODEV_MLX5) == 0) {
216 		g_dpdk_cryptodev_driver = ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI;
217 	} else if (strcmp(driver_name, ACCEL_DPDK_CRYPTODEV_UADK) == 0) {
218 		g_dpdk_cryptodev_driver = ACCEL_DPDK_CRYPTODEV_DRIVER_UADK;
219 	} else {
220 		SPDK_ERRLOG("Unsupported driver %s\n", driver_name);
221 		return -EINVAL;
222 	}
223 
224 	SPDK_NOTICELOG("Using driver %s\n", driver_name);
225 
226 	return 0;
227 }
228 
229 const char *
230 accel_dpdk_cryptodev_get_driver(void)
231 {
232 	return g_driver_names[g_dpdk_cryptodev_driver];
233 }
234 
235 static inline uint16_t
236 accel_dpdk_cryptodev_poll_qp(struct accel_dpdk_cryptodev_qp *qp,
237 			     struct accel_dpdk_cryptodev_io_channel *crypto_ch)
238 {
239 	struct rte_crypto_op *dequeued_ops[ACCEL_DPDK_CRYPTODEV_MAX_DEQUEUE_BURST_SIZE];
240 	struct rte_mbuf *mbufs_to_free[2 * ACCEL_DPDK_CRYPTODEV_MAX_DEQUEUE_BURST_SIZE];
241 	struct accel_dpdk_cryptodev_task *task;
242 	uint32_t num_mbufs = 0;
243 	int i;
244 	uint16_t num_dequeued_ops;
245 
246 	/* Each run of the poller will get just what the device has available
247 	 * at the moment we call it; we don't check again after draining the
248 	 * first batch.
249 	 */
250 	num_dequeued_ops = rte_cryptodev_dequeue_burst(qp->device->cdev_id, qp->qp,
251 			   dequeued_ops, ACCEL_DPDK_CRYPTODEV_MAX_DEQUEUE_BURST_SIZE);
252 	/* Check if operation was processed successfully */
253 	for (i = 0; i < num_dequeued_ops; i++) {
254 
255 		/* We don't know the order or association of the crypto ops wrt any
256 		 * particular task, so we need to look at each one and determine whether
257 		 * it is the last one for its task or not.
258 		 */
259 		task = (struct accel_dpdk_cryptodev_task *)*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src,
260 				g_mbuf_offset, uint64_t *);
261 		assert(task != NULL);
262 
263 		if (dequeued_ops[i]->status != RTE_CRYPTO_OP_STATUS_SUCCESS) {
264 			SPDK_ERRLOG("error with op %d status %u\n", i, dequeued_ops[i]->status);
265 			/* Update the task status to error; we'll still process the
266 			 * rest of the crypto ops for this task, though, so they
267 			 * aren't left hanging.
268 			 */
269 			task->is_failed = true;
270 		}
271 
272 		/* Return the associated src and dst mbufs by collecting them into
273 		 * an array that we can use the bulk API to free after the loop.
274 		 */
275 		*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src, g_mbuf_offset, uint64_t *) = 0;
276 		mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_src;
277 		if (dequeued_ops[i]->sym->m_dst) {
278 			mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_dst;
279 		}
280 
281 		task->cryop_completed++;
282 		if (task->cryop_completed == task->cryop_total) {
283 			/* Complete the IO */
284 			spdk_accel_task_complete(&task->base, task->is_failed ? -EINVAL : 0);
285 		} else if (task->cryop_completed == task->cryop_submitted) {
286 			/* submit remaining crypto ops */
287 			int rc = accel_dpdk_cryptodev_process_task(crypto_ch, task);
288 
289 			if (spdk_unlikely(rc)) {
290 				if (rc == -ENOMEM) {
291 					TAILQ_INSERT_TAIL(&crypto_ch->queued_tasks, task, link);
292 					continue;
293 				} else if (rc == -EALREADY) {
294 					/* -EALREADY means that a task is completed, but it might be unsafe to complete
295 					 * it if we are in the submission path. Since we are in the poller context, we can
296 					 * complete the task immediately */
297 					rc = 0;
298 				}
299 				spdk_accel_task_complete(&task->base, rc);
300 			}
301 		}
302 	}
303 
304 	/* Now bulk free both mbufs and crypto operations. */
305 	if (num_dequeued_ops > 0) {
306 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)dequeued_ops, num_dequeued_ops);
307 		assert(num_mbufs > 0);
308 		/* This also releases chained mbufs if any. */
309 		rte_pktmbuf_free_bulk(mbufs_to_free, num_mbufs);
310 	}
311 
312 	assert(qp->num_enqueued_ops >= num_dequeued_ops);
313 	qp->num_enqueued_ops -= num_dequeued_ops;
314 
315 	return num_dequeued_ops;
316 }
317 
318 /* This is the poller for the crypto module. It uses a single API to dequeue whatever is ready at
319  * the device. Then we need to decide if what we've got so far (including previous poller
320  * runs) totals up to one or more complete tasks */
321 static int
322 accel_dpdk_cryptodev_poller(void *args)
323 {
324 	struct accel_dpdk_cryptodev_io_channel *crypto_ch = args;
325 	struct accel_dpdk_cryptodev_qp *qp;
326 	struct accel_dpdk_cryptodev_task *task, *task_tmp;
327 	TAILQ_HEAD(, accel_dpdk_cryptodev_task) queued_tasks_tmp;
328 	uint32_t num_dequeued_ops = 0, num_enqueued_ops = 0, num_completed_tasks = 0;
329 	int i, rc;
330 
331 	for (i = 0; i < ACCEL_DPDK_CRYPTODEV_DRIVER_LAST; i++) {
332 		qp = crypto_ch->device_qp[i];
333 		/* Avoid polling "idle" qps since it may affect performance */
334 		if (qp && qp->num_enqueued_ops) {
335 			num_dequeued_ops += accel_dpdk_cryptodev_poll_qp(qp, crypto_ch);
336 		}
337 	}
338 
339 	if (!TAILQ_EMPTY(&crypto_ch->queued_tasks)) {
340 		TAILQ_INIT(&queued_tasks_tmp);
341 
342 		TAILQ_FOREACH_SAFE(task, &crypto_ch->queued_tasks, link, task_tmp) {
343 			TAILQ_REMOVE(&crypto_ch->queued_tasks, task, link);
344 			rc = accel_dpdk_cryptodev_process_task(crypto_ch, task);
345 			if (spdk_unlikely(rc)) {
346 				if (rc == -ENOMEM) {
347 					TAILQ_INSERT_TAIL(&queued_tasks_tmp, task, link);
348 					/* Other queued tasks may belong to other qpairs,
349 					 * so process the whole list */
350 					continue;
351 				} else if (rc == -EALREADY) {
352 					/* -EALREADY means that a task is completed, but it might be unsafe to complete
353 					 * it if we are in the submission path. Since we are in the poller context, we can
354 					 * complete the task immediately */
355 					rc = 0;
356 				}
357 				spdk_accel_task_complete(&task->base, rc);
358 				num_completed_tasks++;
359 			} else {
360 				num_enqueued_ops++;
361 			}
362 		}
363 
364 		TAILQ_SWAP(&crypto_ch->queued_tasks, &queued_tasks_tmp, accel_dpdk_cryptodev_task, link);
365 	}
366 
367 	TAILQ_FOREACH_SAFE(task, &crypto_ch->completed_tasks, link, task_tmp) {
368 		TAILQ_REMOVE(&crypto_ch->completed_tasks, task, link);
369 		spdk_accel_task_complete(&task->base, 0);
370 		num_completed_tasks++;
371 	}
372 
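	/* Returning non-zero tells the SPDK poller framework that work was done on this iteration. */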
373 	return !!(num_dequeued_ops + num_enqueued_ops + num_completed_tasks);
374 }
375 
376 /* Allocate a new mbuf of @remainder size with data pointed to by @addr and attach
377  * it to @orig_mbuf. */
378 static inline int
379 accel_dpdk_cryptodev_mbuf_chain_remainder(struct accel_dpdk_cryptodev_task *task,
380 		struct rte_mbuf *orig_mbuf, uint8_t *addr, uint64_t *_remainder)
381 {
382 	uint64_t phys_addr, phys_len, remainder = *_remainder;
383 	struct rte_mbuf *chain_mbuf;
384 	int rc;
385 
386 	phys_len = remainder;
387 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
388 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR)) {
389 		return -EFAULT;
390 	}
391 	remainder = spdk_min(remainder, phys_len);
392 	remainder = spdk_min(remainder, ACCEL_DPDK_CRYPTODEV_MAX_MBUF_LEN);
393 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&chain_mbuf, 1);
394 	if (spdk_unlikely(rc)) {
395 		return -ENOMEM;
396 	}
397 	/* Store context in every mbuf as we don't know anything about completion order */
398 	*RTE_MBUF_DYNFIELD(chain_mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)task;
399 	rte_pktmbuf_attach_extbuf(chain_mbuf, addr, phys_addr, remainder, &g_shinfo);
400 	rte_pktmbuf_append(chain_mbuf, remainder);
401 
402 	/* Chained buffer is released by rte_pktmbuf_free_bulk() automagically. */
403 	rte_pktmbuf_chain(orig_mbuf, chain_mbuf);
404 	*_remainder = remainder;
405 
406 	return 0;
407 }
408 
409 /* Attach the data buffer pointed to by @addr to @mbuf. Return the utilized length of the
410  * contiguous space that was physically available. */
411 static inline uint64_t
412 accel_dpdk_cryptodev_mbuf_attach_buf(struct accel_dpdk_cryptodev_task *task, struct rte_mbuf *mbuf,
413 				     uint8_t *addr, uint32_t len)
414 {
415 	uint64_t phys_addr, phys_len;
416 
417 	/* Store context in every mbuf as we don't know anything about completion order */
418 	*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)task;
419 
420 	phys_len = len;
421 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
422 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR || phys_len == 0)) {
423 		return 0;
424 	}
425 	assert(phys_len <= len);
426 	phys_len = spdk_min(phys_len, ACCEL_DPDK_CRYPTODEV_MAX_MBUF_LEN);
427 
428 	/* Set the mbuf elements address and length. */
429 	rte_pktmbuf_attach_extbuf(mbuf, addr, phys_addr, phys_len, &g_shinfo);
430 	rte_pktmbuf_append(mbuf, phys_len);
431 
432 	return phys_len;
433 }
434 
435 static inline struct accel_dpdk_cryptodev_key_handle *
436 accel_dpdk_find_key_handle_in_channel(struct accel_dpdk_cryptodev_io_channel *crypto_ch,
437 				      struct accel_dpdk_cryptodev_key_priv *key)
438 {
439 	struct accel_dpdk_cryptodev_key_handle *key_handle;
440 
441 	if (key->driver == ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI) {
442 		/* The crypto key is registered on all available devices while the io_channel opens a CQ/QP on a single device.
443 		 * We need to iterate the list of key entries to find the one that matches this channel's device */
444 		TAILQ_FOREACH(key_handle, &key->dev_keys, link) {
445 			if (key_handle->device->cdev_id ==
446 			    crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI]->device->cdev_id) {
447 				return key_handle;
448 			}
449 		}
450 		return NULL;
451 	} else {
452 		return TAILQ_FIRST(&key->dev_keys);
453 	}
454 }
455 
456 static inline int
457 accel_dpdk_cryptodev_task_alloc_resources(struct rte_mbuf **src_mbufs, struct rte_mbuf **dst_mbufs,
458 		struct rte_crypto_op **crypto_ops, int count)
459 {
460 	int rc;
461 
462 	/* Get the number of source mbufs that we need. These will always be 1:1 because we
463 	 * don't support chaining. The reason we don't is our decision to use the LBA as the IV:
464 	 * there can be no case where we'd need more than one mbuf per crypto op, or the
465 	 * op would span more than one LBA.
466 	 */
467 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, src_mbufs, count);
468 	if (rc) {
469 		SPDK_ERRLOG("Failed to get src_mbufs!\n");
470 		return -ENOMEM;
471 	}
472 
473 	/* Get the same amount to describe the destination. If the crypto operation is inplace, dst_mbufs is NULL and we simply skip this */
474 	if (dst_mbufs) {
475 		rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, dst_mbufs, count);
476 		if (rc) {
477 			SPDK_ERRLOG("Failed to get dst_mbufs!\n");
478 			goto err_free_src;
479 		}
480 	}
481 
482 #ifdef __clang_analyzer__
483 	/* silence scan-build false positive */
484 	SPDK_CLANG_ANALYZER_PREINIT_PTR_ARRAY(crypto_ops, ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE,
485 					      0x1000);
486 #endif
487 	/* Allocate crypto operations. */
488 	rc = rte_crypto_op_bulk_alloc(g_crypto_op_mp,
489 				      RTE_CRYPTO_OP_TYPE_SYMMETRIC,
490 				      crypto_ops, count);
491 	if (rc < count) {
492 		SPDK_ERRLOG("Failed to allocate crypto ops! rc %d\n", rc);
493 		goto err_free_ops;
494 	}
495 
496 	return 0;
497 
498 err_free_ops:
499 	if (rc > 0) {
500 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)crypto_ops, rc);
501 	}
502 	if (dst_mbufs) {
503 		/* This also releases chained mbufs if any. */
504 		rte_pktmbuf_free_bulk(dst_mbufs, count);
505 	}
506 err_free_src:
507 	/* This also releases chained mbufs if any. */
508 	rte_pktmbuf_free_bulk(src_mbufs, count);
509 
510 	return -ENOMEM;
511 }
512 
513 static inline int
514 accel_dpdk_cryptodev_mbuf_add_single_block(struct spdk_iov_sgl *sgl, struct rte_mbuf *mbuf,
515 		struct accel_dpdk_cryptodev_task *task)
516 {
517 	int rc;
518 	uint8_t *buf_addr;
519 	uint64_t phys_len;
520 	uint64_t remainder;
521 	uint64_t buf_len;
522 
523 	assert(sgl->iov->iov_len > sgl->iov_offset);
524 	buf_len = spdk_min(task->base.block_size, sgl->iov->iov_len - sgl->iov_offset);
525 	buf_addr = sgl->iov->iov_base + sgl->iov_offset;
526 	phys_len = accel_dpdk_cryptodev_mbuf_attach_buf(task, mbuf, buf_addr, buf_len);
527 	if (spdk_unlikely(phys_len == 0)) {
528 		return -EFAULT;
529 	}
530 	buf_len = spdk_min(buf_len, phys_len);
531 	spdk_iov_sgl_advance(sgl, buf_len);
532 
533 	/* Handle the case where the block crosses an iov or physical page boundary: chain additional mbufs for the remainder. */
534 	assert(task->base.block_size >= buf_len);
535 	remainder = task->base.block_size - buf_len;
536 	while (remainder) {
537 		buf_len = spdk_min(remainder, sgl->iov->iov_len - sgl->iov_offset);
538 		buf_addr = sgl->iov->iov_base + sgl->iov_offset;
539 		rc = accel_dpdk_cryptodev_mbuf_chain_remainder(task, mbuf, buf_addr, &buf_len);
540 		if (spdk_unlikely(rc)) {
541 			return rc;
542 		}
543 		spdk_iov_sgl_advance(sgl, buf_len);
544 		remainder -= buf_len;
545 	}
546 
547 	return 0;
548 }
549 
550 static inline void
551 accel_dpdk_cryptodev_op_set_iv(struct rte_crypto_op *crypto_op, uint64_t iv)
552 {
553 	uint8_t *iv_ptr = rte_crypto_op_ctod_offset(crypto_op, uint8_t *, ACCEL_DPDK_CRYPTODEV_IV_OFFSET);
554 
555 	/* Set the IV - we use the logical block address corresponding to this crypto op */
556 	memset(iv_ptr, 0, ACCEL_DPDK_CRYPTODEV_IV_LENGTH);
557 	rte_memcpy(iv_ptr, &iv, sizeof(uint64_t));
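	/* Example (a sketch): a task spanning four blocks with base.iv == 100 generates crypto ops
	 * with IVs 100, 101, 102 and 103, each zero-padded to the 16-byte IV length and stored in
	 * host byte order. */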
558 }
559 
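/* Shift the not-yet-enqueued crypto ops and mbufs to the front of the local arrays so the
 * error/cleanup path can free exactly the first @cryop_cnt entries that were not submitted. */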
560 static inline void
561 accel_dpdk_cryptodev_update_resources_from_pools(struct rte_crypto_op **crypto_ops,
562 		struct rte_mbuf **src_mbufs, struct rte_mbuf **dst_mbufs,
563 		uint32_t num_enqueued_ops, uint32_t cryop_cnt)
564 {
565 	memmove(crypto_ops, &crypto_ops[num_enqueued_ops], sizeof(crypto_ops[0]) * cryop_cnt);
566 	memmove(src_mbufs, &src_mbufs[num_enqueued_ops], sizeof(src_mbufs[0]) * cryop_cnt);
567 	if (dst_mbufs) {
568 		memmove(dst_mbufs, &dst_mbufs[num_enqueued_ops], sizeof(dst_mbufs[0]) * cryop_cnt);
569 	}
570 }
571 
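/* Build and enqueue crypto ops for @task. Returns 0 when ops were submitted (possibly only
 * part of them - the poller resubmits the rest), -ENOMEM when the qpair or the mempools are
 * exhausted and the task should be requeued, -EALREADY when all ops completed inline and the
 * caller must finish the task itself, or another negative errno on fatal errors. */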
572 static int
573 accel_dpdk_cryptodev_process_task(struct accel_dpdk_cryptodev_io_channel *crypto_ch,
574 				  struct accel_dpdk_cryptodev_task *task)
575 {
576 	uint16_t num_enqueued_ops;
577 	uint32_t cryop_cnt;
578 	uint32_t crypto_len = task->base.block_size;
579 	uint64_t dst_length, total_length;
580 	uint32_t sgl_offset;
581 	uint32_t qp_capacity;
582 	uint64_t iv_start;
583 	uint32_t i, crypto_index;
584 	struct rte_crypto_op *crypto_ops[ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE];
585 	struct rte_mbuf *src_mbufs[ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE];
586 	struct rte_mbuf *dst_mbufs[ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE];
587 	void *session;
588 	struct accel_dpdk_cryptodev_key_priv *priv;
589 	struct accel_dpdk_cryptodev_key_handle *key_handle;
590 	struct accel_dpdk_cryptodev_qp *qp;
591 	struct accel_dpdk_cryptodev_device *dev;
592 	struct spdk_iov_sgl src, dst = {};
593 	int rc;
594 	bool inplace = task->inplace;
595 
596 	if (spdk_unlikely(!task->base.crypto_key ||
597 			  task->base.crypto_key->module_if != &g_accel_dpdk_cryptodev_module)) {
598 		return -EINVAL;
599 	}
600 
601 	priv = task->base.crypto_key->priv;
602 	assert(priv->driver < ACCEL_DPDK_CRYPTODEV_DRIVER_LAST);
603 
604 	if (task->cryop_completed) {
605 		/* We continue to process remaining blocks */
606 		assert(task->cryop_submitted == task->cryop_completed);
607 		assert(task->cryop_total > task->cryop_completed);
608 		cryop_cnt = task->cryop_total - task->cryop_completed;
609 		sgl_offset = task->cryop_completed * crypto_len;
610 		iv_start = task->base.iv + task->cryop_completed;
611 	} else {
612 		/* That is a new task */
613 		total_length = 0;
614 		for (i = 0; i < task->base.s.iovcnt; i++) {
615 			total_length += task->base.s.iovs[i].iov_len;
616 		}
617 		dst_length = 0;
618 		for (i = 0; i < task->base.d.iovcnt; i++) {
619 			dst_length += task->base.d.iovs[i].iov_len;
620 		}
621 
622 		if (spdk_unlikely(total_length != dst_length || !total_length)) {
623 			return -ERANGE;
624 		}
625 		if (spdk_unlikely(total_length % task->base.block_size != 0)) {
626 			return -EINVAL;
627 		}
628 
629 		cryop_cnt = total_length / task->base.block_size;
630 		task->cryop_total = cryop_cnt;
631 		sgl_offset = 0;
632 		iv_start = task->base.iv;
633 	}
634 
635 	/* Limit the number of crypto ops that we can process at once */
636 	cryop_cnt = spdk_min(cryop_cnt, ACCEL_DPDK_CRYPTODEV_MAX_ENQUEUE_ARRAY_SIZE);
637 
638 	qp = crypto_ch->device_qp[priv->driver];
639 	assert(qp);
640 	dev = qp->device;
641 	assert(dev);
642 	assert(dev->qp_desc_nr >= qp->num_enqueued_ops);
643 
644 	qp_capacity = dev->qp_desc_nr - qp->num_enqueued_ops;
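	/* Cap the batch by the free descriptors left on this qpair; ops still in flight count
	 * against qp_desc_nr until the poller dequeues them. */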
645 	cryop_cnt = spdk_min(cryop_cnt, qp_capacity);
646 	if (spdk_unlikely(cryop_cnt == 0)) {
647 		/* QP is full */
648 		return -ENOMEM;
649 	}
650 
651 	key_handle = accel_dpdk_find_key_handle_in_channel(crypto_ch, priv);
652 	if (spdk_unlikely(!key_handle)) {
653 		SPDK_ERRLOG("Failed to find a key handle, driver %s, cipher %s\n", g_driver_names[priv->driver],
654 			    g_cipher_names[priv->cipher]);
655 		return -EINVAL;
656 	}
657 	/* mlx5_pci binds keys to a specific device, we can't use a key with any device */
658 	assert(dev == key_handle->device || priv->driver != ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI);
659 
660 	if (task->base.op_code == SPDK_ACCEL_OPC_ENCRYPT) {
661 		session = key_handle->session_encrypt;
662 	} else if (task->base.op_code == SPDK_ACCEL_OPC_DECRYPT) {
663 		session = key_handle->session_decrypt;
664 	} else {
665 		return -EINVAL;
666 	}
667 
668 	rc = accel_dpdk_cryptodev_task_alloc_resources(src_mbufs, inplace ? NULL : dst_mbufs,
669 			crypto_ops, cryop_cnt);
670 	if (rc) {
671 		return rc;
672 	}
673 
674 	/* As we don't support chaining because of the decision to use the LBA as the IV, construction
675 	 * of crypto operations is straightforward. We build the op, the src mbuf and the
676 	 * dst mbuf in our local arrays by looping through the length of the accel task and
677 	 * picking off LBA-sized blocks of memory from the IOVs as we walk through them. Each
678 	 * LBA-sized chunk of memory corresponds 1:1 to a crypto operation and a single
679 	 * mbuf per crypto operation.
680 	 */
681 	spdk_iov_sgl_init(&src, task->base.s.iovs, task->base.s.iovcnt, 0);
682 	spdk_iov_sgl_advance(&src, sgl_offset);
683 	if (!inplace) {
684 		spdk_iov_sgl_init(&dst, task->base.d.iovs, task->base.d.iovcnt, 0);
685 		spdk_iov_sgl_advance(&dst, sgl_offset);
686 	}
687 
688 	for (crypto_index = 0; crypto_index < cryop_cnt; crypto_index++) {
689 		rc = accel_dpdk_cryptodev_mbuf_add_single_block(&src, src_mbufs[crypto_index], task);
690 		if (spdk_unlikely(rc)) {
691 			goto free_ops;
692 		}
693 		accel_dpdk_cryptodev_op_set_iv(crypto_ops[crypto_index], iv_start);
694 		iv_start++;
695 
696 		/* Set the data to encrypt/decrypt length */
697 		crypto_ops[crypto_index]->sym->cipher.data.length = crypto_len;
698 		crypto_ops[crypto_index]->sym->cipher.data.offset = 0;
699 		rte_crypto_op_attach_sym_session(crypto_ops[crypto_index], session);
700 
701 		/* link the mbuf to the crypto op. */
702 		crypto_ops[crypto_index]->sym->m_src = src_mbufs[crypto_index];
703 
704 		if (inplace) {
705 			crypto_ops[crypto_index]->sym->m_dst = NULL;
706 		} else {
707 #ifndef __clang_analyzer__
708 			/* scan-build thinks that dst_mbufs is not initialized */
709 			rc = accel_dpdk_cryptodev_mbuf_add_single_block(&dst, dst_mbufs[crypto_index], task);
710 			if (spdk_unlikely(rc)) {
711 				goto free_ops;
712 			}
713 			crypto_ops[crypto_index]->sym->m_dst = dst_mbufs[crypto_index];
714 #endif
715 		}
716 	}
717 
718 	/* Enqueue everything we've got but limit by the max number of descriptors we
719 	 * configured the crypto device for.
720 	 */
721 	num_enqueued_ops = rte_cryptodev_enqueue_burst(dev->cdev_id, qp->qp, crypto_ops, cryop_cnt);
722 	/* This value is used by the completion poller to determine when the accel task is complete. */
723 	task->cryop_submitted += num_enqueued_ops;
724 	qp->num_enqueued_ops += num_enqueued_ops;
725 	/* If we were unable to enqueue everything, we need to decide what to do based
726 	 * on the status of the first op that was not enqueued.
727 	 */
728 	if (num_enqueued_ops < cryop_cnt) {
729 		switch (crypto_ops[num_enqueued_ops]->status) {
730 		case RTE_CRYPTO_OP_STATUS_SUCCESS:
731 			/* A crypto operation might complete successfully while enqueuing it to the completion ring fails.
732 			 * That may happen with SW PMDs like openssl.
733 			 * We can't retry such an operation on the next turn since, if the crypto operation was inplace, we could
734 			 * encrypt/decrypt an already processed buffer. See github issue #2907 for more details.
735 			 * Handle this case as if the crypto op completed successfully - increment cryop_submitted and
736 			 * cryop_completed.
737 			 * We won't receive a completion for such an operation, so we need to clean up the mbufs and crypto_ops */
738 			assert(task->cryop_total > task->cryop_completed);
739 			task->cryop_completed++;
740 			task->cryop_submitted++;
741 			if (task->cryop_completed == task->cryop_total) {
742 				assert(num_enqueued_ops == 0);
743 				/* All crypto ops are completed. We can't complete the task immediately since this function might be
744 				 * called in the scope of an spdk_accel_submit_* function and the user's logic in the completion callback
745 				 * might lead to a stack overflow */
746 				cryop_cnt -= num_enqueued_ops;
747 				accel_dpdk_cryptodev_update_resources_from_pools(crypto_ops, src_mbufs, inplace ? NULL : dst_mbufs,
748 						num_enqueued_ops, cryop_cnt);
749 				rc = -EALREADY;
750 				goto free_ops;
751 			}
752 		/* fallthrough */
753 		case RTE_CRYPTO_OP_STATUS_NOT_PROCESSED:
754 			if (num_enqueued_ops == 0) {
755 				/* Nothing was submitted. Free crypto ops and mbufs, treat this case as NOMEM */
756 				rc = -ENOMEM;
757 				goto free_ops;
758 			}
759 			/* Some of the crypto operations were not submitted, so release their mbufs and crypto ops.
760 			 * The remaining crypto ops will be submitted again once the current batch is completed */
761 			cryop_cnt -= num_enqueued_ops;
762 			accel_dpdk_cryptodev_update_resources_from_pools(crypto_ops, src_mbufs, inplace ? NULL : dst_mbufs,
763 					num_enqueued_ops, cryop_cnt);
764 			rc = 0;
765 			goto free_ops;
766 		default:
767 			/* For all other statuses, mark task as failed so that the poller will pick
768 			 * the failure up for the overall task status.
769 			 */
770 			task->is_failed = true;
771 			if (num_enqueued_ops == 0) {
772 				/* Nothing was enqueued and the failure wasn't because the device was
773 				 * busy, so fail the task now as the poller won't know anything about it.
774 				 */
775 				rc = -EINVAL;
776 				goto free_ops;
777 			}
778 			break;
779 		}
780 	}
781 
782 	return 0;
783 
784 	/* Error cleanup paths. */
785 free_ops:
786 	if (!inplace) {
787 		/* This also releases chained mbufs if any. */
788 		rte_pktmbuf_free_bulk(dst_mbufs, cryop_cnt);
789 	}
790 	rte_mempool_put_bulk(g_crypto_op_mp, (void **)crypto_ops, cryop_cnt);
791 	/* This also releases chained mbufs if any. */
792 	rte_pktmbuf_free_bulk(src_mbufs, cryop_cnt);
793 	return rc;
794 }
795 
796 static inline struct accel_dpdk_cryptodev_qp *
797 accel_dpdk_cryptodev_get_next_device_qpair(enum accel_dpdk_cryptodev_driver_type type)
798 {
799 	struct accel_dpdk_cryptodev_device *device, *device_tmp;
800 	struct accel_dpdk_cryptodev_qp *qpair;
801 
802 	TAILQ_FOREACH_SAFE(device, &g_crypto_devices, link, device_tmp) {
803 		if (device->type != type) {
804 			continue;
805 		}
806 		TAILQ_FOREACH(qpair, &device->qpairs, link) {
807 			if (!qpair->in_use) {
808 				qpair->in_use = true;
809 				return qpair;
810 			}
811 		}
812 	}
813 
814 	return NULL;
815 }
816 
817 /* Helper function for the channel creation callback.
818  * Returns the number of drivers assigned to the channel */
819 static uint32_t
820 accel_dpdk_cryptodev_assign_device_qps(struct accel_dpdk_cryptodev_io_channel *crypto_ch)
821 {
822 	struct accel_dpdk_cryptodev_device *device;
823 	struct accel_dpdk_cryptodev_qp *device_qp;
824 	uint32_t num_drivers = 0;
825 	bool qat_found = false;
826 
827 	pthread_mutex_lock(&g_device_lock);
828 
829 	TAILQ_FOREACH(device, &g_crypto_devices, link) {
830 		if (device->type == ACCEL_DPDK_CRYPTODEV_DRIVER_QAT && !qat_found) {
831 			/* For some QAT devices, the optimal qp to use is every 32nd as this spreads the
832 			 * workload out over the multiple virtual functions in the device. For the devices
833 			 * where this isn't the case, it doesn't hurt.
834 			 */
835 			TAILQ_FOREACH(device_qp, &device->qpairs, link) {
836 				if (device_qp->index != g_next_qat_index) {
837 					continue;
838 				}
839 				if (device_qp->in_use == false) {
840 					assert(crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_QAT] == NULL);
841 					crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_QAT] = device_qp;
842 					device_qp->in_use = true;
843 					g_next_qat_index = (g_next_qat_index + ACCEL_DPDK_CRYPTODEV_QAT_VF_SPREAD) % g_qat_total_qp;
844 					qat_found = true;
845 					num_drivers++;
846 					break;
847 				} else {
848 					/* if the preferred index is used, skip to the next one in this set. */
849 					g_next_qat_index = (g_next_qat_index + 1) % g_qat_total_qp;
850 				}
851 			}
852 		}
853 	}
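	/* Example (a sketch assuming a single QAT device with 64 qpairs, 32 per VF): successive
	 * channels claim qp indexes 0, 32, 1, 33, 2, 34, ... so traffic alternates between the VFs. */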
854 
855 	/* For ACCEL_DPDK_CRYPTODEV_AESNI_MB, MLX5_PCI and UADK, grab the first unused qpair from the device list */
856 	device_qp = accel_dpdk_cryptodev_get_next_device_qpair(ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB);
857 	if (device_qp) {
858 		assert(crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB] == NULL);
859 		crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB] = device_qp;
860 		num_drivers++;
861 	}
862 
863 	device_qp = accel_dpdk_cryptodev_get_next_device_qpair(ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI);
864 	if (device_qp) {
865 		assert(crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI] == NULL);
866 		crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI] = device_qp;
867 		num_drivers++;
868 	}
869 
870 	device_qp = accel_dpdk_cryptodev_get_next_device_qpair(ACCEL_DPDK_CRYPTODEV_DRIVER_UADK);
871 	if (device_qp) {
872 		assert(crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_UADK] == NULL);
873 		crypto_ch->device_qp[ACCEL_DPDK_CRYPTODEV_DRIVER_UADK] = device_qp;
874 		num_drivers++;
875 	}
876 	pthread_mutex_unlock(&g_device_lock);
877 
878 	return num_drivers;
879 }
880 
881 static void
882 _accel_dpdk_cryptodev_destroy_cb(void *io_device, void *ctx_buf)
883 {
884 	struct accel_dpdk_cryptodev_io_channel *crypto_ch = (struct accel_dpdk_cryptodev_io_channel *)
885 			ctx_buf;
886 	int i;
887 
888 	pthread_mutex_lock(&g_device_lock);
889 	for (i = 0; i < ACCEL_DPDK_CRYPTODEV_DRIVER_LAST; i++) {
890 		if (crypto_ch->device_qp[i]) {
891 			crypto_ch->device_qp[i]->in_use = false;
892 		}
893 	}
894 	pthread_mutex_unlock(&g_device_lock);
895 
896 	spdk_poller_unregister(&crypto_ch->poller);
897 }
898 
899 static int
900 _accel_dpdk_cryptodev_create_cb(void *io_device, void *ctx_buf)
901 {
902 	struct accel_dpdk_cryptodev_io_channel *crypto_ch = (struct accel_dpdk_cryptodev_io_channel *)
903 			ctx_buf;
904 
905 	crypto_ch->poller = SPDK_POLLER_REGISTER(accel_dpdk_cryptodev_poller, crypto_ch, 0);
906 	if (!accel_dpdk_cryptodev_assign_device_qps(crypto_ch)) {
907 		SPDK_ERRLOG("No crypto drivers assigned\n");
908 		spdk_poller_unregister(&crypto_ch->poller);
909 		return -EINVAL;
910 	}
911 
912 	/* We use these lists to queue tasks when the qpair is full or the pools run out of resources,
	 * and to defer completions made in the submission path */
913 	TAILQ_INIT(&crypto_ch->queued_tasks);
914 	TAILQ_INIT(&crypto_ch->completed_tasks);
915 
916 	return 0;
917 }
918 
919 static struct spdk_io_channel *
920 accel_dpdk_cryptodev_get_io_channel(void)
921 {
922 	return spdk_get_io_channel(&g_accel_dpdk_cryptodev_module);
923 }
924 
925 static size_t
926 accel_dpdk_cryptodev_ctx_size(void)
927 {
928 	return sizeof(struct accel_dpdk_cryptodev_task);
929 }
930 
931 static bool
932 accel_dpdk_cryptodev_supports_opcode(enum spdk_accel_opcode opc)
933 {
934 	switch (opc) {
935 	case SPDK_ACCEL_OPC_ENCRYPT:
936 	case SPDK_ACCEL_OPC_DECRYPT:
937 		return true;
938 	default:
939 		return false;
940 	}
941 }
942 
943 static int
944 accel_dpdk_cryptodev_submit_tasks(struct spdk_io_channel *_ch, struct spdk_accel_task *_task)
945 {
946 	struct accel_dpdk_cryptodev_task *task = SPDK_CONTAINEROF(_task, struct accel_dpdk_cryptodev_task,
947 			base);
948 	struct accel_dpdk_cryptodev_io_channel *ch = spdk_io_channel_get_ctx(_ch);
949 	int rc;
950 
951 	task->cryop_completed = 0;
952 	task->cryop_submitted = 0;
953 	task->cryop_total = 0;
954 	task->inplace = true;
955 	task->is_failed = false;
956 
957 	/* Check if crypto operation is inplace: no destination or source == destination */
958 	if (task->base.s.iovcnt == task->base.d.iovcnt) {
959 		if (memcmp(task->base.s.iovs, task->base.d.iovs, sizeof(struct iovec) * task->base.s.iovcnt) != 0) {
960 			task->inplace = false;
961 		}
962 	} else if (task->base.d.iovcnt != 0) {
963 		task->inplace = false;
964 	}
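	/* Note: the check above compares the iovec arrays byte-for-byte, so differently laid out
	 * iovecs that ultimately describe the same memory are treated as out-of-place. */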
965 
966 	rc = accel_dpdk_cryptodev_process_task(ch, task);
967 	if (spdk_unlikely(rc)) {
968 		if (rc == -ENOMEM) {
969 			TAILQ_INSERT_TAIL(&ch->queued_tasks, task, link);
970 			rc = 0;
971 		} else if (rc == -EALREADY) {
972 			/* -EALREADY means that a task is completed, but it might be unsafe to complete
973 			 * it if we are in the submission path. Hence put it into a dedicated queue to and
974 			 * process it during polling */
975 			TAILQ_INSERT_TAIL(&ch->completed_tasks, task, link);
976 			rc = 0;
977 		}
978 	}
979 
980 	return rc;
981 }
982 
983 /* Dummy function used by DPDK to free externally attached buffers from mbufs; we free them ourselves, but
984  * this callback has to be provided. */
985 static void
986 shinfo_free_cb(void *arg1, void *arg2)
987 {
988 }
989 
990 static int
991 accel_dpdk_cryptodev_create(uint8_t index, uint16_t num_lcores)
992 {
993 	struct rte_cryptodev_qp_conf qp_conf = {
994 		.mp_session = g_session_mp,
995 #if RTE_VERSION < RTE_VERSION_NUM(22, 11, 0, 0)
996 		.mp_session_private = g_session_mp_priv
997 #endif
998 	};
999 	/* Setup queue pairs. */
1000 	struct rte_cryptodev_config conf = { .socket_id = SPDK_ENV_NUMA_ID_ANY };
1001 	struct accel_dpdk_cryptodev_device *device;
1002 	uint8_t j, cdev_id, cdrv_id;
1003 	struct accel_dpdk_cryptodev_qp *dev_qp;
1004 	int rc;
1005 
1006 	device = calloc(1, sizeof(*device));
1007 	if (!device) {
1008 		return -ENOMEM;
1009 	}
1010 
1011 	/* Get details about this device. */
1012 	rte_cryptodev_info_get(index, &device->cdev_info);
1013 	cdrv_id = device->cdev_info.driver_id;
1014 	cdev_id = device->cdev_id = index;
1015 
1016 	if (strcmp(device->cdev_info.driver_name, ACCEL_DPDK_CRYPTODEV_QAT) == 0) {
1017 		device->qp_desc_nr = ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS;
1018 		device->type = ACCEL_DPDK_CRYPTODEV_DRIVER_QAT;
1019 	} else if (strcmp(device->cdev_info.driver_name, ACCEL_DPDK_CRYPTODEV_AESNI_MB) == 0) {
1020 		device->qp_desc_nr = ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS;
1021 		device->type = ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB;
1022 	} else if (strcmp(device->cdev_info.driver_name, ACCEL_DPDK_CRYPTODEV_MLX5) == 0) {
1023 		device->qp_desc_nr = ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS_MLX5;
1024 		device->type = ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI;
1025 	} else if (strcmp(device->cdev_info.driver_name, ACCEL_DPDK_CRYPTODEV_QAT_ASYM) == 0) {
1026 		/* ACCEL_DPDK_CRYPTODEV_QAT_ASYM devices are not supported at this time. */
1027 		rc = 0;
1028 		goto err;
1029 	} else if (strcmp(device->cdev_info.driver_name, ACCEL_DPDK_CRYPTODEV_UADK) == 0) {
1030 		device->qp_desc_nr = ACCEL_DPDK_CRYPTODEV_QP_DESCRIPTORS;
1031 		device->type = ACCEL_DPDK_CRYPTODEV_DRIVER_UADK;
1032 	} else {
1033 		SPDK_ERRLOG("Failed to start device %u. Invalid driver name \"%s\"\n",
1034 			    cdev_id, device->cdev_info.driver_name);
1035 		rc = -EINVAL;
1036 		goto err;
1037 	}
1038 
1039 	/* Before going any further, make sure we have enough resources for this
1040 	 * device type to function.  We need a unique queue pair per core across each
1041 	 * device type to remain lockless....
1042 	 */
1043 	if ((rte_cryptodev_device_count_by_driver(cdrv_id) *
1044 	     device->cdev_info.max_nb_queue_pairs) < num_lcores) {
1045 		SPDK_ERRLOG("Insufficient unique queue pairs available for %s\n",
1046 			    device->cdev_info.driver_name);
1047 		SPDK_ERRLOG("Either add more crypto devices or decrease core count\n");
1048 		rc = -EINVAL;
1049 		goto err;
1050 	}
1051 
1052 	conf.nb_queue_pairs = device->cdev_info.max_nb_queue_pairs;
1053 	rc = rte_cryptodev_configure(cdev_id, &conf);
1054 	if (rc < 0) {
1055 		SPDK_ERRLOG("Failed to configure cryptodev %u: error %d\n",
1056 			    cdev_id, rc);
1057 		rc = -EINVAL;
1058 		goto err;
1059 	}
1060 
1061 	/* Pre-setup all potential qpairs now and assign them in the channel
1062 	 * callback. If we were to create them there, we'd have to stop the
1063 	 * entire device, affecting all other threads that might be using it,
1064 	 * even on other queue pairs.
1065 	 */
1066 	qp_conf.nb_descriptors = device->qp_desc_nr;
1067 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
1068 		rc = rte_cryptodev_queue_pair_setup(cdev_id, j, &qp_conf, SOCKET_ID_ANY);
1069 		if (rc < 0) {
1070 			SPDK_ERRLOG("Failed to setup queue pair %u on "
1071 				    "cryptodev %u: error %d\n", j, cdev_id, rc);
1072 			rc = -EINVAL;
1073 			goto err_qp_setup;
1074 		}
1075 	}
1076 
1077 	rc = rte_cryptodev_start(cdev_id);
1078 	if (rc < 0) {
1079 		SPDK_ERRLOG("Failed to start device %u: error %d\n", cdev_id, rc);
1080 		rc = -EINVAL;
1081 		goto err_dev_start;
1082 	}
1083 
1084 	TAILQ_INIT(&device->qpairs);
1085 	/* Build up lists of device/qp combinations per PMD */
1086 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
1087 		dev_qp = calloc(1, sizeof(*dev_qp));
1088 		if (!dev_qp) {
1089 			rc = -ENOMEM;
1090 			goto err_qp_alloc;
1091 		}
1092 		dev_qp->device = device;
1093 		dev_qp->qp = j;
1094 		dev_qp->in_use = false;
1095 		TAILQ_INSERT_TAIL(&device->qpairs, dev_qp, link);
1096 		if (device->type == ACCEL_DPDK_CRYPTODEV_DRIVER_QAT) {
1097 			dev_qp->index = g_qat_total_qp++;
1098 		}
1099 	}
1100 	/* Add to our list of available crypto devices. */
1101 	TAILQ_INSERT_TAIL(&g_crypto_devices, device, link);
1102 
1103 	return 0;
1104 
1105 err_qp_alloc:
1106 	TAILQ_FOREACH(dev_qp, &device->qpairs, link) {
1107 		if (dev_qp->device->cdev_id != device->cdev_id) {
1108 			continue;
1109 		}
1110 		free(dev_qp);
1111 		if (device->type == ACCEL_DPDK_CRYPTODEV_DRIVER_QAT) {
1112 			assert(g_qat_total_qp);
1113 			g_qat_total_qp--;
1114 		}
1115 	}
1116 	rte_cryptodev_stop(cdev_id);
1117 err_dev_start:
1118 err_qp_setup:
1119 	rte_cryptodev_close(cdev_id);
1120 err:
1121 	free(device);
1122 
1123 	return rc;
1124 }
1125 
1126 static void
1127 accel_dpdk_cryptodev_release(struct accel_dpdk_cryptodev_device *device)
1128 {
1129 	struct accel_dpdk_cryptodev_qp *dev_qp, *tmp;
1130 
1131 	assert(device);
1132 
1133 	TAILQ_FOREACH_SAFE(dev_qp, &device->qpairs, link, tmp) {
1134 		free(dev_qp);
1135 	}
1136 	if (device->type == ACCEL_DPDK_CRYPTODEV_DRIVER_QAT) {
1137 		assert(g_qat_total_qp >= device->cdev_info.max_nb_queue_pairs);
1138 		g_qat_total_qp -= device->cdev_info.max_nb_queue_pairs;
1139 	}
1140 	rte_cryptodev_stop(device->cdev_id);
1141 	rte_cryptodev_close(device->cdev_id);
1142 	free(device);
1143 }
1144 
1145 static int
1146 accel_dpdk_cryptodev_init(void)
1147 {
1148 	uint8_t cdev_count;
1149 	uint8_t cdev_id;
1150 	int i, rc;
1151 	const char *driver_name = g_driver_names[g_dpdk_cryptodev_driver];
1152 	struct accel_dpdk_cryptodev_device *device, *tmp_dev;
1153 	unsigned int max_sess_size = 0, sess_size;
1154 	uint16_t num_lcores = rte_lcore_count();
1155 	char init_args[32];
1156 
1157 	/* Only the first call via module init should init the crypto drivers. */
1158 	if (g_session_mp != NULL) {
1159 		return 0;
1160 	}
1161 
1162 	if (g_dpdk_cryptodev_driver == ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB ||
1163 	    g_dpdk_cryptodev_driver == ACCEL_DPDK_CRYPTODEV_DRIVER_UADK) {
1164 		snprintf(init_args, sizeof(init_args), "max_nb_queue_pairs=%d",
1165 			 ACCEL_DPDK_CRYPTODEV_AESNI_MB_NUM_QP);
1166 		rc = rte_vdev_init(driver_name, init_args);
1167 		if (rc) {
1168 			SPDK_NOTICELOG("Failed to create virtual PMD %s: error %d. "
1169 				       "Possibly %s is not supported by DPDK library. "
1170 				       "Keep going...\n", driver_name, rc, driver_name);
1171 		}
1172 	}
1173 
1174 	/* If we have no crypto devices, report an error so accel can fall back to other modules. */
1175 	cdev_count = rte_cryptodev_count();
1176 	if (cdev_count == 0) {
1177 		return -ENODEV;
1178 	}
1179 	SPDK_NOTICELOG("Found crypto devices: %d\n", (int)cdev_count);
1180 
1181 	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
1182 	if (g_mbuf_offset < 0) {
1183 		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
1184 		return -EINVAL;
1185 	}
1186 
1187 	/* Create global mempools, shared by all devices regardless of type */
1188 	/* First determine the max session size; most pools are shared by all the devices,
1189 	 * so we need to find the global max session size. */
1190 	for (cdev_id = 0; cdev_id < cdev_count; cdev_id++) {
1191 		sess_size = rte_cryptodev_sym_get_private_session_size(cdev_id);
1192 		if (sess_size > max_sess_size) {
1193 			max_sess_size = sess_size;
1194 		}
1195 	}
1196 
1197 #if RTE_VERSION < RTE_VERSION_NUM(22, 11, 0, 0)
1198 	g_session_mp_priv = rte_mempool_create("dpdk_crypto_ses_mp_priv",
1199 					       ACCEL_DPDK_CRYPTODEV_NUM_SESSIONS, max_sess_size, ACCEL_DPDK_CRYPTODEV_SESS_MEMPOOL_CACHE_SIZE, 0,
1200 					       NULL, NULL, NULL, NULL, SOCKET_ID_ANY, 0);
1201 	if (g_session_mp_priv == NULL) {
1202 		SPDK_ERRLOG("Cannot create private session pool max size 0x%x\n", max_sess_size);
1203 		return -ENOMEM;
1204 	}
1205 
1206 	/* When the session private data mempool is allocated, the element size for the session mempool
1207 	 * should be 0. */
1208 	max_sess_size = 0;
1209 #endif
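	/* From DPDK 22.11 onwards the session mempool created below also holds the driver-private
	 * session data (sized by max_sess_size), so no separate private pool is needed. */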
1210 
1211 	g_session_mp = rte_cryptodev_sym_session_pool_create("dpdk_crypto_ses_mp",
1212 			ACCEL_DPDK_CRYPTODEV_NUM_SESSIONS, max_sess_size, ACCEL_DPDK_CRYPTODEV_SESS_MEMPOOL_CACHE_SIZE, 0,
1213 			SOCKET_ID_ANY);
1214 	if (g_session_mp == NULL) {
1215 		SPDK_ERRLOG("Cannot create session pool max size 0x%x\n", max_sess_size);
1216 		rc = -ENOMEM;
1217 		goto error_create_session_mp;
1218 	}
1219 
1220 	g_mbuf_mp = rte_pktmbuf_pool_create("dpdk_crypto_mbuf_mp", ACCEL_DPDK_CRYPTODEV_NUM_MBUFS,
1221 					    ACCEL_DPDK_CRYPTODEV_POOL_CACHE_SIZE,
1222 					    0, 0, SPDK_ENV_NUMA_ID_ANY);
1223 	if (g_mbuf_mp == NULL) {
1224 		SPDK_ERRLOG("Cannot create mbuf pool\n");
1225 		rc = -ENOMEM;
1226 		goto error_create_mbuf;
1227 	}
1228 
1229 	/* We use per-op private data, as suggested by DPDK, to store the IV and
1230 	 * our own struct for queueing ops. */
1231 	g_crypto_op_mp = rte_crypto_op_pool_create("dpdk_crypto_op_mp",
1232 			 RTE_CRYPTO_OP_TYPE_SYMMETRIC, ACCEL_DPDK_CRYPTODEV_NUM_MBUFS, ACCEL_DPDK_CRYPTODEV_POOL_CACHE_SIZE,
1233 			 (ACCEL_DPDK_CRYPTODEV_DEFAULT_NUM_XFORMS * sizeof(struct rte_crypto_sym_xform)) +
1234 			 ACCEL_DPDK_CRYPTODEV_IV_LENGTH, rte_socket_id());
1235 	if (g_crypto_op_mp == NULL) {
1236 		SPDK_ERRLOG("Cannot create op pool\n");
1237 		rc = -ENOMEM;
1238 		goto error_create_op;
1239 	}
1240 
1241 	/* Init all devices */
1242 	for (i = 0; i < cdev_count; i++) {
1243 		rc = accel_dpdk_cryptodev_create(i, num_lcores);
1244 		if (rc) {
1245 			goto err;
1246 		}
1247 	}
1248 
1249 	g_shinfo.free_cb = shinfo_free_cb;
1250 
1251 	spdk_io_device_register(&g_accel_dpdk_cryptodev_module, _accel_dpdk_cryptodev_create_cb,
1252 				_accel_dpdk_cryptodev_destroy_cb, sizeof(struct accel_dpdk_cryptodev_io_channel),
1253 				"accel_dpdk_cryptodev");
1254 
1255 	return 0;
1256 
1257 	/* Error cleanup paths. */
1258 err:
1259 	TAILQ_FOREACH_SAFE(device, &g_crypto_devices, link, tmp_dev) {
1260 		TAILQ_REMOVE(&g_crypto_devices, device, link);
1261 		accel_dpdk_cryptodev_release(device);
1262 	}
1263 	rte_mempool_free(g_crypto_op_mp);
1264 	g_crypto_op_mp = NULL;
1265 error_create_op:
1266 	rte_mempool_free(g_mbuf_mp);
1267 	g_mbuf_mp = NULL;
1268 error_create_mbuf:
1269 	rte_mempool_free(g_session_mp);
1270 	g_session_mp = NULL;
1271 error_create_session_mp:
1272 	if (g_session_mp_priv != NULL) {
1273 		rte_mempool_free(g_session_mp_priv);
1274 		g_session_mp_priv = NULL;
1275 	}
1276 	return rc;
1277 }
1278 
1279 static void
1280 accel_dpdk_cryptodev_fini_cb(void *io_device)
1281 {
1282 	struct accel_dpdk_cryptodev_device *device, *tmp;
1283 
1284 	TAILQ_FOREACH_SAFE(device, &g_crypto_devices, link, tmp) {
1285 		TAILQ_REMOVE(&g_crypto_devices, device, link);
1286 		accel_dpdk_cryptodev_release(device);
1287 	}
1288 
1289 	if (g_dpdk_cryptodev_driver == ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB ||
1290 	    g_dpdk_cryptodev_driver == ACCEL_DPDK_CRYPTODEV_DRIVER_UADK) {
1291 		rte_vdev_uninit(g_driver_names[g_dpdk_cryptodev_driver]);
1292 	}
1293 
1294 	rte_mempool_free(g_crypto_op_mp);
1295 	rte_mempool_free(g_mbuf_mp);
1296 	rte_mempool_free(g_session_mp);
1297 	if (g_session_mp_priv != NULL) {
1298 		rte_mempool_free(g_session_mp_priv);
1299 	}
1300 
1301 	spdk_accel_module_finish();
1302 }
1303 
1304 /* Called when the entire module is being torn down. */
1305 static void
1306 accel_dpdk_cryptodev_fini(void *ctx)
1307 {
1308 	if (g_crypto_op_mp) {
1309 		spdk_io_device_unregister(&g_accel_dpdk_cryptodev_module, accel_dpdk_cryptodev_fini_cb);
1310 	}
1311 }
1312 
1313 static void
1314 accel_dpdk_cryptodev_key_handle_session_free(struct accel_dpdk_cryptodev_device *device,
1315 		void *session)
1316 {
1317 #if RTE_VERSION >= RTE_VERSION_NUM(22, 11, 0, 0)
1318 	assert(device != NULL);
1319 
1320 	rte_cryptodev_sym_session_free(device->cdev_id, session);
1321 #else
1322 	rte_cryptodev_sym_session_free(session);
1323 #endif
1324 }
1325 
1326 static void *
1327 accel_dpdk_cryptodev_key_handle_session_create(struct accel_dpdk_cryptodev_device *device,
1328 		struct rte_crypto_sym_xform *cipher_xform)
1329 {
1330 	void *session;
1331 
1332 #if RTE_VERSION >= RTE_VERSION_NUM(22, 11, 0, 0)
1333 	session = rte_cryptodev_sym_session_create(device->cdev_id, cipher_xform, g_session_mp);
1334 #else
1335 	session = rte_cryptodev_sym_session_create(g_session_mp);
1336 	if (!session) {
1337 		return NULL;
1338 	}
1339 
1340 	if (rte_cryptodev_sym_session_init(device->cdev_id, session, cipher_xform, g_session_mp_priv) < 0) {
1341 		accel_dpdk_cryptodev_key_handle_session_free(device, session);
1342 		return NULL;
1343 	}
1344 #endif
1345 
1346 	return session;
1347 }
1348 
1349 static int
1350 accel_dpdk_cryptodev_key_handle_configure(struct spdk_accel_crypto_key *key,
1351 		struct accel_dpdk_cryptodev_key_handle *key_handle)
1352 {
1353 	struct accel_dpdk_cryptodev_key_priv *priv = key->priv;
1354 
1355 	key_handle->cipher_xform.type = RTE_CRYPTO_SYM_XFORM_CIPHER;
1356 	key_handle->cipher_xform.cipher.iv.offset = ACCEL_DPDK_CRYPTODEV_IV_OFFSET;
1357 	key_handle->cipher_xform.cipher.iv.length = ACCEL_DPDK_CRYPTODEV_IV_LENGTH;
1358 
1359 	switch (priv->cipher) {
1360 	case SPDK_ACCEL_CIPHER_AES_CBC:
1361 		key_handle->cipher_xform.cipher.key.data = key->key;
1362 		key_handle->cipher_xform.cipher.key.length = key->key_size;
1363 		key_handle->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_CBC;
1364 		break;
1365 	case SPDK_ACCEL_CIPHER_AES_XTS:
1366 		key_handle->cipher_xform.cipher.key.data = priv->xts_key;
1367 		key_handle->cipher_xform.cipher.key.length = key->key_size + key->key2_size;
1368 		key_handle->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_XTS;
1369 		break;
1370 	default:
1371 		SPDK_ERRLOG("Invalid cipher name %s.\n", key->param.cipher);
1372 		return -EINVAL;
1373 	}
1374 
1375 	key_handle->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_ENCRYPT;
1376 	key_handle->session_encrypt = accel_dpdk_cryptodev_key_handle_session_create(key_handle->device,
1377 				      &key_handle->cipher_xform);
1378 	if (!key_handle->session_encrypt) {
1379 		SPDK_ERRLOG("Failed to init encrypt session\n");
1380 		return -EINVAL;
1381 	}
1382 
1383 	key_handle->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_DECRYPT;
1384 	key_handle->session_decrypt = accel_dpdk_cryptodev_key_handle_session_create(key_handle->device,
1385 				      &key_handle->cipher_xform);
1386 	if (!key_handle->session_decrypt) {
1387 		SPDK_ERRLOG("Failed to init decrypt session:");
1388 		accel_dpdk_cryptodev_key_handle_session_free(key_handle->device, key_handle->session_encrypt);
1389 		return -EINVAL;
1390 	}
1391 
1392 	return 0;
1393 }
1394 
1395 static void
1396 accel_dpdk_cryptodev_key_deinit(struct spdk_accel_crypto_key *key)
1397 {
1398 	struct accel_dpdk_cryptodev_key_handle *key_handle, *key_handle_tmp;
1399 	struct accel_dpdk_cryptodev_key_priv *priv = key->priv;
1400 
1401 	TAILQ_FOREACH_SAFE(key_handle, &priv->dev_keys, link, key_handle_tmp) {
1402 		accel_dpdk_cryptodev_key_handle_session_free(key_handle->device, key_handle->session_encrypt);
1403 		accel_dpdk_cryptodev_key_handle_session_free(key_handle->device, key_handle->session_decrypt);
1404 		TAILQ_REMOVE(&priv->dev_keys, key_handle, link);
1405 		spdk_memset_s(key_handle, sizeof(*key_handle), 0, sizeof(*key_handle));
1406 		free(key_handle);
1407 	}
1408 
1409 	if (priv->xts_key) {
1410 		spdk_memset_s(priv->xts_key, key->key_size + key->key2_size, 0, key->key_size + key->key2_size);
1411 	}
1412 	free(priv->xts_key);
1413 	free(priv);
1414 }
1415 
1416 static bool
1417 accel_dpdk_cryptodev_supports_cipher(enum spdk_accel_cipher cipher, size_t key_size)
1418 {
1419 	switch (g_dpdk_cryptodev_driver) {
1420 	case ACCEL_DPDK_CRYPTODEV_DRIVER_QAT:
1421 	case ACCEL_DPDK_CRYPTODEV_DRIVER_UADK:
1422 	case ACCEL_DPDK_CRYPTODEV_DRIVER_AESNI_MB:
1423 		switch (cipher) {
1424 		case SPDK_ACCEL_CIPHER_AES_XTS:
1425 			return key_size == SPDK_ACCEL_AES_XTS_128_KEY_SIZE;
1426 		case SPDK_ACCEL_CIPHER_AES_CBC:
1427 			return key_size == ACCEL_DPDK_CRYPTODEV_AES_CBC_128_KEY_SIZE ||
1428 			       key_size == ACCEL_DPDK_CRYPTODEV_AES_CBC_256_KEY_SIZE;
1429 		default:
1430 			return false;
1431 		}
1432 	case ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI:
1433 		switch (cipher) {
1434 		case SPDK_ACCEL_CIPHER_AES_XTS:
1435 			return key_size == SPDK_ACCEL_AES_XTS_128_KEY_SIZE || key_size == SPDK_ACCEL_AES_XTS_256_KEY_SIZE;
1436 		default:
1437 			return false;
1438 		}
1439 	default:
1440 		return false;
1441 	}
1442 }
1443 
1444 static int
1445 accel_dpdk_cryptodev_key_init(struct spdk_accel_crypto_key *key)
1446 {
1447 	struct accel_dpdk_cryptodev_device *device;
1448 	struct accel_dpdk_cryptodev_key_priv *priv;
1449 	struct accel_dpdk_cryptodev_key_handle *key_handle;
1450 	enum accel_dpdk_cryptodev_driver_type driver;
1451 	int rc;
1452 
1453 	driver = g_dpdk_cryptodev_driver;
1454 
1455 	priv = calloc(1, sizeof(*priv));
1456 	if (!priv) {
1457 		SPDK_ERRLOG("Memory allocation failed\n");
1458 		return -ENOMEM;
1459 	}
1460 	key->priv = priv;
1461 	priv->driver = driver;
1462 	priv->cipher = key->cipher;
1463 	TAILQ_INIT(&priv->dev_keys);
1464 
1465 	if (key->cipher == SPDK_ACCEL_CIPHER_AES_XTS) {
1466 		/* DPDK expects the keys to be concatenated together. */
1467 		priv->xts_key = calloc(key->key_size + key->key2_size + 1, sizeof(char));
1468 		if (!priv->xts_key) {
1469 			SPDK_ERRLOG("Memory allocation failed\n");
1470 			accel_dpdk_cryptodev_key_deinit(key);
1471 			return -ENOMEM;
1472 		}
1473 		memcpy(priv->xts_key, key->key, key->key_size);
1474 		memcpy(priv->xts_key + key->key_size, key->key2, key->key2_size);
1475 	}
1476 
1477 	pthread_mutex_lock(&g_device_lock);
1478 	TAILQ_FOREACH(device, &g_crypto_devices, link) {
1479 		if (device->type != driver) {
1480 			continue;
1481 		}
1482 		key_handle = calloc(1, sizeof(*key_handle));
1483 		if (!key_handle) {
1484 			pthread_mutex_unlock(&g_device_lock);
1485 			accel_dpdk_cryptodev_key_deinit(key);
1486 			return -ENOMEM;
1487 		}
1488 		key_handle->device = device;
1489 		TAILQ_INSERT_TAIL(&priv->dev_keys, key_handle, link);
1490 		rc = accel_dpdk_cryptodev_key_handle_configure(key, key_handle);
1491 		if (rc) {
1492 			pthread_mutex_unlock(&g_device_lock);
1493 			accel_dpdk_cryptodev_key_deinit(key);
1494 			return rc;
1495 		}
1496 		if (driver != ACCEL_DPDK_CRYPTODEV_DRIVER_MLX5_PCI) {
1497 			/* Only MLX5_PCI needs the key registered on every device since there
1498 			 * the key is bound to a specific Protection Domain; for all other
1499 			 * drivers one key handle is enough, so stop after the first device */
1500 			break;
1501 		}
1502 	}
1503 	pthread_mutex_unlock(&g_device_lock);
1504 
1505 	if (TAILQ_EMPTY(&priv->dev_keys)) {
1506 		free(priv);
1507 		return -ENODEV;
1508 	}
1509 
1510 	return 0;
1511 }
1512 
1513 static void
1514 accel_dpdk_cryptodev_write_config_json(struct spdk_json_write_ctx *w)
1515 {
1516 	spdk_json_write_object_begin(w);
1517 	spdk_json_write_named_string(w, "method", "dpdk_cryptodev_scan_accel_module");
1518 	spdk_json_write_object_end(w);
1519 
1520 	spdk_json_write_object_begin(w);
1521 	spdk_json_write_named_string(w, "method", "dpdk_cryptodev_set_driver");
1522 	spdk_json_write_named_object_begin(w, "params");
1523 	spdk_json_write_named_string(w, "driver_name", g_driver_names[g_dpdk_cryptodev_driver]);
1524 	spdk_json_write_object_end(w);
1525 	spdk_json_write_object_end(w);
1526 }
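/* The JSON written above corresponds to two RPC calls, e.g. (with the default driver):
 *   { "method": "dpdk_cryptodev_scan_accel_module" }
 *   { "method": "dpdk_cryptodev_set_driver", "params": { "driver_name": "crypto_aesni_mb" } }
 */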
1527 
1528 static int
1529 accel_dpdk_cryptodev_get_operation_info(enum spdk_accel_opcode opcode,
1530 					const struct spdk_accel_operation_exec_ctx *ctx,
1531 					struct spdk_accel_opcode_info *info)
1532 {
1533 	if (!accel_dpdk_cryptodev_supports_opcode(opcode)) {
1534 		SPDK_ERRLOG("Received unexpected opcode: %d", opcode);
1535 		assert(false);
1536 		return -EINVAL;
1537 	}
1538 
1539 	switch (g_dpdk_cryptodev_driver) {
1540 	case ACCEL_DPDK_CRYPTODEV_DRIVER_QAT:
1541 		info->required_alignment = spdk_u32log2(ctx->block_size);
1542 		break;
1543 	default:
1544 		info->required_alignment = 0;
1545 		break;
1546 	}
1547 
1548 	return 0;
1549 }
1550 
1551 static struct spdk_accel_module_if g_accel_dpdk_cryptodev_module = {
1552 	.module_init		= accel_dpdk_cryptodev_init,
1553 	.module_fini		= accel_dpdk_cryptodev_fini,
1554 	.write_config_json	= accel_dpdk_cryptodev_write_config_json,
1555 	.get_ctx_size		= accel_dpdk_cryptodev_ctx_size,
1556 	.name			= "dpdk_cryptodev",
1557 	.supports_opcode	= accel_dpdk_cryptodev_supports_opcode,
1558 	.get_io_channel		= accel_dpdk_cryptodev_get_io_channel,
1559 	.submit_tasks		= accel_dpdk_cryptodev_submit_tasks,
1560 	.crypto_key_init	= accel_dpdk_cryptodev_key_init,
1561 	.crypto_key_deinit	= accel_dpdk_cryptodev_key_deinit,
1562 	.crypto_supports_cipher	= accel_dpdk_cryptodev_supports_cipher,
1563 	.get_operation_info	= accel_dpdk_cryptodev_get_operation_info,
1564 };
1565