3 rbd.c -- Export ceph rados objects as a Linux block device
6 based on drivers/block/osdblk.c:
8 Copyright 2009 Red Hat, Inc.
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
25 For usage instructions, please refer to:
27 Documentation/ABI/testing/sysfs-bus-rbd
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
37 #include <linux/kernel.h>
38 #include <linux/device.h>
39 #include <linux/module.h>
41 #include <linux/blkdev.h>
43 #include "rbd_types.h"
45 #define RBD_DEBUG /* Activate rbd_assert() calls */
48 * The basic unit of block I/O is a sector. It is interpreted in a
49 * number of contexts in Linux (blk, bio, genhd), but the default is
50 * universally 512 bytes. These symbols are just slightly more
51 * meaningful than the bare numbers they represent.
53 #define SECTOR_SHIFT 9
54 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
56 #define RBD_DRV_NAME "rbd"
57 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
59 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
61 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
62 #define RBD_MAX_SNAP_NAME_LEN \
63 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
65 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
67 #define RBD_SNAP_HEAD_NAME "-"
69 /* This allows a single page to hold an image name sent by OSD */
70 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
71 #define RBD_IMAGE_ID_LEN_MAX 64
73 #define RBD_OBJ_PREFIX_LEN_MAX 64
77 #define RBD_FEATURE_LAYERING (1<<0)
78 #define RBD_FEATURE_STRIPINGV2 (1<<1)
79 #define RBD_FEATURES_ALL \
80 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
82 /* Features supported by this (client software) implementation. */
84 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
87 * An RBD device name will be "rbd#", where the "rbd" comes from
88 * RBD_DRV_NAME above, and # is a unique integer identifier.
89 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
90 * enough to hold all possible device names.
92 #define DEV_NAME_LEN 32
93 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
96 * block device image metadata (in-memory version)
98 struct rbd_image_header {
99 /* These four fields never change for a given rbd image */
/* NOTE(review): excerpt is line-sampled; the immutable fields themselves
 * (object_prefix, obj_order, etc.) are missing from this view. */
106 /* The remaining fields need to be updated occasionally */
108 struct ceph_snap_context *snapc;	/* snapshot context; refreshed on header update */
117 * An rbd image specification.
119 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
120 * identify an image. Each rbd_dev structure includes a pointer to
121 * an rbd_spec structure that encapsulates this identity.
123 * Each of the id's in an rbd_spec has an associated name. For a
124 * user-mapped image, the names are supplied and the id's associated
125 * with them are looked up. For a layered image, a parent image is
126 * defined by the tuple, and the names are looked up.
128 * An rbd_dev structure contains a parent_spec pointer which is
129 * non-null if the image it represents is a child in a layered
130 * image. This pointer will refer to the rbd_spec structure used
131 * by the parent rbd_dev for its own identity (i.e., the structure
132 * is shared between the parent and child).
134 * Since these structures are populated once, during the discovery
135 * phase of image construction, they are effectively immutable so
136 * we make no effort to synchronize access to them.
138 * Note that code herein does not assume the image name is known (it
139 * could be a null pointer).
143 const char *pool_name;
145 const char *image_id;
146 const char *image_name;
149 const char *snap_name;
155 * an instance of the client. multiple devices may share an rbd client.
158 struct ceph_client *client;
160 struct list_head node;
163 struct rbd_img_request;
164 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
166 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
168 struct rbd_obj_request;
169 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
171 enum obj_request_type {
172 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
176 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
177 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
178 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
179 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
/*
 * A single object I/O request.  It either stands alone or belongs to
 * an rbd_img_request (see the img_data / which discussion below).
 * NOTE(review): excerpt is line-sampled; some fields (e.g. the data-type
 * union members) are missing from this view.
 */
182 struct rbd_obj_request {
183 const char *object_name;
184 u64 offset; /* object start byte */
185 u64 length; /* bytes from offset */
189 * An object request associated with an image will have its
190 * img_data flag set; a standalone object request will not.
192 * A standalone object request will have which == BAD_WHICH
193 * and a null obj_request pointer.
195 * An object request initiated in support of a layered image
196 * object (to check for its existence before a write) will
197 * have which == BAD_WHICH and a non-null obj_request pointer.
199 * Finally, an object request for rbd image data will have
200 * which != BAD_WHICH, and will have a non-null img_request
201 * pointer. The value of which will be in the range
202 * 0..(img_request->obj_request_count-1).
205 struct rbd_obj_request *obj_request; /* STAT op */
207 struct rbd_img_request *img_request;
209 /* links for img_request->obj_requests list */
210 struct list_head links;
213 u32 which; /* position in image request list */
215 enum obj_request_type type;
217 struct bio *bio_list;
223 struct page **copyup_pages;
225 struct ceph_osd_request *osd_req;
227 u64 xferred; /* bytes transferred */
231 rbd_obj_callback_t callback;
232 struct completion completion;	/* signalled by rbd_obj_request_complete() */
238 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
239 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
240 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
/*
 * An image-level I/O request: one block-layer request fanned out into
 * one rbd_obj_request per affected object.  (excerpt: flags/kref and a
 * few other fields are missing from this sampled view)
 */
243 struct rbd_img_request {
244 struct rbd_device *rbd_dev;
245 u64 offset; /* starting image byte offset */
246 u64 length; /* byte count from offset */
249 u64 snap_id; /* for reads */
250 struct ceph_snap_context *snapc; /* for writes */
253 struct request *rq; /* block request */
254 struct rbd_obj_request *obj_request; /* obj req initiator */
256 struct page **copyup_pages;
257 spinlock_t completion_lock;/* protects next_completion */
259 rbd_img_callback_t callback;
260 u64 xferred;/* aggregate bytes transferred */
261 int result; /* first nonzero obj_request result */
263 u32 obj_request_count;
264 struct list_head obj_requests; /* rbd_obj_request structs */
/*
 * Iterators over an image request's list of object requests.
 * The _safe variant walks in reverse and tolerates deletion of the
 * current entry (it uses list_for_each_entry_safe_reverse).
 */
269 #define for_each_obj_request(ireq, oreq) \
270 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
271 #define for_each_obj_request_from(ireq, oreq) \
272 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
273 #define for_each_obj_request_safe(ireq, oreq, n) \
274 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
279 struct list_head node;
294 int dev_id; /* blkdev unique id */
296 int major; /* blkdev assigned major */
297 struct gendisk *disk; /* blkdev's gendisk and rq */
299 u32 image_format; /* Either 1 or 2 */
300 struct rbd_client *rbd_client;
302 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
304 spinlock_t lock; /* queue, flags, open_count */
306 struct rbd_image_header header;
307 unsigned long flags; /* possibly lock protected */
308 struct rbd_spec *spec;
312 struct ceph_file_layout layout;
314 struct ceph_osd_event *watch_event;
315 struct rbd_obj_request *watch_request;
317 struct rbd_spec *parent_spec;
319 struct rbd_device *parent;
321 /* protects updating the header */
322 struct rw_semaphore header_rwsem;
324 struct rbd_mapping mapping;
326 struct list_head node;
328 /* list of snapshots */
329 struct list_head snaps;
333 unsigned long open_count; /* protected by lock */
337 * Flag bits for rbd_dev->flags. If atomicity is required,
338 * rbd_dev->lock is used to protect access.
340 * Currently, only the "removing" flag (which is coupled with the
341 * "open_count" field) requires atomic access.
344 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
345 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
348 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
350 static LIST_HEAD(rbd_dev_list); /* devices */
351 static DEFINE_SPINLOCK(rbd_dev_list_lock);
353 static LIST_HEAD(rbd_client_list); /* clients */
354 static DEFINE_SPINLOCK(rbd_client_list_lock);
356 static int rbd_img_request_submit(struct rbd_img_request *img_request);
358 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
360 static void rbd_dev_device_release(struct device *dev);
361 static void rbd_snap_destroy(struct rbd_snap *snap);
363 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
365 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
367 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
369 static struct bus_attribute rbd_bus_attrs[] = {
370 __ATTR(add, S_IWUSR, NULL, rbd_add),
371 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
375 static struct bus_type rbd_bus_type = {
377 .bus_attrs = rbd_bus_attrs,
380 static void rbd_root_dev_release(struct device *dev)
384 static struct device rbd_root_dev = {
386 .release = rbd_root_dev_release,
/*
 * rbd_warn() -- emit a KERN_WARNING message prefixed with the most
 * specific identity available for @rbd_dev: disk name, then image name,
 * then image id, then the raw pointer; just the driver name when
 * rbd_dev is NULL (first branch).  NOTE(review): excerpt is
 * line-sampled; the va_start()/vaf setup/va_end() lines are missing.
 */
389 static __printf(2, 3)
390 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
392 struct va_format vaf;
400 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
401 else if (rbd_dev->disk)
402 printk(KERN_WARNING "%s: %s: %pV\n",
403 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
404 else if (rbd_dev->spec && rbd_dev->spec->image_name)
405 printk(KERN_WARNING "%s: image %s: %pV\n",
406 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
407 else if (rbd_dev->spec && rbd_dev->spec->image_id)
408 printk(KERN_WARNING "%s: id %s: %pV\n",
409 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
411 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
412 RBD_DRV_NAME, rbd_dev, &vaf);
/*
 * rbd_assert() -- debug-only assertion: logs the function, line and
 * failed expression when RBD_DEBUG is defined, and compiles away to a
 * no-op otherwise.  (excerpt: the format-string continuation and the
 * terminating lines of the macro are missing from this sampled view)
 */
417 #define rbd_assert(expr) \
418 if (unlikely(!(expr))) { \
419 printk(KERN_ERR "\nAssertion failure in %s() " \
421 "\trbd_assert(%s);\n\n", \
422 __func__, __LINE__, #expr); \
425 #else /* !RBD_DEBUG */
426 # define rbd_assert(expr) ((void) 0)
427 #endif /* !RBD_DEBUG */
429 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
430 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
431 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
433 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
434 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
/*
 * rbd_open() -- block_device_operations open handler.  Rejects write
 * opens of a read-only mapping, rejects opens while the device is
 * being removed, otherwise bumps open_count under rbd_dev->lock,
 * pins the device and propagates the RO setting to the bdev.
 * NOTE(review): excerpt is line-sampled; the error-return statements
 * (presumably -EROFS / -ENOENT) and the final return are missing.
 */
436 static int rbd_open(struct block_device *bdev, fmode_t mode)
438 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
439 bool removing = false;
441 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
444 spin_lock_irq(&rbd_dev->lock);
445 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
448 rbd_dev->open_count++;
449 spin_unlock_irq(&rbd_dev->lock);
453 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
454 (void) get_device(&rbd_dev->dev);
455 set_device_ro(bdev, rbd_dev->mapping.read_only);
456 mutex_unlock(&ctl_mutex);
/*
 * rbd_release() -- block_device_operations release handler: decrement
 * open_count under rbd_dev->lock (asserting it was nonzero before the
 * decrement) and drop the device reference taken in rbd_open().
 */
461 static int rbd_release(struct gendisk *disk, fmode_t mode)
463 struct rbd_device *rbd_dev = disk->private_data;
464 unsigned long open_count_before;
466 spin_lock_irq(&rbd_dev->lock);
467 open_count_before = rbd_dev->open_count--;
468 spin_unlock_irq(&rbd_dev->lock);
469 rbd_assert(open_count_before > 0);
471 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
472 put_device(&rbd_dev->dev);
473 mutex_unlock(&ctl_mutex);
/* Block device operations table.  (excerpt: the .open = rbd_open
 * initializer line is missing from this sampled view) */
478 static const struct block_device_operations rbd_bd_ops = {
479 .owner = THIS_MODULE,
481 .release = rbd_release,
485 * Initialize an rbd client instance.
/*
 * Allocates an rbd_client, creates and opens a ceph client session for
 * it, and links it onto rbd_client_list under rbd_client_list_lock.
 * On success ownership of @ceph_opts passes to the ceph client (hence
 * ceph_opts is NULLed); on failure the error paths destroy the ceph
 * client and/or the options.  NOTE(review): excerpt is line-sampled;
 * allocation-failure checks, labels and return statements are missing.
 */
488 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
490 struct rbd_client *rbdc;
493 dout("%s:\n", __func__);
494 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
498 kref_init(&rbdc->kref);
499 INIT_LIST_HEAD(&rbdc->node);
501 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
503 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
504 if (IS_ERR(rbdc->client))
506 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
508 ret = ceph_open_session(rbdc->client);
512 spin_lock(&rbd_client_list_lock);
513 list_add_tail(&rbdc->node, &rbd_client_list);
514 spin_unlock(&rbd_client_list_lock);
516 mutex_unlock(&ctl_mutex);
517 dout("%s: rbdc %p\n", __func__, rbdc);
/* error unwind follows */
522 ceph_destroy_client(rbdc->client);
524 mutex_unlock(&ctl_mutex);
528 ceph_destroy_options(ceph_opts);
529 dout("%s: error %d\n", __func__, ret);
/* Take an extra reference on an existing client; returns @rbdc. */
534 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
536 kref_get(&rbdc->kref);
542 * Find a ceph client with specific addr and configuration. If
543 * found, bump its reference count.
/*
 * Returns NULL when no matching shared client exists, or when the
 * CEPH_OPT_NOSHARE flag forbids sharing.  Matching uses
 * ceph_compare_options() (0 means "same options").
 */
545 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
547 struct rbd_client *client_node;
550 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
553 spin_lock(&rbd_client_list_lock);
554 list_for_each_entry(client_node, &rbd_client_list, node) {
555 if (!ceph_compare_options(ceph_opts, client_node->client)) {
556 __rbd_get_client(client_node);
562 spin_unlock(&rbd_client_list_lock);
564 return found ? client_node : NULL;
574 /* string args above */
577 /* Boolean args above */
/*
 * Mount-option token table for match_token().  read_only/ro and
 * read_write/rw are accepted as synonym pairs.
 */
581 static match_table_t rbd_opts_tokens = {
583 /* string args above */
584 {Opt_read_only, "read_only"},
585 {Opt_read_only, "ro"}, /* Alternate spelling */
586 {Opt_read_write, "read_write"},
587 {Opt_read_write, "rw"}, /* Alternate spelling */
588 /* Boolean args above */
596 #define RBD_READ_ONLY_DEFAULT false
/*
 * match_token() callback for one rbd mount option.  Classifies the
 * token by the Opt_last_int/_string/_bool sentinels, then applies the
 * recognized options (currently only read_only/read_write) to the
 * rbd_options passed via @private.  NOTE(review): excerpt is
 * line-sampled; error returns and the switch framing are missing.
 */
598 static int parse_rbd_opts_token(char *c, void *private)
600 struct rbd_options *rbd_opts = private;
601 substring_t argstr[MAX_OPT_ARGS];
602 int token, intval, ret;
604 token = match_token(c, rbd_opts_tokens, argstr);
608 if (token < Opt_last_int) {
609 ret = match_int(&argstr[0], &intval);
611 pr_err("bad mount option arg (not int) "
615 dout("got int token %d val %d\n", token, intval);
616 } else if (token > Opt_last_int && token < Opt_last_string) {
617 dout("got string token %d val %s\n", token,
619 } else if (token > Opt_last_string && token < Opt_last_bool) {
620 dout("got Boolean token %d\n", token);
622 dout("got token %d\n", token);
627 rbd_opts->read_only = true;
630 rbd_opts->read_only = false;
640 * Get a ceph client with specific addr and configuration, if one does
641 * not exist create it.
/* When an existing client is reused, @ceph_opts is no longer needed
 * and is destroyed here; otherwise ownership passes to
 * rbd_client_create(). */
643 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
645 struct rbd_client *rbdc;
647 rbdc = rbd_client_find(ceph_opts);
648 if (rbdc) /* using an existing client */
649 ceph_destroy_options(ceph_opts);
651 rbdc = rbd_client_create(ceph_opts);
657 * Destroy ceph client
659 * Caller must hold rbd_client_list_lock.
/* NOTE(review): the comment above looks stale -- the function itself
 * acquires rbd_client_list_lock around the list_del(); confirm the
 * intended locking contract before relying on either statement. */
661 static void rbd_client_release(struct kref *kref)
663 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
665 dout("%s: rbdc %p\n", __func__, rbdc);
666 spin_lock(&rbd_client_list_lock);
667 list_del(&rbdc->node);
668 spin_unlock(&rbd_client_list_lock);
670 ceph_destroy_client(rbdc->client);
675 * Drop reference to ceph client node. If it's not referenced anymore, release
/* (excerpt: the NULL check guarding kref_put() is missing from this
 * sampled view -- presumably "if (rbdc)"; confirm against full source) */
678 static void rbd_put_client(struct rbd_client *rbdc)
681 kref_put(&rbdc->kref, rbd_client_release);
/* Only image format versions 1 and 2 are supported by this driver. */
684 static bool rbd_image_format_valid(u32 image_format)
686 return image_format == 1 || image_format == 2;
/*
 * Sanity-check a v1 on-disk image header: magic text, object order
 * within [SECTOR_SHIFT, 8*sizeof(int)-1], and snapshot count/name
 * lengths small enough that the in-memory snapshot header fits in a
 * size_t.  NOTE(review): excerpt is line-sampled; the "return false"
 * statements and final "return true" are missing from this view.
 */
689 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
694 /* The header has to start with the magic rbd header text */
695 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
698 /* The bio layer requires at least sector-sized I/O */
700 if (ondisk->options.order < SECTOR_SHIFT)
703 /* If we use u64 in a few spots we may be able to loosen this */
705 if (ondisk->options.order > 8 * sizeof (int) - 1)
709 * The size of a snapshot header has to fit in a size_t, and
710 * that limits the number of snapshots.
712 snap_count = le32_to_cpu(ondisk->snap_count);
713 size = SIZE_MAX - sizeof (struct ceph_snap_context);
714 if (snap_count > size / sizeof (__le64))
718 * Not only that, but the size of the entire snapshot
719 * header must also be representable in a size_t.
721 size -= snap_count * sizeof (__le64);
722 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
729 * Create a new header structure, translate header format from the on-disk
/*
 * Converts a validated v1 on-disk header into the in-memory
 * rbd_image_header: duplicates the object prefix, copies snapshot
 * names and sizes (when snap_count > 0), records order/crypt/comp
 * options, and builds a ceph_snap_context holding all snapshot ids.
 * Returns 0 on success; on allocation failure the unwind path below
 * frees everything allocated so far.  NOTE(review): excerpt is
 * line-sampled; local declarations, "goto out_err" statements and the
 * return statements are missing from this view.
 */
732 static int rbd_header_from_disk(struct rbd_image_header *header,
733 struct rbd_image_header_ondisk *ondisk)
740 memset(header, 0, sizeof (*header));
742 snap_count = le32_to_cpu(ondisk->snap_count);
744 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
745 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
746 if (!header->object_prefix)
748 memcpy(header->object_prefix, ondisk->object_prefix, len);
749 header->object_prefix[len] = '\0';
752 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
754 /* Save a copy of the snapshot names */
756 if (snap_names_len > (u64) SIZE_MAX)
758 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
759 if (!header->snap_names)
762 * Note that rbd_dev_v1_header_read() guarantees
763 * the ondisk buffer we're working with has
764 * snap_names_len bytes beyond the end of the
765 * snapshot id array, this memcpy() is safe.
767 memcpy(header->snap_names, &ondisk->snaps[snap_count],
770 /* Record each snapshot's size */
772 size = snap_count * sizeof (*header->snap_sizes);
773 header->snap_sizes = kmalloc(size, GFP_KERNEL);
774 if (!header->snap_sizes)
776 for (i = 0; i < snap_count; i++)
777 header->snap_sizes[i] =
778 le64_to_cpu(ondisk->snaps[i].image_size);
/* snap_count == 0: no name/size arrays to keep */
780 header->snap_names = NULL;
781 header->snap_sizes = NULL;
784 header->features = 0; /* No features support in v1 images */
785 header->obj_order = ondisk->options.order;
786 header->crypt_type = ondisk->options.crypt_type;
787 header->comp_type = ondisk->options.comp_type;
789 /* Allocate and fill in the snapshot context */
791 header->image_size = le64_to_cpu(ondisk->image_size);
793 header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
796 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
797 for (i = 0; i < snap_count; i++)
798 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
/* error unwind: release partial allocations and NULL the pointers */
803 kfree(header->snap_sizes);
804 header->snap_sizes = NULL;
805 kfree(header->snap_names);
806 header->snap_names = NULL;
807 kfree(header->object_prefix);
808 header->object_prefix = NULL;
/*
 * Map a snapshot id to its name.  CEPH_NOSNAP maps to the reserved
 * head name ("-"); otherwise the device's snapshot list is searched.
 * (excerpt: the return of snap->name / not-found return is missing
 * from this sampled view)
 */
813 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
815 struct rbd_snap *snap;
817 if (snap_id == CEPH_NOSNAP)
818 return RBD_SNAP_HEAD_NAME;
820 list_for_each_entry(snap, &rbd_dev->snaps, node)
821 if (snap_id == snap->id)
/* Linear search of rbd_dev->snaps for a snapshot with the given name.
 * (excerpt: the return statements are missing from this sampled view) */
827 static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
828 const char *snap_name)
830 struct rbd_snap *snap;
832 list_for_each_entry(snap, &rbd_dev->snaps, node)
833 if (!strcmp(snap_name, snap->name))
/*
 * Fill in rbd_dev->mapping from either the image head (size/features
 * from the current header) or, for a named snapshot, from that
 * snapshot's recorded size/features -- snapshot mappings are forced
 * read-only.  (excerpt: the snapshot-not-found error return and the
 * final return are missing from this sampled view)
 */
839 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
841 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
842 sizeof (RBD_SNAP_HEAD_NAME))) {
843 rbd_dev->mapping.size = rbd_dev->header.image_size;
844 rbd_dev->mapping.features = rbd_dev->header.features;
846 struct rbd_snap *snap;
848 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
851 rbd_dev->mapping.size = snap->size;
852 rbd_dev->mapping.features = snap->features;
853 rbd_dev->mapping.read_only = true;
/*
 * Reset the mapping to its inert state (size 0, no features, RO).
 * NOTE(review): rbd_dev_mapping_clear() and rbd_dev_clear_mapping()
 * below have identical visible bodies -- candidates for consolidation
 * once the full source is available to confirm.
 */
859 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
861 rbd_dev->mapping.size = 0;
862 rbd_dev->mapping.features = 0;
863 rbd_dev->mapping.read_only = true;
866 static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
868 rbd_dev->mapping.size = 0;
869 rbd_dev->mapping.features = 0;
870 rbd_dev->mapping.read_only = true;
/*
 * Build the object name ("<prefix>.<segment as %012llx>") for the
 * segment containing image byte @offset.  Allocated with GFP_NOIO
 * since this runs on the I/O path.  (excerpt: the allocation-failure
 * check, error cleanup and return are missing from this sampled view)
 */
873 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
879 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
882 segment = offset >> rbd_dev->header.obj_order;
883 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
884 rbd_dev->header.object_prefix, segment);
885 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
886 pr_err("error formatting segment name for #%llu (%d)\n",
/* Byte offset of @offset within its segment (segment size is a power
 * of two, 1 << obj_order, so this is a simple mask). */
895 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
897 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
899 return offset & (segment_size - 1);
/*
 * Clamp @length so that [offset, offset+length) does not cross a
 * segment boundary; the overflow assertion guards offset + length.
 * (excerpt: the final "return length;" is missing from this view)
 */
902 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
903 u64 offset, u64 length)
905 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
907 offset &= segment_size - 1;
909 rbd_assert(length <= U64_MAX - offset);
910 if (offset + length > segment_size)
911 length = segment_size - offset;
917 * returns the size of an object in the image
/* NOTE(review): "1 << obj_order" shifts a signed int before widening
 * to u64 -- undefined for obj_order >= 31.  rbd_dev_ondisk_valid()
 * caps order at 8*sizeof(int)-1, which still permits order 31;
 * confirm whether this should be "(u64)1 << obj_order". */
919 static u64 rbd_obj_bytes(struct rbd_image_header *header)
921 return 1 << header->obj_order;
/* Drop a reference on every bio in a chain.  (excerpt: the loop frame
 * and bio_put() call are missing from this sampled view) */
928 static void bio_chain_put(struct bio *chain)
934 chain = chain->bi_next;
940 * zeros a bio chain, starting at specific offset
/*
 * Walks every segment of every bio in the chain, zero-filling the
 * portion of each bio_vec that lies at or beyond @start_ofs.  Uses
 * bvec_kmap_irq/bvec_kunmap_irq to map each segment safely.
 * (excerpt: the outer while loop, pos update and declarations are
 * missing from this sampled view)
 */
942 static void zero_bio_chain(struct bio *chain, int start_ofs)
951 bio_for_each_segment(bv, chain, i) {
952 if (pos + bv->bv_len > start_ofs) {
953 int remainder = max(start_ofs - pos, 0);
954 buf = bvec_kmap_irq(bv, &flags);
955 memset(buf + remainder, 0,
956 bv->bv_len - remainder);
957 bvec_kunmap_irq(buf, &flags);
962 chain = chain->bi_next;
967 * similar to zero_bio_chain(), zeros data defined by a page array,
968 * starting at the given byte offset from the start of the array and
969 * continuing up to the given end offset. The pages array is
970 * assumed to be big enough to hold all bytes up to the end.
/* Zeroes page by page under kmap_atomic with interrupts disabled.
 * (excerpt: the loop-local declarations and the offset/page advance at
 * the bottom of the while body are missing from this sampled view) */
972 static void zero_pages(struct page **pages, u64 offset, u64 end)
974 struct page **page = &pages[offset >> PAGE_SHIFT];
976 rbd_assert(end > offset);
977 rbd_assert(end - offset <= (u64)SIZE_MAX);
978 while (offset < end) {
984 page_offset = (size_t)(offset & ~PAGE_MASK);
985 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
986 local_irq_save(flags);
987 kaddr = kmap_atomic(*page);
988 memset(kaddr + page_offset, 0, length);
989 kunmap_atomic(kaddr);
990 local_irq_restore(flags);
998 * Clone a portion of a bio, starting at the given byte offset
999 * and continuing for the number of bytes indicated.
/*
 * Fast path: a full-bio clone delegates to bio_clone().  Otherwise the
 * first and last affected bio_vec entries are located, a new bio is
 * allocated, the relevant vector slice is memcpy'd across, and the
 * first/last entries are trimmed to the requested byte range.
 * Returns NULL on allocation failure or invalid arguments (WARN'd).
 * NOTE(review): excerpt is line-sampled; several declarations, the
 * voff/resid computations and the final return are missing.
 */
1001 static struct bio *bio_clone_range(struct bio *bio_src,
1002 unsigned int offset,
1010 unsigned short end_idx;
1011 unsigned short vcnt;
1014 /* Handle the easy case for the caller */
1016 if (!offset && len == bio_src->bi_size)
1017 return bio_clone(bio_src, gfpmask);
1019 if (WARN_ON_ONCE(!len))
1021 if (WARN_ON_ONCE(len > bio_src->bi_size))
1023 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1026 /* Find first affected segment... */
1029 __bio_for_each_segment(bv, bio_src, idx, 0) {
1030 if (resid < bv->bv_len)
1032 resid -= bv->bv_len;
1036 /* ...and the last affected segment */
1039 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1040 if (resid <= bv->bv_len)
1042 resid -= bv->bv_len;
1044 vcnt = end_idx - idx + 1;
1046 /* Build the clone */
1048 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1050 return NULL; /* ENOMEM */
1052 bio->bi_bdev = bio_src->bi_bdev;
1053 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1054 bio->bi_rw = bio_src->bi_rw;
1055 bio->bi_flags |= 1 << BIO_CLONED;
1058 * Copy over our part of the bio_vec, then update the first
1059 * and last (or only) entries.
1061 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1062 vcnt * sizeof (struct bio_vec));
1063 bio->bi_io_vec[0].bv_offset += voff;
1065 bio->bi_io_vec[0].bv_len -= voff;
1066 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1068 bio->bi_io_vec[0].bv_len = len;
1071 bio->bi_vcnt = vcnt;
1079 * Clone a portion of a bio chain, starting at the given byte offset
1080 * into the first bio in the source chain and continuing for the
1081 * number of bytes indicated. The result is another bio chain of
1082 * exactly the given length, or a null pointer on error.
1084 * The bio_src and offset parameters are both in-out. On entry they
1085 * refer to the first source bio and the offset into that bio where
1086 * the start of data to be cloned is located.
1088 * On return, bio_src is updated to refer to the bio in the source
1089 * chain that contains the first un-cloned byte, and *offset will
1090 * contain the offset of that byte within that bio.
/* NOTE(review): excerpt is line-sampled; the while-loop frame, the
 * chain linkage via "end", the advance to the next source bio, and the
 * success/error returns are missing from this view.  On error the
 * partial chain is released with bio_chain_put(). */
1092 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1093 unsigned int *offset,
1097 struct bio *bi = *bio_src;
1098 unsigned int off = *offset;
1099 struct bio *chain = NULL;
1102 /* Build up a chain of clone bios up to the limit */
1104 if (!bi || off >= bi->bi_size || !len)
1105 return NULL; /* Nothing to clone */
1109 unsigned int bi_size;
1113 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1114 goto out_err; /* EINVAL; ran out of bio's */
1116 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1117 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1119 goto out_err; /* ENOMEM */
1122 end = &bio->bi_next;
1125 if (off == bi->bi_size) {
1136 bio_chain_put(chain);
1142 * The default/initial value for all object request flags is 0. For
1143 * each flag, once its value is set to 1 it is never reset to 0
/*
 * Monotonic one-way flag helpers.  The *_set() variants use
 * test_and_set_bit() so that double-setting is detected and warned
 * about rather than silently ignored.
 */
1146 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1148 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1149 struct rbd_device *rbd_dev;
1151 rbd_dev = obj_request->img_request->rbd_dev;
1152 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1157 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1160 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1163 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1165 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1166 struct rbd_device *rbd_dev = NULL;
1168 if (obj_request_img_data_test(obj_request))
1169 rbd_dev = obj_request->img_request->rbd_dev;
1170 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1175 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1178 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1182 * This sets the KNOWN flag after (possibly) setting the EXISTS
1183 * flag. The latter is set based on the "exists" value provided.
1185 * Note that for our purposes once an object exists it never goes
1186 * away again. It's possible that the responses from two existence
1187 * checks are separated by the creation of the target object, and
1188 * the first ("doesn't exist") response arrives *after* the second
1189 * ("does exist"). In that case we ignore the second one.
1191 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1195 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1196 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1200 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1203 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1206 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1209 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
/*
 * Reference-count helpers for object and image requests.  The put
 * variants hand the final release to rbd_obj_request_destroy() /
 * rbd_img_request_destroy() (forward-declared here, defined later).
 */
1212 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1214 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1215 atomic_read(&obj_request->kref.refcount));
1216 kref_get(&obj_request->kref);
1219 static void rbd_obj_request_destroy(struct kref *kref);
1220 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1222 rbd_assert(obj_request != NULL);
1223 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1224 atomic_read(&obj_request->kref.refcount));
1225 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1228 static void rbd_img_request_get(struct rbd_img_request *img_request)
1230 dout("%s: img %p (was %d)\n", __func__, img_request,
1231 atomic_read(&img_request->kref.refcount));
1232 kref_get(&img_request->kref);
1235 static void rbd_img_request_destroy(struct kref *kref);
1236 static void rbd_img_request_put(struct rbd_img_request *img_request)
1238 rbd_assert(img_request != NULL);
1239 dout("%s: img %p (was %d)\n", __func__, img_request,
1240 atomic_read(&img_request->kref.refcount));
1241 kref_put(&img_request->kref, rbd_img_request_destroy);
/*
 * Attach an object request to an image request: assigns its position
 * ("which"), marks it as image data, and appends it to the image
 * request's list.  The caller's reference on the object request is
 * transferred to the image request (see comment below).
 */
1244 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1245 struct rbd_obj_request *obj_request)
1247 rbd_assert(obj_request->img_request == NULL);
1249 /* Image request now owns object's original reference */
1250 obj_request->img_request = img_request;
1251 obj_request->which = img_request->obj_request_count;
1252 rbd_assert(!obj_request_img_data_test(obj_request));
1253 obj_request_img_data_set(obj_request);
1254 rbd_assert(obj_request->which != BAD_WHICH);
1255 img_request->obj_request_count++;
1256 list_add_tail(&obj_request->links, &img_request->obj_requests);
1257 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1258 obj_request->which);
/*
 * Detach an object request from its image request.  The assertion on
 * "which" implies removals happen in reverse order (last attached is
 * removed first, matching for_each_obj_request_safe's reverse walk).
 * Drops the reference the image request held on the object request.
 */
1261 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1262 struct rbd_obj_request *obj_request)
1264 rbd_assert(obj_request->which != BAD_WHICH);
1266 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1267 obj_request->which);
1268 list_del(&obj_request->links);
1269 rbd_assert(img_request->obj_request_count > 0);
1270 img_request->obj_request_count--;
1271 rbd_assert(obj_request->which == img_request->obj_request_count);
1272 obj_request->which = BAD_WHICH;
1273 rbd_assert(obj_request_img_data_test(obj_request));
1274 rbd_assert(obj_request->img_request == img_request);
1275 obj_request->img_request = NULL;
1276 obj_request->callback = NULL;
1277 rbd_obj_request_put(obj_request);
/* True for the three recognized object request data types.  (excerpt:
 * the switch frame and return statements are missing from this view) */
1280 static bool obj_request_type_valid(enum obj_request_type type)
1283 case OBJ_REQUEST_NODATA:
1284 case OBJ_REQUEST_BIO:
1285 case OBJ_REQUEST_PAGES:
/* Hand the object request's prepared OSD request to the OSD client
 * for (asynchronous) submission; "false" = don't check for full pool. */
1292 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1293 struct rbd_obj_request *obj_request)
1295 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1297 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
/*
 * Complete an image request: on success, sum the per-object xferred
 * counts into the aggregate, then invoke the completion callback (if
 * any) and drop the image request's reference.
 */
1300 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1303 dout("%s: img %p\n", __func__, img_request);
1306 * If no error occurred, compute the aggregate transfer
1307 * count for the image request. We could instead use
1308 * atomic64_cmpxchg() to update it as each object request
1309 * completes; not clear which way is better off hand.
1311 if (!img_request->result) {
1312 struct rbd_obj_request *obj_request;
1315 for_each_obj_request(img_request, obj_request)
1316 xferred += obj_request->xferred;
1317 img_request->xferred = xferred;
1320 if (img_request->callback)
1321 img_request->callback(img_request);
1323 rbd_img_request_put(img_request);
1326 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
/* Interruptibly block until the request's completion fires; returns 0
 * or -ERESTARTSYS if interrupted by a signal. */
1328 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1330 dout("%s: obj %p\n", __func__, obj_request);
1332 return wait_for_completion_interruptible(&obj_request->completion);
1336 * The default/initial value for all image request flags is 0. Each
1337 * is conditionally set to 1 at image request initialization time
1338 * and currently never changes thereafter.
/* Simple set/test wrappers over the IMG_REQ_* bits in
 * img_request->flags (atomic bitops; the smp_mb__* barriers around
 * these sites are missing from this sampled excerpt). */
1340 static void img_request_write_set(struct rbd_img_request *img_request)
1342 set_bit(IMG_REQ_WRITE, &img_request->flags);
1346 static bool img_request_write_test(struct rbd_img_request *img_request)
1349 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1352 static void img_request_child_set(struct rbd_img_request *img_request)
1354 set_bit(IMG_REQ_CHILD, &img_request->flags);
1358 static bool img_request_child_test(struct rbd_img_request *img_request)
1361 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1364 static void img_request_layered_set(struct rbd_img_request *img_request)
1366 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1370 static bool img_request_layered_test(struct rbd_img_request *img_request)
1373 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
/*
 * Read completion for an object request that is part of an image
 * request.  Zero-fills holes (-ENOENT) and short reads, then marks
 * the object request done.
 * NOTE(review): the `else` lines pairing each OBJ_REQUEST_BIO branch
 * with its zero_pages() counterpart are not visible in this chunk --
 * verify against the full file.
 */
1377 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1379 u64 xferred = obj_request->xferred;
1380 u64 length = obj_request->length;
1382 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1383 obj_request, obj_request->img_request, obj_request->result,
1386 * ENOENT means a hole in the image. We zero-fill the
1387 * entire length of the request. A short read also implies
1388 * zero-fill to the end of the request. Either way we
1389 * update the xferred count to indicate the whole request
1392 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
/* A hole: zero the full extent and report it as a successful full read. */
1393 if (obj_request->result == -ENOENT) {
1394 if (obj_request->type == OBJ_REQUEST_BIO)
1395 zero_bio_chain(obj_request->bio_list, 0);
1397 zero_pages(obj_request->pages, 0, length);
1398 obj_request->result = 0;
1399 obj_request->xferred = length;
/* A successful short read: zero-fill from xferred to the end. */
1400 } else if (xferred < length && !obj_request->result) {
1401 if (obj_request->type == OBJ_REQUEST_BIO)
1402 zero_bio_chain(obj_request->bio_list, xferred);
1404 zero_pages(obj_request->pages, xferred, length);
1405 obj_request->xferred = length;
1407 obj_request_done_set(obj_request);
1410 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1412 dout("%s: obj %p cb %p\n", __func__, obj_request,
1413 obj_request->callback);
1414 if (obj_request->callback)
1415 obj_request->callback(obj_request);
1417 complete_all(&obj_request->completion);
/* Per-op callback for ops that need no post-processing: just mark done. */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
/*
 * Read completion dispatcher.  For layered images, a -ENOENT within
 * the parent overlap triggers a read from the parent image; image
 * data requests go through the image read callback; otherwise the
 * request is simply marked done.
 * NOTE(review): closing braces and a trailing `else` branch are not
 * visible in this chunk -- verify against the full file.
 */
1426 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1428 struct rbd_img_request *img_request = NULL;
1429 struct rbd_device *rbd_dev = NULL;
1430 bool layered = false;
/* Only image-data requests can be layered; standalone ones never are. */
1432 if (obj_request_img_data_test(obj_request)) {
1433 img_request = obj_request->img_request;
1434 layered = img_request && img_request_layered_test(img_request);
1435 rbd_dev = img_request->rbd_dev;
1438 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1439 obj_request, img_request, obj_request->result,
1440 obj_request->xferred, obj_request->length);
/*
 * Missing object inside the parent overlap of a layered image:
 * satisfy the read from the parent instead.
 */
1441 if (layered && obj_request->result == -ENOENT &&
1442 obj_request->img_offset < rbd_dev->parent_overlap)
1443 rbd_img_parent_read(obj_request);
1444 else if (img_request)
1445 rbd_img_obj_request_read_callback(obj_request);
1447 obj_request_done_set(obj_request);
1450 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1452 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1453 obj_request->result, obj_request->length);
1455 * There is no such thing as a successful short write. Set
1456 * it to our originally-requested length.
1458 obj_request->xferred = obj_request->length;
1459 obj_request_done_set(obj_request);
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
/*
 * Central completion callback for all osd requests issued by rbd.
 * Records result/version from the osd reply, dispatches on the first
 * op's opcode, and completes the object request if the per-op
 * handler marked it done.
 * NOTE(review): `break` statements, the `switch` line and an `else`
 * arm are not visible in this chunk -- verify against the full file.
 */
1472 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1473 struct ceph_msg *msg)
1475 struct rbd_obj_request *obj_request = osd_req->r_priv;
1478 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1479 rbd_assert(osd_req == obj_request->osd_req);
/* Image-data requests are linked into an image request ("which" valid). */
1480 if (obj_request_img_data_test(obj_request)) {
1481 rbd_assert(obj_request->img_request);
1482 rbd_assert(obj_request->which != BAD_WHICH);
1484 rbd_assert(obj_request->which == BAD_WHICH);
/* Only record a result on failure; success paths fill in xferred below. */
1487 if (osd_req->r_result < 0)
1488 obj_request->result = osd_req->r_result;
1489 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
/* rbd only ever builds one- or two-op requests (copyup has two). */
1491 BUG_ON(osd_req->r_num_ops > 2);
1494 * We support a 64-bit length, but ultimately it has to be
1495 * passed to blk_end_request(), which takes an unsigned int.
1497 obj_request->xferred = osd_req->r_reply_op_len[0];
1498 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1499 opcode = osd_req->r_ops[0].op;
1501 case CEPH_OSD_OP_READ:
1502 rbd_osd_read_callback(obj_request);
1504 case CEPH_OSD_OP_WRITE:
1505 rbd_osd_write_callback(obj_request);
1507 case CEPH_OSD_OP_STAT:
1508 rbd_osd_stat_callback(obj_request);
1510 case CEPH_OSD_OP_CALL:
1511 case CEPH_OSD_OP_NOTIFY_ACK:
1512 case CEPH_OSD_OP_WATCH:
1513 rbd_osd_trivial_callback(obj_request);
1516 rbd_warn(NULL, "%s: unsupported op %hu\n",
1517 obj_request->object_name, (unsigned short) opcode);
/* Per-op handlers decide whether the request is finished now. */
1521 if (obj_request_done_test(obj_request))
1522 rbd_obj_request_complete(obj_request);
1525 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1527 struct rbd_img_request *img_request = obj_request->img_request;
1528 struct ceph_osd_request *osd_req = obj_request->osd_req;
1531 rbd_assert(osd_req != NULL);
1533 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1534 ceph_osdc_build_request(osd_req, obj_request->offset,
1535 NULL, snap_id, NULL);
1538 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1540 struct rbd_img_request *img_request = obj_request->img_request;
1541 struct ceph_osd_request *osd_req = obj_request->osd_req;
1542 struct ceph_snap_context *snapc;
1543 struct timespec mtime = CURRENT_TIME;
1545 rbd_assert(osd_req != NULL);
1547 snapc = img_request ? img_request->snapc : NULL;
1548 ceph_osdc_build_request(osd_req, obj_request->offset,
1549 snapc, CEPH_NOSNAP, &mtime);
/*
 * Allocate and initialize a single-op osd request for an object
 * request.  For image-data requests the image's snapshot context is
 * used for writes.  Returns NULL on allocation failure.
 * NOTE(review): the `bool write_request` parameter line and some
 * branch/return lines are not visible in this chunk -- verify
 * against the full file.
 */
1552 static struct ceph_osd_request *rbd_osd_req_create(
1553 struct rbd_device *rbd_dev,
1555 struct rbd_obj_request *obj_request)
1557 struct ceph_snap_context *snapc = NULL;
1558 struct ceph_osd_client *osdc;
1559 struct ceph_osd_request *osd_req;
/* Image-data writes need the image's snapshot context. */
1561 if (obj_request_img_data_test(obj_request)) {
1562 struct rbd_img_request *img_request = obj_request->img_request;
1564 rbd_assert(write_request ==
1565 img_request_write_test(img_request));
1567 snapc = img_request->snapc;
1570 /* Allocate and initialize the request, for the single op */
1572 osdc = &rbd_dev->rbd_client->client->osdc;
1573 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1575 return NULL; /* ENOMEM */
/* Writes must hit disk; reads need no durability flag. */
1578 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1580 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1582 osd_req->r_callback = rbd_osd_req_callback;
1583 osd_req->r_priv = obj_request;
/* Target object name; must fit the fixed-size r_oid buffer. */
1585 osd_req->r_oid_len = strlen(obj_request->object_name);
1586 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1587 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1589 osd_req->r_file_layout = rbd_dev->layout; /* struct */
/*
 * Create a copyup osd request based on the information in the
 * object request supplied. A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 * (Returns NULL on allocation failure; error-path lines are not
 * visible in this chunk.)
 */
1599 static struct ceph_osd_request *
1600 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1602 struct rbd_img_request *img_request;
1603 struct ceph_snap_context *snapc;
1604 struct rbd_device *rbd_dev;
1605 struct ceph_osd_client *osdc;
1606 struct ceph_osd_request *osd_req;
/* Copyup only happens for writes that belong to an image request. */
1608 rbd_assert(obj_request_img_data_test(obj_request));
1609 img_request = obj_request->img_request;
1610 rbd_assert(img_request);
1611 rbd_assert(img_request_write_test(img_request));
1613 /* Allocate and initialize the request, for the two ops */
1615 snapc = img_request->snapc;
1616 rbd_dev = img_request->rbd_dev;
1617 osdc = &rbd_dev->rbd_client->client->osdc;
1618 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1620 return NULL; /* ENOMEM */
1622 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1623 osd_req->r_callback = rbd_osd_req_callback;
1624 osd_req->r_priv = obj_request;
/* Target object name; must fit the fixed-size r_oid buffer. */
1626 osd_req->r_oid_len = strlen(obj_request->object_name);
1627 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1628 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1630 osd_req->r_file_layout = rbd_dev->layout; /* struct */
/* Drop the (last) reference to an osd request built by rbd. */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1641 /* object_name is assumed to be a non-null pointer and NUL-terminated */
/*
 * Allocate and initialize an object request for the given target
 * object, byte range and data type.  The object name is copied into
 * storage allocated together with the request itself.  The caller
 * owns the returned reference (kref-counted).
 * NOTE(review): the size/name declarations and the NULL-check /
 * return lines are not visible in this chunk -- verify against the
 * full file.
 */
1643 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1644 u64 offset, u64 length,
1645 enum obj_request_type type)
1647 struct rbd_obj_request *obj_request;
1651 rbd_assert(obj_request_type_valid(type));
/* One allocation holds the request plus a copy of the object name. */
1653 size = strlen(object_name) + 1;
1654 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1658 name = (char *)(obj_request + 1);
1659 obj_request->object_name = memcpy(name, object_name, size);
1660 obj_request->offset = offset;
1661 obj_request->length = length;
1662 obj_request->flags = 0;
/* BAD_WHICH until the request is linked into an image request. */
1663 obj_request->which = BAD_WHICH;
1664 obj_request->type = type;
1665 INIT_LIST_HEAD(&obj_request->links);
1666 init_completion(&obj_request->completion);
1667 kref_init(&obj_request->kref);
1669 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1670 offset, length, (int)type, obj_request);
/*
 * kref release function for an object request.  Tears down the
 * attached osd request and whatever data container (bio chain or
 * page vector) the request type implies, then frees the request.
 * NOTE(review): `break` statements and the final kfree are not
 * visible in this chunk -- verify against the full file.
 */
1675 static void rbd_obj_request_destroy(struct kref *kref)
1677 struct rbd_obj_request *obj_request;
1679 obj_request = container_of(kref, struct rbd_obj_request, kref);
1681 dout("%s: obj %p\n", __func__, obj_request);
/* Must already be unlinked from any image request. */
1683 rbd_assert(obj_request->img_request == NULL);
1684 rbd_assert(obj_request->which == BAD_WHICH);
1686 if (obj_request->osd_req)
1687 rbd_osd_req_destroy(obj_request->osd_req);
/* Release the data container appropriate to the request type. */
1689 rbd_assert(obj_request_type_valid(obj_request->type));
1690 switch (obj_request->type) {
1691 case OBJ_REQUEST_NODATA:
1692 break; /* Nothing to do */
1693 case OBJ_REQUEST_BIO:
1694 if (obj_request->bio_list)
1695 bio_chain_put(obj_request->bio_list);
1697 case OBJ_REQUEST_PAGES:
1698 if (obj_request->pages)
1699 ceph_release_page_vector(obj_request->pages,
1700 obj_request->page_count);
/*
 * Allocate and initialize an image request covering [offset, length)
 * of the given device.  Writes take a reference on the header's
 * snapshot context (under header_rwsem); reads record the current
 * snap_id instead.
 * NOTE(review): the write_request/child_request parameter lines and
 * the allocation NULL-check are not visible in this chunk -- verify
 * against the full file.
 */
1708 * Caller is responsible for filling in the list of object requests
1709 * that comprises the image request, and the Linux request pointer
1710 * (if there is one).
1712 static struct rbd_img_request *rbd_img_request_create(
1713 struct rbd_device *rbd_dev,
1714 u64 offset, u64 length,
1718 struct rbd_img_request *img_request;
1720 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
/* Pin the snapshot context a write will be tagged with. */
1724 if (write_request) {
1725 down_read(&rbd_dev->header_rwsem);
1726 ceph_get_snap_context(rbd_dev->header.snapc);
1727 up_read(&rbd_dev->header_rwsem);
1730 img_request->rq = NULL;
1731 img_request->rbd_dev = rbd_dev;
1732 img_request->offset = offset;
1733 img_request->length = length;
1734 img_request->flags = 0;
1735 if (write_request) {
1736 img_request_write_set(img_request);
1737 img_request->snapc = rbd_dev->header.snapc;
1739 img_request->snap_id = rbd_dev->spec->snap_id;
1742 img_request_child_set(img_request);
/* A parent spec means this is a layered (cloned) image. */
1743 if (rbd_dev->parent_spec)
1744 img_request_layered_set(img_request);
1745 spin_lock_init(&img_request->completion_lock);
1746 img_request->next_completion = 0;
1747 img_request->callback = NULL;
1748 img_request->result = 0;
1749 img_request->obj_request_count = 0;
1750 INIT_LIST_HEAD(&img_request->obj_requests);
1751 kref_init(&img_request->kref);
1753 rbd_img_request_get(img_request); /* Avoid a warning */
1754 rbd_img_request_put(img_request); /* TEMPORARY */
1756 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1757 write_request ? "write" : "read", offset, length,
/*
 * kref release function for an image request.  Unlinks and drops all
 * constituent object requests, releases the write snapshot context,
 * and for child requests drops the reference on the originating
 * object request.
 * NOTE(review): the final kfree is not visible in this chunk --
 * verify against the full file.
 */
1763 static void rbd_img_request_destroy(struct kref *kref)
1765 struct rbd_img_request *img_request;
1766 struct rbd_obj_request *obj_request;
1767 struct rbd_obj_request *next_obj_request;
1769 img_request = container_of(kref, struct rbd_img_request, kref);
1771 dout("%s: img %p\n", __func__, img_request);
/* Unlink every object request; _safe because del removes from the list. */
1773 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1774 rbd_img_obj_request_del(img_request, obj_request);
1775 rbd_assert(img_request->obj_request_count == 0);
/* Pairs with the ceph_get_snap_context() done at creation for writes. */
1777 if (img_request_write_test(img_request))
1778 ceph_put_snap_context(img_request->snapc);
1780 if (img_request_child_test(img_request))
1781 rbd_obj_request_put(img_request->obj_request);
/*
 * Finish one object request belonging to an image request: record
 * its result, then report completion either to the parent object
 * request (child image requests) or to the block layer.  Returns
 * true while more of the image request remains outstanding.
 * NOTE(review): several lines (the error `if`, the `else` pairing,
 * the final return) are not visible in this chunk -- verify against
 * the full file.
 */
1786 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1788 struct rbd_img_request *img_request;
1789 unsigned int xferred;
1793 rbd_assert(obj_request_img_data_test(obj_request));
1794 img_request = obj_request->img_request;
/* blk_end_request() takes an unsigned int, so the count must fit. */
1796 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1797 xferred = (unsigned int)obj_request->xferred;
1798 result = obj_request->result;
1800 struct rbd_device *rbd_dev = img_request->rbd_dev;
1802 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1803 img_request_write_test(img_request) ? "write" : "read",
1804 obj_request->length, obj_request->img_offset,
1805 obj_request->offset);
1806 rbd_warn(rbd_dev, " result %d xferred %x\n",
/* First failure wins; later errors don't overwrite it. */
1808 if (!img_request->result)
1809 img_request->result = result;
1812 /* Image object requests don't own their page array */
1814 if (obj_request->type == OBJ_REQUEST_PAGES) {
1815 obj_request->pages = NULL;
1816 obj_request->page_count = 0;
/* Child requests complete toward their parent; others toward the rq. */
1819 if (img_request_child_test(img_request)) {
1820 rbd_assert(img_request->obj_request != NULL);
1821 more = obj_request->which < img_request->obj_request_count - 1;
1823 rbd_assert(img_request->rq != NULL);
1824 more = blk_end_request(img_request->rq, result, xferred);
/*
 * Object request callback for image requests.  Object requests must
 * be reported to the caller in submission order, so completions are
 * only processed from img_request->next_completion forward, under
 * completion_lock; an out-of-order completion just records itself
 * and returns.
 * NOTE(review): several flow-control lines (goto/out label, loop
 * increments, the completion call guard) are not visible in this
 * chunk -- verify against the full file.
 */
1830 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1832 struct rbd_img_request *img_request;
1833 u32 which = obj_request->which;
1836 rbd_assert(obj_request_img_data_test(obj_request));
1837 img_request = obj_request->img_request;
1839 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1840 rbd_assert(img_request != NULL);
1841 rbd_assert(img_request->obj_request_count > 0);
1842 rbd_assert(which != BAD_WHICH);
1843 rbd_assert(which < img_request->obj_request_count);
1844 rbd_assert(which >= img_request->next_completion);
1846 spin_lock_irq(&img_request->completion_lock);
/* Not the next expected completion: leave it for the in-order pass. */
1847 if (which != img_request->next_completion)
/* Drain all consecutively-done requests starting at `which`. */
1850 for_each_obj_request_from(img_request, obj_request) {
1852 rbd_assert(which < img_request->obj_request_count);
1854 if (!obj_request_done_test(obj_request))
1856 more = rbd_img_obj_end_request(obj_request);
1860 rbd_assert(more ^ (which == img_request->obj_request_count));
1861 img_request->next_completion = which;
1863 spin_unlock_irq(&img_request->completion_lock);
1866 rbd_img_request_complete(img_request);
/*
 * Split up an image request into one or more object requests, each
 * to a different object. The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array. In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 * (Returns 0 on success or a negative errno; on failure all object
 * requests created so far are released.  Numerous lines -- loop
 * header, error gotos, returns -- are not visible in this chunk;
 * verify against the full file.)
 */
1877 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1878 enum obj_request_type type,
1881 struct rbd_device *rbd_dev = img_request->rbd_dev;
1882 struct rbd_obj_request *obj_request = NULL;
1883 struct rbd_obj_request *next_obj_request;
1884 bool write_request = img_request_write_test(img_request);
1885 struct bio *bio_list;
1886 unsigned int bio_offset = 0;
1887 struct page **pages;
1892 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1893 (int)type, data_desc);
1895 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1896 img_offset = img_request->offset;
1897 resid = img_request->length;
1898 rbd_assert(resid > 0);
/* data_desc is either the head bio or the base of a page array. */
1900 if (type == OBJ_REQUEST_BIO) {
1901 bio_list = data_desc;
1902 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1904 rbd_assert(type == OBJ_REQUEST_PAGES);
/* One iteration per backing object touched by [img_offset, resid). */
1909 struct ceph_osd_request *osd_req;
1910 const char *object_name;
1914 object_name = rbd_segment_name(rbd_dev, img_offset);
1917 offset = rbd_segment_offset(rbd_dev, img_offset);
1918 length = rbd_segment_length(rbd_dev, img_offset, resid);
1919 obj_request = rbd_obj_request_create(object_name,
1920 offset, length, type);
1921 kfree(object_name); /* object request has its own copy */
/* Attach this object's slice of the data (bio clone or page span). */
1925 if (type == OBJ_REQUEST_BIO) {
1926 unsigned int clone_size;
1928 rbd_assert(length <= (u64)UINT_MAX);
1929 clone_size = (unsigned int)length;
1930 obj_request->bio_list =
1931 bio_chain_clone_range(&bio_list,
1935 if (!obj_request->bio_list)
1938 unsigned int page_count;
1940 obj_request->pages = pages;
1941 page_count = (u32)calc_pages_for(offset, length);
1942 obj_request->page_count = page_count;
/* A partially-used last page is shared with the next object request. */
1943 if ((offset + length) & ~PAGE_MASK)
1944 page_count--; /* more on last page */
1945 pages += page_count;
1948 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1952 obj_request->osd_req = osd_req;
1953 obj_request->callback = rbd_img_obj_callback;
1955 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1957 if (type == OBJ_REQUEST_BIO)
1958 osd_req_op_extent_osd_data_bio(osd_req, 0,
1959 obj_request->bio_list, length);
1961 osd_req_op_extent_osd_data_pages(osd_req, 0,
1962 obj_request->pages, length,
1963 offset & ~PAGE_MASK, false, false);
1966 rbd_osd_req_format_write(obj_request);
1968 rbd_osd_req_format_read(obj_request);
1970 obj_request->img_offset = img_offset;
1971 rbd_img_obj_request_add(img_request, obj_request);
1973 img_offset += length;
/* Error unwind: drop the partial request, then all queued ones. */
1980 rbd_obj_request_put(obj_request);
1982 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1983 rbd_obj_request_put(obj_request);
/*
 * Completion callback for a copyup request.  Releases the page
 * vector that held the parent data, normalizes the transfer count,
 * and falls through to the normal image object callback.
 */
1989 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1991 struct rbd_img_request *img_request;
1992 struct rbd_device *rbd_dev;
1996 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1997 rbd_assert(obj_request_img_data_test(obj_request));
1998 img_request = obj_request->img_request;
1999 rbd_assert(img_request);
2001 rbd_dev = img_request->rbd_dev;
2002 rbd_assert(rbd_dev);
/* The copyup pages always cover one full backing object. */
2003 length = (u64)1 << rbd_dev->header.obj_order;
2004 page_count = (u32)calc_pages_for(0, length);
2006 rbd_assert(obj_request->copyup_pages);
2007 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2008 obj_request->copyup_pages = NULL;
2011 * We want the transfer count to reflect the size of the
2012 * original write request. There is no such thing as a
2013 * successful short write, so if the request was successful
2014 * we can just set it to the originally-requested length.
2016 if (!obj_request->result)
2017 obj_request->xferred = obj_request->length;
2019 /* Finish up with the normal image object callback */
2021 rbd_img_obj_callback(obj_request);
/*
 * Completion callback for the parent-image read issued on behalf of
 * a layered write.  Takes the pages of parent data, builds a two-op
 * copyup osd request (copyup method call + the original write) for
 * the original object request, and submits it.  On any failure the
 * original request is completed with the error.
 * NOTE(review): several error-path/guard lines are not visible in
 * this chunk -- verify against the full file.
 */
2025 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2027 struct rbd_obj_request *orig_request;
2028 struct ceph_osd_request *osd_req;
2029 struct ceph_osd_client *osdc;
2030 struct rbd_device *rbd_dev;
2031 struct page **pages;
2036 rbd_assert(img_request_child_test(img_request));
2038 /* First get what we need from the image request */
/* Take ownership of the parent data pages from the image request. */
2040 pages = img_request->copyup_pages;
2041 rbd_assert(pages != NULL);
2042 img_request->copyup_pages = NULL;
2044 orig_request = img_request->obj_request;
2045 rbd_assert(orig_request != NULL);
2046 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2047 result = img_request->result;
2048 obj_size = img_request->length;
2049 xferred = img_request->xferred;
2051 rbd_dev = img_request->rbd_dev;
2052 rbd_assert(rbd_dev);
2053 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2055 rbd_img_request_put(img_request);
2060 /* Allocate the new copyup osd request for the original request */
2063 rbd_assert(!orig_request->osd_req);
2064 osd_req = rbd_osd_req_create_copyup(orig_request);
2067 orig_request->osd_req = osd_req;
2068 orig_request->copyup_pages = pages;
2070 /* Initialize the copyup op */
2072 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2073 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2076 /* Then the original write request op */
2078 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2079 orig_request->offset,
2080 orig_request->length, 0, 0);
2081 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2082 orig_request->length);
2084 rbd_osd_req_format_write(orig_request);
2086 /* All set, send it off. */
2088 orig_request->callback = rbd_img_obj_copyup_callback;
2089 osdc = &rbd_dev->rbd_client->client->osdc;
2090 result = rbd_obj_request_submit(osdc, orig_request);
2094 /* Record the error code and complete the request */
2096 orig_request->result = result;
2097 orig_request->xferred = 0;
2098 obj_request_done_set(orig_request);
2099 rbd_obj_request_complete(orig_request);
/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request. This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 * (Error-unwind labels/gotos are not visible in this chunk; verify
 * against the full file.)
 */
2116 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2118 struct rbd_img_request *img_request = NULL;
2119 struct rbd_img_request *parent_request = NULL;
2120 struct rbd_device *rbd_dev;
2123 struct page **pages = NULL;
2127 rbd_assert(obj_request_img_data_test(obj_request));
2128 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2130 img_request = obj_request->img_request;
2131 rbd_assert(img_request != NULL);
2132 rbd_dev = img_request->rbd_dev;
2133 rbd_assert(rbd_dev->parent != NULL);
2136 * First things first. The original osd request is of no
2137 * use to use any more, we'll need a new one that can hold
2138 * the two ops in a copyup request. We'll get that later,
2139 * but for now we can release the old one.
2141 rbd_osd_req_destroy(obj_request->osd_req);
2142 obj_request->osd_req = NULL;
2145 * Determine the byte range covered by the object in the
2146 * child image to which the original request was to be sent.
2148 img_offset = obj_request->img_offset - obj_request->offset;
2149 length = (u64)1 << rbd_dev->header.obj_order;
2152 * There is no defined parent data beyond the parent
2153 * overlap, so limit what we read at that boundary if
2156 if (img_offset + length > rbd_dev->parent_overlap) {
2157 rbd_assert(img_offset < rbd_dev->parent_overlap);
2158 length = rbd_dev->parent_overlap - img_offset;
2162 * Allocate a page array big enough to receive the data read
2165 page_count = (u32)calc_pages_for(0, length);
2166 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2167 if (IS_ERR(pages)) {
2168 result = PTR_ERR(pages);
/* Build a child (read) image request against the parent device. */
2174 parent_request = rbd_img_request_create(rbd_dev->parent,
2177 if (!parent_request)
2179 rbd_obj_request_get(obj_request);
2180 parent_request->obj_request = obj_request;
2182 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2185 parent_request->copyup_pages = pages;
2187 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2188 result = rbd_img_request_submit(parent_request);
/* Error unwind: detach and release everything acquired above. */
2192 parent_request->copyup_pages = NULL;
2193 parent_request->obj_request = NULL;
2194 rbd_obj_request_put(obj_request);
2197 ceph_release_page_vector(pages, page_count);
2199 rbd_img_request_put(parent_request);
2200 obj_request->result = result;
2201 obj_request->xferred = 0;
2202 obj_request_done_set(obj_request);
/*
 * Completion callback for the STAT request issued by
 * rbd_img_obj_exists_submit().  Records whether the target object
 * exists on the original request, then resubmits the original
 * request (or completes it with an error).
 * NOTE(review): a `goto out` for the error case and the `if (!result)`
 * guard appear to be elided in this chunk -- verify against the full
 * file.
 */
2207 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2209 struct rbd_obj_request *orig_request;
2212 rbd_assert(!obj_request_img_data_test(obj_request));
2215 * All we need from the object request is the original
2216 * request and the result of the STAT op. Grab those, then
2217 * we're done with the request.
2219 orig_request = obj_request->obj_request;
2220 obj_request->obj_request = NULL;
2221 rbd_assert(orig_request);
2222 rbd_assert(orig_request->img_request);
2224 result = obj_request->result;
2225 obj_request->result = 0;
2227 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2228 obj_request, orig_request, result,
2229 obj_request->xferred, obj_request->length);
/* The stat request itself is no longer needed. */
2230 rbd_obj_request_put(obj_request);
2232 rbd_assert(orig_request);
2233 rbd_assert(orig_request->img_request);
2236 * Our only purpose here is to determine whether the object
2237 * exists, and we don't want to treat the non-existence as
2238 * an error. If something else comes back, transfer the
2239 * error to the original request and complete it now.
2242 obj_request_existence_set(orig_request, true);
2243 } else if (result == -ENOENT) {
2244 obj_request_existence_set(orig_request, false);
2245 } else if (result) {
2246 orig_request->result = result;
2251 * Resubmit the original request now that we have recorded
2252 * whether the target object exists.
2254 orig_request->result = rbd_img_obj_request_submit(orig_request);
2256 if (orig_request->result)
2257 rbd_obj_request_complete(orig_request);
2258 rbd_obj_request_put(orig_request);
/*
 * Issue a STAT request against the target object of a layered write
 * so we can learn whether it already exists.  The result is handled
 * by rbd_img_obj_exists_callback().  Returns 0 on submit or a
 * negative errno.
 * NOTE(review): error-cleanup labels/gotos are not visible in this
 * chunk -- verify against the full file.
 */
2261 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2263 struct rbd_obj_request *stat_request;
2264 struct rbd_device *rbd_dev;
2265 struct ceph_osd_client *osdc;
2266 struct page **pages = NULL;
2272 * The response data for a STAT call consists of:
/* Reply layout: le64 size + le32/le32 timestamp. */
2279 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2280 page_count = (u32)calc_pages_for(0, size);
2281 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2283 return PTR_ERR(pages);
/* The stat request targets the same object, but carries no data range. */
2286 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2291 rbd_obj_request_get(obj_request);
2292 stat_request->obj_request = obj_request;
2293 stat_request->pages = pages;
2294 stat_request->page_count = page_count;
2296 rbd_assert(obj_request->img_request);
2297 rbd_dev = obj_request->img_request->rbd_dev;
2298 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2300 if (!stat_request->osd_req)
2302 stat_request->callback = rbd_img_obj_exists_callback;
2304 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2305 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2307 rbd_osd_req_format_read(stat_request);
2309 osdc = &rbd_dev->rbd_client->client->osdc;
2310 ret = rbd_obj_request_submit(osdc, stat_request);
/* On failure, drop the reference taken on the original request above. */
2313 rbd_obj_request_put(obj_request);
/*
 * Submit one object request belonging to an image request.  Simple
 * cases go straight to the osd; layered writes whose target may not
 * exist first stat the target or read parent data for a copyup.
 * NOTE(review): the `bool known` declaration and the `if (known)`
 * test before the parent-read call are not visible in this chunk --
 * verify against the full file.
 */
2318 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2320 struct rbd_img_request *img_request;
2321 struct rbd_device *rbd_dev;
2324 rbd_assert(obj_request_img_data_test(obj_request));
2326 img_request = obj_request->img_request;
2327 rbd_assert(img_request);
2328 rbd_dev = img_request->rbd_dev;
2331 * Only writes to layered images need special handling.
2332 * Reads and non-layered writes are simple object requests.
2333 * Layered writes that start beyond the end of the overlap
2334 * with the parent have no parent data, so they too are
2335 * simple object requests. Finally, if the target object is
2336 * known to already exist, its parent data has already been
2337 * copied, so a write to the object can also be handled as a
2338 * simple object request.
2340 if (!img_request_write_test(img_request) ||
2341 !img_request_layered_test(img_request) ||
2342 rbd_dev->parent_overlap <= obj_request->img_offset ||
2343 ((known = obj_request_known_test(obj_request)) &&
2344 obj_request_exists_test(obj_request))) {
2346 struct rbd_device *rbd_dev;
2347 struct ceph_osd_client *osdc;
2349 rbd_dev = obj_request->img_request->rbd_dev;
2350 osdc = &rbd_dev->rbd_client->client->osdc;
2352 return rbd_obj_request_submit(osdc, obj_request);
2356 * It's a layered write. The target object might exist but
2357 * we may not know that yet. If we know it doesn't exist,
2358 * start by reading the data for the full target object from
2359 * the parent so we can use it for a copyup to the target.
2362 return rbd_img_obj_parent_read_full(obj_request);
2364 /* We don't know whether the target exists. Go find out. */
2366 return rbd_img_obj_exists_submit(obj_request);
2369 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2371 struct rbd_obj_request *obj_request;
2372 struct rbd_obj_request *next_obj_request;
2374 dout("%s: img %p\n", __func__, img_request);
2375 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2378 ret = rbd_img_obj_request_submit(obj_request);
/*
 * Completion callback for a parent-image read issued on behalf of a
 * child object read that hit -ENOENT.  Clamps the transfer count to
 * the parent overlap (so the tail gets zero-filled as a "short
 * read"), then completes the original object request.
 * NOTE(review): the error `goto out` path and an `else` line appear
 * to be elided in this chunk -- verify against the full file.
 */
2386 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2388 struct rbd_obj_request *obj_request;
2389 struct rbd_device *rbd_dev;
2392 rbd_assert(img_request_child_test(img_request));
2394 obj_request = img_request->obj_request;
2395 rbd_assert(obj_request);
2396 rbd_assert(obj_request->img_request);
2398 obj_request->result = img_request->result;
2399 if (obj_request->result)
2403 * We need to zero anything beyond the parent overlap
2404 * boundary. Since rbd_img_obj_request_read_callback()
2405 * will zero anything beyond the end of a short read, an
2406 * easy way to do this is to pretend the data from the
2407 * parent came up short--ending at the overlap boundary.
2409 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2410 obj_end = obj_request->img_offset + obj_request->length;
2411 rbd_dev = obj_request->img_request->rbd_dev;
/* Request extends past the overlap: only data up to it is valid. */
2412 if (obj_end > rbd_dev->parent_overlap) {
2415 if (obj_request->img_offset < rbd_dev->parent_overlap)
2416 xferred = rbd_dev->parent_overlap -
2417 obj_request->img_offset;
2419 obj_request->xferred = min(img_request->xferred, xferred);
2421 obj_request->xferred = img_request->xferred;
2424 rbd_img_obj_request_read_callback(obj_request);
2425 rbd_obj_request_complete(obj_request);
/*
 * Satisfy an object read that got -ENOENT on a layered image by
 * reading the same byte range from the parent image.  Builds and
 * submits a child image request over the original request's bio
 * chain; errors complete the original request directly.
 * NOTE(review): error-handling gotos/labels between the create,
 * fill, and submit steps are not visible in this chunk -- verify
 * against the full file.
 */
2428 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2430 struct rbd_device *rbd_dev;
2431 struct rbd_img_request *img_request;
2434 rbd_assert(obj_request_img_data_test(obj_request));
2435 rbd_assert(obj_request->img_request != NULL);
2436 rbd_assert(obj_request->result == (s32) -ENOENT);
2437 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2439 rbd_dev = obj_request->img_request->rbd_dev;
2440 rbd_assert(rbd_dev->parent != NULL);
2441 /* rbd_read_finish(obj_request, obj_request->length); */
2442 img_request = rbd_img_request_create(rbd_dev->parent,
2443 obj_request->img_offset,
2444 obj_request->length,
/* The child request holds a reference on the originating request. */
2450 rbd_obj_request_get(obj_request);
2451 img_request->obj_request = obj_request;
2453 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2454 obj_request->bio_list);
2458 img_request->callback = rbd_img_parent_read_callback;
2459 result = rbd_img_request_submit(img_request);
/* Error path: release the child request and fail the original one. */
2466 rbd_img_request_put(img_request);
2467 obj_request->result = result;
2468 obj_request->xferred = 0;
2469 obj_request_done_set(obj_request);
/*
 * Acknowledge a watch notification on the image header object.  The
 * request releases itself via its callback on completion.
 * NOTE(review): allocation NULL-checks and error gotos are not
 * visible in this chunk -- verify against the full file.
 */
2472 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2474 struct rbd_obj_request *obj_request;
2475 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2478 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2479 OBJ_REQUEST_NODATA);
2484 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2485 if (!obj_request->osd_req)
/* Self-cleanup: the completion callback drops the last reference. */
2487 obj_request->callback = rbd_obj_request_put;
2489 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2491 rbd_osd_req_format_read(obj_request);
2493 ret = rbd_obj_request_submit(osdc, obj_request);
/* On error, drop the reference ourselves since the callback won't run. */
2496 rbd_obj_request_put(obj_request);
/*
 * Watch-event callback for the image header object: refresh the
 * device's view of the header, then acknowledge the notification.
 * NOTE(review): a NULL check on rbd_dev appears to be elided in this
 * chunk -- verify against the full file.
 */
2501 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2503 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2508 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2509 rbd_dev->header_name, (unsigned long long)notify_id,
2510 (unsigned int)opcode);
/* Best-effort refresh; the ack below is sent regardless. */
2511 (void)rbd_dev_refresh(rbd_dev);
2513 rbd_obj_notify_ack(rbd_dev, notify_id);
/*
 * Request sync osd watch/unwatch. The value of "start" determines
 * whether a watch request is being initiated or torn down.
 * (Returns 0 on success or a negative errno.  Error-handling gotos,
 * `if (start)` guards and the final return are not visible in this
 * chunk; verify against the full file.)
 */
2520 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2522 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2523 struct rbd_obj_request *obj_request;
/* Exactly one of event/request must exist, depending on direction. */
2526 rbd_assert(start ^ !!rbd_dev->watch_event);
2527 rbd_assert(start ^ !!rbd_dev->watch_request);
2530 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2531 &rbd_dev->watch_event);
2534 rbd_assert(rbd_dev->watch_event != NULL);
2538 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2539 OBJ_REQUEST_NODATA);
2543 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2544 if (!obj_request->osd_req)
/* Starting: make the osd request linger; stopping: unregister old one. */
2548 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2550 ceph_osdc_unregister_linger_request(osdc,
2551 rbd_dev->watch_request->osd_req);
2553 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2554 rbd_dev->watch_event->cookie, 0, start);
2555 rbd_osd_req_format_write(obj_request);
2557 ret = rbd_obj_request_submit(osdc, obj_request);
/* Synchronous: wait for completion, then take the op's result. */
2560 ret = rbd_obj_request_wait(obj_request);
2563 ret = obj_request->result;
2568 * A watch request is set to linger, so the underlying osd
2569 * request won't go away until we unregister it. We retain
2570 * a pointer to the object request during that time (in
2571 * rbd_dev->watch_request), so we'll keep a reference to
2572 * it. We'll drop that reference (below) after we've
2576 rbd_dev->watch_request = obj_request;
2581 /* We have successfully torn down the watch request */
2583 rbd_obj_request_put(rbd_dev->watch_request);
2584 rbd_dev->watch_request = NULL;
2586 /* Cancel the event if we're tearing down, or on error */
2587 ceph_osdc_cancel_event(rbd_dev->watch_event);
2588 rbd_dev->watch_event = NULL;
2590 rbd_obj_request_put(obj_request);
2596 * Synchronous osd object method call. Returns the number of bytes
2597 * returned in the outbound buffer, or a negative error code.
2599 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2600 const char *object_name,
2601 const char *class_name,
2602 const char *method_name,
2603 const void *outbound,
2604 size_t outbound_size,
2606 size_t inbound_size)
2608 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2609 struct rbd_obj_request *obj_request;
2610 struct page **pages;
2615 * Method calls are ultimately read operations. The result
2616 * should placed into the inbound buffer provided. They
2617 * also supply outbound data--parameters for the object
2618 * method. Currently if this is present it will be a
2621 page_count = (u32)calc_pages_for(0, inbound_size);
2622 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2624 return PTR_ERR(pages);
2627 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2632 obj_request->pages = pages;
2633 obj_request->page_count = page_count;
2635 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2636 if (!obj_request->osd_req)
2639 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2640 class_name, method_name);
2641 if (outbound_size) {
2642 struct ceph_pagelist *pagelist;
2644 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2648 ceph_pagelist_init(pagelist);
2649 ceph_pagelist_append(pagelist, outbound, outbound_size);
2650 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2653 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2654 obj_request->pages, inbound_size,
2656 rbd_osd_req_format_read(obj_request);
2658 ret = rbd_obj_request_submit(osdc, obj_request);
2661 ret = rbd_obj_request_wait(obj_request);
2665 ret = obj_request->result;
2669 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2670 ret = (int)obj_request->xferred;
2671 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2674 rbd_obj_request_put(obj_request);
2676 ceph_release_page_vector(pages, page_count);
2681 static void rbd_request_fn(struct request_queue *q)
2682 __releases(q->queue_lock) __acquires(q->queue_lock)
2684 struct rbd_device *rbd_dev = q->queuedata;
2685 bool read_only = rbd_dev->mapping.read_only;
2689 while ((rq = blk_fetch_request(q))) {
2690 bool write_request = rq_data_dir(rq) == WRITE;
2691 struct rbd_img_request *img_request;
2695 /* Ignore any non-FS requests that filter through. */
2697 if (rq->cmd_type != REQ_TYPE_FS) {
2698 dout("%s: non-fs request type %d\n", __func__,
2699 (int) rq->cmd_type);
2700 __blk_end_request_all(rq, 0);
2704 /* Ignore/skip any zero-length requests */
2706 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2707 length = (u64) blk_rq_bytes(rq);
2710 dout("%s: zero-length request\n", __func__);
2711 __blk_end_request_all(rq, 0);
2715 spin_unlock_irq(q->queue_lock);
2717 /* Disallow writes to a read-only device */
2719 if (write_request) {
2723 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2727 * Quit early if the mapped snapshot no longer
2728 * exists. It's still possible the snapshot will
2729 * have disappeared by the time our request arrives
2730 * at the osd, but there's no sense in sending it if
2733 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2734 dout("request for non-existent snapshot");
2735 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2741 if (offset && length > U64_MAX - offset + 1) {
2742 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2744 goto end_request; /* Shouldn't happen */
2748 img_request = rbd_img_request_create(rbd_dev, offset, length,
2749 write_request, false);
2753 img_request->rq = rq;
2755 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2758 result = rbd_img_request_submit(img_request);
2760 rbd_img_request_put(img_request);
2762 spin_lock_irq(q->queue_lock);
2764 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2765 write_request ? "write" : "read",
2766 length, offset, result);
2768 __blk_end_request_all(rq, result);
2774 * a queue callback. Makes sure that we don't create a bio that spans across
2775 * multiple osd objects. One exception would be with a single page bios,
2776 * which we handle later at bio_chain_clone_range()
2778 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2779 struct bio_vec *bvec)
2781 struct rbd_device *rbd_dev = q->queuedata;
2782 sector_t sector_offset;
2783 sector_t sectors_per_obj;
2784 sector_t obj_sector_offset;
2788 * Find how far into its rbd object the partition-relative
2789 * bio start sector is to offset relative to the enclosing
2792 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2793 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2794 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2797 * Compute the number of bytes from that offset to the end
2798 * of the object. Account for what's already used by the bio.
2800 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2801 if (ret > bmd->bi_size)
2802 ret -= bmd->bi_size;
2807 * Don't send back more than was asked for. And if the bio
2808 * was empty, let the whole thing through because: "Note
2809 * that a block device *must* allow a single page to be
2810 * added to an empty bio."
2812 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2813 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2814 ret = (int) bvec->bv_len;
2819 static void rbd_free_disk(struct rbd_device *rbd_dev)
2821 struct gendisk *disk = rbd_dev->disk;
2826 rbd_dev->disk = NULL;
2827 if (disk->flags & GENHD_FL_UP) {
2830 blk_cleanup_queue(disk->queue);
2835 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2836 const char *object_name,
2837 u64 offset, u64 length, void *buf)
2840 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2841 struct rbd_obj_request *obj_request;
2842 struct page **pages = NULL;
2847 page_count = (u32) calc_pages_for(offset, length);
2848 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2850 ret = PTR_ERR(pages);
2853 obj_request = rbd_obj_request_create(object_name, offset, length,
2858 obj_request->pages = pages;
2859 obj_request->page_count = page_count;
2861 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2862 if (!obj_request->osd_req)
2865 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2866 offset, length, 0, 0);
2867 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2869 obj_request->length,
2870 obj_request->offset & ~PAGE_MASK,
2872 rbd_osd_req_format_read(obj_request);
2874 ret = rbd_obj_request_submit(osdc, obj_request);
2877 ret = rbd_obj_request_wait(obj_request);
2881 ret = obj_request->result;
2885 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2886 size = (size_t) obj_request->xferred;
2887 ceph_copy_from_page_vector(pages, buf, 0, size);
2888 rbd_assert(size <= (size_t)INT_MAX);
2892 rbd_obj_request_put(obj_request);
2894 ceph_release_page_vector(pages, page_count);
2900 * Read the complete header for the given rbd device.
2902 * Returns a pointer to a dynamically-allocated buffer containing
2903 * the complete and validated header. Caller can pass the address
2904 * of a variable that will be filled in with the version of the
2905 * header object at the time it was read.
2907 * Returns a pointer-coded errno if a failure occurs.
2909 static struct rbd_image_header_ondisk *
2910 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2912 struct rbd_image_header_ondisk *ondisk = NULL;
2919 * The complete header will include an array of its 64-bit
2920 * snapshot ids, followed by the names of those snapshots as
2921 * a contiguous block of NUL-terminated strings. Note that
2922 * the number of snapshots could change by the time we read
2923 * it in, in which case we re-read it.
2930 size = sizeof (*ondisk);
2931 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2933 ondisk = kmalloc(size, GFP_KERNEL);
2935 return ERR_PTR(-ENOMEM);
2937 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2941 if ((size_t)ret < size) {
2943 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2947 if (!rbd_dev_ondisk_valid(ondisk)) {
2949 rbd_warn(rbd_dev, "invalid header");
2953 names_size = le64_to_cpu(ondisk->snap_names_len);
2954 want_count = snap_count;
2955 snap_count = le32_to_cpu(ondisk->snap_count);
2956 } while (snap_count != want_count);
2963 return ERR_PTR(ret);
2967 * reload the ondisk the header
2969 static int rbd_read_header(struct rbd_device *rbd_dev,
2970 struct rbd_image_header *header)
2972 struct rbd_image_header_ondisk *ondisk;
2975 ondisk = rbd_dev_v1_header_read(rbd_dev);
2977 return PTR_ERR(ondisk);
2978 ret = rbd_header_from_disk(header, ondisk);
2984 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2986 struct rbd_snap *snap;
2987 struct rbd_snap *next;
2989 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
2990 list_del(&snap->node);
2991 rbd_snap_destroy(snap);
2995 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2997 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3000 if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3003 rbd_dev->mapping.size = rbd_dev->header.image_size;
3004 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3005 dout("setting size to %llu sectors", (unsigned long long)size);
3006 set_capacity(rbd_dev->disk, size);
3011 * only read the first part of the ondisk header, without the snaps info
3013 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3016 struct rbd_image_header h;
3018 ret = rbd_read_header(rbd_dev, &h);
3022 down_write(&rbd_dev->header_rwsem);
3024 /* Update image size, and check for resize of mapped image */
3025 rbd_dev->header.image_size = h.image_size;
3026 rbd_update_mapping_size(rbd_dev);
3028 /* rbd_dev->header.object_prefix shouldn't change */
3029 kfree(rbd_dev->header.snap_sizes);
3030 kfree(rbd_dev->header.snap_names);
3031 /* osd requests may still refer to snapc */
3032 ceph_put_snap_context(rbd_dev->header.snapc);
3034 rbd_dev->header.image_size = h.image_size;
3035 rbd_dev->header.snapc = h.snapc;
3036 rbd_dev->header.snap_names = h.snap_names;
3037 rbd_dev->header.snap_sizes = h.snap_sizes;
3038 /* Free the extra copy of the object prefix */
3039 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3040 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3041 kfree(h.object_prefix);
3043 ret = rbd_dev_snaps_update(rbd_dev);
3045 up_write(&rbd_dev->header_rwsem);
/*
 * Refresh the device's view of the image header, dispatching to the
 * format-1 or format-2 refresh routine under ctl_mutex.  If the
 * image size changed, revalidate the disk so the block layer picks
 * up the new capacity.
 * NOTE(review): this extract appears to have lines elided (variable
 * declarations, the else/if around the warn, return) relative to
 * the full source file.
 * Fix: the warning string was split as "failed to " " update",
 * which printed a double space ("failed to  update snaps").
 */
3050 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3055 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3056 image_size = rbd_dev->header.image_size;
3057 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3058 if (rbd_dev->image_format == 1)
3059 ret = rbd_dev_v1_refresh(rbd_dev);
3061 ret = rbd_dev_v2_refresh(rbd_dev);
3062 mutex_unlock(&ctl_mutex);
3064 rbd_warn(rbd_dev, "got notification but failed to "
3065 "update snaps: %d\n", ret);
3066 if (image_size != rbd_dev->header.image_size)
3067 revalidate_disk(rbd_dev->disk);
3072 static int rbd_init_disk(struct rbd_device *rbd_dev)
3074 struct gendisk *disk;
3075 struct request_queue *q;
3078 /* create gendisk info */
3079 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3083 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3085 disk->major = rbd_dev->major;
3086 disk->first_minor = 0;
3087 disk->fops = &rbd_bd_ops;
3088 disk->private_data = rbd_dev;
3090 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3094 /* We use the default size, but let's be explicit about it. */
3095 blk_queue_physical_block_size(q, SECTOR_SIZE);
3097 /* set io sizes to object size */
3098 segment_size = rbd_obj_bytes(&rbd_dev->header);
3099 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3100 blk_queue_max_segment_size(q, segment_size);
3101 blk_queue_io_min(q, segment_size);
3102 blk_queue_io_opt(q, segment_size);
3104 blk_queue_merge_bvec(q, rbd_merge_bvec);
3107 q->queuedata = rbd_dev;
3109 rbd_dev->disk = disk;
3122 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3124 return container_of(dev, struct rbd_device, dev);
3127 static ssize_t rbd_size_show(struct device *dev,
3128 struct device_attribute *attr, char *buf)
3130 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3132 return sprintf(buf, "%llu\n",
3133 (unsigned long long)rbd_dev->mapping.size);
3137 * Note this shows the features for whatever's mapped, which is not
3138 * necessarily the base image.
3140 static ssize_t rbd_features_show(struct device *dev,
3141 struct device_attribute *attr, char *buf)
3143 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3145 return sprintf(buf, "0x%016llx\n",
3146 (unsigned long long)rbd_dev->mapping.features);
3149 static ssize_t rbd_major_show(struct device *dev,
3150 struct device_attribute *attr, char *buf)
3152 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3155 return sprintf(buf, "%d\n", rbd_dev->major);
3157 return sprintf(buf, "(none)\n");
3161 static ssize_t rbd_client_id_show(struct device *dev,
3162 struct device_attribute *attr, char *buf)
3164 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3166 return sprintf(buf, "client%lld\n",
3167 ceph_client_id(rbd_dev->rbd_client->client));
3170 static ssize_t rbd_pool_show(struct device *dev,
3171 struct device_attribute *attr, char *buf)
3173 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3175 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3178 static ssize_t rbd_pool_id_show(struct device *dev,
3179 struct device_attribute *attr, char *buf)
3181 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3183 return sprintf(buf, "%llu\n",
3184 (unsigned long long) rbd_dev->spec->pool_id);
3187 static ssize_t rbd_name_show(struct device *dev,
3188 struct device_attribute *attr, char *buf)
3190 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3192 if (rbd_dev->spec->image_name)
3193 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3195 return sprintf(buf, "(unknown)\n");
3198 static ssize_t rbd_image_id_show(struct device *dev,
3199 struct device_attribute *attr, char *buf)
3201 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3203 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3207 * Shows the name of the currently-mapped snapshot (or
3208 * RBD_SNAP_HEAD_NAME for the base image).
3210 static ssize_t rbd_snap_show(struct device *dev,
3211 struct device_attribute *attr,
3214 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3216 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3220 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3221 * for the parent image. If there is no parent, simply shows
3222 * "(no parent image)".
3224 static ssize_t rbd_parent_show(struct device *dev,
3225 struct device_attribute *attr,
3228 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3229 struct rbd_spec *spec = rbd_dev->parent_spec;
3234 return sprintf(buf, "(no parent image)\n");
3236 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3237 (unsigned long long) spec->pool_id, spec->pool_name);
3242 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3243 spec->image_name ? spec->image_name : "(unknown)");
3248 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3249 (unsigned long long) spec->snap_id, spec->snap_name);
3254 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3259 return (ssize_t) (bufp - buf);
3262 static ssize_t rbd_image_refresh(struct device *dev,
3263 struct device_attribute *attr,
3267 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3270 ret = rbd_dev_refresh(rbd_dev);
3272 return ret < 0 ? ret : size;
3275 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3276 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3277 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3278 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3279 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3280 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3281 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3282 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3283 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3284 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3285 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3287 static struct attribute *rbd_attrs[] = {
3288 &dev_attr_size.attr,
3289 &dev_attr_features.attr,
3290 &dev_attr_major.attr,
3291 &dev_attr_client_id.attr,
3292 &dev_attr_pool.attr,
3293 &dev_attr_pool_id.attr,
3294 &dev_attr_name.attr,
3295 &dev_attr_image_id.attr,
3296 &dev_attr_current_snap.attr,
3297 &dev_attr_parent.attr,
3298 &dev_attr_refresh.attr,
3302 static struct attribute_group rbd_attr_group = {
3306 static const struct attribute_group *rbd_attr_groups[] = {
3311 static void rbd_sysfs_dev_release(struct device *dev)
3315 static struct device_type rbd_device_type = {
3317 .groups = rbd_attr_groups,
3318 .release = rbd_sysfs_dev_release,
3321 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3323 kref_get(&spec->kref);
3328 static void rbd_spec_free(struct kref *kref);
3329 static void rbd_spec_put(struct rbd_spec *spec)
3332 kref_put(&spec->kref, rbd_spec_free);
3335 static struct rbd_spec *rbd_spec_alloc(void)
3337 struct rbd_spec *spec;
3339 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3342 kref_init(&spec->kref);
3347 static void rbd_spec_free(struct kref *kref)
3349 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3351 kfree(spec->pool_name);
3352 kfree(spec->image_id);
3353 kfree(spec->image_name);
3354 kfree(spec->snap_name);
3358 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3359 struct rbd_spec *spec)
3361 struct rbd_device *rbd_dev;
3363 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3367 spin_lock_init(&rbd_dev->lock);
3369 INIT_LIST_HEAD(&rbd_dev->node);
3370 INIT_LIST_HEAD(&rbd_dev->snaps);
3371 init_rwsem(&rbd_dev->header_rwsem);
3373 rbd_dev->spec = spec;
3374 rbd_dev->rbd_client = rbdc;
3376 /* Initialize the layout used for all rbd requests */
3378 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3379 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3380 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3381 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3386 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3388 rbd_put_client(rbd_dev->rbd_client);
3389 rbd_spec_put(rbd_dev->spec);
3393 static void rbd_snap_destroy(struct rbd_snap *snap)
3399 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3400 const char *snap_name,
3401 u64 snap_id, u64 snap_size,
3404 struct rbd_snap *snap;
3406 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3408 return ERR_PTR(-ENOMEM);
3410 snap->name = snap_name;
3412 snap->size = snap_size;
3413 snap->features = snap_features;
3419 * Returns a dynamically-allocated snapshot name if successful, or a
3420 * pointer-coded error otherwise.
3422 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3423 u64 *snap_size, u64 *snap_features)
3425 const char *snap_name;
3428 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3430 /* Skip over names until we find the one we are looking for */
3432 snap_name = rbd_dev->header.snap_names;
3433 for (i = 0; i < which; i++)
3434 snap_name += strlen(snap_name) + 1;
3436 snap_name = kstrdup(snap_name, GFP_KERNEL);
3438 return ERR_PTR(-ENOMEM);
3440 *snap_size = rbd_dev->header.snap_sizes[which];
3441 *snap_features = 0; /* No features for v1 */
3447 * Get the size and object order for an image snapshot, or if
3448 * snap_id is CEPH_NOSNAP, gets this information for the base
3451 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3452 u8 *order, u64 *snap_size)
3454 __le64 snapid = cpu_to_le64(snap_id);
3459 } __attribute__ ((packed)) size_buf = { 0 };
3461 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3463 &snapid, sizeof (snapid),
3464 &size_buf, sizeof (size_buf));
3465 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3468 if (ret < sizeof (size_buf))
3472 *order = size_buf.order;
3473 *snap_size = le64_to_cpu(size_buf.size);
3475 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3476 (unsigned long long)snap_id, (unsigned int)*order,
3477 (unsigned long long)*snap_size);
3482 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3484 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3485 &rbd_dev->header.obj_order,
3486 &rbd_dev->header.image_size);
3489 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3495 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3499 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3500 "rbd", "get_object_prefix", NULL, 0,
3501 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3502 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3507 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3508 p + ret, NULL, GFP_NOIO);
3511 if (IS_ERR(rbd_dev->header.object_prefix)) {
3512 ret = PTR_ERR(rbd_dev->header.object_prefix);
3513 rbd_dev->header.object_prefix = NULL;
3515 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3523 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3526 __le64 snapid = cpu_to_le64(snap_id);
3530 } __attribute__ ((packed)) features_buf = { 0 };
3534 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3535 "rbd", "get_features",
3536 &snapid, sizeof (snapid),
3537 &features_buf, sizeof (features_buf));
3538 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3541 if (ret < sizeof (features_buf))
3544 incompat = le64_to_cpu(features_buf.incompat);
3545 if (incompat & ~RBD_FEATURES_SUPPORTED)
3548 *snap_features = le64_to_cpu(features_buf.features);
3550 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3551 (unsigned long long)snap_id,
3552 (unsigned long long)*snap_features,
3553 (unsigned long long)le64_to_cpu(features_buf.incompat));
3558 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3560 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3561 &rbd_dev->header.features);
3564 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3566 struct rbd_spec *parent_spec;
3568 void *reply_buf = NULL;
3576 parent_spec = rbd_spec_alloc();
3580 size = sizeof (__le64) + /* pool_id */
3581 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3582 sizeof (__le64) + /* snap_id */
3583 sizeof (__le64); /* overlap */
3584 reply_buf = kmalloc(size, GFP_KERNEL);
3590 snapid = cpu_to_le64(CEPH_NOSNAP);
3591 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3592 "rbd", "get_parent",
3593 &snapid, sizeof (snapid),
3595 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3600 end = reply_buf + ret;
3602 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3603 if (parent_spec->pool_id == CEPH_NOPOOL)
3604 goto out; /* No parent? No problem. */
3606 /* The ceph file layout needs to fit pool id in 32 bits */
3609 if (parent_spec->pool_id > (u64)U32_MAX) {
3610 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3611 (unsigned long long)parent_spec->pool_id, U32_MAX);
3615 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3616 if (IS_ERR(image_id)) {
3617 ret = PTR_ERR(image_id);
3620 parent_spec->image_id = image_id;
3621 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3622 ceph_decode_64_safe(&p, end, overlap, out_err);
3624 rbd_dev->parent_overlap = overlap;
3625 rbd_dev->parent_spec = parent_spec;
3626 parent_spec = NULL; /* rbd_dev now owns this */
3631 rbd_spec_put(parent_spec);
3636 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3640 __le64 stripe_count;
3641 } __attribute__ ((packed)) striping_info_buf = { 0 };
3642 size_t size = sizeof (striping_info_buf);
3649 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3650 "rbd", "get_stripe_unit_count", NULL, 0,
3651 (char *)&striping_info_buf, size);
3652 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3659 * We don't actually support the "fancy striping" feature
3660 * (STRIPINGV2) yet, but if the striping sizes are the
3661 * defaults the behavior is the same as before. So find
3662 * out, and only fail if the image has non-default values.
3665 obj_size = (u64)1 << rbd_dev->header.obj_order;
3666 p = &striping_info_buf;
3667 stripe_unit = ceph_decode_64(&p);
3668 if (stripe_unit != obj_size) {
3669 rbd_warn(rbd_dev, "unsupported stripe unit "
3670 "(got %llu want %llu)",
3671 stripe_unit, obj_size);
3674 stripe_count = ceph_decode_64(&p);
3675 if (stripe_count != 1) {
3676 rbd_warn(rbd_dev, "unsupported stripe count "
3677 "(got %llu want 1)", stripe_count);
3680 rbd_dev->header.stripe_unit = stripe_unit;
3681 rbd_dev->header.stripe_count = stripe_count;
3686 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3688 size_t image_id_size;
3693 void *reply_buf = NULL;
3695 char *image_name = NULL;
3698 rbd_assert(!rbd_dev->spec->image_name);
3700 len = strlen(rbd_dev->spec->image_id);
3701 image_id_size = sizeof (__le32) + len;
3702 image_id = kmalloc(image_id_size, GFP_KERNEL);
3707 end = image_id + image_id_size;
3708 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3710 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3711 reply_buf = kmalloc(size, GFP_KERNEL);
3715 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3716 "rbd", "dir_get_name",
3717 image_id, image_id_size,
3722 end = reply_buf + ret;
3724 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3725 if (IS_ERR(image_name))
3728 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3737 * When an rbd image has a parent image, it is identified by the
3738 * pool, image, and snapshot ids (not names). This function fills
3739 * in the names for those ids. (It's OK if we can't figure out the
3740 * name for an image id, but the pool and snapshot ids should always
3741 * exist and have names.) All names in an rbd spec are dynamically
3744 * When an image being mapped (not a parent) is probed, we have the
3745 * pool name and pool id, image name and image id, and the snapshot
3746 * name. The only thing we're missing is the snapshot id.
3748 * The set of snapshots for an image is not known until they have
3749 * been read by rbd_dev_snaps_update(), so we can't completely fill
3750 * in this information until after that has been called.
3752 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3754 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3755 struct rbd_spec *spec = rbd_dev->spec;
3756 const char *pool_name;
3757 const char *image_name;
3758 const char *snap_name;
3762 * An image being mapped will have the pool name (etc.), but
3763 * we need to look up the snapshot id.
3765 if (spec->pool_name) {
3766 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3767 struct rbd_snap *snap;
3769 snap = snap_by_name(rbd_dev, spec->snap_name);
3772 spec->snap_id = snap->id;
3774 spec->snap_id = CEPH_NOSNAP;
3780 /* Get the pool name; we have to make our own copy of this */
3782 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3784 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3787 pool_name = kstrdup(pool_name, GFP_KERNEL);
3791 /* Fetch the image name; tolerate failure here */
3793 image_name = rbd_dev_image_name(rbd_dev);
3795 rbd_warn(rbd_dev, "unable to get image name");
3797 /* Look up the snapshot name, and make a copy */
3799 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3801 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3805 snap_name = kstrdup(snap_name, GFP_KERNEL);
3811 spec->pool_name = pool_name;
3812 spec->image_name = image_name;
3813 spec->snap_name = snap_name;
3823 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3832 struct ceph_snap_context *snapc;
3836 * We'll need room for the seq value (maximum snapshot id),
3837 * snapshot count, and array of that many snapshot ids.
3838 * For now we have a fixed upper limit on the number we're
3839 * prepared to receive.
3841 size = sizeof (__le64) + sizeof (__le32) +
3842 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3843 reply_buf = kzalloc(size, GFP_KERNEL);
3847 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3848 "rbd", "get_snapcontext", NULL, 0,
3850 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3855 end = reply_buf + ret;
3857 ceph_decode_64_safe(&p, end, seq, out);
3858 ceph_decode_32_safe(&p, end, snap_count, out);
3861 * Make sure the reported number of snapshot ids wouldn't go
3862 * beyond the end of our buffer. But before checking that,
3863 * make sure the computed size of the snapshot context we
3864 * allocate is representable in a size_t.
3866 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3871 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3875 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3881 for (i = 0; i < snap_count; i++)
3882 snapc->snaps[i] = ceph_decode_64(&p);
3884 rbd_dev->header.snapc = snapc;
3886 dout(" snap context seq = %llu, snap_count = %u\n",
3887 (unsigned long long)seq, (unsigned int)snap_count);
3894 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3904 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3905 reply_buf = kmalloc(size, GFP_KERNEL);
3907 return ERR_PTR(-ENOMEM);
3909 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3910 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3911 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3912 "rbd", "get_snapshot_name",
3913 &snap_id, sizeof (snap_id),
3915 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3917 snap_name = ERR_PTR(ret);
3922 end = reply_buf + ret;
3923 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3924 if (IS_ERR(snap_name))
3927 dout(" snap_id 0x%016llx snap_name = %s\n",
3928 (unsigned long long)le64_to_cpu(snap_id), snap_name);
/*
 * Gather size, features and name for the snapshot at position
 * "which" in a format 2 image's snapshot context.  Fills in
 * *snap_size and *snap_features only when the name lookup succeeds;
 * returns the snapshot name, or an ERR_PTR() on any failure.
 *
 * NOTE(review): elided listing -- the intermediate error checks
 * between the numbered lines are not shown.
 */
3935 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3936 u64 *snap_size, u64 *snap_features)
3941 const char *snap_name;
3944 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
/* Position "which" maps to a snapshot id in the snapshot context */
3945 snap_id = rbd_dev->header.snapc->snaps[which];
3946 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3950 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3954 snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
/* Only report results to the caller once all three lookups worked */
3955 if (!IS_ERR(snap_name)) {
3957 *snap_features = features;
3962 return ERR_PTR(ret);
3965 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3966 u64 *snap_size, u64 *snap_features)
3968 if (rbd_dev->image_format == 1)
3969 return rbd_dev_v1_snap_info(rbd_dev, which,
3970 snap_size, snap_features);
3971 if (rbd_dev->image_format == 2)
3972 return rbd_dev_v2_snap_info(rbd_dev, which,
3973 snap_size, snap_features);
3974 return ERR_PTR(-EINVAL);
/*
 * Re-read a format 2 image's mutable metadata (size and snapshot
 * context) from the OSDs and update the in-core snapshot list,
 * all under the header rwsem held for write.
 *
 * NOTE(review): elided listing -- the error checks between the
 * numbered steps and the final return are not shown.
 */
3977 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
/* Exclusive access to rbd_dev->header while it is being updated */
3981 down_write(&rbd_dev->header_rwsem);
3983 ret = rbd_dev_v2_image_size(rbd_dev);
3986 rbd_update_mapping_size(rbd_dev);
3988 ret = rbd_dev_v2_snap_context(rbd_dev);
3989 dout("rbd_dev_v2_snap_context returned %d\n", ret);
/* Reconcile the in-core snapshot list with the new snap context */
3992 ret = rbd_dev_snaps_update(rbd_dev);
3993 dout("rbd_dev_snaps_update returned %d\n", ret);
3997 up_write(&rbd_dev->header_rwsem);
4003 * Scan the rbd device's current snapshot list and compare it to the
4004 * newly-received snapshot context. Remove any existing snapshots
4005 * not present in the new snapshot context. Add a new snapshot for
4006 * any snapshots in the snapshot context not in the current list.
4007 * And verify there are no changes to snapshots we already know
4010 * Assumes the snapshots in the snapshot context are sorted by
4011 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
4012 * are also maintained in that order.)
4014 * Note that any error that occurs while updating the snapshot list
4015 * aborts the update, and the entire list is cleared. The snapshot
4016 * list becomes inconsistent at that point anyway, so it might as
/*
 * Merge the newly received snapshot context (rbd_dev->header.snapc)
 * into the device's in-core snapshot list (rbd_dev->snaps).  Both
 * sequences are ordered by snapshot id, so this walks them in
 * lockstep: entries only in the old list are removed, entries only
 * in the new context are created, and entries in both are verified
 * unchanged.
 *
 * NOTE(review): elided listing -- braces, some advances of
 * index/links and the error/cleanup path are not shown between the
 * numbered lines.
 */
4019 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4021 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4022 const u32 snap_count = snapc->num_snaps;
4023 struct list_head *head = &rbd_dev->snaps;
4024 struct list_head *links = head->next;
4028 dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
/* Continue until both the context and the existing list are exhausted */
4029 while (index < snap_count || links != head) {
4031 struct rbd_snap *snap;
4032 const char *snap_name;
4034 u64 snap_features = 0;
/* Current candidate from each sequence (CEPH_NOSNAP/NULL = exhausted) */
4036 snap_id = index < snap_count ? snapc->snaps[index]
4038 snap = links != head ? list_entry(links, struct rbd_snap, node)
4040 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
/* Existing snapshot not present in the new context: remove it */
4042 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4043 struct list_head *next = links->next;
4046 * A previously-existing snapshot is not in
4047 * the new snap context.
4049 * If the now-missing snapshot is the one
4050 * the image represents, clear its existence
4051 * flag so we can avoid sending any more
4054 if (rbd_dev->spec->snap_id == snap->id)
4055 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4056 dout("removing %ssnap id %llu\n",
4057 rbd_dev->spec->snap_id == snap->id ?
4059 (unsigned long long)snap->id);
4061 list_del(&snap->node);
4062 rbd_snap_destroy(snap);
4064 /* Done with this list entry; advance */
/* Fetch name/size/features for the context entry being considered */
4070 snap_name = rbd_dev_snap_info(rbd_dev, index,
4071 &snap_size, &snap_features);
4072 if (IS_ERR(snap_name)) {
4073 ret = PTR_ERR(snap_name);
4074 dout("failed to get snap info, error %d\n", ret);
4078 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4079 (unsigned long long)snap_id);
/* Context entry with no matching list entry: create a new snapshot */
4080 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4081 struct rbd_snap *new_snap;
4083 /* We haven't seen this snapshot before */
4085 new_snap = rbd_snap_create(rbd_dev, snap_name,
4086 snap_id, snap_size, snap_features);
4087 if (IS_ERR(new_snap)) {
4088 ret = PTR_ERR(new_snap);
4089 dout(" failed to add dev, error %d\n", ret);
4093 /* New goes before existing, or at end of list */
/* NOTE(review): the conditional string already ends in "\n", so the
 * format's own "\n" yields a doubled newline in this dout */
4095 dout(" added dev%s\n", snap ? "" : " at end\n");
4097 list_add_tail(&new_snap->node, &snap->node);
4099 list_add_tail(&new_snap->node, head);
4101 /* Already have this one */
4103 dout(" already present\n");
/* Known snapshots must not have changed size, name or features */
4105 rbd_assert(snap->size == snap_size);
4106 rbd_assert(!strcmp(snap->name, snap_name));
4107 rbd_assert(snap->features == snap_features);
4109 /* Done with this list entry; advance */
4111 links = links->next;
4114 /* Advance to the next entry in the snapshot context */
4118 dout("%s: done\n", __func__);
/* Error path: the list is inconsistent, so drop every snapshot */
4122 rbd_remove_all_snaps(rbd_dev);
/*
 * Initialize the embedded struct device for this rbd device (bus,
 * type, parent, release hook, name = numeric dev id) and register it
 * with the driver core, under ctl_mutex.
 */
4127 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4132 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4134 dev = &rbd_dev->dev;
4135 dev->bus = &rbd_bus_type;
4136 dev->type = &rbd_device_type;
4137 dev->parent = &rbd_root_dev;
/* Called by the driver core when the last reference is dropped */
4138 dev->release = rbd_dev_device_release;
4139 dev_set_name(dev, "%d", rbd_dev->dev_id);
4140 ret = device_register(dev);
4142 mutex_unlock(&ctl_mutex);
/* Unregister the rbd device from the driver core (pairs with
 * rbd_bus_add_dev()); the final put triggers dev->release. */
4147 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4149 device_unregister(&rbd_dev->dev);
/* Highest device id handed out so far; 0 means none yet (ids start at 1) */
4152 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4155 * Get a unique rbd identifier for the given new rbd_dev, and add
4156 * the rbd_dev to the global list. The minimum rbd id is 1.
4158 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
/* atomic64_inc_return() makes id allocation race-free */
4160 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4162 spin_lock(&rbd_dev_list_lock);
4163 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4164 spin_unlock(&rbd_dev_list_lock);
4165 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4166 (unsigned long long) rbd_dev->dev_id);
4170 * Remove an rbd_dev from the global list, and record that its
4171 * identifier is no longer in use.
4173 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4175 struct list_head *tmp;
4176 int rbd_id = rbd_dev->dev_id;
4179 rbd_assert(rbd_id > 0);
4181 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4182 (unsigned long long) rbd_dev->dev_id);
4183 spin_lock(&rbd_dev_list_lock);
4184 list_del_init(&rbd_dev->node);
4187 * If the id being "put" is not the current maximum, there
4188 * is nothing special we need to do.
4190 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4191 spin_unlock(&rbd_dev_list_lock);
4196 * We need to update the current maximum id. Search the
4197 * list to find out what it is. We're more likely to find
4198 * the maximum at the end, so search the list backward.
4201 list_for_each_prev(tmp, &rbd_dev_list) {
4202 struct rbd_device *rbd_dev;
4204 rbd_dev = list_entry(tmp, struct rbd_device, node);
4205 if (rbd_dev->dev_id > max_id)
4206 max_id = rbd_dev->dev_id;
4208 spin_unlock(&rbd_dev_list_lock);
4211 * The max id could have been updated by rbd_dev_id_get(), in
4212 * which case it now accurately reflects the new maximum.
4213 * Be careful not to overwrite the maximum value in that
/* cmpxchg only installs max_id if the max is still the id we freed */
4216 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
/* NOTE(review): this message prints even when the cmpxchg above did
 * not install max_id (i.e. when the maximum had already moved on) */
4217 dout(" max dev id has been reset\n");
/*
 * Skip any leading whitespace at *buf, updating *buf to point at the
 * first non-space character found (if any), and return the length of
 * the token (the run of non-space characters) that begins there.
 * *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() returns nonzero in the
	 * "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	const char *p = *buf;

	p += strspn(p, spaces);		/* skip to start of token */
	*buf = p;

	return strcspn(p, spaces);	/* length of the token found */
}
4240 * Finds the next token in *buf, and if the provided token buffer is
4241 * big enough, copies the found token into it. The result, if
4242 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4243 * must be terminated with '\0' on entry.
4245 * Returns the length of the token found (not including the '\0').
4246 * Return value will be 0 if no token is found, and it will be >=
4247 * token_size if the token would not fit.
4249 * The *buf pointer will be updated to point beyond the end of the
4250 * found token. Note that this occurs even if the token buffer is
4251 * too small to hold it.
/* NOTE(review): elided listing -- the remaining parameters of the
 * signature (presumably the token buffer and its size, per the
 * comment above) and the *buf advance/return are not shown. */
4253 static inline size_t copy_token(const char **buf,
4259 len = next_token(buf);
/* Copy only when the token (plus its '\0') fits in the buffer */
4260 if (len < token_size) {
4261 memcpy(token, *buf, len);
4262 *(token + len) = '\0';
4270 * Finds the next token in *buf, dynamically allocates a buffer big
4271 * enough to hold a copy of it, and copies the token into the new
4272 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4273 * that a duplicate buffer is created even for a zero-length token.
4275 * Returns a pointer to the newly-allocated duplicate, or a null
4276 * pointer if memory for the duplicate was not available. If
4277 * the lenp argument is a non-null pointer, the length of the token
4278 * (not including the '\0') is returned in *lenp.
4280 * If successful, the *buf pointer will be updated to point beyond
4281 * the end of the found token.
4283 * Note: uses GFP_KERNEL for allocation.
4285 static inline char *dup_token(const char **buf, size_t *lenp)
4290 len = next_token(buf);
/* Copies len + 1 bytes; the extra byte is overwritten with '\0' below,
 * so the duplicate is always terminated regardless of what followed
 * the token.  NOTE(review): the allocation-failure check between
 * these lines is elided in this listing. */
4291 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4294 *(dup + len) = '\0';
4304 * Parse the options provided for an "rbd add" (i.e., rbd image
4305 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4306 * and the data written is passed here via a NUL-terminated buffer.
4307 * Returns 0 if successful or an error code otherwise.
4309 * The information extracted from these options is recorded in
4310 * the other parameters which return dynamically-allocated
4313 * The address of a pointer that will refer to a ceph options
4314 * structure. Caller must release the returned pointer using
4315 * ceph_destroy_options() when it is no longer needed.
4317 * Address of an rbd options pointer. Fully initialized by
4318 * this function; caller must release with kfree().
4320 * Address of an rbd image specification pointer. Fully
4321 * initialized by this function based on parsed options.
4322 * Caller must release with rbd_spec_put().
4324 * The options passed take this form:
4325 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4328 * A comma-separated list of one or more monitor addresses.
4329 * A monitor address is an ip address, optionally followed
4330 * by a port number (separated by a colon).
4331 * I.e.: ip1[:port1][,ip2[:port2]...]
4333 * A comma-separated list of ceph and/or rbd options.
4335 * The name of the rados pool containing the rbd image.
4337 * The name of the image in that pool to map.
4339 * An optional snapshot name. If provided, the mapping will
4340 * present data from the image at the time that snapshot was
4341 * created. The image head is used if no snapshot name is
4342 * provided. Snapshot mappings are always read-only.
/*
 * Parse an "rbd add" request buffer into ceph options, rbd options
 * and an rbd image spec (see the block comment above for the exact
 * input grammar).  On success the three out-parameters own newly
 * allocated objects the caller must release.
 *
 * NOTE(review): elided listing -- error labels, several checks and
 * the cleanup path are not shown between the numbered lines.
 */
4344 static int rbd_add_parse_args(const char *buf,
4345 struct ceph_options **ceph_opts,
4346 struct rbd_options **opts,
4347 struct rbd_spec **rbd_spec)
4351 const char *mon_addrs;
4353 size_t mon_addrs_size;
4354 struct rbd_spec *spec = NULL;
4355 struct rbd_options *rbd_opts = NULL;
4356 struct ceph_options *copts;
4359 /* The first four tokens are required */
4361 len = next_token(&buf);
4363 rbd_warn(NULL, "no monitor address(es) provided");
/* Include room for a terminating '\0' after the mon address token */
4367 mon_addrs_size = len + 1;
4371 options = dup_token(&buf, NULL);
4375 rbd_warn(NULL, "no options provided");
4379 spec = rbd_spec_alloc();
4383 spec->pool_name = dup_token(&buf, NULL);
4384 if (!spec->pool_name)
/* dup_token() duplicates even an empty token, so check for that */
4386 if (!*spec->pool_name) {
4387 rbd_warn(NULL, "no pool name provided");
4391 spec->image_name = dup_token(&buf, NULL);
4392 if (!spec->image_name)
4394 if (!*spec->image_name) {
4395 rbd_warn(NULL, "no image name provided");
4400 * Snapshot name is optional; default is to use "-"
4401 * (indicating the head/no snapshot).
4403 len = next_token(&buf);
4405 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4406 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4407 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4408 ret = -ENAMETOOLONG;
/* Duplicate len + 1 bytes, then force '\0' termination below */
4411 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4414 *(snap_name + len) = '\0';
4415 spec->snap_name = snap_name;
4417 /* Initialize all rbd options to the defaults */
4419 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4423 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
/* libceph parses its own options; unrecognized ones go to our callback */
4425 copts = ceph_parse_options(options, mon_addrs,
4426 mon_addrs + mon_addrs_size - 1,
4427 parse_rbd_opts_token, rbd_opts);
4428 if (IS_ERR(copts)) {
4429 ret = PTR_ERR(copts);
4450 * An rbd format 2 image has a unique identifier, distinct from the
4451 * name given to it by the user. Internally, that identifier is
4452 * what's used to specify the names of objects related to the image.
4454 * A special "rbd id" object is used to map an rbd image name to its
4455 * id. If that object doesn't exist, then there is no v2 rbd image
4456 * with the supplied name.
4458 * This function will record the given rbd_dev's image_id field if
4459 * it can be determined, and in that case will return 0. If any
4460 * errors occur a negative errno will be returned and the rbd_dev's
4461 * image_id field will be unchanged (and should be NULL).
/*
 * Determine the image's id and format: query the format 2 "rbd id"
 * object; if it doesn't exist (-ENOENT) assume a format 1 image and
 * record an empty image id.  See the block comment above for the
 * full contract.
 *
 * NOTE(review): elided listing -- allocation checks, some error
 * branches and the cleanup path are not shown.
 */
4463 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4472 * When probing a parent image, the image id is already
4473 * known (and the image name likely is not). There's no
4474 * need to fetch the image id again in this case. We
4475 * do still need to set the image format though.
4477 if (rbd_dev->spec->image_id) {
/* Empty id string is the format 1 convention; non-empty means format 2 */
4478 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4484 * First, see if the format 2 image id file exists, and if
4485 * so, get the image's persistent id from it.
4487 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4488 object_name = kmalloc(size, GFP_NOIO);
4491 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4492 dout("rbd id object name is %s\n", object_name);
4494 /* Response will be an encoded string, which includes a length */
4496 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4497 response = kzalloc(size, GFP_NOIO);
4503 /* If it doesn't exist we'll assume it's a format 1 image */
4505 ret = rbd_obj_method_sync(rbd_dev, object_name,
4506 "rbd", "get_id", NULL, 0,
4507 response, RBD_IMAGE_ID_LEN_MAX);
4508 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4509 if (ret == -ENOENT) {
/* Format 1: record an empty (but non-NULL) image id */
4510 image_id = kstrdup("", GFP_KERNEL);
4511 ret = image_id ? 0 : -ENOMEM;
4513 rbd_dev->image_format = 1;
4514 } else if (ret > sizeof (__le32)) {
4517 image_id = ceph_extract_encoded_string(&p, p + ret,
4519 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4521 rbd_dev->image_format = 2;
4527 rbd_dev->spec->image_id = image_id;
4528 dout("image_id is %s\n", image_id);
4537 /* Undo whatever state changes are made by v1 or v2 image probe */
4539 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4541 struct rbd_image_header *header;
4543 rbd_dev_remove_parent(rbd_dev);
4544 rbd_spec_put(rbd_dev->parent_spec);
4545 rbd_dev->parent_spec = NULL;
4546 rbd_dev->parent_overlap = 0;
4548 /* Free dynamic fields from the header, then zero it out */
4550 header = &rbd_dev->header;
4551 ceph_put_snap_context(header->snapc);
4552 kfree(header->snap_sizes);
4553 kfree(header->snap_names);
4554 kfree(header->object_prefix);
4555 memset(header, 0, sizeof (*header));
/*
 * Probe a format 1 image: read its on-disk header into the in-core
 * header.  Format 1 images never have a parent.  On the (elided)
 * error path the header name and image id recorded so far are
 * released.
 */
4558 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4562 /* Populate rbd image metadata */
4564 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4568 /* Version 1 images have no parent (no layering) */
4570 rbd_dev->parent_spec = NULL;
4571 rbd_dev->parent_overlap = 0;
4573 dout("discovered version 1 image, header name is %s\n",
4574 rbd_dev->header_name);
/* Error path: undo what rbd_dev_image_id()/header-name setup recorded */
4579 kfree(rbd_dev->header_name);
4580 rbd_dev->header_name = NULL;
4581 kfree(rbd_dev->spec->image_id);
4582 rbd_dev->spec->image_id = NULL;
/*
 * Probe a format 2 image: fetch size, object prefix and features;
 * when layering or fancy striping are enabled, fetch the parent and
 * striping parameters too; finally get the snapshot context.  The
 * (elided) error path unwinds everything recorded here.
 */
4587 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4591 ret = rbd_dev_v2_image_size(rbd_dev);
4595 /* Get the object prefix (a.k.a. block_name) for the image */
4597 ret = rbd_dev_v2_object_prefix(rbd_dev);
4601 /* Get and check the features for the image */
4603 ret = rbd_dev_v2_features(rbd_dev);
4607 /* If the image supports layering, get the parent info */
4609 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4610 ret = rbd_dev_v2_parent_info(rbd_dev);
4615 * Don't print a warning for parent images. We can
4616 * tell this point because we won't know its pool
4617 * name yet (just its pool id).
4619 if (rbd_dev->spec->pool_name)
4620 rbd_warn(rbd_dev, "WARNING: kernel layering "
4621 "is EXPERIMENTAL!");
4624 /* If the image supports fancy striping, get its parameters */
4626 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4627 ret = rbd_dev_v2_striping_info(rbd_dev);
4632 /* crypto and compression type aren't (yet) supported for v2 images */
4634 rbd_dev->header.crypt_type = 0;
4635 rbd_dev->header.comp_type = 0;
4637 /* Get the snapshot context, plus the header version */
4639 ret = rbd_dev_v2_snap_context(rbd_dev);
4643 dout("discovered version 2 image, header name is %s\n",
4644 rbd_dev->header_name);
/* Error path: release everything recorded during this probe */
4648 rbd_dev->parent_overlap = 0;
4649 rbd_spec_put(rbd_dev->parent_spec);
4650 rbd_dev->parent_spec = NULL;
4651 kfree(rbd_dev->header_name);
4652 rbd_dev->header_name = NULL;
4653 kfree(rbd_dev->header.object_prefix);
4654 rbd_dev->header.object_prefix = NULL;
/*
 * If this image has a parent (layered image), create and probe an
 * rbd_device for it, sharing our client and parent spec references.
 * No-op when there is no parent spec.
 *
 * NOTE(review): elided listing -- NULL checks and part of the error
 * path are not shown.
 */
4659 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4661 struct rbd_device *parent = NULL;
4662 struct rbd_spec *parent_spec;
4663 struct rbd_client *rbdc;
4666 if (!rbd_dev->parent_spec)
4669 * We need to pass a reference to the client and the parent
4670 * spec when creating the parent rbd_dev. Images related by
4671 * parent/child relationships always share both.
4673 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4674 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4677 parent = rbd_dev_create(rbdc, parent_spec);
/* Recursive probe: the parent may itself be layered */
4681 ret = rbd_dev_image_probe(parent);
4684 rbd_dev->parent = parent;
/* Error path: drop the references taken for the failed parent */
4689 rbd_spec_put(rbd_dev->parent_spec);
4690 kfree(rbd_dev->header_name);
4691 rbd_dev_destroy(parent);
4693 rbd_put_client(rbdc);
4694 rbd_spec_put(parent_spec);
/*
 * Set up the block-device side of a probed image: mapping, device
 * id, block major, gendisk, sysfs registration, then announce the
 * disk.  The (elided) error path unwinds in reverse order.
 */
4700 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4704 ret = rbd_dev_mapping_set(rbd_dev);
4708 /* generate unique id: find highest unique id, add one */
4709 rbd_dev_id_get(rbd_dev);
4711 /* Fill in the device name, now that we have its id. */
/* Compile-time guarantee the name buffer can hold "rbd" + any int */
4712 BUILD_BUG_ON(DEV_NAME_LEN
4713 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4714 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4716 /* Get our block major device number. */
/* register_blkdev(0, ...) dynamically allocates a major number */
4718 ret = register_blkdev(0, rbd_dev->name);
4721 rbd_dev->major = ret;
4723 /* Set up the blkdev mapping. */
4725 ret = rbd_init_disk(rbd_dev);
4727 goto err_out_blkdev;
4729 ret = rbd_bus_add_dev(rbd_dev);
4733 /* Everything's ready. Announce the disk to the world. */
4735 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4736 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
/* After add_disk() the device is live and I/O may arrive */
4737 add_disk(rbd_dev->disk);
4739 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4740 (unsigned long long) rbd_dev->mapping.size);
/* Error labels (goto targets), innermost failure first */
4745 rbd_free_disk(rbd_dev);
4747 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4749 rbd_dev_id_put(rbd_dev);
4750 rbd_dev_mapping_clear(rbd_dev);
/*
 * Build and record the name of this image's header object:
 * "<image_name>" + RBD_SUFFIX for format 1 images, or
 * RBD_HEADER_PREFIX + "<image_id>" for format 2.
 *
 * NOTE(review): elided listing -- the size declaration, else
 * branches and returns between the numbered lines are not shown.
 */
4755 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4757 struct rbd_spec *spec = rbd_dev->spec;
4760 /* Record the header object name for this rbd image. */
4762 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
/* sizeof() of the string literal macros includes the '\0' */
4764 if (rbd_dev->image_format == 1)
4765 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4767 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4769 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4770 if (!rbd_dev->header_name)
4773 if (rbd_dev->image_format == 1)
4774 sprintf(rbd_dev->header_name, "%s%s",
4775 spec->image_name, RBD_SUFFIX);
4777 sprintf(rbd_dev->header_name, "%s%s",
4778 RBD_HEADER_PREFIX, spec->image_id);
/*
 * Tear down everything recorded by a successful image probe:
 * snapshots, header state, the watch on the header object, the
 * header name and image id, then the rbd_dev itself.
 */
4782 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4786 rbd_remove_all_snaps(rbd_dev)
4787 rbd_dev_unprobe(rbd_dev);
/* Third argument 0 = cancel the watch established at probe time */
4788 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4790 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4791 kfree(rbd_dev->header_name);
4792 rbd_dev->header_name = NULL;
4793 rbd_dev->image_format = 0;
4794 kfree(rbd_dev->spec->image_id);
4795 rbd_dev->spec->image_id = NULL;
4797 rbd_dev_destroy(rbd_dev);
4801 * Probe for the existence of the header object for the given rbd
4802 * device. For format 2 images this includes determining the image
/*
 * Sequence: determine image id/format, build the header object
 * name, establish a watch, run the format-specific probe, then
 * update snapshots, spec and parent.  The (elided) error labels
 * unwind each step in reverse.
 */
4805 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4811 * Get the id from the image id object. If it's not a
4812 * format 2 image, we'll get ENOENT back, and we'll assume
4813 * it's a format 1 image.
4815 ret = rbd_dev_image_id(rbd_dev);
4818 rbd_assert(rbd_dev->spec->image_id);
4819 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4821 ret = rbd_dev_header_name(rbd_dev);
4823 goto err_out_format;
/* Second argument 1 = start watching the header object for changes */
4825 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4827 goto out_header_name;
4829 if (rbd_dev->image_format == 1)
4830 ret = rbd_dev_v1_probe(rbd_dev);
4832 ret = rbd_dev_v2_probe(rbd_dev);
4836 ret = rbd_dev_snaps_update(rbd_dev);
4840 ret = rbd_dev_spec_update(rbd_dev);
4844 ret = rbd_dev_probe_parent(rbd_dev);
/* Error labels: undo snapshots, probe state, watch, name, format/id */
4849 rbd_remove_all_snaps(rbd_dev);
4851 rbd_dev_unprobe(rbd_dev);
/* Use a separate variable so the original error in ret is preserved */
4853 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4855 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4857 kfree(rbd_dev->header_name);
4858 rbd_dev->header_name = NULL;
4860 rbd_dev->image_format = 0;
4861 kfree(rbd_dev->spec->image_id);
4862 rbd_dev->spec->image_id = NULL;
4864 dout("probe failed, returning %d\n", ret);
/*
 * sysfs "add" handler (/sys/bus/rbd/add): parse the request, obtain
 * a ceph client, resolve the pool, create the rbd_dev, probe the
 * image and set up the block device.  Ownership of parsed objects
 * transfers stepwise to the rbd_dev (see the NULL assignments).
 *
 * NOTE(review): elided listing -- several error checks and the
 * success return are not shown between the numbered lines.
 */
4869 static ssize_t rbd_add(struct bus_type *bus,
4873 struct rbd_device *rbd_dev = NULL;
4874 struct ceph_options *ceph_opts = NULL;
4875 struct rbd_options *rbd_opts = NULL;
4876 struct rbd_spec *spec = NULL;
4877 struct rbd_client *rbdc;
4878 struct ceph_osd_client *osdc;
/* Hold a module reference while a mapped device exists */
4881 if (!try_module_get(THIS_MODULE))
4884 /* parse add command */
4885 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4887 goto err_out_module;
4889 rbdc = rbd_get_client(ceph_opts);
4894 ceph_opts = NULL; /* rbd_dev client now owns this */
4897 osdc = &rbdc->client->osdc;
/* Translate the user-supplied pool name to its numeric pool id */
4898 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4900 goto err_out_client;
4901 spec->pool_id = (u64)rc;
4903 /* The ceph file layout needs to fit pool id in 32 bits */
4905 if (spec->pool_id > (u64)U32_MAX) {
4906 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4907 (unsigned long long)spec->pool_id, U32_MAX);
4909 goto err_out_client;
4912 rbd_dev = rbd_dev_create(rbdc, spec);
4914 goto err_out_client;
4915 rbdc = NULL; /* rbd_dev now owns this */
4916 spec = NULL; /* rbd_dev now owns this */
4918 rbd_dev->mapping.read_only = rbd_opts->read_only;
4920 rbd_opts = NULL; /* done with this */
4922 rc = rbd_dev_image_probe(rbd_dev);
4924 goto err_out_rbd_dev;
4926 rc = rbd_dev_device_setup(rbd_dev);
/* Error labels: release whatever the failed step still owned */
4930 rbd_dev_image_release(rbd_dev);
4932 rbd_dev_destroy(rbd_dev);
4934 rbd_put_client(rbdc);
4937 ceph_destroy_options(ceph_opts);
4941 module_put(THIS_MODULE);
4943 dout("Error adding device %s\n", buf);
/*
 * Look up an rbd_device by its numeric dev id in the global device
 * list, under rbd_dev_list_lock.  The (elided) returns hand back
 * the matching device, or NULL when no id matches.
 */
4948 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4950 struct list_head *tmp;
4951 struct rbd_device *rbd_dev;
4953 spin_lock(&rbd_dev_list_lock);
4954 list_for_each(tmp, &rbd_dev_list) {
4955 rbd_dev = list_entry(tmp, struct rbd_device, node);
4956 if (rbd_dev->dev_id == dev_id) {
/* Found: drop the lock before (elided) return of rbd_dev */
4957 spin_unlock(&rbd_dev_list_lock);
4961 spin_unlock(&rbd_dev_list_lock);
/*
 * Driver-core release callback (installed in rbd_bus_add_dev()),
 * invoked when the device's last reference is dropped: tear down
 * the disk, blkdev registration, dev id and mapping.
 */
4965 static void rbd_dev_device_release(struct device *dev)
4967 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4969 rbd_free_disk(rbd_dev);
4970 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4971 rbd_dev_clear_mapping(rbd_dev);
4972 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4974 rbd_dev_id_put(rbd_dev);
4975 rbd_dev_mapping_clear(rbd_dev);
/*
 * Release an image's entire parent chain, deepest ancestor first:
 * repeatedly walk to the last device in the chain, release it, and
 * detach it from its child.
 *
 * NOTE(review): elided listing -- the inner-loop body advancing
 * first/second is not shown.
 */
4978 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4980 while (rbd_dev->parent) {
4981 struct rbd_device *first = rbd_dev;
4982 struct rbd_device *second = first->parent;
4983 struct rbd_device *third;
4986 * Follow to the parent with no grandparent and
4989 while (second && (third = second->parent)) {
/* "second" is now the end of the chain; release and detach it */
4994 rbd_dev_image_release(second);
4995 first->parent = NULL;
4996 first->parent_overlap = 0;
4998 rbd_assert(first->parent_spec);
4999 rbd_spec_put(first->parent_spec);
5000 first->parent_spec = NULL;
/*
 * sysfs "remove" handler (/sys/bus/rbd/remove): parse the target
 * device id, refuse removal while the device is open, otherwise
 * mark it REMOVING and tear it down.
 *
 * NOTE(review): elided listing -- error returns and some unlock
 * paths are not shown between the numbered lines.
 */
5004 static ssize_t rbd_remove(struct bus_type *bus,
5008 struct rbd_device *rbd_dev = NULL;
5013 ret = strict_strtoul(buf, 10, &ul);
5017 /* convert to int; abort if we lost anything in the conversion */
5018 target_id = (int) ul;
5019 if (target_id != ul)
5022 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5024 rbd_dev = __rbd_get_dev(target_id);
/* Disallow removal while anyone holds the block device open */
5030 spin_lock_irq(&rbd_dev->lock);
5031 if (rbd_dev->open_count)
/* Mark removing under the lock so opens racing with us see the flag */
5034 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5035 spin_unlock_irq(&rbd_dev->lock);
5039 rbd_bus_del_dev(rbd_dev);
5040 rbd_dev_image_release(rbd_dev);
/* Drop the module reference taken in rbd_add() */
5041 module_put(THIS_MODULE);
5043 mutex_unlock(&ctl_mutex);
5049 * create control files in sysfs
/*
 * Register the rbd root device and the rbd bus; on bus registration
 * failure, unwind the root device registration.
 */
5052 static int rbd_sysfs_init(void)
5056 ret = device_register(&rbd_root_dev);
5060 ret = bus_register(&rbd_bus_type);
5062 device_unregister(&rbd_root_dev);
/* Unregister the rbd bus and root device (reverse of rbd_sysfs_init()) */
5067 static void rbd_sysfs_cleanup(void)
5069 bus_unregister(&rbd_bus_type);
5070 device_unregister(&rbd_root_dev);
/*
 * Module init: verify libceph compatibility, then set up the sysfs
 * bus/root device and announce the driver.
 */
5073 static int __init rbd_init(void)
5077 if (!libceph_compatible(NULL)) {
5078 rbd_warn(NULL, "libceph incompatibility (quitting)");
5082 rc = rbd_sysfs_init();
5085 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: tear down the sysfs bus and root device */
5089 static void __exit rbd_exit(void)
5091 rbd_sysfs_cleanup();
5094 module_init(rbd_init);
5095 module_exit(rbd_exit);
5097 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5098 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5099 MODULE_DESCRIPTION("rados block device");
5101 /* following authorship retained from original osdblk.c */
5102 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5104 MODULE_LICENSE("GPL");