]> Pileus Git - ~andy/linux/blobdiff - drivers/block/rbd.c
rbd: fix image request leak on parent read
[~andy/linux] / drivers / block / rbd.c
index 3f162e21619418f441679b03bd1b856bcde4d3e9..c2ca1818f33583679ac3d96f8bbaa4f33949640a 100644 (file)
@@ -1,3 +1,4 @@
+
 /*
    rbd.c -- Export ceph rados objects as a Linux block device
 
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
+#include <linux/bsearch.h>
 
 #include <linux/kernel.h>
 #include <linux/device.h>
 #include <linux/module.h>
 #include <linux/fs.h>
 #include <linux/blkdev.h>
+#include <linux/slab.h>
 
 #include "rbd_types.h"
 
@@ -65,6 +68,8 @@
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
+#define        BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
+
 /* This allows a single page to hold an image name sent by OSD */
 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
 #define RBD_IMAGE_ID_LEN_MAX   64
@@ -80,7 +85,7 @@
 
 /* Features supported by this (client software) implementation. */
 
-#define RBD_FEATURES_SUPPORTED (0)
+#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
 
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
@@ -108,7 +113,8 @@ struct rbd_image_header {
        char *snap_names;
        u64 *snap_sizes;
 
-       u64 obj_version;
+       u64 stripe_unit;
+       u64 stripe_count;
 };
 
 /*
@@ -138,13 +144,13 @@ struct rbd_image_header {
  */
 struct rbd_spec {
        u64             pool_id;
-       char            *pool_name;
+       const char      *pool_name;
 
-       char            *image_id;
-       char            *image_name;
+       const char      *image_id;
+       const char      *image_name;
 
        u64             snap_id;
-       char            *snap_name;
+       const char      *snap_name;
 
        struct kref     kref;
 };
@@ -173,6 +179,8 @@ enum obj_request_type {
 enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
+       OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
+       OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
 };
 
 struct rbd_obj_request {
@@ -181,9 +189,31 @@ struct rbd_obj_request {
        u64                     length;         /* bytes from offset */
        unsigned long           flags;
 
-       struct rbd_img_request  *img_request;
-       u64                     img_offset;     /* image relative offset */
-       struct list_head        links;          /* img_request->obj_requests */
+       /*
+        * An object request associated with an image will have its
+        * img_data flag set; a standalone object request will not.
+        *
+        * A standalone object request will have which == BAD_WHICH
+        * and a null obj_request pointer.
+        *
+        * An object request initiated in support of a layered image
+        * object (to check for its existence before a write) will
+        * have which == BAD_WHICH and a non-null obj_request pointer.
+        *
+        * Finally, an object request for rbd image data will have
+        * which != BAD_WHICH, and will have a non-null img_request
+        * pointer.  The value of which will be in the range
+        * 0..(img_request->obj_request_count-1).
+        */
+       union {
+               struct rbd_obj_request  *obj_request;   /* STAT op */
+               struct {
+                       struct rbd_img_request  *img_request;
+                       u64                     img_offset;
+                       /* links for img_request->obj_requests list */
+                       struct list_head        links;
+               };
+       };
        u32                     which;          /* posn image request list */
 
        enum obj_request_type   type;
@@ -194,11 +224,11 @@ struct rbd_obj_request {
                        u32             page_count;
                };
        };
+       struct page             **copyup_pages;
 
        struct ceph_osd_request *osd_req;
 
        u64                     xferred;        /* bytes transferred */
-       u64                     version;
        int                     result;
 
        rbd_obj_callback_t      callback;
@@ -226,6 +256,7 @@ struct rbd_img_request {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
+       struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
@@ -245,15 +276,6 @@ struct rbd_img_request {
 #define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 
-struct rbd_snap {
-       struct  device          dev;
-       const char              *name;
-       u64                     size;
-       struct list_head        node;
-       u64                     id;
-       u64                     features;
-};
-
 struct rbd_mapping {
        u64                     size;
        u64                     features;
@@ -289,6 +311,7 @@ struct rbd_device {
 
        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
+       struct rbd_device       *parent;
 
        /* protects updating the header */
        struct rw_semaphore     header_rwsem;
@@ -297,9 +320,6 @@ struct rbd_device {
 
        struct list_head        node;
 
-       /* list of snapshots */
-       struct list_head        snaps;
-
        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
@@ -325,16 +345,21 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
 static LIST_HEAD(rbd_client_list);             /* clients */
 static DEFINE_SPINLOCK(rbd_client_list_lock);
 
-static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
-static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
+/* Slab caches for frequently-allocated structures */
 
-static void rbd_dev_release(struct device *dev);
-static void rbd_remove_snap_dev(struct rbd_snap *snap);
+static struct kmem_cache       *rbd_img_request_cache;
+static struct kmem_cache       *rbd_obj_request_cache;
+static struct kmem_cache       *rbd_segment_name_cache;
+
+static int rbd_img_request_submit(struct rbd_img_request *img_request);
+
+static void rbd_dev_device_release(struct device *dev);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
 
 static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -396,8 +421,19 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 #  define rbd_assert(expr)     ((void) 0)
 #endif /* !RBD_DEBUG */
 
-static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
-static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
+static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
+static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
+
+static int rbd_dev_refresh(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
+static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
+                                       u64 snap_id);
+static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+                               u8 *order, u64 *snap_size);
+static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+               u64 *snap_features);
+static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -497,6 +533,13 @@ out_opt:
        return ERR_PTR(ret);
 }
 
+static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
+{
+       kref_get(&rbdc->kref);
+
+       return rbdc;
+}
+
 /*
  * Find a ceph client with specific addr and configuration.  If
  * found, bump its reference count.
@@ -512,7 +555,8 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
-                       kref_get(&client_node->kref);
+                       __rbd_get_client(client_node);
+
                        found = true;
                        break;
                }
@@ -735,7 +779,6 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
-               WARN_ON(ondisk->snap_names_len);
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }
@@ -748,18 +791,13 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
        /* Allocate and fill in the snapshot context */
 
        header->image_size = le64_to_cpu(ondisk->image_size);
-       size = sizeof (struct ceph_snap_context);
-       size += snap_count * sizeof (header->snapc->snaps[0]);
-       header->snapc = kzalloc(size, GFP_KERNEL);
+
+       header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;
-
-       atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
-       header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
-               header->snapc->snaps[i] =
-                       le64_to_cpu(ondisk->snaps[i].id);
+               header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
 
        return 0;
 
@@ -774,70 +812,174 @@ out_err:
        return -ENOMEM;
 }
 
-static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
+{
+       const char *snap_name;
+
+       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
+
+       /* Skip over names until we find the one we are looking for */
+
+       snap_name = rbd_dev->header.snap_names;
+       while (which--)
+               snap_name += strlen(snap_name) + 1;
+
+       return kstrdup(snap_name, GFP_KERNEL);
+}
+
+/*
+ * Snapshot id comparison function for use with qsort()/bsearch().
+ * Note that result is for snapshots in *descending* order.
+ */
+static int snapid_compare_reverse(const void *s1, const void *s2)
+{
+       u64 snap_id1 = *(u64 *)s1;
+       u64 snap_id2 = *(u64 *)s2;
+
+       if (snap_id1 < snap_id2)
+               return 1;
+       return snap_id1 == snap_id2 ? 0 : -1;
+}
+
+/*
+ * Search a snapshot context to see if the given snapshot id is
+ * present.
+ *
+ * Returns the position of the snapshot id in the array if it's found,
+ * or BAD_SNAP_INDEX otherwise.
+ *
+ * Note: The snapshot array is in kept sorted (by the osd) in
+ * reverse order, highest snapshot id first.
+ */
+static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
 {
-       struct rbd_snap *snap;
+       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+       u64 *found;
+
+       found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
+                               sizeof (snap_id), snapid_compare_reverse);
+
+       return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
+}
+
+static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
+                                       u64 snap_id)
+{
+       u32 which;
+
+       which = rbd_dev_snap_index(rbd_dev, snap_id);
+       if (which == BAD_SNAP_INDEX)
+               return NULL;
 
+       return _rbd_dev_v1_snap_name(rbd_dev, which);
+}
+
+static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;
 
-       list_for_each_entry(snap, &rbd_dev->snaps, node)
-               if (snap_id == snap->id)
-                       return snap->name;
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       if (rbd_dev->image_format == 1)
+               return rbd_dev_v1_snap_name(rbd_dev, snap_id);
 
-       return NULL;
+       return rbd_dev_v2_snap_name(rbd_dev, snap_id);
 }
 
-static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
+static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+                               u64 *snap_size)
 {
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       if (snap_id == CEPH_NOSNAP) {
+               *snap_size = rbd_dev->header.image_size;
+       } else if (rbd_dev->image_format == 1) {
+               u32 which;
 
-       struct rbd_snap *snap;
+               which = rbd_dev_snap_index(rbd_dev, snap_id);
+               if (which == BAD_SNAP_INDEX)
+                       return -ENOENT;
 
-       list_for_each_entry(snap, &rbd_dev->snaps, node) {
-               if (!strcmp(snap_name, snap->name)) {
-                       rbd_dev->spec->snap_id = snap->id;
-                       rbd_dev->mapping.size = snap->size;
-                       rbd_dev->mapping.features = snap->features;
+               *snap_size = rbd_dev->header.snap_sizes[which];
+       } else {
+               u64 size = 0;
+               int ret;
 
-                       return 0;
-               }
+               ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
+               if (ret)
+                       return ret;
+
+               *snap_size = size;
        }
+       return 0;
+}
+
+static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+                       u64 *snap_features)
+{
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       if (snap_id == CEPH_NOSNAP) {
+               *snap_features = rbd_dev->header.features;
+       } else if (rbd_dev->image_format == 1) {
+               *snap_features = 0;     /* No features for format 1 */
+       } else {
+               u64 features = 0;
+               int ret;
+
+               ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
+               if (ret)
+                       return ret;
 
-       return -ENOENT;
+               *snap_features = features;
+       }
+       return 0;
 }
 
-static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
+static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
 {
+       const char *snap_name = rbd_dev->spec->snap_name;
+       u64 snap_id;
+       u64 size = 0;
+       u64 features = 0;
        int ret;
 
-       if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
-                   sizeof (RBD_SNAP_HEAD_NAME))) {
-               rbd_dev->spec->snap_id = CEPH_NOSNAP;
-               rbd_dev->mapping.size = rbd_dev->header.image_size;
-               rbd_dev->mapping.features = rbd_dev->header.features;
-               ret = 0;
+       if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
+               snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
+               if (snap_id == CEPH_NOSNAP)
+                       return -ENOENT;
        } else {
-               ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
-               if (ret < 0)
-                       goto done;
-               rbd_dev->mapping.read_only = true;
+               snap_id = CEPH_NOSNAP;
        }
-       set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 
-done:
-       return ret;
+       ret = rbd_snap_size(rbd_dev, snap_id, &size);
+       if (ret)
+               return ret;
+       ret = rbd_snap_features(rbd_dev, snap_id, &features);
+       if (ret)
+               return ret;
+
+       rbd_dev->mapping.size = size;
+       rbd_dev->mapping.features = features;
+
+       /* If we are mapping a snapshot it must be marked read-only */
+
+       if (snap_id != CEPH_NOSNAP)
+               rbd_dev->mapping.read_only = true;
+
+       return 0;
 }
 
-static void rbd_header_free(struct rbd_image_header *header)
+static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
 {
-       kfree(header->object_prefix);
-       header->object_prefix = NULL;
-       kfree(header->snap_sizes);
-       header->snap_sizes = NULL;
-       kfree(header->snap_names);
-       header->snap_names = NULL;
-       ceph_put_snap_context(header->snapc);
-       header->snapc = NULL;
+       rbd_dev->mapping.size = 0;
+       rbd_dev->mapping.features = 0;
+       rbd_dev->mapping.read_only = true;
+}
+
+static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
+{
+       rbd_dev->mapping.size = 0;
+       rbd_dev->mapping.features = 0;
+       rbd_dev->mapping.read_only = true;
 }
 
 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
