xref: /spdk/module/bdev/raid/bdev_raid.c (revision 9c9f7ddbbe5483ec0b43cb9e4b82cabcec1f320a)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
16 #define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
17 #define RAID_BDEV_PROCESS_MAX_QD	16
18 
19 #define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT 1024
20 
21 static bool g_shutdown_started = false;
22 
23 /* List of all raid bdevs */
24 struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);
25 
26 static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
27 
/*
 * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
 * contains the relationship of raid bdev io channel with base bdev io channels.
 */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* Current process window offset seen by this channel;
		 * RAID_OFFSET_BLOCKS_INVALID when no process is active */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Child channel used for I/O to the already-processed range,
		 * with the target's channel substituted into its slot */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
46 
/* Lifecycle states of a raid bdev background process (e.g. rebuild) */
enum raid_bdev_process_state {
	RAID_PROCESS_STATE_INIT,
	RAID_PROCESS_STATE_RUNNING,
	RAID_PROCESS_STATE_STOPPING,
	RAID_PROCESS_STATE_STOPPED,
};
53 
/* State of a background process operating on a raid bdev */
struct raid_bdev_process {
	/* The raid bdev the process operates on */
	struct raid_bdev		*raid_bdev;
	enum raid_process_type		type;
	enum raid_bdev_process_state	state;
	/* Thread on which the process runs */
	struct spdk_thread		*thread;
	/* IO channel used by the process for raid I/O */
	struct raid_bdev_io_channel	*raid_ch;
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	/* Upper bound for window_size, in blocks */
	uint64_t			max_window_size;
	/* Size of the currently processed window, in blocks */
	uint64_t			window_size;
	/* Remaining unfinished work in the current window */
	uint64_t			window_remaining;
	/* Status of the current window; non-zero aborts further processing */
	int				window_status;
	/* Offset of the current window from the start of the raid bdev, in blocks */
	uint64_t			window_offset;
	/* True while the window's LBA range is locked (quiesced) */
	bool				window_range_locked;
	/* Base bdev being rebuilt/written by the process */
	struct raid_base_bdev_info	*target;
	/* Overall process completion status */
	int				status;
	/* Callbacks to invoke when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
};
71 
/* A callback registered to run when a background process finishes */
struct raid_process_finish_action {
	spdk_msg_fn cb;
	void *cb_ctx;
	TAILQ_ENTRY(raid_process_finish_action) link;
};
77 
/* Module-level options; read/written via raid_bdev_get_opts()/raid_bdev_set_opts() */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
};
81 
/* Copy the current module-level raid bdev options into *opts */
void
raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
{
	*opts = g_opts;
}
87 
88 int
89 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
90 {
91 	if (opts->process_window_size_kb == 0) {
92 		return -EINVAL;
93 	}
94 
95 	g_opts = *opts;
96 
97 	return 0;
98 }
99 
100 static struct raid_bdev_module *
101 raid_bdev_module_find(enum raid_level level)
102 {
103 	struct raid_bdev_module *raid_module;
104 
105 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
106 		if (raid_module->level == level) {
107 			return raid_module;
108 		}
109 	}
110 
111 	return NULL;
112 }
113 
114 void
115 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
116 {
117 	if (raid_bdev_module_find(raid_module->level) != NULL) {
118 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
119 			    raid_bdev_level_to_str(raid_module->level));
120 		assert(false);
121 	} else {
122 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
123 	}
124 }
125 
/* Return the base bdev IO channel at the given slot; may be NULL if the base
 * bdev is missing or is the current process target */
struct spdk_io_channel *
raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
{
	return raid_ch->base_channel[idx];
}
131 
/* Return the raid module's private per-channel context. Only valid for modules
 * that provide get_io_channel() */
void *
raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
{
	assert(raid_ch->module_channel != NULL);

	return spdk_io_channel_get_ctx(raid_ch->module_channel);
}
139 
140 /* Function declarations */
141 static void	raid_bdev_examine(struct spdk_bdev *bdev);
142 static int	raid_bdev_init(void);
143 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
144 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
145 
146 static void
147 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
148 {
149 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
150 
151 	if (raid_ch->process.target_ch != NULL) {
152 		spdk_put_io_channel(raid_ch->process.target_ch);
153 		raid_ch->process.target_ch = NULL;
154 	}
155 
156 	if (raid_ch->process.ch_processed != NULL) {
157 		free(raid_ch->process.ch_processed->base_channel);
158 		free(raid_ch->process.ch_processed);
159 		raid_ch->process.ch_processed = NULL;
160 	}
161 }
162 
/*
 * Set up the background process state of a raid bdev io channel: take the
 * process target's IO channel and build a child "processed" channel in which
 * the target's channel is substituted into the target's slot.
 * params:
 * raid_ch - raid bdev io channel to set up
 * process - the raid bdev's active background process
 * returns:
 * 0 - success
 * -ENOMEM - channel or memory allocation failed
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	/* Store immediately so the error path can free the partial allocation */
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	/* The child channel mirrors the parent's base channels, except that I/O
	 * to the process target goes through the target's own channel */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The child channel itself has no process window */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	/* Releases the target channel and frees the partially built child channel */
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
212 
/*
 * brief:
 * raid_bdev_create_cb function is a cb function for raid bdev which creates the
 * hierarchy from raid bdev to base bdev io channels. It will be called per core
 * params:
 * io_device - pointer to raid bdev io device represented by raid_bdev
 * ctx_buf - pointer to context buffer for raid bdev io channel
 * returns:
 * 0 - success
 * non zero - failure
 */
static int
raid_bdev_create_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	/* Default error code; also used if the module channel cannot be created */
	int ret = -ENOMEM;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);

	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
		return -ENOMEM;
	}

	/* Hold the lock while reading base bdev descriptors and process state */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/*
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].desc == NULL ||
		    (raid_bdev->process != NULL && raid_bdev->process->target == &raid_bdev->base_bdev_info[i])) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			goto err;
		}
	}

	if (raid_bdev->process != NULL) {
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	/* The module channel is created outside the lock */
	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			goto err_unlocked;
		}
	}

	return 0;
err:
	/* Jumped to while holding the lock; release it before common cleanup */
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
err_unlocked:
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}
298 
299 /*
300  * brief:
301  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
302  * hierarchy from raid bdev to base bdev io channels. It will be called per core
303  * params:
304  * io_device - pointer to raid bdev io device represented by raid_bdev
305  * ctx_buf - pointer to context buffer for raid bdev io channel
306  * returns:
307  * none
308  */
309 static void
310 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
311 {
312 	struct raid_bdev *raid_bdev = io_device;
313 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
314 	uint8_t i;
315 
316 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
317 
318 	assert(raid_ch != NULL);
319 	assert(raid_ch->base_channel);
320 
321 	if (raid_ch->module_channel) {
322 		spdk_put_io_channel(raid_ch->module_channel);
323 	}
324 
325 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
326 		/* Free base bdev channels */
327 		if (raid_ch->base_channel[i] != NULL) {
328 			spdk_put_io_channel(raid_ch->base_channel[i]);
329 		}
330 	}
331 	free(raid_ch->base_channel);
332 	raid_ch->base_channel = NULL;
333 
334 	raid_bdev_ch_process_cleanup(raid_ch);
335 }
336 
337 /*
338  * brief:
339  * raid_bdev_cleanup is used to cleanup raid_bdev related data
340  * structures.
341  * params:
342  * raid_bdev - pointer to raid_bdev
343  * returns:
344  * none
345  */
346 static void
347 raid_bdev_cleanup(struct raid_bdev *raid_bdev)
348 {
349 	struct raid_base_bdev_info *base_info;
350 
351 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
352 		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
353 	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
354 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
355 
356 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
357 		assert(base_info->desc == NULL);
358 		free(base_info->name);
359 	}
360 
361 	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
362 }
363 
/* Free the raid_bdev structure and everything it owns (superblock, spinlock,
 * base bdev info array, bdev name) */
static void
raid_bdev_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_free_superblock(raid_bdev);
	spdk_spin_destroy(&raid_bdev->base_bdev_lock);
	free(raid_bdev->base_bdev_info);
	free(raid_bdev->bdev.name);
	free(raid_bdev);
}
373 
/* Detach the raid bdev from global state, then free it */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
380 
/*
 * brief:
 * free resource of base bdev for raid bdev
 * params:
 * base_info - raid base bdev info
 * returns:
 * none
 */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	free(base_info->name);
	base_info->name = NULL;
	/* Keep the uuid while still CONFIGURING so the base bdev can be re-added */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}

	/* Nothing else to release if the base bdev was never opened */
	if (base_info->desc == NULL) {
		return;
	}

	/* Release claim and descriptor before dropping the app-thread channel */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		assert(raid_bdev->num_base_bdevs_discovered);
		raid_bdev->num_base_bdevs_discovered--;
		base_info->is_configured = false;
	}
}
418 
419 static void
420 raid_bdev_io_device_unregister_cb(void *io_device)
421 {
422 	struct raid_bdev *raid_bdev = io_device;
423 
424 	if (raid_bdev->num_base_bdevs_discovered == 0) {
425 		/* Free raid_bdev when there are no base bdevs left */
426 		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
427 		raid_bdev_cleanup(raid_bdev);
428 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
429 		raid_bdev_free(raid_bdev);
430 	} else {
431 		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
432 	}
433 }
434 
/* Called by raid modules when their stop sequence finishes; unregisters the
 * io device unless the bdev is still only CONFIGURING (never registered) */
