]> Pileus Git - ~andy/linux/blobdiff - drivers/block/rbd.c
Merge branches 'for-3.10/wiimote' and 'for-3.9/upstream-fixes' into for-linus
[~andy/linux] / drivers / block / rbd.c
index 89576a0b3f2ed5b099ecf7ce675104d282b0dd48..f556f8a8b3f9b476949c6133f39778c49502e380 100644 (file)
 #define        SECTOR_SHIFT    9
 #define        SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
 
-/* It might be useful to have this defined elsewhere too */
+/* It might be useful to have these defined elsewhere */
 
-#define        U64_MAX ((u64) (~0ULL))
+#define        U8_MAX  ((u8)   (~0U))
+#define        U16_MAX ((u16)  (~0U))
+#define        U32_MAX ((u32)  (~0U))
+#define        U64_MAX ((u64)  (~0ULL))
 
 #define RBD_DRV_NAME "rbd"
 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
@@ -66,7 +69,6 @@
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
 
 #define RBD_MAX_SNAP_COUNT     510     /* allows max snapc to fit in 4KB */
-#define RBD_MAX_OPT_LEN                1024
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
@@ -93,8 +95,6 @@
 #define DEV_NAME_LEN           32
 #define MAX_INT_FORMAT_WIDTH   ((5 * sizeof (int)) / 2 + 1)
 
-#define RBD_READ_ONLY_DEFAULT          false
-
 /*
  * block device image metadata (in-memory version)
  */
@@ -119,16 +119,33 @@ struct rbd_image_header {
  * An rbd image specification.
  *
  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
- * identify an image.
+ * identify an image.  Each rbd_dev structure includes a pointer to
+ * an rbd_spec structure that encapsulates this identity.
+ *
+ * Each of the id's in an rbd_spec has an associated name.  For a
+ * user-mapped image, the names are supplied and the id's associated
+ * with them are looked up.  For a layered image, a parent image is
+ * defined by the tuple, and the names are looked up.
+ *
+ * An rbd_dev structure contains a parent_spec pointer which is
+ * non-null if the image it represents is a child in a layered
+ * image.  This pointer will refer to the rbd_spec structure used
+ * by the parent rbd_dev for its own identity (i.e., the structure
+ * is shared between the parent and child).
+ *
+ * Since these structures are populated once, during the discovery
+ * phase of image construction, they are effectively immutable so
+ * we make no effort to synchronize access to them.
+ *
+ * Note that code herein does not assume the image name is known (it
+ * could be a null pointer).
  */
 struct rbd_spec {
        u64             pool_id;
        char            *pool_name;
 
        char            *image_id;
-       size_t          image_id_len;
        char            *image_name;
-       size_t          image_name_len;
 
        u64             snap_id;
        char            *snap_name;
@@ -136,10 +153,6 @@ struct rbd_spec {
        struct kref     kref;
 };
 
-struct rbd_options {
-       bool    read_only;
-};
-
 /*
  * an instance of the client.  multiple devices may share an rbd client.
  */
@@ -149,37 +162,76 @@ struct rbd_client {
        struct list_head        node;
 };
 
-/*
- * a request completion status
- */
-struct rbd_req_status {
-       int done;
-       int rc;
-       u64 bytes;
+struct rbd_img_request;
+typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
+
+#define        BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
+
+struct rbd_obj_request;
+typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
+
+enum obj_request_type {
+       OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
 };
 
-/*
- * a collection of requests
- */
-struct rbd_req_coll {
-       int                     total;
-       int                     num_done;
+struct rbd_obj_request {
+       const char              *object_name;
+       u64                     offset;         /* object start byte */
+       u64                     length;         /* bytes from offset */
+
+       struct rbd_img_request  *img_request;
+       struct list_head        links;          /* img_request->obj_requests */
+       u32                     which;          /* posn image request list */
+
+       enum obj_request_type   type;
+       union {
+               struct bio      *bio_list;
+               struct {
+                       struct page     **pages;
+                       u32             page_count;
+               };
+       };
+
+       struct ceph_osd_request *osd_req;
+
+       u64                     xferred;        /* bytes transferred */
+       u64                     version;
+       int                     result;
+       atomic_t                done;
+
+       rbd_obj_callback_t      callback;
+       struct completion       completion;
+
        struct kref             kref;
-       struct rbd_req_status   status[0];
 };
 
-/*
- * a single io request
- */
-struct rbd_request {
-       struct request          *rq;            /* blk layer request */
-       struct bio              *bio;           /* cloned bio */
-       struct page             **pages;        /* list of used pages */
-       u64                     len;
-       int                     coll_index;
-       struct rbd_req_coll     *coll;
+struct rbd_img_request {
+       struct request          *rq;
+       struct rbd_device       *rbd_dev;
+       u64                     offset; /* starting image byte offset */
+       u64                     length; /* byte count from offset */
+       bool                    write_request;  /* false for read */
+       union {
+               struct ceph_snap_context *snapc;        /* for writes */
+               u64             snap_id;                /* for reads */
+       };
+       spinlock_t              completion_lock;/* protects next_completion */
+       u32                     next_completion;
+       rbd_img_callback_t      callback;
+
+       u32                     obj_request_count;
+       struct list_head        obj_requests;   /* rbd_obj_request structs */
+
+       struct kref             kref;
 };
 
+#define for_each_obj_request(ireq, oreq) \
+       list_for_each_entry(oreq, &(ireq)->obj_requests, links)
+#define for_each_obj_request_from(ireq, oreq) \
+       list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
+#define for_each_obj_request_safe(ireq, oreq, n) \
+       list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
+
 struct rbd_snap {
        struct  device          dev;
        const char              *name;
@@ -209,16 +261,18 @@ struct rbd_device {
 
        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
-       spinlock_t              lock;           /* queue lock */
+       spinlock_t              lock;           /* queue, flags, open_count */
 
        struct rbd_image_header header;
-       bool                    exists;
+       unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;
 
        char                    *header_name;
 
+       struct ceph_file_layout layout;
+
        struct ceph_osd_event   *watch_event;
-       struct ceph_osd_request *watch_request;
+       struct rbd_obj_request  *watch_request;
 
        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
@@ -235,7 +289,19 @@ struct rbd_device {
 
        /* sysfs related */
        struct device           dev;
-       unsigned long           open_count;
+       unsigned long           open_count;     /* protected by lock */
+};
+
+/*
+ * Flag bits for rbd_dev->flags.  If atomicity is required,
+ * rbd_dev->lock is used to protect access.
+ *
+ * Currently, only the "removing" flag (which is coupled with the
+ * "open_count" field) requires atomic access.
+ */
+enum rbd_dev_flags {
+       RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
+       RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 };
 
 static DEFINE_MUTEX(ctl_mutex);          /* Serialize open/close/setup/teardown */
@@ -277,6 +343,33 @@ static struct device rbd_root_dev = {
        .release =      rbd_root_dev_release,
 };
 
+static __printf(2, 3)
+void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
+{
+       struct va_format vaf;
+       va_list args;
+
+       va_start(args, fmt);
+       vaf.fmt = fmt;
+       vaf.va = &args;
+
+       if (!rbd_dev)
+               printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
+       else if (rbd_dev->disk)
+               printk(KERN_WARNING "%s: %s: %pV\n",
+                       RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
+       else if (rbd_dev->spec && rbd_dev->spec->image_name)
+               printk(KERN_WARNING "%s: image %s: %pV\n",
+                       RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
+       else if (rbd_dev->spec && rbd_dev->spec->image_id)
+               printk(KERN_WARNING "%s: id %s: %pV\n",
+                       RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
+       else    /* punt */
+               printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
+                       RBD_DRV_NAME, rbd_dev, &vaf);
+       va_end(args);
+}
+
 #ifdef RBD_DEBUG
 #define rbd_assert(expr)                                               \
                if (unlikely(!(expr))) {                                \
@@ -296,14 +389,23 @@ static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+       bool removing = false;
 
        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;
 
+       spin_lock_irq(&rbd_dev->lock);
+       if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
+               removing = true;
+       else
+               rbd_dev->open_count++;
+       spin_unlock_irq(&rbd_dev->lock);
+       if (removing)
+               return -ENOENT;
+
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
-       rbd_dev->open_count++;
        mutex_unlock(&ctl_mutex);
 
        return 0;
@@ -312,10 +414,14 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
 static int rbd_release(struct gendisk *disk, fmode_t mode)
 {
        struct rbd_device *rbd_dev = disk->private_data;
+       unsigned long open_count_before;
+
+       spin_lock_irq(&rbd_dev->lock);
+       open_count_before = rbd_dev->open_count--;
+       spin_unlock_irq(&rbd_dev->lock);
+       rbd_assert(open_count_before > 0);
 
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       rbd_assert(rbd_dev->open_count > 0);
-       rbd_dev->open_count--;
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);
 
@@ -337,7 +443,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
        struct rbd_client *rbdc;
        int ret = -ENOMEM;
 
-       dout("rbd_client_create\n");
+       dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;
@@ -361,8 +467,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
        spin_unlock(&rbd_client_list_lock);
 
        mutex_unlock(&ctl_mutex);
+       dout("%s: rbdc %p\n", __func__, rbdc);
 
-       dout("rbd_client_create created %p\n", rbdc);
        return rbdc;
 
 out_err:
@@ -373,6 +479,8 @@ out_mutex:
 out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
+       dout("%s: error %d\n", __func__, ret);
+
        return ERR_PTR(ret);
 }
 
@@ -426,6 +534,12 @@ static match_table_t rbd_opts_tokens = {
        {-1, NULL}
 };
 
+struct rbd_options {
+       bool    read_only;
+};
+
+#define RBD_READ_ONLY_DEFAULT  false
+
 static int parse_rbd_opts_token(char *c, void *private)
 {
        struct rbd_options *rbd_opts = private;
@@ -493,7 +607,7 @@ static void rbd_client_release(struct kref *kref)
 {
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 
-       dout("rbd_release_client %p\n", rbdc);
+       dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);
@@ -512,18 +626,6 @@ static void rbd_put_client(struct rbd_client *rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
 }
 
-/*
- * Destroy requests collection
- */
-static void rbd_coll_release(struct kref *kref)
-{
-       struct rbd_req_coll *coll =
-               container_of(kref, struct rbd_req_coll, kref);
-
-       dout("rbd_coll_release %p\n", coll);
-       kfree(coll);
-}
-
 static bool rbd_image_format_valid(u32 image_format)
 {
        return image_format == 1 || image_format == 2;
@@ -707,7 +809,8 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
                        goto done;
                rbd_dev->mapping.read_only = true;
        }
-       rbd_dev->exists = true;
+       set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+
 done:
        return ret;
 }
@@ -724,7 +827,7 @@ static void rbd_header_free(struct rbd_image_header *header)
        header->snapc = NULL;
 }
 
-static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
+static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
        char *name;
        u64 segment;
@@ -767,23 +870,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
        return length;
 }
 
-static int rbd_get_num_segments(struct rbd_image_header *header,
-                               u64 ofs, u64 len)
-{
-       u64 start_seg;
-       u64 end_seg;
-
-       if (!len)
-               return 0;
-       if (len - 1 > U64_MAX - ofs)
-               return -ERANGE;
-
-       start_seg = ofs >> header->obj_order;
-       end_seg = (ofs + len - 1) >> header->obj_order;
-
-       return end_seg - start_seg + 1;
-}
-
 /*
  * returns the size of an object in the image
  */
@@ -949,8 +1035,10 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src,
                unsigned int bi_size;
                struct bio *bio;
 
-               if (!bi)
+               if (!bi) {
+                       rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
+               }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
@@ -976,399 +1064,734 @@ out_err:
        return NULL;
 }
 
-/*
- * helpers for osd request op vectors.
- */
-static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
-                                       int opcode, u32 payload_len)
+static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
 {
-       struct ceph_osd_req_op *ops;
+       dout("%s: obj %p (was %d)\n", __func__, obj_request,
+               atomic_read(&obj_request->kref.refcount));
+       kref_get(&obj_request->kref);
+}
 
-       ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
-       if (!ops)
+static void rbd_obj_request_destroy(struct kref *kref);
+static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
+{
+       rbd_assert(obj_request != NULL);
+       dout("%s: obj %p (was %d)\n", __func__, obj_request,
+               atomic_read(&obj_request->kref.refcount));
+       kref_put(&obj_request->kref, rbd_obj_request_destroy);
+}
+
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+       dout("%s: img %p (was %d)\n", __func__, img_request,
+               atomic_read(&img_request->kref.refcount));
+       kref_get(&img_request->kref);
+}
+
+static void rbd_img_request_destroy(struct kref *kref);
+static void rbd_img_request_put(struct rbd_img_request *img_request)
+{
+       rbd_assert(img_request != NULL);
+       dout("%s: img %p (was %d)\n", __func__, img_request,
+               atomic_read(&img_request->kref.refcount));
+       kref_put(&img_request->kref, rbd_img_request_destroy);
+}
+
+static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
+                                       struct rbd_obj_request *obj_request)
+{
+       rbd_assert(obj_request->img_request == NULL);
+
+       rbd_obj_request_get(obj_request);
+       obj_request->img_request = img_request;
+       obj_request->which = img_request->obj_request_count;
+       rbd_assert(obj_request->which != BAD_WHICH);
+       img_request->obj_request_count++;
+       list_add_tail(&obj_request->links, &img_request->obj_requests);
+       dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+               obj_request->which);
+}
+
+static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
+                                       struct rbd_obj_request *obj_request)
+{
+       rbd_assert(obj_request->which != BAD_WHICH);
+
+       dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+               obj_request->which);
+       list_del(&obj_request->links);
+       rbd_assert(img_request->obj_request_count > 0);
+       img_request->obj_request_count--;
+       rbd_assert(obj_request->which == img_request->obj_request_count);
+       obj_request->which = BAD_WHICH;
+       rbd_assert(obj_request->img_request == img_request);
+       obj_request->img_request = NULL;
+       obj_request->callback = NULL;
+       rbd_obj_request_put(obj_request);
+}
+
+static bool obj_request_type_valid(enum obj_request_type type)
+{
+       switch (type) {
+       case OBJ_REQUEST_NODATA:
+       case OBJ_REQUEST_BIO:
+       case OBJ_REQUEST_PAGES:
+               return true;
+       default:
+               return false;
+       }
+}
+
+static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
+{
+       struct ceph_osd_req_op *op;
+       va_list args;
+       size_t size;
+
+       op = kzalloc(sizeof (*op), GFP_NOIO);
+       if (!op)
                return NULL;
+       op->op = opcode;
+       va_start(args, opcode);
+       switch (opcode) {
+       case CEPH_OSD_OP_READ:
+       case CEPH_OSD_OP_WRITE:
+               /* rbd_osd_req_op_create(READ, offset, length) */
+               /* rbd_osd_req_op_create(WRITE, offset, length) */
+               op->extent.offset = va_arg(args, u64);
+               op->extent.length = va_arg(args, u64);
+               if (opcode == CEPH_OSD_OP_WRITE)
+                       op->payload_len = op->extent.length;
+               break;
+       case CEPH_OSD_OP_STAT:
+               break;
+       case CEPH_OSD_OP_CALL:
+               /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
+               op->cls.class_name = va_arg(args, char *);
+               size = strlen(op->cls.class_name);
+               rbd_assert(size <= (size_t) U8_MAX);
+               op->cls.class_len = size;
+               op->payload_len = size;
+
+               op->cls.method_name = va_arg(args, char *);
+               size = strlen(op->cls.method_name);
+               rbd_assert(size <= (size_t) U8_MAX);
+               op->cls.method_len = size;
+               op->payload_len += size;
+
+               op->cls.argc = 0;
+               op->cls.indata = va_arg(args, void *);
+               size = va_arg(args, size_t);
+               rbd_assert(size <= (size_t) U32_MAX);
+               op->cls.indata_len = (u32) size;
+               op->payload_len += size;
+               break;
+       case CEPH_OSD_OP_NOTIFY_ACK:
+       case CEPH_OSD_OP_WATCH:
+               /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
+               /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
+               op->watch.cookie = va_arg(args, u64);
+               op->watch.ver = va_arg(args, u64);
+               op->watch.ver = cpu_to_le64(op->watch.ver);
+               if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
+                       op->watch.flag = (u8) 1;
+               break;
+       default:
+               rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
+               kfree(op);
+               op = NULL;
+               break;
+       }
+       va_end(args);
 
-       ops[0].op = opcode;
+       return op;
+}
 
