xref: /spdk/module/bdev/raid/bdev_raid.c (revision ef5c1379bef5579b248edb9da8e93fcb631bbe9f)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
16 #define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
17 #define RAID_BDEV_PROCESS_MAX_QD	16
18 
19 #define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT 1024
20 
21 static bool g_shutdown_started = false;
22 
23 /* List of all raid bdevs */
24 struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);
25 
26 static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
27 
28 /*
29  * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
30  * contains the relationship of raid bdev io channel with base bdev io channels.
31  */
32 struct raid_bdev_io_channel {
33 	/* Array of IO channels of base bdevs */
34 	struct spdk_io_channel	**base_channel;
35 
36 	/* Private raid module IO channel */
37 	struct spdk_io_channel	*module_channel;
38 
39 	/* Background process data */
40 	struct {
41 		uint64_t offset;
42 		struct spdk_io_channel *target_ch;
43 		struct raid_bdev_io_channel *ch_processed;
44 	} process;
45 };
46 
47 enum raid_bdev_process_state {
48 	RAID_PROCESS_STATE_INIT,
49 	RAID_PROCESS_STATE_RUNNING,
50 	RAID_PROCESS_STATE_STOPPING,
51 	RAID_PROCESS_STATE_STOPPED,
52 };
53 
54 struct raid_bdev_process {
55 	struct raid_bdev		*raid_bdev;
56 	enum raid_process_type		type;
57 	enum raid_bdev_process_state	state;
58 	struct spdk_thread		*thread;
59 	struct raid_bdev_io_channel	*raid_ch;
60 	TAILQ_HEAD(, raid_bdev_process_request) requests;
61 	uint64_t			max_window_size;
62 	uint64_t			window_size;
63 	uint64_t			window_remaining;
64 	int				window_status;
65 	uint64_t			window_offset;
66 	bool				window_range_locked;
67 	struct raid_base_bdev_info	*target;
68 	int				status;
69 	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
70 };
71 
72 struct raid_process_finish_action {
73 	spdk_msg_fn cb;
74 	void *cb_ctx;
75 	TAILQ_ENTRY(raid_process_finish_action) link;
76 };
77 
78 static struct spdk_raid_bdev_opts g_opts = {
79 	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
80 };
81 
/* Copy the current global raid bdev options into the caller-provided structure. */
void
raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
{
	*opts = g_opts;
}
87 
88 int
89 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
90 {
91 	if (opts->process_window_size_kb == 0) {
92 		return -EINVAL;
93 	}
94 
95 	g_opts = *opts;
96 
97 	return 0;
98 }
99 
100 static struct raid_bdev_module *
101 raid_bdev_module_find(enum raid_level level)
102 {
103 	struct raid_bdev_module *raid_module;
104 
105 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
106 		if (raid_module->level == level) {
107 			return raid_module;
108 		}
109 	}
110 
111 	return NULL;
112 }
113 
114 void
115 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
116 {
117 	if (raid_bdev_module_find(raid_module->level) != NULL) {
118 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
119 			    raid_bdev_level_to_str(raid_module->level));
120 		assert(false);
121 	} else {
122 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
123 	}
124 }
125 
/* Return the IO channel of the base bdev in slot 'idx'. May be NULL - the
 * create callback leaves slots empty for unconfigured base bdevs and for the
 * target of an ongoing background process. */
struct spdk_io_channel *
raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
{
	return raid_ch->base_channel[idx];
}
131 
/* Return the raid module's private per-channel context. Only valid when the
 * module provided a get_io_channel callback, i.e. module_channel is set. */
void *
raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
{
	assert(raid_ch->module_channel != NULL);

	return spdk_io_channel_get_ctx(raid_ch->module_channel);
}
139 
140 /* Function declarations */
141 static void	raid_bdev_examine(struct spdk_bdev *bdev);
142 static int	raid_bdev_init(void);
143 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
144 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
145 
146 static void
147 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
148 {
149 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
150 
151 	if (raid_ch->process.target_ch != NULL) {
152 		spdk_put_io_channel(raid_ch->process.target_ch);
153 		raid_ch->process.target_ch = NULL;
154 	}
155 
156 	if (raid_ch->process.ch_processed != NULL) {
157 		free(raid_ch->process.ch_processed->base_channel);
158 		free(raid_ch->process.ch_processed);
159 		raid_ch->process.ch_processed = NULL;
160 	}
161 }
162 
163 static int
164 raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
165 {
166 	struct raid_bdev *raid_bdev = process->raid_bdev;
167 	struct raid_bdev_io_channel *raid_ch_processed;
168 	struct raid_base_bdev_info *base_info;
169 
170 	raid_ch->process.offset = process->window_offset;
171 
172 	/* In the future we may have other types of processes which don't use a target bdev,
173 	 * like data scrubbing or strip size migration. Until then, expect that there always is
174 	 * a process target. */
175 	assert(process->target != NULL);
176 
177 	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
178 	if (raid_ch->process.target_ch == NULL) {
179 		goto err;
180 	}
181 
182 	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
183 	if (raid_ch_processed == NULL) {
184 		goto err;
185 	}
186 	raid_ch->process.ch_processed = raid_ch_processed;
187 
188 	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
189 					  sizeof(*raid_ch_processed->base_channel));
190 	if (raid_ch_processed->base_channel == NULL) {
191 		goto err;
192 	}
193 
194 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
195 		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
196 
197 		if (base_info != process->target) {
198 			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
199 		} else {
200 			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
201 		}
202 	}
203 
204 	raid_ch_processed->module_channel = raid_ch->module_channel;
205 	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;
206 
207 	return 0;
208 err:
209 	raid_bdev_ch_process_cleanup(raid_ch);
210 	return -ENOMEM;
211 }
212 
213 /*
214  * brief:
215  * raid_bdev_create_cb function is a cb function for raid bdev which creates the
216  * hierarchy from raid bdev to base bdev io channels. It will be called per core
217  * params:
218  * io_device - pointer to raid bdev io device represented by raid_bdev
219  * ctx_buf - pointer to context buffer for raid bdev io channel
220  * returns:
221  * 0 - success
222  * non zero - failure
223  */
static int
raid_bdev_create_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	int ret = -ENOMEM;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);

	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
		return -ENOMEM;
	}

	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/*
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].is_configured == false ||
		    (raid_bdev->process != NULL && raid_bdev->process->target == &raid_bdev->base_bdev_info[i])) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			goto err;
		}
	}

	/* Let the raid level module attach its own private channel, if it has one. */
	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			goto err;
		}
	}

	/* With a background process running, set up the per-channel process
	 * state; otherwise mark the channel as having no active process window. */
	if (raid_bdev->process != NULL) {
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}

	return 0;
err:
	/* Release whatever base bdev channels were acquired before the failure. */
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}
294 
295 /*
296  * brief:
297  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
298  * hierarchy from raid bdev to base bdev io channels. It will be called per core
299  * params:
300  * io_device - pointer to raid bdev io device represented by raid_bdev
301  * ctx_buf - pointer to context buffer for raid bdev io channel
302  * returns:
303  * none
304  */
305 static void
306 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
307 {
308 	struct raid_bdev *raid_bdev = io_device;
309 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
310 	uint8_t i;
311 
312 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
313 
314 	assert(raid_ch != NULL);
315 	assert(raid_ch->base_channel);
316 
317 	if (raid_ch->module_channel) {
318 		spdk_put_io_channel(raid_ch->module_channel);
319 	}
320 
321 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
322 		/* Free base bdev channels */
323 		if (raid_ch->base_channel[i] != NULL) {
324 			spdk_put_io_channel(raid_ch->base_channel[i]);
325 		}
326 	}
327 	free(raid_ch->base_channel);
328 	raid_ch->base_channel = NULL;
329 
330 	raid_bdev_ch_process_cleanup(raid_ch);
331 }
332 
333 /*
334  * brief:
335  * raid_bdev_cleanup is used to cleanup raid_bdev related data
336  * structures.
337  * params:
338  * raid_bdev - pointer to raid_bdev
339  * returns:
340  * none
341  */
342 static void
343 raid_bdev_cleanup(struct raid_bdev *raid_bdev)
344 {
345 	struct raid_base_bdev_info *base_info;
346 
347 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
348 		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
349 	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
350 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
351 
352 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
353 		assert(base_info->desc == NULL);
354 		free(base_info->name);
355 	}
356 
357 	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
358 }
359 
/* Free all memory owned by the raid bdev structure itself (superblock,
 * base bdev info array, name and the structure). */
static void
raid_bdev_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_free_superblock(raid_bdev);
	free(raid_bdev->base_bdev_info);
	free(raid_bdev->bdev.name);
	free(raid_bdev);
}
368 
/* Detach the raid bdev from global state and then free it. */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
375 
/* Mark a configured base bdev as no longer configured and decrement the
 * raid bdev's discovered-base-bdev count accordingly. */
static void
raid_bdev_deconfigure_base_bdev(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->is_configured);
	assert(raid_bdev->num_base_bdevs_discovered);
	raid_bdev->num_base_bdevs_discovered--;
	base_info->is_configured = false;
}
386 
387 /*
388  * brief:
389  * free resource of base bdev for raid bdev
390  * params:
391  * base_info - raid base bdev info
392  * returns:
393  * none
394  */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	/* Must run on the app thread. */
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	free(base_info->name);
	base_info->name = NULL;
	/* The uuid is kept only while the raid bdev is still configuring;
	 * presumably so the slot can be matched again later - verify against callers. */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}

	/* Nothing more to release if the base bdev was never opened. */
	if (base_info->desc == NULL) {
		return;
	}

	/* Release the claim, close the descriptor and drop the app-thread channel. */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		raid_bdev_deconfigure_base_bdev(base_info);
	}
}
422 
423 static void
424 raid_bdev_io_device_unregister_cb(void *io_device)
425 {
426 	struct raid_bdev *raid_bdev = io_device;
427 
428 	if (raid_bdev->num_base_bdevs_discovered == 0) {
429 		/* Free raid_bdev when there are no base bdevs left */
430 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
431 		raid_bdev_cleanup(raid_bdev);
432 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
433 		raid_bdev_free(raid_bdev);
434 	} else {
435 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
436 	}
437 }
438 
/* Finish the raid bdev stop sequence by unregistering the io device.
 * Skipped while still configuring, since the io device is only registered
 * for raid bdevs that went past that state. */
