xref: /spdk/module/bdev/raid/bdev_raid.c (revision 42fd001310188f0635a3953f3b0ea0b33a840902)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
/* Sentinel stored in block-offset fields meaning "no offset / no active process" */
#define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
/* Maximum queue depth of in-flight background process requests */
#define RAID_BDEV_PROCESS_MAX_QD	16

/* Default size, in KiB, of one background process window (see g_opts) */
#define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT 1024

/* True once shutdown has begun; checked in the destruct paths below.
 * NOTE(review): the code that sets it is outside this chunk - confirm. */
static bool g_shutdown_started = false;

/* List of all raid bdevs */
struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);

/* List of registered raid level modules (see raid_bdev_module_list_add()) */
static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
27 
/*
 * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
 * contains the relationship of raid bdev io channel with base bdev io channels.
 */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* First unprocessed block, or RAID_OFFSET_BLOCKS_INVALID when no
		 * process is active on this channel */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Channel used for I/O to the already processed range; its base
		 * channel array routes the target's slot to target_ch */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
46 
/* Lifecycle states of a background process (see struct raid_bdev_process) */
enum raid_bdev_process_state {
	RAID_PROCESS_STATE_INIT,
	RAID_PROCESS_STATE_RUNNING,
	RAID_PROCESS_STATE_STOPPING,
	RAID_PROCESS_STATE_STOPPED,
};
53 
/* State of a background process (e.g. rebuild) running on a raid bdev */
struct raid_bdev_process {
	/* The raid bdev this process operates on */
	struct raid_bdev		*raid_bdev;
	/* Type of the process */
	enum raid_process_type		type;
	/* Current lifecycle state */
	enum raid_bdev_process_state	state;
	/* Thread the process runs on */
	struct spdk_thread		*thread;
	/* Process's own raid io channel */
	struct raid_bdev_io_channel	*raid_ch;
	/* Pool of process requests */
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	uint64_t			max_window_size;
	/* Size, in blocks, of the range processed in one step */
	uint64_t			window_size;
	/* Blocks still outstanding in the current window */
	uint64_t			window_remaining;
	/* Status of the current window (0 or negative errno) */
	int				window_status;
	/* Offset of the current window; reported as progress in the info JSON */
	uint64_t			window_offset;
	bool				window_range_locked;
	/* Base bdev being reconstructed; always non-NULL for now (see
	 * raid_bdev_ch_process_setup()) */
	struct raid_base_bdev_info	*target;
	/* Final status of the whole process */
	int				status;
	/* Callbacks to run when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
};
71 
/* Callback registered to run when a background process finishes */
struct raid_process_finish_action {
	spdk_msg_fn cb;
	void *cb_ctx;
	TAILQ_ENTRY(raid_process_finish_action) link;
};
77 
/* Module-wide options; read with raid_bdev_get_opts(), set with raid_bdev_set_opts() */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
};
81 
/*
 * brief:
 * Copy the current module-wide options into the caller-provided structure.
 * params:
 * opts - output; receives a copy of the current options
 * returns:
 * none
 */
void
raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
{
	*opts = g_opts;
}
87 
88 int
89 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
90 {
91 	if (opts->process_window_size_kb == 0) {
92 		return -EINVAL;
93 	}
94 
95 	g_opts = *opts;
96 
97 	return 0;
98 }
99 
100 static struct raid_bdev_module *
101 raid_bdev_module_find(enum raid_level level)
102 {
103 	struct raid_bdev_module *raid_module;
104 
105 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
106 		if (raid_module->level == level) {
107 			return raid_module;
108 		}
109 	}
110 
111 	return NULL;
112 }
113 
114 void
115 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
116 {
117 	if (raid_bdev_module_find(raid_module->level) != NULL) {
118 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
119 			    raid_bdev_level_to_str(raid_module->level));
120 		assert(false);
121 	} else {
122 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
123 	}
124 }
125 
/*
 * brief:
 * Get the IO channel of the base bdev at the given slot index.
 * params:
 * raid_ch - raid bdev io channel
 * idx - base bdev slot index
 * returns:
 * base bdev IO channel; may be NULL for a missing base bdev or the process
 * target (see raid_bdev_create_cb())
 */
struct spdk_io_channel *
raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
{
	return raid_ch->base_channel[idx];
}
131 
/*
 * brief:
 * Get the per-channel context of the raid module's private IO channel.
 * params:
 * raid_ch - raid bdev io channel; must have a module channel
 * returns:
 * pointer to the module channel's context
 */
void *
raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
{
	assert(raid_ch->module_channel != NULL);

	return spdk_io_channel_get_ctx(raid_ch->module_channel);
}
139 
140 /* Function declarations */
141 static void	raid_bdev_examine(struct spdk_bdev *bdev);
142 static int	raid_bdev_init(void);
143 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
144 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
145 
146 static void
147 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
148 {
149 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
150 
151 	if (raid_ch->process.target_ch != NULL) {
152 		spdk_put_io_channel(raid_ch->process.target_ch);
153 		raid_ch->process.target_ch = NULL;
154 	}
155 
156 	if (raid_ch->process.ch_processed != NULL) {
157 		free(raid_ch->process.ch_processed->base_channel);
158 		free(raid_ch->process.ch_processed);
159 		raid_ch->process.ch_processed = NULL;
160 	}
161 }
162 
/*
 * brief:
 * Set up the process-related part of a raid bdev io channel: an IO channel
 * for the process target and a second, "processed" raid channel that routes
 * the target's slot to that channel instead of the regular base channel.
 * params:
 * raid_ch - raid bdev io channel to set up
 * process - the raid bdev's active background process
 * returns:
 * 0 - success
 * -ENOMEM - getting a channel or allocating memory failed
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	/* The processed channel mirrors the regular channel, except that the
	 * target's slot points to the target's own IO channel */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The processed channel itself carries no nested process state */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	/* Cleanup only releases what was actually acquired, so a partial setup
	 * is safe to pass here */
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
212 
/*
 * brief:
 * raid_bdev_create_cb function is a cb function for raid bdev which creates the
 * hierarchy from raid bdev to base bdev io channels. It will be called per core
 * params:
 * io_device - pointer to raid bdev io device represented by raid_bdev
 * ctx_buf - pointer to context buffer for raid bdev io channel
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_create_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	int ret = -ENOMEM;	/* default status for the goto error paths below */

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);

	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
		return -ENOMEM;
	}

	/* Hold the lock while the base bdev info and process state are read
	 * and the channels acquired */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/*
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].desc == NULL ||
		    (raid_bdev->process != NULL && raid_bdev->process->target == &raid_bdev->base_bdev_info[i])) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			goto err;
		}
	}

	if (raid_bdev->process != NULL) {
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			goto err_unlocked;
		}
	}

	return 0;
err:
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
err_unlocked:
	/* Undo the partial setup; NULL entries are skipped */
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	/* NOTE(review): base_channel is left dangling after the free; fine as
	 * long as the channel is never used after a failed create - confirm */
	free(raid_ch->base_channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}
298 
299 /*
300  * brief:
301  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
302  * hierarchy from raid bdev to base bdev io channels. It will be called per core
303  * params:
304  * io_device - pointer to raid bdev io device represented by raid_bdev
305  * ctx_buf - pointer to context buffer for raid bdev io channel
306  * returns:
307  * none
308  */
309 static void
310 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
311 {
312 	struct raid_bdev *raid_bdev = io_device;
313 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
314 	uint8_t i;
315 
316 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
317 
318 	assert(raid_ch != NULL);
319 	assert(raid_ch->base_channel);
320 
321 	if (raid_ch->module_channel) {
322 		spdk_put_io_channel(raid_ch->module_channel);
323 	}
324 
325 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
326 		/* Free base bdev channels */
327 		if (raid_ch->base_channel[i] != NULL) {
328 			spdk_put_io_channel(raid_ch->base_channel[i]);
329 		}
330 	}
331 	free(raid_ch->base_channel);
332 	raid_ch->base_channel = NULL;
333 
334 	raid_bdev_ch_process_cleanup(raid_ch);
335 }
336 
337 /*
338  * brief:
339  * raid_bdev_cleanup is used to cleanup raid_bdev related data
340  * structures.
341  * params:
342  * raid_bdev - pointer to raid_bdev
343  * returns:
344  * none
345  */
346 static void
347 raid_bdev_cleanup(struct raid_bdev *raid_bdev)
348 {
349 	struct raid_base_bdev_info *base_info;
350 
351 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
352 		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
353 	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
354 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
355 
356 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
357 		assert(base_info->desc == NULL);
358 		free(base_info->name);
359 	}
360 
361 	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
362 }
363 
/* Free all memory owned by the raid_bdev structure itself */
static void
raid_bdev_free(struct raid_bdev *raid_bdev)
{
	spdk_dma_free(raid_bdev->sb);
	spdk_spin_destroy(&raid_bdev->base_bdev_lock);
	free(raid_bdev->base_bdev_info);
	free(raid_bdev->bdev.name);
	/* Must come last - the members above live inside this allocation */
	free(raid_bdev);
}
373 
/* Unlink the raid bdev from the global list and free it */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
380 
381 /*
382  * brief:
383  * free resource of base bdev for raid bdev
384  * params:
385  * base_info - raid base bdev info
386  * returns:
387  * none
388  */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	free(base_info->name);
	base_info->name = NULL;
	/* The uuid is kept while still configuring - presumably so the slot can
	 * be matched again during examine; TODO confirm against the examine path */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}

	/* Nothing more to release if the base bdev was never opened */
	if (base_info->desc == NULL) {
		return;
	}

	/* Release the module's claim before closing the descriptor */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		assert(raid_bdev->num_base_bdevs_discovered);
		raid_bdev->num_base_bdevs_discovered--;
		base_info->is_configured = false;
	}
}
418 
419 static void
420 raid_bdev_io_device_unregister_cb(void *io_device)
421 {
422 	struct raid_bdev *raid_bdev = io_device;
423 
424 	if (raid_bdev->num_base_bdevs_discovered == 0) {
425 		/* Free raid_bdev when there are no base bdevs left */
426 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
427 		raid_bdev_cleanup(raid_bdev);
428 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
429 		raid_bdev_free(raid_bdev);
430 	} else {
431 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
432 	}
433 }
434 
/*
 * brief:
 * Called when the raid module has finished stopping; unregisters the raid
 * bdev io device. Skipped in the configuring state, where the io device is
 * presumably not registered yet - confirm against the configure path.
 * params:
 * raid_bdev - pointer to raid_bdev
 * returns:
 * none
 */
