]> Pileus Git - ~andy/linux/blobdiff - drivers/block/rbd.c
rbd: encapsulate removing parent devices
[~andy/linux] / drivers / block / rbd.c
index 44dcc82770d9aceba4fde87d15f3eca8b08e0d40..87ef01189b83502315068e294f4dfa7b2e920075 100644 (file)
@@ -80,7 +80,7 @@
 
 /* Features supported by this (client software) implementation. */
 
-#define RBD_FEATURES_SUPPORTED (0)
+#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
 
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
@@ -108,6 +108,9 @@ struct rbd_image_header {
        char *snap_names;
        u64 *snap_sizes;
 
+       u64 stripe_unit;
+       u64 stripe_count;
+
        u64 obj_version;
 };
 
@@ -138,13 +141,13 @@ struct rbd_image_header {
  */
 struct rbd_spec {
        u64             pool_id;
-       char            *pool_name;
+       const char      *pool_name;
 
-       char            *image_id;
-       char            *image_name;
+       const char      *image_id;
+       const char      *image_name;
 
        u64             snap_id;
-       char            *snap_name;
+       const char      *snap_name;
 
        struct kref     kref;
 };
@@ -272,7 +275,6 @@ struct rbd_img_request {
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 
 struct rbd_snap {
-       struct  device          dev;
        const char              *name;
        u64                     size;
        struct list_head        node;
@@ -355,16 +357,15 @@ static DEFINE_SPINLOCK(rbd_client_list_lock);
 static int rbd_img_request_submit(struct rbd_img_request *img_request);
 
 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
-static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
 
 static void rbd_dev_release(struct device *dev);
-static void rbd_remove_snap_dev(struct rbd_snap *snap);
+static void rbd_snap_destroy(struct rbd_snap *snap);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
-static int rbd_dev_probe(struct rbd_device *rbd_dev);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
 
 static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -426,8 +427,9 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 #  define rbd_assert(expr)     ((void) 0)
 #endif /* !RBD_DEBUG */
 
-static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
+static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
+static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
@@ -670,6 +672,35 @@ static void rbd_client_release(struct kref *kref)
        kfree(rbdc);
 }
 
+/* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */
+
+static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
+{
+       struct ceph_snap_context *snapc;
+       size_t size;
+
+       size = sizeof (struct ceph_snap_context);
+       size += snap_count * sizeof (snapc->snaps[0]);
+       snapc = kzalloc(size, GFP_KERNEL);
+       if (!snapc)
+               return NULL;
+
+       atomic_set(&snapc->nref, 1);
+       snapc->num_snaps = snap_count;
+
+       return snapc;
+}
+
+static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
+{
+       (void)ceph_get_snap_context(snapc);
+}
+
+static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
+{
+       ceph_put_snap_context(snapc);
+}
+
 /*
  * Drop reference to ceph client node. If it's not referenced anymore, release
  * it.
@@ -776,7 +807,6 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
-               WARN_ON(ondisk->snap_names_len);
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }
@@ -789,18 +819,13 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
        /* Allocate and fill in the snapshot context */
 
        header->image_size = le64_to_cpu(ondisk->image_size);
-       size = sizeof (struct ceph_snap_context);
-       size += snap_count * sizeof (header->snapc->snaps[0]);
-       header->snapc = kzalloc(size, GFP_KERNEL);
+
+       header->snapc = rbd_snap_context_create(snap_count);
        if (!header->snapc)
                goto out_err;
-
-       atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
-       header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
-               header->snapc->snaps[i] =
-                       le64_to_cpu(ondisk->snaps[i].id);
+               header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
 
        return 0;
 
@@ -829,44 +854,36 @@ static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
        return NULL;
 }
 
-static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
+static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
+                                       const char *snap_name)
 {
-
        struct rbd_snap *snap;
 
-       list_for_each_entry(snap, &rbd_dev->snaps, node) {
-               if (!strcmp(snap_name, snap->name)) {
-                       rbd_dev->spec->snap_id = snap->id;
-                       rbd_dev->mapping.size = snap->size;
-                       rbd_dev->mapping.features = snap->features;
-
-                       return 0;
-               }
-       }
+       list_for_each_entry(snap, &rbd_dev->snaps, node)
+               if (!strcmp(snap_name, snap->name))
+                       return snap;
 
-       return -ENOENT;
+       return NULL;
 }
 
 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
 {
-       int ret;
-
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
-               rbd_dev->spec->snap_id = CEPH_NOSNAP;
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
-               ret = 0;
        } else {
-               ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
-               if (ret < 0)
-                       goto done;
+               struct rbd_snap *snap;
+
+               snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
+               if (!snap)
+                       return -ENOENT;
+               rbd_dev->mapping.size = snap->size;
+               rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }
-       set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 
-done:
-       return ret;
+       return 0;
 }
 
 static void rbd_header_free(struct rbd_image_header *header)
@@ -877,7 +894,7 @@ static void rbd_header_free(struct rbd_image_header *header)
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
-       ceph_put_snap_context(header->snapc);
+       rbd_snap_context_put(header->snapc);
        header->snapc = NULL;
 }
 
