]> Pileus Git - ~andy/linux/blobdiff - drivers/block/rbd.c
rbd: define parent image request routines
[~andy/linux] / drivers / block / rbd.c
index 3cc080c5c49e5840c0291652b6a50684d1cfd5e6..1ffdfbfbf3c41e39c73411ae4c9a0a120ca261e3 100644 (file)
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
+#include <linux/bsearch.h>
 
 #include <linux/kernel.h>
 #include <linux/device.h>
 #include <linux/module.h>
 #include <linux/fs.h>
 #include <linux/blkdev.h>
+#include <linux/slab.h>
 
 #include "rbd_types.h"
 
@@ -66,6 +68,8 @@
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
+#define        BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
+
 /* This allows a single page to hold an image name sent by OSD */
 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
 #define RBD_IMAGE_ID_LEN_MAX   64
  * block device image metadata (in-memory version)
  */
 struct rbd_image_header {
-       /* These four fields never change for a given rbd image */
+       /* These six fields never change for a given rbd image */
        char *object_prefix;
-       u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
+       u64 stripe_unit;
+       u64 stripe_count;
+       u64 features;           /* Might be changeable someday? */
 
        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
-       char *snap_names;
-       u64 *snap_sizes;
-
-       u64 stripe_unit;
-       u64 stripe_count;
+       char *snap_names;       /* format 1 only */
+       u64 *snap_sizes;        /* format 1 only */
 };
 
 /*
@@ -221,6 +224,7 @@ struct rbd_obj_request {
                };
        };
        struct page             **copyup_pages;
+       u32                     copyup_page_count;
 
        struct ceph_osd_request *osd_req;
 
@@ -253,6 +257,7 @@ struct rbd_img_request {
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
+       u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
@@ -272,14 +277,6 @@ struct rbd_img_request {
 #define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 
-struct rbd_snap {
-       const char              *name;
-       u64                     size;
-       struct list_head        node;
-       u64                     id;
-       u64                     features;
-};
-
 struct rbd_mapping {
        u64                     size;
        u64                     features;
@@ -324,9 +321,6 @@ struct rbd_device {
 
        struct list_head        node;
 
-       /* list of snapshots */
-       struct list_head        snaps;
-
        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
@@ -352,18 +346,21 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
 static LIST_HEAD(rbd_client_list);             /* clients */
 static DEFINE_SPINLOCK(rbd_client_list_lock);
 
-static int rbd_img_request_submit(struct rbd_img_request *img_request);
+/* Slab caches for frequently-allocated structures */
+
+static struct kmem_cache       *rbd_img_request_cache;
+static struct kmem_cache       *rbd_obj_request_cache;
+static struct kmem_cache       *rbd_segment_name_cache;
 
-static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
+static int rbd_img_request_submit(struct rbd_img_request *img_request);
 
 static void rbd_dev_device_release(struct device *dev);
-static void rbd_snap_destroy(struct rbd_snap *snap);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
 
 static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -430,7 +427,15 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
+static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
+                                       u64 snap_id);
+static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+                               u8 *order, u64 *snap_size);
+static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+               u64 *snap_features);
+static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -725,148 +730,271 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 }
 
 /*
- * Create a new header structure, translate header format from the on-disk
- * header.
+ * Fill an rbd image header with information from the given format 1
+ * on-disk header.
  */
-static int rbd_header_from_disk(struct rbd_image_header *header,
+static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
 {
+       struct rbd_image_header *header = &rbd_dev->header;
+       bool first_time = header->object_prefix == NULL;
+       struct ceph_snap_context *snapc;
+       char *object_prefix = NULL;
+       char *snap_names = NULL;
+       u64 *snap_sizes = NULL;
        u32 snap_count;
-       size_t len;
        size_t size;
+       int ret = -ENOMEM;
        u32 i;
 
-       memset(header, 0, sizeof (*header));
+       /* Allocate this now to avoid having to handle failure below */
 
-       snap_count = le32_to_cpu(ondisk->snap_count);
+       if (first_time) {
+               size_t len;
 
-       len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
-       header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
-       if (!header->object_prefix)
-               return -ENOMEM;
-       memcpy(header->object_prefix, ondisk->object_prefix, len);
-       header->object_prefix[len] = '\0';
+               len = strnlen(ondisk->object_prefix,
+                               sizeof (ondisk->object_prefix));
+               object_prefix = kmalloc(len + 1, GFP_KERNEL);
+               if (!object_prefix)
+                       return -ENOMEM;
+               memcpy(object_prefix, ondisk->object_prefix, len);
+               object_prefix[len] = '\0';
+       }
 
+       /* Allocate the snapshot context and fill it in */
+
+       snap_count = le32_to_cpu(ondisk->snap_count);
+       snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
+       if (!snapc)
+               goto out_err;
+       snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
+               struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 
-               /* Save a copy of the snapshot names */
+               /* We'll keep a copy of the snapshot names... */
+
+               if (snap_names_len > (u64)SIZE_MAX)
+                       goto out_2big;
+               snap_names = kmalloc(snap_names_len, GFP_KERNEL);
+               if (!snap_names)
+                       goto out_err;
+
+               /* ...as well as the array of their sizes. */
 
-               if (snap_names_len > (u64) SIZE_MAX)
-                       return -EIO;
-               header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
-               if (!header->snap_names)
+               size = snap_count * sizeof (*header->snap_sizes);
+               snap_sizes = kmalloc(size, GFP_KERNEL);
+               if (!snap_sizes)
                        goto out_err;
+
                /*
-                * Note that rbd_dev_v1_header_read() guarantees
-                * the ondisk buffer we're working with has
+                * Copy the names, and fill in each snapshot's id
+                * and size.
+                *
+                * Note that rbd_dev_v1_header_info() guarantees the
+                * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, this memcpy() is safe.
                 */
-               memcpy(header->snap_names, &ondisk->snaps[snap_count],
-                       snap_names_len);
+               memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
+               snaps = ondisk->snaps;
+               for (i = 0; i < snap_count; i++) {
+                       snapc->snaps[i] = le64_to_cpu(snaps[i].id);
+                       snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
+               }
+       }
 
-               /* Record each snapshot's size */
+       /* We won't fail any more, fill in the header */
 
-               size = snap_count * sizeof (*header->snap_sizes);
-               header->snap_sizes = kmalloc(size, GFP_KERNEL);
-               if (!header->snap_sizes)
-                       goto out_err;
-               for (i = 0; i < snap_count; i++)
-                       header->snap_sizes[i] =
-                               le64_to_cpu(ondisk->snaps[i].image_size);
+       down_write(&rbd_dev->header_rwsem);
+       if (first_time) {
+               header->object_prefix = object_prefix;
+               header->obj_order = ondisk->options.order;
+               header->crypt_type = ondisk->options.crypt_type;
+               header->comp_type = ondisk->options.comp_type;
+               /* The rest aren't used for format 1 images */
+               header->stripe_unit = 0;
+               header->stripe_count = 0;
+               header->features = 0;
        } else {
-               header->snap_names = NULL;
-               header->snap_sizes = NULL;
+               ceph_put_snap_context(header->snapc);
+               kfree(header->snap_names);
+               kfree(header->snap_sizes);
        }
 
-       header->features = 0;   /* No features support in v1 images */
-       header->obj_order = ondisk->options.order;
-       header->crypt_type = ondisk->options.crypt_type;
-       header->comp_type = ondisk->options.comp_type;
-
-       /* Allocate and fill in the snapshot context */
+       /* The remaining fields always get updated (when we refresh) */
 
        header->image_size = le64_to_cpu(ondisk->image_size);
+       header->snapc = snapc;
+       header->snap_names = snap_names;
+       header->snap_sizes = snap_sizes;
 
-       header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
-       if (!header->snapc)
-               goto out_err;
-       header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
-       for (i = 0; i < snap_count; i++)
-               header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
+       /* Make sure mapping size is consistent with header info */
 
-       return 0;
+       if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
+               if (rbd_dev->mapping.size != header->image_size)
+                       rbd_dev->mapping.size = header->image_size;
 
+       up_write(&rbd_dev->header_rwsem);
+
+       return 0;
+out_2big:
+       ret = -EIO;
 out_err:
-       kfree(header->snap_sizes);
-       header->snap_sizes = NULL;
-       kfree(header->snap_names);
-       header->snap_names = NULL;
-       kfree(header->object_prefix);
-       header->object_prefix = NULL;
+       kfree(snap_sizes);
+       kfree(snap_names);
+       ceph_put_snap_context(snapc);
+       kfree(object_prefix);
 
-       return -ENOMEM;
+       return ret;
 }
 