void
raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
{
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
	}
}
442 
/*
 * brief:
 * App-thread part of raid bdev destruction: releases the base bdev resources
 * that are scheduled for removal (all of them on shutdown) and stops the
 * raid module.
 * params:
 * ctxt - pointer to raid_bdev
 * returns:
 * none
 */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	/* Any background process must have been stopped before destructing */
	assert(raid_bdev->process == NULL);

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	if (raid_bdev->module->stop != NULL) {
		/* A false return means the module stops asynchronously and is
		 * expected to call raid_bdev_module_stop_done() itself later */
		if (raid_bdev->module->stop(raid_bdev) == false) {
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
475 
/*
 * brief:
 * Destruct entry of the bdev function table; defers the actual teardown to
 * the app thread via _raid_bdev_destruct().
 * params:
 * ctx - pointer to raid_bdev
 * returns:
 * 1 - destruction is asynchronous; spdk_bdev_destruct_done() is called later
 */
static int
raid_bdev_destruct(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);

	return 1;
}
483 
/*
 * brief:
 * Complete a raid I/O towards its submitter. If the I/O was split around the
 * process offset (see raid_bdev_io_split()), completion of the first part
 * triggers submission of the second part; only once both parts are done (or
 * the first part failed) are the original request fields restored and the
 * parent bdev_io completed.
 * params:
 * raid_io - pointer to raid_bdev_io
 * status - completion status
 * returns:
 * none
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the original start and metadata buffer for the
			 * second (lower LBA) part */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* Pointer difference = number of iovs lying entirely
				 * before the split point */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The iov at the split point is shared by both parts;
					 * shrink it to cover only the second part's bytes */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The second part targets the already processed range */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Restore the original request fields before completing the parent */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		spdk_bdev_io_complete(bdev_io, status);
	}
}
534 
535 /*
536  * brief:
537  * raid_bdev_io_complete_part - signal the completion of a part of the expected
538  * base bdev IOs and complete the raid_io if this is the final expected IO.
539  * The caller should first set raid_io->base_bdev_io_remaining. This function
540  * will decrement this counter by the value of the 'completed' parameter and
541  * complete the raid_io if the counter reaches 0. The caller is free to
542  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
543  * it can represent e.g. blocks or IOs.
544  * params:
545  * raid_io - pointer to raid_bdev_io
546  * completed - the part of the raid_io that has been completed
547  * status - status of the base IO
548  * returns:
549  * true - if the raid_io is completed
550  * false - otherwise
551  */
552 bool
553 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
554 			   enum spdk_bdev_io_status status)
555 {
556 	assert(raid_io->base_bdev_io_remaining >= completed);
557 	raid_io->base_bdev_io_remaining -= completed;
558 
559 	if (status != SPDK_BDEV_IO_STATUS_SUCCESS) {
560 		raid_io->base_bdev_io_status = status;
561 	}
562 
563 	if (raid_io->base_bdev_io_remaining == 0) {
564 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
565 		return true;
566 	} else {
567 		return false;
568 	}
569 }
570 
571 /*
572  * brief:
573  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
574  * It will try to queue the IOs after storing the context to bdev wait queue logic.
575  * params:
576  * raid_io - pointer to raid_bdev_io
577  * bdev - the block device that the IO is submitted to
578  * ch - io channel
579  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
580  * returns:
581  * none
582  */
583 void
584 raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
585 			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
586 {
587 	raid_io->waitq_entry.bdev = bdev;
588 	raid_io->waitq_entry.cb_fn = cb_fn;
589 	raid_io->waitq_entry.cb_arg = raid_io;
590 	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
591 }
592 
593 static void
594 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
595 {
596 	struct raid_bdev_io *raid_io = cb_arg;
597 
598 	spdk_bdev_free_io(bdev_io);
599 
600 	raid_bdev_io_complete_part(raid_io, 1, success ?
601 				   SPDK_BDEV_IO_STATUS_SUCCESS :
602 				   SPDK_BDEV_IO_STATUS_FAILED);
603 }
604 
605 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
606 
/* Wrapper matching spdk_bdev_io_wait_cb, used to resume a queued reset
 * submission (see raid_bdev_submit_reset_request()) */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	struct raid_bdev_io *raid_io = _raid_io;

	raid_bdev_submit_reset_request(raid_io);
}
614 
615 /*
616  * brief:
617  * raid_bdev_submit_reset_request function submits reset requests
618  * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
619  * which case it will queue it for later submission
620  * params:
621  * raid_io
622  * returns:
623  * none
624  */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* Zero remaining means this is the first (not a resumed) submission */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from wherever a previous -ENOMEM attempt left off */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* No channel for this slot - count it as trivially done */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Retry from the current index once a bdev_io is available */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
664 
/*
 * brief:
 * Split a raid I/O in two at split_offset (blocks relative to the start of
 * the I/O). The raid_io fields are adjusted in place to describe the upper
 * part (from split_offset on); enough state is saved in raid_io->split to
 * reconstruct the lower part and the original request later in
 * raid_bdev_io_complete().
 * params:
 * raid_io - pointer to raid_bdev_io
 * split_offset - where to split, in blocks relative to the I/O start
 * returns:
 * none
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	/* Byte offset of the split point, walked through the iovs below */
	size_t iov_offset = (split_offset << raid_bdev->blocklen_shift);
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	/* Adjust the request to describe only the upper part */
	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls on an iov boundary - nothing to adjust */
				raid_io->split.iov = NULL;
			} else {
				/* This iov straddles the split point: save a copy and
				 * shrink it in place to cover only the upper part */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
702 
/*
 * brief:
 * Submit a read/write request to the raid module, taking an active background
 * process into account: an I/O entirely below the process offset uses the
 * "processed" channel, an I/O spanning the offset is split first.
 * params:
 * raid_io - pointer to raid_bdev_io
 * returns:
 * none
 */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				/* NOTE(review): %lu assumes long is 64-bit; PRIu64 would be portable */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
735 
736 /*
737  * brief:
738  * Callback function to spdk_bdev_io_get_buf.
739  * params:
740  * ch - pointer to raid bdev io channel
741  * bdev_io - pointer to parent bdev_io on raid bdev device
742  * success - True if buffer is allocated or false otherwise.
743  * returns:
744  * none
745  */
746 static void
747 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
748 		     bool success)
749 {
750 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
751 
752 	if (!success) {
753 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
754 		return;
755 	}
756 
757 	raid_bdev_submit_rw_request(raid_io);
758 }
759 
760 void
761 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
762 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
763 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
764 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
765 {
766 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
767 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
768 
769 	raid_io->type = type;
770 	raid_io->offset_blocks = offset_blocks;
771 	raid_io->num_blocks = num_blocks;
772 	raid_io->iovs = iovs;
773 	raid_io->iovcnt = iovcnt;
774 	raid_io->memory_domain = memory_domain;
775 	raid_io->memory_domain_ctx = memory_domain_ctx;
776 	raid_io->md_buf = md_buf;
777 
778 	raid_io->raid_bdev = raid_bdev;
779 	raid_io->raid_ch = raid_ch;
780 	raid_io->base_bdev_io_remaining = 0;
781 	raid_io->base_bdev_io_submitted = 0;
782 	raid_io->base_bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
783 	raid_io->completion_cb = NULL;
784 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
785 }
786 
787 /*
788  * brief:
789  * raid_bdev_submit_request function is the submit_request function pointer of
790  * raid bdev function table. This is used to submit the io on raid_bdev to below
791  * layers.
792  * params:
793  * ch - pointer to raid bdev io channel
794  * bdev_io - pointer to parent bdev_io on raid bdev device
795  * returns:
796  * none
797  */
static void
raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;

	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Make sure a data buffer is allocated before submitting the read */
		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		raid_bdev_submit_rw_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		raid_bdev_submit_reset_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Null-payload requests are rejected while a background process
		 * is in progress */
		if (raid_io->raid_bdev->process != NULL) {
			/* TODO: rebuild support */
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
		break;

	default:
		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
837 
838 /*
839  * brief:
840  * _raid_bdev_io_type_supported checks whether io_type is supported in
 * all base bdev modules of raid bdev module. If any one of the base bdevs
 * doesn't support it, the raid device doesn't support it either.
843  *
844  * params:
845  * raid_bdev - pointer to raid bdev context
846  * io_type - io type
847  * returns:
848  * true - io_type is supported
849  * false - io_type is not supported
850  */
851 inline static bool
852 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
853 {
854 	struct raid_base_bdev_info *base_info;
855 
856 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
857 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
858 		if (raid_bdev->module->submit_null_payload_request == NULL) {
859 			return false;
860 		}
861 	}
862 
863 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
864 		if (base_info->desc == NULL) {
865 			continue;
866 		}
867 
868 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
869 			return false;
870 		}
871 	}
872 
873 	return true;
874 }
875 
876 /*
877  * brief:
878  * raid_bdev_io_type_supported is the io_supported function for bdev function
879  * table which returns whether the particular io type is supported or not by
880  * raid bdev module
881  * params:
882  * ctx - pointer to raid bdev context
883  * type - io type
884  * returns:
885  * true - io_type is supported
886  * false - io_type is not supported
887  */
888 static bool
889 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
890 {
891 	switch (io_type) {
892 	case SPDK_BDEV_IO_TYPE_READ:
893 	case SPDK_BDEV_IO_TYPE_WRITE:
894 		return true;
895 
896 	case SPDK_BDEV_IO_TYPE_FLUSH:
897 	case SPDK_BDEV_IO_TYPE_RESET:
898 	case SPDK_BDEV_IO_TYPE_UNMAP:
899 		return _raid_bdev_io_type_supported(ctx, io_type);
900 
901 	default:
902 		return false;
903 	}
904 
905 	return false;
906 }
907 
908 /*
909  * brief:
910  * raid_bdev_get_io_channel is the get_io_channel function table pointer for
911  * raid bdev. This is used to return the io channel for this raid bdev
912  * params:
913  * ctxt - pointer to raid_bdev
914  * returns:
915  * pointer to io channel for raid bdev
916  */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;

	/* The raid_bdev pointer is the io_device registered for this bdev */
	return spdk_get_io_channel(raid_bdev);
}
924 
/*
 * brief:
 * Write the raid bdev's information (uuid, state, level, process progress,
 * base bdev list, ...) as named members into an already open JSON object.
 * params:
 * raid_bdev - pointer to raid_bdev
 * w - JSON write context
 * returns:
 * none
 */
