]> Pileus Git - ~andy/linux/blobdiff - drivers/block/rbd.c
rbd: define parent image request routines
[~andy/linux] / drivers / block / rbd.c
index 15ac2a54d4f3a3100d5b1068bb433c0da7f4e361..1ffdfbfbf3c41e39c73411ae4c9a0a120ca261e3 100644 (file)
@@ -224,6 +224,7 @@ struct rbd_obj_request {
                };
        };
        struct page             **copyup_pages;
+       u32                     copyup_page_count;
 
        struct ceph_osd_request *osd_req;
 
@@ -256,6 +257,7 @@ struct rbd_img_request {
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
+       u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
@@ -358,7 +360,7 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
 
 static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -1357,20 +1359,18 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
-       dout("%s: img %p (was %d)\n", __func__, img_request,
-               atomic_read(&img_request->kref.refcount));
-       kref_get(&img_request->kref);
-}
-
+static bool img_request_child_test(struct rbd_img_request *img_request);
+static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
 static void rbd_img_request_put(struct rbd_img_request *img_request)
 {
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
-       kref_put(&img_request->kref, rbd_img_request_destroy);
+       if (img_request_child_test(img_request))
+               kref_put(&img_request->kref, rbd_parent_request_destroy);
+       else
+               kref_put(&img_request->kref, rbd_img_request_destroy);
 }
 
 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
@@ -1487,6 +1487,12 @@ static void img_request_child_set(struct rbd_img_request *img_request)
        smp_mb();
 }
 
+static void img_request_child_clear(struct rbd_img_request *img_request)
+{
+       clear_bit(IMG_REQ_CHILD, &img_request->flags);
+       smp_mb();
+}
+
 static bool img_request_child_test(struct rbd_img_request *img_request)
 {
        smp_mb();
@@ -1842,6 +1848,17 @@ static void rbd_obj_request_destroy(struct kref *kref)
        kmem_cache_free(rbd_obj_request_cache, obj_request);
 }
 
+/* It's OK to call this for a device with no parent */
+
+static void rbd_spec_put(struct rbd_spec *spec);
+static void rbd_dev_unparent(struct rbd_device *rbd_dev)
+{
+       rbd_dev_remove_parent(rbd_dev);
+       rbd_spec_put(rbd_dev->parent_spec);
+       rbd_dev->parent_spec = NULL;
+       rbd_dev->parent_overlap = 0;
+}
+
 /*
  * Caller is responsible for filling in the list of object requests
  * that comprises the image request, and the Linux request pointer
@@ -1850,8 +1867,7 @@ static void rbd_obj_request_destroy(struct kref *kref)
 static struct rbd_img_request *rbd_img_request_create(
                                        struct rbd_device *rbd_dev,
                                        u64 offset, u64 length,
-                                       bool write_request,
-                                       bool child_request)
+                                       bool write_request)
 {
        struct rbd_img_request *img_request;
 
@@ -1876,9 +1892,7 @@ static struct rbd_img_request *rbd_img_request_create(
        } else {
                img_request->snap_id = rbd_dev->spec->snap_id;
        }
-       if (child_request)
-               img_request_child_set(img_request);
-       if (rbd_dev->parent_spec)
+       if (rbd_dev->parent_overlap)
                img_request_layered_set(img_request);
        spin_lock_init(&img_request->completion_lock);
        img_request->next_completion = 0;
@@ -1888,9 +1902,6 @@ static struct rbd_img_request *rbd_img_request_create(
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);
 
-       rbd_img_request_get(img_request);       /* Avoid a warning */
-       rbd_img_request_put(img_request);       /* TEMPORARY */
-
        dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
                write_request ? "write" : "read", offset, length,
                img_request);
@@ -1915,12 +1926,46 @@ static void rbd_img_request_destroy(struct kref *kref)
        if (img_request_write_test(img_request))
                ceph_put_snap_context(img_request->snapc);
 
-       if (img_request_child_test(img_request))
-               rbd_obj_request_put(img_request->obj_request);
-
        kmem_cache_free(rbd_img_request_cache, img_request);
 }
 