-static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
+{
+       const char *snap_name;
+
+       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
+
+       /* Skip over names until we find the one we are looking for */
+
+       snap_name = rbd_dev->header.snap_names;
+       while (which--)
+               snap_name += strlen(snap_name) + 1;
+
+       return kstrdup(snap_name, GFP_KERNEL);
+}
+
+/*
+ * Snapshot id comparison function for use with qsort()/bsearch().
+ * Note that result is for snapshots in *descending* order.
+ */
+static int snapid_compare_reverse(const void *s1, const void *s2)
 {
-       struct rbd_snap *snap;
+       u64 snap_id1 = *(u64 *)s1;
+       u64 snap_id2 = *(u64 *)s2;
 
+       if (snap_id1 < snap_id2)
+               return 1;
+       return snap_id1 == snap_id2 ? 0 : -1;
+}
+
+/*
+ * Search a snapshot context to see if the given snapshot id is
+ * present.
+ *
+ * Returns the position of the snapshot id in the array if it's found,
+ * or BAD_SNAP_INDEX otherwise.
+ *
+ * Note: The snapshot array is in kept sorted (by the osd) in
+ * reverse order, highest snapshot id first.
+ */
+static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
+{
+       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+       u64 *found;
+
+       found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
+                               sizeof (snap_id), snapid_compare_reverse);
+
+       return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
+}
+
+static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
+                                       u64 snap_id)
+{
+       u32 which;
+
+       which = rbd_dev_snap_index(rbd_dev, snap_id);
+       if (which == BAD_SNAP_INDEX)
+               return NULL;
+
+       return _rbd_dev_v1_snap_name(rbd_dev, which);
+}
+
+static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;
 
-       list_for_each_entry(snap, &rbd_dev->snaps, node)
-               if (snap_id == snap->id)
-                       return snap->name;
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       if (rbd_dev->image_format == 1)
+               return rbd_dev_v1_snap_name(rbd_dev, snap_id);
 
-       return NULL;
+       return rbd_dev_v2_snap_name(rbd_dev, snap_id);
 }
 
-static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
-                                       const char *snap_name)
+static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+                               u64 *snap_size)
 {
-       struct rbd_snap *snap;
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       if (snap_id == CEPH_NOSNAP) {
+               *snap_size = rbd_dev->header.image_size;
+       } else if (rbd_dev->image_format == 1) {
+               u32 which;
 
-       list_for_each_entry(snap, &rbd_dev->snaps, node)
-               if (!strcmp(snap_name, snap->name))
-                       return snap;
+               which = rbd_dev_snap_index(rbd_dev, snap_id);
+               if (which == BAD_SNAP_INDEX)
+                       return -ENOENT;
 
-       return NULL;
+               *snap_size = rbd_dev->header.snap_sizes[which];
+       } else {
+               u64 size = 0;
+               int ret;
+
+               ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
+               if (ret)
+                       return ret;
+
+               *snap_size = size;
+       }
+       return 0;
 }
 
-static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
+static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+                       u64 *snap_features)
 {
-       if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
-                   sizeof (RBD_SNAP_HEAD_NAME))) {
-               rbd_dev->mapping.size = rbd_dev->header.image_size;
-               rbd_dev->mapping.features = rbd_dev->header.features;
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       if (snap_id == CEPH_NOSNAP) {
+               *snap_features = rbd_dev->header.features;
+       } else if (rbd_dev->image_format == 1) {
+               *snap_features = 0;     /* No features for format 1 */
        } else {
-               struct rbd_snap *snap;
+               u64 features = 0;
+               int ret;
 
-               snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
-               if (!snap)
-                       return -ENOENT;
-               rbd_dev->mapping.size = snap->size;
-               rbd_dev->mapping.features = snap->features;
-               rbd_dev->mapping.read_only = true;
-       }
+               ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
+               if (ret)
+                       return ret;
 
+               *snap_features = features;
+       }
        return 0;
 }
 
-static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
+static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
 {
-       rbd_dev->mapping.size = 0;
-       rbd_dev->mapping.features = 0;
-       rbd_dev->mapping.read_only = true;
+       u64 snap_id = rbd_dev->spec->snap_id;
+       u64 size = 0;
+       u64 features = 0;
+       int ret;
+
+       ret = rbd_snap_size(rbd_dev, snap_id, &size);
+       if (ret)
+               return ret;
+       ret = rbd_snap_features(rbd_dev, snap_id, &features);
+       if (ret)
+               return ret;
+
+       rbd_dev->mapping.size = size;
+       rbd_dev->mapping.features = features;
+
+       return 0;
 }
 
-static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
+static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
 {
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
-       rbd_dev->mapping.read_only = true;
 }
 
 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
@@ -875,7 +1003,7 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
        u64 segment;
        int ret;
 
-       name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
+       name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
@@ -891,6 +1019,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
        return name;
 }
 
+static void rbd_segment_name_free(const char *name)
+{
+       /* The explicit cast here is needed to drop the const qualifier */
+
+       kmem_cache_free(rbd_segment_name_cache, (void *)name);
+}
+
 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 {
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
@@ -1224,20 +1359,18 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
-       dout("%s: img %p (was %d)\n", __func__, img_request,
-               atomic_read(&img_request->kref.refcount));
-       kref_get(&img_request->kref);
-}
-
+static bool img_request_child_test(struct rbd_img_request *img_request);
+static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
 static void rbd_img_request_put(struct rbd_img_request *img_request)
 {
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
-       kref_put(&img_request->kref, rbd_img_request_destroy);
+       if (img_request_child_test(img_request))
+               kref_put(&img_request->kref, rbd_parent_request_destroy);
+       else
+               kref_put(&img_request->kref, rbd_img_request_destroy);
 }
 
 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
@@ -1354,6 +1487,12 @@ static void img_request_child_set(struct rbd_img_request *img_request)
        smp_mb();
 }
 
