xref: /spdk/module/bdev/raid/bdev_raid.c (revision beff2dedc047dec06b7e29d038b607a3206a5da9)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
16 #define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
17 #define RAID_BDEV_PROCESS_MAX_QD	16
18 
19 #define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT 1024
20 
21 static bool g_shutdown_started = false;
22 
23 /* List of all raid bdevs */
24 struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);
25 
26 static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
27 
28 /*
29  * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
30  * contains the relationship of raid bdev io channel with base bdev io channels.
31  */
32 struct raid_bdev_io_channel {
33 	/* Array of IO channels of base bdevs */
34 	struct spdk_io_channel	**base_channel;
35 
36 	/* Private raid module IO channel */
37 	struct spdk_io_channel	*module_channel;
38 
39 	/* Background process data */
40 	struct {
41 		uint64_t offset;
42 		struct spdk_io_channel *target_ch;
43 		struct raid_bdev_io_channel *ch_processed;
44 	} process;
45 };
46 
47 enum raid_bdev_process_state {
48 	RAID_PROCESS_STATE_INIT,
49 	RAID_PROCESS_STATE_RUNNING,
50 	RAID_PROCESS_STATE_STOPPING,
51 	RAID_PROCESS_STATE_STOPPED,
52 };
53 
54 struct raid_bdev_process {
55 	struct raid_bdev		*raid_bdev;
56 	enum raid_process_type		type;
57 	enum raid_bdev_process_state	state;
58 	struct spdk_thread		*thread;
59 	struct raid_bdev_io_channel	*raid_ch;
60 	TAILQ_HEAD(, raid_bdev_process_request) requests;
61 	uint64_t			max_window_size;
62 	uint64_t			window_size;
63 	uint64_t			window_remaining;
64 	int				window_status;
65 	uint64_t			window_offset;
66 	bool				window_range_locked;
67 	struct raid_base_bdev_info	*target;
68 	int				status;
69 	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
70 };
71 
72 struct raid_process_finish_action {
73 	spdk_msg_fn cb;
74 	void *cb_ctx;
75 	TAILQ_ENTRY(raid_process_finish_action) link;
76 };
77 
78 static struct spdk_raid_bdev_opts g_opts = {
79 	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
80 };
81 
82 void
83 raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
84 {
85 	*opts = g_opts;
86 }
87 
88 int
89 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
90 {
91 	if (opts->process_window_size_kb == 0) {
92 		return -EINVAL;
93 	}
94 
95 	g_opts = *opts;
96 
97 	return 0;
98 }
99 
100 static struct raid_bdev_module *
101 raid_bdev_module_find(enum raid_level level)
102 {
103 	struct raid_bdev_module *raid_module;
104 
105 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
106 		if (raid_module->level == level) {
107 			return raid_module;
108 		}
109 	}
110 
111 	return NULL;
112 }
113 
114 void
115 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
116 {
117 	if (raid_bdev_module_find(raid_module->level) != NULL) {
118 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
119 			    raid_bdev_level_to_str(raid_module->level));
120 		assert(false);
121 	} else {
122 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
123 	}
124 }
125 
126 struct spdk_io_channel *
127 raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
128 {
129 	return raid_ch->base_channel[idx];
130 }
131 
132 void *
133 raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
134 {
135 	assert(raid_ch->module_channel != NULL);
136 
137 	return spdk_io_channel_get_ctx(raid_ch->module_channel);
138 }
139 
140 struct raid_base_bdev_info *
141 raid_bdev_channel_get_base_info(struct raid_bdev_io_channel *raid_ch, struct spdk_bdev *base_bdev)
142 {
143 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
144 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
145 	uint8_t i;
146 
147 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
148 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[i];
149 
150 		if (base_info->is_configured &&
151 		    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
152 			return base_info;
153 		}
154 	}
155 
156 	return NULL;
157 }
158 
159 /* Function declarations */
160 static void	raid_bdev_examine(struct spdk_bdev *bdev);
161 static int	raid_bdev_init(void);
162 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
163 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
164 
165 static void
166 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
167 {
168 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
169 
170 	if (raid_ch->process.target_ch != NULL) {
171 		spdk_put_io_channel(raid_ch->process.target_ch);
172 		raid_ch->process.target_ch = NULL;
173 	}
174 
175 	if (raid_ch->process.ch_processed != NULL) {
176 		free(raid_ch->process.ch_processed->base_channel);
177 		free(raid_ch->process.ch_processed);
178 		raid_ch->process.ch_processed = NULL;
179 	}
180 }
181 
/*
 * Set up the background-process state of a raid IO channel: record the current
 * process window offset, take an IO channel on the process target bdev and
 * build a shadow "processed" channel whose base channel array routes the
 * target's slot to that channel. Returns 0 on success, -ENOMEM on failure
 * (all partially acquired resources are released via the cleanup helper).
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		/* The target slot uses the freshly acquired target channel; every
		 * other slot shares the parent channel's base channel (no extra ref). */
		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The shadow channel itself has no active process window. */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	/* Releases the target channel and the partially built shadow channel. */
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
231 
/*
 * brief:
 * raid_bdev_create_cb function is a cb function for raid bdev which creates the
 * hierarchy from raid bdev to base bdev io channels. It will be called per core
 * params:
 * io_device - pointer to raid bdev io device represented by raid_bdev
 * ctx_buf - pointer to context buffer for raid bdev io channel
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_create_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	int ret = -ENOMEM;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);

	/* calloc leaves unpopulated slots NULL, which the err path relies on. */
	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
		return -ENOMEM;
	}

	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/*
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].is_configured == false ||
		    raid_bdev->base_bdev_info[i].is_process_target == true) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			goto err;
		}
	}

	/* Optional module-private channel (only if the raid module provides one). */
	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			goto err;
		}
	}

	if (raid_bdev->process != NULL) {
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}

	return 0;
err:
	/* Release every base channel acquired so far; NULL slots are skipped. */
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}
313 
/*
 * brief:
 * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
 * hierarchy from raid bdev to base bdev io channels. It will be called per core
 * params:
 * io_device - pointer to raid bdev io device represented by raid_bdev
 * ctx_buf - pointer to context buffer for raid bdev io channel
 * returns:
 * none
 */
static void
raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");

	assert(raid_ch != NULL);
	assert(raid_ch->base_channel);

	/* Release the optional module-private channel first. */
	if (raid_ch->module_channel) {
		spdk_put_io_channel(raid_ch->module_channel);
	}

	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/* Free base bdev channels */
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);
	raid_ch->base_channel = NULL;

	/* Finally drop any background-process state (target channel, shadow channel). */
	raid_bdev_ch_process_cleanup(raid_ch);
}
351 
/*
 * brief:
 * raid_bdev_cleanup is used to cleanup raid_bdev related data
 * structures. It frees the base bdev names and unlinks the raid bdev from the
 * global list; the raid_bdev memory itself is released by raid_bdev_free().
 * Must be called on the app thread and never on an ONLINE raid bdev.
 * params:
 * raid_bdev - pointer to raid_bdev
 * returns:
 * none
 */
static void
raid_bdev_cleanup(struct raid_bdev *raid_bdev)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/* Descriptors must already have been released by this point. */
		assert(base_info->desc == NULL);
		free(base_info->name);
	}

	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
}
378 
379 static void
380 raid_bdev_free(struct raid_bdev *raid_bdev)
381 {
382 	raid_bdev_free_superblock(raid_bdev);
383 	free(raid_bdev->base_bdev_info);
384 	free(raid_bdev->bdev.name);
385 	free(raid_bdev);
386 }
387 
/* Convenience helper: detach the raid bdev from global state, then free it. */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
394 
395 static void
396 raid_bdev_deconfigure_base_bdev(struct raid_base_bdev_info *base_info)
397 {
398 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
399 
400 	assert(base_info->is_configured);
401 	assert(raid_bdev->num_base_bdevs_discovered);
402 	raid_bdev->num_base_bdevs_discovered--;
403 	base_info->is_configured = false;
404 	base_info->is_process_target = false;
405 }
406 
/*
 * brief:
 * free resource of base bdev for raid bdev
 * Releases the name, (conditionally) the persistent UUID, the descriptor, its
 * app-thread IO channel, and deconfigures the entry if it was configured.
 * Must run on the app thread.
 * params:
 * base_info - raid base bdev info
 * returns:
 * none
 */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	free(base_info->name);
	base_info->name = NULL;
	/* While still configuring, keep the UUID so the slot can be re-matched later. */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}
	base_info->is_failed = false;

	/* Nothing more to release if the bdev was never opened. */
	if (base_info->desc == NULL) {
		return;
	}

	/* Release module ownership before closing the descriptor. */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		raid_bdev_deconfigure_base_bdev(base_info);
	}
}
443 
444 static void
445 raid_bdev_io_device_unregister_cb(void *io_device)
446 {
447 	struct raid_bdev *raid_bdev = io_device;
448 
449 	if (raid_bdev->num_base_bdevs_discovered == 0) {
450 		/* Free raid_bdev when there are no base bdevs left */
451 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
452 		raid_bdev_cleanup(raid_bdev);
453 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
454 		raid_bdev_free(raid_bdev);
455 	} else {
456 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
457 	}
458 }
459 
460 void
461 raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
462 {
463 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
464 		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
465 	}
466 }
467 
/*
 * App-thread part of raid bdev destruction: release base bdev resources that
 * are due for removal, transition to OFFLINE on shutdown, and give the raid
 * module a chance to stop. If module->stop() returns false the module will
 * finish asynchronously and call raid_bdev_module_stop_done() itself.
 */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	/* Any background process must have been stopped before destruct. */
	assert(raid_bdev->process == NULL);

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	if (raid_bdev->module->stop != NULL) {
		if (raid_bdev->module->stop(raid_bdev) == false) {
			/* Module stop is asynchronous; it will signal completion later. */
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
500 
/*
 * bdev destruct entry point. The actual teardown must run on the app thread,
 * so forward it there. Returning 1 tells the bdev layer that destruction is
 * asynchronous and will be completed via spdk_bdev_destruct_done().
 */
static int
raid_bdev_destruct(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);

	return 1;
}
508 
509 int
510 raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
511 			   struct spdk_bdev *bdev, uint32_t remapped_offset)
512 {
513 	struct spdk_dif_ctx dif_ctx;
514 	struct spdk_dif_error err_blk = {};
515 	int rc;
516 	struct spdk_dif_ctx_init_ext_opts dif_opts;
517 	struct iovec md_iov = {
518 		.iov_base	= md_buf,
519 		.iov_len	= num_blocks * bdev->md_len,
520 	};
521 
522 	if (md_buf == NULL) {
523 		return 0;
524 	}
525 
526 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
527 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
528 	rc = spdk_dif_ctx_init(&dif_ctx,
529 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
530 			       bdev->dif_is_head_of_md, bdev->dif_type,
531 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
532 			       0, 0, 0, 0, 0, &dif_opts);
533 	if (rc != 0) {
534 		SPDK_ERRLOG("Initialization of DIF context failed\n");
535 		return rc;
536 	}
537 
538 	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
539 
540 	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
541 	if (rc != 0) {
542 		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%d"
543 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
544 	}
545 
546 	return rc;
547 }
548 
549 int
550 raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
551 			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
552 {
553 	struct spdk_dif_ctx dif_ctx;
554 	struct spdk_dif_error err_blk = {};
555 	int rc;
556 	struct spdk_dif_ctx_init_ext_opts dif_opts;
557 	struct iovec md_iov = {
558 		.iov_base	= md_buf,
559 		.iov_len	= num_blocks * bdev->md_len,
560 	};
561 
562 	if (md_buf == NULL) {
563 		return 0;
564 	}
565 
566 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
567 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
568 	rc = spdk_dif_ctx_init(&dif_ctx,
569 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
570 			       bdev->dif_is_head_of_md, bdev->dif_type,
571 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
572 			       offset_blocks, 0, 0, 0, 0, &dif_opts);
573 	if (rc != 0) {
574 		SPDK_ERRLOG("Initialization of DIF context failed\n");
575 		return rc;
576 	}
577 
578 	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
579 	if (rc != 0) {
580 		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%d"
581 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
582 	}
583 
584 	return rc;
585 }
586 
/*
 * Complete a raid IO. For split IOs this may instead restore the parent IO
 * parameters and submit the remaining (lower-LBA, already-processed) part via
 * the processed channel; only when both parts are done (or the first part
 * failed) is the parent bdev_io completed. For plain reads with DIF reftag
 * checking enabled, the reference tags are remapped before completion.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
	int rc;

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the parent IO's offset and md buffer. */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* Pointer difference yields the number of iovecs fully
				 * consumed by the first part. */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The boundary iovec was shared by both parts: count it
					 * and shrink it to only its first-part prefix. */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The second part covers the already-processed range. */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Both parts done (or first part failed): restore the original
		 * parameters before completing the parent IO. */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		/* Successful reads on DIF-enabled bdevs with reftag checking need
		 * their reference tags remapped to the parent's LBA space. */
		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {

			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
							bdev_io->u.bdev.offset_blocks);
			if (rc != 0) {
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		spdk_bdev_io_complete(bdev_io, status);
	}
}
650 
651 /*
652  * brief:
653  * raid_bdev_io_complete_part - signal the completion of a part of the expected
654  * base bdev IOs and complete the raid_io if this is the final expected IO.
655  * The caller should first set raid_io->base_bdev_io_remaining. This function
656  * will decrement this counter by the value of the 'completed' parameter and
657  * complete the raid_io if the counter reaches 0. The caller is free to
658  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
659  * it can represent e.g. blocks or IOs.
660  * params:
661  * raid_io - pointer to raid_bdev_io
662  * completed - the part of the raid_io that has been completed
663  * status - status of the base IO
664  * returns:
665  * true - if the raid_io is completed
666  * false - otherwise
667  */
668 bool
669 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
670 			   enum spdk_bdev_io_status status)
671 {
672 	assert(raid_io->base_bdev_io_remaining >= completed);
673 	raid_io->base_bdev_io_remaining -= completed;
674 
675 	if (status != raid_io->base_bdev_io_status_default) {
676 		raid_io->base_bdev_io_status = status;
677 	}
678 
679 	if (raid_io->base_bdev_io_remaining == 0) {
680 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
681 		return true;
682 	} else {
683 		return false;
684 	}
685 }
686 
687 /*
688  * brief:
689  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
690  * It will try to queue the IOs after storing the context to bdev wait queue logic.
691  * params:
692  * raid_io - pointer to raid_bdev_io
693  * bdev - the block device that the IO is submitted to
694  * ch - io channel
695  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
696  * returns:
697  * none
698  */
699 void
700 raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
701 			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
702 {
703 	raid_io->waitq_entry.bdev = bdev;
704 	raid_io->waitq_entry.cb_fn = cb_fn;
705 	raid_io->waitq_entry.cb_arg = raid_io;
706 	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
707 }
708 
709 static void
710 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
711 {
712 	struct raid_bdev_io *raid_io = cb_arg;
713 
714 	spdk_bdev_free_io(bdev_io);
715 
716 	raid_bdev_io_complete_part(raid_io, 1, success ?
717 				   SPDK_BDEV_IO_STATUS_SUCCESS :
718 				   SPDK_BDEV_IO_STATUS_FAILED);
719 }
720 
static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);

