]> Pileus Git - ~andy/linux/blobdiff - drivers/block/rbd.c
rbd: take a little credit
[~andy/linux] / drivers / block / rbd.c
index d6d314027b5d45a95ee762ef3c55dcb0769e15a8..14c6dc92ef0e1db3c6f301de683ae96b59eb7199 100644 (file)
@@ -372,7 +372,7 @@ enum rbd_dev_flags {
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 };
 
-static DEFINE_MUTEX(ctl_mutex);          /* Serialize open/close/setup/teardown */
+static DEFINE_MUTEX(client_mutex);     /* Serialize client creation */
 
 static LIST_HEAD(rbd_dev_list);    /* devices */
 static DEFINE_SPINLOCK(rbd_dev_list_lock);
@@ -489,10 +489,8 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
        if (removing)
                return -ENOENT;
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
-       mutex_unlock(&ctl_mutex);
 
        return 0;
 }
@@ -507,9 +505,7 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
-       mutex_unlock(&ctl_mutex);
 }
 
 static const struct block_device_operations rbd_bd_ops = {
@@ -519,8 +515,8 @@ static const struct block_device_operations rbd_bd_ops = {
 };
 
 /*
- * Initialize an rbd client instance.
- * We own *ceph_opts.
+ * Initialize an rbd client instance.  Success or not, this function
+ * consumes ceph_opts.  Caller holds client_mutex.
  */
 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 {
@@ -535,30 +531,25 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
-               goto out_mutex;
+               goto out_rbdc;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 
        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
-               goto out_err;
+               goto out_client;
 
        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);
 
-       mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);
 
        return rbdc;
-
-out_err:
+out_client:
        ceph_destroy_client(rbdc->client);
-out_mutex:
-       mutex_unlock(&ctl_mutex);
+out_rbdc:
        kfree(rbdc);
 out_opt:
        if (ceph_opts)
@@ -675,17 +666,20 @@ static int parse_rbd_opts_token(char *c, void *private)
 
 /*
  * Get a ceph client with specific addr and configuration, if one does
- * not exist create it.
+ * not exist create it.  Either way, ceph_opts is consumed by this
+ * function.
  */
 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 {
        struct rbd_client *rbdc;
 
+       mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);
+       mutex_unlock(&client_mutex);
 
        return rbdc;
 }
@@ -839,7 +833,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 
        /* We won't fail any more, fill in the header */
 
-       down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
@@ -868,8 +861,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;
 
-       up_write(&rbd_dev->header_rwsem);
-
        return 0;
 out_2big:
        ret = -EIO;
@@ -1035,12 +1026,16 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
        char *name;
        u64 segment;
        int ret;
+       char *name_format;
 
        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
-       ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
+       name_format = "%s.%012llx";
+       if (rbd_dev->image_format == 2)
+               name_format = "%s.%016llx";
+       ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
@@ -1121,6 +1116,7 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
+                               flush_dcache_page(bv->bv_page);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
@@ -1148,11 +1144,12 @@ static void zero_pages(struct page **pages, u64 offset, u64 end)
                unsigned long flags;
                void *kaddr;
 
-               page_offset = (size_t)(offset & ~PAGE_MASK);
-               length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
+               page_offset = offset & ~PAGE_MASK;
+               length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
+               flush_dcache_page(*page);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);
 
@@ -2247,13 +2244,17 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                                        obj_request->pages, length,
                                        offset & ~PAGE_MASK, false, false);
 
+               /*
+                * set obj_request->img_request before formatting
+                * the osd_request so that it gets the right snapc
+                */
+               rbd_img_obj_request_add(img_request, obj_request);
                if (write_request)
                        rbd_osd_req_format_write(obj_request);
                else
                        rbd_osd_req_format_read(obj_request);
 
                obj_request->img_offset = img_offset;
-               rbd_img_obj_request_add(img_request, obj_request);
 
                img_offset += length;
                resid -= length;
@@ -2526,6 +2527,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
         */
        orig_request = obj_request->obj_request;
        obj_request->obj_request = NULL;
+       rbd_obj_request_put(orig_request);
        rbd_assert(orig_request);
        rbd_assert(orig_request->img_request);
 
@@ -2546,7 +2548,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
        if (!rbd_dev->parent_overlap) {
                struct ceph_osd_client *osdc;
 
-               rbd_obj_request_put(orig_request);
                osdc = &rbd_dev->rbd_client->client->osdc;
                result = rbd_obj_request_submit(osdc, orig_request);
                if (!result)
@@ -2576,7 +2577,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 out:
        if (orig_request->result)
                rbd_obj_request_complete(orig_request);
-       rbd_obj_request_put(orig_request);
 }
 
 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