+static void img_request_child_clear(struct rbd_img_request *img_request)
+{
+       clear_bit(IMG_REQ_CHILD, &img_request->flags);
+       smp_mb();
+}
+
 static bool img_request_child_test(struct rbd_img_request *img_request)
 {
        smp_mb();
@@ -1649,11 +1788,16 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
        rbd_assert(obj_request_type_valid(type));
 
        size = strlen(object_name) + 1;
-       obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
-       if (!obj_request)
+       name = kmalloc(size, GFP_KERNEL);
+       if (!name)
+               return NULL;
+
+       obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
+       if (!obj_request) {
+               kfree(name);
                return NULL;
+       }
 
-       name = (char *)(obj_request + 1);
        obj_request->object_name = memcpy(name, object_name, size);
        obj_request->offset = offset;
        obj_request->length = length;
@@ -1699,7 +1843,20 @@ static void rbd_obj_request_destroy(struct kref *kref)
                break;
        }
 
-       kfree(obj_request);
+       kfree(obj_request->object_name);
+       obj_request->object_name = NULL;
+       kmem_cache_free(rbd_obj_request_cache, obj_request);
+}
+
+/* It's OK to call this for a device with no parent */
+
+static void rbd_spec_put(struct rbd_spec *spec);
+static void rbd_dev_unparent(struct rbd_device *rbd_dev)
+{
+       rbd_dev_remove_parent(rbd_dev);
+       rbd_spec_put(rbd_dev->parent_spec);
+       rbd_dev->parent_spec = NULL;
+       rbd_dev->parent_overlap = 0;
 }
 
 /*
@@ -1710,12 +1867,11 @@ static void rbd_obj_request_destroy(struct kref *kref)
 static struct rbd_img_request *rbd_img_request_create(
                                        struct rbd_device *rbd_dev,
                                        u64 offset, u64 length,
-                                       bool write_request,
-                                       bool child_request)
+                                       bool write_request)
 {
        struct rbd_img_request *img_request;
 
-       img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
+       img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
        if (!img_request)
                return NULL;
 
@@ -1736,9 +1892,7 @@ static struct rbd_img_request *rbd_img_request_create(
        } else {
                img_request->snap_id = rbd_dev->spec->snap_id;
        }
-       if (child_request)
-               img_request_child_set(img_request);
-       if (rbd_dev->parent_spec)
+       if (rbd_dev->parent_overlap)
                img_request_layered_set(img_request);
        spin_lock_init(&img_request->completion_lock);
        img_request->next_completion = 0;
@@ -1748,9 +1902,6 @@ static struct rbd_img_request *rbd_img_request_create(
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);
 
-       rbd_img_request_get(img_request);       /* Avoid a warning */
-       rbd_img_request_put(img_request);       /* TEMPORARY */
-
        dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
                write_request ? "write" : "read", offset, length,
                img_request);
@@ -1775,10 +1926,44 @@ static void rbd_img_request_destroy(struct kref *kref)
        if (img_request_write_test(img_request))
                ceph_put_snap_context(img_request->snapc);
 
-       if (img_request_child_test(img_request))
-               rbd_obj_request_put(img_request->obj_request);
+       kmem_cache_free(rbd_img_request_cache, img_request);
+}
+
+static struct rbd_img_request *rbd_parent_request_create(
+                                       struct rbd_obj_request *obj_request,
+                                       u64 img_offset, u64 length)
+{
+       struct rbd_img_request *parent_request;
+       struct rbd_device *rbd_dev;
+
+       rbd_assert(obj_request->img_request);
+       rbd_dev = obj_request->img_request->rbd_dev;
+
+       parent_request = rbd_img_request_create(rbd_dev->parent,
+                                               img_offset, length, false);
+       if (!parent_request)
+               return NULL;
+
+       img_request_child_set(parent_request);
+       rbd_obj_request_get(obj_request);
+       parent_request->obj_request = obj_request;
+
+       return parent_request;
+}
+
+static void rbd_parent_request_destroy(struct kref *kref)
+{
+       struct rbd_img_request *parent_request;
+       struct rbd_obj_request *orig_request;
+
+       parent_request = container_of(kref, struct rbd_img_request, kref);
+       orig_request = parent_request->obj_request;
+
+       parent_request->obj_request = NULL;
+       rbd_obj_request_put(orig_request);
+       img_request_child_clear(parent_request);
 
-       kfree(img_request);
+       rbd_img_request_destroy(kref);
 }
 
 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
@@ -1916,7 +2101,8 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                length = rbd_segment_length(rbd_dev, img_offset, resid);
                obj_request = rbd_obj_request_create(object_name,
                                                offset, length, type);
-               kfree(object_name);     /* object request has its own copy */
+               /* object request has its own copy of the object name */
+               rbd_segment_name_free(object_name);
                if (!obj_request)
                        goto out_unwind;
 
@@ -1988,7 +2174,7 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request;
        struct rbd_device *rbd_dev;
-       u64 length;
+       struct page **pages;
        u32 page_count;
 
        rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
@@ -1998,12 +2184,14 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 
        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev);
-       length = (u64)1 << rbd_dev->header.obj_order;
-       page_count = (u32)calc_pages_for(0, length);
 
-       rbd_assert(obj_request->copyup_pages);
-       ceph_release_page_vector(obj_request->copyup_pages, page_count);
+       pages = obj_request->copyup_pages;
+       rbd_assert(pages != NULL);
        obj_request->copyup_pages = NULL;
+       page_count = obj_request->copyup_page_count;
+       rbd_assert(page_count);
+       obj_request->copyup_page_count = 0;
+       ceph_release_page_vector(pages, page_count);
 
        /*
         * We want the transfer count to reflect the size of the
@@ -2027,9 +2215,11 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
        struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
        struct page **pages;
+       u32 page_count;
        int result;
-       u64 obj_size;
-       u64 xferred;
+       u64 parent_length;
+       u64 offset;
+       u64 length;
 
        rbd_assert(img_request_child_test(img_request));
 
@@ -2038,46 +2228,59 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
        pages = img_request->copyup_pages;
        rbd_assert(pages != NULL);
        img_request->copyup_pages = NULL;
+       page_count = img_request->copyup_page_count;
+       rbd_assert(page_count);
+       img_request->copyup_page_count = 0;
 
        orig_request = img_request->obj_request;
        rbd_assert(orig_request != NULL);
-       rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_type_valid(orig_request->type));
        result = img_request->result;
-       obj_size = img_request->length;
-       xferred = img_request->xferred;
+       parent_length = img_request->length;
+       rbd_assert(parent_length == img_request->xferred);
+       rbd_img_request_put(img_request);
 
-       rbd_dev = img_request->rbd_dev;
+       rbd_assert(orig_request->img_request);
+       rbd_dev = orig_request->img_request->rbd_dev;
        rbd_assert(rbd_dev);
-       rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
-
-       rbd_img_request_put(img_request);
 
        if (result)
                goto out_err;
 
-       /* Allocate the new copyup osd request for the original request */
-
+       /*
+        * The original osd request is of no use to use any more.
+        * We need a new one that can hold the two ops in a copyup
+        * request.  Allocate the new copyup osd request for the
+        * original request, and release the old one.
+        */
        result = -ENOMEM;
-       rbd_assert(!orig_request->osd_req);
        osd_req = rbd_osd_req_create_copyup(orig_request);
        if (!osd_req)
                goto out_err;
+       rbd_osd_req_destroy(orig_request->osd_req);
        orig_request->osd_req = osd_req;
        orig_request->copyup_pages = pages;