/* io-wait adapter: retries a reset submission that previously hit -ENOMEM. */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	raid_bdev_submit_reset_request(_raid_io);
}
730 
/*
 * brief:
 * raid_bdev_submit_reset_request function submits reset requests
 * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
 * which case it will queue it for later submission
 * params:
 * raid_io
 * returns:
 * none
 */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* First invocation (not a retry): expect one completion per base bdev. */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from where a previous -ENOMEM retry left off. */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* Missing base bdevs are counted as successfully reset. */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Out of bdev_io objects: requeue and retry from index i later. */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
780 
/*
 * Split a raid IO at 'split_offset' blocks: adjust the IO to describe only the
 * second (higher-LBA) part, which is submitted first. The original iovec state
 * needed to later restore/submit the first part is saved in raid_io->split.
 * A boundary iovec shared by both parts is modified in place and a copy of its
 * original value is kept in split.iov_copy.
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	/* Advance the block range (and md buffer) past the first part. */
	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Walk the iovecs until the byte offset of the split point is reached. */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls exactly on an iovec boundary - nothing shared. */
				raid_io->split.iov = NULL;
			} else {
				/* This iovec straddles the split: save it, then trim its front. */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
818 
819 static void
820 raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
821 {
822 	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;
823 
824 	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
825 		uint64_t offset_begin = raid_io->offset_blocks;
826 		uint64_t offset_end = offset_begin + raid_io->num_blocks;
827 
828 		if (offset_end > raid_ch->process.offset) {
829 			if (offset_begin < raid_ch->process.offset) {
830 				/*
831 				 * If the I/O spans both the processed and unprocessed ranges,
832 				 * split it and first handle the unprocessed part. After it
833 				 * completes, the rest will be handled.
834 				 * This situation occurs when the process thread is not active
835 				 * or is waiting for the process window range to be locked
836 				 * (quiesced). When a window is being processed, such I/Os will be
837 				 * deferred by the bdev layer until the window is unlocked.
838 				 */
839 				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
840 					      raid_ch->process.offset, offset_begin, offset_end);
841 				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
842 			}
843 		} else {
844 			/* Use the child channel, which corresponds to the already processed range */
845 			raid_io->raid_ch = raid_ch->process.ch_processed;
846 		}
847 	}
848 
849 	raid_io->raid_bdev->module->submit_rw_request(raid_io);
850 }
851 
852 /*
853  * brief:
854  * Callback function to spdk_bdev_io_get_buf.
855  * params:
856  * ch - pointer to raid bdev io channel
857  * bdev_io - pointer to parent bdev_io on raid bdev device
858  * success - True if buffer is allocated or false otherwise.
859  * returns:
860  * none
861  */
862 static void
863 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
864 		     bool success)
865 {
866 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
867 
868 	if (!success) {
869 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
870 		return;
871 	}
872 
873 	raid_bdev_submit_rw_request(raid_io);
874 }
875 
876 void
877 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
878 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
879 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
880 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
881 {
882 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
883 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
884 
885 	raid_io->type = type;
886 	raid_io->offset_blocks = offset_blocks;
887 	raid_io->num_blocks = num_blocks;
888 	raid_io->iovs = iovs;
889 	raid_io->iovcnt = iovcnt;
890 	raid_io->memory_domain = memory_domain;
891 	raid_io->memory_domain_ctx = memory_domain_ctx;
892 	raid_io->md_buf = md_buf;
893 
894 	raid_io->raid_bdev = raid_bdev;
895 	raid_io->raid_ch = raid_ch;
896 	raid_io->base_bdev_io_remaining = 0;
897 	raid_io->base_bdev_io_submitted = 0;
898 	raid_io->completion_cb = NULL;
899 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
900 
901 	raid_bdev_io_set_default_status(raid_io, SPDK_BDEV_IO_STATUS_SUCCESS);
902 }
903 
/*
 * brief:
 * raid_bdev_submit_request function is the submit_request function pointer of
 * raid bdev function table. This is used to submit the io on raid_bdev to below
 * layers.
 * params:
 * ch - pointer to raid bdev io channel
 * bdev_io - pointer to parent bdev_io on raid bdev device
 * returns:
 * none
 */
static void
raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;

	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Reads may need a data buffer allocated before submission. */
		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		raid_bdev_submit_rw_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		raid_bdev_submit_reset_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Null-payload IOs are not supported while a background process runs. */
		if (raid_io->raid_bdev->process != NULL) {
			/* TODO: rebuild support */
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
		break;

	default:
		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
954 
955 /*
956  * brief:
957  * _raid_bdev_io_type_supported checks whether io_type is supported in
958  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
959  * doesn't support, the raid device doesn't supports.
960  *
961  * params:
962  * raid_bdev - pointer to raid bdev context
963  * io_type - io type
964  * returns:
965  * true - io_type is supported
966  * false - io_type is not supported
967  */
968 inline static bool
969 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
970 {
971 	struct raid_base_bdev_info *base_info;
972 
973 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
974 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
975 		if (raid_bdev->module->submit_null_payload_request == NULL) {
976 			return false;
977 		}
978 	}
979 
980 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
981 		if (base_info->desc == NULL) {
982 			continue;
983 		}
984 
985 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
986 			return false;
987 		}
988 	}
989 
990 	return true;
991 }
992 
993 /*
994  * brief:
995  * raid_bdev_io_type_supported is the io_supported function for bdev function
996  * table which returns whether the particular io type is supported or not by
997  * raid bdev module
998  * params:
999  * ctx - pointer to raid bdev context
1000  * type - io type
1001  * returns:
1002  * true - io_type is supported
1003  * false - io_type is not supported
1004  */
1005 static bool
1006 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1007 {
1008 	switch (io_type) {
1009 	case SPDK_BDEV_IO_TYPE_READ:
1010 	case SPDK_BDEV_IO_TYPE_WRITE:
1011 		return true;
1012 
1013 	case SPDK_BDEV_IO_TYPE_FLUSH:
1014 	case SPDK_BDEV_IO_TYPE_RESET:
1015 	case SPDK_BDEV_IO_TYPE_UNMAP:
1016 		return _raid_bdev_io_type_supported(ctx, io_type);
1017 
1018 	default:
1019 		return false;
1020 	}
1021 
1022 	return false;
1023 }
1024 
1025 /*
1026  * brief:
1027  * raid_bdev_get_io_channel is the get_io_channel function table pointer for
1028  * raid bdev. This is used to return the io channel for this raid bdev
1029  * params:
1030  * ctxt - pointer to raid_bdev
1031  * returns:
1032  * pointer to io channel for raid bdev
1033  */
1034 static struct spdk_io_channel *
1035 raid_bdev_get_io_channel(void *ctxt)
1036 {
1037 	struct raid_bdev *raid_bdev = ctxt;
1038 
1039 	return spdk_get_io_channel(raid_bdev);
1040 }
1041 
/*
 * brief:
 * raid_bdev_write_info_json writes the raid bdev's configuration and state -
 * including any running background process and the base bdev list - into an
 * already-open JSON object. Must run on the app thread.
 * params:
 * raid_bdev - pointer to raid bdev
 * w - JSON write context
 * returns:
 * none
 */