@@ -1727,7 +1744,6 @@ static struct rbd_img_request *rbd_img_request_create(
                                        bool child_request)
 {
        struct rbd_img_request *img_request;
-       struct ceph_snap_context *snapc = NULL;
 
        img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
        if (!img_request)
@@ -1735,13 +1751,8 @@ static struct rbd_img_request *rbd_img_request_create(
 
        if (write_request) {
                down_read(&rbd_dev->header_rwsem);
-               snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+               rbd_snap_context_get(rbd_dev->header.snapc);
                up_read(&rbd_dev->header_rwsem);
-               if (WARN_ON(!snapc)) {
-                       kfree(img_request);
-                       return NULL;    /* Shouldn't happen */
-               }
-
        }
 
        img_request->rq = NULL;
@@ -1751,7 +1762,7 @@ static struct rbd_img_request *rbd_img_request_create(
        img_request->flags = 0;
        if (write_request) {
                img_request_write_set(img_request);
-               img_request->snapc = snapc;
+               img_request->snapc = rbd_dev->header.snapc;
        } else {
                img_request->snap_id = rbd_dev->spec->snap_id;
        }
@@ -1792,7 +1803,7 @@ static void rbd_img_request_destroy(struct kref *kref)
        rbd_assert(img_request->obj_request_count == 0);
 
        if (img_request_write_test(img_request))
-               ceph_put_snap_context(img_request->snapc);
+               rbd_snap_context_put(img_request->snapc);
 
        if (img_request_child_test(img_request))
                rbd_obj_request_put(img_request->obj_request);
@@ -2520,7 +2531,6 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 {
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
        u64 hver;
-       int rc;
 
        if (!rbd_dev)
                return;
@@ -2528,10 +2538,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
                rbd_dev->header_name, (unsigned long long) notify_id,
                (unsigned int) opcode);
-       rc = rbd_dev_refresh(rbd_dev, &hver);
-       if (rc)
-               rbd_warn(rbd_dev, "got notification but failed to "
-                          " update snaps: %d\n", rc);
+       (void)rbd_dev_refresh(rbd_dev, &hver);
 
        rbd_obj_notify_ack(rbd_dev, hver, notify_id);
 }
@@ -2617,7 +2624,8 @@ out_cancel:
 }
 
 /*
- * Synchronous osd object method call
+ * Synchronous osd object method call.  Returns the number of bytes
+ * returned in the outbound buffer, or a negative error code.
  */
 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
                             const char *object_name,
@@ -2764,8 +2772,11 @@ static void rbd_request_fn(struct request_queue *q)
                }
 
                result = -EINVAL;
-               if (WARN_ON(offset && length > U64_MAX - offset + 1))
+               if (offset && length > U64_MAX - offset + 1) {
+                       rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
+                               offset, length);
                        goto end_request;       /* Shouldn't happen */
+               }
 
                result = -ENOMEM;
                img_request = rbd_img_request_create(rbd_dev, offset, length,
@@ -2846,10 +2857,12 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
        if (!disk)
                return;
 
-       if (disk->flags & GENHD_FL_UP)
+       rbd_dev->disk = NULL;
+       if (disk->flags & GENHD_FL_UP) {
                del_gendisk(disk);
-       if (disk->queue)
-               blk_cleanup_queue(disk->queue);
+               if (disk->queue)
+                       blk_cleanup_queue(disk->queue);
+       }
        put_disk(disk);
 }
 
@@ -2962,7 +2975,7 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
                                       0, size, ondisk, version);
                if (ret < 0)
                        goto out_err;
