xref: /spdk/module/bdev/delay/vbdev_delay.c (revision 88e3ffd7b6c5ec1ea1a660354d25f02c766092e1)
/*-
 *   BSD LICENSE
 *
 *   Copyright (c) Intel Corporation.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "spdk/stdinc.h"

#include "vbdev_delay.h"
#include "spdk/rpc.h"
#include "spdk/env.h"
#include "spdk/endian.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"


static int vbdev_delay_init(void);
static int vbdev_delay_get_ctx_size(void);
static void vbdev_delay_examine(struct spdk_bdev *bdev);
static void vbdev_delay_finish(void);
static int vbdev_delay_config_json(struct spdk_json_write_ctx *w);

static struct spdk_bdev_module delay_if = {
	.name = "delay",
	.module_init = vbdev_delay_init,
	.get_ctx_size = vbdev_delay_get_ctx_size,
	.examine_config = vbdev_delay_examine,
	.module_fini = vbdev_delay_finish,
	.config_json = vbdev_delay_config_json
};

SPDK_BDEV_MODULE_REGISTER(delay, &delay_if)

/* Association list used during examine to match base bdev names with the delay vbdevs to create. */
struct bdev_association {
	char			*vbdev_name;
	char			*bdev_name;
	uint64_t		avg_read_latency;
	uint64_t		p99_read_latency;
	uint64_t		avg_write_latency;
	uint64_t		p99_write_latency;
	TAILQ_ENTRY(bdev_association)	link;
};
static TAILQ_HEAD(, bdev_association) g_bdev_associations = TAILQ_HEAD_INITIALIZER(
			g_bdev_associations);

/* List of virtual bdevs and associated info for each. */
struct vbdev_delay {
	struct spdk_bdev		*base_bdev; /* the thing we're attaching to */
	struct spdk_bdev_desc		*base_desc; /* its descriptor we get from open */
	struct spdk_bdev		delay_bdev;    /* the delay virtual bdev */
	uint64_t			average_read_latency_ticks; /* the average read delay */
	uint64_t			p99_read_latency_ticks; /* the p99 read delay */
	uint64_t			average_write_latency_ticks; /* the average write delay */
	uint64_t			p99_write_latency_ticks; /* the p99 write delay */
	TAILQ_ENTRY(vbdev_delay)	link;
	struct spdk_thread		*thread;    /* thread where base device is opened */
};
static TAILQ_HEAD(, vbdev_delay) g_delay_nodes = TAILQ_HEAD_INITIALIZER(g_delay_nodes);

/* Per-I/O context carried in the bdev_io driver_ctx. */
struct delay_bdev_io {
	/* Completion status reported by the base bdev, replayed once the delay expires. */
	int status;

	/* Tick count at which this I/O is allowed to complete. */
	uint64_t completion_tick;

	/* Which delay (avg/p99, read/write) applies to this I/O. */
	enum delay_io_type type;

	/* The delay vbdev channel this I/O was submitted on. */
	struct spdk_io_channel *ch;

	/* Used to resubmit the I/O if the base bdev runs out of resources. */
	struct spdk_bdev_io_wait_entry bdev_io_wait;

	STAILQ_ENTRY(delay_bdev_io) link;
};

struct delay_io_channel {
	struct spdk_io_channel	*base_ch; /* IO channel of base device */
	STAILQ_HEAD(, delay_bdev_io) avg_read_io;
	STAILQ_HEAD(, delay_bdev_io) p99_read_io;
	STAILQ_HEAD(, delay_bdev_io) avg_write_io;
	STAILQ_HEAD(, delay_bdev_io) p99_write_io;
	struct spdk_poller *io_poller;
	unsigned int rand_seed;
};

static void
vbdev_delay_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);


/* Callback for unregistering the IO device. */
static void
_device_unregister_cb(void *io_device)
{
	struct vbdev_delay *delay_node = io_device;

	/* Done with this delay_node. */
	free(delay_node->delay_bdev.name);
	free(delay_node);
}

static void
_vbdev_delay_destruct(void *ctx)
{
	struct spdk_bdev_desc *desc = ctx;

	spdk_bdev_close(desc);
}

static int
vbdev_delay_destruct(void *ctx)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	/* It is important to follow this exact sequence of steps for destroying
	 * a vbdev...
	 */

	TAILQ_REMOVE(&g_delay_nodes, delay_node, link);

	/* Unclaim the underlying bdev. */
	spdk_bdev_module_release_bdev(delay_node->base_bdev);

	/* Close the underlying bdev on the same thread that it was opened from. */
	if (delay_node->thread && delay_node->thread != spdk_get_thread()) {
		spdk_thread_send_msg(delay_node->thread, _vbdev_delay_destruct, delay_node->base_desc);
	} else {
		spdk_bdev_close(delay_node->base_desc);
	}

	/* Unregister the io_device. */
	spdk_io_device_unregister(delay_node, _device_unregister_cb);

	return 0;
}

static int
_process_io_stailq(void *arg, uint64_t ticks)
{
	STAILQ_HEAD(, delay_bdev_io) *head = arg;
	struct delay_bdev_io *io_ctx, *tmp;
	int completions = 0;

	STAILQ_FOREACH_SAFE(io_ctx, head, link, tmp) {
		if (io_ctx->completion_tick <= ticks) {
			STAILQ_REMOVE(head, io_ctx, delay_bdev_io, link);
			spdk_bdev_io_complete(spdk_bdev_io_from_ctx(io_ctx), io_ctx->status);
			completions++;
		} else {
			/* In the general case, I/O becomes ready in FIFO order. When the latencies are changed
			 * dynamically, this is not necessarily the case. However, the normal behavior is restored
			 * once the I/O that was outstanding at the time of the change has completed.
			 * This essentially means that moving from a high to a low latency creates a dam for the new I/O
			 * submitted after the latency change. This is considered desirable behavior for the use case where
			 * we are trying to trigger a pre-defined timeout on an initiator.
			 */
			break;
		}
	}

	return completions;
}

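/* Illustration with hypothetical values: if the average write latency is lowered from
 * 100us to 10us while a write queued under the old 100us delay is still pending, writes
 * submitted after the change carry earlier completion ticks but remain queued behind it,
 * because the loop above stops at the first entry that has not yet expired.
 */
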
static int
_delay_finish_io(void *arg)
{
	struct delay_io_channel *delay_ch = arg;
	uint64_t ticks = spdk_get_ticks();
	int completions = 0;

	completions += _process_io_stailq(&delay_ch->avg_read_io, ticks);
	completions += _process_io_stailq(&delay_ch->avg_write_io, ticks);
	completions += _process_io_stailq(&delay_ch->p99_read_io, ticks);
	completions += _process_io_stailq(&delay_ch->p99_write_io, ticks);

	return completions == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
}

/* Completion callback for I/O that was issued from this bdev. The original bdev_io
 * is passed in as an arg so we'll complete that one with the appropriate status
 * and then free the one that this module issued.
 */
static void
_delay_complete_io(struct spdk_bdev_io *bdev_io, bool success, void *cb_arg)
{
	struct spdk_bdev_io *orig_io = cb_arg;
	struct vbdev_delay *delay_node = SPDK_CONTAINEROF(orig_io->bdev, struct vbdev_delay, delay_bdev);
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)orig_io->driver_ctx;
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(io_ctx->ch);

	io_ctx->status = success ? SPDK_BDEV_IO_STATUS_SUCCESS : SPDK_BDEV_IO_STATUS_FAILED;
	spdk_bdev_free_io(bdev_io);

	/* Put the I/O into the proper list for processing by the channel poller. */
	switch (io_ctx->type) {
	case DELAY_AVG_READ:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->average_read_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->avg_read_io, io_ctx, link);
		break;
	case DELAY_AVG_WRITE:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->average_write_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->avg_write_io, io_ctx, link);
		break;
	case DELAY_P99_READ:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->p99_read_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->p99_read_io, io_ctx, link);
		break;
	case DELAY_P99_WRITE:
		io_ctx->completion_tick = spdk_get_ticks() + delay_node->p99_write_latency_ticks;
		STAILQ_INSERT_TAIL(&delay_ch->p99_write_io, io_ctx, link);
		break;
	case DELAY_NONE:
	default:
		spdk_bdev_io_complete(orig_io, io_ctx->status);
		break;
	}
}

static void
vbdev_delay_resubmit_io(void *arg)
{
	struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;

	vbdev_delay_submit_request(io_ctx->ch, bdev_io);
}

static void
vbdev_delay_queue_io(struct spdk_bdev_io *bdev_io)
{
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(io_ctx->ch);
	int rc;

	io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
	io_ctx->bdev_io_wait.cb_fn = vbdev_delay_resubmit_io;
	io_ctx->bdev_io_wait.cb_arg = bdev_io;

	rc = spdk_bdev_queue_io_wait(bdev_io->bdev, delay_ch->base_ch, &io_ctx->bdev_io_wait);
	if (rc != 0) {
		SPDK_ERRLOG("Queue io failed in vbdev_delay_queue_io, rc=%d.\n", rc);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
delay_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
{
	struct vbdev_delay *delay_node = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_delay,
					 delay_bdev);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);
	int rc;

	if (!success) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	rc = spdk_bdev_readv_blocks(delay_node->base_desc, delay_ch->base_ch, bdev_io->u.bdev.iovs,
				    bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks,
				    bdev_io->u.bdev.num_blocks, _delay_complete_io,
				    bdev_io);

	if (rc == -ENOMEM) {
		SPDK_ERRLOG("No memory, start to queue io for delay.\n");
		vbdev_delay_queue_io(bdev_io);
	} else if (rc != 0) {
		SPDK_ERRLOG("ERROR on bdev_io submission!\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
vbdev_delay_reset_dev(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_bdev_io *bdev_io = spdk_io_channel_iter_get_ctx(i);
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);
	struct vbdev_delay *delay_node = spdk_io_channel_iter_get_io_device(i);
	int rc;

	rc = spdk_bdev_reset(delay_node->base_desc, delay_ch->base_ch,
			     _delay_complete_io, bdev_io);

	if (rc == -ENOMEM) {
		SPDK_ERRLOG("No memory, start to queue io for delay.\n");
		vbdev_delay_queue_io(bdev_io);
	} else if (rc != 0) {
		SPDK_ERRLOG("ERROR on bdev_io submission!\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static void
_abort_all_delayed_io(void *arg)
{
	STAILQ_HEAD(, delay_bdev_io) *head = arg;
	struct delay_bdev_io *io_ctx, *tmp;

	STAILQ_FOREACH_SAFE(io_ctx, head, link, tmp) {
		STAILQ_REMOVE(head, io_ctx, delay_bdev_io, link);
		spdk_bdev_io_complete(spdk_bdev_io_from_ctx(io_ctx), SPDK_BDEV_IO_STATUS_ABORTED);
	}
}

static void
vbdev_delay_reset_channel(struct spdk_io_channel_iter *i)
{
	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);

	_abort_all_delayed_io(&delay_ch->avg_read_io);
	_abort_all_delayed_io(&delay_ch->avg_write_io);
	_abort_all_delayed_io(&delay_ch->p99_read_io);
	_abort_all_delayed_io(&delay_ch->p99_write_io);

	spdk_for_each_channel_continue(i, 0);
}

static bool
abort_delayed_io(void *_head, struct spdk_bdev_io *bio_to_abort)
{
	STAILQ_HEAD(, delay_bdev_io) *head = _head;
	struct delay_bdev_io *io_ctx_to_abort = (struct delay_bdev_io *)bio_to_abort->driver_ctx;
	struct delay_bdev_io *io_ctx;

	STAILQ_FOREACH(io_ctx, head, link) {
		if (io_ctx == io_ctx_to_abort) {
			STAILQ_REMOVE(head, io_ctx_to_abort, delay_bdev_io, link);
			spdk_bdev_io_complete(bio_to_abort, SPDK_BDEV_IO_STATUS_ABORTED);
			return true;
		}
	}

	return false;
}

static int
vbdev_delay_abort(struct vbdev_delay *delay_node, struct delay_io_channel *delay_ch,
		  struct spdk_bdev_io *bdev_io)
{
	struct spdk_bdev_io *bio_to_abort = bdev_io->u.abort.bio_to_abort;

	if (abort_delayed_io(&delay_ch->avg_read_io, bio_to_abort) ||
	    abort_delayed_io(&delay_ch->avg_write_io, bio_to_abort) ||
	    abort_delayed_io(&delay_ch->p99_read_io, bio_to_abort) ||
	    abort_delayed_io(&delay_ch->p99_write_io, bio_to_abort)) {
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
		return 0;
	}

	return spdk_bdev_abort(delay_node->base_desc, delay_ch->base_ch, bio_to_abort,
			       _delay_complete_io, bdev_io);
}

static void
vbdev_delay_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
	struct vbdev_delay *delay_node = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_delay, delay_bdev);
	struct delay_io_channel *delay_ch = spdk_io_channel_get_ctx(ch);
	struct delay_bdev_io *io_ctx = (struct delay_bdev_io *)bdev_io->driver_ctx;
	int rc = 0;
	bool is_p99;

	/* Roughly 1 in 100 I/Os is treated as a p99 I/O and gets the p99 latency. */
	is_p99 = rand_r(&delay_ch->rand_seed) % 100 == 0 ? true : false;

	io_ctx->ch = ch;
	io_ctx->type = DELAY_NONE;

	switch (bdev_io->type) {
	case SPDK_BDEV_IO_TYPE_READ:
		io_ctx->type = is_p99 ? DELAY_P99_READ : DELAY_AVG_READ;
		spdk_bdev_io_get_buf(bdev_io, delay_read_get_buf_cb,
				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE:
		io_ctx->type = is_p99 ? DELAY_P99_WRITE : DELAY_AVG_WRITE;
		rc = spdk_bdev_writev_blocks(delay_node->base_desc, delay_ch->base_ch, bdev_io->u.bdev.iovs,
					     bdev_io->u.bdev.iovcnt, bdev_io->u.bdev.offset_blocks,
					     bdev_io->u.bdev.num_blocks, _delay_complete_io,
					     bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
		rc = spdk_bdev_write_zeroes_blocks(delay_node->base_desc, delay_ch->base_ch,
						   bdev_io->u.bdev.offset_blocks,
						   bdev_io->u.bdev.num_blocks,
						   _delay_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_UNMAP:
		rc = spdk_bdev_unmap_blocks(delay_node->base_desc, delay_ch->base_ch,
					    bdev_io->u.bdev.offset_blocks,
					    bdev_io->u.bdev.num_blocks,
					    _delay_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_FLUSH:
		rc = spdk_bdev_flush_blocks(delay_node->base_desc, delay_ch->base_ch,
					    bdev_io->u.bdev.offset_blocks,
					    bdev_io->u.bdev.num_blocks,
					    _delay_complete_io, bdev_io);
		break;
	case SPDK_BDEV_IO_TYPE_RESET:
		/* During reset, the generic bdev layer aborts all new I/Os and queues all new resets.
		 * Hence we can simply abort all of the I/O we have delayed but not yet completed.
		 */
		spdk_for_each_channel(delay_node, vbdev_delay_reset_channel, bdev_io,
				      vbdev_delay_reset_dev);
		break;
	case SPDK_BDEV_IO_TYPE_ABORT:
		rc = vbdev_delay_abort(delay_node, delay_ch, bdev_io);
		break;
	default:
		SPDK_ERRLOG("delay: unknown I/O type %d\n", bdev_io->type);
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
		return;
	}

	if (rc == -ENOMEM) {
		SPDK_ERRLOG("No memory, start to queue io for delay.\n");
		vbdev_delay_queue_io(bdev_io);
	} else if (rc != 0) {
		SPDK_ERRLOG("ERROR on bdev_io submission!\n");
		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
	}
}

static bool
vbdev_delay_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	if (io_type == SPDK_BDEV_IO_TYPE_ZCOPY) {
		return false;
	} else {
		return spdk_bdev_io_type_supported(delay_node->base_bdev, io_type);
	}
}

static struct spdk_io_channel *
vbdev_delay_get_io_channel(void *ctx)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;
	struct spdk_io_channel *delay_ch = NULL;

	delay_ch = spdk_get_io_channel(delay_node);

	return delay_ch;
}

static void
_delay_write_conf_values(struct vbdev_delay *delay_node, struct spdk_json_write_ctx *w)
{
	spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&delay_node->delay_bdev));
	spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(delay_node->base_bdev));
	spdk_json_write_named_int64(w, "avg_read_latency",
				    delay_node->average_read_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
	spdk_json_write_named_int64(w, "p99_read_latency",
				    delay_node->p99_read_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
	spdk_json_write_named_int64(w, "avg_write_latency",
				    delay_node->average_write_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
	spdk_json_write_named_int64(w, "p99_write_latency",
				    delay_node->p99_write_latency_ticks * SPDK_SEC_TO_USEC / spdk_get_ticks_hz());
}

static int
vbdev_delay_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
	struct vbdev_delay *delay_node = (struct vbdev_delay *)ctx;

	spdk_json_write_name(w, "delay");
	spdk_json_write_object_begin(w);
	_delay_write_conf_values(delay_node, w);
	spdk_json_write_object_end(w);

	return 0;
}

/* This is used to generate JSON that can configure this module to its current state. */
static int
vbdev_delay_config_json(struct spdk_json_write_ctx *w)
{
	struct vbdev_delay *delay_node;

	TAILQ_FOREACH(delay_node, &g_delay_nodes, link) {
		spdk_json_write_object_begin(w);
		spdk_json_write_named_string(w, "method", "bdev_delay_create");
		spdk_json_write_named_object_begin(w, "params");
		_delay_write_conf_values(delay_node, w);
		spdk_json_write_object_end(w);
		spdk_json_write_object_end(w);
	}
	return 0;
}

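/* For illustration only (hypothetical names and latencies, in microseconds), each delay
 * node is emitted as one entry of the form:
 *
 *   {
 *     "method": "bdev_delay_create",
 *     "params": {
 *       "name": "Delay0",
 *       "base_bdev_name": "Malloc0",
 *       "avg_read_latency": 100,
 *       "p99_read_latency": 1000,
 *       "avg_write_latency": 50,
 *       "p99_write_latency": 500
 *     }
 *   }
 */
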
/* We provide this callback for the SPDK channel code to create a channel using
 * the channel struct we provided in our module get_io_channel() entry point. Here
 * we get and save off an underlying base channel of the device below us so that
 * we can communicate with the base bdev on a per-channel basis. This is also where
 * we register the per-channel poller that completes delayed I/O.
 */
static int
delay_bdev_ch_create_cb(void *io_device, void *ctx_buf)
{
	struct delay_io_channel *delay_ch = ctx_buf;
	struct vbdev_delay *delay_node = io_device;

	STAILQ_INIT(&delay_ch->avg_read_io);
	STAILQ_INIT(&delay_ch->p99_read_io);
	STAILQ_INIT(&delay_ch->avg_write_io);
	STAILQ_INIT(&delay_ch->p99_write_io);

	delay_ch->io_poller = SPDK_POLLER_REGISTER(_delay_finish_io, delay_ch, 0);
	delay_ch->base_ch = spdk_bdev_get_io_channel(delay_node->base_desc);
	delay_ch->rand_seed = time(NULL);

	return 0;
}

/* We provide this callback for the SPDK channel code to destroy a channel
 * created with our create callback. We just need to undo anything we did
 * when we created it: unregister the poller and release the base channel.
 */
static void
delay_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
{
	struct delay_io_channel *delay_ch = ctx_buf;

	spdk_poller_unregister(&delay_ch->io_poller);
	spdk_put_io_channel(delay_ch->base_ch);
}

/* Create the delay association from the bdev and vbdev name and insert
 * on the global list. */
static int
vbdev_delay_insert_association(const char *bdev_name, const char *vbdev_name,
			       uint64_t avg_read_latency, uint64_t p99_read_latency,
			       uint64_t avg_write_latency, uint64_t p99_write_latency)
{
	struct bdev_association *assoc;

	TAILQ_FOREACH(assoc, &g_bdev_associations, link) {
		if (strcmp(vbdev_name, assoc->vbdev_name) == 0) {
			SPDK_ERRLOG("delay bdev %s already exists\n", vbdev_name);
			return -EEXIST;
		}
	}

	assoc = calloc(1, sizeof(struct bdev_association));
	if (!assoc) {
		SPDK_ERRLOG("could not allocate bdev_association\n");
		return -ENOMEM;
	}

	assoc->bdev_name = strdup(bdev_name);
	if (!assoc->bdev_name) {
		SPDK_ERRLOG("could not allocate assoc->bdev_name\n");
		free(assoc);
		return -ENOMEM;
	}

	assoc->vbdev_name = strdup(vbdev_name);
	if (!assoc->vbdev_name) {
		SPDK_ERRLOG("could not allocate assoc->vbdev_name\n");
		free(assoc->bdev_name);
		free(assoc);
		return -ENOMEM;
	}

	assoc->avg_read_latency = avg_read_latency;
	assoc->p99_read_latency = p99_read_latency;
	assoc->avg_write_latency = avg_write_latency;
	assoc->p99_write_latency = p99_write_latency;

	TAILQ_INSERT_TAIL(&g_bdev_associations, assoc, link);

	return 0;
}

int
vbdev_delay_update_latency_value(char *delay_name, uint64_t latency_us, enum delay_io_type type)
{
	struct spdk_bdev *delay_bdev;
	struct vbdev_delay *delay_node;
	uint64_t ticks_mhz = spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;

	delay_bdev = spdk_bdev_get_by_name(delay_name);
	if (delay_bdev == NULL) {
		return -ENODEV;
	} else if (delay_bdev->module != &delay_if) {
		return -EINVAL;
	}

	delay_node = SPDK_CONTAINEROF(delay_bdev, struct vbdev_delay, delay_bdev);

	switch (type) {
	case DELAY_AVG_READ:
		delay_node->average_read_latency_ticks = ticks_mhz * latency_us;
		break;
	case DELAY_AVG_WRITE:
		delay_node->average_write_latency_ticks = ticks_mhz * latency_us;
		break;
	case DELAY_P99_READ:
		delay_node->p99_read_latency_ticks = ticks_mhz * latency_us;
		break;
	case DELAY_P99_WRITE:
		delay_node->p99_write_latency_ticks = ticks_mhz * latency_us;
		break;
	default:
		return -EINVAL;
	}

	return 0;
}

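/* A worked example of the conversion above, with hypothetical numbers: on a machine whose
 * timestamp counter runs at 3 GHz, spdk_get_ticks_hz() returns 3000000000, so
 * ticks_mhz = 3000000000 / SPDK_SEC_TO_USEC = 3000 ticks per microsecond, and a
 * latency_us of 100 is stored as 300000 ticks.
 */
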
static int
vbdev_delay_init(void)
{
	/* Not allowing for .ini style configuration. */
	return 0;
}

static void
vbdev_delay_finish(void)
{
	struct bdev_association *assoc;

	while ((assoc = TAILQ_FIRST(&g_bdev_associations))) {
		TAILQ_REMOVE(&g_bdev_associations, assoc, link);
		free(assoc->bdev_name);
		free(assoc->vbdev_name);
		free(assoc);
	}
}

static int
vbdev_delay_get_ctx_size(void)
{
	return sizeof(struct delay_bdev_io);
}

static void
vbdev_delay_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
	/* No config per bdev needed */
}

/* When we register our bdev this is how we specify our entry points. */
static const struct spdk_bdev_fn_table vbdev_delay_fn_table = {
	.destruct		= vbdev_delay_destruct,
	.submit_request		= vbdev_delay_submit_request,
	.io_type_supported	= vbdev_delay_io_type_supported,
	.get_io_channel		= vbdev_delay_get_io_channel,
	.dump_info_json		= vbdev_delay_dump_info_json,
	.write_config_json	= vbdev_delay_write_config_json,
};

static void
vbdev_delay_base_bdev_hotremove_cb(struct spdk_bdev *bdev_find)
{
	struct vbdev_delay *delay_node, *tmp;

	TAILQ_FOREACH_SAFE(delay_node, &g_delay_nodes, link, tmp) {
		if (bdev_find == delay_node->base_bdev) {
			spdk_bdev_unregister(&delay_node->delay_bdev, NULL, NULL);
		}
	}
}

/* Called when the underlying base bdev triggers an asynchronous event such as bdev removal. */
static void
vbdev_delay_base_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev,
			       void *event_ctx)
{
	switch (type) {
	case SPDK_BDEV_EVENT_REMOVE:
		vbdev_delay_base_bdev_hotremove_cb(bdev);
		break;
	default:
		SPDK_NOTICELOG("Unsupported bdev event: type %d\n", type);
		break;
	}
}

/* Create and register the delay vbdev if we find it in our list of bdev names.
 * This can be called either from the examine path or from the RPC method.
 */
static int
vbdev_delay_register(const char *bdev_name)
{
	struct bdev_association *assoc;
	struct vbdev_delay *delay_node;
	struct spdk_bdev *bdev;
	uint64_t ticks_mhz = spdk_get_ticks_hz() / SPDK_SEC_TO_USEC;
	int rc = 0;

	/* Check our list of names from config versus this bdev and if
	 * there's a match, create the delay_node & bdev accordingly.
	 */
	TAILQ_FOREACH(assoc, &g_bdev_associations, link) {
		if (strcmp(assoc->bdev_name, bdev_name) != 0) {
			continue;
		}

		delay_node = calloc(1, sizeof(struct vbdev_delay));
		if (!delay_node) {
			rc = -ENOMEM;
			SPDK_ERRLOG("could not allocate delay_node\n");
			break;
		}
		delay_node->delay_bdev.name = strdup(assoc->vbdev_name);
		if (!delay_node->delay_bdev.name) {
			rc = -ENOMEM;
			SPDK_ERRLOG("could not allocate delay_bdev name\n");
			free(delay_node);
			break;
		}
		delay_node->delay_bdev.product_name = "delay";

		/* The base bdev that we're attaching to. */
		rc = spdk_bdev_open_ext(bdev_name, true, vbdev_delay_base_bdev_event_cb,
					NULL, &delay_node->base_desc);
		if (rc) {
			if (rc != -ENODEV) {
				SPDK_ERRLOG("could not open bdev %s\n", bdev_name);
			}
			free(delay_node->delay_bdev.name);
			free(delay_node);
			break;
		}

		bdev = spdk_bdev_desc_get_bdev(delay_node->base_desc);
		delay_node->base_bdev = bdev;

		delay_node->delay_bdev.write_cache = bdev->write_cache;
		delay_node->delay_bdev.required_alignment = bdev->required_alignment;
		delay_node->delay_bdev.optimal_io_boundary = bdev->optimal_io_boundary;
		delay_node->delay_bdev.blocklen = bdev->blocklen;
		delay_node->delay_bdev.blockcnt = bdev->blockcnt;

		delay_node->delay_bdev.ctxt = delay_node;
		delay_node->delay_bdev.fn_table = &vbdev_delay_fn_table;
		delay_node->delay_bdev.module = &delay_if;

		/* Store the number of ticks we need to add to get the I/O expiration time. */
		delay_node->average_read_latency_ticks = ticks_mhz * assoc->avg_read_latency;
		delay_node->p99_read_latency_ticks = ticks_mhz * assoc->p99_read_latency;
		delay_node->average_write_latency_ticks = ticks_mhz * assoc->avg_write_latency;
		delay_node->p99_write_latency_ticks = ticks_mhz * assoc->p99_write_latency;

		spdk_io_device_register(delay_node, delay_bdev_ch_create_cb, delay_bdev_ch_destroy_cb,
					sizeof(struct delay_io_channel),
					assoc->vbdev_name);

		/* Save the thread where the base device is opened */
		delay_node->thread = spdk_get_thread();

		rc = spdk_bdev_module_claim_bdev(bdev, delay_node->base_desc, delay_node->delay_bdev.module);
		if (rc) {
			SPDK_ERRLOG("could not claim bdev %s\n", bdev_name);
			goto error_close;
		}

		rc = spdk_bdev_register(&delay_node->delay_bdev);
		if (rc) {
			SPDK_ERRLOG("could not register delay_bdev\n");
			spdk_bdev_module_release_bdev(delay_node->base_bdev);
			goto error_close;
		}

		TAILQ_INSERT_TAIL(&g_delay_nodes, delay_node, link);
	}

	return rc;

error_close:
	spdk_bdev_close(delay_node->base_desc);
	spdk_io_device_unregister(delay_node, NULL);
	free(delay_node->delay_bdev.name);
	free(delay_node);
	return rc;
}

int
create_delay_disk(const char *bdev_name, const char *vbdev_name, uint64_t avg_read_latency,
		  uint64_t p99_read_latency, uint64_t avg_write_latency, uint64_t p99_write_latency)
{
	int rc = 0;

	if (p99_read_latency < avg_read_latency || p99_write_latency < avg_write_latency) {
		SPDK_ERRLOG("Unable to create a delay bdev where p99 latency is less than average latency.\n");
		return -EINVAL;
	}

	rc = vbdev_delay_insert_association(bdev_name, vbdev_name, avg_read_latency, p99_read_latency,
					    avg_write_latency, p99_write_latency);
	if (rc) {
		return rc;
	}

	rc = vbdev_delay_register(bdev_name);
	if (rc == -ENODEV) {
		/* This is not an error; we tracked the name above and the base bdev
		 * may still show up later.
		 */
		SPDK_NOTICELOG("vbdev creation deferred pending base bdev arrival\n");
		rc = 0;
	}

	return rc;
}

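/* For illustration only, with hypothetical bdev names and latencies (in microseconds):
 * layering a delay vbdev named "Delay0" on top of an existing bdev "Malloc0" with a
 * 100us/1000us average/p99 read delay and a 50us/500us average/p99 write delay would be
 *
 *   rc = create_delay_disk("Malloc0", "Delay0", 100, 1000, 50, 500);
 */
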
void
delete_delay_disk(struct spdk_bdev *bdev, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
{
	struct bdev_association *assoc;

	if (!bdev || bdev->module != &delay_if) {
		cb_fn(cb_arg, -ENODEV);
		return;
	}

	TAILQ_FOREACH(assoc, &g_bdev_associations, link) {
		if (strcmp(assoc->vbdev_name, bdev->name) == 0) {
			TAILQ_REMOVE(&g_bdev_associations, assoc, link);
			free(assoc->bdev_name);
			free(assoc->vbdev_name);
			free(assoc);
			break;
		}
	}

	spdk_bdev_unregister(bdev, cb_fn, cb_arg);
}

static void
vbdev_delay_examine(struct spdk_bdev *bdev)
{
	vbdev_delay_register(bdev->name);

	spdk_bdev_module_examine_done(&delay_if);
}

SPDK_LOG_REGISTER_COMPONENT(vbdev_delay)