void
raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
{
	struct raid_base_bdev_info *base_info;

	assert(raid_bdev != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
				     raid_bdev->num_base_bdevs_operational);
	if (raid_bdev->process) {
		struct raid_bdev_process *process = raid_bdev->process;
		uint64_t offset = process->window_offset;

		/* Report progress of the running background process (e.g. rebuild). */
		spdk_json_write_named_object_begin(w, "process");
		spdk_json_write_name(w, "type");
		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
		spdk_json_write_named_string(w, "target", process->target->name);
		spdk_json_write_named_object_begin(w, "progress");
		spdk_json_write_named_uint64(w, "blocks", offset);
		/* Percentage computed in floating point, then converted to uint32. */
		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_name(w, "base_bdevs_list");
	spdk_json_write_array_begin(w);
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		spdk_json_write_object_begin(w);
		spdk_json_write_name(w, "name");
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			/* Slot is currently empty (base bdev removed/missing). */
			spdk_json_write_null(w);
		}
		spdk_json_write_named_uuid(w, "uuid", &base_info->uuid);
		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_array_end(w);
}
1091 
1092 /*
1093  * brief:
1094  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1095  * params:
1096  * ctx - pointer to raid_bdev
1097  * w - pointer to json context
1098  * returns:
1099  * 0 - success
1100  * non zero - failure
1101  */
1102 static int
1103 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1104 {
1105 	struct raid_bdev *raid_bdev = ctx;
1106 
1107 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1108 
1109 	/* Dump the raid bdev configuration related information */
1110 	spdk_json_write_named_object_begin(w, "raid");
1111 	raid_bdev_write_info_json(raid_bdev, w);
1112 	spdk_json_write_object_end(w);
1113 
1114 	return 0;
1115 }
1116 
1117 /*
1118  * brief:
1119  * raid_bdev_write_config_json is the function table pointer for raid bdev
1120  * params:
1121  * bdev - pointer to spdk_bdev
1122  * w - pointer to json context
1123  * returns:
1124  * none
1125  */
1126 static void
1127 raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1128 {
1129 	struct raid_bdev *raid_bdev = bdev->ctxt;
1130 	struct raid_base_bdev_info *base_info;
1131 
1132 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1133 
1134 	if (raid_bdev->superblock_enabled) {
1135 		/* raid bdev configuration is stored in the superblock */
1136 		return;
1137 	}
1138 
1139 	spdk_json_write_object_begin(w);
1140 
1141 	spdk_json_write_named_string(w, "method", "bdev_raid_create");
1142 
1143 	spdk_json_write_named_object_begin(w, "params");
1144 	spdk_json_write_named_string(w, "name", bdev->name);
1145 	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1146 	if (raid_bdev->strip_size_kb != 0) {
1147 		spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1148 	}
1149 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1150 
1151 	spdk_json_write_named_array_begin(w, "base_bdevs");
1152 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1153 		if (base_info->name) {
1154 			spdk_json_write_string(w, base_info->name);
1155 		} else {
1156 			char str[32];
1157 
1158 			snprintf(str, sizeof(str), "removed_base_bdev_%u", raid_bdev_base_bdev_slot(base_info));
1159 			spdk_json_write_string(w, str);
1160 		}
1161 	}
1162 	spdk_json_write_array_end(w);
1163 	spdk_json_write_object_end(w);
1164 
1165 	spdk_json_write_object_end(w);
1166 }
1167 
1168 static int
1169 raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
1170 {
1171 	struct raid_bdev *raid_bdev = ctx;
1172 	struct raid_base_bdev_info *base_info;
1173 	int domains_count = 0, rc = 0;
1174 
1175 	if (raid_bdev->module->memory_domains_supported == false) {
1176 		return 0;
1177 	}
1178 
1179 	/* First loop to get the number of memory domains */
1180 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1181 		if (base_info->is_configured == false) {
1182 			continue;
1183 		}
1184 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
1185 		if (rc < 0) {
1186 			return rc;
1187 		}
1188 		domains_count += rc;
1189 	}
1190 
1191 	if (!domains || array_size < domains_count) {
1192 		return domains_count;
1193 	}
1194 
1195 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1196 		if (base_info->is_configured == false) {
1197 			continue;
1198 		}
1199 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
1200 		if (rc < 0) {
1201 			return rc;
1202 		}
1203 		domains += rc;
1204 		array_size -= rc;
1205 	}
1206 
1207 	return domains_count;
1208 }
1209 
/* g_raid_bdev_fn_table is the function table for raid bdev, hooked into the
 * generic bdev layer when the raid bdev is registered. */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1220 
1221 struct raid_bdev *
1222 raid_bdev_find_by_name(const char *name)
1223 {
1224 	struct raid_bdev *raid_bdev;
1225 
1226 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1227 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1228 			return raid_bdev;
1229 		}
1230 	}
1231 
1232 	return NULL;
1233 }
1234 
1235 static struct raid_bdev *
1236 raid_bdev_find_by_uuid(const struct spdk_uuid *uuid)
1237 {
1238 	struct raid_bdev *raid_bdev;
1239 
1240 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1241 		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, uuid) == 0) {
1242 			return raid_bdev;
1243 		}
1244 	}
1245 
1246 	return NULL;
1247 }
1248 
/* Mapping between textual raid level names (as accepted over RPC) and
 * enum raid_level values. Both the "raidX" and bare "X" spellings are
 * accepted; the table is terminated by a zeroed entry. */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};
1262 
/* raid bdev state names, indexed by enum raid_bdev_state; NULL-terminated
 * at the RAID_BDEV_STATE_MAX slot. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};
1269 
/* Background process type names, indexed by enum raid_process_type;
 * NULL-terminated at the RAID_PROCESS_MAX slot. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};
1275 
1276 /* We have to use the typedef in the function declaration to appease astyle. */
1277 typedef enum raid_level raid_level_t;
1278 typedef enum raid_bdev_state raid_bdev_state_t;
1279 
1280 raid_level_t
1281 raid_bdev_str_to_level(const char *str)
1282 {
1283 	unsigned int i;
1284 
1285 	assert(str != NULL);
1286 
1287 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1288 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1289 			return g_raid_level_names[i].value;
1290 		}
1291 	}
1292 
1293 	return INVALID_RAID_LEVEL;
1294 }
1295 
1296 const char *
1297 raid_bdev_level_to_str(enum raid_level level)
1298 {
1299 	unsigned int i;
1300 
1301 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1302 		if (g_raid_level_names[i].value == level) {
1303 			return g_raid_level_names[i].name;
1304 		}
1305 	}
1306 
1307 	return "";
1308 }
1309 
1310 raid_bdev_state_t
1311 raid_bdev_str_to_state(const char *str)
1312 {
1313 	unsigned int i;
1314 
1315 	assert(str != NULL);
1316 
1317 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1318 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1319 			break;
1320 		}
1321 	}
1322 
1323 	return i;
1324 }
1325 
1326 const char *
1327 raid_bdev_state_to_str(enum raid_bdev_state state)
1328 {
1329 	if (state >= RAID_BDEV_STATE_MAX) {
1330 		return "";
1331 	}
1332 
1333 	return g_raid_state_names[state];
1334 }
1335 
1336 const char *
1337 raid_bdev_process_to_str(enum raid_process_type value)
1338 {
1339 	if (value >= RAID_PROCESS_MAX) {
1340 		return "";
1341 	}
1342 
1343 	return g_raid_process_type_names[value];
1344 }
1345 
1346 /*
1347  * brief:
1348  * raid_bdev_fini_start is called when bdev layer is starting the
1349  * shutdown process
1350  * params:
1351  * none
1352  * returns:
1353  * none
1354  */
1355 static void
1356 raid_bdev_fini_start(void)
1357 {
1358 	struct raid_bdev *raid_bdev;
1359 	struct raid_base_bdev_info *base_info;
1360 
1361 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1362 
1363 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1364 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1365 			RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1366 				raid_bdev_free_base_bdev_resource(base_info);
1367 			}
1368 		}
1369 	}
1370 
1371 	g_shutdown_started = true;
1372 }
1373 
1374 /*
1375  * brief:
1376  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1377  * params:
1378  * none
1379  * returns:
1380  * none
1381  */
1382 static void
1383 raid_bdev_exit(void)
1384 {
1385 	struct raid_bdev *raid_bdev, *tmp;
1386 
1387 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1388 
1389 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1390 		raid_bdev_cleanup_and_free(raid_bdev);
1391 	}
1392 }
1393 
/* Emit a "bdev_raid_set_options" RPC call that reproduces the module's
 * current global options. */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1407 
/* config_json callback of the raid bdev module: writes the module-level
 * options. Per-bdev configuration is handled by raid_bdev_write_config_json.
 * Always returns 0. */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1415 
1416 /*
1417  * brief:
1418  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1419  * module
1420  * params:
1421  * none
1422  * returns:
1423  * size of spdk_bdev_io context for raid
1424  */
1425 static int
1426 raid_bdev_get_ctx_size(void)
1427 {
1428 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
1429 	return sizeof(struct raid_bdev_io);
1430 }
1431 
/* Bdev module descriptor for the raid module; registered with the bdev
 * layer below. Both init and fini are synchronous. */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1444 