void
raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
{
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
	}
}
446 
/* App-thread part of raid bdev destruction: release base bdev resources that
 * are due for release, let the module run its stop hook, and finish via
 * raid_bdev_module_stop_done() - either directly, or later by the module if
 * its stop() returned false (asynchronous stop). */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	assert(raid_bdev->process == NULL);

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	/* A stop() returning false means the module completes asynchronously
	 * and will call raid_bdev_module_stop_done() itself. */
	if (raid_bdev->module->stop != NULL) {
		if (raid_bdev->module->stop(raid_bdev) == false) {
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
479 
/* bdev destruct callback; defers the actual teardown to the app thread.
 * Returning 1 tells the bdev layer destruction completes asynchronously
 * (signalled later via spdk_bdev_destruct_done()). */
static int
raid_bdev_destruct(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);

	return 1;
}
487 
488 static int
489 raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
490 			   struct spdk_bdev *bdev, uint32_t remapped_offset)
491 {
492 	struct spdk_dif_ctx dif_ctx;
493 	struct spdk_dif_error err_blk = {};
494 	int rc;
495 	struct spdk_dif_ctx_init_ext_opts dif_opts;
496 	struct iovec md_iov = {
497 		.iov_base	= md_buf,
498 		.iov_len	= num_blocks * bdev->md_len,
499 	};
500 
501 	if (md_buf == NULL) {
502 		return 0;
503 	}
504 
505 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
506 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
507 	rc = spdk_dif_ctx_init(&dif_ctx,
508 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
509 			       bdev->dif_is_head_of_md, bdev->dif_type,
510 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
511 			       0, 0, 0, 0, 0, &dif_opts);
512 	if (rc != 0) {
513 		SPDK_ERRLOG("Initialization of DIF context failed\n");
514 		return rc;
515 	}
516 
517 	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
518 
519 	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
520 	if (rc != 0) {
521 		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%d"
522 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
523 	}
524 
525 	return rc;
526 }
527 
528 int
529 raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
530 			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
531 {
532 	struct spdk_dif_ctx dif_ctx;
533 	struct spdk_dif_error err_blk = {};
534 	int rc;
535 	struct spdk_dif_ctx_init_ext_opts dif_opts;
536 	struct iovec md_iov = {
537 		.iov_base	= md_buf,
538 		.iov_len	= num_blocks * bdev->md_len,
539 	};
540 
541 	if (md_buf == NULL) {
542 		return 0;
543 	}
544 
545 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
546 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
547 	rc = spdk_dif_ctx_init(&dif_ctx,
548 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
549 			       bdev->dif_is_head_of_md, bdev->dif_type,
550 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
551 			       offset_blocks, 0, 0, 0, 0, &dif_opts);
552 	if (rc != 0) {
553 		SPDK_ERRLOG("Initialization of DIF context failed\n");
554 		return rc;
555 	}
556 
557 	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
558 	if (rc != 0) {
559 		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%d"
560 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
561 	}
562 
563 	return rc;
564 }
565 
566 /**
567  * Raid bdev I/O read/write wrapper for spdk_bdev_readv_blocks_ext function.
568  */
int
raid_bdev_readv_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
			   struct iovec *iov, int iovcnt, uint64_t offset_blocks,
			   uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
			   struct spdk_bdev_ext_io_opts *opts)
{
	/* Shift the raid-relative offset by the base bdev's data_offset. */
	return spdk_bdev_readv_blocks_ext(base_info->desc, ch, iov, iovcnt,
					  base_info->data_offset + offset_blocks, num_blocks, cb, cb_arg, opts);
}
578 
579 /**
580  * Raid bdev I/O read/write wrapper for spdk_bdev_writev_blocks_ext function.
581  */
int
raid_bdev_writev_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
			    struct iovec *iov, int iovcnt, uint64_t offset_blocks,
			    uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
			    struct spdk_bdev_ext_io_opts *opts)
{
	int rc;
	/* Offset on the base bdev, shifted by its data_offset. */
	uint64_t remapped_offset_blocks = base_info->data_offset + offset_blocks;

	/* With DIF reference tag checking enabled, the metadata's reference tags
	 * must be remapped to the shifted offset before the write is issued. */
	if (spdk_unlikely(spdk_bdev_get_dif_type(&base_info->raid_bdev->bdev) != SPDK_DIF_DISABLE &&
			  base_info->raid_bdev->bdev.dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK)) {

		rc = raid_bdev_remap_dix_reftag(opts->metadata, num_blocks, &base_info->raid_bdev->bdev,
						remapped_offset_blocks);
		if (rc != 0) {
			return rc;
		}
	}

	return spdk_bdev_writev_blocks_ext(base_info->desc, ch, iov, iovcnt,
					   remapped_offset_blocks, num_blocks, cb, cb_arg, opts);
}
604 
/*
 * Complete a raid I/O. Handles the two-phase completion of split I/Os: the
 * first completion (higher LBAs, unprocessed range) restores the request
 * fields and submits the remaining lower part through the processed channel;
 * the second completion restores the original iovecs and finishes the I/O.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
	int rc;

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* Pointer difference: number of iovecs consumed by the first part. */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The split iovec is shared by both halves: shrink it to
					 * cover only the first part and restore its base. */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The second part covers the already processed range. */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Final completion of a split I/O: restore the original request fields. */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		/* For successful reads with DIF reference tag checking, remap the
		 * reference tags back to the raid bdev's offsets before completing. */
		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {

			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
							bdev_io->u.bdev.offset_blocks);
			if (rc != 0) {
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		spdk_bdev_io_complete(bdev_io, status);
	}
}
668 
669 /*
670  * brief:
671  * raid_bdev_io_complete_part - signal the completion of a part of the expected
672  * base bdev IOs and complete the raid_io if this is the final expected IO.
673  * The caller should first set raid_io->base_bdev_io_remaining. This function
674  * will decrement this counter by the value of the 'completed' parameter and
675  * complete the raid_io if the counter reaches 0. The caller is free to
676  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
677  * it can represent e.g. blocks or IOs.
678  * params:
679  * raid_io - pointer to raid_bdev_io
680  * completed - the part of the raid_io that has been completed
681  * status - status of the base IO
682  * returns:
683  * true - if the raid_io is completed
684  * false - otherwise
685  */
686 bool
687 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
688 			   enum spdk_bdev_io_status status)
689 {
690 	assert(raid_io->base_bdev_io_remaining >= completed);
691 	raid_io->base_bdev_io_remaining -= completed;
692 
693 	if (status != SPDK_BDEV_IO_STATUS_SUCCESS) {
694 		raid_io->base_bdev_io_status = status;
695 	}
696 
697 	if (raid_io->base_bdev_io_remaining == 0) {
698 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
699 		return true;
700 	} else {
701 		return false;
702 	}
703 }
704 
705 /*
706  * brief:
707  * raid_bdev_queue_io_wait function processes the IO which failed to submit.
708  * It will try to queue the IOs after storing the context to bdev wait queue logic.
709  * params:
710  * raid_io - pointer to raid_bdev_io
711  * bdev - the block device that the IO is submitted to
712  * ch - io channel
713  * cb_fn - callback when the spdk_bdev_io for bdev becomes available
714  * returns:
715  * none
716  */
void
raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
{
	/* The wait entry is embedded in raid_io, so no allocation is needed here. */
	raid_io->waitq_entry.bdev = bdev;
	raid_io->waitq_entry.cb_fn = cb_fn;
	raid_io->waitq_entry.cb_arg = raid_io;
	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
}
726 
727 static void
728 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
729 {
730 	struct raid_bdev_io *raid_io = cb_arg;
731 
732 	spdk_bdev_free_io(bdev_io);
733 
734 	raid_bdev_io_complete_part(raid_io, 1, success ?
735 				   SPDK_BDEV_IO_STATUS_SUCCESS :
736 				   SPDK_BDEV_IO_STATUS_FAILED);
737 }
738 
739 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
740 
/* void* adapter so the reset submission can be retried from the
 * spdk_bdev_queue_io_wait() callback. */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	struct raid_bdev_io *raid_io = _raid_io;

	raid_bdev_submit_reset_request(raid_io);
}
748 
749 /*
750  * brief:
751  * raid_bdev_submit_reset_request function submits reset requests
752  * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
753  * which case it will queue it for later submission
754  * params:
755  * raid_io
756  * returns:
757  * none
758  */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* Only initialize the counter on the first attempt; on retry after
	 * ENOMEM some parts may already be submitted or completed. */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from the first base bdev not yet submitted to. */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* Missing base bdev - count it as a trivially successful part. */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Out of bdev_io objects - retry from this slot when one frees up. */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
798 
799 static void
800 raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
801 {
802 	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
803 	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
804 	int i;
805 
806 	assert(split_offset != 0);
807 	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
808 	raid_io->split.offset = split_offset;
809 
810 	raid_io->offset_blocks += split_offset;
811 	raid_io->num_blocks -= split_offset;
812 	if (raid_io->md_buf != NULL) {
813 		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
814 	}
815 
816 	for (i = 0; i < raid_io->iovcnt; i++) {
817 		struct iovec *iov = &raid_io->iovs[i];
818 
819 		if (iov_offset < iov->iov_len) {
820 			if (iov_offset == 0) {
821 				raid_io->split.iov = NULL;
822 			} else {
823 				raid_io->split.iov = iov;
824 				raid_io->split.iov_copy = *iov;
825 				iov->iov_base += iov_offset;
826 				iov->iov_len -= iov_offset;
827 			}
828 			raid_io->iovs += i;
829 			raid_io->iovcnt -= i;
830 			break;
831 		}
832 
833 		iov_offset -= iov->iov_len;
834 	}
835 }
836 
/* Submit a read/write raid I/O, routing it relative to the background
 * process window: I/O entirely below the window goes through the processed
 * channel, I/O spanning the window boundary is split first. */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
