/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>

#include <linux/blkdev.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
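
/*
 * Illustration (hypothetical helpers, not part of the original
 * driver): converting between byte offsets and sectors with the
 * symbols above.
 */
static inline u64 rbd_example_bytes_to_sectors(u64 bytes)
{
	return bytes >> SECTOR_SHIFT;	/* e.g. 4096 bytes -> 8 sectors */
}

static inline u64 rbd_example_sectors_to_bytes(u64 sectors)
{
	return sectors << SECTOR_SHIFT;	/* e.g. 8 sectors -> 4096 bytes */
}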
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
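
/*
 * Worked example (illustrative, not from the original source): a
 * struct ceph_snap_context holds a small fixed header followed by an
 * array of 64-bit snapshot ids.  With 510 snapshots the id array
 * needs 510 * 8 = 4080 bytes, which together with the header still
 * fits in one 4KB page; 511 ids would push the allocation past
 * PAGE_SIZE.
 */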
#define RBD_SNAP_HEAD_NAME "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64
#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(0)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
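
/*
 * Illustration (hypothetical, not part of the driver): formatting a
 * device name into a DEV_NAME_LEN buffer.  (5 * sizeof (int)) / 2 + 1
 * over-estimates the decimal digits an int can need (for a 32-bit
 * int: 10 digits plus a sign), so "rbd" plus any id always fits.
 */
static inline void rbd_example_format_name(char name[DEV_NAME_LEN], int id)
{
	/* e.g. id 3 -> "rbd3" */
	snprintf(name, DEV_NAME_LEN, "%s%d", RBD_DRV_NAME, id);
}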
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
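/*
 * Sketch of the specification structure described above (the actual
 * definition was elided here; the field layout is inferred from the
 * comment and from uses elsewhere in this file).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};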
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;
	atomic_t		done;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
struct rbd_img_request {
	struct request		*rq;
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;		/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
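
/*
 * Illustration (hypothetical helper, not in the driver): counting the
 * object requests attached to an image request with the iterator
 * macro above.
 */
static inline u32 rbd_example_count_obj_requests(struct rbd_img_request *ireq)
{
	struct rbd_obj_request *oreq;
	u32 count = 0;

	for_each_obj_request(ireq, oreq)
		count++;	/* should match ireq->obj_request_count */

	return count;
}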
	struct list_head	node;
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
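
/*
 * Illustration (hypothetical, not in the driver): the locking pattern
 * the comment above describes.  The flag word is manipulated with the
 * bitops, and rbd_dev->lock is taken when a flag must be read or
 * changed in step with open_count.
 */
static inline bool rbd_example_is_removing(struct rbd_device *rbd_dev)
{
	bool removing;

	spin_lock_irq(&rbd_dev->lock);
	removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);

	return removing;
}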
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
/*
 * Initialize an rbd client instance.  We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

#define RBD_READ_ONLY_DEFAULT	false
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
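
/*
 * Worked example (illustrative): on a 64-bit build SIZE_MAX is
 * 2^64 - 1, so the snap_count bound above is effectively unlimited;
 * on a 32-bit build it caps a v1 header at roughly 2^32 / 8 = 512M
 * snapshot ids, and the follow-on check makes sure the id array plus
 * the NUL-terminated name block together still fit in a size_t.
 */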
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			goto out_err;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
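
/*
 * Worked example (illustrative): with the default object order of 22
 * (4MB objects), an image byte offset of 10MB falls in segment
 * 10MB >> 22 = 2, at offset 10MB & (4MB - 1) = 2MB within that
 * object, and a 3MB request starting there is clipped to the 2MB
 * that remain in the segment.
 */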
static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}
/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
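
/*
 * Usage sketch (hypothetical, mirrors the caller further below): walk
 * an image request's bio chain, peeling off one object-sized clone at
 * a time.  bio_list and bio_offset are advanced by each call, which
 * is the in-out protocol described above.
 */
static void rbd_example_clone_segments(struct bio *bio_list, u64 resid)
{
	unsigned int bio_offset = 0;

	while (resid) {
		unsigned int clone_size = min_t(u64, resid, 1U << 22);
		struct bio *clone;

		clone = bio_chain_clone_range(&bio_list, &bio_offset,
						clone_size, GFP_ATOMIC);
		if (!clone)
			break;		/* ENOMEM or chain exhausted */
		/* ... hand 'clone' to an object request ... */
		resid -= clone_size;
	}
}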
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);
	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
static void obj_request_done_init(struct rbd_obj_request *obj_request)
{
	atomic_set(&obj_request->done, 0);
	smp_wmb();
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	int done;

	done = atomic_inc_return(&obj_request->done);
	if (done > 1) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "obj_request %p was already done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return atomic_read(&obj_request->done) != 0;
}
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
	if (obj_request->result == -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
		obj_request->xferred = obj_request->length;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);
	if (obj_request->img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.
	 * Our xferred value is the number of bytes transferred
	 * back.  Set it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	WARN_ON(osd_req->r_num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64) UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
					bool write_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc = NULL;
	u64 snap_id = CEPH_NOSNAP;
	struct timespec *mtime = NULL;
	struct timespec now;

	rbd_assert(osd_req != NULL);

	if (write_request) {
		now = CURRENT_TIME;
		mtime = &now;
		if (img_request)
			snapc = img_request->snapc;
	} else if (img_request) {
		snap_id = img_request->snap_id;
	}
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, snap_id, mtime);
}
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (img_request) {
		rbd_assert(img_request->write_request == write_request);
		if (img_request->write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	obj_request_done_init(obj_request);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;

		rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
		xferred = (unsigned int)obj_request->xferred;
		result = obj_request->result;
		if (result) {
			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
				img_request->write_request ? "write" : "read",
				result, xferred);
			if (!img_request->result)
				img_request->result = result;
		}

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request->write_request;
	unsigned int bio_offset;
	u64 image_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	bio_offset = 0;
	image_offset = img_request->offset;
	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	rbd_assert(resid > 0);
	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		unsigned int clone_size;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, image_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, image_offset);
		length = rbd_segment_length(rbd_dev, image_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
				obj_request->bio_list, obj_request->length);
		rbd_osd_req_format(obj_request, write_request);

		rbd_img_obj_request_add(img_request, obj_request);

		image_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;
	obj_request->callback = rbd_obj_request_put;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, ver, 0);
	rbd_osd_req_format(obj_request, false);

	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		rbd_warn(rbd_dev, "got notification but failed to update"
			" snaps: %d\n", rc);

	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
}
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	rbd_osd_req_format(obj_request, true);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
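
/*
 * Usage sketch (hypothetical): the watch is established when a device
 * is mapped and torn down symmetrically on unmap:
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	// start watching
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	// tear it down
 */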
/*
 * Synchronous osd object method call
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *version)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32) calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format(obj_request, false);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = 0;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
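
/*
 * Usage sketch (hypothetical wrapper, not in the driver): invoking
 * the "rbd" object class's "get_object_prefix" method on a format 2
 * image's header object and collecting the reply into buf.
 */
static int rbd_example_get_object_prefix(struct rbd_device *rbd_dev,
					char *buf, size_t size)
{
	return rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
					"rbd", "get_object_prefix",
					NULL, 0, buf, size, NULL);
}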
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "obj_request %s result %d\n",
				write_request ? "write" : "read", result);
			__blk_end_request_all(rq, result);
		}
	}
}
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with single-page bios,
 * which we handle later at bio_chain_clone_range()
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
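
/*
 * Worked example (illustrative): with obj_order 22 (4MB objects),
 * sectors_per_obj = 1 << (22 - 9) = 8192.  A bio starting at device
 * sector 8000 has 192 sectors (96KB) left in its object, so at most
 * 96KB of bvec data may be merged before the bio must be split.
 */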
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length,
				char *buf, u64 *version)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		ret = PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format(obj_request, false);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t) INT_MAX);
	ret = (int) size;
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);
		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
/*
 * reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);

	return ret;
}
static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		rbd_remove_snap_dev(snap);
}
static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		return;

	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
	dout("setting size to %llu sectors", (unsigned long long) size);
	rbd_dev->mapping.size = (u64) size;
	set_capacity(rbd_dev->disk, size);
}
/*
 * only read the first part of the ondisk header, without the snaps info
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	rbd_update_mapping_size(rbd_dev);

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev, hver);
	else
		ret = rbd_dev_v2_refresh(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) rbd_dev->mapping.features);
}
static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}
2419 * Shows the name of the currently-mapped snapshot (or
2420 * RBD_SNAP_HEAD_NAME for the base image).
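/* e.g. reading this attribute yields "-" while the image head is mapped */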
2422 static ssize_t rbd_snap_show(struct device *dev,
2423 struct device_attribute *attr,
char *buf)
2426 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2428 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2432 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2433 * for the parent image. If there is no parent, simply shows
2434 * "(no parent image)".
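/*
 * Example output (illustrative values only):
 *
 *   pool_id 2
 *   pool_name rbd
 *   image_id 10086b8b4567
 *   image_name parent-image
 *   snap_id 4
 *   snap_name base
 *   overlap 10737418240
 */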
2436 static ssize_t rbd_parent_show(struct device *dev,
2437 struct device_attribute *attr,
char *buf)
2440 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2441 struct rbd_spec *spec = rbd_dev->parent_spec;
if (!spec)
2446 return sprintf(buf, "(no parent image)\n");
2448 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2449 (unsigned long long) spec->pool_id, spec->pool_name);
2454 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2455 spec->image_name ? spec->image_name : "(unknown)");
2460 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2461 (unsigned long long) spec->snap_id, spec->snap_name);
2466 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2471 return (ssize_t) (bufp - buf);
2474 static ssize_t rbd_image_refresh(struct device *dev,
2475 struct device_attribute *attr,
const char *buf,
size_t size)
2479 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2482 ret = rbd_dev_refresh(rbd_dev, NULL);
2484 return ret < 0 ? ret : size;
2487 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2488 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2489 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2490 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2491 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2492 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2493 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2494 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2495 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2496 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2497 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2499 static struct attribute *rbd_attrs[] = {
2500 &dev_attr_size.attr,
2501 &dev_attr_features.attr,
2502 &dev_attr_major.attr,
2503 &dev_attr_client_id.attr,
2504 &dev_attr_pool.attr,
2505 &dev_attr_pool_id.attr,
2506 &dev_attr_name.attr,
2507 &dev_attr_image_id.attr,
2508 &dev_attr_current_snap.attr,
2509 &dev_attr_parent.attr,
2510 &dev_attr_refresh.attr,
2514 static struct attribute_group rbd_attr_group = {
2518 static const struct attribute_group *rbd_attr_groups[] = {
2523 static void rbd_sysfs_dev_release(struct device *dev)
2527 static struct device_type rbd_device_type = {
2529 .groups = rbd_attr_groups,
2530 .release = rbd_sysfs_dev_release,
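/*
 * The attributes above appear under /sys/bus/rbd/devices/<id>/.  For
 * example (illustrative), reading .../devices/0/size yields the mapped
 * image size in bytes, and writing anything to .../devices/0/refresh
 * triggers rbd_dev_refresh() for device 0.
 */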
2538 static ssize_t rbd_snap_size_show(struct device *dev,
2539 struct device_attribute *attr,
char *buf)
2542 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2544 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2547 static ssize_t rbd_snap_id_show(struct device *dev,
2548 struct device_attribute *attr,
char *buf)
2551 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2553 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2556 static ssize_t rbd_snap_features_show(struct device *dev,
2557 struct device_attribute *attr,
char *buf)
2560 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2562 return sprintf(buf, "0x%016llx\n",
2563 (unsigned long long) snap->features);
2566 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2567 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2568 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2570 static struct attribute *rbd_snap_attrs[] = {
2571 &dev_attr_snap_size.attr,
2572 &dev_attr_snap_id.attr,
2573 &dev_attr_snap_features.attr,
2577 static struct attribute_group rbd_snap_attr_group = {
2578 .attrs = rbd_snap_attrs,
2581 static void rbd_snap_dev_release(struct device *dev)
2583 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2588 static const struct attribute_group *rbd_snap_attr_groups[] = {
2589 &rbd_snap_attr_group,
2593 static struct device_type rbd_snap_device_type = {
2594 .groups = rbd_snap_attr_groups,
2595 .release = rbd_snap_dev_release,
2598 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2600 kref_get(&spec->kref);
2605 static void rbd_spec_free(struct kref *kref);
2606 static void rbd_spec_put(struct rbd_spec *spec)
2609 kref_put(&spec->kref, rbd_spec_free);
2612 static struct rbd_spec *rbd_spec_alloc(void)
2614 struct rbd_spec *spec;
2616 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
if (!spec)
return NULL;
2619 kref_init(&spec->kref);
2621 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2626 static void rbd_spec_free(struct kref *kref)
2628 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2630 kfree(spec->pool_name);
2631 kfree(spec->image_id);
2632 kfree(spec->image_name);
2633 kfree(spec->snap_name);
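/*
 * Lifecycle sketch: rbd_spec_alloc() returns a spec holding a single
 * reference; each additional holder (e.g. a child image's parent_spec
 * pointer) takes one with rbd_spec_get(), and the final rbd_spec_put()
 * drops the kref, freeing the spec and all of its name strings here.
 */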
2637 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2638 struct rbd_spec *spec)
2640 struct rbd_device *rbd_dev;
2642 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
if (!rbd_dev)
return NULL;
2646 spin_lock_init(&rbd_dev->lock);
2648 INIT_LIST_HEAD(&rbd_dev->node);
2649 INIT_LIST_HEAD(&rbd_dev->snaps);
2650 init_rwsem(&rbd_dev->header_rwsem);
2652 rbd_dev->spec = spec;
2653 rbd_dev->rbd_client = rbdc;
2655 /* Initialize the layout used for all rbd requests */
2657 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2658 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2659 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2660 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2665 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2667 rbd_spec_put(rbd_dev->parent_spec);
2668 kfree(rbd_dev->header_name);
2669 rbd_put_client(rbd_dev->rbd_client);
2670 rbd_spec_put(rbd_dev->spec);
2674 static bool rbd_snap_registered(struct rbd_snap *snap)
2676 bool ret = snap->dev.type == &rbd_snap_device_type;
2677 bool reg = device_is_registered(&snap->dev);
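/* A snap's device type is set exactly when its device is registered */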
2679 rbd_assert(!ret ^ reg);
2684 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2686 list_del(&snap->node);
2687 if (device_is_registered(&snap->dev))
2688 device_unregister(&snap->dev);
2691 static int rbd_register_snap_dev(struct rbd_snap *snap,
2692 struct device *parent)
2694 struct device *dev = &snap->dev;
2697 dev->type = &rbd_snap_device_type;
2698 dev->parent = parent;
2699 dev->release = rbd_snap_dev_release;
2700 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2701 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2703 ret = device_register(dev);
2708 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2709 const char *snap_name,
2710 u64 snap_id, u64 snap_size,
u64 snap_features)
2713 struct rbd_snap *snap;
2716 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
if (!snap)
2718 return ERR_PTR(-ENOMEM);
ret = -ENOMEM;
2721 snap->name = kstrdup(snap_name, GFP_KERNEL);
if (!snap->name)
goto err;
snap->id = snap_id;
2726 snap->size = snap_size;
2727 snap->features = snap_features;
return snap;
err:
kfree(snap->name);
kfree(snap);
2735 return ERR_PTR(ret);
2738 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2739 u64 *snap_size, u64 *snap_features)
2743 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2745 *snap_size = rbd_dev->header.snap_sizes[which];
2746 *snap_features = 0; /* No features for v1 */
2748 /* Skip over names until we find the one we are looking for */
2750 snap_name = rbd_dev->header.snap_names;
while (which--)
2752 snap_name += strlen(snap_name) + 1;
2758 * Get the size and object order for an image snapshot, or if
2759 snap_id is CEPH_NOSNAP, gets this information for the base image.
2762 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2763 u8 *order, u64 *snap_size)
2765 __le64 snapid = cpu_to_le64(snap_id);
struct {
u8 order;
__le64 size;
2770 } __attribute__ ((packed)) size_buf = { 0 };
2772 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_size",
2774 (char *) &snapid, sizeof (snapid),
2775 (char *) &size_buf, sizeof (size_buf), NULL);
2776 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2780 *order = size_buf.order;
2781 *snap_size = le64_to_cpu(size_buf.size);
2783 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2784 (unsigned long long) snap_id, (unsigned int) *order,
2785 (unsigned long long) *snap_size);
2790 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2792 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2793 &rbd_dev->header.obj_order,
2794 &rbd_dev->header.image_size);
2797 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2803 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2807 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2808 "rbd", "get_object_prefix",
NULL, 0,
2810 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2811 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2816 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2817 p + RBD_OBJ_PREFIX_LEN_MAX,
NULL, GFP_NOIO);
2820 if (IS_ERR(rbd_dev->header.object_prefix)) {
2821 ret = PTR_ERR(rbd_dev->header.object_prefix);
2822 rbd_dev->header.object_prefix = NULL;
} else {
2824 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2833 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
u64 *snap_features)
2836 __le64 snapid = cpu_to_le64(snap_id);
struct {
__le64 features;
__le64 incompat;
2840 } features_buf = { 0 };
2844 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2845 "rbd", "get_features",
2846 (char *) &snapid, sizeof (snapid),
2847 (char *) &features_buf, sizeof (features_buf),
NULL);
2849 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2853 incompat = le64_to_cpu(features_buf.incompat);
2854 if (incompat & ~RBD_FEATURES_SUPPORTED)
return -ENXIO;
2857 *snap_features = le64_to_cpu(features_buf.features);
2859 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2860 (unsigned long long) snap_id,
2861 (unsigned long long) *snap_features,
2862 (unsigned long long) le64_to_cpu(features_buf.incompat));
2867 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2869 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2870 &rbd_dev->header.features);
2873 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2875 struct rbd_spec *parent_spec;
2877 void *reply_buf = NULL;
2885 parent_spec = rbd_spec_alloc();
2889 size = sizeof (__le64) + /* pool_id */
2890 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2891 sizeof (__le64) + /* snap_id */
2892 sizeof (__le64); /* overlap */
2893 reply_buf = kmalloc(size, GFP_KERNEL);
2899 snapid = cpu_to_le64(CEPH_NOSNAP);
2900 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2901 "rbd", "get_parent",
2902 (char *) &snapid, sizeof (snapid),
2903 (char *) reply_buf, size, NULL);
2904 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2910 end = (char *) reply_buf + size;
2911 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2912 if (parent_spec->pool_id == CEPH_NOPOOL)
2913 goto out; /* No parent? No problem. */
2915 /* The ceph file layout needs to fit pool id in 32 bits */
2918 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2921 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2922 if (IS_ERR(image_id)) {
2923 ret = PTR_ERR(image_id);
goto out_err;
}
2926 parent_spec->image_id = image_id;
2927 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2928 ceph_decode_64_safe(&p, end, overlap, out_err);
2930 rbd_dev->parent_overlap = overlap;
2931 rbd_dev->parent_spec = parent_spec;
2932 parent_spec = NULL; /* rbd_dev now owns this */
out:
ret = 0;
out_err:
kfree(reply_buf);
2937 rbd_spec_put(parent_spec);
2942 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2944 size_t image_id_size;
2949 void *reply_buf = NULL;
2951 char *image_name = NULL;
2954 rbd_assert(!rbd_dev->spec->image_name);
2956 len = strlen(rbd_dev->spec->image_id);
2957 image_id_size = sizeof (__le32) + len;
2958 image_id = kmalloc(image_id_size, GFP_KERNEL);
2963 end = (char *) image_id + image_id_size;
2964 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2966 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2967 reply_buf = kmalloc(size, GFP_KERNEL);
2971 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2972 "rbd", "dir_get_name",
2973 image_id, image_id_size,
2974 (char *) reply_buf, size, NULL);
2978 end = (char *) reply_buf + size;
2979 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2980 if (IS_ERR(image_name))
image_name = NULL;
else
2983 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2992 * When a parent image gets probed, we only have the pool, image,
2993 * and snapshot ids but not the names of any of them. This call
2994 * is made later to fill in those names. It has to be done after
2995 * rbd_dev_snaps_update() has completed because some of the
2996 information (in particular, snapshot name) is not available until then.
2999 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3001 struct ceph_osd_client *osdc;
3003 void *reply_buf = NULL;
3006 if (rbd_dev->spec->pool_name)
3007 return 0; /* Already have the names */
3009 /* Look up the pool name */
3011 osdc = &rbd_dev->rbd_client->client->osdc;
3012 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
if (!name) {
3014 rbd_warn(rbd_dev, "there is no pool with id %llu",
3015 rbd_dev->spec->pool_id); /* Really a BUG() */
return -EIO;
}
3019 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3020 if (!rbd_dev->spec->pool_name)
3023 /* Fetch the image name; tolerate failure here */
3025 name = rbd_dev_image_name(rbd_dev);
if (name)
3027 rbd_dev->spec->image_name = (char *) name;
else
3029 rbd_warn(rbd_dev, "unable to get image name");
3031 /* Look up the snapshot name. */
3033 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
if (!name) {
3035 rbd_warn(rbd_dev, "no snapshot with id %llu",
3036 rbd_dev->spec->snap_id); /* Really a BUG() */
return -EIO;
}
3040 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3041 if (!rbd_dev->spec->snap_name)
goto out_err;
return 0;
out_err:
3047 kfree(rbd_dev->spec->pool_name);
3048 rbd_dev->spec->pool_name = NULL;
return -ENOMEM;
3053 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3062 struct ceph_snap_context *snapc;
3066 * We'll need room for the seq value (maximum snapshot id),
3067 * snapshot count, and array of that many snapshot ids.
3068 * For now we have a fixed upper limit on the number we're
3069 * prepared to receive.
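/*
 * With RBD_MAX_SNAP_COUNT (510) snapshot ids this works out to
 * 8 + 4 + 510 * 8 = 4092 bytes, comfortably within a 4 KiB page.
 */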
3071 size = sizeof (__le64) + sizeof (__le32) +
3072 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3073 reply_buf = kzalloc(size, GFP_KERNEL);
3077 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3078 "rbd", "get_snapcontext",
NULL, 0,
3080 reply_buf, size, ver);
3081 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3087 end = (char *) reply_buf + size;
3088 ceph_decode_64_safe(&p, end, seq, out);
3089 ceph_decode_32_safe(&p, end, snap_count, out);
3092 * Make sure the reported number of snapshot ids wouldn't go
3093 * beyond the end of our buffer. But before checking that,
3094 * make sure the computed size of the snapshot context we
3095 * allocate is representable in a size_t.
3097 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
/ sizeof (u64)) {
ret = -EINVAL;
goto out;
}
3102 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
goto out;
3105 size = sizeof (struct ceph_snap_context) +
3106 snap_count * sizeof (snapc->snaps[0]);
3107 snapc = kmalloc(size, GFP_KERNEL);
3113 atomic_set(&snapc->nref, 1);
snapc->seq = seq;
3115 snapc->num_snaps = snap_count;
3116 for (i = 0; i < snap_count; i++)
3117 snapc->snaps[i] = ceph_decode_64(&p);
3119 rbd_dev->header.snapc = snapc;
3121 dout(" snap context seq = %llu, snap_count = %u\n",
3122 (unsigned long long) seq, (unsigned int) snap_count);
ret = 0;
out:
kfree(reply_buf);
return ret;
3130 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3140 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3141 reply_buf = kmalloc(size, GFP_KERNEL);
if (!reply_buf)
3143 return ERR_PTR(-ENOMEM);
3145 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3146 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3147 "rbd", "get_snapshot_name",
3148 (char *) &snap_id, sizeof (snap_id),
3149 reply_buf, size, NULL);
3150 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3155 end = (char *) reply_buf + size;
3156 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3157 if (IS_ERR(snap_name)) {
3158 ret = PTR_ERR(snap_name);
goto out;
}
3161 dout(" snap_id 0x%016llx snap_name = %s\n",
3162 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3170 return ERR_PTR(ret);
3173 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3174 u64 *snap_size, u64 *snap_features)
3180 snap_id = rbd_dev->header.snapc->snaps[which];
3181 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
if (ret)
3183 return ERR_PTR(ret);
3184 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
if (ret)
3186 return ERR_PTR(ret);
3188 return rbd_dev_v2_snap_name(rbd_dev, which);
3191 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3192 u64 *snap_size, u64 *snap_features)
3194 if (rbd_dev->image_format == 1)
3195 return rbd_dev_v1_snap_info(rbd_dev, which,
3196 snap_size, snap_features);
3197 if (rbd_dev->image_format == 2)
3198 return rbd_dev_v2_snap_info(rbd_dev, which,
3199 snap_size, snap_features);
3200 return ERR_PTR(-EINVAL);
3203 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3208 down_write(&rbd_dev->header_rwsem);
3210 /* Grab old order first, to see if it changes */
3212 obj_order = rbd_dev->header.obj_order;
3213 ret = rbd_dev_v2_image_size(rbd_dev);
if (ret)
goto out;
3216 if (rbd_dev->header.obj_order != obj_order) {
ret = -EIO;
goto out;
}
3220 rbd_update_mapping_size(rbd_dev);
3222 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3223 dout("rbd_dev_v2_snap_context returned %d\n", ret);
if (ret)
goto out;
3226 ret = rbd_dev_snaps_update(rbd_dev);
3227 dout("rbd_dev_snaps_update returned %d\n", ret);
if (ret)
goto out;
3230 ret = rbd_dev_snaps_register(rbd_dev);
3231 dout("rbd_dev_snaps_register returned %d\n", ret);
out:
3233 up_write(&rbd_dev->header_rwsem);
3239 * Scan the rbd device's current snapshot list and compare it to the
3240 * newly-received snapshot context. Remove any existing snapshots
3241 * not present in the new snapshot context. Add a new snapshot for
3242 any snapshots in the snapshot context not in the current list.
3243 And verify there are no changes to snapshots we already know about.
3246 * Assumes the snapshots in the snapshot context are sorted by
3247 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3248 * are also maintained in that order.)
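/*
 * Illustrative example: with the current list holding ids { 8, 5, 2 }
 * and a new context holding { 8, 4, 2 }, ids 8 and 2 are verified
 * unchanged, 5 is removed, and 4 is inserted ahead of 2.
 */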
3250 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3252 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3253 const u32 snap_count = snapc->num_snaps;
3254 struct list_head *head = &rbd_dev->snaps;
3255 struct list_head *links = head->next;
3258 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3259 while (index < snap_count || links != head) {
3261 struct rbd_snap *snap;
3264 u64 snap_features = 0;
3266 snap_id = index < snap_count ? snapc->snaps[index] : CEPH_NOSNAP;
3268 snap = links != head ? list_entry(links, struct rbd_snap, node) : NULL;
3270 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3272 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3273 struct list_head *next = links->next;
3276 * A previously-existing snapshot is not in
3277 * the new snap context.
3279 * If the now missing snapshot is the one the
3280 * image is mapped to, clear its exists flag
3281 * so we can avoid sending any more requests
3284 if (rbd_dev->spec->snap_id == snap->id)
3285 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3286 rbd_remove_snap_dev(snap);
3287 dout("%ssnap id %llu has been removed\n",
3288 rbd_dev->spec->snap_id == snap->id ?
3290 (unsigned long long) snap->id);
3292 /* Done with this list entry; advance */
links = next;
continue;
}
3298 snap_name = rbd_dev_snap_info(rbd_dev, index,
3299 &snap_size, &snap_features);
3300 if (IS_ERR(snap_name))
3301 return PTR_ERR(snap_name);
3303 dout("entry %u: snap_id = %llu\n", (unsigned int) index,
3304 (unsigned long long) snap_id);
3305 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3306 struct rbd_snap *new_snap;
3308 /* We haven't seen this snapshot before */
3310 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3311 snap_id, snap_size, snap_features);
3312 if (IS_ERR(new_snap)) {
3313 int err = PTR_ERR(new_snap);
3315 dout(" failed to add dev, error %d\n", err);
3320 /* New goes before existing, or at end of list */
3322 dout(" added dev%s\n", snap ? "" : " at end");
if (snap)
3324 list_add_tail(&new_snap->node, &snap->node);
else
3326 list_add_tail(&new_snap->node, head);
3328 /* Already have this one */
3330 dout(" already present\n");
3332 rbd_assert(snap->size == snap_size);
3333 rbd_assert(!strcmp(snap->name, snap_name));
3334 rbd_assert(snap->features == snap_features);
3336 /* Done with this list entry; advance */
3338 links = links->next;
3341 /* Advance to the next entry in the snapshot context */
index++;
}
3345 dout("%s: done\n", __func__);
3351 * Scan the list of snapshots and register the devices for any that
3352 * have not already been registered.
3354 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3356 struct rbd_snap *snap;
3359 dout("%s:\n", __func__);
3360 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
return -EIO;
3363 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3364 if (!rbd_snap_registered(snap)) {
3365 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3370 dout("%s: returning %d\n", __func__, ret);
3375 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3380 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3382 dev = &rbd_dev->dev;
3383 dev->bus = &rbd_bus_type;
3384 dev->type = &rbd_device_type;
3385 dev->parent = &rbd_root_dev;
3386 dev->release = rbd_dev_release;
3387 dev_set_name(dev, "%d", rbd_dev->dev_id);
3388 ret = device_register(dev);
3390 mutex_unlock(&ctl_mutex);
3395 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3397 device_unregister(&rbd_dev->dev);
3400 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3403 * Get a unique rbd identifier for the given new rbd_dev, and add
3404 * the rbd_dev to the global list. The minimum rbd id is 1.
3406 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3408 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3410 spin_lock(&rbd_dev_list_lock);
3411 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3412 spin_unlock(&rbd_dev_list_lock);
3413 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3414 (unsigned long long) rbd_dev->dev_id);
3418 * Remove an rbd_dev from the global list, and record that its
3419 * identifier is no longer in use.
3421 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3423 struct list_head *tmp;
3424 int rbd_id = rbd_dev->dev_id;
3427 rbd_assert(rbd_id > 0);
3429 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3430 (unsigned long long) rbd_dev->dev_id);
3431 spin_lock(&rbd_dev_list_lock);
3432 list_del_init(&rbd_dev->node);
3435 * If the id being "put" is not the current maximum, there
3436 * is nothing special we need to do.
3438 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3439 spin_unlock(&rbd_dev_list_lock);
3444 * We need to update the current maximum id. Search the
3445 * list to find out what it is. We're more likely to find
3446 * the maximum at the end, so search the list backward.
3449 list_for_each_prev(tmp, &rbd_dev_list) {
3450 struct rbd_device *rbd_dev;
3452 rbd_dev = list_entry(tmp, struct rbd_device, node);
3453 if (rbd_dev->dev_id > max_id)
3454 max_id = rbd_dev->dev_id;
3456 spin_unlock(&rbd_dev_list_lock);
3459 * The max id could have been updated by rbd_dev_id_get(), in
3460 * which case it now accurately reflects the new maximum.
3461 * Be careful not to overwrite the maximum value in that
3464 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3465 dout(" max dev id has been reset\n");
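/*
 * Illustrative scenario: with ids { 1, 2, 3 } in use, putting id 3
 * rescans the list, finds max_id 2 and cmpxchg()es 3 -> 2.  If a
 * concurrent rbd_dev_id_get() already raised the counter to 4, the
 * cmpxchg simply fails and the newer maximum is left in place.
 */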
3469 * Skips over white space at *buf, and updates *buf to point to the
3470 * first found non-space character (if any). Returns the length of
3471 * the token (string of non-white space characters) found. Note
3472 * that *buf must be terminated with '\0'.
3474 static inline size_t next_token(const char **buf)
3477 * These are the characters that produce nonzero for
3478 * isspace() in the "C" and "POSIX" locales.
3480 const char *spaces = " \f\n\r\t\v";
3482 *buf += strspn(*buf, spaces); /* Find start of token */
3484 return strcspn(*buf, spaces); /* Return token length */
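/*
 * Example: with *buf pointing at "  pool image", next_token() leaves
 * *buf at "pool image" and returns 4, the length of "pool".
 */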
3488 * Finds the next token in *buf, and if the provided token buffer is
3489 * big enough, copies the found token into it. The result, if
3490 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3491 * must be terminated with '\0' on entry.
3493 * Returns the length of the token found (not including the '\0').
3494 * Return value will be 0 if no token is found, and it will be >=
3495 * token_size if the token would not fit.
3497 * The *buf pointer will be updated to point beyond the end of the
3498 * found token. Note that this occurs even if the token buffer is
3499 * too small to hold it.
3501 static inline size_t copy_token(const char **buf,
3507 len = next_token(buf);
3508 if (len < token_size) {
3509 memcpy(token, *buf, len);
3510 *(token + len) = '\0';
3518 * Finds the next token in *buf, dynamically allocates a buffer big
3519 * enough to hold a copy of it, and copies the token into the new
3520 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3521 * that a duplicate buffer is created even for a zero-length token.
3523 * Returns a pointer to the newly-allocated duplicate, or a null
3524 * pointer if memory for the duplicate was not available. If
3525 * the lenp argument is a non-null pointer, the length of the token
3526 * (not including the '\0') is returned in *lenp.
3528 * If successful, the *buf pointer will be updated to point beyond
3529 * the end of the found token.
3531 * Note: uses GFP_KERNEL for allocation.
3533 static inline char *dup_token(const char **buf, size_t *lenp)
3538 len = next_token(buf);
3539 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
if (!dup)
return NULL;
3542 *(dup + len) = '\0';
3552 * Parse the options provided for an "rbd add" (i.e., rbd image
3553 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3554 * and the data written is passed here via a NUL-terminated buffer.
3555 * Returns 0 if successful or an error code otherwise.
3557 The information extracted from these options is recorded in
3558 the other parameters, which return dynamically-allocated structures:
ceph_opts
3561 The address of a pointer that will refer to a ceph options
3562 structure. Caller must release the returned pointer using
3563 ceph_destroy_options() when it is no longer needed.
opts
3565 Address of an rbd options pointer. Fully initialized by
3566 this function; caller must release with kfree().
rbd_spec
3568 Address of an rbd image specification pointer. Fully
3569 initialized by this function based on parsed options.
3570 Caller must release with rbd_spec_put().
3572 The options passed take this form:
3573 <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
where:
<mon_addrs>
3576 A comma-separated list of one or more monitor addresses.
3577 A monitor address is an ip address, optionally followed
3578 by a port number (separated by a colon).
3579 I.e.: ip1[:port1][,ip2[:port2]...]
<options>
3581 A comma-separated list of ceph and/or rbd options.
<pool_name>
3583 The name of the rados pool containing the rbd image.
<image_name>
3585 The name of the image in that pool to map.
<snap_name>
3587 An optional snapshot name. If provided, the mapping will
3588 present data from the image at the time that snapshot was
3589 created. The image head is used if no snapshot name is
3590 provided. Snapshot mappings are always read-only.
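/*
 * Example of a complete request written to /sys/bus/rbd/add
 * (illustrative addresses and names):
 *
 *   1.2.3.4:6789,1.2.3.5:6789 name=admin rbd myimage mysnap
 *
 * which maps snapshot "mysnap" of image "myimage" in pool "rbd",
 * using two monitors and a single ceph option.
 */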
3592 static int rbd_add_parse_args(const char *buf,
3593 struct ceph_options **ceph_opts,
3594 struct rbd_options **opts,
3595 struct rbd_spec **rbd_spec)
3599 const char *mon_addrs;
3600 size_t mon_addrs_size;
3601 struct rbd_spec *spec = NULL;
3602 struct rbd_options *rbd_opts = NULL;
3603 struct ceph_options *copts;
3606 /* The first four tokens are required */
3608 len = next_token(&buf);
if (!len) {
3610 rbd_warn(NULL, "no monitor address(es) provided");
return -EINVAL;
}
mon_addrs = buf;
3614 mon_addrs_size = len + 1;
buf += len;
3618 options = dup_token(&buf, NULL);
if (!options)
return -ENOMEM;
if (!*options) {
3622 rbd_warn(NULL, "no options provided");
goto out_err;
}
3626 spec = rbd_spec_alloc();
if (!spec)
goto out_mem;
3630 spec->pool_name = dup_token(&buf, NULL);
3631 if (!spec->pool_name)
goto out_mem;
3633 if (!*spec->pool_name) {
3634 rbd_warn(NULL, "no pool name provided");
goto out_err;
}
3638 spec->image_name = dup_token(&buf, NULL);
3639 if (!spec->image_name)
goto out_mem;
3641 if (!*spec->image_name) {
3642 rbd_warn(NULL, "no image name provided");
goto out_err;
}
3647 * Snapshot name is optional; default is to use "-"
3648 * (indicating the head/no snapshot).
3650 len = next_token(&buf);
if (!len) {
3652 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3653 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3654 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3655 ret = -ENAMETOOLONG;
goto out_err;
}
3658 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3659 if (!spec->snap_name)
goto out_mem;
3661 *(spec->snap_name + len) = '\0';
3663 /* Initialize all rbd options to the defaults */
3665 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
if (!rbd_opts)
goto out_mem;
3669 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3671 copts = ceph_parse_options(options, mon_addrs,
3672 mon_addrs + mon_addrs_size - 1,
3673 parse_rbd_opts_token, rbd_opts);
3674 if (IS_ERR(copts)) {
3675 ret = PTR_ERR(copts);
goto out_err;
}
kfree(options);
*ceph_opts = copts;
*opts = rbd_opts;
*rbd_spec = spec;
return 0;
out_mem:
ret = -ENOMEM;
out_err:
kfree(rbd_opts);
rbd_spec_put(spec);
kfree(options);
return ret;
3696 * An rbd format 2 image has a unique identifier, distinct from the
3697 * name given to it by the user. Internally, that identifier is
3698 * what's used to specify the names of objects related to the image.
3700 * A special "rbd id" object is used to map an rbd image name to its
3701 * id. If that object doesn't exist, then there is no v2 rbd image
3702 * with the supplied name.
3704 * This function will record the given rbd_dev's image_id field if
3705 * it can be determined, and in that case will return 0. If any
3706 * errors occur a negative errno will be returned and the rbd_dev's
3707 * image_id field will be unchanged (and should be NULL).
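/*
 * For example (illustrative, assuming the usual prefixes from
 * rbd_types.h): a format 2 image named "foo" has its id stored in
 * the object "rbd_id.foo"; if that id is "10086b8b4567", the image's
 * header object becomes "rbd_header.10086b8b4567".
 */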
3709 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3718 * When probing a parent image, the image id is already
3719 * known (and the image name likely is not). There's no
3720 * need to fetch the image id again in this case.
3722 if (rbd_dev->spec->image_id)
return 0;
3726 * First, see if the format 2 image id file exists, and if
3727 * so, get the image's persistent id from it.
3729 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3730 object_name = kmalloc(size, GFP_NOIO);
if (!object_name)
return -ENOMEM;
3733 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3734 dout("rbd id object name is %s\n", object_name);
3736 /* Response will be an encoded string, which includes a length */
3738 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3739 response = kzalloc(size, GFP_NOIO);
3745 ret = rbd_obj_method_sync(rbd_dev, object_name,
"rbd", "get_id",
NULL, 0,
3748 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3749 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3754 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3755 p + RBD_IMAGE_ID_LEN_MAX,
NULL, GFP_NOIO);
3757 if (IS_ERR(rbd_dev->spec->image_id)) {
3758 ret = PTR_ERR(rbd_dev->spec->image_id);
3759 rbd_dev->spec->image_id = NULL;
} else {
3761 dout("image_id is %s\n", rbd_dev->spec->image_id);
}
out:
kfree(response);
kfree(object_name);
return ret;
3770 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3775 /* Version 1 images have no id; empty string is used */
3777 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3778 if (!rbd_dev->spec->image_id)
return -ENOMEM;
3781 /* Record the header object name for this rbd image. */
3783 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3784 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3785 if (!rbd_dev->header_name) {
ret = -ENOMEM;
goto out_err;
}
3789 sprintf(rbd_dev->header_name, "%s%s",
3790 rbd_dev->spec->image_name, RBD_SUFFIX);
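/* e.g. (illustrative) a v1 image named "foo" gets header object "foo.rbd" */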
3792 /* Populate rbd image metadata */
3794 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
if (ret < 0)
goto out_err;
3798 /* Version 1 images have no parent (no layering) */
3800 rbd_dev->parent_spec = NULL;
3801 rbd_dev->parent_overlap = 0;
3803 rbd_dev->image_format = 1;
3805 dout("discovered version 1 image, header name is %s\n",
3806 rbd_dev->header_name);
return 0;
out_err:
3811 kfree(rbd_dev->header_name);
3812 rbd_dev->header_name = NULL;
3813 kfree(rbd_dev->spec->image_id);
3814 rbd_dev->spec->image_id = NULL;
3819 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3826 * Image id was filled in by the caller. Record the header
3827 * object name for this rbd image.
3829 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3830 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3831 if (!rbd_dev->header_name)
return -ENOMEM;
3833 sprintf(rbd_dev->header_name, "%s%s",
3834 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3836 /* Get the size and object order for the image */
3838 ret = rbd_dev_v2_image_size(rbd_dev);
if (ret)
goto out_err;
3842 /* Get the object prefix (a.k.a. block_name) for the image */
3844 ret = rbd_dev_v2_object_prefix(rbd_dev);
if (ret)
goto out_err;
3848 /* Get and check the features for the image */
3850 ret = rbd_dev_v2_features(rbd_dev);
if (ret)
goto out_err;
3854 /* If the image supports layering, get the parent info */
3856 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3857 ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret)
goto out_err;
}
3862 /* crypto and compression type aren't (yet) supported for v2 images */
3864 rbd_dev->header.crypt_type = 0;
3865 rbd_dev->header.comp_type = 0;
3867 /* Get the snapshot context, plus the header version */
3869 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
if (ret)
goto out_err;
3872 rbd_dev->header.obj_version = ver;
3874 rbd_dev->image_format = 2;
3876 dout("discovered version 2 image, header name is %s\n",
3877 rbd_dev->header_name);
return 0;
out_err:
3881 rbd_dev->parent_overlap = 0;
3882 rbd_spec_put(rbd_dev->parent_spec);
3883 rbd_dev->parent_spec = NULL;
3884 kfree(rbd_dev->header_name);
3885 rbd_dev->header_name = NULL;
3886 kfree(rbd_dev->header.object_prefix);
3887 rbd_dev->header.object_prefix = NULL;
3892 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3896 /* no need to lock here, as rbd_dev is not registered yet */
3897 ret = rbd_dev_snaps_update(rbd_dev);
if (ret)
return ret;
3901 ret = rbd_dev_probe_update_spec(rbd_dev);
if (ret)
goto err_out_snaps;
3905 ret = rbd_dev_set_mapping(rbd_dev);
if (ret)
goto err_out_snaps;
3909 /* generate unique id: find highest unique id, add one */
3910 rbd_dev_id_get(rbd_dev);
3912 /* Fill in the device name, now that we have its id. */
3913 BUILD_BUG_ON(DEV_NAME_LEN
3914 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3915 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3917 /* Get our block major device number. */
3919 ret = register_blkdev(0, rbd_dev->name);
if (ret < 0)
goto err_out_id;
3922 rbd_dev->major = ret;
3924 /* Set up the blkdev mapping. */
3926 ret = rbd_init_disk(rbd_dev);
if (ret)
3928 goto err_out_blkdev;
3930 ret = rbd_bus_add_dev(rbd_dev);
if (ret)
goto err_out_disk;
3935 * At this point cleanup in the event of an error is the job
3936 * of the sysfs code (initiated by rbd_bus_del_dev()).
3938 down_write(&rbd_dev->header_rwsem);
3939 ret = rbd_dev_snaps_register(rbd_dev);
3940 up_write(&rbd_dev->header_rwsem);
if (ret)
goto err_out_bus;
3944 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
if (ret)
goto err_out_bus;
3948 /* Everything's ready. Announce the disk to the world. */
3950 add_disk(rbd_dev->disk);
3952 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3953 (unsigned long long) rbd_dev->mapping.size);
return ret;
err_out_bus:
3957 /* this will also clean up rest of rbd_dev stuff */
3959 rbd_bus_del_dev(rbd_dev);
return ret;
err_out_disk:
3963 rbd_free_disk(rbd_dev);
err_out_blkdev:
3965 unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
3967 rbd_dev_id_put(rbd_dev);
err_out_snaps:
3969 rbd_remove_all_snaps(rbd_dev);
return ret;
3975 * Probe for the existence of the header object for the given rbd
3976 device. For format 2 images this includes determining the image id.
3979 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3984 * Get the id from the image id object. If it's not a
3985 * format 2 image, we'll get ENOENT back, and we'll assume
3986 * it's a format 1 image.
3988 ret = rbd_dev_image_id(rbd_dev);
if (ret)
3990 ret = rbd_dev_v1_probe(rbd_dev);
else
3992 ret = rbd_dev_v2_probe(rbd_dev);
if (ret) {
3994 dout("probe failed, returning %d\n", ret);
return ret;
}
3999 ret = rbd_dev_probe_finish(rbd_dev);
if (ret)
4001 rbd_header_free(&rbd_dev->header);
return ret;
4006 static ssize_t rbd_add(struct bus_type *bus,
const char *buf,
size_t count)
4010 struct rbd_device *rbd_dev = NULL;
4011 struct ceph_options *ceph_opts = NULL;
4012 struct rbd_options *rbd_opts = NULL;
4013 struct rbd_spec *spec = NULL;
4014 struct rbd_client *rbdc;
4015 struct ceph_osd_client *osdc;
4018 if (!try_module_get(THIS_MODULE))
return -ENODEV;
4021 /* parse add command */
4022 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
if (rc < 0)
4024 goto err_out_module;
4026 rbdc = rbd_get_client(ceph_opts);
if (IS_ERR(rbdc)) {
rc = PTR_ERR(rbdc);
goto err_out_args;
}
4031 ceph_opts = NULL; /* rbd_dev client now owns this */
4034 osdc = &rbdc->client->osdc;
4035 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
if (rc < 0)
4037 goto err_out_client;
4038 spec->pool_id = (u64) rc;
4040 /* The ceph file layout needs to fit pool id in 32 bits */
4042 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4044 goto err_out_client;
4047 rbd_dev = rbd_dev_create(rbdc, spec);
if (!rbd_dev)
4049 goto err_out_client;
4050 rbdc = NULL; /* rbd_dev now owns this */
4051 spec = NULL; /* rbd_dev now owns this */
4053 rbd_dev->mapping.read_only = rbd_opts->read_only;
4055 rbd_opts = NULL; /* done with this */
4057 rc = rbd_dev_probe(rbd_dev);
if (rc < 0)
4059 goto err_out_rbd_dev;
return count;
err_out_rbd_dev:
4063 rbd_dev_destroy(rbd_dev);
err_out_client:
4065 rbd_put_client(rbdc);
err_out_args:
if (ceph_opts)
4068 ceph_destroy_options(ceph_opts);
kfree(rbd_opts);
rbd_spec_put(spec);
err_out_module:
4072 module_put(THIS_MODULE);
4074 dout("Error adding device %s\n", buf);
4076 return (ssize_t) rc;
4079 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4081 struct list_head *tmp;
4082 struct rbd_device *rbd_dev;
4084 spin_lock(&rbd_dev_list_lock);
4085 list_for_each(tmp, &rbd_dev_list) {
4086 rbd_dev = list_entry(tmp, struct rbd_device, node);
4087 if (rbd_dev->dev_id == dev_id) {
4088 spin_unlock(&rbd_dev_list_lock);
4092 spin_unlock(&rbd_dev_list_lock);
4096 static void rbd_dev_release(struct device *dev)
4098 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4100 if (rbd_dev->watch_event)
4101 rbd_dev_header_watch_sync(rbd_dev, 0);
4103 /* clean up and free blkdev */
4104 rbd_free_disk(rbd_dev);
4105 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4107 /* release allocated disk header fields */
4108 rbd_header_free(&rbd_dev->header);
4110 /* done with the id, and with the rbd_dev */
4111 rbd_dev_id_put(rbd_dev);
4112 rbd_assert(rbd_dev->rbd_client != NULL);
4113 rbd_dev_destroy(rbd_dev);
4115 /* release module ref */
4116 module_put(THIS_MODULE);
4119 static ssize_t rbd_remove(struct bus_type *bus,
const char *buf,
size_t count)
4123 struct rbd_device *rbd_dev = NULL;
4128 rc = strict_strtoul(buf, 10, &ul);
4132 /* convert to int; abort if we lost anything in the conversion */
4133 target_id = (int) ul;
4134 if (target_id != ul)
return -EINVAL;
4137 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4139 rbd_dev = __rbd_get_dev(target_id);
4145 spin_lock_irq(&rbd_dev->lock);
4146 if (rbd_dev->open_count)
ret = -EBUSY;
else
4149 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4150 spin_unlock_irq(&rbd_dev->lock);
4154 rbd_remove_all_snaps(rbd_dev);
4155 rbd_bus_del_dev(rbd_dev);
4158 mutex_unlock(&ctl_mutex);
4164 * create control files in sysfs
4167 static int rbd_sysfs_init(void)
4171 ret = device_register(&rbd_root_dev);
if (ret < 0)
return ret;
4175 ret = bus_register(&rbd_bus_type);
if (ret < 0)
4177 device_unregister(&rbd_root_dev);
return ret;
4182 static void rbd_sysfs_cleanup(void)
4184 bus_unregister(&rbd_bus_type);
4185 device_unregister(&rbd_root_dev);
4188 static int __init rbd_init(void)
4192 if (!libceph_compatible(NULL)) {
4193 rbd_warn(NULL, "libceph incompatibility (quitting)");
return -EINVAL;
}
4197 rc = rbd_sysfs_init();
if (rc)
return rc;
4200 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
return 0;
4204 static void __exit rbd_exit(void)
4206 rbd_sysfs_cleanup();
4209 module_init(rbd_init);
4210 module_exit(rbd_exit);
4212 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4213 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4214 MODULE_DESCRIPTION("rados block device");
4216 /* following authorship retained from original osdblk.c */
4217 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4219 MODULE_LICENSE("GPL");