-               if (WARN_ON((size_t) ret < size)) {
+               if ((size_t)ret < size) {
                        ret = -ENXIO;
                        rbd_warn(rbd_dev, "short header read (want %zd got %d)",
                                size, ret);
@@ -3013,21 +3026,25 @@ static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
        struct rbd_snap *snap;
        struct rbd_snap *next;
 
-       list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
-               rbd_remove_snap_dev(snap);
+       list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
+               list_del(&snap->node);
+               rbd_snap_destroy(snap);
+       }
 }
 
 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
 {
-       sector_t size;
-
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                return;
 
-       size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
-       dout("setting size to %llu sectors", (unsigned long long) size);
-       rbd_dev->mapping.size = (u64) size;
-       set_capacity(rbd_dev->disk, size);
+       if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
+               sector_t size;
+
+               rbd_dev->mapping.size = rbd_dev->header.image_size;
+               size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
+               dout("setting size to %llu sectors", (unsigned long long)size);
+               set_capacity(rbd_dev->disk, size);
+       }
 }
 
 /*
@@ -3052,7 +3069,7 @@ static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
-       ceph_put_snap_context(rbd_dev->header.snapc);
+       rbd_snap_context_put(rbd_dev->header.snapc);
 
        if (hver)
                *hver = h.obj_version;
@@ -3062,12 +3079,11 @@ static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
-       WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
+       if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
+               rbd_warn(rbd_dev, "object prefix changed (ignoring)");
        kfree(h.object_prefix);
 
        ret = rbd_dev_snaps_update(rbd_dev);
-       if (!ret)
-               ret = rbd_dev_snaps_register(rbd_dev);
 
        up_write(&rbd_dev->header_rwsem);
 
@@ -3086,6 +3102,9 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
                ret = rbd_dev_v2_refresh(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);
        revalidate_disk(rbd_dev->disk);
+       if (ret)
+               rbd_warn(rbd_dev, "got notification but failed to "
+                          " update snaps: %d\n", ret);
 
        return ret;
 }
@@ -3129,8 +3148,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 
        rbd_dev->disk = disk;
 
-       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
-
        return 0;
 out_disk:
        put_disk(disk);
@@ -3151,13 +3168,9 @@ static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-       sector_t size;
 
-       down_read(&rbd_dev->header_rwsem);
-       size = get_capacity(rbd_dev->disk);
-       up_read(&rbd_dev->header_rwsem);
-
-       return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
+       return sprintf(buf, "%llu\n",
+               (unsigned long long)rbd_dev->mapping.size);
 }
 
 /*
@@ -3170,7 +3183,7 @@ static ssize_t rbd_features_show(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "0x%016llx\n",
-                       (unsigned long long) rbd_dev->mapping.features);
+                       (unsigned long long)rbd_dev->mapping.features);
 }
 
 static ssize_t rbd_major_show(struct device *dev,
@@ -3178,7 +3191,11 @@ static ssize_t rbd_major_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%d\n", rbd_dev->major);
+       if (rbd_dev->major)
+               return sprintf(buf, "%d\n", rbd_dev->major);
+
+       return sprintf(buf, "(none)\n");
+
 }
 
 static ssize_t rbd_client_id_show(struct device *dev,
@@ -3204,7 +3221,7 @@ static ssize_t rbd_pool_id_show(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%llu\n",
-               (unsigned long long) rbd_dev->spec->pool_id);
+                       (unsigned long long) rbd_dev->spec->pool_id);
 }
 
 static ssize_t rbd_name_show(struct device *dev,
@@ -3341,71 +3358,6 @@ static struct device_type rbd_device_type = {
        .release        = rbd_sysfs_dev_release,
 };
 
-
-/*
-  sysfs - snapshots
-*/
-
-static ssize_t rbd_snap_size_show(struct device *dev,
-                                 struct device_attribute *attr,
-                                 char *buf)
-{
-       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
-
-       return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
-}
-
-static ssize_t rbd_snap_id_show(struct device *dev,
-                               struct device_attribute *attr,
-                               char *buf)
-{
-       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
-
-       return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
-}
-
-static ssize_t rbd_snap_features_show(struct device *dev,
-                               struct device_attribute *attr,
-                               char *buf)
-{
-       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
-
-       return sprintf(buf, "0x%016llx\n",
-                       (unsigned long long) snap->features);
-}
-
-static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
-static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
-static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
-
-static struct attribute *rbd_snap_attrs[] = {
-       &dev_attr_snap_size.attr,
-       &dev_attr_snap_id.attr,
-       &dev_attr_snap_features.attr,
-       NULL,
-};
-
-static struct attribute_group rbd_snap_attr_group = {
-       .attrs = rbd_snap_attrs,
-};
-
-static void rbd_snap_dev_release(struct device *dev)
-{
-       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
-       kfree(snap->name);
-       kfree(snap);
-}
-
-static const struct attribute_group *rbd_snap_attr_groups[] = {
-       &rbd_snap_attr_group,
-       NULL
-};
-
-static struct device_type rbd_snap_device_type = {
-       .groups         = rbd_snap_attr_groups,
-       .release        = rbd_snap_dev_release,
-};
-
 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
 {
        kref_get(&spec->kref);
@@ -3473,93 +3425,61 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 
 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
 {
-       rbd_spec_put(rbd_dev->parent_spec);
-       kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev->rbd_client);
        rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev);
 }
 
-static bool rbd_snap_registered(struct rbd_snap *snap)
-{
-       bool ret = snap->dev.type == &rbd_snap_device_type;
-       bool reg = device_is_registered(&snap->dev);
-
-       rbd_assert(!ret ^ reg);
-
-       return ret;
-}
-
-static void rbd_remove_snap_dev(struct rbd_snap *snap)
-{
-       list_del(&snap->node);
-       if (device_is_registered(&snap->dev))
-               device_unregister(&snap->dev);
-}
-
-static int rbd_register_snap_dev(struct rbd_snap *snap,
-                                 struct device *parent)
+static void rbd_snap_destroy(struct rbd_snap *snap)
 {
-       struct device *dev = &snap->dev;
-       int ret;
-
-       dev->type = &rbd_snap_device_type;
-       dev->parent = parent;
-       dev->release = rbd_snap_dev_release;
-       dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
-       dout("%s: registering device for snapshot %s\n", __func__, snap->name);
-
-       ret = device_register(dev);
-
-       return ret;
+       kfree(snap->name);
+       kfree(snap);
 }
 
-static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
+static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
                                                const char *snap_name,
                                                u64 snap_id, u64 snap_size,
                                                u64 snap_features)
 {
        struct rbd_snap *snap;
-       int ret;
 
        snap = kzalloc(sizeof (*snap), GFP_KERNEL);
        if (!snap)
                return ERR_PTR(-ENOMEM);
 
-       ret = -ENOMEM;
-       snap->name = kstrdup(snap_name, GFP_KERNEL);
-       if (!snap->name)
-               goto err;
-
+       snap->name = snap_name;
        snap->id = snap_id;
        snap->size = snap_size;
        snap->features = snap_features;
 
        return snap;
-
-err:
-       kfree(snap->name);
-       kfree(snap);
-
-       return ERR_PTR(ret);
 }
 
