xref: /spdk/module/bdev/raid/bdev_raid.c (revision 45a053c5777494f4e8ce4bc1191c9de3920377f7)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
/* Sentinel block offset meaning "no process window / not set" */
#define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
/* Default background-process window size in bytes.
 * Parenthesized so the macro groups correctly inside larger expressions
 * (e.g. `x % RAID_BDEV_PROCESS_WINDOW_SIZE`). */
#define RAID_BDEV_PROCESS_WINDOW_SIZE	(1024 * 1024)
/* Maximum number of in-flight background process requests */
#define RAID_BDEV_PROCESS_MAX_QD	16
19 
20 static bool g_shutdown_started = false;
21 
22 /* List of all raid bdevs */
23 struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);
24 
25 static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
26 
27 /*
28  * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
29  * contains the relationship of raid bdev io channel with base bdev io channels.
30  */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs, indexed by base bdev slot.
	 * An entry is NULL for a missing base bdev or for the current process target. */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel (NULL if the module has no get_io_channel) */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* Current process window offset in blocks, or
		 * RAID_OFFSET_BLOCKS_INVALID when no process is active on this channel */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Shadow channel that includes target_ch in place of the target's slot;
		 * used for I/O to the already-processed range */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
45 
/* Lifecycle states of a raid bdev background process */
enum raid_bdev_process_state {
	RAID_PROCESS_STATE_INIT,
	RAID_PROCESS_STATE_RUNNING,
	RAID_PROCESS_STATE_STOPPING,
	RAID_PROCESS_STATE_STOPPED,
};
52 
/* State of a raid bdev background process (e.g. rebuild) */
struct raid_bdev_process {
	/* The raid bdev the process operates on */
	struct raid_bdev		*raid_bdev;
	/* Type of the process (see raid_bdev_process_to_str()) */
	enum raid_process_type		type;
	/* Current lifecycle state */
	enum raid_bdev_process_state	state;
	/* Thread the process runs on — presumably dedicated; confirm at creation site */
	struct spdk_thread		*thread;
	/* The process's own raid io channel */
	struct raid_bdev_io_channel	*raid_ch;
	/* Queue of process requests */
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	/* Upper bound for window_size, in blocks */
	uint64_t			max_window_size;
	/* Size of the currently processed window, in blocks */
	uint64_t			window_size;
	/* Blocks remaining to complete in the current window */
	uint64_t			window_remaining;
	/* Status of the current window (first error wins — verify at usage sites) */
	int				window_status;
	/* Offset of the current window in blocks; reported as progress in info JSON */
	uint64_t			window_offset;
	/* True while the window's LBA range is locked/quiesced */
	bool				window_range_locked;
	/* The base bdev being rebuilt/written by the process */
	struct raid_base_bdev_info	*target;
	/* Overall process status */
	int				status;
};
69 
70 static struct raid_bdev_module *
71 raid_bdev_module_find(enum raid_level level)
72 {
73 	struct raid_bdev_module *raid_module;
74 
75 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
76 		if (raid_module->level == level) {
77 			return raid_module;
78 		}
79 	}
80 
81 	return NULL;
82 }
83 
84 void
85 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
86 {
87 	if (raid_bdev_module_find(raid_module->level) != NULL) {
88 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
89 			    raid_bdev_level_to_str(raid_module->level));
90 		assert(false);
91 	} else {
92 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
93 	}
94 }
95 
96 struct spdk_io_channel *
97 raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
98 {
99 	return raid_ch->base_channel[idx];
100 }
101 
102 void *
103 raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
104 {
105 	assert(raid_ch->module_channel != NULL);
106 
107 	return spdk_io_channel_get_ctx(raid_ch->module_channel);
108 }
109 
110 /* Function declarations */
111 static void	raid_bdev_examine(struct spdk_bdev *bdev);
112 static int	raid_bdev_init(void);
113 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
114 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
115 
116 static void
117 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
118 {
119 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
120 
121 	if (raid_ch->process.target_ch != NULL) {
122 		spdk_put_io_channel(raid_ch->process.target_ch);
123 		raid_ch->process.target_ch = NULL;
124 	}
125 
126 	if (raid_ch->process.ch_processed != NULL) {
127 		free(raid_ch->process.ch_processed->base_channel);
128 		free(raid_ch->process.ch_processed);
129 		raid_ch->process.ch_processed = NULL;
130 	}
131 }
132 
/*
 * Set up the process-related parts of a raid io channel: get an IO channel
 * for the process target and build the shadow "processed" channel, which
 * mirrors base_channel but substitutes the target's channel at its slot.
 * On failure all partially acquired resources are released.
 * Returns 0 on success, -ENOMEM on failure.
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	/* Attach before filling in, so the error path can free it via cleanup */
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	/* Copy the parent's channels, except at the target slot which gets target_ch */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The shadow channel itself has no process window */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
182 
183 /*
184  * brief:
185  * raid_bdev_create_cb function is a cb function for raid bdev which creates the
186  * hierarchy from raid bdev to base bdev io channels. It will be called per core
187  * params:
188  * io_device - pointer to raid bdev io device represented by raid_bdev
189  * ctx_buf - pointer to context buffer for raid bdev io channel
190  * returns:
191  * 0 - success
192  * non zero - failure
193  */
/*
 * brief:
 * raid_bdev_create_cb function is a cb function for raid bdev which creates the
 * hierarchy from raid bdev to base bdev io channels. It will be called per core
 * params:
 * io_device - pointer to raid bdev io device represented by raid_bdev
 * ctx_buf - pointer to context buffer for raid bdev io channel
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_create_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	int ret = -ENOMEM;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);

	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
		return -ENOMEM;
	}

	/* Lock guards base_bdev_info/process state while channels are acquired */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/*
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].desc == NULL ||
		    (raid_bdev->process != NULL && raid_bdev->process->target == &raid_bdev->base_bdev_info[i])) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			goto err;
		}
	}

	if (raid_bdev->process != NULL) {
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	/* Module channel is acquired outside the lock */
	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			goto err_unlocked;
		}
	}

	return 0;
err:
	/* Failure while still holding the lock; drop it before cleanup */
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
err_unlocked:
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}
268 
269 /*
270  * brief:
271  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
272  * hierarchy from raid bdev to base bdev io channels. It will be called per core
273  * params:
274  * io_device - pointer to raid bdev io device represented by raid_bdev
275  * ctx_buf - pointer to context buffer for raid bdev io channel
276  * returns:
277  * none
278  */
279 static void
280 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
281 {
282 	struct raid_bdev *raid_bdev = io_device;
283 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
284 	uint8_t i;
285 
286 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
287 
288 	assert(raid_ch != NULL);
289 	assert(raid_ch->base_channel);
290 
291 	if (raid_ch->module_channel) {
292 		spdk_put_io_channel(raid_ch->module_channel);
293 	}
294 
295 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
296 		/* Free base bdev channels */
297 		if (raid_ch->base_channel[i] != NULL) {
298 			spdk_put_io_channel(raid_ch->base_channel[i]);
299 		}
300 	}
301 	free(raid_ch->base_channel);
302 	raid_ch->base_channel = NULL;
303 
304 	raid_bdev_ch_process_cleanup(raid_ch);
305 }
306 
307 /*
308  * brief:
309  * raid_bdev_cleanup is used to cleanup raid_bdev related data
310  * structures.
311  * params:
312  * raid_bdev - pointer to raid_bdev
313  * returns:
314  * none
315  */
316 static void
317 raid_bdev_cleanup(struct raid_bdev *raid_bdev)
318 {
319 	struct raid_base_bdev_info *base_info;
320 
321 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
322 		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
323 	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
324 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
325 
326 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
327 		assert(base_info->desc == NULL);
328 		free(base_info->name);
329 	}
330 
331 	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
332 }
333 
334 static void
335 raid_bdev_free(struct raid_bdev *raid_bdev)
336 {
337 	spdk_dma_free(raid_bdev->sb);
338 	spdk_spin_destroy(&raid_bdev->base_bdev_lock);
339 	free(raid_bdev->base_bdev_info);
340 	free(raid_bdev->bdev.name);
341 	free(raid_bdev);
342 }
343 
/* Convenience helper: detach the raid bdev from global state, then free it. */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
350 
351 /*
352  * brief:
353  * free resource of base bdev for raid bdev
354  * params:
355  * base_info - raid base bdev info
356  * returns:
357  * none
358  */
/*
 * brief:
 * free resource of base bdev for raid bdev
 * params:
 * base_info - raid base bdev info
 * returns:
 * none
 */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	free(base_info->name);
	base_info->name = NULL;
	/* Keep the uuid while still configuring so the base bdev can be re-attached */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}

	/* Nothing else to release if the base bdev was never opened */
	if (base_info->desc == NULL) {
		return;
	}

	/* Release claim first, then close the descriptor and its app-thread channel */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		assert(raid_bdev->num_base_bdevs_discovered);
		raid_bdev->num_base_bdevs_discovered--;
		base_info->is_configured = false;
	}
}
388 
389 static void
390 raid_bdev_io_device_unregister_cb(void *io_device)
391 {
392 	struct raid_bdev *raid_bdev = io_device;
393 
394 	if (raid_bdev->num_base_bdevs_discovered == 0) {
395 		/* Free raid_bdev when there are no base bdevs left */
396 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
397 		raid_bdev_cleanup(raid_bdev);
398 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
399 		raid_bdev_free(raid_bdev);
400 	} else {
401 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
402 	}
403 }
404 
405 void
406 raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
407 {
408 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
409 		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
410 	}
411 }
412 
/*
 * App-thread part of raid bdev destruction: release base bdev resources,
 * stop the raid module, and complete the destruct via
 * raid_bdev_module_stop_done() (immediately, or later from the module
 * if its stop is asynchronous).
 */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	/* A module stop returning false means it completes asynchronously and
	 * will call raid_bdev_module_stop_done() itself. */
	if (raid_bdev->module->stop != NULL) {
		if (raid_bdev->module->stop(raid_bdev) == false) {
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
443 
444 static int
445 raid_bdev_destruct(void *ctx)
446 {
447 	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);
448 
449 	return 1;
450 }
451 
/*
 * Complete a raid I/O. If the I/O was split by raid_bdev_io_split(), this
 * either submits the remaining (lower-LBA) part, or restores the original
 * offset/num_blocks/iovs/md_buf from the parent bdev_io before completing.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the original start from the parent bdev_io */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* Number of iov entries consumed by the first part */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The boundary iov is shared by both halves: shrink it
					 * to cover only the first part's bytes */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The second part targets the already-processed range */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Both parts done (or first part failed): restore the full original I/O */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		spdk_bdev_io_complete(bdev_io, status);
	}
}
502 
503 /*
504  * brief:
505  * raid_bdev_io_complete_part - signal the completion of a part of the expected
506  * base bdev IOs and complete the raid_io if this is the final expected IO.
507  * The caller should first set raid_io->base_bdev_io_remaining. This function
508  * will decrement this counter by the value of the 'completed' parameter and
509  * complete the raid_io if the counter reaches 0. The caller is free to
510  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
511  * it can represent e.g. blocks or IOs.
512  * params:
513  * raid_io - pointer to raid_bdev_io
514  * completed - the part of the raid_io that has been completed
515  * status - status of the base IO
516  * returns:
517  * true - if the raid_io is completed
518  * false - otherwise
519  */
520 bool
521 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
522 			   enum spdk_bdev_io_status status)
523 {
524 	assert(raid_io->base_bdev_io_remaining >= completed);
525 	raid_io->base_bdev_io_remaining -= completed;
526 
527 	if (status != SPDK_BDEV_IO_STATUS_SUCCESS) {
528 		raid_io->base_bdev_io_status = status;
529 	}
530 
531 	if (raid_io->base_bdev_io_remaining == 0) {
532 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
533 		return true;
534 	} else {
535 		return false;
536 	}
537 }
538 
539 /*
540  * brief:
541  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
542  * It will try to queue the IOs after storing the context to bdev wait queue logic.
543  * params:
544  * raid_io - pointer to raid_bdev_io
545  * bdev - the block device that the IO is submitted to
546  * ch - io channel
547  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
548  * returns:
549  * none
550  */
551 void
552 raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
553 			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
554 {
555 	raid_io->waitq_entry.bdev = bdev;
556 	raid_io->waitq_entry.cb_fn = cb_fn;
557 	raid_io->waitq_entry.cb_arg = raid_io;
558 	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
559 }
560 
561 static void
562 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
563 {
564 	struct raid_bdev_io *raid_io = cb_arg;
565 
566 	spdk_bdev_free_io(bdev_io);
567 
568 	raid_bdev_io_complete_part(raid_io, 1, success ?
569 				   SPDK_BDEV_IO_STATUS_SUCCESS :
570 				   SPDK_BDEV_IO_STATUS_FAILED);
571 }
572 
573 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
574 
/* Wait-queue retry trampoline for raid_bdev_submit_reset_request(). */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	raid_bdev_submit_reset_request((struct raid_bdev_io *)_raid_io);
}
582 
583 /*
584  * brief:
585  * raid_bdev_submit_reset_request function submits reset requests
586  * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
587  * which case it will queue it for later submission
588  * params:
589  * raid_io
590  * returns:
591  * none
592  */
/*
 * brief:
 * raid_bdev_submit_reset_request function submits reset requests
 * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
 * which case it will queue it for later submission
 * params:
 * raid_io
 * returns:
 * none
 */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* First call for this raid_io: expect one completion per base bdev.
	 * On retries (after ENOMEM) the remaining count is already set. */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from where the previous attempt stopped */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* Missing base bdev: count it as completed successfully */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Out of bdev_ios: retry from this base bdev when one frees up */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
