xref: /spdk/module/bdev/raid/bdev_raid.c (revision 8130039ee5287100d9eb93eb886967645da3d545)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (C) 2018 Intel Corporation.
3  *   All rights reserved.
4  *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5  */
6 
7 #include "bdev_raid.h"
8 #include "spdk/env.h"
9 #include "spdk/thread.h"
10 #include "spdk/log.h"
11 #include "spdk/string.h"
12 #include "spdk/util.h"
13 #include "spdk/json.h"
14 #include "spdk/likely.h"
15 
/* Sentinel value for a block offset that is unset / not applicable */
#define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
/* Queue depth limit for background process requests - presumably bounds
 * in-flight process I/O; confirm against the process submission code */
#define RAID_BDEV_PROCESS_MAX_QD	16

/* Default size, in KiB, of the window a background process handles per step */
#define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT 1024

/* Set once module shutdown begins; destruct paths then release base bdevs */
static bool g_shutdown_started = false;

/* List of all raid bdevs */
struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);

/* Registered raid modules, at most one per RAID level (see raid_bdev_module_find) */
static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
27 
/*
 * raid_bdev_io_channel is the context of spdk_io_channel for raid bdev device. It
 * contains the relationship of raid bdev io channel with base bdev io channels.
 */
struct raid_bdev_io_channel {
	/* Array of IO channels of base bdevs, indexed by base bdev slot.
	 * An entry is NULL for a missing base bdev and for the active process
	 * target (see raid_bdev_create_cb()). */
	struct spdk_io_channel	**base_channel;

	/* Private raid module IO channel */
	struct spdk_io_channel	*module_channel;

	/* Background process data */
	struct {
		/* First block not yet processed; RAID_OFFSET_BLOCKS_INVALID when
		 * no process is active on this channel */
		uint64_t offset;
		/* IO channel of the process target base bdev */
		struct spdk_io_channel *target_ch;
		/* Shadow channel used for I/O to the already-processed range;
		 * its base_channel[] includes the target's channel */
		struct raid_bdev_io_channel *ch_processed;
	} process;
};
46 
/* Lifecycle states of a raid bdev background process */
enum raid_bdev_process_state {
	/* Process object created, not started yet */
	RAID_PROCESS_STATE_INIT,
	/* Process is actively running */
	RAID_PROCESS_STATE_RUNNING,
	/* Stop was requested and is in progress */
	RAID_PROCESS_STATE_STOPPING,
	/* Process has fully stopped */
	RAID_PROCESS_STATE_STOPPED,
};
53 
/* State of a background process (e.g. rebuild) running on a raid bdev */
struct raid_bdev_process {
	/* The raid bdev this process operates on */
	struct raid_bdev		*raid_bdev;
	/* Type of the process */
	enum raid_process_type		type;
	/* Current lifecycle state */
	enum raid_bdev_process_state	state;
	/* Thread the process runs on */
	struct spdk_thread		*thread;
	/* IO channel context the process submits its I/O through */
	struct raid_bdev_io_channel	*raid_ch;
	/* Process request objects - TODO confirm (queue of free/pending
	 * requests, bounded by RAID_BDEV_PROCESS_MAX_QD); not visible here */
	TAILQ_HEAD(, raid_bdev_process_request) requests;
	/* Upper bound on the per-step window size */
	uint64_t			max_window_size;
	/* Size of the window currently being processed */
	uint64_t			window_size;
	/* Amount of the current window still outstanding */
	uint64_t			window_remaining;
	/* Status of the current window (error sticky? - confirm in process code) */
	int				window_status;
	/* Offset of the current window; also reported as progress in
	 * raid_bdev_write_info_json() */
	uint64_t			window_offset;
	/* True while the current window range is locked (quiesced) */
	bool				window_range_locked;
	/* Base bdev the process writes to (e.g. the rebuild target) */
	struct raid_base_bdev_info	*target;
	/* Final status of the whole process */
	int				status;
	/* Callbacks to invoke when the process finishes */
	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
};
71 
/* A deferred callback to run once a raid bdev process has finished */
struct raid_process_finish_action {
	/* Function to invoke */
	spdk_msg_fn cb;
	/* Context argument passed to cb */
	void *cb_ctx;
	/* Entry in raid_bdev_process::finish_actions */
	TAILQ_ENTRY(raid_process_finish_action) link;
};
77 
/* Module-wide options; accessed via raid_bdev_get_opts()/raid_bdev_set_opts() */
static struct spdk_raid_bdev_opts g_opts = {
	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
};
81 
/*
 * Copy the current module-wide raid bdev options into *opts.
 */
void
raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
{
	*opts = g_opts;
}
87 
88 int
89 raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
90 {
91 	if (opts->process_window_size_kb == 0) {
92 		return -EINVAL;
93 	}
94 
95 	g_opts = *opts;
96 
97 	return 0;
98 }
99 
100 static struct raid_bdev_module *
101 raid_bdev_module_find(enum raid_level level)
102 {
103 	struct raid_bdev_module *raid_module;
104 
105 	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
106 		if (raid_module->level == level) {
107 			return raid_module;
108 		}
109 	}
110 
111 	return NULL;
112 }
113 
114 void
115 raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
116 {
117 	if (raid_bdev_module_find(raid_module->level) != NULL) {
118 		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
119 			    raid_bdev_level_to_str(raid_module->level));
120 		assert(false);
121 	} else {
122 		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
123 	}
124 }
125 
/*
 * Return the IO channel of the base bdev at the given slot. May be NULL if
 * the base bdev is missing or is the active process target.
 */
struct spdk_io_channel *
raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
{
	return raid_ch->base_channel[idx];
}
131 
/*
 * Return the raid module's private context of this channel. Only valid for
 * modules that implement get_io_channel (module_channel must exist).
 */
void *
raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
{
	assert(raid_ch->module_channel != NULL);

	return spdk_io_channel_get_ctx(raid_ch->module_channel);
}
139 
140 /* Function declarations */
141 static void	raid_bdev_examine(struct spdk_bdev *bdev);
142 static int	raid_bdev_init(void);
143 static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
144 				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
145 
146 static void
147 raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
148 {
149 	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
150 
151 	if (raid_ch->process.target_ch != NULL) {
152 		spdk_put_io_channel(raid_ch->process.target_ch);
153 		raid_ch->process.target_ch = NULL;
154 	}
155 
156 	if (raid_ch->process.ch_processed != NULL) {
157 		free(raid_ch->process.ch_processed->base_channel);
158 		free(raid_ch->process.ch_processed);
159 		raid_ch->process.ch_processed = NULL;
160 	}
161 }
162 
/*
 * Set up the per-channel state for an active background process: record the
 * current process offset, get an IO channel for the process target and build
 * the shadow "processed" channel used for I/O to the already-processed range.
 * Returns 0 on success, -ENOMEM on any failure (partial setup is undone).
 */
static int
raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct raid_bdev_io_channel *raid_ch_processed;
	struct raid_base_bdev_info *base_info;

	raid_ch->process.offset = process->window_offset;

	/* In the future we may have other types of processes which don't use a target bdev,
	 * like data scrubbing or strip size migration. Until then, expect that there always is
	 * a process target. */
	assert(process->target != NULL);

	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
	if (raid_ch->process.target_ch == NULL) {
		goto err;
	}

	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
	if (raid_ch_processed == NULL) {
		goto err;
	}
	raid_ch->process.ch_processed = raid_ch_processed;

	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
					  sizeof(*raid_ch_processed->base_channel));
	if (raid_ch_processed->base_channel == NULL) {
		goto err;
	}

	/* Shallow-copy the base channels into the shadow channel; only the target's
	 * slot differs - it points at the target's own IO channel, so I/O to the
	 * processed range includes the target. These are borrowed references,
	 * released when the parent channel's base channels are released. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);

		if (base_info != process->target) {
			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
		} else {
			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
		}
	}

	raid_ch_processed->module_channel = raid_ch->module_channel;
	/* The shadow channel itself has no nested process */
	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;

	return 0;
err:
	raid_bdev_ch_process_cleanup(raid_ch);
	return -ENOMEM;
}
212 
/*
 * brief:
 * raid_bdev_create_cb function is a cb function for raid bdev which creates the
 * hierarchy from raid bdev to base bdev io channels. It will be called per core
 * params:
 * io_device - pointer to raid bdev io device represented by raid_bdev
 * ctx_buf - pointer to context buffer for raid bdev io channel
 * returns:
 * 0 - success
 * non zero - failure (-ENOMEM, or the error from process channel setup)
 */
static int
raid_bdev_create_cb(void *io_device, void *ctx_buf)
{
	struct raid_bdev            *raid_bdev = io_device;
	struct raid_bdev_io_channel *raid_ch = ctx_buf;
	uint8_t i;
	int ret = -ENOMEM;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);

	assert(raid_bdev != NULL);
	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);

	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
	if (!raid_ch->base_channel) {
		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
		return -ENOMEM;
	}

	/* Hold the lock so the base bdev descriptors and the process state stay
	 * consistent while the per-slot channels are created. */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		/*
		 * Get the spdk_io_channel for all the base bdevs. This is used during
		 * split logic to send the respective child bdev ios to respective base
		 * bdev io channel.
		 * Skip missing base bdevs and the process target, which should also be treated as
		 * missing until the process completes.
		 */
		if (raid_bdev->base_bdev_info[i].desc == NULL ||
		    (raid_bdev->process != NULL && raid_bdev->process->target == &raid_bdev->base_bdev_info[i])) {
			continue;
		}
		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
						   raid_bdev->base_bdev_info[i].desc);
		if (!raid_ch->base_channel[i]) {
			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
			goto err;
		}
	}

	if (raid_bdev->process != NULL) {
		/* A background process is active - build the shadow channel that
		 * routes I/O to the already-processed range. */
		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
		if (ret != 0) {
			SPDK_ERRLOG("Failed to setup process io channel\n");
			goto err;
		}
	} else {
		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
	}
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (raid_bdev->module->get_io_channel) {
		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
		if (!raid_ch->module_channel) {
			SPDK_ERRLOG("Unable to create io channel for raid module\n");
			goto err_unlocked;
		}
	}

	return 0;
err:
	/* Taken while the lock is still held; err_unlocked skips the unlock */
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
err_unlocked:
	/* Undo: release any base channels obtained so far and the process setup */
	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
		if (raid_ch->base_channel[i] != NULL) {
			spdk_put_io_channel(raid_ch->base_channel[i]);
		}
	}
	free(raid_ch->base_channel);

	raid_bdev_ch_process_cleanup(raid_ch);

	return ret;
}
298 
299 /*
300  * brief:
301  * raid_bdev_destroy_cb function is a cb function for raid bdev which deletes the
302  * hierarchy from raid bdev to base bdev io channels. It will be called per core
303  * params:
304  * io_device - pointer to raid bdev io device represented by raid_bdev
305  * ctx_buf - pointer to context buffer for raid bdev io channel
306  * returns:
307  * none
308  */
309 static void
310 raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
311 {
312 	struct raid_bdev *raid_bdev = io_device;
313 	struct raid_bdev_io_channel *raid_ch = ctx_buf;
314 	uint8_t i;
315 
316 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
317 
318 	assert(raid_ch != NULL);
319 	assert(raid_ch->base_channel);
320 
321 	if (raid_ch->module_channel) {
322 		spdk_put_io_channel(raid_ch->module_channel);
323 	}
324 
325 	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
326 		/* Free base bdev channels */
327 		if (raid_ch->base_channel[i] != NULL) {
328 			spdk_put_io_channel(raid_ch->base_channel[i]);
329 		}
330 	}
331 	free(raid_ch->base_channel);
332 	raid_ch->base_channel = NULL;
333 
334 	raid_bdev_ch_process_cleanup(raid_ch);
335 }
336 
/*
 * brief:
 * raid_bdev_cleanup is used to cleanup raid_bdev related data
 * structures. Frees the per-base-bdev names (descriptors must already be
 * closed) and removes the raid bdev from the global list. Must run on the
 * app thread and only while the bdev is not online.
 * params:
 * raid_bdev - pointer to raid_bdev
 * returns:
 * none
 */
static void
raid_bdev_cleanup(struct raid_bdev *raid_bdev)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/* All base bdev resources must have been released by now */
		assert(base_info->desc == NULL);
		free(base_info->name);
	}

	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
}
363 
/* Free the raid_bdev structure and everything it directly owns */
static void
raid_bdev_free(struct raid_bdev *raid_bdev)
{
	spdk_dma_free(raid_bdev->sb);
	spdk_spin_destroy(&raid_bdev->base_bdev_lock);
	free(raid_bdev->base_bdev_info);
	free(raid_bdev->bdev.name);
	free(raid_bdev);
}
373 
/* Convenience wrapper: unlink the raid bdev from global state, then free it */
static void
raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
{
	raid_bdev_cleanup(raid_bdev);
	raid_bdev_free(raid_bdev);
}
380 
/*
 * brief:
 * free resource of base bdev for raid bdev
 * Releases the name, bdev claim, descriptor and app-thread IO channel of a
 * base bdev and updates the discovered count. Must run on the app thread.
 * params:
 * base_info - raid base bdev info
 * returns:
 * none
 */
