2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have these defined elsewhere */
57 #define U8_MAX ((u8) (~0U))
58 #define U16_MAX ((u16) (~0U))
59 #define U32_MAX ((u32) (~0U))
60 #define U64_MAX ((u64) (~0ULL))
62 #define RBD_DRV_NAME "rbd"
63 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
65 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
67 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
68 #define RBD_MAX_SNAP_NAME_LEN \
69 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
71 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
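/*
 * (Editorial note, not in the original source: the 4KB figure checks
 * out against the reply buffer sized in rbd_dev_v2_snap_context()
 * below -- one __le64 seq value, one __le32 count, and one __le64 id
 * per snapshot, so 8 + 4 + 510 * 8 = 4092 bytes, which just fits in a
 * 4096-byte page; 511 snapshots would need 4100 bytes.)
 */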
73 #define RBD_SNAP_HEAD_NAME "-"
75 /* This allows a single page to hold an image name sent by OSD */
76 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
77 #define RBD_IMAGE_ID_LEN_MAX 64
79 #define RBD_OBJ_PREFIX_LEN_MAX 64
83 #define RBD_FEATURE_LAYERING 1
85 /* Features supported by this (client software) implementation. */
87 #define RBD_FEATURES_ALL (0)
90 * An RBD device name will be "rbd#", where the "rbd" comes from
91 * RBD_DRV_NAME above, and # is a unique integer identifier.
92 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
93 * enough to hold all possible device names.
95 #define DEV_NAME_LEN 32
96 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
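/*
 * (Editorial note: the formula above over-estimates the decimal width
 * of an int -- 2.5 digits per byte comfortably covers log10(256),
 * roughly 2.41 -- and the "+ 1" leaves room for a minus sign.  For a
 * 4-byte int it yields 11, enough for the 11 characters of
 * "-2147483648", so DEV_NAME_LEN at 32 easily holds "rbd" plus any
 * device id.)
 */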
99 * block device image metadata (in-memory version)
101 struct rbd_image_header {
102 /* These four fields never change for a given rbd image */
109 /* The remaining fields need to be updated occasionally */
111 struct ceph_snap_context *snapc;
119 * An rbd image specification.
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122 * identify an image. Each rbd_dev structure includes a pointer to
123 * an rbd_spec structure that encapsulates this identity.
125 * Each of the id's in an rbd_spec has an associated name. For a
126 * user-mapped image, the names are supplied and the id's associated
127 * with them are looked up. For a layered image, a parent image is
128 * defined by the tuple, and the names are looked up.
130 * An rbd_dev structure contains a parent_spec pointer which is
131 * non-null if the image it represents is a child in a layered
132 * image. This pointer will refer to the rbd_spec structure used
133 * by the parent rbd_dev for its own identity (i.e., the structure
134 * is shared between the parent and child).
136 * Since these structures are populated once, during the discovery
137 * phase of image construction, they are effectively immutable so
138 * we make no effort to synchronize access to them.
140 * Note that code herein does not assume the image name is known (it
141 * could be a null pointer).
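/*
 * (Editorial aid: the struct rbd_spec definition itself is not part of
 * this excerpt.  The sketch below is reconstructed from the fields the
 * rest of the file dereferences -- pool_id/pool_name,
 * image_id/image_name, snap_id/snap_name, and the kref used by
 * rbd_spec_get()/rbd_spec_put() -- and is meant only as a reading aid,
 * not as the authoritative layout.
 *
 *	struct rbd_spec {
 *		u64		pool_id;
 *		char		*pool_name;
 *
 *		char		*image_id;
 *		char		*image_name;
 *
 *		u64		snap_id;
 *		char		*snap_name;
 *
 *		struct kref	kref;
 *	};
 */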
157 * an instance of the client. multiple devices may share an rbd client.
160 struct ceph_client *client;
162 struct list_head node;
166 * a request completion status
168 struct rbd_req_status {
175 * a collection of requests
177 struct rbd_req_coll {
181 struct rbd_req_status status[0];
185 * a single io request
188 struct request *rq; /* blk layer request */
189 struct bio *bio; /* cloned bio */
190 struct page **pages; /* list of used pages */
193 struct rbd_req_coll *coll;
200 struct list_head node;
215 int dev_id; /* blkdev unique id */
217 int major; /* blkdev assigned major */
218 struct gendisk *disk; /* blkdev's gendisk and rq */
220 u32 image_format; /* Either 1 or 2 */
221 struct rbd_client *rbd_client;
223 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
225 spinlock_t lock; /* queue lock */
227 struct rbd_image_header header;
229 struct rbd_spec *spec;
233 struct ceph_file_layout layout;
235 struct ceph_osd_event *watch_event;
236 struct ceph_osd_request *watch_request;
238 struct rbd_spec *parent_spec;
241 /* protects updating the header */
242 struct rw_semaphore header_rwsem;
244 struct rbd_mapping mapping;
246 struct list_head node;
248 /* list of snapshots */
249 struct list_head snaps;
253 unsigned long open_count;
256 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
258 static LIST_HEAD(rbd_dev_list); /* devices */
259 static DEFINE_SPINLOCK(rbd_dev_list_lock);
261 static LIST_HEAD(rbd_client_list); /* clients */
262 static DEFINE_SPINLOCK(rbd_client_list_lock);
264 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
265 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
267 static void rbd_dev_release(struct device *dev);
268 static void rbd_remove_snap_dev(struct rbd_snap *snap);
270 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
272 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
275 static struct bus_attribute rbd_bus_attrs[] = {
276 __ATTR(add, S_IWUSR, NULL, rbd_add),
277 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
281 static struct bus_type rbd_bus_type = {
283 .bus_attrs = rbd_bus_attrs,
286 static void rbd_root_dev_release(struct device *dev)
290 static struct device rbd_root_dev = {
292 .release = rbd_root_dev_release,
295 static __printf(2, 3)
296 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
298 struct va_format vaf;
306 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
307 else if (rbd_dev->disk)
308 printk(KERN_WARNING "%s: %s: %pV\n",
309 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
310 else if (rbd_dev->spec && rbd_dev->spec->image_name)
311 printk(KERN_WARNING "%s: image %s: %pV\n",
312 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
313 else if (rbd_dev->spec && rbd_dev->spec->image_id)
314 printk(KERN_WARNING "%s: id %s: %pV\n",
315 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
317 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
318 RBD_DRV_NAME, rbd_dev, &vaf);
323 #define rbd_assert(expr) \
324 if (unlikely(!(expr))) { \
325 printk(KERN_ERR "\nAssertion failure in %s() " \
326 "at line %d:\n\n" \
327 "\trbd_assert(%s);\n\n", \
328 __func__, __LINE__, #expr); \
331 #else /* !RBD_DEBUG */
332 # define rbd_assert(expr) ((void) 0)
333 #endif /* !RBD_DEBUG */
335 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
336 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
338 static int rbd_open(struct block_device *bdev, fmode_t mode)
340 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
342 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
345 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
346 (void) get_device(&rbd_dev->dev);
347 set_device_ro(bdev, rbd_dev->mapping.read_only);
348 rbd_dev->open_count++;
349 mutex_unlock(&ctl_mutex);
354 static int rbd_release(struct gendisk *disk, fmode_t mode)
356 struct rbd_device *rbd_dev = disk->private_data;
358 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
359 rbd_assert(rbd_dev->open_count > 0);
360 rbd_dev->open_count--;
361 put_device(&rbd_dev->dev);
362 mutex_unlock(&ctl_mutex);
367 static const struct block_device_operations rbd_bd_ops = {
368 .owner = THIS_MODULE,
370 .release = rbd_release,
374 * Initialize an rbd client instance.
377 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
379 struct rbd_client *rbdc;
382 dout("rbd_client_create\n");
383 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
387 kref_init(&rbdc->kref);
388 INIT_LIST_HEAD(&rbdc->node);
390 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
392 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
393 if (IS_ERR(rbdc->client))
395 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
397 ret = ceph_open_session(rbdc->client);
401 spin_lock(&rbd_client_list_lock);
402 list_add_tail(&rbdc->node, &rbd_client_list);
403 spin_unlock(&rbd_client_list_lock);
405 mutex_unlock(&ctl_mutex);
407 dout("rbd_client_create created %p\n", rbdc);
411 ceph_destroy_client(rbdc->client);
413 mutex_unlock(&ctl_mutex);
417 ceph_destroy_options(ceph_opts);
422 * Find a ceph client with specific addr and configuration. If
423 * found, bump its reference count.
425 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
427 struct rbd_client *client_node;
430 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
433 spin_lock(&rbd_client_list_lock);
434 list_for_each_entry(client_node, &rbd_client_list, node) {
435 if (!ceph_compare_options(ceph_opts, client_node->client)) {
436 kref_get(&client_node->kref);
441 spin_unlock(&rbd_client_list_lock);
443 return found ? client_node : NULL;
453 /* string args above */
456 /* Boolean args above */
460 static match_table_t rbd_opts_tokens = {
462 /* string args above */
463 {Opt_read_only, "read_only"},
464 {Opt_read_only, "ro"}, /* Alternate spelling */
465 {Opt_read_write, "read_write"},
466 {Opt_read_write, "rw"}, /* Alternate spelling */
467 /* Boolean args above */
475 #define RBD_READ_ONLY_DEFAULT false
477 static int parse_rbd_opts_token(char *c, void *private)
479 struct rbd_options *rbd_opts = private;
480 substring_t argstr[MAX_OPT_ARGS];
481 int token, intval, ret;
483 token = match_token(c, rbd_opts_tokens, argstr);
487 if (token < Opt_last_int) {
488 ret = match_int(&argstr[0], &intval);
490 pr_err("bad mount option arg (not int) "
494 dout("got int token %d val %d\n", token, intval);
495 } else if (token > Opt_last_int && token < Opt_last_string) {
496 dout("got string token %d val %s\n", token,
498 } else if (token > Opt_last_string && token < Opt_last_bool) {
499 dout("got Boolean token %d\n", token);
501 dout("got token %d\n", token);
506 rbd_opts->read_only = true;
509 rbd_opts->read_only = false;
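/*
 * (Editorial note: option tokens are classified by range rather than
 * one by one.  The backing enum -- elided from this excerpt -- is laid
 * out with sentinel values: tokens below Opt_last_int take an integer
 * argument, tokens between Opt_last_int and Opt_last_string take a
 * string, and tokens between Opt_last_string and Opt_last_bool are
 * bare Boolean flags.  The range comparisons in parse_rbd_opts_token()
 * above depend on that ordering.)
 */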
519 * Get a ceph client with specific addr and configuration; if one does
520 * not exist, create it.
522 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
524 struct rbd_client *rbdc;
526 rbdc = rbd_client_find(ceph_opts);
527 if (rbdc) /* using an existing client */
528 ceph_destroy_options(ceph_opts);
530 rbdc = rbd_client_create(ceph_opts);
536 * Destroy ceph client
538 * Caller must not hold rbd_client_list_lock; it is acquired here.
540 static void rbd_client_release(struct kref *kref)
542 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
544 dout("rbd_release_client %p\n", rbdc);
545 spin_lock(&rbd_client_list_lock);
546 list_del(&rbdc->node);
547 spin_unlock(&rbd_client_list_lock);
549 ceph_destroy_client(rbdc->client);
554 * Drop reference to ceph client node. If it's not referenced anymore, release it.
557 static void rbd_put_client(struct rbd_client *rbdc)
560 kref_put(&rbdc->kref, rbd_client_release);
564 * Destroy requests collection
566 static void rbd_coll_release(struct kref *kref)
568 struct rbd_req_coll *coll =
569 container_of(kref, struct rbd_req_coll, kref);
571 dout("rbd_coll_release %p\n", coll);
575 static bool rbd_image_format_valid(u32 image_format)
577 return image_format == 1 || image_format == 2;
580 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
585 /* The header has to start with the magic rbd header text */
586 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
589 /* The bio layer requires at least sector-sized I/O */
591 if (ondisk->options.order < SECTOR_SHIFT)
594 /* If we use u64 in a few spots we may be able to loosen this */
596 if (ondisk->options.order > 8 * sizeof (int) - 1)
600 * The size of a snapshot header has to fit in a size_t, and
601 * that limits the number of snapshots.
603 snap_count = le32_to_cpu(ondisk->snap_count);
604 size = SIZE_MAX - sizeof (struct ceph_snap_context);
605 if (snap_count > size / sizeof (__le64))
609 * Not only that, but the size of the entire snapshot
610 * header must also be representable in a size_t.
612 size -= snap_count * sizeof (__le64);
613 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
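/*
 * (Editorial sketch, not part of the driver: the same overflow-safe
 * bound enforced above, expressed as a small hosted-C program.  An
 * untrusted snap_count/snap_names_len pair is accepted only if the
 * snapshot context struct, the id array, and the name blob together
 * remain representable in a size_t.  Function and variable names and
 * the sample sizes are illustrative only.)
 */
#include <stdint.h>
#include <stdio.h>

static int snap_header_fits(size_t context_size, uint32_t snap_count,
			    uint64_t snap_names_len)
{
	size_t size = SIZE_MAX - context_size;

	if (snap_count > size / sizeof(uint64_t))
		return 0;			/* id array alone overflows */
	size -= (size_t)snap_count * sizeof(uint64_t);

	return (uint64_t)size >= snap_names_len;	/* room left for names? */
}

int main(void)
{
	/* 510 snapshot ids plus a page of names clearly fits */
	printf("%d\n", snap_header_fits(64, 510, 4096));
	/* an absurd name-blob length does not */
	printf("%d\n", snap_header_fits(64, 510, UINT64_MAX));
	return 0;
}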
620 * Create a new header structure, translate header format from the on-disk header.
623 static int rbd_header_from_disk(struct rbd_image_header *header,
624 struct rbd_image_header_ondisk *ondisk)
631 memset(header, 0, sizeof (*header));
633 snap_count = le32_to_cpu(ondisk->snap_count);
635 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
636 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
637 if (!header->object_prefix)
639 memcpy(header->object_prefix, ondisk->object_prefix, len);
640 header->object_prefix[len] = '\0';
643 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
645 /* Save a copy of the snapshot names */
647 if (snap_names_len > (u64) SIZE_MAX)
649 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
650 if (!header->snap_names)
653 * Note that rbd_dev_v1_header_read() guarantees
654 * the ondisk buffer we're working with has
655 * snap_names_len bytes beyond the end of the
656 * snapshot id array, so this memcpy() is safe.
658 memcpy(header->snap_names, &ondisk->snaps[snap_count],
661 /* Record each snapshot's size */
663 size = snap_count * sizeof (*header->snap_sizes);
664 header->snap_sizes = kmalloc(size, GFP_KERNEL);
665 if (!header->snap_sizes)
667 for (i = 0; i < snap_count; i++)
668 header->snap_sizes[i] =
669 le64_to_cpu(ondisk->snaps[i].image_size);
671 WARN_ON(ondisk->snap_names_len);
672 header->snap_names = NULL;
673 header->snap_sizes = NULL;
676 header->features = 0; /* No features support in v1 images */
677 header->obj_order = ondisk->options.order;
678 header->crypt_type = ondisk->options.crypt_type;
679 header->comp_type = ondisk->options.comp_type;
681 /* Allocate and fill in the snapshot context */
683 header->image_size = le64_to_cpu(ondisk->image_size);
684 size = sizeof (struct ceph_snap_context);
685 size += snap_count * sizeof (header->snapc->snaps[0]);
686 header->snapc = kzalloc(size, GFP_KERNEL);
690 atomic_set(&header->snapc->nref, 1);
691 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
692 header->snapc->num_snaps = snap_count;
693 for (i = 0; i < snap_count; i++)
694 header->snapc->snaps[i] =
695 le64_to_cpu(ondisk->snaps[i].id);
700 kfree(header->snap_sizes);
701 header->snap_sizes = NULL;
702 kfree(header->snap_names);
703 header->snap_names = NULL;
704 kfree(header->object_prefix);
705 header->object_prefix = NULL;
710 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
712 struct rbd_snap *snap;
714 if (snap_id == CEPH_NOSNAP)
715 return RBD_SNAP_HEAD_NAME;
717 list_for_each_entry(snap, &rbd_dev->snaps, node)
718 if (snap_id == snap->id)
724 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
727 struct rbd_snap *snap;
729 list_for_each_entry(snap, &rbd_dev->snaps, node) {
730 if (!strcmp(snap_name, snap->name)) {
731 rbd_dev->spec->snap_id = snap->id;
732 rbd_dev->mapping.size = snap->size;
733 rbd_dev->mapping.features = snap->features;
742 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
746 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
747 sizeof (RBD_SNAP_HEAD_NAME))) {
748 rbd_dev->spec->snap_id = CEPH_NOSNAP;
749 rbd_dev->mapping.size = rbd_dev->header.image_size;
750 rbd_dev->mapping.features = rbd_dev->header.features;
753 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
756 rbd_dev->mapping.read_only = true;
758 atomic_set(&rbd_dev->exists, 1);
763 static void rbd_header_free(struct rbd_image_header *header)
765 kfree(header->object_prefix);
766 header->object_prefix = NULL;
767 kfree(header->snap_sizes);
768 header->snap_sizes = NULL;
769 kfree(header->snap_names);
770 header->snap_names = NULL;
771 ceph_put_snap_context(header->snapc);
772 header->snapc = NULL;
775 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
781 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
784 segment = offset >> rbd_dev->header.obj_order;
785 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
786 rbd_dev->header.object_prefix, segment);
787 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
788 pr_err("error formatting segment name for #%llu (%d)\n",
797 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
799 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
801 return offset & (segment_size - 1);
804 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
805 u64 offset, u64 length)
807 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
809 offset &= segment_size - 1;
811 rbd_assert(length <= U64_MAX - offset);
812 if (offset + length > segment_size)
813 length = segment_size - offset;
818 static int rbd_get_num_segments(struct rbd_image_header *header,
827 if (len - 1 > U64_MAX - ofs)
830 start_seg = ofs >> header->obj_order;
831 end_seg = (ofs + len - 1) >> header->obj_order;
833 result = end_seg - start_seg + 1;
834 if (result > (u64) INT_MAX)
841 * returns the size of an object in the image
843 static u64 rbd_obj_bytes(struct rbd_image_header *header)
845 return 1 << header->obj_order;
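/*
 * (Editorial sketch, not part of the driver: the segment arithmetic
 * used by rbd_segment_name(), rbd_segment_offset() and
 * rbd_segment_length() above, as a small hosted-C program.  The object
 * order of 22, i.e. 4 MiB objects, and the sample offsets are
 * illustrative values only.)
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	unsigned int obj_order = 22;			/* 1 << 22 = 4 MiB objects */
	uint64_t segment_size = (uint64_t)1 << obj_order;
	uint64_t ofs = 5 * segment_size + 123456;	/* image offset, inside object 5 */
	uint64_t len = 3 * segment_size;		/* request spanning several objects */

	uint64_t segment = ofs >> obj_order;		/* which object */
	uint64_t seg_ofs = ofs & (segment_size - 1);	/* offset within it */
	uint64_t seg_len = len;
	if (seg_ofs + seg_len > segment_size)		/* clamp to the object end */
		seg_len = segment_size - seg_ofs;

	printf("object %" PRIu64 ", offset %" PRIu64 ", length %" PRIu64 "\n",
	       segment, seg_ofs, seg_len);
	printf("objects touched by the whole request: %" PRIu64 "\n",
	       ((ofs + len - 1) >> obj_order) - (ofs >> obj_order) + 1);
	return 0;
}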
852 static void bio_chain_put(struct bio *chain)
858 chain = chain->bi_next;
864 * zeros a bio chain, starting at a specific offset
866 static void zero_bio_chain(struct bio *chain, int start_ofs)
875 bio_for_each_segment(bv, chain, i) {
876 if (pos + bv->bv_len > start_ofs) {
877 int remainder = max(start_ofs - pos, 0);
878 buf = bvec_kmap_irq(bv, &flags);
879 memset(buf + remainder, 0,
880 bv->bv_len - remainder);
881 bvec_kunmap_irq(buf, &flags);
886 chain = chain->bi_next;
891 * Clone a portion of a bio, starting at the given byte offset
892 * and continuing for the number of bytes indicated.
894 static struct bio *bio_clone_range(struct bio *bio_src,
903 unsigned short end_idx;
907 /* Handle the easy case for the caller */
909 if (!offset && len == bio_src->bi_size)
910 return bio_clone(bio_src, gfpmask);
912 if (WARN_ON_ONCE(!len))
914 if (WARN_ON_ONCE(len > bio_src->bi_size))
916 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
919 /* Find first affected segment... */
922 __bio_for_each_segment(bv, bio_src, idx, 0) {
923 if (resid < bv->bv_len)
929 /* ...and the last affected segment */
932 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
933 if (resid <= bv->bv_len)
937 vcnt = end_idx - idx + 1;
939 /* Build the clone */
941 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
943 return NULL; /* ENOMEM */
945 bio->bi_bdev = bio_src->bi_bdev;
946 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
947 bio->bi_rw = bio_src->bi_rw;
948 bio->bi_flags |= 1 << BIO_CLONED;
951 * Copy over our part of the bio_vec, then update the first
952 * and last (or only) entries.
954 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
955 vcnt * sizeof (struct bio_vec));
956 bio->bi_io_vec[0].bv_offset += voff;
958 bio->bi_io_vec[0].bv_len -= voff;
959 bio->bi_io_vec[vcnt - 1].bv_len = resid;
961 bio->bi_io_vec[0].bv_len = len;
972 * Clone a portion of a bio chain, starting at the given byte offset
973 * into the first bio in the source chain and continuing for the
974 * number of bytes indicated. The result is another bio chain of
975 * exactly the given length, or a null pointer on error.
977 * The bio_src and offset parameters are both in-out. On entry they
978 * refer to the first source bio and the offset into that bio where
979 * the start of data to be cloned is located.
981 * On return, bio_src is updated to refer to the bio in the source
982 * chain that contains the first un-cloned byte, and *offset will
983 * contain the offset of that byte within that bio.
985 static struct bio *bio_chain_clone_range(struct bio **bio_src,
986 unsigned int *offset,
990 struct bio *bi = *bio_src;
991 unsigned int off = *offset;
992 struct bio *chain = NULL;
995 /* Build up a chain of clone bios up to the limit */
997 if (!bi || off >= bi->bi_size || !len)
998 return NULL; /* Nothing to clone */
1002 unsigned int bi_size;
1006 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1007 goto out_err; /* EINVAL; ran out of bio's */
1009 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1010 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1012 goto out_err; /* ENOMEM */
1015 end = &bio->bi_next;
1018 if (off == bi->bi_size) {
1029 bio_chain_put(chain);
1034 struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1036 struct ceph_osd_req_op *op;
1040 op = kzalloc(sizeof (*op), GFP_NOIO);
1044 va_start(args, opcode);
1046 case CEPH_OSD_OP_READ:
1047 case CEPH_OSD_OP_WRITE:
1048 /* rbd_osd_req_op_create(READ, offset, length) */
1049 /* rbd_osd_req_op_create(WRITE, offset, length) */
1050 op->extent.offset = va_arg(args, u64);
1051 op->extent.length = va_arg(args, u64);
1052 if (opcode == CEPH_OSD_OP_WRITE)
1053 op->payload_len = op->extent.length;
1055 case CEPH_OSD_OP_CALL:
1056 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1057 op->cls.class_name = va_arg(args, char *);
1058 size = strlen(op->cls.class_name);
1059 rbd_assert(size <= (size_t) U8_MAX);
1060 op->cls.class_len = size;
1061 op->payload_len = size;
1063 op->cls.method_name = va_arg(args, char *);
1064 size = strlen(op->cls.method_name);
1065 rbd_assert(size <= (size_t) U8_MAX);
1066 op->cls.method_len = size;
1067 op->payload_len += size;
1070 op->cls.indata = va_arg(args, void *);
1071 size = va_arg(args, size_t);
1072 rbd_assert(size <= (size_t) U32_MAX);
1073 op->cls.indata_len = (u32) size;
1074 op->payload_len += size;
1076 case CEPH_OSD_OP_NOTIFY_ACK:
1077 case CEPH_OSD_OP_WATCH:
1078 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1079 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1080 op->watch.cookie = va_arg(args, u64);
1081 op->watch.ver = va_arg(args, u64);
1082 op->watch.ver = cpu_to_le64(op->watch.ver);
1083 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1084 op->watch.flag = (u8) 1;
1087 rbd_warn(NULL, "unsupported opcode %hu", opcode);
1097 static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
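/*
 * (Editorial usage note: callers pair rbd_osd_req_op_create() with
 * rbd_osd_req_op_destroy() around the request they issue, passing the
 * per-opcode arguments documented in the switch above, roughly:
 *
 *	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, ofs, len);
 *	if (!op)
 *		return -ENOMEM;
 *	...issue via rbd_req_sync_op() or rbd_do_request()...
 *	rbd_osd_req_op_destroy(op);
 *
 * The NULL-on-allocation-failure return is inferred from the callers;
 * the create function's error path is elided from this excerpt.
 * Compare the rbd_req_sync_read() and rbd_do_op() call sites below.)
 */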
1102 static void rbd_coll_end_req_index(struct request *rq,
1103 struct rbd_req_coll *coll,
1107 struct request_queue *q;
1110 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1111 coll, index, (int)ret, (unsigned long long)len);
1117 blk_end_request(rq, ret, len);
1123 spin_lock_irq(q->queue_lock);
1124 coll->status[index].done = 1;
1125 coll->status[index].rc = ret;
1126 coll->status[index].bytes = len;
1127 max = min = coll->num_done;
1128 while (max < coll->total && coll->status[max].done)
1131 for (i = min; i < max; i++) {
1132 __blk_end_request(rq, (int)coll->status[i].rc,
1133 coll->status[i].bytes);
1135 kref_put(&coll->kref, rbd_coll_release);
1137 spin_unlock_irq(q->queue_lock);
1140 static void rbd_coll_end_req(struct rbd_request *rbd_req,
1143 rbd_coll_end_req_index(rbd_req->rq,
1144 rbd_req->coll, rbd_req->coll_index,
1149 * Send ceph osd request
1151 static int rbd_do_request(struct request *rq,
1152 struct rbd_device *rbd_dev,
1153 struct ceph_snap_context *snapc,
1155 const char *object_name, u64 ofs, u64 len,
1157 struct page **pages,
1160 struct ceph_osd_req_op *op,
1161 struct rbd_req_coll *coll,
1163 void (*rbd_cb)(struct ceph_osd_request *,
1167 struct ceph_osd_client *osdc;
1168 struct ceph_osd_request *osd_req;
1169 struct rbd_request *rbd_req = NULL;
1170 struct timespec mtime = CURRENT_TIME;
1173 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1174 object_name, (unsigned long long) ofs,
1175 (unsigned long long) len, coll, coll_index);
1177 osdc = &rbd_dev->rbd_client->client->osdc;
1178 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
1182 osd_req->r_flags = flags;
1183 osd_req->r_pages = pages;
1185 osd_req->r_bio = bio;
1186 bio_get(osd_req->r_bio);
1191 rbd_req = kmalloc(sizeof(*rbd_req), GFP_NOIO);
1197 rbd_req->pages = pages;
1199 rbd_req->coll = coll;
1200 rbd_req->coll_index = coll_index;
1203 osd_req->r_callback = rbd_cb;
1204 osd_req->r_priv = rbd_req;
1206 strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
1207 osd_req->r_oid_len = strlen(osd_req->r_oid);
1209 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1210 osd_req->r_num_pages = calc_pages_for(ofs, len);
1211 osd_req->r_page_alignment = ofs & ~PAGE_MASK;
1213 ceph_osdc_build_request(osd_req, ofs, len, 1, op,
1214 snapc, snapid, &mtime);
1216 if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
1217 ceph_osdc_set_request_linger(osdc, osd_req);
1218 rbd_dev->watch_request = osd_req;
1221 ret = ceph_osdc_start_request(osdc, osd_req, false);
1228 ret = ceph_osdc_wait_request(osdc, osd_req);
1229 version = le64_to_cpu(osd_req->r_reassert_version.version);
1232 dout("reassert_ver=%llu\n", (unsigned long long) version);
1233 ceph_osdc_put_request(osd_req);
1239 bio_chain_put(osd_req->r_bio);
1242 ceph_osdc_put_request(osd_req);
1248 * Ceph osd op callback
1250 static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
1252 struct rbd_request *rbd_req = osd_req->r_priv;
1253 struct ceph_osd_reply_head *replyhead;
1254 struct ceph_osd_op *op;
1260 replyhead = msg->front.iov_base;
1261 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1262 op = (void *)(replyhead + 1);
1263 rc = (s32)le32_to_cpu(replyhead->result);
1264 bytes = le64_to_cpu(op->extent.length);
1265 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1267 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1268 (unsigned long long) bytes, read_op, (int) rc);
1270 if (rc == (s32)-ENOENT && read_op) {
1271 zero_bio_chain(rbd_req->bio, 0);
1273 } else if (rc == 0 && read_op && bytes < rbd_req->len) {
1274 zero_bio_chain(rbd_req->bio, bytes);
1275 bytes = rbd_req->len;
1278 rbd_coll_end_req(rbd_req, rc, bytes);
1281 bio_chain_put(rbd_req->bio);
1283 ceph_osdc_put_request(osd_req);
1287 static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
1288 struct ceph_msg *msg)
1290 ceph_osdc_put_request(osd_req);
1294 * Do a synchronous ceph osd operation
1296 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1298 struct ceph_osd_req_op *op,
1299 const char *object_name,
1300 u64 ofs, u64 inbound_size,
1305 struct page **pages;
1308 rbd_assert(op != NULL);
1310 num_pages = calc_pages_for(ofs, inbound_size);
1311 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1313 return PTR_ERR(pages);
1315 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1316 object_name, ofs, inbound_size, NULL,
1326 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1327 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1330 ceph_release_page_vector(pages, num_pages);
1335 * Do an asynchronous ceph osd operation
1337 static int rbd_do_op(struct request *rq,
1338 struct rbd_device *rbd_dev,
1339 struct ceph_snap_context *snapc,
1342 struct rbd_req_coll *coll,
1345 const char *seg_name;
1349 struct ceph_osd_req_op *op;
1354 seg_name = rbd_segment_name(rbd_dev, ofs);
1357 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1358 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1360 if (rq_data_dir(rq) == WRITE) {
1361 opcode = CEPH_OSD_OP_WRITE;
1362 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1363 snapid = CEPH_NOSNAP;
1365 opcode = CEPH_OSD_OP_READ;
1366 flags = CEPH_OSD_FLAG_READ;
1368 snapid = rbd_dev->spec->snap_id;
1372 op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
1376 /* we've taken care of segment sizes earlier when we
1377 cloned the bios. We should never have a segment
1378 truncated at this point */
1379 rbd_assert(seg_len == len);
1381 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1382 seg_name, seg_ofs, seg_len,
1390 rbd_coll_end_req_index(rq, coll, coll_index,
1392 rbd_osd_req_op_destroy(op);
1399 * Request sync osd read
1401 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1402 const char *object_name,
1407 struct ceph_osd_req_op *op;
1410 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, ofs, len);
1414 ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
1415 op, object_name, ofs, len, buf, ver);
1416 rbd_osd_req_op_destroy(op);
1422 * Acknowledge a received notification (sync osd notify_ack)
1424 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1428 struct ceph_osd_req_op *op;
1431 op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1435 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1436 rbd_dev->header_name, 0, 0, NULL,
1441 rbd_simple_req_cb, NULL);
1443 rbd_osd_req_op_destroy(op);
1448 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1450 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1457 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1458 rbd_dev->header_name, (unsigned long long) notify_id,
1459 (unsigned int) opcode);
1460 rc = rbd_dev_refresh(rbd_dev, &hver);
1462 rbd_warn(rbd_dev, "got notification but failed to "
1463 "update snaps: %d\n", rc);
1465 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1469 * Request sync osd watch/unwatch. The value of "start" determines
1470 * whether a watch request is being initiated or torn down.
1472 static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
1474 struct ceph_osd_req_op *op;
1477 rbd_assert(start ^ !!rbd_dev->watch_event);
1478 rbd_assert(start ^ !!rbd_dev->watch_request);
1481 struct ceph_osd_client *osdc;
1483 osdc = &rbd_dev->rbd_client->client->osdc;
1484 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
1485 &rbd_dev->watch_event);
1490 op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1491 rbd_dev->watch_event->cookie,
1492 rbd_dev->header.obj_version, start);
1494 ret = rbd_req_sync_op(rbd_dev,
1495 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1496 op, rbd_dev->header_name,
1499 /* Cancel the event if we're tearing down, or on error */
1501 if (!start || !op || ret < 0) {
1502 ceph_osdc_cancel_event(rbd_dev->watch_event);
1503 rbd_dev->watch_event = NULL;
1505 rbd_osd_req_op_destroy(op);
1511 * Synchronous osd object method call
1513 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1514 const char *object_name,
1515 const char *class_name,
1516 const char *method_name,
1517 const char *outbound,
1518 size_t outbound_size,
1520 size_t inbound_size,
1523 struct ceph_osd_req_op *op;
1527 * Any input parameters required by the method we're calling
1528 * will be sent along with the class and method names as
1529 * part of the message payload. That data and its size are
1530 * supplied via the indata and indata_len fields (named from
1531 * the perspective of the server side) in the OSD request operation.
1534 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1535 method_name, outbound, outbound_size);
1539 ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
1540 object_name, 0, inbound_size, inbound,
1543 rbd_osd_req_op_destroy(op);
1545 dout("cls_exec returned %d\n", ret);
1549 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1551 struct rbd_req_coll *coll =
1552 kzalloc(sizeof(struct rbd_req_coll) +
1553 sizeof(struct rbd_req_status) * num_reqs,
1558 coll->total = num_reqs;
1559 kref_init(&coll->kref);
1563 static int rbd_dev_do_request(struct request *rq,
1564 struct rbd_device *rbd_dev,
1565 struct ceph_snap_context *snapc,
1566 u64 ofs, unsigned int size,
1567 struct bio *bio_chain)
1570 struct rbd_req_coll *coll;
1571 unsigned int bio_offset;
1574 dout("%s 0x%x bytes at 0x%llx\n",
1575 rq_data_dir(rq) == WRITE ? "write" : "read",
1576 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1578 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1582 coll = rbd_alloc_coll(num_segs);
1588 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1589 unsigned int clone_size;
1590 struct bio *bio_clone;
1592 BUG_ON(limit > (u64)UINT_MAX);
1593 clone_size = (unsigned int)limit;
1594 dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
1596 kref_get(&coll->kref);
1598 /* Pass a cloned bio chain via an osd request */
1600 bio_clone = bio_chain_clone_range(&bio_chain,
1601 &bio_offset, clone_size,
1604 (void)rbd_do_op(rq, rbd_dev, snapc,
1606 bio_clone, coll, cur_seg);
1608 rbd_coll_end_req_index(rq, coll, cur_seg,
1616 kref_put(&coll->kref, rbd_coll_release);
1622 * block device queue callback
1624 static void rbd_rq_fn(struct request_queue *q)
1626 struct rbd_device *rbd_dev = q->queuedata;
1627 bool read_only = rbd_dev->mapping.read_only;
1630 while ((rq = blk_fetch_request(q))) {
1631 struct ceph_snap_context *snapc = NULL;
1632 unsigned int size = 0;
1635 dout("fetched request\n");
1637 /* Filter out block requests we don't understand */
1639 if (rq->cmd_type != REQ_TYPE_FS) {
1640 __blk_end_request_all(rq, 0);
1643 spin_unlock_irq(q->queue_lock);
1645 /* Write requests need a reference to the snapshot context */
1647 if (rq_data_dir(rq) == WRITE) {
1649 if (read_only) /* Can't write to a read-only device */
1650 goto out_end_request;
1653 * Note that each osd request will take its
1654 * own reference to the snapshot context
1655 * supplied. The reference we take here
1656 * just guarantees the one we provide stays valid.
1659 down_read(&rbd_dev->header_rwsem);
1660 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1661 up_read(&rbd_dev->header_rwsem);
1662 rbd_assert(snapc != NULL);
1663 } else if (!atomic_read(&rbd_dev->exists)) {
1664 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1665 dout("request for non-existent snapshot");
1667 goto out_end_request;
1670 size = blk_rq_bytes(rq);
1671 result = rbd_dev_do_request(rq, rbd_dev, snapc,
1672 blk_rq_pos(rq) * SECTOR_SIZE,
1676 ceph_put_snap_context(snapc);
1677 spin_lock_irq(q->queue_lock);
1678 if (!size || result < 0)
1679 __blk_end_request_all(rq, result);
1684 * a queue callback. Makes sure that we don't create a bio that spans across
1685 * multiple osd objects. One exception would be a single-page bio,
1686 * which we handle later at bio_chain_clone_range()
1688 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1689 struct bio_vec *bvec)
1691 struct rbd_device *rbd_dev = q->queuedata;
1692 sector_t sector_offset;
1693 sector_t sectors_per_obj;
1694 sector_t obj_sector_offset;
1698 * Find how far into its rbd object the partition-relative
1699 * bio start sector is, as an offset relative to the enclosing device.
1702 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1703 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1704 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1707 * Compute the number of bytes from that offset to the end
1708 * of the object. Account for what's already used by the bio.
1710 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1711 if (ret > bmd->bi_size)
1712 ret -= bmd->bi_size;
1717 * Don't send back more than was asked for. And if the bio
1718 * was empty, let the whole thing through because: "Note
1719 * that a block device *must* allow a single page to be
1720 * added to an empty bio."
1722 rbd_assert(bvec->bv_len <= PAGE_SIZE);
1723 if (ret > (int) bvec->bv_len || !bmd->bi_size)
1724 ret = (int) bvec->bv_len;
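/*
 * (Editorial sketch, not part of the driver: the object-boundary clamp
 * that rbd_merge_bvec() performs above, as a small hosted-C program.
 * A bio already carrying bi_size bytes may only grow up to the end of
 * the object its starting sector falls in.  The object order, sector
 * value and macro name are illustrative only.)
 */
#include <stdint.h>
#include <stdio.h>

#define EX_SECTOR_SHIFT	9	/* 512-byte sectors, as in the driver */

int main(void)
{
	unsigned int obj_order = 22;				/* 4 MiB objects */
	uint64_t sectors_per_obj = (uint64_t)1 << (obj_order - EX_SECTOR_SHIFT);
	uint64_t start_sector = 8190;				/* 2 sectors before a boundary */
	unsigned int bi_size = 512;				/* bytes already in the bio */

	uint64_t obj_sector_offset = start_sector & (sectors_per_obj - 1);
	int64_t room = (int64_t)((sectors_per_obj - obj_sector_offset)
							<< EX_SECTOR_SHIFT);
	if (room > bi_size)
		room -= bi_size;		/* space left before the object ends */
	else
		room = 0;			/* already at (or past) the boundary */

	printf("bytes this bio may still accept: %lld\n", (long long)room);
	return 0;
}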
1729 static void rbd_free_disk(struct rbd_device *rbd_dev)
1731 struct gendisk *disk = rbd_dev->disk;
1736 if (disk->flags & GENHD_FL_UP)
1739 blk_cleanup_queue(disk->queue);
1744 * Read the complete header for the given rbd device.
1746 * Returns a pointer to a dynamically-allocated buffer containing
1747 * the complete and validated header. Caller can pass the address
1748 * of a variable that will be filled in with the version of the
1749 * header object at the time it was read.
1751 * Returns a pointer-coded errno if a failure occurs.
1753 static struct rbd_image_header_ondisk *
1754 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1756 struct rbd_image_header_ondisk *ondisk = NULL;
1763 * The complete header will include an array of its 64-bit
1764 * snapshot ids, followed by the names of those snapshots as
1765 * a contiguous block of NUL-terminated strings. Note that
1766 * the number of snapshots could change by the time we read
1767 * it in, in which case we re-read it.
1774 size = sizeof (*ondisk);
1775 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1777 ondisk = kmalloc(size, GFP_KERNEL);
1779 return ERR_PTR(-ENOMEM);
1781 ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
1783 (char *) ondisk, version);
1787 if (WARN_ON((size_t) ret < size)) {
1789 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
1793 if (!rbd_dev_ondisk_valid(ondisk)) {
1795 rbd_warn(rbd_dev, "invalid header");
1799 names_size = le64_to_cpu(ondisk->snap_names_len);
1800 want_count = snap_count;
1801 snap_count = le32_to_cpu(ondisk->snap_count);
1802 } while (snap_count != want_count);
1809 return ERR_PTR(ret);
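/*
 * (Editorial note on the loop above: the v1 on-disk header is variable
 * sized -- the fixed fields are followed by snap_count snapshot
 * records and then snap_names_len bytes of NUL-terminated names -- so
 * the buffer is sized for the snapshot count seen on the previous pass
 * and the read is simply retried whenever the count reported by the
 * OSD no longer matches the count the buffer was sized for.)
 */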
1813 * reload the ondisk header
1815 static int rbd_read_header(struct rbd_device *rbd_dev,
1816 struct rbd_image_header *header)
1818 struct rbd_image_header_ondisk *ondisk;
1822 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1824 return PTR_ERR(ondisk);
1825 ret = rbd_header_from_disk(header, ondisk);
1827 header->obj_version = ver;
1833 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1835 struct rbd_snap *snap;
1836 struct rbd_snap *next;
1838 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1839 rbd_remove_snap_dev(snap);
1842 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1846 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
1849 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1850 dout("setting size to %llu sectors", (unsigned long long) size);
1851 rbd_dev->mapping.size = (u64) size;
1852 set_capacity(rbd_dev->disk, size);
1856 * only read the first part of the ondisk header, without the snaps info
1858 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1861 struct rbd_image_header h;
1863 ret = rbd_read_header(rbd_dev, &h);
1867 down_write(&rbd_dev->header_rwsem);
1869 /* Update image size, and check for resize of mapped image */
1870 rbd_dev->header.image_size = h.image_size;
1871 rbd_update_mapping_size(rbd_dev);
1873 /* rbd_dev->header.object_prefix shouldn't change */
1874 kfree(rbd_dev->header.snap_sizes);
1875 kfree(rbd_dev->header.snap_names);
1876 /* osd requests may still refer to snapc */
1877 ceph_put_snap_context(rbd_dev->header.snapc);
1880 *hver = h.obj_version;
1881 rbd_dev->header.obj_version = h.obj_version;
1882 rbd_dev->header.image_size = h.image_size;
1883 rbd_dev->header.snapc = h.snapc;
1884 rbd_dev->header.snap_names = h.snap_names;
1885 rbd_dev->header.snap_sizes = h.snap_sizes;
1886 /* Free the extra copy of the object prefix */
1887 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1888 kfree(h.object_prefix);
1890 ret = rbd_dev_snaps_update(rbd_dev);
1892 ret = rbd_dev_snaps_register(rbd_dev);
1894 up_write(&rbd_dev->header_rwsem);
1899 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1903 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1904 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1905 if (rbd_dev->image_format == 1)
1906 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1908 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1909 mutex_unlock(&ctl_mutex);
1914 static int rbd_init_disk(struct rbd_device *rbd_dev)
1916 struct gendisk *disk;
1917 struct request_queue *q;
1920 /* create gendisk info */
1921 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1925 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1927 disk->major = rbd_dev->major;
1928 disk->first_minor = 0;
1929 disk->fops = &rbd_bd_ops;
1930 disk->private_data = rbd_dev;
1933 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1937 /* We use the default size, but let's be explicit about it. */
1938 blk_queue_physical_block_size(q, SECTOR_SIZE);
1940 /* set io sizes to object size */
1941 segment_size = rbd_obj_bytes(&rbd_dev->header);
1942 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1943 blk_queue_max_segment_size(q, segment_size);
1944 blk_queue_io_min(q, segment_size);
1945 blk_queue_io_opt(q, segment_size);
1947 blk_queue_merge_bvec(q, rbd_merge_bvec);
1950 q->queuedata = rbd_dev;
1952 rbd_dev->disk = disk;
1954 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1967 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1969 return container_of(dev, struct rbd_device, dev);
1972 static ssize_t rbd_size_show(struct device *dev,
1973 struct device_attribute *attr, char *buf)
1975 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1978 down_read(&rbd_dev->header_rwsem);
1979 size = get_capacity(rbd_dev->disk);
1980 up_read(&rbd_dev->header_rwsem);
1982 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1986 * Note this shows the features for whatever's mapped, which is not
1987 * necessarily the base image.
1989 static ssize_t rbd_features_show(struct device *dev,
1990 struct device_attribute *attr, char *buf)
1992 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1994 return sprintf(buf, "0x%016llx\n",
1995 (unsigned long long) rbd_dev->mapping.features);
1998 static ssize_t rbd_major_show(struct device *dev,
1999 struct device_attribute *attr, char *buf)
2001 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2003 return sprintf(buf, "%d\n", rbd_dev->major);
2006 static ssize_t rbd_client_id_show(struct device *dev,
2007 struct device_attribute *attr, char *buf)
2009 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2011 return sprintf(buf, "client%lld\n",
2012 ceph_client_id(rbd_dev->rbd_client->client));
2015 static ssize_t rbd_pool_show(struct device *dev,
2016 struct device_attribute *attr, char *buf)
2018 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2020 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2023 static ssize_t rbd_pool_id_show(struct device *dev,
2024 struct device_attribute *attr, char *buf)
2026 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2028 return sprintf(buf, "%llu\n",
2029 (unsigned long long) rbd_dev->spec->pool_id);
2032 static ssize_t rbd_name_show(struct device *dev,
2033 struct device_attribute *attr, char *buf)
2035 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2037 if (rbd_dev->spec->image_name)
2038 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2040 return sprintf(buf, "(unknown)\n");
2043 static ssize_t rbd_image_id_show(struct device *dev,
2044 struct device_attribute *attr, char *buf)
2046 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2048 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2052 * Shows the name of the currently-mapped snapshot (or
2053 * RBD_SNAP_HEAD_NAME for the base image).
2055 static ssize_t rbd_snap_show(struct device *dev,
2056 struct device_attribute *attr,
2059 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2061 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2065 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2066 * for the parent image. If there is no parent, simply shows
2067 * "(no parent image)".
2069 static ssize_t rbd_parent_show(struct device *dev,
2070 struct device_attribute *attr,
2073 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2074 struct rbd_spec *spec = rbd_dev->parent_spec;
2079 return sprintf(buf, "(no parent image)\n");
2081 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2082 (unsigned long long) spec->pool_id, spec->pool_name);
2087 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2088 spec->image_name ? spec->image_name : "(unknown)");
2093 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2094 (unsigned long long) spec->snap_id, spec->snap_name);
2099 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2104 return (ssize_t) (bufp - buf);
2107 static ssize_t rbd_image_refresh(struct device *dev,
2108 struct device_attribute *attr,
2112 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2115 ret = rbd_dev_refresh(rbd_dev, NULL);
2117 return ret < 0 ? ret : size;
2120 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2121 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2122 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2123 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2124 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2125 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2126 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2127 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2128 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2129 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2130 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2132 static struct attribute *rbd_attrs[] = {
2133 &dev_attr_size.attr,
2134 &dev_attr_features.attr,
2135 &dev_attr_major.attr,
2136 &dev_attr_client_id.attr,
2137 &dev_attr_pool.attr,
2138 &dev_attr_pool_id.attr,
2139 &dev_attr_name.attr,
2140 &dev_attr_image_id.attr,
2141 &dev_attr_current_snap.attr,
2142 &dev_attr_parent.attr,
2143 &dev_attr_refresh.attr,
2147 static struct attribute_group rbd_attr_group = {
2151 static const struct attribute_group *rbd_attr_groups[] = {
2156 static void rbd_sysfs_dev_release(struct device *dev)
2160 static struct device_type rbd_device_type = {
2162 .groups = rbd_attr_groups,
2163 .release = rbd_sysfs_dev_release,
2171 static ssize_t rbd_snap_size_show(struct device *dev,
2172 struct device_attribute *attr,
2175 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2177 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2180 static ssize_t rbd_snap_id_show(struct device *dev,
2181 struct device_attribute *attr,
2184 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2186 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2189 static ssize_t rbd_snap_features_show(struct device *dev,
2190 struct device_attribute *attr,
2193 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2195 return sprintf(buf, "0x%016llx\n",
2196 (unsigned long long) snap->features);
2199 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2200 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2201 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2203 static struct attribute *rbd_snap_attrs[] = {
2204 &dev_attr_snap_size.attr,
2205 &dev_attr_snap_id.attr,
2206 &dev_attr_snap_features.attr,
2210 static struct attribute_group rbd_snap_attr_group = {
2211 .attrs = rbd_snap_attrs,
2214 static void rbd_snap_dev_release(struct device *dev)
2216 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2221 static const struct attribute_group *rbd_snap_attr_groups[] = {
2222 &rbd_snap_attr_group,
2226 static struct device_type rbd_snap_device_type = {
2227 .groups = rbd_snap_attr_groups,
2228 .release = rbd_snap_dev_release,
2231 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2233 kref_get(&spec->kref);
2238 static void rbd_spec_free(struct kref *kref);
2239 static void rbd_spec_put(struct rbd_spec *spec)
2242 kref_put(&spec->kref, rbd_spec_free);
2245 static struct rbd_spec *rbd_spec_alloc(void)
2247 struct rbd_spec *spec;
2249 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2252 kref_init(&spec->kref);
2254 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2259 static void rbd_spec_free(struct kref *kref)
2261 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2263 kfree(spec->pool_name);
2264 kfree(spec->image_id);
2265 kfree(spec->image_name);
2266 kfree(spec->snap_name);
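/*
 * (Editorial note: rbd_spec_free() is only reached via kref_put() in
 * rbd_spec_put() above, i.e. when the last holder drops its reference.
 * That is what lets a child image's parent_spec point at the very
 * rbd_spec its parent rbd_dev uses for its own identity, as described
 * in the comment block near the top of this file.)
 */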
2270 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2271 struct rbd_spec *spec)
2273 struct rbd_device *rbd_dev;
2275 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2279 spin_lock_init(&rbd_dev->lock);
2280 atomic_set(&rbd_dev->exists, 0);
2281 INIT_LIST_HEAD(&rbd_dev->node);
2282 INIT_LIST_HEAD(&rbd_dev->snaps);
2283 init_rwsem(&rbd_dev->header_rwsem);
2285 rbd_dev->spec = spec;
2286 rbd_dev->rbd_client = rbdc;
2288 /* Initialize the layout used for all rbd requests */
2290 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2291 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2292 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2293 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
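/*
 * (Editorial note: with fl_stripe_count set to 1 and fl_stripe_unit
 * equal to fl_object_size, the layout above describes plain
 * one-object-at-a-time placement -- requests are not striped across
 * RADOS objects, and each contiguous 1 << RBD_MAX_OBJ_ORDER byte range
 * maps to a single object.)
 */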
2298 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2300 rbd_spec_put(rbd_dev->parent_spec);
2301 kfree(rbd_dev->header_name);
2302 rbd_put_client(rbd_dev->rbd_client);
2303 rbd_spec_put(rbd_dev->spec);
2307 static bool rbd_snap_registered(struct rbd_snap *snap)
2309 bool ret = snap->dev.type == &rbd_snap_device_type;
2310 bool reg = device_is_registered(&snap->dev);
2312 rbd_assert(!ret ^ reg);
2317 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2319 list_del(&snap->node);
2320 if (device_is_registered(&snap->dev))
2321 device_unregister(&snap->dev);
2324 static int rbd_register_snap_dev(struct rbd_snap *snap,
2325 struct device *parent)
2327 struct device *dev = &snap->dev;
2330 dev->type = &rbd_snap_device_type;
2331 dev->parent = parent;
2332 dev->release = rbd_snap_dev_release;
2333 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2334 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2336 ret = device_register(dev);
2341 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2342 const char *snap_name,
2343 u64 snap_id, u64 snap_size,
2346 struct rbd_snap *snap;
2349 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2351 return ERR_PTR(-ENOMEM);
2354 snap->name = kstrdup(snap_name, GFP_KERNEL);
2359 snap->size = snap_size;
2360 snap->features = snap_features;
2368 return ERR_PTR(ret);
2371 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2372 u64 *snap_size, u64 *snap_features)
2376 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2378 *snap_size = rbd_dev->header.snap_sizes[which];
2379 *snap_features = 0; /* No features for v1 */
2381 /* Skip over names until we find the one we are looking for */
2383 snap_name = rbd_dev->header.snap_names;
2385 snap_name += strlen(snap_name) + 1;
2391 * Get the size and object order for an image snapshot, or if
2392 * snap_id is CEPH_NOSNAP, gets this information for the base
2395 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2396 u8 *order, u64 *snap_size)
2398 __le64 snapid = cpu_to_le64(snap_id);
2403 } __attribute__ ((packed)) size_buf = { 0 };
2405 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2407 (char *) &snapid, sizeof (snapid),
2408 (char *) &size_buf, sizeof (size_buf), NULL);
2409 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2413 *order = size_buf.order;
2414 *snap_size = le64_to_cpu(size_buf.size);
2416 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2417 (unsigned long long) snap_id, (unsigned int) *order,
2418 (unsigned long long) *snap_size);
2423 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2425 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2426 &rbd_dev->header.obj_order,
2427 &rbd_dev->header.image_size);
2430 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2436 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2440 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2441 "rbd", "get_object_prefix",
2443 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2444 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2447 ret = 0; /* rbd_req_sync_exec() can return positive */
2450 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2451 p + RBD_OBJ_PREFIX_LEN_MAX,
2454 if (IS_ERR(rbd_dev->header.object_prefix)) {
2455 ret = PTR_ERR(rbd_dev->header.object_prefix);
2456 rbd_dev->header.object_prefix = NULL;
2458 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2467 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2470 __le64 snapid = cpu_to_le64(snap_id);
2474 } features_buf = { 0 };
2478 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2479 "rbd", "get_features",
2480 (char *) &snapid, sizeof (snapid),
2481 (char *) &features_buf, sizeof (features_buf),
2483 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2487 incompat = le64_to_cpu(features_buf.incompat);
2488 if (incompat & ~RBD_FEATURES_ALL)
2491 *snap_features = le64_to_cpu(features_buf.features);
2493 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2494 (unsigned long long) snap_id,
2495 (unsigned long long) *snap_features,
2496 (unsigned long long) le64_to_cpu(features_buf.incompat));
2501 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2503 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2504 &rbd_dev->header.features);
2507 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2509 struct rbd_spec *parent_spec;
2511 void *reply_buf = NULL;
2519 parent_spec = rbd_spec_alloc();
2523 size = sizeof (__le64) + /* pool_id */
2524 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2525 sizeof (__le64) + /* snap_id */
2526 sizeof (__le64); /* overlap */
2527 reply_buf = kmalloc(size, GFP_KERNEL);
2533 snapid = cpu_to_le64(CEPH_NOSNAP);
2534 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2535 "rbd", "get_parent",
2536 (char *) &snapid, sizeof (snapid),
2537 (char *) reply_buf, size, NULL);
2538 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2544 end = (char *) reply_buf + size;
2545 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2546 if (parent_spec->pool_id == CEPH_NOPOOL)
2547 goto out; /* No parent? No problem. */
2549 /* The ceph file layout needs to fit pool id in 32 bits */
2552 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2555 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2556 if (IS_ERR(image_id)) {
2557 ret = PTR_ERR(image_id);
2560 parent_spec->image_id = image_id;
2561 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2562 ceph_decode_64_safe(&p, end, overlap, out_err);
2564 rbd_dev->parent_overlap = overlap;
2565 rbd_dev->parent_spec = parent_spec;
2566 parent_spec = NULL; /* rbd_dev now owns this */
2571 rbd_spec_put(parent_spec);
2576 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2578 size_t image_id_size;
2583 void *reply_buf = NULL;
2585 char *image_name = NULL;
2588 rbd_assert(!rbd_dev->spec->image_name);
2590 len = strlen(rbd_dev->spec->image_id);
2591 image_id_size = sizeof (__le32) + len;
2592 image_id = kmalloc(image_id_size, GFP_KERNEL);
2597 end = (char *) image_id + image_id_size;
2598 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2600 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2601 reply_buf = kmalloc(size, GFP_KERNEL);
2605 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2606 "rbd", "dir_get_name",
2607 image_id, image_id_size,
2608 (char *) reply_buf, size, NULL);
2612 end = (char *) reply_buf + size;
2613 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2614 if (IS_ERR(image_name))
2617 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2626 * When a parent image gets probed, we only have the pool, image,
2627 * and snapshot ids but not the names of any of them. This call
2628 * is made later to fill in those names. It has to be done after
2629 * rbd_dev_snaps_update() has completed because some of the
2630 * information (in particular, snapshot name) is not available until then.
2633 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2635 struct ceph_osd_client *osdc;
2637 void *reply_buf = NULL;
2640 if (rbd_dev->spec->pool_name)
2641 return 0; /* Already have the names */
2643 /* Look up the pool name */
2645 osdc = &rbd_dev->rbd_client->client->osdc;
2646 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2648 rbd_warn(rbd_dev, "there is no pool with id %llu",
2649 rbd_dev->spec->pool_id); /* Really a BUG() */
2653 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2654 if (!rbd_dev->spec->pool_name)
2657 /* Fetch the image name; tolerate failure here */
2659 name = rbd_dev_image_name(rbd_dev);
2661 rbd_dev->spec->image_name = (char *) name;
2663 rbd_warn(rbd_dev, "unable to get image name");
2665 /* Look up the snapshot name. */
2667 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2669 rbd_warn(rbd_dev, "no snapshot with id %llu",
2670 rbd_dev->spec->snap_id); /* Really a BUG() */
2674 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
2675 if (!rbd_dev->spec->snap_name)
2681 kfree(rbd_dev->spec->pool_name);
2682 rbd_dev->spec->pool_name = NULL;
2687 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2696 struct ceph_snap_context *snapc;
2700 * We'll need room for the seq value (maximum snapshot id),
2701 * snapshot count, and array of that many snapshot ids.
2702 * For now we have a fixed upper limit on the number we're
2703 * prepared to receive.
2705 size = sizeof (__le64) + sizeof (__le32) +
2706 RBD_MAX_SNAP_COUNT * sizeof (__le64);
2707 reply_buf = kzalloc(size, GFP_KERNEL);
2711 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2712 "rbd", "get_snapcontext",
2714 reply_buf, size, ver);
2715 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2721 end = (char *) reply_buf + size;
2722 ceph_decode_64_safe(&p, end, seq, out);
2723 ceph_decode_32_safe(&p, end, snap_count, out);
2726 * Make sure the reported number of snapshot ids wouldn't go
2727 * beyond the end of our buffer. But before checking that,
2728 * make sure the computed size of the snapshot context we
2729 * allocate is representable in a size_t.
2731 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2736 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2739 size = sizeof (struct ceph_snap_context) +
2740 snap_count * sizeof (snapc->snaps[0]);
2741 snapc = kmalloc(size, GFP_KERNEL);
2747 atomic_set(&snapc->nref, 1);
2749 snapc->num_snaps = snap_count;
2750 for (i = 0; i < snap_count; i++)
2751 snapc->snaps[i] = ceph_decode_64(&p);
2753 rbd_dev->header.snapc = snapc;
2755 dout(" snap context seq = %llu, snap_count = %u\n",
2756 (unsigned long long) seq, (unsigned int) snap_count);
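/*
 * Illustrative sketch (not part of the driver): the overflow check
 * performed above, in plain C.  Before trusting a snapshot count that
 * came off the wire, make sure the snapshot context allocation (a
 * header plus that many 64-bit ids) cannot wrap a size_t.  The struct
 * name here is a stand-in for ceph_snap_context.
 */
#if 0
#include <stdint.h>
#include <stddef.h>

struct snap_context_hdr {
	uint64_t seq;
	uint32_t num_snaps;
};

static int snap_count_fits(uint32_t count)
{
	size_t max = (SIZE_MAX - sizeof(struct snap_context_hdr))
						/ sizeof(uint64_t);

	return count <= max;
}
#endif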
2764 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2774 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2775 reply_buf = kmalloc(size, GFP_KERNEL);
2777 return ERR_PTR(-ENOMEM);
2779 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2780 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2781 "rbd", "get_snapshot_name",
2782 (char *) &snap_id, sizeof (snap_id),
2783 reply_buf, size, NULL);
2784 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2789 end = (char *) reply_buf + size;
2790 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2791 if (IS_ERR(snap_name)) {
2792 ret = PTR_ERR(snap_name);
2795 dout(" snap_id 0x%016llx snap_name = %s\n",
2796 (unsigned long long) le64_to_cpu(snap_id), snap_name);
2804 return ERR_PTR(ret);
2807 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2808 u64 *snap_size, u64 *snap_features)
2814 snap_id = rbd_dev->header.snapc->snaps[which];
2815 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2817 return ERR_PTR(ret);
2818 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2820 return ERR_PTR(ret);
2822 return rbd_dev_v2_snap_name(rbd_dev, which);
2825 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2826 u64 *snap_size, u64 *snap_features)
2828 if (rbd_dev->image_format == 1)
2829 return rbd_dev_v1_snap_info(rbd_dev, which,
2830 snap_size, snap_features);
2831 if (rbd_dev->image_format == 2)
2832 return rbd_dev_v2_snap_info(rbd_dev, which,
2833 snap_size, snap_features);
2834 return ERR_PTR(-EINVAL);
2837 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2842 down_write(&rbd_dev->header_rwsem);
2844 /* Grab old order first, to see if it changes */
2846 obj_order = rbd_dev->header.obj_order;
2847 ret = rbd_dev_v2_image_size(rbd_dev);
2850 if (rbd_dev->header.obj_order != obj_order) {
2854 rbd_update_mapping_size(rbd_dev);
2856 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2857 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2860 ret = rbd_dev_snaps_update(rbd_dev);
2861 dout("rbd_dev_snaps_update returned %d\n", ret);
2864 ret = rbd_dev_snaps_register(rbd_dev);
2865 dout("rbd_dev_snaps_register returned %d\n", ret);
2867 up_write(&rbd_dev->header_rwsem);
2873 * Scan the rbd device's current snapshot list and compare it to the
2874 * newly-received snapshot context. Remove any existing snapshots
2875 * not present in the new snapshot context. Add a new snapshot for
2876 * any snapshots in the snapshot context not in the current list.
2877 * And verify there are no changes to snapshots we already know about.
2880 * Assumes the snapshots in the snapshot context are sorted by
2881 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2882 * are also maintained in that order.)
2884 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2886 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2887 const u32 snap_count = snapc->num_snaps;
2888 struct list_head *head = &rbd_dev->snaps;
2889 struct list_head *links = head->next;
2892 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2893 while (index < snap_count || links != head) {
2895 struct rbd_snap *snap;
2898 u64 snap_features = 0;
2900 snap_id = index < snap_count ? snapc->snaps[index]
2902 snap = links != head ? list_entry(links, struct rbd_snap, node)
2904 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2906 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2907 struct list_head *next = links->next;
2909 /* Existing snapshot not in the new snap context */
2911 if (rbd_dev->spec->snap_id == snap->id)
2912 atomic_set(&rbd_dev->exists, 0);
2913 rbd_remove_snap_dev(snap);
2914 dout("%ssnap id %llu has been removed\n",
2915 rbd_dev->spec->snap_id == snap->id ?
2917 (unsigned long long) snap->id);
2919 /* Done with this list entry; advance */
2925 snap_name = rbd_dev_snap_info(rbd_dev, index,
2926 &snap_size, &snap_features);
2927 if (IS_ERR(snap_name))
2928 return PTR_ERR(snap_name);
2930 dout("entry %u: snap_id = %llu\n", (unsigned int) index,
2931 (unsigned long long) snap_id);
2932 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2933 struct rbd_snap *new_snap;
2935 /* We haven't seen this snapshot before */
2937 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2938 snap_id, snap_size, snap_features);
2939 if (IS_ERR(new_snap)) {
2940 int err = PTR_ERR(new_snap);
2942 dout(" failed to add dev, error %d\n", err);
2947 /* New goes before existing, or at end of list */
2949 dout(" added dev%s\n", snap ? "" : " at end");
2951 list_add_tail(&new_snap->node, &snap->node);
2953 list_add_tail(&new_snap->node, head);
2955 /* Already have this one */
2957 dout(" already present\n");
2959 rbd_assert(snap->size == snap_size);
2960 rbd_assert(!strcmp(snap->name, snap_name));
2961 rbd_assert(snap->features == snap_features);
2963 /* Done with this list entry; advance */
2965 links = links->next;
2968 /* Advance to the next entry in the snapshot context */
2972 dout("%s: done\n", __func__);
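/*
 * Illustrative sketch (not part of the driver): the same two-cursor
 * walk rbd_dev_snaps_update() performs, applied to two plain arrays of
 * snapshot ids sorted highest-id-first.  "old" stands for the ids we
 * already track, "new" for the ids in the freshly received snapshot
 * context; the drop/add callbacks are hypothetical.
 */
#if 0
#include <stdint.h>
#include <stddef.h>

static void diff_snap_ids(const uint64_t *old, size_t old_n,
			  const uint64_t *new, size_t new_n,
			  void (*drop)(uint64_t), void (*add)(uint64_t))
{
	size_t i = 0, j = 0;

	while (i < old_n || j < new_n) {
		if (j >= new_n || (i < old_n && old[i] > new[j])) {
			drop(old[i++]);	/* gone from the new context */
		} else if (i >= old_n || old[i] < new[j]) {
			add(new[j++]);	/* newly created snapshot */
		} else {
			i++;		/* present in both; keep it */
			j++;
		}
	}
}
#endif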
2978 * Scan the list of snapshots and register the devices for any that
2979 * have not already been registered.
2981 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2983 struct rbd_snap *snap;
2986 dout("%s called\n", __func__);
2987 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2990 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2991 if (!rbd_snap_registered(snap)) {
2992 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2997 dout("%s: returning %d\n", __func__, ret);
3002 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3007 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3009 dev = &rbd_dev->dev;
3010 dev->bus = &rbd_bus_type;
3011 dev->type = &rbd_device_type;
3012 dev->parent = &rbd_root_dev;
3013 dev->release = rbd_dev_release;
3014 dev_set_name(dev, "%d", rbd_dev->dev_id);
3015 ret = device_register(dev);
3017 mutex_unlock(&ctl_mutex);
3022 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3024 device_unregister(&rbd_dev->dev);
3027 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3030 * Get a unique rbd identifier for the given new rbd_dev, and add
3031 * the rbd_dev to the global list. The minimum rbd id is 1.
3033 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3035 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3037 spin_lock(&rbd_dev_list_lock);
3038 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3039 spin_unlock(&rbd_dev_list_lock);
3040 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3041 (unsigned long long) rbd_dev->dev_id);
3045 * Remove an rbd_dev from the global list, and record that its
3046 * identifier is no longer in use.
3048 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3050 struct list_head *tmp;
3051 int rbd_id = rbd_dev->dev_id;
3054 rbd_assert(rbd_id > 0);
3056 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3057 (unsigned long long) rbd_dev->dev_id);
3058 spin_lock(&rbd_dev_list_lock);
3059 list_del_init(&rbd_dev->node);
3062 * If the id being "put" is not the current maximum, there
3063 * is nothing special we need to do.
3065 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3066 spin_unlock(&rbd_dev_list_lock);
3071 * We need to update the current maximum id. Search the
3072 * list to find out what it is. We're more likely to find
3073 * the maximum at the end, so search the list backward.
3076 list_for_each_prev(tmp, &rbd_dev_list) {
3077 struct rbd_device *rbd_dev;
3079 rbd_dev = list_entry(tmp, struct rbd_device, node);
3080 if (rbd_dev->dev_id > max_id)
3081 max_id = rbd_dev->dev_id;
3083 spin_unlock(&rbd_dev_list_lock);
3086 * The max id could have been updated by rbd_dev_id_get(), in
3087 * which case it now accurately reflects the new maximum.
3088 * Be careful not to overwrite the maximum value in that case.
3091 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3092 dout(" max dev id has been reset\n");
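/*
 * Illustrative sketch (not part of the driver): the compare-exchange
 * above, expressed with C11 atomics.  If another thread allocated a
 * higher id between our list removal and this point, the exchange
 * simply fails and the newer maximum is left untouched.
 */
#if 0
#include <stdatomic.h>
#include <stdint.h>

static _Atomic uint64_t id_max;

static void release_id(uint64_t removed_id, uint64_t max_still_in_list)
{
	uint64_t expected = removed_id;

	/* Only step the maximum back if it is still the id we removed. */
	atomic_compare_exchange_strong(&id_max, &expected,
				       max_still_in_list);
}
#endif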
3096 * Skips over white space at *buf, and updates *buf to point to the
3097 * first found non-space character (if any). Returns the length of
3098 * the token (string of non-white space characters) found. Note
3099 * that *buf must be terminated with '\0'.
3101 static inline size_t next_token(const char **buf)
3104 * These are the characters that produce nonzero for
3105 * isspace() in the "C" and "POSIX" locales.
3107 const char *spaces = " \f\n\r\t\v";
3109 *buf += strspn(*buf, spaces); /* Find start of token */
3111 return strcspn(*buf, spaces); /* Return token length */
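/*
 * Illustrative sketch (not part of the driver): next_token() is just a
 * strspn()/strcspn() walk over the C-locale space set.  Tokenizing a
 * made-up "rbd add" style buffer with the same idiom looks like this.
 */
#if 0
#include <stdio.h>
#include <string.h>

int main(void)
{
	const char *buf = "1.2.3.4:6789 name=admin rbd myimage";
	const char *spaces = " \f\n\r\t\v";

	for (;;) {
		size_t len;

		buf += strspn(buf, spaces);	/* skip leading whitespace */
		len = strcspn(buf, spaces);	/* length of next token */
		if (!len)
			break;
		printf("token: %.*s\n", (int) len, buf);
		buf += len;			/* advance past the token */
	}
	return 0;
}
#endif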
3115 * Finds the next token in *buf, and if the provided token buffer is
3116 * big enough, copies the found token into it. The result, if
3117 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3118 * must be terminated with '\0' on entry.
3120 * Returns the length of the token found (not including the '\0').
3121 * Return value will be 0 if no token is found, and it will be >=
3122 * token_size if the token would not fit.
3124 * The *buf pointer will be updated to point beyond the end of the
3125 * found token. Note that this occurs even if the token buffer is
3126 * too small to hold it.
3128 static inline size_t copy_token(const char **buf,
3134 len = next_token(buf);
3135 if (len < token_size) {
3136 memcpy(token, *buf, len);
3137 *(token + len) = '\0';
3145 * Finds the next token in *buf, dynamically allocates a buffer big
3146 * enough to hold a copy of it, and copies the token into the new
3147 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3148 * that a duplicate buffer is created even for a zero-length token.
3150 * Returns a pointer to the newly-allocated duplicate, or a null
3151 * pointer if memory for the duplicate was not available. If
3152 * the lenp argument is a non-null pointer, the length of the token
3153 * (not including the '\0') is returned in *lenp.
3155 * If successful, the *buf pointer will be updated to point beyond
3156 * the end of the found token.
3158 * Note: uses GFP_KERNEL for allocation.
3160 static inline char *dup_token(const char **buf, size_t *lenp)
3165 len = next_token(buf);
3166 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3169 *(dup + len) = '\0';
3179 * Parse the options provided for an "rbd add" (i.e., rbd image
3180 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3181 * and the data written is passed here via a NUL-terminated buffer.
3182 * Returns 0 if successful or an error code otherwise.
3184 * The information extracted from these options is recorded in
3185 * the other parameters which return dynamically-allocated
3188 * The address of a pointer that will refer to a ceph options
3189 * structure. Caller must release the returned pointer using
3190 * ceph_destroy_options() when it is no longer needed.
3192 * Address of an rbd options pointer. Fully initialized by
3193 * this function; caller must release with kfree().
3195 * Address of an rbd image specification pointer. Fully
3196 * initialized by this function based on parsed options.
3197 * Caller must release with rbd_spec_put().
3199 * The options passed take this form:
3200 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3203 * A comma-separated list of one or more monitor addresses.
3204 * A monitor address is an ip address, optionally followed
3205 * by a port number (separated by a colon).
3206 * I.e.: ip1[:port1][,ip2[:port2]...]
3208 * A comma-separated list of ceph and/or rbd options.
3210 * The name of the rados pool containing the rbd image.
3212 * The name of the image in that pool to map.
3214 * An optional snapshot name. If provided, the mapping will
3215 * present data from the image at the time that snapshot was
3216 * created. The image head is used if no snapshot name is
3217 * provided. Snapshot mappings are always read-only.
3219 static int rbd_add_parse_args(const char *buf,
3220 struct ceph_options **ceph_opts,
3221 struct rbd_options **opts,
3222 struct rbd_spec **rbd_spec)
3226 const char *mon_addrs;
3227 size_t mon_addrs_size;
3228 struct rbd_spec *spec = NULL;
3229 struct rbd_options *rbd_opts = NULL;
3230 struct ceph_options *copts;
3233 /* The first four tokens are required */
3235 len = next_token(&buf);
3237 rbd_warn(NULL, "no monitor address(es) provided");
3241 mon_addrs_size = len + 1;
3245 options = dup_token(&buf, NULL);
3249 rbd_warn(NULL, "no options provided");
3253 spec = rbd_spec_alloc();
3257 spec->pool_name = dup_token(&buf, NULL);
3258 if (!spec->pool_name)
3260 if (!*spec->pool_name) {
3261 rbd_warn(NULL, "no pool name provided");
3265 spec->image_name = dup_token(&buf, NULL);
3266 if (!spec->image_name)
3268 if (!*spec->image_name) {
3269 rbd_warn(NULL, "no image name provided");
3274 * Snapshot name is optional; default is to use "-"
3275 * (indicating the head/no snapshot).
3277 len = next_token(&buf);
3279 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3280 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3281 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3282 ret = -ENAMETOOLONG;
3285 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3286 if (!spec->snap_name)
3288 *(spec->snap_name + len) = '\0';
3290 /* Initialize all rbd options to the defaults */
3292 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3296 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3298 copts = ceph_parse_options(options, mon_addrs,
3299 mon_addrs + mon_addrs_size - 1,
3300 parse_rbd_opts_token, rbd_opts);
3301 if (IS_ERR(copts)) {
3302 ret = PTR_ERR(copts);
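/*
 * Example (illustrative only): the buffer parsed here is whatever was
 * written to /sys/bus/rbd/add, in the format documented above.
 * Mapping an image "myimage" from pool "rbd" at its head, with made-up
 * monitor address and credentials, would look like:
 *
 *	$ echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *		> /sys/bus/rbd/add
 *
 * Appending a snapshot name as a fifth token maps that snapshot
 * (read-only) instead of the image head.
 */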
3323 * An rbd format 2 image has a unique identifier, distinct from the
3324 * name given to it by the user. Internally, that identifier is
3325 * what's used to specify the names of objects related to the image.
3327 * A special "rbd id" object is used to map an rbd image name to its
3328 * id. If that object doesn't exist, then there is no v2 rbd image
3329 * with the supplied name.
3331 * This function will record the given rbd_dev's image_id field if
3332 * it can be determined, and in that case will return 0. If any
3333 * errors occur a negative errno will be returned and the rbd_dev's
3334 * image_id field will be unchanged (and should be NULL).
3336 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3345 * When probing a parent image, the image id is already
3346 * known (and the image name likely is not). There's no
3347 * need to fetch the image id again in this case.
3349 if (rbd_dev->spec->image_id)
3353 * First, see if the format 2 image id file exists, and if
3354 * so, get the image's persistent id from it.
3356 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3357 object_name = kmalloc(size, GFP_NOIO);
3360 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3361 dout("rbd id object name is %s\n", object_name);
3363 /* Response will be an encoded string, which includes a length */
3365 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3366 response = kzalloc(size, GFP_NOIO);
3372 ret = rbd_req_sync_exec(rbd_dev, object_name,
3375 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3376 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3379 ret = 0; /* rbd_req_sync_exec() can return positive */
3382 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3383 p + RBD_IMAGE_ID_LEN_MAX,
3385 if (IS_ERR(rbd_dev->spec->image_id)) {
3386 ret = PTR_ERR(rbd_dev->spec->image_id);
3387 rbd_dev->spec->image_id = NULL;
3389 dout("image_id is %s\n", rbd_dev->spec->image_id);
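/*
 * Example (illustrative only): for a format 2 image mapped by the
 * (made up) name "myimage", the id object consulted above is named
 * "<RBD_ID_PREFIX>myimage", and its reply is the usual le32
 * length-prefixed string holding the image's persistent id.  All other
 * format 2 object names are then derived from that id rather than from
 * the user-visible image name.
 */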
3398 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3403 /* Version 1 images have no id; empty string is used */
3405 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3406 if (!rbd_dev->spec->image_id)
3409 /* Record the header object name for this rbd image. */
3411 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3412 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3413 if (!rbd_dev->header_name) {
3417 sprintf(rbd_dev->header_name, "%s%s",
3418 rbd_dev->spec->image_name, RBD_SUFFIX);
3420 /* Populate rbd image metadata */
3422 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3426 /* Version 1 images have no parent (no layering) */
3428 rbd_dev->parent_spec = NULL;
3429 rbd_dev->parent_overlap = 0;
3431 rbd_dev->image_format = 1;
3433 dout("discovered version 1 image, header name is %s\n",
3434 rbd_dev->header_name);
3439 kfree(rbd_dev->header_name);
3440 rbd_dev->header_name = NULL;
3441 kfree(rbd_dev->spec->image_id);
3442 rbd_dev->spec->image_id = NULL;
3447 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3454 * Image id was filled in by the caller. Record the header
3455 * object name for this rbd image.
3457 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3458 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3459 if (!rbd_dev->header_name)
3461 sprintf(rbd_dev->header_name, "%s%s",
3462 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3464 /* Get the size and object order for the image */
3466 ret = rbd_dev_v2_image_size(rbd_dev);
3470 /* Get the object prefix (a.k.a. block_name) for the image */
3472 ret = rbd_dev_v2_object_prefix(rbd_dev);
3476 /* Get and check the features for the image */
3478 ret = rbd_dev_v2_features(rbd_dev);
3482 /* If the image supports layering, get the parent info */
3484 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3485 ret = rbd_dev_v2_parent_info(rbd_dev);
3490 /* crypto and compression type aren't (yet) supported for v2 images */
3492 rbd_dev->header.crypt_type = 0;
3493 rbd_dev->header.comp_type = 0;
3495 /* Get the snapshot context, plus the header version */
3497 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3500 rbd_dev->header.obj_version = ver;
3502 rbd_dev->image_format = 2;
3504 dout("discovered version 2 image, header name is %s\n",
3505 rbd_dev->header_name);
3509 rbd_dev->parent_overlap = 0;
3510 rbd_spec_put(rbd_dev->parent_spec);
3511 rbd_dev->parent_spec = NULL;
3512 kfree(rbd_dev->header_name);
3513 rbd_dev->header_name = NULL;
3514 kfree(rbd_dev->header.object_prefix);
3515 rbd_dev->header.object_prefix = NULL;
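/*
 * Example (illustrative only): assuming the usual prefix and suffix
 * values from rbd_types.h, an image named "myimage" whose image id is
 * "0123abc" has a header object called "myimage" + RBD_SUFFIX if it is
 * a format 1 image, and RBD_HEADER_PREFIX + "0123abc" if it is a
 * format 2 image; everything after the probe works off that header
 * object name.
 */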
3520 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3524 /* no need to lock here, as rbd_dev is not registered yet */
3525 ret = rbd_dev_snaps_update(rbd_dev);
3529 ret = rbd_dev_probe_update_spec(rbd_dev);
3533 ret = rbd_dev_set_mapping(rbd_dev);
3537 /* generate unique id: find highest unique id, add one */
3538 rbd_dev_id_get(rbd_dev);
3540 /* Fill in the device name, now that we have its id. */
3541 BUILD_BUG_ON(DEV_NAME_LEN
3542 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3543 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3545 /* Get our block major device number. */
3547 ret = register_blkdev(0, rbd_dev->name);
3550 rbd_dev->major = ret;
3552 /* Set up the blkdev mapping. */
3554 ret = rbd_init_disk(rbd_dev);
3556 goto err_out_blkdev;
3558 ret = rbd_bus_add_dev(rbd_dev);
3563 * At this point cleanup in the event of an error is the job
3564 * of the sysfs code (initiated by rbd_bus_del_dev()).
3566 down_write(&rbd_dev->header_rwsem);
3567 ret = rbd_dev_snaps_register(rbd_dev);
3568 up_write(&rbd_dev->header_rwsem);
3572 ret = rbd_req_sync_watch(rbd_dev, 1);
3576 /* Everything's ready. Announce the disk to the world. */
3578 add_disk(rbd_dev->disk);
3580 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3581 (unsigned long long) rbd_dev->mapping.size);
3585 /* this will also clean up rest of rbd_dev stuff */
3587 rbd_bus_del_dev(rbd_dev);
3591 rbd_free_disk(rbd_dev);
3593 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3595 rbd_dev_id_put(rbd_dev);
3597 rbd_remove_all_snaps(rbd_dev);
3603 * Probe for the existence of the header object for the given rbd
3604 * device. For format 2 images this includes determining the image id.
3607 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3612 * Get the id from the image id object. If it's not a
3613 * format 2 image, we'll get ENOENT back, and we'll assume
3614 * it's a format 1 image.
3616 ret = rbd_dev_image_id(rbd_dev);
3618 ret = rbd_dev_v1_probe(rbd_dev);
3620 ret = rbd_dev_v2_probe(rbd_dev);
3622 dout("probe failed, returning %d\n", ret);
3627 ret = rbd_dev_probe_finish(rbd_dev);
3629 rbd_header_free(&rbd_dev->header);
3634 static ssize_t rbd_add(struct bus_type *bus,
3638 struct rbd_device *rbd_dev = NULL;
3639 struct ceph_options *ceph_opts = NULL;
3640 struct rbd_options *rbd_opts = NULL;
3641 struct rbd_spec *spec = NULL;
3642 struct rbd_client *rbdc;
3643 struct ceph_osd_client *osdc;
3646 if (!try_module_get(THIS_MODULE))
3649 /* parse add command */
3650 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3652 goto err_out_module;
3654 rbdc = rbd_get_client(ceph_opts);
3659 ceph_opts = NULL; /* rbd_dev client now owns this */
3662 osdc = &rbdc->client->osdc;
3663 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3665 goto err_out_client;
3666 spec->pool_id = (u64) rc;
3668 /* The ceph file layout needs to fit pool id in 32 bits */
3670 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
3672 goto err_out_client;
3675 rbd_dev = rbd_dev_create(rbdc, spec);
3677 goto err_out_client;
3678 rbdc = NULL; /* rbd_dev now owns this */
3679 spec = NULL; /* rbd_dev now owns this */
3681 rbd_dev->mapping.read_only = rbd_opts->read_only;
3683 rbd_opts = NULL; /* done with this */
3685 rc = rbd_dev_probe(rbd_dev);
3687 goto err_out_rbd_dev;
3691 rbd_dev_destroy(rbd_dev);
3693 rbd_put_client(rbdc);
3696 ceph_destroy_options(ceph_opts);
3700 module_put(THIS_MODULE);
3702 dout("Error adding device %s\n", buf);
3704 return (ssize_t) rc;
3707 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3709 struct list_head *tmp;
3710 struct rbd_device *rbd_dev;
3712 spin_lock(&rbd_dev_list_lock);
3713 list_for_each(tmp, &rbd_dev_list) {
3714 rbd_dev = list_entry(tmp, struct rbd_device, node);
3715 if (rbd_dev->dev_id == dev_id) {
3716 spin_unlock(&rbd_dev_list_lock);
3720 spin_unlock(&rbd_dev_list_lock);
3724 static void rbd_dev_release(struct device *dev)
3726 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3728 if (rbd_dev->watch_request) {
3729 struct ceph_client *client = rbd_dev->rbd_client->client;
3731 ceph_osdc_unregister_linger_request(&client->osdc,
3732 rbd_dev->watch_request);
3734 if (rbd_dev->watch_event)
3735 rbd_req_sync_watch(rbd_dev, 0);
3737 /* clean up and free blkdev */
3738 rbd_free_disk(rbd_dev);
3739 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3741 /* release allocated disk header fields */
3742 rbd_header_free(&rbd_dev->header);
3744 /* done with the id, and with the rbd_dev */
3745 rbd_dev_id_put(rbd_dev);
3746 rbd_assert(rbd_dev->rbd_client != NULL);
3747 rbd_dev_destroy(rbd_dev);
3749 /* release module ref */
3750 module_put(THIS_MODULE);
3753 static ssize_t rbd_remove(struct bus_type *bus,
3757 struct rbd_device *rbd_dev = NULL;
3762 rc = strict_strtoul(buf, 10, &ul);
3766 /* convert to int; abort if we lost anything in the conversion */
3767 target_id = (int) ul;
3768 if (target_id != ul)
3771 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3773 rbd_dev = __rbd_get_dev(target_id);
3779 if (rbd_dev->open_count) {
3784 rbd_remove_all_snaps(rbd_dev);
3785 rbd_bus_del_dev(rbd_dev);
3788 mutex_unlock(&ctl_mutex);
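/*
 * Example (illustrative only): unmapping the device with id 2 (i.e.
 * /dev/rbd2) is done by writing that id back to the bus:
 *
 *	$ echo 2 > /sys/bus/rbd/remove
 *
 * The request is refused while the device is still open (see the
 * open_count check above).
 */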
3794 * create control files in sysfs
3797 static int rbd_sysfs_init(void)
3801 ret = device_register(&rbd_root_dev);
3805 ret = bus_register(&rbd_bus_type);
3807 device_unregister(&rbd_root_dev);
3812 static void rbd_sysfs_cleanup(void)
3814 bus_unregister(&rbd_bus_type);
3815 device_unregister(&rbd_root_dev);
3818 int __init rbd_init(void)
3822 rc = rbd_sysfs_init();
3825 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3829 void __exit rbd_exit(void)
3831 rbd_sysfs_cleanup();
3834 module_init(rbd_init);
3835 module_exit(rbd_exit);
3837 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3838 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3839 MODULE_DESCRIPTION("rados block device");
3841 /* following authorship retained from original osdblk.c */
3842 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3844 MODULE_LICENSE("GPL");