+       orig_request->copyup_page_count = page_count;
 
        /* Initialize the copyup op */
 
        osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
-       osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+       osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
                                                false, false);
 
        /* Then the original write request op */
 
+       offset = orig_request->offset;
+       length = orig_request->length;
        osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
-                                       orig_request->offset,
-                                       orig_request->length, 0, 0);
-       osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
-                                       orig_request->length);
+                                       offset, length, 0, 0);
+       if (orig_request->type == OBJ_REQUEST_BIO)
+               osd_req_op_extent_osd_data_bio(osd_req, 1,
+                                       orig_request->bio_list, length);
+       else
+               osd_req_op_extent_osd_data_pages(osd_req, 1,
+                                       orig_request->pages, length,
+                                       offset & ~PAGE_MASK, false, false);
 
        rbd_osd_req_format_write(orig_request);
 
@@ -2123,22 +2326,13 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
        int result;
 
        rbd_assert(obj_request_img_data_test(obj_request));
-       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_type_valid(obj_request->type));
 
        img_request = obj_request->img_request;
        rbd_assert(img_request != NULL);
        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev->parent != NULL);
 
-       /*
-        * First things first.  The original osd request is of no
-        * use to use any more, we'll need a new one that can hold
-        * the two ops in a copyup request.  We'll get that later,
-        * but for now we can release the old one.
-        */
-       rbd_osd_req_destroy(obj_request->osd_req);
-       obj_request->osd_req = NULL;
-
        /*
         * Determine the byte range covered by the object in the
         * child image to which the original request was to be sent.
@@ -2169,18 +2363,16 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
        }
 
        result = -ENOMEM;
-       parent_request = rbd_img_request_create(rbd_dev->parent,
-                                               img_offset, length,
-                                               false, true);
+       parent_request = rbd_parent_request_create(obj_request,
+                                               img_offset, length);
        if (!parent_request)
                goto out_err;
-       rbd_obj_request_get(obj_request);
-       parent_request->obj_request = obj_request;
 
        result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
        if (result)
                goto out_err;
        parent_request->copyup_pages = pages;
+       parent_request->copyup_page_count = page_count;
 
        parent_request->callback = rbd_img_obj_parent_read_full_callback;
        result = rbd_img_request_submit(parent_request);
@@ -2188,6 +2380,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
                return 0;
 
        parent_request->copyup_pages = NULL;
+       parent_request->copyup_page_count = 0;
        parent_request->obj_request = NULL;
        rbd_obj_request_put(obj_request);
 out_err:
@@ -2419,37 +2612,35 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
                obj_request->xferred = img_request->xferred;
        }
 out:
+       rbd_img_request_put(img_request);
        rbd_img_obj_request_read_callback(obj_request);
        rbd_obj_request_complete(obj_request);
 }
 
 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
 {
-       struct rbd_device *rbd_dev;
        struct rbd_img_request *img_request;
        int result;
 
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request != NULL);
        rbd_assert(obj_request->result == (s32) -ENOENT);
-       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_type_valid(obj_request->type));
 
-       rbd_dev = obj_request->img_request->rbd_dev;
-       rbd_assert(rbd_dev->parent != NULL);
        /* rbd_read_finish(obj_request, obj_request->length); */
-       img_request = rbd_img_request_create(rbd_dev->parent,
+       img_request = rbd_parent_request_create(obj_request,
                                                obj_request->img_offset,
-                                               obj_request->length,
-                                               false, true);
+                                               obj_request->length);
        result = -ENOMEM;
        if (!img_request)
                goto out_err;
 
-       rbd_obj_request_get(obj_request);
-       img_request->obj_request = obj_request;
-
-       result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-                                       obj_request->bio_list);
+       if (obj_request->type == OBJ_REQUEST_BIO)
+               result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+                                               obj_request->bio_list);
+       else
+               result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
+                                               obj_request->pages);
        if (result)
                goto out_err;
 
@@ -2499,6 +2690,7 @@ out:
 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 {
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
+       int ret;
 
        if (!rbd_dev)
                return;
@@ -2506,7 +2698,9 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
                rbd_dev->header_name, (unsigned long long)notify_id,
                (unsigned int)opcode);
-       (void)rbd_dev_refresh(rbd_dev);
+       ret = rbd_dev_refresh(rbd_dev);
+       if (ret)
+               rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
 
        rbd_obj_notify_ack(rbd_dev, notify_id);
 }
@@ -2515,7 +2709,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
  * Request sync osd watch/unwatch.  The value of "start" determines
  * whether a watch request is being initiated or torn down.
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
@@ -2549,7 +2743,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
                                        rbd_dev->watch_request->osd_req);
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-                               rbd_dev->watch_event->cookie, 0, start);
+                               rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
        rbd_osd_req_format_write(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
@@ -2742,9 +2936,16 @@ static void rbd_request_fn(struct request_queue *q)
                        goto end_request;       /* Shouldn't happen */
                }
 
+               result = -EIO;
+               if (offset + length > rbd_dev->mapping.size) {
+                       rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
+                               offset, length, rbd_dev->mapping.size);
+                       goto end_request;
+               }
+
                result = -ENOMEM;
                img_request = rbd_img_request_create(rbd_dev, offset, length,
-                                                       write_request, false);
+                                                       write_request);
                if (!img_request)
                        goto end_request;
 
@@ -2895,17 +3096,11 @@ out:
 }
 
 /*
- * Read the complete header for the given rbd device.
- *
- * Returns a pointer to a dynamically-allocated buffer containing
- * the complete and validated header.  Caller can pass the address
- * of a variable that will be filled in with the version of the
- * header object at the time it was read.
- *
- * Returns a pointer-coded errno if a failure occurs.
+ * Read the complete header for the given rbd device.  On successful
+ * return, the rbd_dev->header field will contain up-to-date
+ * information about the image.
  */
-static struct rbd_image_header_ondisk *
-rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
+static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
 {
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
@@ -2930,22 +3125,22 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
-                       return ERR_PTR(-ENOMEM);
+                       return -ENOMEM;
 
                ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
                                       0, size, ondisk);
                if (ret < 0)
-                       goto out_err;
+                       goto out;
                if ((size_t)ret < size) {
                        ret = -ENXIO;
                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
                                size, ret);
-                       goto out_err;
+                       goto out;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        rbd_warn(rbd_dev, "invalid header");
-                       goto out_err;
+                       goto out;
                }
 
                names_size = le64_to_cpu(ondisk->snap_names_len);
@@ -2953,118 +3148,59 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);
 
-       return ondisk;
-
-out_err:
+       ret = rbd_header_from_disk(rbd_dev, ondisk);
+out:
        kfree(ondisk);
 
-       return ERR_PTR(ret);
+       return ret;
 }
 
 /*
- * reload the ondisk the header
+ * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
+ * has disappeared from the (just updated) snapshot context.
  */
-static int rbd_read_header(struct rbd_device *rbd_dev,
-                          struct rbd_image_header *header)
+static void rbd_exists_validate(struct rbd_device *rbd_dev)
 {
-       struct rbd_image_header_ondisk *ondisk;
-       int ret;
+       u64 snap_id;
 
-       ondisk = rbd_dev_v1_header_read(rbd_dev);
-       if (IS_ERR(ondisk))
-               return PTR_ERR(ondisk);
-       ret = rbd_header_from_disk(header, ondisk);
-       kfree(ondisk);
+       if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
+               return;
 
-       return ret;
+       snap_id = rbd_dev->spec->snap_id;
+       if (snap_id == CEPH_NOSNAP)
+               return;
+
+       if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
+               clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 }
 