869 
870 /*
871  * brief:
872  * Callback function to spdk_bdev_io_get_buf.
873  * params:
874  * ch - pointer to raid bdev io channel
875  * bdev_io - pointer to parent bdev_io on raid bdev device
876  * success - True if buffer is allocated or false otherwise.
877  * returns:
878  * none
879  */
880 static void
881 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
882 		     bool success)
883 {
884 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
885 
886 	if (!success) {
887 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
888 		return;
889 	}
890 
891 	raid_bdev_submit_rw_request(raid_io);
892 }
893 
894 void
895 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
896 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
897 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
898 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
899 {
900 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
901 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
902 
903 	raid_io->type = type;
904 	raid_io->offset_blocks = offset_blocks;
905 	raid_io->num_blocks = num_blocks;
906 	raid_io->iovs = iovs;
907 	raid_io->iovcnt = iovcnt;
908 	raid_io->memory_domain = memory_domain;
909 	raid_io->memory_domain_ctx = memory_domain_ctx;
910 	raid_io->md_buf = md_buf;
911 
912 	raid_io->raid_bdev = raid_bdev;
913 	raid_io->raid_ch = raid_ch;
914 	raid_io->base_bdev_io_remaining = 0;
915 	raid_io->base_bdev_io_submitted = 0;
916 	raid_io->base_bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
917 	raid_io->completion_cb = NULL;
918 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
919 }
920 
921 /*
922  * brief:
923  * raid_bdev_submit_request function is the submit_request function pointer of
924  * raid bdev function table. This is used to submit the io on raid_bdev to below
925  * layers.
926  * params:
927  * ch - pointer to raid bdev io channel
928  * bdev_io - pointer to parent bdev_io on raid bdev device
929  * returns:
930  * none
931  */
static void
raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;

	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Reads may arrive without a data buffer; allocate one first. */
		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		raid_bdev_submit_rw_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		raid_bdev_submit_reset_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Null-payload I/O is not supported while a background process runs. */
		if (raid_io->raid_bdev->process != NULL) {
			/* TODO: rebuild support */
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
		break;

	default:
		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
971 
972 /*
973  * brief:
974  * _raid_bdev_io_type_supported checks whether io_type is supported in
975  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
976  * doesn't support, the raid device doesn't supports.
977  *
978  * params:
979  * raid_bdev - pointer to raid bdev context
980  * io_type - io type
981  * returns:
982  * true - io_type is supported
983  * false - io_type is not supported
984  */
985 inline static bool
986 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
987 {
988 	struct raid_base_bdev_info *base_info;
989 
990 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
991 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
992 		if (raid_bdev->module->submit_null_payload_request == NULL) {
993 			return false;
994 		}
995 	}
996 
997 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
998 		if (base_info->desc == NULL) {
999 			continue;
1000 		}
1001 
1002 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
1003 			return false;
1004 		}
1005 	}
1006 
1007 	return true;
1008 }
1009 
1010 /*
1011  * brief:
1012  * raid_bdev_io_type_supported is the io_supported function for bdev function
1013  * table which returns whether the particular io type is supported or not by
1014  * raid bdev module
1015  * params:
1016  * ctx - pointer to raid bdev context
1017  * type - io type
1018  * returns:
1019  * true - io_type is supported
1020  * false - io_type is not supported
1021  */
1022 static bool
1023 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1024 {
1025 	switch (io_type) {
1026 	case SPDK_BDEV_IO_TYPE_READ:
1027 	case SPDK_BDEV_IO_TYPE_WRITE:
1028 		return true;
1029 
1030 	case SPDK_BDEV_IO_TYPE_FLUSH:
1031 	case SPDK_BDEV_IO_TYPE_RESET:
1032 	case SPDK_BDEV_IO_TYPE_UNMAP:
1033 		return _raid_bdev_io_type_supported(ctx, io_type);
1034 
1035 	default:
1036 		return false;
1037 	}
1038 
1039 	return false;
1040 }
1041 
/*
 * brief:
 * raid_bdev_get_io_channel is the get_io_channel function table pointer for
 * raid bdev. It returns an io channel for this raid bdev (the raid bdev
 * pointer itself is the io_device key).
 * params:
 * ctxt - pointer to raid_bdev
 * returns:
 * pointer to io channel for raid bdev
 */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	return spdk_get_io_channel((struct raid_bdev *)ctxt);
}
1058 
1059 void
1060 raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
1061 {
1062 	struct raid_base_bdev_info *base_info;
1063 
1064 	assert(raid_bdev != NULL);
1065 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1066 
1067 	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1068 	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1069 	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
1070 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1071 	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
1072 	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
1073 	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
1074 	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
1075 				     raid_bdev->num_base_bdevs_operational);
1076 	if (raid_bdev->process) {
1077 		struct raid_bdev_process *process = raid_bdev->process;
1078 		uint64_t offset = process->window_offset;
1079 
1080 		spdk_json_write_named_object_begin(w, "process");
1081 		spdk_json_write_name(w, "type");
1082 		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
1083 		spdk_json_write_named_string(w, "target", process->target->name);
1084 		spdk_json_write_named_object_begin(w, "progress");
1085 		spdk_json_write_named_uint64(w, "blocks", offset);
1086 		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
1087 		spdk_json_write_object_end(w);
1088 		spdk_json_write_object_end(w);
1089 	}
1090 	spdk_json_write_name(w, "base_bdevs_list");
1091 	spdk_json_write_array_begin(w);
1092 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1093 		spdk_json_write_object_begin(w);
1094 		spdk_json_write_name(w, "name");
1095 		if (base_info->name) {
1096 			spdk_json_write_string(w, base_info->name);
1097 		} else {
1098 			spdk_json_write_null(w);
1099 		}
1100 		spdk_json_write_named_uuid(w, "uuid", &base_info->uuid);
1101 		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
1102 		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
1103 		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
1104 		spdk_json_write_object_end(w);
1105 	}
1106 	spdk_json_write_array_end(w);
1107 }
1108 
1109 /*
1110  * brief:
1111  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1112  * params:
1113  * ctx - pointer to raid_bdev
1114  * w - pointer to json context
1115  * returns:
1116  * 0 - success
1117  * non zero - failure
1118  */
1119 static int
1120 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1121 {
1122 	struct raid_bdev *raid_bdev = ctx;
1123 
1124 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1125 
1126 	/* Dump the raid bdev configuration related information */
1127 	spdk_json_write_named_object_begin(w, "raid");
1128 	raid_bdev_write_info_json(raid_bdev, w);
1129 	spdk_json_write_object_end(w);
1130 
1131 	return 0;
1132 }
1133 
1134 /*
1135  * brief:
1136  * raid_bdev_write_config_json is the function table pointer for raid bdev
1137  * params:
1138  * bdev - pointer to spdk_bdev
1139  * w - pointer to json context
1140  * returns:
1141  * none
1142  */
1143 static void
1144 raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1145 {
1146 	struct raid_bdev *raid_bdev = bdev->ctxt;
1147 	struct raid_base_bdev_info *base_info;
1148 
1149 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1150 
1151 	if (raid_bdev->superblock_enabled) {
1152 		/* raid bdev configuration is stored in the superblock */
1153 		return;
1154 	}
1155 
1156 	spdk_json_write_object_begin(w);
1157 
1158 	spdk_json_write_named_string(w, "method", "bdev_raid_create");
1159 
1160 	spdk_json_write_named_object_begin(w, "params");
1161 	spdk_json_write_named_string(w, "name", bdev->name);
1162 	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1163 	if (raid_bdev->strip_size_kb != 0) {
1164 		spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1165 	}
1166 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1167 
1168 	spdk_json_write_named_array_begin(w, "base_bdevs");
1169 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1170 		if (base_info->name) {
1171 			spdk_json_write_string(w, base_info->name);
1172 		} else {
1173 			char str[32];
1174 
1175 			snprintf(str, sizeof(str), "removed_base_bdev_%u", raid_bdev_base_bdev_slot(base_info));
1176 			spdk_json_write_string(w, str);
1177 		}
1178 	}
1179 	spdk_json_write_array_end(w);
1180 	spdk_json_write_object_end(w);
1181 
1182 	spdk_json_write_object_end(w);
1183 }
1184 
1185 static int
1186 raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
1187 {
1188 	struct raid_bdev *raid_bdev = ctx;
1189 	struct raid_base_bdev_info *base_info;
1190 	int domains_count = 0, rc = 0;
1191 
1192 	if (raid_bdev->module->memory_domains_supported == false) {
1193 		return 0;
1194 	}
1195 
1196 	/* First loop to get the number of memory domains */
1197 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1198 		if (base_info->is_configured == false) {
1199 			continue;
1200 		}
1201 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
1202 		if (rc < 0) {
1203 			return rc;
1204 		}
1205 		domains_count += rc;
1206 	}
1207 
1208 	if (!domains || array_size < domains_count) {
1209 		return domains_count;
1210 	}
1211 
1212 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1213 		if (base_info->is_configured == false) {
1214 			continue;
1215 		}
1216 		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
1217 		if (rc < 0) {
1218 			return rc;
1219 		}
1220 		domains += rc;
1221 		array_size -= rc;
1222 	}
1223 
1224 	return domains_count;
1225 }
1226 
/* g_raid_bdev_fn_table is the function table for raid bdev, registered with
 * the bdev layer for every raid bdev created by this module. */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1237 
1238 struct raid_bdev *
1239 raid_bdev_find_by_name(const char *name)
1240 {
1241 	struct raid_bdev *raid_bdev;
1242 
1243 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1244 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1245 			return raid_bdev;
1246 		}
1247 	}
1248 
1249 	return NULL;
1250 }
1251 
1252 static struct raid_bdev *
1253 raid_bdev_find_by_uuid(const struct spdk_uuid *uuid)
1254 {
1255 	struct raid_bdev *raid_bdev;
1256 
1257 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1258 		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, uuid) == 0) {
1259 			return raid_bdev;
1260 		}
1261 	}
1262 
1263 	return NULL;
1264 }
1265 
/* Accepted raid level name strings (matched case-insensitively) and their
 * enum values. Multiple aliases may map to the same level; the list is
 * terminated by a zeroed sentinel entry. */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};
1279 
/* Human-readable names indexed by enum raid_bdev_state; the MAX slot is NULL. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};
1286 
/* Human-readable names indexed by enum raid_process_type; the MAX slot is NULL. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};
1292 
/* We have to use the typedef in the function declarations below to appease
 * astyle, which mis-formats "enum foo" return types. */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
1296 
1297 raid_level_t
1298 raid_bdev_str_to_level(const char *str)
1299 {
1300 	unsigned int i;
1301 
1302 	assert(str != NULL);
1303 
1304 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1305 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1306 			return g_raid_level_names[i].value;
1307 		}
1308 	}
1309 
1310 	return INVALID_RAID_LEVEL;
1311 }
1312 
1313 const char *
1314 raid_bdev_level_to_str(enum raid_level level)
1315 {
1316 	unsigned int i;
1317 
1318 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1319 		if (g_raid_level_names[i].value == level) {
1320 			return g_raid_level_names[i].name;
1321 		}
1322 	}
1323 
1324 	return "";
1325 }
1326 
1327 raid_bdev_state_t
1328 raid_bdev_str_to_state(const char *str)
1329 {
1330 	unsigned int i;
1331 
1332 	assert(str != NULL);
1333 
1334 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1335 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1336 			break;
1337 		}
1338 	}
1339 
1340 	return i;
1341 }
1342 
1343 const char *
1344 raid_bdev_state_to_str(enum raid_bdev_state state)
1345 {
1346 	if (state >= RAID_BDEV_STATE_MAX) {
1347 		return "";
1348 	}
1349 
1350 	return g_raid_state_names[state];
1351 }
1352 
1353 const char *
1354 raid_bdev_process_to_str(enum raid_process_type value)
1355 {
1356 	if (value >= RAID_PROCESS_MAX) {
1357 		return "";
1358 	}
1359 
1360 	return g_raid_process_type_names[value];
1361 }
1362 
/*
 * brief:
 * raid_bdev_fini_start is called when the bdev layer is starting the
 * shutdown process. It only records that shutdown has begun by setting
 * the module-global g_shutdown_started flag.
 * params:
 * none
 * returns:
 * none
 */