@@ -846,7 +988,7 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
        u64 segment;
        int ret;
 
-       name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
+       name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
@@ -862,6 +1004,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
        return name;
 }
 
+static void rbd_segment_name_free(const char *name)
+{
+       /* The explicit cast here is needed to drop the const qualifier */
+
+       kmem_cache_free(rbd_segment_name_cache, (void *)name);
+}
+
 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 {
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
@@ -933,6 +1082,37 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
        }
 }
 
+/*
+ * similar to zero_bio_chain(), zeros data defined by a page array,
+ * starting at the given byte offset from the start of the array and
+ * continuing up to the given end offset.  The pages array is
+ * assumed to be big enough to hold all bytes up to the end.
+ */
+static void zero_pages(struct page **pages, u64 offset, u64 end)
+{
+       struct page **page = &pages[offset >> PAGE_SHIFT];
+
+       rbd_assert(end > offset);
+       rbd_assert(end - offset <= (u64)SIZE_MAX);
+       while (offset < end) {
+               size_t page_offset;
+               size_t length;
+               unsigned long flags;
+               void *kaddr;
+
+               page_offset = (size_t)(offset & ~PAGE_MASK);
+               length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
+               local_irq_save(flags);
+               kaddr = kmap_atomic(*page);
+               memset(kaddr + page_offset, 0, length);
+               kunmap_atomic(kaddr);
+               local_irq_restore(flags);
+
+               offset += length;
+               page++;
+       }
+}
+
 /*
  * Clone a portion of a bio, starting at the given byte offset
  * and continuing for the number of bytes indicated.
@@ -1082,13 +1262,30 @@ out_err:
  * each flag, once its value is set to 1 it is never reset to 0
  * again.
  */
+static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
+{
+       if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
+               struct rbd_device *rbd_dev;
+
+               rbd_dev = obj_request->img_request->rbd_dev;
+               rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
+                       obj_request);
+       }
+}
+
+static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
+{
+       smp_mb();
+       return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
+}
+
 static void obj_request_done_set(struct rbd_obj_request *obj_request)
 {
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
-               struct rbd_img_request *img_request = obj_request->img_request;
-               struct rbd_device *rbd_dev;
+               struct rbd_device *rbd_dev = NULL;
 
-               rbd_dev = img_request ? img_request->rbd_dev : NULL;
+               if (obj_request_img_data_test(obj_request))
+                       rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
@@ -1100,22 +1297,35 @@ static bool obj_request_done_test(struct rbd_obj_request *obj_request)
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
 }
 
-static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
+/*
+ * This sets the KNOWN flag after (possibly) setting the EXISTS
+ * flag.  The latter is set based on the "exists" value provided.
+ *
+ * Note that for our purposes once an object exists it never goes
+ * away again.  It's possible that the response from two existence
+ * checks are separated by the creation of the target object, and
+ * the first ("doesn't exist") response arrives *after* the second
+ * ("does exist").  In that case we ignore the second one.
+ */
+static void obj_request_existence_set(struct rbd_obj_request *obj_request,
+                               bool exists)
 {
-       if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
-               struct rbd_img_request *img_request = obj_request->img_request;
-               struct rbd_device *rbd_dev;
+       if (exists)
+               set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
+       set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
+       smp_mb();
+}
 
-               rbd_dev = img_request ? img_request->rbd_dev : NULL;
-               rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
-                       obj_request);
-       }
+static bool obj_request_known_test(struct rbd_obj_request *obj_request)
+{
+       smp_mb();
+       return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
 }
 
-static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
+static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
 {
        smp_mb();
-       return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
+       return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
 }
 
 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
@@ -1155,7 +1365,7 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
 {
        rbd_assert(obj_request->img_request == NULL);
 
-       rbd_obj_request_get(obj_request);
+       /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
@@ -1285,9 +1495,12 @@ static bool img_request_layered_test(struct rbd_img_request *img_request)
 static void
 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
 {
+       u64 xferred = obj_request->xferred;
+       u64 length = obj_request->length;
+
        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
-               obj_request->xferred, obj_request->length);
+               xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
@@ -1295,15 +1508,20 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
-       BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
+       rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
-               zero_bio_chain(obj_request->bio_list, 0);
+               if (obj_request->type == OBJ_REQUEST_BIO)
+                       zero_bio_chain(obj_request->bio_list, 0);
+               else
+                       zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
-               obj_request->xferred = obj_request->length;
-       } else if (obj_request->xferred < obj_request->length &&
-                       !obj_request->result) {
-               zero_bio_chain(obj_request->bio_list, obj_request->xferred);
-               obj_request->xferred = obj_request->length;
+               obj_request->xferred = length;
+       } else if (xferred < length && !obj_request->result) {
+               if (obj_request->type == OBJ_REQUEST_BIO)
+                       zero_bio_chain(obj_request->bio_list, xferred);
+               else
+                       zero_pages(obj_request->pages, xferred, length);
+               obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
 }
@@ -1326,9 +1544,23 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
 
 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
-       dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
-               obj_request->result, obj_request->xferred, obj_request->length);
-       if (obj_request->img_request)
+       struct rbd_img_request *img_request = NULL;
+       struct rbd_device *rbd_dev = NULL;
+       bool layered = false;
+
+       if (obj_request_img_data_test(obj_request)) {
+               img_request = obj_request->img_request;
+               layered = img_request && img_request_layered_test(img_request);
+               rbd_dev = img_request->rbd_dev;
+       }
+
+       dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
+               obj_request, img_request, obj_request->result,
+               obj_request->xferred, obj_request->length);
+       if (layered && obj_request->result == -ENOENT &&
+                       obj_request->img_offset < rbd_dev->parent_overlap)
+               rbd_img_parent_read(obj_request);
+       else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
@@ -1339,9 +1571,8 @@ static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
-        * There is no such thing as a successful short write.
-        * Our xferred value is the number of bytes transferred
-        * back.  Set it to our originally-requested length.
+        * There is no such thing as a successful short write.  Set
+        * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
@@ -1365,23 +1596,24 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 
        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
-       rbd_assert(obj_request_img_data_test(obj_request) ^
-                               !obj_request->img_request);
-       rbd_assert(obj_request_img_data_test(obj_request) ^
-                               (obj_request->which == BAD_WHICH));
+       if (obj_request_img_data_test(obj_request)) {
+               rbd_assert(obj_request->img_request);
+               rbd_assert(obj_request->which != BAD_WHICH);
+       } else {
+               rbd_assert(obj_request->which == BAD_WHICH);
+       }
 
        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
-       obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
 
-       WARN_ON(osd_req->r_num_ops != 1);       /* For now */
+       BUG_ON(osd_req->r_num_ops > 2);
 
        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
-       rbd_assert(obj_request->xferred < (u64) UINT_MAX);
+       rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
@@ -1408,28 +1640,31 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                rbd_obj_request_complete(obj_request);
 }
 
-static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
-                                       bool write_request)
+static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
-       struct ceph_snap_context *snapc = NULL;
-       u64 snap_id = CEPH_NOSNAP;
-       struct timespec *mtime = NULL;
-       struct timespec now;
+       u64 snap_id;
 
        rbd_assert(osd_req != NULL);
 
-       if (write_request) {
-               now = CURRENT_TIME;
-               mtime = &now;
-               if (img_request)
-                       snapc = img_request->snapc;
-       } else if (img_request) {
-               snap_id = img_request->snap_id;
-       }
+       snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
+       ceph_osdc_build_request(osd_req, obj_request->offset,
+                       NULL, snap_id, NULL);
+}
+
+static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request = obj_request->img_request;
+       struct ceph_osd_request *osd_req = obj_request->osd_req;
+       struct ceph_snap_context *snapc;
+       struct timespec mtime = CURRENT_TIME;
+
+       rbd_assert(osd_req != NULL);
+
+       snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
-                       snapc, snap_id, mtime);
+                       snapc, CEPH_NOSNAP, &mtime);
 }
 
 static struct ceph_osd_request *rbd_osd_req_create(
@@ -1474,6 +1709,48 @@ static struct ceph_osd_request *rbd_osd_req_create(
        return osd_req;
 }
 
+/*
+ * Create a copyup osd request based on the information in the
+ * object request supplied.  A copyup request has two osd ops,
+ * a copyup method call, and a "normal" write request.
+ */
+static struct ceph_osd_request *
+rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request;
+       struct ceph_snap_context *snapc;
+       struct rbd_device *rbd_dev;
+       struct ceph_osd_client *osdc;
+       struct ceph_osd_request *osd_req;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+       img_request = obj_request->img_request;
+       rbd_assert(img_request);
+       rbd_assert(img_request_write_test(img_request));
+
+       /* Allocate and initialize the request, for the two ops */
+
+       snapc = img_request->snapc;
+       rbd_dev = img_request->rbd_dev;
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
+       if (!osd_req)
+               return NULL;    /* ENOMEM */
+
+       osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+       osd_req->r_callback = rbd_osd_req_callback;
+       osd_req->r_priv = obj_request;
+
+       osd_req->r_oid_len = strlen(obj_request->object_name);
+       rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
+       memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
+
+       osd_req->r_file_layout = rbd_dev->layout;       /* struct */
+
+       return osd_req;
+}
+
+
 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
 {
        ceph_osdc_put_request(osd_req);
@@ -1492,11 +1769,16 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
        rbd_assert(obj_request_type_valid(type));
 
        size = strlen(object_name) + 1;
-       obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
-       if (!obj_request)
+       name = kmalloc(size, GFP_KERNEL);
+       if (!name)
+               return NULL;
+
+       obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
+       if (!obj_request) {
+               kfree(name);
                return NULL;
+       }
 
-       name = (char *)(obj_request + 1);
        obj_request->object_name = memcpy(name, object_name, size);
        obj_request->offset = offset;
        obj_request->length = length;
@@ -1542,7 +1824,9 @@ static void rbd_obj_request_destroy(struct kref *kref)
                break;
        }
 
-       kfree(obj_request);
+       kfree(obj_request->object_name);
+       obj_request->object_name = NULL;
+       kmem_cache_free(rbd_obj_request_cache, obj_request);
 }
 
 /*
@@ -1557,21 +1841,15 @@ static struct rbd_img_request *rbd_img_request_create(
                                        bool child_request)
 {
        struct rbd_img_request *img_request;
-       struct ceph_snap_context *snapc = NULL;
 
-       img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
+       img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
        if (!img_request)
                return NULL;
 
        if (write_request) {
                down_read(&rbd_dev->header_rwsem);
-               snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+               ceph_get_snap_context(rbd_dev->header.snapc);
                up_read(&rbd_dev->header_rwsem);
-               if (WARN_ON(!snapc)) {
-                       kfree(img_request);
-                       return NULL;    /* Shouldn't happen */
-               }
-
        }
 
        img_request->rq = NULL;
@@ -1581,7 +1859,7 @@ static struct rbd_img_request *rbd_img_request_create(
        img_request->flags = 0;
        if (write_request) {
                img_request_write_set(img_request);
-               img_request->snapc = snapc;
+               img_request->snapc = rbd_dev->header.snapc;
        } else {
                img_request->snap_id = rbd_dev->spec->snap_id;
        }
@@ -1597,7 +1875,6 @@ static struct rbd_img_request *rbd_img_request_create(
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);
 
-       (void) img_request_layered_test(img_request);   /* Avoid a warning */
        rbd_img_request_get(img_request);       /* Avoid a warning */
        rbd_img_request_put(img_request);       /* TEMPORARY */
 
@@ -1625,7 +1902,10 @@ static void rbd_img_request_destroy(struct kref *kref)
        if (img_request_write_test(img_request))
                ceph_put_snap_context(img_request->snapc);
 
-       kfree(img_request);
+       if (img_request_child_test(img_request))
+               rbd_obj_request_put(img_request->obj_request);
+
+       kmem_cache_free(rbd_img_request_cache, img_request);
 }
 
 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
@@ -1633,13 +1913,11 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
        struct rbd_img_request *img_request;
        unsigned int xferred;
        int result;
+       bool more;
 
        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;
 