1445 /*
1446  * brief:
1447  * raid_bdev_init is the initialization function for raid bdev module
1448  * params:
1449  * none
1450  * returns:
1451  * 0 - success
1452  * non zero - failure
1453  */
1454 static int
1455 raid_bdev_init(void)
1456 {
1457 	return 0;
1458 }
1459 
1460 static int
1461 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1462 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1463 		  struct raid_bdev **raid_bdev_out)
1464 {
1465 	struct raid_bdev *raid_bdev;
1466 	struct spdk_bdev *raid_bdev_gen;
1467 	struct raid_bdev_module *module;
1468 	struct raid_base_bdev_info *base_info;
1469 	uint8_t min_operational;
1470 
1471 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1472 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1473 		return -EINVAL;
1474 	}
1475 
1476 	if (raid_bdev_find_by_name(name) != NULL) {
1477 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1478 		return -EEXIST;
1479 	}
1480 
1481 	if (level == RAID1) {
1482 		if (strip_size != 0) {
1483 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1484 			return -EINVAL;
1485 		}
1486 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1487 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1488 		return -EINVAL;
1489 	}
1490 
1491 	module = raid_bdev_module_find(level);
1492 	if (module == NULL) {
1493 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1494 		return -EINVAL;
1495 	}
1496 
1497 	assert(module->base_bdevs_min != 0);
1498 	if (num_base_bdevs < module->base_bdevs_min) {
1499 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1500 			    module->base_bdevs_min,
1501 			    raid_bdev_level_to_str(level));
1502 		return -EINVAL;
1503 	}
1504 
1505 	switch (module->base_bdevs_constraint.type) {
1506 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1507 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1508 		break;
1509 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1510 		min_operational = module->base_bdevs_constraint.value;
1511 		break;
1512 	case CONSTRAINT_UNSET:
1513 		if (module->base_bdevs_constraint.value != 0) {
1514 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1515 				    (uint8_t)module->base_bdevs_constraint.value, name);
1516 			return -EINVAL;
1517 		}
1518 		min_operational = num_base_bdevs;
1519 		break;
1520 	default:
1521 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1522 			    (uint8_t)module->base_bdevs_constraint.type,
1523 			    raid_bdev_level_to_str(module->level));
1524 		return -EINVAL;
1525 	};
1526 
1527 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1528 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1529 			    raid_bdev_level_to_str(module->level));
1530 		return -EINVAL;
1531 	}
1532 
1533 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1534 	if (!raid_bdev) {
1535 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1536 		return -ENOMEM;
1537 	}
1538 
1539 	raid_bdev->module = module;
1540 	raid_bdev->num_base_bdevs = num_base_bdevs;
1541 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1542 					   sizeof(struct raid_base_bdev_info));
1543 	if (!raid_bdev->base_bdev_info) {
1544 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1545 		raid_bdev_free(raid_bdev);
1546 		return -ENOMEM;
1547 	}
1548 
1549 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1550 		base_info->raid_bdev = raid_bdev;
1551 	}
1552 
1553 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1554 	 * internally and set later.
1555 	 */
1556 	raid_bdev->strip_size = 0;
1557 	raid_bdev->strip_size_kb = strip_size;
1558 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1559 	raid_bdev->level = level;
1560 	raid_bdev->min_base_bdevs_operational = min_operational;
1561 	raid_bdev->superblock_enabled = superblock_enabled;
1562 
1563 	raid_bdev_gen = &raid_bdev->bdev;
1564 
1565 	raid_bdev_gen->name = strdup(name);
1566 	if (!raid_bdev_gen->name) {
1567 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1568 		raid_bdev_free(raid_bdev);
1569 		return -ENOMEM;
1570 	}
1571 
1572 	raid_bdev_gen->product_name = "Raid Volume";
1573 	raid_bdev_gen->ctxt = raid_bdev;
1574 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1575 	raid_bdev_gen->module = &g_raid_if;
1576 	raid_bdev_gen->write_cache = 0;
1577 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1578 
1579 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1580 
1581 	*raid_bdev_out = raid_bdev;
1582 
1583 	return 0;
1584 }
1585 
1586 /*
1587  * brief:
1588  * raid_bdev_create allocates raid bdev based on passed configuration
1589  * params:
1590  * name - name for raid bdev
1591  * strip_size - strip size in KB
1592  * num_base_bdevs - number of base bdevs
1593  * level - raid level
1594  * superblock_enabled - true if raid should have superblock
1595  * uuid - uuid to set for the bdev
1596  * raid_bdev_out - the created raid bdev
1597  * returns:
1598  * 0 - success
1599  * non zero - failure
1600  */
1601 int
1602 raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1603 		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1604 		 struct raid_bdev **raid_bdev_out)
1605 {
1606 	struct raid_bdev *raid_bdev;
1607 	int rc;
1608 
1609 	assert(uuid != NULL);
1610 
1611 	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1612 			       &raid_bdev);
1613 	if (rc != 0) {
1614 		return rc;
1615 	}
1616 
1617 	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1618 		/* we need to have the uuid to store in the superblock before the bdev is registered */
1619 		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1620 	}
1621 
1622 	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1623 
1624 	*raid_bdev_out = raid_bdev;
1625 
1626 	return 0;
1627 }
1628 
1629 static void
1630 _raid_bdev_unregistering_cont(void *ctx)
1631 {
1632 	struct raid_bdev *raid_bdev = ctx;
1633 
1634 	spdk_bdev_close(raid_bdev->self_desc);
1635 	raid_bdev->self_desc = NULL;
1636 }
1637 
/* Continue unregistering on the app thread, where the self-descriptor was
 * opened; ctx is the raid_bdev. */
static void
raid_bdev_unregistering_cont(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1643 
1644 static int
1645 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1646 {
1647 	struct raid_process_finish_action *finish_action;
1648 
1649 	assert(spdk_get_thread() == process->thread);
1650 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1651 
1652 	finish_action = calloc(1, sizeof(*finish_action));
1653 	if (finish_action == NULL) {
1654 		return -ENOMEM;
1655 	}
1656 
1657 	finish_action->cb = cb;
1658 	finish_action->cb_ctx = cb_ctx;
1659 
1660 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1661 
1662 	return 0;
1663 }
1664 
1665 static void
1666 raid_bdev_unregistering_stop_process(void *ctx)
1667 {
1668 	struct raid_bdev_process *process = ctx;
1669 	struct raid_bdev *raid_bdev = process->raid_bdev;
1670 	int rc;
1671 
1672 	process->state = RAID_PROCESS_STATE_STOPPING;
1673 	if (process->status == 0) {
1674 		process->status = -ECANCELED;
1675 	}
1676 
1677 	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
1678 	if (rc != 0) {
1679 		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
1680 			    raid_bdev->bdev.name, spdk_strerror(-rc));
1681 	}
1682 }
1683 
1684 static void
1685 raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1686 {
1687 	struct raid_bdev *raid_bdev = event_ctx;
1688 
1689 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1690 		if (raid_bdev->process != NULL) {
1691 			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1692 					     raid_bdev->process);
1693 		} else {
1694 			raid_bdev_unregistering_cont(raid_bdev);
1695 		}
1696 	}
1697 }
1698 
/* Final stage of raid bdev configuration: register the io device and bdev,
 * then open the bdev internally. On any failure the raid bdev is unwound
 * back to configuring state. */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	/* The raid_bdev pointer doubles as the io_device handle. */
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto err;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto err;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
	return;
err:
	/* Unwind: stop the raid module, drop the io device registration and
	 * return to configuring state. */
	if (raid_bdev->module->stop != NULL) {
		raid_bdev->module->stop(raid_bdev);
	}
	spdk_io_device_unregister(raid_bdev, NULL);
	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
}
1747 
1748 static void
1749 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1750 {
1751 	if (status == 0) {
1752 		raid_bdev_configure_cont(raid_bdev);
1753 	} else {
1754 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1755 			    raid_bdev->bdev.name, spdk_strerror(-status));
1756 		if (raid_bdev->module->stop != NULL) {
1757 			raid_bdev->module->stop(raid_bdev);
1758 		}
1759 	}
1760 }
1761 
1762 /*
1763  * brief:
1764  * If raid bdev config is complete, then only register the raid bdev to
1765  * bdev layer and remove this raid bdev from configuring list and
1766  * insert the raid bdev to configured list
1767  * params:
1768  * raid_bdev - pointer to raid bdev
1769  * returns:
1770  * 0 - success
1771  * non zero - failure
1772  */
1773 static int
1774 raid_bdev_configure(struct raid_bdev *raid_bdev)
1775 {
1776 	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
1777 	int rc;
1778 
1779 	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
1780 	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
1781 	assert(raid_bdev->bdev.blocklen > 0);
1782 
1783 	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
1784 	 * internal use.
1785 	 */
1786 	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
1787 	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
1788 		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
1789 		return -EINVAL;
1790 	}
1791 	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);
1792 
1793 	rc = raid_bdev->module->start(raid_bdev);
1794 	if (rc != 0) {
1795 		SPDK_ERRLOG("raid module startup callback failed\n");
1796 		return rc;
1797 	}
1798 
1799 	if (raid_bdev->superblock_enabled) {
1800 		if (raid_bdev->sb == NULL) {
1801 			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
1802 			if (rc == 0) {
1803 				raid_bdev_init_superblock(raid_bdev);
1804 			}
1805 		} else {
1806 			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
1807 			if (raid_bdev->sb->block_size != data_block_size) {
1808 				SPDK_ERRLOG("blocklen does not match value in superblock\n");
1809 				rc = -EINVAL;
1810 			}
1811 			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
1812 				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
1813 				rc = -EINVAL;
1814 			}
1815 		}
1816 
1817 		if (rc != 0) {
1818 			if (raid_bdev->module->stop != NULL) {
1819 				raid_bdev->module->stop(raid_bdev);
1820 			}
1821 			return rc;
1822 		}
1823 
1824 		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
1825 	} else {
1826 		raid_bdev_configure_cont(raid_bdev);
1827 	}
1828 
1829 	return 0;
1830 }
1831 
1832 /*
1833  * brief:
1834  * If raid bdev is online and registered, change the bdev state to
1835  * configuring and unregister this raid device. Queue this raid device
1836  * in configuring list
1837  * params:
1838  * raid_bdev - pointer to raid bdev
1839  * cb_fn - callback function
1840  * cb_arg - argument to callback function
1841  * returns:
1842  * none
1843  */
1844 static void
1845 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1846 		      void *cb_arg)
1847 {
1848 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1849 		if (cb_fn) {
1850 			cb_fn(cb_arg, 0);
1851 		}
1852 		return;
1853 	}
1854 
1855 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1856 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1857 
1858 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1859 }
1860 
1861 /*
1862  * brief:
1863  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1864  * params:
1865  * base_bdev - pointer to base bdev
1866  * returns:
1867  * base bdev info if found, otherwise NULL.
1868  */
1869 static struct raid_base_bdev_info *
1870 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1871 {
1872 	struct raid_bdev *raid_bdev;
1873 	struct raid_base_bdev_info *base_info;
1874 
1875 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1876 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1877 			if (base_info->desc != NULL &&
1878 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1879 				return base_info;
1880 			}
1881 		}
1882 	}
1883 
1884 	return NULL;
1885 }
1886 
/* Finish a base bdev removal: clear the scheduled flag, update the
 * operational count, and notify the caller. When the removal drops the
 * raid bdev below its minimum operational count, the bdev is deconfigured
 * instead and the removal callback is handed off to the deconfigure path. */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->remove_scheduled);
	base_info->remove_scheduled = false;

	if (status == 0) {
		raid_bdev->num_base_bdevs_operational--;
		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
			/* There is not enough base bdevs to keep the raid bdev operational. */
			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
			return;
		}
	}

	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1908 
1909 static void
1910 raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
1911 {
1912 	struct raid_base_bdev_info *base_info = ctx;
1913 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1914 
1915 	if (status != 0) {
1916 		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
1917 			    raid_bdev->bdev.name, spdk_strerror(-status));
1918 	}
1919 
1920 	raid_bdev_remove_base_bdev_done(base_info, status);
1921 }
1922 
/* Per-channel step of base bdev removal (spdk_for_each_channel callback):
 * drop this channel's reference to the removed base bdev's io channel, and
 * mirror the clearing into the process channel if one exists. */
static void
raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
	uint8_t idx = raid_bdev_base_bdev_slot(base_info);

	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);

	if (raid_ch->base_channel[idx] != NULL) {
		spdk_put_io_channel(raid_ch->base_channel[idx]);
		raid_ch->base_channel[idx] = NULL;
	}

	if (raid_ch->process.ch_processed != NULL) {
		/* Keep the processed-view channel consistent with this one. */
		raid_ch->process.ch_processed->base_channel[idx] = NULL;
	}

	spdk_for_each_channel_continue(i, 0);
}
1944 
1945 static void
1946 raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
1947 {
1948 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1949 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1950 
1951 	raid_bdev_free_base_bdev_resource(base_info);
1952 
1953 	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
1954 			    base_info);
1955 }
1956 
/* Continue base bdev removal after any superblock update: deconfigure the
 * base bdev, then clear its io channel reference on every raid channel. */