static void
raid_bdev_fini_start(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
	g_shutdown_started = true;
}
1378 
/*
 * brief:
 * raid_bdev_exit is called on raid bdev module exit time by bdev layer.
 * It tears down every raid bdev remaining in the global list.
 * params:
 * none
 * returns:
 * none
 */
static void
raid_bdev_exit(void)
{
	struct raid_bdev *raid_bdev, *tmp;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");

	/* _SAFE variant: raid_bdev_cleanup_and_free removes entries from the list. */
	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
		raid_bdev_cleanup_and_free(raid_bdev);
	}
}
1398 
/* Emit the "bdev_raid_set_options" RPC call reproducing the module's
 * current global options. */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1412 
/* config_json callback for the raid module: dump module-level options.
 * Per-bdev configuration is emitted separately via write_config_json. */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1420 
/*
 * brief:
 * raid_bdev_get_ctx_size is used to return the per-I/O context size the
 * bdev layer must reserve in each spdk_bdev_io for the raid module
 * (struct raid_bdev_io lives in bdev_io->driver_ctx).
 * params:
 * none
 * returns:
 * size of spdk_bdev_io context for raid
 */
static int
raid_bdev_get_ctx_size(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
	return sizeof(struct raid_bdev_io);
}
1436 
/* Module descriptor registered with the bdev layer: hooks module init/fini,
 * shutdown start, JSON config dump, per-IO context sizing and disk examine
 * into the bdev framework. */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1449 
/*
 * brief:
 * raid_bdev_init is the initialization function for raid bdev module.
 * Nothing to do at module load time; raid bdevs are created on demand.
 * params:
 * none
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_init(void)
{
	return 0;
}
1464 
1465 static int
1466 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1467 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1468 		  struct raid_bdev **raid_bdev_out)
1469 {
1470 	struct raid_bdev *raid_bdev;
1471 	struct spdk_bdev *raid_bdev_gen;
1472 	struct raid_bdev_module *module;
1473 	struct raid_base_bdev_info *base_info;
1474 	uint8_t min_operational;
1475 
1476 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1477 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1478 		return -EINVAL;
1479 	}
1480 
1481 	if (raid_bdev_find_by_name(name) != NULL) {
1482 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1483 		return -EEXIST;
1484 	}
1485 
1486 	if (level == RAID1) {
1487 		if (strip_size != 0) {
1488 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1489 			return -EINVAL;
1490 		}
1491 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1492 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1493 		return -EINVAL;
1494 	}
1495 
1496 	module = raid_bdev_module_find(level);
1497 	if (module == NULL) {
1498 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1499 		return -EINVAL;
1500 	}
1501 
1502 	assert(module->base_bdevs_min != 0);
1503 	if (num_base_bdevs < module->base_bdevs_min) {
1504 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1505 			    module->base_bdevs_min,
1506 			    raid_bdev_level_to_str(level));
1507 		return -EINVAL;
1508 	}
1509 
1510 	switch (module->base_bdevs_constraint.type) {
1511 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1512 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1513 		break;
1514 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1515 		min_operational = module->base_bdevs_constraint.value;
1516 		break;
1517 	case CONSTRAINT_UNSET:
1518 		if (module->base_bdevs_constraint.value != 0) {
1519 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1520 				    (uint8_t)module->base_bdevs_constraint.value, name);
1521 			return -EINVAL;
1522 		}
1523 		min_operational = num_base_bdevs;
1524 		break;
1525 	default:
1526 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1527 			    (uint8_t)module->base_bdevs_constraint.type,
1528 			    raid_bdev_level_to_str(module->level));
1529 		return -EINVAL;
1530 	};
1531 
1532 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1533 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1534 			    raid_bdev_level_to_str(module->level));
1535 		return -EINVAL;
1536 	}
1537 
1538 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1539 	if (!raid_bdev) {
1540 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1541 		return -ENOMEM;
1542 	}
1543 
1544 	raid_bdev->module = module;
1545 	raid_bdev->num_base_bdevs = num_base_bdevs;
1546 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1547 					   sizeof(struct raid_base_bdev_info));
1548 	if (!raid_bdev->base_bdev_info) {
1549 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1550 		raid_bdev_free(raid_bdev);
1551 		return -ENOMEM;
1552 	}
1553 
1554 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1555 		base_info->raid_bdev = raid_bdev;
1556 	}
1557 
1558 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1559 	 * internally and set later.
1560 	 */
1561 	raid_bdev->strip_size = 0;
1562 	raid_bdev->strip_size_kb = strip_size;
1563 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1564 	raid_bdev->level = level;
1565 	raid_bdev->min_base_bdevs_operational = min_operational;
1566 	raid_bdev->superblock_enabled = superblock_enabled;
1567 
1568 	raid_bdev_gen = &raid_bdev->bdev;
1569 
1570 	raid_bdev_gen->name = strdup(name);
1571 	if (!raid_bdev_gen->name) {
1572 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1573 		raid_bdev_free(raid_bdev);
1574 		return -ENOMEM;
1575 	}
1576 
1577 	raid_bdev_gen->product_name = "Raid Volume";
1578 	raid_bdev_gen->ctxt = raid_bdev;
1579 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1580 	raid_bdev_gen->module = &g_raid_if;
1581 	raid_bdev_gen->write_cache = 0;
1582 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1583 
1584 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1585 
1586 	*raid_bdev_out = raid_bdev;
1587 
1588 	return 0;
1589 }
1590 
1591 /*
1592  * brief:
1593  * raid_bdev_create allocates raid bdev based on passed configuration
1594  * params:
1595  * name - name for raid bdev
1596  * strip_size - strip size in KB
1597  * num_base_bdevs - number of base bdevs
1598  * level - raid level
1599  * superblock_enabled - true if raid should have superblock
1600  * uuid - uuid to set for the bdev
1601  * raid_bdev_out - the created raid bdev
1602  * returns:
1603  * 0 - success
1604  * non zero - failure
1605  */
1606 int
1607 raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1608 		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1609 		 struct raid_bdev **raid_bdev_out)
1610 {
1611 	struct raid_bdev *raid_bdev;
1612 	int rc;
1613 
1614 	assert(uuid != NULL);
1615 
1616 	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1617 			       &raid_bdev);
1618 	if (rc != 0) {
1619 		return rc;
1620 	}
1621 
1622 	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1623 		/* we need to have the uuid to store in the superblock before the bdev is registered */
1624 		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1625 	}
1626 
1627 	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1628 
1629 	*raid_bdev_out = raid_bdev;
1630 
1631 	return 0;
1632 }
1633 
/* App-thread continuation of unregistering: close the raid bdev's internal
 * self-descriptor opened at configure time. */
static void
_raid_bdev_unregistering_cont(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;

	spdk_bdev_close(raid_bdev->self_desc);
	raid_bdev->self_desc = NULL;
}
1642 
/* Forward the unregistering continuation to the app thread, where the
 * self-descriptor must be closed. */
static void
raid_bdev_unregistering_cont(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1648 
1649 static int
1650 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1651 {
1652 	struct raid_process_finish_action *finish_action;
1653 
1654 	assert(spdk_get_thread() == process->thread);
1655 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1656 
1657 	finish_action = calloc(1, sizeof(*finish_action));
1658 	if (finish_action == NULL) {
1659 		return -ENOMEM;
1660 	}
1661 
1662 	finish_action->cb = cb;
1663 	finish_action->cb_ctx = cb_ctx;
1664 
1665 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1666 
1667 	return 0;
1668 }
1669 
/* Runs on the process thread when the raid bdev is being unregistered:
 * request the background process to stop and arrange for unregistering to
 * continue once it has finished. */
static void
raid_bdev_unregistering_stop_process(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	process->state = RAID_PROCESS_STATE_STOPPING;
	if (process->status == 0) {
		/* Mark the process as cancelled unless it already failed. */
		process->status = -ECANCELED;
	}

	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
	if (rc != 0) {
		/* Best effort: unregistering cannot proceed through the process
		 * finish path; only log the failure. */
		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
1688 
1689 static void
1690 raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1691 {
1692 	struct raid_bdev *raid_bdev = event_ctx;
1693 
1694 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1695 		if (raid_bdev->process != NULL) {
1696 			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1697 					     raid_bdev->process);
1698 		} else {
1699 			raid_bdev_unregistering_cont(raid_bdev);
1700 		}
1701 	}
1702 }
1703 
/* Final stage of configuring a raid bdev: register the io_device and the
 * bdev with the bdev layer, then open an internal descriptor on it. On any
 * failure the module is stopped and the bdev returns to CONFIGURING. */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	/* The raid_bdev pointer itself is the io_device key (see
	 * raid_bdev_get_io_channel). */
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto err;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto err;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
	return;
err:
	/* Roll back: stop the raid module and undo the io_device registration. */
	if (raid_bdev->module->stop != NULL) {
		raid_bdev->module->stop(raid_bdev);
	}
	spdk_io_device_unregister(raid_bdev, NULL);
	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
}
1752 
1753 static void
1754 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1755 {
1756 	if (status == 0) {
1757 		raid_bdev_configure_cont(raid_bdev);
1758 	} else {
1759 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1760 			    raid_bdev->bdev.name, spdk_strerror(-status));
1761 		if (raid_bdev->module->stop != NULL) {
1762 			raid_bdev->module->stop(raid_bdev);
1763 		}
1764 	}
1765 }
1766 
/*
 * brief:
 * raid_bdev_configure completes configuration once all operational base
 * bdevs are discovered: converts the strip size to blocks, starts the raid
 * module, and then either writes the superblock (continuing asynchronously
 * in raid_bdev_configure_write_sb_cb) or registers the bdev directly.
 * params:
 * raid_bdev - pointer to raid bdev
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_configure(struct raid_bdev *raid_bdev)
{
	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
	int rc;

	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
	assert(raid_bdev->bdev.blocklen > 0);

	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
	 * internal use.
	 */
	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
		return -EINVAL;
	}
	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);

	rc = raid_bdev->module->start(raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("raid module startup callback failed\n");
		return rc;
	}

	if (raid_bdev->superblock_enabled) {
		if (raid_bdev->sb == NULL) {
			/* No superblock yet (new raid bdev) - create and initialize one. */
			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
			if (rc == 0) {
				raid_bdev_init_superblock(raid_bdev);
			}
		} else {
			/* Existing superblock (loaded from disk) - validate it against
			 * the configured geometry. rc is 0 here (start() succeeded). */
			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
			if (raid_bdev->sb->block_size != data_block_size) {
				SPDK_ERRLOG("blocklen does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
				rc = -EINVAL;
			}
		}

		if (rc != 0) {
			if (raid_bdev->module->stop != NULL) {
				raid_bdev->module->stop(raid_bdev);
			}
			return rc;
		}

		/* Configuration continues in the write completion callback. */
		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
	} else {
		raid_bdev_configure_cont(raid_bdev);
	}

	return 0;
}
1836 
1837 /*
1838  * brief:
1839  * If raid bdev is online and registered, change the bdev state to
1840  * configuring and unregister this raid device. Queue this raid device
1841  * in configuring list
1842  * params:
1843  * raid_bdev - pointer to raid bdev
1844  * cb_fn - callback function
1845  * cb_arg - argument to callback function
1846  * returns:
1847  * none
1848  */
1849 static void
1850 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1851 		      void *cb_arg)
1852 {
1853 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1854 		if (cb_fn) {
1855 			cb_fn(cb_arg, 0);
1856 		}
1857 		return;
1858 	}
1859 
1860 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1861 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1862 
1863 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1864 }
1865 
1866 /*
1867  * brief:
1868  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1869  * params:
1870  * base_bdev - pointer to base bdev
1871  * returns:
1872  * base bdev info if found, otherwise NULL.
1873  */
1874 static struct raid_base_bdev_info *
1875 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1876 {
1877 	struct raid_bdev *raid_bdev;
1878 	struct raid_base_bdev_info *base_info;
1879 
1880 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1881 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1882 			if (base_info->desc != NULL &&
1883 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1884 				return base_info;
1885 			}
1886 		}
1887 	}
1888 
1889 	return NULL;
1890 }
1891 
/* Finish removal of a base bdev: clear the scheduled flag, account for the
 * lost base bdev and, if too few remain, deconfigure the whole raid bdev
 * (the removal callback is then passed on to deconfigure). */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->remove_scheduled);
	base_info->remove_scheduled = false;

	if (status == 0) {
		raid_bdev->num_base_bdevs_operational--;
		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
			/* There is not enough base bdevs to keep the raid bdev operational. */
			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
			return;
		}
	}

	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1913 