static void
raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	free(base_info->name);
	base_info->name = NULL;
	/* While still configuring the uuid is kept - presumably so the slot can be
	 * matched again later; NOTE(review): confirm against examine/superblock code */
	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
		spdk_uuid_set_null(&base_info->uuid);
	}

	if (base_info->desc == NULL) {
		return;
	}

	/* Release the claim before closing the descriptor it was taken through */
	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
	spdk_bdev_close(base_info->desc);
	base_info->desc = NULL;
	spdk_put_io_channel(base_info->app_thread_ch);
	base_info->app_thread_ch = NULL;

	if (base_info->is_configured) {
		assert(raid_bdev->num_base_bdevs_discovered);
		raid_bdev->num_base_bdevs_discovered--;
		base_info->is_configured = false;
	}
}
418 
/*
 * Called when the raid io_device has been fully unregistered. Signals bdev
 * destruct completion and, if no base bdevs remain discovered, also unlinks
 * and frees the raid_bdev.
 */
static void
raid_bdev_io_device_unregister_cb(void *io_device)
{
	struct raid_bdev *raid_bdev = io_device;

	if (raid_bdev->num_base_bdevs_discovered == 0) {
		/* Free raid_bdev when there are no base bdevs left */
		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
		raid_bdev_cleanup(raid_bdev);
		/* Signal destruct completion before freeing the structure */
		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
		raid_bdev_free(raid_bdev);
	} else {
		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
	}
}
434 
435 void
436 raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
437 {
438 	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
439 		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
440 	}
441 }
442 
/*
 * App-thread part of raid bdev destruction: release base bdev resources where
 * scheduled (or on shutdown), let the raid module stop, then signal completion.
 */
static void
_raid_bdev_destruct(void *ctxt)
{
	struct raid_bdev *raid_bdev = ctxt;
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");

	/* Any background process must have been stopped before destruct */
	assert(raid_bdev->process == NULL);

	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		/*
		 * Close all base bdev descriptors for which call has come from below
		 * layers.  Also close the descriptors if we have started shutdown.
		 */
		if (g_shutdown_started || base_info->remove_scheduled == true) {
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (g_shutdown_started) {
		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
	}

	if (raid_bdev->module->stop != NULL) {
		/* A false return means the module stops asynchronously and is
		 * expected to call raid_bdev_module_stop_done() itself when done */
		if (raid_bdev->module->stop(raid_bdev) == false) {
			return;
		}
	}

	raid_bdev_module_stop_done(raid_bdev);
}
475 
/*
 * bdev function table destruct callback. Defers the actual teardown to the
 * app thread; returning 1 tells the bdev layer that destruction completes
 * asynchronously via spdk_bdev_destruct_done().
 */
static int
raid_bdev_destruct(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);

	return 1;
}
483 
484 /**
485  * Raid bdev I/O read/write wrapper for spdk_bdev_readv_blocks_ext function.
486  */
487 int
488 raid_bdev_readv_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
489 			   struct iovec *iov, int iovcnt, uint64_t offset_blocks,
490 			   uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
491 			   struct spdk_bdev_ext_io_opts *opts)
492 {
493 	return spdk_bdev_readv_blocks_ext(base_info->desc, ch, iov, iovcnt,
494 					  base_info->data_offset + offset_blocks, num_blocks, cb, cb_arg, opts);
495 }
496 
497 /**
498  * Raid bdev I/O read/write wrapper for spdk_bdev_writev_blocks_ext function.
499  */
500 int
501 raid_bdev_writev_blocks_ext(struct raid_base_bdev_info *base_info, struct spdk_io_channel *ch,
502 			    struct iovec *iov, int iovcnt, uint64_t offset_blocks,
503 			    uint64_t num_blocks, spdk_bdev_io_completion_cb cb, void *cb_arg,
504 			    struct spdk_bdev_ext_io_opts *opts)
505 {
506 	return spdk_bdev_writev_blocks_ext(base_info->desc, ch, iov, iovcnt,
507 					   base_info->data_offset + offset_blocks, num_blocks, cb, cb_arg, opts);
508 }
509 
/*
 * Complete a raid I/O back to the bdev layer. If the I/O was split around the
 * process offset (see raid_bdev_io_split()), completion of the upper part
 * instead triggers submission of the lower part on the "processed" channel.
 */
void
raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
{
	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);

	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
		struct iovec *split_iov = raid_io->split.iov;
		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;

		/*
		 * Non-zero offset here means that this is the completion of the first part of the
		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
		 */
		if (raid_io->split.offset != 0) {
			/* Restore the original offset and metadata buffer for the lower part */
			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
			raid_io->md_buf = bdev_io->u.bdev.md_buf;

			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
				/* The lower part covers the first split.offset blocks */
				raid_io->num_blocks = raid_io->split.offset;
				/* Number of iovecs before the split point: distance between the
				 * advanced iovs pointer and the start of the original array */
				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
				raid_io->iovs = bdev_io->u.bdev.iovs;
				if (split_iov != NULL) {
					/* The iovec at the split point is shared by both parts;
					 * trim it to its pre-split prefix and count it in */
					raid_io->iovcnt++;
					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
					split_iov->iov_base = split_iov_orig->iov_base;
				}

				raid_io->split.offset = 0;
				raid_io->base_bdev_io_submitted = 0;
				/* The lower range is already processed - use the shadow channel */
				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;

				raid_io->raid_bdev->module->submit_rw_request(raid_io);
				return;
			}
		}

		/* Both parts are done (or the first part failed): restore the
		 * original extent and iovecs before completing */
		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
		raid_io->iovs = bdev_io->u.bdev.iovs;
		if (split_iov != NULL) {
			*split_iov = *split_iov_orig;
		}
	}

	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
		raid_io->completion_cb(raid_io, status);
	} else {
		spdk_bdev_io_complete(bdev_io, status);
	}
}
560 
561 /*
562  * brief:
563  * raid_bdev_io_complete_part - signal the completion of a part of the expected
564  * base bdev IOs and complete the raid_io if this is the final expected IO.
565  * The caller should first set raid_io->base_bdev_io_remaining. This function
566  * will decrement this counter by the value of the 'completed' parameter and
567  * complete the raid_io if the counter reaches 0. The caller is free to
568  * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
569  * it can represent e.g. blocks or IOs.
570  * params:
571  * raid_io - pointer to raid_bdev_io
572  * completed - the part of the raid_io that has been completed
573  * status - status of the base IO
574  * returns:
575  * true - if the raid_io is completed
576  * false - otherwise
577  */
578 bool
579 raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
580 			   enum spdk_bdev_io_status status)
581 {
582 	assert(raid_io->base_bdev_io_remaining >= completed);
583 	raid_io->base_bdev_io_remaining -= completed;
584 
585 	if (status != SPDK_BDEV_IO_STATUS_SUCCESS) {
586 		raid_io->base_bdev_io_status = status;
587 	}
588 
589 	if (raid_io->base_bdev_io_remaining == 0) {
590 		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
591 		return true;
592 	} else {
593 		return false;
594 	}
595 }
596 
/*
 * brief:
 * raid_bdev_queue_io_wait function processes the IO which failed to submit.
 * It will try to queue the IOs after storing the context to bdev wait queue logic.
 * The wait entry is embedded in the raid_io, so no allocation is needed; cb_fn
 * is invoked once the base bdev has spdk_bdev_io resources available again.
 * params:
 * raid_io - pointer to raid_bdev_io
 * bdev - the block device that the IO is submitted to
 * ch - io channel
 * cb_fn - callback when the spdk_bdev_io for bdev becomes available
 * returns:
 * none
 */
void
raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
{
	raid_io->waitq_entry.bdev = bdev;
	raid_io->waitq_entry.cb_fn = cb_fn;
	raid_io->waitq_entry.cb_arg = raid_io;
	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
}
618 
619 static void
620 raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
621 {
622 	struct raid_bdev_io *raid_io = cb_arg;
623 
624 	spdk_bdev_free_io(bdev_io);
625 
626 	raid_bdev_io_complete_part(raid_io, 1, success ?
627 				   SPDK_BDEV_IO_STATUS_SUCCESS :
628 				   SPDK_BDEV_IO_STATUS_FAILED);
629 }
630 
631 static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
632 
/*
 * Retry entry point used by the bdev io wait queue; _raid_io is the
 * raid_bdev_io whose reset submission previously failed with -ENOMEM.
 */
static void
_raid_bdev_submit_reset_request(void *_raid_io)
{
	raid_bdev_submit_reset_request(_raid_io);
}
640 
/*
 * brief:
 * raid_bdev_submit_reset_request function submits reset requests
 * to member disks; it will submit as many as possible unless a reset fails with -ENOMEM, in
 * which case it will queue it for later submission
 * params:
 * raid_io
 * returns:
 * none
 */
static void
raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev		*raid_bdev;
	int				ret;
	uint8_t				i;
	struct raid_base_bdev_info	*base_info;
	struct spdk_io_channel		*base_ch;

	raid_bdev = raid_io->raid_bdev;

	/* On first entry expect one reset completion per base bdev; on an
	 * ENOMEM retry the remaining count is already non-zero and is kept */
	if (raid_io->base_bdev_io_remaining == 0) {
		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
	}

	/* base_bdev_io_submitted persists across retries, so resubmission
	 * resumes at the first base bdev that was not yet submitted */
	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
		base_info = &raid_bdev->base_bdev_info[i];
		base_ch = raid_io->raid_ch->base_channel[i];
		if (base_ch == NULL) {
			/* Missing base bdev (or process target) - count it as done */
			raid_io->base_bdev_io_submitted++;
			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
			continue;
		}
		ret = spdk_bdev_reset(base_info->desc, base_ch,
				      raid_base_bdev_reset_complete, raid_io);
		if (ret == 0) {
			raid_io->base_bdev_io_submitted++;
		} else if (ret == -ENOMEM) {
			/* Out of bdev_io objects - retry from this slot when available */
			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
						base_ch, _raid_bdev_submit_reset_request);
			return;
		} else {
			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
			assert(false);
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
	}
}
690 
/*
 * Split a raid I/O in two at 'split_offset' blocks from its start. The
 * raid_io is adjusted to describe the second (higher LBA) part, which is
 * submitted first; enough state is saved in raid_io->split to reconstruct
 * and submit the first part afterwards (see raid_bdev_io_complete()).
 */
static void
raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
{
	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
	int i;

	assert(split_offset != 0);
	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
	raid_io->split.offset = split_offset;

	/* Advance the I/O to describe only the second part */
	raid_io->offset_blocks += split_offset;
	raid_io->num_blocks -= split_offset;
	if (raid_io->md_buf != NULL) {
		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
	}

	/* Walk the iovec array to find where the split point in bytes falls */
	for (i = 0; i < raid_io->iovcnt; i++) {
		struct iovec *iov = &raid_io->iovs[i];

		if (iov_offset < iov->iov_len) {
			if (iov_offset == 0) {
				/* Split falls exactly on an iovec boundary - no iovec is shared */
				raid_io->split.iov = NULL;
			} else {
				/* This iovec straddles the split: save the original and
				 * trim it to the portion after the split point */
				raid_io->split.iov = iov;
				raid_io->split.iov_copy = *iov;
				iov->iov_base += iov_offset;
				iov->iov_len -= iov_offset;
			}
			raid_io->iovs += i;
			raid_io->iovcnt -= i;
			break;
		}

		iov_offset -= iov->iov_len;
	}
}
728 
/*
 * Submit a read/write to the raid module, routing it with respect to an
 * active background process: I/O entirely below the process offset uses the
 * "processed" shadow channel, I/O spanning the offset is split first.
 */