static void
raid_bdev_remove_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	raid_bdev_deconfigure_base_bdev(base_info);

	spdk_for_each_channel(base_info->raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
			      raid_bdev_channels_remove_base_bdev_done);
}
1965 
1966 static void
1967 raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1968 {
1969 	struct raid_base_bdev_info *base_info = ctx;
1970 
1971 	if (status != 0) {
1972 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1973 			    raid_bdev->bdev.name, spdk_strerror(-status));
1974 		raid_bdev_remove_base_bdev_done(base_info, status);
1975 		return;
1976 	}
1977 
1978 	raid_bdev_remove_base_bdev_cont(base_info);
1979 }
1980 
/* Quiesce completion for base bdev removal. With IO paused, mark the slot
 * failed/missing in the superblock (if one exists) and persist it before
 * continuing; without a superblock, continue the removal directly. */
static void
raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		raid_bdev_remove_base_bdev_done(base_info, status);
		return;
	}

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find this slot's CONFIGURED entry in the superblock. */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				/* FAILED prevents re-adding the device; MISSING allows it. */
				if (base_info->is_failed) {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;
				} else {
					sb_base_bdev->state = RAID_SB_BASE_BDEV_MISSING;
				}

				/* Removal continues from the superblock write callback. */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}

	raid_bdev_remove_base_bdev_cont(base_info);
}
2018 
/*
 * Quiesce the raid bdev to begin removing the given base bdev; removal
 * continues in raid_bdev_remove_base_bdev_on_quiesced().
 *
 * Returns 0 on success or a negative errno if the quiesce could not start.
 */
static int
raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
}
2027 
/* Message context used when a base bdev removal must coordinate with a
 * running background process. */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The process to act on; stays valid until the process fully stops */
	struct raid_bdev_process *process;
	/* The base bdev being removed */
	struct raid_base_bdev_info *base_info;
	/* Snapshot of the number of operational base bdevs at removal time */
	uint8_t num_base_bdevs_operational;
};
2033 
/*
 * App-thread continuation of a process-coordinated removal: start the
 * quiesce that performs the removal, reporting failure immediately if the
 * quiesce cannot start.
 */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc = raid_bdev_remove_base_bdev_quiesce(base_info);

	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
2045 
/*
 * Runs either immediately on the process thread or as a process finish
 * action: free the message context and hand the actual removal back to the
 * app thread.
 */
static void
raid_bdev_process_base_bdev_remove_cont(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_base_bdev_info *base_info = ctx->base_info;

	/* base_info is saved above; the context is not needed past this point */
	free(ctx);

	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
			     base_info);
}
2057 
/*
 * Runs on the process thread. Decides whether removing the given base bdev
 * requires the background process to stop: the process must stop if the
 * removed bdev is its target or if the removal would drop the raid below its
 * minimum operational base bdev count. Otherwise removal proceeds directly.
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the removal until the process has fully stopped */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->status == 0) {
		process->status = -ENODEV;
	}
}
2088 
/*
 * Coordinate removal of a base bdev with a running background process by
 * messaging the process thread, which either lets the removal proceed
 * immediately or stops the process first.
 *
 * Returns 0 on success, -ENOMEM if the message context cannot be allocated.
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	/*
	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
	 * after the removal and more than one base bdev may be removed at the same time
	 */
	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
		if (base_info->is_configured && !base_info->remove_scheduled) {
			ctx->num_base_bdevs_operational++;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
2124 
/*
 * Schedule removal of a base bdev from its raid bdev. Depending on the raid
 * bdev state this either cleans up immediately (raid not ONLINE),
 * deconfigures the whole raid bdev (no fault tolerance left), or quiesces
 * the raid bdev and removes just this base bdev, stopping a background
 * process first if one is running.
 *
 * cb_fn, if provided, is invoked when the removal completes or fails.
 * Returns 0 on success, -ENODEV if the base bdev is not removable, or an
 * error from starting the asynchronous removal sequence.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (base_info->remove_scheduled || !base_info->is_configured) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		base_info->remove_scheduled = false;
		if (raid_bdev->num_base_bdevs_discovered == 0 &&
		    raid_bdev->state == RAID_BDEV_STATE_OFFLINE) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
		if (cb_fn != NULL) {
			cb_fn(cb_ctx, 0);
		}
	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
		/* This raid bdev does not tolerate removing a base bdev. */
		raid_bdev->num_base_bdevs_operational--;
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		base_info->remove_cb = cb_fn;
		base_info->remove_cb_ctx = cb_ctx;

		if (raid_bdev->process != NULL) {
			/* A background process is running - coordinate with it */
			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
		} else {
			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
		}

		if (ret != 0) {
			base_info->remove_scheduled = false;
		}
	}

	return ret;
}
2182 
2183 /*
2184  * brief:
2185  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2186  * is removed. This function checks if this base bdev is part of any raid bdev
2187  * or not. If yes, it takes necessary action on that particular raid bdev.
2188  * params:
2189  * base_bdev - pointer to base bdev which got removed
2190  * cb_fn - callback function
2191  * cb_arg - argument to callback function
2192  * returns:
2193  * 0 - success
2194  * non zero - failure
2195  */
2196 int
2197 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2198 {
2199 	struct raid_base_bdev_info *base_info;
2200 
2201 	/* Find the raid_bdev which has claimed this base_bdev */
2202 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2203 	if (!base_info) {
2204 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2205 		return -ENODEV;
2206 	}
2207 
2208 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2209 }
2210 
2211 static void
2212 raid_bdev_fail_base_remove_cb(void *ctx, int status)
2213 {
2214 	struct raid_base_bdev_info *base_info = ctx;
2215 
2216 	if (status != 0) {
2217 		SPDK_WARNLOG("Failed to remove base bdev %s\n", base_info->name);
2218 		base_info->is_failed = false;
2219 	}
2220 }
2221 
/*
 * App-thread handler that marks a base bdev as failed and initiates its
 * removal from the raid bdev. No-op if the base bdev is already failed.
 */
static void
_raid_bdev_fail_base_bdev(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc;

	if (base_info->is_failed) {
		return;
	}
	base_info->is_failed = true;

	SPDK_NOTICELOG("Failing base bdev in slot %d ('%s') of raid bdev '%s'\n",
		       raid_bdev_base_bdev_slot(base_info), base_info->name, base_info->raid_bdev->bdev.name);

	/* On failure the callback reverts the is_failed flag */
	rc = _raid_bdev_remove_base_bdev(base_info, raid_bdev_fail_base_remove_cb, base_info);
	if (rc != 0) {
		raid_bdev_fail_base_remove_cb(base_info, rc);
	}
}
2241 
/* Mark a base bdev as failed and remove it from its raid bdev; runs the
 * handler on the app thread (executes inline if already on it). */
void
raid_bdev_fail_base_bdev(struct raid_base_bdev_info *base_info)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_fail_base_bdev, base_info);
}
2247 
2248 static void
2249 raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2250 {
2251 	if (status != 0) {
2252 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
2253 			    raid_bdev->bdev.name, spdk_strerror(-status));
2254 	}
2255 }
2256 
2257 /*
2258  * brief:
2259  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
2260  * is resized. This function checks if the smallest size of the base_bdevs is changed.
2261  * If yes, call module handler to resize the raid_bdev if implemented.
2262  * params:
2263  * base_bdev - pointer to base bdev which got resized.
2264  * returns:
2265  * none
2266  */
2267 static void
2268 raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
2269 {
2270 	struct raid_bdev *raid_bdev;
2271 	struct raid_base_bdev_info *base_info;
2272 	uint64_t blockcnt_old;
2273 
2274 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");
2275 
2276 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2277 
2278 	/* Find the raid_bdev which has claimed this base_bdev */
2279 	if (!base_info) {
2280 		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
2281 		return;
2282 	}
2283 	raid_bdev = base_info->raid_bdev;
2284 
2285 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2286 
2287 	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
2288 		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);
2289 
2290 	base_info->blockcnt = base_bdev->blockcnt;
2291 
2292 	if (!raid_bdev->module->resize) {
2293 		return;
2294 	}
2295 
2296 	blockcnt_old = raid_bdev->bdev.blockcnt;
2297 	if (raid_bdev->module->resize(raid_bdev) == false) {
2298 		return;
2299 	}
2300 
2301 	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
2302 		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);
2303 
2304 	if (raid_bdev->superblock_enabled) {
2305 		struct raid_bdev_superblock *sb = raid_bdev->sb;
2306 		uint8_t i;
2307 
2308 		for (i = 0; i < sb->base_bdevs_size; i++) {
2309 			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
2310 
2311 			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
2312 				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2313 				sb_base_bdev->data_size = base_info->data_size;
2314 			}
2315 		}
2316 		sb->raid_size = raid_bdev->bdev.blockcnt;
2317 		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
2318 	}
2319 }
2320 
2321 /*
2322  * brief:
2323  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2324  * triggers asynchronous event.
2325  * params:
2326  * type - event details.
2327  * bdev - bdev that triggered event.
2328  * event_ctx - context for event.
2329  * returns:
2330  * none
2331  */
2332 static void
2333 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2334 			  void *event_ctx)
2335 {
2336 	int rc;
2337 
2338 	switch (type) {
2339 	case SPDK_BDEV_EVENT_REMOVE:
2340 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2341 		if (rc != 0) {
2342 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2343 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2344 		}
2345 		break;
2346 	case SPDK_BDEV_EVENT_RESIZE:
2347 		raid_bdev_resize_base_bdev(bdev);
2348 		break;
2349 	default:
2350 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2351 		break;
2352 	}
2353 }
2354 
2355 /*
2356  * brief:
2357  * Deletes the specified raid bdev
2358  * params:
2359  * raid_bdev - pointer to raid bdev
2360  * cb_fn - callback function
2361  * cb_arg - argument to callback function
2362  */
2363 void
2364 raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
2365 {
2366 	struct raid_base_bdev_info *base_info;
2367 
2368 	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);
2369 
2370 	if (raid_bdev->destroy_started) {
2371 		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
2372 			      raid_bdev->bdev.name);
2373 		if (cb_fn) {
2374 			cb_fn(cb_arg, -EALREADY);
2375 		}
2376 		return;
2377 	}
2378 
2379 	raid_bdev->destroy_started = true;
2380 
2381 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
2382 		base_info->remove_scheduled = true;
2383 
2384 		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
2385 			/*
2386 			 * As raid bdev is not registered yet or already unregistered,
2387 			 * so cleanup should be done here itself.
2388 			 */
2389 			raid_bdev_free_base_bdev_resource(base_info);
2390 		}
2391 	}
2392 
2393 	if (raid_bdev->num_base_bdevs_discovered == 0) {
2394 		/* There is no base bdev for this raid, so free the raid device. */
2395 		raid_bdev_cleanup_and_free(raid_bdev);
2396 		if (cb_fn) {
2397 			cb_fn(cb_arg, 0);
2398 		}
2399 	} else {
2400 		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
2401 	}
2402 }
2403 
2404 static void
2405 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2406 {
2407 	if (status != 0) {
2408 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2409 			    raid_bdev->bdev.name, spdk_strerror(-status));
2410 	}
2411 }
2412 
/*
 * Runs on the app thread after a process finished successfully: mark any
 * base bdevs that became configured during the process as CONFIGURED in the
 * superblock, record their UUIDs, and persist the superblock.
 */