632 
/*
 * Split a raid I/O at split_offset (blocks, relative to the I/O start).
 * The raid_io is adjusted to describe only the upper part (the higher LBAs):
 * offset/num_blocks/md_buf are advanced and the iov array is advanced to the
 * first iov containing the split point. If the split point falls inside an
 * iov, that iov is trimmed in place and its original value saved in
 * split.iov_copy so raid_bdev_io_complete() can restore/reuse it.
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	size_t iov_offset = (split_offset << raid_bdev->blocklen_shift);
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Walk the iovs to find where the split point lands */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls exactly on an iov boundary */
				raid_io->split.iov = NULL;
			} else {
				/* Save the original iov, then trim it to the upper part */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
670 
/*
 * Submit a read/write raid I/O, taking any active background process into
 * account: I/O entirely below the process offset goes to the "processed"
 * channel (which includes the process target); I/O spanning the boundary is
 * split so the unprocessed part runs first.
 */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
703 
704 /*
705  * brief:
706  * Callback function to spdk_bdev_io_get_buf.
707  * params:
708  * ch - pointer to raid bdev io channel
709  * bdev_io - pointer to parent bdev_io on raid bdev device
710  * success - True if buffer is allocated or false otherwise.
711  * returns:
712  * none
713  */
714 static void
715 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
716 		     bool success)
717 {
718 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
719 
720 	if (!success) {
721 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
722 		return;
723 	}
724 
725 	raid_bdev_submit_rw_request(raid_io);
726 }
727 
728 void
729 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
730 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
731 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
732 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
733 {
734 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
735 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
736 
737 	raid_io->type = type;
738 	raid_io->offset_blocks = offset_blocks;
739 	raid_io->num_blocks = num_blocks;
740 	raid_io->iovs = iovs;
741 	raid_io->iovcnt = iovcnt;
742 	raid_io->memory_domain = memory_domain;
743 	raid_io->memory_domain_ctx = memory_domain_ctx;
744 	raid_io->md_buf = md_buf;
745 
746 	raid_io->raid_bdev = raid_bdev;
747 	raid_io->raid_ch = raid_ch;
748 	raid_io->base_bdev_io_remaining = 0;
749 	raid_io->base_bdev_io_submitted = 0;
750 	raid_io->base_bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
751 	raid_io->completion_cb = NULL;
752 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
753 }
754 
755 /*
756  * brief:
757  * raid_bdev_submit_request function is the submit_request function pointer of
758  * raid bdev function table. This is used to submit the io on raid_bdev to below
759  * layers.
760  * params:
761  * ch - pointer to raid bdev io channel
762  * bdev_io - pointer to parent bdev_io on raid bdev device
763  * returns:
764  * none
765  */
766 static void
767 raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
768 {
769 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
770 
771 	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
772 			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
773 			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
774 			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);
775 
776 	switch (bdev_io->type) {
777 	case SPDK_BDEV_IO_TYPE_READ:
778 		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
779 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
780 		break;
781 	case SPDK_BDEV_IO_TYPE_WRITE:
782 		raid_bdev_submit_rw_request(raid_io);
783 		break;
784 
785 	case SPDK_BDEV_IO_TYPE_RESET:
786 		raid_bdev_submit_reset_request(raid_io);
787 		break;
788 
789 	case SPDK_BDEV_IO_TYPE_FLUSH:
790 	case SPDK_BDEV_IO_TYPE_UNMAP:
791 		if (raid_io->raid_bdev->process != NULL) {
792 			/* TODO: rebuild support */
793 			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
794 			return;
795 		}
796 		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
797 		break;
798 
799 	default:
800 		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
801 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
802 		break;
803 	}
804 }
805 
806 /*
807  * brief:
808  * _raid_bdev_io_type_supported checks whether io_type is supported in
809  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
810  * doesn't support, the raid device doesn't supports.
811  *
812  * params:
813  * raid_bdev - pointer to raid bdev context
814  * io_type - io type
815  * returns:
816  * true - io_type is supported
817  * false - io_type is not supported
818  */
819 inline static bool
820 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
821 {
822 	struct raid_base_bdev_info *base_info;
823 
824 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
825 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
826 		if (raid_bdev->module->submit_null_payload_request == NULL) {
827 			return false;
828 		}
829 	}
830 
831 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
832 		if (base_info->desc == NULL) {
833 			continue;
834 		}
835 
836 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
837 			return false;
838 		}
839 	}
840 
841 	return true;
842 }
843 
844 /*
845  * brief:
846  * raid_bdev_io_type_supported is the io_supported function for bdev function
847  * table which returns whether the particular io type is supported or not by
848  * raid bdev module
849  * params:
850  * ctx - pointer to raid bdev context
851  * type - io type
852  * returns:
853  * true - io_type is supported
854  * false - io_type is not supported
855  */
856 static bool
857 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
858 {
859 	switch (io_type) {
860 	case SPDK_BDEV_IO_TYPE_READ:
861 	case SPDK_BDEV_IO_TYPE_WRITE:
862 		return true;
863 
864 	case SPDK_BDEV_IO_TYPE_FLUSH:
865 	case SPDK_BDEV_IO_TYPE_RESET:
866 	case SPDK_BDEV_IO_TYPE_UNMAP:
867 		return _raid_bdev_io_type_supported(ctx, io_type);
868 
869 	default:
870 		return false;
871 	}
872 
873 	return false;
874 }
875 
876 /*
877  * brief:
878  * raid_bdev_get_io_channel is the get_io_channel function table pointer for
879  * raid bdev. This is used to return the io channel for this raid bdev
880  * params:
881  * ctxt - pointer to raid_bdev
882  * returns:
883  * pointer to io channel for raid bdev
884  */
/*
 * brief:
 * raid_bdev_get_io_channel is the get_io_channel function table pointer for
 * raid bdev. This is used to return the io channel for this raid bdev
 * params:
 * ctxt - pointer to raid_bdev
 * returns:
 * pointer to io channel for raid bdev
 */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	/* The raid_bdev itself is the registered io_device */
	return spdk_get_io_channel((struct raid_bdev *)ctxt);
}
892 
/*
 * Write the raid bdev's state (uuid, strip size, level, base bdev list and,
 * if active, background process progress) as JSON fields into an already
 * open JSON object. Must run on the app thread.
 */