static void
raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
{
	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;

	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
		uint64_t offset_begin = raid_io->offset_blocks;
		uint64_t offset_end = offset_begin + raid_io->num_blocks;

		if (offset_end > raid_ch->process.offset) {
			if (offset_begin < raid_ch->process.offset) {
				/*
				 * If the I/O spans both the processed and unprocessed ranges,
				 * split it and first handle the unprocessed part. After it
				 * completes, the rest will be handled.
				 * This situation occurs when the process thread is not active
				 * or is waiting for the process window range to be locked
				 * (quiesced). When a window is being processed, such I/Os will be
				 * deferred by the bdev layer until the window is unlocked.
				 */
				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
					      raid_ch->process.offset, offset_begin, offset_end);
				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
			}
		} else {
			/* Use the child channel, which corresponds to the already processed range */
			raid_io->raid_ch = raid_ch->process.ch_processed;
		}
	}

	raid_io->raid_bdev->module->submit_rw_request(raid_io);
}
761 
762 /*
763  * brief:
764  * Callback function to spdk_bdev_io_get_buf.
765  * params:
766  * ch - pointer to raid bdev io channel
767  * bdev_io - pointer to parent bdev_io on raid bdev device
768  * success - True if buffer is allocated or false otherwise.
769  * returns:
770  * none
771  */
772 static void
773 raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
774 		     bool success)
775 {
776 	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
777 
778 	if (!success) {
779 		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
780 		return;
781 	}
782 
783 	raid_bdev_submit_rw_request(raid_io);
784 }
785 
786 void
787 raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
788 		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
789 		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
790 		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
791 {
792 	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
793 	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
794 
795 	raid_io->type = type;
796 	raid_io->offset_blocks = offset_blocks;
797 	raid_io->num_blocks = num_blocks;
798 	raid_io->iovs = iovs;
799 	raid_io->iovcnt = iovcnt;
800 	raid_io->memory_domain = memory_domain;
801 	raid_io->memory_domain_ctx = memory_domain_ctx;
802 	raid_io->md_buf = md_buf;
803 
804 	raid_io->raid_bdev = raid_bdev;
805 	raid_io->raid_ch = raid_ch;
806 	raid_io->base_bdev_io_remaining = 0;
807 	raid_io->base_bdev_io_submitted = 0;
808 	raid_io->base_bdev_io_status = SPDK_BDEV_IO_STATUS_SUCCESS;
809 	raid_io->completion_cb = NULL;
810 	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
811 }
812 
/*
 * brief:
 * raid_bdev_submit_request function is the submit_request function pointer of
 * raid bdev function table. This is used to submit the io on raid_bdev to below
 * layers.
 * params:
 * ch - pointer to raid bdev io channel
 * bdev_io - pointer to parent bdev_io on raid bdev device
 * returns:
 * none
 */
static void
raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;

	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		/* Reads may need a data buffer allocated first; the callback submits */
		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		raid_bdev_submit_rw_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_RESET:
		raid_bdev_submit_reset_request(raid_io);
		break;

	case SPDK_BDEV_IO_TYPE_FLUSH:
	case SPDK_BDEV_IO_TYPE_UNMAP:
		/* Null-payload I/O is not supported while a background process runs */
		if (raid_io->raid_bdev->process != NULL) {
			/* TODO: rebuild support */
			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
			return;
		}
		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
		break;

	default:
		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
		break;
	}
}
863 
864 /*
865  * brief:
866  * _raid_bdev_io_type_supported checks whether io_type is supported in
867  * all base bdev modules of raid bdev module. If anyone among the base_bdevs
868  * doesn't support, the raid device doesn't supports.
869  *
870  * params:
871  * raid_bdev - pointer to raid bdev context
872  * io_type - io type
873  * returns:
874  * true - io_type is supported
875  * false - io_type is not supported
876  */
877 inline static bool
878 _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
879 {
880 	struct raid_base_bdev_info *base_info;
881 
882 	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
883 	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
884 		if (raid_bdev->module->submit_null_payload_request == NULL) {
885 			return false;
886 		}
887 	}
888 
889 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
890 		if (base_info->desc == NULL) {
891 			continue;
892 		}
893 
894 		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
895 			return false;
896 		}
897 	}
898 
899 	return true;
900 }
901 
902 /*
903  * brief:
904  * raid_bdev_io_type_supported is the io_supported function for bdev function
905  * table which returns whether the particular io type is supported or not by
906  * raid bdev module
907  * params:
908  * ctx - pointer to raid bdev context
909  * type - io type
910  * returns:
911  * true - io_type is supported
912  * false - io_type is not supported
913  */
914 static bool
915 raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
916 {
917 	switch (io_type) {
918 	case SPDK_BDEV_IO_TYPE_READ:
919 	case SPDK_BDEV_IO_TYPE_WRITE:
920 		return true;
921 
922 	case SPDK_BDEV_IO_TYPE_FLUSH:
923 	case SPDK_BDEV_IO_TYPE_RESET:
924 	case SPDK_BDEV_IO_TYPE_UNMAP:
925 		return _raid_bdev_io_type_supported(ctx, io_type);
926 
927 	default:
928 		return false;
929 	}
930 
931 	return false;
932 }
933 
/*
 * brief:
 * raid_bdev_get_io_channel is the get_io_channel function table pointer for
 * raid bdev. The raid bdev itself is registered as the io_device, so the
 * channel is looked up directly on the context pointer.
 * params:
 * ctxt - pointer to raid_bdev
 * returns:
 * pointer to io channel for raid bdev
 */
static struct spdk_io_channel *
raid_bdev_get_io_channel(void *ctxt)
{
	return spdk_get_io_channel(ctxt);
}
950 
/*
 * Write the raid bdev's parameters, background process progress and base bdev
 * list as named fields into an already-open JSON object. Must run on the app
 * thread (base bdev info is only stable there).
 */
void
raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
{
	struct raid_base_bdev_info *base_info;
	char uuid_str[SPDK_UUID_STRING_LEN];

	assert(raid_bdev != NULL);
	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
	spdk_json_write_named_string(w, "uuid", uuid_str);
	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
	spdk_json_write_named_bool(w, "superblock", raid_bdev->sb != NULL);
	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
				     raid_bdev->num_base_bdevs_operational);
	if (raid_bdev->process) {
		struct raid_bdev_process *process = raid_bdev->process;
		uint64_t offset = process->window_offset;

		spdk_json_write_named_object_begin(w, "process");
		spdk_json_write_name(w, "type");
		spdk_json_write_string(w, raid_bdev_process_to_str(process->type));
		spdk_json_write_named_string(w, "target", process->target->name);
		spdk_json_write_named_object_begin(w, "progress");
		spdk_json_write_named_uint64(w, "blocks", offset);
		/* Percentage computed in floating point, truncated to an integer */
		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_name(w, "base_bdevs_list");
	spdk_json_write_array_begin(w);
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		spdk_json_write_object_begin(w);
		spdk_json_write_name(w, "name");
		if (base_info->name) {
			spdk_json_write_string(w, base_info->name);
		} else {
			spdk_json_write_null(w);
		}
		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
		spdk_json_write_named_string(w, "uuid", uuid_str);
		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
		spdk_json_write_object_end(w);
	}
	spdk_json_write_array_end(w);
}
1003 
1004 /*
1005  * brief:
1006  * raid_bdev_dump_info_json is the function table pointer for raid bdev
1007  * params:
1008  * ctx - pointer to raid_bdev
1009  * w - pointer to json context
1010  * returns:
1011  * 0 - success
1012  * non zero - failure
1013  */
1014 static int
1015 raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1016 {
1017 	struct raid_bdev *raid_bdev = ctx;
1018 
1019 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_config_json\n");
1020 
1021 	/* Dump the raid bdev configuration related information */
1022 	spdk_json_write_named_object_begin(w, "raid");
1023 	raid_bdev_write_info_json(raid_bdev, w);
1024 	spdk_json_write_object_end(w);
1025 
1026 	return 0;
1027 }
1028 
1029 /*
1030  * brief:
1031  * raid_bdev_write_config_json is the function table pointer for raid bdev
1032  * params:
1033  * bdev - pointer to spdk_bdev
1034  * w - pointer to json context
1035  * returns:
1036  * none
1037  */
1038 static void
1039 raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1040 {
1041 	struct raid_bdev *raid_bdev = bdev->ctxt;
1042 	struct raid_base_bdev_info *base_info;
1043 	char uuid_str[SPDK_UUID_STRING_LEN];
1044 
1045 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1046 
1047 	if (raid_bdev->sb != NULL) {
1048 		/* raid bdev configuration is stored in the superblock */
1049 		return;
1050 	}
1051 
1052 	spdk_json_write_object_begin(w);
1053 
1054 	spdk_json_write_named_string(w, "method", "bdev_raid_create");
1055 
1056 	spdk_json_write_named_object_begin(w, "params");
1057 	spdk_json_write_named_string(w, "name", bdev->name);
1058 	spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &raid_bdev->bdev.uuid);
1059 	spdk_json_write_named_string(w, "uuid", uuid_str);
1060 	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1061 	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1062 	spdk_json_write_named_bool(w, "superblock", raid_bdev->sb != NULL);
1063 
1064 	spdk_json_write_named_array_begin(w, "base_bdevs");
1065 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1066 		if (base_info->desc) {
1067 			spdk_json_write_string(w, spdk_bdev_desc_get_bdev(base_info->desc)->name);
1068 		}
1069 	}
1070 	spdk_json_write_array_end(w);
1071 	spdk_json_write_object_end(w);
1072 
1073 	spdk_json_write_object_end(w);
1074 }
1075 
/*
 * brief:
 * raid_bdev_get_memory_domains is the get_memory_domains function table
 * pointer for raid bdev. It aggregates the memory domains of all open base
 * bdevs. If the supplied array is too small (or NULL), only the total count
 * is returned so the caller can retry with a larger array.
 * params:
 * ctx - pointer to raid_bdev
 * domains - output array of memory domains (may be NULL to query the count)
 * array_size - capacity of the domains array
 * returns:
 * number of memory domains on success, negative errno on failure
 */
static int
raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
{
	struct raid_bdev *raid_bdev = ctx;
	struct raid_base_bdev_info *base_info;
	int domains_count = 0, rc = 0;

	if (raid_bdev->module->memory_domains_supported == false) {
		return 0;
	}

	/* Hold the lock so base bdev descriptors cannot be released mid-scan. */
	spdk_spin_lock(&raid_bdev->base_bdev_lock);

	/* First loop to get the number of memory domains */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
		if (rc < 0) {
			goto out;
		}
		domains_count += rc;
	}

	/* Caller's array too small: report the count without filling anything. */
	if (!domains || array_size < domains_count) {
		goto out;
	}

	/* Second loop fills the array, advancing past each base bdev's entries. */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		if (base_info->desc == NULL) {
			continue;
		}
		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
		if (rc < 0) {
			goto out;
		}
		domains += rc;
		array_size -= rc;
	}
out:
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (rc < 0) {
		return rc;
	}

	return domains_count;
}
1125 
/* g_raid_bdev_fn_table is the function table for raid bdev, passed to the
 * bdev layer via spdk_bdev_register() for every raid bdev instance. */
static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
	.destruct		= raid_bdev_destruct,
	.submit_request		= raid_bdev_submit_request,
	.io_type_supported	= raid_bdev_io_type_supported,
	.get_io_channel		= raid_bdev_get_io_channel,
	.dump_info_json		= raid_bdev_dump_info_json,
	.write_config_json	= raid_bdev_write_config_json,
	.get_memory_domains	= raid_bdev_get_memory_domains,
};
1136 
1137 struct raid_bdev *
1138 raid_bdev_find_by_name(const char *name)
1139 {
1140 	struct raid_bdev *raid_bdev;
1141 
1142 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1143 		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1144 			return raid_bdev;
1145 		}
1146 	}
1147 
1148 	return NULL;
1149 }
1150 
/* Mapping between textual raid level names (as accepted from RPC/config) and
 * enum values. Each level has a long ("raid0") and a short ("0") alias; the
 * list is terminated by a zeroed sentinel entry. */
static struct {
	const char *name;
	enum raid_level value;
} g_raid_level_names[] = {
	{ "raid0", RAID0 },
	{ "0", RAID0 },
	{ "raid1", RAID1 },
	{ "1", RAID1 },
	{ "raid5f", RAID5F },
	{ "5f", RAID5F },
	{ "concat", CONCAT },
	{ }
};

/* Textual names for raid bdev states, indexed by enum raid_bdev_state. */
const char *g_raid_state_names[] = {
	[RAID_BDEV_STATE_ONLINE]	= "online",
	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
	[RAID_BDEV_STATE_OFFLINE]	= "offline",
	[RAID_BDEV_STATE_MAX]		= NULL
};

