rbd.c -- Export ceph rados objects as a Linux block device
based on drivers/block/osdblk.c:
Copyright 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; see the file COPYING. If not, write to
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
For usage instructions, please refer to:
Documentation/ABI/testing/sysfs-bus-rbd
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blkdev.h>
#include "rbd_types.h"
#define RBD_DEBUG /* Activate rbd_assert() calls */
* The basic unit of block I/O is a sector. It is interpreted in a
* number of contexts in Linux (blk, bio, genhd), but the default is
* universally 512 bytes. These symbols are just slightly more
* meaningful than the bare numbers they represent.
#define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
/* It might be useful to have this defined elsewhere too */
#define U32_MAX ((u32) (~0U))
#define U64_MAX ((u64) (~0ULL))
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"
#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
#define RBD_MAX_SNAP_NAME_LEN \
(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
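/*
 * Rough arithmetic behind the limit above (a sketch, not from the
 * original source): 512 snapshot ids at 8 bytes each would already
 * fill 4096 bytes, so capping the count at 510 leaves 16 bytes of
 * margin for the ceph_snap_context header fields (nref, seq,
 * num_snaps), give or take structure padding.
 */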
#define RBD_MAX_OPT_LEN 1024
#define RBD_SNAP_HEAD_NAME "-"
/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX 64
#define RBD_OBJ_PREFIX_LEN_MAX 64
#define RBD_FEATURE_LAYERING 1
/* Features supported by this (client software) implementation. */
#define RBD_FEATURES_ALL (0)
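/*
 * Note: RBD_FEATURE_LAYERING is defined above but not yet included
 * in RBD_FEATURES_ALL, so this implementation advertises no feature
 * support. An image whose incompatible-feature mask contains bits
 * outside RBD_FEATURES_ALL is refused (see the check against
 * ~RBD_FEATURES_ALL in _rbd_dev_v2_snap_features() below).
 */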
* An RBD device name will be "rbd#", where the "rbd" comes from
* RBD_DRV_NAME above, and # is a unique integer identifier.
* MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
* enough to hold all possible device names.
#define DEV_NAME_LEN 32
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
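/*
 * Why (5 * sizeof (int)) / 2 + 1 is enough (a worked check, not from
 * the original source): an n-byte integer has at most
 * ceil(8n * log10(2)) ~= 2.41n decimal digits, and 5n/2 = 2.5n covers
 * that; the +1 leaves room for a leading '-'. For 4-byte ints this
 * gives 11 characters, matching "-2147483648".
 */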
#define RBD_READ_ONLY_DEFAULT false
* block device image metadata (in-memory version)
struct rbd_image_header {
/* These four fields never change for a given rbd image */
/* The remaining fields need to be updated occasionally */
struct ceph_snap_context *snapc;
* An rbd image specification.
* The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
* identify an image. Each rbd_dev structure includes a pointer to
* an rbd_spec structure that encapsulates this identity.
* Each of the id's in an rbd_spec has an associated name. For a
* user-mapped image, the names are supplied and the id's associated
* with them are looked up. For a layered image, a parent image is
* defined by the tuple, and the names are looked up.
* An rbd_dev structure contains a parent_spec pointer which is
* non-null if the image it represents is a child in a layered
* image. This pointer will refer to the rbd_spec structure used
* by the parent rbd_dev for its own identity (i.e., the structure
* is shared between the parent and child).
* Since these structures are populated once, during the discovery
* phase of image construction, they are effectively immutable so
* we make no effort to synchronize access to them.
* Note that code herein does not assume the image name is known (it
* could be a null pointer).
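/*
 * A sketch of the structure described above (the definition itself
 * is elided from this listing; the fields are inferred from their
 * uses later in this file, so treat this as illustrative):
 *
 *     struct rbd_spec {
 *             u64             pool_id;
 *             char            *pool_name;
 *             char            *image_id;
 *             char            *image_name;
 *             u64             snap_id;
 *             char            *snap_name;
 *             struct kref     kref;
 *     };
 */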
* an instance of the client. multiple devices may share an rbd client.
struct ceph_client *client;
struct list_head node;
* a request completion status
struct rbd_req_status {
* a collection of requests
struct rbd_req_coll {
struct rbd_req_status status[0];
* a single io request
struct request *rq; /* blk layer request */
struct bio *bio; /* cloned bio */
struct page **pages; /* list of used pages */
struct rbd_req_coll *coll;
struct list_head node;
int dev_id; /* blkdev unique id */
int major; /* blkdev assigned major */
struct gendisk *disk; /* blkdev's gendisk and rq */
u32 image_format; /* Either 1 or 2 */
struct rbd_client *rbd_client;
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
spinlock_t lock; /* queue lock */
struct rbd_image_header header;
struct rbd_spec *spec;
struct ceph_file_layout layout;
struct ceph_osd_event *watch_event;
struct ceph_osd_request *watch_request;
struct rbd_spec *parent_spec;
/* protects updating the header */
struct rw_semaphore header_rwsem;
struct rbd_mapping mapping;
struct list_head node;
/* list of snapshots */
struct list_head snaps;
unsigned long open_count;
static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list); /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);
static LIST_HEAD(rbd_client_list); /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
static struct bus_attribute rbd_bus_attrs[] = {
__ATTR(add, S_IWUSR, NULL, rbd_add),
__ATTR(remove, S_IWUSR, NULL, rbd_remove),
static struct bus_type rbd_bus_type = {
.bus_attrs = rbd_bus_attrs,
static void rbd_root_dev_release(struct device *dev)
static struct device rbd_root_dev = {
.release = rbd_root_dev_release,
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
struct va_format vaf;
printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
else if (rbd_dev->disk)
printk(KERN_WARNING "%s: %s: %pV\n",
RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
else if (rbd_dev->spec && rbd_dev->spec->image_name)
printk(KERN_WARNING "%s: image %s: %pV\n",
RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
else if (rbd_dev->spec && rbd_dev->spec->image_id)
printk(KERN_WARNING "%s: id %s: %pV\n",
RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
RBD_DRV_NAME, rbd_dev, &vaf);
#define rbd_assert(expr) \
if (unlikely(!(expr))) { \
printk(KERN_ERR "\nAssertion failure in %s() " \
"\trbd_assert(%s);\n\n", \
__func__, __LINE__, #expr); \
#else /* !RBD_DEBUG */
# define rbd_assert(expr) ((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_open(struct block_device *bdev, fmode_t mode)
struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
(void) get_device(&rbd_dev->dev);
set_device_ro(bdev, rbd_dev->mapping.read_only);
rbd_dev->open_count++;
mutex_unlock(&ctl_mutex);
static int rbd_release(struct gendisk *disk, fmode_t mode)
struct rbd_device *rbd_dev = disk->private_data;
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rbd_assert(rbd_dev->open_count > 0);
rbd_dev->open_count--;
put_device(&rbd_dev->dev);
mutex_unlock(&ctl_mutex);
static const struct block_device_operations rbd_bd_ops = {
.owner = THIS_MODULE,
.release = rbd_release,
* Initialize an rbd client instance.
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
struct rbd_client *rbdc;
dout("rbd_client_create\n");
rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
kref_init(&rbdc->kref);
INIT_LIST_HEAD(&rbdc->node);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
if (IS_ERR(rbdc->client))
ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
ret = ceph_open_session(rbdc->client);
spin_lock(&rbd_client_list_lock);
list_add_tail(&rbdc->node, &rbd_client_list);
spin_unlock(&rbd_client_list_lock);
mutex_unlock(&ctl_mutex);
dout("rbd_client_create created %p\n", rbdc);
ceph_destroy_client(rbdc->client);
mutex_unlock(&ctl_mutex);
ceph_destroy_options(ceph_opts);
* Find a ceph client with specific addr and configuration. If
* found, bump its reference count.
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
struct rbd_client *client_node;
if (ceph_opts->flags & CEPH_OPT_NOSHARE)
spin_lock(&rbd_client_list_lock);
list_for_each_entry(client_node, &rbd_client_list, node) {
if (!ceph_compare_options(ceph_opts, client_node->client)) {
kref_get(&client_node->kref);
spin_unlock(&rbd_client_list_lock);
return found ? client_node : NULL;
/* string args above */
/* Boolean args above */
static match_table_t rbd_opts_tokens = {
/* string args above */
{Opt_read_only, "read_only"},
{Opt_read_only, "ro"}, /* Alternate spelling */
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
/* Boolean args above */
static int parse_rbd_opts_token(char *c, void *private)
struct rbd_options *rbd_opts = private;
substring_t argstr[MAX_OPT_ARGS];
int token, intval, ret;
token = match_token(c, rbd_opts_tokens, argstr);
if (token < Opt_last_int) {
ret = match_int(&argstr[0], &intval);
pr_err("bad mount option arg (not int) "
dout("got int token %d val %d\n", token, intval);
} else if (token > Opt_last_int && token < Opt_last_string) {
dout("got string token %d val %s\n", token,
} else if (token > Opt_last_string && token < Opt_last_bool) {
dout("got Boolean token %d\n", token);
dout("got token %d\n", token);
rbd_opts->read_only = true;
rbd_opts->read_only = false;
* Get a ceph client with specific addr and configuration, if one does
* not exist create it.
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
struct rbd_client *rbdc;
rbdc = rbd_client_find(ceph_opts);
if (rbdc) /* using an existing client */
ceph_destroy_options(ceph_opts);
rbdc = rbd_client_create(ceph_opts);
* Destroy ceph client
* Caller must hold rbd_client_list_lock.
static void rbd_client_release(struct kref *kref)
struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
dout("rbd_release_client %p\n", rbdc);
spin_lock(&rbd_client_list_lock);
list_del(&rbdc->node);
spin_unlock(&rbd_client_list_lock);
ceph_destroy_client(rbdc->client);
* Drop reference to ceph client node. If it's not referenced anymore, release
static void rbd_put_client(struct rbd_client *rbdc)
kref_put(&rbdc->kref, rbd_client_release);
* Destroy requests collection
static void rbd_coll_release(struct kref *kref)
struct rbd_req_coll *coll =
container_of(kref, struct rbd_req_coll, kref);
dout("rbd_coll_release %p\n", coll);
static bool rbd_image_format_valid(u32 image_format)
return image_format == 1 || image_format == 2;
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
/* The header has to start with the magic rbd header text */
if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
/* The bio layer requires at least sector-sized I/O */
if (ondisk->options.order < SECTOR_SHIFT)
/* If we use u64 in a few spots we may be able to loosen this */
if (ondisk->options.order > 8 * sizeof (int) - 1)
* The size of a snapshot header has to fit in a size_t, and
* that limits the number of snapshots.
snap_count = le32_to_cpu(ondisk->snap_count);
size = SIZE_MAX - sizeof (struct ceph_snap_context);
if (snap_count > size / sizeof (__le64))
* Not only that, but the size of the entire snapshot
* header must also be representable in a size_t.
size -= snap_count * sizeof (__le64);
if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
* Create a new header structure, translate header format from the on-disk
static int rbd_header_from_disk(struct rbd_image_header *header,
struct rbd_image_header_ondisk *ondisk)
memset(header, 0, sizeof (*header));
snap_count = le32_to_cpu(ondisk->snap_count);
len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
if (!header->object_prefix)
memcpy(header->object_prefix, ondisk->object_prefix, len);
header->object_prefix[len] = '\0';
u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
/* Save a copy of the snapshot names */
if (snap_names_len > (u64) SIZE_MAX)
header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
if (!header->snap_names)
* Note that rbd_dev_v1_header_read() guarantees
* the ondisk buffer we're working with has
* snap_names_len bytes beyond the end of the
* snapshot id array, so this memcpy() is safe.
memcpy(header->snap_names, &ondisk->snaps[snap_count],
/* Record each snapshot's size */
size = snap_count * sizeof (*header->snap_sizes);
header->snap_sizes = kmalloc(size, GFP_KERNEL);
if (!header->snap_sizes)
for (i = 0; i < snap_count; i++)
header->snap_sizes[i] =
le64_to_cpu(ondisk->snaps[i].image_size);
WARN_ON(ondisk->snap_names_len);
header->snap_names = NULL;
header->snap_sizes = NULL;
header->features = 0; /* No features support in v1 images */
header->obj_order = ondisk->options.order;
header->crypt_type = ondisk->options.crypt_type;
header->comp_type = ondisk->options.comp_type;
/* Allocate and fill in the snapshot context */
header->image_size = le64_to_cpu(ondisk->image_size);
size = sizeof (struct ceph_snap_context);
size += snap_count * sizeof (header->snapc->snaps[0]);
header->snapc = kzalloc(size, GFP_KERNEL);
atomic_set(&header->snapc->nref, 1);
header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
header->snapc->num_snaps = snap_count;
for (i = 0; i < snap_count; i++)
header->snapc->snaps[i] =
le64_to_cpu(ondisk->snaps[i].id);
kfree(header->snap_sizes);
header->snap_sizes = NULL;
kfree(header->snap_names);
header->snap_names = NULL;
kfree(header->object_prefix);
header->object_prefix = NULL;
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
struct rbd_snap *snap;
if (snap_id == CEPH_NOSNAP)
return RBD_SNAP_HEAD_NAME;
list_for_each_entry(snap, &rbd_dev->snaps, node)
if (snap_id == snap->id)
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
struct rbd_snap *snap;
list_for_each_entry(snap, &rbd_dev->snaps, node) {
if (!strcmp(snap_name, snap->name)) {
rbd_dev->spec->snap_id = snap->id;
rbd_dev->mapping.size = snap->size;
rbd_dev->mapping.features = snap->features;
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
sizeof (RBD_SNAP_HEAD_NAME))) {
rbd_dev->spec->snap_id = CEPH_NOSNAP;
rbd_dev->mapping.size = rbd_dev->header.image_size;
rbd_dev->mapping.features = rbd_dev->header.features;
ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
rbd_dev->mapping.read_only = true;
atomic_set(&rbd_dev->exists, 1);
static void rbd_header_free(struct rbd_image_header *header)
kfree(header->object_prefix);
header->object_prefix = NULL;
kfree(header->snap_sizes);
header->snap_sizes = NULL;
kfree(header->snap_names);
header->snap_names = NULL;
ceph_put_snap_context(header->snapc);
header->snapc = NULL;
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
segment = offset >> rbd_dev->header.obj_order;
ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
rbd_dev->header.object_prefix, segment);
if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
pr_err("error formatting segment name for #%llu (%d)\n",
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
return offset & (segment_size - 1);
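/*
 * Illustration of the segment math above (values are made up, not
 * from the original source): with obj_order 22 the image is carved
 * into 4MB (1 << 22) objects. An image offset of 0x1234567 then maps
 * to segment 0x1234567 >> 22 = 4, object name
 * "<object_prefix>.000000000004", and an in-object offset of
 * 0x1234567 & 0x3fffff = 0x234567.
 */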
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
u64 offset, u64 length)
u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
offset &= segment_size - 1;
rbd_assert(length <= U64_MAX - offset);
if (offset + length > segment_size)
length = segment_size - offset;
static int rbd_get_num_segments(struct rbd_image_header *header,
if (len - 1 > U64_MAX - ofs)
start_seg = ofs >> header->obj_order;
end_seg = (ofs + len - 1) >> header->obj_order;
return end_seg - start_seg + 1;
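/*
 * Worked example (illustrative, assuming obj_order 22, i.e. 4MB
 * objects): a request with ofs = 3MB and len = 2MB touches bytes
 * [3MB, 5MB), so start_seg = 0 and end_seg = (5MB - 1) >> 22 = 1,
 * giving 2 segments. rbd_segment_length() would likewise truncate
 * the first piece to 1MB, the remainder of segment 0.
 */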
* returns the size of an object in the image
static u64 rbd_obj_bytes(struct rbd_image_header *header)
return 1 << header->obj_order;
static void bio_chain_put(struct bio *chain)
chain = chain->bi_next;
* zeros a bio chain, starting at specific offset
static void zero_bio_chain(struct bio *chain, int start_ofs)
bio_for_each_segment(bv, chain, i) {
if (pos + bv->bv_len > start_ofs) {
int remainder = max(start_ofs - pos, 0);
buf = bvec_kmap_irq(bv, &flags);
memset(buf + remainder, 0,
bv->bv_len - remainder);
bvec_kunmap_irq(buf, &flags);
chain = chain->bi_next;
* Clone a portion of a bio, starting at the given byte offset
* and continuing for the number of bytes indicated.
static struct bio *bio_clone_range(struct bio *bio_src,
unsigned short end_idx;
/* Handle the easy case for the caller */
if (!offset && len == bio_src->bi_size)
return bio_clone(bio_src, gfpmask);
if (WARN_ON_ONCE(!len))
if (WARN_ON_ONCE(len > bio_src->bi_size))
if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
/* Find first affected segment... */
__bio_for_each_segment(bv, bio_src, idx, 0) {
if (resid < bv->bv_len)
/* ...and the last affected segment */
__bio_for_each_segment(bv, bio_src, end_idx, idx) {
if (resid <= bv->bv_len)
vcnt = end_idx - idx + 1;
/* Build the clone */
bio = bio_alloc(gfpmask, (unsigned int) vcnt);
return NULL; /* ENOMEM */
bio->bi_bdev = bio_src->bi_bdev;
bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
bio->bi_rw = bio_src->bi_rw;
bio->bi_flags |= 1 << BIO_CLONED;
* Copy over our part of the bio_vec, then update the first
* and last (or only) entries.
memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
vcnt * sizeof (struct bio_vec));
bio->bi_io_vec[0].bv_offset += voff;
bio->bi_io_vec[0].bv_len -= voff;
bio->bi_io_vec[vcnt - 1].bv_len = resid;
bio->bi_io_vec[0].bv_len = len;
* Clone a portion of a bio chain, starting at the given byte offset
* into the first bio in the source chain and continuing for the
* number of bytes indicated. The result is another bio chain of
* exactly the given length, or a null pointer on error.
* The bio_src and offset parameters are both in-out. On entry they
* refer to the first source bio and the offset into that bio where
* the start of data to be cloned is located.
* On return, bio_src is updated to refer to the bio in the source
* chain that contains the first un-cloned byte, and *offset will
* contain the offset of that byte within that bio.
static struct bio *bio_chain_clone_range(struct bio **bio_src,
unsigned int *offset,
struct bio *bi = *bio_src;
unsigned int off = *offset;
struct bio *chain = NULL;
/* Build up a chain of clone bios up to the limit */
if (!bi || off >= bi->bi_size || !len)
return NULL; /* Nothing to clone */
unsigned int bi_size;
rbd_warn(NULL, "bio_chain exhausted with %u left", len);
goto out_err; /* EINVAL; ran out of bio's */
bi_size = min_t(unsigned int, bi->bi_size - off, len);
bio = bio_clone_range(bi, off, bi_size, gfpmask);
goto out_err; /* ENOMEM */
end = &bio->bi_next;
if (off == bi->bi_size) {
bio_chain_put(chain);
static struct ceph_osd_req_op *rbd_create_rw_op(int opcode, u32 payload_len)
struct ceph_osd_req_op *op;
op = kzalloc(sizeof (*op), GFP_NOIO);
* op extent offset and length will be set later on
* after ceph_calc_file_object_mapping().
op->payload_len = payload_len;
static void rbd_destroy_op(struct ceph_osd_req_op *op)
static void rbd_coll_end_req_index(struct request *rq,
struct rbd_req_coll *coll,
struct request_queue *q;
dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
coll, index, (int)ret, (unsigned long long)len);
blk_end_request(rq, ret, len);
spin_lock_irq(q->queue_lock);
coll->status[index].done = 1;
coll->status[index].rc = ret;
coll->status[index].bytes = len;
max = min = coll->num_done;
while (max < coll->total && coll->status[max].done)
for (i = min; i < max; i++) {
__blk_end_request(rq, (int)coll->status[i].rc,
coll->status[i].bytes);
kref_put(&coll->kref, rbd_coll_release);
spin_unlock_irq(q->queue_lock);
static void rbd_coll_end_req(struct rbd_request *rbd_req,
rbd_coll_end_req_index(rbd_req->rq,
rbd_req->coll, rbd_req->coll_index,
* Send ceph osd request
static int rbd_do_request(struct request *rq,
struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
const char *object_name, u64 ofs, u64 len,
struct page **pages,
struct ceph_osd_req_op *op,
struct rbd_req_coll *coll,
void (*rbd_cb)(struct ceph_osd_request *,
struct ceph_osd_request **linger_req,
struct ceph_osd_client *osdc;
struct ceph_osd_request *osd_req;
struct rbd_request *rbd_req = NULL;
struct timespec mtime = CURRENT_TIME;
dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
object_name, (unsigned long long) ofs,
(unsigned long long) len, coll, coll_index);
osdc = &rbd_dev->rbd_client->client->osdc;
osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
osd_req->r_flags = flags;
osd_req->r_pages = pages;
osd_req->r_bio = bio;
bio_get(osd_req->r_bio);
rbd_req = kmalloc(sizeof(*rbd_req), GFP_NOIO);
rbd_req->pages = pages;
rbd_req->coll = coll;
rbd_req->coll_index = coll ? coll_index : 0;
osd_req->r_callback = rbd_cb;
osd_req->r_priv = rbd_req;
strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
osd_req->r_oid_len = strlen(osd_req->r_oid);
osd_req->r_file_layout = rbd_dev->layout; /* struct */
if (op->op == CEPH_OSD_OP_READ || op->op == CEPH_OSD_OP_WRITE) {
op->extent.offset = ofs;
op->extent.length = len;
if (op->op == CEPH_OSD_OP_WRITE)
op->payload_len = len;
osd_req->r_num_pages = calc_pages_for(ofs, len);
osd_req->r_page_alignment = ofs & ~PAGE_MASK;
ceph_osdc_build_request(osd_req, ofs, len, 1, op,
snapc, snapid, &mtime);
ceph_osdc_set_request_linger(osdc, osd_req);
*linger_req = osd_req;
ret = ceph_osdc_start_request(osdc, osd_req, false);
ret = ceph_osdc_wait_request(osdc, osd_req);
version = le64_to_cpu(osd_req->r_reassert_version.version);
dout("reassert_ver=%llu\n", (unsigned long long) version);
ceph_osdc_put_request(osd_req);
bio_chain_put(osd_req->r_bio);
ceph_osdc_put_request(osd_req);
* Ceph osd op callback
static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
struct rbd_request *rbd_req = osd_req->r_priv;
struct ceph_osd_reply_head *replyhead;
struct ceph_osd_op *op;
replyhead = msg->front.iov_base;
WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
op = (void *)(replyhead + 1);
rc = (s32)le32_to_cpu(replyhead->result);
bytes = le64_to_cpu(op->extent.length);
read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
(unsigned long long) bytes, read_op, (int) rc);
if (rc == (s32)-ENOENT && read_op) {
zero_bio_chain(rbd_req->bio, 0);
} else if (rc == 0 && read_op && bytes < rbd_req->len) {
zero_bio_chain(rbd_req->bio, bytes);
bytes = rbd_req->len;
rbd_coll_end_req(rbd_req, rc, bytes);
bio_chain_put(rbd_req->bio);
ceph_osdc_put_request(osd_req);
static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
struct ceph_msg *msg)
ceph_osdc_put_request(osd_req);
* Do a synchronous ceph osd operation
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
struct ceph_osd_req_op *op,
const char *object_name,
u64 ofs, u64 inbound_size,
struct ceph_osd_request **linger_req,
struct page **pages;
rbd_assert(op != NULL);
num_pages = calc_pages_for(ofs, inbound_size);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
return PTR_ERR(pages);
ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
object_name, ofs, inbound_size, NULL,
if ((flags & CEPH_OSD_FLAG_READ) && inbound)
ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
ceph_release_page_vector(pages, num_pages);
* Do an asynchronous ceph osd operation
static int rbd_do_op(struct request *rq,
struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
struct rbd_req_coll *coll,
struct ceph_osd_req_op *op;
seg_name = rbd_segment_name(rbd_dev, ofs);
seg_len = rbd_segment_length(rbd_dev, ofs, len);
seg_ofs = rbd_segment_offset(rbd_dev, ofs);
if (rq_data_dir(rq) == WRITE) {
opcode = CEPH_OSD_OP_WRITE;
flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
snapid = CEPH_NOSNAP;
payload_len = seg_len;
opcode = CEPH_OSD_OP_READ;
flags = CEPH_OSD_FLAG_READ;
snapid = rbd_dev->spec->snap_id;
op = rbd_create_rw_op(opcode, payload_len);
/* we've taken care of segment sizes earlier when we
cloned the bios. We should never have a segment
truncated at this point */
rbd_assert(seg_len == len);
ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
seg_name, seg_ofs, seg_len,
rbd_req_cb, 0, NULL);
rbd_coll_end_req_index(rq, coll, coll_index,
* Request sync osd read
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
const char *object_name,
struct ceph_osd_req_op *op;
op = rbd_create_rw_op(CEPH_OSD_OP_READ, 0);
ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
op, object_name, ofs, len, buf, NULL, ver);
* Request sync osd watch
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
struct ceph_osd_req_op *op;
op = rbd_create_rw_op(CEPH_OSD_OP_NOTIFY_ACK, 0);
op->watch.ver = cpu_to_le64(ver);
op->watch.cookie = notify_id;
ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
rbd_dev->header_name, 0, 0, NULL,
rbd_simple_req_cb, 0, NULL);
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
struct rbd_device *rbd_dev = (struct rbd_device *)data;
dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
rbd_dev->header_name, (unsigned long long) notify_id,
(unsigned int) opcode);
rc = rbd_dev_refresh(rbd_dev, &hver);
rbd_warn(rbd_dev, "got notification but failed to "
"update snaps: %d\n", rc);
rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
* Request sync osd watch/unwatch. The value of "start" determines
* whether a watch request is being initiated or torn down.
static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
struct ceph_osd_req_op *op;
struct ceph_osd_request **linger_req = NULL;
op = rbd_create_rw_op(CEPH_OSD_OP_WATCH, 0);
struct ceph_osd_client *osdc;
osdc = &rbd_dev->rbd_client->client->osdc;
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
&rbd_dev->watch_event);
version = cpu_to_le64(rbd_dev->header.obj_version);
linger_req = &rbd_dev->watch_request;
op->watch.ver = version;
op->watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
op->watch.flag = (u8) start ? 1 : 0;
ret = rbd_req_sync_op(rbd_dev,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
op, rbd_dev->header_name,
0, 0, NULL, linger_req, NULL);
if (!start || ret < 0) {
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
* Synchronous osd object method call
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
const char *object_name,
const char *class_name,
const char *method_name,
const char *outbound,
size_t outbound_size,
size_t inbound_size,
struct ceph_osd_req_op *op;
int class_name_len = strlen(class_name);
int method_name_len = strlen(method_name);
* Any input parameters required by the method we're calling
* will be sent along with the class and method names as
* part of the message payload. That data and its size are
* supplied via the indata and indata_len fields (named from
* the perspective of the server side) in the OSD request
payload_size = class_name_len + method_name_len + outbound_size;
op = rbd_create_rw_op(CEPH_OSD_OP_CALL, payload_size);
op->cls.class_name = class_name;
op->cls.class_len = (__u8) class_name_len;
op->cls.method_name = method_name;
op->cls.method_len = (__u8) method_name_len;
op->cls.indata = outbound;
op->cls.indata_len = outbound_size;
ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
object_name, 0, inbound_size, inbound,
dout("cls_exec returned %d\n", ret);
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
struct rbd_req_coll *coll =
kzalloc(sizeof(struct rbd_req_coll) +
sizeof(struct rbd_req_status) * num_reqs,
coll->total = num_reqs;
kref_init(&coll->kref);
static int rbd_dev_do_request(struct request *rq,
struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
u64 ofs, unsigned int size,
struct bio *bio_chain)
struct rbd_req_coll *coll;
unsigned int bio_offset;
dout("%s 0x%x bytes at 0x%llx\n",
rq_data_dir(rq) == WRITE ? "write" : "read",
size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
coll = rbd_alloc_coll(num_segs);
u64 limit = rbd_segment_length(rbd_dev, ofs, size);
unsigned int clone_size;
struct bio *bio_clone;
BUG_ON(limit > (u64)UINT_MAX);
clone_size = (unsigned int)limit;
dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
kref_get(&coll->kref);
/* Pass a cloned bio chain via an osd request */
bio_clone = bio_chain_clone_range(&bio_chain,
&bio_offset, clone_size,
(void)rbd_do_op(rq, rbd_dev, snapc,
bio_clone, coll, cur_seg);
rbd_coll_end_req_index(rq, coll, cur_seg,
kref_put(&coll->kref, rbd_coll_release);
* block device queue callback
static void rbd_rq_fn(struct request_queue *q)
struct rbd_device *rbd_dev = q->queuedata;
bool read_only = rbd_dev->mapping.read_only;
while ((rq = blk_fetch_request(q))) {
struct ceph_snap_context *snapc = NULL;
unsigned int size = 0;
dout("fetched request\n");
/* Filter out block requests we don't understand */
if ((rq->cmd_type != REQ_TYPE_FS)) {
__blk_end_request_all(rq, 0);
spin_unlock_irq(q->queue_lock);
/* Write requests need a reference to the snapshot context */
if (rq_data_dir(rq) == WRITE) {
if (read_only) /* Can't write to a read-only device */
goto out_end_request;
* Note that each osd request will take its
* own reference to the snapshot context
* supplied. The reference we take here
* just guarantees the one we provide stays
down_read(&rbd_dev->header_rwsem);
snapc = ceph_get_snap_context(rbd_dev->header.snapc);
up_read(&rbd_dev->header_rwsem);
rbd_assert(snapc != NULL);
} else if (!atomic_read(&rbd_dev->exists)) {
rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
dout("request for non-existent snapshot");
goto out_end_request;
size = blk_rq_bytes(rq);
result = rbd_dev_do_request(rq, rbd_dev, snapc,
blk_rq_pos(rq) * SECTOR_SIZE,
ceph_put_snap_context(snapc);
spin_lock_irq(q->queue_lock);
if (!size || result < 0)
__blk_end_request_all(rq, result);
* a queue callback. Makes sure that we don't create a bio that spans across
* multiple osd objects. One exception would be with single-page bios,
* which we handle later at bio_chain_clone_range()
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
struct bio_vec *bvec)
struct rbd_device *rbd_dev = q->queuedata;
sector_t sector_offset;
sector_t sectors_per_obj;
sector_t obj_sector_offset;
* Find how far into its rbd object the partition-relative
* bio start sector is to offset relative to the enclosing
sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
obj_sector_offset = sector_offset & (sectors_per_obj - 1);
* Compute the number of bytes from that offset to the end
* of the object. Account for what's already used by the bio.
ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
if (ret > bmd->bi_size)
ret -= bmd->bi_size;
* Don't send back more than was asked for. And if the bio
* was empty, let the whole thing through because: "Note
* that a block device *must* allow a single page to be
* added to an empty bio."
rbd_assert(bvec->bv_len <= PAGE_SIZE);
if (ret > (int) bvec->bv_len || !bmd->bi_size)
ret = (int) bvec->bv_len;
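/*
 * Worked example (illustrative numbers, not from the original
 * source): with obj_order 22, sectors_per_obj = 1 << (22 - 9) = 8192.
 * A bio starting at object-relative sector 8190 has 2 sectors
 * (1024 bytes) left before the object boundary; if the bio already
 * holds 512 bytes, at most another 512 bytes may be merged in.
 */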
static void rbd_free_disk(struct rbd_device *rbd_dev)
struct gendisk *disk = rbd_dev->disk;
if (disk->flags & GENHD_FL_UP)
blk_cleanup_queue(disk->queue);
* Read the complete header for the given rbd device.
* Returns a pointer to a dynamically-allocated buffer containing
* the complete and validated header. Caller can pass the address
* of a variable that will be filled in with the version of the
* header object at the time it was read.
* Returns a pointer-coded errno if a failure occurs.
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
struct rbd_image_header_ondisk *ondisk = NULL;
* The complete header will include an array of its 64-bit
* snapshot ids, followed by the names of those snapshots as
* a contiguous block of NUL-terminated strings. Note that
* the number of snapshots could change by the time we read
* it in, in which case we re-read it.
size = sizeof (*ondisk);
size += snap_count * sizeof (struct rbd_image_snap_ondisk);
ondisk = kmalloc(size, GFP_KERNEL);
return ERR_PTR(-ENOMEM);
ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
(char *) ondisk, version);
if (WARN_ON((size_t) ret < size)) {
rbd_warn(rbd_dev, "short header read (want %zd got %d)",
if (!rbd_dev_ondisk_valid(ondisk)) {
rbd_warn(rbd_dev, "invalid header");
names_size = le64_to_cpu(ondisk->snap_names_len);
want_count = snap_count;
snap_count = le32_to_cpu(ondisk->snap_count);
} while (snap_count != want_count);
return ERR_PTR(ret);
* reload the on-disk header
static int rbd_read_header(struct rbd_device *rbd_dev,
struct rbd_image_header *header)
struct rbd_image_header_ondisk *ondisk;
ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
return PTR_ERR(ondisk);
ret = rbd_header_from_disk(header, ondisk);
header->obj_version = ver;
static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
struct rbd_snap *snap;
struct rbd_snap *next;
list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
rbd_remove_snap_dev(snap);
static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
dout("setting size to %llu sectors", (unsigned long long) size);
rbd_dev->mapping.size = (u64) size;
set_capacity(rbd_dev->disk, size);
* only read the first part of the ondisk header, without the snaps info
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
struct rbd_image_header h;
ret = rbd_read_header(rbd_dev, &h);
down_write(&rbd_dev->header_rwsem);
/* Update image size, and check for resize of mapped image */
rbd_dev->header.image_size = h.image_size;
rbd_update_mapping_size(rbd_dev);
/* rbd_dev->header.object_prefix shouldn't change */
kfree(rbd_dev->header.snap_sizes);
kfree(rbd_dev->header.snap_names);
/* osd requests may still refer to snapc */
ceph_put_snap_context(rbd_dev->header.snapc);
*hver = h.obj_version;
rbd_dev->header.obj_version = h.obj_version;
rbd_dev->header.image_size = h.image_size;
rbd_dev->header.snapc = h.snapc;
rbd_dev->header.snap_names = h.snap_names;
rbd_dev->header.snap_sizes = h.snap_sizes;
/* Free the extra copy of the object prefix */
WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
kfree(h.object_prefix);
ret = rbd_dev_snaps_update(rbd_dev);
ret = rbd_dev_snaps_register(rbd_dev);
up_write(&rbd_dev->header_rwsem);
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
if (rbd_dev->image_format == 1)
ret = rbd_dev_v1_refresh(rbd_dev, hver);
ret = rbd_dev_v2_refresh(rbd_dev, hver);
mutex_unlock(&ctl_mutex);
static int rbd_init_disk(struct rbd_device *rbd_dev)
struct gendisk *disk;
struct request_queue *q;
/* create gendisk info */
disk = alloc_disk(RBD_MINORS_PER_MAJOR);
snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
disk->major = rbd_dev->major;
disk->first_minor = 0;
disk->fops = &rbd_bd_ops;
disk->private_data = rbd_dev;
q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
/* We use the default size, but let's be explicit about it. */
blk_queue_physical_block_size(q, SECTOR_SIZE);
/* set io sizes to object size */
segment_size = rbd_obj_bytes(&rbd_dev->header);
blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
blk_queue_max_segment_size(q, segment_size);
blk_queue_io_min(q, segment_size);
blk_queue_io_opt(q, segment_size);
blk_queue_merge_bvec(q, rbd_merge_bvec);
q->queuedata = rbd_dev;
rbd_dev->disk = disk;
set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
return container_of(dev, struct rbd_device, dev);
static ssize_t rbd_size_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
down_read(&rbd_dev->header_rwsem);
size = get_capacity(rbd_dev->disk);
up_read(&rbd_dev->header_rwsem);
return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
* Note this shows the features for whatever's mapped, which is not
* necessarily the base image.
static ssize_t rbd_features_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "0x%016llx\n",
(unsigned long long) rbd_dev->mapping.features);
static ssize_t rbd_major_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%d\n", rbd_dev->major);
static ssize_t rbd_client_id_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "client%lld\n",
ceph_client_id(rbd_dev->rbd_client->client));
static ssize_t rbd_pool_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
static ssize_t rbd_pool_id_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%llu\n",
(unsigned long long) rbd_dev->spec->pool_id);
static ssize_t rbd_name_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
if (rbd_dev->spec->image_name)
return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
return sprintf(buf, "(unknown)\n");
static ssize_t rbd_image_id_show(struct device *dev,
struct device_attribute *attr, char *buf)
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
* Shows the name of the currently-mapped snapshot (or
* RBD_SNAP_HEAD_NAME for the base image).
static ssize_t rbd_snap_show(struct device *dev,
struct device_attribute *attr,
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
* For an rbd v2 image, shows the pool id, image id, and snapshot id
* for the parent image. If there is no parent, simply shows
* "(no parent image)".
static ssize_t rbd_parent_show(struct device *dev,
struct device_attribute *attr,
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
struct rbd_spec *spec = rbd_dev->parent_spec;
return sprintf(buf, "(no parent image)\n");
count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
(unsigned long long) spec->pool_id, spec->pool_name);
count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
spec->image_name ? spec->image_name : "(unknown)");
count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
(unsigned long long) spec->snap_id, spec->snap_name);
count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
return (ssize_t) (bufp - buf);
static ssize_t rbd_image_refresh(struct device *dev,
struct device_attribute *attr,
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ret = rbd_dev_refresh(rbd_dev, NULL);
return ret < 0 ? ret : size;
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
static struct attribute *rbd_attrs[] = {
&dev_attr_size.attr,
&dev_attr_features.attr,
&dev_attr_major.attr,
&dev_attr_client_id.attr,
&dev_attr_pool.attr,
&dev_attr_pool_id.attr,
&dev_attr_name.attr,
&dev_attr_image_id.attr,
&dev_attr_current_snap.attr,
&dev_attr_parent.attr,
&dev_attr_refresh.attr,
static struct attribute_group rbd_attr_group = {
static const struct attribute_group *rbd_attr_groups[] = {
static void rbd_sysfs_dev_release(struct device *dev)
static struct device_type rbd_device_type = {
.groups = rbd_attr_groups,
.release = rbd_sysfs_dev_release,
static ssize_t rbd_snap_size_show(struct device *dev,
struct device_attribute *attr,
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
static ssize_t rbd_snap_id_show(struct device *dev,
struct device_attribute *attr,
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
static ssize_t rbd_snap_features_show(struct device *dev,
struct device_attribute *attr,
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
return sprintf(buf, "0x%016llx\n",
(unsigned long long) snap->features);
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
static struct attribute *rbd_snap_attrs[] = {
&dev_attr_snap_size.attr,
&dev_attr_snap_id.attr,
&dev_attr_snap_features.attr,
static struct attribute_group rbd_snap_attr_group = {
.attrs = rbd_snap_attrs,
static void rbd_snap_dev_release(struct device *dev)
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
static const struct attribute_group *rbd_snap_attr_groups[] = {
&rbd_snap_attr_group,
static struct device_type rbd_snap_device_type = {
.groups = rbd_snap_attr_groups,
.release = rbd_snap_dev_release,
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
kref_get(&spec->kref);
static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
kref_put(&spec->kref, rbd_spec_free);
static struct rbd_spec *rbd_spec_alloc(void)
struct rbd_spec *spec;
spec = kzalloc(sizeof (*spec), GFP_KERNEL);
kref_init(&spec->kref);
rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
static void rbd_spec_free(struct kref *kref)
struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
kfree(spec->pool_name);
kfree(spec->image_id);
kfree(spec->image_name);
kfree(spec->snap_name);
struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
struct rbd_spec *spec)
struct rbd_device *rbd_dev;
rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
spin_lock_init(&rbd_dev->lock);
atomic_set(&rbd_dev->exists, 0);
INIT_LIST_HEAD(&rbd_dev->node);
INIT_LIST_HEAD(&rbd_dev->snaps);
init_rwsem(&rbd_dev->header_rwsem);
rbd_dev->spec = spec;
rbd_dev->rbd_client = rbdc;
/* Initialize the layout used for all rbd requests */
rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
rbd_spec_put(rbd_dev->parent_spec);
kfree(rbd_dev->header_name);
rbd_put_client(rbd_dev->rbd_client);
rbd_spec_put(rbd_dev->spec);
static bool rbd_snap_registered(struct rbd_snap *snap)
bool ret = snap->dev.type == &rbd_snap_device_type;
bool reg = device_is_registered(&snap->dev);
rbd_assert(!ret ^ reg);
static void rbd_remove_snap_dev(struct rbd_snap *snap)
list_del(&snap->node);
if (device_is_registered(&snap->dev))
device_unregister(&snap->dev);
static int rbd_register_snap_dev(struct rbd_snap *snap,
struct device *parent)
struct device *dev = &snap->dev;
dev->type = &rbd_snap_device_type;
dev->parent = parent;
dev->release = rbd_snap_dev_release;
dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
dout("%s: registering device for snapshot %s\n", __func__, snap->name);
ret = device_register(dev);
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
const char *snap_name,
u64 snap_id, u64 snap_size,
struct rbd_snap *snap;
snap = kzalloc(sizeof (*snap), GFP_KERNEL);
return ERR_PTR(-ENOMEM);
snap->name = kstrdup(snap_name, GFP_KERNEL);
snap->size = snap_size;
snap->features = snap_features;
return ERR_PTR(ret);
static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
u64 *snap_size, u64 *snap_features)
rbd_assert(which < rbd_dev->header.snapc->num_snaps);
*snap_size = rbd_dev->header.snap_sizes[which];
*snap_features = 0; /* No features for v1 */
/* Skip over names until we find the one we are looking for */
snap_name = rbd_dev->header.snap_names;
snap_name += strlen(snap_name) + 1;
* Get the size and object order for an image snapshot, or if
* snap_id is CEPH_NOSNAP, gets this information for the base
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
u8 *order, u64 *snap_size)
__le64 snapid = cpu_to_le64(snap_id);
} __attribute__ ((packed)) size_buf = { 0 };
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
(char *) &snapid, sizeof (snapid),
(char *) &size_buf, sizeof (size_buf), NULL);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
*order = size_buf.order;
*snap_size = le64_to_cpu(size_buf.size);
dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
(unsigned long long) snap_id, (unsigned int) *order,
(unsigned long long) *snap_size);
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
&rbd_dev->header.obj_order,
&rbd_dev->header.image_size);
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
"rbd", "get_object_prefix",
reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
ret = 0; /* rbd_req_sync_exec() can return positive */
rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
p + RBD_OBJ_PREFIX_LEN_MAX,
if (IS_ERR(rbd_dev->header.object_prefix)) {
ret = PTR_ERR(rbd_dev->header.object_prefix);
rbd_dev->header.object_prefix = NULL;
dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
__le64 snapid = cpu_to_le64(snap_id);
} features_buf = { 0 };
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
"rbd", "get_features",
(char *) &snapid, sizeof (snapid),
(char *) &features_buf, sizeof (features_buf),
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
incompat = le64_to_cpu(features_buf.incompat);
if (incompat & ~RBD_FEATURES_ALL)
*snap_features = le64_to_cpu(features_buf.features);
dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
(unsigned long long) snap_id,
(unsigned long long) *snap_features,
(unsigned long long) le64_to_cpu(features_buf.incompat));
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
&rbd_dev->header.features);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
struct rbd_spec *parent_spec;
void *reply_buf = NULL;
parent_spec = rbd_spec_alloc();
size = sizeof (__le64) + /* pool_id */
sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
sizeof (__le64) + /* snap_id */
sizeof (__le64); /* overlap */
reply_buf = kmalloc(size, GFP_KERNEL);
snapid = cpu_to_le64(CEPH_NOSNAP);
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
"rbd", "get_parent",
(char *) &snapid, sizeof (snapid),
(char *) reply_buf, size, NULL);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
end = (char *) reply_buf + size;
ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
if (parent_spec->pool_id == CEPH_NOPOOL)
goto out; /* No parent? No problem. */
/* The ceph file layout needs to fit pool id in 32 bits */
if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
if (IS_ERR(image_id)) {
ret = PTR_ERR(image_id);
parent_spec->image_id = image_id;
ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
ceph_decode_64_safe(&p, end, overlap, out_err);
rbd_dev->parent_overlap = overlap;
rbd_dev->parent_spec = parent_spec;
parent_spec = NULL; /* rbd_dev now owns this */
rbd_spec_put(parent_spec);
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
size_t image_id_size;
void *reply_buf = NULL;
char *image_name = NULL;
rbd_assert(!rbd_dev->spec->image_name);
len = strlen(rbd_dev->spec->image_id);
image_id_size = sizeof (__le32) + len;
image_id = kmalloc(image_id_size, GFP_KERNEL);
end = (char *) image_id + image_id_size;
ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
reply_buf = kmalloc(size, GFP_KERNEL);
ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
"rbd", "dir_get_name",
image_id, image_id_size,
(char *) reply_buf, size, NULL);
end = (char *) reply_buf + size;
image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
if (IS_ERR(image_name))
dout("%s: name is %s len is %zd\n", __func__, image_name, len);
* When a parent image gets probed, we only have the pool, image,
* and snapshot ids but not the names of any of them. This call
* is made later to fill in those names. It has to be done after
* rbd_dev_snaps_update() has completed because some of the
* information (in particular, snapshot name) is not available
static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
struct ceph_osd_client *osdc;
void *reply_buf = NULL;
if (rbd_dev->spec->pool_name)
return 0; /* Already have the names */
/* Look up the pool name */
osdc = &rbd_dev->rbd_client->client->osdc;
name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
rbd_warn(rbd_dev, "there is no pool with id %llu",
rbd_dev->spec->pool_id); /* Really a BUG() */
rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
if (!rbd_dev->spec->pool_name)
/* Fetch the image name; tolerate failure here */
name = rbd_dev_image_name(rbd_dev);
rbd_dev->spec->image_name = (char *) name;
rbd_warn(rbd_dev, "unable to get image name");
/* Look up the snapshot name. */
name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
rbd_warn(rbd_dev, "no snapshot with id %llu",
rbd_dev->spec->snap_id); /* Really a BUG() */
rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
if (!rbd_dev->spec->snap_name)
kfree(rbd_dev->spec->pool_name);
rbd_dev->spec->pool_name = NULL;

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
        size_t size;
        void *reply_buf;
        void *p, *end;
        u64 seq;
        u32 snap_count;
        struct ceph_snap_context *snapc;
        u32 i;
        int ret;

        /*
         * We'll need room for the seq value (maximum snapshot id),
         * snapshot count, and array of that many snapshot ids.
         * For now we have a fixed upper limit on the number we're
         * prepared to receive.
         */
        size = sizeof (__le64) + sizeof (__le32) +
                        RBD_MAX_SNAP_COUNT * sizeof (__le64);
        reply_buf = kzalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return -ENOMEM;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapcontext",
                                NULL, 0,
                                reply_buf, size, ver);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        ret = -ERANGE;
        p = reply_buf;
        end = (char *) reply_buf + size;
        ceph_decode_64_safe(&p, end, seq, out);
        ceph_decode_32_safe(&p, end, snap_count, out);

        /*
         * Make sure the reported number of snapshot ids wouldn't go
         * beyond the end of our buffer.  But before checking that,
         * make sure the computed size of the snapshot context we
         * allocate is representable in a size_t.
         */
        if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
                                / sizeof (u64))
                goto out;
        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
                goto out;

        size = sizeof (struct ceph_snap_context) +
                        snap_count * sizeof (snapc->snaps[0]);
        snapc = kmalloc(size, GFP_KERNEL);
        if (!snapc) {
                ret = -ENOMEM;
                goto out;
        }
        atomic_set(&snapc->nref, 1);
        snapc->seq = seq;
        snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                snapc->snaps[i] = ceph_decode_64(&p);

        rbd_dev->header.snapc = snapc;

        dout("  snap context seq = %llu, snap_count = %u\n",
                (unsigned long long) seq, (unsigned int) snap_count);
        ret = 0;
out:
        kfree(reply_buf);

        return ret;
}
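
/*
 * Sketch of the get_snapcontext reply decoded above:
 *
 *      __le64  seq;                    (highest snapshot id seen)
 *      __le32  snap_count;
 *      __le64  snaps[snap_count];      (snapshot ids, highest first)
 */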

static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        size_t size;
        void *reply_buf;
        __le64 snap_id;
        void *p, *end;
        char *snap_name;
        int ret;

        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);

        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
                                reply_buf, size, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = reply_buf;
        end = (char *) reply_buf + size;
        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(snap_name)) {
                ret = PTR_ERR(snap_name);
                goto out;
        }
        dout("  snap_id 0x%016llx snap_name = %s\n",
                (unsigned long long) le64_to_cpu(snap_id), snap_name);
        kfree(reply_buf);

        return snap_name;
out:
        kfree(reply_buf);

        return ERR_PTR(ret);
}

static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
                u64 *snap_size, u64 *snap_features)
{
        u64 snap_id;
        u8 order;
        int ret;

        snap_id = rbd_dev->header.snapc->snaps[which];
        ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
        if (ret)
                return ERR_PTR(ret);
        ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
        if (ret)
                return ERR_PTR(ret);

        return rbd_dev_v2_snap_name(rbd_dev, which);
}

static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
                u64 *snap_size, u64 *snap_features)
{
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_info(rbd_dev, which,
                                        snap_size, snap_features);
        if (rbd_dev->image_format == 2)
                return rbd_dev_v2_snap_info(rbd_dev, which,
                                        snap_size, snap_features);

        return ERR_PTR(-EINVAL);
}

static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        __u8 obj_order;

        down_write(&rbd_dev->header_rwsem);

        /* Grab old order first, to see if it changes */
        obj_order = rbd_dev->header.obj_order;
        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret)
                goto out;
        if (rbd_dev->header.obj_order != obj_order) {
                ret = -EIO;
                goto out;
        }
        rbd_update_mapping_size(rbd_dev);

        ret = rbd_dev_v2_snap_context(rbd_dev, hver);
        dout("rbd_dev_v2_snap_context returned %d\n", ret);
        if (ret)
                goto out;
        ret = rbd_dev_snaps_update(rbd_dev);
        dout("rbd_dev_snaps_update returned %d\n", ret);
        if (ret)
                goto out;
        ret = rbd_dev_snaps_register(rbd_dev);
        dout("rbd_dev_snaps_register returned %d\n", ret);
out:
        up_write(&rbd_dev->header_rwsem);

        return ret;
}

/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        const u32 snap_count = snapc->num_snaps;
        struct list_head *head = &rbd_dev->snaps;
        struct list_head *links = head->next;
        u32 index = 0;

        dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
        while (index < snap_count || links != head) {
                u64 snap_id;
                struct rbd_snap *snap;
                char *snap_name;
                u64 snap_size = 0;
                u64 snap_features = 0;

                snap_id = index < snap_count ? snapc->snaps[index]
                                             : CEPH_NOSNAP;
                snap = links != head ? list_entry(links, struct rbd_snap, node)
                                     : NULL;
                rbd_assert(!snap || snap->id != CEPH_NOSNAP);

                if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
                        struct list_head *next = links->next;

                        /* Existing snapshot not in the new snap context */

                        if (rbd_dev->spec->snap_id == snap->id)
                                atomic_set(&rbd_dev->exists, 0);
                        rbd_remove_snap_dev(snap);
                        dout("%ssnap id %llu has been removed\n",
                                rbd_dev->spec->snap_id == snap->id ?
                                                        "mapped " : "",
                                (unsigned long long) snap->id);

                        /* Done with this list entry; advance */

                        links = next;
                        continue;
                }

                snap_name = rbd_dev_snap_info(rbd_dev, index,
                                        &snap_size, &snap_features);
                if (IS_ERR(snap_name))
                        return PTR_ERR(snap_name);

                dout("entry %u: snap_id = %llu\n", (unsigned int) index,
                        (unsigned long long) snap_id);
                if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
                        struct rbd_snap *new_snap;

                        /* We haven't seen this snapshot before */

                        new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
                                        snap_id, snap_size, snap_features);
                        if (IS_ERR(new_snap)) {
                                int err = PTR_ERR(new_snap);

                                dout("  failed to add dev, error %d\n", err);

                                return err;
                        }

                        /* New goes before existing, or at end of list */

                        dout("  added dev%s\n", snap ? "" : " at end");
                        if (snap)
                                list_add_tail(&new_snap->node, &snap->node);
                        else
                                list_add_tail(&new_snap->node, head);
                } else {
                        /* Already have this one */

                        dout("  already present\n");

                        rbd_assert(snap->size == snap_size);
                        rbd_assert(!strcmp(snap->name, snap_name));
                        rbd_assert(snap->features == snap_features);

                        /* Done with this list entry; advance */

                        links = links->next;
                }

                /* Advance to the next entry in the snapshot context */

                index++;
        }
        dout("%s: done\n", __func__);

        return 0;
}
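
/*
 * A worked example with hypothetical ids: given a new snapshot
 * context { 12, 7, 3 } and a current list { 12, 5, 3 }, the walk
 * keeps 12 (present in both), inserts 7 before 5, removes 5 (it is
 * absent from the new context), and keeps 3, leaving { 12, 7, 3 }.
 */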

/*
 * Scan the list of snapshots and register the devices for any that
 * have not already been registered.
 */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        int ret = 0;

        dout("%s called\n", __func__);
        if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
                return -EIO;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!rbd_snap_registered(snap)) {
                        ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
                        if (ret < 0)
                                break;
                }
        }
        dout("%s: returning %d\n", __func__, ret);

        return ret;
}

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
        struct device *dev;
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);

        mutex_unlock(&ctl_mutex);

        return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}

static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
        int ret, rc;

        do {
                ret = rbd_req_sync_watch(rbd_dev, 1);
                if (ret == -ERANGE) {
                        rc = rbd_dev_refresh(rbd_dev, NULL);
                        if (rc < 0)
                                return rc;
                }
        } while (ret == -ERANGE);

        return ret;
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
        struct list_head *tmp;
        int rbd_id = rbd_dev->dev_id;
        int max_id = 0;

        rbd_assert(rbd_id > 0);

        dout("rbd_dev %p released dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);

        /*
         * If the id being "put" is not the current maximum, there
         * is nothing special we need to do.
         */
        if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
                spin_unlock(&rbd_dev_list_lock);
                return;
        }

        /*
         * We need to update the current maximum id.  Search the
         * list to find out what it is.  We're more likely to find
         * the maximum at the end, so search the list backward.
         */
        list_for_each_prev(tmp, &rbd_dev_list) {
                struct rbd_device *rbd_dev;

                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id > max_id)
                        max_id = rbd_dev->dev_id;
        }
        spin_unlock(&rbd_dev_list_lock);

        /*
         * The max id could have been updated by rbd_dev_id_get(), in
         * which case it now accurately reflects the new maximum.
         * Be careful not to overwrite the maximum value in that
         * case.
         */
        atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
        dout("  max dev id has been reset\n");
}
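
/*
 * For example (hypothetical ids): if id 5 is the current maximum and
 * is being put while another thread concurrently gets id 6, then
 * rbd_dev_id_max no longer holds 5, so the cmpxchg() above makes no
 * change and the newer maximum (6) is preserved.
 */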

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * These are the characters that produce nonzero for
         * isspace() in the "C" and "POSIX" locales.
         */
        const char *spaces = " \f\n\r\t\v";

        *buf += strspn(*buf, spaces);   /* Find start of token */

        return strcspn(*buf, spaces);   /* Return token length */
}
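
/*
 * For illustration only:
 *
 *      const char *p = "  pool image";
 *      size_t len = next_token(&p);
 *
 * leaves p pointing at "pool image" and returns len == 4.
 */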

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t len;

        len = next_token(buf);
        if (len < token_size) {
                memcpy(token, *buf, len);
                *(token + len) = '\0';
        }
        *buf += len;

        return len;
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
        char *dup;
        size_t len;

        len = next_token(buf);
        dup = kmemdup(*buf, len + 1, GFP_KERNEL);
        if (!dup)
                return NULL;
        *(dup + len) = '\0';
        *buf += len;

        if (lenp)
                *lenp = len;

        return dup;
}
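
/*
 * For illustration only (error handling omitted):
 *
 *      const char *p = "foo bar";
 *      size_t len;
 *      char *tok = dup_token(&p, &len);
 *
 * yields tok == "foo" (which the caller must kfree()), len == 3,
 * and p left pointing at " bar".
 */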

/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *      I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
                                struct ceph_options **ceph_opts,
                                struct rbd_options **opts,
                                struct rbd_spec **rbd_spec)
{
        size_t len;
        char *options;
        const char *mon_addrs;
        size_t mon_addrs_size;
        struct rbd_spec *spec = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct ceph_options *copts;
        int ret;

        /* The first four tokens are required */

        len = next_token(&buf);
        if (!len) {
                rbd_warn(NULL, "no monitor address(es) provided");
                return -EINVAL;
        }
        mon_addrs = buf;
        mon_addrs_size = len + 1;
        buf += len;

        ret = -EINVAL;
        options = dup_token(&buf, NULL);
        if (!options)
                return -ENOMEM;
        if (!*options) {
                rbd_warn(NULL, "no options provided");
                goto out_err;
        }

        spec = rbd_spec_alloc();
        if (!spec)
                goto out_mem;

        spec->pool_name = dup_token(&buf, NULL);
        if (!spec->pool_name)
                goto out_mem;
        if (!*spec->pool_name) {
                rbd_warn(NULL, "no pool name provided");
                goto out_err;
        }
        spec->image_name = dup_token(&buf, NULL);
        if (!spec->image_name)
                goto out_mem;
        if (!*spec->image_name) {
                rbd_warn(NULL, "no image name provided");
                goto out_err;
        }

        /*
         * Snapshot name is optional; default is to use "-"
         * (indicating the head/no snapshot).
         */
        len = next_token(&buf);
        if (!len) {
                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
                ret = -ENAMETOOLONG;
                goto out_err;
        }
        spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
        if (!spec->snap_name)
                goto out_mem;
        *(spec->snap_name + len) = '\0';

        /* Initialize all rbd options to the defaults */

        rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
        if (!rbd_opts)
                goto out_mem;
        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        copts = ceph_parse_options(options, mon_addrs,
                                        mon_addrs + mon_addrs_size - 1,
                                        parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(copts)) {
                ret = PTR_ERR(copts);
                goto out_err;
        }
        kfree(options);

        *ceph_opts = copts;
        *opts = rbd_opts;
        *rbd_spec = spec;

        return 0;
out_mem:
        ret = -ENOMEM;
out_err:
        kfree(rbd_opts);
        rbd_spec_put(spec);
        kfree(options);

        return ret;
}
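
/*
 * A minimal example of such a request (hypothetical monitor address,
 * key, and names), as written from user space:
 *
 *      $ echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage" \
 *              > /sys/bus/rbd/add
 *
 * This maps the head of image "myimage" in pool "mypool"; appending
 * a snapshot name to the line would map that snapshot read-only.
 */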

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        void *p;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.
         */
        if (rbd_dev->spec->image_id)
                return 0;

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        ret = rbd_req_sync_exec(rbd_dev, object_name,
                                "rbd", "get_id", NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
        ret = 0;        /* rbd_req_sync_exec() can return positive */

        p = response;
        rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
                                                p + RBD_IMAGE_ID_LEN_MAX,
                                                NULL, GFP_NOIO);
        if (IS_ERR(rbd_dev->spec->image_id)) {
                ret = PTR_ERR(rbd_dev->spec->image_id);
                rbd_dev->spec->image_id = NULL;
        } else {
                dout("image_id is %s\n", rbd_dev->spec->image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}
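
/*
 * Naming sketch (prefixes as defined in rbd_types.h): mapping an
 * image "foo" consults the id object "rbd_id.foo" to learn its id,
 * say "1a2b3c"; the v2 header object is then named
 * "rbd_header.1a2b3c", whereas a format 1 image's header object
 * would simply be "foo.rbd".
 */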

static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;

        /* Version 1 images have no id; empty string is used */

        rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
        if (!rbd_dev->spec->image_id)
                return -ENOMEM;

        /* Record the header object name for this rbd image. */

        size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name) {
                ret = -ENOMEM;
                goto out_err;
        }
        sprintf(rbd_dev->header_name, "%s%s",
                rbd_dev->spec->image_name, RBD_SUFFIX);

        /* Populate rbd image metadata */

        ret = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (ret < 0)
                goto out_err;

        /* Version 1 images have no parent (no layering) */

        rbd_dev->parent_spec = NULL;
        rbd_dev->parent_overlap = 0;

        rbd_dev->image_format = 1;

        dout("discovered version 1 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;
out_err:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        return ret;
}

static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
        size_t size;
        int ret;
        u64 ver = 0;

        /*
         * Image id was filled in by the caller.  Record the header
         * object name for this rbd image.
         */
        size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;
        sprintf(rbd_dev->header_name, "%s%s",
                RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

        /* Get the size and object order for the image */
        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get the object prefix (a.k.a. block_name) for the image */
        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get and check the features for the image */
        ret = rbd_dev_v2_features(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* If the image supports layering, get the parent info */
        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }

        /* crypto and compression type aren't (yet) supported for v2 images */
        rbd_dev->header.crypt_type = 0;
        rbd_dev->header.comp_type = 0;

        /* Get the snapshot context, plus the header version */
        ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
        if (ret)
                goto out_err;
        rbd_dev->header.obj_version = ver;

        rbd_dev->image_format = 2;

        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;
out_err:
        rbd_dev->parent_overlap = 0;
        rbd_spec_put(rbd_dev->parent_spec);
        rbd_dev->parent_spec = NULL;
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}

static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
        int ret;

        /* no need to lock here, as rbd_dev is not registered yet */
        ret = rbd_dev_snaps_update(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_dev_probe_update_spec(rbd_dev);
        if (ret)
                goto err_out_snaps;

        ret = rbd_dev_set_mapping(rbd_dev);
        if (ret)
                goto err_out_snaps;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */
        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */
        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_disk;

        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         */
        down_write(&rbd_dev->header_rwsem);
        ret = rbd_dev_snaps_register(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
        if (ret)
                goto err_out_bus;

        ret = rbd_init_watch_dev(rbd_dev);
        if (ret)
                goto err_out_bus;

        /* Everything's ready.  Announce the disk to the world. */
        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */
        rbd_bus_del_dev(rbd_dev);

        return ret;
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
err_out_snaps:
        rbd_remove_all_snaps(rbd_dev);

        return ret;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
        int ret;

        /*
         * Get the id from the image id object.  If it's not a
         * format 2 image, we'll get ENOENT back, and we'll assume
         * it's a format 1 image.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                ret = rbd_dev_v1_probe(rbd_dev);
        else
                ret = rbd_dev_v2_probe(rbd_dev);
        if (ret) {
                dout("probe failed, returning %d\n", ret);
                return ret;
        }

        ret = rbd_dev_probe_finish(rbd_dev);
        if (ret)
                rbd_header_free(&rbd_dev->header);

        return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
        ceph_opts = NULL;       /* rbd_dev client now owns this */

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64) rc;

        /* The ceph file layout needs to fit pool id in 32 bits */
        if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rbd_dev->mapping.read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rc = rbd_dev_probe(rbd_dev);
        if (rc < 0)
                goto err_out_rbd_dev;

        return count;
err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t) rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct list_head *tmp;
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);

        return NULL;
}

static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        if (rbd_dev->watch_request) {
                struct ceph_client *client = rbd_dev->rbd_client->client;

                ceph_osdc_unregister_linger_request(&client->osdc,
                                                rbd_dev->watch_request);
        }
        if (rbd_dev->watch_event)
                rbd_req_sync_watch(rbd_dev, 0);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
        rbd_dev_destroy(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}

static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id, rc;
        unsigned long ul;
        int ret = count;

        rc = strict_strtoul(buf, 10, &ul);
        if (rc)
                return rc;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }
        if (rbd_dev->open_count) {
                ret = -EBUSY;
                goto done;
        }
        rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
done:
        mutex_unlock(&ctl_mutex);

        return ret;
}
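
/*
 * For example, a device mapped as /dev/rbd1 (device id 1) would be
 * unmapped from user space with:
 *
 *      $ echo 1 > /sys/bus/rbd/remove
 */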

/*
 * create control files in sysfs
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;
        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

int __init rbd_init(void)
{
        int rc;

        rc = rbd_sysfs_init();
        if (rc)
                return rc;
        pr_info("loaded " RBD_DRV_NAME_LONG "\n");

        return 0;
}

void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");