+/*
+ * Returns a dynamically-allocated snapshot name if successful, or a
+ * pointer-coded error otherwise.
+ */
 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
                u64 *snap_size, u64 *snap_features)
 {
        char *snap_name;
+       int i;
 
        rbd_assert(which < rbd_dev->header.snapc->num_snaps);
 
-       *snap_size = rbd_dev->header.snap_sizes[which];
-       *snap_features = 0;     /* No features for v1 */
-
        /* Skip over names until we find the one we are looking for */
 
        snap_name = rbd_dev->header.snap_names;
-       while (which--)
+       for (i = 0; i < which; i++)
                snap_name += strlen(snap_name) + 1;
 
+       snap_name = kstrdup(snap_name, GFP_KERNEL);
+       if (!snap_name)
+               return ERR_PTR(-ENOMEM);
+
+       *snap_size = rbd_dev->header.snap_sizes[which];
+       *snap_features = 0;     /* No features for v1 */
+
        return snap_name;
 }
 
@@ -3588,7 +3508,8 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
        if (ret < sizeof (size_buf))
                return -ERANGE;
 
-       *order = size_buf.order;
+       if (order)
+               *order = size_buf.order;
        *snap_size = le64_to_cpu(size_buf.size);
 
        dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
@@ -3725,8 +3646,11 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        /* The ceph file layout needs to fit pool id in 32 bits */
 
        ret = -EIO;
-       if (WARN_ON(parent_spec->pool_id > (u64)U32_MAX))
+       if (parent_spec->pool_id > (u64)U32_MAX) {
+               rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
+                       (unsigned long long)parent_spec->pool_id, U32_MAX);
                goto out_err;
+       }
 
        image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(image_id)) {
@@ -3749,6 +3673,56 @@ out_err:
        return ret;
 }
 
+static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
+{
+       struct {
+               __le64 stripe_unit;
+               __le64 stripe_count;
+       } __attribute__ ((packed)) striping_info_buf = { 0 };
+       size_t size = sizeof (striping_info_buf);
+       void *p;
+       u64 obj_size;
+       u64 stripe_unit;
+       u64 stripe_count;
+       int ret;
+
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_stripe_unit_count", NULL, 0,
+                               (char *)&striping_info_buf, size, NULL);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
+       if (ret < 0)
+               return ret;
+       if (ret < size)
+               return -ERANGE;
+
+       /*
+        * We don't actually support the "fancy striping" feature
+        * (STRIPINGV2) yet, but if the striping sizes are the
+        * defaults the behavior is the same as before.  So find
+        * out, and only fail if the image has non-default values.
+        */
+       ret = -EINVAL;
+       obj_size = (u64)1 << rbd_dev->header.obj_order;
+       p = &striping_info_buf;
+       stripe_unit = ceph_decode_64(&p);
+       if (stripe_unit != obj_size) {
+               rbd_warn(rbd_dev, "unsupported stripe unit "
+                               "(got %llu want %llu)",
+                               stripe_unit, obj_size);
+               return -EINVAL;
+       }
+       stripe_count = ceph_decode_64(&p);
+       if (stripe_count != 1) {
+               rbd_warn(rbd_dev, "unsupported stripe count "
+                               "(got %llu want 1)", stripe_count);
+               return -EINVAL;
+       }
+       rbd_dev->header.stripe_unit = stripe_unit;
+       rbd_dev->header.stripe_count = stripe_count;
+
+       return 0;
+}
+
 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
 {
        size_t image_id_size;
@@ -3785,7 +3759,8 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
        if (ret < 0)
                goto out;
        p = reply_buf;
-       end = reply_buf + size;
+       end = reply_buf + ret;
+
        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
        if (IS_ERR(image_name))
                image_name = NULL;
@@ -3799,63 +3774,88 @@ out:
 }
 
 /*
- * When a parent image gets probed, we only have the pool, image,
- * and snapshot ids but not the names of any of them.  This call
- * is made later to fill in those names.  It has to be done after
- * rbd_dev_snaps_update() has completed because some of the
- * information (in particular, snapshot name) is not available
- * until then.
+ * When an rbd image has a parent image, it is identified by the
+ * pool, image, and snapshot ids (not names).  This function fills
+ * in the names for those ids.  (It's OK if we can't figure out the
+ * name for an image id, but the pool and snapshot ids should always
+ * exist and have names.)  All names in an rbd spec are dynamically
+ * allocated.
+ *
+ * When an image being mapped (not a parent) is probed, we have the
+ * pool name and pool id, image name and image id, and the snapshot
+ * name.  The only thing we're missing is the snapshot id.
+ *
+ * The set of snapshots for an image is not known until they have
+ * been read by rbd_dev_snaps_update(), so we can't completely fill
+ * in this information until after that has been called.
  */