-       rbd_assert(!img_request_child_test(img_request));
-       rbd_assert(img_request->rq != NULL);
-
        rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
        xferred = (unsigned int)obj_request->xferred;
        result = obj_request->result;
@@ -1656,7 +1934,22 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
                        img_request->result = result;
        }
 
-       return blk_end_request(img_request->rq, result, xferred);
+       /* Image object requests don't own their page array */
+
+       if (obj_request->type == OBJ_REQUEST_PAGES) {
+               obj_request->pages = NULL;
+               obj_request->page_count = 0;
+       }
+
+       if (img_request_child_test(img_request)) {
+               rbd_assert(img_request->obj_request != NULL);
+               more = obj_request->which < img_request->obj_request_count - 1;
+       } else {
+               rbd_assert(img_request->rq != NULL);
+               more = blk_end_request(img_request->rq, result, xferred);
+       }
+
+       return more;
 }
 
 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
@@ -1698,30 +1991,48 @@ out:
                rbd_img_request_complete(img_request);
 }
 
-static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
-                                       struct bio *bio_list)
+/*
+ * Split up an image request into one or more object requests, each
+ * to a different object.  The "type" parameter indicates whether
+ * "data_desc" is the pointer to the head of a list of bio
+ * structures, or the base of a page array.  In either case this
+ * function assumes data_desc describes memory sufficient to hold
+ * all data described by the image request.
+ */
+static int rbd_img_request_fill(struct rbd_img_request *img_request,
+                                       enum obj_request_type type,
+                                       void *data_desc)
 {
        struct rbd_device *rbd_dev = img_request->rbd_dev;
        struct rbd_obj_request *obj_request = NULL;
        struct rbd_obj_request *next_obj_request;
        bool write_request = img_request_write_test(img_request);
-       unsigned int bio_offset;
+       struct bio *bio_list;
+       unsigned int bio_offset = 0;
+       struct page **pages;
        u64 img_offset;
        u64 resid;
        u16 opcode;
 
-       dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
+       dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
+               (int)type, data_desc);
 
        opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
-       bio_offset = 0;
        img_offset = img_request->offset;
-       rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
        resid = img_request->length;
        rbd_assert(resid > 0);
+
+       if (type == OBJ_REQUEST_BIO) {
+               bio_list = data_desc;
+               rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
+       } else {
+               rbd_assert(type == OBJ_REQUEST_PAGES);
+               pages = data_desc;
+       }
+
        while (resid) {
                struct ceph_osd_request *osd_req;
                const char *object_name;
-               unsigned int clone_size;
                u64 offset;
                u64 length;
 
@@ -1731,19 +2042,34 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
                offset = rbd_segment_offset(rbd_dev, img_offset);
                length = rbd_segment_length(rbd_dev, img_offset, resid);
                obj_request = rbd_obj_request_create(object_name,
-                                               offset, length,
-                                               OBJ_REQUEST_BIO);
-               kfree(object_name);     /* object request has its own copy */
+                                               offset, length, type);
+               /* object request has its own copy of the object name */
+               rbd_segment_name_free(object_name);
                if (!obj_request)
                        goto out_unwind;
 
-               rbd_assert(length <= (u64) UINT_MAX);
-               clone_size = (unsigned int) length;
-               obj_request->bio_list = bio_chain_clone_range(&bio_list,
-                                               &bio_offset, clone_size,
-                                               GFP_ATOMIC);
-               if (!obj_request->bio_list)
-                       goto out_partial;
+               if (type == OBJ_REQUEST_BIO) {
+                       unsigned int clone_size;
+
+                       rbd_assert(length <= (u64)UINT_MAX);
+                       clone_size = (unsigned int)length;
+                       obj_request->bio_list =
+                                       bio_chain_clone_range(&bio_list,
+                                                               &bio_offset,
+                                                               clone_size,
+                                                               GFP_ATOMIC);
+                       if (!obj_request->bio_list)
+                               goto out_partial;
+               } else {
+                       unsigned int page_count;
+
+                       obj_request->pages = pages;
+                       page_count = (u32)calc_pages_for(offset, length);
+                       obj_request->page_count = page_count;
+                       if ((offset + length) & ~PAGE_MASK)
+                               page_count--;   /* more on last page */
+                       pages += page_count;
+               }
 
                osd_req = rbd_osd_req_create(rbd_dev, write_request,
                                                obj_request);
@@ -1754,9 +2080,18 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
 
                osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
                                                0, 0);
-               osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
-                               obj_request->bio_list, obj_request->length);
-               rbd_osd_req_format(obj_request, write_request);
+               if (type == OBJ_REQUEST_BIO)
+                       osd_req_op_extent_osd_data_bio(osd_req, 0,
+                                       obj_request->bio_list, length);
+               else
+                       osd_req_op_extent_osd_data_pages(osd_req, 0,
+                                       obj_request->pages, length,
+                                       offset & ~PAGE_MASK, false, false);
+
+               if (write_request)
+                       rbd_osd_req_format_write(obj_request);
+               else
+                       rbd_osd_req_format_read(obj_request);
 
                obj_request->img_offset = img_offset;
                rbd_img_obj_request_add(img_request, obj_request);
@@ -1776,33 +2111,492 @@ out_unwind:
        return -ENOMEM;
 }
 
-static int rbd_img_request_submit(struct rbd_img_request *img_request)
+static void
+rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 {
-       struct rbd_device *rbd_dev = img_request->rbd_dev;
-       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       struct rbd_obj_request *obj_request;
-       struct rbd_obj_request *next_obj_request;
+       struct rbd_img_request *img_request;
+       struct rbd_device *rbd_dev;
+       u64 length;
+       u32 page_count;
 
-       dout("%s: img %p\n", __func__, img_request);
-       for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
-               int ret;
+       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_img_data_test(obj_request));
+       img_request = obj_request->img_request;
+       rbd_assert(img_request);
 
-               ret = rbd_obj_request_submit(osdc, obj_request);
+       rbd_dev = img_request->rbd_dev;
+       rbd_assert(rbd_dev);
+       length = (u64)1 << rbd_dev->header.obj_order;
+       page_count = (u32)calc_pages_for(0, length);
+
+       rbd_assert(obj_request->copyup_pages);
+       ceph_release_page_vector(obj_request->copyup_pages, page_count);
+       obj_request->copyup_pages = NULL;
+
+       /*
+        * We want the transfer count to reflect the size of the
+        * original write request.  There is no such thing as a
+        * successful short write, so if the request was successful
+        * we can just set it to the originally-requested length.
+        */
+       if (!obj_request->result)
+               obj_request->xferred = obj_request->length;
+
+       /* Finish up with the normal image object callback */
+
+       rbd_img_obj_callback(obj_request);
+}
+
+static void
+rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
+{
+       struct rbd_obj_request *orig_request;
+       struct ceph_osd_request *osd_req;
+       struct ceph_osd_client *osdc;
+       struct rbd_device *rbd_dev;
+       struct page **pages;
+       int result;
+       u64 obj_size;
+       u64 xferred;
+
+       rbd_assert(img_request_child_test(img_request));
+
+       /* First get what we need from the image request */
+
+       pages = img_request->copyup_pages;
+       rbd_assert(pages != NULL);
+       img_request->copyup_pages = NULL;
+
+       orig_request = img_request->obj_request;
+       rbd_assert(orig_request != NULL);
+       rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
+       result = img_request->result;
+       obj_size = img_request->length;
+       xferred = img_request->xferred;
+
+       rbd_dev = img_request->rbd_dev;
+       rbd_assert(rbd_dev);
+       rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
+
+       rbd_img_request_put(img_request);
+
+       if (result)
+               goto out_err;
+
+       /* Allocate the new copyup osd request for the original request */
+
+       result = -ENOMEM;
+       rbd_assert(!orig_request->osd_req);
+       osd_req = rbd_osd_req_create_copyup(orig_request);
+       if (!osd_req)
+               goto out_err;
+       orig_request->osd_req = osd_req;
+       orig_request->copyup_pages = pages;
+
+       /* Initialize the copyup op */
+
+       osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
+       osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+                                               false, false);
+
+       /* Then the original write request op */
+
+       osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
+                                       orig_request->offset,
+                                       orig_request->length, 0, 0);
+       osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
+                                       orig_request->length);
+
+       rbd_osd_req_format_write(orig_request);
+
+       /* All set, send it off. */
+
+       orig_request->callback = rbd_img_obj_copyup_callback;
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       result = rbd_obj_request_submit(osdc, orig_request);
+       if (!result)
+               return;
+out_err:
+       /* Record the error code and complete the request */
+
+       orig_request->result = result;
+       orig_request->xferred = 0;
+       obj_request_done_set(orig_request);
+       rbd_obj_request_complete(orig_request);
+}
+
+/*
+ * Read from the parent image the range of data that covers the
+ * entire target of the given object request.  This is used for
+ * satisfying a layered image write request when the target of an
+ * object request from the image request does not exist.
+ *
+ * A page array big enough to hold the returned data is allocated
+ * and supplied to rbd_img_request_fill() as the "data descriptor."
+ * When the read completes, this page array will be transferred to
+ * the original object request for the copyup operation.
+ *
+ * If an error occurs, record it as the result of the original
+ * object request and mark it done so it gets completed.
+ */
+static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request = NULL;
+       struct rbd_img_request *parent_request = NULL;
+       struct rbd_device *rbd_dev;
+       u64 img_offset;
+       u64 length;
+       struct page **pages = NULL;
+       u32 page_count;
+       int result;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+
+       img_request = obj_request->img_request;
+       rbd_assert(img_request != NULL);
+       rbd_dev = img_request->rbd_dev;
+       rbd_assert(rbd_dev->parent != NULL);
+
+       /*
+        * First things first.  The original osd request is of no
+        * use to use any more, we'll need a new one that can hold
+        * the two ops in a copyup request.  We'll get that later,
+        * but for now we can release the old one.
+        */
+       rbd_osd_req_destroy(obj_request->osd_req);
+       obj_request->osd_req = NULL;
+
+       /*
+        * Determine the byte range covered by the object in the
+        * child image to which the original request was to be sent.
+        */
+       img_offset = obj_request->img_offset - obj_request->offset;
+       length = (u64)1 << rbd_dev->header.obj_order;
+
+       /*
+        * There is no defined parent data beyond the parent
+        * overlap, so limit what we read at that boundary if
+        * necessary.
+        */
+       if (img_offset + length > rbd_dev->parent_overlap) {
+               rbd_assert(img_offset < rbd_dev->parent_overlap);
+               length = rbd_dev->parent_overlap - img_offset;
+       }
+
+       /*
+        * Allocate a page array big enough to receive the data read
+        * from the parent.
+        */
+       page_count = (u32)calc_pages_for(0, length);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages)) {
+               result = PTR_ERR(pages);
+               pages = NULL;
+               goto out_err;
+       }
+
+       result = -ENOMEM;
+       parent_request = rbd_img_request_create(rbd_dev->parent,
+                                               img_offset, length,
+                                               false, true);
+       if (!parent_request)
+               goto out_err;
+       rbd_obj_request_get(obj_request);
+       parent_request->obj_request = obj_request;
+
+       result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
+       if (result)
+               goto out_err;
+       parent_request->copyup_pages = pages;
+
+       parent_request->callback = rbd_img_obj_parent_read_full_callback;
+       result = rbd_img_request_submit(parent_request);
+       if (!result)
+               return 0;
+
+       parent_request->copyup_pages = NULL;
+       parent_request->obj_request = NULL;
+       rbd_obj_request_put(obj_request);
+out_err:
+       if (pages)
+               ceph_release_page_vector(pages, page_count);
+       if (parent_request)
+               rbd_img_request_put(parent_request);
+       obj_request->result = result;
+       obj_request->xferred = 0;
+       obj_request_done_set(obj_request);
+
+       return result;
+}
+
+static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
+{
+       struct rbd_obj_request *orig_request;
+       int result;
+
+       rbd_assert(!obj_request_img_data_test(obj_request));
+
+       /*
+        * All we need from the object request is the original
+        * request and the result of the STAT op.  Grab those, then
+        * we're done with the request.
+        */
+       orig_request = obj_request->obj_request;
+       obj_request->obj_request = NULL;
+       rbd_assert(orig_request);
+       rbd_assert(orig_request->img_request);
+
+       result = obj_request->result;
+       obj_request->result = 0;
+
+       dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
+               obj_request, orig_request, result,
+               obj_request->xferred, obj_request->length);
+       rbd_obj_request_put(obj_request);
+
+       rbd_assert(orig_request);
+       rbd_assert(orig_request->img_request);
+
+       /*
+        * Our only purpose here is to determine whether the object
+        * exists, and we don't want to treat the non-existence as
+        * an error.  If something else comes back, transfer the
+        * error to the original request and complete it now.
+        */
+       if (!result) {
+               obj_request_existence_set(orig_request, true);
+       } else if (result == -ENOENT) {
+               obj_request_existence_set(orig_request, false);
+       } else if (result) {
+               orig_request->result = result;
+               goto out;
+       }
+
+       /*
+        * Resubmit the original request now that we have recorded
+        * whether the target object exists.
+        */
+       orig_request->result = rbd_img_obj_request_submit(orig_request);
+out:
+       if (orig_request->result)
+               rbd_obj_request_complete(orig_request);
+       rbd_obj_request_put(orig_request);
+}
+
+static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
+{
+       struct rbd_obj_request *stat_request;
+       struct rbd_device *rbd_dev;
+       struct ceph_osd_client *osdc;
+       struct page **pages = NULL;
+       u32 page_count;
+       size_t size;
+       int ret;
+
+       /*
+        * The response data for a STAT call consists of:
+        *     le64 length;
+        *     struct {
+        *         le32 tv_sec;
+        *         le32 tv_nsec;
+        *     } mtime;
+        */
+       size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
+       page_count = (u32)calc_pages_for(0, size);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       ret = -ENOMEM;
+       stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
+                                                       OBJ_REQUEST_PAGES);
+       if (!stat_request)
+               goto out;
+
+       rbd_obj_request_get(obj_request);
+       stat_request->obj_request = obj_request;
+       stat_request->pages = pages;
+       stat_request->page_count = page_count;
+
+       rbd_assert(obj_request->img_request);
+       rbd_dev = obj_request->img_request->rbd_dev;
+       stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+                                               stat_request);
+       if (!stat_request->osd_req)
+               goto out;
+       stat_request->callback = rbd_img_obj_exists_callback;
+
+       osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
+       osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
+                                       false, false);
+       rbd_osd_req_format_read(stat_request);
+
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       ret = rbd_obj_request_submit(osdc, stat_request);
+out:
+       if (ret)
+               rbd_obj_request_put(obj_request);
+
+       return ret;
+}
+
+static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request;
+       struct rbd_device *rbd_dev;
+       bool known;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+
+       img_request = obj_request->img_request;
+       rbd_assert(img_request);
+       rbd_dev = img_request->rbd_dev;
+
+       /*
+        * Only writes to layered images need special handling.
+        * Reads and non-layered writes are simple object requests.
+        * Layered writes that start beyond the end of the overlap
+        * with the parent have no parent data, so they too are
+        * simple object requests.  Finally, if the target object is
+        * known to already exist, its parent data has already been
+        * copied, so a write to the object can also be handled as a
+        * simple object request.
+        */
+       if (!img_request_write_test(img_request) ||
+               !img_request_layered_test(img_request) ||
+               rbd_dev->parent_overlap <= obj_request->img_offset ||
+               ((known = obj_request_known_test(obj_request)) &&
+                       obj_request_exists_test(obj_request))) {
+
+               struct rbd_device *rbd_dev;
+               struct ceph_osd_client *osdc;
+
+               rbd_dev = obj_request->img_request->rbd_dev;
+               osdc = &rbd_dev->rbd_client->client->osdc;
+
+               return rbd_obj_request_submit(osdc, obj_request);
+       }
+
+       /*
+        * It's a layered write.  The target object might exist but
+        * we may not know that yet.  If we know it doesn't exist,
+        * start by reading the data for the full target object from
+        * the parent so we can use it for a copyup to the target.
+        */
+       if (known)
+               return rbd_img_obj_parent_read_full(obj_request);
+
+       /* We don't know whether the target exists.  Go find out. */
+
+       return rbd_img_obj_exists_submit(obj_request);
+}
+
+static int rbd_img_request_submit(struct rbd_img_request *img_request)
+{
+       struct rbd_obj_request *obj_request;
+       struct rbd_obj_request *next_obj_request;
+
+       dout("%s: img %p\n", __func__, img_request);
+       for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
+               int ret;
+
+               ret = rbd_img_obj_request_submit(obj_request);
                if (ret)
                        return ret;
-               /*
-                * The image request has its own reference to each
-                * of its object requests, so we can safely drop the
-                * initial one here.
-                */
-               rbd_obj_request_put(obj_request);
        }
 
        return 0;
 }
 
