};
};
struct page **copyup_pages;
+ u32 copyup_page_count;
struct ceph_osd_request *osd_req;
struct rbd_obj_request *obj_request; /* obj req initiator */
};
struct page **copyup_pages;
+ u32 copyup_page_count;
spinlock_t completion_lock;/* protects next_completion */
u32 next_completion;
rbd_img_callback_t callback;
size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
size_t count);
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static struct bus_attribute rbd_bus_attrs[] = {
__ATTR(add, S_IWUSR, NULL, rbd_add),
kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
- dout("%s: img %p (was %d)\n", __func__, img_request,
- atomic_read(&img_request->kref.refcount));
- kref_get(&img_request->kref);
-}
-
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
}
if (child_request)
img_request_child_set(img_request);
- if (rbd_dev->parent_spec)
+ if (rbd_dev->parent_overlap)
img_request_layered_set(img_request);
spin_lock_init(&img_request->completion_lock);
img_request->next_completion = 0;
INIT_LIST_HEAD(&img_request->obj_requests);
kref_init(&img_request->kref);
- rbd_img_request_get(img_request); /* Avoid a warning */
- rbd_img_request_put(img_request); /* TEMPORARY */
-
dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
write_request ? "write" : "read", offset, length,
img_request);
{
struct rbd_img_request *img_request;
struct rbd_device *rbd_dev;
- u64 length;
+ struct page **pages;
u32 page_count;
rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev);
- length = (u64)1 << rbd_dev->header.obj_order;
- page_count = (u32)calc_pages_for(0, length);
- rbd_assert(obj_request->copyup_pages);
- ceph_release_page_vector(obj_request->copyup_pages, page_count);
+ pages = obj_request->copyup_pages;
+ rbd_assert(pages != NULL);
obj_request->copyup_pages = NULL;
+ page_count = obj_request->copyup_page_count;
+ rbd_assert(page_count);
+ obj_request->copyup_page_count = 0;
+ ceph_release_page_vector(pages, page_count);
/*
* We want the transfer count to reflect the size of the
struct ceph_osd_client *osdc;
struct rbd_device *rbd_dev;
struct page **pages;
+ u32 page_count;
int result;
- u64 obj_size;
- u64 xferred;
+ u64 parent_length;
+ u64 offset;
+ u64 length;
rbd_assert(img_request_child_test(img_request));
pages = img_request->copyup_pages;
rbd_assert(pages != NULL);
img_request->copyup_pages = NULL;
+ page_count = img_request->copyup_page_count;
+ rbd_assert(page_count);
+ img_request->copyup_page_count = 0;
orig_request = img_request->obj_request;
rbd_assert(orig_request != NULL);
- rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
+ rbd_assert(obj_request_type_valid(orig_request->type));
result = img_request->result;
- obj_size = img_request->length;
- xferred = img_request->xferred;
+ parent_length = img_request->length;
+ rbd_assert(parent_length == img_request->xferred);
rbd_img_request_put(img_request);
rbd_assert(orig_request->img_request);
rbd_dev = orig_request->img_request->rbd_dev;
rbd_assert(rbd_dev);
- rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
if (result)
goto out_err;
- /* Allocate the new copyup osd request for the original request */
-
+ /*
+ * The original osd request is of no use to use any more.
+ * We need a new one that can hold the two ops in a copyup
+ * request. Allocate the new copyup osd request for the
+ * original request, and release the old one.
+ */
result = -ENOMEM;
- rbd_assert(!orig_request->osd_req);
osd_req = rbd_osd_req_create_copyup(orig_request);
if (!osd_req)
goto out_err;
+ rbd_osd_req_destroy(orig_request->osd_req);
orig_request->osd_req = osd_req;
orig_request->copyup_pages = pages;
+ orig_request->copyup_page_count = page_count;
/* Initialize the copyup op */
osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
- osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+ osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
false, false);
/* Then the original write request op */
+ offset = orig_request->offset;
+ length = orig_request->length;
osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
- orig_request->offset,
- orig_request->length, 0, 0);
- osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
- orig_request->length);
+ offset, length, 0, 0);
+ if (orig_request->type == OBJ_REQUEST_BIO)
+ osd_req_op_extent_osd_data_bio(osd_req, 1,
+ orig_request->bio_list, length);
+ else
+ osd_req_op_extent_osd_data_pages(osd_req, 1,
+ orig_request->pages, length,
+ offset & ~PAGE_MASK, false, false);
rbd_osd_req_format_write(orig_request);
int result;
rbd_assert(obj_request_img_data_test(obj_request));
- rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+ rbd_assert(obj_request_type_valid(obj_request->type));
img_request = obj_request->img_request;
rbd_assert(img_request != NULL);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev->parent != NULL);
- /*
- * First things first. The original osd request is of no
- * use to use any more, we'll need a new one that can hold
- * the two ops in a copyup request. We'll get that later,
- * but for now we can release the old one.
- */
- rbd_osd_req_destroy(obj_request->osd_req);
- obj_request->osd_req = NULL;
-
/*
* Determine the byte range covered by the object in the
* child image to which the original request was to be sent.
if (result)
goto out_err;
parent_request->copyup_pages = pages;
+ parent_request->copyup_page_count = page_count;
parent_request->callback = rbd_img_obj_parent_read_full_callback;
result = rbd_img_request_submit(parent_request);
return 0;
parent_request->copyup_pages = NULL;
+ parent_request->copyup_page_count = 0;
parent_request->obj_request = NULL;
rbd_obj_request_put(obj_request);
out_err:
* Request sync osd watch/unwatch. The value of "start" determines
* whether a watch request is being initiated or torn down.
*/
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
rbd_dev->watch_request->osd_req);
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
- rbd_dev->watch_event->cookie, 0, start);
+ rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
rbd_osd_req_format_write(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
__le64 snapid;
void *p;
void *end;
+ u64 pool_id;
char *image_id;
u64 overlap;
int ret;
p = reply_buf;
end = reply_buf + ret;
ret = -ERANGE;
- ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
- if (parent_spec->pool_id == CEPH_NOPOOL)
+ ceph_decode_64_safe(&p, end, pool_id, out_err);
+ if (pool_id == CEPH_NOPOOL)
goto out; /* No parent? No problem. */
/* The ceph file layout needs to fit pool id in 32 bits */
ret = -EIO;
- if (parent_spec->pool_id > (u64)U32_MAX) {
+ if (pool_id > (u64)U32_MAX) {
rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
- (unsigned long long)parent_spec->pool_id, U32_MAX);
+ (unsigned long long)pool_id, U32_MAX);
goto out_err;
}
+ parent_spec->pool_id = pool_id;
image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
if (IS_ERR(image_id)) {
ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
ceph_decode_64_safe(&p, end, overlap, out_err);
- rbd_dev->parent_overlap = overlap;
- rbd_dev->parent_spec = parent_spec;
- parent_spec = NULL; /* rbd_dev now owns this */
+ if (overlap) {
+ rbd_spec_put(rbd_dev->parent_spec);
+ rbd_dev->parent_spec = parent_spec;
+ parent_spec = NULL; /* rbd_dev now owns this */
+ rbd_dev->parent_overlap = overlap;
+ } else {
+ rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+ }
out:
ret = 0;
out_err:
goto out;
}
+ /*
+ * If the image supports layering, get the parent info. We
+ * need to probe the first time regardless. Thereafter we
+ * only need to if there's a parent, to see if it has
+ * disappeared due to the mapped image getting flattened.
+ */
+ if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
+ (first_time || rbd_dev->parent_spec)) {
+ bool warn;
+
+ ret = rbd_dev_v2_parent_info(rbd_dev);
+ if (ret)
+ goto out;
+
+ /*
+ * Print a warning if this is the initial probe and
+ * the image has a parent. Don't print it if the
+ * image now being probed is itself a parent. We
+ * can tell at this point because we won't know its
+ * pool name yet (just its pool id).
+ */
+ warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
+ if (first_time && warn)
+ rbd_warn(rbd_dev, "WARNING: kernel layering "
+ "is EXPERIMENTAL!");
+ }
+
ret = rbd_dev_v2_image_size(rbd_dev);
if (ret)
goto out;
+
if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
if (rbd_dev->mapping.size != rbd_dev->header.image_size)
rbd_dev->mapping.size = rbd_dev->header.image_size;
ret = rbd_dev_v2_snap_context(rbd_dev);
dout("rbd_dev_v2_snap_context returned %d\n", ret);
- if (ret)
- goto out;
out:
up_write(&rbd_dev->header_rwsem);
if (ret)
goto out_err;
- /* If the image supports layering, get the parent info */
-
- if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
- ret = rbd_dev_v2_parent_info(rbd_dev);
- if (ret)
- goto out_err;
- /*
- * Print a warning if this image has a parent.
- * Don't print it if the image now being probed
- * is itself a parent. We can tell at this point
- * because we won't know its pool name yet (just its
- * pool id).
- */
- if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
- rbd_warn(rbd_dev, "WARNING: kernel layering "
- "is EXPERIMENTAL!");
- }
-
/* If the image supports fancy striping, get its parameters */
if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
return 0;
out_err:
- rbd_dev->parent_overlap = 0;
- rbd_spec_put(rbd_dev->parent_spec);
- rbd_dev->parent_spec = NULL;
- kfree(rbd_dev->header_name);
- rbd_dev->header_name = NULL;
+ rbd_dev->header.features = 0;
kfree(rbd_dev->header.object_prefix);
rbd_dev->header.object_prefix = NULL;
if (!parent)
goto out_err;
- ret = rbd_dev_image_probe(parent);
+ ret = rbd_dev_image_probe(parent, false);
if (ret < 0)
goto out_err;
rbd_dev->parent = parent;
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
- int ret;
-
rbd_dev_unprobe(rbd_dev);
- ret = rbd_dev_header_watch_sync(rbd_dev, 0);
- if (ret)
- rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
kfree(rbd_dev->header_name);
rbd_dev->header_name = NULL;
rbd_dev->image_format = 0;
/*
* Probe for the existence of the header object for the given rbd
- * device.
+ * device. If this image is the one being mapped (i.e., not a
+ * parent), initiate a watch on its header object before using that
+ * object to get detailed information about the rbd image.
*/
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
{
int ret;
int tmp;
if (ret)
goto err_out_format;
- ret = rbd_dev_header_watch_sync(rbd_dev, 1);
- if (ret)
- goto out_header_name;
+ if (mapping) {
+ ret = rbd_dev_header_watch_sync(rbd_dev, true);
+ if (ret)
+ goto out_header_name;
+ }
if (rbd_dev->image_format == 1)
ret = rbd_dev_v1_header_info(rbd_dev);
err_out_probe:
rbd_dev_unprobe(rbd_dev);
err_out_watch:
- tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
- if (tmp)
- rbd_warn(rbd_dev, "unable to tear down watch request\n");
+ if (mapping) {
+ tmp = rbd_dev_header_watch_sync(rbd_dev, false);
+ if (tmp)
+ rbd_warn(rbd_dev, "unable to tear down "
+ "watch request (%d)\n", tmp);
+ }
out_header_name:
kfree(rbd_dev->header_name);
rbd_dev->header_name = NULL;
rbdc = NULL; /* rbd_dev now owns this */
spec = NULL; /* rbd_dev now owns this */
- rc = rbd_dev_image_probe(rbd_dev);
+ rc = rbd_dev_image_probe(rbd_dev, true);
if (rc < 0)
goto err_out_rbd_dev;
spin_unlock_irq(&rbd_dev->lock);
if (ret < 0)
goto done;
- ret = count;
rbd_bus_del_dev(rbd_dev);
+ ret = rbd_dev_header_watch_sync(rbd_dev, false);
+ if (ret)
+ rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
rbd_dev_image_release(rbd_dev);
module_put(THIS_MODULE);
+ ret = count;
done:
mutex_unlock(&ctl_mutex);