]> Pileus Git - ~andy/linux/blobdiff - drivers/block/rbd.c
rbd: don't release write request until necessary
[~andy/linux] / drivers / block / rbd.c
index dbfc44a9defd327b62a8b1cc787e8ee1411b9066..d861c71b4005032c789259a0d8c8e252559c6cf1 100644 (file)
@@ -224,6 +224,7 @@ struct rbd_obj_request {
                };
        };
        struct page             **copyup_pages;
+       u32                     copyup_page_count;
 
        struct ceph_osd_request *osd_req;
 
@@ -256,6 +257,7 @@ struct rbd_img_request {
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
+       u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
@@ -358,7 +360,7 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
 
 static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -1357,13 +1359,6 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
-       dout("%s: img %p (was %d)\n", __func__, img_request,
-               atomic_read(&img_request->kref.refcount));
-       kref_get(&img_request->kref);
-}
-
 static void rbd_img_request_destroy(struct kref *kref);
 static void rbd_img_request_put(struct rbd_img_request *img_request)
 {
@@ -1878,7 +1873,7 @@ static struct rbd_img_request *rbd_img_request_create(
        }
        if (child_request)
                img_request_child_set(img_request);
-       if (rbd_dev->parent_spec)
+       if (rbd_dev->parent_overlap)
                img_request_layered_set(img_request);
        spin_lock_init(&img_request->completion_lock);
        img_request->next_completion = 0;
@@ -1888,9 +1883,6 @@ static struct rbd_img_request *rbd_img_request_create(
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);
 
-       rbd_img_request_get(img_request);       /* Avoid a warning */
-       rbd_img_request_put(img_request);       /* TEMPORARY */
-
        dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
                write_request ? "write" : "read", offset, length,
                img_request);
@@ -2129,7 +2121,7 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request;
        struct rbd_device *rbd_dev;
-       u64 length;
+       struct page **pages;
        u32 page_count;
 
        rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
@@ -2139,12 +2131,14 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 
        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev);
-       length = (u64)1 << rbd_dev->header.obj_order;
-       page_count = (u32)calc_pages_for(0, length);
 
-       rbd_assert(obj_request->copyup_pages);
-       ceph_release_page_vector(obj_request->copyup_pages, page_count);
+       pages = obj_request->copyup_pages;
+       rbd_assert(pages != NULL);
        obj_request->copyup_pages = NULL;
+       page_count = obj_request->copyup_page_count;
+       rbd_assert(page_count);
+       obj_request->copyup_page_count = 0;
+       ceph_release_page_vector(pages, page_count);
 
        /*
         * We want the transfer count to reflect the size of the
@@ -2168,9 +2162,11 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
        struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
        struct page **pages;
+       u32 page_count;
        int result;
-       u64 obj_size;
-       u64 xferred;
+       u64 parent_length;
+       u64 offset;
+       u64 length;
 
        rbd_assert(img_request_child_test(img_request));
 
@@ -2179,46 +2175,59 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
        pages = img_request->copyup_pages;
        rbd_assert(pages != NULL);
        img_request->copyup_pages = NULL;
+       page_count = img_request->copyup_page_count;
+       rbd_assert(page_count);
+       img_request->copyup_page_count = 0;
 
        orig_request = img_request->obj_request;
        rbd_assert(orig_request != NULL);
-       rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_type_valid(orig_request->type));
        result = img_request->result;
-       obj_size = img_request->length;
-       xferred = img_request->xferred;
+       parent_length = img_request->length;
+       rbd_assert(parent_length == img_request->xferred);
        rbd_img_request_put(img_request);
 
        rbd_assert(orig_request->img_request);
        rbd_dev = orig_request->img_request->rbd_dev;
        rbd_assert(rbd_dev);
-       rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
 
        if (result)
                goto out_err;
 
-       /* Allocate the new copyup osd request for the original request */
-
+       /*
+        * The original osd request is of no use to use any more.
+        * We need a new one that can hold the two ops in a copyup
+        * request.  Allocate the new copyup osd request for the
+        * original request, and release the old one.
+        */
        result = -ENOMEM;
-       rbd_assert(!orig_request->osd_req);
        osd_req = rbd_osd_req_create_copyup(orig_request);
        if (!osd_req)
                goto out_err;
+       rbd_osd_req_destroy(orig_request->osd_req);
        orig_request->osd_req = osd_req;
        orig_request->copyup_pages = pages;
+       orig_request->copyup_page_count = page_count;
 
        /* Initialize the copyup op */
 
        osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