-static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
+static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 {
-       struct rbd_snap *snap;
-       struct rbd_snap *next;
+       u64 mapping_size;
+       int ret;
 
-       list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
-               list_del(&snap->node);
-               rbd_snap_destroy(snap);
-       }
-}
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       mapping_size = rbd_dev->mapping.size;
+       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+       if (rbd_dev->image_format == 1)
+               ret = rbd_dev_v1_header_info(rbd_dev);
+       else
+               ret = rbd_dev_v2_header_info(rbd_dev);
 
-static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
-{
-       if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
-               return;
+       /* If it's a mapped snapshot, validate its EXISTS flag */
 
-       if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
+       rbd_exists_validate(rbd_dev);
+       mutex_unlock(&ctl_mutex);
+       if (mapping_size != rbd_dev->mapping.size) {
                sector_t size;
 
-               rbd_dev->mapping.size = rbd_dev->header.image_size;
                size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
                dout("setting size to %llu sectors", (unsigned long long)size);
                set_capacity(rbd_dev->disk, size);
+               revalidate_disk(rbd_dev->disk);
        }
-}
-
-/*
- * only read the first part of the ondisk header, without the snaps info
- */
-static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
-{
-       int ret;
-       struct rbd_image_header h;
-
-       ret = rbd_read_header(rbd_dev, &h);
-       if (ret < 0)
-               return ret;
-
-       down_write(&rbd_dev->header_rwsem);
-
-       /* Update image size, and check for resize of mapped image */
-       rbd_dev->header.image_size = h.image_size;
-       rbd_update_mapping_size(rbd_dev);
-
-       /* rbd_dev->header.object_prefix shouldn't change */
-       kfree(rbd_dev->header.snap_sizes);
-       kfree(rbd_dev->header.snap_names);
-       /* osd requests may still refer to snapc */
-       ceph_put_snap_context(rbd_dev->header.snapc);
-
-       rbd_dev->header.image_size = h.image_size;
-       rbd_dev->header.snapc = h.snapc;
-       rbd_dev->header.snap_names = h.snap_names;
-       rbd_dev->header.snap_sizes = h.snap_sizes;
-       /* Free the extra copy of the object prefix */
-       if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
-               rbd_warn(rbd_dev, "object prefix changed (ignoring)");
-       kfree(h.object_prefix);
-
-       ret = rbd_dev_snaps_update(rbd_dev);
-
-       up_write(&rbd_dev->header_rwsem);
-
-       return ret;
-}
-
-static int rbd_dev_refresh(struct rbd_device *rbd_dev)
-{
-       u64 image_size;
-       int ret;
-
-       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
-       image_size = rbd_dev->header.image_size;
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       if (rbd_dev->image_format == 1)
-               ret = rbd_dev_v1_refresh(rbd_dev);
-       else
-               ret = rbd_dev_v2_refresh(rbd_dev);
-       mutex_unlock(&ctl_mutex);
-       if (ret)
-               rbd_warn(rbd_dev, "got notification but failed to "
-                          " update snaps: %d\n", ret);
-       if (image_size != rbd_dev->header.image_size)
-               revalidate_disk(rbd_dev->disk);
-
-       return ret;
+
+       return ret;
 }
 
 static int rbd_init_disk(struct rbd_device *rbd_dev)
@@ -3266,6 +3402,8 @@ static ssize_t rbd_image_refresh(struct device *dev,
        int ret;
 
        ret = rbd_dev_refresh(rbd_dev);
+       if (ret)
+               rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
 
        return ret < 0 ? ret : size;
 }
@@ -3365,7 +3503,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        spin_lock_init(&rbd_dev->lock);
        rbd_dev->flags = 0;
        INIT_LIST_HEAD(&rbd_dev->node);
-       INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);
 
        rbd_dev->spec = spec;
@@ -3388,59 +3525,6 @@ static void rbd_dev_destroy(struct rbd_device *rbd_dev)
        kfree(rbd_dev);
 }
 
-static void rbd_snap_destroy(struct rbd_snap *snap)
-{
-       kfree(snap->name);
-       kfree(snap);
-}
-
-static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
-                                               const char *snap_name,
-                                               u64 snap_id, u64 snap_size,
-                                               u64 snap_features)
-{
-       struct rbd_snap *snap;
-
-       snap = kzalloc(sizeof (*snap), GFP_KERNEL);
-       if (!snap)
-               return ERR_PTR(-ENOMEM);
-
-       snap->name = snap_name;
-       snap->id = snap_id;
-       snap->size = snap_size;
-       snap->features = snap_features;
-
-       return snap;
-}
-
-/*
- * Returns a dynamically-allocated snapshot name if successful, or a
- * pointer-coded error otherwise.
- */
-static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
-               u64 *snap_size, u64 *snap_features)
-{
-       const char *snap_name;
-       int i;
-
-       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
-
-       /* Skip over names until we find the one we are looking for */
-
-       snap_name = rbd_dev->header.snap_names;
-       for (i = 0; i < which; i++)
-               snap_name += strlen(snap_name) + 1;
-
-       snap_name = kstrdup(snap_name, GFP_KERNEL);
-       if (!snap_name)
-               return ERR_PTR(-ENOMEM);
-
-       *snap_size = rbd_dev->header.snap_sizes[which];
-       *snap_features = 0;     /* No features for v1 */
-
-       return snap_name;
-}
-
 /*
  * Get the size and object order for an image snapshot, or if
  * snap_id is CEPH_NOSNAP, gets this information for the base
@@ -3567,6 +3651,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        __le64 snapid;
        void *p;
        void *end;
+       u64 pool_id;
        char *image_id;
        u64 overlap;
        int ret;
@@ -3597,18 +3682,19 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        p = reply_buf;
        end = reply_buf + ret;
        ret = -ERANGE;
-       ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
-       if (parent_spec->pool_id == CEPH_NOPOOL)
+       ceph_decode_64_safe(&p, end, pool_id, out_err);
+       if (pool_id == CEPH_NOPOOL)
                goto out;       /* No parent?  No problem. */
 
        /* The ceph file layout needs to fit pool id in 32 bits */
 
        ret = -EIO;
-       if (parent_spec->pool_id > (u64)U32_MAX) {
+       if (pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
-                       (unsigned long long)parent_spec->pool_id, U32_MAX);
+                       (unsigned long long)pool_id, U32_MAX);
                goto out_err;
        }
+       parent_spec->pool_id = pool_id;
 
        image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(image_id)) {
@@ -3619,9 +3705,14 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
        ceph_decode_64_safe(&p, end, overlap, out_err);
 
-       rbd_dev->parent_overlap = overlap;
-       rbd_dev->parent_spec = parent_spec;
-       parent_spec = NULL;     /* rbd_dev now owns this */
+       if (overlap) {
+               rbd_spec_put(rbd_dev->parent_spec);
+               rbd_dev->parent_spec = parent_spec;
+               parent_spec = NULL;     /* rbd_dev now owns this */
+               rbd_dev->parent_overlap = overlap;
+       } else {
+               rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+       }
 out:
        ret = 0;
 out_err:
@@ -3731,6 +3822,56 @@ out:
        return image_name;
 }
 