/* Completion of the superblock update recording a removed base bdev;
 * the removal is finished with the write's status either way. */
static void
raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}

	raid_bdev_remove_base_bdev_done(base_info, status);
}
1926 
/* Called after the raid bdev has been unquiesced during base bdev removal.
 * If a superblock exists, mark the removed slot FAILED in it and persist the
 * superblock before completing; otherwise complete immediately. */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		goto out;
	}

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the superblock entry for this slot and mark it failed. */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				/* TODO: distinguish between failure and intentional removal */
				sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;

				/* Removal completes in the superblock write callback. */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}
out:
	raid_bdev_remove_base_bdev_done(base_info, status);
}
1960 
/* Per-channel step of base bdev removal: drop this channel's reference to
 * the removed base bdev's io channel (and mirror that in the process's
 * "processed" channel view, if a background process is active). */
static void
raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
	uint8_t idx = raid_bdev_base_bdev_slot(base_info);

	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);

	if (raid_ch->base_channel[idx] != NULL) {
		spdk_put_io_channel(raid_ch->base_channel[idx]);
		raid_ch->base_channel[idx] = NULL;
	}

	if (raid_ch->process.ch_processed != NULL) {
		raid_ch->process.ch_processed->base_channel[idx] = NULL;
	}

	spdk_for_each_channel_continue(i, 0);
}
1982 
/* All channels have dropped the base bdev: release its resources and
 * unquiesce the raid bdev (removal continues in the unquiesce callback). */
static void
raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	raid_bdev_free_base_bdev_resource(base_info);

	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
			    base_info);
}
1994 
/*
 * Called when the raid bdev has been quiesced for base bdev removal.
 * On success, deconfigures the base bdev and iterates over all IO channels so
 * they drop their references to it; on failure, completes the removal with
 * the error status.
 */
static void
raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		raid_bdev_remove_base_bdev_done(base_info, status);
		return;
	}

	raid_bdev_deconfigure_base_bdev(base_info);

	spdk_for_each_channel(raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
			      raid_bdev_channels_remove_base_bdev_done);
}
2013 
/*
 * Start the quiesce step of a base bdev removal. Must be called on the app
 * thread. Returns 0 if the quiesce was initiated (the removal then continues
 * in raid_bdev_remove_base_bdev_on_quiesced()), negative errno otherwise.
 */
static int
raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
{
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
}
2022 
/* Message context for removing a base bdev while a background process is running */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The raid bdev's background process (remains valid until fully stopped) */
	struct raid_bdev_process *process;
	/* The base bdev being removed */
	struct raid_base_bdev_info *base_info;
	/* Snapshot of the operational base bdev count taken at submission time */
	uint8_t num_base_bdevs_operational;
};
2028 
/*
 * App-thread continuation of a base bdev removal that was coordinated with
 * the background process: starts the quiesce step and completes the removal
 * with an error if the quiesce could not be initiated.
 */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc = raid_bdev_remove_base_bdev_quiesce(base_info);

	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
2040 
/*
 * Runs once the background process no longer blocks the removal. Frees the
 * message context and forwards the removal to the app thread.
 */
static void
raid_bdev_process_base_bdev_remove_cont(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_base_bdev_info *base_info = ctx->base_info;

	/* base_info was saved above, so the context can be freed before sending the message */
	free(ctx);

	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
			     base_info);
}
2052 
/*
 * Handles a base bdev removal on the process thread. If the removal affects
 * neither the process target nor the array's minimum operational count, the
 * removal continues right away. Otherwise the process is told to stop and the
 * removal is deferred until it has fully stopped.
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the rest of the removal until the process has fully stopped */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->status == 0) {
		process->status = -ENODEV;
	}
}
2083 
/*
 * Coordinate the removal of a base bdev with the raid bdev's background
 * process. Called on the app thread; the actual handling is sent to the
 * process thread. Returns 0 on success or -ENOMEM if the message context
 * cannot be allocated.
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	/*
	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
	 * after the removal and more than one base bdev may be removed at the same time
	 */
	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
		if (base_info->is_configured && !base_info->remove_scheduled) {
			ctx->num_base_bdevs_operational++;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
2119 
/*
 * Schedule the removal of a base bdev from its raid bdev. Must be called on
 * the app thread. Depending on the raid bdev's state this either cleans up
 * synchronously, deconfigures the whole raid bdev, or starts the asynchronous
 * quiesce/removal sequence - coordinated with the background process when one
 * is running. cb_fn, if provided, is invoked when the removal completes.
 * Returns 0 on success, -ENODEV if a removal is already scheduled, or another
 * negative errno if the removal could not be started.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (base_info->remove_scheduled) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * The raid bdev is not registered yet or is already unregistered,
		 * so the cleanup can be done synchronously here.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		base_info->remove_scheduled = false;
		if (raid_bdev->num_base_bdevs_discovered == 0 &&
		    raid_bdev->state == RAID_BDEV_STATE_OFFLINE) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
		if (cb_fn != NULL) {
			cb_fn(cb_ctx, 0);
		}
	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
		/* This raid bdev does not tolerate removing a base bdev. */
		raid_bdev->num_base_bdevs_operational--;
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		base_info->remove_cb = cb_fn;
		base_info->remove_cb_ctx = cb_ctx;

		if (raid_bdev->process != NULL) {
			/* Let the background process decide when the removal may proceed */
			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
		} else {
			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
		}

		if (ret != 0) {
			base_info->remove_scheduled = false;
		}
	}

	return ret;
}
2177 
2178 /*
2179  * brief:
2180  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2181  * is removed. This function checks if this base bdev is part of any raid bdev
2182  * or not. If yes, it takes necessary action on that particular raid bdev.
2183  * params:
2184  * base_bdev - pointer to base bdev which got removed
2185  * cb_fn - callback function
 * cb_ctx - context passed to the callback function
2187  * returns:
2188  * 0 - success
2189  * non zero - failure
2190  */
2191 int
2192 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2193 {
2194 	struct raid_base_bdev_info *base_info;
2195 
2196 	/* Find the raid_bdev which has claimed this base_bdev */
2197 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2198 	if (!base_info) {
2199 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2200 		return -ENODEV;
2201 	}
2202 
2203 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2204 }
2205 
/* Completion callback for the superblock write triggered by a resize; only logs on failure. */
static void
raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	if (status != 0) {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}
}
2214 
2215 /*
2216  * brief:
2217  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
 * is resized. This function checks whether the smallest of the base bdev sizes has changed.
2219  * If yes, call module handler to resize the raid_bdev if implemented.
2220  * params:
2221  * base_bdev - pointer to base bdev which got resized.
2222  * returns:
2223  * none
2224  */
static void
raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
{
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	uint64_t blockcnt_old;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");

	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);

	/* Find the raid_bdev which has claimed this base_bdev */
	if (!base_info) {
		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
		return;
	}
	raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);

	/* Record the new size regardless of whether the raid bdev itself can resize */
	base_info->blockcnt = base_bdev->blockcnt;

	/* The raid module decides whether and how the raid bdev itself is resized */
	if (!raid_bdev->module->resize) {
		return;
	}

	blockcnt_old = raid_bdev->bdev.blockcnt;
	/* A false return means there is nothing further to do - presumably the
	 * raid bdev size did not change; confirm against the module resize() contract.
	 */
	if (raid_bdev->module->resize(raid_bdev) == false) {
		return;
	}

	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);

	if (raid_bdev->superblock_enabled) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t i;

		/* Persist the new per-base-bdev data sizes and the new raid size */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
				sb_base_bdev->data_size = base_info->data_size;
			}
		}
		sb->raid_size = raid_bdev->bdev.blockcnt;
		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
	}
}
2278 
2279 /*
2280  * brief:
2281  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2282  * triggers asynchronous event.
2283  * params:
2284  * type - event details.
2285  * bdev - bdev that triggered event.
2286  * event_ctx - context for event.
2287  * returns:
2288  * none
2289  */
2290 static void
2291 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2292 			  void *event_ctx)
2293 {
2294 	int rc;
2295 
2296 	switch (type) {
2297 	case SPDK_BDEV_EVENT_REMOVE:
2298 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2299 		if (rc != 0) {
2300 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2301 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2302 		}
2303 		break;
2304 	case SPDK_BDEV_EVENT_RESIZE:
2305 		raid_bdev_resize_base_bdev(bdev);
2306 		break;
2307 	default:
2308 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2309 		break;
2310 	}
2311 }
2312 
2313 /*
2314  * brief:
2315  * Deletes the specified raid bdev
2316  * params:
2317  * raid_bdev - pointer to raid bdev
2318  * cb_fn - callback function
2319  * cb_arg - argument to callback function
2320  */
void
raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);

	/* Only one deletion may be in progress at a time */
	if (raid_bdev->destroy_started) {
		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
			      raid_bdev->bdev.name);
		if (cb_fn) {
			cb_fn(cb_arg, -EALREADY);
		}
		return;
	}

	raid_bdev->destroy_started = true;

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		base_info->remove_scheduled = true;

		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
			/*
			 * The raid bdev is not registered yet or is already
			 * unregistered, so clean up the base bdev here directly.
			 */
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (raid_bdev->num_base_bdevs_discovered == 0) {
		/* There is no base bdev for this raid, so free the raid device. */
		raid_bdev_cleanup_and_free(raid_bdev);
		if (cb_fn) {
			cb_fn(cb_arg, 0);
		}
	} else {
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
	}
}
2361 
/* Completion callback for the superblock write after a finished process; only logs on failure. */
static void
raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	if (status != 0) {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}
}
2370 
/*
 * On the app thread, update the superblock after a successfully finished
 * process: any base bdev that is configured in memory but not yet marked
 * CONFIGURED in the superblock is promoted to CONFIGURED (with its uuid
 * recorded), then the superblock is written out.
 */
