xref: /spdk/module/bdev/daos/bdev_daos.c (revision 8afdeef3becfe9409cc9e7372bd0bc10e8b7d46d)
1 /*   SPDX-License-Identifier: BSD-3-Clause
2  *   Copyright (c) croit GmbH.
3  *   All rights reserved.
4  *   Copyright (c) 2023 Enakta Labs
5  *   All rights reserved.
6  */
7 
8 #include <sys/queue.h>
9 
10 #include "spdk/bdev.h"
11 #include "spdk/bdev_module.h"
12 #include "spdk/endian.h"
13 #include "spdk/env.h"
14 #include "spdk/json.h"
15 #include "spdk/thread.h"
16 #include "spdk/queue.h"
17 #include "spdk/string.h"
18 #include "spdk/stdinc.h"
19 #include "spdk/log.h"
20 
21 #include <daos.h>
22 #include <daos_event.h>
23 #include <daos_fs.h>
24 #include <daos_types.h>
25 #include <daos_pool.h>
26 #include <daos_cont.h>
27 #include <daos_errno.h>
28 
29 #include "bdev_daos.h"
30 
/* Per-I/O driver context. Its size is reported to the bdev layer by
 * bdev_daos_get_ctx_size() and it is reached through bdev_io->driver_ctx.
 */
struct bdev_daos_task {
	/* DAOS event handed to dfs_read()/dfs_write(); the channel poller maps a
	 * completed event back to its task with SPDK_CONTAINEROF().
	 */
	daos_event_t ev;
	/* Thread of the channel the request was submitted on; completion runs there */
	struct spdk_thread *submit_td;
	/* The bdev I/O this context belongs to, recorded at submission */
	struct spdk_bdev_io *bdev_io;

	/* Completion status: 0 means success, anything else is passed to
	 * spdk_bdev_io_complete_aio_status().
	 */
	int io_status;
	/* Byte offset of the request, kept for the completion debug log */
	uint64_t offset;

	/* DAOS version of iovec and scatter/gather */
	/* Output argument of dfs_read() */
	daos_size_t read_size;
	/* One DAOS iov per request iovec, filled with d_iov_set() */
	d_iov_t diovs[SPDK_BDEV_IO_NUM_CHILD_IOV];
	/* Scatter/gather list whose sg_iovs points at diovs */
	d_sg_list_t sgl;
};
44 
/* One DAOS-backed block device; also used as the SPDK io_device handle. */
struct bdev_daos {
	/* The bdev registered with the SPDK bdev layer; disk.ctxt points back here */
	struct spdk_bdev disk;
	/* DAOS object class id parsed from the oclass name given at creation */
	daos_oclass_id_t oclass;

	/* Pool and container labels used for daos_pool_connect()/daos_cont_open() */
	char pool_name[DAOS_PROP_MAX_LABEL_BUF_LEN];
	char cont_name[DAOS_PROP_MAX_LABEL_BUF_LEN];

	/* Context of the reset request currently waiting for in-flight I/O to drain */
	struct bdev_daos_task *reset_task;
	/* Poller that re-runs the in-flight check while a reset is pending */
	struct spdk_poller    *reset_retry_timer;
};
55 
/* Per-channel state: each channel holds its own DAOS connection and event queue. */
struct bdev_daos_io_channel {
	/* Device this channel belongs to (the io_device passed to the create callback) */
	struct bdev_daos *disk;
	/* Poller running bdev_daos_channel_poll() for this channel */
	struct spdk_poller *poller;

	/* Handles returned by daos_pool_connect() and daos_cont_open() */
	daos_handle_t pool;
	daos_handle_t cont;

	/* DFS mount obtained from dfs_mount() */
	dfs_t *dfs;
	/* DFS object opened under the bdev's name; target of read/write/punch */
	dfs_obj_t *obj;
	/* Event queue that receives completions of the asynchronous DFS operations */
	daos_handle_t queue;
};
67 
/* Number of outstanding bdev_daos_get_engine() references; guarded by
 * g_bdev_daos_init_mutex.
 */
static uint32_t g_bdev_daos_init_count = 0;
static pthread_mutex_t g_bdev_daos_init_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Module init hook referenced by daos_if below */
static int bdev_daos_initialize(void);

/* Reference-counted wrappers around daos_init()/daos_fini(), defined at the end of the file */
static int bdev_daos_get_engine(void);
static int bdev_daos_put_engine(void);
75 
76 static int
77 bdev_daos_get_ctx_size(void)
78 {
79 	return sizeof(struct bdev_daos_task);
80 }
81 
/* Module descriptor for the "daos" bdev module: init hook and per-I/O context size. */
static struct spdk_bdev_module daos_if = {
	.name = "daos",
	.module_init = bdev_daos_initialize,
	.get_ctx_size = bdev_daos_get_ctx_size,
};