-       osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+       osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
                                                false, false);
 
        /* Then the original write request op */
 
+       offset = orig_request->offset;
+       length = orig_request->length;
        osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
-                                       orig_request->offset,
-                                       orig_request->length, 0, 0);
-       osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
-                                       orig_request->length);
+                                       offset, length, 0, 0);
+       if (orig_request->type == OBJ_REQUEST_BIO)
+               osd_req_op_extent_osd_data_bio(osd_req, 1,
+                                       orig_request->bio_list, length);
+       else
+               osd_req_op_extent_osd_data_pages(osd_req, 1,
+                                       orig_request->pages, length,
+                                       offset & ~PAGE_MASK, false, false);
 
        rbd_osd_req_format_write(orig_request);
 
@@ -2264,22 +2273,13 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
        int result;
 
        rbd_assert(obj_request_img_data_test(obj_request));
-       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request_type_valid(obj_request->type));
 
        img_request = obj_request->img_request;
        rbd_assert(img_request != NULL);
        rbd_dev = img_request->rbd_dev;
        rbd_assert(rbd_dev->parent != NULL);
 
-       /*
-        * First things first.  The original osd request is of no
-        * use to use any more, we'll need a new one that can hold
-        * the two ops in a copyup request.  We'll get that later,
-        * but for now we can release the old one.
-        */
-       rbd_osd_req_destroy(obj_request->osd_req);
-       obj_request->osd_req = NULL;
-
        /*
         * Determine the byte range covered by the object in the
         * child image to which the original request was to be sent.
@@ -2322,6 +2322,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
        if (result)
                goto out_err;
        parent_request->copyup_pages = pages;
+       parent_request->copyup_page_count = page_count;
 
        parent_request->callback = rbd_img_obj_parent_read_full_callback;
        result = rbd_img_request_submit(parent_request);
@@ -2329,6 +2330,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
                return 0;
 
        parent_request->copyup_pages = NULL;
+       parent_request->copyup_page_count = 0;
        parent_request->obj_request = NULL;
        rbd_obj_request_put(obj_request);
 out_err:
@@ -2664,7 +2666,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
  * Request sync osd watch/unwatch.  The value of "start" determines
  * whether a watch request is being initiated or torn down.
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
@@ -2698,7 +2700,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
                                        rbd_dev->watch_request->osd_req);
 
        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-                               rbd_dev->watch_event->cookie, 0, start);
+                               rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
        rbd_osd_req_format_write(obj_request);
 
        ret = rbd_obj_request_submit(osdc, obj_request);
@@ -3606,6 +3608,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        __le64 snapid;
        void *p;
        void *end;
+       u64 pool_id;
        char *image_id;
        u64 overlap;
        int ret;
@@ -3636,18 +3639,19 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        p = reply_buf;
        end = reply_buf + ret;
        ret = -ERANGE;
-       ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
-       if (parent_spec->pool_id == CEPH_NOPOOL)
+       ceph_decode_64_safe(&p, end, pool_id, out_err);
+       if (pool_id == CEPH_NOPOOL)
                goto out;       /* No parent?  No problem. */
 
        /* The ceph file layout needs to fit pool id in 32 bits */
 
        ret = -EIO;
-       if (parent_spec->pool_id > (u64)U32_MAX) {
+       if (pool_id > (u64)U32_MAX) {
                rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
-                       (unsigned long long)parent_spec->pool_id, U32_MAX);
+                       (unsigned long long)pool_id, U32_MAX);
                goto out_err;
        }
+       parent_spec->pool_id = pool_id;
 
        image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(image_id)) {
@@ -3658,9 +3662,14 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
        ceph_decode_64_safe(&p, end, overlap, out_err);
 
-       rbd_dev->parent_overlap = overlap;
-       rbd_dev->parent_spec = parent_spec;
-       parent_spec = NULL;     /* rbd_dev now owns this */
+       if (overlap) {
+               rbd_spec_put(rbd_dev->parent_spec);
+               rbd_dev->parent_spec = parent_spec;
+               parent_spec = NULL;     /* rbd_dev now owns this */
+               rbd_dev->parent_overlap = overlap;
+       } else {
+               rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+       }
 out:
        ret = 0;
 out_err:
@@ -4023,17 +4032,43 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
                        goto out;
        }
 