static void
raid_bdev_process_finish_write_sb(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_bdev_superblock *sb = raid_bdev->sb;
	struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_base_bdev_info *base_info;
	uint8_t i;

	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
			if (base_info->is_configured) {
				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
			}
		}
	}

	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
}
2395 
2396 static void raid_bdev_process_free(struct raid_bdev_process *process);
2397 
/*
 * Final step of process teardown, on the process thread: run all queued
 * finish actions, free the process, and exit the process thread.
 */
static void
_raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_process_finish_action *finish_action;

	/* Drain via TAILQ_FIRST so that actions queued by a callback also run */
	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
		finish_action->cb(finish_action->cb_ctx);
		free(finish_action);
	}

	raid_bdev_process_free(process);

	spdk_thread_exit(spdk_get_thread());
}
2414 
/*
 * Called when the failed process's target base bdev has been removed;
 * continues the teardown on the process thread.
 */
static void
raid_bdev_process_finish_target_removed(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2426 
/*
 * Called when the raid bdev has been unquiesced after the process finished.
 * If the process failed, its target base bdev is removed first (unless that
 * is already in progress or the target is gone); teardown then continues on
 * the process thread.
 */
static void
raid_bdev_process_finish_unquiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
	}

	if (process->status != 0) {
		struct raid_base_bdev_info *target = process->target;

		if (target->is_configured && !target->remove_scheduled) {
			_raid_bdev_remove_base_bdev(target, raid_bdev_process_finish_target_removed, process);
			return;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2447 
/*
 * App-thread step that unquiesces the raid bdev after the process finished.
 * If the unquiesce cannot even be started, the callback is invoked inline
 * with the error so teardown still proceeds.
 */
static void
raid_bdev_process_finish_unquiesce(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	int rc;

	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
				 raid_bdev_process_finish_unquiesced, process);
	if (rc != 0) {
		raid_bdev_process_finish_unquiesced(process, rc);
	}
}
2460 
/*
 * Process-thread step of teardown after all channels dropped their process
 * state: release the process's IO channel, mark the process STOPPED, log the
 * outcome, persist it to the superblock on success, and continue with
 * unquiescing the raid bdev on the app thread.
 */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	/* raid_ch may be NULL if acquiring the channel failed during init */
	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		if (raid_bdev->superblock_enabled) {
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2492 
/* All channels have cleaned up their process state - continue teardown on the process thread */
static void
__raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
}
2500 
/*
 * Per-channel cleanup after the process finished. On success, the IO channel
 * this raid channel kept for the process target becomes the channel's regular
 * base bdev channel for that slot.
 */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		/* Ownership of the target channel moves to the regular slot */
		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2519 
/*
 * Called when the raid bdev has been quiesced for process teardown. Detaches
 * the process from the raid bdev and makes all channels drop their process
 * state.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): teardown stops here on quiesce failure - the process
		 * is never freed on this path; confirm this is intentional.
		 */
		return;
	}

	raid_bdev->process = NULL;
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2535 
/*
 * App-thread entry of process teardown: quiesce the raid bdev first. If the
 * quiesce cannot be started, the callback is invoked inline with the error.
 */
static void
_raid_bdev_process_finish(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	int rc;

	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
			       raid_bdev_process_finish_quiesced, process);
	if (rc != 0) {
		raid_bdev_process_finish_quiesced(ctx, rc);
	}
}
2548 
/* Begin process teardown; the teardown sequence must start on the app thread */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2554 
2555 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2556 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2557 
/*
 * Ask the process to stop with the given status; only the first non-zero
 * status is kept. Safe to call repeatedly on the process thread - once the
 * process is already stopping, subsequent calls only record the status.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	/* Release the current window's range lock before finishing, if one is held */
	if (process->window_range_locked) {
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2580 
/*
 * Called when the completed window's LBA range has been unquiesced. Advances
 * the window offset and re-enters the main loop, which either starts the next
 * window or finishes the process if it is stopping.
 */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = false;
	process->window_offset += process->window_size;

	raid_bdev_process_thread_run(process);
}
2597 
/*
 * Unquiesce the LBA range of the current window. The completion callback
 * advances the process to the next window (or to teardown).
 */
static void
raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
{
	int rc;

	assert(process->window_range_locked == true);

	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
				       process->window_offset, process->max_window_size,
				       raid_bdev_process_window_range_unlocked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_unlocked(process, rc);
	}
}
2612 
/* All channels now carry the new processed offset - release the window's range lock */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(process);
}
2620 
2621 static void
2622 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2623 {
2624 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2625 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2626 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2627 
2628 	raid_ch->process.offset = process->window_offset + process->window_size;
2629 
2630 	spdk_for_each_channel_continue(i, 0);
2631 }
2632 
/*
 * Called by the raid module when a process request completes. Returns the
 * request to the free list and accounts for the finished blocks. When the
 * whole window has completed, either finishes the process (if any request
 * failed) or updates all channels with the new processed offset.
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	/* Keep the first error; the window still waits for outstanding requests */
	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2658 
/*
 * Try to submit one process request covering up to num_blocks starting at
 * offset_blocks. Returns the number of blocks actually submitted, 0 if no
 * free request is available (requests in flight will keep the window moving),
 * or a negative errno from the module.
 */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* No free requests - the window must have requests in flight */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module may submit fewer blocks than requested */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2694 
/*
 * Submit process requests for the current window on the process thread. The
 * window size becomes the number of blocks actually submitted; if nothing
 * could be submitted and nothing is in flight, the process is finished with
 * the current window status.
 */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2719 
/*
 * Called when the LBA range of the next window has been quiesced. Starts
 * processing the window - unless the process was asked to stop in the
 * meantime, in which case the range is released again.
 */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2740 
/*
 * Main loop step of the process thread: finish teardown if stopping, finish
 * successfully when the whole bdev has been processed, otherwise quiesce the
 * next window's LBA range and process it once locked.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Clamp the window to the end of the bdev */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		raid_bdev_process_window_range_locked(process, rc);
	}
}
2772 
/*
 * First message handled on the newly created process thread: acquire a raid
 * bdev IO channel for the process, mark it RUNNING and start the main loop.
 * Failure to get the channel finishes the process with -ENOMEM.
 */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2797 
/*
 * All channels rolled back the aborted process start. Removes the process
 * target and frees the process.
 */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
	raid_bdev_process_free(process);

	/* TODO: update sb */
}
2808 
/* Per-channel rollback of a failed process start */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2819 
/*
 * All channels finished setting up the process. On success, creates the
 * dedicated process thread and hands the process over to it; on any failure,
 * the per-channel setup is rolled back and the process aborted.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	/* TODO: we may need to abort if a base bdev was removed before we got here */

	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2856 
/*
 * Per-channel step of a process start: set up the channel's process state
 * and propagate the result as the iteration status.
 */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);

	spdk_for_each_channel_continue(i,
				       raid_bdev_ch_process_setup(spdk_io_channel_get_ctx(ch), process));
}
2869 
/* Start a process by setting up its state on all of the raid bdev's IO channels */
static void
raid_bdev_process_start(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;

	assert(raid_bdev->module->submit_process_request != NULL);

	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
			      raid_bdev_channels_start_process_done);
}
2880 
2881 static void
2882 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
2883 {
2884 	spdk_dma_free(process_req->iov.iov_base);
2885 	spdk_dma_free(process_req->md_buf);
2886 	free(process_req);
2887 }
2888 
/*
 * Allocate one process request with a data buffer large enough for
 * max_window_size blocks, plus a metadata buffer when the bdev uses separate
 * metadata. Returns NULL on allocation failure.
 */
static struct raid_bdev_process_request *
raid_bdev_process_alloc_request(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;

	process_req = calloc(1, sizeof(*process_req));
	if (process_req == NULL) {
		return NULL;
	}

	process_req->process = process;
	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
	if (process_req->iov.iov_base == NULL) {
		free(process_req);
		return NULL;
	}
	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
		if (process_req->md_buf == NULL) {
			raid_bdev_process_request_free(process_req);
			return NULL;
		}
	}

	return process_req;
}
2917 
2918 static void
2919 raid_bdev_process_free(struct raid_bdev_process *process)
2920 {
2921 	struct raid_bdev_process_request *process_req;
2922 
2923 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
2924 		TAILQ_REMOVE(&process->requests, process_req, link);
2925 		raid_bdev_process_request_free(process_req);
2926 	}
2927 
2928 	free(process);
2929 }
2930 
/*
 * Allocate a process context for the given raid bdev, including a pool of
 * RAID_BDEV_PROCESS_MAX_QD preallocated requests. The maximum window size is
 * derived from the configured process window size (KiB) rounded up to whole
 * data blocks, and is at least one write unit. Returns NULL on allocation
 * failure.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			/* raid_bdev_process_free() releases the requests allocated so far */
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
2965 
2966 static int
2967 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
2968 {
2969 	struct raid_bdev_process *process;
2970 
2971 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2972 
2973 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
2974 	if (process == NULL) {
2975 		return -ENOMEM;
2976 	}
2977 
2978 	raid_bdev_process_start(process);
2979 
2980 	return 0;
2981 }
2982 
2983 static void
2984 raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
2985 {
2986 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
2987 	int rc;
2988 
2989 	/* TODO: defer if rebuild in progress on another base bdev */
2990 	assert(raid_bdev->process == NULL);
2991 
2992 	base_info->is_configured = true;
2993 
2994 	raid_bdev->num_base_bdevs_discovered++;
2995 	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
2996 	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
2997 	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);
2998 
2999 	/*
3000 	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
3001 	 * of base bdevs we know to be operational members of the array. Usually this is equal
3002 	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
3003 	 * degraded.
3004 	 */
3005 	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
3006 		rc = raid_bdev_configure(raid_bdev);
3007 		if (rc != 0) {
3008 			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
3009 		}
3010 	} else if (raid_bdev->num_base_bdevs_discovered > raid_bdev->num_base_bdevs_operational) {
3011 		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
3012 		raid_bdev->num_base_bdevs_operational++;
3013 		rc = raid_bdev_start_rebuild(base_info);
3014 		if (rc != 0) {
3015 			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
3016 			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
3017 		}
3018 	} else {
3019 		rc = 0;
3020 	}
3021 
3022 	if (base_info->configure_cb != NULL) {
3023 		base_info->configure_cb(base_info->configure_cb_ctx, rc);
3024 	}
3025 }
3026 
3027 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3028 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3029 
/*
 * Completion callback for the superblock read issued when adding a brand new
 * base bdev. Decides how to continue configuration based on what was found on
 * disk: no superblock -> continue normally; our own superblock -> re-add the
 * bdev through the examine path; foreign superblock or read error -> fail.
 */