static void
raid_bdev_process_finish_write_sb(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_bdev_superblock *sb = raid_bdev->sb;
	struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_base_bdev_info *base_info;
	uint8_t i;

	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
			if (base_info->is_configured) {
				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
			}
		}
	}

	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
}
2437 
2438 static void raid_bdev_process_free(struct raid_bdev_process *process);
2439 
/*
 * Final step of process teardown, on the process thread: run all queued
 * finish actions (drained one at a time, so actions queued by a callback
 * would also run), free the process, and exit the dedicated thread.
 */
static void
_raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_process_finish_action *finish_action;

	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
		finish_action->cb(finish_action->cb_ctx);
		free(finish_action);
	}

	raid_bdev_process_free(process);

	spdk_thread_exit(spdk_get_thread());
}
2456 
/*
 * Called after a failed process's target base bdev was removed (or the
 * removal failed); proceeds to the final teardown on the process thread.
 */
static void
raid_bdev_process_finish_target_removed(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2468 
/*
 * Called on the app thread after the raid bdev was unquiesced at the end of
 * a process. If the process failed, remove its target base bdev before the
 * final teardown; otherwise go straight to teardown on the process thread.
 */
static void
raid_bdev_process_finish_unquiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
	}

	if (process->status != 0) {
		/* Remove the (incompletely processed) target; teardown continues
		 * in the removal callback either way. */
		status = _raid_bdev_remove_base_bdev(process->target, raid_bdev_process_finish_target_removed,
						     process);
		if (status != 0) {
			raid_bdev_process_finish_target_removed(process, status);
		}
		return;
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2489 
2490 static void
2491 raid_bdev_process_finish_unquiesce(void *ctx)
2492 {
2493 	struct raid_bdev_process *process = ctx;
2494 	int rc;
2495 
2496 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2497 				 raid_bdev_process_finish_unquiesced, process);
2498 	if (rc != 0) {
2499 		raid_bdev_process_finish_unquiesced(process, rc);
2500 	}
2501 }
2502 
/*
 * Runs on the process thread after per-channel process state was cleaned
 * up. Releases the process's raid bdev channel, records the STOPPED state,
 * logs the outcome, schedules a superblock update on success, and hands
 * control to the app thread to unquiesce the raid bdev.
 */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	/* raid_ch may be NULL if the process failed before acquiring a channel */
	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		if (raid_bdev->superblock_enabled) {
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2534 
/* Completion of the per-channel finish iteration: continue teardown on the
 * process thread. */
static void
__raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
}
2542 
/*
 * Per-channel teardown at the end of a process. On success, the target's
 * process channel is promoted to the base bdev's regular channel slot in
 * this raid channel; the remaining per-channel process state is then
 * cleaned up.
 */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2561 
/*
 * Called when the raid bdev was quiesced for process teardown. Detaches the
 * process from the raid bdev and starts the per-channel finish iteration.
 * On quiesce failure only an error is logged and teardown does not proceed.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		return;
	}

	raid_bdev->process = NULL;
	process->target->is_process_target = false;

	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2579 
2580 static void
2581 _raid_bdev_process_finish(void *ctx)
2582 {
2583 	struct raid_bdev_process *process = ctx;
2584 	int rc;
2585 
2586 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2587 			       raid_bdev_process_finish_quiesced, process);
2588 	if (rc != 0) {
2589 		raid_bdev_process_finish_quiesced(ctx, rc);
2590 	}
2591 }
2592 
/* Begin process teardown by hopping to the app thread. */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2598 
2599 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2600 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2601 
/*
 * Initiate stopping a running process with the given status (0 means it
 * completed). Only the first non-zero status is kept. If a window range
 * lock is currently held it is released first; otherwise the run loop is
 * re-entered and observes the STOPPING state.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	/* Already stopping - nothing more to do */
	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->window_range_locked) {
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2624 
/*
 * Completion of the window range unlock: advance the process offset past
 * the finished window and run the next iteration of the process loop (which
 * also handles the STOPPING state).
 */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = false;
	process->window_offset += process->window_size;

	raid_bdev_process_thread_run(process);
}
2641 
/*
 * Unquiesce the LBA range that was locked for the current process window
 * (the same offset/length passed to spdk_bdev_quiesce_range()).
 */
static void
raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
{
	int rc;

	assert(process->window_range_locked == true);

	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
				       process->window_offset, process->max_window_size,
				       raid_bdev_process_window_range_unlocked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_unlocked(process, rc);
	}
}
2656 
/* All channels have seen the new process offset; release the window's LBA
 * range lock. */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(process);
}
2664 
2665 static void
2666 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2667 {
2668 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2669 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2670 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2671 
2672 	raid_ch->process.offset = process->window_offset + process->window_size;
2673 
2674 	spdk_for_each_channel_continue(i, 0);
2675 }
2676 
/*
 * Called by raid modules when a process request completes. Returns the
 * request to the free list and, once the whole window has completed, either
 * finishes the process on error or propagates the new offset to all
 * channels (which then unlocks the window range).
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	/* Remember the first failure; remaining requests still complete */
	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2702 
/*
 * Submit one process request for up to num_blocks starting at
 * offset_blocks.
 *
 * Returns the number of blocks actually submitted (the module may accept
 * fewer than requested), 0 if no free request descriptor is available
 * (outstanding requests will drive further progress), or a negative errno
 * on submission failure.
 */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* Record how many blocks the module actually took */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2738 
/*
 * Fill the current window with process requests, up to max_window_size
 * blocks or the end of the bdev. The window size becomes the number of
 * blocks actually submitted; if nothing could be submitted, the process
 * finishes with the window status.
 */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2763 
/*
 * Completion of the window range quiesce. Aborts the process on lock
 * failure, immediately unlocks if the process is stopping, otherwise
 * processes the locked window.
 */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2784 
/*
 * Main loop of the process thread, entered once per window: handles the
 * STOPPING state and end-of-bdev completion, clamps the window to the bdev
 * end, and quiesces the window's LBA range before submitting requests.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Clamp the window so it does not extend past the end of the bdev */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_locked(process, rc);
	}
}
2816 
/*
 * First message on the dedicated process thread: acquire the raid bdev's
 * I/O channel, mark the process RUNNING and start the window loop. Fails
 * the process with -ENOMEM if the channel cannot be obtained.
 */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2841 
/*
 * All channels cleaned up after a failed process start: remove the target
 * base bdev (best effort, completion ignored) and free the process.
 */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
	raid_bdev_process_free(process);

	/* TODO: update sb */
}
2852 
/* Per-channel rollback when starting a process failed: tear down the
 * channel's process state. */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2863 
/*
 * Completion of the per-channel process setup iteration. Verifies the
 * target is still usable, creates the dedicated process thread and
 * publishes the process on the raid bdev; on any failure, rolls back the
 * per-channel process state.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status == 0 &&
	    (process->target->remove_scheduled || !process->target->is_configured ||
	     raid_bdev->num_base_bdevs_operational <= raid_bdev->min_base_bdevs_operational)) {
		/* a base bdev was removed before we got here */
		status = -ENODEV;
	}

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2905 
/* Per-channel setup for a starting process; any setup error is propagated
 * to the iteration completion. */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev_io_channel *raid_ch =
		spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));

	spdk_for_each_channel_continue(i, raid_bdev_ch_process_setup(raid_ch, process));
}
2918 
/*
 * Start a background process by setting up per-channel process state on
 * every raid bdev channel; the process itself begins in the iteration
 * completion callback.
 */
static void
raid_bdev_process_start(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;

	assert(raid_bdev->module->submit_process_request != NULL);

	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
			      raid_bdev_channels_start_process_done);
}
2929 
2930 static void
2931 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
2932 {
2933 	spdk_dma_free(process_req->iov.iov_base);
2934 	spdk_dma_free(process_req->md_buf);
2935 	free(process_req);
2936 }
2937 
/*
 * Allocate one process request with a DMA data buffer sized for a full
 * window (and a metadata buffer when the bdev uses separate metadata).
 *
 * Returns NULL on allocation failure.
 */
static struct raid_bdev_process_request *
raid_bdev_process_alloc_request(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;

	process_req = calloc(1, sizeof(*process_req));
	if (process_req == NULL) {
		return NULL;
	}

	process_req->process = process;
	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
	if (process_req->iov.iov_base == NULL) {
		free(process_req);
		return NULL;
	}
	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
		if (process_req->md_buf == NULL) {
			raid_bdev_process_request_free(process_req);
			return NULL;
		}
	}

	return process_req;
}
2966 
2967 static void
2968 raid_bdev_process_free(struct raid_bdev_process *process)
2969 {
2970 	struct raid_bdev_process_request *process_req;
2971 
2972 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
2973 		TAILQ_REMOVE(&process->requests, process_req, link);
2974 		raid_bdev_process_request_free(process_req);
2975 	}
2976 
2977 	free(process);
2978 }
2979 
/*
 * Allocate and initialize a process of the given type targeting the given
 * base bdev. The window size is derived from the configured
 * process_window_size_kb, rounded up to whole data blocks and at least the
 * bdev's write unit size. Preallocates RAID_BDEV_PROCESS_MAX_QD request
 * descriptors.
 *
 * Returns NULL on allocation failure.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
3014 
3015 static int
3016 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
3017 {
3018 	struct raid_bdev_process *process;
3019 
3020 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3021 
3022 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
3023 	if (process == NULL) {
3024 		return -ENOMEM;
3025 	}
3026 
3027 	raid_bdev_process_start(process);
3028 
3029 	return 0;
3030 }
3031 
3032 static void raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info);
3033 
/* Completion callback of the channel sync iteration started in
 * raid_bdev_configure_base_bdev_cont(); resumes base bdev configuration. */
static void
_raid_bdev_configure_base_bdev_cont(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_configure_base_bdev_cont(base_info);
}
3041 
static void
raid_bdev_ch_sync(struct spdk_io_channel_iter *i)
{
	/* No per-channel work - iterating over all channels acts as a barrier so
	 * that state written before the iteration is visible to each channel's
	 * thread before the completion callback runs. */
	spdk_for_each_channel_continue(i, 0);
}
3047 
/*
 * Continue configuring a base bdev after its descriptor has been opened (and,
 * for new bdevs, after the superblock check). Depending on the raid bdev's
 * discovery state this either marks the bdev as a rebuild target, configures
 * the whole raid bdev, or starts a rebuild. Invokes configure_cb when done.
 */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int rc;

	/* All operational members are already discovered, so this bdev joins an
	 * online array as a process (rebuild) target. */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational &&
	    base_info->is_process_target == false) {
		/* TODO: defer if rebuild in progress on another base bdev */
		assert(raid_bdev->process == NULL);
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		base_info->is_process_target = true;
		/* To assure is_process_target is set before is_configured when checked in raid_bdev_create_cb() */
		spdk_for_each_channel(raid_bdev, raid_bdev_ch_sync, base_info, _raid_bdev_configure_base_bdev_cont);
		/* Re-entered via _raid_bdev_configure_base_bdev_cont() after the sync. */
		return;
	}

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		}
	} else if (base_info->is_process_target) {
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		rc = 0;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, rc);
	}
}
3098 
3099 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3100 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3101 
/*
 * Completion callback of the superblock check performed when a new bdev is
 * added as a base bdev. Decides whether to proceed with plain configuration,
 * re-examine the bdev as a returning array member, or fail the addition.
 */