+       /*
+        * If the image supports layering, get the parent info.  We
+        * need to probe the first time regardless.  Thereafter we
+        * only need to if there's a parent, to see if it has
+        * disappeared due to the mapped image getting flattened.
+        */
+       if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
+                       (first_time || rbd_dev->parent_spec)) {
+               bool warn;
+
+               ret = rbd_dev_v2_parent_info(rbd_dev);
+               if (ret)
+                       goto out;
+
+               /*
+                * Print a warning if this is the initial probe and
+                * the image has a parent.  Don't print it if the
+                * image now being probed is itself a parent.  We
+                * can tell at this point because we won't know its
+                * pool name yet (just its pool id).
+                */
+               warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
+               if (first_time && warn)
+                       rbd_warn(rbd_dev, "WARNING: kernel layering "
+                                       "is EXPERIMENTAL!");
+       }
+
        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret)
                goto out;
+
        if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
                if (rbd_dev->mapping.size != rbd_dev->header.image_size)
                        rbd_dev->mapping.size = rbd_dev->header.image_size;
 
        ret = rbd_dev_v2_snap_context(rbd_dev);
        dout("rbd_dev_v2_snap_context returned %d\n", ret);
-       if (ret)
-               goto out;
 out:
        up_write(&rbd_dev->header_rwsem);
 
@@ -4487,24 +4522,6 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
        if (ret)
                goto out_err;
 
-       /* If the image supports layering, get the parent info */
-
-       if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
-               ret = rbd_dev_v2_parent_info(rbd_dev);
-               if (ret)
-                       goto out_err;
-               /*
-                * Print a warning if this image has a parent.
-                * Don't print it if the image now being probed
-                * is itself a parent.  We can tell at this point
-                * because we won't know its pool name yet (just its
-                * pool id).
-                */
-               if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
-                       rbd_warn(rbd_dev, "WARNING: kernel layering "
-                                       "is EXPERIMENTAL!");
-       }
-
        /* If the image supports fancy striping, get its parameters */
 
        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
@@ -4516,11 +4533,7 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
 
        return 0;
 out_err:
-       rbd_dev->parent_overlap = 0;
-       rbd_spec_put(rbd_dev->parent_spec);
-       rbd_dev->parent_spec = NULL;
-       kfree(rbd_dev->header_name);
-       rbd_dev->header_name = NULL;
+       rbd_dev->header.features = 0;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;
 
@@ -4549,7 +4562,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
        if (!parent)
                goto out_err;
 
-       ret = rbd_dev_image_probe(parent);
+       ret = rbd_dev_image_probe(parent, false);
        if (ret < 0)
                goto out_err;
        rbd_dev->parent = parent;
@@ -4654,12 +4667,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
-       int ret;
-
        rbd_dev_unprobe(rbd_dev);
-       ret = rbd_dev_header_watch_sync(rbd_dev, 0);
-       if (ret)
-               rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        rbd_dev->image_format = 0;
@@ -4671,9 +4679,11 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 
 /*
  * Probe for the existence of the header object for the given rbd
- * device.
+ * device.  If this image is the one being mapped (i.e., not a
+ * parent), initiate a watch on its header object before using that
+ * object to get detailed information about the rbd image.
  */
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 {
        int ret;
        int tmp;
@@ -4693,9 +4703,11 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_format;
 
-       ret = rbd_dev_header_watch_sync(rbd_dev, 1);
-       if (ret)
-               goto out_header_name;
+       if (mapping) {
+               ret = rbd_dev_header_watch_sync(rbd_dev, true);
+               if (ret)
+                       goto out_header_name;
+       }
 
        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_header_info(rbd_dev);
@@ -4719,9 +4731,12 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
 err_out_probe:
        rbd_dev_unprobe(rbd_dev);
 err_out_watch:
-       tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
-       if (tmp)
-               rbd_warn(rbd_dev, "unable to tear down watch request\n");
+       if (mapping) {
+               tmp = rbd_dev_header_watch_sync(rbd_dev, false);
+               if (tmp)
+                       rbd_warn(rbd_dev, "unable to tear down "
+                                       "watch request (%d)\n", tmp);
+       }
 out_header_name:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
@@ -4788,7 +4803,7 @@ static ssize_t rbd_add(struct bus_type *bus,
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */
 
-       rc = rbd_dev_image_probe(rbd_dev);
+       rc = rbd_dev_image_probe(rbd_dev, true);
        if (rc < 0)
                goto err_out_rbd_dev;
 
@@ -4910,10 +4925,13 @@ static ssize_t rbd_remove(struct bus_type *bus,
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;
-       ret = count;
        rbd_bus_del_dev(rbd_dev);
+       ret = rbd_dev_header_watch_sync(rbd_dev, false);
+       if (ret)
+               rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
+       ret = count;
 done:
        mutex_unlock(&ctl_mutex);