-static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
+static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 {
-       struct ceph_osd_client *osdc;
-       const char *name;
-       void *reply_buf = NULL;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_spec *spec = rbd_dev->spec;
+       const char *pool_name;
+       const char *image_name;
+       const char *snap_name;
        int ret;
 
-       if (rbd_dev->spec->pool_name)
-               return 0;       /* Already have the names */
+       /*
+        * An image being mapped will have the pool name (etc.), but
+        * we need to look up the snapshot id.
+        */
+       if (spec->pool_name) {
+               if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
+                       struct rbd_snap *snap;
+
+                       snap = snap_by_name(rbd_dev, spec->snap_name);
+                       if (!snap)
+                               return -ENOENT;
+                       spec->snap_id = snap->id;
+               } else {
+                       spec->snap_id = CEPH_NOSNAP;
+               }
+
+               return 0;
+       }
 
-       /* Look up the pool name */
+       /* Get the pool name; we have to make our own copy of this */
 
-       osdc = &rbd_dev->rbd_client->client->osdc;
-       name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
-       if (!name) {
-               rbd_warn(rbd_dev, "there is no pool with id %llu",
-                       rbd_dev->spec->pool_id);        /* Really a BUG() */
+       pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
+       if (!pool_name) {
+               rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
                return -EIO;
        }
-
-       rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
-       if (!rbd_dev->spec->pool_name)
+       pool_name = kstrdup(pool_name, GFP_KERNEL);
+       if (!pool_name)
                return -ENOMEM;
 
        /* Fetch the image name; tolerate failure here */
 
-       name = rbd_dev_image_name(rbd_dev);
-       if (name)
-               rbd_dev->spec->image_name = (char *)name;
-       else
+       image_name = rbd_dev_image_name(rbd_dev);
+       if (!image_name)
                rbd_warn(rbd_dev, "unable to get image name");
 
-       /* Look up the snapshot name. */
+       /* Look up the snapshot name, and make a copy */
 
-       name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
-       if (!name) {
-               rbd_warn(rbd_dev, "no snapshot with id %llu",
-                       rbd_dev->spec->snap_id);        /* Really a BUG() */
+       snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
+       if (!snap_name) {
+               rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
                ret = -EIO;
                goto out_err;
        }
-       rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
-       if(!rbd_dev->spec->snap_name)
+       snap_name = kstrdup(snap_name, GFP_KERNEL);
+       if (!snap_name) {
+               ret = -ENOMEM;
                goto out_err;
+       }
+
+       spec->pool_name = pool_name;
+       spec->image_name = image_name;
+       spec->snap_name = snap_name;
 
        return 0;
 out_err:
-       kfree(reply_buf);
-       kfree(rbd_dev->spec->pool_name);
-       rbd_dev->spec->pool_name = NULL;
+       kfree(image_name);
+       kfree(pool_name);
 
        return ret;
 }
@@ -3910,19 +3910,14 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
        }
        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
                goto out;
+       ret = 0;
 
-       size = sizeof (struct ceph_snap_context) +
-                               snap_count * sizeof (snapc->snaps[0]);
-       snapc = kmalloc(size, GFP_KERNEL);
+       snapc = rbd_snap_context_create(snap_count);
        if (!snapc) {
                ret = -ENOMEM;
                goto out;
        }
-       ret = 0;
-
-       atomic_set(&snapc->nref, 1);
        snapc->seq = seq;
-       snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                snapc->snaps[i] = ceph_decode_64(&p);
 
@@ -3951,50 +3946,60 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);
 
+       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                &snap_id, sizeof (snap_id),
                                reply_buf, size, NULL);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
-       if (ret < 0)
+       if (ret < 0) {
+               snap_name = ERR_PTR(ret);
                goto out;
+       }
 
        p = reply_buf;
-       end = reply_buf + size;
+       end = reply_buf + ret;
        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
-       if (IS_ERR(snap_name)) {
-               ret = PTR_ERR(snap_name);
+       if (IS_ERR(snap_name))
                goto out;
-       } else {
-               dout("  snap_id 0x%016llx snap_name = %s\n",
-                       (unsigned long long)le64_to_cpu(snap_id), snap_name);
-       }
-       kfree(reply_buf);
 
-       return snap_name;
+       dout("  snap_id 0x%016llx snap_name = %s\n",
+               (unsigned long long)le64_to_cpu(snap_id), snap_name);
 out:
        kfree(reply_buf);
 
-       return ERR_PTR(ret);
+       return snap_name;
 }
 
 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
                u64 *snap_size, u64 *snap_features)
 {
        u64 snap_id;
-       u8 order;
+       u64 size;
+       u64 features;
+       char *snap_name;
        int ret;
 
+       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
        snap_id = rbd_dev->header.snapc->snaps[which];
-       ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
+       ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
        if (ret)
-               return ERR_PTR(ret);
-       ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
+               goto out_err;
+
+       ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
        if (ret)
-               return ERR_PTR(ret);
+               goto out_err;
 
-       return rbd_dev_v2_snap_name(rbd_dev, which);
+       snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
+       if (!IS_ERR(snap_name)) {
+               *snap_size = size;
+               *snap_features = features;
+       }
+
+       return snap_name;
+out_err:
+       return ERR_PTR(ret);
 }
 
 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