+static struct rbd_img_request *rbd_parent_request_create(
+                                       struct rbd_obj_request *obj_request,
+                                       u64 img_offset, u64 length)
+{
+       struct rbd_img_request *parent_request;
+       struct rbd_device *rbd_dev;
+
+       rbd_assert(obj_request->img_request);
+       rbd_dev = obj_request->img_request->rbd_dev;
+
+       parent_request = rbd_img_request_create(rbd_dev->parent,
+                                               img_offset, length, false);
+       if (!parent_request)
+               return NULL;
+
+       img_request_child_set(parent_request);
+       rbd_obj_request_get(obj_request);
+       parent_request->obj_request = obj_request;
+
+       return parent_request;
+}
+
+static void rbd_parent_request_destroy(struct kref *kref)
+{
+       struct rbd_img_request *parent_request;
+       struct rbd_obj_request *orig_request;
+
+       parent_request = container_of(kref, struct rbd_img_request, kref);
+       orig_request = parent_request->obj_request;
+
+       parent_request->obj_request = NULL;
+       rbd_obj_request_put(orig_request);
+       img_request_child_clear(parent_request);
+
+       rbd_img_request_destroy(kref);
+}
+
 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request;
@@ -2129,7 +2174,7 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request;
        struct rbd_device *rbd_dev;
-       u64 length;
+       struct page **pages;
        u32 page_count;
 
        rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
@@ -2139,12 +2184,14 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 
        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev);
-       length = (u64)1 << rbd_dev->header.obj_order;
-       page_count = (u32)calc_pages_for(0, length);
 
-       rbd_assert(obj_request->copyup_pages);
-       ceph_release_page_vector(obj_request->copyup_pages, page_count);
+       pages = obj_request->copyup_pages;
+       rbd_assert(pages != NULL);
        obj_request->copyup_pages = NULL;
+       page_count = obj_request->copyup_page_count;
+       rbd_assert(page_count);
+       obj_request->copyup_page_count = 0;
+       ceph_release_page_vector(pages, page_count);
 
        /*
         * We want the transfer count to reflect the size of the
@@ -2168,9 +2215,11 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
        struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
        struct page **pages;
+       u32 page_count;
        int result;
-       u64 obj_size;
-       u64 xferred;
+       u64 parent_length;
+       u64 offset;
+       u64 length;
 
        rbd_assert(img_request_child_test(img_request));
 
@@ -2179,46 +2228,59 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
        pages = img_request->copyup_pages;
        rbd_assert(pages != NULL);
        img_request->copyup_pages = NULL;
+       page_count = img_request->copyup_page_count;
+       rbd_assert(page_count);
+       img_request->copyup_page_count = 0;
 
        orig_request = img_request->obj_request;
        rbd_assert(orig_request != NULL);
-       rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_type_valid(orig_request->type));
        result = img_request->result;
-       obj_size = img_request->length;
-       xferred = img_request->xferred;
+       parent_length = img_request->length;
+       rbd_assert(parent_length == img_request->xferred);
        rbd_img_request_put(img_request);
 
        rbd_assert(orig_request->img_request);
        rbd_dev = orig_request->img_request->rbd_dev;
        rbd_assert(rbd_dev);
-       rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
 
        if (result)
                goto out_err;
 
-       /* Allocate the new copyup osd request for the original request */
-
+       /*
+        * The original osd request is of no use to use any more.
+        * We need a new one that can hold the two ops in a copyup
+        * request.  Allocate the new copyup osd request for the
+        * original request, and release the old one.
+        */
        result = -ENOMEM;
-       rbd_assert(!orig_request->osd_req);
        osd_req = rbd_osd_req_create_copyup(orig_request);
        if (!osd_req)
                goto out_err;
+       rbd_osd_req_destroy(orig_request->osd_req);
        orig_request->osd_req = osd_req;
        orig_request->copyup_pages = pages;
+       orig_request->copyup_page_count = page_count;
 
        /* Initialize the copyup op */
 
        osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