-static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
-                                  u64 ver, u64 notify_id)
+static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
+{
+       struct rbd_obj_request *obj_request;
+       struct rbd_device *rbd_dev;
+       u64 obj_end;
+
+       rbd_assert(img_request_child_test(img_request));
+
+       obj_request = img_request->obj_request;
+       rbd_assert(obj_request);
+       rbd_assert(obj_request->img_request);
+
+       obj_request->result = img_request->result;
+       if (obj_request->result)
+               goto out;
+
+       /*
+        * We need to zero anything beyond the parent overlap
+        * boundary.  Since rbd_img_obj_request_read_callback()
+        * will zero anything beyond the end of a short read, an
+        * easy way to do this is to pretend the data from the
+        * parent came up short--ending at the overlap boundary.
+        */
+       rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
+       obj_end = obj_request->img_offset + obj_request->length;
+       rbd_dev = obj_request->img_request->rbd_dev;
+       if (obj_end > rbd_dev->parent_overlap) {
+               u64 xferred = 0;
+
+               if (obj_request->img_offset < rbd_dev->parent_overlap)
+                       xferred = rbd_dev->parent_overlap -
+                                       obj_request->img_offset;
+
+               obj_request->xferred = min(img_request->xferred, xferred);
+       } else {
+               obj_request->xferred = img_request->xferred;
+       }
+out:
+       rbd_img_request_put(img_request);
+       rbd_img_obj_request_read_callback(obj_request);
+       rbd_obj_request_complete(obj_request);
+}
+
+static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
+{
+       struct rbd_device *rbd_dev;
+       struct rbd_img_request *img_request;
+       int result;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+       rbd_assert(obj_request->img_request != NULL);
+       rbd_assert(obj_request->result == (s32) -ENOENT);
+       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+
+       rbd_dev = obj_request->img_request->rbd_dev;
+       rbd_assert(rbd_dev->parent != NULL);
+       /* rbd_read_finish(obj_request, obj_request->length); */
+       img_request = rbd_img_request_create(rbd_dev->parent,
+                                               obj_request->img_offset,
+                                               obj_request->length,
+                                               false, true);
+       result = -ENOMEM;
+       if (!img_request)
+               goto out_err;
+
+       rbd_obj_request_get(obj_request);
+       img_request->obj_request = obj_request;
+
+       result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+                                       obj_request->bio_list);
+       if (result)
+               goto out_err;
+
+       img_request->callback = rbd_img_parent_read_callback;
+       result = rbd_img_request_submit(img_request);
+       if (result)
+               goto out_err;
+
+       return;
+out_err:
+       if (img_request)
+               rbd_img_request_put(img_request);
+       obj_request->result = result;
+       obj_request->xferred = 0;
+       obj_request_done_set(obj_request);
+}
+
+static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
 {
        struct rbd_obj_request *obj_request;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
@@ -1820,8 +2614,8 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
        obj_request->callback = rbd_obj_request_put;
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
-                                       notify_id, ver, 0);
-       rbd_osd_req_format(obj_request, false);
+                                       notify_id, 0, 0);
+       rbd_osd_req_format_read(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
 out:
@@ -1834,21 +2628,16 @@ out:
 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 {
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
-       u64 hver;
-       int rc;
 
        if (!rbd_dev)
                return;
 
        dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
-               rbd_dev->header_name, (unsigned long long) notify_id,
-               (unsigned int) opcode);
-       rc = rbd_dev_refresh(rbd_dev, &hver);
-       if (rc)
-               rbd_warn(rbd_dev, "got notification but failed to "
-                          " update snaps: %d\n", rc);
+               rbd_dev->header_name, (unsigned long long)notify_id,
+               (unsigned int)opcode);
+       (void)rbd_dev_refresh(rbd_dev);
 
-       rbd_obj_notify_ack(rbd_dev, hver, notify_id);
+       rbd_obj_notify_ack(rbd_dev, notify_id);
 }
 
 /*
@@ -1889,9 +2678,8 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
                                        rbd_dev->watch_request->osd_req);
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-                               rbd_dev->watch_event->cookie,
-                               rbd_dev->header.obj_version, start);
-       rbd_osd_req_format(obj_request, true);
+                               rbd_dev->watch_event->cookie, 0, start);
+       rbd_osd_req_format_write(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
@@ -1932,17 +2720,17 @@ out_cancel:
 }
 
 /*
- * Synchronous osd object method call
+ * Synchronous osd object method call.  Returns the number of bytes
+ * returned in the outbound buffer, or a negative error code.
  */
 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
-                            const char *outbound,
+                            const void *outbound,
                             size_t outbound_size,
-                            char *inbound,
-                            size_t inbound_size,
-                            u64 *version)
+                            void *inbound,
+                            size_t inbound_size)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
@@ -1957,7 +2745,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
         * method.  Currently if this is present it will be a
         * snapshot id.
         */
-       page_count = (u32) calc_pages_for(0, inbound_size);
+       page_count = (u32)calc_pages_for(0, inbound_size);
        pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);
@@ -1992,7 +2780,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
        osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
                                        obj_request->pages, inbound_size,
                                        0, false, false);
-       rbd_osd_req_format(obj_request, false);
+       rbd_osd_req_format_read(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
@@ -2004,10 +2792,10 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
        ret = obj_request->result;
        if (ret < 0)
                goto out;
-       ret = 0;
+
+       rbd_assert(obj_request->xferred < (u64)INT_MAX);
+       ret = (int)obj_request->xferred;
        ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
-       if (version)
-               *version = obj_request->version;
 out:
        if (obj_request)
                rbd_obj_request_put(obj_request);
@@ -2077,8 +2865,11 @@ static void rbd_request_fn(struct request_queue *q)
                }
 
                result = -EINVAL;
-               if (WARN_ON(offset && length > U64_MAX - offset + 1))
+               if (offset && length > U64_MAX - offset + 1) {
+                       rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
+                               offset, length);
                        goto end_request;       /* Shouldn't happen */
+               }
 
                result = -ENOMEM;
                img_request = rbd_img_request_create(rbd_dev, offset, length,
@@ -2088,7 +2879,8 @@ static void rbd_request_fn(struct request_queue *q)
 
                img_request->rq = rq;
 
-               result = rbd_img_request_fill_bio(img_request, rq->bio);
+               result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+                                               rq->bio);
                if (!result)
                        result = rbd_img_request_submit(img_request);
                if (result)
@@ -2158,17 +2950,18 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
        if (!disk)
                return;
 
-       if (disk->flags & GENHD_FL_UP)
+       rbd_dev->disk = NULL;
+       if (disk->flags & GENHD_FL_UP) {
                del_gendisk(disk);
-       if (disk->queue)
-               blk_cleanup_queue(disk->queue);
+               if (disk->queue)
+                       blk_cleanup_queue(disk->queue);
+       }
        put_disk(disk);
 }
 
 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
                                const char *object_name,
-                               u64 offset, u64 length,
-                               char *buf, u64 *version)
+                               u64 offset, u64 length, void *buf)
 
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
@@ -2198,12 +2991,12 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
 
        osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
                                        offset, length, 0, 0);
-       osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
+       osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
                                        obj_request->pages,
                                        obj_request->length,
                                        obj_request->offset & ~PAGE_MASK,
                                        false, false);
