xref: /spdk/lib/blob/request.c (revision 8a0a98d35e21f282088edf28b9e8da66ec390e3a)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk/stdinc.h"
35 
36 #include "blobstore.h"
37 #include "request.h"
38 
39 #include "spdk/thread.h"
40 #include "spdk/queue.h"
41 
42 #include "spdk_internal/log.h"
43 
44 void
45 spdk_bs_call_cpl(struct spdk_bs_cpl *cpl, int bserrno)
46 {
47 	switch (cpl->type) {
48 	case SPDK_BS_CPL_TYPE_BS_BASIC:
49 		cpl->u.bs_basic.cb_fn(cpl->u.bs_basic.cb_arg,
50 				      bserrno);
51 		break;
52 	case SPDK_BS_CPL_TYPE_BS_HANDLE:
53 		cpl->u.bs_handle.cb_fn(cpl->u.bs_handle.cb_arg,
54 				       cpl->u.bs_handle.bs,
55 				       bserrno);
56 		break;
57 	case SPDK_BS_CPL_TYPE_BLOB_BASIC:
58 		cpl->u.blob_basic.cb_fn(cpl->u.blob_basic.cb_arg,
59 					bserrno);
60 		break;
61 	case SPDK_BS_CPL_TYPE_BLOBID:
62 		cpl->u.blobid.cb_fn(cpl->u.blobid.cb_arg,
63 				    cpl->u.blobid.blobid,
64 				    bserrno);
65 		break;
66 	case SPDK_BS_CPL_TYPE_BLOB_HANDLE:
67 		cpl->u.blob_handle.cb_fn(cpl->u.blob_handle.cb_arg,
68 					 cpl->u.blob_handle.blob,
69 					 bserrno);
70 		break;
71 	case SPDK_BS_CPL_TYPE_NESTED_SEQUENCE:
72 		cpl->u.nested_seq.cb_fn(cpl->u.nested_seq.cb_arg,
73 					cpl->u.nested_seq.parent,
74 					bserrno);
75 		break;
76 	case SPDK_BS_CPL_TYPE_NONE:
77 		/* this completion's callback is handled elsewhere */
78 		break;
79 	}
80 }
81 
82 static void
83 spdk_bs_request_set_complete(struct spdk_bs_request_set *set)
84 {
85 	struct spdk_bs_cpl cpl = set->cpl;
86 	int bserrno = set->bserrno;
87 
88 	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
89 
90 	spdk_bs_call_cpl(&cpl, bserrno);
91 }
92 
93 static void
94 spdk_bs_sequence_completion(struct spdk_io_channel *channel, void *cb_arg, int bserrno)
95 {
96 	struct spdk_bs_request_set *set = cb_arg;
97 
98 	set->bserrno = bserrno;
99 	set->u.sequence.cb_fn((spdk_bs_sequence_t *)set, set->u.sequence.cb_arg, bserrno);
100 }
101 
102 spdk_bs_sequence_t *
103 spdk_bs_sequence_start(struct spdk_io_channel *_channel,
104 		       struct spdk_bs_cpl *cpl)
105 {
106 	struct spdk_bs_channel		*channel;
107 	struct spdk_bs_request_set	*set;
108 
109 	channel = spdk_io_channel_get_ctx(_channel);
110 
111 	set = TAILQ_FIRST(&channel->reqs);
112 	if (!set) {
113 		return NULL;
114 	}
115 	TAILQ_REMOVE(&channel->reqs, set, link);
116 
117 	set->cpl = *cpl;
118 	set->bserrno = 0;
119 	set->channel = channel;
120 
121 	set->cb_args.cb_fn = spdk_bs_sequence_completion;
122 	set->cb_args.cb_arg = set;
123 	set->cb_args.channel = channel->dev_channel;
124 
125 	return (spdk_bs_sequence_t *)set;
126 }
127 
128 void
129 spdk_bs_sequence_read_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev,
130 			     void *payload, uint64_t lba, uint32_t lba_count,
131 			     spdk_bs_sequence_cpl cb_fn, void *cb_arg)
132 {
133 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
134 	struct spdk_bs_channel       *channel = set->channel;
135 
136 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
137 		      lba);
138 
139 	set->u.sequence.cb_fn = cb_fn;
140 	set->u.sequence.cb_arg = cb_arg;
141 
142 	bs_dev->read(bs_dev, spdk_io_channel_from_ctx(channel), payload, lba, lba_count, &set->cb_args);
143 }
144 
145 void
146 spdk_bs_sequence_read_dev(spdk_bs_sequence_t *seq, void *payload,
147 			  uint64_t lba, uint32_t lba_count,
148 			  spdk_bs_sequence_cpl cb_fn, void *cb_arg)
149 {
150 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
151 	struct spdk_bs_channel       *channel = set->channel;
152 
153 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
154 		      lba);
155 
156 	set->u.sequence.cb_fn = cb_fn;
157 	set->u.sequence.cb_arg = cb_arg;
158 
159 	channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args);
160 }
161 
162 void
163 spdk_bs_sequence_write_dev(spdk_bs_sequence_t *seq, void *payload,
164 			   uint64_t lba, uint32_t lba_count,
165 			   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
166 {
167 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
168 	struct spdk_bs_channel       *channel = set->channel;
169 
170 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Writing %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
171 		      lba);
172 
173 	set->u.sequence.cb_fn = cb_fn;
174 	set->u.sequence.cb_arg = cb_arg;
175 
176 	channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count,
177 			    &set->cb_args);
178 }
179 
180 void
181 spdk_bs_sequence_readv_bs_dev(spdk_bs_sequence_t *seq, struct spdk_bs_dev *bs_dev,
182 			      struct iovec *iov, int iovcnt, uint64_t lba, uint32_t lba_count,
183 			      spdk_bs_sequence_cpl cb_fn, void *cb_arg)
184 {
185 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
186 	struct spdk_bs_channel       *channel = set->channel;
187 
188 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
189 		      lba);
190 
191 	set->u.sequence.cb_fn = cb_fn;
192 	set->u.sequence.cb_arg = cb_arg;
193 
194 	bs_dev->readv(bs_dev, spdk_io_channel_from_ctx(channel), iov, iovcnt, lba, lba_count,
195 		      &set->cb_args);
196 }
197 
198 void
199 spdk_bs_sequence_readv_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt,
200 			   uint64_t lba, uint32_t lba_count, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
201 {
202 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
203 	struct spdk_bs_channel       *channel = set->channel;
204 
205 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
206 		      lba);
207 
208 	set->u.sequence.cb_fn = cb_fn;
209 	set->u.sequence.cb_arg = cb_arg;
210 	channel->dev->readv(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
211 			    &set->cb_args);
212 }
213 
214 void
215 spdk_bs_sequence_writev_dev(spdk_bs_sequence_t *seq, struct iovec *iov, int iovcnt,
216 			    uint64_t lba, uint32_t lba_count,
217 			    spdk_bs_sequence_cpl cb_fn, void *cb_arg)
218 {
219 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
220 	struct spdk_bs_channel       *channel = set->channel;
221 
222 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Writing %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
223 		      lba);
224 
225 	set->u.sequence.cb_fn = cb_fn;
226 	set->u.sequence.cb_arg = cb_arg;
227 
228 	channel->dev->writev(channel->dev, channel->dev_channel, iov, iovcnt, lba, lba_count,
229 			     &set->cb_args);
230 }
231 
232 void
233 spdk_bs_sequence_unmap_dev(spdk_bs_sequence_t *seq,
234 			   uint64_t lba, uint32_t lba_count,
235 			   spdk_bs_sequence_cpl cb_fn, void *cb_arg)
236 {
237 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
238 	struct spdk_bs_channel       *channel = set->channel;
239 
240 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Unmapping %" PRIu32 " blocks at LBA %" PRIu64 "\n", lba_count,
241 		      lba);
242 
243 	set->u.sequence.cb_fn = cb_fn;
244 	set->u.sequence.cb_arg = cb_arg;
245 
246 	channel->dev->unmap(channel->dev, channel->dev_channel, lba, lba_count,
247 			    &set->cb_args);
248 }
249 
250 void
251 spdk_bs_sequence_write_zeroes_dev(spdk_bs_sequence_t *seq,
252 				  uint64_t lba, uint32_t lba_count,
253 				  spdk_bs_sequence_cpl cb_fn, void *cb_arg)
254 {
255 	struct spdk_bs_request_set      *set = (struct spdk_bs_request_set *)seq;
256 	struct spdk_bs_channel       *channel = set->channel;
257 
258 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "writing zeroes to %" PRIu32 " blocks at LBA %" PRIu64 "\n",
259 		      lba_count, lba);
260 
261 	set->u.sequence.cb_fn = cb_fn;
262 	set->u.sequence.cb_arg = cb_arg;
263 
264 	channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count,
265 				   &set->cb_args);
266 }
267 
268 void
269 spdk_bs_sequence_finish(spdk_bs_sequence_t *seq, int bserrno)
270 {
271 	if (bserrno != 0) {
272 		seq->bserrno = bserrno;
273 	}
274 	spdk_bs_request_set_complete((struct spdk_bs_request_set *)seq);
275 }
276 
277 void
278 spdk_bs_user_op_sequence_finish(void *cb_arg, int bserrno)
279 {
280 	spdk_bs_sequence_t *seq = cb_arg;
281 
282 	spdk_bs_sequence_finish(seq, bserrno);
283 }
284 
285 static void
286 spdk_bs_batch_completion(struct spdk_io_channel *_channel,
287 			 void *cb_arg, int bserrno)
288 {
289 	struct spdk_bs_request_set	*set = cb_arg;
290 
291 	set->u.batch.outstanding_ops--;
292 	if (bserrno != 0) {
293 		set->bserrno = bserrno;
294 	}
295 
296 	if (set->u.batch.outstanding_ops == 0 && set->u.batch.batch_closed) {
297 		if (set->u.batch.cb_fn) {
298 			set->cb_args.cb_fn = spdk_bs_sequence_completion;
299 			set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, bserrno);
300 		} else {
301 			spdk_bs_request_set_complete(set);
302 		}
303 	}
304 }
305 
306 spdk_bs_batch_t *
307 spdk_bs_batch_open(struct spdk_io_channel *_channel,
308 		   struct spdk_bs_cpl *cpl)
309 {
310 	struct spdk_bs_channel		*channel;
311 	struct spdk_bs_request_set	*set;
312 
313 	channel = spdk_io_channel_get_ctx(_channel);
314 
315 	set = TAILQ_FIRST(&channel->reqs);
316 	if (!set) {
317 		return NULL;
318 	}
319 	TAILQ_REMOVE(&channel->reqs, set, link);
320 
321 	set->cpl = *cpl;
322 	set->bserrno = 0;
323 	set->channel = channel;
324 
325 	set->u.batch.cb_fn = NULL;
326 	set->u.batch.cb_arg = NULL;
327 	set->u.batch.outstanding_ops = 0;
328 	set->u.batch.batch_closed = 0;
329 
330 	set->cb_args.cb_fn = spdk_bs_batch_completion;
331 	set->cb_args.cb_arg = set;
332 	set->cb_args.channel = channel->dev_channel;
333 
334 	return (spdk_bs_batch_t *)set;
335 }
336 
337 void
338 spdk_bs_batch_read_bs_dev(spdk_bs_batch_t *batch, struct spdk_bs_dev *bs_dev,
339 			  void *payload, uint64_t lba, uint32_t lba_count)
340 {
341 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
342 	struct spdk_bs_channel		*channel = set->channel;
343 
344 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
345 		      lba);
346 
347 	set->u.batch.outstanding_ops++;
348 	bs_dev->read(bs_dev, spdk_io_channel_from_ctx(channel), payload, lba, lba_count, &set->cb_args);
349 }
350 
351 void
352 spdk_bs_batch_read_dev(spdk_bs_batch_t *batch, void *payload,
353 		       uint64_t lba, uint32_t lba_count)
354 {
355 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
356 	struct spdk_bs_channel		*channel = set->channel;
357 
358 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu32 " blocks from LBA %" PRIu64 "\n", lba_count,
359 		      lba);
360 
361 	set->u.batch.outstanding_ops++;
362 	channel->dev->read(channel->dev, channel->dev_channel, payload, lba, lba_count, &set->cb_args);
363 }
364 
365 void
366 spdk_bs_batch_write_dev(spdk_bs_batch_t *batch, void *payload,
367 			uint64_t lba, uint32_t lba_count)
368 {
369 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
370 	struct spdk_bs_channel		*channel = set->channel;
371 
372 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Writing %" PRIu32 " blocks to LBA %" PRIu64 "\n", lba_count, lba);
373 
374 	set->u.batch.outstanding_ops++;
375 	channel->dev->write(channel->dev, channel->dev_channel, payload, lba, lba_count,
376 			    &set->cb_args);
377 }
378 
379 void
380 spdk_bs_batch_unmap_dev(spdk_bs_batch_t *batch,
381 			uint64_t lba, uint32_t lba_count)
382 {
383 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
384 	struct spdk_bs_channel		*channel = set->channel;
385 
386 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Unmapping %" PRIu32 " blocks at LBA %" PRIu64 "\n", lba_count,
387 		      lba);
388 
389 	set->u.batch.outstanding_ops++;
390 	channel->dev->unmap(channel->dev, channel->dev_channel, lba, lba_count,
391 			    &set->cb_args);
392 }
393 
394 void
395 spdk_bs_batch_write_zeroes_dev(spdk_bs_batch_t *batch,
396 			       uint64_t lba, uint32_t lba_count)
397 {
398 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
399 	struct spdk_bs_channel		*channel = set->channel;
400 
401 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Zeroing %" PRIu32 " blocks at LBA %" PRIu64 "\n", lba_count, lba);
402 
403 	set->u.batch.outstanding_ops++;
404 	channel->dev->write_zeroes(channel->dev, channel->dev_channel, lba, lba_count,
405 				   &set->cb_args);
406 }
407 
408 static void
409 spdk_bs_batch_blob_op_complete(void *arg, int bserrno)
410 {
411 	/* TODO: spdk_bs_batch_completion does not actually use the channel parameter -
412 	 *  just pass NULL here instead of getting the channel from the set cb_arg.
413 	 */
414 	spdk_bs_batch_completion(NULL, arg, bserrno);
415 }
416 
417 void
418 spdk_bs_batch_read_blob(spdk_bs_batch_t *batch, struct spdk_blob *blob,
419 			void *payload, uint64_t offset, uint64_t length)
420 {
421 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
422 	struct spdk_bs_channel		*channel = set->channel;
423 
424 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Reading %" PRIu64 " pages from offset %" PRIu64 "\n", length,
425 		      offset);
426 
427 	set->u.batch.outstanding_ops++;
428 	spdk_blob_io_read(blob, spdk_io_channel_from_ctx(channel), payload, offset,
429 			  length, spdk_bs_batch_blob_op_complete, set);
430 }
431 
432 void
433 spdk_bs_batch_write_blob(spdk_bs_batch_t *batch, struct spdk_blob *blob,
434 			 void *payload, uint64_t offset, uint64_t length)
435 {
436 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
437 	struct spdk_bs_channel		*channel = set->channel;
438 
439 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Writing %" PRIu64 " pages from offset %" PRIu64 "\n", length,
440 		      offset);
441 
442 	set->u.batch.outstanding_ops++;
443 	spdk_blob_io_write(blob, spdk_io_channel_from_ctx(channel), payload, offset,
444 			   length, spdk_bs_batch_blob_op_complete, set);
445 }
446 
447 void
448 spdk_bs_batch_unmap_blob(spdk_bs_batch_t *batch, struct spdk_blob *blob,
449 			 uint64_t offset, uint64_t length)
450 {
451 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
452 	struct spdk_bs_channel		*channel = set->channel;
453 
454 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Unmapping %" PRIu64 " pages from offset %" PRIu64 "\n", length,
455 		      offset);
456 
457 	set->u.batch.outstanding_ops++;
458 	spdk_blob_io_unmap(blob, spdk_io_channel_from_ctx(channel), offset, length,
459 			   spdk_bs_batch_blob_op_complete, set);
460 }
461 
462 void
463 spdk_bs_batch_write_zeroes_blob(spdk_bs_batch_t *batch, struct spdk_blob *blob,
464 				uint64_t offset, uint64_t length)
465 {
466 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
467 	struct spdk_bs_channel		*channel = set->channel;
468 
469 	SPDK_DEBUGLOG(SPDK_LOG_BLOB_RW, "Zeroing %" PRIu64 " pages from offset %" PRIu64 "\n", length,
470 		      offset);
471 
472 	set->u.batch.outstanding_ops++;
473 	spdk_blob_io_write_zeroes(blob, spdk_io_channel_from_ctx(channel), offset, length,
474 				  spdk_bs_batch_blob_op_complete, set);
475 }
476 
477 void
478 spdk_bs_batch_set_errno(spdk_bs_batch_t *batch, int bserrno)
479 {
480 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
481 
482 	set->bserrno = bserrno;
483 }
484 
485 void
486 spdk_bs_batch_close(spdk_bs_batch_t *batch)
487 {
488 	struct spdk_bs_request_set	*set = (struct spdk_bs_request_set *)batch;
489 
490 	set->u.batch.batch_closed = 1;
491 
492 	if (set->u.batch.outstanding_ops == 0) {
493 		if (set->u.batch.cb_fn) {
494 			set->cb_args.cb_fn = spdk_bs_sequence_completion;
495 			set->u.batch.cb_fn((spdk_bs_sequence_t *)set, set->u.batch.cb_arg, set->bserrno);
496 		} else {
497 			spdk_bs_request_set_complete(set);
498 		}
499 	}
500 }
501 
502 spdk_bs_batch_t *
503 spdk_bs_sequence_to_batch(spdk_bs_sequence_t *seq, spdk_bs_sequence_cpl cb_fn, void *cb_arg)
504 {
505 	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)seq;
506 
507 	set->u.batch.cb_fn = cb_fn;
508 	set->u.batch.cb_arg = cb_arg;
509 	set->u.batch.outstanding_ops = 0;
510 	set->u.batch.batch_closed = 0;
511 
512 	set->cb_args.cb_fn = spdk_bs_batch_completion;
513 
514 	return set;
515 }
516 
517 spdk_bs_sequence_t *
518 spdk_bs_batch_to_sequence(spdk_bs_batch_t *batch)
519 {
520 	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)batch;
521 
522 	set->u.batch.outstanding_ops++;
523 
524 	set->cpl.type = SPDK_BS_CPL_TYPE_BLOB_BASIC;
525 	set->cpl.u.blob_basic.cb_fn = spdk_bs_sequence_to_batch_completion;
526 	set->cpl.u.blob_basic.cb_arg = set;
527 	set->bserrno = 0;
528 
529 	set->cb_args.cb_fn = spdk_bs_sequence_completion;
530 	set->cb_args.cb_arg = set;
531 	set->cb_args.channel = set->channel->dev_channel;
532 
533 	return (spdk_bs_sequence_t *)set;
534 }
535 
536 spdk_bs_user_op_t *
537 spdk_bs_user_op_alloc(struct spdk_io_channel *_channel, struct spdk_bs_cpl *cpl,
538 		      enum spdk_blob_op_type op_type, struct spdk_blob *blob,
539 		      void *payload, int iovcnt, uint64_t offset, uint64_t length)
540 {
541 	struct spdk_bs_channel		*channel;
542 	struct spdk_bs_request_set	*set;
543 	struct spdk_bs_user_op_args	*args;
544 
545 	channel = spdk_io_channel_get_ctx(_channel);
546 
547 	set = TAILQ_FIRST(&channel->reqs);
548 	if (!set) {
549 		return NULL;
550 	}
551 	TAILQ_REMOVE(&channel->reqs, set, link);
552 
553 	set->cpl = *cpl;
554 	set->channel = channel;
555 
556 	args = &set->u.user_op;
557 
558 	args->type = op_type;
559 	args->iovcnt = iovcnt;
560 	args->blob = blob;
561 	args->offset = offset;
562 	args->length = length;
563 	args->payload = payload;
564 
565 	return (spdk_bs_user_op_t *)set;
566 }
567 
568 void
569 spdk_bs_user_op_execute(spdk_bs_user_op_t *op)
570 {
571 	struct spdk_bs_request_set	*set;
572 	struct spdk_bs_user_op_args	*args;
573 	struct spdk_io_channel		*ch;
574 
575 	set = (struct spdk_bs_request_set *)op;
576 	args = &set->u.user_op;
577 	ch = spdk_io_channel_from_ctx(set->channel);
578 
579 	switch (args->type) {
580 	case SPDK_BLOB_READ:
581 		spdk_blob_io_read(args->blob, ch, args->payload, args->offset, args->length,
582 				  set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
583 		break;
584 	case SPDK_BLOB_WRITE:
585 		spdk_blob_io_write(args->blob, ch, args->payload, args->offset, args->length,
586 				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
587 		break;
588 	case SPDK_BLOB_UNMAP:
589 		spdk_blob_io_unmap(args->blob, ch, args->offset, args->length,
590 				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
591 		break;
592 	case SPDK_BLOB_WRITE_ZEROES:
593 		spdk_blob_io_write_zeroes(args->blob, ch, args->offset, args->length,
594 					  set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
595 		break;
596 	case SPDK_BLOB_READV:
597 		spdk_blob_io_readv(args->blob, ch, args->payload, args->iovcnt,
598 				   args->offset, args->length,
599 				   set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
600 		break;
601 	case SPDK_BLOB_WRITEV:
602 		spdk_blob_io_writev(args->blob, ch, args->payload, args->iovcnt,
603 				    args->offset, args->length,
604 				    set->cpl.u.blob_basic.cb_fn, set->cpl.u.blob_basic.cb_arg);
605 		break;
606 	}
607 	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
608 }
609 
610 void
611 spdk_bs_user_op_abort(spdk_bs_user_op_t *op)
612 {
613 	struct spdk_bs_request_set	*set;
614 
615 	set = (struct spdk_bs_request_set *)op;
616 
617 	set->cpl.u.blob_basic.cb_fn(set->cpl.u.blob_basic.cb_arg, -EIO);
618 	TAILQ_INSERT_TAIL(&set->channel->reqs, set, link);
619 }
620 
621 void
622 spdk_bs_sequence_to_batch_completion(void *cb_arg, int bserrno)
623 {
624 	struct spdk_bs_request_set *set = (struct spdk_bs_request_set *)cb_arg;
625 
626 	set->u.batch.outstanding_ops--;
627 
628 	if (set->u.batch.outstanding_ops == 0 && set->u.batch.batch_closed) {
629 		if (set->cb_args.cb_fn) {
630 			set->cb_args.cb_fn(set->cb_args.channel, set->cb_args.cb_arg, bserrno);
631 		}
632 	}
633 }
634 
635 SPDK_LOG_REGISTER_COMPONENT("blob_rw", SPDK_LOG_BLOB_RW)
636