void
raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
{
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	assert(raid_bdev != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->sb != NULL);
	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
				     raid_bdev->num_base_bdevs_operational);
	/* Report progress only while a background process is active */
	if (raid_bdev->process) {
		struct raid_bdev_process *process = raid_bdev->process;
		uint64_t offset = process->window_offset;

		spdk_json_write_named_object_begin(w, "process");
		spdk_json_write_name(w, "type");
		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
		spdk_json_write_named_string(w, "target", process->target->name);
		spdk_json_write_named_object_begin(w, "progress");
		spdk_json_write_named_uint64(w, "blocks", offset);
		/* NOTE(review): the double expression is implicitly converted to
		 * uint32_t here, so the percentage is truncated, not rounded */
		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_name(w, "base_bdevs_list");
	spdk_json_write_array_begin(w);
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		spdk_json_write_object_begin(w);
		spdk_json_write_name(w, "name");
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			spdk_json_write_null(w);
		}
		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
		spdk_json_write_named_string(w, "uuid", uuid_str);
		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_array_end(w);
}
977 
978 /*
979  * brief:
980  * raid_bdev_dump_info_json is the function table pointer for raid bdev
981  * params:
982  * ctx - pointer to raid_bdev
983  * w - pointer to json context
984  * returns:
985  * 0 - success
986  * non zero - failure
987  */
988 static int
989 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
990 {
991 	struct raid_bdev *raid_bdev = ctx;
992 
993 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
994 
995 	/* Dump the raid bdev configuration related information */
996 	spdk_json_write_named_object_begin(w, "raid");
997 	raid_bdev_write_info_json(raid_bdev, w);
998 	spdk_json_write_object_end(w);
999 
1000 	return 0;
1001 }
1002 
1003 /*
1004  * brief:
1005  * raid_bdev_write_config_json is the function table pointer for raid bdev
1006  * params:
1007  * bdev - pointer to spdk_bdev
1008  * w - pointer to json context
1009  * returns:
1010  * none
1011  */
static void
raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	struct raid_bdev *raid_bdev = bdev->ctxt;
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	/* Config dump runs only on the app thread */
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (raid_bdev->sb != NULL) {
		/* raid bdev configuration is stored in the superblock */
		return;
	}

	/* Emit a "bdev_raid_create" RPC call that would recreate this raid bdev */
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_create");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_string(w, "name", bdev->name);
	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->sb != NULL);

	/* Only base bdevs that are currently open (desc != NULL) are listed */
	spdk_json_write_named_array_begin(w, "base_bdevs");
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc) {
			spdk_json_write_string(w, spdk_bdev_desc_get_bdev(base_info->desc)->name);
		}
	}
	spdk_json_write_array_end(w);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1049 
/*
 * Report the memory domains of all open base bdevs. Returns the total number
 * of domains (which may exceed what was written if the caller's array is too
 * small), or a negative errno on failure.
 */
static int
raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_base_bdev_info *base_info;
	int domains_count = 0, rc = 0;

	/* Modules that do not support memory domains report none */
	if (raid_bdev->module->memory_domains_supported == false) {
		return 0;
	}

	/* Hold the lock so base bdev descriptors stay valid while iterating */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);

	/* First loop to get the number of memory domains */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
		if (rc < 0) {
			goto out;
		}
		domains_count += rc;
	}

	/* If the caller's array cannot hold all domains, return only the count */
	if (!domains || array_size < domains_count) {
		goto out;
	}

	/* Second loop fills the caller's array, advancing past each base's entries */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
		if (rc < 0) {
			goto out;
		}
		domains += rc;
		array_size -= rc;
	}
out:
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (rc < 0) {
		return rc;
	}

	return domains_count;
}
1099 
/* g_raid_bdev_fn_table is the function table for raid bdev - the callbacks
 * the generic bdev layer invokes on this module's bdevs. */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1110 
1111 struct raid_bdev *
1112 raid_bdev_find_by_name(const char *name)
1113 {
1114 	struct raid_bdev *raid_bdev;
1115 
1116 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1117 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1118 			return raid_bdev;
1119 		}
1120 	}
1121 
1122 	return NULL;
1123 }
1124 
/* Raid level names, each level having a long ("raidX") and short ("X")
 * spelling. The table is terminated by an empty entry and is used by
 * raid_bdev_str_to_level() and raid_bdev_level_to_str(). */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};

/* Raid bdev state names, indexed by enum raid_bdev_state */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};

/* Background process type names, indexed by enum raid_process_type */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};

/* We have to use the typedef in the function declaration to appease astyle. */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
1155 
1156 raid_level_t
1157 raid_bdev_str_to_level(const char *str)
1158 {
1159 	unsigned int i;
1160 
1161 	assert(str != NULL);
1162 
1163 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1164 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1165 			return g_raid_level_names[i].value;
1166 		}
1167 	}
1168 
1169 	return INVALID_RAID_LEVEL;
1170 }
1171 
1172 const char *
1173 raid_bdev_level_to_str(enum raid_level level)
1174 {
1175 	unsigned int i;
1176 
1177 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1178 		if (g_raid_level_names[i].value == level) {
1179 			return g_raid_level_names[i].name;
1180 		}
1181 	}
1182 
1183 	return "";
1184 }
1185 
1186 raid_bdev_state_t
1187 raid_bdev_str_to_state(const char *str)
1188 {
1189 	unsigned int i;
1190 
1191 	assert(str != NULL);
1192 
1193 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1194 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1195 			break;
1196 		}
1197 	}
1198 
1199 	return i;
1200 }
1201 
1202 const char *
1203 raid_bdev_state_to_str(enum raid_bdev_state state)
1204 {
1205 	if (state >= RAID_BDEV_STATE_MAX) {
1206 		return "";
1207 	}
1208 
1209 	return g_raid_state_names[state];
1210 }
1211 
1212 const char *
1213 raid_bdev_process_to_str(enum raid_process_type value)
1214 {
1215 	if (value >= RAID_PROCESS_MAX) {
1216 		return "";
1217 	}
1218 
1219 	return g_raid_process_type_names[value];
1220 }
1221 
1222 /*
1223  * brief:
1224  * raid_bdev_fini_start is called when bdev layer is starting the
1225  * shutdown process
1226  * params:
1227  * none
1228  * returns:
1229  * none
1230  */
static void
raid_bdev_fini_start(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
	/* Mark that shutdown has begun; other paths in this module consult this flag */
	g_shutdown_started = true;
}
1237 
1238 /*
1239  * brief:
1240  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1241  * params:
1242  * none
1243  * returns:
1244  * none
1245  */
static void
raid_bdev_exit(void)
{
	struct raid_bdev *raid_bdev, *tmp;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");

	/* Free every remaining raid bdev; _SAFE iteration tolerates the current
	 * entry being removed from g_raid_bdev_list during cleanup. */
	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
		raid_bdev_cleanup_and_free(raid_bdev);
	}
}
1257 
/* Emit a "bdev_raid_set_options" RPC call reproducing the current module options */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1271 
/* Module config_json callback: dumps module-level options. Always succeeds. */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1279 
1280 /*
1281  * brief:
1282  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1283  * module
1284  * params:
1285  * none
1286  * returns:
1287  * size of spdk_bdev_io context for raid
1288  */
static int
raid_bdev_get_ctx_size(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
	/* Per-I/O context reserved by the bdev layer for each spdk_bdev_io */
	return sizeof(struct raid_bdev_io);
}
1295 
/* Raid bdev module descriptor registered with the bdev layer */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1308 
1309 /*
1310  * brief:
1311  * raid_bdev_init is the initialization function for raid bdev module
1312  * params:
1313  * none
1314  * returns:
1315  * 0 - success
1316  * non zero - failure
1317  */
static int
raid_bdev_init(void)
{
	/* No module-level initialization is needed */
	return 0;
}
1323 
1324 static int
1325 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1326 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1327 		  struct raid_bdev **raid_bdev_out)
1328 {
1329 	struct raid_bdev *raid_bdev;
1330 	struct spdk_bdev *raid_bdev_gen;
1331 	struct raid_bdev_module *module;
1332 	struct raid_base_bdev_info *base_info;
1333 	uint8_t min_operational;
1334 
1335 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1336 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1337 		return -EINVAL;
1338 	}
1339 
1340 	if (raid_bdev_find_by_name(name) != NULL) {
1341 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1342 		return -EEXIST;
1343 	}
1344 
1345 	if (level == RAID1) {
1346 		if (strip_size != 0) {
1347 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1348 			return -EINVAL;
1349 		}
1350 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1351 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1352 		return -EINVAL;
1353 	}
1354 
1355 	module = raid_bdev_module_find(level);
1356 	if (module == NULL) {
1357 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1358 		return -EINVAL;
1359 	}
1360 
1361 	assert(module->base_bdevs_min != 0);
1362 	if (num_base_bdevs < module->base_bdevs_min) {
1363 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1364 			    module->base_bdevs_min,
1365 			    raid_bdev_level_to_str(level));
1366 		return -EINVAL;
1367 	}
1368 
1369 	switch (module->base_bdevs_constraint.type) {
1370 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1371 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1372 		break;
1373 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1374 		min_operational = module->base_bdevs_constraint.value;
1375 		break;
1376 	case CONSTRAINT_UNSET:
1377 		if (module->base_bdevs_constraint.value != 0) {
1378 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1379 				    (uint8_t)module->base_bdevs_constraint.value, name);
1380 			return -EINVAL;
1381 		}
1382 		min_operational = num_base_bdevs;
1383 		break;
1384 	default:
1385 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1386 			    (uint8_t)module->base_bdevs_constraint.type,
1387 			    raid_bdev_level_to_str(module->level));
1388 		return -EINVAL;
1389 	};
1390 
1391 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1392 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1393 			    raid_bdev_level_to_str(module->level));
1394 		return -EINVAL;
1395 	}
1396 
1397 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1398 	if (!raid_bdev) {
1399 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1400 		return -ENOMEM;
1401 	}
1402 
1403 	spdk_spin_init(&raid_bdev->base_bdev_lock);
1404 	raid_bdev->module = module;
1405 	raid_bdev->num_base_bdevs = num_base_bdevs;
1406 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1407 					   sizeof(struct raid_base_bdev_info));
1408 	if (!raid_bdev->base_bdev_info) {
1409 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1410 		raid_bdev_free(raid_bdev);
1411 		return -ENOMEM;
1412 	}
1413 
1414 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1415 		base_info->raid_bdev = raid_bdev;
1416 	}
1417 
1418 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1419 	 * internally and set later.
1420 	 */
1421 	raid_bdev->strip_size = 0;
1422 	raid_bdev->strip_size_kb = strip_size;
1423 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1424 	raid_bdev->level = level;
1425 	raid_bdev->min_base_bdevs_operational = min_operational;
1426 
1427 	if (superblock_enabled) {
1428 		raid_bdev->sb = spdk_dma_zmalloc(RAID_BDEV_SB_MAX_LENGTH, 0x1000, NULL);
1429 		if (!raid_bdev->sb) {
1430 			SPDK_ERRLOG("Failed to allocate raid bdev sb buffer\n");
1431 			raid_bdev_free(raid_bdev);
1432 			return -ENOMEM;
1433 		}
1434 	}
1435 
1436 	raid_bdev_gen = &raid_bdev->bdev;
1437 
1438 	raid_bdev_gen->name = strdup(name);
1439 	if (!raid_bdev_gen->name) {
1440 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1441 		raid_bdev_free(raid_bdev);
1442 		return -ENOMEM;
1443 	}
1444 
1445 	raid_bdev_gen->product_name = "Raid Volume";
1446 	raid_bdev_gen->ctxt = raid_bdev;
1447 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1448 	raid_bdev_gen->module = &g_raid_if;
1449 	raid_bdev_gen->write_cache = 0;
1450 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1451 
1452 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1453 
1454 	*raid_bdev_out = raid_bdev;
1455 
1456 	return 0;
1457 }
1458 
1459 /*
1460  * brief:
1461  * raid_bdev_create allocates raid bdev based on passed configuration
1462  * params:
1463  * name - name for raid bdev
1464  * strip_size - strip size in KB
1465  * num_base_bdevs - number of base bdevs
1466  * level - raid level
1467  * superblock_enabled - true if raid should have superblock
1468  * uuid - uuid to set for the bdev
1469  * raid_bdev_out - the created raid bdev
1470  * returns:
1471  * 0 - success
1472  * non zero - failure
1473  */
int
raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
		 struct raid_bdev **raid_bdev_out)
{
	struct raid_bdev *raid_bdev;
	int rc;