-       rbd_osd_req_format(obj_request, false);
+       rbd_osd_req_format_read(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
@@ -2219,10 +3012,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
        rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
        size = (size_t) obj_request->xferred;
        ceph_copy_from_page_vector(pages, buf, 0, size);
-       rbd_assert(size <= (size_t) INT_MAX);
-       ret = (int) size;
-       if (version)
-               *version = obj_request->version;
+       rbd_assert(size <= (size_t)INT_MAX);
+       ret = (int)size;
 out:
        if (obj_request)
                rbd_obj_request_put(obj_request);
@@ -2243,7 +3034,7 @@ out:
  * Returns a pointer-coded errno if a failure occurs.
  */
 static struct rbd_image_header_ondisk *
-rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
+rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
 {
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
@@ -2271,11 +3062,10 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
                        return ERR_PTR(-ENOMEM);
 
                ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
-                                      0, size,
-                                      (char *) ondisk, version);
+                                      0, size, ondisk);
                if (ret < 0)
                        goto out_err;
-               if (WARN_ON((size_t) ret < size)) {
+               if ((size_t)ret < size) {
                        ret = -ENXIO;
                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
                                size, ret);
@@ -2307,46 +3097,36 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
                           struct rbd_image_header *header)
 {
        struct rbd_image_header_ondisk *ondisk;
-       u64 ver = 0;
        int ret;
 
-       ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
+       ondisk = rbd_dev_v1_header_read(rbd_dev);
        if (IS_ERR(ondisk))
                return PTR_ERR(ondisk);
        ret = rbd_header_from_disk(header, ondisk);
-       if (ret >= 0)
-               header->obj_version = ver;
        kfree(ondisk);
 
        return ret;
 }
 
-static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
-{
-       struct rbd_snap *snap;
-       struct rbd_snap *next;
-
-       list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
-               rbd_remove_snap_dev(snap);
-}
-
 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
 {
-       sector_t size;
-
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                return;
 
-       size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
-       dout("setting size to %llu sectors", (unsigned long long) size);
-       rbd_dev->mapping.size = (u64) size;
-       set_capacity(rbd_dev->disk, size);
+       if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
+               sector_t size;
+
+               rbd_dev->mapping.size = rbd_dev->header.image_size;
+               size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
+               dout("setting size to %llu sectors", (unsigned long long)size);
+               set_capacity(rbd_dev->disk, size);
+       }
 }
 
 /*
  * only read the first part of the ondisk header, without the snaps info
  */
-static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
 {
        int ret;
        struct rbd_image_header h;
@@ -2367,37 +3147,61 @@ static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);
 
-       if (hver)
-               *hver = h.obj_version;
-       rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
-       WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
+       if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
+               rbd_warn(rbd_dev, "object prefix changed (ignoring)");
        kfree(h.object_prefix);
 
-       ret = rbd_dev_snaps_update(rbd_dev);
-       if (!ret)
-               ret = rbd_dev_snaps_register(rbd_dev);
-
        up_write(&rbd_dev->header_rwsem);
 
        return ret;
 }
 
-static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
+/*
+ * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
+ * has disappeared from the (just updated) snapshot context.
+ */
+static void rbd_exists_validate(struct rbd_device *rbd_dev)
+{
+       u64 snap_id;
+
+       if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
+               return;
+
+       snap_id = rbd_dev->spec->snap_id;
+       if (snap_id == CEPH_NOSNAP)
+               return;
+
+       if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
+               clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+}
+
+static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 {
+       u64 image_size;
        int ret;
 
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       image_size = rbd_dev->header.image_size;
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        if (rbd_dev->image_format == 1)
-               ret = rbd_dev_v1_refresh(rbd_dev, hver);
+               ret = rbd_dev_v1_refresh(rbd_dev);
        else
-               ret = rbd_dev_v2_refresh(rbd_dev, hver);
+               ret = rbd_dev_v2_refresh(rbd_dev);
+
+       /* If it's a mapped snapshot, validate its EXISTS flag */
+
+       rbd_exists_validate(rbd_dev);
        mutex_unlock(&ctl_mutex);
+       if (ret)
+               rbd_warn(rbd_dev, "got notification but failed to "
+                          " update snaps: %d\n", ret);
+       if (image_size != rbd_dev->header.image_size)
+               revalidate_disk(rbd_dev->disk);
 
        return ret;
 }
@@ -2441,8 +3245,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 
        rbd_dev->disk = disk;
 
-       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
-
        return 0;
 out_disk:
        put_disk(disk);
@@ -2463,13 +3265,9 @@ static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-       sector_t size;
 
-       down_read(&rbd_dev->header_rwsem);
-       size = get_capacity(rbd_dev->disk);
-       up_read(&rbd_dev->header_rwsem);
-
-       return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
+       return sprintf(buf, "%llu\n",
+               (unsigned long long)rbd_dev->mapping.size);
 }
 
 /*
@@ -2482,7 +3280,7 @@ static ssize_t rbd_features_show(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "0x%016llx\n",
-                       (unsigned long long) rbd_dev->mapping.features);
+                       (unsigned long long)rbd_dev->mapping.features);
 }
 
 static ssize_t rbd_major_show(struct device *dev,
@@ -2490,7 +3288,11 @@ static ssize_t rbd_major_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%d\n", rbd_dev->major);
+       if (rbd_dev->major)
+               return sprintf(buf, "%d\n", rbd_dev->major);
+
+       return sprintf(buf, "(none)\n");
+
 }
 
 static ssize_t rbd_client_id_show(struct device *dev,
@@ -2516,7 +3318,7 @@ static ssize_t rbd_pool_id_show(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%llu\n",
-               (unsigned long long) rbd_dev->spec->pool_id);
+                       (unsigned long long) rbd_dev->spec->pool_id);
 }
 
 static ssize_t rbd_name_show(struct device *dev,
@@ -2602,7 +3404,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;
 
-       ret = rbd_dev_refresh(rbd_dev, NULL);
+       ret = rbd_dev_refresh(rbd_dev);
 
        return ret < 0 ? ret : size;
 }
@@ -2653,71 +3455,6 @@ static struct device_type rbd_device_type = {
        .release        = rbd_sysfs_dev_release,
 };
 
-
-/*
-  sysfs - snapshots
-*/
-
-static ssize_t rbd_snap_size_show(struct device *dev,
-                                 struct device_attribute *attr,
-                                 char *buf)
-{
-       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
-
-       return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
-}
-
-static ssize_t rbd_snap_id_show(struct device *dev,
-                               struct device_attribute *attr,
-                               char *buf)
-{
-       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
-
-       return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
-}
-
-static ssize_t rbd_snap_features_show(struct device *dev,
-                               struct device_attribute *attr,
-                               char *buf)
-{
-       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
-
-       return sprintf(buf, "0x%016llx\n",
-                       (unsigned long long) snap->features);
-}
-
-static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
-static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
-static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
-
-static struct attribute *rbd_snap_attrs[] = {
-       &dev_attr_snap_size.attr,
-       &dev_attr_snap_id.attr,
-       &dev_attr_snap_features.attr,
-       NULL,
-};
-
-static struct attribute_group rbd_snap_attr_group = {
-       .attrs = rbd_snap_attrs,
-};
-
-static void rbd_snap_dev_release(struct device *dev)
-{
-       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
-       kfree(snap->name);
-       kfree(snap);
-}
-
-static const struct attribute_group *rbd_snap_attr_groups[] = {
-       &rbd_snap_attr_group,
-       NULL
-};
-
-static struct device_type rbd_snap_device_type = {
-       .groups         = rbd_snap_attr_groups,
-       .release        = rbd_snap_dev_release,
-};
-
 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
 {
        kref_get(&spec->kref);
@@ -2741,8 +3478,6 @@ static struct rbd_spec *rbd_spec_alloc(void)
                return NULL;
        kref_init(&spec->kref);
 
-       rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
-
        return spec;
 }
 
@@ -2769,7 +3504,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        spin_lock_init(&rbd_dev->lock);
        rbd_dev->flags = 0;
        INIT_LIST_HEAD(&rbd_dev->node);
-       INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);
 
        rbd_dev->spec = spec;
@@ -2787,96 +3521,11 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 
 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
 {
-       rbd_spec_put(rbd_dev->parent_spec);
-       kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev->rbd_client);
        rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev);
 }
 
-static bool rbd_snap_registered(struct rbd_snap *snap)
-{
-       bool ret = snap->dev.type == &rbd_snap_device_type;
-       bool reg = device_is_registered(&snap->dev);
-
-       rbd_assert(!ret ^ reg);
-
-       return ret;
-}
-
-static void rbd_remove_snap_dev(struct rbd_snap *snap)
-{
-       list_del(&snap->node);
-       if (device_is_registered(&snap->dev))
-               device_unregister(&snap->dev);
-}
-
-static int rbd_register_snap_dev(struct rbd_snap *snap,
-                                 struct device *parent)
-{
-       struct device *dev = &snap->dev;
-       int ret;
-
-       dev->type = &rbd_snap_device_type;
-       dev->parent = parent;
-       dev->release = rbd_snap_dev_release;
-       dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
-       dout("%s: registering device for snapshot %s\n", __func__, snap->name);
-
-       ret = device_register(dev);
-
-       return ret;
-}
-
-static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
-                                               const char *snap_name,
-                                               u64 snap_id, u64 snap_size,
-                                               u64 snap_features)
-{
-       struct rbd_snap *snap;
-       int ret;
-
-       snap = kzalloc(sizeof (*snap), GFP_KERNEL);
-       if (!snap)
-               return ERR_PTR(-ENOMEM);
-
-       ret = -ENOMEM;
-       snap->name = kstrdup(snap_name, GFP_KERNEL);
-       if (!snap->name)
-               goto err;
-
-       snap->id = snap_id;
-       snap->size = snap_size;
-       snap->features = snap_features;
-
-       return snap;
-
-err:
-       kfree(snap->name);
-       kfree(snap);
-
-       return ERR_PTR(ret);
-}
-
-static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
-               u64 *snap_size, u64 *snap_features)
-{
-       char *snap_name;
-
-       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
-
-       *snap_size = rbd_dev->header.snap_sizes[which];
-       *snap_features = 0;     /* No features for v1 */
-
-       /* Skip over names until we find the one we are looking for */
-
-       snap_name = rbd_dev->header.snap_names;
-       while (which--)
-               snap_name += strlen(snap_name) + 1;
-
-       return snap_name;
-}
-
 /*
  * Get the size and object order for an image snapshot, or if
  * snap_id is CEPH_NOSNAP, gets this information for the base
@@ -2894,18 +3543,21 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 
        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
-                               (char *) &snapid, sizeof (snapid),
-                               (char *) &size_buf, sizeof (size_buf), NULL);
+                               &snapid, sizeof (snapid),
+                               &size_buf, sizeof (size_buf));
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
+       if (ret < sizeof (size_buf))
+               return -ERANGE;
 
-       *order = size_buf.order;
+       if (order)
+               *order = size_buf.order;
        *snap_size = le64_to_cpu(size_buf.size);
 
        dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
-               (unsigned long long) snap_id, (unsigned int) *order,
-               (unsigned long long) *snap_size);
+               (unsigned long long)snap_id, (unsigned int)*order,
+               (unsigned long long)*snap_size);
 
        return 0;
 }
@@ -2928,17 +3580,16 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
                return -ENOMEM;
 
        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
-                               "rbd", "get_object_prefix",
-                               NULL, 0,
-                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
+                               "rbd", "get_object_prefix", NULL, 0,
+                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
 
        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
-                                               p + RBD_OBJ_PREFIX_LEN_MAX,
-                                               NULL, GFP_NOIO);
+                                               p + ret, NULL, GFP_NOIO);
+       ret = 0;
 
        if (IS_ERR(rbd_dev->header.object_prefix)) {
                ret = PTR_ERR(rbd_dev->header.object_prefix);
@@ -2946,7 +3597,6 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
        } else {
                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
        }
-
 out:
        kfree(reply_buf);
 
@@ -2960,18 +3610,19 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
        struct {
                __le64 features;
                __le64 incompat;
-       } features_buf = { 0 };
+       } __attribute__ ((packed)) features_buf = { 0 };
        u64 incompat;
        int ret;
 
        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_features",
-                               (char *) &snapid, sizeof (snapid),
-                               (char *) &features_buf, sizeof (features_buf),
-                               NULL);
+                               &snapid, sizeof (snapid),
+                               &features_buf, sizeof (features_buf));
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
+       if (ret < sizeof (features_buf))
+               return -ERANGE;
 
        incompat = le64_to_cpu(features_buf.incompat);
        if (incompat & ~RBD_FEATURES_SUPPORTED)
@@ -2980,9 +3631,9 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
        *snap_features = le64_to_cpu(features_buf.features);
 
        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
-               (unsigned long long) snap_id,
-               (unsigned long long) *snap_features,
-               (unsigned long long) le64_to_cpu(features_buf.incompat));
+               (unsigned long long)snap_id,
+               (unsigned long long)*snap_features,
+               (unsigned long long)le64_to_cpu(features_buf.incompat));
 
        return 0;
 }
@@ -3022,15 +3673,15 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        snapid = cpu_to_le64(CEPH_NOSNAP);
        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_parent",
-                               (char *) &snapid, sizeof (snapid),
-                               (char *) reply_buf, size, NULL);
+                               &snapid, sizeof (snapid),
+                               reply_buf, size);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out_err;
 
-       ret = -ERANGE;
        p = reply_buf;
-       end = (char *) reply_buf + size;
+       end = reply_buf + ret;
+       ret = -ERANGE;
        ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
        if (parent_spec->pool_id == CEPH_NOPOOL)
                goto out;       /* No parent?  No problem. */
