xref: /spdk/module/bdev/crypto/vbdev_crypto.c (revision 1a00f5c09488e7466a331b8c75cde4969740357f)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "vbdev_crypto.h"
35 
36 #include "spdk/env.h"
37 #include "spdk/likely.h"
38 #include "spdk/endian.h"
39 #include "spdk/thread.h"
40 #include "spdk/bdev_module.h"
41 #include "spdk/log.h"
42 
43 #include <rte_config.h>
44 #include <rte_bus_vdev.h>
45 #include <rte_crypto.h>
46 #include <rte_cryptodev.h>
47 #include <rte_mbuf_dyn.h>
48 
49 /* Used to store IO context in mbuf */
50 static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
51 	.name = "context_bdev_io",
52 	.size = sizeof(uint64_t),
53 	.align = __alignof__(uint64_t),
54 	.flags = 0,
55 };
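/* Offset of the IO-context dynamic field within each mbuf, set at init via
 * rte_mbuf_dynfield_register().
 */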
56 static int g_mbuf_offset;
57 
58 /* To add support for new device types, follow the existing examples below.
59  * Note that the string names are defined by the DPDK PMD in question, so be
60  * sure to use the exact names.
61  */
62 #define MAX_NUM_DRV_TYPES 2
63 
64 /* The VF spread is the number of queue pairs between virtual functions; we use it to
65  * load balance the QAT device.
66  */
67 #define QAT_VF_SPREAD 32
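/* Running count of QAT queue pairs discovered at init, and the index of the next one to hand
 * out when a channel is assigned a QAT qp (advanced by QAT_VF_SPREAD in _assign_device_qp()).
 */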
68 static uint8_t g_qat_total_qp = 0;
69 static uint8_t g_next_qat_index;
70 
71 const char *g_driver_names[MAX_NUM_DRV_TYPES] = { AESNI_MB, QAT };
72 
73 /* Global list of available crypto devices. */
74 struct vbdev_dev {
75 	struct rte_cryptodev_info	cdev_info;	/* includes device friendly name */
76 	uint8_t				cdev_id;	/* identifier for the device */
77 	TAILQ_ENTRY(vbdev_dev)		link;
78 };
79 static TAILQ_HEAD(, vbdev_dev) g_vbdev_devs = TAILQ_HEAD_INITIALIZER(g_vbdev_devs);
80 
81 /* Global list and lock for unique device/queue pair combos. We keep 1 list per supported PMD
82  * so that we can optimize per PMD where it makes sense. For example, with QAT there is an
83  * optimal pattern for assigning queue pairs, whereas with AESNI there is not.
84  */
85 struct device_qp {
86 	struct vbdev_dev		*device;	/* ptr to crypto device */
87 	uint8_t				qp;		/* queue pair for this node */
88 	bool				in_use;		/* whether this node is in use or not */
89 	uint8_t				index;		/* used by QAT to load balance placement of qpairs */
90 	TAILQ_ENTRY(device_qp)		link;
91 };
92 static TAILQ_HEAD(, device_qp) g_device_qp_qat = TAILQ_HEAD_INITIALIZER(g_device_qp_qat);
93 static TAILQ_HEAD(, device_qp) g_device_qp_aesni_mb = TAILQ_HEAD_INITIALIZER(g_device_qp_aesni_mb);
94 static pthread_mutex_t g_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
95 
96 
97 /* In order to limit the number of resources we need, given that we do one crypto
98  * operation per LBA (we use the LBA as the IV), we tell the bdev layer that
99  * our max IO size is something reasonable. Units here are in bytes.
100  */
101 #define CRYPTO_MAX_IO		(64 * 1024)
102 
103 /* This controls how many ops will be dequeued from the crypto driver in one run
104  * of the poller. It is mainly a performance knob as it effectively determines how
105  * much work the poller has to do.  However, even that can vary between crypto drivers,
106  * as the AESNI_MB driver, for example, does all the crypto work on dequeue whereas the
107  * QAT driver just dequeues what has been completed already.
108  */
109 #define MAX_DEQUEUE_BURST_SIZE	64
110 
111 /* When enqueueing, we need to supply the crypto driver with an array of pointers to
112  * operation structs. As each of these can be max 512B, we can adjust the CRYPTO_MAX_IO
113  * value in conjunction with the other defines to make sure we're not using crazy amounts
114  * of memory. All of these numbers can and probably should be adjusted based on the
115  * workload. By default we size the array for the worst case (smallest) block size,
116  * which requires the most entries. As an example, a CRYPTO_MAX_IO size of 64K with 512B
117  * blocks would give us an enqueue array size of 128.
118  */
119 #define MAX_ENQUEUE_ARRAY_SIZE (CRYPTO_MAX_IO / 512)
120 
121 /* The number of MBUFS we need must be a power of two and, to support other small IOs
122  * in addition to the limits mentioned above, we go to the next power of two. It is a
123  * big number because one mempool serves both source and destination mbufs. It may
124  * need to be bigger to support multiple crypto drivers at once.
125  */
126 #define NUM_MBUFS		32768
127 #define POOL_CACHE_SIZE		256
128 #define MAX_CRYPTO_VOLUMES	128
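/* Each crypto volume needs one encrypt and one decrypt session, hence two per volume. */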
129 #define NUM_SESSIONS		(2 * MAX_CRYPTO_VOLUMES)
130 #define SESS_MEMPOOL_CACHE_SIZE 0
131 uint8_t g_number_of_claimed_volumes = 0;
132 
133 /* This is the max number of IOs we can supply to any crypto device QP at one time.
134  * It can vary between drivers.
135  */
136 #define CRYPTO_QP_DESCRIPTORS	2048
137 
138 /* Specific to AES_CBC. */
139 #define AES_CBC_IV_LENGTH	16
140 #define AES_CBC_KEY_LENGTH	16
141 #define AES_XTS_KEY_LENGTH	16	/* XTS uses 2 keys, each of this size. */
142 #define AESNI_MB_NUM_QP		64
143 
144 /* Common for supported devices. */
145 #define IV_OFFSET            (sizeof(struct rte_crypto_op) + \
146 				sizeof(struct rte_crypto_sym_op))
147 #define QUEUED_OP_OFFSET (IV_OFFSET + AES_CBC_IV_LENGTH)
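/* Layout of each crypto op's private data area (sized AES_CBC_IV_LENGTH + QUEUED_OP_LENGTH
 * when the op pool is created): the 16-byte IV lives at IV_OFFSET, followed at
 * QUEUED_OP_OFFSET by the struct vbdev_crypto_op used when an op must be queued for
 * re-submission.
 */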
148 
149 static void _complete_internal_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
150 static void _complete_internal_read(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
151 static void _complete_internal_write(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
152 static void vbdev_crypto_examine(struct spdk_bdev *bdev);
153 static int vbdev_crypto_claim(const char *bdev_name);
154 static void vbdev_crypto_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
155 
156 /* List of crypto_bdev names and their base bdevs via configuration file. */
157 struct bdev_names {
158 	char			*vbdev_name;	/* name of the vbdev to create */
159 	char			*bdev_name;	/* base bdev name */
160 
161 	/* Note, for dev/test we allow use of the key in the config file; for production
162 	 * use, you must use an RPC to specify the key for security reasons.
163 	 */
164 	uint8_t			*key;		/* key per bdev */
165 	char			*drv_name;	/* name of the crypto device driver */
166 	char			*cipher;	/* AES_CBC or AES_XTS */
167 	uint8_t			*key2;		/* key #2 for AES_XTS, per bdev */
168 	TAILQ_ENTRY(bdev_names)	link;
169 };
170 static TAILQ_HEAD(, bdev_names) g_bdev_names = TAILQ_HEAD_INITIALIZER(g_bdev_names);
171 
172 /* List of virtual bdevs and associated info for each. We keep the device friendly name here even
173  * though it's also in the device struct because we use it early on.
174  */
175 struct vbdev_crypto {
176 	struct spdk_bdev		*base_bdev;		/* the thing we're attaching to */
177 	struct spdk_bdev_desc		*base_desc;		/* its descriptor we get from open */
178 	struct spdk_bdev		crypto_bdev;		/* the crypto virtual bdev */
179 	uint8_t				*key;			/* key per bdev */
180 	uint8_t				*key2;			/* for XTS */
181 	uint8_t				*xts_key;		/* key + key 2 */
182 	char				*drv_name;		/* name of the crypto device driver */
183 	char				*cipher;		/* cipher used */
184 	struct rte_cryptodev_sym_session *session_encrypt;	/* encryption session for this bdev */
185 	struct rte_cryptodev_sym_session *session_decrypt;	/* decryption session for this bdev */
186 	struct rte_crypto_sym_xform	cipher_xform;		/* crypto control struct for this bdev */
187 	TAILQ_ENTRY(vbdev_crypto)	link;
188 	struct spdk_thread		*thread;		/* thread where base device is opened */
189 };
190 static TAILQ_HEAD(, vbdev_crypto) g_vbdev_crypto = TAILQ_HEAD_INITIALIZER(g_vbdev_crypto);
191 
192 /* Shared mempools between all devices on this system */
193 static struct rte_mempool *g_session_mp = NULL;
194 static struct rte_mempool *g_session_mp_priv = NULL;
195 static struct rte_mempool *g_mbuf_mp = NULL;            /* mbuf mempool */
196 static struct rte_mempool *g_crypto_op_mp = NULL;	/* crypto operations, must be rte* mempool */
197 
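/* Shared info for externally attached mbuf buffers; its free_cb (shinfo_free_cb) is a no-op
 * because we free the attached buffers ourselves.
 */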
198 static struct rte_mbuf_ext_shared_info g_shinfo = {};   /* used by DPDK mbuf macro */
199 
200 /* For queueing up crypto operations that we can't submit for some reason */
201 struct vbdev_crypto_op {
202 	uint8_t					cdev_id;
203 	uint8_t					qp;
204 	struct rte_crypto_op			*crypto_op;
205 	struct spdk_bdev_io			*bdev_io;
206 	TAILQ_ENTRY(vbdev_crypto_op)		link;
207 };
208 #define QUEUED_OP_LENGTH (sizeof(struct vbdev_crypto_op))
209 
210 /* The crypto vbdev channel struct. It is allocated and freed on our behalf by the io channel code.
211  * We store things in here that are needed on a per-thread basis, like the base_channel
212  * and the poller for this thread.
213  */
214 struct crypto_io_channel {
215 	struct spdk_io_channel		*base_ch;		/* IO channel of base device */
216 	struct spdk_poller		*poller;		/* completion poller */
217 	struct device_qp		*device_qp;		/* unique device/qp combination for this channel */
218 	TAILQ_HEAD(, spdk_bdev_io)	pending_cry_ios;	/* outstanding operations to the crypto device */
219 	struct spdk_io_channel_iter	*iter;			/* used with for_each_channel in reset */
220 	TAILQ_HEAD(, vbdev_crypto_op)	queued_cry_ops;		/* queued for re-submission to CryptoDev */
221 };
222 
223 /* This is the per-IO crypto context that the bdev layer opaquely allocates for us and attaches
224  * to each IO.
225  */
226 struct crypto_bdev_io {
227 	int cryop_cnt_remaining;			/* counter used when completing crypto ops */
228 	struct crypto_io_channel *crypto_ch;		/* need to store for crypto completion handling */
229 	struct vbdev_crypto *crypto_bdev;		/* the crypto node struct associated with this IO */
230 	struct spdk_bdev_io *orig_io;			/* the original IO */
231 	struct spdk_bdev_io *read_io;			/* the read IO we issued */
232 	int8_t bdev_io_status;				/* the status we'll report back on the bdev IO */
233 	bool on_pending_list;
234 	/* Used for the single contiguous buffer that serves as the crypto destination target for writes */
235 	uint64_t aux_num_blocks;			/* num of blocks for the contiguous buffer */
236 	uint64_t aux_offset_blocks;			/* block offset on media */
237 	void *aux_buf_raw;				/* raw buffer that the bdev layer gave us for write buffer */
238 	struct iovec aux_buf_iov;			/* iov representing aligned contig write buffer */
239 
240 	/* for bdev_io_wait */
241 	struct spdk_bdev_io_wait_entry bdev_io_wait;
242 	struct spdk_io_channel *ch;
243 };
244 
245 /* Called by vbdev_crypto_init_crypto_drivers() to init each discovered crypto device */
246 static int
247 create_vbdev_dev(uint8_t index, uint16_t num_lcores)
248 {
249 	struct vbdev_dev *device;
250 	uint8_t j, cdev_id, cdrv_id;
251 	struct device_qp *dev_qp;
252 	struct device_qp *tmp_qp;
253 	int rc;
254 	TAILQ_HEAD(device_qps, device_qp) *dev_qp_head;
255 
256 	device = calloc(1, sizeof(struct vbdev_dev));
257 	if (!device) {
258 		return -ENOMEM;
259 	}
260 
261 	/* Get details about this device. */
262 	rte_cryptodev_info_get(index, &device->cdev_info);
263 	cdrv_id = device->cdev_info.driver_id;
264 	cdev_id = device->cdev_id = index;
265 
266 	/* QAT_ASYM devices are not supported at this time. */
267 	if (strcmp(device->cdev_info.driver_name, QAT_ASYM) == 0) {
268 		free(device);
269 		return 0;
270 	}
271 
272 	/* Before going any further, make sure we have enough resources for this
273 	 * device type to function.  We need a unique queue pair per core across each
274 	 * device type to remain lockless.
275 	 */
276 	if ((rte_cryptodev_device_count_by_driver(cdrv_id) *
277 	     device->cdev_info.max_nb_queue_pairs) < num_lcores) {
278 		SPDK_ERRLOG("Insufficient unique queue pairs available for %s\n",
279 			    device->cdev_info.driver_name);
280 		SPDK_ERRLOG("Either add more crypto devices or decrease core count\n");
281 		rc = -EINVAL;
282 		goto err;
283 	}
284 
285 	/* Setup queue pairs. */
286 	struct rte_cryptodev_config conf = {
287 		.nb_queue_pairs = device->cdev_info.max_nb_queue_pairs,
288 		.socket_id = SPDK_ENV_SOCKET_ID_ANY
289 	};
290 
291 	rc = rte_cryptodev_configure(cdev_id, &conf);
292 	if (rc < 0) {
293 		SPDK_ERRLOG("Failed to configure cryptodev %u\n", cdev_id);
294 		rc = -EINVAL;
295 		goto err;
296 	}
297 
298 	struct rte_cryptodev_qp_conf qp_conf = {
299 		.nb_descriptors = CRYPTO_QP_DESCRIPTORS,
300 		.mp_session = g_session_mp,
301 		.mp_session_private = g_session_mp_priv,
302 	};
303 
304 	/* Pre-setup all potential qpairs now and assign them in the channel
305 	 * callback. If we were to create them there, we'd have to stop the
306 	 * entire device affecting all other threads that might be using it
307 	 * even on other queue pairs.
308 	 */
309 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
310 		rc = rte_cryptodev_queue_pair_setup(cdev_id, j, &qp_conf, SOCKET_ID_ANY);
311 		if (rc < 0) {
312 			SPDK_ERRLOG("Failed to setup queue pair %u on "
313 				    "cryptodev %u\n", j, cdev_id);
314 			rc = -EINVAL;
315 			goto err_qp_setup;
316 		}
317 	}
318 
319 	rc = rte_cryptodev_start(cdev_id);
320 	if (rc < 0) {
321 		SPDK_ERRLOG("Failed to start device %u: error %d\n",
322 			    cdev_id, rc);
323 		rc = -EINVAL;
324 		goto err_dev_start;
325 	}
326 
327 	/* Select the right device/qp list based on driver name
328 	 * or error if it does not exist.
329 	 */
330 	if (strcmp(device->cdev_info.driver_name, QAT) == 0) {
331 		dev_qp_head = (struct device_qps *)&g_device_qp_qat;
332 	} else if (strcmp(device->cdev_info.driver_name, AESNI_MB) == 0) {
333 		dev_qp_head = (struct device_qps *)&g_device_qp_aesni_mb;
334 	} else {
335 		SPDK_ERRLOG("Failed to start device %u. Invalid driver name \"%s\"\n",
336 			    cdev_id, device->cdev_info.driver_name);
337 		rc = -EINVAL;
338 		goto err_invalid_drv_name;
339 	}
340 
341 	/* Build up lists of device/qp combinations per PMD */
342 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
343 		dev_qp = calloc(1, sizeof(struct device_qp));
344 		if (!dev_qp) {
345 			rc = -ENOMEM;
346 			goto err_qp_alloc;
347 		}
348 		dev_qp->device = device;
349 		dev_qp->qp = j;
350 		dev_qp->in_use = false;
351 		if (strcmp(device->cdev_info.driver_name, QAT) == 0) {
352 			g_qat_total_qp++;
353 		}
354 		TAILQ_INSERT_TAIL(dev_qp_head, dev_qp, link);
355 	}
356 
357 	/* Add to our list of available crypto devices. */
358 	TAILQ_INSERT_TAIL(&g_vbdev_devs, device, link);
359 
360 	return 0;
361 err_qp_alloc:
362 	TAILQ_FOREACH_SAFE(dev_qp, dev_qp_head, link, tmp_qp) {
363 		if (dev_qp->device->cdev_id != device->cdev_id) {
364 			continue;
365 		}
366 		TAILQ_REMOVE(dev_qp_head, dev_qp, link);
367 		if (dev_qp_head == (struct device_qps *)&g_device_qp_qat) {
368 			g_qat_total_qp--;
369 		}
370 		free(dev_qp);
371 	}
372 err_invalid_drv_name:
373 	rte_cryptodev_stop(cdev_id);
374 err_dev_start:
375 err_qp_setup:
376 	rte_cryptodev_close(cdev_id);
377 err:
378 	free(device);
379 
380 	return rc;
381 }
382 
383 static void
384 release_vbdev_dev(struct vbdev_dev *device)
385 {
386 	struct device_qp *dev_qp;
387 	struct device_qp *tmp_qp;
388 	TAILQ_HEAD(device_qps, device_qp) *dev_qp_head = NULL;
389 
390 	assert(device);
391 
392 	/* Select the right device/qp list based on driver name. */
393 	if (strcmp(device->cdev_info.driver_name, QAT) == 0) {
394 		dev_qp_head = (struct device_qps *)&g_device_qp_qat;
395 	} else if (strcmp(device->cdev_info.driver_name, AESNI_MB) == 0) {
396 		dev_qp_head = (struct device_qps *)&g_device_qp_aesni_mb;
397 	}
398 	if (dev_qp_head) {
399 		TAILQ_FOREACH_SAFE(dev_qp, dev_qp_head, link, tmp_qp) {
400 			/* Remove only qps of our device even if the driver name matches. */
401 			if (dev_qp->device->cdev_id != device->cdev_id) {
402 				continue;
403 			}
404 			TAILQ_REMOVE(dev_qp_head, dev_qp, link);
405 			if (dev_qp_head == (struct device_qps *)&g_device_qp_qat) {
406 				g_qat_total_qp--;
407 			}
408 			free(dev_qp);
409 		}
410 	}
411 	rte_cryptodev_stop(device->cdev_id);
412 	rte_cryptodev_close(device->cdev_id);
413 	free(device);
414 }
415 
416 /* Dummy function used by DPDK to free externally attached buffers on mbufs; we free them
417  * ourselves, but this callback has to be here. */
418 static void shinfo_free_cb(void *arg1, void *arg2)
419 {
420 }
421 
422 /* This is called from the module's init function. We setup all crypto devices early on as we are unable
423  * to easily dynamically configure queue pairs after the drivers are up and running.  So, here, we
424  * configure the max capabilities of each device and assign threads to queue pairs as channels are
425  * requested.
426  */
427 static int
428 vbdev_crypto_init_crypto_drivers(void)
429 {
430 	uint8_t cdev_count;
431 	uint8_t cdev_id;
432 	int i, rc;
433 	struct vbdev_dev *device;
434 	struct vbdev_dev *tmp_dev;
435 	struct device_qp *dev_qp;
436 	unsigned int max_sess_size = 0, sess_size;
437 	uint16_t num_lcores = rte_lcore_count();
438 	char aesni_args[32];
439 
440 	/* Only the first call, via RPC or module init, should init the crypto drivers. */
441 	if (g_session_mp != NULL) {
442 		return 0;
443 	}
444 
445 	/* We always init AESNI_MB */
446 	snprintf(aesni_args, sizeof(aesni_args), "max_nb_queue_pairs=%d", AESNI_MB_NUM_QP);
447 	rc = rte_vdev_init(AESNI_MB, aesni_args);
448 	if (rc) {
449 		SPDK_NOTICELOG("Failed to create virtual PMD %s: error %d. "
450 			       "Possibly %s is not supported by DPDK library. "
451 			       "Keep going...\n", AESNI_MB, rc, AESNI_MB);
452 	}
453 
454 	/* If we have no crypto devices, there's no reason to continue. */
455 	cdev_count = rte_cryptodev_count();
456 	if (cdev_count == 0) {
457 		return 0;
458 	}
459 
460 	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
461 	if (g_mbuf_offset < 0) {
462 		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
463 		return -EINVAL;
464 	}
465 
466 	/*
467 	 * Create global mempools, shared by all devices regardless of type.
468 	 */
469 
470 	/* First determine the max session size; most pools are shared by all the devices,
471 	 * so we need to find the global max session size.
472 	 */
473 	for (cdev_id = 0; cdev_id < cdev_count; cdev_id++) {
474 		sess_size = rte_cryptodev_sym_get_private_session_size(cdev_id);
475 		if (sess_size > max_sess_size) {
476 			max_sess_size = sess_size;
477 		}
478 	}
479 
480 	g_session_mp_priv = rte_mempool_create("session_mp_priv", NUM_SESSIONS, max_sess_size,
481 					       SESS_MEMPOOL_CACHE_SIZE, 0, NULL, NULL, NULL,
482 					       NULL, SOCKET_ID_ANY, 0);
483 	if (g_session_mp_priv == NULL) {
484 		SPDK_ERRLOG("Cannot create private session pool max size 0x%x\n", max_sess_size);
485 		return -ENOMEM;
486 	}
487 
488 	g_session_mp = rte_cryptodev_sym_session_pool_create(
489 			       "session_mp",
490 			       NUM_SESSIONS, 0, SESS_MEMPOOL_CACHE_SIZE, 0,
491 			       SOCKET_ID_ANY);
492 	if (g_session_mp == NULL) {
493 		SPDK_ERRLOG("Cannot create session pool max size 0x%x\n", max_sess_size);
494 		rc = -ENOMEM;
495 		goto error_create_session_mp;
496 	}
497 
498 	g_mbuf_mp = rte_pktmbuf_pool_create("mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
499 					    0, 0, SPDK_ENV_SOCKET_ID_ANY);
500 	if (g_mbuf_mp == NULL) {
501 		SPDK_ERRLOG("Cannot create mbuf pool\n");
502 		rc = -ENOMEM;
503 		goto error_create_mbuf;
504 	}
505 
506 	/* We use per op private data to store the IV and our own struct
507 	 * for queueing ops.
508 	 */
509 	g_crypto_op_mp = rte_crypto_op_pool_create("op_mp",
510 			 RTE_CRYPTO_OP_TYPE_SYMMETRIC,
511 			 NUM_MBUFS,
512 			 POOL_CACHE_SIZE,
513 			 AES_CBC_IV_LENGTH + QUEUED_OP_LENGTH,
514 			 rte_socket_id());
515 
516 	if (g_crypto_op_mp == NULL) {
517 		SPDK_ERRLOG("Cannot create op pool\n");
518 		rc = -ENOMEM;
519 		goto error_create_op;
520 	}
521 
522 	/* Init all devices */
523 	for (i = 0; i < cdev_count; i++) {
524 		rc = create_vbdev_dev(i, num_lcores);
525 		if (rc) {
526 			goto err;
527 		}
528 	}
529 
530 	/* Assign index values to the QAT device qp nodes so that we can
531 	 * hand them out in an optimal order for performance.
532 	 */
533 	i = 0;
534 	TAILQ_FOREACH(dev_qp, &g_device_qp_qat, link) {
535 		dev_qp->index = i++;
536 	}
537 
538 	g_shinfo.free_cb = shinfo_free_cb;
539 	return 0;
540 
541 	/* Error cleanup paths. */
542 err:
543 	TAILQ_FOREACH_SAFE(device, &g_vbdev_devs, link, tmp_dev) {
544 		TAILQ_REMOVE(&g_vbdev_devs, device, link);
545 		release_vbdev_dev(device);
546 	}
547 	rte_mempool_free(g_crypto_op_mp);
548 	g_crypto_op_mp = NULL;
549 error_create_op:
550 	rte_mempool_free(g_mbuf_mp);
551 	g_mbuf_mp = NULL;
552 error_create_mbuf:
553 	rte_mempool_free(g_session_mp);
554 	g_session_mp = NULL;
555 error_create_session_mp:
556 	if (g_session_mp_priv != NULL) {
557 		rte_mempool_free(g_session_mp_priv);
558 		g_session_mp_priv = NULL;
559 	}
560 	return rc;
561 }
562 
563 /* Following an encrypt or decrypt, we need to either write the encrypted data or finish
564  * the read on the decrypted data. Do that here.
565  */
566 static void
567 _crypto_operation_complete(struct spdk_bdev_io *bdev_io)
568 {
569 	struct vbdev_crypto *crypto_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_crypto,
570 					   crypto_bdev);
571 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
572 	struct crypto_io_channel *crypto_ch = io_ctx->crypto_ch;
573 	struct spdk_bdev_io *free_me = io_ctx->read_io;
574 	int rc = 0;
575 
576 	/* This can also be called from crypto_dev_poller() to fail an IO whose re-enqueued ops are stuck. */
577 	if (io_ctx->on_pending_list) {
578 		TAILQ_REMOVE(&crypto_ch->pending_cry_ios, bdev_io, module_link);
579 		io_ctx->on_pending_list = false;
580 	}
581 
582 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
583 
584 		/* Complete the original IO and then free the one that we created
585 		 * as a result of issuing an IO via submit_request.
586 		 */
587 		if (io_ctx->bdev_io_status != SPDK_BDEV_IO_STATUS_FAILED) {
588 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
589 		} else {
590 			SPDK_ERRLOG("Issue with decryption on bdev_io %p\n", bdev_io);
591 			rc = -EINVAL;
592 		}
593 		spdk_bdev_free_io(free_me);
594 
595 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
596 
597 		if (io_ctx->bdev_io_status != SPDK_BDEV_IO_STATUS_FAILED) {
598 			/* Write the encrypted data. */
599 			rc = spdk_bdev_writev_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
600 						     &io_ctx->aux_buf_iov, 1, io_ctx->aux_offset_blocks,
601 						     io_ctx->aux_num_blocks, _complete_internal_write,
602 						     bdev_io);
603 		} else {
604 			SPDK_ERRLOG("Issue with encryption on bdev_io %p\n", bdev_io);
605 			rc = -EINVAL;
606 		}
607 
608 	} else {
609 		SPDK_ERRLOG("Unknown bdev type %u on crypto operation completion\n",
610 			    bdev_io->type);
611 		rc = -EINVAL;
612 	}
613 
614 	if (rc) {
615 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
616 	}
617 }
618 
619 static void
620 cancel_queued_crypto_ops(struct crypto_io_channel *crypto_ch, struct spdk_bdev_io *bdev_io)
621 {
622 	struct rte_mbuf *mbufs_to_free[2 * MAX_DEQUEUE_BURST_SIZE];
623 	struct rte_crypto_op *dequeued_ops[MAX_DEQUEUE_BURST_SIZE];
624 	struct vbdev_crypto_op *op_to_cancel, *tmp_op;
625 	struct rte_crypto_op *crypto_op;
626 	int num_mbufs, num_dequeued_ops;
627 
628 	/* Remove all ops from the failed IO. Since we don't know the
629 	 * order we have to check them all. */
630 	num_mbufs = 0;
631 	num_dequeued_ops = 0;
632 	TAILQ_FOREACH_SAFE(op_to_cancel, &crypto_ch->queued_cry_ops, link, tmp_op) {
633 		/* Checking if this is our op. One IO contains multiple ops. */
634 		if (bdev_io == op_to_cancel->bdev_io) {
635 			crypto_op = op_to_cancel->crypto_op;
636 			TAILQ_REMOVE(&crypto_ch->queued_cry_ops, op_to_cancel, link);
637 
638 			/* Populating lists for freeing mbufs and ops. */
639 			mbufs_to_free[num_mbufs++] = (void *)crypto_op->sym->m_src;
640 			if (crypto_op->sym->m_dst) {
641 				mbufs_to_free[num_mbufs++] = (void *)crypto_op->sym->m_dst;
642 			}
643 			dequeued_ops[num_dequeued_ops++] = crypto_op;
644 		}
645 	}
646 
647 	/* Now bulk free both mbufs and crypto operations. */
648 	if (num_dequeued_ops > 0) {
649 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)dequeued_ops,
650 				     num_dequeued_ops);
651 		assert(num_mbufs > 0);
652 		/* This also releases chained mbufs if any. */
653 		rte_pktmbuf_free_bulk(mbufs_to_free, num_mbufs);
654 	}
655 }
656 
657 static int _crypto_operation(struct spdk_bdev_io *bdev_io,
658 			     enum rte_crypto_cipher_operation crypto_op,
659 			     void *aux_buf);
660 
661 /* This is the poller for the crypto device. It uses a single API to dequeue whatever is ready at
662  * the device. Then we need to decide if what we've got so far (including previous poller
663  * runs) totals up to one or more complete bdev_ios and if so continue with the bdev_io
664  * accordingly. This means either completing a read or issuing a new write.
665  */
666 static int
667 crypto_dev_poller(void *args)
668 {
669 	struct crypto_io_channel *crypto_ch = args;
670 	uint8_t cdev_id = crypto_ch->device_qp->device->cdev_id;
671 	int i, num_dequeued_ops, num_enqueued_ops;
672 	struct spdk_bdev_io *bdev_io = NULL;
673 	struct crypto_bdev_io *io_ctx = NULL;
674 	struct rte_crypto_op *dequeued_ops[MAX_DEQUEUE_BURST_SIZE];
675 	struct rte_mbuf *mbufs_to_free[2 * MAX_DEQUEUE_BURST_SIZE];
676 	int num_mbufs = 0;
677 	struct vbdev_crypto_op *op_to_resubmit;
678 
679 	/* Each run of the poller will get just what the device has available
680 	 * at the moment we call it, we don't check again after draining the
681 	 * at the moment we call it; we don't check again after draining the
682 	 */
683 	num_dequeued_ops = rte_cryptodev_dequeue_burst(cdev_id, crypto_ch->device_qp->qp,
684 			   dequeued_ops, MAX_DEQUEUE_BURST_SIZE);
685 
686 	/* Check if operation was processed successfully */
687 	for (i = 0; i < num_dequeued_ops; i++) {
688 
689 		/* We don't know the order or association of the crypto ops wrt any
690 		 * particular bdev_io, so we need to look at each one and determine if it's
691 		 * the last one for its bdev_io or not.
692 		 */
693 		bdev_io = (struct spdk_bdev_io *)*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src, g_mbuf_offset,
694 				uint64_t *);
695 		assert(bdev_io != NULL);
696 		io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
697 
698 		if (dequeued_ops[i]->status != RTE_CRYPTO_OP_STATUS_SUCCESS) {
699 			SPDK_ERRLOG("error with op %d status %u\n", i,
700 				    dequeued_ops[i]->status);
701 			/* Update the bdev status to error, we'll still process the
702 			/* Update the bdev status to error; we'll still process the
703 			 * aren't left hanging.
704 			 */
705 			io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
706 		}
707 
708 		assert(io_ctx->cryop_cnt_remaining > 0);
709 
710 		/* Return the associated src and dst mbufs by collecting them into
711 		 * an array that we can use the bulk API to free after the loop.
712 		 */
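		/* Clear the stored bdev_io context before the mbuf is returned to the pool. */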
713 		*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src, g_mbuf_offset, uint64_t *) = 0;
714 		mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_src;
715 		if (dequeued_ops[i]->sym->m_dst) {
716 			mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_dst;
717 		}
718 
719 		/* Done with all crypto ops for this bdev_io, complete it. */
720 		if (--io_ctx->cryop_cnt_remaining == 0) {
721 
722 			/* If we're completing this with an outstanding reset we need
723 			 * to fail it.
724 			 */
725 			if (crypto_ch->iter) {
726 				io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
727 			}
728 
729 			/* Complete the IO */
730 			_crypto_operation_complete(bdev_io);
731 		}
732 	}
733 
734 	/* Now bulk free both mbufs and crypto operations. */
735 	if (num_dequeued_ops > 0) {
736 		rte_mempool_put_bulk(g_crypto_op_mp,
737 				     (void **)dequeued_ops,
738 				     num_dequeued_ops);
739 		assert(num_mbufs > 0);
740 		/* This also releases chained mbufs if any. */
741 		rte_pktmbuf_free_bulk(mbufs_to_free, num_mbufs);
742 	}
743 
744 	/* Check if there are any pending crypto ops to process */
745 	while (!TAILQ_EMPTY(&crypto_ch->queued_cry_ops)) {
746 		op_to_resubmit = TAILQ_FIRST(&crypto_ch->queued_cry_ops);
747 		bdev_io = op_to_resubmit->bdev_io;
748 		io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
749 		num_enqueued_ops = rte_cryptodev_enqueue_burst(op_to_resubmit->cdev_id,
750 				   op_to_resubmit->qp,
751 				   &op_to_resubmit->crypto_op,
752 				   1);
753 		if (num_enqueued_ops == 1) {
754 			/* Make sure we don't put this on twice as one bdev_io is made up
755 			 * of many crypto ops.
756 			 */
757 			if (io_ctx->on_pending_list == false) {
758 				TAILQ_INSERT_TAIL(&crypto_ch->pending_cry_ios, bdev_io, module_link);
759 				io_ctx->on_pending_list = true;
760 			}
761 			TAILQ_REMOVE(&crypto_ch->queued_cry_ops, op_to_resubmit, link);
762 		} else {
763 			if (op_to_resubmit->crypto_op->status == RTE_CRYPTO_OP_STATUS_NOT_PROCESSED) {
764 				/* If we couldn't get one, just break and try again later. */
765 				break;
766 			} else {
767 				/* Something is really wrong with the op. Most probably the
768 				 * mbuf is broken or the HW is not able to process the request.
769 				 * Fail the IO and remove its ops from the queued ops list. */
770 				io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
771 
772 				cancel_queued_crypto_ops(crypto_ch, bdev_io);
773 
774 				/* Fail the IO if there is nothing left on device. */
775 				if (--io_ctx->cryop_cnt_remaining == 0) {
776 					_crypto_operation_complete(bdev_io);
777 				}
778 			}
779 
780 		}
781 	}
782 
783 	/* If the channel iter is not NULL, we need to continue to poll
784 	 * until the pending list is empty, then we can move on to the
785 	 * next channel.
786 	 */
787 	if (crypto_ch->iter && TAILQ_EMPTY(&crypto_ch->pending_cry_ios)) {
788 		SPDK_NOTICELOG("Channel %p has been quiesced.\n", crypto_ch);
789 		spdk_for_each_channel_continue(crypto_ch->iter, 0);
790 		crypto_ch->iter = NULL;
791 	}
792 
793 	return num_dequeued_ops;
794 }
795 
796 /* Allocate a new mbuf of @remainder size with data pointed to by @addr and attach
797  * it to @orig_mbuf. */
798 static int
799 mbuf_chain_remainder(struct spdk_bdev_io *bdev_io, struct rte_mbuf *orig_mbuf,
800 		     uint8_t *addr, uint32_t remainder)
801 {
802 	uint64_t phys_addr, phys_len;
803 	struct rte_mbuf *chain_mbuf;
804 	int rc;
805 
806 	phys_len = remainder;
807 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
808 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR || phys_len != remainder)) {
809 		return -EFAULT;
810 	}
811 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&chain_mbuf, 1);
812 	if (spdk_unlikely(rc)) {
813 		return -ENOMEM;
814 	}
815 	/* Store context in every mbuf as we don't know anything about completion order */
816 	*RTE_MBUF_DYNFIELD(chain_mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)bdev_io;
817 	rte_pktmbuf_attach_extbuf(chain_mbuf, addr, phys_addr, phys_len, &g_shinfo);
818 	rte_pktmbuf_append(chain_mbuf, phys_len);
819 
820 	/* Chained buffer is released by rte_pktmbuf_free_bulk() automagically. */
821 	rte_pktmbuf_chain(orig_mbuf, chain_mbuf);
822 	return 0;
823 }
824 
825 /* Attach the data buffer pointed to by @addr to @mbuf. Return the utilized length of the
826  * contiguous space that was physically available. */
827 static uint64_t
828 mbuf_attach_buf(struct spdk_bdev_io *bdev_io, struct rte_mbuf *mbuf,
829 		uint8_t *addr, uint32_t len)
830 {
831 	uint64_t phys_addr, phys_len;
832 
833 	/* Store context in every mbuf as we don't know anything about completion order */
834 	*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)bdev_io;
835 
836 	phys_len = len;
837 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
838 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR || phys_len == 0)) {
839 		return 0;
840 	}
841 	assert(phys_len <= len);
842 
843 	/* Set the mbuf element's address and length. */
844 	rte_pktmbuf_attach_extbuf(mbuf, addr, phys_addr, phys_len, &g_shinfo);
845 	rte_pktmbuf_append(mbuf, phys_len);
846 
847 	return phys_len;
848 }
849 
850 /* We're either encrypting on the way down or decrypting on the way back. */
851 static int
852 _crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation crypto_op,
853 		  void *aux_buf)
854 {
855 	uint16_t num_enqueued_ops = 0;
856 	uint32_t cryop_cnt = bdev_io->u.bdev.num_blocks;
857 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
858 	struct crypto_io_channel *crypto_ch = io_ctx->crypto_ch;
859 	uint8_t cdev_id = crypto_ch->device_qp->device->cdev_id;
860 	uint32_t crypto_len = io_ctx->crypto_bdev->crypto_bdev.blocklen;
861 	uint64_t total_length = bdev_io->u.bdev.num_blocks * crypto_len;
862 	int rc;
863 	uint32_t iov_index = 0;
864 	uint32_t allocated = 0;
865 	uint8_t *current_iov = NULL;
866 	uint64_t total_remaining = 0;
867 	uint64_t current_iov_remaining = 0;
868 	uint32_t crypto_index = 0;
869 	uint32_t en_offset = 0;
870 	struct rte_crypto_op *crypto_ops[MAX_ENQUEUE_ARRAY_SIZE];
871 	struct rte_mbuf *src_mbufs[MAX_ENQUEUE_ARRAY_SIZE];
872 	struct rte_mbuf *dst_mbufs[MAX_ENQUEUE_ARRAY_SIZE];
873 	int burst;
874 	struct vbdev_crypto_op *op_to_queue;
875 	uint64_t alignment = spdk_bdev_get_buf_align(&io_ctx->crypto_bdev->crypto_bdev);
876 
877 	assert((bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen) <= CRYPTO_MAX_IO);
878 
879 	/* Get the number of source mbufs that we need. These will always be 1:1 because we
880 	 * don't support chaining. The reason we don't is our decision to use the LBA as the IV:
881 	 * there can be no case where we'd need >1 mbuf per crypto op, or the op would span
882 	 * more than 1 LBA.
883 	 */
884 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, src_mbufs, cryop_cnt);
885 	if (rc) {
886 		SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
887 		return -ENOMEM;
888 	}
889 
890 	/* Get the same amount but these buffers to describe the encrypted data location (dst). */
891 	if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
892 		rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, dst_mbufs, cryop_cnt);
893 		if (rc) {
894 			SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
895 			rc = -ENOMEM;
896 			goto error_get_dst;
897 		}
898 	}
899 
900 #ifdef __clang_analyzer__
901 	/* silence scan-build false positive */
902 	SPDK_CLANG_ANALYZER_PREINIT_PTR_ARRAY(crypto_ops, MAX_ENQUEUE_ARRAY_SIZE, 0x1000);
903 #endif
904 	/* Allocate crypto operations. */
905 	allocated = rte_crypto_op_bulk_alloc(g_crypto_op_mp,
906 					     RTE_CRYPTO_OP_TYPE_SYMMETRIC,
907 					     crypto_ops, cryop_cnt);
908 	if (allocated < cryop_cnt) {
909 		SPDK_ERRLOG("ERROR trying to get crypto ops!\n");
910 		rc = -ENOMEM;
911 		goto error_get_ops;
912 	}
913 
914 	/* For encryption, we need to prepare a single contiguous buffer as the encryption
915 	 * destination; we'll then pass that along for the write after encryption is done.
916 	 * This is done to avoid encrypting the provided write buffer, which may be
917 	 * undesirable in some use cases.
918 	 */
919 	if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
920 		io_ctx->aux_buf_iov.iov_len = total_length;
921 		io_ctx->aux_buf_raw = aux_buf;
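		/* Round the raw aux buffer up to the bdev's required buffer alignment. */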
922 		io_ctx->aux_buf_iov.iov_base  = (void *)(((uintptr_t)aux_buf + (alignment - 1)) & ~(alignment - 1));
923 		io_ctx->aux_offset_blocks = bdev_io->u.bdev.offset_blocks;
924 		io_ctx->aux_num_blocks = bdev_io->u.bdev.num_blocks;
925 	}
926 
927 	/* This value is used in the completion callback to determine when the bdev_io is
928 	 * complete.
929 	 */
930 	io_ctx->cryop_cnt_remaining = cryop_cnt;
931 
932 	/* As we don't support chaining because of a decision to use LBA as IV, construction
933 	 * of crypto operations is straightforward. We build the op, the src_mbuf and the
934 	 * dst_mbuf in our local arrays by looping through the length of the bdev IO and
935 	 * picking off LBA sized blocks of memory from the IOVs as we walk through them. Each
936 	 * LBA sized chunk of memory will correspond 1:1 to a crypto operation and a single
937 	 * mbuf per crypto operation.
938 	 */
939 	total_remaining = total_length;
940 	current_iov = bdev_io->u.bdev.iovs[iov_index].iov_base;
941 	current_iov_remaining = bdev_io->u.bdev.iovs[iov_index].iov_len;
942 	do {
943 		uint8_t *iv_ptr;
944 		uint8_t *buf_addr;
945 		uint64_t phys_len;
946 		uint32_t remainder;
947 		uint64_t op_block_offset;
948 
949 		phys_len = mbuf_attach_buf(bdev_io, src_mbufs[crypto_index],
950 					   current_iov, crypto_len);
951 		if (spdk_unlikely(phys_len == 0)) {
952 			rc = -EFAULT;
953 			goto error_attach_session;
954 		}
955 
956 		/* Handle the case of a page boundary. */
957 		remainder = crypto_len - phys_len;
958 		if (spdk_unlikely(remainder > 0)) {
959 			rc = mbuf_chain_remainder(bdev_io, src_mbufs[crypto_index],
960 						  current_iov + phys_len, remainder);
961 			if (spdk_unlikely(rc)) {
962 				goto error_attach_session;
963 			}
964 		}
965 
966 		/* Set the IV - we use the LBA of the crypto_op */
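		/* Zero the 16-byte IV, then copy the 64-bit starting LBA of this op into its first 8 bytes. */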
967 		iv_ptr = rte_crypto_op_ctod_offset(crypto_ops[crypto_index], uint8_t *,
968 						   IV_OFFSET);
969 		memset(iv_ptr, 0, AES_CBC_IV_LENGTH);
970 		op_block_offset = bdev_io->u.bdev.offset_blocks + crypto_index;
971 		rte_memcpy(iv_ptr, &op_block_offset, sizeof(uint64_t));
972 
973 		/* Set the data to encrypt/decrypt length */
974 		crypto_ops[crypto_index]->sym->cipher.data.length = crypto_len;
975 		crypto_ops[crypto_index]->sym->cipher.data.offset = 0;
976 
977 		/* link the mbuf to the crypto op. */
978 		crypto_ops[crypto_index]->sym->m_src = src_mbufs[crypto_index];
979 
980 		/* For encrypt, point the destination to a buffer we allocate and redirect the bdev_io
981 		 * that will be used to process the write on completion to the same buffer. Setting
982 		 * up the en_buffer is a little simpler as we know the destination buffer is a single IOV.
983 		 */
984 		if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
985 			buf_addr = io_ctx->aux_buf_iov.iov_base + en_offset;
986 			phys_len = mbuf_attach_buf(bdev_io, dst_mbufs[crypto_index],
987 						   buf_addr, crypto_len);
988 			if (spdk_unlikely(phys_len == 0)) {
989 				rc = -EFAULT;
990 				goto error_attach_session;
991 			}
992 
993 			crypto_ops[crypto_index]->sym->m_dst = dst_mbufs[crypto_index];
994 			en_offset += phys_len;
995 
996 			/* Handle the case of page boundary. */
997 			/* Handle the case of a page boundary. */
998 			if (spdk_unlikely(remainder > 0)) {
999 				rc = mbuf_chain_remainder(bdev_io, dst_mbufs[crypto_index],
1000 							  buf_addr + phys_len, remainder);
1001 				if (spdk_unlikely(rc)) {
1002 					goto error_attach_session;
1003 				}
1004 				en_offset += remainder;
1005 			}
1006 
1007 			/* Attach the crypto session to the operation */
1008 			rc = rte_crypto_op_attach_sym_session(crypto_ops[crypto_index],
1009 							      io_ctx->crypto_bdev->session_encrypt);
1010 			if (rc) {
1011 				rc = -EINVAL;
1012 				goto error_attach_session;
1013 			}
1014 		} else {
1015 			crypto_ops[crypto_index]->sym->m_dst = NULL;
1016 
1017 			/* Attach the crypto session to the operation */
1018 			rc = rte_crypto_op_attach_sym_session(crypto_ops[crypto_index],
1019 							      io_ctx->crypto_bdev->session_decrypt);
1020 			if (rc) {
1021 				rc = -EINVAL;
1022 				goto error_attach_session;
1023 			}
1024 		}
1025 
1026 		/* Subtract our running totals for the op in progress and the overall bdev io */
1027 		total_remaining -= crypto_len;
1028 		current_iov_remaining -= crypto_len;
1029 
1030 		/* move our current IOV pointer accordingly. */
1031 		current_iov += crypto_len;
1032 
1033 		/* move on to the next crypto operation */
1034 		crypto_index++;
1035 
1036 		/* If we're done with this IOV, move to the next one. */
1037 		if (current_iov_remaining == 0 && total_remaining > 0) {
1038 			iov_index++;
1039 			current_iov = bdev_io->u.bdev.iovs[iov_index].iov_base;
1040 			current_iov_remaining = bdev_io->u.bdev.iovs[iov_index].iov_len;
1041 		}
1042 	} while (total_remaining > 0);
1043 
1044 	/* Enqueue everything we've got but limit by the max number of descriptors we
1045 	 * configured the crypto device for.
1046 	 */
1047 	burst = spdk_min(cryop_cnt, CRYPTO_QP_DESCRIPTORS);
1048 	num_enqueued_ops = rte_cryptodev_enqueue_burst(cdev_id, crypto_ch->device_qp->qp,
1049 			   &crypto_ops[0],
1050 			   burst);
1051 
1052 	/* Add this bdev_io to our outstanding list if any of its crypto ops made it. */
1053 	if (num_enqueued_ops > 0) {
1054 		TAILQ_INSERT_TAIL(&crypto_ch->pending_cry_ios, bdev_io, module_link);
1055 		io_ctx->on_pending_list = true;
1056 	}
1057 	/* If we were unable to enqueue everything, decide what to do based on the
1058 	 * status of the first op that was not enqueued.
1059 	 */
1060 	if (num_enqueued_ops < cryop_cnt) {
1061 		switch (crypto_ops[num_enqueued_ops]->status) {
1062 		case RTE_CRYPTO_OP_STATUS_NOT_PROCESSED:
1063 			/* Queue them up on a linked list to be resubmitted via the poller. */
1064 			for (crypto_index = num_enqueued_ops; crypto_index < cryop_cnt; crypto_index++) {
1065 				op_to_queue = (struct vbdev_crypto_op *)rte_crypto_op_ctod_offset(crypto_ops[crypto_index],
1066 						uint8_t *, QUEUED_OP_OFFSET);
1067 				op_to_queue->cdev_id = cdev_id;
1068 				op_to_queue->qp = crypto_ch->device_qp->qp;
1069 				op_to_queue->crypto_op = crypto_ops[crypto_index];
1070 				op_to_queue->bdev_io = bdev_io;
1071 				TAILQ_INSERT_TAIL(&crypto_ch->queued_cry_ops,
1072 						  op_to_queue,
1073 						  link);
1074 			}
1075 			break;
1076 		default:
1077 			/* For all other statuses, set the io_ctx bdev_io status so that
1078 			 * the poller will pick the failure up for the overall bdev status.
1079 			 */
1080 			io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
1081 			if (num_enqueued_ops == 0) {
1082 				/* If nothing was enqueued and the failure wasn't because the device
1083 				 * was busy, fail the IO now as the poller won't know anything about it.
1084 				 */
1085 				rc = -EINVAL;
1086 				goto error_attach_session;
1087 			}
1088 			break;
1089 		}
1090 	}
1091 
1092 	return rc;
1093 
1094 	/* Error cleanup paths. */
1095 error_attach_session:
1096 error_get_ops:
1097 	if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
1098 		/* This also releases chained mbufs if any. */
1099 		rte_pktmbuf_free_bulk(dst_mbufs, cryop_cnt);
1100 	}
1101 	if (allocated > 0) {
1102 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)crypto_ops,
1103 				     allocated);
1104 	}
1105 error_get_dst:
1106 	/* This also releases chained mbufs if any. */
1107 	rte_pktmbuf_free_bulk(src_mbufs, cryop_cnt);
1108 	return rc;
1109 }
1110 
1111 /* This function is called after all channels have been quiesced following
1112  * a bdev reset.
1113  */
1114 static void
1115 _ch_quiesce_done(struct spdk_io_channel_iter *i, int status)
1116 {
1117 	struct crypto_bdev_io *io_ctx = spdk_io_channel_iter_get_ctx(i);
1118 
1119 	assert(TAILQ_EMPTY(&io_ctx->crypto_ch->pending_cry_ios));
1120 	assert(io_ctx->orig_io != NULL);
1121 
1122 	spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
1123 }
1124 
1125 /* This function is called per channel to quiesce IOs before completing a
1126  * bdev reset that we received.
1127  */
1128 static void
1129 _ch_quiesce(struct spdk_io_channel_iter *i)
1130 {
1131 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1132 	struct crypto_io_channel *crypto_ch = spdk_io_channel_get_ctx(ch);
1133 
1134 	crypto_ch->iter = i;
1135 	/* When the poller runs, it will see the non-NULL iter and handle
1136 	 * the quiesce.
1137 	 */
1138 }
1139 
1140 /* Completion callback for IOs that were issued from this bdev, other than read/write.
1141  * Those have their own callbacks for readability.
1142  */
1143 static void
1144 _complete_internal_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1145 {
1146 	struct spdk_bdev_io *orig_io = cb_arg;
1147 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1148 
1149 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1150 		struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
1151 
1152 		assert(orig_io == orig_ctx->orig_io);
1153 
1154 		spdk_bdev_free_io(bdev_io);
1155 
1156 		spdk_for_each_channel(orig_ctx->crypto_bdev,
1157 				      _ch_quiesce,
1158 				      orig_ctx,
1159 				      _ch_quiesce_done);
1160 		return;
1161 	}
1162 
1163 	spdk_bdev_io_complete(orig_io, status);
1164 	spdk_bdev_free_io(bdev_io);
1165 }
1166 
1167 /* Completion callback for writes that were issued from this bdev. */
1168 static void
1169 _complete_internal_write(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1170 {
1171 	struct spdk_bdev_io *orig_io = cb_arg;
1172 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1173 	struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
1174 
1175 	spdk_bdev_io_put_aux_buf(orig_io, orig_ctx->aux_buf_raw);
1176 
1177 	spdk_bdev_io_complete(orig_io, status);
1178 	spdk_bdev_free_io(bdev_io);
1179 }
1180 
1181 /* Completion callback for reads that were issued from this bdev. */
1182 static void
1183 _complete_internal_read(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1184 {
1185 	struct spdk_bdev_io *orig_io = cb_arg;
1186 	struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
1187 
1188 	if (success) {
1189 
1190 		/* Save off this bdev_io so it can be freed after decryption. */
1191 		orig_ctx->read_io = bdev_io;
1192 
1193 		if (!_crypto_operation(orig_io, RTE_CRYPTO_CIPHER_OP_DECRYPT, NULL)) {
1194 			return;
1195 		} else {
1196 			SPDK_ERRLOG("ERROR decrypting\n");
1197 		}
1198 	} else {
1199 		SPDK_ERRLOG("ERROR on read prior to decrypting\n");
1200 	}
1201 
1202 	spdk_bdev_io_complete(orig_io, SPDK_BDEV_IO_STATUS_FAILED);
1203 	spdk_bdev_free_io(bdev_io);
1204 }
1205 
1206 static void
1207 vbdev_crypto_resubmit_io(void *arg)
1208 {
1209 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
1210 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1211 
1212 	vbdev_crypto_submit_request(io_ctx->ch, bdev_io);
1213 }
1214 
1215 static void
1216 vbdev_crypto_queue_io(struct spdk_bdev_io *bdev_io)
1217 {
1218 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1219 	int rc;
1220 
1221 	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
1222 	io_ctx->bdev_io_wait.cb_fn = vbdev_crypto_resubmit_io;
1223 	io_ctx->bdev_io_wait.cb_arg = bdev_io;
1224 
1225 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->crypto_ch->base_ch, &io_ctx->bdev_io_wait);
1226 	if (rc != 0) {
1227 		SPDK_ERRLOG("Queue io failed in vbdev_crypto_queue_io, rc=%d.\n", rc);
1228 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1229 	}
1230 }
1231 
1232 /* Callback for getting a buf from the bdev pool in the event that the caller passed
1233  * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
1234  * beneath us before we're done with it.
1235  */
1236 static void
1237 crypto_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
1238 		       bool success)
1239 {
1240 	struct vbdev_crypto *crypto_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_crypto,
1241 					   crypto_bdev);
1242 	struct crypto_io_channel *crypto_ch = spdk_io_channel_get_ctx(ch);
1243 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1244 	int rc;
1245 
1246 	if (!success) {
1247 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1248 		return;
1249 	}
1250 
1251 	rc = spdk_bdev_readv_blocks(crypto_bdev->base_desc, crypto_ch->base_ch, bdev_io->u.bdev.iovs,
1252 				    bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks,
1253 				    bdev_io->u.bdev.num_blocks, _complete_internal_read,
1254 				    bdev_io);
1255 	if (rc != 0) {
1256 		if (rc == -ENOMEM) {
1257 			SPDK_DEBUGLOG(vbdev_crypto, "No memory, queue the IO.\n");
1258 			io_ctx->ch = ch;
1259 			vbdev_crypto_queue_io(bdev_io);
1260 		} else {
1261 			SPDK_ERRLOG("ERROR on bdev_io submission!\n");
1262 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1263 		}
1264 	}
1265 }
1266 
1267 /* For encryption we don't want to encrypt the data in place, as the host isn't
1268  * expecting us to mangle its data buffers. So we encrypt into the bdev
1269  * aux buffer, then use that as the source for the disk data transfer.
1270  */
1271 static void
1272 crypto_write_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
1273 			void *aux_buf)
1274 {
1275 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1276 	int rc = 0;
1277 
1278 	rc = _crypto_operation(bdev_io, RTE_CRYPTO_CIPHER_OP_ENCRYPT, aux_buf);
1279 	if (rc != 0) {
1280 		spdk_bdev_io_put_aux_buf(bdev_io, aux_buf);
1281 		if (rc == -ENOMEM) {
1282 			SPDK_DEBUGLOG(vbdev_crypto, "No memory, queue the IO.\n");
1283 			io_ctx->ch = ch;
1284 			vbdev_crypto_queue_io(bdev_io);
1285 		} else {
1286 			SPDK_ERRLOG("ERROR on bdev_io submission!\n");
1287 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1288 		}
1289 	}
1290 }
1291 
1292 /* Called when someone submits IO to this crypto vbdev. For IOs not relevant to crypto,
1293  * we simply pass them on here via SPDK IO calls, which in turn allocate another bdev IO
1294  * and call our completion callback provided below along with the original bdev_io so that we can
1295  * complete it once this IO completes. For crypto operations, we'll either encrypt first
1296  * (writes) and then call back into bdev to submit it, or we'll submit a read and catch it
1297  * on the way back for decryption.
1298  */
1299 static void
1300 vbdev_crypto_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
1301 {
1302 	struct vbdev_crypto *crypto_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_crypto,
1303 					   crypto_bdev);
1304 	struct crypto_io_channel *crypto_ch = spdk_io_channel_get_ctx(ch);
1305 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1306 	int rc = 0;
1307 
1308 	memset(io_ctx, 0, sizeof(struct crypto_bdev_io));
1309 	io_ctx->crypto_bdev = crypto_bdev;
1310 	io_ctx->crypto_ch = crypto_ch;
1311 	io_ctx->orig_io = bdev_io;
1312 	io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
1313 
1314 	switch (bdev_io->type) {
1315 	case SPDK_BDEV_IO_TYPE_READ:
1316 		spdk_bdev_io_get_buf(bdev_io, crypto_read_get_buf_cb,
1317 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
1318 		break;
1319 	case SPDK_BDEV_IO_TYPE_WRITE:
1320 		/* Tell the bdev layer that we need an aux buf in addition to the data
1321 		 * buf already associated with the bdev.
1322 		 */
1323 		spdk_bdev_io_get_aux_buf(bdev_io, crypto_write_get_buf_cb);
1324 		break;
1325 	case SPDK_BDEV_IO_TYPE_UNMAP:
1326 		rc = spdk_bdev_unmap_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
1327 					    bdev_io->u.bdev.offset_blocks,
1328 					    bdev_io->u.bdev.num_blocks,
1329 					    _complete_internal_io, bdev_io);
1330 		break;
1331 	case SPDK_BDEV_IO_TYPE_FLUSH:
1332 		rc = spdk_bdev_flush_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
1333 					    bdev_io->u.bdev.offset_blocks,
1334 					    bdev_io->u.bdev.num_blocks,
1335 					    _complete_internal_io, bdev_io);
1336 		break;
1337 	case SPDK_BDEV_IO_TYPE_RESET:
1338 		rc = spdk_bdev_reset(crypto_bdev->base_desc, crypto_ch->base_ch,
1339 				     _complete_internal_io, bdev_io);
1340 		break;
1341 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1342 	default:
1343 		SPDK_ERRLOG("crypto: unknown I/O type %d\n", bdev_io->type);
1344 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1345 		return;
1346 	}
1347 
1348 	if (rc != 0) {
1349 		if (rc == -ENOMEM) {
1350 			SPDK_DEBUGLOG(vbdev_crypto, "No memory, queue the IO.\n");
1351 			io_ctx->ch = ch;
1352 			vbdev_crypto_queue_io(bdev_io);
1353 		} else {
1354 			SPDK_ERRLOG("ERROR on bdev_io submission!\n");
1355 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1356 		}
1357 	}
1358 }
1359 
1360 /* We'll just call the base bdev and let it answer, except for the WZ command, which
1361  * we always say we don't support so that the bdev layer will actually send us
1362  * real writes that we can encrypt.
1363  */
1364 static bool
1365 vbdev_crypto_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1366 {
1367 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1368 
1369 	switch (io_type) {
1370 	case SPDK_BDEV_IO_TYPE_WRITE:
1371 	case SPDK_BDEV_IO_TYPE_UNMAP:
1372 	case SPDK_BDEV_IO_TYPE_RESET:
1373 	case SPDK_BDEV_IO_TYPE_READ:
1374 	case SPDK_BDEV_IO_TYPE_FLUSH:
1375 		return spdk_bdev_io_type_supported(crypto_bdev->base_bdev, io_type);
1376 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1377 	/* Force the bdev layer to issue actual writes of zeroes so we can
1378 	 * encrypt them as regular writes.
1379 	 */
1380 	default:
1381 		return false;
1382 	}
1383 }
1384 
1385 /* Callback for unregistering the IO device. */
1386 static void
1387 _device_unregister_cb(void *io_device)
1388 {
1389 	struct vbdev_crypto *crypto_bdev = io_device;
1390 
1391 	/* Done with this crypto_bdev. */
1392 	rte_cryptodev_sym_session_free(crypto_bdev->session_decrypt);
1393 	rte_cryptodev_sym_session_free(crypto_bdev->session_encrypt);
1394 	free(crypto_bdev->drv_name);
1395 	if (crypto_bdev->key) {
1396 		memset(crypto_bdev->key, 0, strnlen(crypto_bdev->key, (AES_CBC_KEY_LENGTH + 1)));
1397 		free(crypto_bdev->key);
1398 	}
1399 	if (crypto_bdev->key2) {
1400 		memset(crypto_bdev->key2, 0, strnlen(crypto_bdev->key2, (AES_XTS_KEY_LENGTH + 1)));
1401 		free(crypto_bdev->key2);
1402 	}
1403 	if (crypto_bdev->xts_key) {
1404 		memset(crypto_bdev->xts_key, 0, strnlen(crypto_bdev->xts_key, (AES_XTS_KEY_LENGTH * 2) + 1));
1405 		free(crypto_bdev->xts_key);
1406 	}
1407 	free(crypto_bdev->crypto_bdev.name);
1408 	free(crypto_bdev);
1409 }
1410 
1411 /* Wrapper for the bdev close operation. */
1412 static void
1413 _vbdev_crypto_destruct(void *ctx)
1414 {
1415 	struct spdk_bdev_desc *desc = ctx;
1416 
1417 	spdk_bdev_close(desc);
1418 }
1419 
1420 /* Called after we've unregistered following a hot remove callback.
1421  * Our finish entry point will be called next.
1422  */
1423 static int
1424 vbdev_crypto_destruct(void *ctx)
1425 {
1426 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1427 
1428 	/* Remove this device from the internal list */
1429 	TAILQ_REMOVE(&g_vbdev_crypto, crypto_bdev, link);
1430 
1431 	/* Unclaim the underlying bdev. */
1432 	spdk_bdev_module_release_bdev(crypto_bdev->base_bdev);
1433 
1434 	/* Close the underlying bdev on its same opened thread. */
1435 	if (crypto_bdev->thread && crypto_bdev->thread != spdk_get_thread()) {
1436 		spdk_thread_send_msg(crypto_bdev->thread, _vbdev_crypto_destruct, crypto_bdev->base_desc);
1437 	} else {
1438 		spdk_bdev_close(crypto_bdev->base_desc);
1439 	}
1440 
1441 	/* Unregister the io_device. */
1442 	spdk_io_device_unregister(crypto_bdev, _device_unregister_cb);
1443 
1444 	g_number_of_claimed_volumes--;
1445 
1446 	return 0;
1447 }
1448 
1449 /* We supplied this as an entry point for upper layers that want to communicate with this
1450  * bdev.  This is how they get a channel. We are passed the same context we provided when
1451  * we created our crypto vbdev in examine() which, for this bdev, is the address of one of
1452  * our context nodes. From here we'll ask the SPDK channel code to fill out our channel
1453  * struct and we'll keep it in our crypto node.
1454  */
1455 static struct spdk_io_channel *
1456 vbdev_crypto_get_io_channel(void *ctx)
1457 {
1458 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1459 
1460 	/* The IO channel code will allocate a channel for us which consists of
1461 	 * the SPDK channel structure plus the size of our crypto_io_channel struct
1462 	 * that we passed in when we registered our IO device. It will then call
1463 	 * our channel create callback to populate any elements that we need to
1464 	 * update.
1465 	 */
1466 	return spdk_get_io_channel(crypto_bdev);
1467 }
1468 
1469 /* This is the output for bdev_get_bdevs() for this vbdev */
1470 static int
1471 vbdev_crypto_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1472 {
1473 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1474 
1475 	spdk_json_write_name(w, "crypto");
1476 	spdk_json_write_object_begin(w);
1477 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(crypto_bdev->base_bdev));
1478 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&crypto_bdev->crypto_bdev));
1479 	spdk_json_write_named_string(w, "crypto_pmd", crypto_bdev->drv_name);
1480 	spdk_json_write_named_string(w, "key", crypto_bdev->key);
1481 	if (strcmp(crypto_bdev->cipher, AES_XTS) == 0) {
1482 		spdk_json_write_named_string(w, "key2", crypto_bdev->key2);
1483 	}
1484 	spdk_json_write_named_string(w, "cipher", crypto_bdev->cipher);
1485 	spdk_json_write_object_end(w);
1486 	return 0;
1487 }
1488 
1489 static int
1490 vbdev_crypto_config_json(struct spdk_json_write_ctx *w)
1491 {
1492 	struct vbdev_crypto *crypto_bdev;
1493 
1494 	TAILQ_FOREACH(crypto_bdev, &g_vbdev_crypto, link) {
1495 		spdk_json_write_object_begin(w);
1496 		spdk_json_write_named_string(w, "method", "bdev_crypto_create");
1497 		spdk_json_write_named_object_begin(w, "params");
1498 		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(crypto_bdev->base_bdev));
1499 		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&crypto_bdev->crypto_bdev));
1500 		spdk_json_write_named_string(w, "crypto_pmd", crypto_bdev->drv_name);
1501 		spdk_json_write_named_string(w, "key", crypto_bdev->key);
1502 		if (strcmp(crypto_bdev->cipher, AES_XTS) == 0) {
1503 			spdk_json_write_named_string(w, "key2", crypto_bdev->key2);
1504 		}
1505 		spdk_json_write_named_string(w, "cipher", crypto_bdev->cipher);
1506 		spdk_json_write_object_end(w);
1507 		spdk_json_write_object_end(w);
1508 	}
1509 	return 0;
1510 }
1511 
1512 /* Helper function for the channel creation callback. */
1513 static void
1514 _assign_device_qp(struct vbdev_crypto *crypto_bdev, struct device_qp *device_qp,
1515 		  struct crypto_io_channel *crypto_ch)
1516 {
1517 	pthread_mutex_lock(&g_device_qp_lock);
1518 	if (strcmp(crypto_bdev->drv_name, QAT) == 0) {
1519 		/* For some QAT devices, the optimal qp to use is every 32nd as this spreads the
1520 		 * workload out over the multiple virtual functions in the device. For the devices
1521 		 * where this isn't the case, it doesn't hurt.
1522 		 */
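		/* For example, assuming 96 qps in total and a spread of 32, successive
		 * channels would be handed qps 0, 32, 64, 1, 33, 65, and so on.
		 */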
1523 		TAILQ_FOREACH(device_qp, &g_device_qp_qat, link) {
1524 			if (device_qp->index != g_next_qat_index) {
1525 				continue;
1526 			}
1527 			if (device_qp->in_use == false) {
1528 				crypto_ch->device_qp = device_qp;
1529 				device_qp->in_use = true;
1530 				g_next_qat_index = (g_next_qat_index + QAT_VF_SPREAD) % g_qat_total_qp;
1531 				break;
1532 			} else {
1533 				/* if the preferred index is used, skip to the next one in this set. */
1534 				g_next_qat_index = (g_next_qat_index + 1) % g_qat_total_qp;
1535 			}
1536 		}
1537 	} else if (strcmp(crypto_bdev->drv_name, AESNI_MB) == 0) {
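		/* AESNI_MB qps are interchangeable, so just take the first free one. */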
1538 		TAILQ_FOREACH(device_qp, &g_device_qp_aesni_mb, link) {
1539 			if (device_qp->in_use == false) {
1540 				crypto_ch->device_qp = device_qp;
1541 				device_qp->in_use = true;
1542 				break;
1543 			}
1544 		}
1545 	}
1546 	pthread_mutex_unlock(&g_device_qp_lock);
1547 }
1548 
1549 /* We provide this callback for the SPDK channel code to create a channel using
1550  * the channel struct we provided in our module get_io_channel() entry point. Here
1551  * we get and save off an underlying base channel of the device below us so that
1552  * we can communicate with the base bdev on a per channel basis. We also register the
1553  * poller used to complete crypto operations from the device.
1554  */
1555 static int
1556 crypto_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1557 {
1558 	struct crypto_io_channel *crypto_ch = ctx_buf;
1559 	struct vbdev_crypto *crypto_bdev = io_device;
1560 	struct device_qp *device_qp = NULL;
1561 
1562 	crypto_ch->base_ch = spdk_bdev_get_io_channel(crypto_bdev->base_desc);
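	/* A period of 0 means the poller runs on every iteration of this thread's reactor. */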
1563 	crypto_ch->poller = SPDK_POLLER_REGISTER(crypto_dev_poller, crypto_ch, 0);
1564 	crypto_ch->device_qp = NULL;
1565 
1566 	/* Assign a device/qp combination that is unique per channel per PMD. */
1567 	_assign_device_qp(crypto_bdev, device_qp, crypto_ch);
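	/* If no free qp was found, device_qp stays NULL and the assert below fires;
	 * we rely on having at least one qp available per channel.
	 */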
1568 	assert(crypto_ch->device_qp);
1569 
1570 	/* We use this queue to track outstanding IO in our layer. */
1571 	TAILQ_INIT(&crypto_ch->pending_cry_ios);
1572 
1573 	/* We use this to queue up crypto ops when the device is busy. */
1574 	TAILQ_INIT(&crypto_ch->queued_cry_ops);
1575 
1576 	return 0;
1577 }
1578 
1579 /* We provide this callback for the SPDK channel code to destroy a channel
1580  * created with our create callback. We just need to undo anything we did
1581  * when we created.
1582  */
1583 static void
1584 crypto_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1585 {
1586 	struct crypto_io_channel *crypto_ch = ctx_buf;
1587 
1588 	pthread_mutex_lock(&g_device_qp_lock);
1589 	crypto_ch->device_qp->in_use = false;
1590 	pthread_mutex_unlock(&g_device_qp_lock);
1591 
1592 	spdk_poller_unregister(&crypto_ch->poller);
1593 	spdk_put_io_channel(crypto_ch->base_ch);
1594 }
1595 
1596 /* Create the association between the bdev name and vbdev name and insert
1597  * it into the global list. */
1598 static int
1599 vbdev_crypto_insert_name(const char *bdev_name, const char *vbdev_name,
1600 			 const char *crypto_pmd, const char *key,
1601 			 const char *cipher, const char *key2)
1602 {
1603 	struct bdev_names *name;
1604 	int rc, j;
1605 	bool found = false;
1606 
1607 	TAILQ_FOREACH(name, &g_bdev_names, link) {
1608 		if (strcmp(vbdev_name, name->vbdev_name) == 0) {
1609 			SPDK_ERRLOG("crypto bdev %s already exists\n", vbdev_name);
1610 			return -EEXIST;
1611 		}
1612 	}
1613 
1614 	name = calloc(1, sizeof(struct bdev_names));
1615 	if (!name) {
1616 		SPDK_ERRLOG("could not allocate bdev_names\n");
1617 		return -ENOMEM;
1618 	}
1619 
1620 	name->bdev_name = strdup(bdev_name);
1621 	if (!name->bdev_name) {
1622 		SPDK_ERRLOG("could not allocate name->bdev_name\n");
1623 		rc = -ENOMEM;
1624 		goto error_alloc_bname;
1625 	}
1626 
1627 	name->vbdev_name = strdup(vbdev_name);
1628 	if (!name->vbdev_name) {
1629 		SPDK_ERRLOG("could not allocate name->vbdev_name\n");
1630 		rc = -ENOMEM;
1631 		goto error_alloc_vname;
1632 	}
1633 
1634 	name->drv_name = strdup(crypto_pmd);
1635 	if (!name->drv_name) {
1636 		SPDK_ERRLOG("could not allocate name->drv_name\n");
1637 		rc = -ENOMEM;
1638 		goto error_alloc_dname;
1639 	}
1640 	for (j = 0; j < MAX_NUM_DRV_TYPES ; j++) {
1641 		if (strcmp(crypto_pmd, g_driver_names[j]) == 0) {
1642 			found = true;
1643 			break;
1644 		}
1645 	}
1646 	if (!found) {
1647 		SPDK_ERRLOG("invalid crypto PMD type %s\n", crypto_pmd);
1648 		rc = -EINVAL;
1649 		goto error_invalid_pmd;
1650 	}
1651 
1652 	name->key = strdup(key);
1653 	if (!name->key) {
1654 		SPDK_ERRLOG("could not allocate name->key\n");
1655 		rc = -ENOMEM;
1656 		goto error_alloc_key;
1657 	}
1658 	if (strnlen(name->key, (AES_CBC_KEY_LENGTH + 1)) != AES_CBC_KEY_LENGTH) {
1659 		SPDK_ERRLOG("invalid AES_CBC key length\n");
1660 		rc = -EINVAL;
1661 		goto error_invalid_key;
1662 	}
1663 
1664 	if (strncmp(cipher, AES_XTS, sizeof(AES_XTS)) == 0) {
1665 		/* Input validation guarantees we can't have this cipher without
1666 		 * a key2 being provided; the assert below keeps scan-build happy.
1667 		 */
1668 		name->cipher = AES_XTS;
1669 		assert(key2);
1670 		if (strnlen(key2, (AES_XTS_KEY_LENGTH + 1)) != AES_XTS_KEY_LENGTH) {
1671 			SPDK_ERRLOG("invalid AES_XTS key length\n");
1672 			rc = -EINVAL;
1673 			goto error_invalid_key2;
1674 		}
1675 
1676 		name->key2 = strdup(key2);
1677 		if (!name->key2) {
1678 			SPDK_ERRLOG("could not allocate name->key2\n");
1679 			rc = -ENOMEM;
1680 			goto error_alloc_key2;
1681 		}
1682 	} else if (strncmp(cipher, AES_CBC, sizeof(AES_CBC)) == 0) {
1683 		name->cipher = AES_CBC;
1684 	} else {
1685 		SPDK_ERRLOG("Invalid cipher: %s\n", cipher);
1686 		rc = -EINVAL;
1687 		goto error_cipher;
1688 	}
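	/* name->cipher points at the constant AES_XTS/AES_CBC strings, which is why
	 * the cleanup paths below never free it.
	 */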
1689 
1690 	TAILQ_INSERT_TAIL(&g_bdev_names, name, link);
1691 
1692 	return 0;
1693 
1694 	/* Error cleanup paths. */
1695 error_cipher:
1696 	free(name->key2);
1697 error_alloc_key2:
1698 error_invalid_key2:
1699 error_invalid_key:
1700 	free(name->key);
1701 error_alloc_key:
1702 error_invalid_pmd:
1703 	free(name->drv_name);
1704 error_alloc_dname:
1705 	free(name->vbdev_name);
1706 error_alloc_vname:
1707 	free(name->bdev_name);
1708 error_alloc_bname:
1709 	free(name);
1710 	return rc;
1711 }
1712 
1713 /* RPC entry point for crypto creation. */
1714 int
1715 create_crypto_disk(const char *bdev_name, const char *vbdev_name,
1716 		   const char *crypto_pmd, const char *key,
1717 		   const char *cipher, const char *key2)
1718 {
1719 	int rc;
1720 
1721 	rc = vbdev_crypto_insert_name(bdev_name, vbdev_name, crypto_pmd, key, cipher, key2);
1722 	if (rc) {
1723 		return rc;
1724 	}
1725 
1726 	rc = vbdev_crypto_claim(bdev_name);
1727 	if (rc == -ENODEV) {
1728 		SPDK_NOTICELOG("vbdev creation deferred pending base bdev arrival\n");
1729 		rc = 0;
1730 	}
1731 
1732 	return rc;
1733 }
1734 
1735 /* Called at driver init time. Fully initializes the crypto drivers in
1736  * preparation for the examine calls that follow.
1737  */
1738 static int
1739 vbdev_crypto_init(void)
1740 {
1741 	int rc = 0;
1742 
1743 	/* Fully configure both SW and HW drivers. */
1744 	rc = vbdev_crypto_init_crypto_drivers();
1745 	if (rc) {
1746 		SPDK_ERRLOG("Error setting up crypto devices\n");
1747 	}
1748 
1749 	return rc;
1750 }
1751 
1752 /* Called when the entire module is being torn down. */
1753 static void
1754 vbdev_crypto_finish(void)
1755 {
1756 	struct bdev_names *name;
1757 	struct vbdev_dev *device;
1758 
1759 	while ((name = TAILQ_FIRST(&g_bdev_names))) {
1760 		TAILQ_REMOVE(&g_bdev_names, name, link);
1761 		free(name->drv_name);
1762 		free(name->key);
1763 		free(name->bdev_name);
1764 		free(name->vbdev_name);
1765 		free(name->key2);
1766 		free(name);
1767 	}
1768 
1769 	while ((device = TAILQ_FIRST(&g_vbdev_devs))) {
1770 		TAILQ_REMOVE(&g_vbdev_devs, device, link);
1771 		release_vbdev_dev(device);
1772 	}
1773 	rte_vdev_uninit(AESNI_MB);
1774 
1775 	/* These are removed in release_vbdev_dev() */
1776 	assert(TAILQ_EMPTY(&g_device_qp_qat));
1777 	assert(TAILQ_EMPTY(&g_device_qp_aesni_mb));
1778 
1779 	rte_mempool_free(g_crypto_op_mp);
1780 	rte_mempool_free(g_mbuf_mp);
1781 	rte_mempool_free(g_session_mp);
1782 	if (g_session_mp_priv != NULL) {
1783 		rte_mempool_free(g_session_mp_priv);
1784 	}
1785 }
1786 
1787 /* During init we'll be asked how much memory we'd like passed to us
1788  * in bdev_io structures as context. Here's where we specify how
1789  * much context we want per IO.
1790  */
1791 static int
1792 vbdev_crypto_get_ctx_size(void)
1793 {
1794 	return sizeof(struct crypto_bdev_io);
1795 }
1796 
1797 static void
1798 vbdev_crypto_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
1799 {
1800 	struct vbdev_crypto *crypto_bdev, *tmp;
1801 
1802 	TAILQ_FOREACH_SAFE(crypto_bdev, &g_vbdev_crypto, link, tmp) {
1803 		if (bdev_find == crypto_bdev->base_bdev) {
1804 			spdk_bdev_unregister(&crypto_bdev->crypto_bdev, NULL, NULL);
1805 		}
1806 	}
1807 }
1808 
1809 /* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
1810 static void
1811 vbdev_crypto_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1812 				void *event_ctx)
1813 {
1814 	switch (type) {
1815 	case SPDK_BDEV_EVENT_REMOVE:
1816 		vbdev_crypto_base_bdev_hotremove_cb(bdev);
1817 		break;
1818 	default:
1819 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1820 		break;
1821 	}
1822 }
1823 
1824 static void
1825 vbdev_crypto_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1826 {
1827 	/* No config per bdev needed */
1828 }
1829 
1830 /* When we register our bdev this is how we specify our entry points. */
1831 static const struct spdk_bdev_fn_table vbdev_crypto_fn_table = {
1832 	.destruct		= vbdev_crypto_destruct,
1833 	.submit_request		= vbdev_crypto_submit_request,
1834 	.io_type_supported	= vbdev_crypto_io_type_supported,
1835 	.get_io_channel		= vbdev_crypto_get_io_channel,
1836 	.dump_info_json		= vbdev_crypto_dump_info_json,
1837 	.write_config_json	= vbdev_crypto_write_config_json
1838 };
1839 
1840 static struct spdk_bdev_module crypto_if = {
1841 	.name = "crypto",
1842 	.module_init = vbdev_crypto_init,
1843 	.get_ctx_size = vbdev_crypto_get_ctx_size,
1844 	.examine_config = vbdev_crypto_examine,
1845 	.module_fini = vbdev_crypto_finish,
1846 	.config_json = vbdev_crypto_config_json
1847 };
1848 
1849 SPDK_BDEV_MODULE_REGISTER(crypto, &crypto_if)
1850 
1851 static int
1852 vbdev_crypto_claim(const char *bdev_name)
1853 {
1854 	struct bdev_names *name;
1855 	struct vbdev_crypto *vbdev;
1856 	struct vbdev_dev *device;
1857 	struct spdk_bdev *bdev;
1858 	bool found = false;
1859 	int rc = 0;
1860 
1861 	if (g_number_of_claimed_volumes >= MAX_CRYPTO_VOLUMES) {
1862 		SPDK_DEBUGLOG(vbdev_crypto, "Reached max number of claimed volumes\n");
1863 		return -EINVAL;
1864 	}
1865 	g_number_of_claimed_volumes++;
1866 
1867 	/* Check our list of names from config versus this bdev and if
1868 	 * there's a match, create the crypto_bdev & bdev accordingly.
1869 	 */
1870 	TAILQ_FOREACH(name, &g_bdev_names, link) {
1871 		if (strcmp(name->bdev_name, bdev_name) != 0) {
1872 			continue;
1873 		}
1874 		SPDK_DEBUGLOG(vbdev_crypto, "Match on %s\n", bdev_name);
1875 
1876 		vbdev = calloc(1, sizeof(struct vbdev_crypto));
1877 		if (!vbdev) {
1878 			SPDK_ERRLOG("could not allocate crypto_bdev\n");
1879 			rc = -ENOMEM;
1880 			goto error_vbdev_alloc;
1881 		}
1882 
1883 		vbdev->crypto_bdev.name = strdup(name->vbdev_name);
1884 		if (!vbdev->crypto_bdev.name) {
1885 			SPDK_ERRLOG("could not allocate crypto_bdev name\n");
1886 			rc = -ENOMEM;
1887 			goto error_bdev_name;
1888 		}
1889 
1890 		vbdev->key = strdup(name->key);
1891 		if (!vbdev->key) {
1892 			SPDK_ERRLOG("could not allocate crypto_bdev key\n");
1893 			rc = -ENOMEM;
1894 			goto error_alloc_key;
1895 		}
1896 
1897 		if (name->key2) {
1898 			vbdev->key2 = strdup(name->key2);
1899 			if (!vbdev->key2) {
1900 				SPDK_ERRLOG("could not allocate crypto_bdev key2\n");
1901 				rc = -ENOMEM;
1902 				goto error_alloc_key2;
1903 			}
1904 		}
1905 
1906 		vbdev->drv_name = strdup(name->drv_name);
1907 		if (!vbdev->drv_name) {
1908 			SPDK_ERRLOG("could not allocate crypto_bdev drv_name\n");
1909 			rc = -ENOMEM;
1910 			goto error_drv_name;
1911 		}
1912 
1913 		vbdev->crypto_bdev.product_name = "crypto";
1914 
1915 		rc = spdk_bdev_open_ext(bdev_name, true, vbdev_crypto_base_bdev_event_cb,
1916 					NULL, &vbdev->base_desc);
1917 		if (rc) {
1918 			if (rc != -ENODEV) {
1919 				SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
1920 			}
1921 			goto error_open;
1922 		}
1923 
1924 		bdev = spdk_bdev_desc_get_bdev(vbdev->base_desc);
1925 		vbdev->base_bdev = bdev;
1926 
1927 		vbdev->crypto_bdev.write_cache = bdev->write_cache;
1928 		vbdev->cipher = AES_CBC;
1929 		if (strcmp(vbdev->drv_name, QAT) == 0) {
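			/* Raise the required alignment so buffers handed to QAT are at least
			 * block-aligned.
			 */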
1930 			vbdev->crypto_bdev.required_alignment =
1931 				spdk_max(spdk_u32log2(bdev->blocklen), bdev->required_alignment);
1932 			SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
1933 				       vbdev->crypto_bdev.required_alignment);
1934 			if (strcmp(name->cipher, AES_CBC) == 0) {
1935 				SPDK_NOTICELOG("QAT using cipher: AES_CBC\n");
1936 			} else {
1937 				SPDK_NOTICELOG("QAT using cipher: AES_XTS\n");
1938 				vbdev->cipher = AES_XTS;
1939 				/* DPDK expects the two keys to be concatenated together. */
1940 				vbdev->xts_key = calloc(1, (AES_XTS_KEY_LENGTH * 2) + 1);
1941 				if (vbdev->xts_key == NULL) {
1942 					SPDK_ERRLOG("could not allocate memory for XTS key\n");
1943 					rc = -ENOMEM;
1944 					goto error_xts_key;
1945 				}
1946 				memcpy(vbdev->xts_key, vbdev->key, AES_XTS_KEY_LENGTH);
1947 				assert(name->key2);
1948 				memcpy(vbdev->xts_key + AES_XTS_KEY_LENGTH, name->key2, AES_XTS_KEY_LENGTH + 1);
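				/* xts_key now holds key followed by key2: 2 * AES_XTS_KEY_LENGTH bytes
				 * of key material plus the terminating NUL copied from key2.
				 */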
1949 			}
1950 		} else {
1951 			vbdev->crypto_bdev.required_alignment = bdev->required_alignment;
1952 		}
1953 		/* Note: CRYPTO_MAX_IO is in units of bytes, optimal_io_boundary is
1954 		 * in units of blocks.
1955 		 */
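		/* For example, if CRYPTO_MAX_IO were 64 KiB and blocklen 512 bytes, the
		 * boundary would be capped at 128 blocks.
		 */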
1956 		if (bdev->optimal_io_boundary > 0) {
1957 			vbdev->crypto_bdev.optimal_io_boundary =
1958 				spdk_min((CRYPTO_MAX_IO / bdev->blocklen), bdev->optimal_io_boundary);
1959 		} else {
1960 			vbdev->crypto_bdev.optimal_io_boundary = (CRYPTO_MAX_IO / bdev->blocklen);
1961 		}
1962 		vbdev->crypto_bdev.split_on_optimal_io_boundary = true;
1963 		vbdev->crypto_bdev.blocklen = bdev->blocklen;
1964 		vbdev->crypto_bdev.blockcnt = bdev->blockcnt;
1965 
1966 		/* This is the context that is passed to us when the bdev
1967 		 * layer calls in so we'll save our crypto_bdev node here.
1968 		 */
1969 		vbdev->crypto_bdev.ctxt = vbdev;
1970 		vbdev->crypto_bdev.fn_table = &vbdev_crypto_fn_table;
1971 		vbdev->crypto_bdev.module = &crypto_if;
1972 		TAILQ_INSERT_TAIL(&g_vbdev_crypto, vbdev, link);
1973 
1974 		spdk_io_device_register(vbdev, crypto_bdev_ch_create_cb, crypto_bdev_ch_destroy_cb,
1975 					sizeof(struct crypto_io_channel), vbdev->crypto_bdev.name);
1976 
1977 		/* Save the thread where the base device is opened */
1978 		vbdev->thread = spdk_get_thread();
1979 
1980 		rc = spdk_bdev_module_claim_bdev(bdev, vbdev->base_desc, vbdev->crypto_bdev.module);
1981 		if (rc) {
1982 			SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(bdev));
1983 			goto error_claim;
1984 		}
1985 
1986 		/* To init the session we have to get the cryptoDev device ID for this vbdev */
1987 		TAILQ_FOREACH(device, &g_vbdev_devs, link) {
1988 			if (strcmp(device->cdev_info.driver_name, vbdev->drv_name) == 0) {
1989 				found = true;
1990 				break;
1991 			}
1992 		}
1993 		if (found == false) {
1994 			SPDK_ERRLOG("ERROR can't match crypto device driver to crypto vbdev!\n");
1995 			rc = -EINVAL;
1996 			goto error_cant_find_devid;
1997 		}
1998 
1999 		/* Get sessions. */
2000 		vbdev->session_encrypt = rte_cryptodev_sym_session_create(g_session_mp);
2001 		if (NULL == vbdev->session_encrypt) {
2002 			SPDK_ERRLOG("ERROR trying to create crypto session!\n");
2003 			rc = -EINVAL;
2004 			goto error_session_en_create;
2005 		}
2006 
2007 		vbdev->session_decrypt = rte_cryptodev_sym_session_create(g_session_mp);
2008 		if (NULL == vbdev->session_decrypt) {
2009 			SPDK_ERRLOG("ERROR trying to create crypto session!\n");
2010 			rc = -EINVAL;
2011 			goto error_session_de_create;
2012 		}
2013 
2014 		/* Init our per vbdev xform with the desired cipher options. */
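		/* A single xform is shared by both sessions; only the op direction is
		 * flipped to ENCRYPT/DECRYPT before each session_init call below.
		 */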
2015 		vbdev->cipher_xform.type = RTE_CRYPTO_SYM_XFORM_CIPHER;
2016 		vbdev->cipher_xform.cipher.iv.offset = IV_OFFSET;
2017 		if (strcmp(name->cipher, AES_CBC) == 0) {
2018 			vbdev->cipher_xform.cipher.key.data = vbdev->key;
2019 			vbdev->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_CBC;
2020 			vbdev->cipher_xform.cipher.key.length = AES_CBC_KEY_LENGTH;
2021 		} else {
2022 			vbdev->cipher_xform.cipher.key.data = vbdev->xts_key;
2023 			vbdev->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_XTS;
2024 			vbdev->cipher_xform.cipher.key.length = AES_XTS_KEY_LENGTH * 2;
2025 		}
2026 		vbdev->cipher_xform.cipher.iv.length = AES_CBC_IV_LENGTH;
2027 
2028 		vbdev->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_ENCRYPT;
2029 		rc = rte_cryptodev_sym_session_init(device->cdev_id, vbdev->session_encrypt,
2030 						    &vbdev->cipher_xform,
2031 						    g_session_mp_priv ? g_session_mp_priv : g_session_mp);
2032 		if (rc < 0) {
2033 			SPDK_ERRLOG("ERROR trying to init encrypt session!\n");
2034 			rc = -EINVAL;
2035 			goto error_session_init;
2036 		}
2037 
2038 		vbdev->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_DECRYPT;
2039 		rc = rte_cryptodev_sym_session_init(device->cdev_id, vbdev->session_decrypt,
2040 						    &vbdev->cipher_xform,
2041 						    g_session_mp_priv ? g_session_mp_priv : g_session_mp);
2042 		if (rc < 0) {
2043 			SPDK_ERRLOG("ERROR trying to init decrypt session!\n");
2044 			rc = -EINVAL;
2045 			goto error_session_init;
2046 		}
2047 
2048 		rc = spdk_bdev_register(&vbdev->crypto_bdev);
2049 		if (rc < 0) {
2050 			SPDK_ERRLOG("ERROR trying to register bdev\n");
2051 			rc = -EINVAL;
2052 			goto error_bdev_register;
2053 		}
2054 		SPDK_DEBUGLOG(vbdev_crypto, "registered io_device and virtual bdev for: %s\n",
2055 			      name->vbdev_name);
2056 		break;
2057 	}
2058 
2059 	return rc;
2060 
2061 	/* Error cleanup paths. */
2062 error_bdev_register:
2063 error_session_init:
2064 	rte_cryptodev_sym_session_free(vbdev->session_decrypt);
2065 error_session_de_create:
2066 	rte_cryptodev_sym_session_free(vbdev->session_encrypt);
2067 error_session_en_create:
2068 error_cant_find_devid:
2069 	spdk_bdev_module_release_bdev(vbdev->base_bdev);
2070 error_claim:
2071 	TAILQ_REMOVE(&g_vbdev_crypto, vbdev, link);
2072 	spdk_io_device_unregister(vbdev, NULL);
2073 	if (vbdev->xts_key) {
2074 		memset(vbdev->xts_key, 0, AES_XTS_KEY_LENGTH * 2);
2075 		free(vbdev->xts_key);
2076 	}
2077 error_xts_key:
2078 	spdk_bdev_close(vbdev->base_desc);
2079 error_open:
2080 	free(vbdev->drv_name);
2081 error_drv_name:
2082 	if (vbdev->key2) {
2083 		memset(vbdev->key2, 0, strlen(vbdev->key2));
2084 		free(vbdev->key2);
2085 	}
2086 error_alloc_key2:
2087 	if (vbdev->key) {
2088 		memset(vbdev->key, 0, strlen(vbdev->key));
2089 		free(vbdev->key);
2090 	}
2091 error_alloc_key:
2092 	free(vbdev->crypto_bdev.name);
2093 error_bdev_name:
2094 	free(vbdev);
2095 error_vbdev_alloc:
2096 	g_number_of_claimed_volumes--;
2097 	return rc;
2098 }
2099 
2100 /* RPC entry for deleting a crypto vbdev. */
2101 void
2102 delete_crypto_disk(struct spdk_bdev *bdev, spdk_delete_crypto_complete cb_fn,
2103 		   void *cb_arg)
2104 {
2105 	struct bdev_names *name;
2106 
2107 	if (!bdev || bdev->module != &crypto_if) {
2108 		cb_fn(cb_arg, -ENODEV);
2109 		return;
2110 	}
2111 
2112 	/* Remove the association (vbdev, bdev) from g_bdev_names. This is required so that the
2113 	 * vbdev does not get re-created if the same bdev is constructed at some other time,
2114 	 * unless the underlying bdev was hot-removed.
2115 	 */
2116 	TAILQ_FOREACH(name, &g_bdev_names, link) {
2117 		if (strcmp(name->vbdev_name, bdev->name) == 0) {
2118 			TAILQ_REMOVE(&g_bdev_names, name, link);
2119 			free(name->bdev_name);
2120 			free(name->vbdev_name);
2121 			free(name->drv_name);
2122 			free(name->key);
2123 			free(name->key2);
2124 			free(name);
2125 			break;
2126 		}
2127 	}
2128 
2129 	/* Additional cleanup happens in the destruct callback. */
2130 	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
2131 }
2132 
2133 /* Because we specified this function in our crypto bdev function table when we
2134  * registered our crypto bdev, we'll get this call anytime a new bdev shows up.
2135  * Here we need to decide if we care about it and if so what to do. We
2136  * parsed the config file at init so we check the new bdev against the list
2137  * we built up at that time and if the user configured us to attach to this
2138  * bdev, here's where we do it.
2139  */
2140 static void
2141 vbdev_crypto_examine(struct spdk_bdev *bdev)
2142 {
2143 	vbdev_crypto_claim(spdk_bdev_get_name(bdev));
2144 	spdk_bdev_module_examine_done(&crypto_if);
2145 }
2146 
2147 SPDK_LOG_REGISTER_COMPONENT(vbdev_crypto)
2148