	assert(uuid != NULL);

	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
			       &raid_bdev);
	if (rc != 0) {
		return rc;
	}

	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
		/* we need to have the uuid to store in the superblock before the bdev is registered */
		spdk_uuid_generate(&raid_bdev->bdev.uuid);
	}

	/* All base bdevs start out counted as operational */
	raid_bdev->num_base_bdevs_operational = num_base_bdevs;

	*raid_bdev_out = raid_bdev;

	return 0;
}
1501 
/* Close the raid bdev's internal self descriptor; must run on the app thread */
static void
_raid_bdev_unregistering_cont(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;

	spdk_bdev_close(raid_bdev->self_desc);
	raid_bdev->self_desc = NULL;
}
1510 
/* Thread-safe wrapper: forwards the self-descriptor close to the app thread */
static void
raid_bdev_unregistering_cont(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1516 
1517 static int
1518 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1519 {
1520 	struct raid_process_finish_action *finish_action;
1521 
1522 	assert(spdk_get_thread() == process->thread);
1523 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1524 
1525 	finish_action = calloc(1, sizeof(*finish_action));
1526 	if (finish_action == NULL) {
1527 		return -ENOMEM;
1528 	}
1529 
1530 	finish_action->cb = cb;
1531 	finish_action->cb_ctx = cb_ctx;
1532 
1533 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1534 
1535 	return 0;
1536 }
1537 
/*
 * Runs on the process thread when the raid bdev is being unregistered while a
 * background process is active: marks the process to stop (cancelled, unless it
 * already has a failure status) and defers closing the self descriptor until
 * the process has finished.
 */
static void
raid_bdev_unregistering_stop_process(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	process->state = RAID_PROCESS_STATE_STOPPING;
	if (process->status == 0) {
		process->status = -ECANCELED;
	}

	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
1556 
/*
 * Event callback for the raid bdev's internal self descriptor. On REMOVE
 * (the bdev is being unregistered), a running background process must be
 * stopped on its own thread before the descriptor is closed; otherwise the
 * descriptor can be closed right away.
 */
static void
raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct raid_bdev *raid_bdev = event_ctx;

	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		if (raid_bdev->process != NULL) {
			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
					     raid_bdev->process);
		} else {
			raid_bdev_unregistering_cont(raid_bdev);
		}
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}
1576 
/*
 * Final stage of raid bdev configuration: registers the io device and the
 * bdev with the bdev layer and opens an internal descriptor on it. On any
 * failure the module is stopped, the io device unregistered and the state
 * reset to CONFIGURING.
 */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto err;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto err;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
	return;
err:
	/* Undo the module start and io device registration, return to CONFIGURING */
	if (raid_bdev->module->stop != NULL) {
		raid_bdev->module->stop(raid_bdev);
	}
	spdk_io_device_unregister(raid_bdev, NULL);
	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
}
1625 
/*
 * Completion callback for the superblock write issued during configuration:
 * continues with bdev registration on success, otherwise stops the module.
 */
static void
raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	if (status == 0) {
		raid_bdev_configure_cont(raid_bdev);
	} else {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		if (raid_bdev->module->stop != NULL) {
			raid_bdev->module->stop(raid_bdev);
		}
	}
}
1639 
1640 /*
1641  * brief:
1642  * If raid bdev config is complete, then only register the raid bdev to
1643  * bdev layer and remove this raid bdev from configuring list and
1644  * insert the raid bdev to configured list
1645  * params:
1646  * raid_bdev - pointer to raid bdev
1647  * returns:
1648  * 0 - success
1649  * non zero - failure
1650  */
static int
raid_bdev_configure(struct raid_bdev *raid_bdev)
{
	int rc;

	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
	assert(raid_bdev->bdev.blocklen > 0);

	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
	 * internal use.
	 */
	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / raid_bdev->bdev.blocklen;
	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
		return -EINVAL;
	}
	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);
	raid_bdev->blocklen_shift = spdk_u32log2(raid_bdev->bdev.blocklen);

	/* Let the level-specific module compute bdev geometry and set up its state */
	rc = raid_bdev->module->start(raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("raid module startup callback failed\n");
		return rc;
	}

	if (raid_bdev->sb != NULL) {
		if (spdk_uuid_is_null(&raid_bdev->sb->uuid)) {
			/* NULL UUID is not valid in the sb so it means that we are creating a new
			 * raid bdev and should initialize the superblock.
			 */
			raid_bdev_init_superblock(raid_bdev);
		} else {
			/* Existing superblock: verify it matches the assembled bdev geometry */
			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
			if (raid_bdev->sb->block_size != raid_bdev->bdev.blocklen) {
				SPDK_ERRLOG("blocklen does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (rc != 0) {
				/* Roll back the module start on validation failure */
				if (raid_bdev->module->stop != NULL) {
					raid_bdev->module->stop(raid_bdev);
				}
				return rc;
			}
		}

		/* Registration continues asynchronously after the sb write completes */
		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
	} else {
		raid_bdev_configure_cont(raid_bdev);
	}

	return 0;
}
1708 
1709 /*
1710  * brief:
1711  * If raid bdev is online and registered, change the bdev state to
1712  * configuring and unregister this raid device. Queue this raid device
1713  * in configuring list
1714  * params:
1715  * raid_bdev - pointer to raid bdev
1716  * cb_fn - callback function
1717  * cb_arg - argument to callback function
1718  * returns:
1719  * none
1720  */
static void
raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
		      void *cb_arg)
{
	/* Nothing to unregister if the bdev never went online */
	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		if (cb_fn) {
			cb_fn(cb_arg, 0);
		}
		return;
	}

	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	assert(raid_bdev->num_base_bdevs_discovered);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");

	/* cb_fn is invoked by the bdev layer when unregistration completes */
	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
}
1738 
1739 /*
1740  * brief:
1741  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1742  * params:
1743  * base_bdev - pointer to base bdev
1744  * returns:
1745  * base bdev info if found, otherwise NULL.
1746  */
1747 static struct raid_base_bdev_info *
1748 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1749 {
1750 	struct raid_bdev *raid_bdev;
1751 	struct raid_base_bdev_info *base_info;
1752 
1753 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1754 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1755 			if (base_info->desc != NULL &&
1756 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1757 				return base_info;
1758 			}
1759 		}
1760 	}
1761 
1762 	return NULL;
1763 }
1764 
/* Finish a base bdev removal: clear the scheduled flag and notify the caller */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	assert(base_info->remove_scheduled);

	base_info->remove_scheduled = false;
	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1775 