static void
raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
		void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	switch (status) {
	case 0:
		/* valid superblock found */
		if (spdk_uuid_compare(&base_info->raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(base_info->desc);

			/* The bdev already carries this raid bdev's superblock - release the
			 * resources acquired so far and re-run the full examine path so it is
			 * re-added from the superblock contents.
			 * NOTE(review): configure_cb/configure_cb_ctx are read after
			 * raid_bdev_free_base_bdev_resource() - presumably that function
			 * leaves them intact; verify. */
			raid_bdev_free_base_bdev_resource(base_info);
			raid_bdev_examine_sb(sb, bdev, base_info->configure_cb, base_info->configure_cb_ctx);
			return;
		}
		/* Superblock of some other array - refuse to add this bdev. */
		SPDK_ERRLOG("Superblock of a different raid bdev found on bdev %s\n", base_info->name);
		status = -EEXIST;
		raid_bdev_free_base_bdev_resource(base_info);
		break;
	case -EINVAL:
		/* no valid superblock */
		raid_bdev_configure_base_bdev_cont(base_info);
		return;
	default:
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    base_info->name, spdk_strerror(-status));
		break;
	}

	/* Report the failure to whoever initiated the configuration. */
	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, status);
	}
}
3064 
/*
 * Open, claim and validate a base bdev, then continue raid bdev configuration.
 *
 * base_info - slot describing the base bdev; its name and/or uuid must identify it
 * existing  - true when re-adding a bdev already known to be a member (skips the
 *             on-disk superblock check), false for a brand new bdev
 * cb_fn/cb_ctx - completion callback stored on base_info, invoked when
 *             configuration finishes (possibly asynchronously)
 *
 * Returns 0 on success or a negative errno; on failure taken through the 'out'
 * label the acquired bdev resources are released.
 */
static int
raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
			      raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	struct spdk_bdev_desc *desc;
	struct spdk_bdev *bdev;
	const struct spdk_uuid *bdev_uuid;
	int rc;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());
	assert(base_info->desc == NULL);

	/*
	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
	 * before claiming the bdev.
	 */

	if (!spdk_uuid_is_null(&base_info->uuid)) {
		char uuid_str[SPDK_UUID_STRING_LEN];
		const char *bdev_name;

		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);

		/* UUID of a bdev is registered as its alias */
		bdev = spdk_bdev_get_by_name(uuid_str);
		if (bdev == NULL) {
			return -ENODEV;
		}

		bdev_name = spdk_bdev_get_name(bdev);

		if (base_info->name == NULL) {
			/* Only a pre-existing member (e.g. restored from a superblock)
			 * may have a uuid without a name - adopt the bdev's name. */
			assert(existing == true);
			base_info->name = strdup(bdev_name);
			if (base_info->name == NULL) {
				return -ENOMEM;
			}
		} else if (strcmp(base_info->name, bdev_name) != 0) {
			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
				    bdev_name, base_info->name);
			return -EINVAL;
		}
	}

	assert(base_info->name != NULL);

	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
	if (rc != 0) {
		/* -ENODEV just means the bdev is not registered yet - not an error
		 * worth logging; the caller may retry when it appears. */
		if (rc != -ENODEV) {
			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
		}
		return rc;
	}

	bdev = spdk_bdev_desc_get_bdev(desc);
	bdev_uuid = spdk_bdev_get_uuid(bdev);

	/* Fill in the uuid if we only had a name; otherwise both must agree. */
	if (spdk_uuid_is_null(&base_info->uuid)) {
		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
		spdk_bdev_close(desc);
		return -EINVAL;
	}

	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
	if (rc != 0) {
		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
		spdk_bdev_close(desc);
		return rc;
	}

	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);

	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
	if (base_info->app_thread_ch == NULL) {
		SPDK_ERRLOG("Failed to get io channel\n");
		spdk_bdev_module_release_bdev(bdev);
		spdk_bdev_close(desc);
		return -ENOMEM;
	}

	base_info->desc = desc;
	base_info->blockcnt = bdev->blockcnt;

	/* With superblocks enabled, member data starts at an offset that leaves
	 * room for the superblock and is aligned to the optimal IO boundary. */
	if (raid_bdev->superblock_enabled) {
		uint64_t data_offset;

		if (base_info->data_offset == 0) {
			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
		} else {
			data_offset = base_info->data_offset;
		}

		if (bdev->optimal_io_boundary != 0) {
			data_offset = spdk_divide_round_up(data_offset,
							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
			/* An explicitly requested offset wins over the aligned one.
			 * NOTE(review): %lu assumes LP64 for uint64_t; PRIu64 would be portable. */
			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
					     base_info->data_offset, base_info->name, data_offset);
				data_offset = base_info->data_offset;
			}
		}

		base_info->data_offset = data_offset;
	}

	if (base_info->data_offset >= bdev->blockcnt) {
		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
			    base_info->data_offset, bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	/* Size 0 means "use everything past the data offset". */
	if (base_info->data_size == 0) {
		base_info->data_size = bdev->blockcnt - base_info->data_offset;
	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
			    bdev->blockcnt, base_info->name);
		rc = -EINVAL;
		goto out;
	}

	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
			    bdev->name);
		rc = -EINVAL;
		goto out;
	}

	/*
	 * Set the raid bdev properties if this is the first base bdev configured,
	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
	 * have the same blocklen and metadata format.
	 */
	if (raid_bdev->bdev.blocklen == 0) {
		raid_bdev->bdev.blocklen = bdev->blocklen;
		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
	} else {
		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
			rc = -EINVAL;
			goto out;
		}

		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev)) {
			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
				    raid_bdev->bdev.name, bdev->name);
			rc = -EINVAL;
			goto out;
		}
	}

	/* Stash the completion callback; it is invoked by the continuation
	 * (synchronously below, or from the superblock-read callback). */
	base_info->configure_cb = cb_fn;
	base_info->configure_cb_ctx = cb_ctx;

	if (existing) {
		raid_bdev_configure_base_bdev_cont(base_info);
	} else {
		/* check for existing superblock when using a new bdev */
		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
		if (rc) {
			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
				    bdev->name, spdk_strerror(-rc));
		}
	}
out:
	if (rc != 0) {
		raid_bdev_free_base_bdev_resource(base_info);
	}
	return rc;
}
3249 
3250 int
3251 raid_bdev_add_base_bdev(struct raid_bdev *raid_bdev, const char *name,
3252 			raid_base_bdev_cb cb_fn, void *cb_ctx)
3253 {
3254 	struct raid_base_bdev_info *base_info = NULL, *iter;
3255 	int rc;
3256 
3257 	assert(name != NULL);
3258 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3259 
3260 	if (raid_bdev->process != NULL) {
3261 		SPDK_ERRLOG("raid bdev '%s' is in process\n",
3262 			    raid_bdev->bdev.name);
3263 		return -EPERM;
3264 	}
3265 
3266 	if (raid_bdev->state == RAID_BDEV_STATE_CONFIGURING) {
3267 		struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);
3268 
3269 		if (bdev != NULL) {
3270 			RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3271 				if (iter->name == NULL &&
3272 				    spdk_uuid_compare(&bdev->uuid, &iter->uuid) == 0) {
3273 					base_info = iter;
3274 					break;
3275 				}
3276 			}
3277 		}
3278 	}
3279 
3280 	if (base_info == NULL || raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
3281 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3282 			if (iter->name == NULL && spdk_uuid_is_null(&iter->uuid)) {
3283 				base_info = iter;
3284 				break;
3285 			}
3286 		}
3287 	}
3288 
3289 	if (base_info == NULL) {
3290 		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
3291 			    raid_bdev->bdev.name, name);
3292 		return -EINVAL;
3293 	}
3294 
3295 	assert(base_info->is_configured == false);
3296 
3297 	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
3298 		assert(base_info->data_size != 0);
3299 		assert(base_info->desc == NULL);
3300 	}
3301 
3302 	base_info->name = strdup(name);
3303 	if (base_info->name == NULL) {
3304 		return -ENOMEM;
3305 	}
3306 
3307 	rc = raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
3308 	if (rc != 0 && (rc != -ENODEV || raid_bdev->state != RAID_BDEV_STATE_CONFIGURING)) {
3309 		SPDK_ERRLOG("base bdev '%s' configure failed: %s\n", name, spdk_strerror(-rc));
3310 		free(base_info->name);
3311 		base_info->name = NULL;
3312 	}
3313 
3314 	return rc;
3315 }
3316 
3317 static int
3318 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3319 {
3320 	struct raid_bdev *raid_bdev;
3321 	uint8_t i;
3322 	int rc;
3323 
3324 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3325 			       sb->level, true, &sb->uuid, &raid_bdev);
3326 	if (rc != 0) {
3327 		return rc;
3328 	}
3329 
3330 	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
3331 	if (rc != 0) {
3332 		raid_bdev_free(raid_bdev);
3333 		return rc;
3334 	}
3335 
3336 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3337 	memcpy(raid_bdev->sb, sb, sb->length);
3338 
3339 	for (i = 0; i < sb->base_bdevs_size; i++) {
3340 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3341 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3342 
3343 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3344 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3345 			raid_bdev->num_base_bdevs_operational++;
3346 		}
3347 
3348 		base_info->data_offset = sb_base_bdev->data_offset;
3349 		base_info->data_size = sb_base_bdev->data_size;
3350 	}
3351 
3352 	*raid_bdev_out = raid_bdev;
3353 	return 0;
3354 }
3355 
3356 static void
3357 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3358 {
3359 	struct raid_bdev *raid_bdev;
3360 	struct raid_base_bdev_info *base_info;
3361 
3362 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3363 		if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING || raid_bdev->sb != NULL) {
3364 			continue;
3365 		}
3366 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3367 			if (base_info->desc == NULL &&
3368 			    ((base_info->name != NULL && strcmp(bdev->name, base_info->name) == 0) ||
3369 			     spdk_uuid_compare(&base_info->uuid, &bdev->uuid) == 0)) {
3370 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3371 				break;
3372 			}
3373 		}
3374 	}
3375 }
3376 
/* Context for iteratively examining the remaining base bdevs of a raid bdev
 * that was created from a superblock found during examine. */