/* Textual names for background process types, indexed by enum raid_process_type. */
static const char *g_raid_process_type_names[] = {
	[RAID_PROCESS_NONE]	= "none",
	[RAID_PROCESS_REBUILD]	= "rebuild",
	[RAID_PROCESS_MAX]	= NULL
};
1177 
1178 /* We have to use the typedef in the function declaration to appease astyle. */
1179 typedef enum raid_level raid_level_t;
1180 typedef enum raid_bdev_state raid_bdev_state_t;
1181 
1182 raid_level_t
1183 raid_bdev_str_to_level(const char *str)
1184 {
1185 	unsigned int i;
1186 
1187 	assert(str != NULL);
1188 
1189 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1190 		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1191 			return g_raid_level_names[i].value;
1192 		}
1193 	}
1194 
1195 	return INVALID_RAID_LEVEL;
1196 }
1197 
1198 const char *
1199 raid_bdev_level_to_str(enum raid_level level)
1200 {
1201 	unsigned int i;
1202 
1203 	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1204 		if (g_raid_level_names[i].value == level) {
1205 			return g_raid_level_names[i].name;
1206 		}
1207 	}
1208 
1209 	return "";
1210 }
1211 
1212 raid_bdev_state_t
1213 raid_bdev_str_to_state(const char *str)
1214 {
1215 	unsigned int i;
1216 
1217 	assert(str != NULL);
1218 
1219 	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1220 		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1221 			break;
1222 		}
1223 	}
1224 
1225 	return i;
1226 }
1227 
1228 const char *
1229 raid_bdev_state_to_str(enum raid_bdev_state state)
1230 {
1231 	if (state >= RAID_BDEV_STATE_MAX) {
1232 		return "";
1233 	}
1234 
1235 	return g_raid_state_names[state];
1236 }
1237 
1238 const char *
1239 raid_bdev_process_to_str(enum raid_process_type value)
1240 {
1241 	if (value >= RAID_PROCESS_MAX) {
1242 		return "";
1243 	}
1244 
1245 	return g_raid_process_type_names[value];
1246 }
1247 
1248 /*
1249  * brief:
1250  * raid_bdev_fini_start is called when bdev layer is starting the
1251  * shutdown process
1252  * params:
1253  * none
1254  * returns:
1255  * none
1256  */
1257 static void
1258 raid_bdev_fini_start(void)
1259 {
1260 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1261 	g_shutdown_started = true;
1262 }
1263 
1264 /*
1265  * brief:
1266  * raid_bdev_exit is called on raid bdev module exit time by bdev layer
1267  * params:
1268  * none
1269  * returns:
1270  * none
1271  */
1272 static void
1273 raid_bdev_exit(void)
1274 {
1275 	struct raid_bdev *raid_bdev, *tmp;
1276 
1277 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1278 
1279 	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1280 		raid_bdev_cleanup_and_free(raid_bdev);
1281 	}
1282 }
1283 
/*
 * brief:
 * raid_bdev_opts_config_json emits the "bdev_raid_set_options" RPC that
 * restores the module-level options on config replay.
 * params:
 * w - pointer to json context
 * returns:
 * none
 */
static void
raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
{
	spdk_json_write_object_begin(w);

	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");

	spdk_json_write_named_object_begin(w, "params");
	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
	spdk_json_write_object_end(w);

	spdk_json_write_object_end(w);
}
1297 
/*
 * brief:
 * raid_bdev_config_json is the module config_json callback; per-bdev
 * configuration is emitted separately via write_config_json.
 * params:
 * w - pointer to json context
 * returns:
 * 0 - success
 */
static int
raid_bdev_config_json(struct spdk_json_write_ctx *w)
{
	raid_bdev_opts_config_json(w);

	return 0;
}
1305 
1306 /*
1307  * brief:
1308  * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1309  * module
1310  * params:
1311  * none
1312  * returns:
1313  * size of spdk_bdev_io context for raid
1314  */
1315 static int
1316 raid_bdev_get_ctx_size(void)
1317 {
1318 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
1319 	return sizeof(struct raid_bdev_io);
1320 }
1321 
/* Module descriptor registered with the bdev layer; ties together init/fini
 * hooks, JSON config dump, per-IO context sizing and disk examination. */
static struct spdk_bdev_module g_raid_if = {
	.name = "raid",
	.module_init = raid_bdev_init,
	.fini_start = raid_bdev_fini_start,
	.module_fini = raid_bdev_exit,
	.config_json = raid_bdev_config_json,
	.get_ctx_size = raid_bdev_get_ctx_size,
	.examine_disk = raid_bdev_examine,
	.async_init = false,
	.async_fini = false,
};
SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1334 
1335 /*
1336  * brief:
1337  * raid_bdev_init is the initialization function for raid bdev module
1338  * params:
1339  * none
1340  * returns:
1341  * 0 - success
1342  * non zero - failure
1343  */
1344 static int
1345 raid_bdev_init(void)
1346 {
1347 	return 0;
1348 }
1349 
1350 static int
1351 _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1352 		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1353 		  struct raid_bdev **raid_bdev_out)
1354 {
1355 	struct raid_bdev *raid_bdev;
1356 	struct spdk_bdev *raid_bdev_gen;
1357 	struct raid_bdev_module *module;
1358 	struct raid_base_bdev_info *base_info;
1359 	uint8_t min_operational;
1360 
1361 	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1362 		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1363 		return -EINVAL;
1364 	}
1365 
1366 	if (raid_bdev_find_by_name(name) != NULL) {
1367 		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1368 		return -EEXIST;
1369 	}
1370 
1371 	if (level == RAID1) {
1372 		if (strip_size != 0) {
1373 			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1374 			return -EINVAL;
1375 		}
1376 	} else if (spdk_u32_is_pow2(strip_size) == false) {
1377 		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1378 		return -EINVAL;
1379 	}
1380 
1381 	module = raid_bdev_module_find(level);
1382 	if (module == NULL) {
1383 		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1384 		return -EINVAL;
1385 	}
1386 
1387 	assert(module->base_bdevs_min != 0);
1388 	if (num_base_bdevs < module->base_bdevs_min) {
1389 		SPDK_ERRLOG("At least %u base devices required for %s\n",
1390 			    module->base_bdevs_min,
1391 			    raid_bdev_level_to_str(level));
1392 		return -EINVAL;
1393 	}
1394 
1395 	switch (module->base_bdevs_constraint.type) {
1396 	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1397 		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1398 		break;
1399 	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1400 		min_operational = module->base_bdevs_constraint.value;
1401 		break;
1402 	case CONSTRAINT_UNSET:
1403 		if (module->base_bdevs_constraint.value != 0) {
1404 			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1405 				    (uint8_t)module->base_bdevs_constraint.value, name);
1406 			return -EINVAL;
1407 		}
1408 		min_operational = num_base_bdevs;
1409 		break;
1410 	default:
1411 		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1412 			    (uint8_t)module->base_bdevs_constraint.type,
1413 			    raid_bdev_level_to_str(module->level));
1414 		return -EINVAL;
1415 	};
1416 
1417 	if (min_operational == 0 || min_operational > num_base_bdevs) {
1418 		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1419 			    raid_bdev_level_to_str(module->level));
1420 		return -EINVAL;
1421 	}
1422 
1423 	raid_bdev = calloc(1, sizeof(*raid_bdev));
1424 	if (!raid_bdev) {
1425 		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1426 		return -ENOMEM;
1427 	}
1428 
1429 	spdk_spin_init(&raid_bdev->base_bdev_lock);
1430 	raid_bdev->module = module;
1431 	raid_bdev->num_base_bdevs = num_base_bdevs;
1432 	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1433 					   sizeof(struct raid_base_bdev_info));
1434 	if (!raid_bdev->base_bdev_info) {
1435 		SPDK_ERRLOG("Unable able to allocate base bdev info\n");
1436 		raid_bdev_free(raid_bdev);
1437 		return -ENOMEM;
1438 	}
1439 
1440 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1441 		base_info->raid_bdev = raid_bdev;
1442 	}
1443 
1444 	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1445 	 * internally and set later.
1446 	 */
1447 	raid_bdev->strip_size = 0;
1448 	raid_bdev->strip_size_kb = strip_size;
1449 	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1450 	raid_bdev->level = level;
1451 	raid_bdev->min_base_bdevs_operational = min_operational;
1452 
1453 	if (superblock_enabled) {
1454 		raid_bdev->sb = spdk_dma_zmalloc(RAID_BDEV_SB_MAX_LENGTH, 0x1000, NULL);
1455 		if (!raid_bdev->sb) {
1456 			SPDK_ERRLOG("Failed to allocate raid bdev sb buffer\n");
1457 			raid_bdev_free(raid_bdev);
1458 			return -ENOMEM;
1459 		}
1460 	}
1461 
1462 	raid_bdev_gen = &raid_bdev->bdev;
1463 
1464 	raid_bdev_gen->name = strdup(name);
1465 	if (!raid_bdev_gen->name) {
1466 		SPDK_ERRLOG("Unable to allocate name for raid\n");
1467 		raid_bdev_free(raid_bdev);
1468 		return -ENOMEM;
1469 	}
1470 
1471 	raid_bdev_gen->product_name = "Raid Volume";
1472 	raid_bdev_gen->ctxt = raid_bdev;
1473 	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1474 	raid_bdev_gen->module = &g_raid_if;
1475 	raid_bdev_gen->write_cache = 0;
1476 	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1477 
1478 	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1479 
1480 	*raid_bdev_out = raid_bdev;
1481 
1482 	return 0;
1483 }
1484 
1485 /*
1486  * brief:
1487  * raid_bdev_create allocates raid bdev based on passed configuration
1488  * params:
1489  * name - name for raid bdev
1490  * strip_size - strip size in KB
1491  * num_base_bdevs - number of base bdevs
1492  * level - raid level
1493  * superblock_enabled - true if raid should have superblock
1494  * uuid - uuid to set for the bdev
1495  * raid_bdev_out - the created raid bdev
1496  * returns:
1497  * 0 - success
1498  * non zero - failure
1499  */
1500 int
1501 raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1502 		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1503 		 struct raid_bdev **raid_bdev_out)
1504 {
1505 	struct raid_bdev *raid_bdev;
1506 	int rc;
1507 
1508 	assert(uuid != NULL);
1509 
1510 	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1511 			       &raid_bdev);
1512 	if (rc != 0) {
1513 		return rc;
1514 	}
1515 
1516 	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1517 		/* we need to have the uuid to store in the superblock before the bdev is registered */
1518 		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1519 	}
1520 
1521 	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1522 
1523 	*raid_bdev_out = raid_bdev;
1524 
1525 	return 0;
1526 }
1527 
1528 static void
1529 _raid_bdev_unregistering_cont(void *ctx)
1530 {
1531 	struct raid_bdev *raid_bdev = ctx;
1532 
1533 	spdk_bdev_close(raid_bdev->self_desc);
1534 	raid_bdev->self_desc = NULL;
1535 }
1536 
/* Forward to the app thread — the internal descriptor must be closed there. */
static void
raid_bdev_unregistering_cont(void *ctx)
{
	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
}
1542 
1543 static int
1544 raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1545 {
1546 	struct raid_process_finish_action *finish_action;
1547 
1548 	assert(spdk_get_thread() == process->thread);
1549 	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1550 
1551 	finish_action = calloc(1, sizeof(*finish_action));
1552 	if (finish_action == NULL) {
1553 		return -ENOMEM;
1554 	}
1555 
1556 	finish_action->cb = cb;
1557 	finish_action->cb_ctx = cb_ctx;
1558 
1559 	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1560 
1561 	return 0;
1562 }
1563 
/*
 * brief:
 * raid_bdev_unregistering_stop_process runs on the process thread when the
 * raid bdev is being unregistered while a background process is active. It
 * asks the process to stop and arranges for unregistration to continue once
 * the process has finished.
 * params:
 * ctx - pointer to the background process
 * returns:
 * none
 */
static void
raid_bdev_unregistering_stop_process(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	process->state = RAID_PROCESS_STATE_STOPPING;
	/* Keep an earlier failure status; otherwise record the cancellation. */
	if (process->status == 0) {
		process->status = -ECANCELED;
	}

	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
1582 
/*
 * brief:
 * raid_bdev_event_cb handles events on the raid bdev's own internal
 * descriptor. On SPDK_BDEV_EVENT_REMOVE (unregistration) it either stops a
 * running background process first, or closes the descriptor right away.
 * params:
 * type - bdev event type
 * bdev - the bdev the event refers to
 * event_ctx - pointer to raid_bdev
 * returns:
 * none
 */
static void
raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
	struct raid_bdev *raid_bdev = event_ctx;

	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		if (raid_bdev->process != NULL) {
			/* The process must be stopped on its own thread before closing. */
			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
					     raid_bdev->process);
		} else {
			raid_bdev_unregistering_cont(raid_bdev);
		}
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}
1602 
/*
 * brief:
 * raid_bdev_configure_cont finishes configuration: moves the raid bdev to
 * the online state, registers its io device and bdev with the bdev layer,
 * and opens an internal descriptor on it. On failure the module is stopped
 * and the bdev drops back to the configuring state.
 * params:
 * raid_bdev - pointer to raid bdev
 * returns:
 * none
 */
static void
raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
{
	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
	int rc;

	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
				sizeof(struct raid_bdev_io_channel),
				raid_bdev_gen->name);
	rc = spdk_bdev_register(raid_bdev_gen);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		goto err;
	}

	/*
	 * Open the bdev internally to delay unregistering if we need to stop a background process
	 * first. The process may still need to unquiesce a range but it will fail because the
	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
	 * so this is the only way currently to do this correctly.
	 * TODO: try to handle this correctly in bdev layer instead.
	 */
	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
				&raid_bdev->self_desc);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
			    raid_bdev_gen->name, spdk_strerror(-rc));
		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
		goto err;
	}

	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
		      raid_bdev_gen->name, raid_bdev);
	return;