-       /*
-        * op extent offset and length will be set later on
-        * in calc_raw_layout()
-        */
-       ops[0].payload_len = payload_len;
+static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
+{
+       kfree(op);
+}
+
+static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
+                               struct rbd_obj_request *obj_request)
+{
+       dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
+
+       return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
+}
 
-       return ops;
+static void rbd_img_request_complete(struct rbd_img_request *img_request)
+{
+       dout("%s: img %p\n", __func__, img_request);
+       if (img_request->callback)
+               img_request->callback(img_request);
+       else
+               rbd_img_request_put(img_request);
 }
 
-static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
+/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
+
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
 {
-       kfree(ops);
+       dout("%s: obj %p\n", __func__, obj_request);
+
+       return wait_for_completion_interruptible(&obj_request->completion);
 }
 
-static void rbd_coll_end_req_index(struct request *rq,
-                                  struct rbd_req_coll *coll,
-                                  int index,
-                                  int ret, u64 len)
+static void obj_request_done_init(struct rbd_obj_request *obj_request)
 {
-       struct request_queue *q;
-       int min, max, i;
+       atomic_set(&obj_request->done, 0);
+       smp_wmb();
+}
 
-       dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
-            coll, index, ret, (unsigned long long) len);
+static void obj_request_done_set(struct rbd_obj_request *obj_request)
+{
+       int done;
 
-       if (!rq)
-               return;
+       done = atomic_inc_return(&obj_request->done);
+       if (done > 1) {
+               struct rbd_img_request *img_request = obj_request->img_request;
+               struct rbd_device *rbd_dev;
 
-       if (!coll) {
-               blk_end_request(rq, ret, len);
-               return;
+               rbd_dev = img_request ? img_request->rbd_dev : NULL;
+               rbd_warn(rbd_dev, "obj_request %p was already done\n",
+                       obj_request);
        }
+}
 
-       q = rq->q;
-
-       spin_lock_irq(q->queue_lock);
-       coll->status[index].done = 1;
-       coll->status[index].rc = ret;
-       coll->status[index].bytes = len;
-       max = min = coll->num_done;
-       while (max < coll->total && coll->status[max].done)
-               max++;
-
-       for (i = min; i<max; i++) {
-               __blk_end_request(rq, coll->status[i].rc,
-                                 coll->status[i].bytes);
-               coll->num_done++;
-               kref_put(&coll->kref, rbd_coll_release);
+static bool obj_request_done_test(struct rbd_obj_request *obj_request)
+{
+       smp_mb();
+       return atomic_read(&obj_request->done) != 0;
+}
+
+static void
+rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
+{
+       dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
+               obj_request, obj_request->img_request, obj_request->result,
+               obj_request->xferred, obj_request->length);
+       /*
+        * ENOENT means a hole in the image.  We zero-fill the
+        * entire length of the request.  A short read also implies
+        * zero-fill to the end of the request.  Either way we
+        * update the xferred count to indicate the whole request
+        * was satisfied.
+        */
+       BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
+       if (obj_request->result == -ENOENT) {
+               zero_bio_chain(obj_request->bio_list, 0);
+               obj_request->result = 0;
+               obj_request->xferred = obj_request->length;
+       } else if (obj_request->xferred < obj_request->length &&
+                       !obj_request->result) {
+               zero_bio_chain(obj_request->bio_list, obj_request->xferred);
+               obj_request->xferred = obj_request->length;
        }
-       spin_unlock_irq(q->queue_lock);
+       obj_request_done_set(obj_request);
+}
+
+static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
+{
+       dout("%s: obj %p cb %p\n", __func__, obj_request,
+               obj_request->callback);
+       if (obj_request->callback)
+               obj_request->callback(obj_request);
+       else
+               complete_all(&obj_request->completion);
+}
+
+static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
+{
+       dout("%s: obj %p\n", __func__, obj_request);
+       obj_request_done_set(obj_request);
+}
+
+static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
+{
+       dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
+               obj_request->result, obj_request->xferred, obj_request->length);
+       if (obj_request->img_request)
+               rbd_img_obj_request_read_callback(obj_request);
+       else
+               obj_request_done_set(obj_request);
 }
 
