]> Pileus Git - ~andy/linux/blobdiff - drivers/block/rbd.c
rbd: kill __rbd_remove()
[~andy/linux] / drivers / block / rbd.c
index 2e2e9c35b4e57d8e8974a79b19548ddf4a0da27b..0bae4e74555d7b6c92ecc5491e063cae22f79ec3 100644 (file)
@@ -365,7 +365,7 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
-static int rbd_dev_probe(struct rbd_device *rbd_dev);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
 
 static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -427,8 +427,9 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 #  define rbd_assert(expr)     ((void) 0)
 #endif /* !RBD_DEBUG */
 
-static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
+static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
+static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
@@ -671,6 +672,35 @@ static void rbd_client_release(struct kref *kref)
        kfree(rbdc);
 }
 
+/* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */
+
+static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
+{
+       struct ceph_snap_context *snapc;
+       size_t size;
+
+       size = sizeof (struct ceph_snap_context);
+       size += snap_count * sizeof (snapc->snaps[0]);
+       snapc = kzalloc(size, GFP_KERNEL);
+       if (!snapc)
+               return NULL;
+
+       atomic_set(&snapc->nref, 1);
+       snapc->num_snaps = snap_count;
+
+       return snapc;
+}
+
+static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
+{
+       (void)ceph_get_snap_context(snapc);
+}
+
+static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
+{
+       ceph_put_snap_context(snapc);
+}
+
 /*
  * Drop reference to ceph client node. If it's not referenced anymore, release
  * it.
@@ -789,18 +819,13 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
        /* Allocate and fill in the snapshot context */
 
        header->image_size = le64_to_cpu(ondisk->image_size);
-       size = sizeof (struct ceph_snap_context);
-       size += snap_count * sizeof (header->snapc->snaps[0]);
-       header->snapc = kzalloc(size, GFP_KERNEL);
+
+       header->snapc = rbd_snap_context_create(snap_count);
        if (!header->snapc)
                goto out_err;
-
-       atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
-       header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
-               header->snapc->snaps[i] =
-                       le64_to_cpu(ondisk->snaps[i].id);
+               header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
 
        return 0;
 
@@ -841,7 +866,7 @@ static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
        return NULL;
 }
 
-static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
+static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
 {
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
@@ -857,11 +882,17 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }
-       set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 
        return 0;
 }
 
+static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
+{
+       rbd_dev->mapping.size = 0;
+       rbd_dev->mapping.features = 0;
+       rbd_dev->mapping.read_only = true;
+}
+
 static void rbd_header_free(struct rbd_image_header *header)
 {
        kfree(header->object_prefix);
@@ -870,7 +901,7 @@ static void rbd_header_free(struct rbd_image_header *header)
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
-       ceph_put_snap_context(header->snapc);
+       rbd_snap_context_put(header->snapc);
        header->snapc = NULL;
 }
 