void
raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
{
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	assert(raid_bdev != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->sb != NULL);
	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
				     raid_bdev->num_base_bdevs_operational);
	/* Background process progress, if one is running */
	if (raid_bdev->process) {
		struct raid_bdev_process *process = raid_bdev->process;
		uint64_t offset = process->window_offset;

		spdk_json_write_named_object_begin(w, "process");
		spdk_json_write_name(w, "type");
		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
		spdk_json_write_named_string(w, "target", process->target->name);
		spdk_json_write_named_object_begin(w, "progress");
		spdk_json_write_named_uint64(w, "blocks", offset);
		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_name(w, "base_bdevs_list");
	spdk_json_write_array_begin(w);
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		spdk_json_write_object_begin(w);
		spdk_json_write_name(w, "name");
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			spdk_json_write_null(w);
		}
		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
		spdk_json_write_named_string(w, "uuid", uuid_str);
		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_array_end(w);
}
945 
946 /*
947  * brief:
948  * raid_bdev_dump_info_json is the function table pointer for raid bdev
949  * params:
950  * ctx - pointer to raid_bdev
951  * w - pointer to json context
952  * returns:
953  * 0 - success
954  * non zero - failure
955  */
956 static int
957 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
958 {
959 	struct raid_bdev *raid_bdev = ctx;
960 
961 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
962 
963 	/* Dump the raid bdev configuration related information */
964 	spdk_json_write_named_object_begin(w, "raid");
965 	raid_bdev_write_info_json(raid_bdev, w);
966 	spdk_json_write_object_end(w);
967 
968 	return 0;
969 }
970 
971 /*
972  * brief:
973  * raid_bdev_write_config_json is the function table pointer for raid bdev
974  * params:
975  * bdev - pointer to spdk_bdev
976  * w - pointer to json context
977  * returns:
978  * none
979  */
980 static void
981 raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
982 {
983 	struct raid_bdev *raid_bdev = bdev->ctxt;
984 	struct raid_base_bdev_info *base_info;
985 	char uuid_str[SPDK_UUID_STRING_LEN];
986 
987 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
988 
989 	if (raid_bdev->sb != NULL) {
990 		/* raid bdev configuration is stored in the superblock */
991 		return;
992 	}
993 
994 	spdk_json_write_object_begin(w);
995 
996 	spdk_json_write_named_string(w, "method", "bdev_raid_create");
997 
998 	spdk_json_write_named_object_begin(w, "params");
999 	spdk_json_write_named_string(w, "name", bdev->name);
1000 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
1001 	spdk_json_write_named_string(w, "uuid", uuid_str);
1002 	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1003 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1004 	spdk_json_write_named_bool(w, "superblock", raid_bdev->sb != NULL);
1005 
1006 	spdk_json_write_named_array_begin(w, "base_bdevs");
1007 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1008 		if (base_info->desc) {
1009 			spdk_json_write_string(w, spdk_bdev_desc_get_bdev(base_info->desc)->name);
1010 		}
1011 	}
1012 	spdk_json_write_array_end(w);
1013 	spdk_json_write_object_end(w);
1014 
1015 	spdk_json_write_object_end(w);
1016 }
1017 
/*
 * Collect the memory domains of all configured base bdevs.
 * Returns the total number of domains (which may exceed array_size, in
 * which case the caller should retry with a larger array), or a negative
 * errno on failure. Returns 0 when the raid module does not support
 * memory domains.
 */
static int
raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_base_bdev_info *base_info;
	int domains_count = 0, rc = 0;

	if (raid_bdev->module->memory_domains_supported == false) {
		return 0;
	}

	/* Hold the lock so base bdev descriptors cannot be released while we
	 * iterate over them. */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);

	/* First loop to get the number of memory domains */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		/* Passing NULL/0 only queries the count without filling anything. */
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
		if (rc < 0) {
			goto out;
		}
		domains_count += rc;
	}

	/* Caller's array is too small (or absent): report the required count. */
	if (!domains || array_size < domains_count) {
		goto out;
	}

	/* Second loop fills the caller's array, advancing past each base
	 * bdev's contribution. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
		if (rc < 0) {
			goto out;
		}
		domains += rc;
		array_size -= rc;
	}
out:
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (rc < 0) {
		return rc;
	}

	return domains_count;
}
1067 
/* g_raid_bdev_fn_table is the function table for raid bdev, registered
 * with the bdev layer for every raid bdev via spdk_bdev_register(). */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1078 
1079 struct raid_bdev *
1080 raid_bdev_find_by_name(const char *name)
1081 {
1082 	struct raid_bdev *raid_bdev;
1083 
1084 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1085 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1086 			return raid_bdev;
1087 		}
1088 	}
1089 
1090 	return NULL;
1091 }
1092 
/* Mapping of accepted raid level name strings (matched case-insensitively)
 * to enum values; both long ("raid0") and short ("0") aliases are listed.
 * Terminated by a zero-initialized sentinel entry. */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};
1106 
/* Printable names for raid bdev states, indexed by enum raid_bdev_state.
 * Non-static: also referenced outside this file. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};
1113 
/* Printable names for background process types, indexed by enum raid_process_type. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};
1119 
/* We have to use the typedef in the function declaration to appease astyle,
 * which misformats "enum foo" return types on their own line. */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
1123 
1124 raid_level_t
1125 raid_bdev_str_to_level(const char *str)
1126 {
1127 	unsigned int i;
1128 
1129 	assert(str != NULL);
1130 
1131 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1132 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1133 			return g_raid_level_names[i].value;
1134 		}
1135 	}
1136 
1137 	return INVALID_RAID_LEVEL;
1138 }
1139 
1140 const char *
1141 raid_bdev_level_to_str(enum raid_level level)
1142 {
1143 	unsigned int i;
1144 
1145 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1146 		if (g_raid_level_names[i].value == level) {
1147 			return g_raid_level_names[i].name;
1148 		}
1149 	}
1150 
1151 	return "";
1152 }
1153 
1154 raid_bdev_state_t
1155 raid_bdev_str_to_state(const char *str)
1156 {
1157 	unsigned int i;
1158 
1159 	assert(str != NULL);
1160 
1161 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1162 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1163 			break;
1164 		}
1165 	}
1166 
1167 	return i;
1168 }
1169 
1170 const char *
1171 raid_bdev_state_to_str(enum raid_bdev_state state)
1172 {
1173 	if (state >= RAID_BDEV_STATE_MAX) {
1174 		return "";
1175 	}
1176 
1177 	return g_raid_state_names[state];
1178 }
1179 
1180 const char *
1181 raid_bdev_process_to_str(enum raid_process_type value)
1182 {
1183 	if (value >= RAID_PROCESS_MAX) {
1184 		return "";
1185 	}
1186 
1187 	return g_raid_process_type_names[value];
1188 }
1189 
1190 /*
1191  * brief:
1192  * raid_bdev_fini_start is called when bdev layer is starting the
1193  * shutdown process
1194  * params:
1195  * none
1196  * returns:
1197  * none
1198  */
1199 static void
1200 raid_bdev_fini_start(void)
1201 {
1202 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1203 	g_shutdown_started = true;
1204 }
1205 
1206 /*
1207  * brief:
1208  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1209  * params:
1210  * none
1211  * returns:
1212  * none
1213  */
1214 static void
1215 raid_bdev_exit(void)
1216 {
1217 	struct raid_bdev *raid_bdev, *tmp;
1218 
1219 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1220 
1221 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1222 		raid_bdev_cleanup_and_free(raid_bdev);
1223 	}
1224 }
1225 
1226 /*
1227  * brief:
1228  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1229  * module
1230  * params:
1231  * none
1232  * returns:
1233  * size of spdk_bdev_io context for raid
1234  */
1235 static int
1236 raid_bdev_get_ctx_size(void)
1237 {
1238 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
1239 	return sizeof(struct raid_bdev_io);
1240 }
1241 
/* Module descriptor registered with the bdev layer; both init and fini
 * complete synchronously (async_init/async_fini false). */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1253 