@@ -2850,7 +2850,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
                (unsigned int)opcode);
        ret = rbd_dev_refresh(rbd_dev);
        if (ret)
-               rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
+               rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
 
        rbd_obj_notify_ack(rbd_dev, notify_id);
 }
@@ -3330,8 +3330,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
        int ret;
 
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       down_write(&rbd_dev->header_rwsem);
        mapping_size = rbd_dev->mapping.size;
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_header_info(rbd_dev);
        else
@@ -3340,7 +3340,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
        /* If it's a mapped snapshot, validate its EXISTS flag */
 
        rbd_exists_validate(rbd_dev);
-       mutex_unlock(&ctl_mutex);
+       up_write(&rbd_dev->header_rwsem);
+
        if (mapping_size != rbd_dev->mapping.size) {
                sector_t size;
 
@@ -3804,6 +3805,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        void *end;
        u64 pool_id;
        char *image_id;
+       u64 snap_id;
        u64 overlap;
        int ret;
 
@@ -3863,24 +3865,56 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
                        (unsigned long long)pool_id, U32_MAX);
                goto out_err;
        }
-       parent_spec->pool_id = pool_id;
 
        image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(image_id)) {
                ret = PTR_ERR(image_id);
                goto out_err;
        }
-       parent_spec->image_id = image_id;
-       ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
+       ceph_decode_64_safe(&p, end, snap_id, out_err);
        ceph_decode_64_safe(&p, end, overlap, out_err);
 