void
raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
{
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
	}
}
442 
/*
 * App-thread part of raid bdev destruction: closes base bdev resources that
 * are scheduled for removal (or all of them on shutdown), then runs the raid
 * module's stop hook. Completion is signaled via raid_bdev_module_stop_done(),
 * either here or later by the module if stop() returns false.
 */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	/* A background process must have been stopped before destruct */
	assert(raid_bdev->process == NULL);

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	if (raid_bdev->module->stop != NULL) {
		/* false means the module will call raid_bdev_module_stop_done() itself */
		if (raid_bdev->module->stop(raid_bdev) == false) {
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
475 
/* bdev destruct entry point; defers the work to the app thread. Returning 1
 * tells the bdev layer that destruction completes asynchronously via
 * spdk_bdev_destruct_done() */
static int
raid_bdev_destruct(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);

	return 1;
}
483 
484 static int
485 raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
486 			   struct spdk_bdev *bdev, uint32_t remapped_offset)
487 {
488 	struct spdk_dif_ctx dif_ctx;
489 	struct spdk_dif_error err_blk = {};
490 	int rc;
491 	struct spdk_dif_ctx_init_ext_opts dif_opts;
492 	struct iovec md_iov = {
493 		.iov_base	= md_buf,
494 		.iov_len	= num_blocks * bdev->md_len,
495 	};
496 
497 	if (md_buf == NULL) {
498 		return 0;
499 	}
500 
501 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
502 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
503 	rc = spdk_dif_ctx_init(&dif_ctx,
504 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
505 			       bdev->dif_is_head_of_md, bdev->dif_type,
506 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
507 			       0, 0, 0, 0, 0, &dif_opts);
508 	if (rc != 0) {
509 		SPDK_ERRLOG("Initialization of DIF context failed\n");
510 		return rc;
511 	}
512 
513 	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
514 
515 	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
516 	if (rc != 0) {
517 		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%d"
518 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
519 	}
520 
521 	return rc;
522 }
523 
524 int
525 raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
526 			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
527 {
528 	struct spdk_dif_ctx dif_ctx;
529 	struct spdk_dif_error err_blk = {};
530 	int rc;
531 	struct spdk_dif_ctx_init_ext_opts dif_opts;
532 	struct iovec md_iov = {
533 		.iov_base	= md_buf,
534 		.iov_len	= num_blocks * bdev->md_len,
535 	};
536 
537 	if (md_buf == NULL) {
538 		return 0;
539 	}
540 
541 	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
542 	dif_opts.dif_pi_format = SPDK_DIF_PI_FORMAT_16;
543 	rc = spdk_dif_ctx_init(&dif_ctx,
544 			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
545 			       bdev->dif_is_head_of_md, bdev->dif_type,
546 			       SPDK_DIF_FLAGS_REFTAG_CHECK,
547 			       offset_blocks, 0, 0, 0, 0, &dif_opts);
548 	if (rc != 0) {
549 		SPDK_ERRLOG("Initialization of DIF context failed\n");
550 		return rc;
551 	}
552 
553 	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
554 	if (rc != 0) {
555 		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%d"
556 			    PRIu32 "\n", err_blk.err_type, err_blk.err_offset);
557 	}
558 
559 	return rc;
560 }
561 
562 /**
563  * Raid bdev I/O read/write wrapper for spdk_bdev_readv_blocks_ext function.
564  */
565 int
566 raid_bdev_readv_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
567 			   struct iovec *iov, int iovcnt, uint64_t offset_blocks,
568 			   uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
569 			   struct spdk_bdev_ext_io_opts *opts)
570 {
571 	return spdk_bdev_readv_blocks_ext(base_info->desc, ch, iov, iovcnt,
572 					  base_info->data_offset + offset_blocks, num_blocks, cb, cb_arg, opts);
573 }
574 
/**
 * Raid bdev I/O read/write wrapper for spdk_bdev_writev_blocks_ext function.
 * Translates the raid-relative offset by the base bdev's data_offset and, when
 * DIF reference tag checking is enabled, remaps the reference tags in the
 * metadata buffer to match the translated offset before submitting.
 */
int
raid_bdev_writev_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
			    struct iovec *iov, int iovcnt, uint64_t offset_blocks,
			    uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
			    struct spdk_bdev_ext_io_opts *opts)
{
	int rc;
	uint64_t remapped_offset_blocks = base_info->data_offset + offset_blocks;

	if (spdk_unlikely(spdk_bdev_get_dif_type(&base_info->raid_bdev->bdev) != SPDK_DIF_DISABLE &&
			  base_info->raid_bdev->bdev.dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK)) {

		rc = raid_bdev_remap_dix_reftag(opts->metadata, num_blocks, &base_info->raid_bdev->bdev,
						remapped_offset_blocks);
		if (rc != 0) {
			return rc;
		}
	}

	return spdk_bdev_writev_blocks_ext(base_info->desc, ch, iov, iovcnt,
					   remapped_offset_blocks, num_blocks, cb, cb_arg, opts);
}
600 
/*
 * Complete a raid I/O. For a split I/O (see raid_bdev_io_split()), completion
 * of the first part triggers submission of the second part; once both parts
 * are done the original I/O parameters are restored before completing the
 * parent bdev_io. For reads with DIF reference tag checking, the reference
 * tags are remapped back to the parent's offset before completion.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
	int rc;

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the parent's offset and md_buf for the second part */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				raid_io->num_blocks = raid_io->split.offset;
				/* Number of iovs fully consumed by the first part
				 * (pointer difference into the parent's iov array) */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The boundary iov was partially consumed: include it,
					 * trimmed to the part handled so far */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The second part covers the processed range */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Both parts done (or the first failed): restore parent parameters */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {

			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
							bdev_io->u.bdev.offset_blocks);
			if (rc != 0) {
				status = SPDK_BDEV_IO_STATUS_FAILED;
			}
		}
		spdk_bdev_io_complete(bdev_io, status);
	}
}
664 
665 /*
666  * brief:
667  * raid_bdev_io_complete_part - signal the completion of a part of the expected
668  * base bdev IOs and complete the raid_io if this is the final expected IO.
669  * The caller should first set raid_io->base_bdev_io_remaining. This function
670  * will decrement this counter by the value of the 'completed' parameter and
671  * complete the raid_io if the counter reaches 0. The caller is free to
672  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
673  * it can represent e.g. blocks or IOs.
674  * params:
675  * raid_io - pointer to raid_bdev_io
676  * completed - the part of the raid_io that has been completed
677  * status - status of the base IO
678  * returns:
679  * true - if the raid_io is completed
680  * false - otherwise
681  */
682 bool
683 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
684 			   enum spdk_bdev_io_status status)
685 {
686 	assert(raid_io->base_bdev_io_remaining >= completed);
687 	raid_io->base_bdev_io_remaining -= completed;
688 
689 	if (status != SPDK_BDEV_IO_STATUS_SUCCESS) {
690 		raid_io->base_bdev_io_status = status;
691 	}
692 
693 	if (raid_io->base_bdev_io_remaining == 0) {
694 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
695 		return true;
696 	} else {
697 		return false;
698 	}
699 }
700 
/*
 * brief:
 * raid_bdev_queue_io_wait function processes the IO which failed to submit.
 * It will try to queue the IOs after storing the context to bdev wait queue logic.
 * params:
 * raid_io - pointer to raid_bdev_io
 * bdev - the block device that the IO is submitted to
 * ch - io channel
 * cb_fn - callback when the spdk_bdev_io for bdev becomes available
 * returns:
 * none
 */
void
raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
{
	raid_io->waitq_entry.bdev = bdev;
	raid_io->waitq_entry.cb_fn = cb_fn;
	raid_io->waitq_entry.cb_arg = raid_io;
	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
}
722 
723 static void
724 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
725 {
726 	struct raid_bdev_io *raid_io = cb_arg;
727 
728 	spdk_bdev_free_io(bdev_io);
729 
730 	raid_bdev_io_complete_part(raid_io, 1, success ?
731 				   SPDK_BDEV_IO_STATUS_SUCCESS :
732 				   SPDK_BDEV_IO_STATUS_FAILED);
733 }
734 
735 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
736 
/* Trampoline used to retry a reset submission from the bdev wait queue */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	struct raid_bdev_io *raid_io = _raid_io;

	raid_bdev_submit_reset_request(raid_io);
}
744 
/*
 * brief:
 * raid_bdev_submit_reset_request function submits reset requests
 * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
 * which case it will queue it for later submission
 * params:
 * raid_io
 * returns:
 * none
 */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* Zero means first invocation (not a retry after -ENOMEM) */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* Resume from where a previous -ENOMEM interrupted the submission */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* Missing base bdev: count it as successfully reset */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Retry the remaining submissions when resources free up */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
794 
/*
 * Split a raid I/O at split_offset (blocks, relative to the I/O start): the
 * raid_io is adjusted to cover only the upper part [split_offset, end), and
 * enough state is saved in raid_io->split to later submit the lower part and
 * restore the original parameters (see raid_bdev_io_complete()).
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	/* Advance the I/O to the upper part */
	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Find the iov containing the split point and trim the iov array */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls on an iov boundary; nothing to trim */
				raid_io->split.iov = NULL;
			} else {
				/* Save the boundary iov so it can be restored later */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
832 
/*
 * Submit a read/write raid I/O, routing it relative to the background process
 * window: I/O entirely below the window uses the "processed" child channel;
 * I/O spanning the window boundary is split so the unprocessed part goes first.
 */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
865 
866 /*
867  * brief:
868  * Callback function to spdk_bdev_io_get_buf.
869  * params:
870  * ch - pointer to raid bdev io channel
871  * bdev_io - pointer to parent bdev_io on raid bdev device
872  * success - True if buffer is allocated or false otherwise.
873  * returns:
874  * none
875  */
876 static void
877 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
878 		     bool success)
879 {
880 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
881 
882 	if (!success) {
883 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
884 		return;
885 	}
886 
887 	raid_bdev_submit_rw_request(raid_io);
888 }
889 
890 void
891 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
892 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
893 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
894 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
895 {
896 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
897 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
898 
899 	raid_io->type = type;
900 	raid_io->offset_blocks = offset_blocks;
901 	raid_io->num_blocks = num_blocks;
902 	raid_io->iovs = iovs;
903 	raid_io->iovcnt = iovcnt;
904 	raid_io->memory_domain = memory_domain;
905 	raid_io->memory_domain_ctx = memory_domain_ctx;
906 	raid_io->md_buf = md_buf;
907 
908 	raid_io->raid_bdev = raid_bdev;
909 	raid_io->raid_ch = raid_ch;
910 	raid_io->base_bdev_io_remaining = 0;
911 	raid_io->base_bdev_io_submitted = 0;
912 	raid_io->base_bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
913 	raid_io->completion_cb = NULL;
914 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
915 }
916 
/*
 * brief:
 * raid_bdev_submit_request function is the submit_request function pointer of
 * raid bdev function table. This is used to submit the io on raid_bdev to below
 * layers.
 * params:
 * ch - pointer to raid bdev io channel
 * bdev_io - pointer to parent bdev_io on raid bdev device
 * returns:
 * none
 */