+static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
+{
+       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+       const char *snap_name;
+       u32 which = 0;
+
+       /* Skip over names until we find the one we are looking for */
+
+       snap_name = rbd_dev->header.snap_names;
+       while (which < snapc->num_snaps) {
+               if (!strcmp(name, snap_name))
+                       return snapc->snaps[which];
+               snap_name += strlen(snap_name) + 1;
+               which++;
+       }
+       return CEPH_NOSNAP;
+}
+
+static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
+{
+       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+       u32 which;
+       bool found = false;
+       u64 snap_id;
+
+       for (which = 0; !found && which < snapc->num_snaps; which++) {
+               const char *snap_name;
+
+               snap_id = snapc->snaps[which];
+               snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
+               if (IS_ERR(snap_name))
+                       break;
+               found = !strcmp(name, snap_name);
+               kfree(snap_name);
+       }
+       return found ? snap_id : CEPH_NOSNAP;
+}
+
+/*
+ * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
+ * no snapshot by that name is found, or if an error occurs.
+ */
+static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
+{
+       if (rbd_dev->image_format == 1)
+               return rbd_v1_snap_id_by_name(rbd_dev, name);
+
+       return rbd_v2_snap_id_by_name(rbd_dev, name);
+}
+
 /*
  * When an rbd image has a parent image, it is identified by the
  * pool, image, and snapshot ids (not names).  This function fills
@@ -3742,10 +3883,6 @@ out:
  * When an image being mapped (not a parent) is probed, we have the
  * pool name and pool id, image name and image id, and the snapshot
  * name.  The only thing we're missing is the snapshot id.
- *
- * The set of snapshots for an image is not known until they have
- * been read by rbd_dev_snaps_update(), so we can't completely fill
- * in this information until after that has been called.
  */
 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 {
@@ -3762,12 +3899,12 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
         */
        if (spec->pool_name) {
                if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
-                       struct rbd_snap *snap;
+                       u64 snap_id;
 
-                       snap = snap_by_name(rbd_dev, spec->snap_name);
-                       if (!snap)
+                       snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
+                       if (snap_id == CEPH_NOSNAP)
                                return -ENOENT;
-                       spec->snap_id = snap->id;
+                       spec->snap_id = snap_id;
                } else {
                        spec->snap_id = CEPH_NOSNAP;
                }
@@ -3795,12 +3932,6 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
        /* Look up the snapshot name, and make a copy */
 
        snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
-       if (!snap_name) {
-               rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
-               ret = -EIO;
-               goto out_err;
-       }
-       snap_name = kstrdup(snap_name, GFP_KERNEL);
        if (!snap_name) {
                ret = -ENOMEM;
                goto out_err;
@@ -3879,6 +4010,7 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
        for (i = 0; i < snap_count; i++)
                snapc->snaps[i] = ceph_decode_64(&p);
 
+       ceph_put_snap_context(rbd_dev->header.snapc);
        rbd_dev->header.snapc = snapc;
 
        dout("  snap context seq = %llu, snap_count = %u\n",
@@ -3889,11 +4021,12 @@ out:
        return ret;
 }
 
-static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
+static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
+                                       u64 snap_id)
 {
        size_t size;
        void *reply_buf;
-       __le64 snap_id;
+       __le64 snapid;
        int ret;
        void *p;
        void *end;
@@ -3904,11 +4037,10 @@ static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);
 
-       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
-       snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
+       snapid = cpu_to_le64(snap_id);
        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
-                               &snap_id, sizeof (snap_id),
+                               &snapid, sizeof (snapid),
                                reply_buf, size);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0) {
@@ -3923,205 +4055,69 @@ static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
                goto out;
 
        dout("  snap_id 0x%016llx snap_name = %s\n",
-               (unsigned long long)le64_to_cpu(snap_id), snap_name);
+               (unsigned long long)snap_id, snap_name);
 out:
        kfree(reply_buf);
 
        return snap_name;
 }
 
-static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
-               u64 *snap_size, u64 *snap_features)
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 {
-       u64 snap_id;
-       u64 size;
-       u64 features;
-       const char *snap_name;
+       bool first_time = rbd_dev->header.object_prefix == NULL;
        int ret;
 
-       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
-       snap_id = rbd_dev->header.snapc->snaps[which];
-       ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
-       if (ret)
-               goto out_err;
-
-       ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
-       if (ret)
-               goto out_err;
+       down_write(&rbd_dev->header_rwsem);
 
-       snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
-       if (!IS_ERR(snap_name)) {
-               *snap_size = size;
-               *snap_features = features;
+       if (first_time) {
+               ret = rbd_dev_v2_header_onetime(rbd_dev);
+               if (ret)
+                       goto out;
        }
 
-       return snap_name;
-out_err:
-       return ERR_PTR(ret);
-}
-
-static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
-               u64 *snap_size, u64 *snap_features)
-{
-       if (rbd_dev->image_format == 1)
-               return rbd_dev_v1_snap_info(rbd_dev, which,
-                                       snap_size, snap_features);
-       if (rbd_dev->image_format == 2)
-               return rbd_dev_v2_snap_info(rbd_dev, which,
-                                       snap_size, snap_features);
-       return ERR_PTR(-EINVAL);
-}
+       /*
+        * If the image supports layering, get the parent info.  We
+        * need to probe the first time regardless.  Thereafter we
+        * only need to if there's a parent, to see if it has
+        * disappeared due to the mapped image getting flattened.
+        */
+       if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
+                       (first_time || rbd_dev->parent_spec)) {
+               bool warn;
 
-static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
-{
-       int ret;
+               ret = rbd_dev_v2_parent_info(rbd_dev);
+               if (ret)
+                       goto out;
 
-       down_write(&rbd_dev->header_rwsem);
+               /*
+                * Print a warning if this is the initial probe and
+                * the image has a parent.  Don't print it if the
+                * image now being probed is itself a parent.  We
+                * can tell at this point because we won't know its
+                * pool name yet (just its pool id).
+                */
+               warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
+               if (first_time && warn)
+                       rbd_warn(rbd_dev, "WARNING: kernel layering "
+                                       "is EXPERIMENTAL!");
+       }
 
        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret)
                goto out;
-       rbd_update_mapping_size(rbd_dev);
+
+       if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
+               if (rbd_dev->mapping.size != rbd_dev->header.image_size)
+                       rbd_dev->mapping.size = rbd_dev->header.image_size;
 
        ret = rbd_dev_v2_snap_context(rbd_dev);
        dout("rbd_dev_v2_snap_context returned %d\n", ret);
-       if (ret)
-               goto out;
-       ret = rbd_dev_snaps_update(rbd_dev);
-       dout("rbd_dev_snaps_update returned %d\n", ret);
-       if (ret)
-               goto out;
 out:
        up_write(&rbd_dev->header_rwsem);
 
        return ret;
 }
 
