/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
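
/*
 * Illustration only (not used by the driver): converting between byte
 * offsets and sectors is just a shift by SECTOR_SHIFT, e.g.:
 *
 *	u64 sector = byte_offset >> SECTOR_SHIFT;   // byte 4096 -> sector 8
 *	u64 bytes = nr_sectors << SECTOR_SHIFT;     // 8 sectors -> 4096 bytes
 */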
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
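
/*
 * Worked arithmetic for the 510 limit above (an illustration, assuming
 * 4 KB pages and a 16-byte struct ceph_snap_context header): the
 * snapshot context is followed by one u64 snapshot id per snapshot, so
 * the largest context occupies roughly
 *
 *	sizeof (struct ceph_snap_context) + 510 * sizeof (u64)
 *	    <= 16 + 510 * 8 = 4096 bytes
 *
 * which is why 510 snapshots keep the maximum snapc within 4KB.
 */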
#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
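
/*
 * Sketch of where the MAX_INT_FORMAT_WIDTH formula comes from (for
 * illustration only): each byte of an int contributes at most
 * log10(256), i.e. a bit under 2.5 decimal digits, so
 * (5 * sizeof (int)) / 2 digits always suffice, plus one more
 * character for a leading '-'.  For a 4-byte int that gives
 * (5 * 4) / 2 + 1 = 11, enough to format "-2147483648".
 */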
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define	for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define	for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define	for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
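
/*
 * Hypothetical usage sketch for the iterators above (illustration
 * only; the real call sites appear later in this file):
 *
 *	struct rbd_obj_request *obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		dout("obj_request %u\n", obj_request->which);
 *
 * The _safe variant walks the list in reverse and tolerates the
 * current entry being removed from the list while iterating.
 */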
struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
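
/*
 * Illustration (not part of the driver): parse_rbd_opts_token() is
 * handed any comma-separated option token that the generic ceph
 * option parser does not itself recognize.  So an options field such
 * as, hypothetically,
 *
 *	name=admin,ro
 *
 * in an "add" request would map the image read-only ("ro" and
 * "read_only" are equivalent, as are "rw" and "read_write").  The
 * exact add syntax is documented in
 * Documentation/ABI/testing/sysfs-bus-rbd.
 */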
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t size;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_read() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
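
/*
 * For illustration: with snapshot ids kept in descending order,
 * e.g. { 12, 7, 3 }, this comparator lets bsearch() work as usual:
 *
 *	snapid_compare_reverse(&(u64){7}, &(u64){12})  ->  1
 *	snapid_compare_reverse(&(u64){7}, &(u64){7})   ->  0
 *	snapid_compare_reverse(&(u64){7}, &(u64){3})   -> -1
 */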
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kmem_cache_free(rbd_segment_name_cache, name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
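
/*
 * Worked example (illustrative, assuming obj_order == 22, i.e. 4 MB
 * segments): an image request at byte offset 0x3ff000 with length
 * 0x3000 touches two objects.  rbd_segment_offset() yields 0x3ff000
 * within segment 0, and rbd_segment_length() clips the first piece to
 * 0x400000 - 0x3ff000 = 0x1000 bytes; the remaining 0x2000 bytes then
 * start at offset 0 of segment 1.
 */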
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */
static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
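
/*
 * Illustrative sketch of the flag protocol above (not code, just the
 * state transitions): a layered-write existence check drives a target
 * object through
 *
 *	KNOWN=0 EXISTS=0  -> STAT completes with 0       -> KNOWN=1 EXISTS=1
 *	KNOWN=0 EXISTS=0  -> STAT completes with -ENOENT -> KNOWN=1 EXISTS=0
 *
 * and because flags are never cleared, a late "doesn't exist" answer
 * can never hide an earlier "does exist" one.
 */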
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kmem_cache_free(rbd_img_request_cache, img_request);
}
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;
	unsigned int bio_offset = 0;
	struct page **pages;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	u64 length;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	length = (u64)1 << rbd_dev->header.obj_order;
	page_count = (u32)calc_pages_for(0, length);

	rbd_assert(obj_request->copyup_pages);
	ceph_release_page_vector(obj_request->copyup_pages, page_count);
	obj_request->copyup_pages = NULL;

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	int result;
	u64 obj_size;
	u64 xferred;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
	result = img_request->result;
	obj_size = img_request->length;
	xferred = img_request->xferred;

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);

	rbd_img_request_put(img_request);

	if (result)
		goto out_err;

	/* Allocate the new copyup osd request for the original request */

	result = -ENOMEM;
	rbd_assert(!orig_request->osd_req);
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
						false, false);

	/* Then the original write request op */

	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					orig_request->offset,
					orig_request->length, 0, 0);
	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
					orig_request->length);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	result = rbd_obj_request_submit(osdc, orig_request);
	if (!result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}
2227 * Read from the parent image the range of data that covers the
2228 * entire target of the given object request. This is used for
2229 * satisfying a layered image write request when the target of an
2230 * object request from the image request does not exist.
2232 * A page array big enough to hold the returned data is allocated
2233 * and supplied to rbd_img_request_fill() as the "data descriptor."
2234 * When the read completes, this page array will be transferred to
2235 * the original object request for the copyup operation.
2237 * If an error occurs, record it as the result of the original
2238 * object request and mark it done so it gets completed.
2240 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2242 struct rbd_img_request *img_request = NULL;
2243 struct rbd_img_request *parent_request = NULL;
2244 struct rbd_device *rbd_dev;
2247 struct page **pages = NULL;
2251 rbd_assert(obj_request_img_data_test(obj_request));
2252 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2254 img_request = obj_request->img_request;
2255 rbd_assert(img_request != NULL);
2256 rbd_dev = img_request->rbd_dev;
2257 rbd_assert(rbd_dev->parent != NULL);
2260 * First things first. The original osd request is of no
2261 * use to us any more; we'll need a new one that can hold
2262 * the two ops in a copyup request. We'll get that later,
2263 * but for now we can release the old one.
2265 rbd_osd_req_destroy(obj_request->osd_req);
2266 obj_request->osd_req = NULL;
2269 * Determine the byte range covered by the object in the
2270 * child image to which the original request was to be sent.
2272 img_offset = obj_request->img_offset - obj_request->offset;
2273 length = (u64)1 << rbd_dev->header.obj_order;
2276 * There is no defined parent data beyond the parent
2277 * overlap, so limit what we read at that boundary if
2278 * necessary.
2280 if (img_offset + length > rbd_dev->parent_overlap) {
2281 rbd_assert(img_offset < rbd_dev->parent_overlap);
2282 length = rbd_dev->parent_overlap - img_offset;
2286 * Allocate a page array big enough to receive the data read
2287 * from the parent.
2289 page_count = (u32)calc_pages_for(0, length);
2290 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2291 if (IS_ERR(pages)) {
2292 result = PTR_ERR(pages);
2298 parent_request = rbd_img_request_create(rbd_dev->parent,
2301 if (!parent_request)
2303 rbd_obj_request_get(obj_request);
2304 parent_request->obj_request = obj_request;
2306 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2309 parent_request->copyup_pages = pages;
2311 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2312 result = rbd_img_request_submit(parent_request);
2316 parent_request->copyup_pages = NULL;
2317 parent_request->obj_request = NULL;
2318 rbd_obj_request_put(obj_request);
2321 ceph_release_page_vector(pages, page_count);
2323 rbd_img_request_put(parent_request);
2324 obj_request->result = result;
2325 obj_request->xferred = 0;
2326 obj_request_done_set(obj_request);
2331 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2333 struct rbd_obj_request *orig_request;
2336 rbd_assert(!obj_request_img_data_test(obj_request));
2339 * All we need from the object request is the original
2340 * request and the result of the STAT op. Grab those, then
2341 * we're done with the request.
2343 orig_request = obj_request->obj_request;
2344 obj_request->obj_request = NULL;
2345 rbd_assert(orig_request);
2346 rbd_assert(orig_request->img_request);
2348 result = obj_request->result;
2349 obj_request->result = 0;
2351 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2352 obj_request, orig_request, result,
2353 obj_request->xferred, obj_request->length);
2354 rbd_obj_request_put(obj_request);
2356 rbd_assert(orig_request);
2357 rbd_assert(orig_request->img_request);
2360 * Our only purpose here is to determine whether the object
2361 * exists, and we don't want to treat the non-existence as
2362 * an error. If something else comes back, transfer the
2363 * error to the original request and complete it now.
2365 if (!result) {
2366 obj_request_existence_set(orig_request, true);
2367 } else if (result == -ENOENT) {
2368 obj_request_existence_set(orig_request, false);
2369 } else if (result) {
2370 orig_request->result = result;
2375 * Resubmit the original request now that we have recorded
2376 * whether the target object exists.
2378 orig_request->result = rbd_img_obj_request_submit(orig_request);
2380 if (orig_request->result)
2381 rbd_obj_request_complete(orig_request);
2382 rbd_obj_request_put(orig_request);
2385 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2387 struct rbd_obj_request *stat_request;
2388 struct rbd_device *rbd_dev;
2389 struct ceph_osd_client *osdc;
2390 struct page **pages = NULL;
2396 * The response data for a STAT call consists of:
2397 *     le64 length;
2398 *     struct {
2399 *         le32 tv_sec;
2400 *         le32 tv_nsec;
2401 *     } mtime;
2403 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2404 page_count = (u32)calc_pages_for(0, size);
2405 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2407 return PTR_ERR(pages);
2410 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2415 rbd_obj_request_get(obj_request);
2416 stat_request->obj_request = obj_request;
2417 stat_request->pages = pages;
2418 stat_request->page_count = page_count;
2420 rbd_assert(obj_request->img_request);
2421 rbd_dev = obj_request->img_request->rbd_dev;
2422 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2424 if (!stat_request->osd_req)
2426 stat_request->callback = rbd_img_obj_exists_callback;
2428 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2429 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2431 rbd_osd_req_format_read(stat_request);
2433 osdc = &rbd_dev->rbd_client->client->osdc;
2434 ret = rbd_obj_request_submit(osdc, stat_request);
2437 rbd_obj_request_put(obj_request);
2442 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2444 struct rbd_img_request *img_request;
2445 struct rbd_device *rbd_dev;
2448 rbd_assert(obj_request_img_data_test(obj_request));
2450 img_request = obj_request->img_request;
2451 rbd_assert(img_request);
2452 rbd_dev = img_request->rbd_dev;
2455 * Only writes to layered images need special handling.
2456 * Reads and non-layered writes are simple object requests.
2457 * Layered writes that start beyond the end of the overlap
2458 * with the parent have no parent data, so they too are
2459 * simple object requests. Finally, if the target object is
2460 * known to already exist, its parent data has already been
2461 * copied, so a write to the object can also be handled as a
2462 * simple object request.
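/*
 * Example (object size and overlap values assumed): with 4 MiB
 * objects and a parent overlap of 8 MiB, a write at image offset
 * 12 MiB starts beyond the overlap and is submitted directly.
 * A first write at image offset 4 MiB falls within the overlap,
 * so the target object must be STATed first and, if it is found
 * to be missing, filled by a copyup before the write applies.
 */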
2464 if (!img_request_write_test(img_request) ||
2465 !img_request_layered_test(img_request) ||
2466 rbd_dev->parent_overlap <= obj_request->img_offset ||
2467 ((known = obj_request_known_test(obj_request)) &&
2468 obj_request_exists_test(obj_request))) {
2470 struct rbd_device *rbd_dev;
2471 struct ceph_osd_client *osdc;
2473 rbd_dev = obj_request->img_request->rbd_dev;
2474 osdc = &rbd_dev->rbd_client->client->osdc;
2476 return rbd_obj_request_submit(osdc, obj_request);
2480 * It's a layered write. The target object might exist but
2481 * we may not know that yet. If we know it doesn't exist,
2482 * start by reading the data for the full target object from
2483 * the parent so we can use it for a copyup to the target.
2486 return rbd_img_obj_parent_read_full(obj_request);
2488 /* We don't know whether the target exists. Go find out. */
2490 return rbd_img_obj_exists_submit(obj_request);
2493 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2495 struct rbd_obj_request *obj_request;
2496 struct rbd_obj_request *next_obj_request;
2498 dout("%s: img %p\n", __func__, img_request);
2499 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2502 ret = rbd_img_obj_request_submit(obj_request);
2510 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2512 struct rbd_obj_request *obj_request;
2513 struct rbd_device *rbd_dev;
2516 rbd_assert(img_request_child_test(img_request));
2518 obj_request = img_request->obj_request;
2519 rbd_assert(obj_request);
2520 rbd_assert(obj_request->img_request);
2522 obj_request->result = img_request->result;
2523 if (obj_request->result)
2527 * We need to zero anything beyond the parent overlap
2528 * boundary. Since rbd_img_obj_request_read_callback()
2529 * will zero anything beyond the end of a short read, an
2530 * easy way to do this is to pretend the data from the
2531 * parent came up short--ending at the overlap boundary.
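/*
 * Worked example (values assumed): with a parent overlap of 6 MiB,
 * a child read at img_offset 5 MiB for 2 MiB has obj_end == 7 MiB,
 * so xferred is capped at 6 - 5 == 1 MiB and the read callback
 * below zero-fills the final 1 MiB of the request.
 */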
2533 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2534 obj_end = obj_request->img_offset + obj_request->length;
2535 rbd_dev = obj_request->img_request->rbd_dev;
2536 if (obj_end > rbd_dev->parent_overlap) {
2539 if (obj_request->img_offset < rbd_dev->parent_overlap)
2540 xferred = rbd_dev->parent_overlap -
2541 obj_request->img_offset;
2543 obj_request->xferred = min(img_request->xferred, xferred);
2545 obj_request->xferred = img_request->xferred;
2548 rbd_img_request_put(img_request);
2549 rbd_img_obj_request_read_callback(obj_request);
2550 rbd_obj_request_complete(obj_request);
2553 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2555 struct rbd_device *rbd_dev;
2556 struct rbd_img_request *img_request;
2559 rbd_assert(obj_request_img_data_test(obj_request));
2560 rbd_assert(obj_request->img_request != NULL);
2561 rbd_assert(obj_request->result == (s32) -ENOENT);
2562 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2564 rbd_dev = obj_request->img_request->rbd_dev;
2565 rbd_assert(rbd_dev->parent != NULL);
2566 /* rbd_read_finish(obj_request, obj_request->length); */
2567 img_request = rbd_img_request_create(rbd_dev->parent,
2568 obj_request->img_offset,
2569 obj_request->length,
2575 rbd_obj_request_get(obj_request);
2576 img_request->obj_request = obj_request;
2578 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2579 obj_request->bio_list);
2583 img_request->callback = rbd_img_parent_read_callback;
2584 result = rbd_img_request_submit(img_request);
2591 rbd_img_request_put(img_request);
2592 obj_request->result = result;
2593 obj_request->xferred = 0;
2594 obj_request_done_set(obj_request);
2597 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2599 struct rbd_obj_request *obj_request;
2600 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2603 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2604 OBJ_REQUEST_NODATA);
2609 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2610 if (!obj_request->osd_req)
2612 obj_request->callback = rbd_obj_request_put;
2614 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2616 rbd_osd_req_format_read(obj_request);
2618 ret = rbd_obj_request_submit(osdc, obj_request);
2621 rbd_obj_request_put(obj_request);
2626 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2628 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2634 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2635 rbd_dev->header_name, (unsigned long long)notify_id,
2636 (unsigned int)opcode);
2637 ret = rbd_dev_refresh(rbd_dev);
2639 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2641 rbd_obj_notify_ack(rbd_dev, notify_id);
2645 * Request sync osd watch/unwatch. The value of "start" determines
2646 * whether a watch request is being initiated or torn down.
2648 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2650 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2651 struct rbd_obj_request *obj_request;
2654 rbd_assert(start ^ !!rbd_dev->watch_event);
2655 rbd_assert(start ^ !!rbd_dev->watch_request);
2658 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2659 &rbd_dev->watch_event);
2662 rbd_assert(rbd_dev->watch_event != NULL);
2666 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2667 OBJ_REQUEST_NODATA);
2671 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2672 if (!obj_request->osd_req)
2676 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2678 ceph_osdc_unregister_linger_request(osdc,
2679 rbd_dev->watch_request->osd_req);
2681 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2682 rbd_dev->watch_event->cookie, 0, start);
2683 rbd_osd_req_format_write(obj_request);
2685 ret = rbd_obj_request_submit(osdc, obj_request);
2688 ret = rbd_obj_request_wait(obj_request);
2691 ret = obj_request->result;
2696 * A watch request is set to linger, so the underlying osd
2697 * request won't go away until we unregister it. We retain
2698 * a pointer to the object request during that time (in
2699 * rbd_dev->watch_request), so we'll keep a reference to
2700 * it. We'll drop that reference (below) after we've
2701 * unregistered it.
2704 rbd_dev->watch_request = obj_request;
2709 /* We have successfully torn down the watch request */
2711 rbd_obj_request_put(rbd_dev->watch_request);
2712 rbd_dev->watch_request = NULL;
2714 /* Cancel the event if we're tearing down, or on error */
2715 ceph_osdc_cancel_event(rbd_dev->watch_event);
2716 rbd_dev->watch_event = NULL;
2718 rbd_obj_request_put(obj_request);
2724 * Synchronous osd object method call. Returns the number of bytes
2725 * returned in the inbound buffer, or a negative error code.
2727 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2728 const char *object_name,
2729 const char *class_name,
2730 const char *method_name,
2731 const void *outbound,
2732 size_t outbound_size,
2733 void *inbound,
2734 size_t inbound_size)
2736 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2737 struct rbd_obj_request *obj_request;
2738 struct page **pages;
2743 * Method calls are ultimately read operations. The result
2744 * should be placed into the inbound buffer provided. They
2745 * also supply outbound data--parameters for the object
2746 * method. Currently if this is present it will be a
2747 * snapshot id.
2749 page_count = (u32)calc_pages_for(0, inbound_size);
2750 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2752 return PTR_ERR(pages);
2755 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2760 obj_request->pages = pages;
2761 obj_request->page_count = page_count;
2763 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2764 if (!obj_request->osd_req)
2767 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2768 class_name, method_name);
2769 if (outbound_size) {
2770 struct ceph_pagelist *pagelist;
2772 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2776 ceph_pagelist_init(pagelist);
2777 ceph_pagelist_append(pagelist, outbound, outbound_size);
2778 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2781 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2782 obj_request->pages, inbound_size,
2784 rbd_osd_req_format_read(obj_request);
2786 ret = rbd_obj_request_submit(osdc, obj_request);
2789 ret = rbd_obj_request_wait(obj_request);
2793 ret = obj_request->result;
2797 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2798 ret = (int)obj_request->xferred;
2799 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2802 rbd_obj_request_put(obj_request);
2804 ceph_release_page_vector(pages, page_count);
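/*
 * Example use of rbd_obj_method_sync() (a sketch only; it mirrors
 * the format 2 probe helpers below, e.g. fetching the object
 * prefix of an image):
 *
 *	char buf[RBD_OBJ_PREFIX_LEN_MAX];
 *	int ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				      "rbd", "get_object_prefix",
 *				      NULL, 0, buf, sizeof (buf));
 *
 * A non-negative return is the number of reply bytes copied into
 * buf; a negative value is an errno.
 */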
2809 static void rbd_request_fn(struct request_queue *q)
2810 __releases(q->queue_lock) __acquires(q->queue_lock)
2812 struct rbd_device *rbd_dev = q->queuedata;
2813 bool read_only = rbd_dev->mapping.read_only;
2817 while ((rq = blk_fetch_request(q))) {
2818 bool write_request = rq_data_dir(rq) == WRITE;
2819 struct rbd_img_request *img_request;
2823 /* Ignore any non-FS requests that filter through. */
2825 if (rq->cmd_type != REQ_TYPE_FS) {
2826 dout("%s: non-fs request type %d\n", __func__,
2827 (int) rq->cmd_type);
2828 __blk_end_request_all(rq, 0);
2832 /* Ignore/skip any zero-length requests */
2834 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2835 length = (u64) blk_rq_bytes(rq);
2838 dout("%s: zero-length request\n", __func__);
2839 __blk_end_request_all(rq, 0);
2843 spin_unlock_irq(q->queue_lock);
2845 /* Disallow writes to a read-only device */
2847 if (write_request) {
2851 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2855 * Quit early if the mapped snapshot no longer
2856 * exists. It's still possible the snapshot will
2857 * have disappeared by the time our request arrives
2858 * at the osd, but there's no sense in sending it if
2859 * we know.
2861 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2862 dout("request for non-existent snapshot");
2863 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2869 if (offset && length > U64_MAX - offset + 1) {
2870 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2872 goto end_request; /* Shouldn't happen */
2876 if (offset + length > rbd_dev->mapping.size) {
2877 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2878 offset, length, rbd_dev->mapping.size);
2883 img_request = rbd_img_request_create(rbd_dev, offset, length,
2884 write_request, false);
2888 img_request->rq = rq;
2890 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2893 result = rbd_img_request_submit(img_request);
2895 rbd_img_request_put(img_request);
2897 spin_lock_irq(q->queue_lock);
2899 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2900 write_request ? "write" : "read",
2901 length, offset, result);
2903 __blk_end_request_all(rq, result);
2909 * A request_queue merge_bvec callback. It ensures we don't create a bio
2910 * that spans multiple osd objects. One exception is single-page bios,
2911 * which we handle later at bio_chain_clone_range()
2913 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2914 struct bio_vec *bvec)
2916 struct rbd_device *rbd_dev = q->queuedata;
2917 sector_t sector_offset;
2918 sector_t sectors_per_obj;
2919 sector_t obj_sector_offset;
2923 * Find how far into its rbd object the bio's start sector
2924 * falls, converting the partition-relative sector to one
2925 * relative to the enclosing device.
2927 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2928 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2929 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
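/*
 * Example (assuming the default 4 MiB objects, obj_order == 22):
 * sectors_per_obj == 1 << (22 - 9) == 8192. A bio starting at
 * device sector 12288 is 12288 & 8191 == 4096 sectors into its
 * object, leaving 4096 sectors (2 MiB) before the object boundary.
 */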
2932 * Compute the number of bytes from that offset to the end
2933 * of the object. Account for what's already used by the bio.
2935 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2936 if (ret > bmd->bi_size)
2937 ret -= bmd->bi_size;
2942 * Don't send back more than was asked for. And if the bio
2943 * was empty, let the whole thing through because: "Note
2944 * that a block device *must* allow a single page to be
2945 * added to an empty bio."
2947 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2948 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2949 ret = (int) bvec->bv_len;
2954 static void rbd_free_disk(struct rbd_device *rbd_dev)
2956 struct gendisk *disk = rbd_dev->disk;
2961 rbd_dev->disk = NULL;
2962 if (disk->flags & GENHD_FL_UP) {
2965 blk_cleanup_queue(disk->queue);
2970 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2971 const char *object_name,
2972 u64 offset, u64 length, void *buf)
2975 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2976 struct rbd_obj_request *obj_request;
2977 struct page **pages = NULL;
2982 page_count = (u32) calc_pages_for(offset, length);
2983 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2985 ret = PTR_ERR(pages);
2988 obj_request = rbd_obj_request_create(object_name, offset, length,
2993 obj_request->pages = pages;
2994 obj_request->page_count = page_count;
2996 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2997 if (!obj_request->osd_req)
3000 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3001 offset, length, 0, 0);
3002 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3004 obj_request->length,
3005 obj_request->offset & ~PAGE_MASK,
3007 rbd_osd_req_format_read(obj_request);
3009 ret = rbd_obj_request_submit(osdc, obj_request);
3012 ret = rbd_obj_request_wait(obj_request);
3016 ret = obj_request->result;
3020 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3021 size = (size_t) obj_request->xferred;
3022 ceph_copy_from_page_vector(pages, buf, 0, size);
3023 rbd_assert(size <= (size_t)INT_MAX);
3027 rbd_obj_request_put(obj_request);
3029 ceph_release_page_vector(pages, page_count);
3035 * Read the complete header for the given rbd device.
3037 * Returns a pointer to a dynamically-allocated buffer containing
3038 * the complete and validated header.
3042 * Returns a pointer-coded errno if a failure occurs.
3044 static struct rbd_image_header_ondisk *
3045 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
3047 struct rbd_image_header_ondisk *ondisk = NULL;
3054 * The complete header will include an array of its 64-bit
3055 * snapshot ids, followed by the names of those snapshots as
3056 * a contiguous block of NUL-terminated strings. Note that
3057 * the number of snapshots could change by the time we read
3058 * it in, in which case we re-read it.
3065 size = sizeof (*ondisk);
3066 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3068 ondisk = kmalloc(size, GFP_KERNEL);
3070 return ERR_PTR(-ENOMEM);
3072 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3076 if ((size_t)ret < size) {
3078 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3082 if (!rbd_dev_ondisk_valid(ondisk)) {
3084 rbd_warn(rbd_dev, "invalid header");
3088 names_size = le64_to_cpu(ondisk->snap_names_len);
3089 want_count = snap_count;
3090 snap_count = le32_to_cpu(ondisk->snap_count);
3091 } while (snap_count != want_count);
3098 return ERR_PTR(ret);
3102 * Reload the on-disk header.
3104 static int rbd_read_header(struct rbd_device *rbd_dev,
3105 struct rbd_image_header *header)
3107 struct rbd_image_header_ondisk *ondisk;
3110 ondisk = rbd_dev_v1_header_read(rbd_dev);
3112 return PTR_ERR(ondisk);
3113 ret = rbd_header_from_disk(header, ondisk);
3120 * Re-read the on-disk header and refresh the in-memory copy for a format 1 image
3122 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3125 struct rbd_image_header h;
3127 memset(&h, 0, sizeof (h));
3128 ret = rbd_read_header(rbd_dev, &h);
3132 down_write(&rbd_dev->header_rwsem);
3134 /* Update image size, and check for resize of mapped image */
3135 rbd_dev->header.image_size = h.image_size;
3136 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
3137 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3138 rbd_dev->mapping.size = rbd_dev->header.image_size;
3140 /* rbd_dev->header.object_prefix shouldn't change */
3141 kfree(rbd_dev->header.snap_sizes);
3142 kfree(rbd_dev->header.snap_names);
3143 /* osd requests may still refer to snapc */
3144 ceph_put_snap_context(rbd_dev->header.snapc);
3146 rbd_dev->header.image_size = h.image_size;
3147 rbd_dev->header.snapc = h.snapc;
3148 rbd_dev->header.snap_names = h.snap_names;
3149 rbd_dev->header.snap_sizes = h.snap_sizes;
3150 /* Free the extra copy of the object prefix */
3151 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3152 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3153 kfree(h.object_prefix);
3155 up_write(&rbd_dev->header_rwsem);
3161 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3162 * has disappeared from the (just updated) snapshot context.
3164 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3168 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3171 snap_id = rbd_dev->spec->snap_id;
3172 if (snap_id == CEPH_NOSNAP)
3175 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3176 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3179 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3184 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3185 mapping_size = rbd_dev->mapping.size;
3186 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3187 if (rbd_dev->image_format == 1)
3188 ret = rbd_dev_v1_refresh(rbd_dev);
3190 ret = rbd_dev_v2_refresh(rbd_dev);
3192 /* If it's a mapped snapshot, validate its EXISTS flag */
3194 rbd_exists_validate(rbd_dev);
3195 mutex_unlock(&ctl_mutex);
3196 if (mapping_size != rbd_dev->mapping.size) {
3199 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3200 dout("setting size to %llu sectors", (unsigned long long)size);
3201 set_capacity(rbd_dev->disk, size);
3202 revalidate_disk(rbd_dev->disk);
3208 static int rbd_init_disk(struct rbd_device *rbd_dev)
3210 struct gendisk *disk;
3211 struct request_queue *q;
3214 /* create gendisk info */
3215 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3219 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3221 disk->major = rbd_dev->major;
3222 disk->first_minor = 0;
3223 disk->fops = &rbd_bd_ops;
3224 disk->private_data = rbd_dev;
3226 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3230 /* We use the default size, but let's be explicit about it. */
3231 blk_queue_physical_block_size(q, SECTOR_SIZE);
3233 /* set io sizes to object size */
3234 segment_size = rbd_obj_bytes(&rbd_dev->header);
3235 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3236 blk_queue_max_segment_size(q, segment_size);
3237 blk_queue_io_min(q, segment_size);
3238 blk_queue_io_opt(q, segment_size);
3240 blk_queue_merge_bvec(q, rbd_merge_bvec);
3243 q->queuedata = rbd_dev;
3245 rbd_dev->disk = disk;
3258 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3260 return container_of(dev, struct rbd_device, dev);
3263 static ssize_t rbd_size_show(struct device *dev,
3264 struct device_attribute *attr, char *buf)
3266 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3268 return sprintf(buf, "%llu\n",
3269 (unsigned long long)rbd_dev->mapping.size);
3273 * Note this shows the features for whatever's mapped, which is not
3274 * necessarily the base image.
3276 static ssize_t rbd_features_show(struct device *dev,
3277 struct device_attribute *attr, char *buf)
3279 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3281 return sprintf(buf, "0x%016llx\n",
3282 (unsigned long long)rbd_dev->mapping.features);
3285 static ssize_t rbd_major_show(struct device *dev,
3286 struct device_attribute *attr, char *buf)
3288 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3291 return sprintf(buf, "%d\n", rbd_dev->major);
3293 return sprintf(buf, "(none)\n");
3297 static ssize_t rbd_client_id_show(struct device *dev,
3298 struct device_attribute *attr, char *buf)
3300 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3302 return sprintf(buf, "client%lld\n",
3303 ceph_client_id(rbd_dev->rbd_client->client));
3306 static ssize_t rbd_pool_show(struct device *dev,
3307 struct device_attribute *attr, char *buf)
3309 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3311 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3314 static ssize_t rbd_pool_id_show(struct device *dev,
3315 struct device_attribute *attr, char *buf)
3317 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3319 return sprintf(buf, "%llu\n",
3320 (unsigned long long) rbd_dev->spec->pool_id);
3323 static ssize_t rbd_name_show(struct device *dev,
3324 struct device_attribute *attr, char *buf)
3326 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3328 if (rbd_dev->spec->image_name)
3329 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3331 return sprintf(buf, "(unknown)\n");
3334 static ssize_t rbd_image_id_show(struct device *dev,
3335 struct device_attribute *attr, char *buf)
3337 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3339 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3343 * Shows the name of the currently-mapped snapshot (or
3344 * RBD_SNAP_HEAD_NAME for the base image).
3346 static ssize_t rbd_snap_show(struct device *dev,
3347 struct device_attribute *attr,
3350 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3352 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3356 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3357 * for the parent image. If there is no parent, simply shows
3358 * "(no parent image)".
3360 static ssize_t rbd_parent_show(struct device *dev,
3361 struct device_attribute *attr,
3364 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3365 struct rbd_spec *spec = rbd_dev->parent_spec;
3370 return sprintf(buf, "(no parent image)\n");
3372 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3373 (unsigned long long) spec->pool_id, spec->pool_name);
3378 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3379 spec->image_name ? spec->image_name : "(unknown)");
3384 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3385 (unsigned long long) spec->snap_id, spec->snap_name);
3390 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3395 return (ssize_t) (bufp - buf);
3398 static ssize_t rbd_image_refresh(struct device *dev,
3399 struct device_attribute *attr,
3403 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3406 ret = rbd_dev_refresh(rbd_dev);
3408 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3410 return ret < 0 ? ret : size;
3413 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3414 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3415 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3416 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3417 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3418 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3419 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3420 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3421 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3422 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3423 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3425 static struct attribute *rbd_attrs[] = {
3426 &dev_attr_size.attr,
3427 &dev_attr_features.attr,
3428 &dev_attr_major.attr,
3429 &dev_attr_client_id.attr,
3430 &dev_attr_pool.attr,
3431 &dev_attr_pool_id.attr,
3432 &dev_attr_name.attr,
3433 &dev_attr_image_id.attr,
3434 &dev_attr_current_snap.attr,
3435 &dev_attr_parent.attr,
3436 &dev_attr_refresh.attr,
3440 static struct attribute_group rbd_attr_group = {
3444 static const struct attribute_group *rbd_attr_groups[] = {
3449 static void rbd_sysfs_dev_release(struct device *dev)
3453 static struct device_type rbd_device_type = {
3455 .groups = rbd_attr_groups,
3456 .release = rbd_sysfs_dev_release,
3459 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3461 kref_get(&spec->kref);
3466 static void rbd_spec_free(struct kref *kref);
3467 static void rbd_spec_put(struct rbd_spec *spec)
3470 kref_put(&spec->kref, rbd_spec_free);
3473 static struct rbd_spec *rbd_spec_alloc(void)
3475 struct rbd_spec *spec;
3477 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3480 kref_init(&spec->kref);
3485 static void rbd_spec_free(struct kref *kref)
3487 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3489 kfree(spec->pool_name);
3490 kfree(spec->image_id);
3491 kfree(spec->image_name);
3492 kfree(spec->snap_name);
3496 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3497 struct rbd_spec *spec)
3499 struct rbd_device *rbd_dev;
3501 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3505 spin_lock_init(&rbd_dev->lock);
3507 INIT_LIST_HEAD(&rbd_dev->node);
3508 init_rwsem(&rbd_dev->header_rwsem);
3510 rbd_dev->spec = spec;
3511 rbd_dev->rbd_client = rbdc;
3513 /* Initialize the layout used for all rbd requests */
3515 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3516 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3517 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3518 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3523 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3525 rbd_put_client(rbd_dev->rbd_client);
3526 rbd_spec_put(rbd_dev->spec);
3531 * Get the size and object order for an image snapshot, or if
3532 * snap_id is CEPH_NOSNAP, get this information for the base
3533 * image.
3535 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3536 u8 *order, u64 *snap_size)
3538 __le64 snapid = cpu_to_le64(snap_id);
3543 } __attribute__ ((packed)) size_buf = { 0 };
3545 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3547 &snapid, sizeof (snapid),
3548 &size_buf, sizeof (size_buf));
3549 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3552 if (ret < sizeof (size_buf))
3556 *order = size_buf.order;
3557 *snap_size = le64_to_cpu(size_buf.size);
3559 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3560 (unsigned long long)snap_id, (unsigned int)*order,
3561 (unsigned long long)*snap_size);
3566 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3568 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3569 &rbd_dev->header.obj_order,
3570 &rbd_dev->header.image_size);
3573 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3579 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3583 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3584 "rbd", "get_object_prefix", NULL, 0,
3585 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3586 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3591 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3592 p + ret, NULL, GFP_NOIO);
3595 if (IS_ERR(rbd_dev->header.object_prefix)) {
3596 ret = PTR_ERR(rbd_dev->header.object_prefix);
3597 rbd_dev->header.object_prefix = NULL;
3599 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3607 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3610 __le64 snapid = cpu_to_le64(snap_id);
3614 } __attribute__ ((packed)) features_buf = { 0 };
3618 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3619 "rbd", "get_features",
3620 &snapid, sizeof (snapid),
3621 &features_buf, sizeof (features_buf));
3622 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3625 if (ret < sizeof (features_buf))
3628 incompat = le64_to_cpu(features_buf.incompat);
3629 if (incompat & ~RBD_FEATURES_SUPPORTED)
3632 *snap_features = le64_to_cpu(features_buf.features);
3634 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3635 (unsigned long long)snap_id,
3636 (unsigned long long)*snap_features,
3637 (unsigned long long)le64_to_cpu(features_buf.incompat));
3642 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3644 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3645 &rbd_dev->header.features);
3648 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3650 struct rbd_spec *parent_spec;
3652 void *reply_buf = NULL;
3660 parent_spec = rbd_spec_alloc();
3664 size = sizeof (__le64) + /* pool_id */
3665 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3666 sizeof (__le64) + /* snap_id */
3667 sizeof (__le64); /* overlap */
3668 reply_buf = kmalloc(size, GFP_KERNEL);
3674 snapid = cpu_to_le64(CEPH_NOSNAP);
3675 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3676 "rbd", "get_parent",
3677 &snapid, sizeof (snapid),
3679 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3684 end = reply_buf + ret;
3686 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3687 if (parent_spec->pool_id == CEPH_NOPOOL)
3688 goto out; /* No parent? No problem. */
3690 /* The ceph file layout needs to fit pool id in 32 bits */
3693 if (parent_spec->pool_id > (u64)U32_MAX) {
3694 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3695 (unsigned long long)parent_spec->pool_id, U32_MAX);
3699 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3700 if (IS_ERR(image_id)) {
3701 ret = PTR_ERR(image_id);
3704 parent_spec->image_id = image_id;
3705 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3706 ceph_decode_64_safe(&p, end, overlap, out_err);
3708 rbd_dev->parent_overlap = overlap;
3709 rbd_dev->parent_spec = parent_spec;
3710 parent_spec = NULL; /* rbd_dev now owns this */
3715 rbd_spec_put(parent_spec);
3720 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3724 __le64 stripe_count;
3725 } __attribute__ ((packed)) striping_info_buf = { 0 };
3726 size_t size = sizeof (striping_info_buf);
3733 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3734 "rbd", "get_stripe_unit_count", NULL, 0,
3735 (char *)&striping_info_buf, size);
3736 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3743 * We don't actually support the "fancy striping" feature
3744 * (STRIPINGV2) yet, but if the striping sizes are the
3745 * defaults the behavior is the same as before. So find
3746 * out, and only fail if the image has non-default values.
3749 obj_size = (u64)1 << rbd_dev->header.obj_order;
3750 p = &striping_info_buf;
3751 stripe_unit = ceph_decode_64(&p);
3752 if (stripe_unit != obj_size) {
3753 rbd_warn(rbd_dev, "unsupported stripe unit "
3754 "(got %llu want %llu)",
3755 stripe_unit, obj_size);
3758 stripe_count = ceph_decode_64(&p);
3759 if (stripe_count != 1) {
3760 rbd_warn(rbd_dev, "unsupported stripe count "
3761 "(got %llu want 1)", stripe_count);
3764 rbd_dev->header.stripe_unit = stripe_unit;
3765 rbd_dev->header.stripe_count = stripe_count;
3770 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3772 size_t image_id_size;
3777 void *reply_buf = NULL;
3779 char *image_name = NULL;
3782 rbd_assert(!rbd_dev->spec->image_name);
3784 len = strlen(rbd_dev->spec->image_id);
3785 image_id_size = sizeof (__le32) + len;
3786 image_id = kmalloc(image_id_size, GFP_KERNEL);
3791 end = image_id + image_id_size;
3792 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3794 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3795 reply_buf = kmalloc(size, GFP_KERNEL);
3799 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3800 "rbd", "dir_get_name",
3801 image_id, image_id_size,
3806 end = reply_buf + ret;
3808 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3809 if (IS_ERR(image_name))
3812 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3820 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3822 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3823 const char *snap_name;
3826 /* Skip over names until we find the one we are looking for */
3828 snap_name = rbd_dev->header.snap_names;
3829 while (which < snapc->num_snaps) {
3830 if (!strcmp(name, snap_name))
3831 return snapc->snaps[which];
3832 snap_name += strlen(snap_name) + 1;
3838 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3840 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3845 for (which = 0; !found && which < snapc->num_snaps; which++) {
3846 const char *snap_name;
3848 snap_id = snapc->snaps[which];
3849 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3850 if (IS_ERR(snap_name))
3852 found = !strcmp(name, snap_name);
3855 return found ? snap_id : CEPH_NOSNAP;
3859 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3860 * no snapshot by that name is found, or if an error occurs.
3862 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3864 if (rbd_dev->image_format == 1)
3865 return rbd_v1_snap_id_by_name(rbd_dev, name);
3867 return rbd_v2_snap_id_by_name(rbd_dev, name);
3871 * When an rbd image has a parent image, it is identified by the
3872 * pool, image, and snapshot ids (not names). This function fills
3873 * in the names for those ids. (It's OK if we can't figure out the
3874 * name for an image id, but the pool and snapshot ids should always
3875 * exist and have names.) All names in an rbd spec are dynamically
3876 * allocated.
3878 * When an image being mapped (not a parent) is probed, we have the
3879 * pool name and pool id, image name and image id, and the snapshot
3880 * name. The only thing we're missing is the snapshot id.
3882 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3884 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3885 struct rbd_spec *spec = rbd_dev->spec;
3886 const char *pool_name;
3887 const char *image_name;
3888 const char *snap_name;
3892 * An image being mapped will have the pool name (etc.), but
3893 * we need to look up the snapshot id.
3895 if (spec->pool_name) {
3896 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3899 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3900 if (snap_id == CEPH_NOSNAP)
3902 spec->snap_id = snap_id;
3904 spec->snap_id = CEPH_NOSNAP;
3910 /* Get the pool name; we have to make our own copy of this */
3912 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3914 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3917 pool_name = kstrdup(pool_name, GFP_KERNEL);
3921 /* Fetch the image name; tolerate failure here */
3923 image_name = rbd_dev_image_name(rbd_dev);
3925 rbd_warn(rbd_dev, "unable to get image name");
3927 /* Look up the snapshot name, and make a copy */
3929 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3935 spec->pool_name = pool_name;
3936 spec->image_name = image_name;
3937 spec->snap_name = snap_name;
3947 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3956 struct ceph_snap_context *snapc;
3960 * We'll need room for the seq value (maximum snapshot id),
3961 * snapshot count, and array of that many snapshot ids.
3962 * For now we have a fixed upper limit on the number we're
3963 * prepared to receive.
3965 size = sizeof (__le64) + sizeof (__le32) +
3966 RBD_MAX_SNAP_COUNT * sizeof (__le64);
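/*
 * With RBD_MAX_SNAP_COUNT == 510 this works out to
 * 8 + 4 + 510 * 8 == 4092 bytes, so the largest snapshot context
 * we are prepared to accept still fits in a single 4 KiB page.
 */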
3967 reply_buf = kzalloc(size, GFP_KERNEL);
3971 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3972 "rbd", "get_snapcontext", NULL, 0,
3974 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3979 end = reply_buf + ret;
3981 ceph_decode_64_safe(&p, end, seq, out);
3982 ceph_decode_32_safe(&p, end, snap_count, out);
3985 * Make sure the reported number of snapshot ids wouldn't go
3986 * beyond the end of our buffer. But before checking that,
3987 * make sure the computed size of the snapshot context we
3988 * allocate is representable in a size_t.
3990 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3995 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3999 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4005 for (i = 0; i < snap_count; i++)
4006 snapc->snaps[i] = ceph_decode_64(&p);
4008 ceph_put_snap_context(rbd_dev->header.snapc);
4009 rbd_dev->header.snapc = snapc;
4011 dout(" snap context seq = %llu, snap_count = %u\n",
4012 (unsigned long long)seq, (unsigned int)snap_count);
4019 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4030 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4031 reply_buf = kmalloc(size, GFP_KERNEL);
4033 return ERR_PTR(-ENOMEM);
4035 snapid = cpu_to_le64(snap_id);
4036 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4037 "rbd", "get_snapshot_name",
4038 &snapid, sizeof (snapid),
4040 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4042 snap_name = ERR_PTR(ret);
4047 end = reply_buf + ret;
4048 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4049 if (IS_ERR(snap_name))
4052 dout(" snap_id 0x%016llx snap_name = %s\n",
4053 (unsigned long long)snap_id, snap_name);
4060 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4064 down_write(&rbd_dev->header_rwsem);
4066 ret = rbd_dev_v2_image_size(rbd_dev);
4069 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4070 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4071 rbd_dev->mapping.size = rbd_dev->header.image_size;
4073 ret = rbd_dev_v2_snap_context(rbd_dev);
4074 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4078 up_write(&rbd_dev->header_rwsem);
4083 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4088 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4090 dev = &rbd_dev->dev;
4091 dev->bus = &rbd_bus_type;
4092 dev->type = &rbd_device_type;
4093 dev->parent = &rbd_root_dev;
4094 dev->release = rbd_dev_device_release;
4095 dev_set_name(dev, "%d", rbd_dev->dev_id);
4096 ret = device_register(dev);
4098 mutex_unlock(&ctl_mutex);
4103 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4105 device_unregister(&rbd_dev->dev);
4108 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4111 * Get a unique rbd identifier for the given new rbd_dev, and add
4112 * the rbd_dev to the global list. The minimum rbd id is 1.
4114 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4116 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4118 spin_lock(&rbd_dev_list_lock);
4119 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4120 spin_unlock(&rbd_dev_list_lock);
4121 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4122 (unsigned long long) rbd_dev->dev_id);
4126 * Remove an rbd_dev from the global list, and record that its
4127 * identifier is no longer in use.
4129 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4131 struct list_head *tmp;
4132 int rbd_id = rbd_dev->dev_id;
4135 rbd_assert(rbd_id > 0);
4137 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4138 (unsigned long long) rbd_dev->dev_id);
4139 spin_lock(&rbd_dev_list_lock);
4140 list_del_init(&rbd_dev->node);
4143 * If the id being "put" is not the current maximum, there
4144 * is nothing special we need to do.
4146 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4147 spin_unlock(&rbd_dev_list_lock);
4152 * We need to update the current maximum id. Search the
4153 * list to find out what it is. We're more likely to find
4154 * the maximum at the end, so search the list backward.
4157 list_for_each_prev(tmp, &rbd_dev_list) {
4158 struct rbd_device *rbd_dev;
4160 rbd_dev = list_entry(tmp, struct rbd_device, node);
4161 if (rbd_dev->dev_id > max_id)
4162 max_id = rbd_dev->dev_id;
4164 spin_unlock(&rbd_dev_list_lock);
4167 * The max id could have been updated by rbd_dev_id_get(), in
4168 * which case it now accurately reflects the new maximum.
4169 * Be careful not to overwrite the maximum value in that
4172 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4173 dout(" max dev id has been reset\n");
4177 * Skips over white space at *buf, and updates *buf to point to the
4178 * first found non-space character (if any). Returns the length of
4179 * the token (string of non-white space characters) found. Note
4180 * that *buf must be terminated with '\0'.
4182 static inline size_t next_token(const char **buf)
4185 * These are the characters that produce nonzero for
4186 * isspace() in the "C" and "POSIX" locales.
4188 const char *spaces = " \f\n\r\t\v";
4190 *buf += strspn(*buf, spaces); /* Find start of token */
4192 return strcspn(*buf, spaces); /* Return token length */
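/*
 * Example: with *buf pointing at "  rbd foo", next_token() advances
 * *buf to "rbd foo" and returns 3, the length of the "rbd" token.
 */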
4196 * Finds the next token in *buf, and if the provided token buffer is
4197 * big enough, copies the found token into it. The result, if
4198 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4199 * must be terminated with '\0' on entry.
4201 * Returns the length of the token found (not including the '\0').
4202 * Return value will be 0 if no token is found, and it will be >=
4203 * token_size if the token would not fit.
4205 * The *buf pointer will be updated to point beyond the end of the
4206 * found token. Note that this occurs even if the token buffer is
4207 * too small to hold it.
4209 static inline size_t copy_token(const char **buf,
4215 len = next_token(buf);
4216 if (len < token_size) {
4217 memcpy(token, *buf, len);
4218 *(token + len) = '\0';
4226 * Finds the next token in *buf, dynamically allocates a buffer big
4227 * enough to hold a copy of it, and copies the token into the new
4228 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4229 * that a duplicate buffer is created even for a zero-length token.
4231 * Returns a pointer to the newly-allocated duplicate, or a null
4232 * pointer if memory for the duplicate was not available. If
4233 * the lenp argument is a non-null pointer, the length of the token
4234 * (not including the '\0') is returned in *lenp.
4236 * If successful, the *buf pointer will be updated to point beyond
4237 * the end of the found token.
4239 * Note: uses GFP_KERNEL for allocation.
4241 static inline char *dup_token(const char **buf, size_t *lenp)
4246 len = next_token(buf);
4247 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4250 *(dup + len) = '\0';
4260 * Parse the options provided for an "rbd add" (i.e., rbd image
4261 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4262 * and the data written is passed here via a NUL-terminated buffer.
4263 * Returns 0 if successful or an error code otherwise.
4265 * The information extracted from these options is recorded in
4266 * the other parameters which return dynamically-allocated
4269 * The address of a pointer that will refer to a ceph options
4270 * structure. Caller must release the returned pointer using
4271 * ceph_destroy_options() when it is no longer needed.
4273 * Address of an rbd options pointer. Fully initialized by
4274 * this function; caller must release with kfree().
4276 * Address of an rbd image specification pointer. Fully
4277 * initialized by this function based on parsed options.
4278 * Caller must release with rbd_spec_put().
4280 * The options passed take this form:
4281 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4284 * A comma-separated list of one or more monitor addresses.
4285 * A monitor address is an ip address, optionally followed
4286 * by a port number (separated by a colon).
4287 * I.e.: ip1[:port1][,ip2[:port2]...]
4289 * A comma-separated list of ceph and/or rbd options.
4291 * The name of the rados pool containing the rbd image.
4293 * The name of the image in that pool to map.
4295 * An optional snapshot name. If provided, the mapping will
4296 * present data from the image at the time that snapshot was
4297 * created. The image head is used if no snapshot name is
4298 * provided. Snapshot mappings are always read-only.
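 *
 * For example (monitor address and key are illustrative):
 *
 *	$ echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap" \
 *		> /sys/bus/rbd/add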
4300 static int rbd_add_parse_args(const char *buf,
4301 struct ceph_options **ceph_opts,
4302 struct rbd_options **opts,
4303 struct rbd_spec **rbd_spec)
4307 const char *mon_addrs;
4309 size_t mon_addrs_size;
4310 struct rbd_spec *spec = NULL;
4311 struct rbd_options *rbd_opts = NULL;
4312 struct ceph_options *copts;
4315 /* The first four tokens are required */
4317 len = next_token(&buf);
4319 rbd_warn(NULL, "no monitor address(es) provided");
4323 mon_addrs_size = len + 1;
4327 options = dup_token(&buf, NULL);
4331 rbd_warn(NULL, "no options provided");
4335 spec = rbd_spec_alloc();
4339 spec->pool_name = dup_token(&buf, NULL);
4340 if (!spec->pool_name)
4342 if (!*spec->pool_name) {
4343 rbd_warn(NULL, "no pool name provided");
4347 spec->image_name = dup_token(&buf, NULL);
4348 if (!spec->image_name)
4350 if (!*spec->image_name) {
4351 rbd_warn(NULL, "no image name provided");
4356 * Snapshot name is optional; default is to use "-"
4357 * (indicating the head/no snapshot).
4359 len = next_token(&buf);
4361 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4362 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4363 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4364 ret = -ENAMETOOLONG;
4367 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4370 *(snap_name + len) = '\0';
4371 spec->snap_name = snap_name;
4373 /* Initialize all rbd options to the defaults */
4375 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4379 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4381 copts = ceph_parse_options(options, mon_addrs,
4382 mon_addrs + mon_addrs_size - 1,
4383 parse_rbd_opts_token, rbd_opts);
4384 if (IS_ERR(copts)) {
4385 ret = PTR_ERR(copts);
4406 * An rbd format 2 image has a unique identifier, distinct from the
4407 * name given to it by the user. Internally, that identifier is
4408 * what's used to specify the names of objects related to the image.
4410 * A special "rbd id" object is used to map an rbd image name to its
4411 * id. If that object doesn't exist, then there is no v2 rbd image
4412 * with the supplied name.
4414 * This function will record the given rbd_dev's image_id field if
4415 * it can be determined, and in that case will return 0. If any
4416 * errors occur a negative errno will be returned and the rbd_dev's
4417 * image_id field will be unchanged (and should be NULL).
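 *
 * For example (names illustrative, prefixes from rbd_types.h): an
 * image named "foo" has id object "rbd_id.foo"; if its contents
 * decode to the id "abc123", the format 2 header object probed
 * later is "rbd_header.abc123", whereas a format 1 image would
 * use "foo.rbd".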
4419 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4428 * When probing a parent image, the image id is already
4429 * known (and the image name likely is not). There's no
4430 * need to fetch the image id again in this case. We
4431 * do still need to set the image format though.
4433 if (rbd_dev->spec->image_id) {
4434 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4440 * First, see if the format 2 image id file exists, and if
4441 * so, get the image's persistent id from it.
4443 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4444 object_name = kmalloc(size, GFP_NOIO);
4447 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4448 dout("rbd id object name is %s\n", object_name);
4450 /* Response will be an encoded string, which includes a length */
4452 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4453 response = kzalloc(size, GFP_NOIO);
4459 /* If it doesn't exist we'll assume it's a format 1 image */
4461 ret = rbd_obj_method_sync(rbd_dev, object_name,
4462 "rbd", "get_id", NULL, 0,
4463 response, RBD_IMAGE_ID_LEN_MAX);
4464 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4465 if (ret == -ENOENT) {
4466 image_id = kstrdup("", GFP_KERNEL);
4467 ret = image_id ? 0 : -ENOMEM;
4469 rbd_dev->image_format = 1;
4470 } else if (ret > sizeof (__le32)) {
4473 image_id = ceph_extract_encoded_string(&p, p + ret,
4475 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4477 rbd_dev->image_format = 2;
4483 rbd_dev->spec->image_id = image_id;
4484 dout("image_id is %s\n", image_id);
4493 /* Undo whatever state changes are made by v1 or v2 image probe */
4495 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4497 struct rbd_image_header *header;
4499 rbd_dev_remove_parent(rbd_dev);
4500 rbd_spec_put(rbd_dev->parent_spec);
4501 rbd_dev->parent_spec = NULL;
4502 rbd_dev->parent_overlap = 0;
4504 /* Free dynamic fields from the header, then zero it out */
4506 header = &rbd_dev->header;
4507 ceph_put_snap_context(header->snapc);
4508 kfree(header->snap_sizes);
4509 kfree(header->snap_names);
4510 kfree(header->object_prefix);
4511 memset(header, 0, sizeof (*header));
4514 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4518 /* Populate rbd image metadata */
4520 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4524 /* Version 1 images have no parent (no layering) */
4526 rbd_dev->parent_spec = NULL;
4527 rbd_dev->parent_overlap = 0;
4529 dout("discovered version 1 image, header name is %s\n",
4530 rbd_dev->header_name);
4535 kfree(rbd_dev->header_name);
4536 rbd_dev->header_name = NULL;
4537 kfree(rbd_dev->spec->image_id);
4538 rbd_dev->spec->image_id = NULL;
4543 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4547 ret = rbd_dev_v2_image_size(rbd_dev);
4551 /* Get the object prefix (a.k.a. block_name) for the image */
4553 ret = rbd_dev_v2_object_prefix(rbd_dev);
4557 * Get and check the features for the image
4559 ret = rbd_dev_v2_features(rbd_dev);
4563 /* If the image supports layering, get the parent info */
4565 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4566 ret = rbd_dev_v2_parent_info(rbd_dev);
4570 * Print a warning if this image has a parent.
4571 * Don't print it if the image now being probed
4572 * is itself a parent. We can tell at this point
4573 * because we won't know its pool name yet (just its
4574 * pool id).
4576 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4577 rbd_warn(rbd_dev, "WARNING: kernel layering "
4578 "is EXPERIMENTAL!");
4581 /* If the image supports fancy striping, get its parameters */
4583 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4584 ret = rbd_dev_v2_striping_info(rbd_dev);
4589 /* crypto and compression type aren't (yet) supported for v2 images */
4591 rbd_dev->header.crypt_type = 0;
4592 rbd_dev->header.comp_type = 0;
4594 * Get the snapshot context
4596 ret = rbd_dev_v2_snap_context(rbd_dev);
4600 dout("discovered version 2 image, header name is %s\n",
4601 rbd_dev->header_name);
4605 rbd_dev->parent_overlap = 0;
4606 rbd_spec_put(rbd_dev->parent_spec);
4607 rbd_dev->parent_spec = NULL;
4608 kfree(rbd_dev->header_name);
4609 rbd_dev->header_name = NULL;
4610 kfree(rbd_dev->header.object_prefix);
4611 rbd_dev->header.object_prefix = NULL;
4616 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4618 struct rbd_device *parent = NULL;
4619 struct rbd_spec *parent_spec;
4620 struct rbd_client *rbdc;
4623 if (!rbd_dev->parent_spec)
4626 * We need to pass a reference to the client and the parent
4627 * spec when creating the parent rbd_dev. Images related by
4628 * parent/child relationships always share both.
4630 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4631 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4634 parent = rbd_dev_create(rbdc, parent_spec);
4638 ret = rbd_dev_image_probe(parent, true);
4641 rbd_dev->parent = parent;
4646 rbd_spec_put(rbd_dev->parent_spec);
4647 kfree(rbd_dev->header_name);
4648 rbd_dev_destroy(parent);
4650 rbd_put_client(rbdc);
4651 rbd_spec_put(parent_spec);
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_disk;
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_mapping;

	/* Everything's ready.  Announce the disk to the world. */

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_mapping:
	rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);

	return ret;
}
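
/*
 * Build the name of the image's header object.  For format 1 images
 * this is the image name followed by RBD_SUFFIX; for format 2 it is
 * RBD_HEADER_PREFIX followed by the image id.
 */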
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	size_t size;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
	else
		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;

	if (rbd_dev->image_format == 1)
		sprintf(rbd_dev->header_name, "%s%s",
			spec->image_name, RBD_SUFFIX);
	else
		sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, spec->image_id);

	return 0;
}
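
/*
 * Undo whatever state rbd_dev_image_probe() set up: drop the probed
 * header data, cancel the header watch request, and free the header
 * name and image id before destroying the device structure itself.
 */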
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	int ret;

	rbd_dev_unprobe(rbd_dev);
	ret = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
{
	int ret;
	int tmp;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;
	rbd_assert(rbd_dev->spec->image_id);
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto out_header_name;

	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret)
		goto err_out_watch;

	ret = rbd_dev_spec_update(rbd_dev);
	if (ret)
		goto err_out_probe;

	/* If we are mapping a snapshot it must be marked read-only */

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;

	ret = rbd_dev_probe_parent(rbd_dev);
	if (!ret)
		return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
	if (tmp)
		rbd_warn(rbd_dev, "unable to tear down watch request\n");
out_header_name:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	dout("probe failed, returning %d\n", ret);

	return ret;
}
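
/*
 * Handle a write to /sys/bus/rbd/add: parse the add command, connect
 * to the monitors, look up the pool, then probe for the image and set
 * up the block device.  For the exact command syntax, see
 * Documentation/ABI/testing/sysfs-bus-rbd; for example (the monitor
 * address, credentials, pool, and image name below are illustrative):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo" \
 *		> /sys/bus/rbd/add
 */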
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	bool read_only;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;
	read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64)rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
				(unsigned long long)spec->pool_id, U32_MAX);
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rc = rbd_dev_image_probe(rbd_dev, read_only);
	if (rc < 0)
		goto err_out_rbd_dev;

	rc = rbd_dev_device_setup(rbd_dev);
	if (!rc)
		return count;

	rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t)rc;
}
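
/*
 * Look up an rbd device by its id in the global device list.
 * Returns NULL if the device is not found.
 */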
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);

	return NULL;
}
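
/*
 * Release callback for the rbd device; undoes what
 * rbd_dev_device_setup() did, in reverse order.
 */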
static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	rbd_free_disk(rbd_dev);
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_mapping_clear(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev->major = 0;
	rbd_dev_id_put(rbd_dev);
	rbd_dev_mapping_clear(rbd_dev);
}
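
/*
 * Remove an rbd device's chain of parent images.  Each iteration
 * walks down to the deepest parent (the one with no grandparent)
 * and releases it, so the chain is torn down leaf-first.
 */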
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}
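
/*
 * Handle a write to /sys/bus/rbd/remove: parse the target device id,
 * refuse removal while the device is open, and otherwise mark it
 * REMOVING (blocking new opens) before tearing it down.  For example,
 * to remove the device with id 2:
 *
 *   $ echo 2 > /sys/bus/rbd/remove
 */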
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id;
	unsigned long ul;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;
	ret = count;
	rbd_bus_del_dev(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
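
/*
 * Create the slab caches used by the driver: one for image requests,
 * one for object requests, and one for segment (object) names.  If
 * any allocation fails, destroy whatever was created so far.
 */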
static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}
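
/*
 * Destroy the slab caches created by rbd_slab_init(), in the
 * reverse order of their creation.
 */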
static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}
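
/*
 * Module entry point: verify that the running libceph is compatible,
 * then set up the slab caches and the sysfs control files.
 */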
static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}
	rc = rbd_slab_init();
	if (rc)
		return rc;
	rc = rbd_sysfs_init();
	if (rc)
		rbd_slab_exit();
	else
		pr_info("loaded " RBD_DRV_NAME_LONG "\n");

	return rc;
}
static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
	rbd_slab_exit();
}
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");