-static void rbd_coll_end_req(struct rbd_request *req,
-                            int ret, u64 len)
+static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
 {
-       rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
+       dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+               obj_request->result, obj_request->length);
+       /*
+        * There is no such thing as a successful short write.
+        * Our xferred value is the number of bytes transferred
+        * back.  Set it to our originally-requested length.
+        */
+       obj_request->xferred = obj_request->length;
+       obj_request_done_set(obj_request);
 }
 
 /*
- * Send ceph osd request
+ * For a simple stat call there's nothing to do.  We'll do more if
+ * this is part of a write sequence for a layered image.
  */
-static int rbd_do_request(struct request *rq,
-                         struct rbd_device *rbd_dev,
-                         struct ceph_snap_context *snapc,
-                         u64 snapid,
-                         const char *object_name, u64 ofs, u64 len,
-                         struct bio *bio,
-                         struct page **pages,
-                         int num_pages,
-                         int flags,
-                         struct ceph_osd_req_op *ops,
-                         struct rbd_req_coll *coll,
-                         int coll_index,
-                         void (*rbd_cb)(struct ceph_osd_request *req,
-                                        struct ceph_msg *msg),
-                         struct ceph_osd_request **linger_req,
-                         u64 *ver)
-{
-       struct ceph_osd_request *req;
-       struct ceph_file_layout *layout;
-       int ret;
-       u64 bno;
-       struct timespec mtime = CURRENT_TIME;
-       struct rbd_request *req_data;
-       struct ceph_osd_request_head *reqhead;
-       struct ceph_osd_client *osdc;
+static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
+{
+       dout("%s: obj %p\n", __func__, obj_request);
+       obj_request_done_set(obj_request);
+}
 
-       req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
-       if (!req_data) {
-               if (coll)
-                       rbd_coll_end_req_index(rq, coll, coll_index,
-                                              -ENOMEM, len);
-               return -ENOMEM;
+static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
+                               struct ceph_msg *msg)
+{
+       struct rbd_obj_request *obj_request = osd_req->r_priv;
+       u16 opcode;
+
+       dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
+       rbd_assert(osd_req == obj_request->osd_req);
+       rbd_assert(!!obj_request->img_request ^
+                               (obj_request->which == BAD_WHICH));
+
+       if (osd_req->r_result < 0)
+               obj_request->result = osd_req->r_result;
+       obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
+
+       WARN_ON(osd_req->r_num_ops != 1);       /* For now */
+
+       /*
+        * We support a 64-bit length, but ultimately it has to be
+        * passed to blk_end_request(), which takes an unsigned int.
+        */
+       obj_request->xferred = osd_req->r_reply_op_len[0];
+       rbd_assert(obj_request->xferred < (u64) UINT_MAX);
+       opcode = osd_req->r_request_ops[0].op;
+       switch (opcode) {
+       case CEPH_OSD_OP_READ:
+               rbd_osd_read_callback(obj_request);
+               break;
+       case CEPH_OSD_OP_WRITE:
+               rbd_osd_write_callback(obj_request);
+               break;
+       case CEPH_OSD_OP_STAT:
+               rbd_osd_stat_callback(obj_request);
+               break;
+       case CEPH_OSD_OP_CALL:
+       case CEPH_OSD_OP_NOTIFY_ACK:
+       case CEPH_OSD_OP_WATCH:
+               rbd_osd_trivial_callback(obj_request);
+               break;
+       default:
+               rbd_warn(NULL, "%s: unsupported op %hu\n",
+                       obj_request->object_name, (unsigned short) opcode);
+               break;
        }
 
-       if (coll) {
-               req_data->coll = coll;
-               req_data->coll_index = coll_index;
+       if (obj_request_done_test(obj_request))
+               rbd_obj_request_complete(obj_request);
+}
+
+static struct ceph_osd_request *rbd_osd_req_create(
+                                       struct rbd_device *rbd_dev,
+                                       bool write_request,
+                                       struct rbd_obj_request *obj_request,
+                                       struct ceph_osd_req_op *op)
+{
+       struct rbd_img_request *img_request = obj_request->img_request;
+       struct ceph_snap_context *snapc = NULL;
+       struct ceph_osd_client *osdc;
+       struct ceph_osd_request *osd_req;
+       struct timespec now;
+       struct timespec *mtime;
+       u64 snap_id = CEPH_NOSNAP;
+       u64 offset = obj_request->offset;
+       u64 length = obj_request->length;
+
+       if (img_request) {
+               rbd_assert(img_request->write_request == write_request);
+               if (img_request->write_request)
+                       snapc = img_request->snapc;
+               else
+                       snap_id = img_request->snap_id;
        }
 
-       dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
-               object_name, (unsigned long long) ofs,
-               (unsigned long long) len, coll, coll_index);
+       /* Allocate and initialize the request, for the single op */
 
        osdc = &rbd_dev->rbd_client->client->osdc;
-       req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
-                                       false, GFP_NOIO, pages, bio);
-       if (!req) {
-               ret = -ENOMEM;
-               goto done_pages;
+       osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
+       if (!osd_req)
+               return NULL;    /* ENOMEM */
+
+       rbd_assert(obj_request_type_valid(obj_request->type));
+       switch (obj_request->type) {
+       case OBJ_REQUEST_NODATA:
+               break;          /* Nothing to do */
+       case OBJ_REQUEST_BIO:
+               rbd_assert(obj_request->bio_list != NULL);
+               osd_req->r_bio = obj_request->bio_list;
+               break;
+       case OBJ_REQUEST_PAGES:
+               osd_req->r_pages = obj_request->pages;
+               osd_req->r_num_pages = obj_request->page_count;
+               osd_req->r_page_alignment = offset & ~PAGE_MASK;
+               break;
+       }
+
+       if (write_request) {
+               osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+               now = CURRENT_TIME;
+               mtime = &now;
+       } else {
+               osd_req->r_flags = CEPH_OSD_FLAG_READ;
+               mtime = NULL;   /* not needed for reads */
+               offset = 0;     /* These are not used... */
+               length = 0;     /* ...for osd read requests */
        }
 
-       req->r_callback = rbd_cb;
+       osd_req->r_callback = rbd_osd_req_callback;
+       osd_req->r_priv = obj_request;
 
-       req_data->rq = rq;
-       req_data->bio = bio;
-       req_data->pages = pages;
-       req_data->len = len;
+       osd_req->r_oid_len = strlen(obj_request->object_name);
+       rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
+       memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
 
-       req->r_priv = req_data;
+       osd_req->r_file_layout = rbd_dev->layout;       /* struct */
 
-       reqhead = req->r_request->front.iov_base;
-       reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
+       /* osd_req will get its own reference to snapc (if non-null) */
 
-       strncpy(req->r_oid, object_name, sizeof(req->r_oid));
-       req->r_oid_len = strlen(req->r_oid);
+       ceph_osdc_build_request(osd_req, offset, length, 1, op,
+                               snapc, snap_id, mtime);
 
-       layout = &req->r_file_layout;
-       memset(layout, 0, sizeof(*layout));
-       layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       layout->fl_stripe_count = cpu_to_le32(1);
-       layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
-       ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
-                                  req, ops);
-       rbd_assert(ret == 0);
+       return osd_req;
+}
 