@@ -4036,8 +4041,6 @@ static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
        dout("rbd_dev_snaps_update returned %d\n", ret);
        if (ret)
                goto out;
-       ret = rbd_dev_snaps_register(rbd_dev);
-       dout("rbd_dev_snaps_register returned %d\n", ret);
 out:
        up_write(&rbd_dev->header_rwsem);
 
@@ -4055,6 +4058,11 @@ out:
  * Assumes the snapshots in the snapshot context are sorted by
  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
  * are also maintained in that order.)
+ *
+ * Note that any error occurs while updating the snapshot list
+ * aborts the update, and the entire list is cleared.  The snapshot
+ * list becomes inconsistent at that point anyway, so it might as
+ * well be empty.
  */
 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
 {
@@ -4063,8 +4071,9 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
        struct list_head *head = &rbd_dev->snaps;
        struct list_head *links = head->next;
        u32 index = 0;
+       int ret = 0;
 
-       dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
+       dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
        while (index < snap_count || links != head) {
                u64 snap_id;
                struct rbd_snap *snap;
@@ -4085,18 +4094,20 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
                         * A previously-existing snapshot is not in
                         * the new snap context.
                         *
-                        * If the now missing snapshot is the one the
-                        * image is mapped to, clear its exists flag
-                        * so we can avoid sending any more requests
-                        * to it.
+                        * If the now-missing snapshot is the one
+                        * the image represents, clear its existence
+                        * flag so we can avoid sending any more
+                        * requests to it.
                         */
                        if (rbd_dev->spec->snap_id == snap->id)
                                clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-                       rbd_remove_snap_dev(snap);
-                       dout("%ssnap id %llu has been removed\n",
+                       dout("removing %ssnap id %llu\n",
                                rbd_dev->spec->snap_id == snap->id ?
                                                        "mapped " : "",
-                               (unsigned long long) snap->id);
+                               (unsigned long long)snap->id);
+
+                       list_del(&snap->node);
+                       rbd_snap_destroy(snap);
 
                        /* Done with this list entry; advance */
 
@@ -4106,24 +4117,25 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
 
                snap_name = rbd_dev_snap_info(rbd_dev, index,
                                        &snap_size, &snap_features);
-               if (IS_ERR(snap_name))
-                       return PTR_ERR(snap_name);
+               if (IS_ERR(snap_name)) {
+                       ret = PTR_ERR(snap_name);
+                       dout("failed to get snap info, error %d\n", ret);
+                       goto out_err;
+               }
 
-               dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
-                       (unsigned long long) snap_id);
+               dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
+                       (unsigned long long)snap_id);
                if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
                        struct rbd_snap *new_snap;
 
                        /* We haven't seen this snapshot before */
 
-                       new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
+                       new_snap = rbd_snap_create(rbd_dev, snap_name,
                                        snap_id, snap_size, snap_features);
                        if (IS_ERR(new_snap)) {
-                               int err = PTR_ERR(new_snap);
-
-                               dout("  failed to add dev, error %d\n", err);
-
-                               return err;
+                               ret = PTR_ERR(new_snap);
+                               dout("  failed to add dev, error %d\n", ret);
+                               goto out_err;
                        }
 
                        /* New goes before existing, or at end of list */
@@ -4154,29 +4166,8 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
        dout("%s: done\n", __func__);
 
        return 0;
-}
-
-/*
- * Scan the list of snapshots and register the devices for any that
- * have not already been registered.
- */
-static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
-{
-       struct rbd_snap *snap;
-       int ret = 0;
-
-       dout("%s:\n", __func__);
-       if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
-               return -EIO;
-
-       list_for_each_entry(snap, &rbd_dev->snaps, node) {
-               if (!rbd_snap_registered(snap)) {
-                       ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
-                       if (ret < 0)
-                               break;
-               }
-       }
-       dout("%s: returning %d\n", __func__, ret);
+out_err:
+       rbd_remove_all_snaps(rbd_dev);
 
        return ret;
 }
