/*   SPDX-License-Identifier: BSD-3-Clause
 *   Copyright (C) 2017 Intel Corporation.
 *   All rights reserved.
 *   Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 */

#include "spdk/stdinc.h"

#include "blobstore.h"
#include "request.h"

#include "spdk/thread.h"
#include "spdk/queue.h"
#include "spdk/trace.h"

#include "spdk_internal/trace_defs.h"
#include "spdk/log.h"

void
bs_call_cpl(struct spdk_bs_cpl *cpl, int bserrno)
{
	switch (cpl->type) {
	case SPDK_BS_CPL_TYPE_BS_BASIC:
		cpl->u.bs_basic.cb_fn(cpl->u.bs_basic.cb_arg,
				      bserrno);
		break;
	case SPDK_BS_CPL_TYPE_BS_HANDLE:
		cpl->u.bs_handle.cb_fn(cpl->u.bs_handle.cb_arg,
				       bserrno == 0 ? cpl->u.bs_handle.bs : NULL,
				       bserrno);
		break;
	case SPDK_BS_CPL_TYPE_BLOB_BASIC:
		cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg,
					bserrno);
		break;
	case SPDK_BS_CPL_TYPE_BLOBID:
		cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg,
				    bserrno == 0 ? cpl->u.blobid.blobid : SPDK_BLOBID_INVALID,
				    bserrno);
		break;
	case SPDK_BS_CPL_TYPE_BLOB_HANDLE:
		cpl->u.blob_handle.cb_fn(cpl->u.blob_handle.cb_arg,
					 bserrno == 0 ? cpl->u.blob_handle.blob : NULL,
					 bserrno);
		break;
	case SPDK_BS_CPL_TYPE_NESTED_SEQUENCE:
		cpl->u.nested_seq.cb_fn(cpl->u.nested_seq.cb_arg,
					cpl->u.nested_seq.parent,
					bserrno);
		break;
	case SPDK_BS_CPL_TYPE_NONE:
		/* this completion's callback is handled elsewhere */
		break;
	}
}
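
/*
 * Illustrative sketch (not part of the code above): a caller that wants a
 * blob-basic completion would fill in an spdk_bs_cpl roughly as follows,
 * where my_op_done() and ctx are hypothetical caller-side names:
 *
 *	struct spdk_bs_cpl cpl = {
 *		.type = SPDK_BS_CPL_TYPE_BLOB_BASIC,
 *		.u.blob_basic.cb_fn = my_op_done,
 *		.u.blob_basic.cb_arg = ctx,
 *	};
 *
 * bs_call_cpl(&cpl, 0) then reduces to my_op_done(ctx, 0), while a non-zero
 * bserrno is passed through unchanged.
 */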

static void
bs_request_set_complete(struct spdk_bs_request_set *set)
{
	struct spdk_bs_cpl cpl = set->cpl;
	int bserrno = set->bserrno;

	spdk_trace_record(TRACE_BLOB_REQ_SET_COMPLETE, 0, 0, (uintptr_t)&set->cb_args,
			  (uintptr_t)set->cpl.u.blob_basic.cb_arg);

	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);

	bs_call_cpl(&cpl, bserrno);
}

static void
bs_sequence_completion(struct spdk_io_channel *channel, void *cb_arg, int bserrno)
{
	struct spdk_bs_request_set *set = cb_arg;

	set->bserrno = bserrno;
	set->u.sequence.cb_fn((spdk_bs_sequence_t *)set, set->u.sequence.cb_arg, bserrno);
}

static inline spdk_bs_sequence_t *
bs_sequence_start(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl,
		  struct spdk_io_channel *back_channel)
{
	struct spdk_bs_channel		*channel;
	struct spdk_bs_request_set	*set;

	channel = spdk_io_channel_get_ctx(_channel);
	assert(channel != NULL);
	set = TAILQ_FIRST(&channel->reqs);
	if (!set) {
		return NULL;
	}
	TAILQ_REMOVE(&channel->reqs, set, link);

	spdk_trace_record(TRACE_BLOB_REQ_SET_START, 0, 0, (uintptr_t)&set->cb_args,
			  (uintptr_t)cpl->u.blob_basic.cb_arg);

	set->cpl = *cpl;
	set->bserrno = 0;
	set->channel = channel;
	set->back_channel = back_channel;

	set->cb_args.cb_fn = bs_sequence_completion;
	set->cb_args.cb_arg = set;
	set->cb_args.channel = channel->dev_channel;
	set->ext_io_opts = NULL;

	return (spdk_bs_sequence_t *)set;
}

/* Use when performing IO directly on the blobstore (e.g. metadata - not a blob). */
spdk_bs_sequence_t *
bs_sequence_start_bs(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl)
{
	return bs_sequence_start(_channel, cpl, _channel);
}
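
/*
 * Illustrative sketch (hedged, not called from this file): a metadata read
 * against the blobstore itself would typically look something like this,
 * where md_ctx, md_buf, md_done() and md_read_step_done() are hypothetical
 * caller-side names:
 *
 *	struct spdk_bs_cpl cpl = {
 *		.type = SPDK_BS_CPL_TYPE_BS_BASIC,
 *		.u.bs_basic.cb_fn = md_done,
 *		.u.bs_basic.cb_arg = md_ctx,
 *	};
 *	spdk_bs_sequence_t *seq = bs_sequence_start_bs(ch, &cpl);
 *
 *	if (seq == NULL) {
 *		md_done(md_ctx, -ENOMEM);
 *	} else {
 *		bs_sequence_read_dev(seq, md_buf, lba, lba_count, md_read_step_done, md_ctx);
 *	}
 *
 * md_read_step_done() would eventually call bs_sequence_finish(), which
 * returns the request set to its channel and fires cpl.
 */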

/* Use when performing IO on a blob. */
spdk_bs_sequence_t *
bs_sequence_start_blob(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl,
		       struct spdk_blob *blob)
{
	struct spdk_io_channel	*esnap_ch = _channel;

	if (spdk_blob_is_esnap_clone(blob)) {
		esnap_ch = blob_esnap_get_io_channel(_channel, blob);
		if (esnap_ch == NULL) {
			/*
			 * The most likely reason we are here is a logic error elsewhere that
			 * caused channel allocations to fail, though we could also get here by
			 * running out of memory. If we are out of memory, this is just one of
			 * many problems the process is about to have. Aborting debug builds now
			 * on a logic error is the right thing to do, and killing the process on
			 * ENOMEM is no great loss.
			 */
			assert(false);
			return NULL;
		}
	}
	return bs_sequence_start(_channel, cpl, esnap_ch);
}
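
/*
 * Illustrative note (an assumption about typical usage, not code from this
 * file): for an esnap clone, the back channel resolved above is what the
 * bs_sequence_read_bs_dev()/bs_sequence_readv_bs_dev() helpers below pass to
 * the external snapshot's bs_dev. A hypothetical caller reading from the
 * clone's backing device might do:
 *
 *	spdk_bs_sequence_t *seq = bs_sequence_start_blob(ch, &cpl, blob);
 *
 *	if (seq != NULL) {
 *		bs_sequence_readv_bs_dev(seq, blob->back_bs_dev, iov, iovcnt,
 *					 lba, lba_count, step_done, ctx);
 *	}
 */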

void
bs_sequence_read_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev,
			void *payload, uint64_t lba, uint32_t lba_count,
			spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)seq;
	struct spdk_io_channel		*back_channel = set->back_channel;

	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
		      lba);

	set->u.sequence.cb_fn = cb_fn;
	set->u.sequence.cb_arg = cb_arg;

	bs_dev->read(bs_dev, back_channel, payload, lba, lba_count, &set->cb_args);
}

void
bs_sequence_read_dev(spdk_bs_sequence_t *seq, void *payload,
		     uint64_t lba, uint32_t lba_count,
		     spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)seq;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
		      lba);

	set->u.sequence.cb_fn = cb_fn;
	set->u.sequence.cb_arg = cb_arg;

	channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args);
}

void
bs_sequence_write_dev(spdk_bs_sequence_t *seq, void *payload,
		      uint64_t lba, uint32_t lba_count,
		      spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)seq;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Writing %" PRIu32 " blocks to LBA %" PRIu64 "\n", lba_count, lba);

	set->u.sequence.cb_fn = cb_fn;
	set->u.sequence.cb_arg = cb_arg;

	channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count,
			    &set->cb_args);
}

void
bs_sequence_readv_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev,
			 struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
			 spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)seq;
	struct spdk_io_channel		*back_channel = set->back_channel;

	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
		      lba);

	set->u.sequence.cb_fn = cb_fn;
	set->u.sequence.cb_arg = cb_arg;

	if (set->ext_io_opts) {
		assert(bs_dev->readv_ext);
		bs_dev->readv_ext(bs_dev, back_channel, iov, iovcnt, lba, lba_count,
				  &set->cb_args, set->ext_io_opts);
	} else {
		bs_dev->readv(bs_dev, back_channel, iov, iovcnt, lba, lba_count, &set->cb_args);
	}
}

void
bs_sequence_readv_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt,
		      uint64_t lba, uint32_t lba_count, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)seq;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
		      lba);

	set->u.sequence.cb_fn = cb_fn;
	set->u.sequence.cb_arg = cb_arg;

	if (set->ext_io_opts) {
		assert(channel->dev->readv_ext);
		channel->dev->readv_ext(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
					&set->cb_args, set->ext_io_opts);
	} else {
		channel->dev->readv(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count, &set->cb_args);
	}
}

void
bs_sequence_writev_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt,
		       uint64_t lba, uint32_t lba_count,
		       spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)seq;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Writing %" PRIu32 " blocks to LBA %" PRIu64 "\n", lba_count, lba);

	set->u.sequence.cb_fn = cb_fn;
	set->u.sequence.cb_arg = cb_arg;

	if (set->ext_io_opts) {
		assert(channel->dev->writev_ext);
		channel->dev->writev_ext(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
					 &set->cb_args, set->ext_io_opts);
	} else {
		channel->dev->writev(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
				     &set->cb_args);
	}
}

void
bs_sequence_write_zeroes_dev(spdk_bs_sequence_t *seq,
			     uint64_t lba, uint64_t lba_count,
			     spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)seq;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Writing zeroes to %" PRIu64 " blocks at LBA %" PRIu64 "\n",
		      lba_count, lba);

	set->u.sequence.cb_fn = cb_fn;
	set->u.sequence.cb_arg = cb_arg;

	channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count,
				   &set->cb_args);
}

void
bs_sequence_copy_dev(spdk_bs_sequence_t *seq, uint64_t dst_lba, uint64_t src_lba,
		     uint64_t lba_count, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)seq;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Copying %" PRIu64 " blocks from LBA %" PRIu64 " to LBA %" PRIu64 "\n",
		      lba_count, src_lba, dst_lba);

	set->u.sequence.cb_fn = cb_fn;
	set->u.sequence.cb_arg = cb_arg;

	channel->dev->copy(channel->dev, channel->dev_channel, dst_lba, src_lba, lba_count, &set->cb_args);
}

void
bs_sequence_finish(spdk_bs_sequence_t *seq, int bserrno)
{
	if (bserrno != 0) {
		seq->bserrno = bserrno;
	}
	bs_request_set_complete((struct spdk_bs_request_set *)seq);
}
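
/*
 * Illustrative sketch of the sequence step pattern (an assumption about
 * typical callers, not code from this file): each per-step callback passed to
 * the helpers above receives the same sequence back, so it can either queue
 * the next IO or finish the sequence. struct my_ctx, write_step_done() and
 * the ctx fields are hypothetical:
 *
 *	static void
 *	write_step_done(spdk_bs_sequence_t *seq, void *cb_arg, int bserrno)
 *	{
 *		struct my_ctx *ctx = cb_arg;
 *
 *		if (bserrno != 0 || ctx->next_lba == ctx->end_lba) {
 *			bs_sequence_finish(seq, bserrno);
 *			return;
 *		}
 *
 *		bs_sequence_write_dev(seq, ctx->buf, ctx->next_lba, ctx->lba_count,
 *				      write_step_done, ctx);
 *		ctx->next_lba += ctx->lba_count;
 *	}
 *
 * The final bs_sequence_finish() call returns the request set to its channel
 * and invokes the cpl that was supplied when the sequence was started.
 */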

void
bs_user_op_sequence_finish(void *cb_arg, int bserrno)
{
	spdk_bs_sequence_t *seq = cb_arg;

	bs_sequence_finish(seq, bserrno);
}

static void
bs_batch_completion(struct spdk_io_channel *_channel,
		    void *cb_arg, int bserrno)
{
	struct spdk_bs_request_set	*set = cb_arg;

	set->u.batch.outstanding_ops--;
	if (bserrno != 0) {
		set->bserrno = bserrno;
	}

	/*
	 * The batch completes only once every outstanding op has finished and
	 * the caller has closed the batch via bs_batch_close().
	 */
	if (set->u.batch.outstanding_ops == 0 && set->u.batch.batch_closed) {
		if (set->u.batch.cb_fn) {
			set->cb_args.cb_fn = bs_sequence_completion;
			set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, bserrno);
		} else {
			bs_request_set_complete(set);
		}
	}
}

spdk_bs_batch_t *
bs_batch_open(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl, struct spdk_blob *blob)
{
	struct spdk_bs_channel		*channel;
	struct spdk_bs_request_set	*set;
	struct spdk_io_channel		*back_channel = _channel;

	if (spdk_blob_is_esnap_clone(blob)) {
		back_channel = blob_esnap_get_io_channel(_channel, blob);
		if (back_channel == NULL) {
			return NULL;
		}
	}

	channel = spdk_io_channel_get_ctx(_channel);
	assert(channel != NULL);
	set = TAILQ_FIRST(&channel->reqs);
	if (!set) {
		return NULL;
	}
	TAILQ_REMOVE(&channel->reqs, set, link);

	spdk_trace_record(TRACE_BLOB_REQ_SET_START, 0, 0, (uintptr_t)&set->cb_args,
			  (uintptr_t)cpl->u.blob_basic.cb_arg);

	set->cpl = *cpl;
	set->bserrno = 0;
	set->channel = channel;
	set->back_channel = back_channel;

	set->u.batch.cb_fn = NULL;
	set->u.batch.cb_arg = NULL;
	set->u.batch.outstanding_ops = 0;
	set->u.batch.batch_closed = 0;

	set->cb_args.cb_fn = bs_batch_completion;
	set->cb_args.cb_arg = set;
	set->cb_args.channel = channel->dev_channel;

	return (spdk_bs_batch_t *)set;
}

void
bs_batch_read_bs_dev(spdk_bs_batch_t *batch, struct spdk_bs_dev *bs_dev,
		     void *payload, uint64_t lba, uint32_t lba_count)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
	struct spdk_io_channel		*back_channel = set->back_channel;

	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
		      lba);

	set->u.batch.outstanding_ops++;
	bs_dev->read(bs_dev, back_channel, payload, lba, lba_count, &set->cb_args);
}

void
bs_batch_read_dev(spdk_bs_batch_t *batch, void *payload,
		  uint64_t lba, uint32_t lba_count)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
		      lba);

	set->u.batch.outstanding_ops++;
	channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args);
}

void
bs_batch_write_dev(spdk_bs_batch_t *batch, void *payload,
		   uint64_t lba, uint32_t lba_count)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Writing %" PRIu32 " blocks to LBA %" PRIu64 "\n", lba_count, lba);

	set->u.batch.outstanding_ops++;
	channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count,
			    &set->cb_args);
}

void
bs_batch_unmap_dev(spdk_bs_batch_t *batch,
		   uint64_t lba, uint64_t lba_count)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Unmapping %" PRIu64 " blocks at LBA %" PRIu64 "\n", lba_count,
		      lba);

	set->u.batch.outstanding_ops++;
	channel->dev->unmap(channel->dev, channel->dev_channel, lba, lba_count,
			    &set->cb_args);
}

void
bs_batch_write_zeroes_dev(spdk_bs_batch_t *batch,
			  uint64_t lba, uint64_t lba_count)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
	struct spdk_bs_channel		*channel = set->channel;

	SPDK_DEBUGLOG(blob_rw, "Zeroing %" PRIu64 " blocks at LBA %" PRIu64 "\n", lba_count, lba);

	set->u.batch.outstanding_ops++;
	channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count,
				   &set->cb_args);
}

void
bs_batch_close(spdk_bs_batch_t *batch)
{
	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;

	set->u.batch.batch_closed = 1;

	if (set->u.batch.outstanding_ops == 0) {
		if (set->u.batch.cb_fn) {
			set->cb_args.cb_fn = bs_sequence_completion;
			set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, set->bserrno);
		} else {
			bs_request_set_complete(set);
		}
	}
}
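
/*
 * Illustrative sketch of the open/submit/close protocol (an assumption about
 * typical callers, not code from this file), where trim_done(), ctx and the
 * LBA ranges are hypothetical:
 *
 *	struct spdk_bs_cpl cpl = {
 *		.type = SPDK_BS_CPL_TYPE_BLOB_BASIC,
 *		.u.blob_basic.cb_fn = trim_done,
 *		.u.blob_basic.cb_arg = ctx,
 *	};
 *	spdk_bs_batch_t *batch = bs_batch_open(ch, &cpl, blob);
 *
 *	if (batch == NULL) {
 *		trim_done(ctx, -ENOMEM);
 *		return;
 *	}
 *	bs_batch_unmap_dev(batch, lba0, lba_count0);
 *	bs_batch_unmap_dev(batch, lba1, lba_count1);
 *	bs_batch_close(batch);
 *
 * The batch may complete inside bs_batch_close() if both unmaps have already
 * finished, or later from bs_batch_completion() when the last one does.
 */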

spdk_bs_batch_t *
bs_sequence_to_batch(spdk_bs_sequence_t *seq, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
{
	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq;

	set->u.batch.cb_fn = cb_fn;
	set->u.batch.cb_arg = cb_arg;
	set->u.batch.outstanding_ops = 0;
	set->u.batch.batch_closed = 0;

	set->cb_args.cb_fn = bs_batch_completion;

	return set;
}
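
/*
 * Illustrative sketch (hedged, not code from this file): a sequence step that
 * needs to fan out into several parallel operations can borrow the batch
 * machinery and then resume the sequence. zero_clusters_done() and ctx are
 * hypothetical:
 *
 *	spdk_bs_batch_t *batch = bs_sequence_to_batch(seq, zero_clusters_done, ctx);
 *
 *	bs_batch_write_zeroes_dev(batch, lba0, lba_count0);
 *	bs_batch_write_zeroes_dev(batch, lba1, lba_count1);
 *	bs_batch_close(batch);
 *
 * When the batch drains, zero_clusters_done(seq, ctx, bserrno) runs with the
 * original sequence, which it can continue or end with bs_sequence_finish().
 */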

spdk_bs_user_op_t *
bs_user_op_alloc(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl,
		 enum spdk_blob_op_type op_type, struct spdk_blob *blob,
		 void *payload, int iovcnt, uint64_t offset, uint64_t length)
{
	struct spdk_bs_channel		*channel;
	struct spdk_bs_request_set	*set;
	struct spdk_bs_user_op_args	*args;

	channel = spdk_io_channel_get_ctx(_channel);
	assert(channel != NULL);
	set = TAILQ_FIRST(&channel->reqs);
	if (!set) {
		return NULL;
	}
	TAILQ_REMOVE(&channel->reqs, set, link);

	spdk_trace_record(TRACE_BLOB_REQ_SET_START, 0, 0, (uintptr_t)&set->cb_args,
			  (uintptr_t)cpl->u.blob_basic.cb_arg);

	set->cpl = *cpl;
	set->channel = channel;
	set->back_channel = NULL;
	set->ext_io_opts = NULL;

	args = &set->u.user_op;

	args->type = op_type;
	args->iovcnt = iovcnt;
	args->blob = blob;
	args->offset = offset;
	args->length = length;
	args->payload = payload;

	return (spdk_bs_user_op_t *)set;
}
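
/*
 * Illustrative sketch of the deferral pattern (an assumption about typical
 * callers, not code from this file): a user op captures an in-flight request
 * so it can be parked, e.g. while a blob is frozen, and replayed or failed
 * later. queued_io and the surrounding context are hypothetical:
 *
 *	spdk_bs_user_op_t *op = bs_user_op_alloc(ch, &cpl, SPDK_BLOB_WRITE, blob,
 *						 payload, 0, offset, length);
 *
 *	if (op == NULL) {
 *		cpl.u.blob_basic.cb_fn(cpl.u.blob_basic.cb_arg, -ENOMEM);
 *		return;
 *	}
 *	TAILQ_INSERT_TAIL(&queued_io, op, link);
 *
 * Once the blob thaws, each queued op is either replayed with
 * bs_user_op_execute() or failed with bs_user_op_abort().
 */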

void
bs_user_op_execute(spdk_bs_user_op_t *op)
{
	struct spdk_bs_request_set	*set;
	struct spdk_bs_user_op_args	*args;
	struct spdk_io_channel		*ch;

	set = (struct spdk_bs_request_set *)op;
	args = &set->u.user_op;
	ch = spdk_io_channel_from_ctx(set->channel);

	switch (args->type) {
	case SPDK_BLOB_READ:
		spdk_blob_io_read(args->blob, ch, args->payload, args->offset, args->length,
				  set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
		break;
	case SPDK_BLOB_WRITE:
		spdk_blob_io_write(args->blob, ch, args->payload, args->offset, args->length,
				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
		break;
	case SPDK_BLOB_UNMAP:
		spdk_blob_io_unmap(args->blob, ch, args->offset, args->length,
				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
		break;
	case SPDK_BLOB_WRITE_ZEROES:
		spdk_blob_io_write_zeroes(args->blob, ch, args->offset, args->length,
					  set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
		break;
	case SPDK_BLOB_READV:
		spdk_blob_io_readv_ext(args->blob, ch, args->payload, args->iovcnt,
				       args->offset, args->length,
				       set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg,
				       set->ext_io_opts);
		break;
	case SPDK_BLOB_WRITEV:
		spdk_blob_io_writev_ext(args->blob, ch, args->payload, args->iovcnt,
					args->offset, args->length,
					set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg,
					set->ext_io_opts);
		break;
	}
	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
}

void
bs_user_op_abort(spdk_bs_user_op_t *op, int bserrno)
{
	struct spdk_bs_request_set	*set;

	set = (struct spdk_bs_request_set *)op;

	set->cpl.u.blob_basic.cb_fn(set->cpl.u.blob_basic.cb_arg, bserrno);
	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
}

SPDK_LOG_REGISTER_COMPONENT(blob_rw)