xref: /spdk/module/bdev/crypto/vbdev_crypto.c (revision 307b8c112ffd90a26d53dd15fad67bd9038ef526)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES.
5  *   All rights reserved.
6  */
7 
8 #include "vbdev_crypto.h"
9 
10 #include "spdk/env.h"
11 #include "spdk/likely.h"
12 #include "spdk/endian.h"
13 #include "spdk/thread.h"
14 #include "spdk/bdev_module.h"
15 #include "spdk/log.h"
16 #include "spdk/hexlify.h"
17 
18 #include <rte_config.h>
19 #include <rte_bus_vdev.h>
20 #include <rte_crypto.h>
21 #include <rte_cryptodev.h>
22 #include <rte_mbuf_dyn.h>
23 
24 /* Used to store IO context in mbuf */
25 static const struct rte_mbuf_dynfield rte_mbuf_dynfield_io_context = {
26 	.name = "context_bdev_io",
27 	.size = sizeof(uint64_t),
28 	.align = __alignof__(uint64_t),
29 	.flags = 0,
30 };
31 static int g_mbuf_offset;
32 
33 /* To add support for new device types, follow the examples below.
34  * Note that the string names are defined by the DPDK PMD in question, so be
35  * sure to use the exact names.
36  */
37 #define MAX_NUM_DRV_TYPES 3
38 
39 /* The VF spread is the number of queue pairs between virtual functions; we use this to
40  * load balance the QAT device.
41  */
42 #define QAT_VF_SPREAD 32
43 static uint8_t g_qat_total_qp = 0;
44 static uint8_t g_next_qat_index;
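/* g_qat_total_qp counts every QAT queue pair discovered at init time; g_next_qat_index is the
 * next qp index to hand out, advanced by QAT_VF_SPREAD in _assign_device_qp() below.
 */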
45 
46 const char *g_driver_names[MAX_NUM_DRV_TYPES] = { AESNI_MB, QAT, MLX5 };
47 
48 /* Global list of available crypto devices. */
49 struct vbdev_dev {
50 	struct rte_cryptodev_info	cdev_info;	/* includes device friendly name */
51 	uint8_t				cdev_id;	/* identifier for the device */
52 	TAILQ_ENTRY(vbdev_dev)		link;
53 };
54 static TAILQ_HEAD(, vbdev_dev) g_vbdev_devs = TAILQ_HEAD_INITIALIZER(g_vbdev_devs);
55 
56 /* Global list and lock for unique device/queue pair combos. We keep 1 list per supported PMD
57  * so that we can optimize per PMD where it makes sense. For example, with QAT there is an
58  * optimal pattern for assigning queue pairs, whereas with AESNI there is not.
59  */
60 struct device_qp {
61 	struct vbdev_dev		*device;	/* ptr to crypto device */
62 	uint8_t				qp;		/* queue pair for this node */
63 	bool				in_use;		/* whether this node is in use or not */
64 	uint8_t				index;		/* used by QAT to load balance placement of qpairs */
65 	TAILQ_ENTRY(device_qp)		link;
66 };
67 static TAILQ_HEAD(, device_qp) g_device_qp_qat = TAILQ_HEAD_INITIALIZER(g_device_qp_qat);
68 static TAILQ_HEAD(, device_qp) g_device_qp_aesni_mb = TAILQ_HEAD_INITIALIZER(g_device_qp_aesni_mb);
69 static TAILQ_HEAD(, device_qp) g_device_qp_mlx5 = TAILQ_HEAD_INITIALIZER(g_device_qp_mlx5);
70 static pthread_mutex_t g_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
71 
72 
73 /* Since we need one crypto operation per LBA (we use the LBA as the IV), we limit
74  * the resources required by telling the bdev layer that our max IO size is
75  * something reasonable. Units here are in bytes.
76  */
77 #define CRYPTO_MAX_IO		(64 * 1024)
78 
79 /* This controls how many ops will be dequeued from the crypto driver in one run
80  * of the poller. It is mainly a performance knob as it effectively determines how
81  * much work the poller has to do.  However even that can vary between crypto drivers
82  * as the AESNI_MB driver for example does all the crypto work on dequeue whereas the
83  * QAT driver just dequeues what has been completed already.
84  */
85 #define MAX_DEQUEUE_BURST_SIZE	64
86 
87 /* When enqueueing, we need to supply the crypto driver with an array of pointers to
88  * operation structs. As each of these can be max 512B, we can adjust the CRYPTO_MAX_IO
89  * value in conjunction with the other defines to make sure we're not using crazy amounts
90  * of memory. All of these numbers can and probably should be adjusted based on the
91  * workload. By default we'll use the worst case (smallest) block size for the
92  * minimum number of array entries. As an example, a CRYPTO_MAX_IO size of 64K with 512B
93  * blocks would give us an enqueue array size of 128.
94  */
95 #define MAX_ENQUEUE_ARRAY_SIZE (CRYPTO_MAX_IO / 512)
96 
97 /* The number of MBUFS we need must be a power of two; to support other small IOs
98  * in addition to the limits mentioned above, we go to the next power of two. It is
99  * a big number because one mempool serves both source and destination mbufs. It may
100  * need to be bigger to support multiple crypto drivers at once.
101  */
102 #define NUM_MBUFS		32768
103 #define POOL_CACHE_SIZE		256
104 #define MAX_CRYPTO_VOLUMES	128
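/* Each crypto bdev needs one encrypt and one decrypt session, hence 2 * MAX_CRYPTO_VOLUMES. */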
105 #define NUM_SESSIONS		(2 * MAX_CRYPTO_VOLUMES)
106 #define SESS_MEMPOOL_CACHE_SIZE 0
107 uint8_t g_number_of_claimed_volumes = 0;
108 
109 /* This is the max number of IOs we can supply to any crypto device QP at one time.
110  * It can vary between drivers.
111  */
112 #define CRYPTO_QP_DESCRIPTORS	2048
113 
114 /* At the moment, DPDK descriptor allocation for mlx5 has some issues. We use 512
115  * as a compromise value between performance and the time spent on initialization. */
116 #define CRYPTO_QP_DESCRIPTORS_MLX5	512
117 
118 #define AESNI_MB_NUM_QP		64
119 
120 /* Common for supported devices. */
121 #define DEFAULT_NUM_XFORMS           2
122 #define IV_OFFSET (sizeof(struct rte_crypto_op) + \
123                 sizeof(struct rte_crypto_sym_op) + \
124                 (DEFAULT_NUM_XFORMS * \
125                  sizeof(struct rte_crypto_sym_xform)))
126 #define IV_LENGTH		     16
127 #define QUEUED_OP_OFFSET (IV_OFFSET + IV_LENGTH)
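/* The offsets above describe the per-op layout used by this module: each rte_crypto_op is
 * followed by its rte_crypto_sym_op, two cipher xforms, a 16-byte IV at IV_OFFSET and a
 * struct vbdev_crypto_op at QUEUED_OP_OFFSET that is used to queue ops for re-submission.
 * The private data size passed to rte_crypto_op_pool_create() below reserves room for the
 * xforms, the IV and the queued op.
 */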
128 
129 static void _complete_internal_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
130 static void _complete_internal_read(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
131 static void _complete_internal_write(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg);
132 static void vbdev_crypto_examine(struct spdk_bdev *bdev);
133 static int vbdev_crypto_claim(const char *bdev_name);
134 static void vbdev_crypto_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
135 
136 struct bdev_names {
137 	struct vbdev_crypto_opts	*opts;
138 	TAILQ_ENTRY(bdev_names)		link;
139 };
140 
141 /* List of crypto_bdev names and their base bdevs via configuration file. */
142 static TAILQ_HEAD(, bdev_names) g_bdev_names = TAILQ_HEAD_INITIALIZER(g_bdev_names);
143 
144 struct vbdev_crypto {
145 	struct spdk_bdev		*base_bdev;		/* the thing we're attaching to */
146 	struct spdk_bdev_desc		*base_desc;		/* its descriptor we get from open */
147 	struct spdk_bdev		crypto_bdev;		/* the crypto virtual bdev */
148 	struct vbdev_crypto_opts	*opts;			/* crypto options such as key, cipher */
149 	uint32_t			qp_desc_nr;             /* number of qp descriptors */
150 	struct rte_cryptodev_sym_session *session_encrypt;	/* encryption session for this bdev */
151 	struct rte_cryptodev_sym_session *session_decrypt;	/* decryption session for this bdev */
152 	struct rte_crypto_sym_xform	cipher_xform;		/* crypto control struct for this bdev */
153 	TAILQ_ENTRY(vbdev_crypto)	link;
154 	struct spdk_thread		*thread;		/* thread where base device is opened */
155 };
156 
157 /* List of virtual bdevs and associated info for each. We keep the device friendly name here even
158  * though it's also in the device struct, because we use it early on.
159  */
160 static TAILQ_HEAD(, vbdev_crypto) g_vbdev_crypto = TAILQ_HEAD_INITIALIZER(g_vbdev_crypto);
161 
162 /* Shared mempools between all devices on this system */
163 static struct rte_mempool *g_session_mp = NULL;
164 static struct rte_mempool *g_session_mp_priv = NULL;
165 static struct rte_mempool *g_mbuf_mp = NULL;            /* mbuf mempool */
166 static struct rte_mempool *g_crypto_op_mp = NULL;	/* crypto operations, must be rte* mempool */
167 
168 static struct rte_mbuf_ext_shared_info g_shinfo = {};   /* used by DPDK mbuf macro */
169 
170 /* For queueing up crypto operations that we can't submit for some reason */
171 struct vbdev_crypto_op {
172 	uint8_t					cdev_id;
173 	uint8_t					qp;
174 	struct rte_crypto_op			*crypto_op;
175 	struct spdk_bdev_io			*bdev_io;
176 	TAILQ_ENTRY(vbdev_crypto_op)		link;
177 };
178 #define QUEUED_OP_LENGTH (sizeof(struct vbdev_crypto_op))
179 
180 /* The crypto vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
181  * We store things in here that are needed on per thread basis like the base_channel for this thread,
182  * and the poller for this thread.
183  */
184 struct crypto_io_channel {
185 	struct spdk_io_channel		*base_ch;		/* IO channel of base device */
186 	struct spdk_poller		*poller;		/* completion poller */
187 	struct device_qp		*device_qp;		/* unique device/qp combination for this channel */
188 	TAILQ_HEAD(, spdk_bdev_io)	pending_cry_ios;	/* outstanding operations to the crypto device */
189 	struct spdk_io_channel_iter	*iter;			/* used with for_each_channel in reset */
190 	TAILQ_HEAD(, vbdev_crypto_op)	queued_cry_ops;		/* queued for re-submission to CryptoDev */
191 };
192 
193 /* This is the per-IO crypto context that the bdev layer allocates for us opaquely and attaches to
194  * each IO.
195  */
196 struct crypto_bdev_io {
197 	int cryop_cnt_remaining;			/* counter used when completing crypto ops */
198 	struct crypto_io_channel *crypto_ch;		/* need to store for crypto completion handling */
199 	struct vbdev_crypto *crypto_bdev;		/* the crypto node struct associated with this IO */
200 	struct spdk_bdev_io *orig_io;			/* the original IO */
201 	struct spdk_bdev_io *read_io;			/* the read IO we issued */
202 	int8_t bdev_io_status;				/* the status we'll report back on the bdev IO */
203 	bool on_pending_list;
204 	/* Used for the single contiguous buffer that serves as the crypto destination target for writes */
205 	uint64_t aux_num_blocks;			/* num of blocks for the contiguous buffer */
206 	uint64_t aux_offset_blocks;			/* block offset on media */
207 	void *aux_buf_raw;				/* raw buffer that the bdev layer gave us for write buffer */
208 	struct iovec aux_buf_iov;			/* iov representing aligned contig write buffer */
209 
210 	/* for bdev_io_wait */
211 	struct spdk_bdev_io_wait_entry bdev_io_wait;
212 	struct spdk_io_channel *ch;
213 };
214 
215 /* Called by vbdev_crypto_init_crypto_drivers() to init each discovered crypto device */
216 static int
217 create_vbdev_dev(uint8_t index, uint16_t num_lcores)
218 {
219 	struct vbdev_dev *device;
220 	uint8_t j, cdev_id, cdrv_id;
221 	struct device_qp *dev_qp;
222 	struct device_qp *tmp_qp;
223 	uint32_t qp_desc_nr;
224 	int rc;
225 	TAILQ_HEAD(device_qps, device_qp) *dev_qp_head;
226 
227 	device = calloc(1, sizeof(struct vbdev_dev));
228 	if (!device) {
229 		return -ENOMEM;
230 	}
231 
232 	/* Get details about this device. */
233 	rte_cryptodev_info_get(index, &device->cdev_info);
234 	cdrv_id = device->cdev_info.driver_id;
235 	cdev_id = device->cdev_id = index;
236 
237 	/* QAT_ASYM devices are not supported at this time. */
238 	if (strcmp(device->cdev_info.driver_name, QAT_ASYM) == 0) {
239 		free(device);
240 		return 0;
241 	}
242 
243 	/* Before going any further, make sure we have enough resources for this
244 	 * device type to function.  We need a unique queue pair per core across each
245 	 * device type to remain lockless....
246 	 */
247 	if ((rte_cryptodev_device_count_by_driver(cdrv_id) *
248 	     device->cdev_info.max_nb_queue_pairs) < num_lcores) {
249 		SPDK_ERRLOG("Insufficient unique queue pairs available for %s\n",
250 			    device->cdev_info.driver_name);
251 		SPDK_ERRLOG("Either add more crypto devices or decrease core count\n");
252 		rc = -EINVAL;
253 		goto err;
254 	}
255 
256 	/* Setup queue pairs. */
257 	struct rte_cryptodev_config conf = {
258 		.nb_queue_pairs = device->cdev_info.max_nb_queue_pairs,
259 		.socket_id = SPDK_ENV_SOCKET_ID_ANY
260 	};
261 
262 	rc = rte_cryptodev_configure(cdev_id, &conf);
263 	if (rc < 0) {
264 		SPDK_ERRLOG("Failed to configure cryptodev %u: error %d\n",
265 			    cdev_id, rc);
266 		rc = -EINVAL;
267 		goto err;
268 	}
269 
270 	/* Select the right device/qp list based on driver name
271 	 * or error if it does not exist.
272 	 */
273 	if (strcmp(device->cdev_info.driver_name, QAT) == 0) {
274 		dev_qp_head = (struct device_qps *)&g_device_qp_qat;
275 		qp_desc_nr = CRYPTO_QP_DESCRIPTORS;
276 	} else if (strcmp(device->cdev_info.driver_name, AESNI_MB) == 0) {
277 		dev_qp_head = (struct device_qps *)&g_device_qp_aesni_mb;
278 		qp_desc_nr = CRYPTO_QP_DESCRIPTORS;
279 	} else if (strcmp(device->cdev_info.driver_name, MLX5) == 0) {
280 		dev_qp_head = (struct device_qps *)&g_device_qp_mlx5;
281 		qp_desc_nr = CRYPTO_QP_DESCRIPTORS_MLX5;
282 	} else {
283 		SPDK_ERRLOG("Failed to start device %u. Invalid driver name \"%s\"\n",
284 			    cdev_id, device->cdev_info.driver_name);
285 		rc = -EINVAL;
286 		goto err_qp_setup;
287 	}
288 
289 	struct rte_cryptodev_qp_conf qp_conf = {
290 		.nb_descriptors = qp_desc_nr,
291 		.mp_session = g_session_mp,
292 		.mp_session_private = g_session_mp_priv,
293 	};
294 
295 	/* Pre-setup all potential qpairs now and assign them in the channel
296 	 * callback. If we were to create them there, we'd have to stop the
297 	 * entire device, affecting all other threads that might be using it
298 	 * even on other queue pairs.
299 	 */
300 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
301 		rc = rte_cryptodev_queue_pair_setup(cdev_id, j, &qp_conf, SOCKET_ID_ANY);
302 		if (rc < 0) {
303 			SPDK_ERRLOG("Failed to setup queue pair %u on "
304 				    "cryptodev %u: error %d\n", j, cdev_id, rc);
305 			rc = -EINVAL;
306 			goto err_qp_setup;
307 		}
308 	}
309 
310 	rc = rte_cryptodev_start(cdev_id);
311 	if (rc < 0) {
312 		SPDK_ERRLOG("Failed to start device %u: error %d\n",
313 			    cdev_id, rc);
314 		rc = -EINVAL;
315 		goto err_dev_start;
316 	}
317 
318 	/* Build up lists of device/qp combinations per PMD */
319 	for (j = 0; j < device->cdev_info.max_nb_queue_pairs; j++) {
320 		dev_qp = calloc(1, sizeof(struct device_qp));
321 		if (!dev_qp) {
322 			rc = -ENOMEM;
323 			goto err_qp_alloc;
324 		}
325 		dev_qp->device = device;
326 		dev_qp->qp = j;
327 		dev_qp->in_use = false;
328 		if (strcmp(device->cdev_info.driver_name, QAT) == 0) {
329 			g_qat_total_qp++;
330 		}
331 		TAILQ_INSERT_TAIL(dev_qp_head, dev_qp, link);
332 	}
333 
334 	/* Add to our list of available crypto devices. */
335 	TAILQ_INSERT_TAIL(&g_vbdev_devs, device, link);
336 
337 	return 0;
338 err_qp_alloc:
339 	TAILQ_FOREACH_SAFE(dev_qp, dev_qp_head, link, tmp_qp) {
340 		if (dev_qp->device->cdev_id != device->cdev_id) {
341 			continue;
342 		}
343 		TAILQ_REMOVE(dev_qp_head, dev_qp, link);
344 		if (dev_qp_head == (struct device_qps *)&g_device_qp_qat) {
345 			g_qat_total_qp--;
346 		}
347 		free(dev_qp);
348 	}
349 	rte_cryptodev_stop(cdev_id);
350 err_dev_start:
351 err_qp_setup:
352 	rte_cryptodev_close(cdev_id);
353 err:
354 	free(device);
355 
356 	return rc;
357 }
358 
359 static void
360 release_vbdev_dev(struct vbdev_dev *device)
361 {
362 	struct device_qp *dev_qp;
363 	struct device_qp *tmp_qp;
364 	TAILQ_HEAD(device_qps, device_qp) *dev_qp_head = NULL;
365 
366 	assert(device);
367 
368 	/* Select the right device/qp list based on driver name. */
369 	if (strcmp(device->cdev_info.driver_name, QAT) == 0) {
370 		dev_qp_head = (struct device_qps *)&g_device_qp_qat;
371 	} else if (strcmp(device->cdev_info.driver_name, AESNI_MB) == 0) {
372 		dev_qp_head = (struct device_qps *)&g_device_qp_aesni_mb;
373 	} else if (strcmp(device->cdev_info.driver_name, MLX5) == 0) {
374 		dev_qp_head = (struct device_qps *)&g_device_qp_mlx5;
375 	}
376 	if (dev_qp_head) {
377 		TAILQ_FOREACH_SAFE(dev_qp, dev_qp_head, link, tmp_qp) {
378 			/* Remove only qps of our device even if the driver name matches. */
379 			if (dev_qp->device->cdev_id != device->cdev_id) {
380 				continue;
381 			}
382 			TAILQ_REMOVE(dev_qp_head, dev_qp, link);
383 			if (dev_qp_head == (struct device_qps *)&g_device_qp_qat) {
384 				g_qat_total_qp--;
385 			}
386 			free(dev_qp);
387 		}
388 	}
389 	rte_cryptodev_stop(device->cdev_id);
390 	rte_cryptodev_close(device->cdev_id);
391 	free(device);
392 }
393 
394 /* Dummy function used by DPDK to free ext attached buffers to mbufs; we free them ourselves, but
395  * this callback has to be here. */
396 static void
397 shinfo_free_cb(void *arg1, void *arg2)
398 {
399 }
400 
401 /* This is called from the module's init function. We set up all crypto devices early on, as we are unable
402  * to easily dynamically configure queue pairs after the drivers are up and running.  So, here, we
403  * configure the max capabilities of each device and assign threads to queue pairs as channels are
404  * requested.
405  */
406 static int
407 vbdev_crypto_init_crypto_drivers(void)
408 {
409 	uint8_t cdev_count;
410 	uint8_t cdev_id;
411 	int i, rc;
412 	struct vbdev_dev *device;
413 	struct vbdev_dev *tmp_dev;
414 	struct device_qp *dev_qp;
415 	unsigned int max_sess_size = 0, sess_size;
416 	uint16_t num_lcores = rte_lcore_count();
417 	char aesni_args[32];
418 
419 	/* Only the first call, via RPC or module init, should init the crypto drivers. */
420 	if (g_session_mp != NULL) {
421 		return 0;
422 	}
423 
424 	/* We always init AESNI_MB */
425 	snprintf(aesni_args, sizeof(aesni_args), "max_nb_queue_pairs=%d", AESNI_MB_NUM_QP);
426 	rc = rte_vdev_init(AESNI_MB, aesni_args);
427 	if (rc) {
428 		SPDK_NOTICELOG("Failed to create virtual PMD %s: error %d. "
429 			       "Possibly %s is not supported by DPDK library. "
430 			       "Keep going...\n", AESNI_MB, rc, AESNI_MB);
431 	}
432 
433 	/* If we have no crypto devices, there's no reason to continue. */
434 	cdev_count = rte_cryptodev_count();
435 	SPDK_NOTICELOG("Found crypto devices: %d\n", (int)cdev_count);
436 	if (cdev_count == 0) {
437 		return 0;
438 	}
439 
440 	g_mbuf_offset = rte_mbuf_dynfield_register(&rte_mbuf_dynfield_io_context);
441 	if (g_mbuf_offset < 0) {
442 		SPDK_ERRLOG("error registering dynamic field with DPDK\n");
443 		return -EINVAL;
444 	}
445 
446 	/*
447 	 * Create global mempools, shared by all devices regardless of type.
448 	 */
449 
450 	/* First determine the max session size; most pools are shared by all the devices,
451 	 * so we need to find the global max session size.
452 	 */
453 	for (cdev_id = 0; cdev_id < cdev_count; cdev_id++) {
454 		sess_size = rte_cryptodev_sym_get_private_session_size(cdev_id);
455 		if (sess_size > max_sess_size) {
456 			max_sess_size = sess_size;
457 		}
458 	}
459 
460 	g_session_mp_priv = rte_mempool_create("session_mp_priv", NUM_SESSIONS, max_sess_size,
461 					       SESS_MEMPOOL_CACHE_SIZE, 0, NULL, NULL, NULL,
462 					       NULL, SOCKET_ID_ANY, 0);
463 	if (g_session_mp_priv == NULL) {
464 		SPDK_ERRLOG("Cannot create private session pool max size 0x%x\n", max_sess_size);
465 		return -ENOMEM;
466 	}
467 
468 	g_session_mp = rte_cryptodev_sym_session_pool_create(
469 			       "session_mp",
470 			       NUM_SESSIONS, 0, SESS_MEMPOOL_CACHE_SIZE, 0,
471 			       SOCKET_ID_ANY);
472 	if (g_session_mp == NULL) {
473 		SPDK_ERRLOG("Cannot create session pool max size 0x%x\n", max_sess_size);
474 		rc = -ENOMEM;
475 		goto error_create_session_mp;
476 	}
477 
478 	g_mbuf_mp = rte_pktmbuf_pool_create("mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
479 					    0, 0, SPDK_ENV_SOCKET_ID_ANY);
480 	if (g_mbuf_mp == NULL) {
481 		SPDK_ERRLOG("Cannot create mbuf pool\n");
482 		rc = -ENOMEM;
483 		goto error_create_mbuf;
484 	}
485 
486 	/* We use per-op private data, as suggested by DPDK, to store the IV and
487 	 * our own struct for queueing ops.
488 	 */
489 	g_crypto_op_mp = rte_crypto_op_pool_create("op_mp",
490 			 RTE_CRYPTO_OP_TYPE_SYMMETRIC,
491 			 NUM_MBUFS,
492 			 POOL_CACHE_SIZE,
493 			 (DEFAULT_NUM_XFORMS *
494 			  sizeof(struct rte_crypto_sym_xform)) +
495 			 IV_LENGTH + QUEUED_OP_LENGTH,
496 			 rte_socket_id());
497 
498 	if (g_crypto_op_mp == NULL) {
499 		SPDK_ERRLOG("Cannot create op pool\n");
500 		rc = -ENOMEM;
501 		goto error_create_op;
502 	}
503 
504 	/* Init all devices */
505 	for (i = 0; i < cdev_count; i++) {
506 		rc = create_vbdev_dev(i, num_lcores);
507 		if (rc) {
508 			goto err;
509 		}
510 	}
511 
512 	/* Assign index values to the QAT device qp nodes so that we can
513 	 * assign them for optimal performance.
514 	 */
515 	i = 0;
516 	TAILQ_FOREACH(dev_qp, &g_device_qp_qat, link) {
517 		dev_qp->index = i++;
518 	}
519 
520 	g_shinfo.free_cb = shinfo_free_cb;
521 	return 0;
522 
523 	/* Error cleanup paths. */
524 err:
525 	TAILQ_FOREACH_SAFE(device, &g_vbdev_devs, link, tmp_dev) {
526 		TAILQ_REMOVE(&g_vbdev_devs, device, link);
527 		release_vbdev_dev(device);
528 	}
529 	rte_mempool_free(g_crypto_op_mp);
530 	g_crypto_op_mp = NULL;
531 error_create_op:
532 	rte_mempool_free(g_mbuf_mp);
533 	g_mbuf_mp = NULL;
534 error_create_mbuf:
535 	rte_mempool_free(g_session_mp);
536 	g_session_mp = NULL;
537 error_create_session_mp:
538 	if (g_session_mp_priv != NULL) {
539 		rte_mempool_free(g_session_mp_priv);
540 		g_session_mp_priv = NULL;
541 	}
542 	return rc;
543 }
544 
545 /* Following an encrypt or decrypt, we then need to either write the encrypted data or finish
546  * the read on the decrypted data. Do that here.
547  */
548 static void
549 _crypto_operation_complete(struct spdk_bdev_io *bdev_io)
550 {
551 	struct vbdev_crypto *crypto_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_crypto,
552 					   crypto_bdev);
553 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
554 	struct crypto_io_channel *crypto_ch = io_ctx->crypto_ch;
555 	struct spdk_bdev_io *free_me = io_ctx->read_io;
556 	int rc = 0;
557 
558 	/* This can also be called from crypto_dev_poller() to fail an IO whose re-enqueued ops got stuck. */
559 	if (io_ctx->on_pending_list) {
560 		TAILQ_REMOVE(&crypto_ch->pending_cry_ios, bdev_io, module_link);
561 		io_ctx->on_pending_list = false;
562 	}
563 
564 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
565 
566 		/* Complete the original IO and then free the one that we created
567 		 * as a result of issuing an IO via submit_request.
568 		 */
569 		if (io_ctx->bdev_io_status != SPDK_BDEV_IO_STATUS_FAILED) {
570 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
571 		} else {
572 			SPDK_ERRLOG("Issue with decryption on bdev_io %p\n", bdev_io);
573 			rc = -EINVAL;
574 		}
575 		spdk_bdev_free_io(free_me);
576 
577 	} else if (bdev_io->type == SPDK_BDEV_IO_TYPE_WRITE) {
578 
579 		if (io_ctx->bdev_io_status != SPDK_BDEV_IO_STATUS_FAILED) {
580 			/* Write the encrypted data. */
581 			rc = spdk_bdev_writev_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
582 						     &io_ctx->aux_buf_iov, 1, io_ctx->aux_offset_blocks,
583 						     io_ctx->aux_num_blocks, _complete_internal_write,
584 						     bdev_io);
585 		} else {
586 			SPDK_ERRLOG("Issue with encryption on bdev_io %p\n", bdev_io);
587 			rc = -EINVAL;
588 		}
589 
590 	} else {
591 		SPDK_ERRLOG("Unknown bdev type %u on crypto operation completion\n",
592 			    bdev_io->type);
593 		rc = -EINVAL;
594 	}
595 
596 	if (rc) {
597 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
598 	}
599 }
600 
601 static void
602 cancel_queued_crypto_ops(struct crypto_io_channel *crypto_ch, struct spdk_bdev_io *bdev_io)
603 {
604 	struct rte_mbuf *mbufs_to_free[2 * MAX_DEQUEUE_BURST_SIZE];
605 	struct rte_crypto_op *dequeued_ops[MAX_DEQUEUE_BURST_SIZE];
606 	struct vbdev_crypto_op *op_to_cancel, *tmp_op;
607 	struct rte_crypto_op *crypto_op;
608 	int num_mbufs, num_dequeued_ops;
609 
610 	/* Remove all ops from the failed IO. Since we don't know the
611 	 * order we have to check them all. */
612 	num_mbufs = 0;
613 	num_dequeued_ops = 0;
614 	TAILQ_FOREACH_SAFE(op_to_cancel, &crypto_ch->queued_cry_ops, link, tmp_op) {
615 		/* Checking if this is our op. One IO contains multiple ops. */
616 		if (bdev_io == op_to_cancel->bdev_io) {
617 			crypto_op = op_to_cancel->crypto_op;
618 			TAILQ_REMOVE(&crypto_ch->queued_cry_ops, op_to_cancel, link);
619 
620 			/* Populating lists for freeing mbufs and ops. */
621 			mbufs_to_free[num_mbufs++] = (void *)crypto_op->sym->m_src;
622 			if (crypto_op->sym->m_dst) {
623 				mbufs_to_free[num_mbufs++] = (void *)crypto_op->sym->m_dst;
624 			}
625 			dequeued_ops[num_dequeued_ops++] = crypto_op;
626 		}
627 	}
628 
629 	/* Now bulk free both mbufs and crypto operations. */
630 	if (num_dequeued_ops > 0) {
631 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)dequeued_ops,
632 				     num_dequeued_ops);
633 		assert(num_mbufs > 0);
634 		/* This also releases chained mbufs if any. */
635 		rte_pktmbuf_free_bulk(mbufs_to_free, num_mbufs);
636 	}
637 }
638 
639 static int _crypto_operation(struct spdk_bdev_io *bdev_io,
640 			     enum rte_crypto_cipher_operation crypto_op,
641 			     void *aux_buf);
642 
643 /* This is the poller for the crypto device. It uses a single API to dequeue whatever is ready at
644  * the device. Then we need to decide if what we've got so far (including previous poller
645  * runs) totals up to one or more complete bdev_ios and if so continue with the bdev_io
646  * accordingly. This means either completing a read or issuing a new write.
647  */
648 static int
649 crypto_dev_poller(void *args)
650 {
651 	struct crypto_io_channel *crypto_ch = args;
652 	uint8_t cdev_id = crypto_ch->device_qp->device->cdev_id;
653 	int i, num_dequeued_ops, num_enqueued_ops;
654 	struct spdk_bdev_io *bdev_io = NULL;
655 	struct crypto_bdev_io *io_ctx = NULL;
656 	struct rte_crypto_op *dequeued_ops[MAX_DEQUEUE_BURST_SIZE];
657 	struct rte_mbuf *mbufs_to_free[2 * MAX_DEQUEUE_BURST_SIZE];
658 	int num_mbufs = 0;
659 	struct vbdev_crypto_op *op_to_resubmit;
660 
661 	/* Each run of the poller will get just what the device has available
662 	 * at the moment we call it; we don't check again after draining the
663 	 * first batch.
664 	 */
665 	num_dequeued_ops = rte_cryptodev_dequeue_burst(cdev_id, crypto_ch->device_qp->qp,
666 			   dequeued_ops, MAX_DEQUEUE_BURST_SIZE);
667 
668 	/* Check if operation was processed successfully */
669 	for (i = 0; i < num_dequeued_ops; i++) {
670 
671 		/* We don't know the order or association of the crypto ops wrt any
672 		 * particular bdev_io, so we need to look at each one and determine if it's
673 		 * the last one for its bdev_io or not.
674 		 */
675 		bdev_io = (struct spdk_bdev_io *)*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src, g_mbuf_offset,
676 				uint64_t *);
677 		assert(bdev_io != NULL);
678 		io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
679 
680 		if (dequeued_ops[i]->status != RTE_CRYPTO_OP_STATUS_SUCCESS) {
681 			SPDK_ERRLOG("error with op %d status %u\n", i,
682 				    dequeued_ops[i]->status);
683 			/* Update the bdev status to error; we'll still process the
684 			 * rest of the crypto ops for this bdev_io, though, so they
685 			 * aren't left hanging.
686 			 */
687 			io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
688 		}
689 
690 		assert(io_ctx->cryop_cnt_remaining > 0);
691 
692 		/* Return the associated src and dst mbufs by collecting them into
693 		 * an array that we can use the bulk API to free after the loop.
694 		 */
695 		*RTE_MBUF_DYNFIELD(dequeued_ops[i]->sym->m_src, g_mbuf_offset, uint64_t *) = 0;
696 		mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_src;
697 		if (dequeued_ops[i]->sym->m_dst) {
698 			mbufs_to_free[num_mbufs++] = (void *)dequeued_ops[i]->sym->m_dst;
699 		}
700 
701 		/* Done with all crypto ops for this bdev_io, complete it. */
702 		if (--io_ctx->cryop_cnt_remaining == 0) {
703 
704 			/* If we're completing this with an outstanding reset we need
705 			 * to fail it.
706 			 */
707 			if (crypto_ch->iter) {
708 				io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
709 			}
710 
711 			/* Complete the IO */
712 			_crypto_operation_complete(bdev_io);
713 		}
714 	}
715 
716 	/* Now bulk free both mbufs and crypto operations. */
717 	if (num_dequeued_ops > 0) {
718 		rte_mempool_put_bulk(g_crypto_op_mp,
719 				     (void **)dequeued_ops,
720 				     num_dequeued_ops);
721 		assert(num_mbufs > 0);
722 		/* This also releases chained mbufs if any. */
723 		rte_pktmbuf_free_bulk(mbufs_to_free, num_mbufs);
724 	}
725 
726 	/* Check if there are any pending crypto ops to process */
727 	while (!TAILQ_EMPTY(&crypto_ch->queued_cry_ops)) {
728 		op_to_resubmit = TAILQ_FIRST(&crypto_ch->queued_cry_ops);
729 		bdev_io = op_to_resubmit->bdev_io;
730 		io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
731 		num_enqueued_ops = rte_cryptodev_enqueue_burst(op_to_resubmit->cdev_id,
732 				   op_to_resubmit->qp,
733 				   &op_to_resubmit->crypto_op,
734 				   1);
735 		if (num_enqueued_ops == 1) {
736 			/* Make sure we don't put this on the list twice, as one bdev_io is made up
737 			 * of many crypto ops.
738 			 */
739 			if (io_ctx->on_pending_list == false) {
740 				TAILQ_INSERT_TAIL(&crypto_ch->pending_cry_ios, bdev_io, module_link);
741 				io_ctx->on_pending_list = true;
742 			}
743 			TAILQ_REMOVE(&crypto_ch->queued_cry_ops, op_to_resubmit, link);
744 		} else {
745 			if (op_to_resubmit->crypto_op->status == RTE_CRYPTO_OP_STATUS_NOT_PROCESSED) {
746 				/* If we couldn't get one, just break and try again later. */
747 				break;
748 			} else {
749 				/* Something is really wrong with the op. Most probably the
750 				 * mbuf is broken or the HW is not able to process the request.
751 				 * Fail the IO and remove its ops from the queued ops list. */
752 				io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
753 
754 				cancel_queued_crypto_ops(crypto_ch, bdev_io);
755 
756 				/* Fail the IO if there is nothing left on device. */
757 				if (--io_ctx->cryop_cnt_remaining == 0) {
758 					_crypto_operation_complete(bdev_io);
759 				}
760 			}
761 
762 		}
763 	}
764 
765 	/* If the channel iter is not NULL, we need to continue to poll
766 	 * until the pending list is empty, then we can move on to the
767 	 * next channel.
768 	 */
769 	if (crypto_ch->iter && TAILQ_EMPTY(&crypto_ch->pending_cry_ios)) {
770 		SPDK_NOTICELOG("Channel %p has been quiesced.\n", crypto_ch);
771 		spdk_for_each_channel_continue(crypto_ch->iter, 0);
772 		crypto_ch->iter = NULL;
773 	}
774 
775 	return num_dequeued_ops;
776 }
777 
778 /* Allocate a new mbuf of @remainder size with data pointed to by @addr and attach
779  * it to the @orig_mbuf. */
780 static int
781 mbuf_chain_remainder(struct spdk_bdev_io *bdev_io, struct rte_mbuf *orig_mbuf,
782 		     uint8_t *addr, uint32_t remainder)
783 {
784 	uint64_t phys_addr, phys_len;
785 	struct rte_mbuf *chain_mbuf;
786 	int rc;
787 
788 	phys_len = remainder;
789 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
790 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR || phys_len != remainder)) {
791 		return -EFAULT;
792 	}
793 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&chain_mbuf, 1);
794 	if (spdk_unlikely(rc)) {
795 		return -ENOMEM;
796 	}
797 	/* Store context in every mbuf as we don't know anything about completion order */
798 	*RTE_MBUF_DYNFIELD(chain_mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)bdev_io;
799 	rte_pktmbuf_attach_extbuf(chain_mbuf, addr, phys_addr, phys_len, &g_shinfo);
800 	rte_pktmbuf_append(chain_mbuf, phys_len);
801 
802 	/* The chained buffer is released by rte_pktmbuf_free_bulk() automagically. */
803 	rte_pktmbuf_chain(orig_mbuf, chain_mbuf);
804 	return 0;
805 }
806 
807 /* Attach the data buffer pointed to by @addr to @mbuf. Return the utilized length of the
808  * contiguous space that was physically available. */
809 static uint64_t
810 mbuf_attach_buf(struct spdk_bdev_io *bdev_io, struct rte_mbuf *mbuf,
811 		uint8_t *addr, uint32_t len)
812 {
813 	uint64_t phys_addr, phys_len;
814 
815 	/* Store context in every mbuf as we don't know anything about completion order */
816 	*RTE_MBUF_DYNFIELD(mbuf, g_mbuf_offset, uint64_t *) = (uint64_t)bdev_io;
817 
818 	phys_len = len;
819 	phys_addr = spdk_vtophys((void *)addr, &phys_len);
820 	if (spdk_unlikely(phys_addr == SPDK_VTOPHYS_ERROR || phys_len == 0)) {
821 		return 0;
822 	}
823 	assert(phys_len <= len);
824 
825 	/* Set the mbuf elements address and length. */
826 	rte_pktmbuf_attach_extbuf(mbuf, addr, phys_addr, phys_len, &g_shinfo);
827 	rte_pktmbuf_append(mbuf, phys_len);
828 
829 	return phys_len;
830 }
831 
832 /* We're either encrypting on the way down or decrypting on the way back. */
833 static int
834 _crypto_operation(struct spdk_bdev_io *bdev_io, enum rte_crypto_cipher_operation crypto_op,
835 		  void *aux_buf)
836 {
837 	uint16_t num_enqueued_ops = 0;
838 	uint32_t cryop_cnt = bdev_io->u.bdev.num_blocks;
839 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
840 	struct crypto_io_channel *crypto_ch = io_ctx->crypto_ch;
841 	uint8_t cdev_id = crypto_ch->device_qp->device->cdev_id;
842 	uint32_t crypto_len = io_ctx->crypto_bdev->crypto_bdev.blocklen;
843 	uint64_t total_length = bdev_io->u.bdev.num_blocks * crypto_len;
844 	int rc;
845 	uint32_t iov_index = 0;
846 	uint32_t allocated = 0;
847 	uint8_t *current_iov = NULL;
848 	uint64_t total_remaining = 0;
849 	uint64_t current_iov_remaining = 0;
850 	uint32_t crypto_index = 0;
851 	uint32_t en_offset = 0;
852 	struct rte_crypto_op *crypto_ops[MAX_ENQUEUE_ARRAY_SIZE];
853 	struct rte_mbuf *src_mbufs[MAX_ENQUEUE_ARRAY_SIZE];
854 	struct rte_mbuf *dst_mbufs[MAX_ENQUEUE_ARRAY_SIZE];
855 	int burst;
856 	struct vbdev_crypto_op *op_to_queue;
857 	uint64_t alignment = spdk_bdev_get_buf_align(&io_ctx->crypto_bdev->crypto_bdev);
858 
859 	assert((bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen) <= CRYPTO_MAX_IO);
860 
861 	/* Get the number of source mbufs that we need. These will always be 1:1 because we
862 	 * don't support chaining. The reason we don't is because of our decision to use
863 	 * LBA as IV; there can be no case where we'd need >1 mbuf per crypto op or the
864 	 * op would be > 1 LBA.
865 	 */
866 	rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, src_mbufs, cryop_cnt);
867 	if (rc) {
868 		SPDK_ERRLOG("Failed to get src_mbufs!\n");
869 		return -ENOMEM;
870 	}
871 
872 	/* Get the same amount but these buffers to describe the encrypted data location (dst). */
873 	if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
874 		rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, dst_mbufs, cryop_cnt);
875 		if (rc) {
876 			SPDK_ERRLOG("Failed to get dst_mbufs!\n");
877 			rc = -ENOMEM;
878 			goto error_get_dst;
879 		}
880 	}
881 
882 #ifdef __clang_analyzer__
883 	/* silence scan-build false positive */
884 	SPDK_CLANG_ANALYZER_PREINIT_PTR_ARRAY(crypto_ops, MAX_ENQUEUE_ARRAY_SIZE, 0x1000);
885 #endif
886 	/* Allocate crypto operations. */
887 	allocated = rte_crypto_op_bulk_alloc(g_crypto_op_mp,
888 					     RTE_CRYPTO_OP_TYPE_SYMMETRIC,
889 					     crypto_ops, cryop_cnt);
890 	if (allocated < cryop_cnt) {
891 		SPDK_ERRLOG("Failed to allocate crypto ops!\n");
892 		rc = -ENOMEM;
893 		goto error_get_ops;
894 	}
895 
896 	/* For encryption, we need to prepare a single contiguous buffer as the encryption
897 	 * destination; we'll then pass that along for the write after encryption is done.
898 	 * This is done to avoid encrypting the provided write buffer, which may be
899 	 * undesirable in some use cases.
900 	 */
901 	if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
902 		io_ctx->aux_buf_iov.iov_len = total_length;
903 		io_ctx->aux_buf_raw = aux_buf;
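		/* Round the raw aux buffer up to the bdev's required buffer alignment. */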
904 		io_ctx->aux_buf_iov.iov_base  = (void *)(((uintptr_t)aux_buf + (alignment - 1)) & ~(alignment - 1));
905 		io_ctx->aux_offset_blocks = bdev_io->u.bdev.offset_blocks;
906 		io_ctx->aux_num_blocks = bdev_io->u.bdev.num_blocks;
907 	}
908 
909 	/* This value is used in the completion callback to determine when the bdev_io is
910 	 * complete.
911 	 */
912 	io_ctx->cryop_cnt_remaining = cryop_cnt;
913 
914 	/* As we don't support chaining because of a decision to use LBA as IV, construction
915 	 * of crypto operations is straightforward. We build both the op, the mbuf and the
916 	 * dst_mbuf in our local arrays by looping through the length of the bdev IO and
917 	 * picking off LBA sized blocks of memory from the IOVs as we walk through them. Each
918 	 * LBA sized chunk of memory will correspond 1:1 to a crypto operation and a single
919 	 * mbuf per crypto operation.
920 	 */
921 	total_remaining = total_length;
922 	current_iov = bdev_io->u.bdev.iovs[iov_index].iov_base;
923 	current_iov_remaining = bdev_io->u.bdev.iovs[iov_index].iov_len;
924 	do {
925 		uint8_t *iv_ptr;
926 		uint8_t *buf_addr;
927 		uint64_t phys_len;
928 		uint32_t remainder;
929 		uint64_t op_block_offset;
930 
931 		phys_len = mbuf_attach_buf(bdev_io, src_mbufs[crypto_index],
932 					   current_iov, crypto_len);
933 		if (spdk_unlikely(phys_len == 0)) {
934 			rc = -EFAULT;
935 			goto error_attach_session;
936 		}
937 
938 		/* Handle the case of page boundary. */
939 		remainder = crypto_len - phys_len;
940 		if (spdk_unlikely(remainder > 0)) {
941 			rc = mbuf_chain_remainder(bdev_io, src_mbufs[crypto_index],
942 						  current_iov + phys_len, remainder);
943 			if (spdk_unlikely(rc)) {
944 				goto error_attach_session;
945 			}
946 		}
947 
948 		/* Set the IV - we use the LBA of the crypto_op */
949 		iv_ptr = rte_crypto_op_ctod_offset(crypto_ops[crypto_index], uint8_t *,
950 						   IV_OFFSET);
951 		memset(iv_ptr, 0, IV_LENGTH);
952 		op_block_offset = bdev_io->u.bdev.offset_blocks + crypto_index;
953 		rte_memcpy(iv_ptr, &op_block_offset, sizeof(uint64_t));
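		/* The resulting IV is the 8-byte LBA in host byte order, zero-padded to IV_LENGTH bytes. */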
954 
955 		/* Set the data to encrypt/decrypt length */
956 		crypto_ops[crypto_index]->sym->cipher.data.length = crypto_len;
957 		crypto_ops[crypto_index]->sym->cipher.data.offset = 0;
958 
959 		/* link the mbuf to the crypto op. */
960 		crypto_ops[crypto_index]->sym->m_src = src_mbufs[crypto_index];
961 
962 		/* For encrypt, point the destination to a buffer we allocate and redirect the bdev_io
963 		 * that will be used to process the write on completion to the same buffer. Setting
964 		 * up the en_buffer is a little simpler as we know the destination buffer is a single IOV.
965 		 */
966 		if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
967 			buf_addr = io_ctx->aux_buf_iov.iov_base + en_offset;
968 			phys_len = mbuf_attach_buf(bdev_io, dst_mbufs[crypto_index],
969 						   buf_addr, crypto_len);
970 			if (spdk_unlikely(phys_len == 0)) {
971 				rc = -EFAULT;
972 				goto error_attach_session;
973 			}
974 
975 			crypto_ops[crypto_index]->sym->m_dst = dst_mbufs[crypto_index];
976 			en_offset += phys_len;
977 
978 			/* Handle the case of page boundary. */
979 			remainder = crypto_len - phys_len;
980 			if (spdk_unlikely(remainder > 0)) {
981 				rc = mbuf_chain_remainder(bdev_io, dst_mbufs[crypto_index],
982 							  buf_addr + phys_len, remainder);
983 				if (spdk_unlikely(rc)) {
984 					goto error_attach_session;
985 				}
986 				en_offset += remainder;
987 			}
988 
989 			/* Attach the crypto session to the operation */
990 			rc = rte_crypto_op_attach_sym_session(crypto_ops[crypto_index],
991 							      io_ctx->crypto_bdev->session_encrypt);
992 			if (rc) {
993 				rc = -EINVAL;
994 				goto error_attach_session;
995 			}
996 		} else {
997 			crypto_ops[crypto_index]->sym->m_dst = NULL;
998 
999 			/* Attach the crypto session to the operation */
1000 			rc = rte_crypto_op_attach_sym_session(crypto_ops[crypto_index],
1001 							      io_ctx->crypto_bdev->session_decrypt);
1002 			if (rc) {
1003 				rc = -EINVAL;
1004 				goto error_attach_session;
1005 			}
1006 		}
1007 
1008 		/* Subtract our running totals for the op in progress and the overall bdev io */
1009 		total_remaining -= crypto_len;
1010 		current_iov_remaining -= crypto_len;
1011 
1012 		/* move our current IOV pointer accordingly. */
1013 		current_iov += crypto_len;
1014 
1015 		/* move on to the next crypto operation */
1016 		crypto_index++;
1017 
1018 		/* If we're done with this IOV, move to the next one. */
1019 		if (current_iov_remaining == 0 && total_remaining > 0) {
1020 			iov_index++;
1021 			current_iov = bdev_io->u.bdev.iovs[iov_index].iov_base;
1022 			current_iov_remaining = bdev_io->u.bdev.iovs[iov_index].iov_len;
1023 		}
1024 	} while (total_remaining > 0);
1025 
1026 	/* Enqueue everything we've got but limit by the max number of descriptors we
1027 	 * configured the crypto device for.
1028 	 */
1029 	burst = spdk_min(cryop_cnt, io_ctx->crypto_bdev->qp_desc_nr);
1030 	num_enqueued_ops = rte_cryptodev_enqueue_burst(cdev_id, crypto_ch->device_qp->qp,
1031 			   &crypto_ops[0],
1032 			   burst);
1033 
1034 	/* Add this bdev_io to our outstanding list if any of its crypto ops made it. */
1035 	if (num_enqueued_ops > 0) {
1036 		TAILQ_INSERT_TAIL(&crypto_ch->pending_cry_ios, bdev_io, module_link);
1037 		io_ctx->on_pending_list = true;
1038 	}
1039 	/* If we were unable to enqueue everything, we need to decide what to do based
1040 	 * on the status of the first op that didn't make it.
1041 	 */
1042 	if (num_enqueued_ops < cryop_cnt) {
1043 		switch (crypto_ops[num_enqueued_ops]->status) {
1044 		case RTE_CRYPTO_OP_STATUS_NOT_PROCESSED:
1045 			/* Queue them up on a linked list to be resubmitted via the poller. */
1046 			for (crypto_index = num_enqueued_ops; crypto_index < cryop_cnt; crypto_index++) {
1047 				op_to_queue = (struct vbdev_crypto_op *)rte_crypto_op_ctod_offset(crypto_ops[crypto_index],
1048 						uint8_t *, QUEUED_OP_OFFSET);
1049 				op_to_queue->cdev_id = cdev_id;
1050 				op_to_queue->qp = crypto_ch->device_qp->qp;
1051 				op_to_queue->crypto_op = crypto_ops[crypto_index];
1052 				op_to_queue->bdev_io = bdev_io;
1053 				TAILQ_INSERT_TAIL(&crypto_ch->queued_cry_ops,
1054 						  op_to_queue,
1055 						  link);
1056 			}
1057 			break;
1058 		default:
1059 			/* For all other statuses, set the io_ctx bdev_io status so that
1060 			 * the poller will pick the failure up for the overall bdev status.
1061 			 */
1062 			io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_FAILED;
1063 			if (num_enqueued_ops == 0) {
1064 				/* If nothing was enqueued and the failure wasn't because the device
1065 				 * was busy, fail the IO now as the poller won't know anything about it.
1066 				 */
1067 				rc = -EINVAL;
1068 				goto error_attach_session;
1069 			}
1070 			break;
1071 		}
1072 	}
1073 
1074 	return rc;
1075 
1076 	/* Error cleanup paths. */
1077 error_attach_session:
1078 error_get_ops:
1079 	if (crypto_op == RTE_CRYPTO_CIPHER_OP_ENCRYPT) {
1080 		/* This also releases chained mbufs if any. */
1081 		rte_pktmbuf_free_bulk(dst_mbufs, cryop_cnt);
1082 	}
1083 	if (allocated > 0) {
1084 		rte_mempool_put_bulk(g_crypto_op_mp, (void **)crypto_ops,
1085 				     allocated);
1086 	}
1087 error_get_dst:
1088 	/* This also releases chained mbufs if any. */
1089 	rte_pktmbuf_free_bulk(src_mbufs, cryop_cnt);
1090 	return rc;
1091 }
1092 
1093 /* This function is called after all channels have been quiesced following
1094  * a bdev reset.
1095  */
1096 static void
1097 _ch_quiesce_done(struct spdk_io_channel_iter *i, int status)
1098 {
1099 	struct crypto_bdev_io *io_ctx = spdk_io_channel_iter_get_ctx(i);
1100 
1101 	assert(TAILQ_EMPTY(&io_ctx->crypto_ch->pending_cry_ios));
1102 	assert(io_ctx->orig_io != NULL);
1103 
1104 	spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
1105 }
1106 
1107 /* This function is called per channel to quiesce IOs before completing a
1108  * bdev reset that we received.
1109  */
1110 static void
1111 _ch_quiesce(struct spdk_io_channel_iter *i)
1112 {
1113 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1114 	struct crypto_io_channel *crypto_ch = spdk_io_channel_get_ctx(ch);
1115 
1116 	crypto_ch->iter = i;
1117 	/* When the poller runs, it will see the non-NULL iter and handle
1118 	 * the quiesce.
1119 	 */
1120 }
1121 
1122 /* Completion callback for IOs issued from this bdev other than read/write.
1123  * Those have their own callbacks for readability.
1124  */
1125 static void
1126 _complete_internal_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1127 {
1128 	struct spdk_bdev_io *orig_io = cb_arg;
1129 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1130 
1131 	if (bdev_io->type == SPDK_BDEV_IO_TYPE_RESET) {
1132 		struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
1133 
1134 		assert(orig_io == orig_ctx->orig_io);
1135 
1136 		spdk_bdev_free_io(bdev_io);
1137 
1138 		spdk_for_each_channel(orig_ctx->crypto_bdev,
1139 				      _ch_quiesce,
1140 				      orig_ctx,
1141 				      _ch_quiesce_done);
1142 		return;
1143 	}
1144 
1145 	spdk_bdev_io_complete(orig_io, status);
1146 	spdk_bdev_free_io(bdev_io);
1147 }
1148 
1149 /* Completion callback for writes that were issued from this bdev. */
1150 static void
1151 _complete_internal_write(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1152 {
1153 	struct spdk_bdev_io *orig_io = cb_arg;
1154 	int status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
1155 	struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
1156 
1157 	spdk_bdev_io_put_aux_buf(orig_io, orig_ctx->aux_buf_raw);
1158 
1159 	spdk_bdev_io_complete(orig_io, status);
1160 	spdk_bdev_free_io(bdev_io);
1161 }
1162 
1163 /* Completion callback for reads that were issued from this bdev. */
1164 static void
1165 _complete_internal_read(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
1166 {
1167 	struct spdk_bdev_io *orig_io = cb_arg;
1168 	struct crypto_bdev_io *orig_ctx = (struct crypto_bdev_io *)orig_io->driver_ctx;
1169 
1170 	if (success) {
1171 
1172 		/* Save off this bdev_io so it can be freed after decryption. */
1173 		orig_ctx->read_io = bdev_io;
1174 
1175 		if (!_crypto_operation(orig_io, RTE_CRYPTO_CIPHER_OP_DECRYPT, NULL)) {
1176 			return;
1177 		} else {
1178 			SPDK_ERRLOG("Failed to decrypt!\n");
1179 		}
1180 	} else {
1181 		SPDK_ERRLOG("Failed to read prior to decrypting!\n");
1182 	}
1183 
1184 	spdk_bdev_io_complete(orig_io, SPDK_BDEV_IO_STATUS_FAILED);
1185 	spdk_bdev_free_io(bdev_io);
1186 }
1187 
1188 static void
1189 vbdev_crypto_resubmit_io(void *arg)
1190 {
1191 	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
1192 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1193 
1194 	vbdev_crypto_submit_request(io_ctx->ch, bdev_io);
1195 }
1196 
1197 static void
1198 vbdev_crypto_queue_io(struct spdk_bdev_io *bdev_io)
1199 {
1200 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1201 	int rc;
1202 
1203 	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
1204 	io_ctx->bdev_io_wait.cb_fn = vbdev_crypto_resubmit_io;
1205 	io_ctx->bdev_io_wait.cb_arg = bdev_io;
1206 
1207 	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->crypto_ch->base_ch, &io_ctx->bdev_io_wait);
1208 	if (rc != 0) {
1209 		SPDK_ERRLOG("Queue io failed in vbdev_crypto_queue_io, rc=%d.\n", rc);
1210 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1211 	}
1212 }
1213 
1214 /* Callback for getting a buf from the bdev pool in the event that the caller passed
1215  * in NULL; we need to own the buffer so it doesn't get freed by another vbdev module
1216  * beneath us before we're done with it.
1217  */
1218 static void
1219 crypto_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
1220 		       bool success)
1221 {
1222 	struct vbdev_crypto *crypto_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_crypto,
1223 					   crypto_bdev);
1224 	struct crypto_io_channel *crypto_ch = spdk_io_channel_get_ctx(ch);
1225 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1226 	int rc;
1227 
1228 	if (!success) {
1229 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1230 		return;
1231 	}
1232 
1233 	rc = spdk_bdev_readv_blocks(crypto_bdev->base_desc, crypto_ch->base_ch, bdev_io->u.bdev.iovs,
1234 				    bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks,
1235 				    bdev_io->u.bdev.num_blocks, _complete_internal_read,
1236 				    bdev_io);
1237 	if (rc != 0) {
1238 		if (rc == -ENOMEM) {
1239 			SPDK_DEBUGLOG(vbdev_crypto, "No memory, queue the IO.\n");
1240 			io_ctx->ch = ch;
1241 			vbdev_crypto_queue_io(bdev_io);
1242 		} else {
1243 			SPDK_ERRLOG("Failed to submit bdev_io!\n");
1244 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1245 		}
1246 	}
1247 }
1248 
1249 /* For encryption we don't want to encrypt the data in place as the host isn't
1250  * expecting us to mangle its data buffers, so we need to encrypt into the bdev
1251  * aux buffer; then we can use that as the source for the disk data transfer.
1252  */
1253 static void
1254 crypto_write_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
1255 			void *aux_buf)
1256 {
1257 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1258 	int rc = 0;
1259 
1260 	rc = _crypto_operation(bdev_io, RTE_CRYPTO_CIPHER_OP_ENCRYPT, aux_buf);
1261 	if (rc != 0) {
1262 		spdk_bdev_io_put_aux_buf(bdev_io, aux_buf);
1263 		if (rc == -ENOMEM) {
1264 			SPDK_DEBUGLOG(vbdev_crypto, "No memory, queue the IO.\n");
1265 			io_ctx->ch = ch;
1266 			vbdev_crypto_queue_io(bdev_io);
1267 		} else {
1268 			SPDK_ERRLOG("Failed to submit bdev_io!\n");
1269 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1270 		}
1271 	}
1272 }
1273 
1274 /* Called when someone submits IO to this crypto vbdev. For IOs not relevant to crypto,
1275  * we're simply passing it on here via SPDK IO calls which in turn allocate another bdev IO
1276  * and call our cpl callback provided below along with the original bdev_io so that we can
1277  * complete it once this IO completes. For crypto operations, we'll either encrypt it first
1278  * (writes) then call back into bdev to submit it or we'll submit a read and then catch it
1279  * on the way back for decryption.
1280  */
1281 static void
1282 vbdev_crypto_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
1283 {
1284 	struct vbdev_crypto *crypto_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_crypto,
1285 					   crypto_bdev);
1286 	struct crypto_io_channel *crypto_ch = spdk_io_channel_get_ctx(ch);
1287 	struct crypto_bdev_io *io_ctx = (struct crypto_bdev_io *)bdev_io->driver_ctx;
1288 	int rc = 0;
1289 
1290 	memset(io_ctx, 0, sizeof(struct crypto_bdev_io));
1291 	io_ctx->crypto_bdev = crypto_bdev;
1292 	io_ctx->crypto_ch = crypto_ch;
1293 	io_ctx->orig_io = bdev_io;
1294 	io_ctx->bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
1295 
1296 	switch (bdev_io->type) {
1297 	case SPDK_BDEV_IO_TYPE_READ:
1298 		spdk_bdev_io_get_buf(bdev_io, crypto_read_get_buf_cb,
1299 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
1300 		break;
1301 	case SPDK_BDEV_IO_TYPE_WRITE:
1302 		/* Tell the bdev layer that we need an aux buf in addition to the data
1303 		 * buf already associated with the bdev.
1304 		 */
1305 		spdk_bdev_io_get_aux_buf(bdev_io, crypto_write_get_buf_cb);
1306 		break;
1307 	case SPDK_BDEV_IO_TYPE_UNMAP:
1308 		rc = spdk_bdev_unmap_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
1309 					    bdev_io->u.bdev.offset_blocks,
1310 					    bdev_io->u.bdev.num_blocks,
1311 					    _complete_internal_io, bdev_io);
1312 		break;
1313 	case SPDK_BDEV_IO_TYPE_FLUSH:
1314 		rc = spdk_bdev_flush_blocks(crypto_bdev->base_desc, crypto_ch->base_ch,
1315 					    bdev_io->u.bdev.offset_blocks,
1316 					    bdev_io->u.bdev.num_blocks,
1317 					    _complete_internal_io, bdev_io);
1318 		break;
1319 	case SPDK_BDEV_IO_TYPE_RESET:
1320 		rc = spdk_bdev_reset(crypto_bdev->base_desc, crypto_ch->base_ch,
1321 				     _complete_internal_io, bdev_io);
1322 		break;
1323 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1324 	default:
1325 		SPDK_ERRLOG("crypto: unknown I/O type %d\n", bdev_io->type);
1326 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1327 		return;
1328 	}
1329 
1330 	if (rc != 0) {
1331 		if (rc == -ENOMEM) {
1332 			SPDK_DEBUGLOG(vbdev_crypto, "No memory, queue the IO.\n");
1333 			io_ctx->ch = ch;
1334 			vbdev_crypto_queue_io(bdev_io);
1335 		} else {
1336 			SPDK_ERRLOG("Failed to submit bdev_io!\n");
1337 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
1338 		}
1339 	}
1340 }
1341 
1342 /* We'll just call the base bdev and let it answer, except for the WZ command, which
1343  * we always say we don't support so that the bdev layer will actually send us
1344  * real writes that we can encrypt.
1345  */
1346 static bool
1347 vbdev_crypto_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1348 {
1349 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1350 
1351 	switch (io_type) {
1352 	case SPDK_BDEV_IO_TYPE_WRITE:
1353 	case SPDK_BDEV_IO_TYPE_UNMAP:
1354 	case SPDK_BDEV_IO_TYPE_RESET:
1355 	case SPDK_BDEV_IO_TYPE_READ:
1356 	case SPDK_BDEV_IO_TYPE_FLUSH:
1357 		return spdk_bdev_io_type_supported(crypto_bdev->base_bdev, io_type);
1358 	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
1359 	/* Force the bdev layer to issue actual writes of zeroes so we can
1360 	 * encrypt them as regular writes.
1361 	 */
1362 	default:
1363 		return false;
1364 	}
1365 }
1366 
1367 /* Callback for unregistering the IO device. */
1368 static void
1369 _device_unregister_cb(void *io_device)
1370 {
1371 	struct vbdev_crypto *crypto_bdev = io_device;
1372 
1373 	/* Done with this crypto_bdev. */
1374 	rte_cryptodev_sym_session_free(crypto_bdev->session_decrypt);
1375 	rte_cryptodev_sym_session_free(crypto_bdev->session_encrypt);
1376 	crypto_bdev->opts = NULL;
1377 	free(crypto_bdev->crypto_bdev.name);
1378 	free(crypto_bdev);
1379 }
1380 
1381 /* Wrapper for the bdev close operation. */
1382 static void
1383 _vbdev_crypto_destruct(void *ctx)
1384 {
1385 	struct spdk_bdev_desc *desc = ctx;
1386 
1387 	spdk_bdev_close(desc);
1388 }
1389 
1390 /* Called after we've unregistered following a hot remove callback.
1391  * Our finish entry point will be called next.
1392  */
1393 static int
1394 vbdev_crypto_destruct(void *ctx)
1395 {
1396 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1397 
1398 	/* Remove this device from the internal list */
1399 	TAILQ_REMOVE(&g_vbdev_crypto, crypto_bdev, link);
1400 
1401 	/* Unclaim the underlying bdev. */
1402 	spdk_bdev_module_release_bdev(crypto_bdev->base_bdev);
1403 
1404 	/* Close the underlying bdev on the same thread on which it was opened. */
1405 	if (crypto_bdev->thread && crypto_bdev->thread != spdk_get_thread()) {
1406 		spdk_thread_send_msg(crypto_bdev->thread, _vbdev_crypto_destruct, crypto_bdev->base_desc);
1407 	} else {
1408 		spdk_bdev_close(crypto_bdev->base_desc);
1409 	}
1410 
1411 	/* Unregister the io_device. */
1412 	spdk_io_device_unregister(crypto_bdev, _device_unregister_cb);
1413 
1414 	g_number_of_claimed_volumes--;
1415 
1416 	return 0;
1417 }
1418 
1419 /* We supplied this as an entry point for upper layers that want to communicate with this
1420  * bdev.  This is how they get a channel. We are passed the same context we provided when
1421  * we created our crypto vbdev in examine() which, for this bdev, is the address of one of
1422  * our context nodes. From here we'll ask the SPDK channel code to fill out our channel
1423  * struct and we'll keep it in our crypto node.
1424  */
1425 static struct spdk_io_channel *
1426 vbdev_crypto_get_io_channel(void *ctx)
1427 {
1428 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1429 
1430 	/* The IO channel code will allocate a channel for us which consists of
1431 	 * the SPDK channel structure plus the size of our crypto_io_channel struct
1432 	 * that we passed in when we registered our IO device. It will then call
1433 	 * our channel create callback to populate any elements that we need to
1434 	 * update.
1435 	 */
1436 	return spdk_get_io_channel(crypto_bdev);
1437 }
1438 
1439 /* This is the output for bdev_get_bdevs() for this vbdev */
1440 static int
1441 vbdev_crypto_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1442 {
1443 	struct vbdev_crypto *crypto_bdev = (struct vbdev_crypto *)ctx;
1444 	char *hexkey = NULL, *hexkey2 = NULL;
1445 	int rc = 0;
1446 
1447 	hexkey = spdk_hexlify(crypto_bdev->opts->key,
1448 			      crypto_bdev->opts->key_size);
1449 	if (!hexkey) {
1450 		return -ENOMEM;
1451 	}
1452 
1453 	if (crypto_bdev->opts->key2) {
1454 		hexkey2 = spdk_hexlify(crypto_bdev->opts->key2,
1455 				       crypto_bdev->opts->key2_size);
1456 		if (!hexkey2) {
1457 			rc = -ENOMEM;
1458 			goto out_err;
1459 		}
1460 	}
1461 
1462 	spdk_json_write_name(w, "crypto");
1463 	spdk_json_write_object_begin(w);
1464 	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(crypto_bdev->base_bdev));
1465 	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&crypto_bdev->crypto_bdev));
1466 	spdk_json_write_named_string(w, "crypto_pmd", crypto_bdev->opts->drv_name);
1467 	spdk_json_write_named_string(w, "key", hexkey);
1468 	if (hexkey2) {
1469 		spdk_json_write_named_string(w, "key2", hexkey2);
1470 	}
1471 	spdk_json_write_named_string(w, "cipher", crypto_bdev->opts->cipher);
1472 	spdk_json_write_object_end(w);
1473 out_err:
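	/* Zero the hex-encoded key copies before freeing them so that key material
	 * is not left behind on the heap.
	 */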
1474 	if (hexkey) {
1475 		memset(hexkey, 0, strlen(hexkey));
1476 		free(hexkey);
1477 	}
1478 	if (hexkey2) {
1479 		memset(hexkey2, 0, strlen(hexkey2));
1480 		free(hexkey2);
1481 	}
1482 	return rc;
1483 }
1484 
1485 static int
1486 vbdev_crypto_config_json(struct spdk_json_write_ctx *w)
1487 {
1488 	struct vbdev_crypto *crypto_bdev;
1489 
1490 	TAILQ_FOREACH(crypto_bdev, &g_vbdev_crypto, link) {
1491 		char *hexkey = NULL, *hexkey2 = NULL;
1492 
1493 		hexkey = spdk_hexlify(crypto_bdev->opts->key,
1494 				      crypto_bdev->opts->key_size);
1495 		if (!hexkey) {
1496 			return -ENOMEM;
1497 		}
1498 
1499 		if (crypto_bdev->opts->key2) {
1500 			hexkey2 = spdk_hexlify(crypto_bdev->opts->key2,
1501 					       crypto_bdev->opts->key2_size);
1502 			if (!hexkey2) {
1503 				memset(hexkey, 0, strlen(hexkey));
1504 				free(hexkey);
1505 				return -ENOMEM;
1506 			}
1507 		}
1508 
1509 		spdk_json_write_object_begin(w);
1510 		spdk_json_write_named_string(w, "method", "bdev_crypto_create");
1511 		spdk_json_write_named_object_begin(w, "params");
1512 		spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(crypto_bdev->base_bdev));
1513 		spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&crypto_bdev->crypto_bdev));
1514 		spdk_json_write_named_string(w, "crypto_pmd", crypto_bdev->opts->drv_name);
1515 		spdk_json_write_named_string(w, "key", hexkey);
1516 		if (hexkey2) {
1517 			spdk_json_write_named_string(w, "key2", hexkey2);
1518 		}
1519 		spdk_json_write_named_string(w, "cipher", crypto_bdev->opts->cipher);
1520 		spdk_json_write_object_end(w);
1521 		spdk_json_write_object_end(w);
1522 
1523 		if (hexkey) {
1524 			memset(hexkey, 0, strlen(hexkey));
1525 			free(hexkey);
1526 		}
1527 		if (hexkey2) {
1528 			memset(hexkey2, 0, strlen(hexkey2));
1529 			free(hexkey2);
1530 		}
1531 	}
1532 	return 0;
1533 }
1534 
1535 /* Helper function for the channel creation callback. */
1536 static void
1537 _assign_device_qp(struct vbdev_crypto *crypto_bdev, struct device_qp *device_qp,
1538 		  struct crypto_io_channel *crypto_ch)
1539 {
1540 	pthread_mutex_lock(&g_device_qp_lock);
1541 	if (strcmp(crypto_bdev->opts->drv_name, QAT) == 0) {
1542 		/* For some QAT devices, the optimal qp to use is every 32nd as this spreads the
1543 		 * workload out over the multiple virtual functions in the device. For the devices
1544 		 * where this isn't the case, it doesn't hurt.
1545 		 */
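		/* A sketch of the resulting assignment order, assuming the qpairs were added
		 * to the list in index order and e.g. 64 total qpairs with QAT_VF_SPREAD of 32:
		 * successive channels get indices 0, 32, 1, 33, 2, 34, ... so that neighboring
		 * channels land on different virtual functions.
		 */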
1546 		TAILQ_FOREACH(device_qp, &g_device_qp_qat, link) {
1547 			if (device_qp->index != g_next_qat_index) {
1548 				continue;
1549 			}
1550 			if (device_qp->in_use == false) {
1551 				crypto_ch->device_qp = device_qp;
1552 				device_qp->in_use = true;
1553 				g_next_qat_index = (g_next_qat_index + QAT_VF_SPREAD) % g_qat_total_qp;
1554 				break;
1555 			} else {
1556 				/* if the preferred index is used, skip to the next one in this set. */
1557 				g_next_qat_index = (g_next_qat_index + 1) % g_qat_total_qp;
1558 			}
1559 		}
1560 	} else if (strcmp(crypto_bdev->opts->drv_name, AESNI_MB) == 0) {
1561 		TAILQ_FOREACH(device_qp, &g_device_qp_aesni_mb, link) {
1562 			if (device_qp->in_use == false) {
1563 				crypto_ch->device_qp = device_qp;
1564 				device_qp->in_use = true;
1565 				break;
1566 			}
1567 		}
1568 	} else if (strcmp(crypto_bdev->opts->drv_name, MLX5) == 0) {
1569 		TAILQ_FOREACH(device_qp, &g_device_qp_mlx5, link) {
1570 			if (device_qp->in_use == false) {
1571 				crypto_ch->device_qp = device_qp;
1572 				device_qp->in_use = true;
1573 				break;
1574 			}
1575 		}
1576 	}
1577 	pthread_mutex_unlock(&g_device_qp_lock);
1578 }
1579 
1580 /* We provide this callback for the SPDK channel code to create a channel using
1581  * the channel struct we provided in our module get_io_channel() entry point. Here
1582  * we get and save off an underlying base channel of the device below us so that
1583  * we can communicate with the base bdev on a per channel basis. We also register the
1584  * poller used to complete crypto operations from the device.
1585  */
1586 static int
1587 crypto_bdev_ch_create_cb(void *io_device, void *ctx_buf)
1588 {
1589 	struct crypto_io_channel *crypto_ch = ctx_buf;
1590 	struct vbdev_crypto *crypto_bdev = io_device;
1591 	struct device_qp *device_qp = NULL;
1592 
1593 	crypto_ch->base_ch = spdk_bdev_get_io_channel(crypto_bdev->base_desc);
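	/* A poll period of 0 means the completion poller runs on every iteration of
	 * this thread's reactor loop.
	 */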
1594 	crypto_ch->poller = SPDK_POLLER_REGISTER(crypto_dev_poller, crypto_ch, 0);
1595 	crypto_ch->device_qp = NULL;
1596 
1597 	/* Assign a device/qp combination that is unique per channel per PMD. */
1598 	_assign_device_qp(crypto_bdev, device_qp, crypto_ch);
1599 	assert(crypto_ch->device_qp);
1600 
1601 	/* We use this queue to track outstanding IO in our layer. */
1602 	TAILQ_INIT(&crypto_ch->pending_cry_ios);
1603 
1604 	/* We use this to queue up crypto ops when the device is busy. */
1605 	TAILQ_INIT(&crypto_ch->queued_cry_ops);
1606 
1607 	return 0;
1608 }
1609 
1610 /* We provide this callback for the SPDK channel code to destroy a channel
1611  * created with our create callback. We just need to undo anything we did
1612  * when the channel was created.
1613  */
1614 static void
1615 crypto_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
1616 {
1617 	struct crypto_io_channel *crypto_ch = ctx_buf;
1618 
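	/* Hand the device/qpair this channel was using back to the shared pool so a
	 * future channel can claim it.
	 */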
1619 	pthread_mutex_lock(&g_device_qp_lock);
1620 	crypto_ch->device_qp->in_use = false;
1621 	pthread_mutex_unlock(&g_device_qp_lock);
1622 
1623 	spdk_poller_unregister(&crypto_ch->poller);
1624 	spdk_put_io_channel(crypto_ch->base_ch);
1625 }
1626 
1627 /* Create the association between the base bdev name and the vbdev name and
1628  * insert it on the global list. */
1629 static int
1630 vbdev_crypto_insert_name(struct vbdev_crypto_opts *opts, struct bdev_names **out)
1631 {
1632 	struct bdev_names *name;
1633 	bool found = false;
1634 	int j;
1635 
1636 	assert(opts);
1637 	assert(out);
1638 
1639 	TAILQ_FOREACH(name, &g_bdev_names, link) {
1640 		if (strcmp(opts->vbdev_name, name->opts->vbdev_name) == 0) {
1641 			SPDK_ERRLOG("Crypto bdev %s already exists\n", opts->vbdev_name);
1642 			return -EEXIST;
1643 		}
1644 	}
1645 
1646 	for (j = 0; j < MAX_NUM_DRV_TYPES ; j++) {
1647 		if (strcmp(opts->drv_name, g_driver_names[j]) == 0) {
1648 			found = true;
1649 			break;
1650 		}
1651 	}
1652 	if (!found) {
1653 		SPDK_ERRLOG("Crypto PMD type %s is not supported.\n", opts->drv_name);
1654 		return -EINVAL;
1655 	}
1656 
1657 	name = calloc(1, sizeof(struct bdev_names));
1658 	if (!name) {
1659 		SPDK_ERRLOG("Failed to allocate memory for bdev_names.\n");
1660 		return -ENOMEM;
1661 	}
1662 
1663 	name->opts = opts;
1664 	TAILQ_INSERT_TAIL(&g_bdev_names, name, link);
1665 	*out = name;
1666 
1667 	return 0;
1668 }
1669 
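/* Release a vbdev_crypto_opts structure, zeroing any key material it holds
 * before freeing it.
 */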
1670 void
1671 free_crypto_opts(struct vbdev_crypto_opts *opts)
1672 {
1673 	free(opts->bdev_name);
1674 	free(opts->vbdev_name);
1675 	free(opts->drv_name);
1676 	if (opts->xts_key) {
1677 		memset(opts->xts_key, 0,
1678 		       opts->key_size + opts->key2_size);
1679 		free(opts->xts_key);
1680 	}
1681 	memset(opts->key, 0, opts->key_size);
1682 	free(opts->key);
1683 	opts->key_size = 0;
1684 	if (opts->key2) {
1685 		memset(opts->key2, 0, opts->key2_size);
1686 		free(opts->key2);
1687 	}
1688 	opts->key2_size = 0;
1689 	free(opts);
1690 }
1691 
1692 static void
1693 vbdev_crypto_delete_name(struct bdev_names *name)
1694 {
1695 	TAILQ_REMOVE(&g_bdev_names, name, link);
1696 	if (name->opts) {
1697 		free_crypto_opts(name->opts);
1698 		name->opts = NULL;
1699 	}
1700 	free(name);
1701 }
1702 
1703 /* RPC entry point for crypto creation. */
1704 int
1705 create_crypto_disk(struct vbdev_crypto_opts *opts)
1706 {
1707 	struct bdev_names *name = NULL;
1708 	int rc;
1709 
1710 	rc = vbdev_crypto_insert_name(opts, &name);
1711 	if (rc) {
1712 		return rc;
1713 	}
1714 
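	/* Try to claim the base bdev now. If it does not exist yet (-ENODEV), the
	 * name we just inserted stays on the list and examine() will finish the
	 * job when the base bdev eventually appears.
	 */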
1715 	rc = vbdev_crypto_claim(opts->bdev_name);
1716 	if (rc == -ENODEV) {
1717 		SPDK_NOTICELOG("vbdev creation deferred pending base bdev arrival\n");
1718 		rc = 0;
1719 	}
1720 
1721 	if (rc) {
1722 		assert(name != NULL);
1723 		/* In case of error we let the calling function deallocate @opts,
1724 		 * since that is its responsibility. Setting name->opts = NULL lets
1725 		 * vbdev_crypto_delete_name() know it does not have to do anything
1726 		 * with @opts.
1727 		 */
1728 		name->opts = NULL;
1729 		vbdev_crypto_delete_name(name);
1730 	}
1731 	return rc;
1732 }
1733 
1734 /* Called at module init time. Fully initializes the crypto drivers so that
1735  * we are ready for the examine calls that follow.
1736  */
1737 static int
1738 vbdev_crypto_init(void)
1739 {
1740 	int rc = 0;
1741 
1742 	/* Fully configure both SW and HW drivers. */
1743 	rc = vbdev_crypto_init_crypto_drivers();
1744 	if (rc) {
1745 		SPDK_ERRLOG("Error setting up crypto devices\n");
1746 	}
1747 
1748 	return rc;
1749 }
1750 
1751 /* Called when the entire module is being torn down. */
1752 static void
1753 vbdev_crypto_finish(void)
1754 {
1755 	struct bdev_names *name;
1756 	struct vbdev_dev *device;
1757 
1758 	while ((name = TAILQ_FIRST(&g_bdev_names))) {
1759 		vbdev_crypto_delete_name(name);
1760 	}
1761 
1762 	while ((device = TAILQ_FIRST(&g_vbdev_devs))) {
1763 		TAILQ_REMOVE(&g_vbdev_devs, device, link);
1764 		release_vbdev_dev(device);
1765 	}
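	/* Tear down the AESNI_MB virtual device, which (assuming the usual init path
	 * elsewhere in this file) was created with rte_vdev_init() at driver init.
	 */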
1766 	rte_vdev_uninit(AESNI_MB);
1767 
1768 	/* These are removed in release_vbdev_dev() */
1769 	assert(TAILQ_EMPTY(&g_device_qp_qat));
1770 	assert(TAILQ_EMPTY(&g_device_qp_aesni_mb));
1771 	assert(TAILQ_EMPTY(&g_device_qp_mlx5));
1772 
1773 	rte_mempool_free(g_crypto_op_mp);
1774 	rte_mempool_free(g_mbuf_mp);
1775 	rte_mempool_free(g_session_mp);
1776 	if (g_session_mp_priv != NULL) {
1777 		rte_mempool_free(g_session_mp_priv);
1778 	}
1779 }
1780 
1781 /* During init we'll be asked how much memory we'd like passed to us
1782  * in bdev_io structures as context. Here's where we specify how
1783  * much context we want per IO.
1784  */
1785 static int
1786 vbdev_crypto_get_ctx_size(void)
1787 {
1788 	return sizeof(struct crypto_bdev_io);
1789 }
1790 
1791 static void
1792 vbdev_crypto_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
1793 {
1794 	struct vbdev_crypto *crypto_bdev, *tmp;
1795 
1796 	TAILQ_FOREACH_SAFE(crypto_bdev, &g_vbdev_crypto, link, tmp) {
1797 		if (bdev_find == crypto_bdev->base_bdev) {
1798 			spdk_bdev_unregister(&crypto_bdev->crypto_bdev, NULL, NULL);
1799 		}
1800 	}
1801 }
1802 
1803 /* Called when the underlying base bdev triggers asynchronous event such as bdev removal. */
1804 static void
1805 vbdev_crypto_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1806 				void *event_ctx)
1807 {
1808 	switch (type) {
1809 	case SPDK_BDEV_EVENT_REMOVE:
1810 		vbdev_crypto_base_bdev_hotremove_cb(bdev);
1811 		break;
1812 	default:
1813 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1814 		break;
1815 	}
1816 }
1817 
1818 static void
1819 vbdev_crypto_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1820 {
1821 	/* No config per bdev needed */
1822 }
1823 
1824 /* When we register our bdev this is how we specify our entry points. */
1825 static const struct spdk_bdev_fn_table vbdev_crypto_fn_table = {
1826 	.destruct		= vbdev_crypto_destruct,
1827 	.submit_request		= vbdev_crypto_submit_request,
1828 	.io_type_supported	= vbdev_crypto_io_type_supported,
1829 	.get_io_channel		= vbdev_crypto_get_io_channel,
1830 	.dump_info_json		= vbdev_crypto_dump_info_json,
1831 	.write_config_json	= vbdev_crypto_write_config_json
1832 };
1833 
1834 static struct spdk_bdev_module crypto_if = {
1835 	.name = "crypto",
1836 	.module_init = vbdev_crypto_init,
1837 	.get_ctx_size = vbdev_crypto_get_ctx_size,
1838 	.examine_config = vbdev_crypto_examine,
1839 	.module_fini = vbdev_crypto_finish,
1840 	.config_json = vbdev_crypto_config_json
1841 };
1842 
1843 SPDK_BDEV_MODULE_REGISTER(crypto, &crypto_if)
1844 
1845 static int
1846 vbdev_crypto_claim(const char *bdev_name)
1847 {
1848 	struct bdev_names *name;
1849 	struct vbdev_crypto *vbdev;
1850 	struct vbdev_dev *device;
1851 	struct spdk_bdev *bdev;
1852 	bool found = false;
1853 	uint8_t key_size;
1854 	int rc = 0;
1855 
1856 	if (g_number_of_claimed_volumes >= MAX_CRYPTO_VOLUMES) {
1857 		SPDK_DEBUGLOG(vbdev_crypto, "Reached max number of claimed volumes\n");
1858 		return -EINVAL;
1859 	}
1860 	g_number_of_claimed_volumes++;
1861 
1862 	/* Check our list of names from config versus this bdev and if
1863 	 * there's a match, create the crypto_bdev & bdev accordingly.
1864 	 */
1865 	TAILQ_FOREACH(name, &g_bdev_names, link) {
1866 		if (strcmp(name->opts->bdev_name, bdev_name) != 0) {
1867 			continue;
1868 		}
1869 		SPDK_DEBUGLOG(vbdev_crypto, "Match on %s\n", bdev_name);
1870 
1871 		vbdev = calloc(1, sizeof(struct vbdev_crypto));
1872 		if (!vbdev) {
1873 			SPDK_ERRLOG("Failed to allocate memory for crypto_bdev.\n");
1874 			rc = -ENOMEM;
1875 			goto error_vbdev_alloc;
1876 		}
1877 		vbdev->crypto_bdev.product_name = "crypto";
1878 
1879 		vbdev->crypto_bdev.name = strdup(name->opts->vbdev_name);
1880 		if (!vbdev->crypto_bdev.name) {
1881 			SPDK_ERRLOG("Failed to allocate memory for crypto_bdev name.\n");
1882 			rc = -ENOMEM;
1883 			goto error_bdev_name;
1884 		}
1885 
1886 		rc = spdk_bdev_open_ext(bdev_name, true, vbdev_crypto_base_bdev_event_cb,
1887 					NULL, &vbdev->base_desc);
1888 		if (rc) {
1889 			if (rc != -ENODEV) {
1890 				SPDK_ERRLOG("Failed to open bdev %s: error %d\n", bdev_name, rc);
1891 			}
1892 			goto error_open;
1893 		}
1894 
1895 		bdev = spdk_bdev_desc_get_bdev(vbdev->base_desc);
1896 		vbdev->base_bdev = bdev;
1897 
1898 		if (strcmp(name->opts->drv_name, MLX5) == 0) {
1899 			vbdev->qp_desc_nr = CRYPTO_QP_DESCRIPTORS_MLX5;
1900 		} else {
1901 			vbdev->qp_desc_nr = CRYPTO_QP_DESCRIPTORS;
1902 		}
1903 
1904 		vbdev->crypto_bdev.write_cache = bdev->write_cache;
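		/* Pick a buffer alignment requirement per PMD. required_alignment is the
		 * log2 of the byte alignment, so for QAT a 4096-byte blocklen (with a base
		 * bdev alignment no stricter than that) works out to 12, i.e. 4 KiB-aligned
		 * buffers.
		 */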
1905 		if (strcmp(name->opts->drv_name, QAT) == 0) {
1906 			vbdev->crypto_bdev.required_alignment =
1907 				spdk_max(spdk_u32log2(bdev->blocklen), bdev->required_alignment);
1908 			SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
1909 				       vbdev->crypto_bdev.required_alignment);
1910 			SPDK_NOTICELOG("QAT using cipher: %s\n", name->opts->cipher);
1911 		} else if (strcmp(name->opts->drv_name, MLX5) == 0) {
1912 			vbdev->crypto_bdev.required_alignment = bdev->required_alignment;
1913 			SPDK_NOTICELOG("MLX5 using cipher: %s\n", name->opts->cipher);
1914 		} else {
1915 			vbdev->crypto_bdev.required_alignment = bdev->required_alignment;
1916 			SPDK_NOTICELOG("AESNI_MB using cipher: %s\n", name->opts->cipher);
1917 		}
1918 		vbdev->cipher_xform.cipher.iv.length = IV_LENGTH;
1919 
1920 		/* Note: CRYPTO_MAX_IO is in units of bytes, optimal_io_boundary is
1921 		 * in units of blocks.
1922 		 */
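		/* For example, with a 512-byte blocklen the 64 KiB CRYPTO_MAX_IO works out
		 * to an optimal_io_boundary of 128 blocks.
		 */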
1923 		if (bdev->optimal_io_boundary > 0) {
1924 			vbdev->crypto_bdev.optimal_io_boundary =
1925 				spdk_min((CRYPTO_MAX_IO / bdev->blocklen), bdev->optimal_io_boundary);
1926 		} else {
1927 			vbdev->crypto_bdev.optimal_io_boundary = (CRYPTO_MAX_IO / bdev->blocklen);
1928 		}
1929 		vbdev->crypto_bdev.split_on_optimal_io_boundary = true;
1930 		vbdev->crypto_bdev.blocklen = bdev->blocklen;
1931 		vbdev->crypto_bdev.blockcnt = bdev->blockcnt;
1932 
1933 		/* This is the context that is passed to us when the bdev
1934 		 * layer calls in so we'll save our crypto_bdev node here.
1935 		 */
1936 		vbdev->crypto_bdev.ctxt = vbdev;
1937 		vbdev->crypto_bdev.fn_table = &vbdev_crypto_fn_table;
1938 		vbdev->crypto_bdev.module = &crypto_if;
1939 
1940 		/* Assign crypto opts from the name. The pointer is valid up to the point
1941 		 * the module is unloaded and all names removed from the list. */
1942 		vbdev->opts = name->opts;
1943 
1944 		TAILQ_INSERT_TAIL(&g_vbdev_crypto, vbdev, link);
1945 
1946 		spdk_io_device_register(vbdev, crypto_bdev_ch_create_cb, crypto_bdev_ch_destroy_cb,
1947 					sizeof(struct crypto_io_channel), vbdev->crypto_bdev.name);
1948 
1949 		/* Save the thread where the base device was opened. */
1950 		vbdev->thread = spdk_get_thread();
1951 
1952 		rc = spdk_bdev_module_claim_bdev(bdev, vbdev->base_desc, vbdev->crypto_bdev.module);
1953 		if (rc) {
1954 			SPDK_ERRLOG("Failed to claim bdev %s\n", spdk_bdev_get_name(bdev));
1955 			goto error_claim;
1956 		}
1957 
1958 		/* To init the sessions we need the cryptodev device ID that matches this vbdev's PMD. */
1959 		TAILQ_FOREACH(device, &g_vbdev_devs, link) {
1960 			if (strcmp(device->cdev_info.driver_name, vbdev->opts->drv_name) == 0) {
1961 				found = true;
1962 				break;
1963 			}
1964 		}
1965 		if (found == false) {
1966 			SPDK_ERRLOG("Failed to match crypto device driver to crypto vbdev.\n");
1967 			rc = -EINVAL;
1968 			goto error_cant_find_devid;
1969 		}
1970 
1971 		/* Get sessions. */
1972 		vbdev->session_encrypt = rte_cryptodev_sym_session_create(g_session_mp);
1973 		if (NULL == vbdev->session_encrypt) {
1974 			SPDK_ERRLOG("Failed to create encrypt crypto session.\n");
1975 			rc = -EINVAL;
1976 			goto error_session_en_create;
1977 		}
1978 
1979 		vbdev->session_decrypt = rte_cryptodev_sym_session_create(g_session_mp);
1980 		if (NULL == vbdev->session_decrypt) {
1981 			SPDK_ERRLOG("Failed to create decrypt crypto session.\n");
1982 			rc = -EINVAL;
1983 			goto error_session_de_create;
1984 		}
1985 
1986 		/* Init our per vbdev xform with the desired cipher options. */
1987 		vbdev->cipher_xform.type = RTE_CRYPTO_SYM_XFORM_CIPHER;
1988 		vbdev->cipher_xform.cipher.iv.offset = IV_OFFSET;
1989 		if (strcmp(vbdev->opts->cipher, AES_CBC) == 0) {
1990 			vbdev->cipher_xform.cipher.key.data = vbdev->opts->key;
1991 			vbdev->cipher_xform.cipher.key.length = vbdev->opts->key_size;
1992 			vbdev->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_CBC;
1993 		} else if (strcmp(vbdev->opts->cipher, AES_XTS) == 0) {
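			/* For XTS the PMD expects a single contiguous key that is key1 and key2
			 * concatenated, which is what opts->xts_key holds (e.g. two 16-byte keys
			 * form one 32-byte AES-128-XTS key).
			 */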
1994 			key_size = vbdev->opts->key_size + vbdev->opts->key2_size;
1995 			vbdev->cipher_xform.cipher.key.data = vbdev->opts->xts_key;
1996 			vbdev->cipher_xform.cipher.key.length = key_size;
1997 			vbdev->cipher_xform.cipher.algo = RTE_CRYPTO_CIPHER_AES_XTS;
1998 		} else {
1999 			SPDK_ERRLOG("Invalid cipher name %s.\n", vbdev->opts->cipher);
2000 			rc = -EINVAL;
2001 			goto error_session_de_create;
2002 		}
2003 		vbdev->cipher_xform.cipher.iv.length = IV_LENGTH;
2004 
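		/* Initialize one session per direction with the xform above. If a separate
		 * private session mempool was set up at driver init, it holds the PMD's
		 * per-session private data; otherwise the main session mempool is used.
		 */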
2005 		vbdev->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_ENCRYPT;
2006 		rc = rte_cryptodev_sym_session_init(device->cdev_id, vbdev->session_encrypt,
2007 						    &vbdev->cipher_xform,
2008 						    g_session_mp_priv ? g_session_mp_priv : g_session_mp);
2009 		if (rc < 0) {
2010 			SPDK_ERRLOG("Failed to init encrypt session: error %d\n", rc);
2011 			rc = -EINVAL;
2012 			goto error_session_init;
2013 		}
2014 
2015 		vbdev->cipher_xform.cipher.op = RTE_CRYPTO_CIPHER_OP_DECRYPT;
2016 		rc = rte_cryptodev_sym_session_init(device->cdev_id, vbdev->session_decrypt,
2017 						    &vbdev->cipher_xform,
2018 						    g_session_mp_priv ? g_session_mp_priv : g_session_mp);
2019 		if (rc < 0) {
2020 			SPDK_ERRLOG("Failed to init decrypt session: error %d\n", rc);
2021 			rc = -EINVAL;
2022 			goto error_session_init;
2023 		}
2024 
2025 		rc = spdk_bdev_register(&vbdev->crypto_bdev);
2026 		if (rc < 0) {
2027 			SPDK_ERRLOG("Failed to register vbdev: error %d\n", rc);
2028 			rc = -EINVAL;
2029 			goto error_bdev_register;
2030 		}
2031 		SPDK_DEBUGLOG(vbdev_crypto, "Registered io_device and virtual bdev for: %s\n",
2032 			      vbdev->opts->vbdev_name);
2033 		break;
2034 	}
2035 
2036 	return rc;
2037 
2038 	/* Error cleanup paths. */
2039 error_bdev_register:
2040 error_session_init:
2041 	rte_cryptodev_sym_session_free(vbdev->session_decrypt);
2042 error_session_de_create:
2043 	rte_cryptodev_sym_session_free(vbdev->session_encrypt);
2044 error_session_en_create:
2045 error_cant_find_devid:
2046 	spdk_bdev_module_release_bdev(vbdev->base_bdev);
2047 error_claim:
2048 	TAILQ_REMOVE(&g_vbdev_crypto, vbdev, link);
2049 	spdk_io_device_unregister(vbdev, NULL);
2050 	spdk_bdev_close(vbdev->base_desc);
2051 error_open:
2052 	free(vbdev->crypto_bdev.name);
2053 error_bdev_name:
2054 	free(vbdev);
2055 error_vbdev_alloc:
2056 	g_number_of_claimed_volumes--;
2057 	return rc;
2058 }
2059 
2060 /* RPC entry for deleting a crypto vbdev. */
2061 void
2062 delete_crypto_disk(const char *bdev_name, spdk_delete_crypto_complete cb_fn,
2063 		   void *cb_arg)
2064 {
2065 	struct bdev_names *name;
2066 	int rc;
2067 
2068 	/* Some cleanup happens in the destruct callback. */
2069 	rc = spdk_bdev_unregister_by_name(bdev_name, &crypto_if, cb_fn, cb_arg);
2070 	if (rc == 0) {
2071 		/* Remove the association (vbdev, bdev) from g_bdev_names. This is required so that the
2072 		 * vbdev does not get re-created if the same bdev is constructed at some other time,
2073 		 * unless the underlying bdev was hot-removed.
2074 		 */
2075 		TAILQ_FOREACH(name, &g_bdev_names, link) {
2076 			if (strcmp(name->opts->vbdev_name, bdev_name) == 0) {
2077 				vbdev_crypto_delete_name(name);
2078 				break;
2079 			}
2080 		}
2081 	} else {
2082 		cb_fn(cb_arg, rc);
2083 	}
2084 }
2085 
2086 /* Because we specified this function as the examine_config entry point in our module
2087  * structure when we registered the crypto module, we'll get this call any time a new
2088  * bdev shows up. Here we need to decide if we care about it and, if so, what to do.
2089  * The names registered through bdev_crypto_create are kept on a list, so we check the
2090  * new bdev against that list and, if the user asked us to attach to this bdev, here's
2091  * where we do it.
2092  */
2093 static void
2094 vbdev_crypto_examine(struct spdk_bdev *bdev)
2095 {
2096 	vbdev_crypto_claim(spdk_bdev_get_name(bdev));
2097 	spdk_bdev_module_examine_done(&crypto_if);
2098 }
2099 
2100 SPDK_LOG_REGISTER_COMPONENT(vbdev_crypto)
2101