-       ceph_osdc_build_request(req, ofs, &len,
-                               ops,
-                               snapc,
-                               &mtime,
-                               req->r_oid, req->r_oid_len);
+static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
+{
+       ceph_osdc_put_request(osd_req);
+}
 
-       if (linger_req) {
-               ceph_osdc_set_request_linger(osdc, req);
-               *linger_req = req;
-       }
+/* object_name is assumed to be a non-null pointer and NUL-terminated */
 
-       ret = ceph_osdc_start_request(osdc, req, false);
-       if (ret < 0)
-               goto done_err;
-
-       if (!rbd_cb) {
-               ret = ceph_osdc_wait_request(osdc, req);
-               if (ver)
-                       *ver = le64_to_cpu(req->r_reassert_version.version);
-               dout("reassert_ver=%llu\n",
-                       (unsigned long long)
-                               le64_to_cpu(req->r_reassert_version.version));
-               ceph_osdc_put_request(req);
+static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
+                                               u64 offset, u64 length,
+                                               enum obj_request_type type)
+{
+       struct rbd_obj_request *obj_request;
+       size_t size;
+       char *name;
+
+       rbd_assert(obj_request_type_valid(type));
+
+       size = strlen(object_name) + 1;
+       obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
+       if (!obj_request)
+               return NULL;
+
+       name = (char *)(obj_request + 1);
+       obj_request->object_name = memcpy(name, object_name, size);
+       obj_request->offset = offset;
+       obj_request->length = length;
+       obj_request->which = BAD_WHICH;
+       obj_request->type = type;
+       INIT_LIST_HEAD(&obj_request->links);
+       obj_request_done_init(obj_request);
+       init_completion(&obj_request->completion);
+       kref_init(&obj_request->kref);
+
+       dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
+               offset, length, (int)type, obj_request);
+
+       return obj_request;
+}
+
+static void rbd_obj_request_destroy(struct kref *kref)
+{
+       struct rbd_obj_request *obj_request;
+
+       obj_request = container_of(kref, struct rbd_obj_request, kref);
+
+       dout("%s: obj %p\n", __func__, obj_request);
+
+       rbd_assert(obj_request->img_request == NULL);
+       rbd_assert(obj_request->which == BAD_WHICH);
+
+       if (obj_request->osd_req)
+               rbd_osd_req_destroy(obj_request->osd_req);
+
+       rbd_assert(obj_request_type_valid(obj_request->type));
+       switch (obj_request->type) {
+       case OBJ_REQUEST_NODATA:
+               break;          /* Nothing to do */
+       case OBJ_REQUEST_BIO:
+               if (obj_request->bio_list)
+                       bio_chain_put(obj_request->bio_list);
+               break;
+       case OBJ_REQUEST_PAGES:
+               if (obj_request->pages)
+                       ceph_release_page_vector(obj_request->pages,
+                                               obj_request->page_count);
+               break;
        }
-       return ret;
 
-done_err:
-       bio_chain_put(req_data->bio);
-       ceph_osdc_put_request(req);
-done_pages:
-       rbd_coll_end_req(req_data, ret, len);
-       kfree(req_data);
-       return ret;
+       kfree(obj_request);
 }
 
 /*
- * Ceph osd op callback
+ * Caller is responsible for filling in the list of object requests
+ * that comprises the image request, and the Linux request pointer
+ * (if there is one).
  */
-static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
-{
-       struct rbd_request *req_data = req->r_priv;
-       struct ceph_osd_reply_head *replyhead;
-       struct ceph_osd_op *op;
-       __s32 rc;
-       u64 bytes;
-       int read_op;
-
-       /* parse reply */
-       replyhead = msg->front.iov_base;
-       WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-       op = (void *)(replyhead + 1);
-       rc = le32_to_cpu(replyhead->result);
-       bytes = le64_to_cpu(op->extent.length);
-       read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
-
-       dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
-               (unsigned long long) bytes, read_op, (int) rc);
-
-       if (rc == -ENOENT && read_op) {
-               zero_bio_chain(req_data->bio, 0);
-               rc = 0;
-       } else if (rc == 0 && read_op && bytes < req_data->len) {
-               zero_bio_chain(req_data->bio, bytes);
-               bytes = req_data->len;
-       }
+static struct rbd_img_request *rbd_img_request_create(
+                                       struct rbd_device *rbd_dev,
+                                       u64 offset, u64 length,
+                                       bool write_request)
+{
+       struct rbd_img_request *img_request;
+       struct ceph_snap_context *snapc = NULL;
 
-       rbd_coll_end_req(req_data, rc, bytes);
+       img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
+       if (!img_request)
+               return NULL;
 
-       if (req_data->bio)
-               bio_chain_put(req_data->bio);
+       if (write_request) {
+               down_read(&rbd_dev->header_rwsem);
+               snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+               up_read(&rbd_dev->header_rwsem);
+               if (WARN_ON(!snapc)) {
+                       kfree(img_request);
+                       return NULL;    /* Shouldn't happen */
+               }
+       }
 
-       ceph_osdc_put_request(req);
-       kfree(req_data);
+       img_request->rq = NULL;
+       img_request->rbd_dev = rbd_dev;
+       img_request->offset = offset;
+       img_request->length = length;
+       img_request->write_request = write_request;
+       if (write_request)
+               img_request->snapc = snapc;
+       else
+               img_request->snap_id = rbd_dev->spec->snap_id;
+       spin_lock_init(&img_request->completion_lock);
+       img_request->next_completion = 0;
+       img_request->callback = NULL;
+       img_request->obj_request_count = 0;
+       INIT_LIST_HEAD(&img_request->obj_requests);
+       kref_init(&img_request->kref);
+
+       rbd_img_request_get(img_request);       /* Avoid a warning */
+       rbd_img_request_put(img_request);       /* TEMPORARY */
+
+       dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
+               write_request ? "write" : "read", offset, length,
+               img_request);
+
+       return img_request;
 }
 
-static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void rbd_img_request_destroy(struct kref *kref)
 {
-       ceph_osdc_put_request(req);
+       struct rbd_img_request *img_request;
+       struct rbd_obj_request *obj_request;
+       struct rbd_obj_request *next_obj_request;
+
+       img_request = container_of(kref, struct rbd_img_request, kref);
+
+       dout("%s: img %p\n", __func__, img_request);
+
+       for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+               rbd_img_obj_request_del(img_request, obj_request);
+       rbd_assert(img_request->obj_request_count == 0);
+
+       if (img_request->write_request)
+               ceph_put_snap_context(img_request->snapc);
+
+       kfree(img_request);
 }
 
-/*
- * Do a synchronous ceph osd operation
- */
-static int rbd_req_sync_op(struct rbd_device *rbd_dev,
-                          struct ceph_snap_context *snapc,
-                          u64 snapid,
-                          int flags,
-                          struct ceph_osd_req_op *ops,
-                          const char *object_name,
-                          u64 ofs, u64 inbound_size,
-                          char *inbound,
-                          struct ceph_osd_request **linger_req,
-                          u64 *ver)
+static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
+                                       struct bio *bio_list)
 {
-       int ret;
-       struct page **pages;
-       int num_pages;
-
-       rbd_assert(ops != NULL);
+       struct rbd_device *rbd_dev = img_request->rbd_dev;
+       struct rbd_obj_request *obj_request = NULL;
+       struct rbd_obj_request *next_obj_request;
+       unsigned int bio_offset;
+       u64 image_offset;
+       u64 resid;
+       u16 opcode;
+
+       dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
+
+       opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
+                                             : CEPH_OSD_OP_READ;
+       bio_offset = 0;
+       image_offset = img_request->offset;
+       rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
+       resid = img_request->length;
+       rbd_assert(resid > 0);
+       while (resid) {
+               const char *object_name;
+               unsigned int clone_size;
+               struct ceph_osd_req_op *op;
+               u64 offset;
+               u64 length;
+
+               object_name = rbd_segment_name(rbd_dev, image_offset);
+               if (!object_name)
+                       goto out_unwind;
+               offset = rbd_segment_offset(rbd_dev, image_offset);
+               length = rbd_segment_length(rbd_dev, image_offset, resid);
+               obj_request = rbd_obj_request_create(object_name,
+                                               offset, length,
+                                               OBJ_REQUEST_BIO);
+               kfree(object_name);     /* object request has its own copy */
+               if (!obj_request)
+                       goto out_unwind;
+
+               rbd_assert(length <= (u64) UINT_MAX);
+               clone_size = (unsigned int) length;
+               obj_request->bio_list = bio_chain_clone_range(&bio_list,
+                                               &bio_offset, clone_size,
+                                               GFP_ATOMIC);
+               if (!obj_request->bio_list)
+                       goto out_partial;
 
-       num_pages = calc_pages_for(ofs, inbound_size);
-       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
+               /*
+                * Build up the op to use in building the osd
+                * request.  Note that the contents of the op are
+                * copied by rbd_osd_req_create().
+                */
+               op = rbd_osd_req_op_create(opcode, offset, length);
+               if (!op)
+                       goto out_partial;
+               obj_request->osd_req = rbd_osd_req_create(rbd_dev,
+                                               img_request->write_request,
+                                               obj_request, op);
+               rbd_osd_req_op_destroy(op);
+               if (!obj_request->osd_req)
+                       goto out_partial;
+               /* status and version are initially zero-filled */
+
+               rbd_img_obj_request_add(img_request, obj_request);
+
+               image_offset += length;
+               resid -= length;
+       }
 
-       ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
-                         object_name, ofs, inbound_size, NULL,
-                         pages, num_pages,
-                         flags,
-                         ops,
-                         NULL, 0,
-                         NULL,
-                         linger_req, ver);
-       if (ret < 0)
-               goto done;
+       return 0;
 