@@ -4406,6 +4397,7 @@ static int rbd_add_parse_args(const char *buf,
        size_t len;
        char *options;
        const char *mon_addrs;
+       char *snap_name;
        size_t mon_addrs_size;
        struct rbd_spec *spec = NULL;
        struct rbd_options *rbd_opts = NULL;
@@ -4464,10 +4456,11 @@ static int rbd_add_parse_args(const char *buf,
                ret = -ENAMETOOLONG;
                goto out_err;
        }
-       spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
-       if (!spec->snap_name)
+       snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
+       if (!snap_name)
                goto out_mem;
-       *(spec->snap_name + len) = '\0';
+       *(snap_name + len) = '\0';
+       spec->snap_name = snap_name;
 
        /* Initialize all rbd options to the defaults */
 
@@ -4521,20 +4514,19 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
        size_t size;
        char *object_name;
        void *response;
-       void *p;
-
-       /* If we already have it we don't need to look it up */
-
-       if (rbd_dev->spec->image_id)
-               return 0;
+       char *image_id;
 
        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
-        * need to fetch the image id again in this case.
+        * need to fetch the image id again in this case.  We
+        * do still need to set the image format though.
         */
-       if (rbd_dev->spec->image_id)
+       if (rbd_dev->spec->image_id) {
+               rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
+
                return 0;
+       }
 
        /*
         * First, see if the format 2 image id file exists, and if
@@ -4556,24 +4548,32 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
                goto out;
        }
 
+       /* If it doesn't exist we'll assume it's a format 1 image */
+
        ret = rbd_obj_method_sync(rbd_dev, object_name,
                                "rbd", "get_id", NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX, NULL);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
-       if (ret < 0)
-               goto out;
-
-       p = response;
-       rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
-                                               p + ret,
+       if (ret == -ENOENT) {
+               image_id = kstrdup("", GFP_KERNEL);
+               ret = image_id ? 0 : -ENOMEM;
+               if (!ret)
+                       rbd_dev->image_format = 1;
+       } else if (ret > sizeof (__le32)) {
+               void *p = response;
+
+               image_id = ceph_extract_encoded_string(&p, p + ret,
                                                NULL, GFP_NOIO);
-       ret = 0;
-
-       if (IS_ERR(rbd_dev->spec->image_id)) {
-               ret = PTR_ERR(rbd_dev->spec->image_id);
-               rbd_dev->spec->image_id = NULL;
+               ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
+               if (!ret)
+                       rbd_dev->image_format = 2;
        } else {
-               dout("image_id is %s\n", rbd_dev->spec->image_id);
+               ret = -EINVAL;
+       }
+
+       if (!ret) {
+               rbd_dev->spec->image_id = image_id;
+               dout("image_id is %s\n", image_id);
        }
 out:
        kfree(response);
@@ -4587,12 +4587,6 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
        int ret;
        size_t size;
 
-       /* Version 1 images have no id; empty string is used */
-
-       rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
-       if (!rbd_dev->spec->image_id)
-               return -ENOMEM;
-
        /* Record the header object name for this rbd image. */
 
        size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
@@ -4615,8 +4609,6 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
        rbd_dev->parent_spec = NULL;
        rbd_dev->parent_overlap = 0;
 
-       rbd_dev->image_format = 1;
-
        dout("discovered version 1 image, header name is %s\n",
                rbd_dev->header_name);
 
@@ -4671,6 +4663,16 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto out_err;
+               rbd_warn(rbd_dev, "WARNING: kernel support for "
+                                       "layered rbd images is EXPERIMENTAL!");
+       }
+
+       /* If the image supports fancy striping, get its parameters */
+
+       if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
+               ret = rbd_dev_v2_striping_info(rbd_dev);
+               if (ret < 0)
+                       goto out_err;
        }
 
        /* crypto and compression type aren't (yet) supported for v2 images */
@@ -4685,8 +4687,6 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
                goto out_err;
        rbd_dev->header.obj_version = ver;
 
-       rbd_dev->image_format = 2;
-
        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);
 
@@ -4703,11 +4703,49 @@ out_err:
        return ret;
 }
 
-static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
+static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
 {
        struct rbd_device *parent = NULL;
-       struct rbd_spec *parent_spec = NULL;
-       struct rbd_client *rbdc = NULL;
+       struct rbd_spec *parent_spec;
+       struct rbd_client *rbdc;
+       int ret;
+
+       if (!rbd_dev->parent_spec)
+               return 0;
+       /*
+        * We need to pass a reference to the client and the parent
+        * spec when creating the parent rbd_dev.  Images related by
+        * parent/child relationships always share both.
+        */
+       parent_spec = rbd_spec_get(rbd_dev->parent_spec);
+       rbdc = __rbd_get_client(rbd_dev->rbd_client);
+
+       ret = -ENOMEM;
+       parent = rbd_dev_create(rbdc, parent_spec);
+       if (!parent)
+               goto out_err;
+
+       ret = rbd_dev_image_probe(parent);
+       if (ret < 0)
+               goto out_err;
+       rbd_dev->parent = parent;
+
+       return 0;
+out_err:
+       if (parent) {
+               rbd_spec_put(rbd_dev->parent_spec);
+               kfree(rbd_dev->header_name);
+               rbd_dev_destroy(parent);
+       } else {
+               rbd_put_client(rbdc);
+               rbd_spec_put(parent_spec);
+       }
+
+       return ret;
+}
+
+static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
+{
        int ret;
 
        /* no need to lock here, as rbd_dev is not registered yet */
@@ -4715,7 +4753,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
        if (ret)
                return ret;
 
-       ret = rbd_dev_probe_update_spec(rbd_dev);
+       ret = rbd_dev_spec_update(rbd_dev);
        if (ret)
                goto err_out_snaps;
 
@@ -4748,38 +4786,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_disk;
 
-       /*
-        * At this point cleanup in the event of an error is the job
-        * of the sysfs code (initiated by rbd_bus_del_dev()).
-        */
-       /* Probe the parent if there is one */
-
-       if (rbd_dev->parent_spec) {
-               /*
-                * We need to pass a reference to the client and the
-                * parent spec when creating the parent rbd_dev.
-                * Images related by parent/child relationships
-                * always share both.
-                */
-               parent_spec = rbd_spec_get(rbd_dev->parent_spec);
-               rbdc = __rbd_get_client(rbd_dev->rbd_client);
-
-               parent = rbd_dev_create(rbdc, parent_spec);
-               if (!parent) {
-                       ret = -ENOMEM;
-                       goto err_out_spec;
-               }
-               rbdc = NULL;            /* parent now owns reference */
-               parent_spec = NULL;     /* parent now owns reference */
-               ret = rbd_dev_probe(parent);
-               if (ret < 0)
-                       goto err_out_parent;
-               rbd_dev->parent = parent;
-       }
-
-       down_write(&rbd_dev->header_rwsem);
-       ret = rbd_dev_snaps_register(rbd_dev);
-       up_write(&rbd_dev->header_rwsem);
+       ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_bus;
 
@@ -4789,6 +4796,8 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
 
        /* Everything's ready.  Announce the disk to the world. */
 
+       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+       set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);
 
        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