-/*
- * Scan the rbd device's current snapshot list and compare it to the
- * newly-received snapshot context.  Remove any existing snapshots
- * not present in the new snapshot context.  Add a new snapshot for
- * any snaphots in the snapshot context not in the current list.
- * And verify there are no changes to snapshots we already know
- * about.
- *
- * Assumes the snapshots in the snapshot context are sorted by
- * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
- * are also maintained in that order.)
- *
- * Note that any error occurs while updating the snapshot list
- * aborts the update, and the entire list is cleared.  The snapshot
- * list becomes inconsistent at that point anyway, so it might as
- * well be empty.
- */
-static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
-{
-       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
-       const u32 snap_count = snapc->num_snaps;
-       struct list_head *head = &rbd_dev->snaps;
-       struct list_head *links = head->next;
-       u32 index = 0;
-       int ret = 0;
-
-       dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
-       while (index < snap_count || links != head) {
-               u64 snap_id;
-               struct rbd_snap *snap;
-               const char *snap_name;
-               u64 snap_size = 0;
-               u64 snap_features = 0;
-
-               snap_id = index < snap_count ? snapc->snaps[index]
-                                            : CEPH_NOSNAP;
-               snap = links != head ? list_entry(links, struct rbd_snap, node)
-                                    : NULL;
-               rbd_assert(!snap || snap->id != CEPH_NOSNAP);
-
-               if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
-                       struct list_head *next = links->next;
-
-                       /*
-                        * A previously-existing snapshot is not in
-                        * the new snap context.
-                        *
-                        * If the now-missing snapshot is the one
-                        * the image represents, clear its existence
-                        * flag so we can avoid sending any more
-                        * requests to it.
-                        */
-                       if (rbd_dev->spec->snap_id == snap->id)
-                               clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-                       dout("removing %ssnap id %llu\n",
-                               rbd_dev->spec->snap_id == snap->id ?
-                                                       "mapped " : "",
-                               (unsigned long long)snap->id);
-
-                       list_del(&snap->node);
-                       rbd_snap_destroy(snap);
-
-                       /* Done with this list entry; advance */
-
-                       links = next;
-                       continue;
-               }
-
-               snap_name = rbd_dev_snap_info(rbd_dev, index,
-                                       &snap_size, &snap_features);
-               if (IS_ERR(snap_name)) {
-                       ret = PTR_ERR(snap_name);
-                       dout("failed to get snap info, error %d\n", ret);
-                       goto out_err;
-               }
-
-               dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
-                       (unsigned long long)snap_id);
-               if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
-                       struct rbd_snap *new_snap;
-
-                       /* We haven't seen this snapshot before */
-
-                       new_snap = rbd_snap_create(rbd_dev, snap_name,
-                                       snap_id, snap_size, snap_features);
-                       if (IS_ERR(new_snap)) {
-                               ret = PTR_ERR(new_snap);
-                               dout("  failed to add dev, error %d\n", ret);
-                               goto out_err;
-                       }
-
-                       /* New goes before existing, or at end of list */
-
-                       dout("  added dev%s\n", snap ? "" : " at end\n");
-                       if (snap)
-                               list_add_tail(&new_snap->node, &snap->node);
-                       else
-                               list_add_tail(&new_snap->node, head);
-               } else {
-                       /* Already have this one */
-
-                       dout("  already present\n");
-
-                       rbd_assert(snap->size == snap_size);
-                       rbd_assert(!strcmp(snap->name, snap_name));
-                       rbd_assert(snap->features == snap_features);
-
-                       /* Done with this list entry; advance */
-
-                       links = links->next;
-               }
-
-               /* Advance to the next entry in the snapshot context */
-
-               index++;
-       }
-       dout("%s: done\n", __func__);
-
-       return 0;
-out_err:
-       rbd_remove_all_snaps(rbd_dev);
-
-       return ret;
-}
-
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
        struct device *dev;
@@ -4538,10 +4534,7 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 {
        struct rbd_image_header *header;
 
-       rbd_dev_remove_parent(rbd_dev);
-       rbd_spec_put(rbd_dev->parent_spec);
-       rbd_dev->parent_spec = NULL;
-       rbd_dev->parent_overlap = 0;
+       rbd_dev_unparent(rbd_dev);
 
        /* Free dynamic fields from the header, then zero it out */
 
@@ -4553,72 +4546,22 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
        memset(header, 0, sizeof (*header));
 }
 
-static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
-{
-       int ret;
-
-       /* Populate rbd image metadata */
-
-       ret = rbd_read_header(rbd_dev, &rbd_dev->header);
-       if (ret < 0)
-               goto out_err;
-
-       /* Version 1 images have no parent (no layering) */
-
-       rbd_dev->parent_spec = NULL;
-       rbd_dev->parent_overlap = 0;
-
-       dout("discovered version 1 image, header name is %s\n",
-               rbd_dev->header_name);
-
-       return 0;
-
-out_err:
-       kfree(rbd_dev->header_name);
-       rbd_dev->header_name = NULL;
-       kfree(rbd_dev->spec->image_id);
-       rbd_dev->spec->image_id = NULL;
-
-       return ret;
-}
-
-static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
 {
        int ret;
 
-       ret = rbd_dev_v2_image_size(rbd_dev);
-       if (ret)
-               goto out_err;
-
-       /* Get the object prefix (a.k.a. block_name) for the image */
-
        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret)
                goto out_err;
 
-       /* Get the and check features for the image */
-
+       /*
+        * Get the and check features for the image.  Currently the
+        * features are assumed to never change.
+        */
        ret = rbd_dev_v2_features(rbd_dev);
        if (ret)
                goto out_err;
 
-       /* If the image supports layering, get the parent info */
-
-       if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
-               ret = rbd_dev_v2_parent_info(rbd_dev);
-               if (ret)
-                       goto out_err;
-
-               /*
-                * Don't print a warning for parent images.  We can
-                * tell this point because we won't know its pool
-                * name yet (just its pool id).
-                */
-               if (rbd_dev->spec->pool_name)
-                       rbd_warn(rbd_dev, "WARNING: kernel layering "
-                                       "is EXPERIMENTAL!");
-       }
-
        /* If the image supports fancy striping, get its parameters */
 
        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
@@ -4626,28 +4569,11 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
                if (ret < 0)
                        goto out_err;
        }
-
-       /* crypto and compression type aren't (yet) supported for v2 images */
-
-       rbd_dev->header.crypt_type = 0;
-       rbd_dev->header.comp_type = 0;
-
-       /* Get the snapshot context, plus the header version */
-
-       ret = rbd_dev_v2_snap_context(rbd_dev);
-       if (ret)
-               goto out_err;
-
-       dout("discovered version 2 image, header name is %s\n",
-               rbd_dev->header_name);
+       /* No support for crypto and compression type format 2 images */
 
        return 0;
 out_err:
-       rbd_dev->parent_overlap = 0;
-       rbd_spec_put(rbd_dev->parent_spec);
-       rbd_dev->parent_spec = NULL;
-       kfree(rbd_dev->header_name);
-       rbd_dev->header_name = NULL;
+       rbd_dev->header.features = 0;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;
 
@@ -4676,7 +4602,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
        if (!parent)
                goto out_err;
 
-       ret = rbd_dev_image_probe(parent);
+       ret = rbd_dev_image_probe(parent, false);
        if (ret < 0)
                goto out_err;
        rbd_dev->parent = parent;
@@ -4684,7 +4610,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
        return 0;
 out_err:
        if (parent) {
-               rbd_spec_put(rbd_dev->parent_spec);
+               rbd_dev_unparent(rbd_dev);
                kfree(rbd_dev->header_name);
                rbd_dev_destroy(parent);
        } else {
@@ -4699,10 +4625,6 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 {
        int ret;
 
-       ret = rbd_dev_mapping_set(rbd_dev);
-       if (ret)
-               return ret;
-
        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);
 
@@ -4724,13 +4646,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_blkdev;
 
-       ret = rbd_bus_add_dev(rbd_dev);
+       ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                goto err_out_disk;
+       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+
+       ret = rbd_bus_add_dev(rbd_dev);
+       if (ret)
+               goto err_out_mapping;
 
        /* Everything's ready.  Announce the disk to the world. */
 
-       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);
 