-       if ((flags & CEPH_OSD_FLAG_READ) && inbound)
-               ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
+out_partial:
+       rbd_obj_request_put(obj_request);
+out_unwind:
+       for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+               rbd_obj_request_put(obj_request);
 
-done:
-       ceph_release_page_vector(pages, num_pages);
-       return ret;
+       return -ENOMEM;
 }
 
-/*
- * Do an asynchronous ceph osd operation
- */
-static int rbd_do_op(struct request *rq,
-                    struct rbd_device *rbd_dev,
-                    struct ceph_snap_context *snapc,
-                    u64 ofs, u64 len,
-                    struct bio *bio,
-                    struct rbd_req_coll *coll,
-                    int coll_index)
-{
-       char *seg_name;
-       u64 seg_ofs;
-       u64 seg_len;
-       int ret;
-       struct ceph_osd_req_op *ops;
-       u32 payload_len;
-       int opcode;
-       int flags;
-       u64 snapid;
-
-       seg_name = rbd_segment_name(rbd_dev, ofs);
-       if (!seg_name)
-               return -ENOMEM;
-       seg_len = rbd_segment_length(rbd_dev, ofs, len);
-       seg_ofs = rbd_segment_offset(rbd_dev, ofs);
-
-       if (rq_data_dir(rq) == WRITE) {
-               opcode = CEPH_OSD_OP_WRITE;
-               flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
-               snapid = CEPH_NOSNAP;
-               payload_len = seg_len;
-       } else {
-               opcode = CEPH_OSD_OP_READ;
-               flags = CEPH_OSD_FLAG_READ;
-               snapc = NULL;
-               snapid = rbd_dev->spec->snap_id;
-               payload_len = 0;
+static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request;
+       u32 which = obj_request->which;
+       bool more = true;
+
+       img_request = obj_request->img_request;
+
+       dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
+       rbd_assert(img_request != NULL);
+       rbd_assert(img_request->rq != NULL);
+       rbd_assert(img_request->obj_request_count > 0);
+       rbd_assert(which != BAD_WHICH);
+       rbd_assert(which < img_request->obj_request_count);
+       rbd_assert(which >= img_request->next_completion);
+
+       spin_lock_irq(&img_request->completion_lock);
+       if (which != img_request->next_completion)
+               goto out;
+
+       for_each_obj_request_from(img_request, obj_request) {
+               unsigned int xferred;
+               int result;
+
+               rbd_assert(more);
+               rbd_assert(which < img_request->obj_request_count);
+
+               if (!obj_request_done_test(obj_request))
+                       break;
+
+               rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
+               xferred = (unsigned int) obj_request->xferred;
+               result = (int) obj_request->result;
+               if (result)
+                       rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
+                               img_request->write_request ? "write" : "read",
+                               result, xferred);
+
+               more = blk_end_request(img_request->rq, result, xferred);
+               which++;
        }
 
-       ret = -ENOMEM;
-       ops = rbd_create_rw_ops(1, opcode, payload_len);
-       if (!ops)
-               goto done;
+       rbd_assert(more ^ (which == img_request->obj_request_count));
+       img_request->next_completion = which;
+out:
+       spin_unlock_irq(&img_request->completion_lock);
 
-       /* we've taken care of segment sizes earlier when we
-          cloned the bios. We should never have a segment
-          truncated at this point */
-       rbd_assert(seg_len == len);
-
-       ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
-                            seg_name, seg_ofs, seg_len,
-                            bio,
-                            NULL, 0,
-                            flags,
-                            ops,
-                            coll, coll_index,
-                            rbd_req_cb, 0, NULL);
-
-       rbd_destroy_ops(ops);
-done:
-       kfree(seg_name);
-       return ret;
+       if (!more)
+               rbd_img_request_complete(img_request);
 }
 
-/*
- * Request sync osd read
- */
-static int rbd_req_sync_read(struct rbd_device *rbd_dev,
-                         u64 snapid,
-                         const char *object_name,
-                         u64 ofs, u64 len,
-                         char *buf,
-                         u64 *ver)
-{
-       struct ceph_osd_req_op *ops;
-       int ret;
+static int rbd_img_request_submit(struct rbd_img_request *img_request)
+{
+       struct rbd_device *rbd_dev = img_request->rbd_dev;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_obj_request *obj_request;
 
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
-       if (!ops)
-               return -ENOMEM;
+       dout("%s: img %p\n", __func__, img_request);
+       for_each_obj_request(img_request, obj_request) {
+               int ret;
 
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                              snapid,
-                              CEPH_OSD_FLAG_READ,
-                              ops, object_name, ofs, len, buf, NULL, ver);
-       rbd_destroy_ops(ops);
+               obj_request->callback = rbd_img_obj_callback;
+               ret = rbd_obj_request_submit(osdc, obj_request);
+               if (ret)
+                       return ret;
+               /*
+                * The image request has its own reference to each
+                * of its object requests, so we can safely drop the
+                * initial one here.
+                */
+               rbd_obj_request_put(obj_request);
+       }
 
-       return ret;
+       return 0;
 }
 
-/*
- * Request sync osd watch
- */
-static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
-                                  u64 ver,
-                                  u64 notify_id)
+static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
+                                  u64 ver, u64 notify_id)
 {
-       struct ceph_osd_req_op *ops;
+       struct rbd_obj_request *obj_request;
+       struct ceph_osd_req_op *op;
+       struct ceph_osd_client *osdc;
        int ret;
 
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
-       if (!ops)
+       obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+                                                       OBJ_REQUEST_NODATA);
+       if (!obj_request)
                return -ENOMEM;
 
-       ops[0].watch.ver = cpu_to_le64(ver);
-       ops[0].watch.cookie = notify_id;
-       ops[0].watch.flag = 0;
+       ret = -ENOMEM;
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
+       if (!op)
+               goto out;
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+                                               obj_request, op);
+       rbd_osd_req_op_destroy(op);
+       if (!obj_request->osd_req)
+               goto out;
 
-       ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
-                         rbd_dev->header_name, 0, 0, NULL,
-                         NULL, 0,
-                         CEPH_OSD_FLAG_READ,
-                         ops,
-                         NULL, 0,
-                         rbd_simple_req_cb, 0, NULL);
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       obj_request->callback = rbd_obj_request_put;
+       ret = rbd_obj_request_submit(osdc, obj_request);
+out:
+       if (ret)
+               rbd_obj_request_put(obj_request);
 
-       rbd_destroy_ops(ops);
        return ret;
 }
 
@@ -1381,95 +1804,103 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        if (!rbd_dev)
                return;
 
-       dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
+       dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
                rbd_dev->header_name, (unsigned long long) notify_id,
                (unsigned int) opcode);
        rc = rbd_dev_refresh(rbd_dev, &hver);
        if (rc)
-               pr_warning(RBD_DRV_NAME "%d got notification but failed to "
-                          " update snaps: %d\n", rbd_dev->major, rc);
+               rbd_warn(rbd_dev, "got notification but failed to "
+                          " update snaps: %d\n", rc);
 
-       rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
+       rbd_obj_notify_ack(rbd_dev, hver, notify_id);
 }
 
 /*
- * Request sync osd watch
+ * Request sync osd watch/unwatch.  The value of "start" determines
+ * whether a watch request is being initiated or torn down.
  */
-static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
 {
-       struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_obj_request *obj_request;
+       struct ceph_osd_req_op *op;
        int ret;
 
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
-       if (!ops)
-               return -ENOMEM;
+       rbd_assert(start ^ !!rbd_dev->watch_event);
+       rbd_assert(start ^ !!rbd_dev->watch_request);
 
-       ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
-                                    (void *)rbd_dev, &rbd_dev->watch_event);
-       if (ret < 0)
-               goto fail;
+       if (start) {
+               ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+                                               &rbd_dev->watch_event);
+               if (ret < 0)
+                       return ret;
+               rbd_assert(rbd_dev->watch_event != NULL);
+       }
 