-       if (overlap) {
-               rbd_spec_put(rbd_dev->parent_spec);
+       /*
+        * The parent won't change (except when the clone is
+        * flattened, already handled that).  So we only need to
+        * record the parent spec if we have not already done so.
+        */
+       if (!rbd_dev->parent_spec) {
+               parent_spec->pool_id = pool_id;
+               parent_spec->image_id = image_id;
+               parent_spec->snap_id = snap_id;
                rbd_dev->parent_spec = parent_spec;
                parent_spec = NULL;     /* rbd_dev now owns this */
-               rbd_dev->parent_overlap = overlap;
-       } else {
-               rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+       }
+
+       /*
+        * We always update the parent overlap.  If it's zero we
+        * treat it specially.
+        */
+       rbd_dev->parent_overlap = overlap;
+       smp_mb();
+       if (!overlap) {
+
+               /* A null parent_spec indicates it's the initial probe */
+
+               if (parent_spec) {
+                       /*
+                        * The overlap has become zero, so the clone
+                        * must have been resized down to 0 at some
+                        * point.  Treat this the same as a flatten.
+                        */
+                       rbd_dev_parent_put(rbd_dev);
+                       pr_info("%s: clone image now standalone\n",
+                               rbd_dev->disk->disk_name);
+               } else {
+                       /*
+                        * For the initial probe, if we find the
+                        * overlap is zero we just pretend there was
+                        * no parent image.
+                        */
+                       rbd_warn(rbd_dev, "ignoring parent of "
+                                               "clone with overlap 0\n");
+               }
        }
 out:
        ret = 0;
@@ -4236,12 +4270,14 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
        bool first_time = rbd_dev->header.object_prefix == NULL;
        int ret;
 
-       down_write(&rbd_dev->header_rwsem);
+       ret = rbd_dev_v2_image_size(rbd_dev);
+       if (ret)
+               return ret;
 
        if (first_time) {
                ret = rbd_dev_v2_header_onetime(rbd_dev);
                if (ret)
-                       goto out;
+                       return ret;
        }
 
        /*
@@ -4256,7 +4292,7 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
-                       goto out;
+                       return ret;
 
                /*
                 * Print a warning if this is the initial probe and
@@ -4271,18 +4307,12 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
                                        "is EXPERIMENTAL!");
        }
 
-       ret = rbd_dev_v2_image_size(rbd_dev);
-       if (ret)
-               goto out;
-
        if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
                if (rbd_dev->mapping.size != rbd_dev->header.image_size)
                        rbd_dev->mapping.size = rbd_dev->header.image_size;
 
        ret = rbd_dev_v2_snap_context(rbd_dev);
        dout("rbd_dev_v2_snap_context returned %d\n", ret);
-out:
-       up_write(&rbd_dev->header_rwsem);
 
        return ret;
 }
@@ -4292,8 +4322,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
        struct device *dev;
        int ret;
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
        dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
@@ -4302,8 +4330,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);
 
-       mutex_unlock(&ctl_mutex);
-
        return ret;
 }
 
@@ -4697,8 +4723,10 @@ out:
        return ret;
 }
 
-/* Undo whatever state changes are made by v1 or v2 image probe */
-
+/*
+ * Undo whatever state changes are made by v1 or v2 header info
+ * call.
+ */
 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 {
        struct rbd_image_header *header;
@@ -4902,9 +4930,10 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
        int tmp;
 
        /*
-        * Get the id from the image id object.  If it's not a
-        * format 2 image, we'll get ENOENT back, and we'll assume
-        * it's a format 1 image.
+        * Get the id from the image id object.  Unless there's an
+        * error, rbd_dev->spec->image_id will be filled in with
+        * a dynamically-allocated string, and rbd_dev->image_format
+        * will be set to either 1 or 2.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
@@ -4992,7 +5021,6 @@ static ssize_t rbd_add(struct bus_type *bus,
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
-       ceph_opts = NULL;       /* rbd_dev client now owns this */
 
        /* pick the pool */
        osdc = &rbdc->client->osdc;
@@ -5027,18 +5055,18 @@ static ssize_t rbd_add(struct bus_type *bus,
        rbd_dev->mapping.read_only = read_only;
 
        rc = rbd_dev_device_setup(rbd_dev);
-       if (!rc)
-               return count;
+       if (rc) {
+               rbd_dev_image_release(rbd_dev);
+               goto err_out_module;
+       }
+
+       return count;
 
-       rbd_dev_image_release(rbd_dev);
 err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
 err_out_client:
        rbd_put_client(rbdc);
 err_out_args:
-       if (ceph_opts)
-               ceph_destroy_options(ceph_opts);
-       kfree(rbd_opts);
        rbd_spec_put(spec);
 err_out_module:
        module_put(THIS_MODULE);
@@ -5048,23 +5076,6 @@ err_out_module:
        return (ssize_t)rc;
 }
 
-static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
-{
-       struct list_head *tmp;
-       struct rbd_device *rbd_dev;
-
-       spin_lock(&rbd_dev_list_lock);
-       list_for_each(tmp, &rbd_dev_list) {
-               rbd_dev = list_entry(tmp, struct rbd_device, node);
-               if (rbd_dev->dev_id == dev_id) {
-                       spin_unlock(&rbd_dev_list_lock);
-                       return rbd_dev;
-               }
-       }
-       spin_unlock(&rbd_dev_list_lock);
-       return NULL;
-}
-
 static void rbd_dev_device_release(struct device *dev)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
@@ -5109,8 +5120,10 @@ static ssize_t rbd_remove(struct bus_type *bus,
                          size_t count)
 {
        struct rbd_device *rbd_dev = NULL;
-       int target_id;
+       struct list_head *tmp;
+       int dev_id;
        unsigned long ul;
+       bool already = false;
        int ret;
 
        ret = strict_strtoul(buf, 10, &ul);
@@ -5118,37 +5131,40 @@ static ssize_t rbd_remove(struct bus_type *bus,
                return ret;
 
        /* convert to int; abort if we lost anything in the conversion */
-       target_id = (int) ul;
-       if (target_id != ul)
+       dev_id = (int)ul;
+       if (dev_id != ul)
                return -EINVAL;
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
-       rbd_dev = __rbd_get_dev(target_id);
-       if (!rbd_dev) {
-               ret = -ENOENT;
-               goto done;
+       ret = -ENOENT;
+       spin_lock(&rbd_dev_list_lock);
+       list_for_each(tmp, &rbd_dev_list) {
+               rbd_dev = list_entry(tmp, struct rbd_device, node);
+               if (rbd_dev->dev_id == dev_id) {
+                       ret = 0;
+                       break;
+               }
+       }
+       if (!ret) {
+               spin_lock_irq(&rbd_dev->lock);
+               if (rbd_dev->open_count)
+                       ret = -EBUSY;
+               else
+                       already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
+                                                       &rbd_dev->flags);
+               spin_unlock_irq(&rbd_dev->lock);
        }
+       spin_unlock(&rbd_dev_list_lock);
+       if (ret < 0 || already)
+               return ret;
 
-       spin_lock_irq(&rbd_dev->lock);
-       if (rbd_dev->open_count)
-               ret = -EBUSY;
-       else
-               set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
-       spin_unlock_irq(&rbd_dev->lock);
-       if (ret < 0)
-               goto done;
        rbd_bus_del_dev(rbd_dev);
        ret = rbd_dev_header_watch_sync(rbd_dev, false);
        if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
-       ret = count;
-done:
-       mutex_unlock(&ctl_mutex);
 
-       return ret;
+       return count;
 }
 
 /*
@@ -5256,6 +5272,7 @@ static void __exit rbd_exit(void)
 module_init(rbd_init);
 module_exit(rbd_exit);
 
+MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
 MODULE_DESCRIPTION("rados block device");