@@ -3038,8 +3689,11 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        /* The ceph file layout needs to fit pool id in 32 bits */
 
        ret = -EIO;
-       if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
-               goto out;
+       if (parent_spec->pool_id > (u64)U32_MAX) {
+               rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
+                       (unsigned long long)parent_spec->pool_id, U32_MAX);
+               goto out_err;
+       }
 
        image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(image_id)) {
@@ -3062,6 +3716,56 @@ out_err:
        return ret;
 }
 
+static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
+{
+       struct {
+               __le64 stripe_unit;
+               __le64 stripe_count;
+       } __attribute__ ((packed)) striping_info_buf = { 0 };
+       size_t size = sizeof (striping_info_buf);
+       void *p;
+       u64 obj_size;
+       u64 stripe_unit;
+       u64 stripe_count;
+       int ret;
+
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_stripe_unit_count", NULL, 0,
+                               (char *)&striping_info_buf, size);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
+       if (ret < 0)
+               return ret;
+       if (ret < size)
+               return -ERANGE;
+
+       /*
+        * We don't actually support the "fancy striping" feature
+        * (STRIPINGV2) yet, but if the striping sizes are the
+        * defaults the behavior is the same as before.  So find
+        * out, and only fail if the image has non-default values.
+        */
+       ret = -EINVAL;
+       obj_size = (u64)1 << rbd_dev->header.obj_order;
+       p = &striping_info_buf;
+       stripe_unit = ceph_decode_64(&p);
+       if (stripe_unit != obj_size) {
+               rbd_warn(rbd_dev, "unsupported stripe unit "
+                               "(got %llu want %llu)",
+                               stripe_unit, obj_size);
+               return -EINVAL;
+       }
+       stripe_count = ceph_decode_64(&p);
+       if (stripe_count != 1) {
+               rbd_warn(rbd_dev, "unsupported stripe count "
+                               "(got %llu want 1)", stripe_count);
+               return -EINVAL;
+       }
+       rbd_dev->header.stripe_unit = stripe_unit;
+       rbd_dev->header.stripe_count = stripe_count;
+
+       return 0;
+}
+
 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
 {
        size_t image_id_size;
@@ -3083,8 +3787,8 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
                return NULL;
 
        p = image_id;
-       end = (char *) image_id + image_id_size;
-       ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
+       end = image_id + image_id_size;
+       ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
 
        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
        reply_buf = kmalloc(size, GFP_KERNEL);
@@ -3094,11 +3798,12 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
        ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
                                "rbd", "dir_get_name",
                                image_id, image_id_size,
-                               (char *) reply_buf, size, NULL);
+                               reply_buf, size);
        if (ret < 0)
                goto out;
        p = reply_buf;
-       end = (char *) reply_buf + size;
+       end = reply_buf + ret;
+
        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
        if (IS_ERR(image_name))
                image_name = NULL;
@@ -3111,69 +3816,134 @@ out:
        return image_name;
 }
 
+static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
+{
+       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+       const char *snap_name;
+       u32 which = 0;
+
+       /* Skip over names until we find the one we are looking for */
+
+       snap_name = rbd_dev->header.snap_names;
+       while (which < snapc->num_snaps) {
+               if (!strcmp(name, snap_name))
+                       return snapc->snaps[which];
+               snap_name += strlen(snap_name) + 1;
+               which++;
+       }
+       return CEPH_NOSNAP;
+}
+
+static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
+{
+       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+       u32 which;
+       bool found = false;
+       u64 snap_id;
+
+       for (which = 0; !found && which < snapc->num_snaps; which++) {
+               const char *snap_name;
+
+               snap_id = snapc->snaps[which];
+               snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
+               if (IS_ERR(snap_name))
+                       break;
+               found = !strcmp(name, snap_name);
+               kfree(snap_name);
+       }
+       return found ? snap_id : CEPH_NOSNAP;
+}
+
 /*
- * When a parent image gets probed, we only have the pool, image,
- * and snapshot ids but not the names of any of them.  This call
- * is made later to fill in those names.  It has to be done after
- * rbd_dev_snaps_update() has completed because some of the
- * information (in particular, snapshot name) is not available
- * until then.
+ * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
+ * no snapshot by that name is found, or if an error occurs.
  */
-static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
+static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
 {
-       struct ceph_osd_client *osdc;
-       const char *name;
-       void *reply_buf = NULL;
+       if (rbd_dev->image_format == 1)
+               return rbd_v1_snap_id_by_name(rbd_dev, name);
+
+       return rbd_v2_snap_id_by_name(rbd_dev, name);
+}
+
+/*
+ * When an rbd image has a parent image, it is identified by the
+ * pool, image, and snapshot ids (not names).  This function fills
+ * in the names for those ids.  (It's OK if we can't figure out the
+ * name for an image id, but the pool and snapshot ids should always
+ * exist and have names.)  All names in an rbd spec are dynamically
+ * allocated.
+ *
+ * When an image being mapped (not a parent) is probed, we have the
+ * pool name and pool id, image name and image id, and the snapshot
+ * name.  The only thing we're missing is the snapshot id.
+ */
+static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
+{
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_spec *spec = rbd_dev->spec;
+       const char *pool_name;
+       const char *image_name;
+       const char *snap_name;
        int ret;
 
-       if (rbd_dev->spec->pool_name)
-               return 0;       /* Already have the names */
+       /*
+        * An image being mapped will have the pool name (etc.), but
+        * we need to look up the snapshot id.
+        */
+       if (spec->pool_name) {
+               if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
+                       u64 snap_id;
+
+                       snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
+                       if (snap_id == CEPH_NOSNAP)
+                               return -ENOENT;
+                       spec->snap_id = snap_id;
+               } else {
+                       spec->snap_id = CEPH_NOSNAP;
+               }
+
+               return 0;
+       }
 
-       /* Look up the pool name */
+       /* Get the pool name; we have to make our own copy of this */
 
-       osdc = &rbd_dev->rbd_client->client->osdc;
-       name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
-       if (!name) {
-               rbd_warn(rbd_dev, "there is no pool with id %llu",
-                       rbd_dev->spec->pool_id);        /* Really a BUG() */
+       pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
+       if (!pool_name) {
+               rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
                return -EIO;
        }
-
-       rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
-       if (!rbd_dev->spec->pool_name)
+       pool_name = kstrdup(pool_name, GFP_KERNEL);
+       if (!pool_name)
                return -ENOMEM;
 
        /* Fetch the image name; tolerate failure here */
 
-       name = rbd_dev_image_name(rbd_dev);
-       if (name)
-               rbd_dev->spec->image_name = (char *) name;
-       else
+       image_name = rbd_dev_image_name(rbd_dev);
+       if (!image_name)
                rbd_warn(rbd_dev, "unable to get image name");
 
-       /* Look up the snapshot name. */
+       /* Look up the snapshot name, and make a copy */
 
-       name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
-       if (!name) {
-               rbd_warn(rbd_dev, "no snapshot with id %llu",
-                       rbd_dev->spec->snap_id);        /* Really a BUG() */
-               ret = -EIO;
+       snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
+       if (!snap_name) {
+               ret = -ENOMEM;
                goto out_err;
        }
-       rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
-       if(!rbd_dev->spec->snap_name)
-               goto out_err;
+
+       spec->pool_name = pool_name;
+       spec->image_name = image_name;
+       spec->snap_name = snap_name;
 
        return 0;
 out_err:
-       kfree(reply_buf);
-       kfree(rbd_dev->spec->pool_name);
-       rbd_dev->spec->pool_name = NULL;
+       kfree(image_name);
+       kfree(pool_name);
 
        return ret;
 }
 
-static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
+static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
 {
        size_t size;
        int ret;
@@ -3198,16 +3968,15 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
                return -ENOMEM;
 
        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
-                               "rbd", "get_snapcontext",
-                               NULL, 0,
-                               reply_buf, size, ver);
+                               "rbd", "get_snapcontext", NULL, 0,
+                               reply_buf, size);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
 
-       ret = -ERANGE;
        p = reply_buf;
-       end = (char *) reply_buf + size;
+       end = reply_buf + ret;
+       ret = -ERANGE;
        ceph_decode_64_safe(&p, end, seq, out);
        ceph_decode_32_safe(&p, end, snap_count, out);
 
@@ -3224,37 +3993,33 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
        }
        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
                goto out;
+       ret = 0;
 
-       size = sizeof (struct ceph_snap_context) +
-                               snap_count * sizeof (snapc->snaps[0]);
-       snapc = kmalloc(size, GFP_KERNEL);
+       snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc) {
                ret = -ENOMEM;
                goto out;
        }
-
-       atomic_set(&snapc->nref, 1);
        snapc->seq = seq;
-       snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                snapc->snaps[i] = ceph_decode_64(&p);
 
        rbd_dev->header.snapc = snapc;
 
        dout("  snap context seq = %llu, snap_count = %u\n",
-               (unsigned long long) seq, (unsigned int) snap_count);
-
+               (unsigned long long)seq, (unsigned int)snap_count);
 out:
        kfree(reply_buf);
 
-       return 0;
+       return ret;
 }
 
-static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
+static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
+                                       u64 snap_id)
 {
        size_t size;
        void *reply_buf;
-       __le64 snap_id;
+       __le64 snapid;
        int ret;
        void *p;
        void *end;
@@ -3265,236 +4030,52 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);
 
-       snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
+       snapid = cpu_to_le64(snap_id);
        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
-                               (char *) &snap_id, sizeof (snap_id),
-                               reply_buf, size, NULL);
+                               &snapid, sizeof (snapid),
+                               reply_buf, size);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
-       if (ret < 0)
+       if (ret < 0) {
+               snap_name = ERR_PTR(ret);
                goto out;
+       }
 
        p = reply_buf;
-       end = (char *) reply_buf + size;
+       end = reply_buf + ret;
        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
-       if (IS_ERR(snap_name)) {
-               ret = PTR_ERR(snap_name);
+       if (IS_ERR(snap_name))
                goto out;
-       } else {
-               dout("  snap_id 0x%016llx snap_name = %s\n",
-                       (unsigned long long) le64_to_cpu(snap_id), snap_name);
-       }
-       kfree(reply_buf);
 
-       return snap_name;
+       dout("  snap_id 0x%016llx snap_name = %s\n",
+               (unsigned long long)snap_id, snap_name);
 out:
        kfree(reply_buf);
 