/* Superblock-write completion for a base bdev removal; completes the removal
 * with the write status (an sb write failure is logged but still reported). */
static void
raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}

	raid_bdev_remove_base_bdev_done(base_info, status);
}
1788 
/*
 * Runs after the raid bdev has been unquiesced at the end of a base bdev
 * removal: frees the base bdev's resources under the lock and, if a superblock
 * exists, marks the removed slot FAILED and persists the superblock before
 * completing. Completes immediately on unquiesce failure.
 */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		goto out;
	}

	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	raid_bdev_free_base_bdev_resource(base_info);
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the sb entry for this slot that is still marked configured */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				/* TODO: distinguish between failure and intentional removal */
				sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;

				/* Completion continues in the sb write callback */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}
out:
	raid_bdev_remove_base_bdev_done(base_info, status);
}
1826 
/*
 * Per-channel step of base bdev removal: releases this channel's io channel
 * for the removed base bdev's slot, and clears the mirrored pointer in the
 * process's ch_processed view when a background process is active.
 */
static void
raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
	uint8_t idx = raid_bdev_base_bdev_slot(base_info);

	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);

	if (raid_ch->base_channel[idx] != NULL) {
		spdk_put_io_channel(raid_ch->base_channel[idx]);
		raid_ch->base_channel[idx] = NULL;
	}

	if (raid_ch->process.ch_processed != NULL) {
		raid_ch->process.ch_processed->base_channel[idx] = NULL;
	}

	spdk_for_each_channel_continue(i, 0);
}
1848 
/* All channels have dropped the base bdev; resume I/O and finish the removal
 * in the unquiesce callback. */
static void
raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
			    base_info);
}
1858 
/*
 * Runs once the raid bdev is quiesced: iterates all io channels to drop the
 * removed base bdev's channels. Fails the removal immediately if quiescing
 * itself failed.
 */
static void
raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		raid_bdev_remove_base_bdev_done(base_info, status);
		return;
	}

	spdk_for_each_channel(raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
			      raid_bdev_channels_remove_base_bdev_done);
}
1875 
/* Start the quiesce -> drop-channels -> unquiesce removal sequence.
 * Must run on the app thread; returns the spdk_bdev_quiesce() result. */
static int
raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
}
1884 
/* Message context for removing a base bdev while a background process runs.
 * Carries a snapshot of the values the process thread needs so it never has
 * to dereference raid_bdev state that may change concurrently. */
struct raid_bdev_process_base_bdev_remove_ctx {
	struct raid_bdev_process *process;
	struct raid_base_bdev_info *base_info;
	/* snapshot of raid_bdev->num_base_bdevs_operational taken on the app thread */
	uint8_t num_base_bdevs_operational;
};
1890 
/* App-thread continuation: start the quiesce-based removal, failing the
 * removal immediately if it cannot be started. */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int ret;

	ret = raid_bdev_remove_base_bdev_quiesce(base_info);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(base_info, ret);
	}
}
1902 
/* Process-thread continuation: releases the message ctx and hands the actual
 * removal back to the app thread. */
static void
raid_bdev_process_base_bdev_remove_cont(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_base_bdev_info *base_info = ctx->base_info;

	free(ctx);

	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
			     base_info);
}
1914 
/*
 * Runs on the process thread. If removing this base bdev neither targets the
 * process nor drops the raid below its operational minimum, the removal can
 * proceed without stopping the process. Otherwise the process is flagged to
 * stop (-ENODEV unless it already failed) and the removal is deferred to a
 * process finish action.
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->status == 0) {
		process->status = -ENODEV;
	}
}
1945 
/*
 * Initiate removal of a base bdev while a background process is running:
 * snapshot the needed state into a message ctx and dispatch it to the
 * process thread. Must be called on the app thread.
 * Returns 0 on success, -ENOMEM if the ctx could not be allocated.
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	ctx->num_base_bdevs_operational = process->raid_bdev->num_base_bdevs_operational;

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
1973 
/*
 * Core base bdev removal dispatch. Depending on the raid bdev's state this
 * either frees resources immediately (not online), deconfigures the whole raid
 * bdev (too few operational base bdevs would remain), routes through the
 * background process, or quiesces and removes just this base bdev.
 * Must be called on the app thread.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* Only one removal may be in flight per base bdev */
	if (base_info->remove_scheduled) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;
	base_info->remove_cb = cb_fn;
	base_info->remove_cb_ctx = cb_ctx;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		if (raid_bdev->num_base_bdevs_discovered == 0) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
	} else if (raid_bdev->num_base_bdevs_operational-- == raid_bdev->min_base_bdevs_operational) {
		/* note: the post-decrement above always reduces the operational count
		 *
		 * After this base bdev is removed there will not be enough base bdevs
		 * to keep the raid bdev operational.
		 */
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else if (raid_bdev->process != NULL) {
		ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
	} else {
		ret = raid_bdev_remove_base_bdev_quiesce(base_info);
	}

	/* Roll back the scheduled flag if the removal could not be started */
	if (ret != 0) {
		base_info->remove_scheduled = false;
	}
	return ret;
}
2024 
2025 /*
2026  * brief:
2027  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2028  * is removed. This function checks if this base bdev is part of any raid bdev
2029  * or not. If yes, it takes necessary action on that particular raid bdev.
2030  * params:
2031  * base_bdev - pointer to base bdev which got removed
2032  * cb_fn - callback function
2033  * cb_arg - argument to callback function
2034  * returns:
2035  * 0 - success
2036  * non zero - failure
2037  */
2038 int
2039 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2040 {
2041 	struct raid_base_bdev_info *base_info;
2042 
2043 	/* Find the raid_bdev which has claimed this base_bdev */
2044 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2045 	if (!base_info) {
2046 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2047 		return -ENODEV;
2048 	}
2049 
2050 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2051 }
2052 
2053 /*
2054  * brief:
2055  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
2056  * is resized. This function checks if the smallest size of the base_bdevs is changed.
2057  * If yes, call module handler to resize the raid_bdev if implemented.
2058  * params:
2059  * base_bdev - pointer to base bdev which got resized.
2060  * returns:
2061  * none
2062  */
2063 static void
2064 raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
2065 {
2066 	struct raid_bdev *raid_bdev;
2067 	struct raid_base_bdev_info *base_info;
2068 
2069 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");
2070 
2071 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2072 
2073 	/* Find the raid_bdev which has claimed this base_bdev */
2074 	if (!base_info) {
2075 		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
2076 		return;
2077 	}
2078 	raid_bdev = base_info->raid_bdev;
2079 
2080 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2081 
2082 	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
2083 		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);
2084 
2085 	if (raid_bdev->module->resize) {
2086 		raid_bdev->module->resize(raid_bdev);
2087 	}
2088 }
2089 
2090 /*
2091  * brief:
2092  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2093  * triggers asynchronous event.
2094  * params:
2095  * type - event details.
2096  * bdev - bdev that triggered event.
2097  * event_ctx - context for event.
2098  * returns:
2099  * none
2100  */
2101 static void
2102 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2103 			  void *event_ctx)
2104 {
2105 	int rc;
2106 
2107 	switch (type) {
2108 	case SPDK_BDEV_EVENT_REMOVE:
2109 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2110 		if (rc != 0) {
2111 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2112 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2113 		}
2114 		break;
2115 	case SPDK_BDEV_EVENT_RESIZE:
2116 		raid_bdev_resize_base_bdev(bdev);
2117 		break;
2118 	default:
2119 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2120 		break;
2121 	}
2122 }
2123 
2124 /*
2125  * brief:
2126  * Deletes the specified raid bdev
2127  * params:
2128  * raid_bdev - pointer to raid bdev
2129  * cb_fn - callback function
2130  * cb_arg - argument to callback function
2131  */
2132 void
2133 raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
2134 {
2135 	struct raid_base_bdev_info *base_info;
2136 
2137 	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);
2138 
2139 	if (raid_bdev->destroy_started) {
2140 		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
2141 			      raid_bdev->bdev.name);
2142 		if (cb_fn) {
2143 			cb_fn(cb_arg, -EALREADY);
2144 		}
2145 		return;
2146 	}
2147 
2148 	raid_bdev->destroy_started = true;
2149 
2150 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
2151 		base_info->remove_scheduled = true;
2152 
2153 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
2154 			/*
2155 			 * As raid bdev is not registered yet or already unregistered,
2156 			 * so cleanup should be done here itself.
2157 			 */
2158 			raid_bdev_free_base_bdev_resource(base_info);
2159 		}
2160 	}
2161 
2162 	if (raid_bdev->num_base_bdevs_discovered == 0) {
2163 		/* There is no base bdev for this raid, so free the raid device. */
2164 		raid_bdev_cleanup_and_free(raid_bdev);
2165 		if (cb_fn) {
2166 			cb_fn(cb_arg, 0);
2167 		}
2168 	} else {
2169 		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
2170 	}
2171 }
2172 
2173 static void
2174 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2175 {
2176 	if (status != 0) {
2177 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2178 			    raid_bdev->bdev.name, spdk_strerror(-status));
2179 	}
2180 }
2181 
2182 static void
2183 raid_bdev_process_finish_write_sb(void *ctx)
2184 {
2185 	struct raid_bdev *raid_bdev = ctx;
2186 	struct raid_bdev_superblock *sb = raid_bdev->sb;
2187 	struct raid_bdev_sb_base_bdev *sb_base_bdev;
2188 	struct raid_base_bdev_info *base_info;
2189 	uint8_t i;
2190 
2191 	for (i = 0; i < sb->base_bdevs_size; i++) {
2192 		sb_base_bdev = &sb->base_bdevs[i];
2193 
2194 		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
2195 		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
2196 			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2197 			if (base_info->is_configured) {
2198 				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
2199 				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
2200 			}
2201 		}
2202 	}
2203 
2204 	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
2205 }
2206 
2207 static void raid_bdev_process_free(struct raid_bdev_process *process);
2208 
2209 static void
2210 _raid_bdev_process_finish_done(void *ctx)
2211 {
2212 	struct raid_bdev_process *process = ctx;
2213 	struct raid_process_finish_action *finish_action;
2214 
2215 	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
2216 		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
2217 		finish_action->cb(finish_action->cb_ctx);
2218 		free(finish_action);
2219 	}
2220 
2221 	raid_bdev_process_free(process);
2222 
2223 	spdk_thread_exit(spdk_get_thread());
2224 }
2225 
2226 static void
2227 raid_bdev_process_finish_target_removed(void *ctx, int status)
2228 {
2229 	struct raid_bdev_process *process = ctx;
2230 
2231 	if (status != 0) {
2232 		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
2233 	}
2234 
2235 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2236 }
2237 
2238 static void
2239 raid_bdev_process_finish_unquiesced(void *ctx, int status)
2240 {
2241 	struct raid_bdev_process *process = ctx;
2242 
2243 	if (status != 0) {
2244 		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
2245 	}
2246 
2247 	if (process->status != 0) {
2248 		struct raid_base_bdev_info *target = process->target;
2249 
2250 		if (target->desc != NULL && target->remove_scheduled == false) {
2251 			_raid_bdev_remove_base_bdev(target, raid_bdev_process_finish_target_removed, process);
2252 			return;
2253 		}
2254 	}
2255 
2256 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2257 }
2258 
2259 static void
2260 raid_bdev_process_finish_unquiesce(void *ctx)
2261 {
2262 	struct raid_bdev_process *process = ctx;
2263 	int rc;
2264 
2265 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2266 				 raid_bdev_process_finish_unquiesced, process);
2267 	if (rc != 0) {
2268 		raid_bdev_process_finish_unquiesced(process, rc);
2269 	}
2270 }
2271 
/*
 * Runs on the process thread after all io channels have finished their
 * per-channel teardown. Releases the process's raid io channel, logs the
 * outcome, triggers a superblock update on success, and hands control to
 * the app thread to unquiesce the raid bdev.
 */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	/* Release the raid io channel acquired in raid_bdev_process_thread_init(). */
	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		/* Persist the new base bdev states; sb updates run on the app thread. */
		if (raid_bdev->sb != NULL) {
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2303 
2304 static void
2305 __raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
2306 {
2307 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2308 
2309 	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
2310 }
2311 
2312 static void
2313 raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
2314 {
2315 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2316 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2317 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2318 
2319 	if (process->status == 0) {
2320 		uint8_t slot = raid_bdev_base_bdev_slot(process->target);
2321 
2322 		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
2323 		raid_ch->process.target_ch = NULL;
2324 	}
2325 
2326 	raid_bdev_ch_process_cleanup(raid_ch);
2327 
2328 	spdk_for_each_channel_continue(i, 0);
2329 }
2330 
/*
 * Called after the raid bdev has been quiesced at the start of the finish
 * sequence. Detaches the process from the raid bdev and kicks off per-channel
 * teardown across all io channels.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): on quiesce failure the finish sequence stalls here and the
		 * process is never freed - confirm whether this is intentional. */
		return;
	}

	/* From this point new io channels will no longer see the process. */
	raid_bdev->process = NULL;
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2346 
2347 static void
2348 _raid_bdev_process_finish(void *ctx)
2349 {
2350 	struct raid_bdev_process *process = ctx;
2351 	int rc;
2352 
2353 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2354 			       raid_bdev_process_finish_quiesced, process);
2355 	if (rc != 0) {
2356 		raid_bdev_process_finish_quiesced(ctx, rc);
2357 	}
2358 }
2359 
2360 static void
2361 raid_bdev_process_do_finish(struct raid_bdev_process *process)
2362 {
2363 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
2364 }
2365 
2366 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2367 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2368 
/*
 * Request that the background process stop with the given status. Must run on
 * the process thread. The first non-zero status wins; later calls cannot
 * overwrite it. Transitions RUNNING -> STOPPING and resumes the process loop
 * (directly, or via unlock of the current window) so it can wind down.
 * Safe to call repeatedly - a second call while already stopping is a no-op
 * apart from recording the status.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	/* Keep the first reported error. */
	if (process->status == 0) {
		process->status = status;
	}

	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->window_range_locked) {
		/* Release the current window; the unlock completion re-enters the run loop. */
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2391 
2392 static void
2393 raid_bdev_process_window_range_unlocked(void *ctx, int status)
2394 {
2395 	struct raid_bdev_process *process = ctx;
2396 
2397 	if (status != 0) {
2398 		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
2399 		raid_bdev_process_finish(process, status);
2400 		return;
2401 	}
2402 
2403 	process->window_range_locked = false;
2404 	process->window_offset += process->window_size;
2405 
2406 	raid_bdev_process_thread_run(process);
2407 }
2408 
2409 static void
2410 raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
2411 {
2412 	int rc;
2413 
2414 	assert(process->window_range_locked == true);
2415 
2416 	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
2417 				       process->window_offset, process->max_window_size,
2418 				       raid_bdev_process_window_range_unlocked, process);
2419 	if (rc != 0) {
2420 		raid_bdev_process_window_range_unlocked(process, rc);
2421 	}
2422 }
2423 
/* All channels now know the new process offset - release the window's LBA range. */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(process);
}
2431 
2432 static void
2433 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2434 {
2435 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2436 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2437 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2438 
2439 	raid_ch->process.offset = process->window_offset + process->window_size;
2440 
2441 	spdk_for_each_channel_continue(i, 0);
2442 }
2443 
/*
 * Completion callback for a single process request. Returns the request to
 * the free pool, records the first error, and when the whole window has
 * completed either finishes the process (on error) or propagates the new
 * offset to all io channels before unlocking the window.
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	/* Return the request to the pool before accounting, so it can be reused. */
	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	/* Keep the first failure for the whole window. */
	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		/* Window done - update every channel's process offset, then unlock the range. */
		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2469 