static void
raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;

	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Reads may need a data buffer allocated before submission */
		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		raid_bdev_submit_rw_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		raid_bdev_submit_reset_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Null-payload requests are not yet supported while a background
		 * process is running */
		if (raid_io->raid_bdev->process != NULL) {
			/* TODO: rebuild support */
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
		break;

	default:
		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
967 
968 /*
969  * brief:
970  * _raid_bdev_io_type_supported checks whether io_type is supported in
971  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
972  * doesn't support, the raid device doesn't supports.
973  *
974  * params:
975  * raid_bdev - pointer to raid bdev context
976  * io_type - io type
977  * returns:
978  * true - io_type is supported
979  * false - io_type is not supported
980  */
981 inline static bool
982 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
983 {
984 	struct raid_base_bdev_info *base_info;
985 
986 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
987 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
988 		if (raid_bdev->module->submit_null_payload_request == NULL) {
989 			return false;
990 		}
991 	}
992 
993 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
994 		if (base_info->desc == NULL) {
995 			continue;
996 		}
997 
998 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
999 			return false;
1000 		}
1001 	}
1002 
1003 	return true;
1004 }
1005 
1006 /*
1007  * brief:
1008  * raid_bdev_io_type_supported is the io_supported function for bdev function
1009  * table which returns whether the particular io type is supported or not by
1010  * raid bdev module
1011  * params:
1012  * ctx - pointer to raid bdev context
1013  * type - io type
1014  * returns:
1015  * true - io_type is supported
1016  * false - io_type is not supported
1017  */
1018 static bool
1019 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1020 {
1021 	switch (io_type) {
1022 	case SPDK_BDEV_IO_TYPE_READ:
1023 	case SPDK_BDEV_IO_TYPE_WRITE:
1024 		return true;
1025 
1026 	case SPDK_BDEV_IO_TYPE_FLUSH:
1027 	case SPDK_BDEV_IO_TYPE_RESET:
1028 	case SPDK_BDEV_IO_TYPE_UNMAP:
1029 		return _raid_bdev_io_type_supported(ctx, io_type);
1030 
1031 	default:
1032 		return false;
1033 	}
1034 
1035 	return false;
1036 }
1037 
1038 /*
1039  * brief:
1040  * raid_bdev_get_io_channel is the get_io_channel function table pointer for
1041  * raid bdev. This is used to return the io channel for this raid bdev
1042  * params:
1043  * ctxt - pointer to raid_bdev
1044  * returns:
1045  * pointer to io channel for raid bdev
1046  */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	/* The raid_bdev pointer itself is the io_device registered with SPDK,
	 * so it can be handed to spdk_get_io_channel() directly.
	 */
	return spdk_get_io_channel(ctxt);
}
1054 
1055 void
1056 raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
1057 {
1058 	struct raid_base_bdev_info *base_info;
1059 	char uuid_str[SPDK_UUID_STRING_LEN];
1060 
1061 	assert(raid_bdev != NULL);
1062 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1063 
1064 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
1065 	spdk_json_write_named_string(w, "uuid", uuid_str);
1066 	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1067 	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
1068 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1069 	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
1070 	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
1071 	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
1072 	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
1073 				     raid_bdev->num_base_bdevs_operational);
1074 	if (raid_bdev->process) {
1075 		struct raid_bdev_process *process = raid_bdev->process;
1076 		uint64_t offset = process->window_offset;
1077 
1078 		spdk_json_write_named_object_begin(w, "process");
1079 		spdk_json_write_name(w, "type");
1080 		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
1081 		spdk_json_write_named_string(w, "target", process->target->name);
1082 		spdk_json_write_named_object_begin(w, "progress");
1083 		spdk_json_write_named_uint64(w, "blocks", offset);
1084 		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
1085 		spdk_json_write_object_end(w);
1086 		spdk_json_write_object_end(w);
1087 	}
1088 	spdk_json_write_name(w, "base_bdevs_list");
1089 	spdk_json_write_array_begin(w);
1090 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1091 		spdk_json_write_object_begin(w);
1092 		spdk_json_write_name(w, "name");
1093 		if (base_info->name) {
1094 			spdk_json_write_string(w, base_info->name);
1095 		} else {
1096 			spdk_json_write_null(w);
1097 		}
1098 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
1099 		spdk_json_write_named_string(w, "uuid", uuid_str);
1100 		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
1101 		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
1102 		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
1103 		spdk_json_write_object_end(w);
1104 	}
1105 	spdk_json_write_array_end(w);
1106 }
1107 
1108 /*
1109  * brief:
1110  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1111  * params:
1112  * ctx - pointer to raid_bdev
1113  * w - pointer to json context
1114  * returns:
1115  * 0 - success
1116  * non zero - failure
1117  */
1118 static int
1119 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1120 {
1121 	struct raid_bdev *raid_bdev = ctx;
1122 
1123 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1124 
1125 	/* Dump the raid bdev configuration related information */
1126 	spdk_json_write_named_object_begin(w, "raid");
1127 	raid_bdev_write_info_json(raid_bdev, w);
1128 	spdk_json_write_object_end(w);
1129 
1130 	return 0;
1131 }
1132 
1133 /*
1134  * brief:
1135  * raid_bdev_write_config_json is the function table pointer for raid bdev
1136  * params:
1137  * bdev - pointer to spdk_bdev
1138  * w - pointer to json context
1139  * returns:
1140  * none
1141  */
1142 static void
1143 raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1144 {
1145 	struct raid_bdev *raid_bdev = bdev->ctxt;
1146 	struct raid_base_bdev_info *base_info;
1147 	char uuid_str[SPDK_UUID_STRING_LEN];
1148 
1149 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1150 
1151 	if (raid_bdev->superblock_enabled) {
1152 		/* raid bdev configuration is stored in the superblock */
1153 		return;
1154 	}
1155 
1156 	spdk_json_write_object_begin(w);
1157 
1158 	spdk_json_write_named_string(w, "method", "bdev_raid_create");
1159 
1160 	spdk_json_write_named_object_begin(w, "params");
1161 	spdk_json_write_named_string(w, "name", bdev->name);
1162 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
1163 	spdk_json_write_named_string(w, "uuid", uuid_str);
1164 	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1165 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1166 	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
1167 
1168 	spdk_json_write_named_array_begin(w, "base_bdevs");
1169 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1170 		if (base_info->desc) {
1171 			spdk_json_write_string(w, spdk_bdev_desc_get_bdev(base_info->desc)->name);
1172 		}
1173 	}
1174 	spdk_json_write_array_end(w);
1175 	spdk_json_write_object_end(w);
1176 
1177 	spdk_json_write_object_end(w);
1178 }
1179 
/*
 * Collect the memory domains of all configured base bdevs into 'domains'.
 * Returns the total number of domains (which may exceed 'array_size', in
 * which case the array is left unfilled) or a negative errno on failure.
 */
static int
raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_base_bdev_info *base_info;
	int domains_count = 0, rc = 0;

	/* Nothing to report if this raid module does not pass memory domains through. */
	if (raid_bdev->module->memory_domains_supported == false) {
		return 0;
	}

	/* Hold the lock so base bdevs cannot be detached between the two passes. */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);

	/* First loop to get the number of memory domains */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
		if (rc < 0) {
			goto out;
		}
		domains_count += rc;
	}

	/* Caller's array is absent or too small: return only the required count. */
	if (!domains || array_size < domains_count) {
		goto out;
	}

	/* Second pass: fill the array, advancing past each base bdev's entries. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
		if (rc < 0) {
			goto out;
		}
		domains += rc;
		array_size -= rc;
	}
out:
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (rc < 0) {
		return rc;
	}

	return domains_count;
}
1229 
/* g_raid_bdev_fn_table is the function table for raid bdev, registered with
 * the bdev layer for every raid bdev via spdk_bdev_register().
 */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1240 
1241 struct raid_bdev *
1242 raid_bdev_find_by_name(const char *name)
1243 {
1244 	struct raid_bdev *raid_bdev;
1245 
1246 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1247 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1248 			return raid_bdev;
1249 		}
1250 	}
1251 
1252 	return NULL;
1253 }
1254 
/* Mapping of user-visible raid level names (each with a short alias) to the
 * raid_level enum. The table is terminated by a zeroed entry (name == NULL).
 */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};
1268 
/* Names for raid_bdev_state values, indexed by the enum; NULL-terminated. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};

/* Names for raid_process_type values, indexed by the enum; NULL-terminated. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};

/* We have to use the typedef in the function declaration to appease astyle. */
typedef enum raid_level raid_level_t;
typedef enum raid_bdev_state raid_bdev_state_t;
1285 
1286 raid_level_t
1287 raid_bdev_str_to_level(const char *str)
1288 {
1289 	unsigned int i;
1290 
1291 	assert(str != NULL);
1292 
1293 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1294 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1295 			return g_raid_level_names[i].value;
1296 		}
1297 	}
1298 
1299 	return INVALID_RAID_LEVEL;
1300 }
1301 
1302 const char *
1303 raid_bdev_level_to_str(enum raid_level level)
1304 {
1305 	unsigned int i;
1306 
1307 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1308 		if (g_raid_level_names[i].value == level) {
1309 			return g_raid_level_names[i].name;
1310 		}
1311 	}
1312 
1313 	return "";
1314 }
1315 
1316 raid_bdev_state_t
1317 raid_bdev_str_to_state(const char *str)
1318 {
1319 	unsigned int i;
1320 
1321 	assert(str != NULL);
1322 
1323 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1324 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1325 			break;
1326 		}
1327 	}
1328 
1329 	return i;
1330 }
1331 
1332 const char *
1333 raid_bdev_state_to_str(enum raid_bdev_state state)
1334 {
1335 	if (state >= RAID_BDEV_STATE_MAX) {
1336 		return "";
1337 	}
1338 
1339 	return g_raid_state_names[state];
1340 }
1341 
1342 const char *
1343 raid_bdev_process_to_str(enum raid_process_type value)
1344 {
1345 	if (value >= RAID_PROCESS_MAX) {
1346 		return "";
1347 	}
1348 
1349 	return g_raid_process_type_names[value];
1350 }
1351 
1352 /*
1353  * brief:
1354  * raid_bdev_fini_start is called when bdev layer is starting the
1355  * shutdown process
1356  * params:
1357  * none
1358  * returns:
1359  * none
1360  */
static void
raid_bdev_fini_start(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
	/* Flag shutdown so other code paths can skip work that no longer matters. */
	g_shutdown_started = true;
}
1367 
1368 /*
1369  * brief:
1370  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1371  * params:
1372  * none
1373  * returns:
1374  * none
1375  */
1376 static void
1377 raid_bdev_exit(void)
1378 {
1379 	struct raid_bdev *raid_bdev, *tmp;
1380 
1381 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1382 
1383 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1384 		raid_bdev_cleanup_and_free(raid_bdev);
1385 	}
1386 }
1387 
/* Emit a bdev_raid_set_options RPC object that restores the current module
 * options on config replay.
 */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1401 
/* Module-level config_json callback - only the global options are emitted
 * here; per-bdev config is written by raid_bdev_write_config_json().
 */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1409 