static void
raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
		void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	switch (status) {
	case 0:
		/* valid superblock found */
		if (spdk_uuid_compare(&base_info->raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(base_info->desc);

			/* The superblock belongs to this raid bdev - release the slot's
			 * resources and take the examine path instead, which handles
			 * re-adding a former member. */
			raid_bdev_free_base_bdev_resource(base_info);
			raid_bdev_examine_sb(sb, bdev, base_info->configure_cb, base_info->configure_cb_ctx);
			return;
		}
		SPDK_ERRLOG("Superblock of a different raid bdev found on bdev %s\n", base_info->name);
		status = -EEXIST;
		raid_bdev_free_base_bdev_resource(base_info);
		break;
	case -EINVAL:
		/* no valid superblock */
		raid_bdev_configure_base_bdev_cont(base_info);
		return;
	default:
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    base_info->name, spdk_strerror(-status));
		break;
	}

	/* Failure path: report the error to the original caller, if any. */
	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, status);
	}
}
3136 
3137 static int
3138 raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
3139 			      raid_base_bdev_cb cb_fn, void *cb_ctx)
3140 {
3141 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
3142 	struct spdk_bdev_desc *desc;
3143 	struct spdk_bdev *bdev;
3144 	const struct spdk_uuid *bdev_uuid;
3145 	int rc;
3146 
3147 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3148 	assert(base_info->desc == NULL);
3149 
3150 	/*
3151 	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
3152 	 * before claiming the bdev.
3153 	 */
3154 
3155 	if (!spdk_uuid_is_null(&base_info->uuid)) {
3156 		char uuid_str[SPDK_UUID_STRING_LEN];
3157 		const char *bdev_name;
3158 
3159 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3160 
3161 		/* UUID of a bdev is registered as its alias */
3162 		bdev = spdk_bdev_get_by_name(uuid_str);
3163 		if (bdev == NULL) {
3164 			return -ENODEV;
3165 		}
3166 
3167 		bdev_name = spdk_bdev_get_name(bdev);
3168 
3169 		if (base_info->name == NULL) {
3170 			assert(existing == true);
3171 			base_info->name = strdup(bdev_name);
3172 			if (base_info->name == NULL) {
3173 				return -ENOMEM;
3174 			}
3175 		} else if (strcmp(base_info->name, bdev_name) != 0) {
3176 			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
3177 				    bdev_name, base_info->name);
3178 			return -EINVAL;
3179 		}
3180 	}
3181 
3182 	assert(base_info->name != NULL);
3183 
3184 	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
3185 	if (rc != 0) {
3186 		if (rc != -ENODEV) {
3187 			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
3188 		}
3189 		return rc;
3190 	}
3191 
3192 	bdev = spdk_bdev_desc_get_bdev(desc);
3193 	bdev_uuid = spdk_bdev_get_uuid(bdev);
3194 
3195 	if (spdk_uuid_is_null(&base_info->uuid)) {
3196 		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
3197 	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
3198 		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
3199 		spdk_bdev_close(desc);
3200 		return -EINVAL;
3201 	}
3202 
3203 	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
3204 	if (rc != 0) {
3205 		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
3206 		spdk_bdev_close(desc);
3207 		return rc;
3208 	}
3209 
3210 	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);
3211 
3212 	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
3213 	if (base_info->app_thread_ch == NULL) {
3214 		SPDK_ERRLOG("Failed to get io channel\n");
3215 		spdk_bdev_module_release_bdev(bdev);
3216 		spdk_bdev_close(desc);
3217 		return -ENOMEM;
3218 	}
3219 
3220 	base_info->desc = desc;
3221 	base_info->blockcnt = bdev->blockcnt;
3222 
3223 	if (raid_bdev->superblock_enabled) {
3224 		uint64_t data_offset;
3225 
3226 		if (base_info->data_offset == 0) {
3227 			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
3228 			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
3229 		} else {
3230 			data_offset = base_info->data_offset;
3231 		}
3232 
3233 		if (bdev->optimal_io_boundary != 0) {
3234 			data_offset = spdk_divide_round_up(data_offset,
3235 							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
3236 			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
3237 				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
3238 					     base_info->data_offset, base_info->name, data_offset);
3239 				data_offset = base_info->data_offset;
3240 			}
3241 		}
3242 
3243 		base_info->data_offset = data_offset;
3244 	}
3245 
3246 	if (base_info->data_offset >= bdev->blockcnt) {
3247 		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
3248 			    base_info->data_offset, bdev->blockcnt, base_info->name);
3249 		rc = -EINVAL;
3250 		goto out;
3251 	}
3252 
3253 	if (base_info->data_size == 0) {
3254 		base_info->data_size = bdev->blockcnt - base_info->data_offset;
3255 	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
3256 		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
3257 			    bdev->blockcnt, base_info->name);
3258 		rc = -EINVAL;
3259 		goto out;
3260 	}
3261 
3262 	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3263 		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
3264 			    bdev->name);
3265 		rc = -EINVAL;
3266 		goto out;
3267 	}
3268 
3269 	/*
3270 	 * Set the raid bdev properties if this is the first base bdev configured,
3271 	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
3272 	 * have the same blocklen and metadata format.
3273 	 */
3274 	if (raid_bdev->bdev.blocklen == 0) {
3275 		raid_bdev->bdev.blocklen = bdev->blocklen;
3276 		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
3277 		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
3278 		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
3279 		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
3280 		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
3281 	} else {
3282 		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
3283 			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
3284 				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
3285 			rc = -EINVAL;
3286 			goto out;
3287 		}
3288 
3289 		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
3290 		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
3291 		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
3292 		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
3293 		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev)) {
3294 			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
3295 				    raid_bdev->bdev.name, bdev->name);
3296 			rc = -EINVAL;
3297 			goto out;
3298 		}
3299 	}
3300 
3301 	base_info->configure_cb = cb_fn;
3302 	base_info->configure_cb_ctx = cb_ctx;
3303 
3304 	if (existing) {
3305 		raid_bdev_configure_base_bdev_cont(base_info);
3306 	} else {
3307 		/* check for existing superblock when using a new bdev */
3308 		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
3309 				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
3310 		if (rc) {
3311 			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3312 				    bdev->name, spdk_strerror(-rc));
3313 		}
3314 	}
3315 out:
3316 	if (rc != 0) {
3317 		raid_bdev_free_base_bdev_resource(base_info);
3318 	}
3319 	return rc;
3320 }
3321 
/*
 * Add a base bdev by name to a raid bdev. Locates a suitable empty slot
 * (preferring a uuid match while the raid bdev is still configuring),
 * records the name and starts base bdev configuration. cb_fn/cb_ctx are
 * invoked when configuration completes, possibly asynchronously.
 * Returns 0 on success or a negative errno.
 */
int
raid_bdev_add_base_bdev(struct raid_bdev *raid_bdev, const char *name,
			raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_base_bdev_info *base_info = NULL, *iter;
	int rc;

	assert(name != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* Adding members while a background process (e.g. rebuild) runs is refused. */
	if (raid_bdev->process != NULL) {
		SPDK_ERRLOG("raid bdev '%s' is in process\n",
			    raid_bdev->bdev.name);
		return -EPERM;
	}

	/* While configuring, prefer the slot whose recorded uuid matches the bdev. */
	if (raid_bdev->state == RAID_BDEV_STATE_CONFIGURING) {
		struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);

		if (bdev != NULL) {
			RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
				if (iter->name == NULL &&
				    spdk_uuid_compare(&bdev->uuid, &iter->uuid) == 0) {
					base_info = iter;
					break;
				}
			}
		}
	}

	/* Otherwise take the first completely empty slot (no name, no uuid). */
	if (base_info == NULL || raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
			if (iter->name == NULL && spdk_uuid_is_null(&iter->uuid)) {
				base_info = iter;
				break;
			}
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
			    raid_bdev->bdev.name, name);
		return -EINVAL;
	}

	assert(base_info->is_configured == false);

	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		assert(base_info->data_size != 0);
		assert(base_info->desc == NULL);
	}

	base_info->name = strdup(name);
	if (base_info->name == NULL) {
		return -ENOMEM;
	}

	rc = raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
	/* -ENODEV while still configuring is tolerated - the bdev may appear later
	 * and be picked up by examine; keep the name so the slot stays reserved. */
	if (rc != 0 && (rc != -ENODEV || raid_bdev->state != RAID_BDEV_STATE_CONFIGURING)) {
		SPDK_ERRLOG("base bdev '%s' configure failed: %s\n", name, spdk_strerror(-rc));
		free(base_info->name);
		base_info->name = NULL;
	}

	return rc;
}
3388 
3389 static int
3390 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3391 {
3392 	struct raid_bdev *raid_bdev;
3393 	uint8_t i;
3394 	int rc;
3395 
3396 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3397 			       sb->level, true, &sb->uuid, &raid_bdev);
3398 	if (rc != 0) {
3399 		return rc;
3400 	}
3401 
3402 	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
3403 	if (rc != 0) {
3404 		raid_bdev_free(raid_bdev);
3405 		return rc;
3406 	}
3407 
3408 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3409 	memcpy(raid_bdev->sb, sb, sb->length);
3410 
3411 	for (i = 0; i < sb->base_bdevs_size; i++) {
3412 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3413 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3414 
3415 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3416 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3417 			raid_bdev->num_base_bdevs_operational++;
3418 		}
3419 
3420 		base_info->data_offset = sb_base_bdev->data_offset;
3421 		base_info->data_size = sb_base_bdev->data_size;
3422 	}
3423 
3424 	*raid_bdev_out = raid_bdev;
3425 	return 0;
3426 }
3427 
3428 static void
3429 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3430 {
3431 	struct raid_bdev *raid_bdev;
3432 	struct raid_base_bdev_info *base_info;
3433 
3434 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3435 		if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING || raid_bdev->sb != NULL) {
3436 			continue;
3437 		}
3438 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3439 			if (base_info->desc == NULL &&
3440 			    ((base_info->name != NULL && strcmp(bdev->name, base_info->name) == 0) ||
3441 			     spdk_uuid_compare(&base_info->uuid, &bdev->uuid) == 0)) {
3442 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3443 				break;
3444 			}
3445 		}
3446 	}
3447 }
3448 
/* Context carried across the iterative examination of a raid bdev's
 * remaining base bdevs (see raid_bdev_examine_others()). */