struct raid_bdev_examine_others_ctx {
	/* uuid of the raid bdev whose members are being examined */
	struct spdk_uuid raid_bdev_uuid;
	/* slot index at which the scan resumes on the next iteration */
	uint8_t current_base_bdev_idx;
	/* original completion callback, invoked when the iteration ends */
	raid_base_bdev_cb cb_fn;
	void *cb_ctx;
};
3383 
3384 static void
3385 raid_bdev_examine_others_done(void *_ctx, int status)
3386 {
3387 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3388 
3389 	if (ctx->cb_fn != NULL) {
3390 		ctx->cb_fn(ctx->cb_ctx, status);
3391 	}
3392 	free(ctx);
3393 }
3394 
3395 typedef void (*raid_bdev_examine_load_sb_cb)(struct spdk_bdev *bdev,
3396 		const struct raid_bdev_superblock *sb, int status, void *ctx);
3397 static int raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb,
3398 				     void *cb_ctx);
3399 static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3400 				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3401 static void raid_bdev_examine_others(void *_ctx, int status);
3402 
3403 static void
3404 raid_bdev_examine_others_load_cb(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb,
3405 				 int status, void *_ctx)
3406 {
3407 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3408 
3409 	if (status != 0) {
3410 		raid_bdev_examine_others_done(ctx, status);
3411 		return;
3412 	}
3413 
3414 	raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_others, ctx);
3415 }
3416 
3417 static void
3418 raid_bdev_examine_others(void *_ctx, int status)
3419 {
3420 	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3421 	struct raid_bdev *raid_bdev;
3422 	struct raid_base_bdev_info *base_info;
3423 	char uuid_str[SPDK_UUID_STRING_LEN];
3424 
3425 	if (status != 0) {
3426 		goto out;
3427 	}
3428 
3429 	raid_bdev = raid_bdev_find_by_uuid(&ctx->raid_bdev_uuid);
3430 	if (raid_bdev == NULL) {
3431 		status = -ENODEV;
3432 		goto out;
3433 	}
3434 
3435 	for (base_info = &raid_bdev->base_bdev_info[ctx->current_base_bdev_idx];
3436 	     base_info < &raid_bdev->base_bdev_info[raid_bdev->num_base_bdevs];
3437 	     base_info++) {
3438 		if (base_info->is_configured || spdk_uuid_is_null(&base_info->uuid)) {
3439 			continue;
3440 		}
3441 
3442 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3443 
3444 		if (spdk_bdev_get_by_name(uuid_str) == NULL) {
3445 			continue;
3446 		}
3447 
3448 		ctx->current_base_bdev_idx = raid_bdev_base_bdev_slot(base_info);
3449 
3450 		status = raid_bdev_examine_load_sb(uuid_str, raid_bdev_examine_others_load_cb, ctx);
3451 		if (status != 0) {
3452 			continue;
3453 		}
3454 		return;
3455 	}
3456 out:
3457 	raid_bdev_examine_others_done(ctx, status);
3458 }
3459 
/*
 * Examine a bdev that carries a valid raid superblock. Depending on the
 * superblock's uuid and sequence number this may: recreate an existing raid
 * bdev from a newer superblock, create a new raid bdev, re-add a missing
 * member to an online array, or configure the bdev as a member of a raid bdev
 * still being assembled. cb_fn (if non-NULL) is invoked with the result,
 * either here on failure or by the configuration continuation.
 */
static void
raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
		     raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *iter, *base_info;
	uint8_t i;
	int rc;

	/* The superblock's block size must match the bdev's data block size. */
	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
		rc = -EINVAL;
		goto out;
	}

	if (spdk_uuid_is_null(&sb->uuid)) {
		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
		rc = -EINVAL;
		goto out;
	}

	raid_bdev = raid_bdev_find_by_uuid(&sb->uuid);

	if (raid_bdev) {
		/* The array already exists - resolve superblock generation conflicts
		 * using the monotonically increasing sequence number. */
		if (sb->seq_number > raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);

			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
					     raid_bdev->bdev.name, bdev->name);
				rc = -EBUSY;
				goto out;
			}

			/* remove and then recreate the raid bdev using the newer superblock */
			raid_bdev_delete(raid_bdev, NULL, NULL);
			raid_bdev = NULL;
		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
			/* use the current raid bdev superblock */
			sb = raid_bdev->sb;
		}
	}

	/* Find this bdev's entry in the superblock by uuid. */
	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);

		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			break;
		}
	}

	if (i == sb->base_bdevs_size) {
		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
		rc = -EINVAL;
		goto out;
	}

	if (!raid_bdev) {
		struct raid_bdev_examine_others_ctx *ctx;

		ctx = calloc(1, sizeof(*ctx));
		if (ctx == NULL) {
			rc = -ENOMEM;
			goto out;
		}

		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
				    sb->name, spdk_strerror(-rc));
			free(ctx);
			goto out;
		}

		/* after this base bdev is configured, examine other base bdevs that may be present */
		spdk_uuid_copy(&ctx->raid_bdev_uuid, &sb->uuid);
		ctx->cb_fn = cb_fn;
		ctx->cb_ctx = cb_ctx;

		cb_fn = raid_bdev_examine_others;
		cb_ctx = ctx;
	}

	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
		/* A missing/failed member of an online array is coming back -
		 * restore its uuid and re-configure (which triggers a rebuild).
		 * NOTE(review): sb_base_bdev->slot comes from on-disk data and is
		 * only range-checked by an assert - disabled in release builds;
		 * consider validating explicitly. */
		assert(sb_base_bdev->slot < raid_bdev->num_base_bdevs);
		base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
		assert(base_info->is_configured == false);
		assert(sb_base_bdev->state == RAID_SB_BASE_BDEV_MISSING ||
		       sb_base_bdev->state == RAID_SB_BASE_BDEV_FAILED);
		assert(spdk_uuid_is_null(&base_info->uuid));
		spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
		SPDK_NOTICELOG("Re-adding bdev %s to raid bdev %s.\n", bdev->name, raid_bdev->bdev.name);
		rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
				    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
		}
		goto out;
	}

	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
			       bdev->name, raid_bdev->bdev.name);
		rc = -EINVAL;
		goto out;
	}

	/* Array is still assembling - locate the matching member slot by uuid. */
	base_info = NULL;
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			base_info = iter;
			break;
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
			    bdev->name, raid_bdev->bdev.name);
		rc = -EINVAL;
		goto out;
	}

	rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
	}
out:
	/* On any failure path, report the error directly to the caller. */
	if (rc != 0 && cb_fn != 0) {
		cb_fn(cb_ctx, rc);
	}
}
3601 
/* Per-bdev context for an asynchronous superblock load during examine. */
struct raid_bdev_examine_ctx {
	/* read-only descriptor on the examined bdev */
	struct spdk_bdev_desc *desc;
	/* IO channel used for the superblock read */
	struct spdk_io_channel *ch;
	/* completion callback and its argument */
	raid_bdev_examine_load_sb_cb cb;
	void *cb_ctx;
};
3608 
3609 static void
3610 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3611 {
3612 	if (!ctx) {
3613 		return;
3614 	}
3615 
3616 	if (ctx->ch) {
3617 		spdk_put_io_channel(ctx->ch);
3618 	}
3619 
3620 	if (ctx->desc) {
3621 		spdk_bdev_close(ctx->desc);
3622 	}
3623 
3624 	free(ctx);
3625 }
3626 
3627 static void
3628 raid_bdev_examine_load_sb_done(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3629 {
3630 	struct raid_bdev_examine_ctx *ctx = _ctx;
3631 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3632 
3633 	ctx->cb(bdev, sb, status, ctx->cb_ctx);
3634 
3635 	raid_bdev_examine_ctx_free(ctx);
3636 }
3637 
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	/* Intentionally empty - the examine descriptor is short-lived; it is
	 * closed via raid_bdev_examine_ctx_free() once the superblock read
	 * completes, so no event handling is needed. */
}
3642 
3643 static int
3644 raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb, void *cb_ctx)
3645 {
3646 	struct raid_bdev_examine_ctx *ctx;
3647 	int rc;
3648 
3649 	assert(cb != NULL);
3650 
3651 	ctx = calloc(1, sizeof(*ctx));
3652 	if (!ctx) {
3653 		return -ENOMEM;
3654 	}
3655 
3656 	rc = spdk_bdev_open_ext(bdev_name, false, raid_bdev_examine_event_cb, NULL, &ctx->desc);
3657 	if (rc) {
3658 		SPDK_ERRLOG("Failed to open bdev %s: %s\n", bdev_name, spdk_strerror(-rc));
3659 		goto err;
3660 	}
3661 
3662 	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3663 	if (!ctx->ch) {
3664 		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev_name);
3665 		rc = -ENOMEM;
3666 		goto err;
3667 	}
3668 
3669 	ctx->cb = cb;
3670 	ctx->cb_ctx = cb_ctx;
3671 
3672 	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_done, ctx);
3673 	if (rc) {
3674 		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3675 			    bdev_name, spdk_strerror(-rc));
3676 		goto err;
3677 	}
3678 
3679 	return 0;
3680 err:
3681 	raid_bdev_examine_ctx_free(ctx);
3682 	return rc;
3683 }
3684 
3685 static void
3686 raid_bdev_examine_cont(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb, int status,
3687 		       void *ctx)
3688 {
3689 	switch (status) {
3690 	case 0:
3691 		/* valid superblock found */
3692 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3693 		raid_bdev_examine_sb(sb, bdev, NULL, NULL);
3694 		break;
3695 	case -EINVAL:
3696 		/* no valid superblock, check if it can be claimed anyway */
3697 		raid_bdev_examine_no_sb(bdev);
3698 		break;
3699 	default:
3700 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3701 			    bdev->name, spdk_strerror(-status));
3702 		break;
3703 	}
3704 
3705 	spdk_bdev_module_examine_done(&g_raid_if);
3706 }
3707 
3708 /*
3709  * brief:
3710  * raid_bdev_examine function is the examine function call by the below layers
3711  * like bdev_nvme layer. This function will check if this base bdev can be
3712  * claimed by this raid bdev or not.
3713  * params:
3714  * bdev - pointer to base bdev
3715  * returns:
3716  * none
3717  */
3718 static void
3719 raid_bdev_examine(struct spdk_bdev *bdev)
3720 {
3721 	int rc;
3722 
3723 	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
3724 		goto done;
3725 	}
3726 
3727 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3728 		raid_bdev_examine_no_sb(bdev);
3729 		goto done;
3730 	}
3731 
3732 	rc = raid_bdev_examine_load_sb(bdev->name, raid_bdev_examine_cont, NULL);
3733 	if (rc != 0) {
3734 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3735 			    bdev->name, spdk_strerror(-rc));
3736 		goto done;
3737 	}
3738 
3739 	return;
3740 done:
3741 	spdk_bdev_module_examine_done(&g_raid_if);
3742 }
3743 
3744 /* Log component for bdev raid bdev module */
3745 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3746