err:
	/* Unwind: stop the module, drop the io device, return to configuring. */
	if (raid_bdev->module->stop != NULL) {
		raid_bdev->module->stop(raid_bdev);
	}
	spdk_io_device_unregister(raid_bdev, NULL);
	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
}
1651 
1652 static void
1653 raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1654 {
1655 	if (status == 0) {
1656 		raid_bdev_configure_cont(raid_bdev);
1657 	} else {
1658 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1659 			    raid_bdev->bdev.name, spdk_strerror(-status));
1660 		if (raid_bdev->module->stop != NULL) {
1661 			raid_bdev->module->stop(raid_bdev);
1662 		}
1663 	}
1664 }
1665 
1666 /*
1667  * brief:
1668  * If raid bdev config is complete, then only register the raid bdev to
1669  * bdev layer and remove this raid bdev from configuring list and
1670  * insert the raid bdev to configured list
1671  * params:
1672  * raid_bdev - pointer to raid bdev
1673  * returns:
1674  * 0 - success
1675  * non zero - failure
1676  */
1677 static int
1678 raid_bdev_configure(struct raid_bdev *raid_bdev)
1679 {
1680 	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
1681 	int rc;
1682 
1683 	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
1684 	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
1685 	assert(raid_bdev->bdev.blocklen > 0);
1686 
1687 	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
1688 	 * internal use.
1689 	 */
1690 	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
1691 	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
1692 		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
1693 		return -EINVAL;
1694 	}
1695 	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);
1696 	raid_bdev->blocklen_shift = spdk_u32log2(data_block_size);
1697 
1698 	rc = raid_bdev->module->start(raid_bdev);
1699 	if (rc != 0) {
1700 		SPDK_ERRLOG("raid module startup callback failed\n");
1701 		return rc;
1702 	}
1703 
1704 	if (raid_bdev->sb != NULL) {
1705 		if (spdk_uuid_is_null(&raid_bdev->sb->uuid)) {
1706 			/* NULL UUID is not valid in the sb so it means that we are creating a new
1707 			 * raid bdev and should initialize the superblock.
1708 			 */
1709 			raid_bdev_init_superblock(raid_bdev);
1710 		} else {
1711 			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
1712 			if (raid_bdev->sb->block_size != data_block_size) {
1713 				SPDK_ERRLOG("blocklen does not match value in superblock\n");
1714 				rc = -EINVAL;
1715 			}
1716 			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
1717 				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
1718 				rc = -EINVAL;
1719 			}
1720 			if (rc != 0) {
1721 				if (raid_bdev->module->stop != NULL) {
1722 					raid_bdev->module->stop(raid_bdev);
1723 				}
1724 				return rc;
1725 			}
1726 		}
1727 
1728 		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
1729 	} else {
1730 		raid_bdev_configure_cont(raid_bdev);
1731 	}
1732 
1733 	return 0;
1734 }
1735 
1736 /*
1737  * brief:
1738  * If raid bdev is online and registered, change the bdev state to
1739  * configuring and unregister this raid device. Queue this raid device
1740  * in configuring list
1741  * params:
1742  * raid_bdev - pointer to raid bdev
1743  * cb_fn - callback function
1744  * cb_arg - argument to callback function
1745  * returns:
1746  * none
1747  */
1748 static void
1749 raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1750 		      void *cb_arg)
1751 {
1752 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1753 		if (cb_fn) {
1754 			cb_fn(cb_arg, 0);
1755 		}
1756 		return;
1757 	}
1758 
1759 	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1760 	assert(raid_bdev->num_base_bdevs_discovered);
1761 	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1762 
1763 	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1764 }
1765 
1766 /*
1767  * brief:
1768  * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1769  * params:
1770  * base_bdev - pointer to base bdev
1771  * returns:
1772  * base bdev info if found, otherwise NULL.
1773  */
1774 static struct raid_base_bdev_info *
1775 raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1776 {
1777 	struct raid_bdev *raid_bdev;
1778 	struct raid_base_bdev_info *base_info;
1779 
1780 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1781 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1782 			if (base_info->desc != NULL &&
1783 			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1784 				return base_info;
1785 			}
1786 		}
1787 	}
1788 
1789 	return NULL;
1790 }
1791 
1792 static void
1793 raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
1794 {
1795 	assert(base_info->remove_scheduled);
1796 
1797 	base_info->remove_scheduled = false;
1798 	if (base_info->remove_cb != NULL) {
1799 		base_info->remove_cb(base_info->remove_cb_ctx, status);
1800 	}
1801 }
1802 
1803 static void
1804 raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1805 {
1806 	struct raid_base_bdev_info *base_info = ctx;
1807 
1808 	if (status != 0) {
1809 		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1810 			    raid_bdev->bdev.name, spdk_strerror(-status));
1811 	}
1812 
1813 	raid_bdev_remove_base_bdev_done(base_info, status);
1814 }
1815 
/*
 * brief:
 * raid_bdev_remove_base_bdev_on_unquiesced runs after the raid bdev has been
 * unquiesced. It releases the base bdev's resources and, if a superblock
 * exists, marks the corresponding slot failed and persists the superblock
 * before completing the removal.
 * params:
 * ctx - pointer to the base bdev info being removed
 * status - unquiesce status
 * returns:
 * none
 */
static void
raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
{
	struct raid_base_bdev_info *base_info = ctx;
	struct raid_bdev *raid_bdev = base_info->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
		goto out;
	}

	spdk_spin_lock(&raid_bdev->base_bdev_lock);
	raid_bdev_free_base_bdev_resource(base_info);
	spdk_spin_unlock(&raid_bdev->base_bdev_lock);

	if (raid_bdev->sb) {
		struct raid_bdev_superblock *sb = raid_bdev->sb;
		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
		uint8_t i;

		/* Find the configured sb entry for this slot and mark it failed. */
		for (i = 0; i < sb->base_bdevs_size; i++) {
			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];

			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
			    sb_base_bdev->slot == slot) {
				/* TODO: distinguish between failure and intentional removal */
				sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;

				/* Completion continues in the superblock write callback. */
				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
				return;
			}
		}
	}
out:
	raid_bdev_remove_base_bdev_done(base_info, status);
}
1853 
1854 static void
1855 raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
1856 {
1857 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1858 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1859 	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
1860 	uint8_t idx = raid_bdev_base_bdev_slot(base_info);
1861 
1862 	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);
1863 
1864 	if (raid_ch->base_channel[idx] != NULL) {
1865 		spdk_put_io_channel(raid_ch->base_channel[idx]);
1866 		raid_ch->base_channel[idx] = NULL;
1867 	}
1868 
1869 	if (raid_ch->process.ch_processed != NULL) {
1870 		raid_ch->process.ch_processed->base_channel[idx] = NULL;
1871 	}
1872 
1873 	spdk_for_each_channel_continue(i, 0);
1874 }
1875 
1876 static void
1877 raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
1878 {
1879 	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1880 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1881 
1882 	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
1883 			    base_info);
1884 }
1885 
1886 static void
1887 raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
1888 {
1889 	struct raid_base_bdev_info *base_info = ctx;
1890 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1891 
1892 	if (status != 0) {
1893 		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
1894 			    raid_bdev->bdev.name, spdk_strerror(-status));
1895 		raid_bdev_remove_base_bdev_done(base_info, status);
1896 		return;
1897 	}
1898 
1899 	spdk_for_each_channel(raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
1900 			      raid_bdev_channels_remove_base_bdev_done);
1901 }
1902 
1903 static int
1904 raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
1905 {
1906 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1907 
1908 	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
1909 				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
1910 }
1911 
/* Message context for removing a base bdev while a background process runs. */
struct raid_bdev_process_base_bdev_remove_ctx {
	/* The background process; stays valid until the process fully stops. */
	struct raid_bdev_process *process;
	/* The base bdev being removed. */
	struct raid_base_bdev_info *base_info;
	/* Snapshot of raid_bdev->num_base_bdevs_operational taken on the app
	 * thread, since the process thread must not read raid_bdev fields. */
	uint8_t num_base_bdevs_operational;
};
1917 
/* App-thread continuation: start the quiesce-based removal; if it cannot be
 * started, complete the removal immediately with the error. */
static void
_raid_bdev_process_base_bdev_remove_cont(void *ctx)
{
	struct raid_base_bdev_info *base_info = ctx;
	int rc;

	rc = raid_bdev_remove_base_bdev_quiesce(base_info);
	if (rc != 0) {
		raid_bdev_remove_base_bdev_done(base_info, rc);
	}
}
1929 
1930 static void
1931 raid_bdev_process_base_bdev_remove_cont(void *_ctx)
1932 {
1933 	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
1934 	struct raid_base_bdev_info *base_info = ctx->base_info;
1935 
1936 	free(ctx);
1937 
1938 	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
1939 			     base_info);
1940 }
1941 
/*
 * brief:
 * _raid_bdev_process_base_bdev_remove runs on the process thread. If the
 * removed base bdev is neither the process target nor required to keep the
 * raid operational, removal proceeds immediately; otherwise the process is
 * asked to stop first and the removal is queued as a finish action.
 * params:
 * _ctx - pointer to raid_bdev_process_base_bdev_remove_ctx
 * returns:
 * none
 */
static void
_raid_bdev_process_base_bdev_remove(void *_ctx)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
	struct raid_bdev_process *process = ctx->process;
	int ret;

	if (ctx->base_info != process->target &&
	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
		/* process doesn't need to be stopped */
		raid_bdev_process_base_bdev_remove_cont(ctx);
		return;
	}

	assert(process->state > RAID_PROCESS_STATE_INIT &&
	       process->state < RAID_PROCESS_STATE_STOPPED);

	/* Defer the removal until the process has fully stopped. */
	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
	if (ret != 0) {
		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
		free(ctx);
		return;
	}

	process->state = RAID_PROCESS_STATE_STOPPING;

	/* Keep an earlier failure status; otherwise record the device loss. */
	if (process->status == 0) {
		process->status = -ENODEV;
	}
}
1972 
/*
 * brief:
 * raid_bdev_process_base_bdev_remove initiates removal of a base bdev while
 * a background process is running, by forwarding the request to the process
 * thread. Must be called on the app thread.
 * params:
 * process - the running background process
 * base_info - the base bdev to remove
 * returns:
 * 0 - success
 * -ENOMEM - allocation failure
 */
static int
raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
				   struct raid_base_bdev_info *base_info)
{
	struct raid_bdev_process_base_bdev_remove_ctx *ctx;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	ctx = calloc(1, sizeof(*ctx));
	if (ctx == NULL) {
		return -ENOMEM;
	}

	/*
	 * We have to send the process and num_base_bdevs_operational in the message ctx
	 * because the process thread should not access raid_bdev's properties. Particularly,
	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
	 * will still be valid until the process is fully stopped.
	 */
	ctx->base_info = base_info;
	ctx->process = process;
	ctx->num_base_bdevs_operational = process->raid_bdev->num_base_bdevs_operational;

	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);

	return 0;
}
2000 
/*
 * Schedule removal of a base bdev from its raid bdev. Must be called on the
 * app thread. The callback is stored in base_info->remove_cb and invoked by
 * the chosen removal path. Returns -ENODEV if a removal is already scheduled.
 */