1254 /*
1255  * brief:
1256  * raid_bdev_init is the initialization function for raid bdev module
1257  * params:
1258  * none
1259  * returns:
1260  * 0 - success
1261  * non zero - failure
1262  */
1263 static int
1264 raid_bdev_init(void)
1265 {
1266 	return 0;
1267 }
1268 
1269 static int
1270 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1271 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1272 		  struct raid_bdev **raid_bdev_out)
1273 {
1274 	struct raid_bdev *raid_bdev;
1275 	struct spdk_bdev *raid_bdev_gen;
1276 	struct raid_bdev_module *module;
1277 	struct raid_base_bdev_info *base_info;
1278 	uint8_t min_operational;
1279 
1280 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1281 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1282 		return -EINVAL;
1283 	}
1284 
1285 	if (raid_bdev_find_by_name(name) != NULL) {
1286 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1287 		return -EEXIST;
1288 	}
1289 
1290 	if (level == RAID1) {
1291 		if (strip_size != 0) {
1292 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1293 			return -EINVAL;
1294 		}
1295 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1296 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1297 		return -EINVAL;
1298 	}
1299 
1300 	module = raid_bdev_module_find(level);
1301 	if (module == NULL) {
1302 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1303 		return -EINVAL;
1304 	}
1305 
1306 	assert(module->base_bdevs_min != 0);
1307 	if (num_base_bdevs < module->base_bdevs_min) {
1308 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1309 			    module->base_bdevs_min,
1310 			    raid_bdev_level_to_str(level));
1311 		return -EINVAL;
1312 	}
1313 
1314 	switch (module->base_bdevs_constraint.type) {
1315 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1316 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1317 		break;
1318 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1319 		min_operational = module->base_bdevs_constraint.value;
1320 		break;
1321 	case CONSTRAINT_UNSET:
1322 		if (module->base_bdevs_constraint.value != 0) {
1323 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1324 				    (uint8_t)module->base_bdevs_constraint.value, name);
1325 			return -EINVAL;
1326 		}
1327 		min_operational = num_base_bdevs;
1328 		break;
1329 	default:
1330 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1331 			    (uint8_t)module->base_bdevs_constraint.type,
1332 			    raid_bdev_level_to_str(module->level));
1333 		return -EINVAL;
1334 	};
1335 
1336 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1337 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1338 			    raid_bdev_level_to_str(module->level));
1339 		return -EINVAL;
1340 	}
1341 
1342 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1343 	if (!raid_bdev) {
1344 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1345 		return -ENOMEM;
1346 	}
1347 
1348 	spdk_spin_init(&raid_bdev->base_bdev_lock);
1349 	raid_bdev->module = module;
1350 	raid_bdev->num_base_bdevs = num_base_bdevs;
1351 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1352 					   sizeof(struct raid_base_bdev_info));
1353 	if (!raid_bdev->base_bdev_info) {
1354 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1355 		raid_bdev_free(raid_bdev);
1356 		return -ENOMEM;
1357 	}
1358 
1359 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1360 		base_info->raid_bdev = raid_bdev;
1361 	}
1362 
1363 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1364 	 * internally and set later.
1365 	 */
1366 	raid_bdev->strip_size = 0;
1367 	raid_bdev->strip_size_kb = strip_size;
1368 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1369 	raid_bdev->level = level;
1370 	raid_bdev->min_base_bdevs_operational = min_operational;
1371 
1372 	if (superblock_enabled) {
1373 		raid_bdev->sb = spdk_dma_zmalloc(RAID_BDEV_SB_MAX_LENGTH, 0x1000, NULL);
1374 		if (!raid_bdev->sb) {
1375 			SPDK_ERRLOG("Failed to allocate raid bdev sb buffer\n");
1376 			raid_bdev_free(raid_bdev);
1377 			return -ENOMEM;
1378 		}
1379 	}
1380 
1381 	raid_bdev_gen = &raid_bdev->bdev;
1382 
1383 	raid_bdev_gen->name = strdup(name);
1384 	if (!raid_bdev_gen->name) {
1385 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1386 		raid_bdev_free(raid_bdev);
1387 		return -ENOMEM;
1388 	}
1389 
1390 	raid_bdev_gen->product_name = "Raid Volume";
1391 	raid_bdev_gen->ctxt = raid_bdev;
1392 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1393 	raid_bdev_gen->module = &g_raid_if;
1394 	raid_bdev_gen->write_cache = 0;
1395 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1396 
1397 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1398 
1399 	*raid_bdev_out = raid_bdev;
1400 
1401 	return 0;
1402 }
1403 
1404 /*
1405  * brief:
1406  * raid_bdev_create allocates raid bdev based on passed configuration
1407  * params:
1408  * name - name for raid bdev
1409  * strip_size - strip size in KB
1410  * num_base_bdevs - number of base bdevs
1411  * level - raid level
1412  * superblock_enabled - true if raid should have superblock
1413  * uuid - uuid to set for the bdev
1414  * raid_bdev_out - the created raid bdev
1415  * returns:
1416  * 0 - success
1417  * non zero - failure
1418  */
1419 int
1420 raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1421 		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1422 		 struct raid_bdev **raid_bdev_out)
1423 {
1424 	struct raid_bdev *raid_bdev;
1425 	int rc;
1426 
1427 	assert(uuid != NULL);
1428 
1429 	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1430 			       &raid_bdev);
1431 	if (rc != 0) {
1432 		return rc;
1433 	}
1434 
1435 	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1436 		/* we need to have the uuid to store in the superblock before the bdev is registered */
1437 		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1438 	}
1439 
1440 	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1441 
1442 	*raid_bdev_out = raid_bdev;
1443 
1444 	return 0;
1445 }
1446 
/*
 * Final step of configuration: register the raid bdev's io_device and the
 * bdev itself with the bdev layer. On registration failure the raid bdev
 * is rolled back to the CONFIGURING state.
 */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to register raid bdev and stay at configuring state\n");
		/* Undo the module start and io_device registration done above. */
		if (raid_bdev->module->stop != NULL) {
			raid_bdev->module->stop(raid_bdev);
		}
		spdk_io_device_unregister(raid_bdev, NULL);
		raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
		return;
	}
	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
}
1474 
1475 static void
1476 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1477 {
1478 	if (status == 0) {
1479 		raid_bdev_configure_cont(raid_bdev);
1480 	} else {
1481 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1482 			    raid_bdev->bdev.name, spdk_strerror(-status));
1483 		if (raid_bdev->module->stop != NULL) {
1484 			raid_bdev->module->stop(raid_bdev);
1485 		}
1486 	}
1487 }
1488 
1489 /*
1490  * brief:
1491  * If raid bdev config is complete, then only register the raid bdev to
1492  * bdev layer and remove this raid bdev from configuring list and
1493  * insert the raid bdev to configured list
1494  * params:
1495  * raid_bdev - pointer to raid bdev
1496  * returns:
1497  * 0 - success
1498  * non zero - failure
1499  */
1500 static int
1501 raid_bdev_configure(struct raid_bdev *raid_bdev)
1502 {
1503 	int rc;
1504 
1505 	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
1506 	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
1507 	assert(raid_bdev->bdev.blocklen > 0);
1508 
1509 	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
1510 	 * internal use.
1511 	 */
1512 	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / raid_bdev->bdev.blocklen;
1513 	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
1514 		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
1515 		return -EINVAL;
1516 	}
1517 	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);
1518 	raid_bdev->blocklen_shift = spdk_u32log2(raid_bdev->bdev.blocklen);
1519 
1520 	rc = raid_bdev->module->start(raid_bdev);
1521 	if (rc != 0) {
1522 		SPDK_ERRLOG("raid module startup callback failed\n");
1523 		return rc;
1524 	}
1525 
1526 	if (raid_bdev->sb != NULL) {
1527 		if (spdk_uuid_is_null(&raid_bdev->sb->uuid)) {
1528 			/* NULL UUID is not valid in the sb so it means that we are creating a new
1529 			 * raid bdev and should initialize the superblock.
1530 			 */
1531 			raid_bdev_init_superblock(raid_bdev);
1532 		} else {
1533 			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
1534 			if (raid_bdev->sb->block_size != raid_bdev->bdev.blocklen) {
1535 				SPDK_ERRLOG("blocklen does not match value in superblock\n");
1536 				rc = -EINVAL;
1537 			}
1538 			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
1539 				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
1540 				rc = -EINVAL;
1541 			}
1542 			if (rc != 0) {
1543 				if (raid_bdev->module->stop != NULL) {
1544 					raid_bdev->module->stop(raid_bdev);
1545 				}
1546 				return rc;
1547 			}
1548 		}
1549 
1550 		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
1551 	} else {
1552 		raid_bdev_configure_cont(raid_bdev);
1553 	}
1554 
1555 	return 0;
1556 }
1557 
1558 /*
1559  * brief:
1560  * If raid bdev is online and registered, change the bdev state to
1561  * configuring and unregister this raid device. Queue this raid device
1562  * in configuring list
1563  * params:
1564  * raid_bdev - pointer to raid bdev
1565  * cb_fn - callback function
1566  * cb_arg - argument to callback function
1567  * returns:
1568  * none
1569  */
1570 static void
1571 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1572 		      void *cb_arg)
1573 {
1574 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1575 		if (cb_fn) {
1576 			cb_fn(cb_arg, 0);
1577 		}
1578 		return;
1579 	}
1580 
1581 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1582 	assert(raid_bdev->num_base_bdevs_discovered);
1583 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1584 
1585 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1586 }
1587 
1588 /*
1589  * brief:
1590  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1591  * params:
1592  * base_bdev - pointer to base bdev
1593  * returns:
1594  * base bdev info if found, otherwise NULL.
1595  */
1596 static struct raid_base_bdev_info *
1597 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1598 {
1599 	struct raid_bdev *raid_bdev;
1600 	struct raid_base_bdev_info *base_info;
1601 
1602 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1603 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1604 			if (base_info->desc != NULL &&
1605 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1606 				return base_info;
1607 			}
1608 		}
1609 	}
1610 
1611 	return NULL;
1612 }
1613 
1614 static void
1615 raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
1616 {
1617 	assert(base_info->remove_scheduled);
1618 
1619 	base_info->remove_scheduled = false;
1620 	if (base_info->remove_cb != NULL) {
1621 		base_info->remove_cb(base_info->remove_cb_ctx, status);
1622 	}
1623 }
1624 
1625 static void
1626 raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1627 {
1628 	struct raid_base_bdev_info *base_info = ctx;
1629 
1630 	if (status != 0) {
1631 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1632 			    raid_bdev->bdev.name, spdk_strerror(-status));
1633 	}
1634 
1635 	raid_bdev_remove_base_bdev_done(base_info, status);
1636 }
1637 
/*
 * Called after the raid bdev is unquiesced following base bdev removal.
 * Frees the base bdev's resources and, when a superblock exists, marks the
 * removed slot as failed and persists the superblock before completing.
 */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		goto out;
	}

	/* Lock against concurrent readers of base bdev descriptors
	 * (e.g. raid_bdev_get_memory_domains). */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	raid_bdev_free_base_bdev_resource(base_info);
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the superblock entry that is configured for this slot. */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				break;
			}
		}

		/* A configured slot must always have a matching sb entry. */
		assert(i < sb->base_bdevs_size);

		/* TODO: distinguish between failure and intentional removal */
		sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;

		/* Completion is deferred to the superblock write callback. */
		raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
		return;
	}
out:
	raid_bdev_remove_base_bdev_done(base_info, status);
}
1680 
1681 static void
1682 raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
1683 {
1684 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1685 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1686 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
1687 	uint8_t idx = raid_bdev_base_bdev_slot(base_info);
1688 
1689 	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);
1690 
1691 	if (raid_ch->base_channel[idx] != NULL) {
1692 		spdk_put_io_channel(raid_ch->base_channel[idx]);
1693 		raid_ch->base_channel[idx] = NULL;
1694 	}
1695 
1696 	if (raid_ch->process.ch_processed != NULL) {
1697 		raid_ch->process.ch_processed->base_channel[idx] = NULL;
1698 	}
1699 
1700 	spdk_for_each_channel_continue(i, 0);
1701 }
1702 
1703 static void
1704 raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
1705 {
1706 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1707 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1708 
1709 	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
1710 			    base_info);
1711 }
1712 
1713 static void
1714 raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
1715 {
1716 	struct raid_base_bdev_info *base_info = ctx;
1717 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1718 
1719 	if (status != 0) {
1720 		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
1721 			    raid_bdev->bdev.name, spdk_strerror(-status));
1722 		raid_bdev_remove_base_bdev_done(base_info, status);
1723 		return;
1724 	}
1725 
1726 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
1727 			      raid_bdev_channels_remove_base_bdev_done);
1728 }
1729 
/*
 * Schedule removal of a base bdev from its raid bdev. Depending on the
 * raid bdev's state this either frees the base bdev resources directly,
 * deconfigures the whole raid bdev (when too few base bdevs would remain),
 * or quiesces I/O and removes the base bdev from all channels.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* Removal already in progress - nothing more to do. */
	if (base_info->remove_scheduled) {
		return 0;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;
	base_info->remove_cb = cb_fn;
	base_info->remove_cb_ctx = cb_ctx;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		if (raid_bdev->num_base_bdevs_discovered == 0) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
	} else if (raid_bdev->num_base_bdevs_operational-- == raid_bdev->min_base_bdevs_operational) {
		/*
		 * Note: the post-decrement above always lowers the operational
		 * count; the comparison uses the pre-decrement value.
		 *
		 * After this base bdev is removed there will not be enough base bdevs
		 * to keep the raid bdev operational.
		 */
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		int ret;

		ret = spdk_bdev_quiesce(&raid_bdev->bdev, &g_raid_if,
					raid_bdev_remove_base_bdev_on_quiesced, base_info);
		if (ret != 0) {
			/* Quiesce failed: the removal never started, so undo
			 * the scheduled flag. */
			base_info->remove_scheduled = false;
		}
	}

	return 0;
}
1780 
1781 /*
1782  * brief:
1783  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
1784  * is removed. This function checks if this base bdev is part of any raid bdev
1785  * or not. If yes, it takes necessary action on that particular raid bdev.
1786  * params:
1787  * base_bdev - pointer to base bdev which got removed
1788  * cb_fn - callback function
1789  * cb_arg - argument to callback function
1790  * returns:
1791  * 0 - success
1792  * non zero - failure
1793  */
1794 int
1795 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
1796 {
1797 	struct raid_base_bdev_info *base_info;
1798 
1799 	/* Find the raid_bdev which has claimed this base_bdev */
1800 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
1801 	if (!base_info) {
1802 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
1803 		return -ENODEV;
1804 	}
1805 
1806 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
1807 }
1808 
1809 /*
1810  * brief:
1811  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
1812  * is resized. This function checks if the smallest size of the base_bdevs is changed.
1813  * If yes, call module handler to resize the raid_bdev if implemented.
1814  * params:
1815  * base_bdev - pointer to base bdev which got resized.
1816  * returns:
1817  * none
1818  */
1819 static void
1820 raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
1821 {
1822 	struct raid_bdev *raid_bdev;
1823 	struct raid_base_bdev_info *base_info;
1824 
1825 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");
1826 
1827 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
1828 
1829 	/* Find the raid_bdev which has claimed this base_bdev */
1830 	if (!base_info) {
1831 		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
1832 		return;
1833 	}
1834 	raid_bdev = base_info->raid_bdev;
1835 
1836 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1837 
1838 	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
1839 		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);
1840 
1841 	if (raid_bdev->module->resize) {
1842 		raid_bdev->module->resize(raid_bdev);
1843 	}
1844 }
1845 
1846 /*
1847  * brief:
1848  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
1849  * triggers asynchronous event.
1850  * params:
1851  * type - event details.
1852  * bdev - bdev that triggered event.
1853  * event_ctx - context for event.
1854  * returns:
1855  * none
1856  */
1857 static void
1858 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
1859 			  void *event_ctx)
1860 {
1861 	int rc;
1862 
1863 	switch (type) {
1864 	case SPDK_BDEV_EVENT_REMOVE:
1865 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
1866 		if (rc != 0) {
1867 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
1868 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
1869 		}
1870 		break;
1871 	case SPDK_BDEV_EVENT_RESIZE:
1872 		raid_bdev_resize_base_bdev(bdev);
1873 		break;
1874 	default:
1875 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
1876 		break;
1877 	}
1878 }
1879 
1880 /*
1881  * brief:
1882  * Deletes the specified raid bdev
1883  * params:
1884  * raid_bdev - pointer to raid bdev
1885  * cb_fn - callback function
1886  * cb_arg - argument to callback function
1887  */
1888 void
1889 raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
1890 {
1891 	struct raid_base_bdev_info *base_info;
1892 
1893 	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);
1894 
1895 	if (raid_bdev->destroy_started) {
1896 		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
1897 			      raid_bdev->bdev.name);
1898 		if (cb_fn) {
1899 			cb_fn(cb_arg, -EALREADY);
1900 		}
1901 		return;
1902 	}
1903 
1904 	raid_bdev->destroy_started = true;
1905 
1906 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1907 		base_info->remove_scheduled = true;
1908 
1909 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1910 			/*
1911 			 * As raid bdev is not registered yet or already unregistered,
1912 			 * so cleanup should be done here itself.
1913 			 */
1914 			raid_bdev_free_base_bdev_resource(base_info);
1915 		}
1916 	}
1917 
1918 	if (raid_bdev->num_base_bdevs_discovered == 0) {
1919 		/* There is no base bdev for this raid, so free the raid device. */
1920 		raid_bdev_cleanup_and_free(raid_bdev);
1921 		if (cb_fn) {
1922 			cb_fn(cb_arg, 0);
1923 		}
1924 	} else {
1925 		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
1926 	}
1927 }
1928 
1929 static void
1930 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1931 {
1932 	if (status != 0) {
1933 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
1934 			    raid_bdev->bdev.name, spdk_strerror(-status));
1935 	}
1936 }
1937 
/*
 * Runs on the app thread after a background process completes successfully.
 * Promotes superblock entries of newly-configured base bdevs to the
 * CONFIGURED state and persists the superblock.
 */
static void
raid_bdev_process_finish_write_sb(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_bdev_superblock *sb = raid_bdev->sb;
	struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_base_bdev_info *base_info;
	uint8_t i;

	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		/* Only entries that are not yet CONFIGURED and refer to a valid
		 * slot are candidates for promotion. */
		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
			if (base_info->is_configured) {
				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
			}
		}
	}

	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
}
1962 
/* Forward declaration: defined later, needed by the process finish path. */
static void raid_bdev_process_free(struct raid_bdev_process *process);
1964 
/*
 * Final step on the process thread: free the process state and tear down
 * the dedicated process thread itself.
 */
static void
_raid_bdev_process_finish_done(void *ctx)
{
	raid_bdev_process_free((struct raid_bdev_process *)ctx);

	spdk_thread_exit(spdk_get_thread());
}
1974 
1975 static void
1976 raid_bdev_process_finish_target_removed(void *ctx, int status)
1977 {
1978 	struct raid_bdev_process *process = ctx;
1979 
1980 	if (status != 0) {
1981 		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
1982 	}
1983 
1984 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
1985 }
1986 
/*
 * Called after the raid bdev is unquiesced at the end of a background
 * process. On process failure the target base bdev is removed before the
 * finish sequence continues on the process thread.
 */
static void
raid_bdev_process_finish_unquiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
	}

	if (process->status != 0) {
		struct raid_base_bdev_info *target = process->target;

		/* Remove the failed target unless it is already gone or its
		 * removal is already scheduled; completion resumes in
		 * raid_bdev_process_finish_target_removed(). */
		if (target->desc != NULL && target->remove_scheduled == false) {
			_raid_bdev_remove_base_bdev(target, raid_bdev_process_finish_target_removed, process);
			return;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2007 
2008 static void
2009 raid_bdev_process_finish_unquiesce(void *ctx)
2010 {
2011 	struct raid_bdev_process *process = ctx;
2012 	int rc;
2013 
2014 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2015 				 raid_bdev_process_finish_unquiesced, process);
2016 	if (rc != 0) {
2017 		raid_bdev_process_finish_unquiesced(process, rc);
2018 	}
2019 }
2020 
2021 static void
2022 raid_bdev_process_finish_done(void *ctx)
2023 {
2024 	struct raid_bdev_process *process = ctx;
2025 	struct raid_bdev *raid_bdev = process->raid_bdev;
2026 
2027 	if (process->raid_ch != NULL) {
2028 		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
2029 	}
2030 
2031 	process->state = RAID_PROCESS_STATE_STOPPED;
2032 
2033 	if (process->status == 0) {
2034 		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
2035 			       raid_bdev_process_to_str(process->type),
2036 			       raid_bdev->bdev.name);
2037 		if (raid_bdev->sb != NULL) {
2038 			spdk_thread_send_msg(spdk_thread_get_app_thread(),
2039 					     raid_bdev_process_finish_write_sb,
2040 					     raid_bdev);
2041 		}
2042 	} else {
2043 		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
2044 			     raid_bdev_process_to_str(process->type),
2045 			     raid_bdev->bdev.name,
2046 			     spdk_strerror(-process->status));
2047 	}
2048 
2049 	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
2050 			     process);
2051 }
2052 
2053 static void
2054 __raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
2055 {
2056 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2057 
2058 	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
2059 }
2060 
2061 static void
2062 raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
2063 {
2064 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2065 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2066 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2067 
2068 	if (process->status == 0) {
2069 		uint8_t slot = raid_bdev_base_bdev_slot(process->target);
2070 
2071 		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
2072 		raid_ch->process.target_ch = NULL;
2073 	}
2074 
2075 	raid_bdev_ch_process_cleanup(raid_ch);
2076 
2077 	spdk_for_each_channel_continue(i, 0);
2078 }
2079 
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	/* Completion of the whole-bdev quiesce issued during process teardown. */
	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): on quiesce failure we bail out here, leaving the
		 * process neither freed nor its thread exited - confirm this is
		 * the intended behavior. */
		return;
	}

	/* Detach the process from the raid bdev, then clean up the process
	 * state on every io channel; the final step runs on the process
	 * thread (__raid_bdev_process_finish). */
	raid_bdev->process = NULL;
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2095 
2096 static void
2097 _raid_bdev_process_finish(void *ctx)
2098 {
2099 	struct raid_bdev_process *process = ctx;
2100 	int rc;
2101 
2102 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2103 			       raid_bdev_process_finish_quiesced, process);
2104 	if (rc != 0) {
2105 		raid_bdev_process_finish_quiesced(ctx, rc);
2106 	}
2107 }
2108 
2109 static void
2110 raid_bdev_process_do_finish(struct raid_bdev_process *process)
2111 {
2112 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
2113 }
2114 
2115 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2116 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2117 
2118 static void
2119 raid_bdev_process_finish(struct raid_bdev_process *process, int status)
2120 {
2121 	assert(spdk_get_thread() == process->thread);
2122 
2123 	if (process->status == 0) {
2124 		process->status = status;
2125 	}
2126 
2127 	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
2128 		return;
2129 	}
2130 
2131 	assert(process->state == RAID_PROCESS_STATE_RUNNING);
2132 	process->state = RAID_PROCESS_STATE_STOPPING;
2133 
2134 	if (process->window_range_locked) {
2135 		raid_bdev_process_unlock_window_range(process);
2136 	} else {
2137 		raid_bdev_process_thread_run(process);
2138 	}
2139 }
2140 
2141 static void
2142 raid_bdev_process_window_range_unlocked(void *ctx, int status)
2143 {
2144 	struct raid_bdev_process *process = ctx;
2145 
2146 	if (status != 0) {
2147 		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
2148 		raid_bdev_process_finish(process, status);
2149 		return;
2150 	}
2151 
2152 	process->window_range_locked = false;
2153 	process->window_offset += process->window_size;
2154 
2155 	raid_bdev_process_thread_run(process);
2156 }
2157 
2158 static void
2159 raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
2160 {
2161 	int rc;
2162 
2163 	assert(process->window_range_locked == true);
2164 
2165 	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
2166 				       process->window_offset, process->max_window_size,
2167 				       raid_bdev_process_window_range_unlocked, process);
2168 	if (rc != 0) {
2169 		raid_bdev_process_window_range_unlocked(process, rc);
2170 	}
2171 }
2172 
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	/* Every channel has seen the new process offset; release the window. */
	raid_bdev_process_unlock_window_range(spdk_io_channel_iter_get_ctx(i));
}
2180 
2181 static void
2182 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2183 {
2184 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2185 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2186 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2187 
2188 	raid_ch->process.offset = process->window_offset + process->window_size;
2189 
2190 	spdk_for_each_channel_continue(i, 0);
2191 }
2192 
2193 void
2194 raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
2195 {
2196 	struct raid_bdev_process *process = process_req->process;
2197 
2198 	TAILQ_INSERT_TAIL(&process->requests, process_req, link);
2199 
2200 	assert(spdk_get_thread() == process->thread);
2201 	assert(process->window_remaining >= process_req->num_blocks);
2202 
2203 	if (status != 0) {
2204 		process->window_status = status;
2205 	}
2206 
2207 	process->window_remaining -= process_req->num_blocks;
2208 	if (process->window_remaining == 0) {
2209 		if (process->window_status != 0) {
2210 			raid_bdev_process_finish(process, process->window_status);
2211 			return;
2212 		}
2213 
2214 		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
2215 				      raid_bdev_process_channels_update_done);
2216 	}
2217 }
2218 
/* Submit one background-process request covering at most num_blocks starting
 * at offset_blocks. Returns the number of blocks actually submitted, 0 when
 * submission should be retried later (no free request context, or the module
 * declined), or a negative errno on failure. */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	/* Take a free request context. An empty list means all contexts are in
	 * flight - their completions will resume submission later. */
	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	/* The raid module may accept fewer blocks than requested (ret > 0),
	 * ask to retry later (ret == 0), or fail (ret < 0). */
	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* Record the accepted size; the request is owned by the module until
	 * raid_bdev_process_request_complete() returns it to the list. */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2254 
2255 static void
2256 _raid_bdev_process_thread_run(struct raid_bdev_process *process)
2257 {
2258 	struct raid_bdev *raid_bdev = process->raid_bdev;
2259 	uint64_t offset = process->window_offset;
2260 	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
2261 	int ret;
2262 
2263 	while (offset < offset_end) {
2264 		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
2265 		if (ret <= 0) {
2266 			break;
2267 		}
2268 
2269 		process->window_remaining += ret;
2270 		offset += ret;
2271 	}
2272 
2273 	if (process->window_remaining > 0) {
2274 		process->window_size = process->window_remaining;
2275 	} else {
2276 		raid_bdev_process_finish(process, process->window_status);
2277 	}
2278 }
2279 
2280 static void
2281 raid_bdev_process_window_range_locked(void *ctx, int status)
2282 {
2283 	struct raid_bdev_process *process = ctx;
2284 
2285 	if (status != 0) {
2286 		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
2287 		raid_bdev_process_finish(process, status);
2288 		return;
2289 	}
2290 
2291 	process->window_range_locked = true;
2292 
2293 	if (process->state == RAID_PROCESS_STATE_STOPPING) {
2294 		raid_bdev_process_unlock_window_range(process);
2295 		return;
2296 	}
2297 
2298 	_raid_bdev_process_thread_run(process);
2299 }
2300 
2301 static void
2302 raid_bdev_process_thread_run(struct raid_bdev_process *process)
2303 {
2304 	struct raid_bdev *raid_bdev = process->raid_bdev;
2305 	int rc;
2306 
2307 	assert(spdk_get_thread() == process->thread);
2308 	assert(process->window_remaining == 0);
2309 	assert(process->window_range_locked == false);
2310 
2311 	if (process->state == RAID_PROCESS_STATE_STOPPING) {
2312 		raid_bdev_process_do_finish(process);
2313 		return;
2314 	}
2315 
2316 	if (process->window_offset == raid_bdev->bdev.blockcnt) {
2317 		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
2318 		raid_bdev_process_finish(process, 0);
2319 		return;
2320 	}
2321 
2322 	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
2323 					    process->max_window_size);
2324 
2325 	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
2326 				     process->window_offset, process->max_window_size,
2327 				     raid_bdev_process_window_range_locked, process);
2328 	if (rc != 0) {
2329 		raid_bdev_process_window_range_locked(process, rc);
2330 	}
2331 }
2332 
2333 static void
2334 raid_bdev_process_thread_init(void *ctx)
2335 {
2336 	struct raid_bdev_process *process = ctx;
2337 	struct raid_bdev *raid_bdev = process->raid_bdev;
2338 	struct spdk_io_channel *ch;
2339 
2340 	process->thread = spdk_get_thread();
2341 
2342 	ch = spdk_get_io_channel(raid_bdev);
2343 	if (ch == NULL) {
2344 		process->status = -ENOMEM;
2345 		raid_bdev_process_do_finish(process);
2346 		return;
2347 	}
2348 
2349 	process->raid_ch = spdk_io_channel_get_ctx(ch);
2350 	process->state = RAID_PROCESS_STATE_RUNNING;
2351 
2352 	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
2353 		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
2354 
2355 	raid_bdev_process_thread_run(process);
2356 }
2357 
2358 static void
2359 raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
2360 {
2361 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2362 
2363 	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
2364 	raid_bdev_process_free(process);
2365 
2366 	/* TODO: update sb */
2367 }
2368 
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	/* Undo the per-channel process setup performed on start. */
	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2379 
2380 static void
2381 raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
2382 {
2383 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2384 	struct raid_bdev *raid_bdev = process->raid_bdev;
2385 	struct spdk_thread *thread;
2386 	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];
2387 
2388 	if (status != 0) {
2389 		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
2390 			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
2391 			    spdk_strerror(-status));
2392 		goto err;
2393 	}
2394 
2395 	/* TODO: we may need to abort if a base bdev was removed before we got here */
2396 
2397 	snprintf(thread_name, sizeof(thread_name), "%s_%s",
2398 		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));
2399 
2400 	thread = spdk_thread_create(thread_name, NULL);
2401 	if (thread == NULL) {
2402 		SPDK_ERRLOG("Failed to create %s thread for %s\n",
2403 			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
2404 		goto err;
2405 	}
2406 
2407 	raid_bdev->process = process;
2408 
2409 	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);
2410 
2411 	return;
2412 err:
2413 	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
2414 			      raid_bdev_channels_abort_start_process_done);
2415 }
2416 
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *proc = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));
	int rc = raid_bdev_ch_process_setup(raid_ch, proc);

	/* Propagate the setup result; a non-zero rc aborts the iteration. */
	spdk_for_each_channel_continue(i, rc);
}
2429 
2430 static void
2431 raid_bdev_process_start(struct raid_bdev_process *process)
2432 {
2433 	struct raid_bdev *raid_bdev = process->raid_bdev;
2434 
2435 	assert(raid_bdev->module->submit_process_request != NULL);
2436 
2437 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
2438 			      raid_bdev_channels_start_process_done);
2439 }
2440 
2441 static void
2442 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
2443 {
2444 	spdk_dma_free(process_req->iov.iov_base);
2445 	spdk_dma_free(process_req->md_buf);
2446 	free(process_req);
2447 }
2448 
2449 static struct raid_bdev_process_request *
2450 raid_bdev_process_alloc_request(struct raid_bdev_process *process)
2451 {
2452 	struct raid_bdev *raid_bdev = process->raid_bdev;
2453 	struct raid_bdev_process_request *process_req;
2454 
2455 	process_req = calloc(1, sizeof(*process_req));
2456 	if (process_req == NULL) {
2457 		return NULL;
2458 	}
2459 
2460 	process_req->process = process;
2461 	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
2462 	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
2463 	if (process_req->iov.iov_base == NULL) {
2464 		free(process_req);
2465 		return NULL;
2466 	}
2467 	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
2468 		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
2469 		if (process_req->md_buf == NULL) {
2470 			raid_bdev_process_request_free(process_req);
2471 			return NULL;
2472 		}
2473 	}
2474 
2475 	return process_req;
2476 }
2477 
2478 static void
2479 raid_bdev_process_free(struct raid_bdev_process *process)
2480 {
2481 	struct raid_bdev_process_request *process_req;
2482 
2483 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
2484 		TAILQ_REMOVE(&process->requests, process_req, link);
2485 		raid_bdev_process_request_free(process_req);
2486 	}
2487 
2488 	free(process);
2489 }
2490 
2491 static struct raid_bdev_process *
2492 raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
2493 			struct raid_base_bdev_info *target)
2494 {
2495 	struct raid_bdev_process *process;
2496 	struct raid_bdev_process_request *process_req;
2497 	int i;
2498 
2499 	process = calloc(1, sizeof(*process));
2500 	if (process == NULL) {
2501 		return NULL;
2502 	}
2503 
2504 	process->raid_bdev = raid_bdev;
2505 	process->type = type;
2506 	process->target = target;
2507 	process->max_window_size = spdk_max(RAID_BDEV_PROCESS_WINDOW_SIZE / raid_bdev->bdev.blocklen,
2508 					    raid_bdev->bdev.write_unit_size);
2509 	TAILQ_INIT(&process->requests);
2510 
2511 	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
2512 		process_req = raid_bdev_process_alloc_request(process);
2513 		if (process_req == NULL) {
2514 			raid_bdev_process_free(process);
2515 			return NULL;
2516 		}
2517 
2518 		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
2519 	}
2520 
2521 	return process;
2522 }
2523 
2524 static int
2525 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
2526 {
2527 	struct raid_bdev_process *process;
2528 
2529 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2530 
2531 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
2532 	if (process == NULL) {
2533 		return -ENOMEM;
2534 	}
2535 
2536 	raid_bdev_process_start(process);
2537 
2538 	return 0;
2539 }
2540 
/* Continue base bdev configuration after it has been opened and validated:
 * update the discovery counters and either bring the raid bdev online or
 * start a rebuild onto this base bdev. Invokes base_info->configure_cb with
 * the result, if set. */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int rc;

	/* TODO: defer if rebuild in progress on another base bdev */
	assert(raid_bdev->process == NULL);

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		}
	} else if (raid_bdev->num_base_bdevs_discovered > raid_bdev->num_base_bdevs_operational) {
		/* A base bdev beyond the operational set joined an online array:
		 * grow the operational count and rebuild onto the new member. */
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		/* Still waiting for more base bdevs to be discovered. */
		rc = 0;
	}

	/* Report the configuration result to the caller, if requested. */
	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, rc);
	}
}
2584 
2585 static void
2586 raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
2587 		void *ctx)
2588 {
2589 	struct raid_base_bdev_info *base_info = ctx;
2590 
2591 	switch (status) {
2592 	case 0:
2593 		/* valid superblock found */
2594 		SPDK_ERRLOG("Existing raid superblock found on bdev %s\n", base_info->name);
2595 		status = -EEXIST;
2596 		raid_bdev_free_base_bdev_resource(base_info);
2597 		break;
2598 	case -EINVAL:
2599 		/* no valid superblock */
2600 		raid_bdev_configure_base_bdev_cont(base_info);
2601 		return;
2602 	default:
2603 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
2604 			    base_info->name, spdk_strerror(-status));
2605 		break;
2606 	}
2607 
2608 	if (base_info->configure_cb != NULL) {
2609 		base_info->configure_cb(base_info->configure_cb_ctx, status);
2610 	}
2611 }
2612 
2613 static int
2614 raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
2615 			      raid_base_bdev_cb cb_fn, void *cb_ctx)
2616 {
2617 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
2618 	struct spdk_bdev_desc *desc;
2619 	struct spdk_bdev *bdev;
2620 	const struct spdk_uuid *bdev_uuid;
2621 	int rc;
2622 
2623 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2624 	assert(base_info->desc == NULL);
2625 
2626 	/*
2627 	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
2628 	 * before claiming the bdev.
2629 	 */
2630 
2631 	if (!spdk_uuid_is_null(&base_info->uuid)) {
2632 		char uuid_str[SPDK_UUID_STRING_LEN];
2633 		const char *bdev_name;
2634 
2635 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
2636 
2637 		/* UUID of a bdev is registered as its alias */
2638 		bdev = spdk_bdev_get_by_name(uuid_str);
2639 		if (bdev == NULL) {
2640 			return -ENODEV;
2641 		}
2642 
2643 		bdev_name = spdk_bdev_get_name(bdev);
2644 
2645 		if (base_info->name == NULL) {
2646 			assert(existing == true);
2647 			base_info->name = strdup(bdev_name);
2648 			if (base_info->name == NULL) {
2649 				return -ENOMEM;
2650 			}
2651 		} else if (strcmp(base_info->name, bdev_name) != 0) {
2652 			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
2653 				    bdev_name, base_info->name);
2654 			return -EINVAL;
2655 		}
2656 	}
2657 
2658 	assert(base_info->name != NULL);
2659 
2660 	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
2661 	if (rc != 0) {
2662 		if (rc != -ENODEV) {
2663 			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
2664 		}
2665 		return rc;
2666 	}
2667 
2668 	bdev = spdk_bdev_desc_get_bdev(desc);
2669 	bdev_uuid = spdk_bdev_get_uuid(bdev);
2670 
2671 	if (spdk_uuid_is_null(&base_info->uuid)) {
2672 		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
2673 	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
2674 		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
2675 		spdk_bdev_close(desc);
2676 		return -EINVAL;
2677 	}
2678 
2679 	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
2680 	if (rc != 0) {
2681 		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
2682 		spdk_bdev_close(desc);
2683 		return rc;
2684 	}
2685 
2686 	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);
2687 
2688 	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
2689 	if (base_info->app_thread_ch == NULL) {
2690 		SPDK_ERRLOG("Failed to get io channel\n");
2691 		spdk_bdev_module_release_bdev(bdev);
2692 		spdk_bdev_close(desc);
2693 		return -ENOMEM;
2694 	}
2695 
2696 	base_info->desc = desc;
2697 	base_info->blockcnt = bdev->blockcnt;
2698 
2699 	if (raid_bdev->sb != NULL) {
2700 		uint64_t data_offset;
2701 
2702 		if (base_info->data_offset == 0) {
2703 			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % bdev->blocklen) == 0);
2704 			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / bdev->blocklen;
2705 		} else {
2706 			data_offset = base_info->data_offset;
2707 		}
2708 
2709 		if (bdev->optimal_io_boundary != 0) {
2710 			data_offset = spdk_divide_round_up(data_offset,
2711 							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
2712 			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
2713 				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
2714 					     base_info->data_offset, base_info->name, data_offset);
2715 				data_offset = base_info->data_offset;
2716 			}
2717 		}
2718 
2719 		base_info->data_offset = data_offset;
2720 	}
2721 
2722 	if (base_info->data_offset >= bdev->blockcnt) {
2723 		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
2724 			    base_info->data_offset, bdev->blockcnt, base_info->name);
2725 		rc = -EINVAL;
2726 		goto out;
2727 	}
2728 
2729 	if (base_info->data_size == 0) {
2730 		base_info->data_size = bdev->blockcnt - base_info->data_offset;
2731 	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
2732 		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
2733 			    bdev->blockcnt, base_info->name);
2734 		rc = -EINVAL;
2735 		goto out;
2736 	}
2737 
2738 	/* Currently, RAID bdevs do not support DIF or DIX, so a RAID bdev cannot
2739 	 * be created on top of any bdev which supports it */
2740 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
2741 		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
2742 			    bdev->name);
2743 		rc = -EINVAL;
2744 		goto out;
2745 	}
2746 
2747 	/*
2748 	 * Set the raid bdev properties if this is the first base bdev configured,
2749 	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
2750 	 * have the same blocklen and metadata format.
2751 	 */
2752 	if (raid_bdev->num_base_bdevs_discovered == 0) {
2753 		raid_bdev->bdev.blocklen = bdev->blocklen;
2754 		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
2755 		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
2756 	} else {
2757 		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
2758 			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
2759 				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
2760 			rc = -EINVAL;
2761 			goto out;
2762 		}
2763 
2764 		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
2765 		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev)) {
2766 			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
2767 				    raid_bdev->bdev.name, bdev->name);
2768 			rc = -EINVAL;
2769 			goto out;
2770 		}
2771 	}
2772 
2773 	base_info->configure_cb = cb_fn;
2774 	base_info->configure_cb_ctx = cb_ctx;
2775 
2776 	if (existing) {
2777 		raid_bdev_configure_base_bdev_cont(base_info);
2778 	} else {
2779 		/* check for existing superblock when using a new bdev */
2780 		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
2781 				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
2782 		if (rc) {
2783 			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
2784 				    bdev->name, spdk_strerror(-rc));
2785 		}
2786 	}
2787 out:
2788 	if (rc != 0) {
2789 		raid_bdev_free_base_bdev_resource(base_info);
2790 	}
2791 	return rc;
2792 }
2793 
2794 static int
2795 _raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
2796 			   uint64_t data_offset, uint64_t data_size,
2797 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
2798 {
2799 	struct raid_base_bdev_info *base_info;
2800 
2801 	assert(name != NULL);
2802 
2803 	if (slot >= raid_bdev->num_base_bdevs) {
2804 		return -EINVAL;
2805 	}
2806 
2807 	base_info = &raid_bdev->base_bdev_info[slot];
2808 
2809 	if (base_info->name != NULL) {
2810 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev '%s'\n",
2811 			    slot, raid_bdev->bdev.name, base_info->name);
2812 		return -EBUSY;
2813 	}
2814 
2815 	if (!spdk_uuid_is_null(&base_info->uuid)) {
2816 		char uuid_str[SPDK_UUID_STRING_LEN];
2817 
2818 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
2819 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev with uuid %s\n",
2820 			    slot, raid_bdev->bdev.name, uuid_str);
2821 		return -EBUSY;
2822 	}
2823 
2824 	base_info->name = strdup(name);
2825 	if (base_info->name == NULL) {
2826 		return -ENOMEM;
2827 	}
2828 
2829 	base_info->data_offset = data_offset;
2830 	base_info->data_size = data_size;
2831 
2832 	return raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
2833 }
2834 
2835 int
2836 raid_bdev_attach_base_bdev(struct raid_bdev *raid_bdev, struct spdk_bdev *base_bdev,
2837 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
2838 {
2839 	struct raid_base_bdev_info *base_info = NULL, *iter;
2840 	int rc;
2841 
2842 	SPDK_DEBUGLOG(bdev_raid, "attach_base_device: %s\n", base_bdev->name);
2843 
2844 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2845 
2846 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
2847 		SPDK_ERRLOG("raid bdev '%s' must be in online state to attach base bdev\n",
2848 			    raid_bdev->bdev.name);
2849 		return -EINVAL;
2850 	}
2851 
2852 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
2853 		if (iter->desc == NULL) {
2854 			base_info = iter;
2855 			break;
2856 		}
2857 	}
2858 
2859 	if (base_info == NULL) {
2860 		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
2861 			    raid_bdev->bdev.name, base_bdev->name);
2862 		return -EINVAL;
2863 	}
2864 
2865 	assert(base_info->is_configured == false);
2866 	assert(base_info->data_size != 0);
2867 
2868 	spdk_spin_lock(&raid_bdev->base_bdev_lock);
2869 
2870 	rc = _raid_bdev_add_base_device(raid_bdev, base_bdev->name,
2871 					raid_bdev_base_bdev_slot(base_info),
2872 					base_info->data_offset, base_info->data_size,
2873 					cb_fn, cb_ctx);
2874 	if (rc != 0) {
2875 		SPDK_ERRLOG("base bdev '%s' attach failed: %s\n", base_bdev->name, spdk_strerror(-rc));
2876 		raid_bdev_free_base_bdev_resource(base_info);
2877 	}
2878 
2879 	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
2880 
2881 	return rc;
2882 }
2883 
2884 /*
2885  * brief:
2886  * raid_bdev_add_base_device function is the actual function which either adds
2887  * the nvme base device to existing raid bdev or create a new raid bdev. It also claims
2888  * the base device and keep the open descriptor.
2889  * params:
2890  * raid_bdev - pointer to raid bdev
2891  * name - name of the base bdev
2892  * slot - position to add base bdev
2893  * returns:
2894  * 0 - success
2895  * non zero - failure
2896  */
2897 int
2898 raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot)
2899 {
2900 	return _raid_bdev_add_base_device(raid_bdev, name, slot, 0, 0, NULL, NULL);
2901 }
2902 
2903 static int
2904 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
2905 {
2906 	struct raid_bdev *raid_bdev;
2907 	uint8_t i;
2908 	int rc;
2909 
2910 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
2911 			       sb->level, true, &sb->uuid, &raid_bdev);
2912 	if (rc != 0) {
2913 		return rc;
2914 	}
2915 
2916 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
2917 	memcpy(raid_bdev->sb, sb, sb->length);
2918 
2919 	for (i = 0; i < sb->base_bdevs_size; i++) {
2920 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
2921 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2922 
2923 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
2924 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
2925 			raid_bdev->num_base_bdevs_operational++;
2926 		}
2927 
2928 		base_info->data_offset = sb_base_bdev->data_offset;
2929 		base_info->data_size = sb_base_bdev->data_size;
2930 	}
2931 
2932 	*raid_bdev_out = raid_bdev;
2933 	return 0;
2934 }
2935 
2936 static void
2937 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
2938 {
2939 	struct raid_bdev *raid_bdev;
2940 	struct raid_base_bdev_info *base_info;
2941 
2942 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
2943 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
2944 			if (base_info->desc == NULL && base_info->name != NULL &&
2945 			    strcmp(bdev->name, base_info->name) == 0) {
2946 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
2947 				break;
2948 			}
2949 		}
2950 	}
2951 }
2952 
/*
 * brief:
 * Handle a bdev on which a valid raid superblock was found during examine:
 * match the superblock to an existing raid bdev by UUID (or create a new raid
 * bdev from the superblock) and configure the bdev as the matching base bdev.
 * params:
 * sb - superblock read from the examined bdev
 * bdev - pointer to the examined base bdev
 * returns:
 * none
 */