@@ -4796,11 +4805,6 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
 
        return ret;
 
-err_out_parent:
-       rbd_dev_destroy(parent);
-err_out_spec:
-       rbd_spec_put(parent_spec);
-       rbd_put_client(rbdc);
 err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */
 
@@ -4824,7 +4828,7 @@ err_out_snaps:
  * device.  For format 2 images this includes determining the image
  * id.
  */
-static int rbd_dev_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
 {
        int ret;
 
@@ -4835,19 +4839,28 @@ static int rbd_dev_probe(struct rbd_device *rbd_dev)
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
+               return ret;
+       rbd_assert(rbd_dev->spec->image_id);
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+       if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_probe(rbd_dev);
        else
                ret = rbd_dev_v2_probe(rbd_dev);
-       if (ret) {
-               dout("probe failed, returning %d\n", ret);
-
-               return ret;
-       }
+       if (ret)
+               goto out_err;
 
        ret = rbd_dev_probe_finish(rbd_dev);
        if (ret)
                rbd_header_free(&rbd_dev->header);
 
+       return ret;
+out_err:
+       kfree(rbd_dev->spec->image_id);
+       rbd_dev->spec->image_id = NULL;
+
+       dout("probe failed, returning %d\n", ret);
+
        return ret;
 }
 
@@ -4883,11 +4896,13 @@ static ssize_t rbd_add(struct bus_type *bus,
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
-       spec->pool_id = (u64) rc;
+       spec->pool_id = (u64)rc;
 
        /* The ceph file layout needs to fit pool id in 32 bits */
 
-       if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
+       if (spec->pool_id > (u64)U32_MAX) {
+               rbd_warn(NULL, "pool id too large (%llu > %u)\n",
+                               (unsigned long long)spec->pool_id, U32_MAX);
                rc = -EIO;
                goto err_out_client;
        }
@@ -4902,12 +4917,14 @@ static ssize_t rbd_add(struct bus_type *bus,
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */
 
-       rc = rbd_dev_probe(rbd_dev);
+       rc = rbd_dev_image_probe(rbd_dev);
        if (rc < 0)
                goto err_out_rbd_dev;
 
        return count;
 err_out_rbd_dev:
+       rbd_spec_put(rbd_dev->parent_spec);
+       kfree(rbd_dev->header_name);
        rbd_dev_destroy(rbd_dev);
 err_out_client:
        rbd_put_client(rbdc);
@@ -4921,7 +4938,7 @@ err_out_module:
 
        dout("Error adding device %s\n", buf);
 
-       return (ssize_t) rc;
+       return (ssize_t)rc;
 }
 
 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
@@ -4958,6 +4975,8 @@ static void rbd_dev_release(struct device *dev)
        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
+       rbd_spec_put(rbd_dev->parent_spec);
+       kfree(rbd_dev->header_name);
        rbd_dev_destroy(rbd_dev);
 
        /* release module ref */
@@ -4970,6 +4989,29 @@ static void __rbd_remove(struct rbd_device *rbd_dev)
        rbd_bus_del_dev(rbd_dev);
 }
 
+static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
+{
+       while (rbd_dev->parent_spec) {
+               struct rbd_device *first = rbd_dev;
+               struct rbd_device *second = first->parent;
+               struct rbd_device *third;
+
+               /*
+                * Follow to the parent with no grandparent and
+                * remove it.
+                */
+               while (second && (third = second->parent)) {
+                       first = second;
+                       second = third;
+               }
+               __rbd_remove(second);
+               rbd_spec_put(first->parent_spec);
+               first->parent_spec = NULL;
+               first->parent_overlap = 0;
+               first->parent = NULL;
+       }
+}
+
 static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
@@ -5005,25 +5047,8 @@ static ssize_t rbd_remove(struct bus_type *bus,
        if (ret < 0)
                goto done;
 
-       while (rbd_dev->parent_spec) {
-               struct rbd_device *first = rbd_dev;
-               struct rbd_device *second = first->parent;
-               struct rbd_device *third;
+       rbd_dev_remove_parent(rbd_dev);
 
-               /*
-                * Follow to the parent with no grandparent and
-                * remove it.
-                */
-               while (second && (third = second->parent)) {
-                       first = second;
-                       second = third;
-               }
-               __rbd_remove(second);
-               rbd_spec_put(first->parent_spec);
-               first->parent_spec = NULL;
-               first->parent_overlap = 0;
-               first->parent = NULL;
-       }
        __rbd_remove(rbd_dev);
 
 done: