/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (C) 2017 Intel Corporation.
 * All rights reserved.
 */

#include "spdk/stdinc.h"

#include "bdev_rbd.h"

#include <rbd/librbd.h>
#include <rados/librados.h>

#include "spdk/env.h"
#include "spdk/bdev.h"
#include "spdk/thread.h"
#include "spdk/json.h"
#include "spdk/string.h"
#include "spdk/util.h"
#include "spdk/likely.h"

#include "spdk/bdev_module.h"
#include "spdk/log.h"

static int bdev_rbd_count = 0;
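
/* A ref-counted rados_ioctx_t shared by every rbd bdev that opens the same
 * pool through the same registered cluster. Entries are created, looked up
 * and released only on the SPDK app thread, so the list needs no lock.
 */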
struct bdev_rbd_pool_ctx {
        rados_t *cluster_p;
        char *name;
        rados_ioctx_t io_ctx;
        uint32_t ref;
        STAILQ_ENTRY(bdev_rbd_pool_ctx) link;
};

static STAILQ_HEAD(, bdev_rbd_pool_ctx) g_map_bdev_rbd_pool_ctx = STAILQ_HEAD_INITIALIZER(
                        g_map_bdev_rbd_pool_ctx);
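
/* Per-bdev state. rados_ctx is a union: the bdev either owns a private
 * rados_ioctx_t (io_ctx, when it created its own cluster) or borrows a
 * shared, ref-counted pool context (ctx, when cluster_name refers to a
 * registered cluster).
 */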
struct bdev_rbd {
        struct spdk_bdev disk;
        char *rbd_name;
        char *user_id;
        char *pool_name;
        char **config;

        rados_t cluster;
        rados_t *cluster_p;
        char *cluster_name;

        union rbd_ctx {
                rados_ioctx_t io_ctx;
                struct bdev_rbd_pool_ctx *ctx;
        } rados_ctx;

        rbd_image_t image;

        rbd_image_info_t info;
        struct spdk_thread *destruct_td;

        TAILQ_ENTRY(bdev_rbd) tailq;
        struct spdk_poller *reset_timer;
        struct spdk_bdev_io *reset_bdev_io;

        uint64_t rbd_watch_handle;
};

struct bdev_rbd_io_channel {
        struct bdev_rbd *disk;
        struct spdk_io_channel *group_ch;
};

struct bdev_rbd_io {
        struct spdk_thread *submit_td;
        enum spdk_bdev_io_status status;
        rbd_completion_t comp;
        size_t total_len;
};

struct bdev_rbd_cluster {
        char *name;
        char *user_id;
        char **config_param;
        char *config_file;
        char *key_file;
        char *core_mask;
        rados_t cluster;
        uint32_t ref;
        STAILQ_ENTRY(bdev_rbd_cluster) link;
};

static STAILQ_HEAD(, bdev_rbd_cluster) g_map_bdev_rbd_cluster = STAILQ_HEAD_INITIALIZER(
                        g_map_bdev_rbd_cluster);
static pthread_mutex_t g_map_bdev_rbd_cluster_mutex = PTHREAD_MUTEX_INITIALIZER;

static struct spdk_io_channel *bdev_rbd_get_io_channel(void *ctx);
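
/* librbd delivers image update notifications on one of its own internal
 * threads, so the watch callback only forwards the event to the SPDK app
 * thread, where it is safe to query the image size and call
 * spdk_bdev_notify_blockcnt_change().
 */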
static void
_rbd_update_callback(void *arg)
{
        struct bdev_rbd *rbd = arg;
        uint64_t current_size_in_bytes = 0;
        int rc;

        rc = rbd_get_size(rbd->image, &current_size_in_bytes);
        if (rc < 0) {
                SPDK_ERRLOG("Failed getting size %d\n", rc);
                return;
        }

        rc = spdk_bdev_notify_blockcnt_change(&rbd->disk, current_size_in_bytes / rbd->disk.blocklen);
        if (rc != 0) {
                SPDK_ERRLOG("failed to notify block cnt change.\n");
        }
}

static void
rbd_update_callback(void *arg)
{
        spdk_thread_send_msg(spdk_thread_get_app_thread(), _rbd_update_callback, arg);
}

static void
bdev_rbd_cluster_free(struct bdev_rbd_cluster *entry)
{
        assert(entry != NULL);

        bdev_rbd_free_config(entry->config_param);
        free(entry->config_file);
        free(entry->key_file);
        free(entry->user_id);
        free(entry->name);
        free(entry->core_mask);
        free(entry);
}

static void
bdev_rbd_put_cluster(rados_t **cluster)
{
        struct bdev_rbd_cluster *entry;

        assert(cluster != NULL);

        /* No need to go through the map if *cluster is NULL */
        if (*cluster == NULL) {
                return;
        }

        pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
        STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
                if (*cluster != &entry->cluster) {
                        continue;
                }

                assert(entry->ref > 0);
                entry->ref--;
                *cluster = NULL;
                pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
                return;
        }

        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
        SPDK_ERRLOG("Cannot find the entry for cluster=%p\n", cluster);
}

static void
bdev_rbd_put_pool_ctx(struct bdev_rbd_pool_ctx *entry)
{
        assert(spdk_get_thread() == spdk_thread_get_app_thread());

        assert(entry != NULL);
        assert(entry->ref > 0);
        entry->ref--;
        if (entry->ref == 0) {
                STAILQ_REMOVE(&g_map_bdev_rbd_pool_ctx, entry, bdev_rbd_pool_ctx, link);
                rados_ioctx_destroy(entry->io_ctx);
                free(entry->name);
                free(entry);
        }
}

static void
bdev_rbd_free(struct bdev_rbd *rbd)
{
        if (!rbd) {
                return;
        }

        if (rbd->image) {
                rbd_update_unwatch(rbd->image, rbd->rbd_watch_handle);
                rbd_flush(rbd->image);
                rbd_close(rbd->image);
        }

        free(rbd->disk.name);
        free(rbd->rbd_name);
        free(rbd->user_id);
        free(rbd->pool_name);
        bdev_rbd_free_config(rbd->config);

        if (rbd->cluster_name) {
                /* When rbd is destructed by bdev_rbd_destruct, this branch is
                 * not taken, because the ctx will already have been freed
                 * asynchronously by bdev_rbd_free_cb. This path is only taken
                 * during the rbd initialization procedure. */
                if (rbd->rados_ctx.ctx) {
                        bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
                        rbd->rados_ctx.ctx = NULL;
                }

                bdev_rbd_put_cluster(&rbd->cluster_p);
                free(rbd->cluster_name);
        } else if (rbd->cluster) {
                if (rbd->rados_ctx.io_ctx) {
                        rados_ioctx_destroy(rbd->rados_ctx.io_ctx);
                }
                rados_shutdown(rbd->cluster);
        }

        free(rbd);
}

void
bdev_rbd_free_config(char **config)
{
        char **entry;

        if (config) {
                for (entry = config; *entry; entry++) {
                        free(*entry);
                }
                free(config);
        }
}

char **
bdev_rbd_dup_config(const char *const *config)
{
        size_t count;
        char **copy;

        if (!config) {
                return NULL;
        }
        for (count = 0; config[count]; count++) {}
        copy = calloc(count + 1, sizeof(*copy));
        if (!copy) {
                return NULL;
        }
        for (count = 0; config[count]; count++) {
                if (!(copy[count] = strdup(config[count]))) {
                        bdev_rbd_free_config(copy);
                        return NULL;
                }
        }
        return copy;
}
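
/* Config arrays are flat, NULL-terminated key/value pairs consumed two
 * entries at a time by rados_conf_set(). A minimal sketch (the values are
 * illustrative, not defaults):
 *
 *     const char *config[] = { "mon_host", "192.168.1.1", NULL };
 *     char **copy = bdev_rbd_dup_config((const char *const *)config);
 *     ...
 *     bdev_rbd_free_config(copy);
 */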

static int
bdev_rados_cluster_init(const char *user_id, const char *const *config,
                        rados_t *cluster)
{
        int ret;

        ret = rados_create(cluster, user_id);
        if (ret < 0) {
                SPDK_ERRLOG("Failed to create rados_t struct\n");
                return -1;
        }

        if (config) {
                const char *const *entry = config;
                while (*entry) {
                        ret = rados_conf_set(*cluster, entry[0], entry[1]);
                        if (ret < 0) {
                                SPDK_ERRLOG("Failed to set %s = %s\n", entry[0], entry[1]);
                                rados_shutdown(*cluster);
                                *cluster = NULL;
                                return -1;
                        }
                        entry += 2;
                }
        } else {
                ret = rados_conf_read_file(*cluster, NULL);
                if (ret < 0) {
                        SPDK_ERRLOG("Failed to read conf file\n");
                        rados_shutdown(*cluster);
                        *cluster = NULL;
                        return -1;
                }
        }

        ret = rados_connect(*cluster);
        if (ret < 0) {
                SPDK_ERRLOG("Failed to connect to the rados cluster\n");
                rados_shutdown(*cluster);
                *cluster = NULL;
                return -1;
        }

        return 0;
}

static int
bdev_rbd_get_cluster(const char *cluster_name, rados_t **cluster)
{
        struct bdev_rbd_cluster *entry;

        if (cluster == NULL) {
                SPDK_ERRLOG("cluster should not be NULL\n");
                return -1;
        }

        pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
        STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
                if (strcmp(cluster_name, entry->name) == 0) {
                        entry->ref++;
                        *cluster = &entry->cluster;
                        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
                        return 0;
                }
        }

        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
        return -1;
}

static int
bdev_rbd_shared_cluster_init(const char *cluster_name, rados_t **cluster)
{
        int ret;

        ret = bdev_rbd_get_cluster(cluster_name, cluster);
        if (ret < 0) {
                SPDK_ERRLOG("Failed to get the registered cluster with name=%s\n", cluster_name);
                return -1;
        }

        return ret;
}

static void *
bdev_rbd_cluster_handle(void *arg)
{
        void *ret = arg;
        struct bdev_rbd *rbd = arg;
        int rc;

        rc = bdev_rados_cluster_init(rbd->user_id, (const char *const *)rbd->config,
                                     &rbd->cluster);
        if (rc < 0) {
                SPDK_ERRLOG("Failed to create rados cluster for user_id=%s and rbd_pool=%s\n",
                            rbd->user_id ? rbd->user_id : "admin (the default)", rbd->pool_name);
                ret = NULL;
        }

        return ret;
}

static int
bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name, struct bdev_rbd_pool_ctx **ctx)
{
        struct bdev_rbd_pool_ctx *entry;

        assert(spdk_get_thread() == spdk_thread_get_app_thread());

        if (name == NULL || ctx == NULL) {
                return -1;
        }

        STAILQ_FOREACH(entry, &g_map_bdev_rbd_pool_ctx, link) {
                if (strcmp(name, entry->name) == 0 && cluster_p == entry->cluster_p) {
                        entry->ref++;
                        *ctx = entry;
                        return 0;
                }
        }

        entry = calloc(1, sizeof(*entry));
        if (!entry) {
                SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
                return -1;
        }

        entry->name = strdup(name);
        if (entry->name == NULL) {
                SPDK_ERRLOG("Failed to allocate name=%s for entry=%p\n", name, entry);
                goto err_handle;
        }

        if (rados_ioctx_create(*cluster_p, name, &entry->io_ctx) < 0) {
                goto err_handle1;
        }

        entry->cluster_p = cluster_p;
        entry->ref = 1;
        *ctx = entry;
        STAILQ_INSERT_TAIL(&g_map_bdev_rbd_pool_ctx, entry, link);

        return 0;

err_handle1:
        free(entry->name);
err_handle:
        free(entry);

        return -1;
}

static void *
bdev_rbd_init_context(void *arg)
{
        struct bdev_rbd *rbd = arg;
        int rc;
        rados_ioctx_t *io_ctx = NULL;

        if (rbd->cluster_name) {
                if (bdev_rbd_get_pool_ctx(rbd->cluster_p, rbd->pool_name, &rbd->rados_ctx.ctx) < 0) {
                        SPDK_ERRLOG("Failed to create ioctx on rbd=%p with cluster_name=%s\n",
                                    rbd, rbd->cluster_name);
                        return NULL;
                }
                io_ctx = &rbd->rados_ctx.ctx->io_ctx;
        } else {
                if (rados_ioctx_create(*(rbd->cluster_p), rbd->pool_name, &rbd->rados_ctx.io_ctx) < 0) {
                        SPDK_ERRLOG("Failed to create ioctx on rbd=%p\n", rbd);
                        return NULL;
                }
                io_ctx = &rbd->rados_ctx.io_ctx;
        }

        assert(io_ctx != NULL);
        rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
        if (rc < 0) {
                SPDK_ERRLOG("Failed to open specified rbd device\n");
                return NULL;
        }

        rc = rbd_update_watch(rbd->image, &rbd->rbd_watch_handle, rbd_update_callback, (void *)rbd);
        if (rc < 0) {
                SPDK_ERRLOG("Failed to set up watch %d\n", rc);
        }

        rc = rbd_stat(rbd->image, &rbd->info, sizeof(rbd->info));
        if (rc < 0) {
                SPDK_ERRLOG("Failed to stat specified rbd device\n");
                return NULL;
        }

        return arg;
}

static int
bdev_rbd_init(struct bdev_rbd *rbd)
{
        int ret = 0;

        if (!rbd->cluster_name) {
                rbd->cluster_p = &rbd->cluster;
                /* The cluster must be created on a non-SPDK thread to avoid
                 * affinity conflicts between Rados threads and SPDK threads */
                if (spdk_call_unaffinitized(bdev_rbd_cluster_handle, rbd) == NULL) {
                        SPDK_ERRLOG("Cannot create the rados object on rbd=%p\n", rbd);
                        return -1;
                }
        } else {
                ret = bdev_rbd_shared_cluster_init(rbd->cluster_name, &rbd->cluster_p);
                if (ret < 0) {
                        SPDK_ERRLOG("Failed to create rados object for rbd=%p on cluster_name=%s\n",
                                    rbd, rbd->cluster_name);
                        return -1;
                }
        }

        if (spdk_call_unaffinitized(bdev_rbd_init_context, rbd) == NULL) {
                SPDK_ERRLOG("Cannot init rbd context for rbd=%p\n", rbd);
                return -1;
        }

        return ret;
}

static void
_bdev_rbd_io_complete(void *_rbd_io)
{
        struct bdev_rbd_io *rbd_io = _rbd_io;

        spdk_bdev_io_complete(spdk_bdev_io_from_ctx(rbd_io), rbd_io->status);
}

static void
bdev_rbd_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status)
{
        struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
        struct spdk_thread *current_thread = spdk_get_thread();

        rbd_io->status = status;
        assert(rbd_io->submit_td != NULL);
        if (rbd_io->submit_td != current_thread) {
                spdk_thread_send_msg(rbd_io->submit_td, _bdev_rbd_io_complete, rbd_io);
        } else {
                _bdev_rbd_io_complete(rbd_io);
        }
}
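
/* Runs on a librbd completion thread; bdev_rbd_io_complete() above bounces
 * the completion back to the submitting SPDK thread (rbd_io->submit_td)
 * before finishing the bdev_io.
 */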
static void
bdev_rbd_finish_aiocb(rbd_completion_t cb, void *arg)
{
        int io_status;
        struct spdk_bdev_io *bdev_io;
        struct bdev_rbd_io *rbd_io;
        enum spdk_bdev_io_status bio_status;

        bdev_io = rbd_aio_get_arg(cb);
        rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
        io_status = rbd_aio_get_return_value(cb);
        bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;

        if (bdev_io->type == SPDK_BDEV_IO_TYPE_READ) {
                if ((int)rbd_io->total_len != io_status) {
                        bio_status = SPDK_BDEV_IO_STATUS_FAILED;
                }
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
        } else if (bdev_io->type == SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE && io_status == -EILSEQ) {
                bio_status = SPDK_BDEV_IO_STATUS_MISCOMPARE;
#endif
        } else if (io_status != 0) { /* For others, 0 means success */
                bio_status = SPDK_BDEV_IO_STATUS_FAILED;
        }

        rbd_aio_release(cb);

        bdev_rbd_io_complete(bdev_io, bio_status);
}

static void
_bdev_rbd_start_aio(struct bdev_rbd *disk, struct spdk_bdev_io *bdev_io,
                    struct iovec *iov, int iovcnt, uint64_t offset, size_t len)
{
        int ret;
        struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;
        rbd_image_t image = disk->image;

        ret = rbd_aio_create_completion(bdev_io, bdev_rbd_finish_aiocb,
                                        &rbd_io->comp);
        if (ret < 0) {
                goto err;
        }

        switch (bdev_io->type) {
        case SPDK_BDEV_IO_TYPE_READ:
                rbd_io->total_len = len;
                if (spdk_likely(iovcnt == 1)) {
                        ret = rbd_aio_read(image, offset, iov[0].iov_len, iov[0].iov_base,
                                           rbd_io->comp);
                } else {
                        ret = rbd_aio_readv(image, iov, iovcnt, offset, rbd_io->comp);
                }
                break;
        case SPDK_BDEV_IO_TYPE_WRITE:
                if (spdk_likely(iovcnt == 1)) {
                        ret = rbd_aio_write(image, offset, iov[0].iov_len, iov[0].iov_base,
                                            rbd_io->comp);
                } else {
                        ret = rbd_aio_writev(image, iov, iovcnt, offset, rbd_io->comp);
                }
                break;
        case SPDK_BDEV_IO_TYPE_UNMAP:
                ret = rbd_aio_discard(image, offset, len, rbd_io->comp);
                break;
        case SPDK_BDEV_IO_TYPE_FLUSH:
                ret = rbd_aio_flush(image, rbd_io->comp);
                break;
        case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
                ret = rbd_aio_write_zeroes(image, offset, len, rbd_io->comp, /* zero_flags */ 0,
                                           /* op_flags */ 0);
                break;
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
        case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
                ret = rbd_aio_compare_and_writev(image, offset, iov /* cmp */, iovcnt,
                                                 bdev_io->u.bdev.fused_iovs /* write */,
                                                 bdev_io->u.bdev.fused_iovcnt,
                                                 rbd_io->comp, NULL,
                                                 /* op_flags */ 0);
                break;
#endif
        default:
                /* This should not happen; this function is only called with I/O
                 * types already validated in bdev_rbd_submit_request.
                 */
                SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
                ret = -ENOTSUP;
                break;
        }

        if (ret < 0) {
                rbd_aio_release(rbd_io->comp);
                goto err;
        }

        return;

err:
        bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
}

static void
bdev_rbd_start_aio(void *ctx)
{
        struct spdk_bdev_io *bdev_io = ctx;
        struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

        _bdev_rbd_start_aio(disk,
                            bdev_io,
                            bdev_io->u.bdev.iovs,
                            bdev_io->u.bdev.iovcnt,
                            bdev_io->u.bdev.offset_blocks * bdev_io->bdev->blocklen,
                            bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
}

static int bdev_rbd_library_init(void);
static void bdev_rbd_library_fini(void);
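
/* Reserve room for struct bdev_rbd_io in every spdk_bdev_io; the module
 * reaches it through bdev_io->driver_ctx on both the submit and completion
 * paths.
 */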
static int
bdev_rbd_get_ctx_size(void)
{
        return sizeof(struct bdev_rbd_io);
}

static struct spdk_bdev_module rbd_if = {
        .name = "rbd",
        .module_init = bdev_rbd_library_init,
        .module_fini = bdev_rbd_library_fini,
        .get_ctx_size = bdev_rbd_get_ctx_size,
};
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)

static int bdev_rbd_reset_timer(void *arg);
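
/* Reset handling: librbd offers no way to cancel in-flight aio, so a reset
 * re-arms a poller every 1000 microseconds until the bdev's measured queue
 * depth drains to zero, then completes the reset bdev_io.
 */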
static void
bdev_rbd_check_outstanding_ios(struct spdk_bdev *bdev, uint64_t current_qd,
                               void *cb_arg, int rc)
{
        struct bdev_rbd *disk = cb_arg;
        enum spdk_bdev_io_status bio_status;

        if (rc == 0 && current_qd > 0) {
                disk->reset_timer = SPDK_POLLER_REGISTER(bdev_rbd_reset_timer, disk, 1000);
                return;
        }

        if (rc != 0) {
                bio_status = SPDK_BDEV_IO_STATUS_FAILED;
        } else {
                bio_status = SPDK_BDEV_IO_STATUS_SUCCESS;
        }

        bdev_rbd_io_complete(disk->reset_bdev_io, bio_status);
        disk->reset_bdev_io = NULL;
}

static int
bdev_rbd_reset_timer(void *arg)
{
        struct bdev_rbd *disk = arg;

        spdk_poller_unregister(&disk->reset_timer);

        spdk_bdev_get_current_qd(&disk->disk, bdev_rbd_check_outstanding_ios, disk);

        return SPDK_POLLER_BUSY;
}

static void
bdev_rbd_reset(void *ctx)
{
        struct spdk_bdev_io *bdev_io = ctx;
        struct bdev_rbd *disk = (struct bdev_rbd *)bdev_io->bdev->ctxt;

        /*
         * HACK: Since librbd doesn't provide any way to cancel outstanding aio, just kick off a
         * poller to wait for in-flight I/O to complete.
         */
        assert(disk->reset_bdev_io == NULL);
        disk->reset_bdev_io = bdev_io;

        bdev_rbd_reset_timer(disk);
}

static void
_bdev_rbd_destruct_done(void *io_device)
{
        struct bdev_rbd *rbd = io_device;

        assert(rbd != NULL);

        spdk_bdev_destruct_done(&rbd->disk, 0);
        bdev_rbd_free(rbd);
}

static void
bdev_rbd_free_cb(void *io_device)
{
        struct bdev_rbd *rbd = io_device;

        assert(spdk_get_thread() == spdk_thread_get_app_thread());

        /* free the ctx */
        if (rbd->cluster_name && rbd->rados_ctx.ctx) {
                bdev_rbd_put_pool_ctx(rbd->rados_ctx.ctx);
                rbd->rados_ctx.ctx = NULL;
        }

        /* The io device has been unregistered. Send a message back to the
         * original thread that started the destruct operation, so that the
         * bdev unregister callback is invoked on the same thread that started
         * this whole process.
         */
        spdk_thread_send_msg(rbd->destruct_td, _bdev_rbd_destruct_done, rbd);
}

static void
_bdev_rbd_destruct(void *ctx)
{
        struct bdev_rbd *rbd = ctx;

        spdk_io_device_unregister(rbd, bdev_rbd_free_cb);
}

static int
bdev_rbd_destruct(void *ctx)
{
        struct bdev_rbd *rbd = ctx;

        /* Start the destruct operation on the rbd bdev's
         * main thread. This guarantees it will only start
         * executing after any messages related to channel
         * deletions have finished completing. *Always*
         * send a message, even if this function gets called
         * from the main thread, in case there are pending
         * channel delete messages in flight to this thread.
         */
        assert(rbd->destruct_td == NULL);
        rbd->destruct_td = spdk_get_thread();
        spdk_thread_send_msg(spdk_thread_get_app_thread(), _bdev_rbd_destruct, rbd);

        /* Return 1 to indicate the destruct path is asynchronous. */
        return 1;
}

static void
bdev_rbd_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io,
                    bool success)
{
        if (!success) {
                bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
                return;
        }

        bdev_rbd_start_aio(bdev_io);
}

static void
bdev_rbd_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
{
        struct spdk_thread *submit_td = spdk_io_channel_get_thread(ch);
        struct bdev_rbd_io *rbd_io = (struct bdev_rbd_io *)bdev_io->driver_ctx;

        rbd_io->submit_td = submit_td;
        switch (bdev_io->type) {
        case SPDK_BDEV_IO_TYPE_READ:
                spdk_bdev_io_get_buf(bdev_io, bdev_rbd_get_buf_cb,
                                     bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
                break;

        case SPDK_BDEV_IO_TYPE_WRITE:
        case SPDK_BDEV_IO_TYPE_UNMAP:
        case SPDK_BDEV_IO_TYPE_FLUSH:
        case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
        case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
                bdev_rbd_start_aio(bdev_io);
                break;

        case SPDK_BDEV_IO_TYPE_RESET:
                spdk_thread_exec_msg(spdk_thread_get_app_thread(), bdev_rbd_reset, bdev_io);
                break;

        default:
                SPDK_ERRLOG("Unsupported IO type =%d\n", bdev_io->type);
                bdev_rbd_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
                break;
        }
}

static bool
bdev_rbd_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
{
        switch (io_type) {
        case SPDK_BDEV_IO_TYPE_READ:
        case SPDK_BDEV_IO_TYPE_WRITE:
        case SPDK_BDEV_IO_TYPE_UNMAP:
        case SPDK_BDEV_IO_TYPE_FLUSH:
        case SPDK_BDEV_IO_TYPE_RESET:
        case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
#ifdef LIBRBD_SUPPORTS_COMPARE_AND_WRITE_IOVEC
        case SPDK_BDEV_IO_TYPE_COMPARE_AND_WRITE:
#endif
                return true;

        default:
                return false;
        }
}

static int
bdev_rbd_create_cb(void *io_device, void *ctx_buf)
{
        struct bdev_rbd_io_channel *ch = ctx_buf;
        struct bdev_rbd *disk = io_device;

        ch->disk = disk;
        ch->group_ch = spdk_get_io_channel(&rbd_if);
        assert(ch->group_ch != NULL);

        return 0;
}

static void
bdev_rbd_destroy_cb(void *io_device, void *ctx_buf)
{
        struct bdev_rbd_io_channel *ch = ctx_buf;

        spdk_put_io_channel(ch->group_ch);
}

static struct spdk_io_channel *
bdev_rbd_get_io_channel(void *ctx)
{
        struct bdev_rbd *rbd_bdev = ctx;

        return spdk_get_io_channel(rbd_bdev);
}

static void
bdev_rbd_cluster_dump_entry(const char *cluster_name, struct spdk_json_write_ctx *w)
{
        struct bdev_rbd_cluster *entry;

        pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
        STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
                if (strcmp(cluster_name, entry->name)) {
                        continue;
                }
                if (entry->user_id) {
                        spdk_json_write_named_string(w, "user_id", entry->user_id);
                }

                if (entry->config_param) {
                        char **config_entry = entry->config_param;

                        spdk_json_write_named_object_begin(w, "config_param");
                        while (*config_entry) {
                                spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
                                config_entry += 2;
                        }
                        spdk_json_write_object_end(w);
                }
                if (entry->config_file) {
                        spdk_json_write_named_string(w, "config_file", entry->config_file);
                }
                if (entry->key_file) {
                        spdk_json_write_named_string(w, "key_file", entry->key_file);
                }

                pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
                return;
        }

        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
}

static int
bdev_rbd_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
{
        struct bdev_rbd *rbd_bdev = ctx;

        spdk_json_write_named_object_begin(w, "rbd");

        spdk_json_write_named_string(w, "pool_name", rbd_bdev->pool_name);

        spdk_json_write_named_string(w, "rbd_name", rbd_bdev->rbd_name);

        if (rbd_bdev->cluster_name) {
                bdev_rbd_cluster_dump_entry(rbd_bdev->cluster_name, w);
                goto end;
        }

        if (rbd_bdev->user_id) {
                spdk_json_write_named_string(w, "user_id", rbd_bdev->user_id);
        }

        if (rbd_bdev->config) {
                char **entry = rbd_bdev->config;

                spdk_json_write_named_object_begin(w, "config");
                while (*entry) {
                        spdk_json_write_named_string(w, entry[0], entry[1]);
                        entry += 2;
                }
                spdk_json_write_object_end(w);
        }

end:
        spdk_json_write_object_end(w);

        return 0;
}

static void
bdev_rbd_write_config_json(struct spdk_bdev *bdev, struct spdk_json_write_ctx *w)
{
        struct bdev_rbd *rbd = bdev->ctxt;

        spdk_json_write_object_begin(w);

        spdk_json_write_named_string(w, "method", "bdev_rbd_create");

        spdk_json_write_named_object_begin(w, "params");
        spdk_json_write_named_string(w, "name", bdev->name);
        spdk_json_write_named_string(w, "pool_name", rbd->pool_name);
        spdk_json_write_named_string(w, "rbd_name", rbd->rbd_name);
        spdk_json_write_named_uint32(w, "block_size", bdev->blocklen);
        if (rbd->user_id) {
                spdk_json_write_named_string(w, "user_id", rbd->user_id);
        }

        if (rbd->config) {
                char **entry = rbd->config;

                spdk_json_write_named_object_begin(w, "config");
                while (*entry) {
                        spdk_json_write_named_string(w, entry[0], entry[1]);
                        entry += 2;
                }
                spdk_json_write_object_end(w);
        }

        spdk_json_write_named_uuid(w, "uuid", &bdev->uuid);

        spdk_json_write_object_end(w);

        spdk_json_write_object_end(w);
}
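
/* The object emitted above replays bdev_rbd_create on config save/load; it
 * looks roughly like this (field values are illustrative):
 *
 * {
 *   "method": "bdev_rbd_create",
 *   "params": {
 *     "name": "Ceph0",
 *     "pool_name": "rbd",
 *     "rbd_name": "image0",
 *     "block_size": 512,
 *     "uuid": "..."
 *   }
 * }
 */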

static void
dump_single_cluster_entry(struct bdev_rbd_cluster *entry, struct spdk_json_write_ctx *w)
{
        assert(entry != NULL);

        spdk_json_write_object_begin(w);
        spdk_json_write_named_string(w, "cluster_name", entry->name);

        if (entry->user_id) {
                spdk_json_write_named_string(w, "user_id", entry->user_id);
        }

        if (entry->config_param) {
                char **config_entry = entry->config_param;

                spdk_json_write_named_object_begin(w, "config_param");
                while (*config_entry) {
                        spdk_json_write_named_string(w, config_entry[0], config_entry[1]);
                        config_entry += 2;
                }
                spdk_json_write_object_end(w);
        }
        if (entry->config_file) {
                spdk_json_write_named_string(w, "config_file", entry->config_file);
        }
        if (entry->key_file) {
                spdk_json_write_named_string(w, "key_file", entry->key_file);
        }

        if (entry->core_mask) {
                spdk_json_write_named_string(w, "core_mask", entry->core_mask);
        }

        spdk_json_write_object_end(w);
}

int
bdev_rbd_get_clusters_info(struct spdk_jsonrpc_request *request, const char *name)
{
        struct bdev_rbd_cluster *entry;
        struct spdk_json_write_ctx *w;

        pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);

        if (STAILQ_EMPTY(&g_map_bdev_rbd_cluster)) {
                pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
                return -ENOENT;
        }

        /* If a cluster name is provided, dump only the matching entry */
        if (name) {
                STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
                        if (strcmp(name, entry->name) == 0) {
                                w = spdk_jsonrpc_begin_result(request);
                                dump_single_cluster_entry(entry, w);
                                spdk_jsonrpc_end_result(request, w);

                                pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
                                return 0;
                        }
                }

                pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
                return -ENOENT;
        }

        w = spdk_jsonrpc_begin_result(request);
        spdk_json_write_array_begin(w);
        STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
                dump_single_cluster_entry(entry, w);
        }
        spdk_json_write_array_end(w);
        spdk_jsonrpc_end_result(request, w);
        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

        return 0;
}

static const struct spdk_bdev_fn_table rbd_fn_table = {
        .destruct = bdev_rbd_destruct,
        .submit_request = bdev_rbd_submit_request,
        .io_type_supported = bdev_rbd_io_type_supported,
        .get_io_channel = bdev_rbd_get_io_channel,
        .dump_info_json = bdev_rbd_dump_info_json,
        .write_config_json = bdev_rbd_write_config_json,
};
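
/* Translate an spdk_cpuset into a cpu_set_t and pin the calling thread with
 * sched_setaffinity(). Threads that librados spawns afterwards inherit this
 * affinity, which is how a registered cluster's core_mask takes effect (see
 * rbd_register_cluster below).
 */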
static int
rbd_thread_set_cpumask(struct spdk_cpuset *set)
{
#ifdef __linux__
        uint32_t lcore;
        cpu_set_t mask;

        assert(set != NULL);
        CPU_ZERO(&mask);

        /* get the core ids from the spdk_cpuset and set them in the cpu_set_t */
        for (lcore = 0; lcore < SPDK_CPUSET_SIZE; lcore++) {
                if (spdk_cpuset_get_cpu(set, lcore)) {
                        CPU_SET(lcore, &mask);
                }
        }

        /* change the current thread's core mask */
        if (sched_setaffinity(0, sizeof(mask), &mask) < 0) {
                SPDK_ERRLOG("Set non SPDK thread cpu mask error (errno=%d)\n", errno);
                return -1;
        }

        return 0;
#else
        SPDK_ERRLOG("Setting the cpumask of a non-SPDK thread is only supported on Linux.\n");
        return -ENOTSUP;
#endif
}

static int
rbd_register_cluster(const char *name, const char *user_id, const char *const *config_param,
                     const char *config_file, const char *key_file, const char *core_mask)
{
        struct bdev_rbd_cluster *entry;
        struct spdk_cpuset rbd_core_mask = {};
        int rc;

        pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
        STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
                if (strcmp(name, entry->name) == 0) {
                        SPDK_ERRLOG("Cluster name=%s already exists\n", name);
                        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
                        return -1;
                }
        }

        entry = calloc(1, sizeof(*entry));
        if (!entry) {
                SPDK_ERRLOG("Cannot allocate an entry for name=%s\n", name);
                pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
                return -1;
        }

        entry->name = strdup(name);
        if (entry->name == NULL) {
                SPDK_ERRLOG("Failed to save the name=%s on entry=%p\n", name, entry);
                goto err_handle;
        }

        if (user_id) {
                entry->user_id = strdup(user_id);
                if (entry->user_id == NULL) {
                        SPDK_ERRLOG("Failed to save the user_id=%s on entry=%p\n", user_id, entry);
                        goto err_handle;
                }
        }

        /* config_param and config_file may be specified separately or together. */
        if (config_param) {
                entry->config_param = bdev_rbd_dup_config(config_param);
                if (entry->config_param == NULL) {
                        SPDK_ERRLOG("Failed to save the config_param=%p on entry=%p\n", config_param, entry);
                        goto err_handle;
                }
        }

        if (config_file) {
                entry->config_file = strdup(config_file);
                if (entry->config_file == NULL) {
                        SPDK_ERRLOG("Failed to save the config_file=%s on entry=%p\n", config_file, entry);
                        goto err_handle;
                }
        }

        if (key_file) {
                entry->key_file = strdup(key_file);
                if (entry->key_file == NULL) {
                        SPDK_ERRLOG("Failed to save the key_file=%s on entry=%p\n", key_file, entry);
                        goto err_handle;
                }
        }

        if (core_mask) {
                entry->core_mask = strdup(core_mask);
                if (entry->core_mask == NULL) {
                        SPDK_ERRLOG("Core_mask=%s allocation failed on entry=%p\n", core_mask, entry);
                        goto err_handle;
                }

                if (spdk_cpuset_parse(&rbd_core_mask, entry->core_mask) < 0) {
                        SPDK_ERRLOG("Invalid cpumask=%s on entry=%p\n", entry->core_mask, entry);
                        goto err_handle;
                }

                if (rbd_thread_set_cpumask(&rbd_core_mask) < 0) {
                        SPDK_ERRLOG("Failed to change rbd threads to core_mask %s on entry=%p\n", core_mask, entry);
                        goto err_handle;
                }
        }

        /* If an rbd thread core mask is given, rados_create() must execute with
         * the affinity set by rbd_thread_set_cpumask(). That affinity is
         * reverted once rbd_register_cluster() returns and we leave the
         * spdk_call_unaffinitized context. */
        rc = rados_create(&entry->cluster, user_id);
        if (rc < 0) {
                SPDK_ERRLOG("Failed to create rados_t struct\n");
                goto err_handle;
        }

        /* When entry->config_file is NULL, try the default locations and
         * ignore any failure */
        rc = rados_conf_read_file(entry->cluster, entry->config_file);
        if (entry->config_file && rc < 0) {
                SPDK_ERRLOG("Failed to read conf file %s\n", entry->config_file);
                rados_shutdown(entry->cluster);
                goto err_handle;
        }

        if (config_param) {
                const char *const *config_entry = config_param;
                while (*config_entry) {
                        rc = rados_conf_set(entry->cluster, config_entry[0], config_entry[1]);
                        if (rc < 0) {
                                SPDK_ERRLOG("Failed to set %s = %s\n", config_entry[0], config_entry[1]);
                                rados_shutdown(entry->cluster);
                                goto err_handle;
                        }
                        config_entry += 2;
                }
        }

        if (key_file) {
                rc = rados_conf_set(entry->cluster, "keyring", key_file);
                if (rc < 0) {
                        SPDK_ERRLOG("Failed to set keyring = %s\n", key_file);
                        rados_shutdown(entry->cluster);
                        goto err_handle;
                }
        }

        rc = rados_connect(entry->cluster);
        if (rc < 0) {
                SPDK_ERRLOG("Failed to connect to the rados cluster=%p\n", entry->cluster);
                rados_shutdown(entry->cluster);
                goto err_handle;
        }

        STAILQ_INSERT_TAIL(&g_map_bdev_rbd_cluster, entry, link);
        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

        return 0;

err_handle:
        bdev_rbd_cluster_free(entry);
        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
        return -1;
}

int
bdev_rbd_unregister_cluster(const char *name)
{
        struct bdev_rbd_cluster *entry;
        int rc = 0;

        if (name == NULL) {
                return -1;
        }

        pthread_mutex_lock(&g_map_bdev_rbd_cluster_mutex);
        STAILQ_FOREACH(entry, &g_map_bdev_rbd_cluster, link) {
                if (strcmp(name, entry->name) == 0) {
                        if (entry->ref == 0) {
                                STAILQ_REMOVE(&g_map_bdev_rbd_cluster, entry, bdev_rbd_cluster, link);
                                rados_shutdown(entry->cluster);
                                bdev_rbd_cluster_free(entry);
                        } else {
                                SPDK_ERRLOG("Cluster with name=%s is still in use and cannot be deleted\n",
                                            entry->name);
                                rc = -1;
                        }

                        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);
                        return rc;
                }
        }

        pthread_mutex_unlock(&g_map_bdev_rbd_cluster_mutex);

        SPDK_ERRLOG("Could not find the cluster name=%s\n", name);

        return -1;
}

static void *
_bdev_rbd_register_cluster(void *arg)
{
        struct cluster_register_info *info = arg;
        void *ret = arg;
        int rc;

        rc = rbd_register_cluster((const char *)info->name, (const char *)info->user_id,
                                  (const char *const *)info->config_param, (const char *)info->config_file,
                                  (const char *)info->key_file, info->core_mask);
        if (rc) {
                ret = NULL;
        }

        return ret;
}
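
/* The registration parameters mirror struct cluster_register_info; an
 * illustrative JSON-RPC request (all values are placeholders, not defaults):
 *
 * {
 *   "method": "bdev_rbd_register_cluster",
 *   "params": {
 *     "name": "cluster0",
 *     "user_id": "admin",
 *     "config_file": "/etc/ceph/ceph.conf",
 *     "key_file": "/etc/ceph/keyring",
 *     "core_mask": "0x3"
 *   }
 * }
 */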
int
bdev_rbd_register_cluster(struct cluster_register_info *info)
{
        assert(info != NULL);

        /* The Rados cluster must be created on a non-SPDK thread to avoid CPU
         * resource contention with the SPDK reactors */
        if (spdk_call_unaffinitized(_bdev_rbd_register_cluster, info) == NULL) {
                return -1;
        }

        return 0;
}

int
bdev_rbd_create(struct spdk_bdev **bdev, const char *name, const char *user_id,
                const char *pool_name,
                const char *const *config,
                const char *rbd_name,
                uint32_t block_size,
                const char *cluster_name,
                const struct spdk_uuid *uuid)
{
        struct bdev_rbd *rbd;
        int ret;

        if ((pool_name == NULL) || (rbd_name == NULL) || (block_size == 0)) {
                return -EINVAL;
        }

        rbd = calloc(1, sizeof(struct bdev_rbd));
        if (rbd == NULL) {
                SPDK_ERRLOG("Failed to allocate bdev_rbd struct\n");
                return -ENOMEM;
        }

        rbd->rbd_name = strdup(rbd_name);
        if (!rbd->rbd_name) {
                bdev_rbd_free(rbd);
                return -ENOMEM;
        }

        if (user_id) {
                rbd->user_id = strdup(user_id);
                if (!rbd->user_id) {
                        bdev_rbd_free(rbd);
                        return -ENOMEM;
                }
        }

        if (cluster_name) {
                rbd->cluster_name = strdup(cluster_name);
                if (!rbd->cluster_name) {
                        bdev_rbd_free(rbd);
                        return -ENOMEM;
                }
        }
        rbd->pool_name = strdup(pool_name);
        if (!rbd->pool_name) {
                bdev_rbd_free(rbd);
                return -ENOMEM;
        }

        if (config && !(rbd->config = bdev_rbd_dup_config(config))) {
                bdev_rbd_free(rbd);
                return -ENOMEM;
        }

        ret = bdev_rbd_init(rbd);
        if (ret < 0) {
                bdev_rbd_free(rbd);
                SPDK_ERRLOG("Failed to init rbd device\n");
                return ret;
        }

        rbd->disk.uuid = *uuid;
        if (name) {
                rbd->disk.name = strdup(name);
        } else {
                rbd->disk.name = spdk_sprintf_alloc("Ceph%d", bdev_rbd_count);
        }
        if (!rbd->disk.name) {
                bdev_rbd_free(rbd);
                return -ENOMEM;
        }
        rbd->disk.product_name = "Ceph Rbd Disk";
        bdev_rbd_count++;

        rbd->disk.write_cache = 0;
        rbd->disk.blocklen = block_size;
        rbd->disk.blockcnt = rbd->info.size / rbd->disk.blocklen;
        rbd->disk.ctxt = rbd;
        rbd->disk.fn_table = &rbd_fn_table;
        rbd->disk.module = &rbd_if;

        SPDK_NOTICELOG("Add %s rbd disk to lun\n", rbd->disk.name);

        spdk_io_device_register(rbd, bdev_rbd_create_cb,
                                bdev_rbd_destroy_cb,
                                sizeof(struct bdev_rbd_io_channel),
                                rbd_name);
        ret = spdk_bdev_register(&rbd->disk);
        if (ret) {
                spdk_io_device_unregister(rbd, NULL);
                bdev_rbd_free(rbd);
                return ret;
        }

        *bdev = &(rbd->disk);

        return ret;
}
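
/* Minimal usage sketch (error handling elided; the pool, image and block
 * size values are placeholders):
 *
 *     struct spdk_bdev *bdev;
 *     struct spdk_uuid uuid;
 *
 *     spdk_uuid_generate(&uuid);
 *     bdev_rbd_create(&bdev, "Ceph0", NULL, "rbd", NULL, "image0", 512,
 *                     NULL, &uuid);
 */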

void
bdev_rbd_delete(const char *name, spdk_delete_rbd_complete cb_fn, void *cb_arg)
{
        int rc;

        rc = spdk_bdev_unregister_by_name(name, &rbd_if, cb_fn, cb_arg);
        if (rc != 0) {
                cb_fn(cb_arg, rc);
        }
}

static void
dummy_bdev_event_cb(enum spdk_bdev_event_type type, struct spdk_bdev *bdev, void *ctx)
{
}
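
/* new_size_in_mb is in MiB (1024 * 1024 bytes); requests smaller than the
 * current size are rejected.
 */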
int
bdev_rbd_resize(const char *name, const uint64_t new_size_in_mb)
{
        struct spdk_bdev_desc *desc;
        struct spdk_bdev *bdev;
        struct bdev_rbd *rbd;
        int rc = 0;
        uint64_t new_size_in_byte;
        uint64_t current_size_in_mb;

        rc = spdk_bdev_open_ext(name, false, dummy_bdev_event_cb, NULL, &desc);
        if (rc != 0) {
                return rc;
        }

        bdev = spdk_bdev_desc_get_bdev(desc);

        if (bdev->module != &rbd_if) {
                rc = -EINVAL;
                goto exit;
        }

        current_size_in_mb = bdev->blocklen * bdev->blockcnt / (1024 * 1024);
        if (current_size_in_mb > new_size_in_mb) {
                SPDK_ERRLOG("The new bdev size must not be smaller than the current bdev size.\n");
                rc = -EINVAL;
                goto exit;
        }

        rbd = SPDK_CONTAINEROF(bdev, struct bdev_rbd, disk);
        new_size_in_byte = new_size_in_mb * 1024 * 1024;
        rc = rbd_resize(rbd->image, new_size_in_byte);
        if (rc != 0) {
                SPDK_ERRLOG("failed to resize the ceph bdev.\n");
                goto exit;
        }

        rc = spdk_bdev_notify_blockcnt_change(bdev, new_size_in_byte / bdev->blocklen);
        if (rc != 0) {
                SPDK_ERRLOG("failed to notify block cnt change.\n");
        }

exit:
        spdk_bdev_close(desc);
        return rc;
}

static int
bdev_rbd_group_create_cb(void *io_device, void *ctx_buf)
{
        return 0;
}

static void
bdev_rbd_group_destroy_cb(void *io_device, void *ctx_buf)
{
}

static int
bdev_rbd_library_init(void)
{
        spdk_io_device_register(&rbd_if, bdev_rbd_group_create_cb, bdev_rbd_group_destroy_cb,
                                0, "bdev_rbd_poll_groups");
        return 0;
}

static void
bdev_rbd_library_fini(void)
{
        spdk_io_device_unregister(&rbd_if, NULL);
}

SPDK_LOG_REGISTER_COMPONENT(bdev_rbd)