static int
_raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
			    raid_base_bdev_cb cb_fn, void *cb_ctx)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int ret = 0;

	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	/* Only one removal may be in flight per base bdev */
	if (base_info->remove_scheduled) {
		return -ENODEV;
	}

	assert(base_info->desc);
	base_info->remove_scheduled = true;
	base_info->remove_cb = cb_fn;
	base_info->remove_cb_ctx = cb_ctx;

	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
		/*
		 * As raid bdev is not registered yet or already unregistered,
		 * so cleanup should be done here itself.
		 *
		 * Removing a base bdev at this stage does not change the number of operational
		 * base bdevs, only the number of discovered base bdevs.
		 */
		raid_bdev_free_base_bdev_resource(base_info);
		if (raid_bdev->num_base_bdevs_discovered == 0) {
			/* There is no base bdev for this raid, so free the raid device. */
			raid_bdev_cleanup_and_free(raid_bdev);
		}
	} else if (raid_bdev->num_base_bdevs_operational-- == raid_bdev->min_base_bdevs_operational) {
		/*
		 * After this base bdev is removed there will not be enough base bdevs
		 * to keep the raid bdev operational.
		 * NOTE: num_base_bdevs_operational is decremented as a side effect of this
		 * comparison whenever the raid bdev is ONLINE, regardless of which branch runs.
		 */
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
	} else if (raid_bdev->process != NULL) {
		/* A background process is running - coordinate the removal with it */
		ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
	} else {
		/* Quiesce IO on this base bdev before removing it */
		ret = raid_bdev_remove_base_bdev_quiesce(base_info);
	}

	if (ret != 0) {
		/* Removal did not actually start - allow a future retry */
		base_info->remove_scheduled = false;
	}
	return ret;
}
2051 
2052 /*
2053  * brief:
2054  * raid_bdev_remove_base_bdev function is called by below layers when base_bdev
2055  * is removed. This function checks if this base bdev is part of any raid bdev
2056  * or not. If yes, it takes necessary action on that particular raid bdev.
2057  * params:
2058  * base_bdev - pointer to base bdev which got removed
2059  * cb_fn - callback function
2060  * cb_arg - argument to callback function
2061  * returns:
2062  * 0 - success
2063  * non zero - failure
2064  */
2065 int
2066 raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2067 {
2068 	struct raid_base_bdev_info *base_info;
2069 
2070 	/* Find the raid_bdev which has claimed this base_bdev */
2071 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2072 	if (!base_info) {
2073 		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2074 		return -ENODEV;
2075 	}
2076 
2077 	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2078 }
2079 
2080 /*
2081  * brief:
2082  * raid_bdev_resize_base_bdev function is called by below layers when base_bdev
2083  * is resized. This function checks if the smallest size of the base_bdevs is changed.
2084  * If yes, call module handler to resize the raid_bdev if implemented.
2085  * params:
2086  * base_bdev - pointer to base bdev which got resized.
2087  * returns:
2088  * none
2089  */
2090 static void
2091 raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
2092 {
2093 	struct raid_bdev *raid_bdev;
2094 	struct raid_base_bdev_info *base_info;
2095 
2096 	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");
2097 
2098 	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2099 
2100 	/* Find the raid_bdev which has claimed this base_bdev */
2101 	if (!base_info) {
2102 		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
2103 		return;
2104 	}
2105 	raid_bdev = base_info->raid_bdev;
2106 
2107 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2108 
2109 	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
2110 		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);
2111 
2112 	if (raid_bdev->module->resize) {
2113 		raid_bdev->module->resize(raid_bdev);
2114 	}
2115 }
2116 
2117 /*
2118  * brief:
2119  * raid_bdev_event_base_bdev function is called by below layers when base_bdev
2120  * triggers asynchronous event.
2121  * params:
2122  * type - event details.
2123  * bdev - bdev that triggered event.
2124  * event_ctx - context for event.
2125  * returns:
2126  * none
2127  */
2128 static void
2129 raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2130 			  void *event_ctx)
2131 {
2132 	int rc;
2133 
2134 	switch (type) {
2135 	case SPDK_BDEV_EVENT_REMOVE:
2136 		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2137 		if (rc != 0) {
2138 			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2139 				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2140 		}
2141 		break;
2142 	case SPDK_BDEV_EVENT_RESIZE:
2143 		raid_bdev_resize_base_bdev(bdev);
2144 		break;
2145 	default:
2146 		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2147 		break;
2148 	}
2149 }
2150 
2151 /*
2152  * brief:
2153  * Deletes the specified raid bdev
2154  * params:
2155  * raid_bdev - pointer to raid bdev
2156  * cb_fn - callback function
2157  * cb_arg - argument to callback function
2158  */
void
raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
{
	struct raid_base_bdev_info *base_info;

	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);

	/* A concurrent delete gets -EALREADY instead of racing the first one */
	if (raid_bdev->destroy_started) {
		SPDK_DEBUGLOG(bdev_raid, "destroying raid bdev %s is already started\n",
			      raid_bdev->bdev.name);
		if (cb_fn) {
			cb_fn(cb_arg, -EALREADY);
		}
		return;
	}

	raid_bdev->destroy_started = true;

	/* Schedule removal of every base bdev */
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
		base_info->remove_scheduled = true;

		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
			/*
			 * As raid bdev is not registered yet or already unregistered,
			 * so cleanup should be done here itself.
			 */
			raid_bdev_free_base_bdev_resource(base_info);
		}
	}

	if (raid_bdev->num_base_bdevs_discovered == 0) {
		/* There is no base bdev for this raid, so free the raid device. */
		raid_bdev_cleanup_and_free(raid_bdev);
		if (cb_fn) {
			cb_fn(cb_arg, 0);
		}
	} else {
		/* Base bdevs remain - deconfigure performs the asynchronous teardown */
		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
	}
}
2199 
/* Completion of the superblock write issued after a background process finished - log only */
static void
raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
{
	if (status != 0) {
		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
			    raid_bdev->bdev.name, spdk_strerror(-status));
	}
}
2208 
2209 static void
2210 raid_bdev_process_finish_write_sb(void *ctx)
2211 {
2212 	struct raid_bdev *raid_bdev = ctx;
2213 	struct raid_bdev_superblock *sb = raid_bdev->sb;
2214 	struct raid_bdev_sb_base_bdev *sb_base_bdev;
2215 	struct raid_base_bdev_info *base_info;
2216 	uint8_t i;
2217 
2218 	for (i = 0; i < sb->base_bdevs_size; i++) {
2219 		sb_base_bdev = &sb->base_bdevs[i];
2220 
2221 		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
2222 		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
2223 			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2224 			if (base_info->is_configured) {
2225 				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
2226 				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
2227 			}
2228 		}
2229 	}
2230 
2231 	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
2232 }
2233 
2234 static void raid_bdev_process_free(struct raid_bdev_process *process);
2235 
/*
 * Final teardown stage, on the process thread: run all registered finish
 * actions, free the process, and exit the dedicated process thread.
 */
static void
_raid_bdev_process_finish_done(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_process_finish_action *finish_action;

	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
		finish_action->cb(finish_action->cb_ctx);
		free(finish_action);
	}

	raid_bdev_process_free(process);

	spdk_thread_exit(spdk_get_thread());
}
2252 
/*
 * Completion of removing the target base bdev after a failed process.
 * Continues the teardown on the process thread regardless of status.
 */
static void
raid_bdev_process_finish_target_removed(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
	}

	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
}
2264 
2265 static void
2266 raid_bdev_process_finish_unquiesced(void *ctx, int status)
2267 {
2268 	struct raid_bdev_process *process = ctx;
2269 
2270 	if (status != 0) {
2271 		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
2272 	}
2273 
2274 	if (process->status != 0) {
2275 		struct raid_base_bdev_info *target = process->target;
2276 
2277 		if (target->desc != NULL && target->remove_scheduled == false) {
2278 			_raid_bdev_remove_base_bdev(target, raid_bdev_process_finish_target_removed, process);
2279 			return;
2280 		}
2281 	}
2282 
2283 	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2284 }
2285 
/* On the app thread: unquiesce the raid bdev after the process has finished */
static void
raid_bdev_process_finish_unquiesce(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	int rc;

	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
				 raid_bdev_process_finish_unquiesced, process);
	if (rc != 0) {
		/* Report the synchronous failure through the same completion path */
		raid_bdev_process_finish_unquiesced(process, rc);
	}
}
2298 
2299 static void
2300 raid_bdev_process_finish_done(void *ctx)
2301 {
2302 	struct raid_bdev_process *process = ctx;
2303 	struct raid_bdev *raid_bdev = process->raid_bdev;
2304 
2305 	if (process->raid_ch != NULL) {
2306 		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
2307 	}
2308 
2309 	process->state = RAID_PROCESS_STATE_STOPPED;
2310 
2311 	if (process->status == 0) {
2312 		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
2313 			       raid_bdev_process_to_str(process->type),
2314 			       raid_bdev->bdev.name);
2315 		if (raid_bdev->sb != NULL) {
2316 			spdk_thread_send_msg(spdk_thread_get_app_thread(),
2317 					     raid_bdev_process_finish_write_sb,
2318 					     raid_bdev);
2319 		}
2320 	} else {
2321 		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
2322 			     raid_bdev_process_to_str(process->type),
2323 			     raid_bdev->bdev.name,
2324 			     spdk_strerror(-process->status));
2325 	}
2326 
2327 	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
2328 			     process);
2329 }
2330 
/* All channels have been cleaned up - continue teardown on the process thread */
static void
__raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
}
2338 
/*
 * Per-channel cleanup after the process finished. On success, the target's
 * process channel is promoted to the regular base bdev channel for its slot.
 */
static void
raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	if (process->status == 0) {
		uint8_t slot = raid_bdev_base_bdev_slot(process->target);

		/* Hand the target channel over before cleanup so it is not released below */
		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
		raid_ch->process.target_ch = NULL;
	}

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2357 
/*
 * The raid bdev is quiesced for final teardown: detach the process from the
 * raid bdev and update every IO channel before completing the teardown.
 */
static void
raid_bdev_process_finish_quiesced(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;

	if (status != 0) {
		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
		/* NOTE(review): teardown stalls here on quiesce failure - no retry path visible */
		return;
	}

	raid_bdev->process = NULL;
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
			      __raid_bdev_process_finish);
}
2373 
/* On the app thread: quiesce the raid bdev before finishing the process */
static void
_raid_bdev_process_finish(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	int rc;

	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
			       raid_bdev_process_finish_quiesced, process);
	if (rc != 0) {
		/* Report the synchronous failure through the same completion path */
		raid_bdev_process_finish_quiesced(ctx, rc);
	}
}
2386 
/* Start the teardown sequence of a stopped process on the app thread */
static void
raid_bdev_process_do_finish(struct raid_bdev_process *process)
{
	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
}
2392 
2393 static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2394 static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2395 
/*
 * Initiate stopping of the background process with the given status.
 * Must be called on the process thread. Safe to call multiple times -
 * only the first non-zero status is kept.
 */
static void
raid_bdev_process_finish(struct raid_bdev_process *process, int status)
{
	assert(spdk_get_thread() == process->thread);

	if (process->status == 0) {
		process->status = status;
	}

	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
		/* Already stopping or stopped */
		return;
	}

	assert(process->state == RAID_PROCESS_STATE_RUNNING);
	process->state = RAID_PROCESS_STATE_STOPPING;

	if (process->window_range_locked) {
		/* Release the locked LBA range first; its completion resumes the run loop */
		raid_bdev_process_unlock_window_range(process);
	} else {
		raid_bdev_process_thread_run(process);
	}
}
2418 
/*
 * The current window's LBA range has been unquiesced: advance the window
 * offset by the amount actually processed and re-enter the run loop.
 */
static void
raid_bdev_process_window_range_unlocked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = false;
	/* window_size holds the number of blocks processed in this window */
	process->window_offset += process->window_size;

	raid_bdev_process_thread_run(process);
}
2435 
/*
 * Unquiesce the current window's LBA range. The range must match the one
 * passed to spdk_bdev_quiesce_range (window_offset, max_window_size).
 */
static void
raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
{
	int rc;

	assert(process->window_range_locked == true);

	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
				       process->window_offset, process->max_window_size,
				       raid_bdev_process_window_range_unlocked, process);
	if (rc != 0) {
		/* Report the synchronous failure through the same completion path */
		raid_bdev_process_window_range_unlocked(process, rc);
	}
}
2450 
/* All channels now see the new process offset - release the window's LBA range */
static void
raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	raid_bdev_process_unlock_window_range(process);
}
2458 
/* Publish the end of the completed window as the per-channel process offset */
static void
raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	raid_ch->process.offset = process->window_offset + process->window_size;

	spdk_for_each_channel_continue(i, 0);
}
2470 
/*
 * Called by raid modules when a process request completes. Returns the
 * request to the free pool and, once the whole window has completed, either
 * finishes the process (on error) or publishes the new offset to all
 * channels before moving the window forward.
 */
void
raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
{
	struct raid_bdev_process *process = process_req->process;

	/* Return the request to the pool for reuse */
	TAILQ_INSERT_TAIL(&process->requests, process_req, link);

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining >= process_req->num_blocks);

	if (status != 0) {
		/* Remember the failure; the window is finished once all requests complete */
		process->window_status = status;
	}

	process->window_remaining -= process_req->num_blocks;
	if (process->window_remaining == 0) {
		if (process->window_status != 0) {
			raid_bdev_process_finish(process, process->window_status);
			return;
		}

		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
				      raid_bdev_process_channels_update_done);
	}
}
2496 
2497 static int
2498 raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
2499 				 uint32_t num_blocks)
2500 {
2501 	struct raid_bdev *raid_bdev = process->raid_bdev;
2502 	struct raid_bdev_process_request *process_req;
2503 	int ret;
2504 
2505 	process_req = TAILQ_FIRST(&process->requests);
2506 	if (process_req == NULL) {
2507 		assert(process->window_remaining > 0);
2508 		return 0;
2509 	}
2510 
2511 	process_req->target = process->target;
2512 	process_req->target_ch = process->raid_ch->process.target_ch;
2513 	process_req->offset_blocks = offset_blocks;
2514 	process_req->num_blocks = num_blocks;
2515 	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;
2516 
2517 	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
2518 	if (ret <= 0) {
2519 		if (ret < 0) {
2520 			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
2521 				    raid_bdev->bdev.name, spdk_strerror(-ret));
2522 			process->window_status = ret;
2523 		}
2524 		return ret;
2525 	}
2526 
2527 	process_req->num_blocks = ret;
2528 	TAILQ_REMOVE(&process->requests, process_req, link);
2529 
2530 	return ret;
2531 }
2532 
2533 static void
2534 _raid_bdev_process_thread_run(struct raid_bdev_process *process)
2535 {
2536 	struct raid_bdev *raid_bdev = process->raid_bdev;
2537 	uint64_t offset = process->window_offset;
2538 	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
2539 	int ret;
2540 
2541 	while (offset < offset_end) {
2542 		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
2543 		if (ret <= 0) {
2544 			break;
2545 		}
2546 
2547 		process->window_remaining += ret;
2548 		offset += ret;
2549 	}
2550 
2551 	if (process->window_remaining > 0) {
2552 		process->window_size = process->window_remaining;
2553 	} else {
2554 		raid_bdev_process_finish(process, process->window_status);
2555 	}
2556 }
2557 
/*
 * The window's LBA range is quiesced. If a stop was requested in the
 * meantime, release the range instead of submitting work.
 */