-       osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+       osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
                                                false, false);
 
        /* Then the original write request op */
 
+       offset = orig_request->offset;
+       length = orig_request->length;
        osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
-                                       orig_request->offset,
-                                       orig_request->length, 0, 0);
-       osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
-                                       orig_request->length);
+                                       offset, length, 0, 0);
+       if (orig_request->type == OBJ_REQUEST_BIO)
+               osd_req_op_extent_osd_data_bio(osd_req, 1,
+                                       orig_request->bio_list, length);
+       else
+               osd_req_op_extent_osd_data_pages(osd_req, 1,
+                                       orig_request->pages, length,
+                                       offset & ~PAGE_MASK, false, false);
 
        rbd_osd_req_format_write(orig_request);
 
@@ -2264,22 +2326,13 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
        int result;
 
        rbd_assert(obj_request_img_data_test(obj_request));
-       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_type_valid(obj_request->type));
 
        img_request = obj_request->img_request;
        rbd_assert(img_request != NULL);
        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev->parent != NULL);
 
-       /*
-        * First things first.  The original osd request is of no
-        * use to use any more, we'll need a new one that can hold
-        * the two ops in a copyup request.  We'll get that later,
-        * but for now we can release the old one.
-        */
-       rbd_osd_req_destroy(obj_request->osd_req);
-       obj_request->osd_req = NULL;
-
        /*
         * Determine the byte range covered by the object in the
         * child image to which the original request was to be sent.
@@ -2310,18 +2363,16 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
        }
 
        result = -ENOMEM;
-       parent_request = rbd_img_request_create(rbd_dev->parent,
-                                               img_offset, length,
-                                               false, true);
+       parent_request = rbd_parent_request_create(obj_request,
+                                               img_offset, length);
        if (!parent_request)
                goto out_err;
-       rbd_obj_request_get(obj_request);
-       parent_request->obj_request = obj_request;
 
        result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
        if (result)
                goto out_err;
        parent_request->copyup_pages = pages;
+       parent_request->copyup_page_count = page_count;
 
        parent_request->callback = rbd_img_obj_parent_read_full_callback;
        result = rbd_img_request_submit(parent_request);
@@ -2329,6 +2380,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
                return 0;
 
        parent_request->copyup_pages = NULL;
+       parent_request->copyup_page_count = 0;
        parent_request->obj_request = NULL;
        rbd_obj_request_put(obj_request);
 out_err:
@@ -2567,31 +2619,28 @@ out:
 
 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
 {
-       struct rbd_device *rbd_dev;
        struct rbd_img_request *img_request;
        int result;
 
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request != NULL);
        rbd_assert(obj_request->result == (s32) -ENOENT);
-       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_type_valid(obj_request->type));
 
-       rbd_dev = obj_request->img_request->rbd_dev;
-       rbd_assert(rbd_dev->parent != NULL);
        /* rbd_read_finish(obj_request, obj_request->length); */
-       img_request = rbd_img_request_create(rbd_dev->parent,
+       img_request = rbd_parent_request_create(obj_request,
                                                obj_request->img_offset,
-                                               obj_request->length,
-                                               false, true);
+                                               obj_request->length);
        result = -ENOMEM;
        if (!img_request)
                goto out_err;
 
-       rbd_obj_request_get(obj_request);
-       img_request->obj_request = obj_request;
-
-       result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-                                       obj_request->bio_list);
+       if (obj_request->type == OBJ_REQUEST_BIO)
+               result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+                                               obj_request->bio_list);
+       else
+               result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
+                                               obj_request->pages);
        if (result)
                goto out_err;
 
@@ -2660,7 +2709,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
  * Request sync osd watch/unwatch.  The value of "start" determines
  * whether a watch request is being initiated or torn down.
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
@@ -2694,7 +2743,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
                                        rbd_dev->watch_request->osd_req);
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-                               rbd_dev->watch_event->cookie, 0, start);
+                               rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
        rbd_osd_req_format_write(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
@@ -2896,7 +2945,7 @@ static void rbd_request_fn(struct request_queue *q)
 
                result = -ENOMEM;
                img_request = rbd_img_request_create(rbd_dev, offset, length,
-                                                       write_request, false);
+                                                       write_request);
                if (!img_request)
                        goto end_request;
 