-       ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
-       ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
-       ops[0].watch.flag = 1;
+       ret = -ENOMEM;
+       obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+                                                       OBJ_REQUEST_NODATA);
+       if (!obj_request)
+               goto out_cancel;
+
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
+                               rbd_dev->watch_event->cookie,
+                               rbd_dev->header.obj_version, start);
+       if (!op)
+               goto out_cancel;
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
+                                                       obj_request, op);
+       rbd_osd_req_op_destroy(op);
+       if (!obj_request->osd_req)
+               goto out_cancel;
+
+       if (start)
+               ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+       else
+               ceph_osdc_unregister_linger_request(osdc,
+                                       rbd_dev->watch_request->osd_req);
+       ret = rbd_obj_request_submit(osdc, obj_request);
+       if (ret)
+               goto out_cancel;
+       ret = rbd_obj_request_wait(obj_request);
+       if (ret)
+               goto out_cancel;
+       ret = obj_request->result;
+       if (ret)
+               goto out_cancel;
 
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                             CEPH_NOSNAP,
-                             CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                             ops,
-                             rbd_dev->header_name,
-                             0, 0, NULL,
-                             &rbd_dev->watch_request, NULL);
+       /*
+        * A watch request is set to linger, so the underlying osd
+        * request won't go away until we unregister it.  We retain
+        * a pointer to the object request during that time (in
+        * rbd_dev->watch_request), so we'll keep a reference to
+        * it.  We'll drop that reference (below) after we've
+        * unregistered it.
+        */
+       if (start) {
+               rbd_dev->watch_request = obj_request;
 
-       if (ret < 0)
-               goto fail_event;
+               return 0;
+       }
 
-       rbd_destroy_ops(ops);
-       return 0;
+       /* We have successfully torn down the watch request */
 
-fail_event:
+       rbd_obj_request_put(rbd_dev->watch_request);
+       rbd_dev->watch_request = NULL;
+out_cancel:
+       /* Cancel the event if we're tearing down, or on error */
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
-fail:
-       rbd_destroy_ops(ops);
-       return ret;
-}
+       if (obj_request)
+               rbd_obj_request_put(obj_request);
 
-/*
- * Request sync osd unwatch
- */
-static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
-{
-       struct ceph_osd_req_op *ops;
-       int ret;
-
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
-       if (!ops)
-               return -ENOMEM;
-
-       ops[0].watch.ver = 0;
-       ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
-       ops[0].watch.flag = 0;
-
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                             CEPH_NOSNAP,
-                             CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                             ops,
-                             rbd_dev->header_name,
-                             0, 0, NULL, NULL, NULL);
-
-
-       rbd_destroy_ops(ops);
-       ceph_osdc_cancel_event(rbd_dev->watch_event);
-       rbd_dev->watch_event = NULL;
        return ret;
 }
 
 /*
  * Synchronous osd object method call
  */
-static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
+static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
@@ -1477,169 +1908,154 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             size_t outbound_size,
                             char *inbound,
                             size_t inbound_size,
-                            int flags,
-                            u64 *ver)
+                            u64 *version)
 {
-       struct ceph_osd_req_op *ops;
-       int class_name_len = strlen(class_name);
-       int method_name_len = strlen(method_name);
-       int payload_size;
+       struct rbd_obj_request *obj_request;
+       struct ceph_osd_client *osdc;
+       struct ceph_osd_req_op *op;
+       struct page **pages;
+       u32 page_count;
        int ret;
 
        /*
-        * Any input parameters required by the method we're calling
-        * will be sent along with the class and method names as
-        * part of the message payload.  That data and its size are
-        * supplied via the indata and indata_len fields (named from
-        * the perspective of the server side) in the OSD request
-        * operation.
+        * Method calls are ultimately read operations but they
+        * don't involve object data (so no offset or length).
+        * The result should placed into the inbound buffer
+        * provided.  They also supply outbound data--parameters for
+        * the object method.  Currently if this is present it will
+        * be a snapshot id.
         */
-       payload_size = class_name_len + method_name_len + outbound_size;
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
-       if (!ops)
-               return -ENOMEM;
+       page_count = (u32) calc_pages_for(0, inbound_size);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
 
-       ops[0].cls.class_name = class_name;
-       ops[0].cls.class_len = (__u8) class_name_len;
-       ops[0].cls.method_name = method_name;
-       ops[0].cls.method_len = (__u8) method_name_len;
-       ops[0].cls.argc = 0;
-       ops[0].cls.indata = outbound;
-       ops[0].cls.indata_len = outbound_size;
+       ret = -ENOMEM;
+       obj_request = rbd_obj_request_create(object_name, 0, 0,
+                                                       OBJ_REQUEST_PAGES);
+       if (!obj_request)
+               goto out;
 
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                              CEPH_NOSNAP,
-                              flags, ops,
-                              object_name, 0, inbound_size, inbound,
-                              NULL, ver);
+       obj_request->pages = pages;
+       obj_request->page_count = page_count;
 
-       rbd_destroy_ops(ops);
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
+                                       method_name, outbound, outbound_size);
+       if (!op)
+               goto out;
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+                                               obj_request, op);
+       rbd_osd_req_op_destroy(op);
+       if (!obj_request->osd_req)
+               goto out;
 
-       dout("cls_exec returned %d\n", ret);
-       return ret;
-}
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       ret = rbd_obj_request_submit(osdc, obj_request);
+       if (ret)
+               goto out;
+       ret = rbd_obj_request_wait(obj_request);
+       if (ret)
+               goto out;
 
-static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
-{
-       struct rbd_req_coll *coll =
-                       kzalloc(sizeof(struct rbd_req_coll) +
-                               sizeof(struct rbd_req_status) * num_reqs,
-                               GFP_ATOMIC);
+       ret = obj_request->result;
+       if (ret < 0)
+               goto out;
+       ret = 0;
+       ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
+       if (version)
+               *version = obj_request->version;
+out:
+       if (obj_request)
+               rbd_obj_request_put(obj_request);
+       else
+               ceph_release_page_vector(pages, page_count);
 
-       if (!coll)
-               return NULL;
-       coll->total = num_reqs;
-       kref_init(&coll->kref);
-       return coll;
+       return ret;
 }
 