1410 /*
1411  * brief:
1412  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1413  * module
1414  * params:
1415  * none
1416  * returns:
1417  * size of spdk_bdev_io context for raid
1418  */
static int
raid_bdev_get_ctx_size(void)
{
	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
	/* Per-IO context carried in every spdk_bdev_io's driver_ctx. */
	return sizeof(struct raid_bdev_io);
}
1425 
/* Bdev module descriptor for the raid module; registered with the bdev layer
 * below. examine_disk lets the module claim base bdevs as they appear.
 */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1438 
1439 /*
1440  * brief:
1441  * raid_bdev_init is the initialization function for raid bdev module
1442  * params:
1443  * none
1444  * returns:
1445  * 0 - success
1446  * non zero - failure
1447  */
static int
raid_bdev_init(void)
{
	/* Nothing to initialize up front - raid bdevs are created on demand. */
	return 0;
}
1453 
1454 static int
1455 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1456 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1457 		  struct raid_bdev **raid_bdev_out)
1458 {
1459 	struct raid_bdev *raid_bdev;
1460 	struct spdk_bdev *raid_bdev_gen;
1461 	struct raid_bdev_module *module;
1462 	struct raid_base_bdev_info *base_info;
1463 	uint8_t min_operational;
1464 
1465 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1466 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1467 		return -EINVAL;
1468 	}
1469 
1470 	if (raid_bdev_find_by_name(name) != NULL) {
1471 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1472 		return -EEXIST;
1473 	}
1474 
1475 	if (level == RAID1) {
1476 		if (strip_size != 0) {
1477 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1478 			return -EINVAL;
1479 		}
1480 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1481 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1482 		return -EINVAL;
1483 	}
1484 
1485 	module = raid_bdev_module_find(level);
1486 	if (module == NULL) {
1487 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1488 		return -EINVAL;
1489 	}
1490 
1491 	assert(module->base_bdevs_min != 0);
1492 	if (num_base_bdevs < module->base_bdevs_min) {
1493 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1494 			    module->base_bdevs_min,
1495 			    raid_bdev_level_to_str(level));
1496 		return -EINVAL;
1497 	}
1498 
1499 	switch (module->base_bdevs_constraint.type) {
1500 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1501 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1502 		break;
1503 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1504 		min_operational = module->base_bdevs_constraint.value;
1505 		break;
1506 	case CONSTRAINT_UNSET:
1507 		if (module->base_bdevs_constraint.value != 0) {
1508 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1509 				    (uint8_t)module->base_bdevs_constraint.value, name);
1510 			return -EINVAL;
1511 		}
1512 		min_operational = num_base_bdevs;
1513 		break;
1514 	default:
1515 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1516 			    (uint8_t)module->base_bdevs_constraint.type,
1517 			    raid_bdev_level_to_str(module->level));
1518 		return -EINVAL;
1519 	};
1520 
1521 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1522 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1523 			    raid_bdev_level_to_str(module->level));
1524 		return -EINVAL;
1525 	}
1526 
1527 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1528 	if (!raid_bdev) {
1529 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1530 		return -ENOMEM;
1531 	}
1532 
1533 	spdk_spin_init(&raid_bdev->base_bdev_lock);
1534 	raid_bdev->module = module;
1535 	raid_bdev->num_base_bdevs = num_base_bdevs;
1536 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1537 					   sizeof(struct raid_base_bdev_info));
1538 	if (!raid_bdev->base_bdev_info) {
1539 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1540 		raid_bdev_free(raid_bdev);
1541 		return -ENOMEM;
1542 	}
1543 
1544 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1545 		base_info->raid_bdev = raid_bdev;
1546 	}
1547 
1548 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1549 	 * internally and set later.
1550 	 */
1551 	raid_bdev->strip_size = 0;
1552 	raid_bdev->strip_size_kb = strip_size;
1553 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1554 	raid_bdev->level = level;
1555 	raid_bdev->min_base_bdevs_operational = min_operational;
1556 	raid_bdev->superblock_enabled = superblock_enabled;
1557 
1558 	raid_bdev_gen = &raid_bdev->bdev;
1559 
1560 	raid_bdev_gen->name = strdup(name);
1561 	if (!raid_bdev_gen->name) {
1562 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1563 		raid_bdev_free(raid_bdev);
1564 		return -ENOMEM;
1565 	}
1566 
1567 	raid_bdev_gen->product_name = "Raid Volume";
1568 	raid_bdev_gen->ctxt = raid_bdev;
1569 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1570 	raid_bdev_gen->module = &g_raid_if;
1571 	raid_bdev_gen->write_cache = 0;
1572 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1573 
1574 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1575 
1576 	*raid_bdev_out = raid_bdev;
1577 
1578 	return 0;
1579 }
1580 
1581 /*
1582  * brief:
1583  * raid_bdev_create allocates raid bdev based on passed configuration
1584  * params:
1585  * name - name for raid bdev
1586  * strip_size - strip size in KB
1587  * num_base_bdevs - number of base bdevs
1588  * level - raid level
1589  * superblock_enabled - true if raid should have superblock
1590  * uuid - uuid to set for the bdev
1591  * raid_bdev_out - the created raid bdev
1592  * returns:
1593  * 0 - success
1594  * non zero - failure
1595  */
1596 int
1597 raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1598 		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1599 		 struct raid_bdev **raid_bdev_out)
1600 {
1601 	struct raid_bdev *raid_bdev;
1602 	int rc;
1603 
1604 	assert(uuid != NULL);
1605 
1606 	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1607 			       &raid_bdev);
1608 	if (rc != 0) {
1609 		return rc;
1610 	}
1611 
1612 	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1613 		/* we need to have the uuid to store in the superblock before the bdev is registered */
1614 		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1615 	}
1616 
1617 	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1618 
1619 	*raid_bdev_out = raid_bdev;
1620 
1621 	return 0;
1622 }
1623 
1624 static void
1625 _raid_bdev_unregistering_cont(void *ctx)
1626 {
1627 	struct raid_bdev *raid_bdev = ctx;
1628 
1629 	spdk_bdev_close(raid_bdev->self_desc);
1630 	raid_bdev->self_desc = NULL;
1631 }
1632 
static void
raid_bdev_unregistering_cont(void *ctx)
{
	/* The internal descriptor must be closed on the app thread where it was opened. */
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1638 
1639 static int
1640 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1641 {
1642 	struct raid_process_finish_action *finish_action;
1643 
1644 	assert(spdk_get_thread() == process->thread);
1645 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1646 
1647 	finish_action = calloc(1, sizeof(*finish_action));
1648 	if (finish_action == NULL) {
1649 		return -ENOMEM;
1650 	}
1651 
1652 	finish_action->cb = cb;
1653 	finish_action->cb_ctx = cb_ctx;
1654 
1655 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1656 
1657 	return 0;
1658 }
1659 
/*
 * Runs on the process thread when the raid bdev is being unregistered while a
 * background process is active: cancel the process and arrange for the
 * internal descriptor to be closed once it has fully stopped.
 */
static void
raid_bdev_unregistering_stop_process(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	process->state = RAID_PROCESS_STATE_STOPPING;
	/* Preserve an earlier failure status; only mark cancellation if none is set. */
	if (process->status == 0) {
		process->status = -ECANCELED;
	}

	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
1678 
1679 static void
1680 raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1681 {
1682 	struct raid_bdev *raid_bdev = event_ctx;
1683 
1684 	if (type == SPDK_BDEV_EVENT_REMOVE) {
1685 		if (raid_bdev->process != NULL) {
1686 			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1687 					     raid_bdev->process);
1688 		} else {
1689 			raid_bdev_unregistering_cont(raid_bdev);
1690 		}
1691 	}
1692 }
1693 
/*
 * Second half of raid bdev configuration: register the io device and the bdev
 * with the bdev layer and open the internal self descriptor. On any failure
 * the module is stopped and the raid bdev reverts to CONFIGURING state.
 */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto err;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		/* The bdev was registered above, so it must be unregistered here. */
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto err;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
	return;
err:
	/* Undo the module start and io device registration; stay configurable. */
	if (raid_bdev->module->stop != NULL) {
		raid_bdev->module->stop(raid_bdev);
	}
	spdk_io_device_unregister(raid_bdev, NULL);
	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
}
1742 
1743 static void
1744 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1745 {
1746 	if (status == 0) {
1747 		raid_bdev_configure_cont(raid_bdev);
1748 	} else {
1749 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1750 			    raid_bdev->bdev.name, spdk_strerror(-status));
1751 		if (raid_bdev->module->stop != NULL) {
1752 			raid_bdev->module->stop(raid_bdev);
1753 		}
1754 	}
1755 }
1756 
1757 /*
1758  * brief:
1759  * If raid bdev config is complete, then only register the raid bdev to
1760  * bdev layer and remove this raid bdev from configuring list and
1761  * insert the raid bdev to configured list
1762  * params:
1763  * raid_bdev - pointer to raid bdev
1764  * returns:
1765  * 0 - success
1766  * non zero - failure
1767  */
static int
raid_bdev_configure(struct raid_bdev *raid_bdev)
{
	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
	int rc;

	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
	assert(raid_bdev->bdev.blocklen > 0);

	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
	 * internal use.
	 */
	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
		return -EINVAL;
	}
	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);
	raid_bdev->blocklen_shift = spdk_u32log2(data_block_size);

	/* Let the level-specific module compute geometry and set up its state. */
	rc = raid_bdev->module->start(raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("raid module startup callback failed\n");
		return rc;
	}

	if (raid_bdev->superblock_enabled) {
		/* Either create a fresh superblock or validate an existing (loaded) one
		 * against the geometry the module just established.
		 */
		if (raid_bdev->sb == NULL) {
			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
			if (rc == 0) {
				raid_bdev_init_superblock(raid_bdev);
			}
		} else {
			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
			if (raid_bdev->sb->block_size != data_block_size) {
				SPDK_ERRLOG("blocklen does not match value in superblock\n");
				rc = -EINVAL;
			}
			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
				rc = -EINVAL;
			}
		}

		if (rc != 0) {
			/* Undo the module start before reporting the failure. */
			if (raid_bdev->module->stop != NULL) {
				raid_bdev->module->stop(raid_bdev);
			}
			return rc;
		}

		/* Configuration continues asynchronously in the write completion. */
		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
	} else {
		raid_bdev_configure_cont(raid_bdev);
	}

	return 0;
}
1827 
1828 /*
1829  * brief:
1830  * If raid bdev is online and registered, change the bdev state to
1831  * configuring and unregister this raid device. Queue this raid device
1832  * in configuring list
1833  * params:
1834  * raid_bdev - pointer to raid bdev
1835  * cb_fn - callback function
1836  * cb_arg - argument to callback function
1837  * returns:
1838  * none
1839  */
1840 static void
1841 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1842 		      void *cb_arg)
1843 {
1844 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1845 		if (cb_fn) {
1846 			cb_fn(cb_arg, 0);
1847 		}
1848 		return;
1849 	}
1850 
1851 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1852 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1853 
1854 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1855 }
1856 
1857 /*
1858  * brief:
1859  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1860  * params:
1861  * base_bdev - pointer to base bdev
1862  * returns:
1863  * base bdev info if found, otherwise NULL.
1864  */
1865 static struct raid_base_bdev_info *
1866 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1867 {
1868 	struct raid_bdev *raid_bdev;
1869 	struct raid_base_bdev_info *base_info;
1870 
1871 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1872 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1873 			if (base_info->desc != NULL &&
1874 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1875 				return base_info;
1876 			}
1877 		}
1878 	}
1879 
1880 	return NULL;
1881 }
1882 
/*
 * Final step of a base bdev removal: update the operational count and either
 * deconfigure the raid bdev (when below the minimum) or invoke the caller's
 * removal callback with the given status.
 */