@@ -3602,6 +3651,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        __le64 snapid;
        void *p;
        void *end;
+       u64 pool_id;
        char *image_id;
        u64 overlap;
        int ret;
@@ -3632,18 +3682,19 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        p = reply_buf;
        end = reply_buf + ret;
        ret = -ERANGE;
-       ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
-       if (parent_spec->pool_id == CEPH_NOPOOL)
+       ceph_decode_64_safe(&p, end, pool_id, out_err);
+       if (pool_id == CEPH_NOPOOL)
                goto out;       /* No parent?  No problem. */
 
        /* The ceph file layout needs to fit pool id in 32 bits */
 
        ret = -EIO;
-       if (parent_spec->pool_id > (u64)U32_MAX) {
+       if (pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
-                       (unsigned long long)parent_spec->pool_id, U32_MAX);
+                       (unsigned long long)pool_id, U32_MAX);
                goto out_err;
        }
+       parent_spec->pool_id = pool_id;
 
        image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(image_id)) {
@@ -3654,9 +3705,14 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
        ceph_decode_64_safe(&p, end, overlap, out_err);
 
-       rbd_dev->parent_overlap = overlap;
-       rbd_dev->parent_spec = parent_spec;
-       parent_spec = NULL;     /* rbd_dev now owns this */
+       if (overlap) {
+               rbd_spec_put(rbd_dev->parent_spec);
+               rbd_dev->parent_spec = parent_spec;
+               parent_spec = NULL;     /* rbd_dev now owns this */
+               rbd_dev->parent_overlap = overlap;
+       } else {
+               rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+       }
 out:
        ret = 0;
 out_err:
@@ -4019,17 +4075,43 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
                        goto out;
        }
 
+       /*
+        * If the image supports layering, get the parent info.  We
+        * need to probe the first time regardless.  Thereafter we
+        * only need to if there's a parent, to see if it has
+        * disappeared due to the mapped image getting flattened.
+        */
+       if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
+                       (first_time || rbd_dev->parent_spec)) {
+               bool warn;
+
+               ret = rbd_dev_v2_parent_info(rbd_dev);
+               if (ret)
+                       goto out;
+
+               /*
+                * Print a warning if this is the initial probe and
+                * the image has a parent.  Don't print it if the
+                * image now being probed is itself a parent.  We
+                * can tell at this point because we won't know its
+                * pool name yet (just its pool id).
+                */
+               warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
+               if (first_time && warn)
+                       rbd_warn(rbd_dev, "WARNING: kernel layering "
+                                       "is EXPERIMENTAL!");
+       }
+
        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret)
                goto out;
+
        if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
                if (rbd_dev->mapping.size != rbd_dev->header.image_size)
                        rbd_dev->mapping.size = rbd_dev->header.image_size;
 
        ret = rbd_dev_v2_snap_context(rbd_dev);
        dout("rbd_dev_v2_snap_context returned %d\n", ret);
-       if (ret)
-               goto out;
 out:
        up_write(&rbd_dev->header_rwsem);
 
@@ -4452,10 +4534,7 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 {
        struct rbd_image_header *header;
 
-       rbd_dev_remove_parent(rbd_dev);
-       rbd_spec_put(rbd_dev->parent_spec);
-       rbd_dev->parent_spec = NULL;
-       rbd_dev->parent_overlap = 0;
+       rbd_dev_unparent(rbd_dev);
 
        /* Free dynamic fields from the header, then zero it out */
 
@@ -4483,24 +4562,6 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
        if (ret)
                goto out_err;
 
-       /* If the image supports layering, get the parent info */
-
-       if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
-               ret = rbd_dev_v2_parent_info(rbd_dev);
-               if (ret)
-                       goto out_err;
-               /*
-                * Print a warning if this image has a parent.
-                * Don't print it if the image now being probed
-                * is itself a parent.  We can tell at this point
-                * because we won't know its pool name yet (just its
-                * pool id).
-                */
-               if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
-                       rbd_warn(rbd_dev, "WARNING: kernel layering "
-                                       "is EXPERIMENTAL!");
-       }
-
        /* If the image supports fancy striping, get its parameters */
 
        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
