2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
58 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
60 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN \
62 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
64 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
66 #define RBD_SNAP_HEAD_NAME "-"
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX 64
72 #define RBD_OBJ_PREFIX_LEN_MAX 64
76 #define RBD_FEATURE_LAYERING (1<<0)
77 #define RBD_FEATURE_STRIPINGV2 (1<<1)
78 #define RBD_FEATURES_ALL \
79 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
81 /* Features supported by this (client software) implementation. */
83 #define RBD_FEATURES_SUPPORTED (0)
86 * An RBD device name will be "rbd#", where the "rbd" comes from
87 * RBD_DRV_NAME above, and # is a unique integer identifier.
88 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89 * enough to hold all possible device names.
91 #define DEV_NAME_LEN 32
92 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
95 * block device image metadata (in-memory version)
97 struct rbd_image_header {
98 /* These four fields never change for a given rbd image */
105 /* The remaining fields need to be updated occasionally */
107 struct ceph_snap_context *snapc;
115 * An rbd image specification.
117 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118 * identify an image. Each rbd_dev structure includes a pointer to
119 * an rbd_spec structure that encapsulates this identity.
121 * Each of the id's in an rbd_spec has an associated name. For a
122 * user-mapped image, the names are supplied and the id's associated
123 * with them are looked up. For a layered image, a parent image is
124 * defined by the tuple, and the names are looked up.
126 * An rbd_dev structure contains a parent_spec pointer which is
127 * non-null if the image it represents is a child in a layered
128 * image. This pointer will refer to the rbd_spec structure used
129 * by the parent rbd_dev for its own identity (i.e., the structure
130 * is shared between the parent and child).
132 * Since these structures are populated once, during the discovery
133 * phase of image construction, they are effectively immutable so
134 * we make no effort to synchronize access to them.
136 * Note that code herein does not assume the image name is known (it
137 * could be a null pointer).
153 * an instance of the client. multiple devices may share an rbd client.
156 struct ceph_client *client;
158 struct list_head node;
161 struct rbd_img_request;
162 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
164 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
166 struct rbd_obj_request;
167 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
169 enum obj_request_type {
170 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
173 struct rbd_obj_request {
174 const char *object_name;
175 u64 offset; /* object start byte */
176 u64 length; /* bytes from offset */
178 struct rbd_img_request *img_request;
179 u64 img_offset; /* image relative offset */
180 struct list_head links; /* img_request->obj_requests */
181 u32 which; /* posn image request list */
183 enum obj_request_type type;
185 struct bio *bio_list;
192 struct ceph_osd_request *osd_req;
194 u64 xferred; /* bytes transferred */
199 rbd_obj_callback_t callback;
200 struct completion completion;
206 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
207 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
210 struct rbd_img_request {
211 struct rbd_device *rbd_dev;
212 u64 offset; /* starting image byte offset */
213 u64 length; /* byte count from offset */
216 u64 snap_id; /* for reads */
217 struct ceph_snap_context *snapc; /* for writes */
220 struct request *rq; /* block request */
221 struct rbd_obj_request *obj_request; /* obj req initiator */
223 spinlock_t completion_lock;/* protects next_completion */
225 rbd_img_callback_t callback;
226 u64 xferred;/* aggregate bytes transferred */
227 int result; /* first nonzero obj_request result */
229 u32 obj_request_count;
230 struct list_head obj_requests; /* rbd_obj_request structs */
235 #define for_each_obj_request(ireq, oreq) \
236 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
237 #define for_each_obj_request_from(ireq, oreq) \
238 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
239 #define for_each_obj_request_safe(ireq, oreq, n) \
240 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
246 struct list_head node;
261 int dev_id; /* blkdev unique id */
263 int major; /* blkdev assigned major */
264 struct gendisk *disk; /* blkdev's gendisk and rq */
266 u32 image_format; /* Either 1 or 2 */
267 struct rbd_client *rbd_client;
269 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
271 spinlock_t lock; /* queue, flags, open_count */
273 struct rbd_image_header header;
274 unsigned long flags; /* possibly lock protected */
275 struct rbd_spec *spec;
279 struct ceph_file_layout layout;
281 struct ceph_osd_event *watch_event;
282 struct rbd_obj_request *watch_request;
284 struct rbd_spec *parent_spec;
287 /* protects updating the header */
288 struct rw_semaphore header_rwsem;
290 struct rbd_mapping mapping;
292 struct list_head node;
294 /* list of snapshots */
295 struct list_head snaps;
299 unsigned long open_count; /* protected by lock */
303 * Flag bits for rbd_dev->flags. If atomicity is required,
304 * rbd_dev->lock is used to protect access.
306 * Currently, only the "removing" flag (which is coupled with the
307 * "open_count" field) requires atomic access.
310 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
311 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
314 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
316 static LIST_HEAD(rbd_dev_list); /* devices */
317 static DEFINE_SPINLOCK(rbd_dev_list_lock);
319 static LIST_HEAD(rbd_client_list); /* clients */
320 static DEFINE_SPINLOCK(rbd_client_list_lock);
322 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
323 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
325 static void rbd_dev_release(struct device *dev);
326 static void rbd_remove_snap_dev(struct rbd_snap *snap);
328 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
330 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
333 static struct bus_attribute rbd_bus_attrs[] = {
334 __ATTR(add, S_IWUSR, NULL, rbd_add),
335 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
339 static struct bus_type rbd_bus_type = {
341 .bus_attrs = rbd_bus_attrs,
/*
 * Release callback for the rbd root device.  The root device is a
 * static object, so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
348 static struct device rbd_root_dev = {
350 .release = rbd_root_dev_release,
353 static __printf(2, 3)
354 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
356 struct va_format vaf;
364 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
365 else if (rbd_dev->disk)
366 printk(KERN_WARNING "%s: %s: %pV\n",
367 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
368 else if (rbd_dev->spec && rbd_dev->spec->image_name)
369 printk(KERN_WARNING "%s: image %s: %pV\n",
370 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
371 else if (rbd_dev->spec && rbd_dev->spec->image_id)
372 printk(KERN_WARNING "%s: id %s: %pV\n",
373 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
375 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
376 RBD_DRV_NAME, rbd_dev, &vaf);
381 #define rbd_assert(expr) \
382 if (unlikely(!(expr))) { \
383 printk(KERN_ERR "\nAssertion failure in %s() " \
385 "\trbd_assert(%s);\n\n", \
386 __func__, __LINE__, #expr); \
389 #else /* !RBD_DEBUG */
390 # define rbd_assert(expr) ((void) 0)
391 #endif /* !RBD_DEBUG */
393 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
394 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
396 static int rbd_open(struct block_device *bdev, fmode_t mode)
398 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
399 bool removing = false;
401 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
404 spin_lock_irq(&rbd_dev->lock);
405 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
408 rbd_dev->open_count++;
409 spin_unlock_irq(&rbd_dev->lock);
413 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
414 (void) get_device(&rbd_dev->dev);
415 set_device_ro(bdev, rbd_dev->mapping.read_only);
416 mutex_unlock(&ctl_mutex);
421 static int rbd_release(struct gendisk *disk, fmode_t mode)
423 struct rbd_device *rbd_dev = disk->private_data;
424 unsigned long open_count_before;
426 spin_lock_irq(&rbd_dev->lock);
427 open_count_before = rbd_dev->open_count--;
428 spin_unlock_irq(&rbd_dev->lock);
429 rbd_assert(open_count_before > 0);
431 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
432 put_device(&rbd_dev->dev);
433 mutex_unlock(&ctl_mutex);
438 static const struct block_device_operations rbd_bd_ops = {
439 .owner = THIS_MODULE,
441 .release = rbd_release,
445 * Initialize an rbd client instance.
448 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
450 struct rbd_client *rbdc;
453 dout("%s:\n", __func__);
454 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
458 kref_init(&rbdc->kref);
459 INIT_LIST_HEAD(&rbdc->node);
461 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
463 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
464 if (IS_ERR(rbdc->client))
466 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
468 ret = ceph_open_session(rbdc->client);
472 spin_lock(&rbd_client_list_lock);
473 list_add_tail(&rbdc->node, &rbd_client_list);
474 spin_unlock(&rbd_client_list_lock);
476 mutex_unlock(&ctl_mutex);
477 dout("%s: rbdc %p\n", __func__, rbdc);
482 ceph_destroy_client(rbdc->client);
484 mutex_unlock(&ctl_mutex);
488 ceph_destroy_options(ceph_opts);
489 dout("%s: error %d\n", __func__, ret);
495 * Find a ceph client with specific addr and configuration. If
496 * found, bump its reference count.
498 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
500 struct rbd_client *client_node;
503 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
506 spin_lock(&rbd_client_list_lock);
507 list_for_each_entry(client_node, &rbd_client_list, node) {
508 if (!ceph_compare_options(ceph_opts, client_node->client)) {
509 kref_get(&client_node->kref);
514 spin_unlock(&rbd_client_list_lock);
516 return found ? client_node : NULL;
526 /* string args above */
529 /* Boolean args above */
533 static match_table_t rbd_opts_tokens = {
535 /* string args above */
536 {Opt_read_only, "read_only"},
537 {Opt_read_only, "ro"}, /* Alternate spelling */
538 {Opt_read_write, "read_write"},
539 {Opt_read_write, "rw"}, /* Alternate spelling */
540 /* Boolean args above */
548 #define RBD_READ_ONLY_DEFAULT false
550 static int parse_rbd_opts_token(char *c, void *private)
552 struct rbd_options *rbd_opts = private;
553 substring_t argstr[MAX_OPT_ARGS];
554 int token, intval, ret;
556 token = match_token(c, rbd_opts_tokens, argstr);
560 if (token < Opt_last_int) {
561 ret = match_int(&argstr[0], &intval);
563 pr_err("bad mount option arg (not int) "
567 dout("got int token %d val %d\n", token, intval);
568 } else if (token > Opt_last_int && token < Opt_last_string) {
569 dout("got string token %d val %s\n", token,
571 } else if (token > Opt_last_string && token < Opt_last_bool) {
572 dout("got Boolean token %d\n", token);
574 dout("got token %d\n", token);
579 rbd_opts->read_only = true;
582 rbd_opts->read_only = false;
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * When an existing client is reused the supplied ceph_opts are
 * destroyed here; otherwise they are handed to rbd_client_create().
 * Returns whatever rbd_client_find() or rbd_client_create() returned.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
609 * Destroy ceph client
611 * Caller must hold rbd_client_list_lock.
613 static void rbd_client_release(struct kref *kref)
615 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
617 dout("%s: rbdc %p\n", __func__, rbdc);
618 spin_lock(&rbd_client_list_lock);
619 list_del(&rbdc->node);
620 spin_unlock(&rbd_client_list_lock);
622 ceph_destroy_client(rbdc->client);
627 * Drop reference to ceph client node. If it's not referenced anymore, release
630 static void rbd_put_client(struct rbd_client *rbdc)
633 kref_put(&rbdc->kref, rbd_client_release);
636 static bool rbd_image_format_valid(u32 image_format)
638 return image_format == 1 || image_format == 2;
641 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
646 /* The header has to start with the magic rbd header text */
647 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
650 /* The bio layer requires at least sector-sized I/O */
652 if (ondisk->options.order < SECTOR_SHIFT)
655 /* If we use u64 in a few spots we may be able to loosen this */
657 if (ondisk->options.order > 8 * sizeof (int) - 1)
661 * The size of a snapshot header has to fit in a size_t, and
662 * that limits the number of snapshots.
664 snap_count = le32_to_cpu(ondisk->snap_count);
665 size = SIZE_MAX - sizeof (struct ceph_snap_context);
666 if (snap_count > size / sizeof (__le64))
670 * Not only that, but the size of the entire the snapshot
671 * header must also be representable in a size_t.
673 size -= snap_count * sizeof (__le64);
674 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
681 * Create a new header structure, translate header format from the on-disk
684 static int rbd_header_from_disk(struct rbd_image_header *header,
685 struct rbd_image_header_ondisk *ondisk)
692 memset(header, 0, sizeof (*header));
694 snap_count = le32_to_cpu(ondisk->snap_count);
696 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
697 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
698 if (!header->object_prefix)
700 memcpy(header->object_prefix, ondisk->object_prefix, len);
701 header->object_prefix[len] = '\0';
704 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
706 /* Save a copy of the snapshot names */
708 if (snap_names_len > (u64) SIZE_MAX)
710 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
711 if (!header->snap_names)
714 * Note that rbd_dev_v1_header_read() guarantees
715 * the ondisk buffer we're working with has
716 * snap_names_len bytes beyond the end of the
717 * snapshot id array, this memcpy() is safe.
719 memcpy(header->snap_names, &ondisk->snaps[snap_count],
722 /* Record each snapshot's size */
724 size = snap_count * sizeof (*header->snap_sizes);
725 header->snap_sizes = kmalloc(size, GFP_KERNEL);
726 if (!header->snap_sizes)
728 for (i = 0; i < snap_count; i++)
729 header->snap_sizes[i] =
730 le64_to_cpu(ondisk->snaps[i].image_size);
732 WARN_ON(ondisk->snap_names_len);
733 header->snap_names = NULL;
734 header->snap_sizes = NULL;
737 header->features = 0; /* No features support in v1 images */
738 header->obj_order = ondisk->options.order;
739 header->crypt_type = ondisk->options.crypt_type;
740 header->comp_type = ondisk->options.comp_type;
742 /* Allocate and fill in the snapshot context */
744 header->image_size = le64_to_cpu(ondisk->image_size);
745 size = sizeof (struct ceph_snap_context);
746 size += snap_count * sizeof (header->snapc->snaps[0]);
747 header->snapc = kzalloc(size, GFP_KERNEL);
751 atomic_set(&header->snapc->nref, 1);
752 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
753 header->snapc->num_snaps = snap_count;
754 for (i = 0; i < snap_count; i++)
755 header->snapc->snaps[i] =
756 le64_to_cpu(ondisk->snaps[i].id);
761 kfree(header->snap_sizes);
762 header->snap_sizes = NULL;
763 kfree(header->snap_names);
764 header->snap_names = NULL;
765 kfree(header->object_prefix);
766 header->object_prefix = NULL;
771 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
773 struct rbd_snap *snap;
775 if (snap_id == CEPH_NOSNAP)
776 return RBD_SNAP_HEAD_NAME;
778 list_for_each_entry(snap, &rbd_dev->snaps, node)
779 if (snap_id == snap->id)
785 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
788 struct rbd_snap *snap;
790 list_for_each_entry(snap, &rbd_dev->snaps, node) {
791 if (!strcmp(snap_name, snap->name)) {
792 rbd_dev->spec->snap_id = snap->id;
793 rbd_dev->mapping.size = snap->size;
794 rbd_dev->mapping.features = snap->features;
803 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
807 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
808 sizeof (RBD_SNAP_HEAD_NAME))) {
809 rbd_dev->spec->snap_id = CEPH_NOSNAP;
810 rbd_dev->mapping.size = rbd_dev->header.image_size;
811 rbd_dev->mapping.features = rbd_dev->header.features;
814 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
817 rbd_dev->mapping.read_only = true;
819 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
825 static void rbd_header_free(struct rbd_image_header *header)
827 kfree(header->object_prefix);
828 header->object_prefix = NULL;
829 kfree(header->snap_sizes);
830 header->snap_sizes = NULL;
831 kfree(header->snap_names);
832 header->snap_names = NULL;
833 ceph_put_snap_context(header->snapc);
834 header->snapc = NULL;
837 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
843 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
846 segment = offset >> rbd_dev->header.obj_order;
847 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
848 rbd_dev->header.object_prefix, segment);
849 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
850 pr_err("error formatting segment name for #%llu (%d)\n",
859 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
861 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
863 return offset & (segment_size - 1);
866 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
867 u64 offset, u64 length)
869 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
871 offset &= segment_size - 1;
873 rbd_assert(length <= U64_MAX - offset);
874 if (offset + length > segment_size)
875 length = segment_size - offset;
881 * returns the size of an object in the image
883 static u64 rbd_obj_bytes(struct rbd_image_header *header)
885 return 1 << header->obj_order;
892 static void bio_chain_put(struct bio *chain)
898 chain = chain->bi_next;
904 * zeros a bio chain, starting at specific offset
906 static void zero_bio_chain(struct bio *chain, int start_ofs)
915 bio_for_each_segment(bv, chain, i) {
916 if (pos + bv->bv_len > start_ofs) {
917 int remainder = max(start_ofs - pos, 0);
918 buf = bvec_kmap_irq(bv, &flags);
919 memset(buf + remainder, 0,
920 bv->bv_len - remainder);
921 bvec_kunmap_irq(buf, &flags);
926 chain = chain->bi_next;
931 * Clone a portion of a bio, starting at the given byte offset
932 * and continuing for the number of bytes indicated.
934 static struct bio *bio_clone_range(struct bio *bio_src,
943 unsigned short end_idx;
947 /* Handle the easy case for the caller */
949 if (!offset && len == bio_src->bi_size)
950 return bio_clone(bio_src, gfpmask);
952 if (WARN_ON_ONCE(!len))
954 if (WARN_ON_ONCE(len > bio_src->bi_size))
956 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
959 /* Find first affected segment... */
962 __bio_for_each_segment(bv, bio_src, idx, 0) {
963 if (resid < bv->bv_len)
969 /* ...and the last affected segment */
972 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
973 if (resid <= bv->bv_len)
977 vcnt = end_idx - idx + 1;
979 /* Build the clone */
981 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
983 return NULL; /* ENOMEM */
985 bio->bi_bdev = bio_src->bi_bdev;
986 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
987 bio->bi_rw = bio_src->bi_rw;
988 bio->bi_flags |= 1 << BIO_CLONED;
991 * Copy over our part of the bio_vec, then update the first
992 * and last (or only) entries.
994 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
995 vcnt * sizeof (struct bio_vec));
996 bio->bi_io_vec[0].bv_offset += voff;
998 bio->bi_io_vec[0].bv_len -= voff;
999 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1001 bio->bi_io_vec[0].bv_len = len;
1004 bio->bi_vcnt = vcnt;
1012 * Clone a portion of a bio chain, starting at the given byte offset
1013 * into the first bio in the source chain and continuing for the
1014 * number of bytes indicated. The result is another bio chain of
1015 * exactly the given length, or a null pointer on error.
1017 * The bio_src and offset parameters are both in-out. On entry they
1018 * refer to the first source bio and the offset into that bio where
1019 * the start of data to be cloned is located.
1021 * On return, bio_src is updated to refer to the bio in the source
1022 * chain that contains first un-cloned byte, and *offset will
1023 * contain the offset of that byte within that bio.
1025 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1026 unsigned int *offset,
1030 struct bio *bi = *bio_src;
1031 unsigned int off = *offset;
1032 struct bio *chain = NULL;
1035 /* Build up a chain of clone bios up to the limit */
1037 if (!bi || off >= bi->bi_size || !len)
1038 return NULL; /* Nothing to clone */
1042 unsigned int bi_size;
1046 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1047 goto out_err; /* EINVAL; ran out of bio's */
1049 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1050 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1052 goto out_err; /* ENOMEM */
1055 end = &bio->bi_next;
1058 if (off == bi->bi_size) {
1069 bio_chain_put(chain);
1074 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1076 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1077 atomic_read(&obj_request->kref.refcount));
1078 kref_get(&obj_request->kref);
1081 static void rbd_obj_request_destroy(struct kref *kref);
1082 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1084 rbd_assert(obj_request != NULL);
1085 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1086 atomic_read(&obj_request->kref.refcount));
1087 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1090 static void rbd_img_request_get(struct rbd_img_request *img_request)
1092 dout("%s: img %p (was %d)\n", __func__, img_request,
1093 atomic_read(&img_request->kref.refcount));
1094 kref_get(&img_request->kref);
1097 static void rbd_img_request_destroy(struct kref *kref);
1098 static void rbd_img_request_put(struct rbd_img_request *img_request)
1100 rbd_assert(img_request != NULL);
1101 dout("%s: img %p (was %d)\n", __func__, img_request,
1102 atomic_read(&img_request->kref.refcount));
1103 kref_put(&img_request->kref, rbd_img_request_destroy);
1106 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1107 struct rbd_obj_request *obj_request)
1109 rbd_assert(obj_request->img_request == NULL);
1111 rbd_obj_request_get(obj_request);
1112 obj_request->img_request = img_request;
1113 obj_request->which = img_request->obj_request_count;
1114 rbd_assert(obj_request->which != BAD_WHICH);
1115 img_request->obj_request_count++;
1116 list_add_tail(&obj_request->links, &img_request->obj_requests);
1117 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1118 obj_request->which);
1121 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1122 struct rbd_obj_request *obj_request)
1124 rbd_assert(obj_request->which != BAD_WHICH);
1126 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1127 obj_request->which);
1128 list_del(&obj_request->links);
1129 rbd_assert(img_request->obj_request_count > 0);
1130 img_request->obj_request_count--;
1131 rbd_assert(obj_request->which == img_request->obj_request_count);
1132 obj_request->which = BAD_WHICH;
1133 rbd_assert(obj_request->img_request == img_request);
1134 obj_request->img_request = NULL;
1135 obj_request->callback = NULL;
1136 rbd_obj_request_put(obj_request);
1139 static bool obj_request_type_valid(enum obj_request_type type)
1142 case OBJ_REQUEST_NODATA:
1143 case OBJ_REQUEST_BIO:
1144 case OBJ_REQUEST_PAGES:
1151 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1152 struct rbd_obj_request *obj_request)
1154 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1156 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1159 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1162 dout("%s: img %p\n", __func__, img_request);
1165 * If no error occurred, compute the aggregate transfer
1166 * count for the image request. We could instead use
1167 * atomic64_cmpxchg() to update it as each object request
1168 * completes; not clear which way is better off hand.
1170 if (!img_request->result) {
1171 struct rbd_obj_request *obj_request;
1174 for_each_obj_request(img_request, obj_request)
1175 xferred += obj_request->xferred;
1176 img_request->xferred = xferred;
1179 if (img_request->callback)
1180 img_request->callback(img_request);
1182 rbd_img_request_put(img_request);
1185 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1187 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1189 dout("%s: obj %p\n", __func__, obj_request);
1191 return wait_for_completion_interruptible(&obj_request->completion);
1194 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1196 atomic_set(&obj_request->done, 0);
1200 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1204 done = atomic_inc_return(&obj_request->done);
1206 struct rbd_img_request *img_request = obj_request->img_request;
1207 struct rbd_device *rbd_dev;
1209 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1210 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1215 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1218 return atomic_read(&obj_request->done) != 0;
1222 * The default/initial value for all image request flags is 0. Each
1223 * is conditionally set to 1 at image request initialization time
1224 * and currently never change thereafter.
1226 static void img_request_write_set(struct rbd_img_request *img_request)
1228 set_bit(IMG_REQ_WRITE, &img_request->flags);
1232 static bool img_request_write_test(struct rbd_img_request *img_request)
1235 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1238 static void img_request_child_set(struct rbd_img_request *img_request)
1240 set_bit(IMG_REQ_CHILD, &img_request->flags);
1244 static bool img_request_child_test(struct rbd_img_request *img_request)
1247 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1251 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1253 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1254 obj_request, obj_request->img_request, obj_request->result,
1255 obj_request->xferred, obj_request->length);
1257 * ENOENT means a hole in the image. We zero-fill the
1258 * entire length of the request. A short read also implies
1259 * zero-fill to the end of the request. Either way we
1260 * update the xferred count to indicate the whole request
1263 BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1264 if (obj_request->result == -ENOENT) {
1265 zero_bio_chain(obj_request->bio_list, 0);
1266 obj_request->result = 0;
1267 obj_request->xferred = obj_request->length;
1268 } else if (obj_request->xferred < obj_request->length &&
1269 !obj_request->result) {
1270 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1271 obj_request->xferred = obj_request->length;
1273 obj_request_done_set(obj_request);
1276 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1278 dout("%s: obj %p cb %p\n", __func__, obj_request,
1279 obj_request->callback);
1280 if (obj_request->callback)
1281 obj_request->callback(obj_request);
1283 complete_all(&obj_request->completion);
/* Ops needing no post-processing: just mark the request done. */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1292 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1294 dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1295 obj_request->result, obj_request->xferred, obj_request->length);
1296 if (obj_request->img_request)
1297 rbd_img_obj_request_read_callback(obj_request);
1299 obj_request_done_set(obj_request);
1302 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1304 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1305 obj_request->result, obj_request->length);
1307 * There is no such thing as a successful short write.
1308 * Our xferred value is the number of bytes transferred
1309 * back. Set it to our originally-requested length.
1311 obj_request->xferred = obj_request->length;
1312 obj_request_done_set(obj_request);
/*
 * Handle completion of a stat op.  A plain stat needs no further
 * work; more will be needed once stat is used as part of a write
 * sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1325 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1326 struct ceph_msg *msg)
1328 struct rbd_obj_request *obj_request = osd_req->r_priv;
1331 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1332 rbd_assert(osd_req == obj_request->osd_req);
1333 rbd_assert(!!obj_request->img_request ^
1334 (obj_request->which == BAD_WHICH));
1336 if (osd_req->r_result < 0)
1337 obj_request->result = osd_req->r_result;
1338 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1340 WARN_ON(osd_req->r_num_ops != 1); /* For now */
1343 * We support a 64-bit length, but ultimately it has to be
1344 * passed to blk_end_request(), which takes an unsigned int.
1346 obj_request->xferred = osd_req->r_reply_op_len[0];
1347 rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1348 opcode = osd_req->r_ops[0].op;
1350 case CEPH_OSD_OP_READ:
1351 rbd_osd_read_callback(obj_request);
1353 case CEPH_OSD_OP_WRITE:
1354 rbd_osd_write_callback(obj_request);
1356 case CEPH_OSD_OP_STAT:
1357 rbd_osd_stat_callback(obj_request);
1359 case CEPH_OSD_OP_CALL:
1360 case CEPH_OSD_OP_NOTIFY_ACK:
1361 case CEPH_OSD_OP_WATCH:
1362 rbd_osd_trivial_callback(obj_request);
1365 rbd_warn(NULL, "%s: unsupported op %hu\n",
1366 obj_request->object_name, (unsigned short) opcode);
1370 if (obj_request_done_test(obj_request))
1371 rbd_obj_request_complete(obj_request);
/*
 * Finish building the osd request attached to an object request by
 * calling ceph_osdc_build_request() with the object offset plus a
 * snapshot context, a snapshot id and an mtime.  The defaults are a
 * NULL snapshot context, CEPH_NOSNAP and a NULL mtime.  The write
 * branch takes the snapshot context from the image request; the
 * other branch takes the snapshot id from the image request, if any.
 * NOTE(review): the second parameter and the statements that set
 * "now"/"mtime" are not visible in this listing -- confirm against
 * the complete file.
 */
1374 static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
1377 struct rbd_img_request *img_request = obj_request->img_request;
1378 struct ceph_osd_request *osd_req = obj_request->osd_req;
1379 struct ceph_snap_context *snapc = NULL;
1380 u64 snap_id = CEPH_NOSNAP;
1381 struct timespec *mtime = NULL;
1382 struct timespec now;
1384 rbd_assert(osd_req != NULL);
1386 if (write_request) {
1390 snapc = img_request->snapc;
1391 } else if (img_request) {
1392 snap_id = img_request->snap_id;
1394 ceph_osdc_build_request(osd_req, obj_request->offset,
1395 snapc, snap_id, mtime);
/*
 * Allocate a single-op osd request for an object request using
 * GFP_ATOMIC, returning NULL when the allocation fails.  On success
 * the request gets its flags (either WRITE | ONDISK or READ), the
 * rbd completion callback, a back pointer to the object request,
 * a copy of the object name (asserted to fit in r_oid) and the
 * device's file layout.
 * NOTE(review): the conditions selecting the flags and guarding the
 * image-request checks are not visible in this listing; presumably
 * they test write_request and img_request -- confirm.
 */
1398 static struct ceph_osd_request *rbd_osd_req_create(
1399 struct rbd_device *rbd_dev,
1401 struct rbd_obj_request *obj_request)
1403 struct rbd_img_request *img_request = obj_request->img_request;
1404 struct ceph_snap_context *snapc = NULL;
1405 struct ceph_osd_client *osdc;
1406 struct ceph_osd_request *osd_req;
1409 rbd_assert(write_request ==
1410 img_request_write_test(img_request));
1412 snapc = img_request->snapc;
1415 /* Allocate and initialize the request, for the single op */
1417 osdc = &rbd_dev->rbd_client->client->osdc;
1418 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1420 return NULL; /* ENOMEM */
1423 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1425 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1427 osd_req->r_callback = rbd_osd_req_callback;
1428 osd_req->r_priv = obj_request;
1430 osd_req->r_oid_len = strlen(obj_request->object_name);
1431 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1432 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1434 osd_req->r_file_layout = rbd_dev->layout; /* struct */
/* Drop a reference on an osd request created by rbd_osd_req_create(). */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1444 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1446 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1447 u64 offset, u64 length,
1448 enum obj_request_type type)
1450 struct rbd_obj_request *obj_request;
1454 rbd_assert(obj_request_type_valid(type));
1456 size = strlen(object_name) + 1;
1457 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1461 name = (char *)(obj_request + 1);
1462 obj_request->object_name = memcpy(name, object_name, size);
1463 obj_request->offset = offset;
1464 obj_request->length = length;
1465 obj_request->which = BAD_WHICH;
1466 obj_request->type = type;
1467 INIT_LIST_HEAD(&obj_request->links);
1468 obj_request_done_init(obj_request);
1469 init_completion(&obj_request->completion);
1470 kref_init(&obj_request->kref);
1472 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1473 offset, length, (int)type, obj_request);
/*
 * kref release handler for an object request (recovered from the
 * embedded kref with container_of()).  Asserts the request is no
 * longer attached to an image request, destroys its osd request if
 * one is set, then releases the data it carries according to its
 * type: nothing for OBJ_REQUEST_NODATA, the bio chain for
 * OBJ_REQUEST_BIO, the page vector for OBJ_REQUEST_PAGES.
 * NOTE(review): the final release of the structure itself is not
 * visible in this listing -- confirm against the complete file.
 */
1478 static void rbd_obj_request_destroy(struct kref *kref)
1480 struct rbd_obj_request *obj_request;
1482 obj_request = container_of(kref, struct rbd_obj_request, kref);
1484 dout("%s: obj %p\n", __func__, obj_request);
1486 rbd_assert(obj_request->img_request == NULL);
1487 rbd_assert(obj_request->which == BAD_WHICH);
1489 if (obj_request->osd_req)
1490 rbd_osd_req_destroy(obj_request->osd_req);
1492 rbd_assert(obj_request_type_valid(obj_request->type));
1493 switch (obj_request->type) {
1494 case OBJ_REQUEST_NODATA:
1495 break; /* Nothing to do */
1496 case OBJ_REQUEST_BIO:
1497 if (obj_request->bio_list)
1498 bio_chain_put(obj_request->bio_list);
1500 case OBJ_REQUEST_PAGES:
1501 if (obj_request->pages)
1502 ceph_release_page_vector(obj_request->pages,
1503 obj_request->page_count);
/*
 * Allocate (GFP_ATOMIC) and initialize an image request covering
 * "length" bytes at "offset" of the rbd device.  For a write, a
 * reference to the device's current snapshot context is taken under
 * header_rwsem and stored in the request.  The request starts with
 * no Linux request, no callback, result 0, an empty object request
 * list and an initialized kref.
 * NOTE(review): the trailing parameters (at least write_request) and
 * the conditions around the snap_id and child-flag assignments are
 * not visible in this listing -- confirm against the complete file.
 */
1511 * Caller is responsible for filling in the list of object requests
1512 * that comprises the image request, and the Linux request pointer
1513 * (if there is one).
1515 static struct rbd_img_request *rbd_img_request_create(
1516 struct rbd_device *rbd_dev,
1517 u64 offset, u64 length,
1521 struct rbd_img_request *img_request;
1522 struct ceph_snap_context *snapc = NULL;
1524 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1528 if (write_request) {
1529 down_read(&rbd_dev->header_rwsem);
1530 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1531 up_read(&rbd_dev->header_rwsem);
1532 if (WARN_ON(!snapc)) {
1534 return NULL; /* Shouldn't happen */
1539 img_request->rq = NULL;
1540 img_request->rbd_dev = rbd_dev;
1541 img_request->offset = offset;
1542 img_request->length = length;
1543 img_request->flags = 0;
1544 if (write_request) {
1545 img_request_write_set(img_request);
1546 img_request->snapc = snapc;
1548 img_request->snap_id = rbd_dev->spec->snap_id;
1551 img_request_child_set(img_request);
1552 spin_lock_init(&img_request->completion_lock);
1553 img_request->next_completion = 0;
1554 img_request->callback = NULL;
1555 img_request->result = 0;
1556 img_request->obj_request_count = 0;
1557 INIT_LIST_HEAD(&img_request->obj_requests);
1558 kref_init(&img_request->kref);
1560 rbd_img_request_get(img_request); /* Avoid a warning */
1561 rbd_img_request_put(img_request); /* TEMPORARY */
1563 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1564 write_request ? "write" : "read", offset, length,
/*
 * kref release handler for an image request.  Removes every object
 * request from the image request, asserts none remain, and for a
 * write request drops the snapshot context reference it holds.
 * NOTE(review): the release of the structure itself is not visible
 * in this listing -- confirm against the complete file.
 */
1570 static void rbd_img_request_destroy(struct kref *kref)
1572 struct rbd_img_request *img_request;
1573 struct rbd_obj_request *obj_request;
1574 struct rbd_obj_request *next_obj_request;
1576 img_request = container_of(kref, struct rbd_img_request, kref);
1578 dout("%s: img %p\n", __func__, img_request);
1580 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1581 rbd_img_obj_request_del(img_request, obj_request);
1582 rbd_assert(img_request->obj_request_count == 0);
1584 if (img_request_write_test(img_request))
1585 ceph_put_snap_context(img_request->snapc);
/*
 * Per-object completion callback for object requests that belong to
 * an image request.  Under completion_lock, and starting from the
 * object request whose index equals next_completion, it walks the
 * object requests in order, reports each finished one to the block
 * layer with blk_end_request(), records the first non-zero result
 * in the image request, and then stores the new next_completion.
 * The image request is completed afterwards with
 * rbd_img_request_complete().
 * NOTE(review): the rbd_assert() on img_request_child_test() below
 * has no terminating ';' -- it only builds if rbd_assert() expands
 * to a complete statement; add the semicolon.
 * NOTE(review): several short lines (locals, loop exits, the label
 * before the unlock) are not visible in this listing -- confirm
 * against the complete file.
 */
1590 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1592 struct rbd_img_request *img_request;
1593 u32 which = obj_request->which;
1596 img_request = obj_request->img_request;
1598 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1599 rbd_assert(img_request != NULL);
1600 rbd_assert(!img_request_child_test(img_request))
1601 rbd_assert(img_request->rq != NULL);
1603 rbd_assert(img_request->obj_request_count > 0);
1604 rbd_assert(which != BAD_WHICH);
1605 rbd_assert(which < img_request->obj_request_count);
1606 rbd_assert(which >= img_request->next_completion);
1608 spin_lock_irq(&img_request->completion_lock);
1609 if (which != img_request->next_completion)
1612 for_each_obj_request_from(img_request, obj_request) {
1613 unsigned int xferred;
1617 rbd_assert(which < img_request->obj_request_count);
1619 if (!obj_request_done_test(obj_request))
1622 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1623 xferred = (unsigned int)obj_request->xferred;
1624 result = obj_request->result;
1626 struct rbd_device *rbd_dev = img_request->rbd_dev;
1628 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1629 img_request_write_test(img_request) ? "write"
1631 obj_request->length, obj_request->img_offset,
1632 obj_request->offset);
1633 rbd_warn(rbd_dev, " result %d xferred %x\n",
1635 if (!img_request->result)
1636 img_request->result = result;
1639 more = blk_end_request(img_request->rq, result, xferred);
1643 rbd_assert(more ^ (which == img_request->obj_request_count));
1644 img_request->next_completion = which;
1646 spin_unlock_irq(&img_request->completion_lock);
1649 rbd_img_request_complete(img_request);
/*
 * Split an image request into per-object requests backed by the
 * given bio chain.  For each piece it derives the object (segment)
 * name, the offset within that object and the length, creates an
 * object request, clones the matching range of the bio chain into
 * it, creates an osd request with a READ or WRITE extent op over
 * that data, and adds the object request to the image request
 * before advancing img_offset by the piece's length.  The tail of
 * the function drops references on object requests.
 * NOTE(review): the loop construct, several locals, the error
 * checks and the cleanup labels are not visible in this listing, so
 * the exact error path cannot be confirmed from here.
 */
1652 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1653 struct bio *bio_list)
1655 struct rbd_device *rbd_dev = img_request->rbd_dev;
1656 struct rbd_obj_request *obj_request = NULL;
1657 struct rbd_obj_request *next_obj_request;
1658 bool write_request = img_request_write_test(img_request);
1659 unsigned int bio_offset;
1664 dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1666 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1668 img_offset = img_request->offset;
1669 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1670 resid = img_request->length;
1671 rbd_assert(resid > 0);
1673 struct ceph_osd_request *osd_req;
1674 const char *object_name;
1675 unsigned int clone_size;
1679 object_name = rbd_segment_name(rbd_dev, img_offset);
1682 offset = rbd_segment_offset(rbd_dev, img_offset);
1683 length = rbd_segment_length(rbd_dev, img_offset, resid);
1684 obj_request = rbd_obj_request_create(object_name,
1687 kfree(object_name); /* object request has its own copy */
1691 rbd_assert(length <= (u64) UINT_MAX);
1692 clone_size = (unsigned int) length;
1693 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1694 &bio_offset, clone_size,
1696 if (!obj_request->bio_list)
1699 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1703 obj_request->osd_req = osd_req;
1704 obj_request->callback = rbd_img_obj_callback;
1706 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1708 osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
1709 obj_request->bio_list, obj_request->length);
1710 rbd_osd_req_format(obj_request, write_request);
1712 obj_request->img_offset = img_offset;
1713 rbd_img_obj_request_add(img_request, obj_request);
1715 img_offset += length;
1722 rbd_obj_request_put(obj_request);
1724 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1725 rbd_obj_request_put(obj_request);
/*
 * Submit every object request of an image request to the device's
 * osd client, dropping one reference on each object request after
 * it has been submitted (the image request keeps its own).
 * NOTE(review): the handling of a failed submit and the return
 * statements are not visible in this listing -- confirm.
 */
1730 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1732 struct rbd_device *rbd_dev = img_request->rbd_dev;
1733 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1734 struct rbd_obj_request *obj_request;
1735 struct rbd_obj_request *next_obj_request;
1737 dout("%s: img %p\n", __func__, img_request);
1738 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1741 ret = rbd_obj_request_submit(osdc, obj_request);
1745 * The image request has its own reference to each
1746 * of its object requests, so we can safely drop the
1749 rbd_obj_request_put(obj_request);
/*
 * Send a CEPH_OSD_OP_NOTIFY_ACK for the device's header object.
 * Builds a data-less object request on header_name with a read-type
 * osd request, sets rbd_obj_request_put as its completion callback
 * so the request releases itself, and submits it.
 * NOTE(review): the arguments passed to osd_req_op_watch_init()
 * after the opcode, and the error/return paths, are not visible in
 * this listing -- confirm against the complete file.
 */
1755 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1756 u64 ver, u64 notify_id)
1758 struct rbd_obj_request *obj_request;
1759 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1762 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1763 OBJ_REQUEST_NODATA);
1768 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1769 if (!obj_request->osd_req)
1771 obj_request->callback = rbd_obj_request_put;
1773 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1775 rbd_osd_req_format(obj_request, false);
1777 ret = rbd_obj_request_submit(osdc, obj_request);
1780 rbd_obj_request_put(obj_request);
/*
 * Watch event callback registered by rbd_dev_header_watch_sync().
 * "data" is the rbd device.  Refreshes the device's header
 * information, warns if that fails, and acknowledges the
 * notification with the header version obtained from the refresh.
 * NOTE(review): the locals and the condition guarding the warning
 * are not visible in this listing -- confirm.
 */
1785 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1787 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1794 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1795 rbd_dev->header_name, (unsigned long long) notify_id,
1796 (unsigned int) opcode);
1797 rc = rbd_dev_refresh(rbd_dev, &hver);
1799 rbd_warn(rbd_dev, "got notification but failed to "
1800 " update snaps: %d\n", rc);
1802 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
/*
 * Synchronously set up or tear down the watch on the device's
 * header object.  The entry assertions require that a watch event
 * and watch request exist exactly when "start" is zero.  The
 * function creates the watch event (start), builds a data-less
 * write-type object request carrying a CEPH_OSD_OP_WATCH op, submits
 * it and waits for its result.  A successfully started watch request
 * is retained in rbd_dev->watch_request; on teardown the retained
 * request is released, and the event is cancelled on teardown or
 * error.
 * NOTE(review): the branch conditions, labels and return statements
 * are not visible in this listing, so which statements run on which
 * path must be confirmed against the complete file.
 */
1806 * Request sync osd watch/unwatch. The value of "start" determines
1807 * whether a watch request is being initiated or torn down.
1809 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1811 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1812 struct rbd_obj_request *obj_request;
1815 rbd_assert(start ^ !!rbd_dev->watch_event);
1816 rbd_assert(start ^ !!rbd_dev->watch_request);
1819 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1820 &rbd_dev->watch_event);
1823 rbd_assert(rbd_dev->watch_event != NULL);
1827 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1828 OBJ_REQUEST_NODATA);
1832 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1833 if (!obj_request->osd_req)
1837 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1839 ceph_osdc_unregister_linger_request(osdc,
1840 rbd_dev->watch_request->osd_req);
1842 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1843 rbd_dev->watch_event->cookie,
1844 rbd_dev->header.obj_version, start);
1845 rbd_osd_req_format(obj_request, true);
1847 ret = rbd_obj_request_submit(osdc, obj_request);
1850 ret = rbd_obj_request_wait(obj_request);
1853 ret = obj_request->result;
1858 * A watch request is set to linger, so the underlying osd
1859 * request won't go away until we unregister it. We retain
1860 * a pointer to the object request during that time (in
1861 * rbd_dev->watch_request), so we'll keep a reference to
1862 * it. We'll drop that reference (below) after we've
1866 rbd_dev->watch_request = obj_request;
1871 /* We have successfully torn down the watch request */
1873 rbd_obj_request_put(rbd_dev->watch_request);
1874 rbd_dev->watch_request = NULL;
1876 /* Cancel the event if we're tearing down, or on error */
1877 ceph_osdc_cancel_event(rbd_dev->watch_event);
1878 rbd_dev->watch_event = NULL;
1880 rbd_obj_request_put(obj_request);
/*
 * Invoke an object class method on the named object and wait for
 * the reply.  A page vector sized for inbound_size receives the
 * response; when outbound_size is non-zero the outbound bytes are
 * appended to a newly allocated pagelist and attached as the
 * request data.  After the request completes, xferred bytes are
 * copied from the page vector into "inbound" and the object version
 * is stored through "version".
 * NOTE(review): some parameters (including "inbound" and "version"),
 * the error checks, labels and return value are not visible in this
 * listing.  In particular it cannot be confirmed from here whether
 * the pagelist allocation and ceph_pagelist_append() results are
 * checked -- verify against the complete file.
 */
1886 * Synchronous osd object method call
1888 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1889 const char *object_name,
1890 const char *class_name,
1891 const char *method_name,
1892 const char *outbound,
1893 size_t outbound_size,
1895 size_t inbound_size,
1898 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1899 struct rbd_obj_request *obj_request;
1900 struct page **pages;
1905 * Method calls are ultimately read operations. The result
1906 * should placed into the inbound buffer provided. They
1907 * also supply outbound data--parameters for the object
1908 * method. Currently if this is present it will be a
1911 page_count = (u32) calc_pages_for(0, inbound_size);
1912 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1914 return PTR_ERR(pages);
1917 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1922 obj_request->pages = pages;
1923 obj_request->page_count = page_count;
1925 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1926 if (!obj_request->osd_req)
1929 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
1930 class_name, method_name);
1931 if (outbound_size) {
1932 struct ceph_pagelist *pagelist;
1934 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
1938 ceph_pagelist_init(pagelist);
1939 ceph_pagelist_append(pagelist, outbound, outbound_size);
1940 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
1943 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
1944 obj_request->pages, inbound_size,
1946 rbd_osd_req_format(obj_request, false);
1948 ret = rbd_obj_request_submit(osdc, obj_request);
1951 ret = rbd_obj_request_wait(obj_request);
1955 ret = obj_request->result;
1959 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1961 *version = obj_request->version;
1964 rbd_obj_request_put(obj_request);
1966 ceph_release_page_vector(pages, page_count);
/*
 * Block layer request function for an rbd device; annotated as
 * releasing and re-acquiring q->queue_lock.  For each request
 * fetched from the queue it ends non-filesystem and zero-length
 * requests immediately with status 0.  Otherwise it drops the queue
 * lock, checks the request against the mapping (writes, and whether
 * a mapped snapshot still exists), rejects a range that would wrap
 * past U64_MAX, builds an image request from the request's bio
 * chain and submits it, then retakes the lock.  The tail warns with
 * the direction, length, offset and result and ends the request
 * with that result.
 * NOTE(review): the "continue"/"goto" statements, result
 * assignments, labels and the condition guarding the final warning
 * are not visible in this listing -- confirm the exact control flow
 * against the complete file.
 */
1971 static void rbd_request_fn(struct request_queue *q)
1972 __releases(q->queue_lock) __acquires(q->queue_lock)
1974 struct rbd_device *rbd_dev = q->queuedata;
1975 bool read_only = rbd_dev->mapping.read_only;
1979 while ((rq = blk_fetch_request(q))) {
1980 bool write_request = rq_data_dir(rq) == WRITE;
1981 struct rbd_img_request *img_request;
1985 /* Ignore any non-FS requests that filter through. */
1987 if (rq->cmd_type != REQ_TYPE_FS) {
1988 dout("%s: non-fs request type %d\n", __func__,
1989 (int) rq->cmd_type);
1990 __blk_end_request_all(rq, 0);
1994 /* Ignore/skip any zero-length requests */
1996 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1997 length = (u64) blk_rq_bytes(rq);
2000 dout("%s: zero-length request\n", __func__);
2001 __blk_end_request_all(rq, 0);
2005 spin_unlock_irq(q->queue_lock);
2007 /* Disallow writes to a read-only device */
2009 if (write_request) {
2013 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2017 * Quit early if the mapped snapshot no longer
2018 * exists. It's still possible the snapshot will
2019 * have disappeared by the time our request arrives
2020 * at the osd, but there's no sense in sending it if
2023 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2024 dout("request for non-existent snapshot");
2025 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2031 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2032 goto end_request; /* Shouldn't happen */
2035 img_request = rbd_img_request_create(rbd_dev, offset, length,
2036 write_request, false);
2040 img_request->rq = rq;
2042 result = rbd_img_request_fill_bio(img_request, rq->bio);
2044 result = rbd_img_request_submit(img_request);
2046 rbd_img_request_put(img_request);
2048 spin_lock_irq(q->queue_lock);
2050 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2051 write_request ? "write" : "read",
2052 length, offset, result);
2054 __blk_end_request_all(rq, result);
/*
 * merge_bvec callback for the rbd queue.  Computes how far the bio's
 * start sector (made device-relative with get_start_sect()) lies
 * into its rbd object, derives the number of bytes left before the
 * end of that object, subtracts what the bio already holds, and
 * caps the answer at bvec->bv_len.  An empty bio is always allowed
 * the full bvec.
 * NOTE(review): sectors_per_obj is computed with an int-typed
 * "1 << ...", and the byte count is narrowed to int before being
 * shifted left; both look safe only while obj_order stays well
 * below 31 -- confirm the permitted range of obj_order.
 * NOTE(review): the "else" branch after the bi_size comparison and
 * the return statement are not visible in this listing.
 */
2060 * a queue callback. Makes sure that we don't create a bio that spans across
2061 * multiple osd objects. One exception would be with a single page bios,
2062 * which we handle later at bio_chain_clone_range()
2064 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2065 struct bio_vec *bvec)
2067 struct rbd_device *rbd_dev = q->queuedata;
2068 sector_t sector_offset;
2069 sector_t sectors_per_obj;
2070 sector_t obj_sector_offset;
2074 * Find how far into its rbd object the partition-relative
2075 * bio start sector is to offset relative to the enclosing
2078 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2079 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2080 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2083 * Compute the number of bytes from that offset to the end
2084 * of the object. Account for what's already used by the bio.
2086 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2087 if (ret > bmd->bi_size)
2088 ret -= bmd->bi_size;
2093 * Don't send back more than was asked for. And if the bio
2094 * was empty, let the whole thing through because: "Note
2095 * that a block device *must* allow a single page to be
2096 * added to an empty bio."
2098 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2099 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2100 ret = (int) bvec->bv_len;
/*
 * Tear down the gendisk of an rbd device.  The visible statements
 * test GENHD_FL_UP on the disk and clean up its request queue.
 * NOTE(review): most of this function's body (what is done when the
 * disk is up, any NULL checks, and the final release of the disk)
 * is not visible in this listing -- confirm against the complete
 * file.
 */
2105 static void rbd_free_disk(struct rbd_device *rbd_dev)
2107 struct gendisk *disk = rbd_dev->disk;
2112 if (disk->flags & GENHD_FL_UP)
2115 blk_cleanup_queue(disk->queue);
/*
 * Synchronously read "length" bytes at "offset" of the named object
 * into "buf".  A page vector sized with calc_pages_for() backs a
 * CEPH_OSD_OP_READ extent op; the request is submitted and waited
 * on, and on completion xferred bytes are copied from the page
 * vector into "buf" and the object version is stored through
 * "version".  The transfer count is asserted to fit in an int.
 * NOTE(review): the error checks, labels, the guard around the
 * "version" store and the return value are not visible in this
 * listing -- confirm against the complete file.
 */
2119 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2120 const char *object_name,
2121 u64 offset, u64 length,
2122 char *buf, u64 *version)
2125 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2126 struct rbd_obj_request *obj_request;
2127 struct page **pages = NULL;
2132 page_count = (u32) calc_pages_for(offset, length);
2133 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2135 ret = PTR_ERR(pages);
2138 obj_request = rbd_obj_request_create(object_name, offset, length,
2143 obj_request->pages = pages;
2144 obj_request->page_count = page_count;
2146 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2147 if (!obj_request->osd_req)
2150 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2151 offset, length, 0, 0);
2152 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
2154 obj_request->length,
2155 obj_request->offset & ~PAGE_MASK,
2157 rbd_osd_req_format(obj_request, false);
2159 ret = rbd_obj_request_submit(osdc, obj_request);
2162 ret = rbd_obj_request_wait(obj_request);
2166 ret = obj_request->result;
2170 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2171 size = (size_t) obj_request->xferred;
2172 ceph_copy_from_page_vector(pages, buf, 0, size);
2173 rbd_assert(size <= (size_t) INT_MAX);
2176 *version = obj_request->version;
2179 rbd_obj_request_put(obj_request);
2181 ceph_release_page_vector(pages, page_count);
/*
 * Implementation notes: the buffer is sized from the fixed on-disk
 * header plus one rbd_image_snap_ondisk per expected snapshot, then
 * filled with rbd_obj_read_sync() from the device's header object.
 * A short read or a header that fails rbd_dev_ondisk_valid() is
 * reported with rbd_warn().  The read is repeated until the
 * snapshot count found in the header equals the count the buffer
 * was sized for.  Allocation failure yields ERR_PTR(-ENOMEM).
 * NOTE(review): names_size is read from the header, but the line
 * that adds it to the buffer size is not visible in this listing;
 * likewise the error codes chosen for the short-read and
 * invalid-header cases -- confirm against the complete file.
 */
2187 * Read the complete header for the given rbd device.
2189 * Returns a pointer to a dynamically-allocated buffer containing
2190 * the complete and validated header. Caller can pass the address
2191 * of a variable that will be filled in with the version of the
2192 * header object at the time it was read.
2194 * Returns a pointer-coded errno if a failure occurs.
2196 static struct rbd_image_header_ondisk *
2197 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2199 struct rbd_image_header_ondisk *ondisk = NULL;
2206 * The complete header will include an array of its 64-bit
2207 * snapshot ids, followed by the names of those snapshots as
2208 * a contiguous block of NUL-terminated strings. Note that
2209 * the number of snapshots could change by the time we read
2210 * it in, in which case we re-read it.
2217 size = sizeof (*ondisk);
2218 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2220 ondisk = kmalloc(size, GFP_KERNEL);
2222 return ERR_PTR(-ENOMEM);
2224 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2226 (char *) ondisk, version);
2229 if (WARN_ON((size_t) ret < size)) {
2231 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2235 if (!rbd_dev_ondisk_valid(ondisk)) {
2237 rbd_warn(rbd_dev, "invalid header");
2241 names_size = le64_to_cpu(ondisk->snap_names_len);
2242 want_count = snap_count;
2243 snap_count = le32_to_cpu(ondisk->snap_count);
2244 } while (snap_count != want_count);
2251 return ERR_PTR(ret);
2255 * Reload the on-disk header.
/*
 * Read the format 1 on-disk header with rbd_dev_v1_header_read(),
 * convert it into the in-core "header" with rbd_header_from_disk(),
 * and record the header object's version in header->obj_version.
 * A pointer-coded error from the read is returned as an errno.
 * NOTE(review): the locals, the conditions guarding the early
 * return and the version store, the release of the on-disk buffer
 * and the final return are not visible in this listing -- confirm.
 */
2257 static int rbd_read_header(struct rbd_device *rbd_dev,
2258 struct rbd_image_header *header)
2260 struct rbd_image_header_ondisk *ondisk;
2264 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2266 return PTR_ERR(ondisk);
2267 ret = rbd_header_from_disk(header, ondisk);
2269 header->obj_version = ver;
2275 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2277 struct rbd_snap *snap;
2278 struct rbd_snap *next;
2280 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2281 rbd_remove_snap_dev(snap);
2284 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2288 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2291 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2292 dout("setting size to %llu sectors", (unsigned long long) size);
2293 rbd_dev->mapping.size = (u64) size;
2294 set_capacity(rbd_dev->disk, size);
/*
 * Refresh the in-core header of a format 1 image.  Reads a fresh
 * header with rbd_read_header(), then, holding header_rwsem for
 * writing, updates the image size (and the mapping size), frees the
 * old snapshot size and name arrays, drops the old snapshot context
 * reference, installs the new version, snapshot context, names and
 * sizes, frees the duplicate object prefix, and updates and
 * registers the snapshot devices.  The header version is stored
 * through "hver".
 * NOTE(review): the guard (if any) around the "*hver" store is not
 * visible in this listing, and a caller in this file passes NULL
 * for hver -- confirm that the store is conditional.
 * NOTE(review): image_size is assigned twice from h.image_size.
 */
2298 * only read the first part of the ondisk header, without the snaps info
2300 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2303 struct rbd_image_header h;
2305 ret = rbd_read_header(rbd_dev, &h);
2309 down_write(&rbd_dev->header_rwsem);
2311 /* Update image size, and check for resize of mapped image */
2312 rbd_dev->header.image_size = h.image_size;
2313 rbd_update_mapping_size(rbd_dev);
2315 /* rbd_dev->header.object_prefix shouldn't change */
2316 kfree(rbd_dev->header.snap_sizes);
2317 kfree(rbd_dev->header.snap_names);
2318 /* osd requests may still refer to snapc */
2319 ceph_put_snap_context(rbd_dev->header.snapc);
2322 *hver = h.obj_version;
2323 rbd_dev->header.obj_version = h.obj_version;
2324 rbd_dev->header.image_size = h.image_size;
2325 rbd_dev->header.snapc = h.snapc;
2326 rbd_dev->header.snap_names = h.snap_names;
2327 rbd_dev->header.snap_sizes = h.snap_sizes;
2328 /* Free the extra copy of the object prefix */
2329 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2330 kfree(h.object_prefix);
2332 ret = rbd_dev_snaps_update(rbd_dev);
2334 ret = rbd_dev_snaps_register(rbd_dev);
2336 up_write(&rbd_dev->header_rwsem);
2341 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2345 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2346 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2347 if (rbd_dev->image_format == 1)
2348 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2350 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2351 mutex_unlock(&ctl_mutex);
/*
 * Create and configure the gendisk and request queue for an rbd
 * device.  The disk is named RBD_DRV_NAME followed by a number,
 * uses the device's major with first minor 0, and points back at
 * the device through private_data.  The queue is driven by
 * rbd_request_fn under rbd_dev->lock; its physical block size is
 * one sector and its maximum transfer, segment size and I/O hints
 * are all set to the object size, with rbd_merge_bvec installed as
 * the merge callback.  The capacity is set from mapping.size, which
 * is treated here as a byte count.
 * NOTE(review): the allocation-failure checks, the number used in
 * the disk name, the cleanup label and the return values are not
 * visible in this listing -- confirm against the complete file.
 */
2356 static int rbd_init_disk(struct rbd_device *rbd_dev)
2358 struct gendisk *disk;
2359 struct request_queue *q;
2362 /* create gendisk info */
2363 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2367 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2369 disk->major = rbd_dev->major;
2370 disk->first_minor = 0;
2371 disk->fops = &rbd_bd_ops;
2372 disk->private_data = rbd_dev;
2374 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2378 /* We use the default size, but let's be explicit about it. */
2379 blk_queue_physical_block_size(q, SECTOR_SIZE);
2381 /* set io sizes to object size */
2382 segment_size = rbd_obj_bytes(&rbd_dev->header);
2383 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2384 blk_queue_max_segment_size(q, segment_size);
2385 blk_queue_io_min(q, segment_size);
2386 blk_queue_io_opt(q, segment_size);
2388 blk_queue_merge_bvec(q, rbd_merge_bvec);
2391 q->queuedata = rbd_dev;
2393 rbd_dev->disk = disk;
2395 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2408 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2410 return container_of(dev, struct rbd_device, dev);
2413 static ssize_t rbd_size_show(struct device *dev,
2414 struct device_attribute *attr, char *buf)
2416 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2419 down_read(&rbd_dev->header_rwsem);
2420 size = get_capacity(rbd_dev->disk);
2421 up_read(&rbd_dev->header_rwsem);
2423 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2427 * Note this shows the features for whatever's mapped, which is not
2428 * necessarily the base image.
2430 static ssize_t rbd_features_show(struct device *dev,
2431 struct device_attribute *attr, char *buf)
2433 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2435 return sprintf(buf, "0x%016llx\n",
2436 (unsigned long long) rbd_dev->mapping.features);
2439 static ssize_t rbd_major_show(struct device *dev,
2440 struct device_attribute *attr, char *buf)
2442 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2444 return sprintf(buf, "%d\n", rbd_dev->major);
2447 static ssize_t rbd_client_id_show(struct device *dev,
2448 struct device_attribute *attr, char *buf)
2450 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2452 return sprintf(buf, "client%lld\n",
2453 ceph_client_id(rbd_dev->rbd_client->client));
2456 static ssize_t rbd_pool_show(struct device *dev,
2457 struct device_attribute *attr, char *buf)
2459 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2461 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2464 static ssize_t rbd_pool_id_show(struct device *dev,
2465 struct device_attribute *attr, char *buf)
2467 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2469 return sprintf(buf, "%llu\n",
2470 (unsigned long long) rbd_dev->spec->pool_id);
2473 static ssize_t rbd_name_show(struct device *dev,
2474 struct device_attribute *attr, char *buf)
2476 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2478 if (rbd_dev->spec->image_name)
2479 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2481 return sprintf(buf, "(unknown)\n");
2484 static ssize_t rbd_image_id_show(struct device *dev,
2485 struct device_attribute *attr, char *buf)
2487 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2489 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2493 * Shows the name of the currently-mapped snapshot (or
2494 * RBD_SNAP_HEAD_NAME for the base image).
2496 static ssize_t rbd_snap_show(struct device *dev,
2497 struct device_attribute *attr,
2500 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2502 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
/*
 * sysfs "parent" attribute.  Output is built up in pieces with
 * sprintf() into a moving pointer: pool id and name, image id and
 * name ("(unknown)" when the name is NULL), snapshot id and name,
 * and the parent overlap; the return value is the total number of
 * bytes written (bufp - buf).
 * NOTE(review): the "buf" parameter, the locals, the test for a
 * missing parent spec and the per-piece error checks and pointer
 * advances are not visible in this listing -- confirm.
 */
2506 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2507 * for the parent image. If there is no parent, simply shows
2508 * "(no parent image)".
2510 static ssize_t rbd_parent_show(struct device *dev,
2511 struct device_attribute *attr,
2514 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2515 struct rbd_spec *spec = rbd_dev->parent_spec;
2520 return sprintf(buf, "(no parent image)\n");
2522 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2523 (unsigned long long) spec->pool_id, spec->pool_name);
2528 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2529 spec->image_name ? spec->image_name : "(unknown)");
2534 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2535 (unsigned long long) spec->snap_id, spec->snap_name);
2540 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2545 return (ssize_t) (bufp - buf);
2548 static ssize_t rbd_image_refresh(struct device *dev,
2549 struct device_attribute *attr,
2553 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2556 ret = rbd_dev_refresh(rbd_dev, NULL);
2558 return ret < 0 ? ret : size;
/*
 * Device attributes exposed for each rbd device.  All are read-only
 * for everyone except "refresh", which is write-only for the owner.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2573 static struct attribute *rbd_attrs[] = {
2574 &dev_attr_size.attr,
2575 &dev_attr_features.attr,
2576 &dev_attr_major.attr,
2577 &dev_attr_client_id.attr,
2578 &dev_attr_pool.attr,
2579 &dev_attr_pool_id.attr,
2580 &dev_attr_name.attr,
2581 &dev_attr_image_id.attr,
2582 &dev_attr_current_snap.attr,
2583 &dev_attr_parent.attr,
2584 &dev_attr_refresh.attr,
2588 static struct attribute_group rbd_attr_group = {
2592 static const struct attribute_group *rbd_attr_groups[] = {
/*
 * Release callback for the rbd sysfs device.  Intentionally empty:
 * nothing is freed from here.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
/*
 * Device type for rbd devices: supplies the attribute groups and
 * the (empty) release callback.
 * NOTE(review): at least one initializer line and the closing brace
 * are not visible in this listing -- confirm against the complete
 * file.
 */
2601 static struct device_type rbd_device_type = {
2603 .groups = rbd_attr_groups,
2604 .release = rbd_sysfs_dev_release,
2612 static ssize_t rbd_snap_size_show(struct device *dev,
2613 struct device_attribute *attr,
2616 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2618 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2621 static ssize_t rbd_snap_id_show(struct device *dev,
2622 struct device_attribute *attr,
2625 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2627 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2630 static ssize_t rbd_snap_features_show(struct device *dev,
2631 struct device_attribute *attr,
2634 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2636 return sprintf(buf, "0x%016llx\n",
2637 (unsigned long long) snap->features);
/* Read-only device attributes exposed for each snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2644 static struct attribute *rbd_snap_attrs[] = {
2645 &dev_attr_snap_size.attr,
2646 &dev_attr_snap_id.attr,
2647 &dev_attr_snap_features.attr,
2651 static struct attribute_group rbd_snap_attr_group = {
2652 .attrs = rbd_snap_attrs,
/*
 * Release callback for a snapshot device; recovers the rbd_snap
 * that embeds the struct device.
 * NOTE(review): the statements that act on "snap" are not visible
 * in this listing -- presumably they free it; confirm against the
 * complete file.
 */
2655 static void rbd_snap_dev_release(struct device *dev)
2657 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2662 static const struct attribute_group *rbd_snap_attr_groups[] = {
2663 &rbd_snap_attr_group,
2667 static struct device_type rbd_snap_device_type = {
2668 .groups = rbd_snap_attr_groups,
2669 .release = rbd_snap_dev_release,
2672 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2674 kref_get(&spec->kref);
/*
 * Drop a reference on a spec; rbd_spec_free() (declared just above
 * because it is defined later) runs when the last one goes away.
 * NOTE(review): a line between the function header and kref_put()
 * is not visible in this listing.  Callers in this file pass
 * pointers that may be NULL (e.g. parent_spec), so presumably it is
 * a NULL check -- confirm.
 */
2679 static void rbd_spec_free(struct kref *kref);
2680 static void rbd_spec_put(struct rbd_spec *spec)
2683 kref_put(&spec->kref, rbd_spec_free);
2686 static struct rbd_spec *rbd_spec_alloc(void)
2688 struct rbd_spec *spec;
2690 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2693 kref_init(&spec->kref);
2695 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
/*
 * kref release handler for a spec (recovered from the embedded kref
 * with container_of()).  Frees the pool name, image id, image name
 * and snapshot name strings owned by the spec.
 * NOTE(review): the release of the spec structure itself is not
 * visible in this listing -- confirm against the complete file.
 */
2700 static void rbd_spec_free(struct kref *kref)
2702 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2704 kfree(spec->pool_name);
2705 kfree(spec->image_id);
2706 kfree(spec->image_name);
2707 kfree(spec->snap_name);
/*
 * Allocate and initialize an rbd device for client @rbdc and image
 * specification @spec.
 *
 * The device is zero-allocated, then its spinlock, its two list heads
 * (node, snaps) and its header rwsem are initialized.  The @spec and
 * @rbdc pointers are stored as-is; no extra reference is taken in the
 * visible code, and rbd_dev_destroy() later puts both, so ownership
 * of the caller's references moves to the device.
 *
 * The file layout used for requests is fixed here: one stripe, with
 * stripe unit and object size both 1 << RBD_MAX_OBJ_ORDER, and the
 * pool taken from spec->pool_id truncated to 32 bits (callers check
 * that the id fits before calling).  All layout fields are stored
 * little-endian.
 *
 * NOTE(review): the allocation-failure check and the return statement
 * are not visible in this listing.
 */
2711 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2712 struct rbd_spec *spec)
2714 struct rbd_device *rbd_dev;
2716 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2720 spin_lock_init(&rbd_dev->lock);
2722 INIT_LIST_HEAD(&rbd_dev->node);
2723 INIT_LIST_HEAD(&rbd_dev->snaps);
2724 init_rwsem(&rbd_dev->header_rwsem);
2726 rbd_dev->spec = spec;
2727 rbd_dev->rbd_client = rbdc;
2729 /* Initialize the layout used for all rbd requests */
2731 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2732 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2733 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2734 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
/*
 * Tear down what rbd_dev_create() and the probe code attached to an
 * rbd device: drops the parent spec reference, frees the header
 * object name, drops the client reference and drops the device's own
 * spec reference.
 * NOTE(review): freeing of @rbd_dev itself is not visible in this
 * listing.
 */
2739 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2741 rbd_spec_put(rbd_dev->parent_spec);
2742 kfree(rbd_dev->header_name);
2743 rbd_put_client(rbd_dev->rbd_client);
2744 rbd_spec_put(rbd_dev->spec);
/*
 * Report whether a snapshot's device has been registered.  Two
 * indicators are sampled: whether dev.type already points at
 * rbd_snap_device_type (set by rbd_register_snap_dev()), and what
 * device_is_registered() says.
 *
 * The assertion expression "!ret ^ reg" parses as "(!ret) ^ reg", so
 * it holds exactly when the two indicators agree.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
2748 static bool rbd_snap_registered(struct rbd_snap *snap)
2750 bool ret = snap->dev.type == &rbd_snap_device_type;
2751 bool reg = device_is_registered(&snap->dev);
2753 rbd_assert(!ret ^ reg);
/*
 * Unlink a snapshot from the list it is on and, if its device was
 * registered, unregister that device.
 *
 * NOTE(review): unregistering can drop the last reference to the
 * device and so run its release callback; callers should treat @snap
 * as gone once this returns.  What happens to a snapshot whose device
 * was never registered is not visible in this listing.
 */
2758 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2760 list_del(&snap->node);
2761 if (device_is_registered(&snap->dev))
2762 device_unregister(&snap->dev);
/*
 * Register the device embedded in @snap as a child of @parent.
 *
 * The device gets the snapshot device type, the given parent, and a
 * name made of RBD_SNAP_DEV_NAME_PREFIX followed by the snapshot
 * name.  The release callback is also set directly on the device; it
 * is the same function the device type already supplies.
 *
 * NOTE(review): the declaration of ret and the return statement are
 * not visible in this listing; presumably the device_register()
 * result is returned.
 */
2765 static int rbd_register_snap_dev(struct rbd_snap *snap,
2766 struct device *parent)
2768 struct device *dev = &snap->dev;
2771 dev->type = &rbd_snap_device_type;
2772 dev->parent = parent;
2773 dev->release = rbd_snap_dev_release;
2774 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2775 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2777 ret = device_register(dev);
/*
 * Allocate a snapshot record and fill it in from the arguments.
 *
 * The record is zero-allocated; -ENOMEM is returned (as an ERR_PTR)
 * if that fails.  The name is duplicated with kstrdup(), so the
 * caller keeps ownership of @snap_name.  Size and features are copied
 * in.  Failures are reported as ERR_PTR values, never as NULL, in the
 * visible code.
 *
 * NOTE(review): the final parameter (the source of snap_features),
 * the handling of a failed kstrdup(), the store of @snap_id and the
 * success return are on lines not visible in this listing.  @rbd_dev
 * is not referenced by any visible statement.
 */
2782 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2783 const char *snap_name,
2784 u64 snap_id, u64 snap_size,
2787 struct rbd_snap *snap;
2790 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2792 return ERR_PTR(-ENOMEM);
2795 snap->name = kstrdup(snap_name, GFP_KERNEL);
2800 snap->size = snap_size;
2801 snap->features = snap_features;
2809 return ERR_PTR(ret);
/*
 * Look up size, features and name for entry @which of a format 1
 * image's snapshot context.
 *
 * @which must be below the snapshot count (asserted).  The size comes
 * from header.snap_sizes[which]; format 1 has no features, so
 * *snap_features is set to 0.  Names are stored back to back in
 * header.snap_names, each NUL-terminated, so the lookup starts at the
 * beginning and steps over one name at a time.
 *
 * NOTE(review): the loop header controlling how many names are
 * skipped and the return statement are not visible in this listing;
 * presumably a pointer into header.snap_names is returned (not a
 * copy) - confirm before freeing the result.
 */
2812 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2813 u64 *snap_size, u64 *snap_features)
2817 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2819 *snap_size = rbd_dev->header.snap_sizes[which];
2820 *snap_features = 0; /* No features for v1 */
2822 /* Skip over names until we find the one we are looking for */
2824 snap_name = rbd_dev->header.snap_names;
2826 snap_name += strlen(snap_name) + 1;
/*
 * Query the header object of a format 2 image for the object order
 * and size that apply to @snap_id, via a synchronous object method
 * call.  The snapshot id is sent little-endian; the reply lands in a
 * packed, zero-initialized local structure and its size field is
 * converted from little-endian before being stored in *snap_size.
 * *order is copied from the reply unchanged.
 *
 * NOTE(review): the class/method name arguments of the call, the
 * reply structure members, the error check on ret and the return
 * statement are not visible in this listing.
 */
2832 * Get the size and object order for an image snapshot, or if
2833 * snap_id is CEPH_NOSNAP, gets this information for the base
2836 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2837 u8 *order, u64 *snap_size)
2839 __le64 snapid = cpu_to_le64(snap_id);
2844 } __attribute__ ((packed)) size_buf = { 0 };
2846 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2848 (char *) &snapid, sizeof (snapid),
2849 (char *) &size_buf, sizeof (size_buf), NULL);
2850 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2854 *order = size_buf.order;
2855 *snap_size = le64_to_cpu(size_buf.size);
2857 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2858 (unsigned long long) snap_id, (unsigned int) *order,
2859 (unsigned long long) *snap_size);
/*
 * Fetch the object order and size of the base image (snapshot id
 * CEPH_NOSNAP) and store them directly into the in-memory header
 * (header.obj_order and header.image_size).  Returns whatever
 * _rbd_dev_v2_snap_size() returns.
 */
2864 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2866 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2867 &rbd_dev->header.obj_order,
2868 &rbd_dev->header.image_size);
/*
 * Fetch the object name prefix of a format 2 image with the "rbd"
 * class method "get_object_prefix" and store a decoded copy in
 * header.object_prefix.
 *
 * The reply buffer is RBD_OBJ_PREFIX_LEN_MAX bytes and zero-filled.
 * The reply is a length-prefixed encoded string; decoding is bounded
 * by RBD_OBJ_PREFIX_LEN_MAX bytes from the decode cursor.  If
 * decoding fails, the error is taken from the ERR_PTR and the header
 * field is reset to NULL so no error pointer is left behind.
 *
 * NOTE(review): local declarations, the allocation-failure check,
 * the request-side arguments of the call, the check on ret, the
 * cleanup of reply_buf and the return are not visible in this
 * listing.
 */
2871 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2877 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2881 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2882 "rbd", "get_object_prefix",
2884 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2885 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2890 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2891 p + RBD_OBJ_PREFIX_LEN_MAX,
2894 if (IS_ERR(rbd_dev->header.object_prefix)) {
2895 ret = PTR_ERR(rbd_dev->header.object_prefix);
2896 rbd_dev->header.object_prefix = NULL;
2898 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Fetch the feature bits that apply to @snap_id with the "rbd" class
 * method "get_features".
 *
 * The reply carries two little-endian 64-bit values, "features" and
 * "incompat".  Any incompat bit outside RBD_FEATURES_SUPPORTED is
 * tested for before the features value is handed back through
 * *snap_features.
 *
 * NOTE(review): the third parameter line, the reply structure
 * members, the check on ret, the action taken when an unsupported
 * incompat bit is found (presumably an error return) and the final
 * return are not visible in this listing.
 */
2907 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2910 __le64 snapid = cpu_to_le64(snap_id);
2914 } features_buf = { 0 };
2918 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2919 "rbd", "get_features",
2920 (char *) &snapid, sizeof (snapid),
2921 (char *) &features_buf, sizeof (features_buf),
2923 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2927 incompat = le64_to_cpu(features_buf.incompat);
2928 if (incompat & ~RBD_FEATURES_SUPPORTED)
2931 *snap_features = le64_to_cpu(features_buf.features);
2933 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2934 (unsigned long long) snap_id,
2935 (unsigned long long) *snap_features,
2936 (unsigned long long) le64_to_cpu(features_buf.incompat));
/*
 * Fetch the feature bits of the base image (snapshot id CEPH_NOSNAP)
 * and store them in header.features.  Returns whatever
 * _rbd_dev_v2_snap_features() returns.
 */
2941 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2943 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2944 &rbd_dev->header.features);
/*
 * Ask the header object (class "rbd", method "get_parent") whether
 * this image has a parent and, if so, record it.
 *
 * A fresh rbd_spec is allocated up front to receive the answer.  The
 * reply buffer is sized for: pool id (64 bit), a length-prefixed
 * image id of at most RBD_IMAGE_ID_LEN_MAX bytes, snapshot id
 * (64 bit) and overlap (64 bit).
 *
 * Decoding order: pool id first; a pool id of CEPH_NOPOOL means there
 * is no parent and is not an error.  A pool id that does not fit in
 * 32 bits triggers a WARN_ON because the file layout stores it in 32
 * bits.  Then the image id string (ownership of the decoded copy goes
 * to parent_spec), the snapshot id and the overlap.
 *
 * On success the overlap and the new spec are stored in the device
 * and the local pointer is cleared, so the rbd_spec_put() at the exit
 * only releases the spec when it was NOT handed to the device.
 *
 * NOTE(review): several declarations, every error check between the
 * visible calls, the exit labels, freeing of reply_buf and the return
 * are not visible in this listing.
 */
2947 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2949 struct rbd_spec *parent_spec;
2951 void *reply_buf = NULL;
2959 parent_spec = rbd_spec_alloc();
2963 size = sizeof (__le64) + /* pool_id */
2964 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2965 sizeof (__le64) + /* snap_id */
2966 sizeof (__le64); /* overlap */
2967 reply_buf = kmalloc(size, GFP_KERNEL);
2973 snapid = cpu_to_le64(CEPH_NOSNAP);
2974 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2975 "rbd", "get_parent",
2976 (char *) &snapid, sizeof (snapid),
2977 (char *) reply_buf, size, NULL);
2978 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2984 end = (char *) reply_buf + size;
2985 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2986 if (parent_spec->pool_id == CEPH_NOPOOL)
2987 goto out; /* No parent? No problem. */
2989 /* The ceph file layout needs to fit pool id in 32 bits */
2992 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2995 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2996 if (IS_ERR(image_id)) {
2997 ret = PTR_ERR(image_id);
3000 parent_spec->image_id = image_id;
3001 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3002 ceph_decode_64_safe(&p, end, overlap, out_err);
3004 rbd_dev->parent_overlap = overlap;
3005 rbd_dev->parent_spec = parent_spec;
3006 parent_spec = NULL; /* rbd_dev now owns this */
3011 rbd_spec_put(parent_spec);
/*
 * Look up the user-visible name of an image from its image id by
 * calling the "rbd" class method "dir_get_name" on the RBD_DIRECTORY
 * object.  May only be called while spec->image_name is still unset
 * (asserted).
 *
 * The request is the image id encoded as a length-prefixed string in
 * a temporary buffer of exactly that size.  The reply buffer has room
 * for a length prefix plus RBD_IMAGE_NAME_LEN_MAX bytes; the name is
 * decoded from it into a newly allocated string.
 *
 * NOTE(review): "len" is declared on a line not visible here; if it
 * is a size_t (it receives strlen() and is passed by address to the
 * decoder), the "%zd" in the debug message should be "%zu".
 * NOTE(review): allocation-failure checks, the check on ret, what is
 * returned when decoding fails, freeing of both temporary buffers and
 * the return statement are not visible in this listing.
 */
3016 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3018 size_t image_id_size;
3023 void *reply_buf = NULL;
3025 char *image_name = NULL;
3028 rbd_assert(!rbd_dev->spec->image_name);
3030 len = strlen(rbd_dev->spec->image_id);
3031 image_id_size = sizeof (__le32) + len;
3032 image_id = kmalloc(image_id_size, GFP_KERNEL);
3037 end = (char *) image_id + image_id_size;
3038 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3040 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3041 reply_buf = kmalloc(size, GFP_KERNEL);
3045 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3046 "rbd", "dir_get_name",
3047 image_id, image_id_size,
3048 (char *) reply_buf, size, NULL);
3052 end = (char *) reply_buf + size;
3053 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3054 if (IS_ERR(image_name))
3057 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
/*
 * Fill in the pool, image and snapshot names of a device's spec from
 * the ids it already holds.  Does nothing (returns 0) when the pool
 * name is already present.
 *
 * Steps visible below:
 *  - pool name: looked up in the client's OSD map by pool id and
 *    duplicated; an unknown pool id is warned about;
 *  - image name: fetched with rbd_dev_image_name(); failure is only
 *    warned about and is tolerated;
 *  - snapshot name: looked up with rbd_snap_name() by snapshot id and
 *    duplicated; an unknown id is warned about.
 * The tail of the function frees and clears the pool name, which
 * restores the "names not yet filled in" state tested at the top.
 *
 * NOTE(review): the conditions guarding each warning, the error
 * codes, the labels and the return statements are not visible in this
 * listing.
 */
3066 * When a parent image gets probed, we only have the pool, image,
3067 * and snapshot ids but not the names of any of them. This call
3068 * is made later to fill in those names. It has to be done after
3069 * rbd_dev_snaps_update() has completed because some of the
3070 * information (in particular, snapshot name) is not available
3073 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3075 struct ceph_osd_client *osdc;
3077 void *reply_buf = NULL;
3080 if (rbd_dev->spec->pool_name)
3081 return 0; /* Already have the names */
3083 /* Look up the pool name */
3085 osdc = &rbd_dev->rbd_client->client->osdc;
3086 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3088 rbd_warn(rbd_dev, "there is no pool with id %llu",
3089 rbd_dev->spec->pool_id); /* Really a BUG() */
3093 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3094 if (!rbd_dev->spec->pool_name)
3097 /* Fetch the image name; tolerate failure here */
3099 name = rbd_dev_image_name(rbd_dev);
3101 rbd_dev->spec->image_name = (char *) name;
3103 rbd_warn(rbd_dev, "unable to get image name");
3105 /* Look up the snapshot name. */
3107 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3109 rbd_warn(rbd_dev, "no snapshot with id %llu",
3110 rbd_dev->spec->snap_id); /* Really a BUG() */
3114 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3115 if(!rbd_dev->spec->snap_name)
3121 kfree(rbd_dev->spec->pool_name);
3122 rbd_dev->spec->pool_name = NULL;
/*
 * Fetch the snapshot context of a format 2 image ("rbd" class method
 * "get_snapcontext") and install it as header.snapc.  The object
 * version reported by the call is passed back through @ver.
 *
 * The reply holds a 64-bit seq, a 32-bit snapshot count and that many
 * 64-bit snapshot ids.  The reply buffer is sized for at most
 * RBD_MAX_SNAP_COUNT ids.  Before the ids are read, the count is
 * checked twice: once so that the size of the context to allocate
 * cannot overflow a size_t, and once so that the ids really fit in
 * the remaining reply bytes.  The new context starts with a reference
 * count of 1.
 *
 * NOTE(review): local declarations, allocation-failure checks, the
 * check on ret, the remainder of the overflow test, the store of the
 * seq value, the exit label, freeing of reply_buf and the return are
 * not visible in this listing.  Whether a previously installed
 * header.snapc is released first cannot be told from here.
 */
3127 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3136 struct ceph_snap_context *snapc;
3140 * We'll need room for the seq value (maximum snapshot id),
3141 * snapshot count, and array of that many snapshot ids.
3142 * For now we have a fixed upper limit on the number we're
3143 * prepared to receive.
3145 size = sizeof (__le64) + sizeof (__le32) +
3146 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3147 reply_buf = kzalloc(size, GFP_KERNEL);
3151 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3152 "rbd", "get_snapcontext",
3154 reply_buf, size, ver);
3155 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3161 end = (char *) reply_buf + size;
3162 ceph_decode_64_safe(&p, end, seq, out);
3163 ceph_decode_32_safe(&p, end, snap_count, out);
3166 * Make sure the reported number of snapshot ids wouldn't go
3167 * beyond the end of our buffer. But before checking that,
3168 * make sure the computed size of the snapshot context we
3169 * allocate is representable in a size_t.
3171 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3176 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3179 size = sizeof (struct ceph_snap_context) +
3180 snap_count * sizeof (snapc->snaps[0]);
3181 snapc = kmalloc(size, GFP_KERNEL);
3187 atomic_set(&snapc->nref, 1);
3189 snapc->num_snaps = snap_count;
3190 for (i = 0; i < snap_count; i++)
3191 snapc->snaps[i] = ceph_decode_64(&p);
3193 rbd_dev->header.snapc = snapc;
3195 dout(" snap context seq = %llu, snap_count = %u\n",
3196 (unsigned long long) seq, (unsigned int) snap_count);
/*
 * Fetch the name of entry @which of a format 2 image's snapshot
 * context with the "rbd" class method "get_snapshot_name".
 *
 * The snapshot id is taken from header.snapc->snaps[which] and sent
 * little-endian.  The reply buffer has room for a length prefix plus
 * RBD_MAX_SNAP_NAME_LEN bytes; the name is decoded from it into a
 * newly allocated string, so the caller owns the result.  Errors are
 * reported as ERR_PTR values (-ENOMEM when the reply buffer cannot be
 * allocated).
 *
 * NOTE(review): local declarations, the check on ret, the labels,
 * freeing of reply_buf and the success return are not visible in this
 * listing.  @which is not range-checked by any visible statement.
 */
3204 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3214 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3215 reply_buf = kmalloc(size, GFP_KERNEL);
3217 return ERR_PTR(-ENOMEM);
3219 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3220 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3221 "rbd", "get_snapshot_name",
3222 (char *) &snap_id, sizeof (snap_id),
3223 reply_buf, size, NULL);
3224 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3229 end = (char *) reply_buf + size;
3230 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3231 if (IS_ERR(snap_name)) {
3232 ret = PTR_ERR(snap_name);
3235 dout(" snap_id 0x%016llx snap_name = %s\n",
3236 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3244 return ERR_PTR(ret);
/*
 * Look up size, features and name for entry @which of a format 2
 * image's snapshot context.  Three requests are made in turn: size
 * (the object order that comes with it is fetched into a local and
 * not passed on), features, and finally the name.  A failure of
 * either of the first two is returned as an ERR_PTR; otherwise the
 * result of rbd_dev_v2_snap_name() is returned as-is.
 *
 * NOTE(review): local declarations and the conditions guarding the
 * two early returns are not visible in this listing.
 */
3247 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3248 u64 *snap_size, u64 *snap_features)
3254 snap_id = rbd_dev->header.snapc->snaps[which];
3255 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3257 return ERR_PTR(ret);
3258 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3260 return ERR_PTR(ret);
3262 return rbd_dev_v2_snap_name(rbd_dev, which);
/*
 * Format-independent front end for snapshot lookup: dispatches on
 * rbd_dev->image_format to the format 1 or format 2 helper and
 * returns its result.  Any other format value yields
 * ERR_PTR(-EINVAL).
 */
3265 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3266 u64 *snap_size, u64 *snap_features)
3268 if (rbd_dev->image_format == 1)
3269 return rbd_dev_v1_snap_info(rbd_dev, which,
3270 snap_size, snap_features);
3271 if (rbd_dev->image_format == 2)
3272 return rbd_dev_v2_snap_info(rbd_dev, which,
3273 snap_size, snap_features);
3274 return ERR_PTR(-EINVAL);
3277 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3282 down_write(&rbd_dev->header_rwsem);
3284 /* Grab old order first, to see if it changes */
3286 obj_order = rbd_dev->header.obj_order,
3287 ret = rbd_dev_v2_image_size(rbd_dev);
3290 if (rbd_dev->header.obj_order != obj_order) {
3294 rbd_update_mapping_size(rbd_dev);
3296 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3297 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3300 ret = rbd_dev_snaps_update(rbd_dev);
3301 dout("rbd_dev_snaps_update returned %d\n", ret);
3304 ret = rbd_dev_snaps_register(rbd_dev);
3305 dout("rbd_dev_snaps_register returned %d\n", ret);
3307 up_write(&rbd_dev->header_rwsem);
3313 * Scan the rbd device's current snapshot list and compare it to the
3314 * newly-received snapshot context. Remove any existing snapshots
3315 * not present in the new snapshot context. Add a new snapshot for
3316 * any snaphots in the snapshot context not in the current list.
3317 * And verify there are no changes to snapshots we already know
3320 * Assumes the snapshots in the snapshot context are sorted by
3321 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3322 * are also maintained in that order.)
3324 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3326 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3327 const u32 snap_count = snapc->num_snaps;
3328 struct list_head *head = &rbd_dev->snaps;
3329 struct list_head *links = head->next;
3332 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3333 while (index < snap_count || links != head) {
3335 struct rbd_snap *snap;
3338 u64 snap_features = 0;
3340 snap_id = index < snap_count ? snapc->snaps[index]
3342 snap = links != head ? list_entry(links, struct rbd_snap, node)
3344 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3346 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3347 struct list_head *next = links->next;
3350 * A previously-existing snapshot is not in
3351 * the new snap context.
3353 * If the now missing snapshot is the one the
3354 * image is mapped to, clear its exists flag
3355 * so we can avoid sending any more requests
3358 if (rbd_dev->spec->snap_id == snap->id)
3359 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3360 rbd_remove_snap_dev(snap);
3361 dout("%ssnap id %llu has been removed\n",
3362 rbd_dev->spec->snap_id == snap->id ?
3364 (unsigned long long) snap->id);
3366 /* Done with this list entry; advance */
3372 snap_name = rbd_dev_snap_info(rbd_dev, index,
3373 &snap_size, &snap_features);
3374 if (IS_ERR(snap_name))
3375 return PTR_ERR(snap_name);
3377 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3378 (unsigned long long) snap_id);
3379 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3380 struct rbd_snap *new_snap;
3382 /* We haven't seen this snapshot before */
3384 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3385 snap_id, snap_size, snap_features);
3386 if (IS_ERR(new_snap)) {
3387 int err = PTR_ERR(new_snap);
3389 dout(" failed to add dev, error %d\n", err);
3394 /* New goes before existing, or at end of list */
3396 dout(" added dev%s\n", snap ? "" : " at end\n");
3398 list_add_tail(&new_snap->node, &snap->node);
3400 list_add_tail(&new_snap->node, head);
3402 /* Already have this one */
3404 dout(" already present\n");
3406 rbd_assert(snap->size == snap_size);
3407 rbd_assert(!strcmp(snap->name, snap_name));
3408 rbd_assert(snap->features == snap_features);
3410 /* Done with this list entry; advance */
3412 links = links->next;
3415 /* Advance to the next entry in the snapshot context */
3419 dout("%s: done\n", __func__);
/*
 * Walks rbd_dev->snaps and calls rbd_register_snap_dev() for each
 * entry that rbd_snap_registered() reports as not yet registered,
 * using the rbd device itself as the parent.  The rbd device must
 * already be registered; a WARN_ON fires otherwise.
 *
 * NOTE(review): the value returned when the WARN_ON fires, how a
 * failed registration affects the walk, and the return statement are
 * not visible in this listing.
 */
3425 * Scan the list of snapshots and register the devices for any that
3426 * have not already been registered.
3428 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3430 struct rbd_snap *snap;
3433 dout("%s:\n", __func__);
3434 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3437 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3438 if (!rbd_snap_registered(snap)) {
3439 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3444 dout("%s: returning %d\n", __func__, ret);
/*
 * Register the device embedded in @rbd_dev on the rbd bus.  Done
 * under ctl_mutex: the device is given the rbd bus, the rbd device
 * type, rbd_root_dev as parent, rbd_dev_release() as release
 * callback, and its numeric dev_id as name, then registered.
 *
 * NOTE(review): local declarations and the return statement are not
 * visible in this listing; presumably the device_register() result is
 * returned.
 */
3449 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3454 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3456 dev = &rbd_dev->dev;
3457 dev->bus = &rbd_bus_type;
3458 dev->type = &rbd_device_type;
3459 dev->parent = &rbd_root_dev;
3460 dev->release = rbd_dev_release;
3461 dev_set_name(dev, "%d", rbd_dev->dev_id);
3462 ret = device_register(dev);
3464 mutex_unlock(&ctl_mutex);
/*
 * Unregister the device embedded in @rbd_dev.  The device's release
 * callback is rbd_dev_release(), so once the last reference is gone
 * the remaining teardown happens there.
 */
3469 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3471 device_unregister(&rbd_dev->dev);
/*
 * rbd_dev_id_max holds the highest device id currently handed out
 * (0 when none is in use).  rbd_dev_id_get() atomically increments it
 * and uses the new value as the id of @rbd_dev, then appends the
 * device to the global rbd_dev_list under rbd_dev_list_lock.
 */
3474 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3477 * Get a unique rbd identifier for the given new rbd_dev, and add
3478 * the rbd_dev to the global list. The minimum rbd id is 1.
3480 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3482 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3484 spin_lock(&rbd_dev_list_lock);
3485 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3486 spin_unlock(&rbd_dev_list_lock);
3487 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3488 (unsigned long long) rbd_dev->dev_id);
/*
 * Takes @rbd_dev off the global list under rbd_dev_list_lock.  If the
 * id being released is not the recorded maximum, nothing more is
 * done.  Otherwise the remaining devices are scanned (from the tail)
 * for the largest id still in use, and the recorded maximum is
 * replaced with it by a compare-and-exchange that only succeeds if
 * the maximum is still the id being released - so an id handed out
 * concurrently by rbd_dev_id_get() is never overwritten.
 *
 * NOTE(review): the pointer declared inside the scan loop has the
 * same name as the function parameter and shadows it for the duration
 * of the loop body.
 * NOTE(review): the declaration and initial value of max_id and the
 * early return after the first unlock are not visible in this
 * listing.
 */
3492 * Remove an rbd_dev from the global list, and record that its
3493 * identifier is no longer in use.
3495 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3497 struct list_head *tmp;
3498 int rbd_id = rbd_dev->dev_id;
3501 rbd_assert(rbd_id > 0);
3503 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3504 (unsigned long long) rbd_dev->dev_id);
3505 spin_lock(&rbd_dev_list_lock);
3506 list_del_init(&rbd_dev->node);
3509 * If the id being "put" is not the current maximum, there
3510 * is nothing special we need to do.
3512 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3513 spin_unlock(&rbd_dev_list_lock);
3518 * We need to update the current maximum id. Search the
3519 * list to find out what it is. We're more likely to find
3520 * the maximum at the end, so search the list backward.
3523 list_for_each_prev(tmp, &rbd_dev_list) {
3524 struct rbd_device *rbd_dev;
3526 rbd_dev = list_entry(tmp, struct rbd_device, node);
3527 if (rbd_dev->dev_id > max_id)
3528 max_id = rbd_dev->dev_id;
3530 spin_unlock(&rbd_dev_list_lock);
3533 * The max id could have been updated by rbd_dev_id_get(), in
3534 * which case it now accurately reflects the new maximum.
3535 * Be careful not to overwrite the maximum value in that
3538 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3539 dout(" max dev id has been reset\n");
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 *
 * *buf is left at the START of the token; it is the caller's job to
 * step past the token (by the returned length) before asking for the
 * next one.  A return value of 0 means only white space remained.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	/* Strictly less: one byte is needed for the terminator */
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}
/*
 * The duplicate is made with kmemdup() of len + 1 bytes starting at
 * the token, i.e. the token plus the one character that follows it;
 * that trailing byte is then overwritten with the terminator.
 *
 * NOTE(review): local declarations, the allocation-failure check, the
 * advance of *buf, the store through lenp and the return statement
 * are not visible in this listing.
 */
3592 * Finds the next token in *buf, dynamically allocates a buffer big
3593 * enough to hold a copy of it, and copies the token into the new
3594 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3595 * that a duplicate buffer is created even for a zero-length token.
3597 * Returns a pointer to the newly-allocated duplicate, or a null
3598 * pointer if memory for the duplicate was not available. If
3599 * the lenp argument is a non-null pointer, the length of the token
3600 * (not including the '\0') is returned in *lenp.
3602 * If successful, the *buf pointer will be updated to point beyond
3603 * the end of the found token.
3605 * Note: uses GFP_KERNEL for allocation.
3607 static inline char *dup_token(const char **buf, size_t *lenp)
3612 len = next_token(buf);
3613 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3616 *(dup + len) = '\0';
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters, which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  rbd_spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
/*
 * Tokenizes @buf on white space, in this order:
 *  1. monitor addresses - not copied; only the position and length
 *     inside @buf are remembered and later handed to
 *     ceph_parse_options() as a [start, end) range;
 *  2. options string - duplicated;
 *  3. pool name and 4. image name - duplicated straight into a newly
 *     allocated spec;
 *  5. optional snapshot name - RBD_SNAP_HEAD_NAME is substituted when
 *     absent, and a name longer than RBD_MAX_SNAP_NAME_LEN is
 *     rejected with -ENAMETOOLONG.
 * An empty required token is reported with rbd_warn().  The rbd
 * options start out as defaults (read_only from
 * RBD_READ_ONLY_DEFAULT) and are then updated by
 * parse_rbd_opts_token() while the ceph options are parsed.
 *
 * NOTE(review): the tests guarding each warning, the error codes and
 * labels, the advance past the monitor address token, the stores
 * through the three output parameters and all cleanup are on lines
 * not visible in this listing.
 */
3666 static int rbd_add_parse_args(const char *buf,
3667 struct ceph_options **ceph_opts,
3668 struct rbd_options **opts,
3669 struct rbd_spec **rbd_spec)
3673 const char *mon_addrs;
3674 size_t mon_addrs_size;
3675 struct rbd_spec *spec = NULL;
3676 struct rbd_options *rbd_opts = NULL;
3677 struct ceph_options *copts;
3680 /* The first four tokens are required */
3682 len = next_token(&buf);
3684 rbd_warn(NULL, "no monitor address(es) provided");
3688 mon_addrs_size = len + 1;
3692 options = dup_token(&buf, NULL);
3696 rbd_warn(NULL, "no options provided");
3700 spec = rbd_spec_alloc();
3704 spec->pool_name = dup_token(&buf, NULL);
3705 if (!spec->pool_name)
3707 if (!*spec->pool_name) {
3708 rbd_warn(NULL, "no pool name provided");
3712 spec->image_name = dup_token(&buf, NULL);
3713 if (!spec->image_name)
3715 if (!*spec->image_name) {
3716 rbd_warn(NULL, "no image name provided");
3721 * Snapshot name is optional; default is to use "-"
3722 * (indicating the head/no snapshot).
3724 len = next_token(&buf);
3726 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3727 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3728 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3729 ret = -ENAMETOOLONG;
3732 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3733 if (!spec->snap_name)
3735 *(spec->snap_name + len) = '\0';
3737 /* Initialize all rbd options to the defaults */
3739 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3743 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3745 copts = ceph_parse_options(options, mon_addrs,
3746 mon_addrs + mon_addrs_size - 1,
3747 parse_rbd_opts_token, rbd_opts);
3748 if (IS_ERR(copts)) {
3749 ret = PTR_ERR(copts);
/*
 * Mechanics: the id object is named RBD_ID_PREFIX followed by the
 * image name (sizeof the prefix already accounts for the terminating
 * NUL).  The reply is a length-prefixed encoded string which is
 * decoded into a newly allocated spec->image_id; if decoding fails
 * the field is reset to NULL.  Allocations here use GFP_NOIO.
 *
 * NOTE(review): the response buffer is allocated with room for the
 * 4-byte length prefix plus RBD_IMAGE_ID_LEN_MAX, but only
 * RBD_IMAGE_ID_LEN_MAX is passed as its length to the method call
 * and used as the decode bound - confirm this is intended.
 * NOTE(review): local declarations, the early return when the id is
 * already known, allocation-failure checks, the class/method name
 * arguments, the check on ret, cleanup and the return are not
 * visible in this listing.
 */
3770 * An rbd format 2 image has a unique identifier, distinct from the
3771 * name given to it by the user. Internally, that identifier is
3772 * what's used to specify the names of objects related to the image.
3774 * A special "rbd id" object is used to map an rbd image name to its
3775 * id. If that object doesn't exist, then there is no v2 rbd image
3776 * with the supplied name.
3778 * This function will record the given rbd_dev's image_id field if
3779 * it can be determined, and in that case will return 0. If any
3780 * errors occur a negative errno will be returned and the rbd_dev's
3781 * image_id field will be unchanged (and should be NULL).
3783 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3792 * When probing a parent image, the image id is already
3793 * known (and the image name likely is not). There's no
3794 * need to fetch the image id again in this case.
3796 if (rbd_dev->spec->image_id)
3800 * First, see if the format 2 image id file exists, and if
3801 * so, get the image's persistent id from it.
3803 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3804 object_name = kmalloc(size, GFP_NOIO);
3807 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3808 dout("rbd id object name is %s\n", object_name);
3810 /* Response will be an encoded string, which includes a length */
3812 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3813 response = kzalloc(size, GFP_NOIO);
3819 ret = rbd_obj_method_sync(rbd_dev, object_name,
3822 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3823 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3828 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3829 p + RBD_IMAGE_ID_LEN_MAX,
3831 if (IS_ERR(rbd_dev->spec->image_id)) {
3832 ret = PTR_ERR(rbd_dev->spec->image_id);
3833 rbd_dev->spec->image_id = NULL;
3835 dout("image_id is %s\n", rbd_dev->spec->image_id);
/*
 * Probe an image as format 1.
 *
 * Format 1 images have no image id, so an empty string is stored as
 * spec->image_id.  The header object name is the image name followed
 * by RBD_SUFFIX (sizeof the suffix covers the terminating NUL).  The
 * on-disk header is then read into rbd_dev->header.  Format 1 has no
 * layering, so the parent spec is NULL and the overlap 0.  On success
 * image_format is set to 1.
 *
 * The tail of the function undoes the two allocations (header name
 * and image id) and clears both pointers.
 *
 * NOTE(review): local declarations, the error codes, the labels and
 * the return statements are not visible in this listing.
 */
3844 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3849 /* Version 1 images have no id; empty string is used */
3851 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3852 if (!rbd_dev->spec->image_id)
3855 /* Record the header object name for this rbd image. */
3857 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3858 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3859 if (!rbd_dev->header_name) {
3863 sprintf(rbd_dev->header_name, "%s%s",
3864 rbd_dev->spec->image_name, RBD_SUFFIX);
3866 /* Populate rbd image metadata */
3868 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3872 /* Version 1 images have no parent (no layering) */
3874 rbd_dev->parent_spec = NULL;
3875 rbd_dev->parent_overlap = 0;
3877 rbd_dev->image_format = 1;
3879 dout("discovered version 1 image, header name is %s\n",
3880 rbd_dev->header_name);
3885 kfree(rbd_dev->header_name);
3886 rbd_dev->header_name = NULL;
3887 kfree(rbd_dev->spec->image_id);
3888 rbd_dev->spec->image_id = NULL;
/*
 * Probe an image as format 2.  spec->image_id must already be set.
 *
 * The header object name is RBD_HEADER_PREFIX followed by the image
 * id.  The header is then assembled from separate requests, in this
 * order: image size and object order, object prefix, features,
 * parent information (only when the layering feature bit is set),
 * and finally the snapshot context together with the header object
 * version.  Crypto and compression types are forced to 0.  On
 * success image_format is set to 2.
 *
 * The tail of the function undoes what was set up: parent overlap
 * and parent spec, header name and object prefix are released and
 * cleared.
 *
 * NOTE(review): local declarations, the error checks after each
 * step, the labels and the return statements are not visible in this
 * listing.
 */
3893 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3900 * Image id was filled in by the caller. Record the header
3901 * object name for this rbd image.
3903 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3904 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3905 if (!rbd_dev->header_name)
3907 sprintf(rbd_dev->header_name, "%s%s",
3908 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3910 /* Get the size and object order for the image */
3912 ret = rbd_dev_v2_image_size(rbd_dev);
3916 /* Get the object prefix (a.k.a. block_name) for the image */
3918 ret = rbd_dev_v2_object_prefix(rbd_dev);
3922 /* Get the and check features for the image */
3924 ret = rbd_dev_v2_features(rbd_dev);
3928 /* If the image supports layering, get the parent info */
3930 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3931 ret = rbd_dev_v2_parent_info(rbd_dev);
3936 /* crypto and compression type aren't (yet) supported for v2 images */
3938 rbd_dev->header.crypt_type = 0;
3939 rbd_dev->header.comp_type = 0;
3941 /* Get the snapshot context, plus the header version */
3943 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3946 rbd_dev->header.obj_version = ver;
3948 rbd_dev->image_format = 2;
3950 dout("discovered version 2 image, header name is %s\n",
3951 rbd_dev->header_name);
3955 rbd_dev->parent_overlap = 0;
3956 rbd_spec_put(rbd_dev->parent_spec);
3957 rbd_dev->parent_spec = NULL;
3958 kfree(rbd_dev->header_name);
3959 rbd_dev->header_name = NULL;
3960 kfree(rbd_dev->header.object_prefix);
3961 rbd_dev->header.object_prefix = NULL;
/*
 * Second half of probing, run once the header has been read.
 *
 * Order of the visible steps:
 *  1. build the snapshot list from the snapshot context;
 *  2. fill in missing names in the spec;
 *  3. set up the mapping;
 *  4. obtain a device id and derive the device name
 *     (RBD_DRV_NAME followed by the id; a BUILD_BUG_ON guards the
 *     size of the name buffer);
 *  5. register a block major (dynamic, requested with 0);
 *  6. set up the disk;
 *  7. register the device on the rbd bus - from here on cleanup is
 *     driven by the sysfs code through rbd_bus_del_dev();
 *  8. register snapshot devices under the header semaphore;
 *  9. start watching the header object;
 * 10. announce the disk with add_disk().
 *
 * The tail of the function is the error unwinding, in reverse order
 * of setup: bus device, disk, block major, device id, snapshots.
 *
 * NOTE(review): the error checks after each step, most labels and
 * the return statements are not visible in this listing.
 */
3966 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3970 /* no need to lock here, as rbd_dev is not registered yet */
3971 ret = rbd_dev_snaps_update(rbd_dev);
3975 ret = rbd_dev_probe_update_spec(rbd_dev);
3979 ret = rbd_dev_set_mapping(rbd_dev);
3983 /* generate unique id: find highest unique id, add one */
3984 rbd_dev_id_get(rbd_dev);
3986 /* Fill in the device name, now that we have its id. */
3987 BUILD_BUG_ON(DEV_NAME_LEN
3988 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3989 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3991 /* Get our block major device number. */
3993 ret = register_blkdev(0, rbd_dev->name);
3996 rbd_dev->major = ret;
3998 /* Set up the blkdev mapping. */
4000 ret = rbd_init_disk(rbd_dev);
4002 goto err_out_blkdev;
4004 ret = rbd_bus_add_dev(rbd_dev);
4009 * At this point cleanup in the event of an error is the job
4010 * of the sysfs code (initiated by rbd_bus_del_dev()).
4012 down_write(&rbd_dev->header_rwsem);
4013 ret = rbd_dev_snaps_register(rbd_dev);
4014 up_write(&rbd_dev->header_rwsem);
4018 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4022 /* Everything's ready. Announce the disk to the world. */
4024 add_disk(rbd_dev->disk);
4026 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4027 (unsigned long long) rbd_dev->mapping.size);
4031 /* this will also clean up rest of rbd_dev stuff */
4033 rbd_bus_del_dev(rbd_dev);
4037 rbd_free_disk(rbd_dev);
4039 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4041 rbd_dev_id_put(rbd_dev);
4043 rbd_remove_all_snaps(rbd_dev);
/*
 * Visible flow: the image id lookup is tried first; depending on its
 * outcome either the format 1 or the format 2 probe runs.  A failed
 * probe is logged.  On success rbd_dev_probe_finish() completes the
 * setup, and the in-memory header is released with rbd_header_free()
 * on a path following it.
 *
 * NOTE(review): the conditions selecting between the two probes, the
 * condition under which the header is freed and the return
 * statements are not visible in this listing.
 */
4049 * Probe for the existence of the header object for the given rbd
4050 * device. For format 2 images this includes determining the image
4053 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4058 * Get the id from the image id object. If it's not a
4059 * format 2 image, we'll get ENOENT back, and we'll assume
4060 * it's a format 1 image.
4062 ret = rbd_dev_image_id(rbd_dev);
4064 ret = rbd_dev_v1_probe(rbd_dev);
4066 ret = rbd_dev_v2_probe(rbd_dev);
4068 dout("probe failed, returning %d\n", ret);
4073 ret = rbd_dev_probe_finish(rbd_dev);
4075 rbd_header_free(&rbd_dev->header);
/*
 * Bus attribute store handler that maps a new rbd image.
 *
 * A module reference is taken for the lifetime of the mapping.  The
 * written text is parsed into ceph options, rbd options and an image
 * spec; a client is obtained for the ceph options (which then belong
 * to the client); the pool name is resolved to a pool id through the
 * client's OSD map and must fit in 32 bits; an rbd device is created
 * (taking over the client and spec references); the read-only
 * setting is copied from the rbd options; and finally the image is
 * probed.
 *
 * Local pointers are reset to NULL as soon as ownership has moved
 * on, so the unwinding at the end only releases what this function
 * still owns.
 *
 * NOTE(review): the remaining parameters, the error checks, most
 * labels, the freeing of rbd_opts, the spec release on failure and
 * the success return are not visible in this listing.
 */
4080 static ssize_t rbd_add(struct bus_type *bus,
4084 struct rbd_device *rbd_dev = NULL;
4085 struct ceph_options *ceph_opts = NULL;
4086 struct rbd_options *rbd_opts = NULL;
4087 struct rbd_spec *spec = NULL;
4088 struct rbd_client *rbdc;
4089 struct ceph_osd_client *osdc;
4092 if (!try_module_get(THIS_MODULE))
4095 /* parse add command */
4096 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4098 goto err_out_module;
4100 rbdc = rbd_get_client(ceph_opts);
4105 ceph_opts = NULL; /* rbd_dev client now owns this */
4108 osdc = &rbdc->client->osdc;
4109 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4111 goto err_out_client;
4112 spec->pool_id = (u64) rc;
4114 /* The ceph file layout needs to fit pool id in 32 bits */
4116 if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4118 goto err_out_client;
4121 rbd_dev = rbd_dev_create(rbdc, spec);
4123 goto err_out_client;
4124 rbdc = NULL; /* rbd_dev now owns this */
4125 spec = NULL; /* rbd_dev now owns this */
4127 rbd_dev->mapping.read_only = rbd_opts->read_only;
4129 rbd_opts = NULL; /* done with this */
4131 rc = rbd_dev_probe(rbd_dev);
4133 goto err_out_rbd_dev;
4137 rbd_dev_destroy(rbd_dev);
4139 rbd_put_client(rbdc);
4142 ceph_destroy_options(ceph_opts);
4146 module_put(THIS_MODULE);
4148 dout("Error adding device %s\n", buf);
4150 return (ssize_t) rc;
/*
 * __rbd_get_dev() - look up an rbd device by its dev_id.
 *
 * Walks rbd_dev_list while holding the rbd_dev_list_lock spinlock and
 * compares each entry's dev_id with the requested one.  The lock is
 * released both on a match and after the walk completes.
 *
 * NOTE(review): the return statements are on lines omitted from this
 * listing; presumably the matching device is returned on a hit and
 * NULL otherwise -- TODO confirm.
 */
4153 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4155 struct list_head *tmp;
4156 struct rbd_device *rbd_dev;
4158 spin_lock(&rbd_dev_list_lock);
4159 list_for_each(tmp, &rbd_dev_list) {
/* Each list node is embedded in struct rbd_device as member "node". */
4160 rbd_dev = list_entry(tmp, struct rbd_device, node);
4161 if (rbd_dev->dev_id == dev_id) {
4162 spin_unlock(&rbd_dev_list_lock);
/* No entry matched: drop the lock taken before the walk. */
4166 spin_unlock(&rbd_dev_list_lock);
/*
 * rbd_dev_release() - tear down the rbd_device that embeds dev.
 *
 * Presumably installed as the struct device release callback; the
 * place where it is assigned is not visible here -- TODO confirm.
 *
 * Order of teardown as written: header watch (only if a watch_event is
 * set), disk, block device major registration, header fields, device
 * id, the rbd_device itself, and finally the module reference.
 */
4170 static void rbd_dev_release(struct device *dev)
4172 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/*
 * NOTE(review): the 0 argument presumably requests that the header
 * watch be stopped -- confirm against rbd_dev_header_watch_sync().
 */
4174 if (rbd_dev->watch_event)
4175 rbd_dev_header_watch_sync(rbd_dev, 0);
4177 /* clean up and free blkdev */
4178 rbd_free_disk(rbd_dev);
4179 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4181 /* release allocated disk header fields */
4182 rbd_header_free(&rbd_dev->header);
4184 /* done with the id, and with the rbd_dev */
4185 rbd_dev_id_put(rbd_dev);
/* The client pointer is asserted to be set before the device is destroyed. */
4186 rbd_assert(rbd_dev->rbd_client != NULL);
4187 rbd_dev_destroy(rbd_dev);
4189 /* release module ref */
4190 module_put(THIS_MODULE);
/*
 * rbd_remove() - remove the rbd device whose id is given in buf.
 *
 * Presumably the bus-level sysfs "remove" store handler; the attribute
 * wiring is not visible in this listing -- TODO confirm.
 *
 * Steps visible below: parse buf as a base-10 unsigned long, narrow it
 * to an int and compare against the original to detect loss, take
 * ctl_mutex, look the device up by id, inspect open_count and set
 * RBD_DEV_FLAG_REMOVING under the device spinlock, then remove the
 * snapshots and delete the device from the bus before unlocking.
 *
 * NOTE(review): the declarations of rc, ul and target_id, the error
 * returns, and the branch taken when open_count is non-zero are on
 * lines omitted from this listing.
 */
4193 static ssize_t rbd_remove(struct bus_type *bus,
4197 struct rbd_device *rbd_dev = NULL;
4202 rc = strict_strtoul(buf, 10, &ul);
4206 /* convert to int; abort if we lost anything in the conversion */
4207 target_id = (int) ul;
4208 if (target_id != ul)
/* ctl_mutex is taken with the SINGLE_DEPTH_NESTING lockdep subclass. */
4211 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4213 rbd_dev = __rbd_get_dev(target_id);
/*
 * open_count is checked and the REMOVING flag is set while holding
 * rbd_dev->lock with interrupts disabled.  NOTE(review): presumably
 * the flag is only set when open_count is zero -- confirm.
 */
4219 spin_lock_irq(&rbd_dev->lock);
4220 if (rbd_dev->open_count)
4223 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4224 spin_unlock_irq(&rbd_dev->lock);
4228 rbd_remove_all_snaps(rbd_dev);
4229 rbd_bus_del_dev(rbd_dev);
4232 mutex_unlock(&ctl_mutex);
4238 * create control files in sysfs
/*
 * rbd_sysfs_init() - register the rbd root device and the rbd bus type.
 *
 * Registers rbd_root_dev first, then rbd_bus_type.  The
 * device_unregister() call below undoes the first registration;
 * NOTE(review): presumably it runs only when bus_register() fails --
 * the guarding condition and the return statements are on lines
 * omitted from this listing.
 */
4241 static int rbd_sysfs_init(void)
4245 ret = device_register(&rbd_root_dev);
4249 ret = bus_register(&rbd_bus_type);
4251 device_unregister(&rbd_root_dev);
/*
 * rbd_sysfs_cleanup() - unregister the rbd bus type and root device.
 *
 * The bus is unregistered before the root device, i.e. in the reverse
 * of the order in which rbd_sysfs_init() registers them.
 */
4256 static void rbd_sysfs_cleanup(void)
4258 bus_unregister(&rbd_bus_type);
4259 device_unregister(&rbd_root_dev);
/*
 * rbd_init() - module initialisation entry point.
 *
 * Checks libceph compatibility and warns if the check fails, then
 * calls rbd_sysfs_init() and logs that the driver was loaded.
 *
 * NOTE(review): the return statements and the check on rc are on lines
 * omitted from this listing; the warning text indicates that
 * initialisation is abandoned on incompatibility.
 */
4262 static int __init rbd_init(void)
4266 if (!libceph_compatible(NULL)) {
4267 rbd_warn(NULL, "libceph incompatibility (quitting)");
4271 rc = rbd_sysfs_init();
4274 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/*
 * rbd_exit() - module exit entry point; the only visible action is the
 * call to rbd_sysfs_cleanup().
 */
4278 static void __exit rbd_exit(void)
4280 rbd_sysfs_cleanup();
/* Register rbd_init() and rbd_exit() as the module entry and exit points. */
4283 module_init(rbd_init);
4284 module_exit(rbd_exit);
/* Module metadata: authors, description and license. */
4286 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4287 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4288 MODULE_DESCRIPTION("rados block device");
4290 /* following authorship retained from original osdblk.c */
4291 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4293 MODULE_LICENSE("GPL");