-       return ERR_PTR(ret);
-}
-
-static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
-               u64 *snap_size, u64 *snap_features)
-{
-       u64 snap_id;
-       u8 order;
-       int ret;
-
-       snap_id = rbd_dev->header.snapc->snaps[which];
-       ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
-       if (ret)
-               return ERR_PTR(ret);
-       ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
-       if (ret)
-               return ERR_PTR(ret);
-
-       return rbd_dev_v2_snap_name(rbd_dev, which);
-}
-
-static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
-               u64 *snap_size, u64 *snap_features)
-{
-       if (rbd_dev->image_format == 1)
-               return rbd_dev_v1_snap_info(rbd_dev, which,
-                                       snap_size, snap_features);
-       if (rbd_dev->image_format == 2)
-               return rbd_dev_v2_snap_info(rbd_dev, which,
-                                       snap_size, snap_features);
-       return ERR_PTR(-EINVAL);
+       return snap_name;
 }
 
-static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
 {
        int ret;
-       __u8 obj_order;
 
        down_write(&rbd_dev->header_rwsem);
 
-       /* Grab old order first, to see if it changes */
-
-       obj_order = rbd_dev->header.obj_order,
        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret)
                goto out;
-       if (rbd_dev->header.obj_order != obj_order) {
-               ret = -EIO;
-               goto out;
-       }
        rbd_update_mapping_size(rbd_dev);
 
-       ret = rbd_dev_v2_snap_context(rbd_dev, hver);
+       ret = rbd_dev_v2_snap_context(rbd_dev);
        dout("rbd_dev_v2_snap_context returned %d\n", ret);
        if (ret)
                goto out;
-       ret = rbd_dev_snaps_update(rbd_dev);
-       dout("rbd_dev_snaps_update returned %d\n", ret);
-       if (ret)
-               goto out;
-       ret = rbd_dev_snaps_register(rbd_dev);
-       dout("rbd_dev_snaps_register returned %d\n", ret);
 out:
        up_write(&rbd_dev->header_rwsem);
 
        return ret;
 }
 
-/*
- * Scan the rbd device's current snapshot list and compare it to the
- * newly-received snapshot context.  Remove any existing snapshots
- * not present in the new snapshot context.  Add a new snapshot for
- * any snaphots in the snapshot context not in the current list.
- * And verify there are no changes to snapshots we already know
- * about.
- *
- * Assumes the snapshots in the snapshot context are sorted by
- * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
- * are also maintained in that order.)
- */
-static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
-{
-       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
-       const u32 snap_count = snapc->num_snaps;
-       struct list_head *head = &rbd_dev->snaps;
-       struct list_head *links = head->next;
-       u32 index = 0;
-
-       dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
-       while (index < snap_count || links != head) {
-               u64 snap_id;
-               struct rbd_snap *snap;
-               char *snap_name;
-               u64 snap_size = 0;
-               u64 snap_features = 0;
-
-               snap_id = index < snap_count ? snapc->snaps[index]
-                                            : CEPH_NOSNAP;
-               snap = links != head ? list_entry(links, struct rbd_snap, node)
-                                    : NULL;
-               rbd_assert(!snap || snap->id != CEPH_NOSNAP);
-
-               if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
-                       struct list_head *next = links->next;
-
-                       /*
-                        * A previously-existing snapshot is not in
-                        * the new snap context.
-                        *
-                        * If the now missing snapshot is the one the
-                        * image is mapped to, clear its exists flag
-                        * so we can avoid sending any more requests
-                        * to it.
-                        */
-                       if (rbd_dev->spec->snap_id == snap->id)
-                               clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-                       rbd_remove_snap_dev(snap);
-                       dout("%ssnap id %llu has been removed\n",
-                               rbd_dev->spec->snap_id == snap->id ?
-                                                       "mapped " : "",
-                               (unsigned long long) snap->id);
-
-                       /* Done with this list entry; advance */
-
-                       links = next;
-                       continue;
-               }
-
-               snap_name = rbd_dev_snap_info(rbd_dev, index,
-                                       &snap_size, &snap_features);
-               if (IS_ERR(snap_name))
-                       return PTR_ERR(snap_name);
-
-               dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
-                       (unsigned long long) snap_id);
-               if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
-                       struct rbd_snap *new_snap;
-
-                       /* We haven't seen this snapshot before */
-
-                       new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
-                                       snap_id, snap_size, snap_features);
-                       if (IS_ERR(new_snap)) {
-                               int err = PTR_ERR(new_snap);
-
-                               dout("  failed to add dev, error %d\n", err);
-
-                               return err;
-                       }
-
-                       /* New goes before existing, or at end of list */
-
-                       dout("  added dev%s\n", snap ? "" : " at end\n");
-                       if (snap)
-                               list_add_tail(&new_snap->node, &snap->node);
-                       else
-                               list_add_tail(&new_snap->node, head);
-               } else {
-                       /* Already have this one */
-
-                       dout("  already present\n");
-
-                       rbd_assert(snap->size == snap_size);
-                       rbd_assert(!strcmp(snap->name, snap_name));
-                       rbd_assert(snap->features == snap_features);
-
-                       /* Done with this list entry; advance */
-
-                       links = links->next;
-               }
-
-               /* Advance to the next entry in the snapshot context */
-
-               index++;
-       }
-       dout("%s: done\n", __func__);
-
-       return 0;
-}
-
-/*
- * Scan the list of snapshots and register the devices for any that
- * have not already been registered.
- */
-static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
-{
-       struct rbd_snap *snap;
-       int ret = 0;
-
-       dout("%s:\n", __func__);
-       if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
-               return -EIO;
-
-       list_for_each_entry(snap, &rbd_dev->snaps, node) {
-               if (!rbd_snap_registered(snap)) {
-                       ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
-                       if (ret < 0)
-                               break;
-               }
-       }
-       dout("%s: returning %d\n", __func__, ret);
-
-       return ret;
-}
-
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
        struct device *dev;
@@ -3506,7 +4087,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
-       dev->release = rbd_dev_release;
+       dev->release = rbd_dev_device_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);
 
@@ -3720,6 +4301,7 @@ static int rbd_add_parse_args(const char *buf,
        size_t len;
        char *options;
        const char *mon_addrs;
+       char *snap_name;
        size_t mon_addrs_size;
        struct rbd_spec *spec = NULL;
        struct rbd_options *rbd_opts = NULL;
@@ -3778,10 +4360,11 @@ static int rbd_add_parse_args(const char *buf,
                ret = -ENAMETOOLONG;
                goto out_err;
        }
-       spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
-       if (!spec->snap_name)
+       snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
+       if (!snap_name)
                goto out_mem;
-       *(spec->snap_name + len) = '\0';
+       *(snap_name + len) = '\0';
+       spec->snap_name = snap_name;
 
        /* Initialize all rbd options to the defaults */
 
@@ -3835,15 +4418,19 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
        size_t size;
        char *object_name;
        void *response;
-       void *p;
+       char *image_id;
 
        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
-        * need to fetch the image id again in this case.
+        * need to fetch the image id again in this case.  We
+        * do still need to set the image format though.
         */
-       if (rbd_dev->spec->image_id)
+       if (rbd_dev->spec->image_id) {
+               rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
+
                return 0;
+       }
 
        /*
         * First, see if the format 2 image id file exists, and if
@@ -3865,23 +4452,32 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
                goto out;
        }
 
+       /* If it doesn't exist we'll assume it's a format 1 image */
+
        ret = rbd_obj_method_sync(rbd_dev, object_name,
-                               "rbd", "get_id",
-                               NULL, 0,
-                               response, RBD_IMAGE_ID_LEN_MAX, NULL);
+                               "rbd", "get_id", NULL, 0,
+                               response, RBD_IMAGE_ID_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
-       if (ret < 0)
-               goto out;
-
-       p = response;
-       rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
-                                               p + RBD_IMAGE_ID_LEN_MAX,
+       if (ret == -ENOENT) {
+               image_id = kstrdup("", GFP_KERNEL);
+               ret = image_id ? 0 : -ENOMEM;
+               if (!ret)
+                       rbd_dev->image_format = 1;
+       } else if (ret > sizeof (__le32)) {
+               void *p = response;
+
+               image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
-       if (IS_ERR(rbd_dev->spec->image_id)) {
-               ret = PTR_ERR(rbd_dev->spec->image_id);
-               rbd_dev->spec->image_id = NULL;
+               ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
+               if (!ret)
+                       rbd_dev->image_format = 2;
        } else {
-               dout("image_id is %s\n", rbd_dev->spec->image_id);
+               ret = -EINVAL;
+       }
+
+       if (!ret) {
+               rbd_dev->spec->image_id = image_id;
+               dout("image_id is %s\n", image_id);
        }
 out:
        kfree(response);
@@ -3890,27 +4486,30 @@ out:
        return ret;
 }
 
-static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
+/* Undo whatever state changes are made by v1 or v2 image probe */
+
+static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 {
-       int ret;
-       size_t size;
+       struct rbd_image_header *header;
 
-       /* Version 1 images have no id; empty string is used */
+       rbd_dev_remove_parent(rbd_dev);
+       rbd_spec_put(rbd_dev->parent_spec);
+       rbd_dev->parent_spec = NULL;
+       rbd_dev->parent_overlap = 0;
 
-       rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
-       if (!rbd_dev->spec->image_id)
-               return -ENOMEM;
+       /* Free dynamic fields from the header, then zero it out */
 
-       /* Record the header object name for this rbd image. */
+       header = &rbd_dev->header;
+       ceph_put_snap_context(header->snapc);
+       kfree(header->snap_sizes);
+       kfree(header->snap_names);
+       kfree(header->object_prefix);
+       memset(header, 0, sizeof (*header));
+}
 
-       size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
-       rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
-       if (!rbd_dev->header_name) {
-               ret = -ENOMEM;
-               goto out_err;
-       }
-       sprintf(rbd_dev->header_name, "%s%s",
-               rbd_dev->spec->image_name, RBD_SUFFIX);
+static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
+{
+       int ret;
 
        /* Populate rbd image metadata */
 
@@ -3923,8 +4522,6 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
        rbd_dev->parent_spec = NULL;
        rbd_dev->parent_overlap = 0;
 
-       rbd_dev->image_format = 1;
-
        dout("discovered version 1 image, header name is %s\n",
                rbd_dev->header_name);
 
@@ -3941,43 +4538,45 @@ out_err:
 
 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
 {
-       size_t size;
        int ret;
-       u64 ver = 0;
-
-       /*
-        * Image id was filled in by the caller.  Record the header
-        * object name for this rbd image.
-        */
-       size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
-       rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
-       if (!rbd_dev->header_name)
-               return -ENOMEM;
-       sprintf(rbd_dev->header_name, "%s%s",
-                       RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
-
-       /* Get the size and object order for the image */
 
        ret = rbd_dev_v2_image_size(rbd_dev);
-       if (ret < 0)
+       if (ret)
                goto out_err;
 
        /* Get the object prefix (a.k.a. block_name) for the image */
 
        ret = rbd_dev_v2_object_prefix(rbd_dev);
-       if (ret < 0)
+       if (ret)
                goto out_err;
 
        /* Get the and check features for the image */
 
        ret = rbd_dev_v2_features(rbd_dev);
-       if (ret < 0)
+       if (ret)
                goto out_err;
 
        /* If the image supports layering, get the parent info */
 
        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
+               if (ret)
+                       goto out_err;
+
+               /*
+                * Don't print a warning for parent images.  We can
+                * tell this point because we won't know its pool
+                * name yet (just its pool id).
+                */
+               if (rbd_dev->spec->pool_name)
+                       rbd_warn(rbd_dev, "WARNING: kernel layering "
+                                       "is EXPERIMENTAL!");
+       }
+
+       /* If the image supports fancy striping, get its parameters */
+
+       if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
+               ret = rbd_dev_v2_striping_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }
@@ -3989,12 +4588,9 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
 
        /* Get the snapshot context, plus the header version */
 
-       ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
+       ret = rbd_dev_v2_snap_context(rbd_dev);
        if (ret)
                goto out_err;
-       rbd_dev->header.obj_version = ver;
-
-       rbd_dev->image_format = 2;
 
        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);
@@ -4012,22 +4608,54 @@ out_err:
        return ret;
 }
 