static void
raid_bdev_process_window_range_locked(void *ctx, int status)
{
	struct raid_bdev_process *process = ctx;

	if (status != 0) {
		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
		raid_bdev_process_finish(process, status);
		return;
	}

	process->window_range_locked = true;

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_unlock_window_range(process);
		return;
	}

	_raid_bdev_process_thread_run(process);
}
2578 
/*
 * Main loop of the process thread: finish if stopping or at the end of the
 * bdev, otherwise quiesce the next window's LBA range and submit work for it.
 * Requires that no window is currently outstanding or locked.
 */
static void
raid_bdev_process_thread_run(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;
	int rc;

	assert(spdk_get_thread() == process->thread);
	assert(process->window_remaining == 0);
	assert(process->window_range_locked == false);

	if (process->state == RAID_PROCESS_STATE_STOPPING) {
		raid_bdev_process_do_finish(process);
		return;
	}

	if (process->window_offset == raid_bdev->bdev.blockcnt) {
		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
		raid_bdev_process_finish(process, 0);
		return;
	}

	/* Clamp the window so it does not extend past the end of the bdev */
	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
					    process->max_window_size);

	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
				     process->window_offset, process->max_window_size,
				     raid_bdev_process_window_range_locked, process);
	if (rc != 0) {
		/* Report the synchronous failure through the same completion path */
		raid_bdev_process_window_range_locked(process, rc);
	}
}
2610 
/*
 * First message handled on the newly created process thread: acquire the
 * raid bdev IO channel for this thread, mark the process running, and enter
 * the run loop.
 */
static void
raid_bdev_process_thread_init(void *ctx)
{
	struct raid_bdev_process *process = ctx;
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_io_channel *ch;

	process->thread = spdk_get_thread();

	ch = spdk_get_io_channel(raid_bdev);
	if (ch == NULL) {
		process->status = -ENOMEM;
		raid_bdev_process_do_finish(process);
		return;
	}

	process->raid_ch = spdk_io_channel_get_ctx(ch);
	process->state = RAID_PROCESS_STATE_RUNNING;

	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);

	raid_bdev_process_thread_run(process);
}
2635 
/*
 * Process start failed and all channels have been cleaned up: remove the
 * target base bdev and free the never-started process.
 */
static void
raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);

	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
	raid_bdev_process_free(process);

	/* TODO: update sb */
}
2646 
/* Per-channel rollback of process setup when starting the process failed */
static void
raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);

	raid_bdev_ch_process_cleanup(raid_ch);

	spdk_for_each_channel_continue(i, 0);
}
2657 
/*
 * All channels have completed process setup. On success, create the
 * dedicated process thread and kick off its initialization; on any failure,
 * roll back the per-channel setup.
 */
static void
raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct raid_bdev *raid_bdev = process->raid_bdev;
	struct spdk_thread *thread;
	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];

	if (status != 0) {
		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
			    spdk_strerror(-status));
		goto err;
	}

	/* TODO: we may need to abort if a base bdev was removed before we got here */

	/* e.g. "<raid name>_rebuild" */
	snprintf(thread_name, sizeof(thread_name), "%s_%s",
		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));

	thread = spdk_thread_create(thread_name, NULL);
	if (thread == NULL) {
		SPDK_ERRLOG("Failed to create %s thread for %s\n",
			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
		goto err;
	}

	raid_bdev->process = process;

	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);

	return;
err:
	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
			      raid_bdev_channels_abort_start_process_done);
}
2694 
/* Per-channel setup of the background process; a non-zero rc aborts the iteration */
static void
raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
{
	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
	int rc;

	rc = raid_bdev_ch_process_setup(raid_ch, process);

	spdk_for_each_channel_continue(i, rc);
}
2707 
/* Start a background process by setting it up on every raid bdev IO channel */
static void
raid_bdev_process_start(struct raid_bdev_process *process)
{
	struct raid_bdev *raid_bdev = process->raid_bdev;

	/* Only modules that implement submit_process_request can run processes */
	assert(raid_bdev->module->submit_process_request != NULL);

	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
			      raid_bdev_channels_start_process_done);
}
2718 
/* Free a process request together with its data and metadata buffers */
static void
raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
{
	spdk_dma_free(process_req->iov.iov_base);
	spdk_dma_free(process_req->md_buf);
	free(process_req);
}
2726 
2727 static struct raid_bdev_process_request *
2728 raid_bdev_process_alloc_request(struct raid_bdev_process *process)
2729 {
2730 	struct raid_bdev *raid_bdev = process->raid_bdev;
2731 	struct raid_bdev_process_request *process_req;
2732 
2733 	process_req = calloc(1, sizeof(*process_req));
2734 	if (process_req == NULL) {
2735 		return NULL;
2736 	}
2737 
2738 	process_req->process = process;
2739 	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
2740 	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
2741 	if (process_req->iov.iov_base == NULL) {
2742 		free(process_req);
2743 		return NULL;
2744 	}
2745 	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
2746 		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
2747 		if (process_req->md_buf == NULL) {
2748 			raid_bdev_process_request_free(process_req);
2749 			return NULL;
2750 		}
2751 	}
2752 
2753 	return process_req;
2754 }
2755 
2756 static void
2757 raid_bdev_process_free(struct raid_bdev_process *process)
2758 {
2759 	struct raid_bdev_process_request *process_req;
2760 
2761 	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
2762 		TAILQ_REMOVE(&process->requests, process_req, link);
2763 		raid_bdev_process_request_free(process_req);
2764 	}
2765 
2766 	free(process);
2767 }
2768 
2769 static struct raid_bdev_process *
2770 raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
2771 			struct raid_base_bdev_info *target)
2772 {
2773 	struct raid_bdev_process *process;
2774 	struct raid_bdev_process_request *process_req;
2775 	int i;
2776 
2777 	process = calloc(1, sizeof(*process));
2778 	if (process == NULL) {
2779 		return NULL;
2780 	}
2781 
2782 	process->raid_bdev = raid_bdev;
2783 	process->type = type;
2784 	process->target = target;
2785 	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
2786 					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
2787 					    raid_bdev->bdev.write_unit_size);
2788 	TAILQ_INIT(&process->requests);
2789 	TAILQ_INIT(&process->finish_actions);
2790 
2791 	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
2792 		process_req = raid_bdev_process_alloc_request(process);
2793 		if (process_req == NULL) {
2794 			raid_bdev_process_free(process);
2795 			return NULL;
2796 		}
2797 
2798 		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
2799 	}
2800 
2801 	return process;
2802 }
2803 
/*
 * Start a rebuild process targeting the given base bdev. Must be called on
 * the app thread. Returns -ENOMEM if the process cannot be allocated.
 */
static int
raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
{
	struct raid_bdev_process *process;

	assert(spdk_get_thread() == spdk_thread_get_app_thread());

	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
	if (process == NULL) {
		return -ENOMEM;
	}

	raid_bdev_process_start(process);

	return 0;
}
2820 
/*
 * Continue configuring a base bdev after it has been opened and claimed.
 * Marks it configured and either brings the raid bdev online, starts a
 * rebuild of a member added to an already-online array, or keeps waiting
 * for more members. Invokes base_info->configure_cb with the result.
 */