/* Register the descriptor above with the SPDK bdev layer */
SPDK_BDEV_MODULE_REGISTER(daos, &daos_if)
89 
90 
91 /* Convert DAOS errors to closest POSIX errno
92  * This is pretty much copy of daos_der2errno()
93  * from https://github.com/daos-stack/daos/blob/master/src/include/daos/common.h
94  * but unfortunately it's not exported in DAOS packages
95  */
96 static inline int
97 daos2posix_errno(int err)
98 {
99 	if (err > 0) {
100 		return EINVAL;
101 	}
102 
103 	switch (err) {
104 	case -DER_SUCCESS:
105 		return 0;
106 	case -DER_NO_PERM:
107 	case -DER_EP_RO:
108 	case -DER_EP_OLD:
109 		return EPERM;
110 	case -DER_ENOENT:
111 	case -DER_NONEXIST:
112 		return ENOENT;
113 	case -DER_INVAL:
114 	case -DER_NOTYPE:
115 	case -DER_NOSCHEMA:
116 	case -DER_NOLOCAL:
117 	case -DER_NO_HDL:
118 	case -DER_IO_INVAL:
119 		return EINVAL;
120 	case -DER_KEY2BIG:
121 	case -DER_REC2BIG:
122 		return E2BIG;
123 	case -DER_EXIST:
124 		return EEXIST;
125 	case -DER_UNREACH:
126 		return EHOSTUNREACH;
127 	case -DER_NOSPACE:
128 		return ENOSPC;
129 	case -DER_ALREADY:
130 		return EALREADY;
131 	case -DER_NOMEM:
132 		return ENOMEM;
133 	case -DER_TIMEDOUT:
134 		return ETIMEDOUT;
135 	case -DER_BUSY:
136 	case -DER_EQ_BUSY:
137 		return EBUSY;
138 	case -DER_AGAIN:
139 		return EAGAIN;
140 	case -DER_PROTO:
141 		return EPROTO;
142 	case -DER_IO:
143 		return EIO;
144 	case -DER_CANCELED:
145 	case DER_OP_CANCELED:
146 		return ECANCELED;
147 	case -DER_OVERFLOW:
148 		return EOVERFLOW;
149 	case -DER_BADPATH:
150 	case -DER_NOTDIR:
151 		return ENOTDIR;
152 	case -DER_STALE:
153 		return ESTALE;
154 	case -DER_TX_RESTART:
155 		return ERESTART;
156 	default:
157 		return EIO;
158 	}
159 };
160 
161 static void
162 bdev_daos_free(struct bdev_daos *bdev_daos)
163 {
164 	if (!bdev_daos) {
165 		return;
166 	}
167 
168 	free(bdev_daos->disk.name);
169 	free(bdev_daos);
170 }
171 
172 static void
173 bdev_daos_destruct_cb(void *io_device)
174 {
175 	int rc;
176 	struct bdev_daos *daos = io_device;
177 
178 	assert(daos != NULL);
179 
180 	bdev_daos_free(daos);
181 
182 	rc = bdev_daos_put_engine();
183 	if (rc) {
184 		SPDK_ERRLOG("could not de-initialize DAOS engine: " DF_RC "\n", DP_RC(rc));
185 	}
186 }
187 
188 static int
189 bdev_daos_destruct(void *ctx)
190 {
191 	struct bdev_daos *daos = ctx;
192 
193 	SPDK_NOTICELOG("%s: destroying bdev_daos device\n", daos->disk.name);
194 
195 	spdk_io_device_unregister(daos, bdev_daos_destruct_cb);
196 
197 	return 0;
198 }
199 
200 static void
201 _bdev_daos_io_complete(void *bdev_daos_task)
202 {
203 	struct bdev_daos_task *task = bdev_daos_task;
204 
205 	SPDK_DEBUGLOG(bdev_daos, "completed IO at %#lx with status %s (errno=%d)\n",
206 		      task->offset, task->io_status ? "FAILURE" : "SUCCESS", task->io_status);
207 
208 	if (task->io_status == 0) {
209 		spdk_bdev_io_complete(spdk_bdev_io_from_ctx(task), SPDK_BDEV_IO_STATUS_SUCCESS);
210 	} else {
211 		spdk_bdev_io_complete_aio_status(spdk_bdev_io_from_ctx(task), task->io_status);
212 	}
213 }
214 
215 static void
216 bdev_daos_io_complete(struct spdk_bdev_io *bdev_io, int io_status)
217 {
218 	struct bdev_daos_task *task = (struct bdev_daos_task *)bdev_io->driver_ctx;
219 	struct spdk_thread *current_thread = spdk_get_thread();
220 
221 	assert(task->submit_td != NULL);
222 
223 	task->io_status = io_status;
224 	if (task->submit_td != current_thread) {
225 		spdk_thread_send_msg(task->submit_td, _bdev_daos_io_complete, task);
226 	} else {
227 		_bdev_daos_io_complete(task);
228 	}
229 }
230 
231 static int64_t
232 bdev_daos_writev(struct bdev_daos *daos, struct bdev_daos_io_channel *ch,
233 		 struct bdev_daos_task *task,
234 		 struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
235 {
236 	int rc;
237 
238 	SPDK_DEBUGLOG(bdev_daos, "write %d iovs size %lu to off: %#lx\n",
239 		      iovcnt, nbytes, offset);
240 
241 	assert(ch != NULL);
242 	assert(daos != NULL);
243 	assert(task != NULL);
244 	assert(iov != NULL);
245 
246 	if (iovcnt > SPDK_BDEV_IO_NUM_CHILD_IOV) {
247 		SPDK_ERRLOG("iovs number [%d] exceeds max allowed limit [%d]\n", iovcnt,
248 			    SPDK_BDEV_IO_NUM_CHILD_IOV);
249 		return -E2BIG;
250 	}
251 
252 	if ((rc = daos_event_init(&task->ev, ch->queue, NULL))) {
253 		SPDK_ERRLOG("%s: could not initialize async event: " DF_RC "\n",
254 			    daos->disk.name, DP_RC(rc));
255 		return -daos2posix_errno(rc);
256 	}
257 
258 	for (int i = 0; i < iovcnt; i++, iov++) {
259 		d_iov_set(&(task->diovs[i]), iov->iov_base, iov->iov_len);
260 	}
261 
262 	task->sgl.sg_nr = iovcnt;
263 	task->sgl.sg_nr_out = 0;
264 	task->sgl.sg_iovs = task->diovs;
265 	task->offset = offset;
266 
267 	if ((rc = dfs_write(ch->dfs, ch->obj, &task->sgl, offset, &task->ev))) {
268 		SPDK_ERRLOG("%s: could not start async write: %s\n",
269 			    daos->disk.name, strerror(rc));
270 		daos_event_fini(&task->ev);
271 		return -rc;
272 	}
273 
274 	return nbytes;
275 }
276 
277 static int64_t
278 bdev_daos_readv(struct bdev_daos *daos, struct bdev_daos_io_channel *ch,
279 		struct bdev_daos_task *task,
280 		struct iovec *iov, int iovcnt, uint64_t nbytes, uint64_t offset)
281 {
282 	int rc;
283 
284 	SPDK_DEBUGLOG(bdev_daos, "read %d iovs size %lu to off: %#lx\n",
285 		      iovcnt, nbytes, offset);
286 
287 	assert(ch != NULL);
288 	assert(daos != NULL);
289 	assert(task != NULL);
290 	assert(iov != NULL);
291 
292 	if (iovcnt > SPDK_BDEV_IO_NUM_CHILD_IOV) {
293 		SPDK_ERRLOG("iovs number [%d] exceeds max allowed limit [%d]\n", iovcnt,
294 			    SPDK_BDEV_IO_NUM_CHILD_IOV);
295 		return -E2BIG;
296 	}
297 
298 	if ((rc = daos_event_init(&task->ev, ch->queue, NULL))) {
299 		SPDK_ERRLOG("%s: could not initialize async event: " DF_RC "\n",
300 			    daos->disk.name, DP_RC(rc));
301 		return -daos2posix_errno(rc);
302 	}
303 
304 	for (int i = 0; i < iovcnt; i++, iov++) {
305 		d_iov_set(&(task->diovs[i]), iov->iov_base, iov->iov_len);
306 	}
307 
308 	task->sgl.sg_nr = iovcnt;
309 	task->sgl.sg_nr_out = 0;
310 	task->sgl.sg_iovs = task->diovs;
311 	task->offset = offset;
312 
313 	if ((rc = dfs_read(ch->dfs, ch->obj, &task->sgl, offset, &task->read_size, &task->ev))) {
314 		SPDK_ERRLOG("%s: could not start async read: %s\n",
315 			    daos->disk.name, strerror(rc));
316 		daos_event_fini(&task->ev);
317 		return -rc;
318 	}
319 
320 	return nbytes;
321 }
322 
323 static void
324 bdev_daos_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
325 		     bool success)
326 {
327 	int64_t rc;
328 	struct bdev_daos_io_channel *dch = spdk_io_channel_get_ctx(ch);
329 
330 	if (!success) {
331 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
332 		return;
333 	}
334 
335 	rc = bdev_daos_readv((struct bdev_daos *)bdev_io->bdev->ctxt,
336 			     dch,
337 			     (struct bdev_daos_task *)bdev_io->driver_ctx,
338 			     bdev_io->u.bdev.iovs,
339 			     bdev_io->u.bdev.iovcnt,
340 			     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
341 			     bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
342 
343 	if (rc < 0) {
344 		spdk_bdev_io_complete_aio_status(bdev_io, rc);
345 		return;
346 	}
347 }
348 
349 static void
350 _bdev_daos_get_io_inflight(struct spdk_io_channel_iter *i)
351 {
352 	struct spdk_io_channel *ch = spdk_io_channel_iter_get_channel(i);
353 	struct bdev_daos_io_channel *dch = spdk_io_channel_get_ctx(ch);
354 	int io_inflight = daos_eq_query(dch->queue, DAOS_EQR_WAITING, 0, NULL);
355 
356 	if (io_inflight > 0) {
357 		spdk_for_each_channel_continue(i, -1);
358 		return;
359 	}
360 
361 	spdk_for_each_channel_continue(i, 0);
362 }
363 
364 static int bdev_daos_reset_retry_timer(void *arg);
365 
366 static void
367 _bdev_daos_get_io_inflight_done(struct spdk_io_channel_iter *i, int status)
368 {
369 	struct bdev_daos *daos = spdk_io_channel_iter_get_ctx(i);
370 
371 	if (status == -1) {
372 		daos->reset_retry_timer = SPDK_POLLER_REGISTER(bdev_daos_reset_retry_timer, daos, 1000);
373 		return;
374 	}
375 
376 	spdk_bdev_io_complete(spdk_bdev_io_from_ctx(daos->reset_task), SPDK_BDEV_IO_STATUS_SUCCESS);
377 }
378 
379 static int
380 bdev_daos_reset_retry_timer(void *arg)
381 {
382 	struct bdev_daos *daos = arg;
383 
384 	if (daos->reset_retry_timer) {
385 		spdk_poller_unregister(&daos->reset_retry_timer);
386 	}
387 
388 	spdk_for_each_channel(daos,
389 			      _bdev_daos_get_io_inflight,
390 			      daos,
391 			      _bdev_daos_get_io_inflight_done);
392 
393 	return SPDK_POLLER_BUSY;
394 }
395 
396 static void
397 bdev_daos_reset(struct bdev_daos *daos, struct bdev_daos_task *task)
398 {
399 	assert(daos != NULL);
400 	assert(task != NULL);
401 
402 	daos->reset_task = task;
403 	bdev_daos_reset_retry_timer(daos);
404 }
405 
406 
407 static int64_t
408 bdev_daos_unmap(struct bdev_daos_io_channel *ch, uint64_t nbytes,
409 		uint64_t offset)
410 {
411 	int rc = 0;
412 
413 	SPDK_DEBUGLOG(bdev_daos, "unmap at %#lx with size %#lx\n", offset, nbytes);
414 	if ((rc = dfs_punch(ch->dfs, ch->obj, offset, nbytes))) {
415 		return -rc;
416 	}
417 	return 0;
418 }
419 
420 static void
421 _bdev_daos_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
422 {
423 	struct bdev_daos_io_channel *dch = spdk_io_channel_get_ctx(ch);
424 
425 	int64_t rc;
426 	switch (bdev_io->type) {
427 	case SPDK_BDEV_IO_TYPE_READ:
428 		spdk_bdev_io_get_buf(bdev_io, bdev_daos_get_buf_cb,
429 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
430 		break;
431 
432 	case SPDK_BDEV_IO_TYPE_WRITE:
433 		rc = bdev_daos_writev((struct bdev_daos *)bdev_io->bdev->ctxt,
434 				      dch,
435 				      (struct bdev_daos_task *)bdev_io->driver_ctx,
436 				      bdev_io->u.bdev.iovs,
437 				      bdev_io->u.bdev.iovcnt,
438 				      bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
439 				      bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
440 		if (rc < 0) {
441 			spdk_bdev_io_complete_aio_status(bdev_io, rc);
442 			return;
443 		}
444 		break;
445 
446 	case SPDK_BDEV_IO_TYPE_RESET:
447 		/* Can't cancel in-flight requests, but can wait for their completions */
448 		bdev_daos_reset((struct bdev_daos *)bdev_io->bdev->ctxt,
449 				(struct bdev_daos_task *)bdev_io->driver_ctx);
450 		break;
451 
452 	case SPDK_BDEV_IO_TYPE_FLUSH:
453 		/* NOOP because DAOS requests land on PMEM and writes are persistent upon completion */
454 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
455 		break;
456 
457 	case SPDK_BDEV_IO_TYPE_UNMAP:
458 		rc = bdev_daos_unmap(dch,
459 				     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen,
460 				     bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen);
461 		if (!rc) {
462 			spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_SUCCESS);
463 		} else {
464 			SPDK_DEBUGLOG(bdev_daos, "%s: could not unmap: %s",
465 				      dch->disk->disk.name, strerror(-rc));
466 			spdk_bdev_io_complete_aio_status(bdev_io, rc);
467 		}
468 
469 		break;
470 
471 	default:
472 		SPDK_ERRLOG("Wrong io type\n");
473 		spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
474 		break;
475 	}
476 }
477 
478 static void
479 bdev_daos_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
480 {
481 	struct bdev_daos_task *task = (struct bdev_daos_task *)bdev_io->driver_ctx;
482 	struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
483 
484 	assert(task != NULL);
485 
486 	task->submit_td = submit_td;
487 	task->bdev_io = bdev_io;
488 
489 	_bdev_daos_submit_request(ch, bdev_io);
490 }
491 
492 #define POLLING_EVENTS_NUM 64
493 
494 static int
495 bdev_daos_channel_poll(void *arg)
496 {
497 	daos_event_t *evp[POLLING_EVENTS_NUM];
498 	struct bdev_daos_io_channel *ch = arg;
499 
500 	assert(ch != NULL);
501 	assert(ch->disk != NULL);
502 
503 	int rc = daos_eq_poll(ch->queue, 0, DAOS_EQ_NOWAIT,
504 			      POLLING_EVENTS_NUM, evp);
505 
506 	if (rc < 0) {
507 		SPDK_DEBUGLOG(bdev_daos, "%s: could not poll daos event queue: " DF_RC "\n",
508 			      ch->disk->disk.name, DP_RC(rc));
509 		/*
510 		 * TODO: There are cases when this is self healing, e.g.
511 		 * brief network issues, DAOS agent restarting etc.
512 		 * However, if the issue persists over some time better would be
513 		 * to remove a bdev or the whole controller
514 		 */
515 		return SPDK_POLLER_BUSY;
516 	}
517 
518 	for (int i = 0; i < rc; ++i) {
519 		int status = 0;
520 		struct bdev_daos_task *task = SPDK_CONTAINEROF(evp[i], struct bdev_daos_task, ev);
521 
522 		assert(task != NULL);
523 
524 		if (task->ev.ev_error != DER_SUCCESS) {
525 			status = -task->ev.ev_error;
526 		}
527 
528 		daos_event_fini(&task->ev);
529 		bdev_daos_io_complete(task->bdev_io, status);
530 	}
531 
532 	return rc > 0 ? SPDK_POLLER_BUSY : SPDK_POLLER_IDLE;
533 }
534 
535 static bool
536 bdev_daos_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
537 {
538 	switch (io_type) {
539 	case SPDK_BDEV_IO_TYPE_READ:
540 	case SPDK_BDEV_IO_TYPE_WRITE:
541 	case SPDK_BDEV_IO_TYPE_RESET:
542 	case SPDK_BDEV_IO_TYPE_FLUSH:
543 	case SPDK_BDEV_IO_TYPE_UNMAP:
544 		return true;
545 
546 	default:
547 		return false;
548 	}
549 }
550 
/* fn_table hook: obtain an I/O channel for the io_device identified by ctx. */
static struct spdk_io_channel *
bdev_daos_get_io_channel(void *ctx)
{
	struct spdk_io_channel *channel = spdk_get_io_channel(ctx);

	return channel;
}
556 
557 static void
558 bdev_daos_write_json_config(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
559 {
560 	struct bdev_daos *daos = bdev->ctxt;
561 
562 	spdk_json_write_object_begin(w);
563 
564 	spdk_json_write_named_string(w, "method", "bdev_daos_create");
565 
566 	spdk_json_write_named_object_begin(w, "params");
567 	spdk_json_write_named_string(w, "name", bdev->name);
568 	spdk_json_write_named_string(w, "pool", daos->pool_name);
569 	spdk_json_write_named_string(w, "cont", daos->cont_name);
570 	spdk_json_write_named_uint64(w, "num_blocks", bdev->blockcnt);
571 	spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
572 	spdk_json_write_named_uuid(w, "uuid", &bdev->uuid);
573 
574 	spdk_json_write_object_end(w);
575 
576 	spdk_json_write_object_end(w);
577 }
578 
/* bdev function table installed on every DAOS bdev by create_bdev_daos() */
static const struct spdk_bdev_fn_table daos_fn_table = {
	.destruct		= bdev_daos_destruct,
	.submit_request		= bdev_daos_submit_request,
	.io_type_supported	= bdev_daos_io_type_supported,
	.get_io_channel		= bdev_daos_get_io_channel,
	.write_config_json	= bdev_daos_write_json_config,
};
586 
587 static int
588 bdev_daos_io_channel_setup_daos(struct bdev_daos_io_channel *ch)
589 {
590 	int rc = 0;
591 	struct bdev_daos *daos = ch->disk;
592 	daos_pool_info_t pinfo;
593 	daos_cont_info_t cinfo;
594 
595 	int fd_oflag = O_CREAT | O_RDWR;
596 	mode_t mode = S_IFREG | S_IRWXU | S_IRWXG | S_IRWXO;
597 
598 	rc = bdev_daos_get_engine();
599 	if (rc) {
600 		SPDK_ERRLOG("could not initialize DAOS engine: " DF_RC "\n", DP_RC(rc));
601 		return -daos2posix_errno(rc);
602 	}
603 
604 	SPDK_DEBUGLOG(bdev_daos, "connecting to daos pool '%s'\n", daos->pool_name);
605 	if ((rc = daos_pool_connect(daos->pool_name, NULL, DAOS_PC_RW, &ch->pool, &pinfo, NULL))) {
606 		SPDK_ERRLOG("%s: could not connect to daos pool: " DF_RC "\n",
607 			    daos->disk.name, DP_RC(rc));
608 		return -daos2posix_errno(rc);
609 	}
610 	SPDK_DEBUGLOG(bdev_daos, "connecting to daos container '%s'\n", daos->cont_name);
611 	if ((rc = daos_cont_open(ch->pool, daos->cont_name, DAOS_COO_RW, &ch->cont, &cinfo, NULL))) {
612 		SPDK_ERRLOG("%s: could not open daos container: " DF_RC "\n",
613 			    daos->disk.name, DP_RC(rc));
614 		rc = daos2posix_errno(rc);
615 		goto cleanup_pool;
616 	}
617 	SPDK_DEBUGLOG(bdev_daos, "mounting daos dfs\n");
618 	if ((rc = dfs_mount(ch->pool, ch->cont, O_RDWR, &ch->dfs))) {
619 		SPDK_ERRLOG("%s: could not mount daos dfs: %s\n", daos->disk.name, strerror(rc));
620 		goto cleanup_cont;
621 	}
622 	SPDK_DEBUGLOG(bdev_daos, "opening dfs object\n");
623 	if ((rc = dfs_open(ch->dfs, NULL, daos->disk.name, mode, fd_oflag, daos->oclass,
624 			   0, NULL, &ch->obj))) {
625 		SPDK_ERRLOG("%s: could not open dfs object: %s\n", daos->disk.name, strerror(rc));
626 		goto cleanup_mount;
627 	}
628 	if ((rc = daos_eq_create(&ch->queue))) {
629 		SPDK_ERRLOG("%s: could not create daos event queue: " DF_RC "\n",
630 			    daos->disk.name, DP_RC(rc));
631 		rc = daos2posix_errno(rc);
632 		goto cleanup_obj;
633 	}
634 
635 	return 0;
636 
637 cleanup_obj:
638 	dfs_release(ch->obj);
639 cleanup_mount:
640 	dfs_umount(ch->dfs);
641 cleanup_cont:
642 	daos_cont_close(ch->cont, NULL);
643 cleanup_pool:
644 	daos_pool_disconnect(ch->pool, NULL);
645 
646 	return -rc;
647 }
648 
649 static int
650 bdev_daos_io_channel_create_cb(void *io_device, void *ctx_buf)
651 {
652 	int rc;
653 	struct bdev_daos_io_channel *ch = ctx_buf;
654 
655 	ch->disk = io_device;
656 
657 	if ((rc = bdev_daos_io_channel_setup_daos(ch))) {
658 		return rc;
659 	}
660 
661 	SPDK_DEBUGLOG(bdev_daos, "%s: starting daos event queue poller\n",
662 		      ch->disk->disk.name);
663 
664 	ch->poller = SPDK_POLLER_REGISTER(bdev_daos_channel_poll, ch, 0);
665 
666 	return 0;
667 }
668 
669 static void
670 bdev_daos_io_channel_destroy_cb(void *io_device, void *ctx_buf)
671 {
672 	int rc;
673 	struct bdev_daos_io_channel *ch = ctx_buf;
674 
675 	SPDK_DEBUGLOG(bdev_daos, "stopping daos event queue poller\n");
676 
677 	spdk_poller_unregister(&ch->poller);
678 
679 	if ((rc = daos_eq_destroy(ch->queue, DAOS_EQ_DESTROY_FORCE))) {
680 		SPDK_ERRLOG("could not destroy daos event queue: " DF_RC "\n", DP_RC(rc));
681 	}
682 	if ((rc = dfs_release(ch->obj))) {
683 		SPDK_ERRLOG("could not release dfs object: %s\n", strerror(rc));
684 	}
685 	if ((rc = dfs_umount(ch->dfs))) {
686 		SPDK_ERRLOG("could not unmount dfs: %s\n", strerror(rc));
687 	}
688 	if ((rc = daos_cont_close(ch->cont, NULL))) {
689 		SPDK_ERRLOG("could not close container: " DF_RC "\n", DP_RC(rc));
690 	}
691 	if ((rc = daos_pool_disconnect(ch->pool, NULL))) {
692 		SPDK_ERRLOG("could not disconnect from pool: " DF_RC "\n", DP_RC(rc));
693 	}
694 	rc = bdev_daos_put_engine();
695 	if (rc) {
696 		SPDK_ERRLOG("could not de-initialize DAOS engine: " DF_RC "\n", DP_RC(rc));
697 	}
698 }
699 
700 int
701 create_bdev_daos(struct spdk_bdev **bdev,
702 		 const char *name, const struct spdk_uuid *uuid,
703 		 const char *pool, const char *cont, const char *oclass,
704 		 uint64_t num_blocks, uint32_t block_size)
705 {
706 	int rc;
707 	size_t len;
708 	struct bdev_daos *daos;
709 	struct bdev_daos_io_channel ch = {};
710 
711 	SPDK_NOTICELOG("%s: creating bdev_daos disk on '%s:%s'\n", name, pool, cont);
712 
713 	if (num_blocks == 0) {
714 		SPDK_ERRLOG("Disk num_blocks must be greater than 0");
715 		return -EINVAL;
716 	}
717 
718 	if (block_size % 512) {
719 		SPDK_ERRLOG("block size must be 512 bytes aligned\n");
720 		return -EINVAL;
721 	}
722 
723 	if (!name) {
724 		SPDK_ERRLOG("device name cannot be empty\n");
725 		return -EINVAL;
726 	}
727 
728 	if (!pool) {
729 		SPDK_ERRLOG("daos pool cannot be empty\n");
730 		return -EINVAL;
731 	}
732 	if (!cont) {
733 		SPDK_ERRLOG("daos cont cannot be empty\n");
734 		return -EINVAL;
735 	}
736 
737 	daos = calloc(1, sizeof(*daos));
738 	if (!daos) {
739 		SPDK_ERRLOG("calloc() failed\n");
740 		return -ENOMEM;
741 	}
742 
743 	if (!oclass) {
744 		oclass = "SX"; /* Max throughput by default */
745 	}
746 	daos->oclass = daos_oclass_name2id(oclass);
747 	if (daos->oclass == OC_UNKNOWN) {
748 		SPDK_ERRLOG("could not parse daos oclass: '%s'\n", oclass);
749 		free(daos);
750 		return -EINVAL;
751 	}
752 
753 	len = strlen(pool);
754 	if (len > DAOS_PROP_LABEL_MAX_LEN) {
755 		SPDK_ERRLOG("daos pool name is too long\n");
756 		free(daos);
757 		return -EINVAL;
758 	}
759 	memcpy(daos->pool_name, pool, len);
760 
761 	len = strlen(cont);
762 	if (len > DAOS_PROP_LABEL_MAX_LEN) {
763 		SPDK_ERRLOG("daos cont name is too long\n");
764 		free(daos);
765 		return -EINVAL;
766 	}
767 	memcpy(daos->cont_name, cont, len);
768 
769 	daos->disk.name = strdup(name);
770 	daos->disk.product_name = "DAOS bdev";
771 
772 	daos->disk.write_cache = 0;
773 	daos->disk.blocklen = block_size;
774 	daos->disk.blockcnt = num_blocks;
775 	daos->disk.uuid = *uuid;
776 	daos->disk.max_num_segments = SPDK_BDEV_IO_NUM_CHILD_IOV;
777 
778 	daos->disk.ctxt = daos;
779 	daos->disk.fn_table = &daos_fn_table;
780 	daos->disk.module = &daos_if;
781 
782 	rc = bdev_daos_get_engine();
783 	if (rc) {
784 		SPDK_ERRLOG("could not initialize DAOS engine: " DF_RC "\n", DP_RC(rc));
785 		bdev_daos_free(daos);
786 		return -daos2posix_errno(rc);
787 	}
788 
789 	/* We try to connect to the DAOS container during channel creation, so simulate
790 	 * creating a channel here, so that we can return a failure when the DAOS bdev
791 	 * is created, instead of finding it out later when the first channel is created
792 	 * and leaving unusable bdev registered.
793 	 */
794 	rc = bdev_daos_io_channel_create_cb(daos, &ch);
795 	if (rc) {
796 		SPDK_ERRLOG("'%s' could not initialize io-channel: %s\n", name, strerror(-rc));
797 		bdev_daos_free(daos);
798 		return rc;
799 	}
800 	bdev_daos_io_channel_destroy_cb(daos, &ch);
801 
802 	spdk_io_device_register(daos, bdev_daos_io_channel_create_cb,
803 				bdev_daos_io_channel_destroy_cb,
804 				sizeof(struct bdev_daos_io_channel),
805 				daos->disk.name);
806 
807 
808 	rc = spdk_bdev_register(&daos->disk);
809 	if (rc) {
810 		spdk_io_device_unregister(daos, NULL);
811 		bdev_daos_free(daos);
812 		return rc;
813 	}
814 
815 	*bdev = &(daos->disk);
816 
817 	return rc;
818 }
819 
820 static void
821 dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
822 {
823 }
824 
825 int
826 bdev_daos_resize(const char *name, const uint64_t new_size_in_mb)
827 {
828 	int rc = 0;
829 	struct spdk_bdev_desc *desc;
830 	struct spdk_bdev *bdev;
831 	struct spdk_io_channel *ch;
832 	struct bdev_daos_io_channel *dch;
833 	uint64_t new_size_in_byte;
834 	uint64_t current_size_in_mb;
835 
836 	rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
837 	if (rc != 0) {
838 		return rc;
839 	}
840 
841 	bdev = spdk_bdev_desc_get_bdev(desc);
842 	if (bdev->module != &daos_if) {
843 		rc = -EINVAL;
844 		goto exit;
845 	}
846 
847 	current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
848 	if (current_size_in_mb > new_size_in_mb) {
849 		SPDK_ERRLOG("The new bdev size must be larger than current bdev size.\n");
850 		rc = -EINVAL;
851 		goto exit;
852 	}
853 
854 	ch = bdev_daos_get_io_channel(bdev);
855 	dch = spdk_io_channel_get_ctx(ch);
856 	new_size_in_byte = new_size_in_mb * 1024 * 1024;
857 
858 	rc = dfs_punch(dch->dfs, dch->obj, new_size_in_byte, DFS_MAX_FSIZE);
859 	spdk_put_io_channel(ch);
860 	if (rc != 0) {
861 		SPDK_ERRLOG("failed to resize daos bdev: %s", strerror(rc));
862 		rc = -rc;
863 		goto exit;
864 	}
865 
866 	SPDK_NOTICELOG("DAOS bdev device is resized: bdev name %s, old block count %" PRIu64
867 		       ", new block count %"
868 		       PRIu64 "\n",
869 		       bdev->name,
870 		       bdev->blockcnt,
871 		       new_size_in_byte / bdev->blocklen);
872 	rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
873 	if (rc != 0) {
874 		SPDK_ERRLOG("failed to notify block cnt change.\n");
875 	}
876 
877 exit:
878 	spdk_bdev_close(desc);
879 	return rc;
880 }
881 
882 void
883 delete_bdev_daos(const char *bdev_name, spdk_bdev_unregister_cb cb_fn, void *cb_arg)
884 {
885 	int rc;
886 
887 	rc = spdk_bdev_unregister_by_name(bdev_name, &daos_if, cb_fn, cb_arg);
888 	if (rc != 0) {
889 		cb_fn(cb_arg, rc);
890 	}
891 }
892 
893 static int
894 bdev_daos_get_engine(void)
895 {
896 	int rc = 0;
897 
898 	pthread_mutex_lock(&g_bdev_daos_init_mutex);
899 	if (g_bdev_daos_init_count++ > 0) {
900 		pthread_mutex_unlock(&g_bdev_daos_init_mutex);
901 		return 0;
902 	}
903 	SPDK_DEBUGLOG(bdev_daos, "initializing DAOS engine\n");
904 
905 	rc = daos_init();
906 	pthread_mutex_unlock(&g_bdev_daos_init_mutex);
907 
908 	if (rc != -DER_ALREADY && rc) {
909 		return rc;
910 	}
911 	return 0;
912 }
913 
914 static int
915 bdev_daos_put_engine(void)
916 {
917 	int rc = 0;
918 
919 	pthread_mutex_lock(&g_bdev_daos_init_mutex);
920 	if (--g_bdev_daos_init_count > 0) {
921 		pthread_mutex_unlock(&g_bdev_daos_init_mutex);
922 		return 0;
923 	}
924 	SPDK_DEBUGLOG(bdev_daos, "de-initializing DAOS engine\n");
925 
926 	rc = daos_fini();
927 	pthread_mutex_unlock(&g_bdev_daos_init_mutex);
928 
929 	return rc;
930 }
931 
/* Module init hook. Nothing is set up here: the DAOS engine and client are
 * initialized during the first bdev creation.
 *
 * \return always 0.
 */
static int
bdev_daos_initialize(void)
{
	const int status = 0;

	return status;
}
939 
/* Registers the bdev_daos log component named by the SPDK_DEBUGLOG() calls in this file */
SPDK_LOG_REGISTER_COMPONENT(bdev_daos)
941