-static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
+static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
 {
+       struct rbd_device *parent = NULL;
+       struct rbd_spec *parent_spec;
+       struct rbd_client *rbdc;
        int ret;
 
-       /* no need to lock here, as rbd_dev is not registered yet */
-       ret = rbd_dev_snaps_update(rbd_dev);
-       if (ret)
-               return ret;
+       if (!rbd_dev->parent_spec)
+               return 0;
+       /*
+        * We need to pass a reference to the client and the parent
+        * spec when creating the parent rbd_dev.  Images related by
+        * parent/child relationships always share both.
+        */
+       parent_spec = rbd_spec_get(rbd_dev->parent_spec);
+       rbdc = __rbd_get_client(rbd_dev->rbd_client);
 
-       ret = rbd_dev_probe_update_spec(rbd_dev);
-       if (ret)
-               goto err_out_snaps;
+       ret = -ENOMEM;
+       parent = rbd_dev_create(rbdc, parent_spec);
+       if (!parent)
+               goto out_err;
+
+       ret = rbd_dev_image_probe(parent);
+       if (ret < 0)
+               goto out_err;
+       rbd_dev->parent = parent;
+
+       return 0;
+out_err:
+       if (parent) {
+               rbd_spec_put(rbd_dev->parent_spec);
+               kfree(rbd_dev->header_name);
+               rbd_dev_destroy(parent);
+       } else {
+               rbd_put_client(rbdc);
+               rbd_spec_put(parent_spec);
+       }
 
-       ret = rbd_dev_set_mapping(rbd_dev);
+       return ret;
+}
+
+static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
+{
+       int ret;
+
+       ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
-               goto err_out_snaps;
+               return ret;
 
        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);
@@ -4054,54 +4682,81 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_disk;
 
-       /*
-        * At this point cleanup in the event of an error is the job
-        * of the sysfs code (initiated by rbd_bus_del_dev()).
-        */
-       down_write(&rbd_dev->header_rwsem);
-       ret = rbd_dev_snaps_register(rbd_dev);
-       up_write(&rbd_dev->header_rwsem);
-       if (ret)
-               goto err_out_bus;
-
-       ret = rbd_dev_header_watch_sync(rbd_dev, 1);
-       if (ret)
-               goto err_out_bus;
-
        /* Everything's ready.  Announce the disk to the world. */
 
+       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+       set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);
 
        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);
 
        return ret;
-err_out_bus:
-       /* this will also clean up rest of rbd_dev stuff */
-
-       rbd_bus_del_dev(rbd_dev);
 
-       return ret;
 err_out_disk:
        rbd_free_disk(rbd_dev);
 err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
 err_out_id:
        rbd_dev_id_put(rbd_dev);
-err_out_snaps:
-       rbd_remove_all_snaps(rbd_dev);
+       rbd_dev_mapping_clear(rbd_dev);
 
        return ret;
 }
 
+static int rbd_dev_header_name(struct rbd_device *rbd_dev)
+{
+       struct rbd_spec *spec = rbd_dev->spec;
+       size_t size;
+
+       /* Record the header object name for this rbd image. */
+
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+       if (rbd_dev->image_format == 1)
+               size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
+       else
+               size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
+
+       rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+       if (!rbd_dev->header_name)
+               return -ENOMEM;
+
+       if (rbd_dev->image_format == 1)
+               sprintf(rbd_dev->header_name, "%s%s",
+                       spec->image_name, RBD_SUFFIX);
+       else
+               sprintf(rbd_dev->header_name, "%s%s",
+                       RBD_HEADER_PREFIX, spec->image_id);
+       return 0;
+}
+
+static void rbd_dev_image_release(struct rbd_device *rbd_dev)
+{
+       int ret;
+
+       rbd_dev_unprobe(rbd_dev);
+       ret = rbd_dev_header_watch_sync(rbd_dev, 0);
+       if (ret)
+               rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
+       kfree(rbd_dev->header_name);
+       rbd_dev->header_name = NULL;
+       rbd_dev->image_format = 0;
+       kfree(rbd_dev->spec->image_id);
+       rbd_dev->spec->image_id = NULL;
+
+       rbd_dev_destroy(rbd_dev);
+}
+
 /*
  * Probe for the existence of the header object for the given rbd
  * device.  For format 2 images this includes determining the image
  * id.
  */
-static int rbd_dev_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
 {
        int ret;
+       int tmp;
 
        /*
         * Get the id from the image id object.  If it's not a
@@ -4110,18 +4765,48 @@ static int rbd_dev_probe(struct rbd_device *rbd_dev)
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
+               return ret;
+       rbd_assert(rbd_dev->spec->image_id);
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+       ret = rbd_dev_header_name(rbd_dev);
+       if (ret)
+               goto err_out_format;
+
+       ret = rbd_dev_header_watch_sync(rbd_dev, 1);
+       if (ret)
+               goto out_header_name;
+
+       if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_probe(rbd_dev);
        else
                ret = rbd_dev_v2_probe(rbd_dev);
-       if (ret) {
-               dout("probe failed, returning %d\n", ret);
-
-               return ret;
-       }
+       if (ret)
+               goto err_out_watch;
 
-       ret = rbd_dev_probe_finish(rbd_dev);
+       ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
-               rbd_header_free(&rbd_dev->header);
+               goto err_out_probe;
+
+       ret = rbd_dev_probe_parent(rbd_dev);
+       if (!ret)
+               return 0;
+
+err_out_probe:
+       rbd_dev_unprobe(rbd_dev);
+err_out_watch:
+       tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
+       if (tmp)
+               rbd_warn(rbd_dev, "unable to tear down watch request\n");
+out_header_name:
+       kfree(rbd_dev->header_name);
+       rbd_dev->header_name = NULL;
+err_out_format:
+       rbd_dev->image_format = 0;
+       kfree(rbd_dev->spec->image_id);
+       rbd_dev->spec->image_id = NULL;
+
+       dout("probe failed, returning %d\n", ret);
 
        return ret;
 }
@@ -4158,11 +4843,13 @@ static ssize_t rbd_add(struct bus_type *bus,
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
-       spec->pool_id = (u64) rc;
+       spec->pool_id = (u64)rc;
 
        /* The ceph file layout needs to fit pool id in 32 bits */
 
-       if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
+       if (spec->pool_id > (u64)U32_MAX) {
+               rbd_warn(NULL, "pool id too large (%llu > %u)\n",
+                               (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }
@@ -4177,11 +4864,15 @@ static ssize_t rbd_add(struct bus_type *bus,
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */
 
-       rc = rbd_dev_probe(rbd_dev);
+       rc = rbd_dev_image_probe(rbd_dev);
        if (rc < 0)
                goto err_out_rbd_dev;
 
-       return count;
+       rc = rbd_dev_device_setup(rbd_dev);
+       if (!rc)
+               return count;
+
+       rbd_dev_image_release(rbd_dev);
 err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
 err_out_client:
@@ -4196,7 +4887,7 @@ err_out_module:
 
        dout("Error adding device %s\n", buf);
 
-       return (ssize_t) rc;
+       return (ssize_t)rc;
 }
 
 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
@@ -4216,27 +4907,43 @@ static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
        return NULL;
 }
 
-static void rbd_dev_release(struct device *dev)
+static void rbd_dev_device_release(struct device *dev)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       if (rbd_dev->watch_event)
-               rbd_dev_header_watch_sync(rbd_dev, 0);
-
-       /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
+       clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+       rbd_dev_clear_mapping(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
-
-       /* release allocated disk header fields */
-       rbd_header_free(&rbd_dev->header);
-
-       /* done with the id, and with the rbd_dev */
+       rbd_dev->major = 0;
        rbd_dev_id_put(rbd_dev);
-       rbd_assert(rbd_dev->rbd_client != NULL);
-       rbd_dev_destroy(rbd_dev);
+       rbd_dev_mapping_clear(rbd_dev);
+}
 
-       /* release module ref */
-       module_put(THIS_MODULE);
+static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
+{
+       while (rbd_dev->parent) {
+               struct rbd_device *first = rbd_dev;
+               struct rbd_device *second = first->parent;
+               struct rbd_device *third;
+
+               /*
+                * Follow to the parent with no grandparent and
+                * remove it.
+                */
+               while (second && (third = second->parent)) {
+                       first = second;
+                       second = third;
+               }
+               rbd_assert(second);
+               rbd_dev_image_release(second);
+               first->parent = NULL;
+               first->parent_overlap = 0;
+
+               rbd_assert(first->parent_spec);
+               rbd_spec_put(first->parent_spec);
+               first->parent_spec = NULL;
+       }
 }
 
 static ssize_t rbd_remove(struct bus_type *bus,
@@ -4244,13 +4951,13 @@ static ssize_t rbd_remove(struct bus_type *bus,
                          size_t count)
 {
        struct rbd_device *rbd_dev = NULL;
-       int target_id, rc;
+       int target_id;
        unsigned long ul;
-       int ret = count;
+       int ret;
 
-       rc = strict_strtoul(buf, 10, &ul);
-       if (rc)
-               return rc;
+       ret = strict_strtoul(buf, 10, &ul);
+       if (ret)
+               return ret;
 
        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
@@ -4273,10 +4980,10 @@ static ssize_t rbd_remove(struct bus_type *bus,
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;
-
-       rbd_remove_all_snaps(rbd_dev);
+       ret = count;
        rbd_bus_del_dev(rbd_dev);
-
+       rbd_dev_image_release(rbd_dev);
+       module_put(THIS_MODULE);
 done:
        mutex_unlock(&ctl_mutex);
 
@@ -4308,6 +5015,56 @@ static void rbd_sysfs_cleanup(void)
        device_unregister(&rbd_root_dev);
 }
 
+static int rbd_slab_init(void)
+{
+       rbd_assert(!rbd_img_request_cache);
+       rbd_img_request_cache = kmem_cache_create("rbd_img_request",
+                                       sizeof (struct rbd_img_request),
+                                       __alignof__(struct rbd_img_request),
+                                       0, NULL);
+       if (!rbd_img_request_cache)
+               return -ENOMEM;
+
+       rbd_assert(!rbd_obj_request_cache);
+       rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
+                                       sizeof (struct rbd_obj_request),
+                                       __alignof__(struct rbd_obj_request),
+                                       0, NULL);
+       if (!rbd_obj_request_cache)
+               goto out_err;
+
+       rbd_assert(!rbd_segment_name_cache);
+       rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
+                                       MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
+       if (rbd_segment_name_cache)
+               return 0;
+out_err:
+       if (rbd_obj_request_cache) {
+               kmem_cache_destroy(rbd_obj_request_cache);
+               rbd_obj_request_cache = NULL;
+       }
+
+       kmem_cache_destroy(rbd_img_request_cache);
+       rbd_img_request_cache = NULL;
+
+       return -ENOMEM;
+}
+
+static void rbd_slab_exit(void)
+{
+       rbd_assert(rbd_segment_name_cache);
+       kmem_cache_destroy(rbd_segment_name_cache);
+       rbd_segment_name_cache = NULL;
+
+       rbd_assert(rbd_obj_request_cache);
+       kmem_cache_destroy(rbd_obj_request_cache);
+       rbd_obj_request_cache = NULL;
+
+       rbd_assert(rbd_img_request_cache);
+       kmem_cache_destroy(rbd_img_request_cache);
+       rbd_img_request_cache = NULL;
+}
+
 static int __init rbd_init(void)
 {
        int rc;
@@ -4317,16 +5074,22 @@ static int __init rbd_init(void)
 
                return -EINVAL;
        }
-       rc = rbd_sysfs_init();
+       rc = rbd_slab_init();
        if (rc)
                return rc;
-       pr_info("loaded " RBD_DRV_NAME_LONG "\n");
-       return 0;
+       rc = rbd_sysfs_init();
+       if (rc)
+               rbd_slab_exit();
+       else
+               pr_info("loaded " RBD_DRV_NAME_LONG "\n");
+
+       return rc;
 }
 
 static void __exit rbd_exit(void)
 {
        rbd_sysfs_cleanup();
+       rbd_slab_exit();
 }
 
 module_init(rbd_init);