/*
 * Submit one process request covering up to num_blocks starting at
 * offset_blocks. Returns the number of blocks actually submitted (the module
 * may submit fewer), 0 when no free request is available (outstanding
 * requests will drive further progress) or the module submitted nothing,
 * and a negative errno on submission failure.
 */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* All requests are in flight; completions will resume submission. */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module may have accepted fewer blocks than requested. */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2505 
/*
 * Fill the current (locked) window with process requests. Accumulates the
 * submitted block count in window_remaining; if nothing could be submitted
 * at all, the process finishes with whatever window_status was recorded.
 */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			/* Out of free requests or a submission error - stop issuing. */
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		/* The window size is whatever was actually submitted. */
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2530 
2531 static void
2532 raid_bdev_process_window_range_locked(void *ctx, int status)
2533 {
2534 	struct raid_bdev_process *process = ctx;
2535 
2536 	if (status != 0) {
2537 		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
2538 		raid_bdev_process_finish(process, status);
2539 		return;
2540 	}
2541 
2542 	process->window_range_locked = true;
2543 
2544 	if (process->state == RAID_PROCESS_STATE_STOPPING) {
2545 		raid_bdev_process_unlock_window_range(process);
2546 		return;
2547 	}
2548 
2549 	_raid_bdev_process_thread_run(process);
2550 }
2551 
/*
 * Main loop step of the background process, run on the process thread with
 * no window in flight. Handles stop requests and end-of-device completion,
 * otherwise quiesces the next window's LBA range; the lock completion then
 * submits the window's requests.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	/* Whole device processed. */
	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Clamp the window to not run past the end of the device. */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_locked(process, rc);
	}
}
2583 
2584 static void
2585 raid_bdev_process_thread_init(void *ctx)
2586 {
2587 	struct raid_bdev_process *process = ctx;
2588 	struct raid_bdev *raid_bdev = process->raid_bdev;
2589 	struct spdk_io_channel *ch;
2590 
2591 	process->thread = spdk_get_thread();
2592 
2593 	ch = spdk_get_io_channel(raid_bdev);
2594 	if (ch == NULL) {
2595 		process->status = -ENOMEM;
2596 		raid_bdev_process_do_finish(process);
2597 		return;
2598 	}
2599 
2600 	process->raid_ch = spdk_io_channel_get_ctx(ch);
2601 	process->state = RAID_PROCESS_STATE_RUNNING;
2602 
2603 	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
2604 		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
2605 
2606 	raid_bdev_process_thread_run(process);
2607 }
2608 
2609 static void
2610 raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
2611 {
2612 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2613 
2614 	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
2615 	raid_bdev_process_free(process);
2616 
2617 	/* TODO: update sb */
2618 }
2619 
/* Undo per-channel process setup on the abort path of a failed start. */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2630 
/*
 * Completion of per-channel process setup across all io channels. On success
 * a dedicated thread is spawned for the process and attached to the raid
 * bdev; on any failure the per-channel setup is rolled back and the process
 * is freed.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	/* TODO: we may need to abort if a base bdev was removed before we got here */

	/* e.g. "raidX_rebuild" - name derived from the raid bdev and process type. */
	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	/* Roll back the per-channel setup done by raid_bdev_channel_start_process(). */
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2667 
/* Per-channel setup when a process starts; a setup error aborts the iteration. */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	spdk_for_each_channel_continue(i, raid_bdev_ch_process_setup(raid_ch, process));
}
2680 
2681 static void
2682 raid_bdev_process_start(struct raid_bdev_process *process)
2683 {
2684 	struct raid_bdev *raid_bdev = process->raid_bdev;
2685 
2686 	assert(raid_bdev->module->submit_process_request != NULL);
2687 
2688 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
2689 			      raid_bdev_channels_start_process_done);
2690 }
2691 
2692 static void
2693 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
2694 {
2695 	spdk_dma_free(process_req->iov.iov_base);
2696 	spdk_dma_free(process_req->md_buf);
2697 	free(process_req);
2698 }
2699 
2700 static struct raid_bdev_process_request *
2701 raid_bdev_process_alloc_request(struct raid_bdev_process *process)
2702 {
2703 	struct raid_bdev *raid_bdev = process->raid_bdev;
2704 	struct raid_bdev_process_request *process_req;
2705 
2706 	process_req = calloc(1, sizeof(*process_req));
2707 	if (process_req == NULL) {
2708 		return NULL;
2709 	}
2710 
2711 	process_req->process = process;
2712 	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
2713 	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
2714 	if (process_req->iov.iov_base == NULL) {
2715 		free(process_req);
2716 		return NULL;
2717 	}
2718 	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
2719 		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
2720 		if (process_req->md_buf == NULL) {
2721 			raid_bdev_process_request_free(process_req);
2722 			return NULL;
2723 		}
2724 	}
2725 
2726 	return process_req;
2727 }
2728 
2729 static void
2730 raid_bdev_process_free(struct raid_bdev_process *process)
2731 {
2732 	struct raid_bdev_process_request *process_req;
2733 
2734 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
2735 		TAILQ_REMOVE(&process->requests, process_req, link);
2736 		raid_bdev_process_request_free(process_req);
2737 	}
2738 
2739 	free(process);
2740 }
2741 
/*
 * Allocate and initialize a background process of the given type targeting
 * the given base bdev, including a pool of RAID_BDEV_PROCESS_MAX_QD
 * preallocated requests. Returns NULL on allocation failure.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	/*
	 * Window size: the configured KB value converted to blocks, but never
	 * smaller than the bdev's write unit size.
	 */
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    raid_bdev->bdev.blocklen),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	/* Preallocate the request pool; raid_bdev_process_free() releases partial pools. */
	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