static void
raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(base_info->remove_scheduled);
	base_info->remove_scheduled = false;

	if (status == 0) {
		raid_bdev->num_base_bdevs_operational--;
		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
			/* There is not enough base bdevs to keep the raid bdev operational. */
			/* Deconfigure takes ownership of remove_cb - do not call it below. */
			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
			return;
		}
	}

	if (base_info->remove_cb != NULL) {
		base_info->remove_cb(base_info->remove_cb_ctx, status);
	}
}
1904 
/*
 * Completion of the superblock update that marked the removed base bdev as
 * failed. The removal is finished either way; a write failure is logged and
 * propagated through 'status'.
 */
static void
raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}

	raid_bdev_remove_base_bdev_done(base_info, status);
}
1917 
/*
 * Called after IO is unquiesced following per-channel teardown of the removed
 * base bdev. Frees the base bdev's resources and, when a superblock exists,
 * marks the corresponding slot failed and persists it before finishing.
 */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		goto out;
	}

	/* Serialize against readers of the base bdev info (e.g. get_memory_domains). */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	raid_bdev_free_base_bdev_resource(base_info);
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the superblock entry for this slot and mark it failed. */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				/* TODO: distinguish between failure and intentional removal */
				sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;

				/* Completion continues in the superblock write callback. */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}
out:
	raid_bdev_remove_base_bdev_done(base_info, status);
}
1955 
1956 static void
1957 raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
1958 {
1959 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1960 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1961 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
1962 	uint8_t idx = raid_bdev_base_bdev_slot(base_info);
1963 
1964 	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);
1965 
1966 	if (raid_ch->base_channel[idx] != NULL) {
1967 		spdk_put_io_channel(raid_ch->base_channel[idx]);
1968 		raid_ch->base_channel[idx] = NULL;
1969 	}
1970 
1971 	if (raid_ch->process.ch_processed != NULL) {
1972 		raid_ch->process.ch_processed->base_channel[idx] = NULL;
1973 	}
1974 
1975 	spdk_for_each_channel_continue(i, 0);
1976 }
1977 
/* All channels have dropped the base bdev - resume IO; removal finishes in
 * the unquiesce callback.
 */
static void
raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
			    base_info);
}
1987 
1988 static void
1989 raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
1990 {
1991 	struct raid_base_bdev_info *base_info = ctx;
1992 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1993 
1994 	if (status != 0) {
1995 		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
1996 			    raid_bdev->bdev.name, spdk_strerror(-status));
1997 		raid_bdev_remove_base_bdev_done(base_info, status);
1998 		return;
1999 	}
2000 
2001 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
2002 			      raid_bdev_channels_remove_base_bdev_done);
2003 }
2004 
2005 static int
2006 raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
2007 {
2008 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2009 
2010 	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
2011 				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
2012 }
2013 
/* Message context for removing a base bdev while a background process runs. */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The background process; kept here because it stays valid on the
	 * process thread even after raid_bdev->process may be cleared. */
	struct raid_bdev_process *process;
	/* The base bdev being removed. */
	struct raid_base_bdev_info *base_info;
	/* Snapshot of the operational base bdev count, taken on the app thread. */
	uint8_t num_base_bdevs_operational;
};
2019 
/*
 * Continuation on the app thread: actually start the quiesce-based removal of
 * the base bdev, reporting failure immediately if it cannot be started.
 */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc;

	rc = raid_bdev_remove_base_bdev_quiesce(base_info);
	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
2031 
2032 static void
2033 raid_bdev_process_base_bdev_remove_cont(void *_ctx)
2034 {
2035 	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
2036 	struct raid_base_bdev_info *base_info = ctx->base_info;
2037 
2038 	free(ctx);
2039 
2040 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
2041 			     base_info);
2042 }
2043 
/*
 * Runs on the process thread. Decides whether the background process must be
 * stopped before the base bdev removal can proceed on the app thread.
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	/* The process only has to stop if its target is removed or if the
	 * removal would drop the array below the minimum operational count. */
	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the removal until the process has fully stopped. */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	/* Record why the process is being stopped, unless it already failed. */
	if (process->status == 0) {
		process->status = -ENODEV;
	}
}
2074 
/*
 * Initiates removal of a base bdev while a background process is running on
 * the raid bdev. The decision whether the process must stop first is made on
 * the process thread. Must be called on the app thread.
 * Returns 0 on success, -ENOMEM if the message context cannot be allocated.
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	/*
	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
	 * after the removal and more than one base bdev may be removed at the same time
	 */
	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
		if (!base_info->remove_scheduled && base_info->desc != NULL) {
			ctx->num_base_bdevs_operational++;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
2110 
/*
 * Schedules removal of a base bdev from its raid bdev. Must be called on the
 * app thread. Returns -ENODEV if a removal is already scheduled for this base
 * bdev. cb_fn, if not NULL, is invoked when the removal completes.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	if (base_info->remove_scheduled) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		base_info->remove_scheduled = false;
		if (raid_bdev->num_base_bdevs_discovered == 0) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
		if (cb_fn != NULL) {
			cb_fn(cb_ctx, 0);
		}
	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
		/* This raid bdev does not tolerate removing a base bdev. */
		raid_bdev->num_base_bdevs_operational--;
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else {
		/* Online and degradable: remove asynchronously. The callback is
		 * stashed on base_info and invoked when the removal finishes. */
		base_info->remove_cb = cb_fn;
		base_info->remove_cb_ctx = cb_ctx;

		/* A running background process must be consulted (and possibly
		 * stopped) before the base bdev can be detached. */
		if (raid_bdev->process != NULL) {
			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
		} else {
			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
		}

		/* Roll back the flag so a later removal attempt is possible. */
		if (ret != 0) {
			base_info->remove_scheduled = false;
		}
	}

	return ret;
}
2167 
2168 /*
2169  * brief:
2170  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2171  * is removed. This function checks if this base bdev is part of any raid bdev
2172  * or not. If yes, it takes necessary action on that particular raid bdev.
2173  * params:
2174  * base_bdev - pointer to base bdev which got removed
2175  * cb_fn - callback function
2176  * cb_arg - argument to callback function
2177  * returns:
2178  * 0 - success
2179  * non zero - failure
2180  */
2181 int
2182 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2183 {
2184 	struct raid_base_bdev_info *base_info;
2185 
2186 	/* Find the raid_bdev which has claimed this base_bdev */
2187 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2188 	if (!base_info) {
2189 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2190 		return -ENODEV;
2191 	}
2192 
2193 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2194 }
2195 
2196 static void
2197 raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2198 {
2199 	if (status != 0) {
2200 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
2201 			    raid_bdev->bdev.name, spdk_strerror(-status));
2202 	}
2203 }
2204 
2205 /*
2206  * brief:
2207  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
2208  * is resized. This function checks if the smallest size of the base_bdevs is changed.
2209  * If yes, call module handler to resize the raid_bdev if implemented.
2210  * params:
2211  * base_bdev - pointer to base bdev which got resized.
2212  * returns:
2213  * none
2214  */
static void
raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
{
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *base_info;
	uint64_t blockcnt_old;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");

	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);

	/* Find the raid_bdev which has claimed this base_bdev */
	if (!base_info) {
		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
		return;
	}
	raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);

	/* Record the new base bdev size before asking the module to react. */
	base_info->blockcnt = base_bdev->blockcnt;

	/* Modules that cannot grow/shrink simply don't implement resize. */
	if (!raid_bdev->module->resize) {
		return;
	}

	/* The module returns false when the raid bdev size did not change. */
	blockcnt_old = raid_bdev->bdev.blockcnt;
	if (raid_bdev->module->resize(raid_bdev) == false) {
		return;
	}

	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);

	/* Persist the new per-base-bdev data sizes and total raid size. */
	if (raid_bdev->superblock_enabled) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t i;

		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
				sb_base_bdev->data_size = base_info->data_size;
			}
		}
		sb->raid_size = raid_bdev->bdev.blockcnt;
		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
	}
}
2268 
2269 /*
2270  * brief:
2271  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2272  * triggers asynchronous event.
2273  * params:
2274  * type - event details.
2275  * bdev - bdev that triggered event.
2276  * event_ctx - context for event.
2277  * returns:
2278  * none
2279  */
2280 static void
2281 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2282 			  void *event_ctx)
2283 {
2284 	int rc;
2285 
2286 	switch (type) {
2287 	case SPDK_BDEV_EVENT_REMOVE:
2288 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2289 		if (rc != 0) {
2290 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2291 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2292 		}
2293 		break;
2294 	case SPDK_BDEV_EVENT_RESIZE:
2295 		raid_bdev_resize_base_bdev(bdev);
2296 		break;
2297 	default:
2298 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2299 		break;
2300 	}
2301 }
2302 
2303 /*
2304  * brief:
2305  * Deletes the specified raid bdev
2306  * params:
2307  * raid_bdev - pointer to raid bdev
2308  * cb_fn - callback function
2309  * cb_arg - argument to callback function
2310  */
void
raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);

	/* A second delete while one is in flight fails with -EALREADY. */
	if (raid_bdev->destroy_started) {
		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
			      raid_bdev->bdev.name);
		if (cb_fn) {
			cb_fn(cb_arg, -EALREADY);
		}
		return;
	}

	raid_bdev->destroy_started = true;

	/* Mark every base bdev for removal up front. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		base_info->remove_scheduled = true;

		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
			/*
			 * As raid bdev is not registered yet or already unregistered,
			 * so cleanup should be done here itself.
			 */
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (raid_bdev->num_base_bdevs_discovered == 0) {
		/* There is no base bdev for this raid, so free the raid device. */
		raid_bdev_cleanup_and_free(raid_bdev);
		if (cb_fn) {
			cb_fn(cb_arg, 0);
		}
	} else {
		/* Still-open base bdevs are torn down by deconfigure. */
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
	}
}
2351 
2352 static void
2353 raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2354 {
2355 	if (status != 0) {
2356 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2357 			    raid_bdev->bdev.name, spdk_strerror(-status));
2358 	}
2359 }
2360 
/*
 * Runs on the app thread after a background process completed successfully:
 * marks base bdevs that became configured during the process as CONFIGURED in
 * the superblock and persists it.
 */