-/*
- * block device queue callback
- */
-static void rbd_rq_fn(struct request_queue *q)
+static void rbd_request_fn(struct request_queue *q)
+               __releases(q->queue_lock) __acquires(q->queue_lock)
 {
        struct rbd_device *rbd_dev = q->queuedata;
+       bool read_only = rbd_dev->mapping.read_only;
        struct request *rq;
+       int result;
 
        while ((rq = blk_fetch_request(q))) {
-               struct bio *bio;
-               bool do_write;
-               unsigned int size;
-               u64 ofs;
-               int num_segs, cur_seg = 0;
-               struct rbd_req_coll *coll;
-               struct ceph_snap_context *snapc;
-               unsigned int bio_offset;
-
-               dout("fetched request\n");
-
-               /* filter out block requests we don't understand */
-               if ((rq->cmd_type != REQ_TYPE_FS)) {
-                       __blk_end_request_all(rq, 0);
-                       continue;
-               }
+               bool write_request = rq_data_dir(rq) == WRITE;
+               struct rbd_img_request *img_request;
+               u64 offset;
+               u64 length;
 
-               /* deduce our operation (read, write) */
-               do_write = (rq_data_dir(rq) == WRITE);
-               if (do_write && rbd_dev->mapping.read_only) {
-                       __blk_end_request_all(rq, -EROFS);
+               /* Ignore any non-FS requests that filter through. */
+
+               if (rq->cmd_type != REQ_TYPE_FS) {
+                       dout("%s: non-fs request type %d\n", __func__,
+                               (int) rq->cmd_type);
+                       __blk_end_request_all(rq, 0);
                        continue;
                }
 
-               spin_unlock_irq(q->queue_lock);
+               /* Ignore/skip any zero-length requests */
 
-               down_read(&rbd_dev->header_rwsem);
+               offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
+               length = (u64) blk_rq_bytes(rq);
 
-               if (!rbd_dev->exists) {
-                       rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-                       up_read(&rbd_dev->header_rwsem);
-                       dout("request for non-existent snapshot");
-                       spin_lock_irq(q->queue_lock);
-                       __blk_end_request_all(rq, -ENXIO);
+               if (!length) {
+                       dout("%s: zero-length request\n", __func__);
+                       __blk_end_request_all(rq, 0);
                        continue;
                }
 
-               snapc = ceph_get_snap_context(rbd_dev->header.snapc);
-
-               up_read(&rbd_dev->header_rwsem);
-
-               size = blk_rq_bytes(rq);
-               ofs = blk_rq_pos(rq) * SECTOR_SIZE;
-               bio = rq->bio;
+               spin_unlock_irq(q->queue_lock);
 
-               dout("%s 0x%x bytes at 0x%llx\n",
-                    do_write ? "write" : "read",
-                    size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
+               /* Disallow writes to a read-only device */
 
-               num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
-               if (num_segs <= 0) {
-                       spin_lock_irq(q->queue_lock);
-                       __blk_end_request_all(rq, num_segs);
-                       ceph_put_snap_context(snapc);
-                       continue;
-               }
-               coll = rbd_alloc_coll(num_segs);
-               if (!coll) {
-                       spin_lock_irq(q->queue_lock);
-                       __blk_end_request_all(rq, -ENOMEM);
-                       ceph_put_snap_context(snapc);
-                       continue;
+               if (write_request) {
+                       result = -EROFS;
+                       if (read_only)
+                               goto end_request;
+                       rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
                }
 
-               bio_offset = 0;
-               do {
-                       u64 limit = rbd_segment_length(rbd_dev, ofs, size);
-                       unsigned int chain_size;
-                       struct bio *bio_chain;
-
-                       BUG_ON(limit > (u64) UINT_MAX);
-                       chain_size = (unsigned int) limit;
-                       dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-
-                       kref_get(&coll->kref);
+               /*
+                * Quit early if the mapped snapshot no longer
+                * exists.  It's still possible the snapshot will
+                * have disappeared by the time our request arrives
+                * at the osd, but there's no sense in sending it if
+                * we already know.
+                */
+               if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+                       dout("request for non-existent snapshot");
+                       rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+                       result = -ENXIO;
+                       goto end_request;
+               }
 
-                       /* Pass a cloned bio chain via an osd request */
+               result = -EINVAL;
+               if (WARN_ON(offset && length > U64_MAX - offset + 1))
+                       goto end_request;       /* Shouldn't happen */
 
-                       bio_chain = bio_chain_clone_range(&bio,
-                                               &bio_offset, chain_size,
-                                               GFP_ATOMIC);
-                       if (bio_chain)
-                               (void) rbd_do_op(rq, rbd_dev, snapc,
-                                               ofs, chain_size,
-                                               bio_chain, coll, cur_seg);
-                       else
-                               rbd_coll_end_req_index(rq, coll, cur_seg,
-                                                      -ENOMEM, chain_size);
-                       size -= chain_size;
-                       ofs += chain_size;
+               result = -ENOMEM;
+               img_request = rbd_img_request_create(rbd_dev, offset, length,
+                                                       write_request);
+               if (!img_request)
+                       goto end_request;
 
-                       cur_seg++;
-               } while (size > 0);
-               kref_put(&coll->kref, rbd_coll_release);
+               img_request->rq = rq;
 
+               result = rbd_img_request_fill_bio(img_request, rq->bio);
+               if (!result)
+                       result = rbd_img_request_submit(img_request);
+               if (result)
+                       rbd_img_request_put(img_request);
+end_request:
                spin_lock_irq(q->queue_lock);
-
-               ceph_put_snap_context(snapc);
+               if (result < 0) {
+                       rbd_warn(rbd_dev, "obj_request %s result %d\n",
+                               write_request ? "write" : "read", result);
+                       __blk_end_request_all(rq, result);
+               }
        }
 }
 
@@ -1703,6 +2119,71 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
        put_disk(disk);
 }
 
+static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
+                               const char *object_name,
+                               u64 offset, u64 length,
+                               char *buf, u64 *version)
+
+{
+       struct ceph_osd_req_op *op;
+       struct rbd_obj_request *obj_request;
+       struct ceph_osd_client *osdc;
+       struct page **pages = NULL;
+       u32 page_count;
+       size_t size;
+       int ret;
+
+       page_count = (u32) calc_pages_for(offset, length);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages))
+               ret = PTR_ERR(pages);
+
+       ret = -ENOMEM;
+       obj_request = rbd_obj_request_create(object_name, offset, length,
+                                                       OBJ_REQUEST_PAGES);
+       if (!obj_request)
+               goto out;
+
+       obj_request->pages = pages;
+       obj_request->page_count = page_count;
+
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
+       if (!op)
+               goto out;
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+                                               obj_request, op);
+       rbd_osd_req_op_destroy(op);
+       if (!obj_request->osd_req)
+               goto out;
+
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       ret = rbd_obj_request_submit(osdc, obj_request);
+       if (ret)
+               goto out;
+       ret = rbd_obj_request_wait(obj_request);
+       if (ret)
+               goto out;
+
+       ret = obj_request->result;
+       if (ret < 0)
+               goto out;
+
+       rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
+       size = (size_t) obj_request->xferred;
+       ceph_copy_from_page_vector(pages, buf, 0, size);
+       rbd_assert(size <= (size_t) INT_MAX);
+       ret = (int) size;
+       if (version)
+               *version = obj_request->version;
+out:
+       if (obj_request)
+               rbd_obj_request_put(obj_request);
+       else
+               ceph_release_page_vector(pages, page_count);
+
+       return ret;
+}
+
 /*
  * Read the complete header for the given rbd device.
  *
@@ -1741,24 +2222,20 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);
 
-               ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
-                                      rbd_dev->header_name,
+               ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);
-
                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
-                       pr_warning("short header read for image %s"
-                                       " (want %zd got %d)\n",
-                               rbd_dev->spec->image_name, size, ret);
+                       rbd_warn(rbd_dev, "short header read (want %zd got %d)",
+                               size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
-                       pr_warning("invalid header for image %s\n",
-                               rbd_dev->spec->image_name);
+                       rbd_warn(rbd_dev, "invalid header");
                        goto out_err;
                }
 
@@ -1895,8 +2372,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;
 
-       /* init rq */
-       q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
+       q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;
 
@@ -2233,7 +2709,7 @@ static void rbd_spec_free(struct kref *kref)
        kfree(spec);
 }
 
-struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                                struct rbd_spec *spec)
 {
        struct rbd_device *rbd_dev;
@@ -2243,6 +2719,7 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                return NULL;
 
        spin_lock_init(&rbd_dev->lock);
+       rbd_dev->flags = 0;
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);
@@ -2250,6 +2727,13 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        rbd_dev->spec = spec;
        rbd_dev->rbd_client = rbdc;
 
+       /* Initialize the layout used for all rbd requests */
+
+       rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
+       rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
+       rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
+       rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
+
        return rbd_dev;
 }
 
@@ -2360,12 +2844,11 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };
 
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
-                               (char *) &size_buf, sizeof (size_buf),
-                               CEPH_OSD_FLAG_READ, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+                               (char *) &size_buf, sizeof (size_buf), NULL);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
 
@@ -2396,15 +2879,13 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
        if (!reply_buf)
                return -ENOMEM;
 
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
-                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
-                               CEPH_OSD_FLAG_READ, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
-       ret = 0;    /* rbd_req_sync_exec() can return positive */
 
        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
@@ -2435,12 +2916,12 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
        u64 incompat;
        int ret;
 
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_features",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &features_buf, sizeof (features_buf),
-                               CEPH_OSD_FLAG_READ, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+                               NULL);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
 
@@ -2474,7 +2955,6 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        void *end;
        char *image_id;
        u64 overlap;
-       size_t len = 0;
        int ret;
 
        parent_spec = rbd_spec_alloc();
@@ -2492,12 +2972,11 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        }
 
        snapid = cpu_to_le64(CEPH_NOSNAP);
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_parent",
                                (char *) &snapid, sizeof (snapid),
-                               (char *) reply_buf, size,
-                               CEPH_OSD_FLAG_READ, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+                               (char *) reply_buf, size, NULL);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out_err;
 
@@ -2508,13 +2987,18 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        if (parent_spec->pool_id == CEPH_NOPOOL)
                goto out;       /* No parent?  No problem. */
 
-       image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+       /* The ceph file layout needs to fit pool id in 32 bits */
+
+       ret = -EIO;
+       if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
+               goto out;
+
+       image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(image_id)) {
                ret = PTR_ERR(image_id);
                goto out_err;
        }
        parent_spec->image_id = image_id;
-       parent_spec->image_id_len = len;
        ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
        ceph_decode_64_safe(&p, end, overlap, out_err);
 
@@ -2544,26 +3028,25 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
 
        rbd_assert(!rbd_dev->spec->image_name);
 
-       image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
+       len = strlen(rbd_dev->spec->image_id);
+       image_id_size = sizeof (__le32) + len;
        image_id = kmalloc(image_id_size, GFP_KERNEL);
        if (!image_id)
                return NULL;
 
        p = image_id;
        end = (char *) image_id + image_id_size;
-       ceph_encode_string(&p, end, rbd_dev->spec->image_id,
-                               (u32) rbd_dev->spec->image_id_len);
+       ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
 
        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                goto out;
 
-       ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
+       ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
                                "rbd", "dir_get_name",
                                image_id, image_id_size,
-                               (char *) reply_buf, size,
-                               CEPH_OSD_FLAG_READ, NULL);
+                               (char *) reply_buf, size, NULL);
        if (ret < 0)
                goto out;
        p = reply_buf;
@@ -2602,8 +3085,11 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
 
        osdc = &rbd_dev->rbd_client->client->osdc;
        name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
-       if (!name)
-               return -EIO;    /* pool id too large (>= 2^31) */
+       if (!name) {
+               rbd_warn(rbd_dev, "there is no pool with id %llu",
+                       rbd_dev->spec->pool_id);        /* Really a BUG() */
+               return -EIO;
+       }
 
        rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
        if (!rbd_dev->spec->pool_name)
@@ -2612,19 +3098,17 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
        /* Fetch the image name; tolerate failure here */
 
        name = rbd_dev_image_name(rbd_dev);
-       if (name) {
-               rbd_dev->spec->image_name_len = strlen(name);
+       if (name)
                rbd_dev->spec->image_name = (char *) name;
-       } else {
-               pr_warning(RBD_DRV_NAME "%d "
-                       "unable to get image name for image id %s\n",
-                       rbd_dev->major, rbd_dev->spec->image_id);
-       }
+       else
+               rbd_warn(rbd_dev, "unable to get image name");
 
        /* Look up the snapshot name. */
 
        name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
        if (!name) {
+               rbd_warn(rbd_dev, "no snapshot with id %llu",
+                       rbd_dev->spec->snap_id);        /* Really a BUG() */
                ret = -EIO;
                goto out_err;
        }
@@ -2665,12 +3149,11 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
        if (!reply_buf)
                return -ENOMEM;
 
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapcontext",
                                NULL, 0,
-                               reply_buf, size,
-                               CEPH_OSD_FLAG_READ, ver);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+                               reply_buf, size, ver);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
 
@@ -2735,12 +3218,11 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
                return ERR_PTR(-ENOMEM);
 
        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
-                               reply_buf, size,
-                               CEPH_OSD_FLAG_READ, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+                               reply_buf, size, NULL);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
 
@@ -2766,7 +3248,7 @@ out:
 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
                u64 *snap_size, u64 *snap_features)
 {
-       __le64 snap_id;
+       u64 snap_id;
        u8 order;
        int ret;
 
@@ -2865,10 +3347,17 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
                if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
                        struct list_head *next = links->next;
 
-                       /* Existing snapshot not in the new snap context */
-
+                       /*
+                        * A previously-existing snapshot is not in
+                        * the new snap context.
+                        *
+                        * If the now missing snapshot is the one the
+                        * image is mapped to, clear its exists flag
+                        * so we can avoid sending any more requests
+                        * to it.
+                        */
                        if (rbd_dev->spec->snap_id == snap->id)
-                               rbd_dev->exists = false;
+                               clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
                        rbd_remove_snap_dev(snap);
                        dout("%ssnap id %llu has been removed\n",
                                rbd_dev->spec->snap_id == snap->id ?
@@ -2942,7 +3431,7 @@ static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
        struct rbd_snap *snap;
        int ret = 0;
 
-       dout("%s called\n", __func__);
+       dout("%s:\n", __func__);
        if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
                return -EIO;
 
@@ -2983,22 +3472,6 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
        device_unregister(&rbd_dev->dev);
 }
 
-static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
-{
-       int ret, rc;
-
-       do {
-               ret = rbd_req_sync_watch(rbd_dev);
-               if (ret == -ERANGE) {
-                       rc = rbd_dev_refresh(rbd_dev, NULL);
-                       if (rc < 0)
-                               return rc;
-               }
-       } while (ret == -ERANGE);
-
-       return ret;
-}
-
 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
 
 /*
@@ -3138,11 +3611,9 @@ static inline char *dup_token(const char **buf, size_t *lenp)
        size_t len;
 
        len = next_token(buf);
-       dup = kmalloc(len + 1, GFP_KERNEL);
+       dup = kmemdup(*buf, len + 1, GFP_KERNEL);
        if (!dup)
                return NULL;
-
-       memcpy(dup, *buf, len);
        *(dup + len) = '\0';
        *buf += len;
 
@@ -3210,8 +3681,10 @@ static int rbd_add_parse_args(const char *buf,
        /* The first four tokens are required */
 
        len = next_token(&buf);
-       if (!len)
-               return -EINVAL; /* Missing monitor address(es) */
+       if (!len) {
+               rbd_warn(NULL, "no monitor address(es) provided");
+               return -EINVAL;
+       }
        mon_addrs = buf;
        mon_addrs_size = len + 1;
        buf += len;
@@ -3220,8 +3693,10 @@ static int rbd_add_parse_args(const char *buf,
        options = dup_token(&buf, NULL);
        if (!options)
                return -ENOMEM;
-       if (!*options)
-               goto out_err;   /* Missing options */
+       if (!*options) {
+               rbd_warn(NULL, "no options provided");
+               goto out_err;
+       }
 
        spec = rbd_spec_alloc();
        if (!spec)
@@ -3230,14 +3705,18 @@ static int rbd_add_parse_args(const char *buf,
        spec->pool_name = dup_token(&buf, NULL);
        if (!spec->pool_name)
                goto out_mem;
-       if (!*spec->pool_name)
-               goto out_err;   /* Missing pool name */
+       if (!*spec->pool_name) {
+               rbd_warn(NULL, "no pool name provided");
+               goto out_err;
+       }
 
-       spec->image_name = dup_token(&buf, &spec->image_name_len);
+       spec->image_name = dup_token(&buf, NULL);
        if (!spec->image_name)
                goto out_mem;
-       if (!*spec->image_name)
-               goto out_err;   /* Missing image name */
+       if (!*spec->image_name) {
+               rbd_warn(NULL, "no image name provided");
+               goto out_err;
+       }
 
        /*
         * Snapshot name is optional; default is to use "-"
@@ -3251,10 +3730,9 @@ static int rbd_add_parse_args(const char *buf,
                ret = -ENAMETOOLONG;
                goto out_err;
        }
-       spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
+       spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
        if (!spec->snap_name)
                goto out_mem;
-       memcpy(spec->snap_name, buf, len);
        *(spec->snap_name + len) = '\0';
 
        /* Initialize all rbd options to the defaults */
@@ -3323,7 +3801,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
-       size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
+       size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
@@ -3339,21 +3817,18 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
                goto out;
        }
 
-       ret = rbd_req_sync_exec(rbd_dev, object_name,
+       ret = rbd_obj_method_sync(rbd_dev, object_name,
                                "rbd", "get_id",
                                NULL, 0,
-                               response, RBD_IMAGE_ID_LEN_MAX,
-                               CEPH_OSD_FLAG_READ, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+                               response, RBD_IMAGE_ID_LEN_MAX, NULL);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
-       ret = 0;    /* rbd_req_sync_exec() can return positive */
 
        p = response;
        rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
                                                p + RBD_IMAGE_ID_LEN_MAX,
-                                               &rbd_dev->spec->image_id_len,
-                                               GFP_NOIO);
+                                               NULL, GFP_NOIO);
        if (IS_ERR(rbd_dev->spec->image_id)) {
                ret = PTR_ERR(rbd_dev->spec->image_id);
                rbd_dev->spec->image_id = NULL;
@@ -3377,11 +3852,10 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
        rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
        if (!rbd_dev->spec->image_id)
                return -ENOMEM;
-       rbd_dev->spec->image_id_len = 0;
 
        /* Record the header object name for this rbd image. */
 
-       size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
+       size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name) {
                ret = -ENOMEM;
@@ -3427,7 +3901,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
         * Image id was filled in by the caller.  Record the header
         * object name for this rbd image.
         */
-       size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
+       size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;
@@ -3542,7 +4016,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_bus;
 
-       ret = rbd_init_watch_dev(rbd_dev);
+       ret = rbd_dev_header_watch_sync(rbd_dev, 1);
        if (ret)
                goto err_out_bus;
 
@@ -3638,6 +4112,13 @@ static ssize_t rbd_add(struct bus_type *bus,
                goto err_out_client;
        spec->pool_id = (u64) rc;
 
+       /* The ceph file layout needs to fit pool id in 32 bits */
+
+       if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
+               rc = -EIO;
+               goto err_out_client;
+       }
+
        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
@@ -3691,15 +4172,8 @@ static void rbd_dev_release(struct device *dev)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       if (rbd_dev->watch_request) {
-               struct ceph_client *client = rbd_dev->rbd_client->client;
-
-               ceph_osdc_unregister_linger_request(&client->osdc,
-                                                   rbd_dev->watch_request);
-       }
        if (rbd_dev->watch_event)
-               rbd_req_sync_unwatch(rbd_dev);
-
+               rbd_dev_header_watch_sync(rbd_dev, 0);
 
        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
@@ -3743,10 +4217,14 @@ static ssize_t rbd_remove(struct bus_type *bus,
                goto done;
        }
 
-       if (rbd_dev->open_count) {
+       spin_lock_irq(&rbd_dev->lock);
+       if (rbd_dev->open_count)
                ret = -EBUSY;
+       else
+               set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
+       spin_unlock_irq(&rbd_dev->lock);
+       if (ret < 0)
                goto done;
-       }
 
        rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
@@ -3782,10 +4260,15 @@ static void rbd_sysfs_cleanup(void)
        device_unregister(&rbd_root_dev);
 }
 
-int __init rbd_init(void)
+static int __init rbd_init(void)
 {
        int rc;
 
+       if (!libceph_compatible(NULL)) {
+               rbd_warn(NULL, "libceph incompatibility (quitting)");
+
+               return -EINVAL;
+       }
        rc = rbd_sysfs_init();
        if (rc)
                return rc;
@@ -3793,7 +4276,7 @@ int __init rbd_init(void)
        return 0;
 }
 
-void __exit rbd_exit(void)
+static void __exit rbd_exit(void)
 {
        rbd_sysfs_cleanup();
 }