xref: /spdk/module/bdev/raid/bdev_raid.c (revision fa3ab73844ced08f4f9487f5de71d477ca5cf604)
1  /*   SPDX-License-Identifier: BSD-3-Clause
2   *   Copyright (C) 2018 Intel Corporation.
3   *   All rights reserved.
4   *   Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
5   */
6  
7  #include "bdev_raid.h"
8  #include "spdk/env.h"
9  #include "spdk/thread.h"
10  #include "spdk/log.h"
11  #include "spdk/string.h"
12  #include "spdk/util.h"
13  #include "spdk/json.h"
14  #include "spdk/likely.h"
15  #include "spdk/trace.h"
16  #include "spdk_internal/trace_defs.h"
17  
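/* Sentinel value stored in raid_bdev_io_channel process.offset and raid_bdev_io
 * split.offset to indicate that no offset is set (no background process window
 * applies / the I/O is not split). */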
18  #define RAID_OFFSET_BLOCKS_INVALID	UINT64_MAX
19  #define RAID_BDEV_PROCESS_MAX_QD	16
20  
21  #define RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT	1024
22  #define RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT	0
23  
24  static bool g_shutdown_started = false;
25  
26  /* List of all raid bdevs */
27  struct raid_all_tailq g_raid_bdev_list = TAILQ_HEAD_INITIALIZER(g_raid_bdev_list);
28  
29  static TAILQ_HEAD(, raid_bdev_module) g_raid_modules = TAILQ_HEAD_INITIALIZER(g_raid_modules);
30  
31  /*
32   * raid_bdev_io_channel is the per-channel context of the spdk_io_channel for a raid
33   * bdev device. It maps the raid bdev io channel to its base bdev io channels.
34   */
35  struct raid_bdev_io_channel {
36  	/* Array of IO channels of base bdevs */
37  	struct spdk_io_channel	**base_channel;
38  
39  	/* Private raid module IO channel */
40  	struct spdk_io_channel	*module_channel;
41  
42  	/* Background process data */
43  	struct {
44  		uint64_t offset;
45  		struct spdk_io_channel *target_ch;
46  		struct raid_bdev_io_channel *ch_processed;
47  	} process;
48  };
49  
50  enum raid_bdev_process_state {
51  	RAID_PROCESS_STATE_INIT,
52  	RAID_PROCESS_STATE_RUNNING,
53  	RAID_PROCESS_STATE_STOPPING,
54  	RAID_PROCESS_STATE_STOPPED,
55  };
56  
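/*
 * Rate limiting state for the background process, used to enforce the
 * process_max_bandwidth_mb_sec option. The budget is tracked in bytes per TSC tick
 * (token-bucket style, capped at bytes_max); process_continue_poller continues the
 * process after it has been paused by the limit.
 */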
57  struct raid_process_qos {
58  	bool enable_qos;
59  	uint64_t last_tsc;
60  	double bytes_per_tsc;
61  	double bytes_available;
62  	double bytes_max;
63  	struct spdk_poller *process_continue_poller;
64  };
65  
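/*
 * State of a background process (e.g. rebuild) running on a raid bdev. The process
 * walks the bdev in windows of up to max_window_size blocks starting at window_offset,
 * locking (quiescing) each window range while it is processed, and tracks outstanding
 * process requests as well as actions to run when the process finishes.
 */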
66  struct raid_bdev_process {
67  	struct raid_bdev		*raid_bdev;
68  	enum raid_process_type		type;
69  	enum raid_bdev_process_state	state;
70  	struct spdk_thread		*thread;
71  	struct raid_bdev_io_channel	*raid_ch;
72  	TAILQ_HEAD(, raid_bdev_process_request) requests;
73  	uint64_t			max_window_size;
74  	uint64_t			window_size;
75  	uint64_t			window_remaining;
76  	int				window_status;
77  	uint64_t			window_offset;
78  	bool				window_range_locked;
79  	struct raid_base_bdev_info	*target;
80  	int				status;
81  	TAILQ_HEAD(, raid_process_finish_action) finish_actions;
82  	struct raid_process_qos		qos;
83  };
84  
85  struct raid_process_finish_action {
86  	spdk_msg_fn cb;
87  	void *cb_ctx;
88  	TAILQ_ENTRY(raid_process_finish_action) link;
89  };
90  
91  static struct spdk_raid_bdev_opts g_opts = {
92  	.process_window_size_kb = RAID_BDEV_PROCESS_WINDOW_SIZE_KB_DEFAULT,
93  	.process_max_bandwidth_mb_sec = RAID_BDEV_PROCESS_MAX_BANDWIDTH_MB_SEC_DEFAULT,
94  };
95  
96  void
97  raid_bdev_get_opts(struct spdk_raid_bdev_opts *opts)
98  {
99  	*opts = g_opts;
100  }
101  
102  int
103  raid_bdev_set_opts(const struct spdk_raid_bdev_opts *opts)
104  {
105  	if (opts->process_window_size_kb == 0) {
106  		return -EINVAL;
107  	}
108  
109  	g_opts = *opts;
110  
111  	return 0;
112  }
113  
114  static struct raid_bdev_module *
115  raid_bdev_module_find(enum raid_level level)
116  {
117  	struct raid_bdev_module *raid_module;
118  
119  	TAILQ_FOREACH(raid_module, &g_raid_modules, link) {
120  		if (raid_module->level == level) {
121  			return raid_module;
122  		}
123  	}
124  
125  	return NULL;
126  }
127  
128  void
129  raid_bdev_module_list_add(struct raid_bdev_module *raid_module)
130  {
131  	if (raid_bdev_module_find(raid_module->level) != NULL) {
132  		SPDK_ERRLOG("module for raid level '%s' already registered.\n",
133  			    raid_bdev_level_to_str(raid_module->level));
134  		assert(false);
135  	} else {
136  		TAILQ_INSERT_TAIL(&g_raid_modules, raid_module, link);
137  	}
138  }
139  
140  struct spdk_io_channel *
141  raid_bdev_channel_get_base_channel(struct raid_bdev_io_channel *raid_ch, uint8_t idx)
142  {
143  	return raid_ch->base_channel[idx];
144  }
145  
146  void *
147  raid_bdev_channel_get_module_ctx(struct raid_bdev_io_channel *raid_ch)
148  {
149  	assert(raid_ch->module_channel != NULL);
150  
151  	return spdk_io_channel_get_ctx(raid_ch->module_channel);
152  }
153  
154  struct raid_base_bdev_info *
155  raid_bdev_channel_get_base_info(struct raid_bdev_io_channel *raid_ch, struct spdk_bdev *base_bdev)
156  {
157  	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
158  	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
159  	uint8_t i;
160  
161  	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
162  		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[i];
163  
164  		if (base_info->is_configured &&
165  		    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
166  			return base_info;
167  		}
168  	}
169  
170  	return NULL;
171  }
172  
173  /* Function declarations */
174  static void	raid_bdev_examine(struct spdk_bdev *bdev);
175  static int	raid_bdev_init(void);
176  static void	raid_bdev_deconfigure(struct raid_bdev *raid_bdev,
177  				      raid_bdev_destruct_cb cb_fn, void *cb_arg);
178  
179  static void
180  raid_bdev_ch_process_cleanup(struct raid_bdev_io_channel *raid_ch)
181  {
182  	raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
183  
184  	if (raid_ch->process.target_ch != NULL) {
185  		spdk_put_io_channel(raid_ch->process.target_ch);
186  		raid_ch->process.target_ch = NULL;
187  	}
188  
189  	if (raid_ch->process.ch_processed != NULL) {
190  		free(raid_ch->process.ch_processed->base_channel);
191  		free(raid_ch->process.ch_processed);
192  		raid_ch->process.ch_processed = NULL;
193  	}
194  }
195  
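/*
 * Set up the per-channel state for a running background process: get an IO channel on
 * the process target and build ch_processed, a copy of this raid channel in which the
 * target's channel replaces the regular one. It is used for I/O to the already
 * processed range.
 */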
196  static int
197  raid_bdev_ch_process_setup(struct raid_bdev_io_channel *raid_ch, struct raid_bdev_process *process)
198  {
199  	struct raid_bdev *raid_bdev = process->raid_bdev;
200  	struct raid_bdev_io_channel *raid_ch_processed;
201  	struct raid_base_bdev_info *base_info;
202  
203  	raid_ch->process.offset = process->window_offset;
204  
205  	/* In the future we may have other types of processes which don't use a target bdev,
206  	 * like data scrubbing or strip size migration. Until then, expect that there always is
207  	 * a process target. */
208  	assert(process->target != NULL);
209  
210  	raid_ch->process.target_ch = spdk_bdev_get_io_channel(process->target->desc);
211  	if (raid_ch->process.target_ch == NULL) {
212  		goto err;
213  	}
214  
215  	raid_ch_processed = calloc(1, sizeof(*raid_ch_processed));
216  	if (raid_ch_processed == NULL) {
217  		goto err;
218  	}
219  	raid_ch->process.ch_processed = raid_ch_processed;
220  
221  	raid_ch_processed->base_channel = calloc(raid_bdev->num_base_bdevs,
222  					  sizeof(*raid_ch_processed->base_channel));
223  	if (raid_ch_processed->base_channel == NULL) {
224  		goto err;
225  	}
226  
227  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
228  		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
229  
230  		if (base_info != process->target) {
231  			raid_ch_processed->base_channel[slot] = raid_ch->base_channel[slot];
232  		} else {
233  			raid_ch_processed->base_channel[slot] = raid_ch->process.target_ch;
234  		}
235  	}
236  
237  	raid_ch_processed->module_channel = raid_ch->module_channel;
238  	raid_ch_processed->process.offset = RAID_OFFSET_BLOCKS_INVALID;
239  
240  	return 0;
241  err:
242  	raid_bdev_ch_process_cleanup(raid_ch);
243  	return -ENOMEM;
244  }
245  
246  /*
247   * brief:
248   * raid_bdev_create_cb is the io channel create callback for the raid bdev. It creates the
249   * hierarchy from the raid bdev io channel to the base bdev io channels. It is called per core.
250   * params:
251   * io_device - pointer to raid bdev io device represented by raid_bdev
252   * ctx_buf - pointer to context buffer for raid bdev io channel
253   * returns:
254   * 0 - success
255   * non zero - failure
256   */
257  static int
258  raid_bdev_create_cb(void *io_device, void *ctx_buf)
259  {
260  	struct raid_bdev            *raid_bdev = io_device;
261  	struct raid_bdev_io_channel *raid_ch = ctx_buf;
262  	uint8_t i;
263  	int ret = -ENOMEM;
264  
265  	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_create_cb, %p\n", raid_ch);
266  
267  	assert(raid_bdev != NULL);
268  	assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
269  
270  	raid_ch->base_channel = calloc(raid_bdev->num_base_bdevs, sizeof(struct spdk_io_channel *));
271  	if (!raid_ch->base_channel) {
272  		SPDK_ERRLOG("Unable to allocate base bdevs io channel\n");
273  		return -ENOMEM;
274  	}
275  
276  	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
277  		/*
278  		 * Get the spdk_io_channel for all the base bdevs. These channels are used
279  		 * by the split logic to send each child bdev io to its respective base
280  		 * bdev io channel.
281  		 * Skip missing base bdevs and the process target, which should also be treated as
282  		 * missing until the process completes.
283  		 */
284  		if (raid_bdev->base_bdev_info[i].is_configured == false ||
285  		    raid_bdev->base_bdev_info[i].is_process_target == true) {
286  			continue;
287  		}
288  		raid_ch->base_channel[i] = spdk_bdev_get_io_channel(
289  						   raid_bdev->base_bdev_info[i].desc);
290  		if (!raid_ch->base_channel[i]) {
291  			SPDK_ERRLOG("Unable to create io channel for base bdev\n");
292  			goto err;
293  		}
294  	}
295  
296  	if (raid_bdev->module->get_io_channel) {
297  		raid_ch->module_channel = raid_bdev->module->get_io_channel(raid_bdev);
298  		if (!raid_ch->module_channel) {
299  			SPDK_ERRLOG("Unable to create io channel for raid module\n");
300  			goto err;
301  		}
302  	}
303  
304  	if (raid_bdev->process != NULL) {
305  		ret = raid_bdev_ch_process_setup(raid_ch, raid_bdev->process);
306  		if (ret != 0) {
307  			SPDK_ERRLOG("Failed to setup process io channel\n");
308  			goto err;
309  		}
310  	} else {
311  		raid_ch->process.offset = RAID_OFFSET_BLOCKS_INVALID;
312  	}
313  
314  	return 0;
315  err:
316  	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
317  		if (raid_ch->base_channel[i] != NULL) {
318  			spdk_put_io_channel(raid_ch->base_channel[i]);
319  		}
320  	}
321  	free(raid_ch->base_channel);
322  
323  	raid_bdev_ch_process_cleanup(raid_ch);
324  
325  	return ret;
326  }
327  
328  /*
329   * brief:
330   * raid_bdev_destroy_cb is the io channel destroy callback for the raid bdev. It tears down the
331   * hierarchy from the raid bdev io channel to the base bdev io channels. It is called per core.
332   * params:
333   * io_device - pointer to raid bdev io device represented by raid_bdev
334   * ctx_buf - pointer to context buffer for raid bdev io channel
335   * returns:
336   * none
337   */
338  static void
339  raid_bdev_destroy_cb(void *io_device, void *ctx_buf)
340  {
341  	struct raid_bdev *raid_bdev = io_device;
342  	struct raid_bdev_io_channel *raid_ch = ctx_buf;
343  	uint8_t i;
344  
345  	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destroy_cb\n");
346  
347  	assert(raid_ch != NULL);
348  	assert(raid_ch->base_channel);
349  
350  	if (raid_ch->module_channel) {
351  		spdk_put_io_channel(raid_ch->module_channel);
352  	}
353  
354  	for (i = 0; i < raid_bdev->num_base_bdevs; i++) {
355  		/* Free base bdev channels */
356  		if (raid_ch->base_channel[i] != NULL) {
357  			spdk_put_io_channel(raid_ch->base_channel[i]);
358  		}
359  	}
360  	free(raid_ch->base_channel);
361  	raid_ch->base_channel = NULL;
362  
363  	raid_bdev_ch_process_cleanup(raid_ch);
364  }
365  
366  /*
367   * brief:
368   * raid_bdev_cleanup is used to cleanup raid_bdev related data
369   * structures.
370   * params:
371   * raid_bdev - pointer to raid_bdev
372   * returns:
373   * none
374   */
375  static void
376  raid_bdev_cleanup(struct raid_bdev *raid_bdev)
377  {
378  	struct raid_base_bdev_info *base_info;
379  
380  	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_cleanup, %p name %s, state %s\n",
381  		      raid_bdev, raid_bdev->bdev.name, raid_bdev_state_to_str(raid_bdev->state));
382  	assert(raid_bdev->state != RAID_BDEV_STATE_ONLINE);
383  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
384  
385  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
386  		assert(base_info->desc == NULL);
387  		free(base_info->name);
388  	}
389  
390  	TAILQ_REMOVE(&g_raid_bdev_list, raid_bdev, global_link);
391  }
392  
393  static void
394  raid_bdev_free(struct raid_bdev *raid_bdev)
395  {
396  	raid_bdev_free_superblock(raid_bdev);
397  	free(raid_bdev->base_bdev_info);
398  	free(raid_bdev->bdev.name);
399  	free(raid_bdev);
400  }
401  
402  static void
403  raid_bdev_cleanup_and_free(struct raid_bdev *raid_bdev)
404  {
405  	raid_bdev_cleanup(raid_bdev);
406  	raid_bdev_free(raid_bdev);
407  }
408  
409  static void
410  raid_bdev_deconfigure_base_bdev(struct raid_base_bdev_info *base_info)
411  {
412  	struct raid_bdev *raid_bdev = base_info->raid_bdev;
413  
414  	assert(base_info->is_configured);
415  	assert(raid_bdev->num_base_bdevs_discovered);
416  	raid_bdev->num_base_bdevs_discovered--;
417  	base_info->is_configured = false;
418  	base_info->is_process_target = false;
419  }
420  
421  /*
422   * brief:
423   * free resource of base bdev for raid bdev
424   * params:
425   * base_info - raid base bdev info
426   * returns:
427   * none
428   */
429  static void
430  raid_bdev_free_base_bdev_resource(struct raid_base_bdev_info *base_info)
431  {
432  	struct raid_bdev *raid_bdev = base_info->raid_bdev;
433  
434  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
435  	assert(base_info->configure_cb == NULL);
436  
437  	free(base_info->name);
438  	base_info->name = NULL;
439  	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
440  		spdk_uuid_set_null(&base_info->uuid);
441  	}
442  	base_info->is_failed = false;
443  
444  	/* clear `data_offset` to allow it to be recalculated during configuration */
445  	base_info->data_offset = 0;
446  
447  	if (base_info->desc == NULL) {
448  		return;
449  	}
450  
451  	spdk_bdev_module_release_bdev(spdk_bdev_desc_get_bdev(base_info->desc));
452  	spdk_bdev_close(base_info->desc);
453  	base_info->desc = NULL;
454  	spdk_put_io_channel(base_info->app_thread_ch);
455  	base_info->app_thread_ch = NULL;
456  
457  	if (base_info->is_configured) {
458  		raid_bdev_deconfigure_base_bdev(base_info);
459  	}
460  }
461  
462  static void
463  raid_bdev_io_device_unregister_cb(void *io_device)
464  {
465  	struct raid_bdev *raid_bdev = io_device;
466  
467  	if (raid_bdev->num_base_bdevs_discovered == 0) {
468  		/* Free raid_bdev when there are no base bdevs left */
469  		SPDK_DEBUGLOG(bdev_raid, "raid bdev base bdevs is 0, going to free all in destruct\n");
470  		raid_bdev_cleanup(raid_bdev);
471  		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
472  		raid_bdev_free(raid_bdev);
473  	} else {
474  		spdk_bdev_destruct_done(&raid_bdev->bdev, 0);
475  	}
476  }
477  
478  void
479  raid_bdev_module_stop_done(struct raid_bdev *raid_bdev)
480  {
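	/* The io_device is registered only once the raid bdev is configured (see
	 * raid_bdev_configure_cont()), so there is nothing to unregister while the
	 * raid bdev is still in the configuring state.
	 */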
481  	if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
482  		spdk_io_device_unregister(raid_bdev, raid_bdev_io_device_unregister_cb);
483  	}
484  }
485  
486  static void
487  _raid_bdev_destruct(void *ctxt)
488  {
489  	struct raid_bdev *raid_bdev = ctxt;
490  	struct raid_base_bdev_info *base_info;
491  
492  	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_destruct\n");
493  
494  	assert(raid_bdev->process == NULL);
495  
496  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
497  		/*
498  		 * Close the descriptors of base bdevs whose removal was requested by the
499  		 * layers below. Also close all of the descriptors if shutdown has started.
500  		 */
501  		if (g_shutdown_started || base_info->remove_scheduled == true) {
502  			raid_bdev_free_base_bdev_resource(base_info);
503  		}
504  	}
505  
506  	if (g_shutdown_started) {
507  		raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
508  	}
509  
510  	if (raid_bdev->module->stop != NULL) {
511  		if (raid_bdev->module->stop(raid_bdev) == false) {
512  			return;
513  		}
514  	}
515  
516  	raid_bdev_module_stop_done(raid_bdev);
517  }
518  
519  static int
520  raid_bdev_destruct(void *ctx)
521  {
522  	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_destruct, ctx);
523  
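	/* A positive return value tells the bdev layer that destruction completes
	 * asynchronously via spdk_bdev_destruct_done().
	 */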
524  	return 1;
525  }
526  
527  int
528  raid_bdev_remap_dix_reftag(void *md_buf, uint64_t num_blocks,
529  			   struct spdk_bdev *bdev, uint32_t remapped_offset)
530  {
531  	struct spdk_dif_ctx dif_ctx;
532  	struct spdk_dif_error err_blk = {};
533  	int rc;
534  	struct spdk_dif_ctx_init_ext_opts dif_opts;
535  	struct iovec md_iov = {
536  		.iov_base	= md_buf,
537  		.iov_len	= num_blocks * bdev->md_len,
538  	};
539  
540  	if (md_buf == NULL) {
541  		return 0;
542  	}
543  
544  	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
545  	dif_opts.dif_pi_format = bdev->dif_pi_format;
546  	rc = spdk_dif_ctx_init(&dif_ctx,
547  			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
548  			       bdev->dif_is_head_of_md, bdev->dif_type,
549  			       SPDK_DIF_FLAGS_REFTAG_CHECK,
550  			       0, 0, 0, 0, 0, &dif_opts);
551  	if (rc != 0) {
552  		SPDK_ERRLOG("Initialization of DIF context failed\n");
553  		return rc;
554  	}
555  
556  	spdk_dif_ctx_set_remapped_init_ref_tag(&dif_ctx, remapped_offset);
557  
558  	rc = spdk_dix_remap_ref_tag(&md_iov, num_blocks, &dif_ctx, &err_blk, false);
559  	if (rc != 0) {
560  		SPDK_ERRLOG("Remapping reference tag failed. type=%d, offset=%" PRIu32 "\n",
561  			    err_blk.err_type, err_blk.err_offset);
562  	}
563  
564  	return rc;
565  }
566  
567  int
568  raid_bdev_verify_dix_reftag(struct iovec *iovs, int iovcnt, void *md_buf,
569  			    uint64_t num_blocks, struct spdk_bdev *bdev, uint32_t offset_blocks)
570  {
571  	struct spdk_dif_ctx dif_ctx;
572  	struct spdk_dif_error err_blk = {};
573  	int rc;
574  	struct spdk_dif_ctx_init_ext_opts dif_opts;
575  	struct iovec md_iov = {
576  		.iov_base	= md_buf,
577  		.iov_len	= num_blocks * bdev->md_len,
578  	};
579  
580  	if (md_buf == NULL) {
581  		return 0;
582  	}
583  
584  	dif_opts.size = SPDK_SIZEOF(&dif_opts, dif_pi_format);
585  	dif_opts.dif_pi_format = bdev->dif_pi_format;
586  	rc = spdk_dif_ctx_init(&dif_ctx,
587  			       bdev->blocklen, bdev->md_len, bdev->md_interleave,
588  			       bdev->dif_is_head_of_md, bdev->dif_type,
589  			       SPDK_DIF_FLAGS_REFTAG_CHECK,
590  			       offset_blocks, 0, 0, 0, 0, &dif_opts);
591  	if (rc != 0) {
592  		SPDK_ERRLOG("Initialization of DIF context failed\n");
593  		return rc;
594  	}
595  
596  	rc = spdk_dix_verify(iovs, iovcnt, &md_iov, num_blocks, &dif_ctx, &err_blk);
597  	if (rc != 0) {
598  		SPDK_ERRLOG("Reference tag check failed. type=%d, offset=%" PRIu32 "\n",
599  			    err_blk.err_type, err_blk.err_offset);
600  	}
601  
602  	return rc;
603  }
604  
605  void
606  raid_bdev_io_complete(struct raid_bdev_io *raid_io, enum spdk_bdev_io_status status)
607  {
608  	struct spdk_bdev_io *bdev_io = spdk_bdev_io_from_ctx(raid_io);
609  	int rc;
610  
611  	spdk_trace_record(TRACE_BDEV_RAID_IO_DONE, 0, 0, (uintptr_t)raid_io, (uintptr_t)bdev_io);
612  
613  	if (raid_io->split.offset != RAID_OFFSET_BLOCKS_INVALID) {
614  		struct iovec *split_iov = raid_io->split.iov;
615  		const struct iovec *split_iov_orig = &raid_io->split.iov_copy;
616  
617  		/*
618  		 * Non-zero offset here means that this is the completion of the first part of the
619  		 * split I/O (the higher LBAs). Then, we submit the second part and set offset to 0.
620  		 */
621  		if (raid_io->split.offset != 0) {
622  			raid_io->offset_blocks = bdev_io->u.bdev.offset_blocks;
623  			raid_io->md_buf = bdev_io->u.bdev.md_buf;
624  
625  			if (status == SPDK_BDEV_IO_STATUS_SUCCESS) {
626  				raid_io->num_blocks = raid_io->split.offset;
627  				raid_io->iovcnt = raid_io->iovs - bdev_io->u.bdev.iovs;
628  				raid_io->iovs = bdev_io->u.bdev.iovs;
629  				if (split_iov != NULL) {
630  					raid_io->iovcnt++;
631  					split_iov->iov_len = split_iov->iov_base - split_iov_orig->iov_base;
632  					split_iov->iov_base = split_iov_orig->iov_base;
633  				}
634  
635  				raid_io->split.offset = 0;
636  				raid_io->base_bdev_io_submitted = 0;
637  				raid_io->raid_ch = raid_io->raid_ch->process.ch_processed;
638  
639  				raid_io->raid_bdev->module->submit_rw_request(raid_io);
640  				return;
641  			}
642  		}
643  
644  		raid_io->num_blocks = bdev_io->u.bdev.num_blocks;
645  		raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
646  		raid_io->iovs = bdev_io->u.bdev.iovs;
647  		if (split_iov != NULL) {
648  			*split_iov = *split_iov_orig;
649  		}
650  	}
651  
652  	if (spdk_unlikely(raid_io->completion_cb != NULL)) {
653  		raid_io->completion_cb(raid_io, status);
654  	} else {
655  		if (spdk_unlikely(bdev_io->type == SPDK_BDEV_IO_TYPE_READ &&
656  				  spdk_bdev_get_dif_type(bdev_io->bdev) != SPDK_DIF_DISABLE &&
657  				  bdev_io->bdev->dif_check_flags & SPDK_DIF_FLAGS_REFTAG_CHECK &&
658  				  status == SPDK_BDEV_IO_STATUS_SUCCESS)) {
659  
660  			rc = raid_bdev_remap_dix_reftag(bdev_io->u.bdev.md_buf,
661  							bdev_io->u.bdev.num_blocks, bdev_io->bdev,
662  							bdev_io->u.bdev.offset_blocks);
663  			if (rc != 0) {
664  				status = SPDK_BDEV_IO_STATUS_FAILED;
665  			}
666  		}
667  		spdk_bdev_io_complete(bdev_io, status);
668  	}
669  }
670  
671  /*
672   * brief:
673   * raid_bdev_io_complete_part - signal the completion of a part of the expected
674   * base bdev IOs and complete the raid_io if this is the final expected IO.
675   * The caller should first set raid_io->base_bdev_io_remaining. This function
676   * will decrement this counter by the value of the 'completed' parameter and
677   * complete the raid_io if the counter reaches 0. The caller is free to
678   * interpret the 'base_bdev_io_remaining' and 'completed' values as needed,
679   * it can represent e.g. blocks or IOs.
680   * params:
681   * raid_io - pointer to raid_bdev_io
682   * completed - the part of the raid_io that has been completed
683   * status - status of the base IO
684   * returns:
685   * true - if the raid_io is completed
686   * false - otherwise
687   */
688  bool
689  raid_bdev_io_complete_part(struct raid_bdev_io *raid_io, uint64_t completed,
690  			   enum spdk_bdev_io_status status)
691  {
692  	assert(raid_io->base_bdev_io_remaining >= completed);
693  	raid_io->base_bdev_io_remaining -= completed;
694  
695  	if (status != raid_io->base_bdev_io_status_default) {
696  		raid_io->base_bdev_io_status = status;
697  	}
698  
699  	if (raid_io->base_bdev_io_remaining == 0) {
700  		raid_bdev_io_complete(raid_io, raid_io->base_bdev_io_status);
701  		return true;
702  	} else {
703  		return false;
704  	}
705  }
706  
707  /*
708   * brief:
709   * raid_bdev_queue_io_wait function handles an IO that failed to submit (e.g. due to -ENOMEM).
710   * It stores the context and queues the IO on the bdev's io_wait queue for later resubmission.
711   * params:
712   * raid_io - pointer to raid_bdev_io
713   * bdev - the block device that the IO is submitted to
714   * ch - io channel
715   * cb_fn - callback when the spdk_bdev_io for bdev becomes available
716   * returns:
717   * none
718   */
719  void
720  raid_bdev_queue_io_wait(struct raid_bdev_io *raid_io, struct spdk_bdev *bdev,
721  			struct spdk_io_channel *ch, spdk_bdev_io_wait_cb cb_fn)
722  {
723  	raid_io->waitq_entry.bdev = bdev;
724  	raid_io->waitq_entry.cb_fn = cb_fn;
725  	raid_io->waitq_entry.cb_arg = raid_io;
726  	spdk_bdev_queue_io_wait(bdev, ch, &raid_io->waitq_entry);
727  }
728  
729  static void
730  raid_base_bdev_reset_complete(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
731  {
732  	struct raid_bdev_io *raid_io = cb_arg;
733  
734  	spdk_bdev_free_io(bdev_io);
735  
736  	raid_bdev_io_complete_part(raid_io, 1, success ?
737  				   SPDK_BDEV_IO_STATUS_SUCCESS :
738  				   SPDK_BDEV_IO_STATUS_FAILED);
739  }
740  
741  static void raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io);
742  
743  static void
744  _raid_bdev_submit_reset_request(void *_raid_io)
745  {
746  	struct raid_bdev_io *raid_io = _raid_io;
747  
748  	raid_bdev_submit_reset_request(raid_io);
749  }
750  
751  /*
752   * brief:
753   * raid_bdev_submit_reset_request function submits reset requests
754   * to the member disks; it submits as many as possible until a reset fails with -ENOMEM,
755   * in which case the raid_io is queued for resubmission once a bdev_io becomes available
756   * params:
757   * raid_io
758   * returns:
759   * none
760   */
761  static void
762  raid_bdev_submit_reset_request(struct raid_bdev_io *raid_io)
763  {
764  	struct raid_bdev		*raid_bdev;
765  	int				ret;
766  	uint8_t				i;
767  	struct raid_base_bdev_info	*base_info;
768  	struct spdk_io_channel		*base_ch;
769  
770  	raid_bdev = raid_io->raid_bdev;
771  
772  	if (raid_io->base_bdev_io_remaining == 0) {
773  		raid_io->base_bdev_io_remaining = raid_bdev->num_base_bdevs;
774  	}
775  
776  	for (i = raid_io->base_bdev_io_submitted; i < raid_bdev->num_base_bdevs; i++) {
777  		base_info = &raid_bdev->base_bdev_info[i];
778  		base_ch = raid_io->raid_ch->base_channel[i];
779  		if (base_ch == NULL) {
780  			raid_io->base_bdev_io_submitted++;
781  			raid_bdev_io_complete_part(raid_io, 1, SPDK_BDEV_IO_STATUS_SUCCESS);
782  			continue;
783  		}
784  		ret = spdk_bdev_reset(base_info->desc, base_ch,
785  				      raid_base_bdev_reset_complete, raid_io);
786  		if (ret == 0) {
787  			raid_io->base_bdev_io_submitted++;
788  		} else if (ret == -ENOMEM) {
789  			raid_bdev_queue_io_wait(raid_io, spdk_bdev_desc_get_bdev(base_info->desc),
790  						base_ch, _raid_bdev_submit_reset_request);
791  			return;
792  		} else {
793  			SPDK_ERRLOG("bdev io submit error not due to ENOMEM, it should not happen\n");
794  			assert(false);
795  			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
796  			return;
797  		}
798  	}
799  }
800  
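/*
 * Split raid_io at split_offset blocks: the upper part (starting at split_offset)
 * becomes the current I/O and is submitted first, while the original extent is saved
 * in raid_io->split so that raid_bdev_io_complete() can later restore and submit the
 * lower, already processed part on the "processed" channel.
 */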
801  static void
802  raid_bdev_io_split(struct raid_bdev_io *raid_io, uint64_t split_offset)
803  {
804  	struct raid_bdev *raid_bdev = raid_io->raid_bdev;
805  	size_t iov_offset = split_offset * raid_bdev->bdev.blocklen;
806  	int i;
807  
808  	assert(split_offset != 0);
809  	assert(raid_io->split.offset == RAID_OFFSET_BLOCKS_INVALID);
810  	raid_io->split.offset = split_offset;
811  
812  	raid_io->offset_blocks += split_offset;
813  	raid_io->num_blocks -= split_offset;
814  	if (raid_io->md_buf != NULL) {
815  		raid_io->md_buf += (split_offset * raid_bdev->bdev.md_len);
816  	}
817  
818  	for (i = 0; i < raid_io->iovcnt; i++) {
819  		struct iovec *iov = &raid_io->iovs[i];
820  
821  		if (iov_offset < iov->iov_len) {
822  			if (iov_offset == 0) {
823  				raid_io->split.iov = NULL;
824  			} else {
825  				raid_io->split.iov = iov;
826  				raid_io->split.iov_copy = *iov;
827  				iov->iov_base += iov_offset;
828  				iov->iov_len -= iov_offset;
829  			}
830  			raid_io->iovs += i;
831  			raid_io->iovcnt -= i;
832  			break;
833  		}
834  
835  		iov_offset -= iov->iov_len;
836  	}
837  }
838  
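/*
 * Submit a read/write request, taking the background process window into account:
 * I/O entirely below the current process offset is sent on the "processed" channel
 * view, I/O entirely above it uses the regular channels, and I/O spanning the offset
 * is split so that the unprocessed part is handled first.
 */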
839  static void
840  raid_bdev_submit_rw_request(struct raid_bdev_io *raid_io)
841  {
842  	struct raid_bdev_io_channel *raid_ch = raid_io->raid_ch;
843  
844  	if (raid_ch->process.offset != RAID_OFFSET_BLOCKS_INVALID) {
845  		uint64_t offset_begin = raid_io->offset_blocks;
846  		uint64_t offset_end = offset_begin + raid_io->num_blocks;
847  
848  		if (offset_end > raid_ch->process.offset) {
849  			if (offset_begin < raid_ch->process.offset) {
850  				/*
851  				 * If the I/O spans both the processed and unprocessed ranges,
852  				 * split it and first handle the unprocessed part. After it
853  				 * completes, the rest will be handled.
854  				 * This situation occurs when the process thread is not active
855  				 * or is waiting for the process window range to be locked
856  				 * (quiesced). When a window is being processed, such I/Os will be
857  				 * deferred by the bdev layer until the window is unlocked.
858  				 */
859  				SPDK_DEBUGLOG(bdev_raid, "split: process_offset: %lu offset_begin: %lu offset_end: %lu\n",
860  					      raid_ch->process.offset, offset_begin, offset_end);
861  				raid_bdev_io_split(raid_io, raid_ch->process.offset - offset_begin);
862  			}
863  		} else {
864  			/* Use the child channel, which corresponds to the already processed range */
865  			raid_io->raid_ch = raid_ch->process.ch_processed;
866  		}
867  	}
868  
869  	raid_io->raid_bdev->module->submit_rw_request(raid_io);
870  }
871  
872  /*
873   * brief:
874   * Callback function to spdk_bdev_io_get_buf.
875   * params:
876   * ch - pointer to raid bdev io channel
877   * bdev_io - pointer to parent bdev_io on raid bdev device
878   * success - True if buffer is allocated or false otherwise.
879   * returns:
880   * none
881   */
882  static void
883  raid_bdev_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
884  		     bool success)
885  {
886  	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
887  
888  	if (!success) {
889  		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
890  		return;
891  	}
892  
893  	raid_io->iovs = bdev_io->u.bdev.iovs;
894  	raid_io->iovcnt = bdev_io->u.bdev.iovcnt;
895  	raid_io->md_buf = bdev_io->u.bdev.md_buf;
896  
897  	raid_bdev_submit_rw_request(raid_io);
898  }
899  
900  void
901  raid_bdev_io_init(struct raid_bdev_io *raid_io, struct raid_bdev_io_channel *raid_ch,
902  		  enum spdk_bdev_io_type type, uint64_t offset_blocks,
903  		  uint64_t num_blocks, struct iovec *iovs, int iovcnt, void *md_buf,
904  		  struct spdk_memory_domain *memory_domain, void *memory_domain_ctx)
905  {
906  	struct spdk_io_channel *ch = spdk_io_channel_from_ctx(raid_ch);
907  	struct raid_bdev *raid_bdev = spdk_io_channel_get_io_device(ch);
908  
909  	raid_io->type = type;
910  	raid_io->offset_blocks = offset_blocks;
911  	raid_io->num_blocks = num_blocks;
912  	raid_io->iovs = iovs;
913  	raid_io->iovcnt = iovcnt;
914  	raid_io->memory_domain = memory_domain;
915  	raid_io->memory_domain_ctx = memory_domain_ctx;
916  	raid_io->md_buf = md_buf;
917  
918  	raid_io->raid_bdev = raid_bdev;
919  	raid_io->raid_ch = raid_ch;
920  	raid_io->base_bdev_io_remaining = 0;
921  	raid_io->base_bdev_io_submitted = 0;
922  	raid_io->completion_cb = NULL;
923  	raid_io->split.offset = RAID_OFFSET_BLOCKS_INVALID;
924  
925  	raid_bdev_io_set_default_status(raid_io, SPDK_BDEV_IO_STATUS_SUCCESS);
926  }
927  
928  /*
929   * brief:
930   * raid_bdev_submit_request function is the submit_request function pointer of
931   * the raid bdev function table. It is used to submit an io on the raid_bdev to the
932   * layers below.
933   * params:
934   * ch - pointer to raid bdev io channel
935   * bdev_io - pointer to parent bdev_io on raid bdev device
936   * returns:
937   * none
938   */
939  static void
940  raid_bdev_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
941  {
942  	struct raid_bdev_io *raid_io = (struct raid_bdev_io *)bdev_io->driver_ctx;
943  
944  	raid_bdev_io_init(raid_io, spdk_io_channel_get_ctx(ch), bdev_io->type,
945  			  bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
946  			  bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.md_buf,
947  			  bdev_io->u.bdev.memory_domain, bdev_io->u.bdev.memory_domain_ctx);
948  
949  	spdk_trace_record(TRACE_BDEV_RAID_IO_START, 0, 0, (uintptr_t)raid_io, (uintptr_t)bdev_io);
950  
951  	switch (bdev_io->type) {
952  	case SPDK_BDEV_IO_TYPE_READ:
953  		spdk_bdev_io_get_buf(bdev_io, raid_bdev_get_buf_cb,
954  				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
955  		break;
956  	case SPDK_BDEV_IO_TYPE_WRITE:
957  		raid_bdev_submit_rw_request(raid_io);
958  		break;
959  
960  	case SPDK_BDEV_IO_TYPE_RESET:
961  		raid_bdev_submit_reset_request(raid_io);
962  		break;
963  
964  	case SPDK_BDEV_IO_TYPE_FLUSH:
965  	case SPDK_BDEV_IO_TYPE_UNMAP:
966  		if (raid_io->raid_bdev->process != NULL) {
967  			/* TODO: rebuild support */
968  			raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
969  			return;
970  		}
971  		raid_io->raid_bdev->module->submit_null_payload_request(raid_io);
972  		break;
973  
974  	default:
975  		SPDK_ERRLOG("submit request, invalid io type %u\n", bdev_io->type);
976  		raid_bdev_io_complete(raid_io, SPDK_BDEV_IO_STATUS_FAILED);
977  		break;
978  	}
979  }
980  
981  /*
982   * brief:
983   * _raid_bdev_io_type_supported checks whether io_type is supported by
984   * all base bdevs of the raid bdev. If any one of the base bdevs does not
985   * support it, the raid bdev does not support it either.
986   *
987   * params:
988   * raid_bdev - pointer to raid bdev context
989   * io_type - io type
990   * returns:
991   * true - io_type is supported
992   * false - io_type is not supported
993   */
994  inline static bool
995  _raid_bdev_io_type_supported(struct raid_bdev *raid_bdev, enum spdk_bdev_io_type io_type)
996  {
997  	struct raid_base_bdev_info *base_info;
998  
999  	if (io_type == SPDK_BDEV_IO_TYPE_FLUSH ||
1000  	    io_type == SPDK_BDEV_IO_TYPE_UNMAP) {
1001  		if (raid_bdev->module->submit_null_payload_request == NULL) {
1002  			return false;
1003  		}
1004  	}
1005  
1006  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1007  		if (base_info->desc == NULL) {
1008  			continue;
1009  		}
1010  
1011  		if (spdk_bdev_io_type_supported(spdk_bdev_desc_get_bdev(base_info->desc), io_type) == false) {
1012  			return false;
1013  		}
1014  	}
1015  
1016  	return true;
1017  }
1018  
1019  /*
1020   * brief:
1021   * raid_bdev_io_type_supported is the io_supported function for bdev function
1022   * table which returns whether the particular io type is supported or not by
1023   * raid bdev module
1024   * params:
1025   * ctx - pointer to raid bdev context
1026   * type - io type
1027   * returns:
1028   * true - io_type is supported
1029   * false - io_type is not supported
1030   */
1031  static bool
1032  raid_bdev_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
1033  {
1034  	switch (io_type) {
1035  	case SPDK_BDEV_IO_TYPE_READ:
1036  	case SPDK_BDEV_IO_TYPE_WRITE:
1037  		return true;
1038  
1039  	case SPDK_BDEV_IO_TYPE_FLUSH:
1040  	case SPDK_BDEV_IO_TYPE_RESET:
1041  	case SPDK_BDEV_IO_TYPE_UNMAP:
1042  		return _raid_bdev_io_type_supported(ctx, io_type);
1043  
1044  	default:
1045  		return false;
1046  	}
1047  
1048  	return false;
1049  }
1050  
1051  /*
1052   * brief:
1053   * raid_bdev_get_io_channel is the get_io_channel function table pointer for
1054   * raid bdev. This is used to return the io channel for this raid bdev
1055   * params:
1056   * ctxt - pointer to raid_bdev
1057   * returns:
1058   * pointer to io channel for raid bdev
1059   */
1060  static struct spdk_io_channel *
1061  raid_bdev_get_io_channel(void *ctxt)
1062  {
1063  	struct raid_bdev *raid_bdev = ctxt;
1064  
1065  	return spdk_get_io_channel(raid_bdev);
1066  }
1067  
1068  void
1069  raid_bdev_write_info_json(struct raid_bdev *raid_bdev, struct spdk_json_write_ctx *w)
1070  {
1071  	struct raid_base_bdev_info *base_info;
1072  
1073  	assert(raid_bdev != NULL);
1074  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1075  
1076  	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1077  	spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1078  	spdk_json_write_named_string(w, "state", raid_bdev_state_to_str(raid_bdev->state));
1079  	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1080  	spdk_json_write_named_bool(w, "superblock", raid_bdev->superblock_enabled);
1081  	spdk_json_write_named_uint32(w, "num_base_bdevs", raid_bdev->num_base_bdevs);
1082  	spdk_json_write_named_uint32(w, "num_base_bdevs_discovered", raid_bdev->num_base_bdevs_discovered);
1083  	spdk_json_write_named_uint32(w, "num_base_bdevs_operational",
1084  				     raid_bdev->num_base_bdevs_operational);
1085  	if (raid_bdev->process) {
1086  		struct raid_bdev_process *process = raid_bdev->process;
1087  		uint64_t offset = process->window_offset;
1088  
1089  		spdk_json_write_named_object_begin(w, "process");
1090  		spdk_json_write_named_string(w, "type", raid_bdev_process_to_str(process->type));
1092  		spdk_json_write_named_string(w, "target", process->target->name);
1093  		spdk_json_write_named_object_begin(w, "progress");
1094  		spdk_json_write_named_uint64(w, "blocks", offset);
1095  		spdk_json_write_named_uint32(w, "percent", offset * 100.0 / raid_bdev->bdev.blockcnt);
1096  		spdk_json_write_object_end(w);
1097  		spdk_json_write_object_end(w);
1098  	}
1099  	spdk_json_write_name(w, "base_bdevs_list");
1100  	spdk_json_write_array_begin(w);
1101  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1102  		spdk_json_write_object_begin(w);
1103  		spdk_json_write_name(w, "name");
1104  		if (base_info->name) {
1105  			spdk_json_write_string(w, base_info->name);
1106  		} else {
1107  			spdk_json_write_null(w);
1108  		}
1109  		spdk_json_write_named_uuid(w, "uuid", &base_info->uuid);
1110  		spdk_json_write_named_bool(w, "is_configured", base_info->is_configured);
1111  		spdk_json_write_named_uint64(w, "data_offset", base_info->data_offset);
1112  		spdk_json_write_named_uint64(w, "data_size", base_info->data_size);
1113  		spdk_json_write_object_end(w);
1114  	}
1115  	spdk_json_write_array_end(w);
1116  }
1117  
1118  /*
1119   * brief:
1120   * raid_bdev_dump_info_json is the dump_info_json function table pointer for raid bdev
1121   * params:
1122   * ctx - pointer to raid_bdev
1123   * w - pointer to json context
1124   * returns:
1125   * 0 - success
1126   * non zero - failure
1127   */
1128  static int
1129  raid_bdev_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
1130  {
1131  	struct raid_bdev *raid_bdev = ctx;
1132  
1133  	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_dump_info_json\n");
1134  
1135  	/* Dump the raid bdev configuration related information */
1136  	spdk_json_write_named_object_begin(w, "raid");
1137  	raid_bdev_write_info_json(raid_bdev, w);
1138  	spdk_json_write_object_end(w);
1139  
1140  	return 0;
1141  }
1142  
1143  /*
1144   * brief:
1145   * raid_bdev_write_config_json is the write_config_json function table pointer for raid bdev
1146   * params:
1147   * bdev - pointer to spdk_bdev
1148   * w - pointer to json context
1149   * returns:
1150   * none
1151   */
1152  static void
1153  raid_bdev_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
1154  {
1155  	struct raid_bdev *raid_bdev = bdev->ctxt;
1156  	struct raid_base_bdev_info *base_info;
1157  
1158  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
1159  
1160  	if (raid_bdev->superblock_enabled) {
1161  		/* raid bdev configuration is stored in the superblock */
1162  		return;
1163  	}
1164  
1165  	spdk_json_write_object_begin(w);
1166  
1167  	spdk_json_write_named_string(w, "method", "bdev_raid_create");
1168  
1169  	spdk_json_write_named_object_begin(w, "params");
1170  	spdk_json_write_named_string(w, "name", bdev->name);
1171  	spdk_json_write_named_uuid(w, "uuid", &raid_bdev->bdev.uuid);
1172  	if (raid_bdev->strip_size_kb != 0) {
1173  		spdk_json_write_named_uint32(w, "strip_size_kb", raid_bdev->strip_size_kb);
1174  	}
1175  	spdk_json_write_named_string(w, "raid_level", raid_bdev_level_to_str(raid_bdev->level));
1176  
1177  	spdk_json_write_named_array_begin(w, "base_bdevs");
1178  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1179  		if (base_info->name) {
1180  			spdk_json_write_string(w, base_info->name);
1181  		} else {
1182  			char str[32];
1183  
1184  			snprintf(str, sizeof(str), "removed_base_bdev_%u", raid_bdev_base_bdev_slot(base_info));
1185  			spdk_json_write_string(w, str);
1186  		}
1187  	}
1188  	spdk_json_write_array_end(w);
1189  	spdk_json_write_object_end(w);
1190  
1191  	spdk_json_write_object_end(w);
1192  }
1193  
1194  static int
1195  raid_bdev_get_memory_domains(void *ctx, struct spdk_memory_domain **domains, int array_size)
1196  {
1197  	struct raid_bdev *raid_bdev = ctx;
1198  	struct raid_base_bdev_info *base_info;
1199  	int domains_count = 0, rc = 0;
1200  
1201  	if (raid_bdev->module->memory_domains_supported == false) {
1202  		return 0;
1203  	}
1204  
1205  	/* First loop to get the number of memory domains */
1206  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1207  		if (base_info->is_configured == false) {
1208  			continue;
1209  		}
1210  		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), NULL, 0);
1211  		if (rc < 0) {
1212  			return rc;
1213  		}
1214  		domains_count += rc;
1215  	}
1216  
1217  	if (!domains || array_size < domains_count) {
1218  		return domains_count;
1219  	}
1220  
1221  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1222  		if (base_info->is_configured == false) {
1223  			continue;
1224  		}
1225  		rc = spdk_bdev_get_memory_domains(spdk_bdev_desc_get_bdev(base_info->desc), domains, array_size);
1226  		if (rc < 0) {
1227  			return rc;
1228  		}
1229  		domains += rc;
1230  		array_size -= rc;
1231  	}
1232  
1233  	return domains_count;
1234  }
1235  
1236  /* g_raid_bdev_fn_table is the function table for raid bdev */
1237  static const struct spdk_bdev_fn_table g_raid_bdev_fn_table = {
1238  	.destruct		= raid_bdev_destruct,
1239  	.submit_request		= raid_bdev_submit_request,
1240  	.io_type_supported	= raid_bdev_io_type_supported,
1241  	.get_io_channel		= raid_bdev_get_io_channel,
1242  	.dump_info_json		= raid_bdev_dump_info_json,
1243  	.write_config_json	= raid_bdev_write_config_json,
1244  	.get_memory_domains	= raid_bdev_get_memory_domains,
1245  };
1246  
1247  struct raid_bdev *
1248  raid_bdev_find_by_name(const char *name)
1249  {
1250  	struct raid_bdev *raid_bdev;
1251  
1252  	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1253  		if (strcmp(raid_bdev->bdev.name, name) == 0) {
1254  			return raid_bdev;
1255  		}
1256  	}
1257  
1258  	return NULL;
1259  }
1260  
1261  static struct raid_bdev *
1262  raid_bdev_find_by_uuid(const struct spdk_uuid *uuid)
1263  {
1264  	struct raid_bdev *raid_bdev;
1265  
1266  	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1267  		if (spdk_uuid_compare(&raid_bdev->bdev.uuid, uuid) == 0) {
1268  			return raid_bdev;
1269  		}
1270  	}
1271  
1272  	return NULL;
1273  }
1274  
1275  static struct {
1276  	const char *name;
1277  	enum raid_level value;
1278  } g_raid_level_names[] = {
1279  	{ "raid0", RAID0 },
1280  	{ "0", RAID0 },
1281  	{ "raid1", RAID1 },
1282  	{ "1", RAID1 },
1283  	{ "raid5f", RAID5F },
1284  	{ "5f", RAID5F },
1285  	{ "concat", CONCAT },
1286  	{ }
1287  };
1288  
1289  const char *g_raid_state_names[] = {
1290  	[RAID_BDEV_STATE_ONLINE]	= "online",
1291  	[RAID_BDEV_STATE_CONFIGURING]	= "configuring",
1292  	[RAID_BDEV_STATE_OFFLINE]	= "offline",
1293  	[RAID_BDEV_STATE_MAX]		= NULL
1294  };
1295  
1296  static const char *g_raid_process_type_names[] = {
1297  	[RAID_PROCESS_NONE]	= "none",
1298  	[RAID_PROCESS_REBUILD]	= "rebuild",
1299  	[RAID_PROCESS_MAX]	= NULL
1300  };
1301  
1302  /* We have to use the typedef in the function declaration to appease astyle. */
1303  typedef enum raid_level raid_level_t;
1304  typedef enum raid_bdev_state raid_bdev_state_t;
1305  
1306  raid_level_t
1307  raid_bdev_str_to_level(const char *str)
1308  {
1309  	unsigned int i;
1310  
1311  	assert(str != NULL);
1312  
1313  	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1314  		if (strcasecmp(g_raid_level_names[i].name, str) == 0) {
1315  			return g_raid_level_names[i].value;
1316  		}
1317  	}
1318  
1319  	return INVALID_RAID_LEVEL;
1320  }
1321  
1322  const char *
1323  raid_bdev_level_to_str(enum raid_level level)
1324  {
1325  	unsigned int i;
1326  
1327  	for (i = 0; g_raid_level_names[i].name != NULL; i++) {
1328  		if (g_raid_level_names[i].value == level) {
1329  			return g_raid_level_names[i].name;
1330  		}
1331  	}
1332  
1333  	return "";
1334  }
1335  
1336  raid_bdev_state_t
1337  raid_bdev_str_to_state(const char *str)
1338  {
1339  	unsigned int i;
1340  
1341  	assert(str != NULL);
1342  
1343  	for (i = 0; i < RAID_BDEV_STATE_MAX; i++) {
1344  		if (strcasecmp(g_raid_state_names[i], str) == 0) {
1345  			break;
1346  		}
1347  	}
1348  
1349  	return i;
1350  }
1351  
1352  const char *
1353  raid_bdev_state_to_str(enum raid_bdev_state state)
1354  {
1355  	if (state >= RAID_BDEV_STATE_MAX) {
1356  		return "";
1357  	}
1358  
1359  	return g_raid_state_names[state];
1360  }
1361  
1362  const char *
1363  raid_bdev_process_to_str(enum raid_process_type value)
1364  {
1365  	if (value >= RAID_PROCESS_MAX) {
1366  		return "";
1367  	}
1368  
1369  	return g_raid_process_type_names[value];
1370  }
1371  
1372  /*
1373   * brief:
1374   * raid_bdev_fini_start is called when bdev layer is starting the
1375   * shutdown process
1376   * params:
1377   * none
1378   * returns:
1379   * none
1380   */
1381  static void
1382  raid_bdev_fini_start(void)
1383  {
1384  	struct raid_bdev *raid_bdev;
1385  	struct raid_base_bdev_info *base_info;
1386  
1387  	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_fini_start\n");
1388  
1389  	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1390  		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1391  			RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1392  				raid_bdev_free_base_bdev_resource(base_info);
1393  			}
1394  		}
1395  	}
1396  
1397  	g_shutdown_started = true;
1398  }
1399  
1400  /*
1401   * brief:
1402   * raid_bdev_exit is called by the bdev layer when the raid bdev module exits
1403   * params:
1404   * none
1405   * returns:
1406   * none
1407   */
1408  static void
1409  raid_bdev_exit(void)
1410  {
1411  	struct raid_bdev *raid_bdev, *tmp;
1412  
1413  	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_exit\n");
1414  
1415  	TAILQ_FOREACH_SAFE(raid_bdev, &g_raid_bdev_list, global_link, tmp) {
1416  		raid_bdev_cleanup_and_free(raid_bdev);
1417  	}
1418  }
1419  
1420  static void
1421  raid_bdev_opts_config_json(struct spdk_json_write_ctx *w)
1422  {
1423  	spdk_json_write_object_begin(w);
1424  
1425  	spdk_json_write_named_string(w, "method", "bdev_raid_set_options");
1426  
1427  	spdk_json_write_named_object_begin(w, "params");
1428  	spdk_json_write_named_uint32(w, "process_window_size_kb", g_opts.process_window_size_kb);
1429  	spdk_json_write_named_uint32(w, "process_max_bandwidth_mb_sec",
1430  				     g_opts.process_max_bandwidth_mb_sec);
1431  	spdk_json_write_object_end(w);
1432  
1433  	spdk_json_write_object_end(w);
1434  }
1435  
1436  static int
1437  raid_bdev_config_json(struct spdk_json_write_ctx *w)
1438  {
1439  	raid_bdev_opts_config_json(w);
1440  
1441  	return 0;
1442  }
1443  
1444  /*
1445   * brief:
1446   * raid_bdev_get_ctx_size is used to return the context size of bdev_io for raid
1447   * module
1448   * params:
1449   * none
1450   * returns:
1451   * size of spdk_bdev_io context for raid
1452   */
1453  static int
1454  raid_bdev_get_ctx_size(void)
1455  {
1456  	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_get_ctx_size\n");
1457  	return sizeof(struct raid_bdev_io);
1458  }
1459  
1460  static struct spdk_bdev_module g_raid_if = {
1461  	.name = "raid",
1462  	.module_init = raid_bdev_init,
1463  	.fini_start = raid_bdev_fini_start,
1464  	.module_fini = raid_bdev_exit,
1465  	.config_json = raid_bdev_config_json,
1466  	.get_ctx_size = raid_bdev_get_ctx_size,
1467  	.examine_disk = raid_bdev_examine,
1468  	.async_init = false,
1469  	.async_fini = false,
1470  };
1471  SPDK_BDEV_MODULE_REGISTER(raid, &g_raid_if)
1472  
1473  /*
1474   * brief:
1475   * raid_bdev_init is the initialization function for raid bdev module
1476   * params:
1477   * none
1478   * returns:
1479   * 0 - success
1480   * non zero - failure
1481   */
1482  static int
1483  raid_bdev_init(void)
1484  {
1485  	return 0;
1486  }
1487  
1488  static int
1489  _raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1490  		  enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1491  		  struct raid_bdev **raid_bdev_out)
1492  {
1493  	struct raid_bdev *raid_bdev;
1494  	struct spdk_bdev *raid_bdev_gen;
1495  	struct raid_bdev_module *module;
1496  	struct raid_base_bdev_info *base_info;
1497  	uint8_t min_operational;
1498  
1499  	if (strnlen(name, RAID_BDEV_SB_NAME_SIZE) == RAID_BDEV_SB_NAME_SIZE) {
1500  		SPDK_ERRLOG("Raid bdev name '%s' exceeds %d characters\n", name, RAID_BDEV_SB_NAME_SIZE - 1);
1501  		return -EINVAL;
1502  	}
1503  
1504  	if (raid_bdev_find_by_name(name) != NULL) {
1505  		SPDK_ERRLOG("Duplicate raid bdev name found: %s\n", name);
1506  		return -EEXIST;
1507  	}
1508  
1509  	if (level == RAID1) {
1510  		if (strip_size != 0) {
1511  			SPDK_ERRLOG("Strip size is not supported by raid1\n");
1512  			return -EINVAL;
1513  		}
1514  	} else if (spdk_u32_is_pow2(strip_size) == false) {
1515  		SPDK_ERRLOG("Invalid strip size %" PRIu32 "\n", strip_size);
1516  		return -EINVAL;
1517  	}
1518  
1519  	module = raid_bdev_module_find(level);
1520  	if (module == NULL) {
1521  		SPDK_ERRLOG("Unsupported raid level '%d'\n", level);
1522  		return -EINVAL;
1523  	}
1524  
1525  	assert(module->base_bdevs_min != 0);
1526  	if (num_base_bdevs < module->base_bdevs_min) {
1527  		SPDK_ERRLOG("At least %u base devices required for %s\n",
1528  			    module->base_bdevs_min,
1529  			    raid_bdev_level_to_str(level));
1530  		return -EINVAL;
1531  	}
1532  
1533  	switch (module->base_bdevs_constraint.type) {
1534  	case CONSTRAINT_MAX_BASE_BDEVS_REMOVED:
1535  		min_operational = num_base_bdevs - module->base_bdevs_constraint.value;
1536  		break;
1537  	case CONSTRAINT_MIN_BASE_BDEVS_OPERATIONAL:
1538  		min_operational = module->base_bdevs_constraint.value;
1539  		break;
1540  	case CONSTRAINT_UNSET:
1541  		if (module->base_bdevs_constraint.value != 0) {
1542  			SPDK_ERRLOG("Unexpected constraint value '%u' provided for raid bdev '%s'.\n",
1543  				    (uint8_t)module->base_bdevs_constraint.value, name);
1544  			return -EINVAL;
1545  		}
1546  		min_operational = num_base_bdevs;
1547  		break;
1548  	default:
1549  		SPDK_ERRLOG("Unrecognised constraint type '%u' in module for raid level '%s'.\n",
1550  			    (uint8_t)module->base_bdevs_constraint.type,
1551  			    raid_bdev_level_to_str(module->level));
1552  		return -EINVAL;
1553  	};
1554  
1555  	if (min_operational == 0 || min_operational > num_base_bdevs) {
1556  		SPDK_ERRLOG("Wrong constraint value for raid level '%s'.\n",
1557  			    raid_bdev_level_to_str(module->level));
1558  		return -EINVAL;
1559  	}
1560  
1561  	raid_bdev = calloc(1, sizeof(*raid_bdev));
1562  	if (!raid_bdev) {
1563  		SPDK_ERRLOG("Unable to allocate memory for raid bdev\n");
1564  		return -ENOMEM;
1565  	}
1566  
1567  	raid_bdev->module = module;
1568  	raid_bdev->num_base_bdevs = num_base_bdevs;
1569  	raid_bdev->base_bdev_info = calloc(raid_bdev->num_base_bdevs,
1570  					   sizeof(struct raid_base_bdev_info));
1571  	if (!raid_bdev->base_bdev_info) {
1572  		SPDK_ERRLOG("Unable to allocate base bdev info\n");
1573  		raid_bdev_free(raid_bdev);
1574  		return -ENOMEM;
1575  	}
1576  
1577  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1578  		base_info->raid_bdev = raid_bdev;
1579  	}
1580  
1581  	/* strip_size_kb is from the rpc param.  strip_size is in blocks and used
1582  	 * internally and set later.
1583  	 */
1584  	raid_bdev->strip_size = 0;
1585  	raid_bdev->strip_size_kb = strip_size;
1586  	raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1587  	raid_bdev->level = level;
1588  	raid_bdev->min_base_bdevs_operational = min_operational;
1589  	raid_bdev->superblock_enabled = superblock_enabled;
1590  
1591  	raid_bdev_gen = &raid_bdev->bdev;
1592  
1593  	raid_bdev_gen->name = strdup(name);
1594  	if (!raid_bdev_gen->name) {
1595  		SPDK_ERRLOG("Unable to allocate name for raid\n");
1596  		raid_bdev_free(raid_bdev);
1597  		return -ENOMEM;
1598  	}
1599  
1600  	raid_bdev_gen->product_name = "Raid Volume";
1601  	raid_bdev_gen->ctxt = raid_bdev;
1602  	raid_bdev_gen->fn_table = &g_raid_bdev_fn_table;
1603  	raid_bdev_gen->module = &g_raid_if;
1604  	raid_bdev_gen->write_cache = 0;
1605  	spdk_uuid_copy(&raid_bdev_gen->uuid, uuid);
1606  
1607  	TAILQ_INSERT_TAIL(&g_raid_bdev_list, raid_bdev, global_link);
1608  
1609  	*raid_bdev_out = raid_bdev;
1610  
1611  	return 0;
1612  }
1613  
1614  /*
1615   * brief:
1616   * raid_bdev_create allocates raid bdev based on passed configuration
1617   * params:
1618   * name - name for raid bdev
1619   * strip_size - strip size in KB
1620   * num_base_bdevs - number of base bdevs
1621   * level - raid level
1622   * superblock_enabled - true if raid should have superblock
1623   * uuid - uuid to set for the bdev
1624   * raid_bdev_out - the created raid bdev
1625   * returns:
1626   * 0 - success
1627   * non zero - failure
1628   */
1629  int
1630  raid_bdev_create(const char *name, uint32_t strip_size, uint8_t num_base_bdevs,
1631  		 enum raid_level level, bool superblock_enabled, const struct spdk_uuid *uuid,
1632  		 struct raid_bdev **raid_bdev_out)
1633  {
1634  	struct raid_bdev *raid_bdev;
1635  	int rc;
1636  
1637  	assert(uuid != NULL);
1638  
1639  	rc = _raid_bdev_create(name, strip_size, num_base_bdevs, level, superblock_enabled, uuid,
1640  			       &raid_bdev);
1641  	if (rc != 0) {
1642  		return rc;
1643  	}
1644  
1645  	if (superblock_enabled && spdk_uuid_is_null(uuid)) {
1646  		/* we need to have the uuid to store in the superblock before the bdev is registered */
1647  		spdk_uuid_generate(&raid_bdev->bdev.uuid);
1648  	}
1649  
1650  	raid_bdev->num_base_bdevs_operational = num_base_bdevs;
1651  
1652  	*raid_bdev_out = raid_bdev;
1653  
1654  	return 0;
1655  }
1656  
1657  static void
1658  _raid_bdev_unregistering_cont(void *ctx)
1659  {
1660  	struct raid_bdev *raid_bdev = ctx;
1661  
1662  	spdk_bdev_close(raid_bdev->self_desc);
1663  	raid_bdev->self_desc = NULL;
1664  }
1665  
1666  static void
1667  raid_bdev_unregistering_cont(void *ctx)
1668  {
1669  	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_unregistering_cont, ctx);
1670  }
1671  
1672  static int
1673  raid_bdev_process_add_finish_action(struct raid_bdev_process *process, spdk_msg_fn cb, void *cb_ctx)
1674  {
1675  	struct raid_process_finish_action *finish_action;
1676  
1677  	assert(spdk_get_thread() == process->thread);
1678  	assert(process->state < RAID_PROCESS_STATE_STOPPED);
1679  
1680  	finish_action = calloc(1, sizeof(*finish_action));
1681  	if (finish_action == NULL) {
1682  		return -ENOMEM;
1683  	}
1684  
1685  	finish_action->cb = cb;
1686  	finish_action->cb_ctx = cb_ctx;
1687  
1688  	TAILQ_INSERT_TAIL(&process->finish_actions, finish_action, link);
1689  
1690  	return 0;
1691  }
1692  
1693  static void
1694  raid_bdev_unregistering_stop_process(void *ctx)
1695  {
1696  	struct raid_bdev_process *process = ctx;
1697  	struct raid_bdev *raid_bdev = process->raid_bdev;
1698  	int rc;
1699  
1700  	process->state = RAID_PROCESS_STATE_STOPPING;
1701  	if (process->status == 0) {
1702  		process->status = -ECANCELED;
1703  	}
1704  
1705  	rc = raid_bdev_process_add_finish_action(process, raid_bdev_unregistering_cont, raid_bdev);
1706  	if (rc != 0) {
1707  		SPDK_ERRLOG("Failed to add raid bdev '%s' process finish action: %s\n",
1708  			    raid_bdev->bdev.name, spdk_strerror(-rc));
1709  	}
1710  }
1711  
1712  static void
1713  raid_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
1714  {
1715  	struct raid_bdev *raid_bdev = event_ctx;
1716  
1717  	if (type == SPDK_BDEV_EVENT_REMOVE) {
1718  		if (raid_bdev->process != NULL) {
1719  			spdk_thread_send_msg(raid_bdev->process->thread, raid_bdev_unregistering_stop_process,
1720  					     raid_bdev->process);
1721  		} else {
1722  			raid_bdev_unregistering_cont(raid_bdev);
1723  		}
1724  	}
1725  }
1726  
1727  static void
1728  raid_bdev_configure_cont(struct raid_bdev *raid_bdev)
1729  {
1730  	struct spdk_bdev *raid_bdev_gen = &raid_bdev->bdev;
1731  	int rc;
1732  
1733  	raid_bdev->state = RAID_BDEV_STATE_ONLINE;
1734  	SPDK_DEBUGLOG(bdev_raid, "io device register %p\n", raid_bdev);
1735  	SPDK_DEBUGLOG(bdev_raid, "blockcnt %" PRIu64 ", blocklen %u\n",
1736  		      raid_bdev_gen->blockcnt, raid_bdev_gen->blocklen);
1737  	spdk_io_device_register(raid_bdev, raid_bdev_create_cb, raid_bdev_destroy_cb,
1738  				sizeof(struct raid_bdev_io_channel),
1739  				raid_bdev_gen->name);
1740  	rc = spdk_bdev_register(raid_bdev_gen);
1741  	if (rc != 0) {
1742  		SPDK_ERRLOG("Failed to register raid bdev '%s': %s\n",
1743  			    raid_bdev_gen->name, spdk_strerror(-rc));
1744  		goto out;
1745  	}
1746  
1747  	/*
1748  	 * Open the bdev internally to delay unregistering if we need to stop a background process
1749  	 * first. The process may still need to unquiesce a range but it will fail because the
1750  	 * bdev's internal.spinlock is destroyed by the time the destruct callback is reached.
1751  	 * During application shutdown, bdevs automatically get unregistered by the bdev layer
1752  	 * so this is the only way currently to do this correctly.
1753  	 * TODO: try to handle this correctly in bdev layer instead.
1754  	 */
1755  	rc = spdk_bdev_open_ext(raid_bdev_gen->name, false, raid_bdev_event_cb, raid_bdev,
1756  				&raid_bdev->self_desc);
1757  	if (rc != 0) {
1758  		SPDK_ERRLOG("Failed to open raid bdev '%s': %s\n",
1759  			    raid_bdev_gen->name, spdk_strerror(-rc));
1760  		spdk_bdev_unregister(raid_bdev_gen, NULL, NULL);
1761  		goto out;
1762  	}
1763  
1764  	SPDK_DEBUGLOG(bdev_raid, "raid bdev generic %p\n", raid_bdev_gen);
1765  	SPDK_DEBUGLOG(bdev_raid, "raid bdev is created with name %s, raid_bdev %p\n",
1766  		      raid_bdev_gen->name, raid_bdev);
1767  out:
1768  	if (rc != 0) {
1769  		if (raid_bdev->module->stop != NULL) {
1770  			raid_bdev->module->stop(raid_bdev);
1771  		}
1772  		spdk_io_device_unregister(raid_bdev, NULL);
1773  		raid_bdev->state = RAID_BDEV_STATE_CONFIGURING;
1774  	}
1775  
1776  	if (raid_bdev->configure_cb != NULL) {
1777  		raid_bdev->configure_cb(raid_bdev->configure_cb_ctx, rc);
1778  		raid_bdev->configure_cb = NULL;
1779  	}
1780  }
1781  
1782  static void
1783  raid_bdev_configure_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
1784  {
1785  	if (status == 0) {
1786  		raid_bdev_configure_cont(raid_bdev);
1787  	} else {
1788  		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
1789  			    raid_bdev->bdev.name, spdk_strerror(-status));
1790  		if (raid_bdev->module->stop != NULL) {
1791  			raid_bdev->module->stop(raid_bdev);
1792  		}
1793  		if (raid_bdev->configure_cb != NULL) {
1794  			raid_bdev->configure_cb(raid_bdev->configure_cb_ctx, status);
1795  			raid_bdev->configure_cb = NULL;
1796  		}
1797  	}
1798  }
1799  
1800  /*
1801   * brief:
1802   * Registers the raid bdev with the bdev layer once its configuration is
1803   * complete: starts the raid module, writes the superblock if enabled and
1804   * brings the raid bdev online.
1805   * params:
1806   * raid_bdev - pointer to raid bdev
1807   * returns:
1808   * 0 - success
1809   * non zero - failure
1810   */
1811  static int
1812  raid_bdev_configure(struct raid_bdev *raid_bdev, raid_bdev_configure_cb cb, void *cb_ctx)
1813  {
1814  	uint32_t data_block_size = spdk_bdev_get_data_block_size(&raid_bdev->bdev);
1815  	int rc;
1816  
1817  	assert(raid_bdev->state == RAID_BDEV_STATE_CONFIGURING);
1818  	assert(raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational);
1819  	assert(raid_bdev->bdev.blocklen > 0);
1820  
1821  	/* The strip_size_kb is read in from user in KB. Convert to blocks here for
1822  	 * internal use.
1823  	 */
1824  	raid_bdev->strip_size = (raid_bdev->strip_size_kb * 1024) / data_block_size;
1825  	if (raid_bdev->strip_size == 0 && raid_bdev->level != RAID1) {
1826  		SPDK_ERRLOG("Strip size cannot be smaller than the device block size\n");
1827  		return -EINVAL;
1828  	}
1829  	raid_bdev->strip_size_shift = spdk_u32log2(raid_bdev->strip_size);
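
	/*
	 * Worked example (assumed values, for illustration only): with
	 * strip_size_kb = 64 and a 512-byte data block size this gives
	 * strip_size = (64 * 1024) / 512 = 128 blocks and strip_size_shift = 7,
	 * so per-I/O strip calculations can use shifts instead of division.
	 */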
1830  
1831  	rc = raid_bdev->module->start(raid_bdev);
1832  	if (rc != 0) {
1833  		SPDK_ERRLOG("raid module startup callback failed\n");
1834  		return rc;
1835  	}
1836  
1837  	assert(raid_bdev->configure_cb == NULL);
1838  	raid_bdev->configure_cb = cb;
1839  	raid_bdev->configure_cb_ctx = cb_ctx;
1840  
1841  	if (raid_bdev->superblock_enabled) {
1842  		if (raid_bdev->sb == NULL) {
1843  			rc = raid_bdev_alloc_superblock(raid_bdev, data_block_size);
1844  			if (rc == 0) {
1845  				raid_bdev_init_superblock(raid_bdev);
1846  			}
1847  		} else {
1848  			assert(spdk_uuid_compare(&raid_bdev->sb->uuid, &raid_bdev->bdev.uuid) == 0);
1849  			if (raid_bdev->sb->block_size != data_block_size) {
1850  				SPDK_ERRLOG("blocklen does not match value in superblock\n");
1851  				rc = -EINVAL;
1852  			}
1853  			if (raid_bdev->sb->raid_size != raid_bdev->bdev.blockcnt) {
1854  				SPDK_ERRLOG("blockcnt does not match value in superblock\n");
1855  				rc = -EINVAL;
1856  			}
1857  		}
1858  
1859  		if (rc != 0) {
1860  			raid_bdev->configure_cb = NULL;
1861  			if (raid_bdev->module->stop != NULL) {
1862  				raid_bdev->module->stop(raid_bdev);
1863  			}
1864  			return rc;
1865  		}
1866  
1867  		raid_bdev_write_superblock(raid_bdev, raid_bdev_configure_write_sb_cb, NULL);
1868  	} else {
1869  		raid_bdev_configure_cont(raid_bdev);
1870  	}
1871  
1872  	return 0;
1873  }
1874  
1875  /*
1876   * brief:
1877   * If the raid bdev is online and registered, change its state to
1878   * offline and unregister it from the bdev layer; otherwise just invoke
1879   * the callback immediately.
1880   * params:
1881   * raid_bdev - pointer to raid bdev
1882   * cb_fn - callback function
1883   * cb_arg - argument to callback function
1884   * returns:
1885   * none
1886   */
1887  static void
1888  raid_bdev_deconfigure(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn,
1889  		      void *cb_arg)
1890  {
1891  	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
1892  		if (cb_fn) {
1893  			cb_fn(cb_arg, 0);
1894  		}
1895  		return;
1896  	}
1897  
1898  	raid_bdev->state = RAID_BDEV_STATE_OFFLINE;
1899  	SPDK_DEBUGLOG(bdev_raid, "raid bdev state changing from online to offline\n");
1900  
1901  	spdk_bdev_unregister(&raid_bdev->bdev, cb_fn, cb_arg);
1902  }
1903  
1904  /*
1905   * brief:
1906   * raid_bdev_find_base_info_by_bdev function finds the base bdev info by bdev.
1907   * params:
1908   * base_bdev - pointer to base bdev
1909   * returns:
1910   * base bdev info if found, otherwise NULL.
1911   */
1912  static struct raid_base_bdev_info *
1913  raid_bdev_find_base_info_by_bdev(struct spdk_bdev *base_bdev)
1914  {
1915  	struct raid_bdev *raid_bdev;
1916  	struct raid_base_bdev_info *base_info;
1917  
1918  	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
1919  		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
1920  			if (base_info->desc != NULL &&
1921  			    spdk_bdev_desc_get_bdev(base_info->desc) == base_bdev) {
1922  				return base_info;
1923  			}
1924  		}
1925  	}
1926  
1927  	return NULL;
1928  }
1929  
1930  static void
1931  raid_bdev_remove_base_bdev_done(struct raid_base_bdev_info *base_info, int status)
1932  {
1933  	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1934  
1935  	assert(base_info->remove_scheduled);
1936  	base_info->remove_scheduled = false;
1937  
1938  	if (status == 0) {
1939  		raid_bdev->num_base_bdevs_operational--;
1940  		if (raid_bdev->num_base_bdevs_operational < raid_bdev->min_base_bdevs_operational) {
1941  			/* There are not enough base bdevs to keep the raid bdev operational. */
1942  			raid_bdev_deconfigure(raid_bdev, base_info->remove_cb, base_info->remove_cb_ctx);
1943  			return;
1944  		}
1945  	}
1946  
1947  	if (base_info->remove_cb != NULL) {
1948  		base_info->remove_cb(base_info->remove_cb_ctx, status);
1949  	}
1950  }
1951  
1952  static void
1953  raid_bdev_remove_base_bdev_on_unquiesced(void *ctx, int status)
1954  {
1955  	struct raid_base_bdev_info *base_info = ctx;
1956  	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1957  
1958  	if (status != 0) {
1959  		SPDK_ERRLOG("Failed to unquiesce raid bdev %s: %s\n",
1960  			    raid_bdev->bdev.name, spdk_strerror(-status));
1961  	}
1962  
1963  	raid_bdev_remove_base_bdev_done(base_info, status);
1964  }
1965  
1966  static void
1967  raid_bdev_channel_remove_base_bdev(struct spdk_io_channel_iter *i)
1968  {
1969  	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1970  	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
1971  	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
1972  	uint8_t idx = raid_bdev_base_bdev_slot(base_info);
1973  
1974  	SPDK_DEBUGLOG(bdev_raid, "slot: %u raid_ch: %p\n", idx, raid_ch);
1975  
1976  	if (raid_ch->base_channel[idx] != NULL) {
1977  		spdk_put_io_channel(raid_ch->base_channel[idx]);
1978  		raid_ch->base_channel[idx] = NULL;
1979  	}
1980  
1981  	if (raid_ch->process.ch_processed != NULL) {
1982  		raid_ch->process.ch_processed->base_channel[idx] = NULL;
1983  	}
1984  
1985  	spdk_for_each_channel_continue(i, 0);
1986  }
1987  
1988  static void
1989  raid_bdev_channels_remove_base_bdev_done(struct spdk_io_channel_iter *i, int status)
1990  {
1991  	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
1992  	struct raid_bdev *raid_bdev = base_info->raid_bdev;
1993  
1994  	raid_bdev_free_base_bdev_resource(base_info);
1995  
1996  	spdk_bdev_unquiesce(&raid_bdev->bdev, &g_raid_if, raid_bdev_remove_base_bdev_on_unquiesced,
1997  			    base_info);
1998  }
1999  
2000  static void
2001  raid_bdev_remove_base_bdev_cont(struct raid_base_bdev_info *base_info)
2002  {
2003  	raid_bdev_deconfigure_base_bdev(base_info);
2004  
2005  	spdk_for_each_channel(base_info->raid_bdev, raid_bdev_channel_remove_base_bdev, base_info,
2006  			      raid_bdev_channels_remove_base_bdev_done);
2007  }
2008  
2009  static void
2010  raid_bdev_remove_base_bdev_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2011  {
2012  	struct raid_base_bdev_info *base_info = ctx;
2013  
2014  	if (status != 0) {
2015  		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock: %s\n",
2016  			    raid_bdev->bdev.name, spdk_strerror(-status));
2017  		raid_bdev_remove_base_bdev_done(base_info, status);
2018  		return;
2019  	}
2020  
2021  	raid_bdev_remove_base_bdev_cont(base_info);
2022  }
2023  
2024  static void
2025  raid_bdev_remove_base_bdev_on_quiesced(void *ctx, int status)
2026  {
2027  	struct raid_base_bdev_info *base_info = ctx;
2028  	struct raid_bdev *raid_bdev = base_info->raid_bdev;
2029  
2030  	if (status != 0) {
2031  		SPDK_ERRLOG("Failed to quiesce raid bdev %s: %s\n",
2032  			    raid_bdev->bdev.name, spdk_strerror(-status));
2033  		raid_bdev_remove_base_bdev_done(base_info, status);
2034  		return;
2035  	}
2036  
2037  	if (raid_bdev->sb) {
2038  		struct raid_bdev_superblock *sb = raid_bdev->sb;
2039  		uint8_t slot = raid_bdev_base_bdev_slot(base_info);
2040  		uint8_t i;
2041  
2042  		for (i = 0; i < sb->base_bdevs_size; i++) {
2043  			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
2044  
2045  			if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED &&
2046  			    sb_base_bdev->slot == slot) {
2047  				if (base_info->is_failed) {
2048  					sb_base_bdev->state = RAID_SB_BASE_BDEV_FAILED;
2049  				} else {
2050  					sb_base_bdev->state = RAID_SB_BASE_BDEV_MISSING;
2051  				}
2052  
2053  				raid_bdev_write_superblock(raid_bdev, raid_bdev_remove_base_bdev_write_sb_cb, base_info);
2054  				return;
2055  			}
2056  		}
2057  	}
2058  
2059  	raid_bdev_remove_base_bdev_cont(base_info);
2060  }
2061  
2062  static int
2063  raid_bdev_remove_base_bdev_quiesce(struct raid_base_bdev_info *base_info)
2064  {
2065  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2066  
2067  	return spdk_bdev_quiesce(&base_info->raid_bdev->bdev, &g_raid_if,
2068  				 raid_bdev_remove_base_bdev_on_quiesced, base_info);
2069  }
2070  
2071  struct raid_bdev_process_base_bdev_remove_ctx {
2072  	struct raid_bdev_process *process;
2073  	struct raid_base_bdev_info *base_info;
2074  	uint8_t num_base_bdevs_operational;
2075  };
2076  
2077  static void
2078  _raid_bdev_process_base_bdev_remove_cont(void *ctx)
2079  {
2080  	struct raid_base_bdev_info *base_info = ctx;
2081  	int ret;
2082  
2083  	ret = raid_bdev_remove_base_bdev_quiesce(base_info);
2084  	if (ret != 0) {
2085  		raid_bdev_remove_base_bdev_done(base_info, ret);
2086  	}
2087  }
2088  
2089  static void
2090  raid_bdev_process_base_bdev_remove_cont(void *_ctx)
2091  {
2092  	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
2093  	struct raid_base_bdev_info *base_info = ctx->base_info;
2094  
2095  	free(ctx);
2096  
2097  	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_base_bdev_remove_cont,
2098  			     base_info);
2099  }
2100  
2101  static void
2102  _raid_bdev_process_base_bdev_remove(void *_ctx)
2103  {
2104  	struct raid_bdev_process_base_bdev_remove_ctx *ctx = _ctx;
2105  	struct raid_bdev_process *process = ctx->process;
2106  	int ret;
2107  
2108  	if (ctx->base_info != process->target &&
2109  	    ctx->num_base_bdevs_operational > process->raid_bdev->min_base_bdevs_operational) {
2110  		/* process doesn't need to be stopped */
2111  		raid_bdev_process_base_bdev_remove_cont(ctx);
2112  		return;
2113  	}
2114  
2115  	assert(process->state > RAID_PROCESS_STATE_INIT &&
2116  	       process->state < RAID_PROCESS_STATE_STOPPED);
2117  
2118  	ret = raid_bdev_process_add_finish_action(process, raid_bdev_process_base_bdev_remove_cont, ctx);
2119  	if (ret != 0) {
2120  		raid_bdev_remove_base_bdev_done(ctx->base_info, ret);
2121  		free(ctx);
2122  		return;
2123  	}
2124  
2125  	process->state = RAID_PROCESS_STATE_STOPPING;
2126  
2127  	if (process->status == 0) {
2128  		process->status = -ENODEV;
2129  	}
2130  }
2131  
2132  static int
2133  raid_bdev_process_base_bdev_remove(struct raid_bdev_process *process,
2134  				   struct raid_base_bdev_info *base_info)
2135  {
2136  	struct raid_bdev_process_base_bdev_remove_ctx *ctx;
2137  
2138  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2139  
2140  	ctx = calloc(1, sizeof(*ctx));
2141  	if (ctx == NULL) {
2142  		return -ENOMEM;
2143  	}
2144  
2145  	/*
2146  	 * We have to send the process and num_base_bdevs_operational in the message ctx
2147  	 * because the process thread should not access raid_bdev's properties. Particularly,
2148  	 * raid_bdev->process may be cleared by the time the message is handled, but ctx->process
2149  	 * will still be valid until the process is fully stopped.
2150  	 */
2151  	ctx->base_info = base_info;
2152  	ctx->process = process;
2153  	/*
2154  	 * raid_bdev->num_base_bdevs_operational can't be used here because it is decremented
2155  	 * after the removal and more than one base bdev may be removed at the same time
2156  	 */
2157  	RAID_FOR_EACH_BASE_BDEV(process->raid_bdev, base_info) {
2158  		if (base_info->is_configured && !base_info->remove_scheduled) {
2159  			ctx->num_base_bdevs_operational++;
2160  		}
2161  	}
2162  
2163  	spdk_thread_send_msg(process->thread, _raid_bdev_process_base_bdev_remove, ctx);
2164  
2165  	return 0;
2166  }
2167  
2168  static int
2169  _raid_bdev_remove_base_bdev(struct raid_base_bdev_info *base_info,
2170  			    raid_base_bdev_cb cb_fn, void *cb_ctx)
2171  {
2172  	struct raid_bdev *raid_bdev = base_info->raid_bdev;
2173  	int ret = 0;
2174  
2175  	SPDK_DEBUGLOG(bdev_raid, "%s\n", base_info->name);
2176  
2177  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2178  
2179  	if (base_info->remove_scheduled || !base_info->is_configured) {
2180  		return -ENODEV;
2181  	}
2182  
2183  	assert(base_info->desc);
2184  	base_info->remove_scheduled = true;
2185  
2186  	if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
2187  		/*
2188  		 * The raid bdev is not registered yet or is already unregistered,
2189  		 * so the cleanup has to be done here.
2190  		 *
2191  		 * Removing a base bdev at this stage does not change the number of operational
2192  		 * base bdevs, only the number of discovered base bdevs.
2193  		 */
2194  		raid_bdev_free_base_bdev_resource(base_info);
2195  		base_info->remove_scheduled = false;
2196  		if (raid_bdev->num_base_bdevs_discovered == 0 &&
2197  		    raid_bdev->state == RAID_BDEV_STATE_OFFLINE) {
2198  			/* There is no base bdev for this raid, so free the raid device. */
2199  			raid_bdev_cleanup_and_free(raid_bdev);
2200  		}
2201  		if (cb_fn != NULL) {
2202  			cb_fn(cb_ctx, 0);
2203  		}
2204  	} else if (raid_bdev->min_base_bdevs_operational == raid_bdev->num_base_bdevs) {
2205  		/* This raid bdev does not tolerate removing a base bdev. */
2206  		raid_bdev->num_base_bdevs_operational--;
2207  		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_ctx);
2208  	} else {
2209  		base_info->remove_cb = cb_fn;
2210  		base_info->remove_cb_ctx = cb_ctx;
2211  
2212  		if (raid_bdev->process != NULL) {
2213  			ret = raid_bdev_process_base_bdev_remove(raid_bdev->process, base_info);
2214  		} else {
2215  			ret = raid_bdev_remove_base_bdev_quiesce(base_info);
2216  		}
2217  
2218  		if (ret != 0) {
2219  			base_info->remove_scheduled = false;
2220  		}
2221  	}
2222  
2223  	return ret;
2224  }
2225  
2226  /*
2227   * brief:
2228   * raid_bdev_remove_base_bdev function is called by lower layers when a base_bdev
2229   * is removed. It checks whether the base bdev is part of any raid bdev
2230   * and, if so, takes the necessary action on that raid bdev.
2231   * params:
2232   * base_bdev - pointer to base bdev which got removed
2233   * cb_fn - callback function
2234   * cb_arg - argument to callback function
2235   * returns:
2236   * 0 - success
2237   * non zero - failure
2238   */
2239  int
2240  raid_bdev_remove_base_bdev(struct spdk_bdev *base_bdev, raid_base_bdev_cb cb_fn, void *cb_ctx)
2241  {
2242  	struct raid_base_bdev_info *base_info;
2243  
2244  	/* Find the raid_bdev which has claimed this base_bdev */
2245  	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2246  	if (!base_info) {
2247  		SPDK_ERRLOG("bdev to remove '%s' not found\n", base_bdev->name);
2248  		return -ENODEV;
2249  	}
2250  
2251  	return _raid_bdev_remove_base_bdev(base_info, cb_fn, cb_ctx);
2252  }
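
/*
 * Illustrative sketch (assumed names, not part of this file): a caller that
 * wants to hot-remove a base bdev and be notified when the removal completes
 * could do:
 *
 *	static void
 *	remove_done(void *ctx, int status)
 *	{
 *		SPDK_NOTICELOG("base bdev removal finished: %s\n", spdk_strerror(-status));
 *	}
 *
 *	rc = raid_bdev_remove_base_bdev(base_bdev, remove_done, NULL);
 *
 * The callback may run asynchronously, after the raid bdev has been quiesced
 * and the per-channel resources of the base bdev have been released.
 */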
2253  
2254  static void
2255  raid_bdev_fail_base_remove_cb(void *ctx, int status)
2256  {
2257  	struct raid_base_bdev_info *base_info = ctx;
2258  
2259  	if (status != 0) {
2260  		SPDK_WARNLOG("Failed to remove base bdev %s\n", base_info->name);
2261  		base_info->is_failed = false;
2262  	}
2263  }
2264  
2265  static void
2266  _raid_bdev_fail_base_bdev(void *ctx)
2267  {
2268  	struct raid_base_bdev_info *base_info = ctx;
2269  	int rc;
2270  
2271  	if (base_info->is_failed) {
2272  		return;
2273  	}
2274  	base_info->is_failed = true;
2275  
2276  	SPDK_NOTICELOG("Failing base bdev in slot %d ('%s') of raid bdev '%s'\n",
2277  		       raid_bdev_base_bdev_slot(base_info), base_info->name, base_info->raid_bdev->bdev.name);
2278  
2279  	rc = _raid_bdev_remove_base_bdev(base_info, raid_bdev_fail_base_remove_cb, base_info);
2280  	if (rc != 0) {
2281  		raid_bdev_fail_base_remove_cb(base_info, rc);
2282  	}
2283  }
2284  
2285  void
2286  raid_bdev_fail_base_bdev(struct raid_base_bdev_info *base_info)
2287  {
2288  	spdk_thread_exec_msg(spdk_thread_get_app_thread(), _raid_bdev_fail_base_bdev, base_info);
2289  }
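
/*
 * Illustrative sketch (assumed context, not part of this file): a raid module
 * would typically call raid_bdev_fail_base_bdev() from its base bdev IO
 * completion path when an IO fails fatally, e.g.:
 *
 *	if (!success) {
 *		raid_bdev_fail_base_bdev(base_info);
 *	}
 *
 * The call is safe from any thread because the actual work is forwarded to
 * the app thread via spdk_thread_exec_msg().
 */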
2290  
2291  static void
2292  raid_bdev_resize_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2293  {
2294  	if (status != 0) {
2295  		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after resizing the bdev: %s\n",
2296  			    raid_bdev->bdev.name, spdk_strerror(-status));
2297  	}
2298  }
2299  
2300  /*
2301   * brief:
2302   * raid_bdev_resize_base_bdev function is called by lower layers when a base_bdev
2303   * is resized. It records the new size of the base bdev and, if the raid module
2304   * implements a resize handler, lets the module resize the raid_bdev accordingly.
2305   * params:
2306   * base_bdev - pointer to base bdev which got resized.
2307   * returns:
2308   * none
2309   */
2310  static void
2311  raid_bdev_resize_base_bdev(struct spdk_bdev *base_bdev)
2312  {
2313  	struct raid_bdev *raid_bdev;
2314  	struct raid_base_bdev_info *base_info;
2315  	uint64_t blockcnt_old;
2316  
2317  	SPDK_DEBUGLOG(bdev_raid, "raid_bdev_resize_base_bdev\n");
2318  
2319  	base_info = raid_bdev_find_base_info_by_bdev(base_bdev);
2320  
2321  	/* Find the raid_bdev which has claimed this base_bdev */
2322  	if (!base_info) {
2323  		SPDK_ERRLOG("raid_bdev whose base_bdev '%s' not found\n", base_bdev->name);
2324  		return;
2325  	}
2326  	raid_bdev = base_info->raid_bdev;
2327  
2328  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
2329  
2330  	SPDK_NOTICELOG("base_bdev '%s' was resized: old size %" PRIu64 ", new size %" PRIu64 "\n",
2331  		       base_bdev->name, base_info->blockcnt, base_bdev->blockcnt);
2332  
2333  	base_info->blockcnt = base_bdev->blockcnt;
2334  
2335  	if (!raid_bdev->module->resize) {
2336  		return;
2337  	}
2338  
2339  	blockcnt_old = raid_bdev->bdev.blockcnt;
2340  	if (raid_bdev->module->resize(raid_bdev) == false) {
2341  		return;
2342  	}
2343  
2344  	SPDK_NOTICELOG("raid bdev '%s': block count was changed from %" PRIu64 " to %" PRIu64 "\n",
2345  		       raid_bdev->bdev.name, blockcnt_old, raid_bdev->bdev.blockcnt);
2346  
2347  	if (raid_bdev->superblock_enabled) {
2348  		struct raid_bdev_superblock *sb = raid_bdev->sb;
2349  		uint8_t i;
2350  
2351  		for (i = 0; i < sb->base_bdevs_size; i++) {
2352  			struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
2353  
2354  			if (sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
2355  				base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2356  				sb_base_bdev->data_size = base_info->data_size;
2357  			}
2358  		}
2359  		sb->raid_size = raid_bdev->bdev.blockcnt;
2360  		raid_bdev_write_superblock(raid_bdev, raid_bdev_resize_write_sb_cb, NULL);
2361  	}
2362  }
2363  
2364  /*
2365   * brief:
2366   * raid_bdev_event_base_bdev function is called by lower layers when a base_bdev
2367   * triggers an asynchronous event.
2368   * params:
2369   * type - event details.
2370   * bdev - bdev that triggered event.
2371   * event_ctx - context for event.
2372   * returns:
2373   * none
2374   */
2375  static void
2376  raid_bdev_event_base_bdev(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
2377  			  void *event_ctx)
2378  {
2379  	int rc;
2380  
2381  	switch (type) {
2382  	case SPDK_BDEV_EVENT_REMOVE:
2383  		rc = raid_bdev_remove_base_bdev(bdev, NULL, NULL);
2384  		if (rc != 0) {
2385  			SPDK_ERRLOG("Failed to remove base bdev %s: %s\n",
2386  				    spdk_bdev_get_name(bdev), spdk_strerror(-rc));
2387  		}
2388  		break;
2389  	case SPDK_BDEV_EVENT_RESIZE:
2390  		raid_bdev_resize_base_bdev(bdev);
2391  		break;
2392  	default:
2393  		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
2394  		break;
2395  	}
2396  }
2397  
2398  /*
2399   * brief:
2400   * Deletes the specified raid bdev
2401   * params:
2402   * raid_bdev - pointer to raid bdev
2403   * cb_fn - callback function
2404   * cb_arg - argument to callback function
2405   */
2406  void
2407  raid_bdev_delete(struct raid_bdev *raid_bdev, raid_bdev_destruct_cb cb_fn, void *cb_arg)
2408  {
2409  	struct raid_base_bdev_info *base_info;
2410  
2411  	SPDK_DEBUGLOG(bdev_raid, "delete raid bdev: %s\n", raid_bdev->bdev.name);
2412  
2413  	if (raid_bdev->destroy_started) {
2414  		SPDK_DEBUGLOG(bdev_raid, "destruction of raid bdev %s already started\n",
2415  			      raid_bdev->bdev.name);
2416  		if (cb_fn) {
2417  			cb_fn(cb_arg, -EALREADY);
2418  		}
2419  		return;
2420  	}
2421  
2422  	raid_bdev->destroy_started = true;
2423  
2424  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
2425  		base_info->remove_scheduled = true;
2426  
2427  		if (raid_bdev->state != RAID_BDEV_STATE_ONLINE) {
2428  			/*
2429  			 * The raid bdev is not registered yet or is already unregistered,
2430  			 * so the cleanup has to be done here.
2431  			 */
2432  			raid_bdev_free_base_bdev_resource(base_info);
2433  		}
2434  	}
2435  
2436  	if (raid_bdev->num_base_bdevs_discovered == 0) {
2437  		/* There is no base bdev for this raid, so free the raid device. */
2438  		raid_bdev_cleanup_and_free(raid_bdev);
2439  		if (cb_fn) {
2440  			cb_fn(cb_arg, 0);
2441  		}
2442  	} else {
2443  		raid_bdev_deconfigure(raid_bdev, cb_fn, cb_arg);
2444  	}
2445  }
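
/*
 * Illustrative sketch (assumed names, not part of this file): deleting a raid
 * bdev, e.g. from an RPC handler, and reporting the result asynchronously:
 *
 *	static void
 *	delete_done(void *ctx, int status)
 *	{
 *		(complete the originating request here)
 *	}
 *
 *	raid_bdev_delete(raid_bdev, delete_done, request);
 *
 * A second delete attempt while one is already in progress fails with -EALREADY.
 */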
2446  
2447  static void
2448  raid_bdev_process_finish_write_sb_cb(int status, struct raid_bdev *raid_bdev, void *ctx)
2449  {
2450  	if (status != 0) {
2451  		SPDK_ERRLOG("Failed to write raid bdev '%s' superblock after background process finished: %s\n",
2452  			    raid_bdev->bdev.name, spdk_strerror(-status));
2453  	}
2454  }
2455  
2456  static void
2457  raid_bdev_process_finish_write_sb(void *ctx)
2458  {
2459  	struct raid_bdev *raid_bdev = ctx;
2460  	struct raid_bdev_superblock *sb = raid_bdev->sb;
2461  	struct raid_bdev_sb_base_bdev *sb_base_bdev;
2462  	struct raid_base_bdev_info *base_info;
2463  	uint8_t i;
2464  
2465  	for (i = 0; i < sb->base_bdevs_size; i++) {
2466  		sb_base_bdev = &sb->base_bdevs[i];
2467  
2468  		if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED &&
2469  		    sb_base_bdev->slot < raid_bdev->num_base_bdevs) {
2470  			base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
2471  			if (base_info->is_configured) {
2472  				sb_base_bdev->state = RAID_SB_BASE_BDEV_CONFIGURED;
2473  				sb_base_bdev->data_offset = base_info->data_offset;
2474  				spdk_uuid_copy(&sb_base_bdev->uuid, &base_info->uuid);
2475  			}
2476  		}
2477  	}
2478  
2479  	raid_bdev_write_superblock(raid_bdev, raid_bdev_process_finish_write_sb_cb, NULL);
2480  }
2481  
2482  static void raid_bdev_process_free(struct raid_bdev_process *process);
2483  
2484  static void
2485  _raid_bdev_process_finish_done(void *ctx)
2486  {
2487  	struct raid_bdev_process *process = ctx;
2488  	struct raid_process_finish_action *finish_action;
2489  
2490  	while ((finish_action = TAILQ_FIRST(&process->finish_actions)) != NULL) {
2491  		TAILQ_REMOVE(&process->finish_actions, finish_action, link);
2492  		finish_action->cb(finish_action->cb_ctx);
2493  		free(finish_action);
2494  	}
2495  
2496  	spdk_poller_unregister(&process->qos.process_continue_poller);
2497  
2498  	raid_bdev_process_free(process);
2499  
2500  	spdk_thread_exit(spdk_get_thread());
2501  }
2502  
2503  static void
2504  raid_bdev_process_finish_target_removed(void *ctx, int status)
2505  {
2506  	struct raid_bdev_process *process = ctx;
2507  
2508  	if (status != 0) {
2509  		SPDK_ERRLOG("Failed to remove target bdev: %s\n", spdk_strerror(-status));
2510  	}
2511  
2512  	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2513  }
2514  
2515  static void
2516  raid_bdev_process_finish_unquiesced(void *ctx, int status)
2517  {
2518  	struct raid_bdev_process *process = ctx;
2519  
2520  	if (status != 0) {
2521  		SPDK_ERRLOG("Failed to unquiesce bdev: %s\n", spdk_strerror(-status));
2522  	}
2523  
2524  	if (process->status != 0) {
2525  		status = _raid_bdev_remove_base_bdev(process->target, raid_bdev_process_finish_target_removed,
2526  						     process);
2527  		if (status != 0) {
2528  			raid_bdev_process_finish_target_removed(process, status);
2529  		}
2530  		return;
2531  	}
2532  
2533  	spdk_thread_send_msg(process->thread, _raid_bdev_process_finish_done, process);
2534  }
2535  
2536  static void
2537  raid_bdev_process_finish_unquiesce(void *ctx)
2538  {
2539  	struct raid_bdev_process *process = ctx;
2540  	int rc;
2541  
2542  	rc = spdk_bdev_unquiesce(&process->raid_bdev->bdev, &g_raid_if,
2543  				 raid_bdev_process_finish_unquiesced, process);
2544  	if (rc != 0) {
2545  		raid_bdev_process_finish_unquiesced(process, rc);
2546  	}
2547  }
2548  
2549  static void
2550  raid_bdev_process_finish_done(void *ctx)
2551  {
2552  	struct raid_bdev_process *process = ctx;
2553  	struct raid_bdev *raid_bdev = process->raid_bdev;
2554  
2555  	if (process->raid_ch != NULL) {
2556  		spdk_put_io_channel(spdk_io_channel_from_ctx(process->raid_ch));
2557  	}
2558  
2559  	process->state = RAID_PROCESS_STATE_STOPPED;
2560  
2561  	if (process->status == 0) {
2562  		SPDK_NOTICELOG("Finished %s on raid bdev %s\n",
2563  			       raid_bdev_process_to_str(process->type),
2564  			       raid_bdev->bdev.name);
2565  		if (raid_bdev->superblock_enabled) {
2566  			spdk_thread_send_msg(spdk_thread_get_app_thread(),
2567  					     raid_bdev_process_finish_write_sb,
2568  					     raid_bdev);
2569  		}
2570  	} else {
2571  		SPDK_WARNLOG("Finished %s on raid bdev %s: %s\n",
2572  			     raid_bdev_process_to_str(process->type),
2573  			     raid_bdev->bdev.name,
2574  			     spdk_strerror(-process->status));
2575  	}
2576  
2577  	spdk_thread_send_msg(spdk_thread_get_app_thread(), raid_bdev_process_finish_unquiesce,
2578  			     process);
2579  }
2580  
2581  static void
2582  __raid_bdev_process_finish(struct spdk_io_channel_iter *i, int status)
2583  {
2584  	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2585  
2586  	spdk_thread_send_msg(process->thread, raid_bdev_process_finish_done, process);
2587  }
2588  
2589  static void
2590  raid_bdev_channel_process_finish(struct spdk_io_channel_iter *i)
2591  {
2592  	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2593  	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2594  	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2595  
2596  	if (process->status == 0) {
2597  		uint8_t slot = raid_bdev_base_bdev_slot(process->target);
2598  
2599  		raid_ch->base_channel[slot] = raid_ch->process.target_ch;
2600  		raid_ch->process.target_ch = NULL;
2601  	}
2602  
2603  	raid_bdev_ch_process_cleanup(raid_ch);
2604  
2605  	spdk_for_each_channel_continue(i, 0);
2606  }
2607  
2608  static void
2609  raid_bdev_process_finish_quiesced(void *ctx, int status)
2610  {
2611  	struct raid_bdev_process *process = ctx;
2612  	struct raid_bdev *raid_bdev = process->raid_bdev;
2613  
2614  	if (status != 0) {
2615  		SPDK_ERRLOG("Failed to quiesce bdev: %s\n", spdk_strerror(-status));
2616  		return;
2617  	}
2618  
2619  	raid_bdev->process = NULL;
2620  	process->target->is_process_target = false;
2621  
2622  	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_process_finish, process,
2623  			      __raid_bdev_process_finish);
2624  }
2625  
2626  static void
2627  _raid_bdev_process_finish(void *ctx)
2628  {
2629  	struct raid_bdev_process *process = ctx;
2630  	int rc;
2631  
2632  	rc = spdk_bdev_quiesce(&process->raid_bdev->bdev, &g_raid_if,
2633  			       raid_bdev_process_finish_quiesced, process);
2634  	if (rc != 0) {
2635  		raid_bdev_process_finish_quiesced(ctx, rc);
2636  	}
2637  }
2638  
2639  static void
2640  raid_bdev_process_do_finish(struct raid_bdev_process *process)
2641  {
2642  	spdk_thread_send_msg(spdk_thread_get_app_thread(), _raid_bdev_process_finish, process);
2643  }
2644  
2645  static void raid_bdev_process_unlock_window_range(struct raid_bdev_process *process);
2646  static void raid_bdev_process_thread_run(struct raid_bdev_process *process);
2647  
2648  static void
2649  raid_bdev_process_finish(struct raid_bdev_process *process, int status)
2650  {
2651  	assert(spdk_get_thread() == process->thread);
2652  
2653  	if (process->status == 0) {
2654  		process->status = status;
2655  	}
2656  
2657  	if (process->state >= RAID_PROCESS_STATE_STOPPING) {
2658  		return;
2659  	}
2660  
2661  	assert(process->state == RAID_PROCESS_STATE_RUNNING);
2662  	process->state = RAID_PROCESS_STATE_STOPPING;
2663  
2664  	if (process->window_range_locked) {
2665  		raid_bdev_process_unlock_window_range(process);
2666  	} else {
2667  		raid_bdev_process_thread_run(process);
2668  	}
2669  }
2670  
2671  static void
2672  raid_bdev_process_window_range_unlocked(void *ctx, int status)
2673  {
2674  	struct raid_bdev_process *process = ctx;
2675  
2676  	if (status != 0) {
2677  		SPDK_ERRLOG("Failed to unlock LBA range: %s\n", spdk_strerror(-status));
2678  		raid_bdev_process_finish(process, status);
2679  		return;
2680  	}
2681  
2682  	process->window_range_locked = false;
2683  	process->window_offset += process->window_size;
2684  
2685  	raid_bdev_process_thread_run(process);
2686  }
2687  
2688  static void
2689  raid_bdev_process_unlock_window_range(struct raid_bdev_process *process)
2690  {
2691  	int rc;
2692  
2693  	assert(process->window_range_locked == true);
2694  
2695  	rc = spdk_bdev_unquiesce_range(&process->raid_bdev->bdev, &g_raid_if,
2696  				       process->window_offset, process->max_window_size,
2697  				       raid_bdev_process_window_range_unlocked, process);
2698  	if (rc != 0) {
2699  		raid_bdev_process_window_range_unlocked(process, rc);
2700  	}
2701  }
2702  
2703  static void
2704  raid_bdev_process_channels_update_done(struct spdk_io_channel_iter *i, int status)
2705  {
2706  	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2707  
2708  	raid_bdev_process_unlock_window_range(process);
2709  }
2710  
2711  static void
2712  raid_bdev_process_channel_update(struct spdk_io_channel_iter *i)
2713  {
2714  	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2715  	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2716  	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2717  
2718  	raid_ch->process.offset = process->window_offset + process->window_size;
2719  
2720  	spdk_for_each_channel_continue(i, 0);
2721  }
2722  
2723  void
2724  raid_bdev_process_request_complete(struct raid_bdev_process_request *process_req, int status)
2725  {
2726  	struct raid_bdev_process *process = process_req->process;
2727  
2728  	TAILQ_INSERT_TAIL(&process->requests, process_req, link);
2729  
2730  	assert(spdk_get_thread() == process->thread);
2731  	assert(process->window_remaining >= process_req->num_blocks);
2732  
2733  	if (status != 0) {
2734  		process->window_status = status;
2735  	}
2736  
2737  	process->window_remaining -= process_req->num_blocks;
2738  	if (process->window_remaining == 0) {
2739  		if (process->window_status != 0) {
2740  			raid_bdev_process_finish(process, process->window_status);
2741  			return;
2742  		}
2743  
2744  		spdk_for_each_channel(process->raid_bdev, raid_bdev_process_channel_update, process,
2745  				      raid_bdev_process_channels_update_done);
2746  	}
2747  }
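
/*
 * Overview of one background process window (descriptive summary of the code
 * in this file, added for clarity): the process thread quiesces the LBA range
 * [window_offset, window_offset + max_window_size), submits process requests
 * for that range to the raid module and, once all of them complete, updates
 * every IO channel's process.offset, unquiesces the range and advances
 * window_offset by window_size before starting the next window. Any request
 * failure is recorded in window_status and stops the process once the current
 * window drains.
 */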
2748  
2749  static int
2750  raid_bdev_submit_process_request(struct raid_bdev_process *process, uint64_t offset_blocks,
2751  				 uint32_t num_blocks)
2752  {
2753  	struct raid_bdev *raid_bdev = process->raid_bdev;
2754  	struct raid_bdev_process_request *process_req;
2755  	int ret;
2756  
2757  	process_req = TAILQ_FIRST(&process->requests);
2758  	if (process_req == NULL) {
2759  		assert(process->window_remaining > 0);
2760  		return 0;
2761  	}
2762  
2763  	process_req->target = process->target;
2764  	process_req->target_ch = process->raid_ch->process.target_ch;
2765  	process_req->offset_blocks = offset_blocks;
2766  	process_req->num_blocks = num_blocks;
2767  	process_req->iov.iov_len = num_blocks * raid_bdev->bdev.blocklen;
2768  
2769  	ret = raid_bdev->module->submit_process_request(process_req, process->raid_ch);
2770  	if (ret <= 0) {
2771  		if (ret < 0) {
2772  			SPDK_ERRLOG("Failed to submit process request on %s: %s\n",
2773  				    raid_bdev->bdev.name, spdk_strerror(-ret));
2774  			process->window_status = ret;
2775  		}
2776  		return ret;
2777  	}
2778  
2779  	process_req->num_blocks = ret;
2780  	TAILQ_REMOVE(&process->requests, process_req, link);
2781  
2782  	return ret;
2783  }
2784  
2785  static void
2786  _raid_bdev_process_thread_run(struct raid_bdev_process *process)
2787  {
2788  	struct raid_bdev *raid_bdev = process->raid_bdev;
2789  	uint64_t offset = process->window_offset;
2790  	const uint64_t offset_end = spdk_min(offset + process->max_window_size, raid_bdev->bdev.blockcnt);
2791  	int ret;
2792  
2793  	while (offset < offset_end) {
2794  		ret = raid_bdev_submit_process_request(process, offset, offset_end - offset);
2795  		if (ret <= 0) {
2796  			break;
2797  		}
2798  
2799  		process->window_remaining += ret;
2800  		offset += ret;
2801  	}
2802  
2803  	if (process->window_remaining > 0) {
2804  		process->window_size = process->window_remaining;
2805  	} else {
2806  		raid_bdev_process_finish(process, process->window_status);
2807  	}
2808  }
2809  
2810  static void
2811  raid_bdev_process_window_range_locked(void *ctx, int status)
2812  {
2813  	struct raid_bdev_process *process = ctx;
2814  
2815  	if (status != 0) {
2816  		SPDK_ERRLOG("Failed to lock LBA range: %s\n", spdk_strerror(-status));
2817  		raid_bdev_process_finish(process, status);
2818  		return;
2819  	}
2820  
2821  	process->window_range_locked = true;
2822  
2823  	if (process->state == RAID_PROCESS_STATE_STOPPING) {
2824  		raid_bdev_process_unlock_window_range(process);
2825  		return;
2826  	}
2827  
2828  	_raid_bdev_process_thread_run(process);
2829  }
2830  
2831  static bool
2832  raid_bdev_process_consume_token(struct raid_bdev_process *process)
2833  {
2834  	struct raid_bdev *raid_bdev = process->raid_bdev;
2835  	uint64_t now = spdk_get_ticks();
2836  
2837  	process->qos.bytes_available = spdk_min(process->qos.bytes_max,
2838  						process->qos.bytes_available +
2839  						(now - process->qos.last_tsc) * process->qos.bytes_per_tsc);
2840  	process->qos.last_tsc = now;
2841  	if (process->qos.bytes_available > 0.0) {
2842  		process->qos.bytes_available -= process->window_size * raid_bdev->bdev.blocklen;
2843  		return true;
2844  	}
2845  	return false;
2846  }
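
/*
 * The QoS above is a token bucket: tokens (bytes) accumulate at bytes_per_tsc
 * per CPU tick, capped at bytes_max. Worked example with assumed values: at
 * 100 MiB/s and a 2.5 GHz tick rate, bytes_per_tsc is about 0.042, so a 1 MiB
 * window (1048576 bytes) is paid back after roughly 25 million ticks, i.e.
 * about 10 ms, which throttles the process to the configured bandwidth.
 */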
2847  
2848  static bool
2849  raid_bdev_process_lock_window_range(struct raid_bdev_process *process)
2850  {
2851  	struct raid_bdev *raid_bdev = process->raid_bdev;
2852  	int rc;
2853  
2854  	assert(process->window_range_locked == false);
2855  
2856  	if (process->qos.enable_qos) {
2857  		if (raid_bdev_process_consume_token(process)) {
2858  			spdk_poller_pause(process->qos.process_continue_poller);
2859  		} else {
2860  			spdk_poller_resume(process->qos.process_continue_poller);
2861  			return false;
2862  		}
2863  	}
2864  
2865  	rc = spdk_bdev_quiesce_range(&raid_bdev->bdev, &g_raid_if,
2866  				     process->window_offset, process->max_window_size,
2867  				     raid_bdev_process_window_range_locked, process);
2868  	if (rc != 0) {
2869  		raid_bdev_process_window_range_locked(process, rc);
2870  	}
2871  	return true;
2872  }
2873  
2874  static int
2875  raid_bdev_process_continue_poll(void *arg)
2876  {
2877  	struct raid_bdev_process *process = arg;
2878  
2879  	if (raid_bdev_process_lock_window_range(process)) {
2880  		return SPDK_POLLER_BUSY;
2881  	}
2882  	return SPDK_POLLER_IDLE;
2883  }
2884  
2885  static void
2886  raid_bdev_process_thread_run(struct raid_bdev_process *process)
2887  {
2888  	struct raid_bdev *raid_bdev = process->raid_bdev;
2889  
2890  	assert(spdk_get_thread() == process->thread);
2891  	assert(process->window_remaining == 0);
2892  	assert(process->window_range_locked == false);
2893  
2894  	if (process->state == RAID_PROCESS_STATE_STOPPING) {
2895  		raid_bdev_process_do_finish(process);
2896  		return;
2897  	}
2898  
2899  	if (process->window_offset == raid_bdev->bdev.blockcnt) {
2900  		SPDK_DEBUGLOG(bdev_raid, "process completed on %s\n", raid_bdev->bdev.name);
2901  		raid_bdev_process_finish(process, 0);
2902  		return;
2903  	}
2904  
2905  	process->max_window_size = spdk_min(raid_bdev->bdev.blockcnt - process->window_offset,
2906  					    process->max_window_size);
2907  	raid_bdev_process_lock_window_range(process);
2908  }
2909  
2910  static void
2911  raid_bdev_process_thread_init(void *ctx)
2912  {
2913  	struct raid_bdev_process *process = ctx;
2914  	struct raid_bdev *raid_bdev = process->raid_bdev;
2915  	struct spdk_io_channel *ch;
2916  
2917  	process->thread = spdk_get_thread();
2918  
2919  	ch = spdk_get_io_channel(raid_bdev);
2920  	if (ch == NULL) {
2921  		process->status = -ENOMEM;
2922  		raid_bdev_process_do_finish(process);
2923  		return;
2924  	}
2925  
2926  	process->raid_ch = spdk_io_channel_get_ctx(ch);
2927  	process->state = RAID_PROCESS_STATE_RUNNING;
2928  
2929  	if (process->qos.enable_qos) {
2930  		process->qos.process_continue_poller = SPDK_POLLER_REGISTER(raid_bdev_process_continue_poll,
2931  						       process, 0);
2932  		spdk_poller_pause(process->qos.process_continue_poller);
2933  	}
2934  
2935  	SPDK_NOTICELOG("Started %s on raid bdev %s\n",
2936  		       raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
2937  
2938  	raid_bdev_process_thread_run(process);
2939  }
2940  
2941  static void
2942  raid_bdev_channels_abort_start_process_done(struct spdk_io_channel_iter *i, int status)
2943  {
2944  	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2945  
2946  	_raid_bdev_remove_base_bdev(process->target, NULL, NULL);
2947  	raid_bdev_process_free(process);
2948  
2949  	/* TODO: update sb */
2950  }
2951  
2952  static void
2953  raid_bdev_channel_abort_start_process(struct spdk_io_channel_iter *i)
2954  {
2955  	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
2956  	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
2957  
2958  	raid_bdev_ch_process_cleanup(raid_ch);
2959  
2960  	spdk_for_each_channel_continue(i, 0);
2961  }
2962  
2963  static void
2964  raid_bdev_channels_start_process_done(struct spdk_io_channel_iter *i, int status)
2965  {
2966  	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
2967  	struct raid_bdev *raid_bdev = process->raid_bdev;
2968  	struct spdk_thread *thread;
2969  	char thread_name[RAID_BDEV_SB_NAME_SIZE + 16];
2970  
2971  	if (status == 0 &&
2972  	    (process->target->remove_scheduled || !process->target->is_configured ||
2973  	     raid_bdev->num_base_bdevs_operational <= raid_bdev->min_base_bdevs_operational)) {
2974  		/* a base bdev was removed before we got here */
2975  		status = -ENODEV;
2976  	}
2977  
2978  	if (status != 0) {
2979  		SPDK_ERRLOG("Failed to start %s on %s: %s\n",
2980  			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name,
2981  			    spdk_strerror(-status));
2982  		goto err;
2983  	}
2984  
2985  	snprintf(thread_name, sizeof(thread_name), "%s_%s",
2986  		 raid_bdev->bdev.name, raid_bdev_process_to_str(process->type));
2987  
2988  	thread = spdk_thread_create(thread_name, NULL);
2989  	if (thread == NULL) {
2990  		SPDK_ERRLOG("Failed to create %s thread for %s\n",
2991  			    raid_bdev_process_to_str(process->type), raid_bdev->bdev.name);
2992  		goto err;
2993  	}
2994  
2995  	raid_bdev->process = process;
2996  
2997  	spdk_thread_send_msg(thread, raid_bdev_process_thread_init, process);
2998  
2999  	return;
3000  err:
3001  	spdk_for_each_channel(process->raid_bdev, raid_bdev_channel_abort_start_process, process,
3002  			      raid_bdev_channels_abort_start_process_done);
3003  }
3004  
3005  static void
3006  raid_bdev_channel_start_process(struct spdk_io_channel_iter *i)
3007  {
3008  	struct raid_bdev_process *process = spdk_io_channel_iter_get_ctx(i);
3009  	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
3010  	struct raid_bdev_io_channel *raid_ch = spdk_io_channel_get_ctx(ch);
3011  	int rc;
3012  
3013  	rc = raid_bdev_ch_process_setup(raid_ch, process);
3014  
3015  	spdk_for_each_channel_continue(i, rc);
3016  }
3017  
3018  static void
3019  raid_bdev_process_start(struct raid_bdev_process *process)
3020  {
3021  	struct raid_bdev *raid_bdev = process->raid_bdev;
3022  
3023  	assert(raid_bdev->module->submit_process_request != NULL);
3024  
3025  	spdk_for_each_channel(raid_bdev, raid_bdev_channel_start_process, process,
3026  			      raid_bdev_channels_start_process_done);
3027  }
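
/*
 * Descriptive note (summary of the startup flow above): raid_bdev_process_start()
 * first sets up a per-channel process context on every IO channel via
 * spdk_for_each_channel(). If that succeeds, a dedicated SPDK thread is created
 * for the process; the thread acquires its own raid IO channel and then drives
 * the window loop in raid_bdev_process_thread_run(). On any setup failure the
 * per-channel contexts are torn down again and the target base bdev is removed.
 */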
3028  
3029  static void
3030  raid_bdev_process_request_free(struct raid_bdev_process_request *process_req)
3031  {
3032  	spdk_dma_free(process_req->iov.iov_base);
3033  	spdk_dma_free(process_req->md_buf);
3034  	free(process_req);
3035  }
3036  
3037  static struct raid_bdev_process_request *
3038  raid_bdev_process_alloc_request(struct raid_bdev_process *process)
3039  {
3040  	struct raid_bdev *raid_bdev = process->raid_bdev;
3041  	struct raid_bdev_process_request *process_req;
3042  
3043  	process_req = calloc(1, sizeof(*process_req));
3044  	if (process_req == NULL) {
3045  		return NULL;
3046  	}
3047  
3048  	process_req->process = process;
3049  	process_req->iov.iov_len = process->max_window_size * raid_bdev->bdev.blocklen;
3050  	process_req->iov.iov_base = spdk_dma_malloc(process_req->iov.iov_len, 4096, 0);
3051  	if (process_req->iov.iov_base == NULL) {
3052  		free(process_req);
3053  		return NULL;
3054  	}
3055  	if (spdk_bdev_is_md_separate(&raid_bdev->bdev)) {
3056  		process_req->md_buf = spdk_dma_malloc(process->max_window_size * raid_bdev->bdev.md_len, 4096, 0);
3057  		if (process_req->md_buf == NULL) {
3058  			raid_bdev_process_request_free(process_req);
3059  			return NULL;
3060  		}
3061  	}
3062  
3063  	return process_req;
3064  }
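
/*
 * Sizing example (assumed values, for illustration): with the default 1024 KiB
 * process window and a 4 KiB block size, max_window_size is 256 blocks, so each
 * request buffer is about 1 MiB and the RAID_BDEV_PROCESS_MAX_QD (16)
 * preallocated requests add up to roughly 16 MiB of DMA memory per process,
 * plus separate metadata buffers when the base bdevs use separate metadata.
 */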
3065  
3066  static void
3067  raid_bdev_process_free(struct raid_bdev_process *process)
3068  {
3069  	struct raid_bdev_process_request *process_req;
3070  
3071  	while ((process_req = TAILQ_FIRST(&process->requests)) != NULL) {
3072  		TAILQ_REMOVE(&process->requests, process_req, link);
3073  		raid_bdev_process_request_free(process_req);
3074  	}
3075  
3076  	free(process);
3077  }
3078  
3079  static struct raid_bdev_process *
3080  raid_bdev_process_alloc(struct raid_bdev *raid_bdev, enum raid_process_type type,
3081  			struct raid_base_bdev_info *target)
3082  {
3083  	struct raid_bdev_process *process;
3084  	struct raid_bdev_process_request *process_req;
3085  	int i;
3086  
3087  	process = calloc(1, sizeof(*process));
3088  	if (process == NULL) {
3089  		return NULL;
3090  	}
3091  
3092  	process->raid_bdev = raid_bdev;
3093  	process->type = type;
3094  	process->target = target;
3095  	process->max_window_size = spdk_max(spdk_divide_round_up(g_opts.process_window_size_kb * 1024UL,
3096  					    spdk_bdev_get_data_block_size(&raid_bdev->bdev)),
3097  					    raid_bdev->bdev.write_unit_size);
3098  	TAILQ_INIT(&process->requests);
3099  	TAILQ_INIT(&process->finish_actions);
3100  
3101  	if (g_opts.process_max_bandwidth_mb_sec != 0) {
3102  		process->qos.enable_qos = true;
3103  		process->qos.last_tsc = spdk_get_ticks();
3104  		process->qos.bytes_per_tsc = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 /
3105  					     spdk_get_ticks_hz();
3106  		process->qos.bytes_max = g_opts.process_max_bandwidth_mb_sec * 1024 * 1024.0 / SPDK_SEC_TO_MSEC;
3107  		process->qos.bytes_available = 0.0;
3108  	}
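
	/*
	 * Example with assumed numbers: process_max_bandwidth_mb_sec = 100 and a
	 * 2.5 GHz tick rate give bytes_per_tsc of about 0.042 and bytes_max of
	 * about 104858 bytes, i.e. the token bucket holds at most roughly 1 ms
	 * worth of the configured bandwidth.
	 */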
3109  
3110  	for (i = 0; i < RAID_BDEV_PROCESS_MAX_QD; i++) {
3111  		process_req = raid_bdev_process_alloc_request(process);
3112  		if (process_req == NULL) {
3113  			raid_bdev_process_free(process);
3114  			return NULL;
3115  		}
3116  
3117  		TAILQ_INSERT_TAIL(&process->requests, process_req, link);
3118  	}
3119  
3120  	return process;
3121  }
3122  
3123  static int
3124  raid_bdev_start_rebuild(struct raid_base_bdev_info *target)
3125  {
3126  	struct raid_bdev_process *process;
3127  
3128  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3129  
3130  	process = raid_bdev_process_alloc(target->raid_bdev, RAID_PROCESS_REBUILD, target);
3131  	if (process == NULL) {
3132  		return -ENOMEM;
3133  	}
3134  
3135  	raid_bdev_process_start(process);
3136  
3137  	return 0;
3138  }
3139  
3140  static void raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info);
3141  
3142  static void
3143  _raid_bdev_configure_base_bdev_cont(struct spdk_io_channel_iter *i, int status)
3144  {
3145  	struct raid_base_bdev_info *base_info = spdk_io_channel_iter_get_ctx(i);
3146  
3147  	raid_bdev_configure_base_bdev_cont(base_info);
3148  }
3149  
3150  static void
3151  raid_bdev_ch_sync(struct spdk_io_channel_iter *i)
3152  {
3153  	spdk_for_each_channel_continue(i, 0);
3154  }
3155  
3156  static void
3157  raid_bdev_configure_base_bdev_cont(struct raid_base_bdev_info *base_info)
3158  {
3159  	struct raid_bdev *raid_bdev = base_info->raid_bdev;
3160  	raid_base_bdev_cb configure_cb;
3161  	int rc;
3162  
3163  	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational &&
3164  	    base_info->is_process_target == false) {
3165  		/* TODO: defer if rebuild in progress on another base bdev */
3166  		assert(raid_bdev->process == NULL);
3167  		assert(raid_bdev->state == RAID_BDEV_STATE_ONLINE);
3168  		base_info->is_process_target = true;
3169  		/* Ensure is_process_target is seen as set before is_configured when checked in raid_bdev_create_cb() */
3170  		spdk_for_each_channel(raid_bdev, raid_bdev_ch_sync, base_info, _raid_bdev_configure_base_bdev_cont);
3171  		return;
3172  	}
3173  
3174  	base_info->is_configured = true;
3175  
3176  	raid_bdev->num_base_bdevs_discovered++;
3177  	assert(raid_bdev->num_base_bdevs_discovered <= raid_bdev->num_base_bdevs);
3178  	assert(raid_bdev->num_base_bdevs_operational <= raid_bdev->num_base_bdevs);
3179  	assert(raid_bdev->num_base_bdevs_operational >= raid_bdev->min_base_bdevs_operational);
3180  
3181  	configure_cb = base_info->configure_cb;
3182  	base_info->configure_cb = NULL;
3183  	/*
3184  	 * Configure the raid bdev when the number of discovered base bdevs reaches the number
3185  	 * of base bdevs we know to be operational members of the array. Usually this is equal
3186  	 * to the total number of base bdevs (num_base_bdevs) but can be less - when the array is
3187  	 * degraded.
3188  	 */
3189  	if (raid_bdev->num_base_bdevs_discovered == raid_bdev->num_base_bdevs_operational) {
3190  		rc = raid_bdev_configure(raid_bdev, configure_cb, base_info->configure_cb_ctx);
3191  		if (rc != 0) {
3192  			SPDK_ERRLOG("Failed to configure raid bdev: %s\n", spdk_strerror(-rc));
3193  		} else {
3194  			configure_cb = NULL;
3195  		}
3196  	} else if (base_info->is_process_target) {
3197  		raid_bdev->num_base_bdevs_operational++;
3198  		rc = raid_bdev_start_rebuild(base_info);
3199  		if (rc != 0) {
3200  			SPDK_ERRLOG("Failed to start rebuild: %s\n", spdk_strerror(-rc));
3201  			_raid_bdev_remove_base_bdev(base_info, NULL, NULL);
3202  		}
3203  	} else {
3204  		rc = 0;
3205  	}
3206  
3207  	if (configure_cb != NULL) {
3208  		configure_cb(base_info->configure_cb_ctx, rc);
3209  	}
3210  }
3211  
3212  static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3213  				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3214  
3215  static void
3216  raid_bdev_configure_base_bdev_check_sb_cb(const struct raid_bdev_superblock *sb, int status,
3217  		void *ctx)
3218  {
3219  	struct raid_base_bdev_info *base_info = ctx;
3220  	raid_base_bdev_cb configure_cb = base_info->configure_cb;
3221  
3222  	switch (status) {
3223  	case 0:
3224  		/* valid superblock found */
3225  		base_info->configure_cb = NULL;
3226  		if (spdk_uuid_compare(&base_info->raid_bdev->bdev.uuid, &sb->uuid) == 0) {
3227  			struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(base_info->desc);
3228  
3229  			raid_bdev_free_base_bdev_resource(base_info);
3230  			raid_bdev_examine_sb(sb, bdev, configure_cb, base_info->configure_cb_ctx);
3231  			return;
3232  		}
3233  		SPDK_ERRLOG("Superblock of a different raid bdev found on bdev %s\n", base_info->name);
3234  		status = -EEXIST;
3235  		raid_bdev_free_base_bdev_resource(base_info);
3236  		break;
3237  	case -EINVAL:
3238  		/* no valid superblock */
3239  		raid_bdev_configure_base_bdev_cont(base_info);
3240  		return;
3241  	default:
3242  		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3243  			    base_info->name, spdk_strerror(-status));
3244  		break;
3245  	}
3246  
3247  	if (configure_cb != NULL) {
3248  		base_info->configure_cb = NULL;
3249  		configure_cb(base_info->configure_cb_ctx, status);
3250  	}
3251  }
3252  
3253  static int
3254  raid_bdev_configure_base_bdev(struct raid_base_bdev_info *base_info, bool existing,
3255  			      raid_base_bdev_cb cb_fn, void *cb_ctx)
3256  {
3257  	struct raid_bdev *raid_bdev = base_info->raid_bdev;
3258  	struct spdk_bdev_desc *desc;
3259  	struct spdk_bdev *bdev;
3260  	const struct spdk_uuid *bdev_uuid;
3261  	int rc;
3262  
3263  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3264  	assert(base_info->desc == NULL);
3265  
3266  	/*
3267  	 * A base bdev can be added by name or uuid. Here we ensure both properties are set and valid
3268  	 * before claiming the bdev.
3269  	 */
3270  
3271  	if (!spdk_uuid_is_null(&base_info->uuid)) {
3272  		char uuid_str[SPDK_UUID_STRING_LEN];
3273  		const char *bdev_name;
3274  
3275  		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3276  
3277  		/* UUID of a bdev is registered as its alias */
3278  		bdev = spdk_bdev_get_by_name(uuid_str);
3279  		if (bdev == NULL) {
3280  			return -ENODEV;
3281  		}
3282  
3283  		bdev_name = spdk_bdev_get_name(bdev);
3284  
3285  		if (base_info->name == NULL) {
3286  			assert(existing == true);
3287  			base_info->name = strdup(bdev_name);
3288  			if (base_info->name == NULL) {
3289  				return -ENOMEM;
3290  			}
3291  		} else if (strcmp(base_info->name, bdev_name) != 0) {
3292  			SPDK_ERRLOG("Name mismatch for base bdev '%s' - expected '%s'\n",
3293  				    bdev_name, base_info->name);
3294  			return -EINVAL;
3295  		}
3296  	}
3297  
3298  	assert(base_info->name != NULL);
3299  
3300  	rc = spdk_bdev_open_ext(base_info->name, true, raid_bdev_event_base_bdev, NULL, &desc);
3301  	if (rc != 0) {
3302  		if (rc != -ENODEV) {
3303  			SPDK_ERRLOG("Unable to create desc on bdev '%s'\n", base_info->name);
3304  		}
3305  		return rc;
3306  	}
3307  
3308  	bdev = spdk_bdev_desc_get_bdev(desc);
3309  	bdev_uuid = spdk_bdev_get_uuid(bdev);
3310  
3311  	if (spdk_uuid_is_null(&base_info->uuid)) {
3312  		spdk_uuid_copy(&base_info->uuid, bdev_uuid);
3313  	} else if (spdk_uuid_compare(&base_info->uuid, bdev_uuid) != 0) {
3314  		SPDK_ERRLOG("UUID mismatch for base bdev '%s'\n", base_info->name);
3315  		spdk_bdev_close(desc);
3316  		return -EINVAL;
3317  	}
3318  
3319  	rc = spdk_bdev_module_claim_bdev(bdev, NULL, &g_raid_if);
3320  	if (rc != 0) {
3321  		SPDK_ERRLOG("Unable to claim this bdev as it is already claimed\n");
3322  		spdk_bdev_close(desc);
3323  		return rc;
3324  	}
3325  
3326  	SPDK_DEBUGLOG(bdev_raid, "bdev %s is claimed\n", bdev->name);
3327  
3328  	base_info->app_thread_ch = spdk_bdev_get_io_channel(desc);
3329  	if (base_info->app_thread_ch == NULL) {
3330  		SPDK_ERRLOG("Failed to get io channel\n");
3331  		spdk_bdev_module_release_bdev(bdev);
3332  		spdk_bdev_close(desc);
3333  		return -ENOMEM;
3334  	}
3335  
3336  	base_info->desc = desc;
3337  	base_info->blockcnt = bdev->blockcnt;
3338  
3339  	if (raid_bdev->superblock_enabled) {
3340  		uint64_t data_offset;
3341  
3342  		if (base_info->data_offset == 0) {
3343  			assert((RAID_BDEV_MIN_DATA_OFFSET_SIZE % spdk_bdev_get_data_block_size(bdev)) == 0);
3344  			data_offset = RAID_BDEV_MIN_DATA_OFFSET_SIZE / spdk_bdev_get_data_block_size(bdev);
3345  		} else {
3346  			data_offset = base_info->data_offset;
3347  		}
3348  
3349  		if (bdev->optimal_io_boundary != 0) {
3350  			data_offset = spdk_divide_round_up(data_offset,
3351  							   bdev->optimal_io_boundary) * bdev->optimal_io_boundary;
3352  			if (base_info->data_offset != 0 && base_info->data_offset != data_offset) {
3353  				SPDK_WARNLOG("Data offset %lu on bdev '%s' is different than optimal value %lu\n",
3354  					     base_info->data_offset, base_info->name, data_offset);
3355  				data_offset = base_info->data_offset;
3356  			}
3357  		}
3358  
3359  		base_info->data_offset = data_offset;
3360  	}
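
	/*
	 * Worked example (assumed values): with a 4 KiB data block size and a
	 * 1 MiB minimum data offset, data_offset starts at 256 blocks; if the
	 * base bdev reports optimal_io_boundary = 384, it is rounded up to 384
	 * blocks so that the data region starts on an optimal IO boundary.
	 */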
3361  
3362  	if (base_info->data_offset >= bdev->blockcnt) {
3363  		SPDK_ERRLOG("Data offset %lu exceeds base bdev capacity %lu on bdev '%s'\n",
3364  			    base_info->data_offset, bdev->blockcnt, base_info->name);
3365  		rc = -EINVAL;
3366  		goto out;
3367  	}
3368  
3369  	if (base_info->data_size == 0) {
3370  		base_info->data_size = bdev->blockcnt - base_info->data_offset;
3371  	} else if (base_info->data_offset + base_info->data_size > bdev->blockcnt) {
3372  		SPDK_ERRLOG("Data offset and size exceeds base bdev capacity %lu on bdev '%s'\n",
3373  			    bdev->blockcnt, base_info->name);
3374  		rc = -EINVAL;
3375  		goto out;
3376  	}
3377  
3378  	if (!raid_bdev->module->dif_supported && spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3379  		SPDK_ERRLOG("Base bdev '%s' has DIF or DIX enabled - unsupported RAID configuration\n",
3380  			    bdev->name);
3381  		rc = -EINVAL;
3382  		goto out;
3383  	}
3384  
3385  	/*
3386  	 * Set the raid bdev properties if this is the first base bdev configured;
3387  	 * otherwise verify that they match. All base bdevs of a raid bdev must have
3388  	 * the same blocklen and metadata format.
3389  	 */
3390  	if (raid_bdev->bdev.blocklen == 0) {
3391  		raid_bdev->bdev.blocklen = bdev->blocklen;
3392  		raid_bdev->bdev.md_len = spdk_bdev_get_md_size(bdev);
3393  		raid_bdev->bdev.md_interleave = spdk_bdev_is_md_interleaved(bdev);
3394  		raid_bdev->bdev.dif_type = spdk_bdev_get_dif_type(bdev);
3395  		raid_bdev->bdev.dif_check_flags = bdev->dif_check_flags;
3396  		raid_bdev->bdev.dif_is_head_of_md = spdk_bdev_is_dif_head_of_md(bdev);
3397  		raid_bdev->bdev.dif_pi_format = bdev->dif_pi_format;
3398  	} else {
3399  		if (raid_bdev->bdev.blocklen != bdev->blocklen) {
3400  			SPDK_ERRLOG("Raid bdev '%s' blocklen %u differs from base bdev '%s' blocklen %u\n",
3401  				    raid_bdev->bdev.name, raid_bdev->bdev.blocklen, bdev->name, bdev->blocklen);
3402  			rc = -EINVAL;
3403  			goto out;
3404  		}
3405  
3406  		if (raid_bdev->bdev.md_len != spdk_bdev_get_md_size(bdev) ||
3407  		    raid_bdev->bdev.md_interleave != spdk_bdev_is_md_interleaved(bdev) ||
3408  		    raid_bdev->bdev.dif_type != spdk_bdev_get_dif_type(bdev) ||
3409  		    raid_bdev->bdev.dif_check_flags != bdev->dif_check_flags ||
3410  		    raid_bdev->bdev.dif_is_head_of_md != spdk_bdev_is_dif_head_of_md(bdev) ||
3411  		    raid_bdev->bdev.dif_pi_format != bdev->dif_pi_format) {
3412  			SPDK_ERRLOG("Raid bdev '%s' has different metadata format than base bdev '%s'\n",
3413  				    raid_bdev->bdev.name, bdev->name);
3414  			rc = -EINVAL;
3415  			goto out;
3416  		}
3417  	}
3418  
3419  	assert(base_info->configure_cb == NULL);
3420  	base_info->configure_cb = cb_fn;
3421  	base_info->configure_cb_ctx = cb_ctx;
3422  
3423  	if (existing) {
3424  		raid_bdev_configure_base_bdev_cont(base_info);
3425  	} else {
3426  		/* check for existing superblock when using a new bdev */
3427  		rc = raid_bdev_load_base_bdev_superblock(desc, base_info->app_thread_ch,
3428  				raid_bdev_configure_base_bdev_check_sb_cb, base_info);
3429  		if (rc) {
3430  			SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3431  				    bdev->name, spdk_strerror(-rc));
3432  		}
3433  	}
3434  out:
3435  	if (rc != 0) {
3436  		base_info->configure_cb = NULL;
3437  		raid_bdev_free_base_bdev_resource(base_info);
3438  	}
3439  	return rc;
3440  }
3441  
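/*
 * brief:
 * raid_bdev_add_base_bdev function adds the base bdev 'name' to a free slot of the
 * raid bdev and starts configuring it. If the raid bdev is still configuring, a slot
 * whose uuid matches the bdev (e.g. one reserved by a superblock) is preferred;
 * otherwise the first empty slot is used. Must be called on the app thread.
 * params:
 * raid_bdev - pointer to raid bdev
 * name - name of the base bdev to add
 * cb_fn - callback invoked when the base bdev is configured
 * cb_ctx - argument passed to cb_fn
 * returns:
 * 0 - success
 * non zero - failure
 */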
3442  int
3443  raid_bdev_add_base_bdev(struct raid_bdev *raid_bdev, const char *name,
3444  			raid_base_bdev_cb cb_fn, void *cb_ctx)
3445  {
3446  	struct raid_base_bdev_info *base_info = NULL, *iter;
3447  	int rc;
3448  
3449  	assert(name != NULL);
3450  	assert(spdk_get_thread() == spdk_thread_get_app_thread());
3451  
3452  	if (raid_bdev->process != NULL) {
3453  		SPDK_ERRLOG("raid bdev '%s' has a background process in progress\n",
3454  			    raid_bdev->bdev.name);
3455  		return -EPERM;
3456  	}
3457  
3458  	if (raid_bdev->state == RAID_BDEV_STATE_CONFIGURING) {
3459  		struct spdk_bdev *bdev = spdk_bdev_get_by_name(name);
3460  
3461  		if (bdev != NULL) {
3462  			RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3463  				if (iter->name == NULL &&
3464  				    spdk_uuid_compare(&bdev->uuid, &iter->uuid) == 0) {
3465  					base_info = iter;
3466  					break;
3467  				}
3468  			}
3469  		}
3470  	}
3471  
3472  	if (base_info == NULL || raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
3473  		RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3474  			if (iter->name == NULL && spdk_uuid_is_null(&iter->uuid)) {
3475  				base_info = iter;
3476  				break;
3477  			}
3478  		}
3479  	}
3480  
3481  	if (base_info == NULL) {
3482  		SPDK_ERRLOG("no empty slot found in raid bdev '%s' for new base bdev '%s'\n",
3483  			    raid_bdev->bdev.name, name);
3484  		return -EINVAL;
3485  	}
3486  
3487  	assert(base_info->is_configured == false);
3488  
3489  	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
3490  		assert(base_info->data_size != 0);
3491  		assert(base_info->desc == NULL);
3492  	}
3493  
3494  	base_info->name = strdup(name);
3495  	if (base_info->name == NULL) {
3496  		return -ENOMEM;
3497  	}
3498  
3499  	rc = raid_bdev_configure_base_bdev(base_info, false, cb_fn, cb_ctx);
3500  	if (rc != 0 && (rc != -ENODEV || raid_bdev->state != RAID_BDEV_STATE_CONFIGURING)) {
3501  		SPDK_ERRLOG("base bdev '%s' configure failed: %s\n", name, spdk_strerror(-rc));
3502  		free(base_info->name);
3503  		base_info->name = NULL;
3504  	}
3505  
3506  	return rc;
3507  }
3508  
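/*
 * Create a raid bdev from an on-disk superblock: allocate the bdev with the parameters
 * from the superblock, keep a copy of the superblock and initialize the base bdev slots
 * (uuid, data offset and size) from it.
 */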
3509  static int
3510  raid_bdev_create_from_sb(const struct raid_bdev_superblock *sb, struct raid_bdev **raid_bdev_out)
3511  {
3512  	struct raid_bdev *raid_bdev;
3513  	uint8_t i;
3514  	int rc;
3515  
3516  	rc = _raid_bdev_create(sb->name, (sb->strip_size * sb->block_size) / 1024, sb->num_base_bdevs,
3517  			       sb->level, true, &sb->uuid, &raid_bdev);
3518  	if (rc != 0) {
3519  		return rc;
3520  	}
3521  
3522  	rc = raid_bdev_alloc_superblock(raid_bdev, sb->block_size);
3523  	if (rc != 0) {
3524  		raid_bdev_free(raid_bdev);
3525  		return rc;
3526  	}
3527  
3528  	assert(sb->length <= RAID_BDEV_SB_MAX_LENGTH);
3529  	memcpy(raid_bdev->sb, sb, sb->length);
3530  
3531  	for (i = 0; i < sb->base_bdevs_size; i++) {
3532  		const struct raid_bdev_sb_base_bdev *sb_base_bdev = &sb->base_bdevs[i];
3533  		struct raid_base_bdev_info *base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3534  
3535  		if (sb_base_bdev->state == RAID_SB_BASE_BDEV_CONFIGURED) {
3536  			spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3537  			raid_bdev->num_base_bdevs_operational++;
3538  		}
3539  
3540  		base_info->data_offset = sb_base_bdev->data_offset;
3541  		base_info->data_size = sb_base_bdev->data_size;
3542  	}
3543  
3544  	*raid_bdev_out = raid_bdev;
3545  	return 0;
3546  }
3547  
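/*
 * Try to claim a bdev that has no raid superblock: match it by name or uuid against
 * the unconfigured base bdev slots of raid bdevs that are still configuring and were
 * created without a superblock.
 */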
3548  static void
3549  raid_bdev_examine_no_sb(struct spdk_bdev *bdev)
3550  {
3551  	struct raid_bdev *raid_bdev;
3552  	struct raid_base_bdev_info *base_info;
3553  
3554  	TAILQ_FOREACH(raid_bdev, &g_raid_bdev_list, global_link) {
3555  		if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING || raid_bdev->sb != NULL) {
3556  			continue;
3557  		}
3558  		RAID_FOR_EACH_BASE_BDEV(raid_bdev, base_info) {
3559  			if (base_info->desc == NULL &&
3560  			    ((base_info->name != NULL && strcmp(bdev->name, base_info->name) == 0) ||
3561  			     spdk_uuid_compare(&base_info->uuid, &bdev->uuid) == 0)) {
3562  				raid_bdev_configure_base_bdev(base_info, true, NULL, NULL);
3563  				break;
3564  			}
3565  		}
3566  	}
3567  }
3568  
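/* Context used to examine the remaining base bdevs after a raid bdev was assembled from a superblock */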
3569  struct raid_bdev_examine_others_ctx {
3570  	struct spdk_uuid raid_bdev_uuid;
3571  	uint8_t current_base_bdev_idx;
3572  	raid_base_bdev_cb cb_fn;
3573  	void *cb_ctx;
3574  };
3575  
3576  static void
3577  raid_bdev_examine_others_done(void *_ctx, int status)
3578  {
3579  	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3580  
3581  	if (ctx->cb_fn != NULL) {
3582  		ctx->cb_fn(ctx->cb_ctx, status);
3583  	}
3584  	free(ctx);
3585  }
3586  
3587  typedef void (*raid_bdev_examine_load_sb_cb)(struct spdk_bdev *bdev,
3588  		const struct raid_bdev_superblock *sb, int status, void *ctx);
3589  static int raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb,
3590  				     void *cb_ctx);
3591  static void raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3592  				 raid_base_bdev_cb cb_fn, void *cb_ctx);
3593  static void raid_bdev_examine_others(void *_ctx, int status);
3594  
3595  static void
3596  raid_bdev_examine_others_load_cb(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb,
3597  				 int status, void *_ctx)
3598  {
3599  	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3600  
3601  	if (status != 0) {
3602  		raid_bdev_examine_others_done(ctx, status);
3603  		return;
3604  	}
3605  
3606  	raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_others, ctx);
3607  }
3608  
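/*
 * Walk the remaining unconfigured base bdev slots of the raid bdev identified by
 * ctx->raid_bdev_uuid. For the next slot whose bdev is already registered (looked up
 * by uuid), load its superblock and continue the examination from there; otherwise
 * finish with raid_bdev_examine_others_done().
 */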
3609  static void
3610  raid_bdev_examine_others(void *_ctx, int status)
3611  {
3612  	struct raid_bdev_examine_others_ctx *ctx = _ctx;
3613  	struct raid_bdev *raid_bdev;
3614  	struct raid_base_bdev_info *base_info;
3615  	char uuid_str[SPDK_UUID_STRING_LEN];
3616  
3617  	if (status != 0 && status != -EEXIST) {
3618  		goto out;
3619  	}
3620  
3621  	raid_bdev = raid_bdev_find_by_uuid(&ctx->raid_bdev_uuid);
3622  	if (raid_bdev == NULL) {
3623  		status = -ENODEV;
3624  		goto out;
3625  	}
3626  
3627  	for (base_info = &raid_bdev->base_bdev_info[ctx->current_base_bdev_idx];
3628  	     base_info < &raid_bdev->base_bdev_info[raid_bdev->num_base_bdevs];
3629  	     base_info++) {
3630  		if (base_info->is_configured || spdk_uuid_is_null(&base_info->uuid)) {
3631  			continue;
3632  		}
3633  
3634  		spdk_uuid_fmt_lower(uuid_str, sizeof(uuid_str), &base_info->uuid);
3635  
3636  		if (spdk_bdev_get_by_name(uuid_str) == NULL) {
3637  			continue;
3638  		}
3639  
3640  		ctx->current_base_bdev_idx = raid_bdev_base_bdev_slot(base_info);
3641  
3642  		status = raid_bdev_examine_load_sb(uuid_str, raid_bdev_examine_others_load_cb, ctx);
3643  		if (status != 0) {
3644  			continue;
3645  		}
3646  		return;
3647  	}
3648  out:
3649  	raid_bdev_examine_others_done(ctx, status);
3650  }
3651  
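/*
 * Handle a valid raid superblock found on 'bdev': locate the raid bdev it belongs to,
 * or create it from the superblock if it does not exist yet (preferring the superblock
 * with the highest seq_number), and configure 'bdev' as the corresponding base bdev.
 * On failure cb_fn, if given, is called with the error.
 */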
3652  static void
3653  raid_bdev_examine_sb(const struct raid_bdev_superblock *sb, struct spdk_bdev *bdev,
3654  		     raid_base_bdev_cb cb_fn, void *cb_ctx)
3655  {
3656  	const struct raid_bdev_sb_base_bdev *sb_base_bdev = NULL;
3657  	struct raid_bdev *raid_bdev;
3658  	struct raid_base_bdev_info *iter, *base_info;
3659  	uint8_t i;
3660  	int rc;
3661  
3662  	if (sb->block_size != spdk_bdev_get_data_block_size(bdev)) {
3663  		SPDK_WARNLOG("Bdev %s block size (%u) does not match the value in superblock (%u)\n",
3664  			     bdev->name, sb->block_size, spdk_bdev_get_data_block_size(bdev));
3665  		rc = -EINVAL;
3666  		goto out;
3667  	}
3668  
3669  	if (spdk_uuid_is_null(&sb->uuid)) {
3670  		SPDK_WARNLOG("NULL raid bdev UUID in superblock on bdev %s\n", bdev->name);
3671  		rc = -EINVAL;
3672  		goto out;
3673  	}
3674  
3675  	raid_bdev = raid_bdev_find_by_uuid(&sb->uuid);
3676  
3677  	if (raid_bdev) {
3678  		if (raid_bdev->sb == NULL) {
3679  			SPDK_WARNLOG("raid bdev %s does not have a superblock\n", raid_bdev->bdev.name);
3680  			rc = -EINVAL;
3681  			goto out;
3682  		}
3683  
3684  		if (sb->seq_number > raid_bdev->sb->seq_number) {
3685  			SPDK_DEBUGLOG(bdev_raid,
3686  				      "raid superblock seq_number on bdev %s (%lu) greater than existing raid bdev %s (%lu)\n",
3687  				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3688  
3689  			if (raid_bdev->state != RAID_BDEV_STATE_CONFIGURING) {
3690  				SPDK_WARNLOG("Newer version of raid bdev %s superblock found on bdev %s but raid bdev is not in configuring state.\n",
3691  					     raid_bdev->bdev.name, bdev->name);
3692  				rc = -EBUSY;
3693  				goto out;
3694  			}
3695  
3696  			/* remove and then recreate the raid bdev using the newer superblock */
3697  			raid_bdev_delete(raid_bdev, NULL, NULL);
3698  			raid_bdev = NULL;
3699  		} else if (sb->seq_number < raid_bdev->sb->seq_number) {
3700  			SPDK_DEBUGLOG(bdev_raid,
3701  				      "raid superblock seq_number on bdev %s (%lu) smaller than existing raid bdev %s (%lu)\n",
3702  				      bdev->name, sb->seq_number, raid_bdev->bdev.name, raid_bdev->sb->seq_number);
3703  			/* use the current raid bdev superblock */
3704  			sb = raid_bdev->sb;
3705  		}
3706  	}
3707  
3708  	for (i = 0; i < sb->base_bdevs_size; i++) {
3709  		sb_base_bdev = &sb->base_bdevs[i];
3710  
3711  		assert(spdk_uuid_is_null(&sb_base_bdev->uuid) == false);
3712  
3713  		if (spdk_uuid_compare(&sb_base_bdev->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3714  			break;
3715  		}
3716  	}
3717  
3718  	if (i == sb->base_bdevs_size) {
3719  		SPDK_DEBUGLOG(bdev_raid, "raid superblock does not contain this bdev's uuid\n");
3720  		rc = -EINVAL;
3721  		goto out;
3722  	}
3723  
3724  	if (!raid_bdev) {
3725  		struct raid_bdev_examine_others_ctx *ctx;
3726  
3727  		ctx = calloc(1, sizeof(*ctx));
3728  		if (ctx == NULL) {
3729  			rc = -ENOMEM;
3730  			goto out;
3731  		}
3732  
3733  		rc = raid_bdev_create_from_sb(sb, &raid_bdev);
3734  		if (rc != 0) {
3735  			SPDK_ERRLOG("Failed to create raid bdev %s: %s\n",
3736  				    sb->name, spdk_strerror(-rc));
3737  			free(ctx);
3738  			goto out;
3739  		}
3740  
3741  		/* after this base bdev is configured, examine other base bdevs that may be present */
3742  		spdk_uuid_copy(&ctx->raid_bdev_uuid, &sb->uuid);
3743  		ctx->cb_fn = cb_fn;
3744  		ctx->cb_ctx = cb_ctx;
3745  
3746  		cb_fn = raid_bdev_examine_others;
3747  		cb_ctx = ctx;
3748  	}
3749  
3750  	if (raid_bdev->state == RAID_BDEV_STATE_ONLINE) {
3751  		assert(sb_base_bdev->slot < raid_bdev->num_base_bdevs);
3752  		base_info = &raid_bdev->base_bdev_info[sb_base_bdev->slot];
3753  		assert(base_info->is_configured == false);
3754  		assert(sb_base_bdev->state == RAID_SB_BASE_BDEV_MISSING ||
3755  		       sb_base_bdev->state == RAID_SB_BASE_BDEV_FAILED);
3756  		assert(spdk_uuid_is_null(&base_info->uuid));
3757  		spdk_uuid_copy(&base_info->uuid, &sb_base_bdev->uuid);
3758  		SPDK_NOTICELOG("Re-adding bdev %s to raid bdev %s.\n", bdev->name, raid_bdev->bdev.name);
3759  		rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3760  		if (rc != 0) {
3761  			SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3762  				    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3763  		}
3764  		goto out;
3765  	}
3766  
3767  	if (sb_base_bdev->state != RAID_SB_BASE_BDEV_CONFIGURED) {
3768  		SPDK_NOTICELOG("Bdev %s is not an active member of raid bdev %s. Ignoring.\n",
3769  			       bdev->name, raid_bdev->bdev.name);
3770  		rc = -EINVAL;
3771  		goto out;
3772  	}
3773  
3774  	base_info = NULL;
3775  	RAID_FOR_EACH_BASE_BDEV(raid_bdev, iter) {
3776  		if (spdk_uuid_compare(&iter->uuid, spdk_bdev_get_uuid(bdev)) == 0) {
3777  			base_info = iter;
3778  			break;
3779  		}
3780  	}
3781  
3782  	if (base_info == NULL) {
3783  		SPDK_ERRLOG("Bdev %s is not a member of raid bdev %s\n",
3784  			    bdev->name, raid_bdev->bdev.name);
3785  		rc = -EINVAL;
3786  		goto out;
3787  	}
3788  
3789  	if (base_info->is_configured) {
3790  		rc = -EEXIST;
3791  		goto out;
3792  	}
3793  
3794  	rc = raid_bdev_configure_base_bdev(base_info, true, cb_fn, cb_ctx);
3795  	if (rc != 0) {
3796  		SPDK_ERRLOG("Failed to configure bdev %s as base bdev of raid %s: %s\n",
3797  			    bdev->name, raid_bdev->bdev.name, spdk_strerror(-rc));
3798  	}
3799  out:
3800  	if (rc != 0 && cb_fn != NULL) {
3801  		cb_fn(cb_ctx, rc);
3802  	}
3803  }
3804  
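/* Context for reading a base bdev's superblock during examine */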
3805  struct raid_bdev_examine_ctx {
3806  	struct spdk_bdev_desc *desc;
3807  	struct spdk_io_channel *ch;
3808  	raid_bdev_examine_load_sb_cb cb;
3809  	void *cb_ctx;
3810  };
3811  
3812  static void
3813  raid_bdev_examine_ctx_free(struct raid_bdev_examine_ctx *ctx)
3814  {
3815  	if (!ctx) {
3816  		return;
3817  	}
3818  
3819  	if (ctx->ch) {
3820  		spdk_put_io_channel(ctx->ch);
3821  	}
3822  
3823  	if (ctx->desc) {
3824  		spdk_bdev_close(ctx->desc);
3825  	}
3826  
3827  	free(ctx);
3828  }
3829  
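/* Completion callback of the superblock read started by raid_bdev_examine_load_sb() */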
3830  static void
3831  raid_bdev_examine_load_sb_done(const struct raid_bdev_superblock *sb, int status, void *_ctx)
3832  {
3833  	struct raid_bdev_examine_ctx *ctx = _ctx;
3834  	struct spdk_bdev *bdev = spdk_bdev_desc_get_bdev(ctx->desc);
3835  
3836  	ctx->cb(bdev, sb, status, ctx->cb_ctx);
3837  
3838  	raid_bdev_examine_ctx_free(ctx);
3839  }
3840  
3841  static void
3842  raid_bdev_examine_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *event_ctx)
3843  {
3844  }
3845  
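/*
 * Open 'bdev_name' read-only, get an io channel and read its raid superblock.
 * 'cb' is called with the result once the read completes.
 */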
3846  static int
3847  raid_bdev_examine_load_sb(const char *bdev_name, raid_bdev_examine_load_sb_cb cb, void *cb_ctx)
3848  {
3849  	struct raid_bdev_examine_ctx *ctx;
3850  	int rc;
3851  
3852  	assert(cb != NULL);
3853  
3854  	ctx = calloc(1, sizeof(*ctx));
3855  	if (!ctx) {
3856  		return -ENOMEM;
3857  	}
3858  
3859  	rc = spdk_bdev_open_ext(bdev_name, false, raid_bdev_examine_event_cb, NULL, &ctx->desc);
3860  	if (rc) {
3861  		SPDK_ERRLOG("Failed to open bdev %s: %s\n", bdev_name, spdk_strerror(-rc));
3862  		goto err;
3863  	}
3864  
3865  	ctx->ch = spdk_bdev_get_io_channel(ctx->desc);
3866  	if (!ctx->ch) {
3867  		SPDK_ERRLOG("Failed to get io channel for bdev %s\n", bdev_name);
3868  		rc = -ENOMEM;
3869  		goto err;
3870  	}
3871  
3872  	ctx->cb = cb;
3873  	ctx->cb_ctx = cb_ctx;
3874  
3875  	rc = raid_bdev_load_base_bdev_superblock(ctx->desc, ctx->ch, raid_bdev_examine_load_sb_done, ctx);
3876  	if (rc) {
3877  		SPDK_ERRLOG("Failed to read bdev %s superblock: %s\n",
3878  			    bdev_name, spdk_strerror(-rc));
3879  		goto err;
3880  	}
3881  
3882  	return 0;
3883  err:
3884  	raid_bdev_examine_ctx_free(ctx);
3885  	return rc;
3886  }
3887  
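/* Finish examining a bdev: log any error and notify the bdev layer that the raid module is done */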
3888  static void
3889  raid_bdev_examine_done(void *ctx, int status)
3890  {
3891  	struct spdk_bdev *bdev = ctx;
3892  
3893  	if (status != 0) {
3894  		SPDK_ERRLOG("Failed to examine bdev %s: %s\n",
3895  			    bdev->name, spdk_strerror(-status));
3896  	}
3897  	spdk_bdev_module_examine_done(&g_raid_if);
3898  }
3899  
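/*
 * Continue the examine after the superblock read: a valid superblock is handled by
 * raid_bdev_examine_sb(), while -EINVAL (no valid superblock found) falls back to
 * matching the bdev by name or uuid. Any other error ends the examine.
 */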
3900  static void
3901  raid_bdev_examine_cont(struct spdk_bdev *bdev, const struct raid_bdev_superblock *sb, int status,
3902  		       void *ctx)
3903  {
3904  	switch (status) {
3905  	case 0:
3906  		/* valid superblock found */
3907  		SPDK_DEBUGLOG(bdev_raid, "raid superblock found on bdev %s\n", bdev->name);
3908  		raid_bdev_examine_sb(sb, bdev, raid_bdev_examine_done, bdev);
3909  		return;
3910  	case -EINVAL:
3911  		/* no valid superblock, check if it can be claimed anyway */
3912  		raid_bdev_examine_no_sb(bdev);
3913  		status = 0;
3914  		break;
3915  	}
3916  
3917  	raid_bdev_examine_done(bdev, status);
3918  }
3919  
3920  /*
3921   * brief:
3922   * raid_bdev_examine is the examine callback invoked by the bdev layer for each
3923   * newly registered bdev, e.g. one created by the bdev_nvme module. It checks
3924   * whether the bdev can be claimed as a base bdev of a raid bdev.
3925   * params:
3926   * bdev - pointer to base bdev
3927   * returns:
3928   * none
3929   */
3930  static void
3931  raid_bdev_examine(struct spdk_bdev *bdev)
3932  {
3933  	int rc = 0;
3934  
3935  	if (raid_bdev_find_base_info_by_bdev(bdev) != NULL) {
3936  		goto done;
3937  	}
3938  
3939  	if (spdk_bdev_get_dif_type(bdev) != SPDK_DIF_DISABLE) {
3940  		raid_bdev_examine_no_sb(bdev);
3941  		goto done;
3942  	}
3943  
3944  	rc = raid_bdev_examine_load_sb(bdev->name, raid_bdev_examine_cont, NULL);
3945  	if (rc != 0) {
3946  		goto done;
3947  	}
3948  
3949  	return;
3950  done:
3951  	raid_bdev_examine_done(bdev, rc);
3952  }
3953  
3954  /* Log component for the bdev_raid module */
3955  SPDK_LOG_REGISTER_COMPONENT(bdev_raid)
3956  
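/* Register trace points for raid bdev I/O and relate them to the generic bdev I/O trace points */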
3957  static void
3958  bdev_raid_trace(void)
3959  {
3960  	struct spdk_trace_tpoint_opts opts[] = {
3961  		{
3962  			"BDEV_RAID_IO_START", TRACE_BDEV_RAID_IO_START,
3963  			OWNER_TYPE_NONE, OBJECT_BDEV_RAID_IO, 1,
3964  			{{ "ctx", SPDK_TRACE_ARG_TYPE_PTR, 8 }}
3965  		},
3966  		{
3967  			"BDEV_RAID_IO_DONE", TRACE_BDEV_RAID_IO_DONE,
3968  			OWNER_TYPE_NONE, OBJECT_BDEV_RAID_IO, 0,
3969  			{{ "ctx", SPDK_TRACE_ARG_TYPE_PTR, 8 }}
3970  		}
3971  	};
3972  
3974  	spdk_trace_register_object(OBJECT_BDEV_RAID_IO, 'R');
3975  	spdk_trace_register_description_ext(opts, SPDK_COUNTOF(opts));
3976  	spdk_trace_tpoint_register_relation(TRACE_BDEV_IO_START, OBJECT_BDEV_RAID_IO, 1);
3977  	spdk_trace_tpoint_register_relation(TRACE_BDEV_IO_DONE, OBJECT_BDEV_RAID_IO, 0);
3978  }
3979  SPDK_TRACE_REGISTER_FN(bdev_raid_trace, "bdev_raid", TRACE_GROUP_BDEV_RAID)
3980