static void
raid_bdev_process_finish_write_sb(void *ctx)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_bdev_superblock *sb = raid_bdev->sb;
	struct raid_bdev_sb_base_bdev *sb_base_bdev;
	struct raid_base_bdev_info *base_info;
	uint8_t i;

	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		/* Only update slots that map to an actual base bdev and are not
		 * already marked configured. */
		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
			if (base_info->is_configured) {
				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
			}
		}
	}

	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
}
2385 
2386 static void raid_bdev_process_free(struct raid_bdev_process *process);
2387 
2388 static void
2389 _raid_bdev_process_finish_done(void *ctx)
2390 {
2391 	struct raid_bdev_process *process = ctx;
2392 	struct raid_process_finish_action *finish_action;
2393 
2394 	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
2395 		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
2396 		finish_action->cb(finish_action->cb_ctx);
2397 		free(finish_action);
2398 	}
2399 
2400 	raid_bdev_process_free(process);
2401 
2402 	spdk_thread_exit(spdk_get_thread());
2403 }
2404 
2405 static void
2406 raid_bdev_process_finish_target_removed(void *ctx, int status)
2407 {
2408 	struct raid_bdev_process *process = ctx;
2409 
2410 	if (status != 0) {
2411 		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
2412 	}
2413 
2414 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2415 }
2416 
/*
 * Called when the raid bdev has been unquiesced at the end of process
 * teardown. If the process failed, its target base bdev is removed from the
 * array (unless it is already gone or its removal is already scheduled)
 * before the teardown completes on the process thread.
 */
static void
raid_bdev_process_finish_unquiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
	}

	if (process->status != 0) {
		struct raid_base_bdev_info *target = process->target;

		if (target->desc != NULL && target->remove_scheduled == false) {
			/* Teardown continues from the removal callback. */
			_raid_bdev_remove_base_bdev(target, raid_bdev_process_finish_target_removed, process);
			return;
		}
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2437 
2438 static void
2439 raid_bdev_process_finish_unquiesce(void *ctx)
2440 {
2441 	struct raid_bdev_process *process = ctx;
2442 	int rc;
2443 
2444 	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2445 				 raid_bdev_process_finish_unquiesced, process);
2446 	if (rc != 0) {
2447 		raid_bdev_process_finish_unquiesced(process, rc);
2448 	}
2449 }
2450 
/*
 * Runs on the process thread after all channels dropped their process state:
 * releases the process's io channel, logs the outcome, triggers a superblock
 * update on success, and continues teardown with an unquiesce on the app
 * thread.
 */
static void
raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	/* raid_ch may be NULL if the process failed before acquiring it. */
	if (process->raid_ch != NULL) {
		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
	}

	process->state = RAID_PROCESS_STATE_STOPPED;

	if (process->status == 0) {
		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
			       raid_bdev_process_to_str(process->type),
			       raid_bdev->bdev.name);
		/* Persist the new base bdev states; must run on the app thread. */
		if (raid_bdev->superblock_enabled) {
			spdk_thread_send_msg(spdk_thread_get_app_thread(),
					     raid_bdev_process_finish_write_sb,
					     raid_bdev);
		}
	} else {
		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
			     raid_bdev_process_to_str(process->type),
			     raid_bdev->bdev.name,
			     spdk_strerror(-process->status));
	}

	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
			     process);
}
2482 
2483 static void
2484 __raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
2485 {
2486 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2487 
2488 	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
2489 }
2490 
/*
 * Per-channel teardown of process state. On success, the channel that was
 * used for writing to the target is promoted to the regular base channel for
 * the target's slot, then the remaining process channel state is cleaned up.
 */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		/* NULL out target_ch so the cleanup below doesn't release it. */
		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2509 
/*
 * Quiesce callback for process teardown: detach the process from the raid
 * bdev and tear down process state on every io channel.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		/* NOTE(review): on quiesce failure the teardown stops here and the
		 * process is never freed - verify this is intentional. */
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		return;
	}

	/* From here on, new io channels are created without process state. */
	raid_bdev->process = NULL;
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2525 
2526 static void
2527 _raid_bdev_process_finish(void *ctx)
2528 {
2529 	struct raid_bdev_process *process = ctx;
2530 	int rc;
2531 
2532 	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2533 			       raid_bdev_process_finish_quiesced, process);
2534 	if (rc != 0) {
2535 		raid_bdev_process_finish_quiesced(ctx, rc);
2536 	}
2537 }
2538 
2539 static void
2540 raid_bdev_process_do_finish(struct raid_bdev_process *process)
2541 {
2542 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
2543 }
2544 
2545 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2546 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2547 
/*
 * Requests that the background process stop with the given status. Must be
 * called on the process thread. Safe to call multiple times: only the first
 * non-zero status is kept and the stop sequence is started only once.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	/* Already stopping or stopped - nothing more to do. */
	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	/* Release the current window lock first if one is held; otherwise run
	 * the process loop once more so it observes the STOPPING state. */
	if (process->window_range_locked) {
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2570 
/*
 * Called when the current window's LBA range has been unquiesced: advance the
 * window offset by the amount actually processed and continue the main loop.
 */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = false;
	/* window_size holds the number of blocks processed in this window. */
	process->window_offset += process->window_size;

	raid_bdev_process_thread_run(process);
}
2587 
/*
 * Unquiesces the LBA range that was locked for the current window. The range
 * is identified by the same (offset, max_window_size) pair used to lock it.
 */
static void
raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
{
	int rc;

	assert(process->window_range_locked == true);

	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
				       process->window_offset, process->max_window_size,
				       raid_bdev_process_window_range_unlocked, process);
	if (rc != 0) {
		/* Submission failed - run the callback directly with the error. */
		raid_bdev_process_window_range_unlocked(process, rc);
	}
}
2602 
/* All channels now see the new process offset; release the window lock. */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	raid_bdev_process_unlock_window_range(spdk_io_channel_iter_get_ctx(i));
}
2610 
2611 static void
2612 raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2613 {
2614 	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2615 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2616 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2617 
2618 	raid_ch->process.offset = process->window_offset + process->window_size;
2619 
2620 	spdk_for_each_channel_continue(i, 0);
2621 }
2622 
/*
 * Called by raid modules when a process request completes. Returns the
 * request to the free pool and, once the whole window has completed, either
 * finishes the process (on error) or propagates the new window offset to all
 * io channels. Must be called on the process thread.
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	/* Remember the first failure; remaining requests still drain. */
	if (status != 0) {
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2648 
/*
 * Submits one process request covering up to num_blocks starting at
 * offset_blocks. Returns the number of blocks actually submitted (as reported
 * by the module), 0 if no free request is available right now, or a negative
 * error code on submission failure.
 */
static int
raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
				 uint32_t num_blocks)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;
	int ret;

	process_req = TAILQ_FIRST(&process->requests);
	if (process_req == NULL) {
		/* All requests in flight - progress resumes on completion. */
		assert(process->window_remaining > 0);
		return 0;
	}

	process_req->target = process->target;
	process_req->target_ch = process->raid_ch->process.target_ch;
	process_req->offset_blocks = offset_blocks;
	process_req->num_blocks = num_blocks;
	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;

	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
	if (ret <= 0) {
		if (ret < 0) {
			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
				    raid_bdev->bdev.name, spdk_strerror(-ret));
			process->window_status = ret;
		}
		return ret;
	}

	/* The module may submit fewer blocks than requested. */
	process_req->num_blocks = ret;
	TAILQ_REMOVE(&process->requests, process_req, link);

	return ret;
}
2684 
/*
 * Fills the current window with process requests, submitting until the window
 * is covered, the request pool is exhausted, or a submission fails. If
 * nothing could be submitted at all, the process is finished with the
 * recorded window status.
 */
static void
_raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	uint64_t offset = process->window_offset;
	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
	int ret;

	while (offset < offset_end) {
		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
		if (ret <= 0) {
			break;
		}

		process->window_remaining += ret;
		offset += ret;
	}

	if (process->window_remaining > 0) {
		/* The effective window size is whatever was actually submitted. */
		process->window_size = process->window_remaining;
	} else {
		raid_bdev_process_finish(process, process->window_status);
	}
}
2709 
/*
 * Called when the LBA range for the next window has been quiesced. If a stop
 * was requested while the lock was being acquired, release it immediately;
 * otherwise start submitting requests for the window.
 */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2730 
/*
 * Main loop step of the background process, run on the process thread with no
 * window in flight: finish if stopping or done, otherwise quiesce the LBA
 * range of the next window and continue from the lock callback.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Clamp the window so it does not extend past the end of the bdev. */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		/* Submission failed - run the callback directly with the error. */
		raid_bdev_process_window_range_locked(process, rc);
	}
}
2762 
/*
 * First message on the newly created process thread: acquire a raid bdev io
 * channel for the process and start the main loop. On channel allocation
 * failure the process is finished with -ENOMEM.
 */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2787 
/*
 * Completion of the abort iteration after a failed process start: remove the
 * target base bdev (it was never rebuilt) and free the process.
 */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
	raid_bdev_process_free(process);

	/* TODO: update sb */
}
2798 
/* Per-channel rollback of any process setup done before the start failed. */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_io_channel *raid_ch;

	raid_ch = spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));
	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2809 
/*
 * Completion of the per-channel process setup iteration. On success, attaches
 * the process to the raid bdev and spawns a dedicated SPDK thread to run it;
 * on any failure, rolls back the channel setup and aborts the process.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	/* TODO: we may need to abort if a base bdev was removed before we got here */

	/* Thread name: "<raid name>_<process type>", e.g. "raid1_rebuild". */
	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2846 