@@ -1720,7 +1751,6 @@ static struct rbd_img_request *rbd_img_request_create(
                                        bool child_request)
 {
        struct rbd_img_request *img_request;
-       struct ceph_snap_context *snapc = NULL;
 
        img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
        if (!img_request)
@@ -1728,13 +1758,8 @@ static struct rbd_img_request *rbd_img_request_create(
 
        if (write_request) {
                down_read(&rbd_dev->header_rwsem);
-               snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+               rbd_snap_context_get(rbd_dev->header.snapc);
                up_read(&rbd_dev->header_rwsem);
-               if (WARN_ON(!snapc)) {
-                       kfree(img_request);
-                       return NULL;    /* Shouldn't happen */
-               }
-
        }
 
        img_request->rq = NULL;
@@ -1744,7 +1769,7 @@ static struct rbd_img_request *rbd_img_request_create(
        img_request->flags = 0;
        if (write_request) {
                img_request_write_set(img_request);
-               img_request->snapc = snapc;
+               img_request->snapc = rbd_dev->header.snapc;
        } else {
                img_request->snap_id = rbd_dev->spec->snap_id;
        }
@@ -1785,7 +1810,7 @@ static void rbd_img_request_destroy(struct kref *kref)
        rbd_assert(img_request->obj_request_count == 0);
 
        if (img_request_write_test(img_request))
-               ceph_put_snap_context(img_request->snapc);
+               rbd_snap_context_put(img_request->snapc);
 
        if (img_request_child_test(img_request))
                rbd_obj_request_put(img_request->obj_request);
@@ -3016,15 +3041,17 @@ static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
 
 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
 {
-       sector_t size;
-
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                return;
 
-       size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
-       dout("setting size to %llu sectors", (unsigned long long) size);
-       rbd_dev->mapping.size = (u64) size;
-       set_capacity(rbd_dev->disk, size);
+       if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
+               sector_t size;
+
+               rbd_dev->mapping.size = rbd_dev->header.image_size;
+               size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
+               dout("setting size to %llu sectors", (unsigned long long)size);
+               set_capacity(rbd_dev->disk, size);
+       }
 }
 
 /*
@@ -3049,7 +3076,7 @@ static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
-       ceph_put_snap_context(rbd_dev->header.snapc);
+       rbd_snap_context_put(rbd_dev->header.snapc);
 
        if (hver)
                *hver = h.obj_version;
@@ -3128,8 +3155,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 
        rbd_dev->disk = disk;
 
-       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
-
        return 0;
 out_disk:
        put_disk(disk);
@@ -3150,13 +3175,9 @@ static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-       sector_t size;
-
-       down_read(&rbd_dev->header_rwsem);
-       size = get_capacity(rbd_dev->disk);
-       up_read(&rbd_dev->header_rwsem);
 
-       return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
+       return sprintf(buf, "%llu\n",
+               (unsigned long long)rbd_dev->mapping.size);
 }
 
 /*
@@ -3169,7 +3190,7 @@ static ssize_t rbd_features_show(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "0x%016llx\n",
-                       (unsigned long long) rbd_dev->mapping.features);
+                       (unsigned long long)rbd_dev->mapping.features);
 }
 
 static ssize_t rbd_major_show(struct device *dev,
@@ -3177,7 +3198,11 @@ static ssize_t rbd_major_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%d\n", rbd_dev->major);
+       if (rbd_dev->major)
+               return sprintf(buf, "%d\n", rbd_dev->major);
+
+       return sprintf(buf, "(none)\n");
+
 }
 
 static ssize_t rbd_client_id_show(struct device *dev,
@@ -3203,7 +3228,7 @@ static ssize_t rbd_pool_id_show(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%llu\n",
-               (unsigned long long) rbd_dev->spec->pool_id);
+                       (unsigned long long) rbd_dev->spec->pool_id);
 }
 
 static ssize_t rbd_name_show(struct device *dev,
@@ -3407,8 +3432,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 
 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
 {
-       rbd_spec_put(rbd_dev->parent_spec);
-       kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev->rbd_client);
        rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev);
@@ -3758,83 +3781,88 @@ out:
 }
 
 /*
- * When a parent image gets probed, we only have the pool, image,
- * and snapshot ids but not the names of any of them.  This call
- * is made later to fill in those names.  It has to be done after
- * rbd_dev_snaps_update() has completed because some of the
- * information (in particular, snapshot name) is not available
- * until then.
+ * When an rbd image has a parent image, it is identified by the
+ * pool, image, and snapshot ids (not names).  This function fills
+ * in the names for those ids.  (It's OK if we can't figure out the
+ * name for an image id, but the pool and snapshot ids should always
+ * exist and have names.)  All names in an rbd spec are dynamically
+ * allocated.
  *
  * When an image being mapped (not a parent) is probed, we have the
  * pool name and pool id, image name and image id, and the snapshot
  * name.  The only thing we're missing is the snapshot id.
+ *
+ * The set of snapshots for an image is not known until they have
+ * been read by rbd_dev_snaps_update(), so we can't completely fill
+ * in this information until after that has been called.
  */
-static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
+static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 {
-       struct ceph_osd_client *osdc;
-       const char *name;
-       void *reply_buf = NULL;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_spec *spec = rbd_dev->spec;
+       const char *pool_name;
+       const char *image_name;
+       const char *snap_name;
        int ret;
 
        /*
         * An image being mapped will have the pool name (etc.), but
         * we need to look up the snapshot id.
         */
-       if (rbd_dev->spec->pool_name) {
-               if (strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME)) {
+       if (spec->pool_name) {
+               if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
                        struct rbd_snap *snap;
 
-                       snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
+                       snap = snap_by_name(rbd_dev, spec->snap_name);
                        if (!snap)
                                return -ENOENT;
-                       rbd_dev->spec->snap_id = snap->id;
+                       spec->snap_id = snap->id;
                } else {
-                       rbd_dev->spec->snap_id = CEPH_NOSNAP;
+                       spec->snap_id = CEPH_NOSNAP;
                }
 
                return 0;
        }
 
-       /* Look up the pool name */
+       /* Get the pool name; we have to make our own copy of this */
 
-       osdc = &rbd_dev->rbd_client->client->osdc;
-       name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
-       if (!name) {
-               rbd_warn(rbd_dev, "there is no pool with id %llu",
-                       rbd_dev->spec->pool_id);        /* Really a BUG() */
+       pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
+       if (!pool_name) {
+               rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
                return -EIO;
        }
-
-       rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
-       if (!rbd_dev->spec->pool_name)
+       pool_name = kstrdup(pool_name, GFP_KERNEL);
+       if (!pool_name)
                return -ENOMEM;
 
        /* Fetch the image name; tolerate failure here */
 
-       name = rbd_dev_image_name(rbd_dev);
-       if (name)
-               rbd_dev->spec->image_name = (char *)name;
-       else
+       image_name = rbd_dev_image_name(rbd_dev);
+       if (!image_name)
                rbd_warn(rbd_dev, "unable to get image name");
 
-       /* Look up the snapshot name. */
+       /* Look up the snapshot name, and make a copy */
 
-       name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
-       if (!name) {
-               rbd_warn(rbd_dev, "no snapshot with id %llu",
-                       rbd_dev->spec->snap_id);        /* Really a BUG() */
+       snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
+       if (!snap_name) {
+               rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
                ret = -EIO;
                goto out_err;
        }
-       rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
-       if(!rbd_dev->spec->snap_name)
+       snap_name = kstrdup(snap_name, GFP_KERNEL);
+       if (!snap_name) {
+               ret = -ENOMEM;
                goto out_err;
+       }
+
+       spec->pool_name = pool_name;
+       spec->image_name = image_name;
+       spec->snap_name = snap_name;
 
        return 0;
 out_err:
-       kfree(reply_buf);
-       kfree(rbd_dev->spec->pool_name);
-       rbd_dev->spec->pool_name = NULL;
+       kfree(image_name);
+       kfree(pool_name);
 
        return ret;
 }
@@ -3889,19 +3917,14 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
        }
        if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
                goto out;
+       ret = 0;
 
-       size = sizeof (struct ceph_snap_context) +
-                               snap_count * sizeof (snapc->snaps[0]);
-       snapc = kmalloc(size, GFP_KERNEL);
+       snapc = rbd_snap_context_create(snap_count);
        if (!snapc) {
                ret = -ENOMEM;
                goto out;
        }
-       ret = 0;
-
-       atomic_set(&snapc->nref, 1);
        snapc->seq = seq;
-       snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                snapc->snaps[i] = ceph_decode_64(&p);
 
@@ -4687,11 +4710,49 @@ out_err:
        return ret;
 }
 
-static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
+static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
 {
        struct rbd_device *parent = NULL;
-       struct rbd_spec *parent_spec = NULL;
-       struct rbd_client *rbdc = NULL;
+       struct rbd_spec *parent_spec;
+       struct rbd_client *rbdc;
+       int ret;
+
+       if (!rbd_dev->parent_spec)
+               return 0;
+       /*
+        * We need to pass a reference to the client and the parent
+        * spec when creating the parent rbd_dev.  Images related by
+        * parent/child relationships always share both.
+        */
+       parent_spec = rbd_spec_get(rbd_dev->parent_spec);
+       rbdc = __rbd_get_client(rbd_dev->rbd_client);
+
+       ret = -ENOMEM;
+       parent = rbd_dev_create(rbdc, parent_spec);
+       if (!parent)
+               goto out_err;
+
+       ret = rbd_dev_image_probe(parent);
+       if (ret < 0)
+               goto out_err;
+       rbd_dev->parent = parent;
+
+       return 0;
+out_err:
+       if (parent) {
+               rbd_spec_put(rbd_dev->parent_spec);
+               kfree(rbd_dev->header_name);
+               rbd_dev_destroy(parent);
+       } else {
+               rbd_put_client(rbdc);
+               rbd_spec_put(parent_spec);
+       }
+
+       return ret;
+}
+
+static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
+{
        int ret;
 
        /* no need to lock here, as rbd_dev is not registered yet */
@@ -4699,11 +4760,15 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
        if (ret)
                return ret;
 
-       ret = rbd_dev_probe_update_spec(rbd_dev);
+       ret = rbd_dev_spec_update(rbd_dev);
+       if (ret)
+               goto err_out_snaps;
+
+       ret = rbd_dev_header_watch_sync(rbd_dev, 1);
        if (ret)
                goto err_out_snaps;
 
-       ret = rbd_dev_set_mapping(rbd_dev);
+       ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                goto err_out_snaps;
 
@@ -4732,41 +4797,14 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_disk;
 
-       /*
-        * At this point cleanup in the event of an error is the job
-        * of the sysfs code (initiated by rbd_bus_del_dev()).
-        */
-       /* Probe the parent if there is one */
-
-       if (rbd_dev->parent_spec) {
-               /*
-                * We need to pass a reference to the client and the
-                * parent spec when creating the parent rbd_dev.
-                * Images related by parent/child relationships
-                * always share both.
-                */
-               parent_spec = rbd_spec_get(rbd_dev->parent_spec);
-               rbdc = __rbd_get_client(rbd_dev->rbd_client);
-
-               parent = rbd_dev_create(rbdc, parent_spec);
-               if (!parent) {
-                       ret = -ENOMEM;
-                       goto err_out_spec;
-               }
-               rbdc = NULL;            /* parent now owns reference */
-               parent_spec = NULL;     /* parent now owns reference */
-               ret = rbd_dev_probe(parent);
-               if (ret < 0)
-                       goto err_out_parent;
-               rbd_dev->parent = parent;
-       }
-
-       ret = rbd_dev_header_watch_sync(rbd_dev, 1);
+       ret = rbd_dev_probe_parent(rbd_dev);
        if (ret)
                goto err_out_bus;
 
        /* Everything's ready.  Announce the disk to the world. */
 
+       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+       set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        add_disk(rbd_dev->disk);
 
        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
@@ -4774,11 +4812,6 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
 
        return ret;
 
-err_out_parent:
-       rbd_dev_destroy(parent);
-err_out_spec:
-       rbd_spec_put(parent_spec);
-       rbd_put_client(rbdc);
 err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */
 
@@ -4791,6 +4824,7 @@ err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
 err_out_id:
        rbd_dev_id_put(rbd_dev);
+       rbd_dev_mapping_clear(rbd_dev);
 err_out_snaps:
        rbd_remove_all_snaps(rbd_dev);
 
@@ -4802,7 +4836,7 @@ err_out_snaps:
  * device.  For format 2 images this includes determining the image
  * id.
  */
-static int rbd_dev_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
 {
        int ret;
 
@@ -4891,12 +4925,14 @@ static ssize_t rbd_add(struct bus_type *bus,
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */
 
-       rc = rbd_dev_probe(rbd_dev);
+       rc = rbd_dev_image_probe(rbd_dev);
        if (rc < 0)
                goto err_out_rbd_dev;
 
        return count;
 err_out_rbd_dev:
+       rbd_spec_put(rbd_dev->parent_spec);
+       kfree(rbd_dev->header_name);
        rbd_dev_destroy(rbd_dev);
 err_out_client:
        rbd_put_client(rbdc);
@@ -4946,17 +4982,38 @@ static void rbd_dev_release(struct device *dev)
 
        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
+       rbd_dev_mapping_clear(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
+       rbd_spec_put(rbd_dev->parent_spec);
+       kfree(rbd_dev->header_name);
        rbd_dev_destroy(rbd_dev);
 
        /* release module ref */
        module_put(THIS_MODULE);
 }
 
-static void __rbd_remove(struct rbd_device *rbd_dev)
+static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
 {
-       rbd_remove_all_snaps(rbd_dev);
-       rbd_bus_del_dev(rbd_dev);
+       while (rbd_dev->parent_spec) {
+               struct rbd_device *first = rbd_dev;
+               struct rbd_device *second = first->parent;
+               struct rbd_device *third;
+
+               /*
+                * Follow to the parent with no grandparent and
+                * remove it.
+                */
+               while (second && (third = second->parent)) {
+                       first = second;
+                       second = third;
+               }
+               rbd_remove_all_snaps(second);
+               rbd_bus_del_dev(second);
+               rbd_spec_put(first->parent_spec);
+               first->parent_spec = NULL;
+               first->parent_overlap = 0;
+               first->parent = NULL;
+       }
 }
 
 static ssize_t rbd_remove(struct bus_type *bus,
@@ -4994,27 +5051,10 @@ static ssize_t rbd_remove(struct bus_type *bus,
        if (ret < 0)
                goto done;
 
-       while (rbd_dev->parent_spec) {
-               struct rbd_device *first = rbd_dev;
-               struct rbd_device *second = first->parent;
-               struct rbd_device *third;
-
-               /*
-                * Follow to the parent with no grandparent and
-                * remove it.
-                */
-               while (second && (third = second->parent)) {
-                       first = second;
-                       second = third;
-               }
-               __rbd_remove(second);
-               rbd_spec_put(first->parent_spec);
-               first->parent_spec = NULL;
-               first->parent_overlap = 0;
-               first->parent = NULL;
-       }
-       __rbd_remove(rbd_dev);
+       rbd_dev_remove_parent(rbd_dev);
 
+       rbd_remove_all_snaps(rbd_dev);
+       rbd_bus_del_dev(rbd_dev);
 done:
        mutex_unlock(&ctl_mutex);