@@ -4512,11 +4573,7 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
 
        return 0;
 out_err:
-       rbd_dev->parent_overlap = 0;
-       rbd_spec_put(rbd_dev->parent_spec);
-       rbd_dev->parent_spec = NULL;
-       kfree(rbd_dev->header_name);
-       rbd_dev->header_name = NULL;
+       rbd_dev->header.features = 0;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;
 
@@ -4545,7 +4602,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
        if (!parent)
                goto out_err;
 
-       ret = rbd_dev_image_probe(parent, true);
+       ret = rbd_dev_image_probe(parent, false);
        if (ret < 0)
                goto out_err;
        rbd_dev->parent = parent;
@@ -4553,7 +4610,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
        return 0;
 out_err:
        if (parent) {
-               rbd_spec_put(rbd_dev->parent_spec);
+               rbd_dev_unparent(rbd_dev);
                kfree(rbd_dev->header_name);
                rbd_dev_destroy(parent);
        } else {
@@ -4650,12 +4707,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
-       int ret;
-
        rbd_dev_unprobe(rbd_dev);
-       ret = rbd_dev_header_watch_sync(rbd_dev, 0);
-       if (ret)
-               rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
@@ -4667,10 +4719,11 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 
 /*
  * Probe for the existence of the header object for the given rbd
- * device.  For format 2 images this includes determining the image
- * id.
+ * device.  If this image is the one being mapped (i.e., not a
+ * parent), initiate a watch on its header object before using that
+ * object to get detailed information about the rbd image.
  */
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 {
        int ret;
        int tmp;
@@ -4690,9 +4743,11 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
        if (ret)
                goto err_out_format;
 
-       ret = rbd_dev_header_watch_sync(rbd_dev, 1);
-       if (ret)
-               goto out_header_name;
+       if (mapping) {
+               ret = rbd_dev_header_watch_sync(rbd_dev, true);
+               if (ret)
+                       goto out_header_name;
+       }
 
        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_header_info(rbd_dev);
@@ -4705,12 +4760,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
        if (ret)
                goto err_out_probe;
 
-       /* If we are mapping a snapshot it must be marked read-only */
-
-       if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
-               read_only = true;
-       rbd_dev->mapping.read_only = read_only;
-
        ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_probe;
@@ -4722,9 +4771,12 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
 err_out_probe:
        rbd_dev_unprobe(rbd_dev);
 err_out_watch:
-       tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
-       if (tmp)
-               rbd_warn(rbd_dev, "unable to tear down watch request\n");
+       if (mapping) {
+               tmp = rbd_dev_header_watch_sync(rbd_dev, false);
+               if (tmp)
+                       rbd_warn(rbd_dev, "unable to tear down "
+                                       "watch request (%d)\n", tmp);
+       }
 out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
@@ -4791,10 +4843,16 @@ static ssize_t rbd_add(struct bus_type *bus,
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */
 
-       rc = rbd_dev_image_probe(rbd_dev, read_only);
+       rc = rbd_dev_image_probe(rbd_dev, true);
        if (rc < 0)
                goto err_out_rbd_dev;
 
+       /* If we are mapping a snapshot it must be marked read-only */
+
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+               read_only = true;
+       rbd_dev->mapping.read_only = read_only;
+
        rc = rbd_dev_device_setup(rbd_dev);
        if (!rc)
                return count;
@@ -4907,10 +4965,13 @@ static ssize_t rbd_remove(struct bus_type *bus,
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;
-       ret = count;
        rbd_bus_del_dev(rbd_dev);
+       ret = rbd_dev_header_watch_sync(rbd_dev, false);
+       if (ret)
+               rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
+       ret = count;
 done:
        mutex_unlock(&ctl_mutex);