/* Sets up process state on one io channel; a failure aborts the iteration. */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev_io_channel *raid_ch;
	int rc;

	raid_ch = spdk_io_channel_get_ctx(spdk_io_channel_iter_get_channel(i));
	rc = raid_bdev_ch_process_setup(raid_ch, process);

	spdk_for_each_channel_continue(i, rc);
}
2859 
2860 static void
2861 raid_bdev_process_start(struct raid_bdev_process *process)
2862 {
2863 	struct raid_bdev *raid_bdev = process->raid_bdev;
2864 
2865 	assert(raid_bdev->module->submit_process_request != NULL);
2866 
2867 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
2868 			      raid_bdev_channels_start_process_done);
2869 }
2870 
2871 static void
2872 raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
2873 {
2874 	spdk_dma_free(process_req->iov.iov_base);
2875 	spdk_dma_free(process_req->md_buf);
2876 	free(process_req);
2877 }
2878 
/*
 * Allocates one process request with a data buffer large enough for a full
 * window and, for bdevs with separate metadata, a metadata buffer as well.
 * Returns NULL on allocation failure.
 */
static struct raid_bdev_process_request *
raid_bdev_process_alloc_request(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_process_request *process_req;

	process_req = calloc(1, sizeof(*process_req));
	if (process_req == NULL) {
		return NULL;
	}

	process_req->process = process;
	/* Buffers are sized for the largest possible window. */
	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
	if (process_req->iov.iov_base == NULL) {
		free(process_req);
		return NULL;
	}
	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
		if (process_req->md_buf == NULL) {
			raid_bdev_process_request_free(process_req);
			return NULL;
		}
	}

	return process_req;
}
2907 
2908 static void
2909 raid_bdev_process_free(struct raid_bdev_process *process)
2910 {
2911 	struct raid_bdev_process_request *process_req;
2912 
2913 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
2914 		TAILQ_REMOVE(&process->requests, process_req, link);
2915 		raid_bdev_process_request_free(process_req);
2916 	}
2917 
2918 	free(process);
2919 }
2920 
/*
 * Allocates a background process of the given type for a raid bdev, including
 * a fixed pool of RAID_BDEV_PROCESS_MAX_QD preallocated requests. The window
 * size is derived from the configured process window size (in KiB), rounded
 * up to whole data blocks and to at least the bdev's write unit size.
 * Returns NULL on allocation failure.
 */
static struct raid_bdev_process *
raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
			struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;
	struct raid_bdev_process_request *process_req;
	int i;

	process = calloc(1, sizeof(*process));
	if (process == NULL) {
		return NULL;
	}

	process->raid_bdev = raid_bdev;
	process->type = type;
	process->target = target;
	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
					    raid_bdev->bdev.write_unit_size);
	TAILQ_INIT(&process->requests);
	TAILQ_INIT(&process->finish_actions);

	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
		process_req = raid_bdev_process_alloc_request(process);
		if (process_req == NULL) {
			/* Frees the requests allocated so far as well. */
			raid_bdev_process_free(process);
			return NULL;
		}

		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
	}

	return process;
}
2955 
2956 static int
2957 raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
2958 {
2959 	struct raid_bdev_process *process;
2960 
2961 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2962 
2963 	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
2964 	if (process == NULL) {
2965 		return -ENOMEM;
2966 	}
2967 
2968 	raid_bdev_process_start(process);
2969 
2970 	return 0;
2971 }
2972 
/*
 * Continue base bdev configuration after the bdev has been opened, claimed and
 * validated. Marks the base bdev as configured and either configures the whole
 * raid bdev, starts a rebuild (when attaching to an already-online array), or
 * simply waits for more base bdevs. Invokes base_info->configure_cb (if set)
 * with the result.
 */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int rc;

	/* TODO: defer if rebuild in progress on another base bdev */
	assert(raid_bdev->process == NULL);

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		}
	} else if (raid_bdev->num_base_bdevs_discovered > raid_bdev->num_base_bdevs_operational) {
		/* The array is already online - this base bdev is a new member and
		 * needs to be rebuilt before it holds valid data. */
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		/* Still waiting for more base bdevs to be discovered */
		rc = 0;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, rc);
	}
}
3016 
3017 static void
3018 raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
3019 		void *ctx)
3020 {
3021 	struct raid_base_bdev_info *base_info = ctx;
3022 
3023 	switch (status) {
3024 	case 0:
3025 		/* valid superblock found */
3026 		SPDK_ERRLOG("Existing raid superblock found on bdev %s\n", base_info->name);
3027 		status = -EEXIST;
3028 		raid_bdev_free_base_bdev_resource(base_info);
3029 		break;
3030 	case -EINVAL:
3031 		/* no valid superblock */
3032 		raid_bdev_configure_base_bdev_cont(base_info);
3033 		return;
3034 	default:
3035 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3036 			    base_info->name, spdk_strerror(-status));
3037 		break;
3038 	}
3039 
3040 	if (base_info->configure_cb != NULL) {
3041 		base_info->configure_cb(base_info->configure_cb_ctx, status);
3042 	}
3043 }
3044 
3045 static int
3046 raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
3047 			      raid_base_bdev_cb cb_fn, void *cb_ctx)
3048 {
3049 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
3050 	struct spdk_bdev_desc *desc;
3051 	struct spdk_bdev *bdev;
3052 	const struct spdk_uuid *bdev_uuid;
3053 	int rc;
3054 
3055 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3056 	assert(base_info->desc == NULL);
3057 
3058 	/*
3059 	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
3060 	 * before claiming the bdev.
3061 	 */
3062 
3063 	if (!spdk_uuid_is_null(&base_info->uuid)) {
3064 		char uuid_str[SPDK_UUID_STRING_LEN];
3065 		const char *bdev_name;
3066 
3067 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3068 
3069 		/* UUID of a bdev is registered as its alias */
3070 		bdev = spdk_bdev_get_by_name(uuid_str);
3071 		if (bdev == NULL) {
3072 			return -ENODEV;
3073 		}
3074 
3075 		bdev_name = spdk_bdev_get_name(bdev);
3076 
3077 		if (base_info->name == NULL) {
3078 			assert(existing == true);
3079 			base_info->name = strdup(bdev_name);
3080 			if (base_info->name == NULL) {
3081 				return -ENOMEM;
3082 			}
3083 		} else if (strcmp(base_info->name, bdev_name) != 0) {
3084 			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
3085 				    bdev_name, base_info->name);
3086 			return -EINVAL;
3087 		}
3088 	}
3089 
3090 	assert(base_info->name != NULL);
3091 
3092 	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
3093 	if (rc != 0) {
3094 		if (rc != -ENODEV) {
3095 			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
3096 		}
3097 		return rc;
3098 	}
3099 
3100 	bdev = spdk_bdev_desc_get_bdev(desc);
3101 	bdev_uuid = spdk_bdev_get_uuid(bdev);
3102 
3103 	if (spdk_uuid_is_null(&base_info->uuid)) {
3104 		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
3105 	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
3106 		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
3107 		spdk_bdev_close(desc);
3108 		return -EINVAL;
3109 	}
3110 
3111 	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
3112 	if (rc != 0) {
3113 		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
3114 		spdk_bdev_close(desc);
3115 		return rc;
3116 	}
3117 
3118 	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);
3119 
3120 	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
3121 	if (base_info->app_thread_ch == NULL) {
3122 		SPDK_ERRLOG("Failed to get io channel\n");
3123 		spdk_bdev_module_release_bdev(bdev);
3124 		spdk_bdev_close(desc);
3125 		return -ENOMEM;
3126 	}
3127 
3128 	base_info->desc = desc;
3129 	base_info->blockcnt = bdev->blockcnt;
3130 
3131 	if (raid_bdev->superblock_enabled) {
3132 		uint64_t data_offset;
3133 
3134 		if (base_info->data_offset == 0) {
3135 			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
3136 			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
3137 		} else {
3138 			data_offset = base_info->data_offset;
3139 		}
3140 
3141 		if (bdev->optimal_io_boundary != 0) {
3142 			data_offset = spdk_divide_round_up(data_offset,
3143 							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
3144 			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
3145 				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
3146 					     base_info->data_offset, base_info->name, data_offset);
3147 				data_offset = base_info->data_offset;
3148 			}
3149 		}
3150 
3151 		base_info->data_offset = data_offset;
3152 	}
3153 
3154 	if (base_info->data_offset >= bdev->blockcnt) {
3155 		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
3156 			    base_info->data_offset, bdev->blockcnt, base_info->name);
3157 		rc = -EINVAL;
3158 		goto out;
3159 	}
3160 
3161 	if (base_info->data_size == 0) {
3162 		base_info->data_size = bdev->blockcnt - base_info->data_offset;
3163 	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
3164 		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
3165 			    bdev->blockcnt, base_info->name);
3166 		rc = -EINVAL;
3167 		goto out;
3168 	}
3169 
3170 	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3171 		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
3172 			    bdev->name);
3173 		rc = -EINVAL;
3174 		goto out;
3175 	}
3176 
3177 	/*
3178 	 * Set the raid bdev properties if this is the first base bdev configured,
3179 	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
3180 	 * have the same blocklen and metadata format.
3181 	 */
3182 	if (raid_bdev->bdev.blocklen == 0) {
3183 		raid_bdev->bdev.blocklen = bdev->blocklen;
3184 		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
3185 		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
3186 		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
3187 		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
3188 		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
3189 	} else {
3190 		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
3191 			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
3192 				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
3193 			rc = -EINVAL;
3194 			goto out;
3195 		}
3196 
3197 		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
3198 		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
3199 		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
3200 		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
3201 		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev)) {
3202 			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
3203 				    raid_bdev->bdev.name, bdev->name);
3204 			rc = -EINVAL;
3205 			goto out;
3206 		}
3207 	}
3208 
3209 	base_info->configure_cb = cb_fn;
3210 	base_info->configure_cb_ctx = cb_ctx;
3211 
3212 	if (existing) {
3213 		raid_bdev_configure_base_bdev_cont(base_info);
3214 	} else {
3215 		/* check for existing superblock when using a new bdev */
3216 		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
3217 				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
3218 		if (rc) {
3219 			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3220 				    bdev->name, spdk_strerror(-rc));
3221 		}
3222 	}
3223 out:
3224 	if (rc != 0) {
3225 		raid_bdev_free_base_bdev_resource(base_info);
3226 	}
3227 	return rc;
3228 }
3229 
3230 static int
3231 _raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
3232 			   uint64_t data_offset, uint64_t data_size,
3233 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
3234 {
3235 	struct raid_base_bdev_info *base_info;
3236 
3237 	assert(name != NULL);
3238 
3239 	if (slot >= raid_bdev->num_base_bdevs) {
3240 		return -EINVAL;
3241 	}
3242 
3243 	base_info = &raid_bdev->base_bdev_info[slot];
3244 
3245 	if (base_info->name != NULL) {
3246 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev '%s'\n",
3247 			    slot, raid_bdev->bdev.name, base_info->name);
3248 		return -EBUSY;
3249 	}
3250 
3251 	if (!spdk_uuid_is_null(&base_info->uuid)) {
3252 		char uuid_str[SPDK_UUID_STRING_LEN];
3253 
3254 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3255 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev with uuid %s\n",
3256 			    slot, raid_bdev->bdev.name, uuid_str);
3257 		return -EBUSY;
3258 	}
3259 
3260 	base_info->name = strdup(name);
3261 	if (base_info->name == NULL) {
3262 		return -ENOMEM;
3263 	}
3264 
3265 	base_info->data_offset = data_offset;
3266 	base_info->data_size = data_size;
3267 
3268 	return raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
3269 }
3270 
3271 int
3272 raid_bdev_attach_base_bdev(struct raid_bdev *raid_bdev, struct spdk_bdev *base_bdev,
3273 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
3274 {
3275 	struct raid_base_bdev_info *base_info = NULL, *iter;
3276 	int rc;
3277 
3278 	SPDK_DEBUGLOG(bdev_raid, "attach_base_device: %s\n", base_bdev->name);
3279 
3280 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3281 
3282 	if (raid_bdev->process != NULL) {
3283 		SPDK_ERRLOG("raid bdev '%s' is in process\n",
3284 			    raid_bdev->bdev.name);
3285 		return -EPERM;
3286 	}
3287 
3288 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
3289 		SPDK_ERRLOG("raid bdev '%s' must be in online state to attach base bdev\n",
3290 			    raid_bdev->bdev.name);
3291 		return -EINVAL;
3292 	}
3293 
3294 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3295 		if (iter->desc == NULL) {
3296 			base_info = iter;
3297 			break;
3298 		}
3299 	}
3300 
3301 	if (base_info == NULL) {
3302 		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
3303 			    raid_bdev->bdev.name, base_bdev->name);
3304 		return -EINVAL;
3305 	}
3306 
3307 	assert(base_info->is_configured == false);
3308 	assert(base_info->data_size != 0);
3309 
3310 	spdk_spin_lock(&raid_bdev->base_bdev_lock);
3311 
3312 	rc = _raid_bdev_add_base_device(raid_bdev, base_bdev->name,
3313 					raid_bdev_base_bdev_slot(base_info),
3314 					base_info->data_offset, base_info->data_size,
3315 					cb_fn, cb_ctx);
3316 	if (rc != 0) {
3317 		SPDK_ERRLOG("base bdev '%s' attach failed: %s\n", base_bdev->name, spdk_strerror(-rc));
3318 		raid_bdev_free_base_bdev_resource(base_info);
3319 	}
3320 
3321 	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
3322 
3323 	return rc;
3324 }
3325 
/*
 * brief:
 * raid_bdev_add_base_device assigns the named bdev to the given slot of a raid
 * bdev and configures it as a base bdev, claiming it and keeping an open
 * descriptor. Data offset and size are passed as 0, i.e. derived during
 * configuration.
 * params:
 * raid_bdev - pointer to raid bdev
 * name - name of the base bdev
 * slot - position to add base bdev
 * cb_fn - callback function invoked when configuration completes
 * cb_ctx - argument to callback function
 * returns:
 * 0 - success
 * non zero - failure
 */
int
raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
			  raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	return _raid_bdev_add_base_device(raid_bdev, name, slot, 0, 0, cb_fn, cb_ctx);
}
3347 
3348 static int
3349 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3350 {
3351 	struct raid_bdev *raid_bdev;
3352 	uint8_t i;
3353 	int rc;
3354 
3355 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3356 			       sb->level, true, &sb->uuid, &raid_bdev);
3357 	if (rc != 0) {
3358 		return rc;
3359 	}
3360 
3361 	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
3362 	if (rc != 0) {
3363 		raid_bdev_free(raid_bdev);
3364 		return rc;
3365 	}
3366 
3367 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3368 	memcpy(raid_bdev->sb, sb, sb->length);
3369 
3370 	for (i = 0; i < sb->base_bdevs_size; i++) {
3371 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3372 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3373 
3374 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3375 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3376 			raid_bdev->num_base_bdevs_operational++;
3377 		}
3378 
3379 		base_info->data_offset = sb_base_bdev->data_offset;
3380 		base_info->data_size = sb_base_bdev->data_size;
3381 	}
3382 
3383 	*raid_bdev_out = raid_bdev;
3384 	return 0;
3385 }
3386 
3387 static void
3388 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3389 {
3390 	struct raid_bdev *raid_bdev;
3391 	struct raid_base_bdev_info *base_info;
3392 
3393 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3394 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3395 			if (base_info->desc == NULL && base_info->name != NULL &&
3396 			    strcmp(bdev->name, base_info->name) == 0) {
3397 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3398 				break;
3399 			}
3400 		}
3401 	}
3402 }
3403 
/*
 * Examine path for bdevs carrying a raid superblock. Matches the superblock
 * against existing raid bdevs by uuid, resolves superblock version conflicts
 * by seq_number (recreating the raid bdev from a newer superblock, or reusing
 * the existing raid bdev's superblock when it is newer), creates the raid bdev
 * from the superblock if it does not exist yet, and finally configures this
 * bdev as the matching base bdev member.
 */
static void
raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev)
{
	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *iter, *base_info;
	uint8_t i;
	int rc;

	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
		return;
	}

	if (spdk_uuid_is_null(&sb->uuid)) {
		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
		return;
	}

	/* Look for an existing raid bdev with the superblock's uuid; TAILQ_FOREACH
	 * leaves raid_bdev NULL when no match is found. */
	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			break;
		}
	}

	/* NOTE(review): raid_bdev->sb is dereferenced without a NULL check below -
	 * assumes a raid bdev matched by uuid here was created with superblock
	 * support; confirm this holds for all creation paths. */
	if (raid_bdev) {
		if (sb->seq_number > raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);

			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
					     raid_bdev->bdev.name, bdev->name);
				return;
			}

			/* remove and then recreate the raid bdev using the newer superblock */
			raid_bdev_delete(raid_bdev, NULL, NULL);
			raid_bdev = NULL;
		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
			/* use the current raid bdev superblock */
			sb = raid_bdev->sb;
		}
	}

	/* Find this bdev's entry in the superblock's base bdev table by uuid */
	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);

		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			break;
		}
	}

	if (i == sb->base_bdevs_size) {
		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
		return;
	}

	if (!raid_bdev) {
		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
				    sb->name, spdk_strerror(-rc));
			return;
		}
	}

	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
			       bdev->name, raid_bdev->bdev.name);
		return;
	}

	/* Locate the matching base bdev slot on the (possibly just created) raid bdev */
	base_info = NULL;
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			base_info = iter;
			break;
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
			    bdev->name, raid_bdev->bdev.name);
		return;
	}

	rc = raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
3504 
/* Context for reading a potential raid superblock during bdev examine */
struct raid_bdev_examine_ctx {
	/* Read-only descriptor opened on the examined bdev */
	struct spdk_bdev_desc *desc;
	/* IO channel on the descriptor, used for the superblock read */
	struct spdk_io_channel *ch;
};
3509 
3510 static void
3511 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3512 {
3513 	if (!ctx) {
3514 		return;
3515 	}
3516 
3517 	if (ctx->ch) {
3518 		spdk_put_io_channel(ctx->ch);
3519 	}
3520 
3521 	if (ctx->desc) {
3522 		spdk_bdev_close(ctx->desc);
3523 	}
3524 
3525 	free(ctx);
3526 }
3527 
3528 static void
3529 raid_bdev_examine_load_sb_cb(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3530 {
3531 	struct raid_bdev_examine_ctx *ctx = _ctx;
3532 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3533 
3534 	switch (status) {
3535 	case 0:
3536 		/* valid superblock found */
3537 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3538 		raid_bdev_examine_sb(sb, bdev);
3539 		break;
3540 	case -EINVAL:
3541 		/* no valid superblock, check if it can be claimed anyway */
3542 		raid_bdev_examine_no_sb(bdev);
3543 		break;
3544 	default:
3545 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3546 			    bdev->name, spdk_strerror(-status));
3547 		break;
3548 	}
3549 
3550 	raid_bdev_examine_ctx_free(ctx);
3551 	spdk_bdev_module_examine_done(&g_raid_if);
3552 }
3553 
/* Event callback for the short-lived descriptor opened during examine.
 * Events are intentionally ignored - the descriptor is closed as soon as the
 * superblock read completes. */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3558 
/*
 * brief:
 * raid_bdev_examine is the examine callback invoked by the lower layers (e.g.
 * bdev_nvme) for each newly registered bdev. It checks whether the bdev can be
 * claimed by a raid bdev: bdevs already in use or with DIF enabled skip the
 * superblock read, otherwise the bdev is opened and its superblock is read
 * asynchronously (completion in raid_bdev_examine_load_sb_cb). Every path that
 * does not continue asynchronously calls spdk_bdev_module_examine_done().
 * params:
 * bdev - pointer to base bdev
 * returns:
 * none
 */
static void
raid_bdev_examine(struct spdk_bdev *bdev)
{
	struct raid_bdev_examine_ctx *ctx;
	int rc;

	/* Already a member of a raid bdev - nothing to examine */
	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
		goto done;
	}

	/* Superblocks are not used with DIF-enabled bdevs; try claiming by name only */
	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
		raid_bdev_examine_no_sb(bdev);
		goto done;
	}

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
			    bdev->name, spdk_strerror(ENOMEM));
		goto err;
	}

	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false, raid_bdev_examine_event_cb, NULL,
				&ctx->desc);
	if (rc) {
		SPDK_ERRLOG("Failed to open bdev %s: %s\n",
			    bdev->name, spdk_strerror(-rc));
		goto err;
	}

	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
	if (!ctx->ch) {
		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev->name);
		goto err;
	}

	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_cb, ctx);
	if (rc) {
		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
			    bdev->name, spdk_strerror(-rc));
		goto err;
	}

	/* examine completes asynchronously in raid_bdev_examine_load_sb_cb */
	return;
err:
	/* handles NULL and partially initialized contexts */
	raid_bdev_examine_ctx_free(ctx);
done:
	spdk_bdev_module_examine_done(&g_raid_if);
}
3618 
3619 /* Log component for bdev raid bdev module */
3620 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3621