2776 
2777 static int
2778 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
2779 {
2780 	struct raid_bdev_process *process;
2781 
2782 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2783 
2784 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
2785 	if (process == NULL) {
2786 		return -ENOMEM;
2787 	}
2788 
2789 	raid_bdev_process_start(process);
2790 
2791 	return 0;
2792 }
2793 
/*
 * Continue configuring a base bdev after its resources were acquired and no
 * existing superblock was found on it. Marks it configured, and depending on
 * the discovered/operational counters either configures the whole raid bdev,
 * starts a rebuild of this member, or does nothing further. Invokes the
 * stored configure_cb with the result.
 */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int rc;

	/* TODO: defer if rebuild in progress on another base bdev */
	assert(raid_bdev->process == NULL);

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		}
	} else if (raid_bdev->num_base_bdevs_discovered > raid_bdev->num_base_bdevs_operational) {
		/* This bdev joins an already-online array - bring it in via rebuild. */
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		/* Still waiting for more members to be discovered. */
		rc = 0;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, rc);
	}
}
2837 
/*
 * Completion of the superblock examination done before configuring a new
 * base bdev. A missing superblock (-EINVAL) is the expected case and
 * configuration continues; a valid superblock (status 0) means the bdev
 * already belongs to some raid and is rejected with -EEXIST; any other
 * status is a read/examine error. On all failure paths the stored
 * configure_cb is invoked with the (possibly rewritten) status.
 */
static void
raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
		void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	switch (status) {
	case 0:
		/* valid superblock found */
		SPDK_ERRLOG("Existing raid superblock found on bdev %s\n", base_info->name);
		status = -EEXIST;
		raid_bdev_free_base_bdev_resource(base_info);
		break;
	case -EINVAL:
		/* no valid superblock - proceed with configuration */
		raid_bdev_configure_base_bdev_cont(base_info);
		return;
	default:
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    base_info->name, spdk_strerror(-status));
		break;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, status);
	}
}
2865 
2866 static int
2867 raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
2868 			      raid_base_bdev_cb cb_fn, void *cb_ctx)
2869 {
2870 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
2871 	struct spdk_bdev_desc *desc;
2872 	struct spdk_bdev *bdev;
2873 	const struct spdk_uuid *bdev_uuid;
2874 	int rc;
2875 
2876 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2877 	assert(base_info->desc == NULL);
2878 
2879 	/*
2880 	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
2881 	 * before claiming the bdev.
2882 	 */
2883 
2884 	if (!spdk_uuid_is_null(&base_info->uuid)) {
2885 		char uuid_str[SPDK_UUID_STRING_LEN];
2886 		const char *bdev_name;
2887 
2888 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
2889 
2890 		/* UUID of a bdev is registered as its alias */
2891 		bdev = spdk_bdev_get_by_name(uuid_str);
2892 		if (bdev == NULL) {
2893 			return -ENODEV;
2894 		}
2895 
2896 		bdev_name = spdk_bdev_get_name(bdev);
2897 
2898 		if (base_info->name == NULL) {
2899 			assert(existing == true);
2900 			base_info->name = strdup(bdev_name);
2901 			if (base_info->name == NULL) {
2902 				return -ENOMEM;
2903 			}
2904 		} else if (strcmp(base_info->name, bdev_name) != 0) {
2905 			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
2906 				    bdev_name, base_info->name);
2907 			return -EINVAL;
2908 		}
2909 	}
2910 
2911 	assert(base_info->name != NULL);
2912 
2913 	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
2914 	if (rc != 0) {
2915 		if (rc != -ENODEV) {
2916 			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
2917 		}
2918 		return rc;
2919 	}
2920 
2921 	bdev = spdk_bdev_desc_get_bdev(desc);
2922 	bdev_uuid = spdk_bdev_get_uuid(bdev);
2923 
2924 	if (spdk_uuid_is_null(&base_info->uuid)) {
2925 		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
2926 	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
2927 		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
2928 		spdk_bdev_close(desc);
2929 		return -EINVAL;
2930 	}
2931 
2932 	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
2933 	if (rc != 0) {
2934 		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
2935 		spdk_bdev_close(desc);
2936 		return rc;
2937 	}
2938 
2939 	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);
2940 
2941 	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
2942 	if (base_info->app_thread_ch == NULL) {
2943 		SPDK_ERRLOG("Failed to get io channel\n");
2944 		spdk_bdev_module_release_bdev(bdev);
2945 		spdk_bdev_close(desc);
2946 		return -ENOMEM;
2947 	}
2948 
2949 	base_info->desc = desc;
2950 	base_info->blockcnt = bdev->blockcnt;
2951 
2952 	if (raid_bdev->sb != NULL) {
2953 		uint64_t data_offset;
2954 
2955 		if (base_info->data_offset == 0) {
2956 			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % bdev->blocklen) == 0);
2957 			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / bdev->blocklen;
2958 		} else {
2959 			data_offset = base_info->data_offset;
2960 		}
2961 
2962 		if (bdev->optimal_io_boundary != 0) {
2963 			data_offset = spdk_divide_round_up(data_offset,
2964 							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
2965 			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
2966 				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
2967 					     base_info->data_offset, base_info->name, data_offset);
2968 				data_offset = base_info->data_offset;
2969 			}
2970 		}
2971 
2972 		base_info->data_offset = data_offset;
2973 	}
2974 
2975 	if (base_info->data_offset >= bdev->blockcnt) {
2976 		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
2977 			    base_info->data_offset, bdev->blockcnt, base_info->name);
2978 		rc = -EINVAL;
2979 		goto out;
2980 	}
2981 
2982 	if (base_info->data_size == 0) {
2983 		base_info->data_size = bdev->blockcnt - base_info->data_offset;
2984 	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
2985 		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
2986 			    bdev->blockcnt, base_info->name);
2987 		rc = -EINVAL;
2988 		goto out;
2989 	}
2990 
2991 	/* Currently, RAID bdevs do not support DIF or DIX, so a RAID bdev cannot
2992 	 * be created on top of any bdev which supports it */
2993 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
2994 		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
2995 			    bdev->name);
2996 		rc = -EINVAL;
2997 		goto out;
2998 	}
2999 
3000 	/*
3001 	 * Set the raid bdev properties if this is the first base bdev configured,
3002 	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
3003 	 * have the same blocklen and metadata format.
3004 	 */
3005 	if (raid_bdev->num_base_bdevs_discovered == 0) {
3006 		raid_bdev->bdev.blocklen = bdev->blocklen;
3007 		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
3008 		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
3009 	} else {
3010 		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
3011 			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
3012 				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
3013 			rc = -EINVAL;
3014 			goto out;
3015 		}
3016 
3017 		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
3018 		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev)) {
3019 			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
3020 				    raid_bdev->bdev.name, bdev->name);
3021 			rc = -EINVAL;
3022 			goto out;
3023 		}
3024 	}
3025 
3026 	base_info->configure_cb = cb_fn;
3027 	base_info->configure_cb_ctx = cb_ctx;
3028 
3029 	if (existing) {
3030 		raid_bdev_configure_base_bdev_cont(base_info);
3031 	} else {
3032 		/* check for existing superblock when using a new bdev */
3033 		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
3034 				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
3035 		if (rc) {
3036 			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3037 				    bdev->name, spdk_strerror(-rc));
3038 		}
3039 	}
3040 out:
3041 	if (rc != 0) {
3042 		raid_bdev_free_base_bdev_resource(base_info);
3043 	}
3044 	return rc;
3045 }
3046 
3047 static int
3048 _raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
3049 			   uint64_t data_offset, uint64_t data_size,
3050 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
3051 {
3052 	struct raid_base_bdev_info *base_info;
3053 
3054 	assert(name != NULL);
3055 
3056 	if (slot >= raid_bdev->num_base_bdevs) {
3057 		return -EINVAL;
3058 	}
3059 
3060 	base_info = &raid_bdev->base_bdev_info[slot];
3061 
3062 	if (base_info->name != NULL) {
3063 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev '%s'\n",
3064 			    slot, raid_bdev->bdev.name, base_info->name);
3065 		return -EBUSY;
3066 	}
3067 
3068 	if (!spdk_uuid_is_null(&base_info->uuid)) {
3069 		char uuid_str[SPDK_UUID_STRING_LEN];
3070 
3071 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3072 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev with uuid %s\n",
3073 			    slot, raid_bdev->bdev.name, uuid_str);
3074 		return -EBUSY;
3075 	}
3076 
3077 	base_info->name = strdup(name);
3078 	if (base_info->name == NULL) {
3079 		return -ENOMEM;
3080 	}
3081 
3082 	base_info->data_offset = data_offset;
3083 	base_info->data_size = data_size;
3084 
3085 	return raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
3086 }
3087 
3088 int
3089 raid_bdev_attach_base_bdev(struct raid_bdev *raid_bdev, struct spdk_bdev *base_bdev,
3090 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
3091 {
3092 	struct raid_base_bdev_info *base_info = NULL, *iter;
3093 	int rc;
3094 
3095 	SPDK_DEBUGLOG(bdev_raid, "attach_base_device: %s\n", base_bdev->name);
3096 
3097 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3098 
3099 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
3100 		SPDK_ERRLOG("raid bdev '%s' must be in online state to attach base bdev\n",
3101 			    raid_bdev->bdev.name);
3102 		return -EINVAL;
3103 	}
3104 
3105 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3106 		if (iter->desc == NULL) {
3107 			base_info = iter;
3108 			break;
3109 		}
3110 	}
3111 
3112 	if (base_info == NULL) {
3113 		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
3114 			    raid_bdev->bdev.name, base_bdev->name);
3115 		return -EINVAL;
3116 	}
3117 
3118 	assert(base_info->is_configured == false);
3119 	assert(base_info->data_size != 0);
3120 
3121 	spdk_spin_lock(&raid_bdev->base_bdev_lock);
3122 
3123 	rc = _raid_bdev_add_base_device(raid_bdev, base_bdev->name,
3124 					raid_bdev_base_bdev_slot(base_info),
3125 					base_info->data_offset, base_info->data_size,
3126 					cb_fn, cb_ctx);
3127 	if (rc != 0) {
3128 		SPDK_ERRLOG("base bdev '%s' attach failed: %s\n", base_bdev->name, spdk_strerror(-rc));
3129 		raid_bdev_free_base_bdev_resource(base_info);
3130 	}
3131 
3132 	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
3133 
3134 	return rc;
3135 }
3136 
3137 /*
3138  * brief:
3139  * raid_bdev_add_base_device function is the actual function which either adds
3140  * the nvme base device to existing raid bdev or create a new raid bdev. It also claims
3141  * the base device and keep the open descriptor.
3142  * params:
3143  * raid_bdev - pointer to raid bdev
3144  * name - name of the base bdev
3145  * slot - position to add base bdev
3146  * cb_fn - callback function
3147  * cb_ctx - argument to callback function
3148  * returns:
3149  * 0 - success
3150  * non zero - failure
3151  */
3152 int
3153 raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
3154 			  raid_base_bdev_cb cb_fn, void *cb_ctx)
3155 {
3156 	return _raid_bdev_add_base_device(raid_bdev, name, slot, 0, 0, cb_fn, cb_ctx);
3157 }
3158 
3159 static int
3160 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3161 {
3162 	struct raid_bdev *raid_bdev;
3163 	uint8_t i;
3164 	int rc;
3165 
3166 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3167 			       sb->level, true, &sb->uuid, &raid_bdev);
3168 	if (rc != 0) {
3169 		return rc;
3170 	}
3171 
3172 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3173 	memcpy(raid_bdev->sb, sb, sb->length);
3174 
3175 	for (i = 0; i < sb->base_bdevs_size; i++) {
3176 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3177 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3178 
3179 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3180 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3181 			raid_bdev->num_base_bdevs_operational++;
3182 		}
3183 
3184 		base_info->data_offset = sb_base_bdev->data_offset;
3185 		base_info->data_size = sb_base_bdev->data_size;
3186 	}
3187 
3188 	*raid_bdev_out = raid_bdev;
3189 	return 0;
3190 }
3191 
3192 static void
3193 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3194 {
3195 	struct raid_bdev *raid_bdev;
3196 	struct raid_base_bdev_info *base_info;
3197 
3198 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3199 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3200 			if (base_info->desc == NULL && base_info->name != NULL &&
3201 			    strcmp(bdev->name, base_info->name) == 0) {
3202 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3203 				break;
3204 			}
3205 		}
3206 	}
3207 }
3208 
/*
 * Examine a bdev that carries a valid raid superblock. Depending on the
 * superblock's uuid and sequence number, this either creates a new raid bdev
 * from the superblock, re-creates an existing one from a newer superblock,
 * or attaches the bdev as a base bdev of the matching raid bdev.
 */
static void
raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev)
{
	const struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *iter, *base_info;
	uint8_t i;
	int rc;

	if (sb->block_size != bdev->blocklen) {
		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
			     bdev->name, sb->block_size, bdev->blocklen);
		return;
	}

	if (spdk_uuid_is_null(&sb->uuid)) {
		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
		return;
	}

	/* Look for an already-created raid bdev with the superblock's uuid.
	 * raid_bdev is NULL after the loop if none exists. */
	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			break;
		}
	}

	if (raid_bdev) {
		if (sb->seq_number > raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);

			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
					     raid_bdev->bdev.name, bdev->name);
				return;
			}

			/* remove and then recreate the raid bdev using the newer superblock */
			raid_bdev_delete(raid_bdev, NULL, NULL);
			raid_bdev = NULL;
		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
			/* use the current raid bdev superblock */
			sb = raid_bdev->sb;
		}
	}

	/* Find this bdev's entry in the superblock by uuid; sb_base_bdev points
	 * at it after the loop (checked below via the loop index). */
	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);

		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			break;
		}
	}

	if (i == sb->base_bdevs_size) {
		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
		return;
	}

	/* No matching raid bdev exists (or it was just deleted above) - create it */
	if (!raid_bdev) {
		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
				    sb->name, spdk_strerror(-rc));
			return;
		}
	}

	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
			       bdev->name, raid_bdev->bdev.name);
		return;
	}

	/* Find the raid bdev slot reserved for this bdev's uuid */
	base_info = NULL;
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			base_info = iter;
			break;
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
			    bdev->name, raid_bdev->bdev.name);
		return;
	}

	rc = raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
3309 
/* Context for the asynchronous superblock read performed during examine */
struct raid_bdev_examine_ctx {
	/* Descriptor opened on the bdev being examined */
	struct spdk_bdev_desc *desc;
	/* IO channel used for the superblock read */
	struct spdk_io_channel *ch;
};
3314 
3315 static void
3316 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3317 {
3318 	if (!ctx) {
3319 		return;
3320 	}
3321 
3322 	if (ctx->ch) {
3323 		spdk_put_io_channel(ctx->ch);
3324 	}
3325 
3326 	if (ctx->desc) {
3327 		spdk_bdev_close(ctx->desc);
3328 	}
3329 
3330 	free(ctx);
3331 }
3332 
3333 static void
3334 raid_bdev_examine_load_sb_cb(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3335 {
3336 	struct raid_bdev_examine_ctx *ctx = _ctx;
3337 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3338 
3339 	switch (status) {
3340 	case 0:
3341 		/* valid superblock found */
3342 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3343 		raid_bdev_examine_sb(sb, bdev);
3344 		break;
3345 	case -EINVAL:
3346 		/* no valid superblock, check if it can be claimed anyway */
3347 		raid_bdev_examine_no_sb(bdev);
3348 		break;
3349 	default:
3350 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3351 			    bdev->name, spdk_strerror(-status));
3352 		break;
3353 	}
3354 
3355 	raid_bdev_examine_ctx_free(ctx);
3356 	spdk_bdev_module_examine_done(&g_raid_if);
3357 }
3358 
/* Intentionally empty event callback for the short-lived descriptor opened
 * during examine - no action is needed for bdev events on it. */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3363 
3364 /*
3365  * brief:
3366  * raid_bdev_examine function is the examine function call by the below layers
3367  * like bdev_nvme layer. This function will check if this base bdev can be
3368  * claimed by this raid bdev or not.
3369  * params:
3370  * bdev - pointer to base bdev
3371  * returns:
3372  * none
3373  */
3374 static void
3375 raid_bdev_examine(struct spdk_bdev *bdev)
3376 {
3377 	struct raid_bdev_examine_ctx *ctx;
3378 	int rc;
3379 
3380 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3381 		raid_bdev_examine_no_sb(bdev);
3382 		spdk_bdev_module_examine_done(&g_raid_if);
3383 		return;
3384 	}
3385 
3386 	ctx = calloc(1, sizeof(*ctx));
3387 	if (!ctx) {
3388 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3389 			    bdev->name, spdk_strerror(ENOMEM));
3390 		goto err;
3391 	}
3392 
3393 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false, raid_bdev_examine_event_cb, NULL,
3394 				&ctx->desc);
3395 	if (rc) {
3396 		SPDK_ERRLOG("Failed to open bdev %s: %s\n",
3397 			    bdev->name, spdk_strerror(-rc));
3398 		goto err;
3399 	}
3400 
3401 	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3402 	if (!ctx->ch) {
3403 		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev->name);
3404 		goto err;
3405 	}
3406 
3407 	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_cb, ctx);
3408 	if (rc) {
3409 		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3410 			    bdev->name, spdk_strerror(-rc));
3411 		goto err;
3412 	}
3413 
3414 	return;
3415 err:
3416 	raid_bdev_examine_ctx_free(ctx);
3417 	spdk_bdev_module_examine_done(&g_raid_if);
3418 }
3419 
3420 /* Log component for bdev raid bdev module */
3421 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3422