struct raid_bdev_examine_others_ctx {
	/* UUID of the raid bdev whose members are being examined */
	struct spdk_uuid raid_bdev_uuid;
	/* Slot index to resume the scan from on re-entry */
	uint8_t current_base_bdev_idx;
	/* Callback (and its argument) invoked when the iteration finishes */
	raid_base_bdev_cb cb_fn;
	void *cb_ctx;
};
3455 
3456 static void
3457 raid_bdev_examine_others_done(void *_ctx, int status)
3458 {
3459 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3460 
3461 	if (ctx->cb_fn != NULL) {
3462 		ctx->cb_fn(ctx->cb_ctx, status);
3463 	}
3464 	free(ctx);
3465 }
3466 
3467 typedef void (*raid_bdev_examine_load_sb_cb)(struct spdk_bdev *bdev,
3468 		const struct raid_bdev_superblock *sb, int status, void *ctx);
3469 static int raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb,
3470 				     void *cb_ctx);
3471 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3472 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3473 static void raid_bdev_examine_others(void *_ctx, int status);
3474 
3475 static void
3476 raid_bdev_examine_others_load_cb(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb,
3477 				 int status, void *_ctx)
3478 {
3479 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3480 
3481 	if (status != 0) {
3482 		raid_bdev_examine_others_done(ctx, status);
3483 		return;
3484 	}
3485 
3486 	raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_others, ctx);
3487 }
3488 
/*
 * Examine the remaining base bdevs of a raid bdev created from a superblock.
 * Scans the slots starting at ctx->current_base_bdev_idx for unconfigured
 * members whose bdev (looked up by its uuid alias) is already registered and
 * loads that bdev's superblock; re-entered as the completion callback of each
 * step. Calls raid_bdev_examine_others_done() when no more slots qualify or
 * on error.
 */
static void
raid_bdev_examine_others(void *_ctx, int status)
{
	struct raid_bdev_examine_others_ctx *ctx = _ctx;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	if (status != 0) {
		goto out;
	}

	/* Look the raid bdev up again - it may no longer exist by now */
	raid_bdev = raid_bdev_find_by_uuid(&ctx->raid_bdev_uuid);
	if (raid_bdev == NULL) {
		status = -ENODEV;
		goto out;
	}

	for (base_info = &raid_bdev->base_bdev_info[ctx->current_base_bdev_idx];
	     base_info < &raid_bdev->base_bdev_info[raid_bdev->num_base_bdevs];
	     base_info++) {
		if (base_info->is_configured || spdk_uuid_is_null(&base_info->uuid)) {
			continue;
		}

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* Skip members whose bdev is not registered (yet) */
		if (spdk_bdev_get_by_name(uuid_str) == NULL) {
			continue;
		}

		/* Record where to resume when re-entered as the callback */
		ctx->current_base_bdev_idx = raid_bdev_base_bdev_slot(base_info);

		status = raid_bdev_examine_load_sb(uuid_str, raid_bdev_examine_others_load_cb, ctx);
		if (status != 0) {
			continue;
		}
		return;
	}
out:
	raid_bdev_examine_others_done(ctx, status);
}
3531 
3532 static void
3533 raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3534 		     raid_base_bdev_cb cb_fn, void *cb_ctx)
3535 {
3536 	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
3537 	struct raid_bdev *raid_bdev;
3538 	struct raid_base_bdev_info *iter, *base_info;
3539 	uint8_t i;
3540 	int rc;
3541 
3542 	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
3543 		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
3544 			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
3545 		rc = -EINVAL;
3546 		goto out;
3547 	}
3548 
3549 	if (spdk_uuid_is_null(&sb->uuid)) {
3550 		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
3551 		rc = -EINVAL;
3552 		goto out;
3553 	}
3554 
3555 	raid_bdev = raid_bdev_find_by_uuid(&sb->uuid);
3556 
3557 	if (raid_bdev) {
3558 		if (sb->seq_number > raid_bdev->sb->seq_number) {
3559 			SPDK_DEBUGLOG(bdev_raid,
3560 				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
3561 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3562 
3563 			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
3564 				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
3565 					     raid_bdev->bdev.name, bdev->name);
3566 				rc = -EBUSY;
3567 				goto out;
3568 			}
3569 
3570 			/* remove and then recreate the raid bdev using the newer superblock */
3571 			raid_bdev_delete(raid_bdev, NULL, NULL);
3572 			raid_bdev = NULL;
3573 		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
3574 			SPDK_DEBUGLOG(bdev_raid,
3575 				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
3576 				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3577 			/* use the current raid bdev superblock */
3578 			sb = raid_bdev->sb;
3579 		}
3580 	}
3581 
3582 	for (i = 0; i < sb->base_bdevs_size; i++) {
3583 		sb_base_bdev = &sb->base_bdevs[i];
3584 
3585 		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);
3586 
3587 		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3588 			break;
3589 		}
3590 	}
3591 
3592 	if (i == sb->base_bdevs_size) {
3593 		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
3594 		rc = -EINVAL;
3595 		goto out;
3596 	}
3597 
3598 	if (!raid_bdev) {
3599 		struct raid_bdev_examine_others_ctx *ctx;
3600 
3601 		ctx = calloc(1, sizeof(*ctx));
3602 		if (ctx == NULL) {
3603 			rc = -ENOMEM;
3604 			goto out;
3605 		}
3606 
3607 		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
3608 		if (rc != 0) {
3609 			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
3610 				    sb->name, spdk_strerror(-rc));
3611 			free(ctx);
3612 			goto out;
3613 		}
3614 
3615 		/* after this base bdev is configured, examine other base bdevs that may be present */
3616 		spdk_uuid_copy(&ctx->raid_bdev_uuid, &sb->uuid);
3617 		ctx->cb_fn = cb_fn;
3618 		ctx->cb_ctx = cb_ctx;
3619 
3620 		cb_fn = raid_bdev_examine_others;
3621 		cb_ctx = ctx;
3622 	}
3623 
3624 	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
3625 		assert(sb_base_bdev->slot < raid_bdev->num_base_bdevs);
3626 		base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3627 		assert(base_info->is_configured == false);
3628 		assert(sb_base_bdev->state == RAID_SB_BASE_BDEV_MISSING ||
3629 		       sb_base_bdev->state == RAID_SB_BASE_BDEV_FAILED);
3630 		assert(spdk_uuid_is_null(&base_info->uuid));
3631 		spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3632 		SPDK_NOTICELOG("Re-adding bdev %s to raid bdev %s.\n", bdev->name, raid_bdev->bdev.name);
3633 		rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3634 		if (rc != 0) {
3635 			SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3636 				    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3637 		}
3638 		goto out;
3639 	}
3640 
3641 	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
3642 		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
3643 			       bdev->name, raid_bdev->bdev.name);
3644 		rc = -EINVAL;
3645 		goto out;
3646 	}
3647 
3648 	base_info = NULL;
3649 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3650 		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3651 			base_info = iter;
3652 			break;
3653 		}
3654 	}
3655 
3656 	if (base_info == NULL) {
3657 		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
3658 			    bdev->name, raid_bdev->bdev.name);
3659 		rc = -EINVAL;
3660 		goto out;
3661 	}
3662 
3663 	rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3664 	if (rc != 0) {
3665 		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3666 			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3667 	}
3668 out:
3669 	if (rc != 0 && cb_fn != 0) {
3670 		cb_fn(cb_ctx, rc);
3671 	}
3672 }
3673 
/* Temporary context for reading a raid superblock off a bdev during examine:
 * holds the read-only descriptor and io channel used for the read, and the
 * callback to invoke with the result. */
struct raid_bdev_examine_ctx {
	struct spdk_bdev_desc *desc;
	struct spdk_io_channel *ch;
	raid_bdev_examine_load_sb_cb cb;
	void *cb_ctx;
};
3680 
3681 static void
3682 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3683 {
3684 	if (!ctx) {
3685 		return;
3686 	}
3687 
3688 	if (ctx->ch) {
3689 		spdk_put_io_channel(ctx->ch);
3690 	}
3691 
3692 	if (ctx->desc) {
3693 		spdk_bdev_close(ctx->desc);
3694 	}
3695 
3696 	free(ctx);
3697 }
3698 
3699 static void
3700 raid_bdev_examine_load_sb_done(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3701 {
3702 	struct raid_bdev_examine_ctx *ctx = _ctx;
3703 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3704 
3705 	ctx->cb(bdev, sb, status, ctx->cb_ctx);
3706 
3707 	raid_bdev_examine_ctx_free(ctx);
3708 }
3709 
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	/* Events on the short-lived examine descriptor are intentionally ignored. */
}
3714 
/*
 * Open the named bdev read-only and asynchronously load a raid superblock
 * from it. On success the operation is in flight and cb will be invoked with
 * the load status (the temporary descriptor/channel are released afterwards).
 * Returns a negative errno if the read could not be started; in that case cb
 * is not called.
 */
static int
raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb, void *cb_ctx)
{
	struct raid_bdev_examine_ctx *ctx;
	int rc;

	assert(cb != NULL);

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		return -ENOMEM;
	}

	/* Read-only open - examination must not disturb other claims on the bdev */
	rc = spdk_bdev_open_ext(bdev_name, false, raid_bdev_examine_event_cb, NULL, &ctx->desc);
	if (rc) {
		SPDK_ERRLOG("Failed to open bdev %s: %s\n", bdev_name, spdk_strerror(-rc));
		goto err;
	}

	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
	if (!ctx->ch) {
		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev_name);
		rc = -ENOMEM;
		goto err;
	}

	ctx->cb = cb;
	ctx->cb_ctx = cb_ctx;

	/* On success, raid_bdev_examine_load_sb_done() takes over and frees ctx */
	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_done, ctx);
	if (rc) {
		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
			    bdev_name, spdk_strerror(-rc));
		goto err;
	}

	return 0;
err:
	raid_bdev_examine_ctx_free(ctx);
	return rc;
}
3756 
3757 static void
3758 raid_bdev_examine_cont(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb, int status,
3759 		       void *ctx)
3760 {
3761 	switch (status) {
3762 	case 0:
3763 		/* valid superblock found */
3764 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3765 		raid_bdev_examine_sb(sb, bdev, NULL, NULL);
3766 		break;
3767 	case -EINVAL:
3768 		/* no valid superblock, check if it can be claimed anyway */
3769 		raid_bdev_examine_no_sb(bdev);
3770 		break;
3771 	default:
3772 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3773 			    bdev->name, spdk_strerror(-status));
3774 		break;
3775 	}
3776 
3777 	spdk_bdev_module_examine_done(&g_raid_if);
3778 }
3779 
3780 /*
3781  * brief:
3782  * raid_bdev_examine function is the examine function call by the below layers
3783  * like bdev_nvme layer. This function will check if this base bdev can be
3784  * claimed by this raid bdev or not.
3785  * params:
3786  * bdev - pointer to base bdev
3787  * returns:
3788  * none
3789  */
static void
raid_bdev_examine(struct spdk_bdev *bdev)
{
	int rc;

	/* Already attached to a raid bdev - nothing to do */
	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
		goto done;
	}

	/* NOTE(review): superblock loading is skipped for DIF/DIX-enabled bdevs;
	 * only superblock-less claiming is attempted for them - confirm intent. */
	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
		raid_bdev_examine_no_sb(bdev);
		goto done;
	}

	rc = raid_bdev_examine_load_sb(bdev->name, raid_bdev_examine_cont, NULL);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    bdev->name, spdk_strerror(-rc));
		goto done;
	}

	/* raid_bdev_examine_cont() completes the examine asynchronously */
	return;
done:
	spdk_bdev_module_examine_done(&g_raid_if);
}
3815 
3816 /* Log component for bdev raid bdev module */
3817 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3818