static void
raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev)
{
	const struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *iter, *base_info;
	uint8_t i;
	int rc;

	if (sb->block_size != bdev->blocklen) {
		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
			     bdev->name, sb->block_size, bdev->blocklen);
		return;
	}

	if (spdk_uuid_is_null(&sb->uuid)) {
		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
		return;
	}

	/* Look up an existing raid bdev with the superblock's UUID; raid_bdev is
	 * NULL after the loop if none was found. */
	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			break;
		}
	}

	if (raid_bdev) {
		if (sb->seq_number > raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);

			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
					     raid_bdev->bdev.name, bdev->name);
				return;
			}

			/* remove and then recreate the raid bdev using the newer superblock */
			/* NOTE(review): raid_bdev_delete() takes a completion callback (NULL
			 * here), which suggests it may complete asynchronously; if so, the
			 * raid_bdev_create_from_sb() call below could race with the deletion
			 * (e.g. name still registered) - TODO confirm delete is synchronous
			 * enough for this path. */
			raid_bdev_delete(raid_bdev, NULL, NULL);
			raid_bdev = NULL;
		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
			/* use the current raid bdev superblock */
			sb = raid_bdev->sb;
		}
	}

	/* Find this bdev's entry in the superblock by UUID; sb_base_bdev keeps
	 * pointing at it after the loop. */
	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);

		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			break;
		}
	}

	if (i == sb->base_bdevs_size) {
		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
		return;
	}

	/* No matching raid bdev was registered (or the stale one was deleted above):
	 * create one from this superblock. */
	if (!raid_bdev) {
		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
				    sb->name, spdk_strerror(-rc));
			return;
		}
	}

	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
			       bdev->name, raid_bdev->bdev.name);
		return;
	}

	/* Locate the raid bdev's base slot whose UUID matches this bdev. */
	base_info = NULL;
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			base_info = iter;
			break;
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
			    bdev->name, raid_bdev->bdev.name);
		return;
	}

	rc = raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
3053 
/* Context for an asynchronous superblock read during examine; released by
 * raid_bdev_examine_ctx_free(). */
struct raid_bdev_examine_ctx {
	/* Descriptor of the bdev being examined (read-only open) */
	struct spdk_bdev_desc *desc;
	/* IO channel obtained from desc, used for the superblock read */
	struct spdk_io_channel *ch;
};
3058 
3059 static void
3060 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3061 {
3062 	if (!ctx) {
3063 		return;
3064 	}
3065 
3066 	if (ctx->ch) {
3067 		spdk_put_io_channel(ctx->ch);
3068 	}
3069 
3070 	if (ctx->desc) {
3071 		spdk_bdev_close(ctx->desc);
3072 	}
3073 
3074 	free(ctx);
3075 }
3076 
3077 static void
3078 raid_bdev_examine_load_sb_cb(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3079 {
3080 	struct raid_bdev_examine_ctx *ctx = _ctx;
3081 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3082 
3083 	switch (status) {
3084 	case 0:
3085 		/* valid superblock found */
3086 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3087 		raid_bdev_examine_sb(sb, bdev);
3088 		break;
3089 	case -EINVAL:
3090 		/* no valid superblock, check if it can be claimed anyway */
3091 		raid_bdev_examine_no_sb(bdev);
3092 		break;
3093 	default:
3094 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3095 			    bdev->name, spdk_strerror(-status));
3096 		break;
3097 	}
3098 
3099 	raid_bdev_examine_ctx_free(ctx);
3100 	spdk_bdev_module_examine_done(&g_raid_if);
3101 }
3102 
/* Event callback for the short-lived descriptor opened during examine.
 * Intentionally empty: the descriptor is closed in raid_bdev_examine_ctx_free()
 * as soon as the superblock read completes, so events are ignored. */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3107 
3108 /*
3109  * brief:
3110  * raid_bdev_examine function is the examine function call by the below layers
3111  * like bdev_nvme layer. This function will check if this base bdev can be
3112  * claimed by this raid bdev or not.
3113  * params:
3114  * bdev - pointer to base bdev
3115  * returns:
3116  * none
3117  */
3118 static void
3119 raid_bdev_examine(struct spdk_bdev *bdev)
3120 {
3121 	struct raid_bdev_examine_ctx *ctx;
3122 	int rc;
3123 
3124 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3125 		raid_bdev_examine_no_sb(bdev);
3126 		spdk_bdev_module_examine_done(&g_raid_if);
3127 		return;
3128 	}
3129 
3130 	ctx = calloc(1, sizeof(*ctx));
3131 	if (!ctx) {
3132 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3133 			    bdev->name, spdk_strerror(ENOMEM));
3134 		goto err;
3135 	}
3136 
3137 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false, raid_bdev_examine_event_cb, NULL,
3138 				&ctx->desc);
3139 	if (rc) {
3140 		SPDK_ERRLOG("Failed to open bdev %s: %s\n",
3141 			    bdev->name, spdk_strerror(-rc));
3142 		goto err;
3143 	}
3144 
3145 	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3146 	if (!ctx->ch) {
3147 		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev->name);
3148 		goto err;
3149 	}
3150 
3151 	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_cb, ctx);
3152 	if (rc) {
3153 		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3154 			    bdev->name, spdk_strerror(-rc));
3155 		goto err;
3156 	}
3157 
3158 	return;
3159 err:
3160 	raid_bdev_examine_ctx_free(ctx);
3161 	spdk_bdev_module_examine_done(&g_raid_if);
3162 }
3163 
3164 /* Log component for bdev raid bdev module */
3165 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3166