static void
raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
{
	struct raid_bdev *raid_bdev = base_info->raid_bdev;
	int rc;

	/* TODO: defer if rebuild in progress on another base bdev */
	assert(raid_bdev->process == NULL);

	base_info->is_configured = true;

	raid_bdev->num_base_bdevs_discovered++;
	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);

	/*
	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
	 * of base bdevs we know to be operational members of the array. Usually this is equal
	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
	 * degraded.
	 */
	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
		rc = raid_bdev_configure(raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
		}
	} else if (raid_bdev->num_base_bdevs_discovered > raid_bdev->num_base_bdevs_operational) {
		/* A member was added to an already-online array - rebuild it */
		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
		raid_bdev->num_base_bdevs_operational++;
		rc = raid_bdev_start_rebuild(base_info);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
		}
	} else {
		/* Still waiting for more base bdevs to be discovered */
		rc = 0;
	}

	if (base_info->configure_cb != NULL) {
		base_info->configure_cb(base_info->configure_cb_ctx, rc);
	}
}
2864 
2865 static void
2866 raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
2867 		void *ctx)
2868 {
2869 	struct raid_base_bdev_info *base_info = ctx;
2870 
2871 	switch (status) {
2872 	case 0:
2873 		/* valid superblock found */
2874 		SPDK_ERRLOG("Existing raid superblock found on bdev %s\n", base_info->name);
2875 		status = -EEXIST;
2876 		raid_bdev_free_base_bdev_resource(base_info);
2877 		break;
2878 	case -EINVAL:
2879 		/* no valid superblock */
2880 		raid_bdev_configure_base_bdev_cont(base_info);
2881 		return;
2882 	default:
2883 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
2884 			    base_info->name, spdk_strerror(-status));
2885 		break;
2886 	}
2887 
2888 	if (base_info->configure_cb != NULL) {
2889 		base_info->configure_cb(base_info->configure_cb_ctx, status);
2890 	}
2891 }
2892 
2893 static int
2894 raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
2895 			      raid_base_bdev_cb cb_fn, void *cb_ctx)
2896 {
2897 	struct raid_bdev *raid_bdev = base_info->raid_bdev;
2898 	struct spdk_bdev_desc *desc;
2899 	struct spdk_bdev *bdev;
2900 	const struct spdk_uuid *bdev_uuid;
2901 	int rc;
2902 
2903 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2904 	assert(base_info->desc == NULL);
2905 
2906 	/*
2907 	 * Base bdev can be added by name or uuid. Here we assure both properties are set and valid
2908 	 * before claiming the bdev.
2909 	 */
2910 
2911 	if (!spdk_uuid_is_null(&base_info->uuid)) {
2912 		char uuid_str[SPDK_UUID_STRING_LEN];
2913 		const char *bdev_name;
2914 
2915 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
2916 
2917 		/* UUID of a bdev is registered as its alias */
2918 		bdev = spdk_bdev_get_by_name(uuid_str);
2919 		if (bdev == NULL) {
2920 			return -ENODEV;
2921 		}
2922 
2923 		bdev_name = spdk_bdev_get_name(bdev);
2924 
2925 		if (base_info->name == NULL) {
2926 			assert(existing == true);
2927 			base_info->name = strdup(bdev_name);
2928 			if (base_info->name == NULL) {
2929 				return -ENOMEM;
2930 			}
2931 		} else if (strcmp(base_info->name, bdev_name) != 0) {
2932 			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
2933 				    bdev_name, base_info->name);
2934 			return -EINVAL;
2935 		}
2936 	}
2937 
2938 	assert(base_info->name != NULL);
2939 
2940 	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
2941 	if (rc != 0) {
2942 		if (rc != -ENODEV) {
2943 			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
2944 		}
2945 		return rc;
2946 	}
2947 
2948 	bdev = spdk_bdev_desc_get_bdev(desc);
2949 	bdev_uuid = spdk_bdev_get_uuid(bdev);
2950 
2951 	if (spdk_uuid_is_null(&base_info->uuid)) {
2952 		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
2953 	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
2954 		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
2955 		spdk_bdev_close(desc);
2956 		return -EINVAL;
2957 	}
2958 
2959 	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
2960 	if (rc != 0) {
2961 		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
2962 		spdk_bdev_close(desc);
2963 		return rc;
2964 	}
2965 
2966 	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);
2967 
2968 	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
2969 	if (base_info->app_thread_ch == NULL) {
2970 		SPDK_ERRLOG("Failed to get io channel\n");
2971 		spdk_bdev_module_release_bdev(bdev);
2972 		spdk_bdev_close(desc);
2973 		return -ENOMEM;
2974 	}
2975 
2976 	base_info->desc = desc;
2977 	base_info->blockcnt = bdev->blockcnt;
2978 
2979 	if (raid_bdev->sb != NULL) {
2980 		uint64_t data_offset;
2981 
2982 		if (base_info->data_offset == 0) {
2983 			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
2984 			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
2985 		} else {
2986 			data_offset = base_info->data_offset;
2987 		}
2988 
2989 		if (bdev->optimal_io_boundary != 0) {
2990 			data_offset = spdk_divide_round_up(data_offset,
2991 							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
2992 			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
2993 				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
2994 					     base_info->data_offset, base_info->name, data_offset);
2995 				data_offset = base_info->data_offset;
2996 			}
2997 		}
2998 
2999 		base_info->data_offset = data_offset;
3000 	}
3001 
3002 	if (base_info->data_offset >= bdev->blockcnt) {
3003 		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
3004 			    base_info->data_offset, bdev->blockcnt, base_info->name);
3005 		rc = -EINVAL;
3006 		goto out;
3007 	}
3008 
3009 	if (base_info->data_size == 0) {
3010 		base_info->data_size = bdev->blockcnt - base_info->data_offset;
3011 	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
3012 		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
3013 			    bdev->blockcnt, base_info->name);
3014 		rc = -EINVAL;
3015 		goto out;
3016 	}
3017 
3018 	/* Currently, RAID bdevs do not support DIF or DIX, so a RAID bdev cannot
3019 	 * be created on top of any bdev which supports it */
3020 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3021 		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
3022 			    bdev->name);
3023 		rc = -EINVAL;
3024 		goto out;
3025 	}
3026 
3027 	/*
3028 	 * Set the raid bdev properties if this is the first base bdev configured,
3029 	 * otherwise - verify. Assumption is that all the base bdevs for any raid bdev should
3030 	 * have the same blocklen and metadata format.
3031 	 */
3032 	if (raid_bdev->bdev.blocklen == 0) {
3033 		raid_bdev->bdev.blocklen = bdev->blocklen;
3034 		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
3035 		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
3036 	} else {
3037 		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
3038 			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
3039 				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
3040 			rc = -EINVAL;
3041 			goto out;
3042 		}
3043 
3044 		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
3045 		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev)) {
3046 			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
3047 				    raid_bdev->bdev.name, bdev->name);
3048 			rc = -EINVAL;
3049 			goto out;
3050 		}
3051 	}
3052 
3053 	base_info->configure_cb = cb_fn;
3054 	base_info->configure_cb_ctx = cb_ctx;
3055 
3056 	if (existing) {
3057 		raid_bdev_configure_base_bdev_cont(base_info);
3058 	} else {
3059 		/* check for existing superblock when using a new bdev */
3060 		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
3061 				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
3062 		if (rc) {
3063 			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3064 				    bdev->name, spdk_strerror(-rc));
3065 		}
3066 	}
3067 out:
3068 	if (rc != 0) {
3069 		raid_bdev_free_base_bdev_resource(base_info);
3070 	}
3071 	return rc;
3072 }
3073 
3074 static int
3075 _raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
3076 			   uint64_t data_offset, uint64_t data_size,
3077 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
3078 {
3079 	struct raid_base_bdev_info *base_info;
3080 
3081 	assert(name != NULL);
3082 
3083 	if (slot >= raid_bdev->num_base_bdevs) {
3084 		return -EINVAL;
3085 	}
3086 
3087 	base_info = &raid_bdev->base_bdev_info[slot];
3088 
3089 	if (base_info->name != NULL) {
3090 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev '%s'\n",
3091 			    slot, raid_bdev->bdev.name, base_info->name);
3092 		return -EBUSY;
3093 	}
3094 
3095 	if (!spdk_uuid_is_null(&base_info->uuid)) {
3096 		char uuid_str[SPDK_UUID_STRING_LEN];
3097 
3098 		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3099 		SPDK_ERRLOG("Slot %u on raid bdev '%s' already assigned to bdev with uuid %s\n",
3100 			    slot, raid_bdev->bdev.name, uuid_str);
3101 		return -EBUSY;
3102 	}
3103 
3104 	base_info->name = strdup(name);
3105 	if (base_info->name == NULL) {
3106 		return -ENOMEM;
3107 	}
3108 
3109 	base_info->data_offset = data_offset;
3110 	base_info->data_size = data_size;
3111 
3112 	return raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
3113 }
3114 
3115 int
3116 raid_bdev_attach_base_bdev(struct raid_bdev *raid_bdev, struct spdk_bdev *base_bdev,
3117 			   raid_base_bdev_cb cb_fn, void *cb_ctx)
3118 {
3119 	struct raid_base_bdev_info *base_info = NULL, *iter;
3120 	int rc;
3121 
3122 	SPDK_DEBUGLOG(bdev_raid, "attach_base_device: %s\n", base_bdev->name);
3123 
3124 	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3125 
3126 	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
3127 		SPDK_ERRLOG("raid bdev '%s' must be in online state to attach base bdev\n",
3128 			    raid_bdev->bdev.name);
3129 		return -EINVAL;
3130 	}
3131 
3132 	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3133 		if (iter->desc == NULL) {
3134 			base_info = iter;
3135 			break;
3136 		}
3137 	}
3138 
3139 	if (base_info == NULL) {
3140 		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
3141 			    raid_bdev->bdev.name, base_bdev->name);
3142 		return -EINVAL;
3143 	}
3144 
3145 	assert(base_info->is_configured == false);
3146 	assert(base_info->data_size != 0);
3147 
3148 	spdk_spin_lock(&raid_bdev->base_bdev_lock);
3149 
3150 	rc = _raid_bdev_add_base_device(raid_bdev, base_bdev->name,
3151 					raid_bdev_base_bdev_slot(base_info),
3152 					base_info->data_offset, base_info->data_size,
3153 					cb_fn, cb_ctx);
3154 	if (rc != 0) {
3155 		SPDK_ERRLOG("base bdev '%s' attach failed: %s\n", base_bdev->name, spdk_strerror(-rc));
3156 		raid_bdev_free_base_bdev_resource(base_info);
3157 	}
3158 
3159 	spdk_spin_unlock(&raid_bdev->base_bdev_lock);
3160 
3161 	return rc;
3162 }
3163 
3164 /*
3165  * brief:
3166  * raid_bdev_add_base_device function is the actual function which either adds
3167  * the nvme base device to existing raid bdev or create a new raid bdev. It also claims
3168  * the base device and keep the open descriptor.
3169  * params:
3170  * raid_bdev - pointer to raid bdev
3171  * name - name of the base bdev
3172  * slot - position to add base bdev
3173  * cb_fn - callback function
3174  * cb_ctx - argument to callback function
3175  * returns:
3176  * 0 - success
3177  * non zero - failure
3178  */
3179 int
3180 raid_bdev_add_base_device(struct raid_bdev *raid_bdev, const char *name, uint8_t slot,
3181 			  raid_base_bdev_cb cb_fn, void *cb_ctx)
3182 {
3183 	return _raid_bdev_add_base_device(raid_bdev, name, slot, 0, 0, cb_fn, cb_ctx);
3184 }
3185 
3186 static int
3187 raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3188 {
3189 	struct raid_bdev *raid_bdev;
3190 	uint8_t i;
3191 	int rc;
3192 
3193 	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3194 			       sb->level, true, &sb->uuid, &raid_bdev);
3195 	if (rc != 0) {
3196 		return rc;
3197 	}
3198 
3199 	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3200 	memcpy(raid_bdev->sb, sb, sb->length);
3201 
3202 	for (i = 0; i < sb->base_bdevs_size; i++) {
3203 		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3204 		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3205 
3206 		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3207 			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3208 			raid_bdev->num_base_bdevs_operational++;
3209 		}
3210 
3211 		base_info->data_offset = sb_base_bdev->data_offset;
3212 		base_info->data_size = sb_base_bdev->data_size;
3213 	}
3214 
3215 	*raid_bdev_out = raid_bdev;
3216 	return 0;
3217 }
3218 
3219 static void
3220 raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3221 {
3222 	struct raid_bdev *raid_bdev;
3223 	struct raid_base_bdev_info *base_info;
3224 
3225 	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3226 		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3227 			if (base_info->desc == NULL && base_info->name != NULL &&
3228 			    strcmp(bdev->name, base_info->name) == 0) {
3229 				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3230 				break;
3231 			}
3232 		}
3233 	}
3234 }
3235 
/*
 * Examine path for a bdev carrying a valid raid superblock. Decides, based on
 * the superblock's raid uuid and sequence number, whether this bdev should be
 * configured as a base bdev of an existing raid bdev or of a raid bdev newly
 * created from the superblock. A mismatching block size, a NULL raid uuid, or
 * a bdev that is not an active member causes the bdev to be ignored.
 */
static void
raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev)
{
	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
	struct raid_bdev *raid_bdev;
	struct raid_base_bdev_info *iter, *base_info;
	uint8_t i;
	int rc;

	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
		return;
	}

	if (spdk_uuid_is_null(&sb->uuid)) {
		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
		return;
	}

	/* Look for an existing raid bdev with the uuid from the superblock.
	 * TAILQ_FOREACH leaves raid_bdev NULL when no match is found. */
	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, &sb->uuid) == 0) {
			break;
		}
	}

	if (raid_bdev) {
		if (sb->seq_number > raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);

			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
					     raid_bdev->bdev.name, bdev->name);
				return;
			}

			/* remove and then recreate the raid bdev using the newer superblock */
			raid_bdev_delete(raid_bdev, NULL, NULL);
			raid_bdev = NULL;
		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
			SPDK_DEBUGLOG(bdev_raid,
				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
			/* use the current raid bdev superblock */
			sb = raid_bdev->sb;
		}
	}

	/* Find this bdev's entry in the superblock's member list by uuid. */
	for (i = 0; i < sb->base_bdevs_size; i++) {
		sb_base_bdev = &sb->base_bdevs[i];

		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);

		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			break;
		}
	}

	if (i == sb->base_bdevs_size) {
		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
		return;
	}

	/* No matching (surviving) raid bdev - create one from the superblock. */
	if (!raid_bdev) {
		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
		if (rc != 0) {
			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
				    sb->name, spdk_strerror(-rc));
			return;
		}
	}

	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
			       bdev->name, raid_bdev->bdev.name);
		return;
	}

	/* Map the superblock entry back to the raid bdev's slot by uuid. */
	base_info = NULL;
	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
			base_info = iter;
			break;
		}
	}

	if (base_info == NULL) {
		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
			    bdev->name, raid_bdev->bdev.name);
		return;
	}

	rc = raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
	if (rc != 0) {
		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
	}
}
3336 
/* Context for the asynchronous superblock read done during bdev examine */
struct raid_bdev_examine_ctx {
	/* Read-only descriptor opened on the examined bdev */
	struct spdk_bdev_desc *desc;
	/* IO channel on desc used to issue the superblock read */
	struct spdk_io_channel *ch;
};
3341 
3342 static void
3343 raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3344 {
3345 	if (!ctx) {
3346 		return;
3347 	}
3348 
3349 	if (ctx->ch) {
3350 		spdk_put_io_channel(ctx->ch);
3351 	}
3352 
3353 	if (ctx->desc) {
3354 		spdk_bdev_close(ctx->desc);
3355 	}
3356 
3357 	free(ctx);
3358 }
3359 
3360 static void
3361 raid_bdev_examine_load_sb_cb(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3362 {
3363 	struct raid_bdev_examine_ctx *ctx = _ctx;
3364 	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3365 
3366 	switch (status) {
3367 	case 0:
3368 		/* valid superblock found */
3369 		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3370 		raid_bdev_examine_sb(sb, bdev);
3371 		break;
3372 	case -EINVAL:
3373 		/* no valid superblock, check if it can be claimed anyway */
3374 		raid_bdev_examine_no_sb(bdev);
3375 		break;
3376 	default:
3377 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3378 			    bdev->name, spdk_strerror(-status));
3379 		break;
3380 	}
3381 
3382 	raid_bdev_examine_ctx_free(ctx);
3383 	spdk_bdev_module_examine_done(&g_raid_if);
3384 }
3385 
/* Intentionally empty event callback for the short-lived descriptor opened
 * during examine; the descriptor is closed as soon as the superblock read
 * completes, so no events need handling. */
static void
raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
{
}
3390 
3391 /*
3392  * brief:
3393  * raid_bdev_examine function is the examine function call by the below layers
3394  * like bdev_nvme layer. This function will check if this base bdev can be
3395  * claimed by this raid bdev or not.
3396  * params:
3397  * bdev - pointer to base bdev
3398  * returns:
3399  * none
3400  */
3401 static void
3402 raid_bdev_examine(struct spdk_bdev *bdev)
3403 {
3404 	struct raid_bdev_examine_ctx *ctx;
3405 	int rc;
3406 
3407 	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3408 		raid_bdev_examine_no_sb(bdev);
3409 		spdk_bdev_module_examine_done(&g_raid_if);
3410 		return;
3411 	}
3412 
3413 	ctx = calloc(1, sizeof(*ctx));
3414 	if (!ctx) {
3415 		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3416 			    bdev->name, spdk_strerror(ENOMEM));
3417 		goto err;
3418 	}
3419 
3420 	rc = spdk_bdev_open_ext(spdk_bdev_get_name(bdev), false, raid_bdev_examine_event_cb, NULL,
3421 				&ctx->desc);
3422 	if (rc) {
3423 		SPDK_ERRLOG("Failed to open bdev %s: %s\n",
3424 			    bdev->name, spdk_strerror(-rc));
3425 		goto err;
3426 	}
3427 
3428 	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3429 	if (!ctx->ch) {
3430 		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev->name);
3431 		goto err;
3432 	}
3433 
3434 	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_cb, ctx);
3435 	if (rc) {
3436 		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3437 			    bdev->name, spdk_strerror(-rc));
3438 		goto err;
3439 	}
3440 
3441 	return;
3442 err:
3443 	raid_bdev_examine_ctx_free(ctx);
3444 	spdk_bdev_module_examine_done(&g_raid_if);
3445 }
3446 
3447 /* Log component for bdev raid bdev module */
3448 SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3449