@@ -4739,6 +4665,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
        return ret;
 
+err_out_mapping:
+       rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
        rbd_free_disk(rbd_dev);
 err_out_blkdev:
@@ -4779,13 +4707,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
-       int ret;
-
-       rbd_remove_all_snaps(rbd_dev);
        rbd_dev_unprobe(rbd_dev);
-       ret = rbd_dev_header_watch_sync(rbd_dev, 0);
-       if (ret)
-               rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
@@ -4797,10 +4719,11 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 
 /*
  * Probe for the existence of the header object for the given rbd
- * device.  For format 2 images this includes determining the image
- * id.
+ * device.  If this image is the one being mapped (i.e., not a
+ * parent), initiate a watch on its header object before using that
+ * object to get detailed information about the rbd image.
  */
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 {
        int ret;
        int tmp;
@@ -4820,37 +4743,40 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_format;
 
-       ret = rbd_dev_header_watch_sync(rbd_dev, 1);
-       if (ret)
-               goto out_header_name;
+       if (mapping) {
+               ret = rbd_dev_header_watch_sync(rbd_dev, true);
+               if (ret)
+                       goto out_header_name;
+       }
 
        if (rbd_dev->image_format == 1)
-               ret = rbd_dev_v1_probe(rbd_dev);
+               ret = rbd_dev_v1_header_info(rbd_dev);
        else
-               ret = rbd_dev_v2_probe(rbd_dev);
+               ret = rbd_dev_v2_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;
 
-       ret = rbd_dev_snaps_update(rbd_dev);
+       ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
                goto err_out_probe;
 
-       ret = rbd_dev_spec_update(rbd_dev);
+       ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
-               goto err_out_snaps;
+               goto err_out_probe;
 
-       ret = rbd_dev_probe_parent(rbd_dev);
-       if (!ret)
-               return 0;
+       dout("discovered format %u image, header name is %s\n",
+               rbd_dev->image_format, rbd_dev->header_name);
 
-err_out_snaps:
-       rbd_remove_all_snaps(rbd_dev);
+       return 0;
 err_out_probe:
        rbd_dev_unprobe(rbd_dev);
 err_out_watch:
-       tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
-       if (tmp)
-               rbd_warn(rbd_dev, "unable to tear down watch request\n");
+       if (mapping) {
+               tmp = rbd_dev_header_watch_sync(rbd_dev, false);
+               if (tmp)
+                       rbd_warn(rbd_dev, "unable to tear down "
+                                       "watch request (%d)\n", tmp);
+       }
 out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
@@ -4874,6 +4800,7 @@ static ssize_t rbd_add(struct bus_type *bus,
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
+       bool read_only;
        int rc = -ENOMEM;
 
        if (!try_module_get(THIS_MODULE))
@@ -4883,6 +4810,9 @@ static ssize_t rbd_add(struct bus_type *bus,
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;
+       read_only = rbd_opts->read_only;
+       kfree(rbd_opts);
+       rbd_opts = NULL;        /* done with this */
 
        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
@@ -4913,14 +4843,16 @@ static ssize_t rbd_add(struct bus_type *bus,
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */
 
-       rbd_dev->mapping.read_only = rbd_opts->read_only;
-       kfree(rbd_opts);
-       rbd_opts = NULL;        /* done with this */
-
-       rc = rbd_dev_image_probe(rbd_dev);
+       rc = rbd_dev_image_probe(rbd_dev, true);
        if (rc < 0)
                goto err_out_rbd_dev;
 
+       /* If we are mapping a snapshot it must be marked read-only */
+
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+               read_only = true;
+       rbd_dev->mapping.read_only = read_only;
+
        rc = rbd_dev_device_setup(rbd_dev);
        if (!rc)
                return count;
@@ -4966,7 +4898,7 @@ static void rbd_dev_device_release(struct device *dev)
 
        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-       rbd_dev_clear_mapping(rbd_dev);
+       rbd_dev_mapping_clear(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev->major = 0;
        rbd_dev_id_put(rbd_dev);
@@ -5033,10 +4965,13 @@ static ssize_t rbd_remove(struct bus_type *bus,
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;
-       ret = count;
        rbd_bus_del_dev(rbd_dev);
+       ret = rbd_dev_header_watch_sync(rbd_dev, false);
+       if (ret)
+               rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
+       ret = count;
 done:
        mutex_unlock(&ctl_mutex);
 
@@ -5068,6 +5003,56 @@ static void rbd_sysfs_cleanup(void)
        device_unregister(&rbd_root_dev);
 }
 
+static int rbd_slab_init(void)
+{
+       rbd_assert(!rbd_img_request_cache);
+       rbd_img_request_cache = kmem_cache_create("rbd_img_request",
+                                       sizeof (struct rbd_img_request),
+                                       __alignof__(struct rbd_img_request),
+                                       0, NULL);
+       if (!rbd_img_request_cache)
+               return -ENOMEM;
+
+       rbd_assert(!rbd_obj_request_cache);
+       rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
+                                       sizeof (struct rbd_obj_request),
+                                       __alignof__(struct rbd_obj_request),
+                                       0, NULL);
+       if (!rbd_obj_request_cache)
+               goto out_err;
+
+       rbd_assert(!rbd_segment_name_cache);
+       rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
+                                       MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
+       if (rbd_segment_name_cache)
+               return 0;
+out_err:
+       if (rbd_obj_request_cache) {
+               kmem_cache_destroy(rbd_obj_request_cache);
+               rbd_obj_request_cache = NULL;
+       }
+
+       kmem_cache_destroy(rbd_img_request_cache);
+       rbd_img_request_cache = NULL;
+
+       return -ENOMEM;
+}
+
+static void rbd_slab_exit(void)
+{
+       rbd_assert(rbd_segment_name_cache);
+       kmem_cache_destroy(rbd_segment_name_cache);
+       rbd_segment_name_cache = NULL;
+
+       rbd_assert(rbd_obj_request_cache);
+       kmem_cache_destroy(rbd_obj_request_cache);
+       rbd_obj_request_cache = NULL;
+
+       rbd_assert(rbd_img_request_cache);
+       kmem_cache_destroy(rbd_img_request_cache);
+       rbd_img_request_cache = NULL;
+}
+
 static int __init rbd_init(void)
 {
        int rc;
@@ -5077,16 +5062,22 @@ static int __init rbd_init(void)
 
                return -EINVAL;
        }
-       rc = rbd_sysfs_init();
+       rc = rbd_slab_init();
        if (rc)
                return rc;
-       pr_info("loaded " RBD_DRV_NAME_LONG "\n");
-       return 0;
+       rc = rbd_sysfs_init();
+       if (rc)
+               rbd_slab_exit();
+       else
+               pr_info("loaded " RBD_DRV_NAME_LONG "\n");
+
+       return rc;
 }
 
 static void __exit rbd_exit(void)
 {
        rbd_sysfs_cleanup();
+       rbd_slab_exit();
 }
 
 module_init(rbd_init);