libceph: init sent and completed when starting
index f9276cb26aa2b4dad552ddc1222f79f60c9ee1c3..d5953b87918c072daaa1427187246f9f9cfcad3e 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1,3 +1,4 @@
+
 #include <linux/ceph/ceph_debug.h>
 
 #include <linux/module.h>
@@ -21,6 +22,8 @@
 #define OSD_OP_FRONT_LEN       4096
 #define OSD_OPREPLY_FRONT_LEN  512
 
+static struct kmem_cache       *ceph_osd_request_cache;
+
 static const struct ceph_connection_operations osd_con_ops;
 
 static void __send_queued(struct ceph_osd_client *osdc);
@@ -32,12 +35,6 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);
 
-static int op_has_extent(int op)
-{
-       return (op == CEPH_OSD_OP_READ ||
-               op == CEPH_OSD_OP_WRITE);
-}
-
 /*
  * Implement client access to distributed object storage cluster.
  *
@@ -64,42 +61,237 @@ static int op_has_extent(int op)
  * fill osd op in request message.
  */
 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
-                      struct ceph_osd_req_op *op, u64 *bno)
+                       u64 *objnum, u64 *objoff, u64 *objlen)
 {
        u64 orig_len = *plen;
-       u64 objoff = 0;
-       u64 objlen = 0;
        int r;
 
        /* object extent? */
-       r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
-                                         &objoff, &objlen);
+       r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
+                                         objoff, objlen);
        if (r < 0)
                return r;
-       if (objlen < orig_len) {
-               *plen = objlen;
+       if (*objlen < orig_len) {
+               *plen = *objlen;
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
        }
 
-       if (op_has_extent(op->op)) {
-               u32 osize = le32_to_cpu(layout->fl_object_size);
-               op->extent.offset = objoff;
-               op->extent.length = objlen;
-               if (op->extent.truncate_size <= off - objoff) {
-                       op->extent.truncate_size = 0;
-               } else {
-                       op->extent.truncate_size -= off - objoff;
-                       if (op->extent.truncate_size > osize)
-                               op->extent.truncate_size = osize;
-               }
+       dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
+
+       return 0;
+}
+
+static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
+{
+       memset(osd_data, 0, sizeof (*osd_data));
+       osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
+}
+
+static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
+                       struct page **pages, u64 length, u32 alignment,
+                       bool pages_from_pool, bool own_pages)
+{
+       osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
+       osd_data->pages = pages;
+       osd_data->length = length;
+       osd_data->alignment = alignment;
+       osd_data->pages_from_pool = pages_from_pool;
+       osd_data->own_pages = own_pages;
+}
+
+static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
+                       struct ceph_pagelist *pagelist)
+{
+       osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
+       osd_data->pagelist = pagelist;
+}
+
+#ifdef CONFIG_BLOCK
+static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
+                       struct bio *bio, size_t bio_length)
+{
+       osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
+       osd_data->bio = bio;
+       osd_data->bio_length = bio_length;
+}
+#endif /* CONFIG_BLOCK */
+
+#define osd_req_op_data(oreq, whch, typ, fld)  \
+       ({                                              \
+               BUG_ON(whch >= (oreq)->r_num_ops);      \
+               &(oreq)->r_ops[whch].typ.fld;           \
+       })
+
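
[Note: the statement-expression macro above centralizes the op-index bounds check. Roughly, a call such as osd_req_op_data(req, 0, extent, osd_data) (names illustrative) behaves like the open-coded form:]

	BUG_ON(0 >= req->r_num_ops);
	osd_data = &req->r_ops[0].extent.osd_data;
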
+static struct ceph_osd_data *
+osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
+{
+       BUG_ON(which >= osd_req->r_num_ops);
+
+       return &osd_req->r_ops[which].raw_data_in;
+}
+
+struct ceph_osd_data *
+osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
+                       unsigned int which)
+{
+       return osd_req_op_data(osd_req, which, extent, osd_data);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data);
+
+struct ceph_osd_data *
+osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
+                       unsigned int which)
+{
+       return osd_req_op_data(osd_req, which, cls, response_data);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data);   /* ??? */
+
+void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct page **pages,
+                       u64 length, u32 alignment,
+                       bool pages_from_pool, bool own_pages)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_raw_data_in(osd_req, which);
+       ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+                               pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
+
+void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct page **pages,
+                       u64 length, u32 alignment,
+                       bool pages_from_pool, bool own_pages)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+       ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+                               pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
+
+void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct ceph_pagelist *pagelist)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+       ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
+
+#ifdef CONFIG_BLOCK
+void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct bio *bio, size_t bio_length)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+       ceph_osd_data_bio_init(osd_data, bio, bio_length);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
+#endif /* CONFIG_BLOCK */
+
+static void osd_req_op_cls_request_info_pagelist(
+                       struct ceph_osd_request *osd_req,
+                       unsigned int which, struct ceph_pagelist *pagelist)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, cls, request_info);
+       ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+
+void osd_req_op_cls_request_data_pagelist(
+                       struct ceph_osd_request *osd_req,
+                       unsigned int which, struct ceph_pagelist *pagelist)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+       ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
+
+void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct page **pages, u64 length,
+                       u32 alignment, bool pages_from_pool, bool own_pages)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+       ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+                               pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
+
+void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct page **pages, u64 length,
+                       u32 alignment, bool pages_from_pool, bool own_pages)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, cls, response_data);
+       ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+                               pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
+
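
[Note: taken together, these setters replace direct assignment to the old r_data_in/r_data_out fields. A minimal sketch, assuming req, pages and reply_pages were set up by the caller:]

	/* for a READ/WRITE op at index 0, the data lands in the page vector */
	osd_req_op_extent_osd_data_pages(req, 0, pages, length,
					 page_align, false, false);

	/* or, for a CEPH_OSD_OP_CALL op, the reply is collected separately */
	osd_req_op_cls_response_data_pages(req, 0, reply_pages,
					   PAGE_SIZE, 0, false, false);
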
+static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
+{
+       switch (osd_data->type) {
+       case CEPH_OSD_DATA_TYPE_NONE:
+               return 0;
+       case CEPH_OSD_DATA_TYPE_PAGES:
+               return osd_data->length;
+       case CEPH_OSD_DATA_TYPE_PAGELIST:
+               return (u64)osd_data->pagelist->length;
+#ifdef CONFIG_BLOCK
+       case CEPH_OSD_DATA_TYPE_BIO:
+               return (u64)osd_data->bio_length;
+#endif /* CONFIG_BLOCK */
+       default:
+               WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
+               return 0;
        }
-       if (op->op == CEPH_OSD_OP_WRITE)
-               op->payload_len = *plen;
+}
 
-       dout("calc_layout bno=%llx %llu~%llu\n", *bno, objoff, objlen);
+static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
+{
+       if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
+               int num_pages;
 
-       return 0;
+               num_pages = calc_pages_for((u64)osd_data->alignment,
+                                               (u64)osd_data->length);
+               ceph_release_page_vector(osd_data->pages, num_pages);
+       }
+       ceph_osd_data_init(osd_data);
+}
+
+static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
+                       unsigned int which)
+{
+       struct ceph_osd_req_op *op;
+
+       BUG_ON(which >= osd_req->r_num_ops);
+       op = &osd_req->r_ops[which];
+
+       switch (op->op) {
+       case CEPH_OSD_OP_READ:
+       case CEPH_OSD_OP_WRITE:
+               ceph_osd_data_release(&op->extent.osd_data);
+               break;
+       case CEPH_OSD_OP_CALL:
+               ceph_osd_data_release(&op->cls.request_info);
+               ceph_osd_data_release(&op->cls.request_data);
+               ceph_osd_data_release(&op->cls.response_data);
+               break;
+       default:
+               break;
+       }
 }
 
 /*
@@ -107,41 +299,26 @@ static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
  */
 void ceph_osdc_release_request(struct kref *kref)
 {
-       int num_pages;
-       struct ceph_osd_request *req = container_of(kref,
-                                                   struct ceph_osd_request,
-                                                   r_kref);
+       struct ceph_osd_request *req;
+       unsigned int which;
 
+       req = container_of(kref, struct ceph_osd_request, r_kref);
        if (req->r_request)
                ceph_msg_put(req->r_request);
-       if (req->r_con_filling_msg) {
-               dout("%s revoking msg %p from con %p\n", __func__,
-                    req->r_reply, req->r_con_filling_msg);
+       if (req->r_reply) {
                ceph_msg_revoke_incoming(req->r_reply);
-               req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
-               req->r_con_filling_msg = NULL;
-       }
-       if (req->r_reply)
                ceph_msg_put(req->r_reply);
-
-       if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES &&
-                       req->r_data_in.own_pages) {
-               num_pages = calc_pages_for((u64)req->r_data_in.alignment,
-                                               (u64)req->r_data_in.length);
-               ceph_release_page_vector(req->r_data_in.pages, num_pages);
-       }
-       if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES &&
-                       req->r_data_out.own_pages) {
-               num_pages = calc_pages_for((u64)req->r_data_out.alignment,
-                                               (u64)req->r_data_out.length);
-               ceph_release_page_vector(req->r_data_out.pages, num_pages);
        }
 
+       for (which = 0; which < req->r_num_ops; which++)
+               osd_req_op_data_release(req, which);
+
        ceph_put_snap_context(req->r_snapc);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
-               kfree(req);
+               kmem_cache_free(ceph_osd_request_cache, req);
+
 }
 EXPORT_SYMBOL(ceph_osdc_release_request);
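
[Note: ceph_osdc_release_request() is only reached through the request's kref. A sketch of the usual pairing, using the get/put helpers from include/linux/ceph/osd_client.h:]

	ceph_osdc_get_request(req);	/* kref_get(&req->r_kref) */
	/* ... use req ... */
	ceph_osdc_put_request(req);	/* final put invokes ceph_osdc_release_request() */
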
 
@@ -155,6 +332,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        struct ceph_msg *msg;
        size_t msg_size;
 
+       BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
+       BUG_ON(num_ops > CEPH_OSD_MAX_OP);
+
        msg_size = 4 + 4 + 8 + 8 + 4+8;
        msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
        msg_size += 1 + 8 + 4 + 4;     /* pg_t */
@@ -169,13 +349,14 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
                memset(req, 0, sizeof(*req));
        } else {
-               req = kzalloc(sizeof(*req), gfp_flags);
+               req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
        }
        if (req == NULL)
                return NULL;
 
        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
+       req->r_num_ops = num_ops;
 
        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
@@ -199,9 +380,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        }
        req->r_reply = msg;
 
-       req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE;
-       req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE;
-
        /* create request message; allow space for oid */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -220,70 +398,24 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_osdc_alloc_request);
 
-static u64 osd_req_encode_op(struct ceph_osd_request *req,
-                             struct ceph_osd_op *dst,
-                             struct ceph_osd_req_op *src)
+static bool osd_req_opcode_valid(u16 opcode)
 {
-       u64 out_data_len = 0;
-       struct ceph_pagelist *pagelist;
-
-       dst->op = cpu_to_le16(src->op);
-
-       switch (src->op) {
-       case CEPH_OSD_OP_STAT:
-               break;
+       switch (opcode) {
        case CEPH_OSD_OP_READ:
-       case CEPH_OSD_OP_WRITE:
-               if (src->op == CEPH_OSD_OP_WRITE)
-                       out_data_len = src->extent.length;
-               dst->extent.offset = cpu_to_le64(src->extent.offset);
-               dst->extent.length = cpu_to_le64(src->extent.length);
-               dst->extent.truncate_size =
-                       cpu_to_le64(src->extent.truncate_size);
-               dst->extent.truncate_seq =
-                       cpu_to_le32(src->extent.truncate_seq);
-               break;
-       case CEPH_OSD_OP_CALL:
-               pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
-               BUG_ON(!pagelist);
-               ceph_pagelist_init(pagelist);
-
-               dst->cls.class_len = src->cls.class_len;
-               dst->cls.method_len = src->cls.method_len;
-               dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
-               ceph_pagelist_append(pagelist, src->cls.class_name,
-                                    src->cls.class_len);
-               ceph_pagelist_append(pagelist, src->cls.method_name,
-                                    src->cls.method_len);
-               ceph_pagelist_append(pagelist, src->cls.indata,
-                                    src->cls.indata_len);
-
-               req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST;
-               req->r_data_out.pagelist = pagelist;
-               out_data_len = pagelist->length;
-               break;
-       case CEPH_OSD_OP_STARTSYNC:
-               break;
-       case CEPH_OSD_OP_NOTIFY_ACK:
-       case CEPH_OSD_OP_WATCH:
-               dst->watch.cookie = cpu_to_le64(src->watch.cookie);
-               dst->watch.ver = cpu_to_le64(src->watch.ver);
-               dst->watch.flag = src->watch.flag;
-               break;
-       default:
-               pr_err("unrecognized osd opcode %d\n", src->op);
-               WARN_ON(1);
-               break;
+       case CEPH_OSD_OP_STAT:
        case CEPH_OSD_OP_MAPEXT:
        case CEPH_OSD_OP_MASKTRUNC:
        case CEPH_OSD_OP_SPARSE_READ:
        case CEPH_OSD_OP_NOTIFY:
+       case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_ASSERT_VER:
+       case CEPH_OSD_OP_WRITE:
        case CEPH_OSD_OP_WRITEFULL:
        case CEPH_OSD_OP_TRUNCATE:
        case CEPH_OSD_OP_ZERO:
        case CEPH_OSD_OP_DELETE:
        case CEPH_OSD_OP_APPEND:
+       case CEPH_OSD_OP_STARTSYNC:
        case CEPH_OSD_OP_SETTRUNC:
        case CEPH_OSD_OP_TRIMTRUNC:
        case CEPH_OSD_OP_TMAPUP:
@@ -291,11 +423,11 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
        case CEPH_OSD_OP_TMAPGET:
        case CEPH_OSD_OP_CREATE:
        case CEPH_OSD_OP_ROLLBACK:
+       case CEPH_OSD_OP_WATCH:
        case CEPH_OSD_OP_OMAPGETKEYS:
        case CEPH_OSD_OP_OMAPGETVALS:
        case CEPH_OSD_OP_OMAPGETHEADER:
        case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
-       case CEPH_OSD_OP_MODE_RD:
        case CEPH_OSD_OP_OMAPSETVALS:
        case CEPH_OSD_OP_OMAPSETHEADER:
        case CEPH_OSD_OP_OMAPCLEAR:
@@ -326,113 +458,233 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
        case CEPH_OSD_OP_RDUNLOCK:
        case CEPH_OSD_OP_UPLOCK:
        case CEPH_OSD_OP_DNLOCK:
+       case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_PGLS:
        case CEPH_OSD_OP_PGLS_FILTER:
-               pr_err("unsupported osd opcode %s\n",
-                       ceph_osd_op_name(src->op));
-               WARN_ON(1);
-               break;
+               return true;
+       default:
+               return false;
        }
-       dst->payload_len = cpu_to_le32(src->payload_len);
-
-       return out_data_len;
 }
 
 /*
- * build new request AND message
- *
+ * This is an osd op init function for opcodes that have no data or
+ * other information associated with them.  It also serves as a
+ * common init routine for all the other init functions, below.
  */
-void ceph_osdc_build_request(struct ceph_osd_request *req,
-                            u64 off, unsigned int num_ops,
-                            struct ceph_osd_req_op *src_ops,
-                            struct ceph_snap_context *snapc, u64 snap_id,
-                            struct timespec *mtime)
+static struct ceph_osd_req_op *
+_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
+                               u16 opcode)
 {
-       struct ceph_msg *msg = req->r_request;
-       struct ceph_osd_req_op *src_op;
-       void *p;
-       size_t msg_size;
-       int flags = req->r_flags;
-       u64 data_len;
-       int i;
+       struct ceph_osd_req_op *op;
 
-       req->r_num_ops = num_ops;
-       req->r_snapid = snap_id;
-       req->r_snapc = ceph_get_snap_context(snapc);
+       BUG_ON(which >= osd_req->r_num_ops);
+       BUG_ON(!osd_req_opcode_valid(opcode));
 
-       /* encode request */
-       msg->hdr.version = cpu_to_le16(4);
+       op = &osd_req->r_ops[which];
+       memset(op, 0, sizeof (*op));
+       op->op = opcode;
 
-       p = msg->front.iov_base;
-       ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
-       req->r_request_osdmap_epoch = p;
-       p += 4;
-       req->r_request_flags = p;
-       p += 4;
-       if (req->r_flags & CEPH_OSD_FLAG_WRITE)
-               ceph_encode_timespec(p, mtime);
-       p += sizeof(struct ceph_timespec);
-       req->r_request_reassert_version = p;
-       p += sizeof(struct ceph_eversion); /* will get filled in */
+       return op;
+}
 
-       /* oloc */
-       ceph_encode_8(&p, 4);
-       ceph_encode_8(&p, 4);
-       ceph_encode_32(&p, 8 + 4 + 4);
-       req->r_request_pool = p;
-       p += 8;
-       ceph_encode_32(&p, -1);  /* preferred */
-       ceph_encode_32(&p, 0);   /* key len */
+void osd_req_op_init(struct ceph_osd_request *osd_req,
+                               unsigned int which, u16 opcode)
+{
+       (void)_osd_req_op_init(osd_req, which, opcode);
+}
+EXPORT_SYMBOL(osd_req_op_init);
 
-       ceph_encode_8(&p, 1);
-       req->r_request_pgid = p;
-       p += 8 + 4;
-       ceph_encode_32(&p, -1);  /* preferred */
+void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
+                               unsigned int which, u16 opcode,
+                               u64 offset, u64 length,
+                               u64 truncate_size, u32 truncate_seq)
+{
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+       size_t payload_len = 0;
 
-       /* oid */
-       ceph_encode_32(&p, req->r_oid_len);
-       memcpy(p, req->r_oid, req->r_oid_len);
-       dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
-       p += req->r_oid_len;
+       BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
 
-       /* ops--can imply data */
-       ceph_encode_16(&p, num_ops);
-       src_op = src_ops;
-       req->r_request_ops = p;
-       data_len = 0;
-       for (i = 0; i < num_ops; i++, src_op++) {
-               data_len += osd_req_encode_op(req, p, src_op);
-               p += sizeof(struct ceph_osd_op);
-       }
+       op->extent.offset = offset;
+       op->extent.length = length;
+       op->extent.truncate_size = truncate_size;
+       op->extent.truncate_seq = truncate_seq;
+       if (opcode == CEPH_OSD_OP_WRITE)
+               payload_len += length;
 
-       /* snaps */
-       ceph_encode_64(&p, req->r_snapid);
-       ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
-       ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
-       if (req->r_snapc) {
-               for (i = 0; i < snapc->num_snaps; i++) {
-                       ceph_encode_64(&p, req->r_snapc->snaps[i]);
-               }
+       op->payload_len = payload_len;
+}
+EXPORT_SYMBOL(osd_req_op_extent_init);
+
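
[Note: for example (offset, length and truncation values hypothetical), op 0 of a write request would be initialized as:]

	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_WRITE,
			       objoff, objlen, truncate_size, truncate_seq);
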
+void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
+                               unsigned int which, u64 length)
+{
+       struct ceph_osd_req_op *op;
+       u64 previous;
+
+       BUG_ON(which >= osd_req->r_num_ops);
+       op = &osd_req->r_ops[which];
+       previous = op->extent.length;
+
+       if (length == previous)
+               return;         /* Nothing to do */
+       BUG_ON(length > previous);
+
+       op->extent.length = length;
+       op->payload_len -= previous - length;
+}
+EXPORT_SYMBOL(osd_req_op_extent_update);
+
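
[Note: a caller that ends up transferring less than requested can shrink the extent in place; a sketch, with xferred assumed to come from the caller's bookkeeping:]

	if (xferred < op_len)		/* e.g. clipped at an object boundary */
		osd_req_op_extent_update(req, 0, xferred);
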
+void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
+                       u16 opcode, const char *class, const char *method)
+{
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+       struct ceph_pagelist *pagelist;
+       size_t payload_len = 0;
+       size_t size;
+
+       BUG_ON(opcode != CEPH_OSD_OP_CALL);
+
+       pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+       BUG_ON(!pagelist);
+       ceph_pagelist_init(pagelist);
+
+       op->cls.class_name = class;
+       size = strlen(class);
+       BUG_ON(size > (size_t) U8_MAX);
+       op->cls.class_len = size;
+       ceph_pagelist_append(pagelist, class, size);
+       payload_len += size;
+
+       op->cls.method_name = method;
+       size = strlen(method);
+       BUG_ON(size > (size_t) U8_MAX);
+       op->cls.method_len = size;
+       ceph_pagelist_append(pagelist, method, size);
+       payload_len += size;
+
+       osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
+
+       op->cls.argc = 0;       /* currently unused */
+
+       op->payload_len = payload_len;
+}
+EXPORT_SYMBOL(osd_req_op_cls_init);
+
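
[Note: a sketch of a class-method call; the class/method names are placeholders, and the in/out buffers use the request_data/response_data helpers above:]

	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, "myclass", "mymethod");
	osd_req_op_cls_request_data_pagelist(req, 0, in_pagelist);
	osd_req_op_cls_response_data_pages(req, 0, reply_pages,
					   PAGE_SIZE, 0, false, false);
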
+void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
+                               unsigned int which, u16 opcode,
+                               u64 cookie, u64 version, int flag)
+{
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+
+       BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
+
+       op->watch.cookie = cookie;
+       op->watch.ver = version;
+       if (opcode == CEPH_OSD_OP_WATCH && flag)
+               op->watch.flag = (u8)1;
+}
+EXPORT_SYMBOL(osd_req_op_watch_init);
+
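
[Note: and for a watch registration, where cookie and ver are whatever the caller tracks:]

	osd_req_op_watch_init(req, 0, CEPH_OSD_OP_WATCH, cookie, ver, 1);
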
+static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
+                               struct ceph_osd_data *osd_data)
+{
+       u64 length = ceph_osd_data_length(osd_data);
+
+       if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
+               BUG_ON(length > (u64) SIZE_MAX);
+               if (length)
+                       ceph_msg_data_add_pages(msg, osd_data->pages,
+                                       length, osd_data->alignment);
+       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+               BUG_ON(!length);
+               ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
+#ifdef CONFIG_BLOCK
+       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
+               ceph_msg_data_add_bio(msg, osd_data->bio, length);
+#endif
+       } else {
+               BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
        }
+}
 
-       req->r_request_attempts = p;
-       p += 4;
+static u64 osd_req_encode_op(struct ceph_osd_request *req,
+                             struct ceph_osd_op *dst, unsigned int which)
+{
+       struct ceph_osd_req_op *src;
+       struct ceph_osd_data *osd_data;
+       u64 request_data_len = 0;
+       u64 data_length;
 
-       /* data */
-       if (flags & CEPH_OSD_FLAG_WRITE)
-               req->r_request->hdr.data_off = cpu_to_le16(off);
-       req->r_request->hdr.data_len = cpu_to_le32(data_len);
+       BUG_ON(which >= req->r_num_ops);
+       src = &req->r_ops[which];
+       if (WARN_ON(!osd_req_opcode_valid(src->op))) {
+               pr_err("unrecognized osd opcode %d\n", src->op);
 
-       BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
-       msg_size = p - msg->front.iov_base;
-       msg->front.iov_len = msg_size;
-       msg->hdr.front_len = cpu_to_le32(msg_size);
+               return 0;
+       }
 
-       dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
-            num_ops);
-       return;
+       switch (src->op) {
+       case CEPH_OSD_OP_STAT:
+               osd_data = &src->raw_data_in;
+               ceph_osdc_msg_data_add(req->r_reply, osd_data);
+               break;
+       case CEPH_OSD_OP_READ:
+       case CEPH_OSD_OP_WRITE:
+               if (src->op == CEPH_OSD_OP_WRITE)
+                       request_data_len = src->extent.length;
+               dst->extent.offset = cpu_to_le64(src->extent.offset);
+               dst->extent.length = cpu_to_le64(src->extent.length);
+               dst->extent.truncate_size =
+                       cpu_to_le64(src->extent.truncate_size);
+               dst->extent.truncate_seq =
+                       cpu_to_le32(src->extent.truncate_seq);
+               osd_data = &src->extent.osd_data;
+               if (src->op == CEPH_OSD_OP_WRITE)
+                       ceph_osdc_msg_data_add(req->r_request, osd_data);
+               else
+                       ceph_osdc_msg_data_add(req->r_reply, osd_data);
+               break;
+       case CEPH_OSD_OP_CALL:
+               dst->cls.class_len = src->cls.class_len;
+               dst->cls.method_len = src->cls.method_len;
+               osd_data = &src->cls.request_info;
+               ceph_osdc_msg_data_add(req->r_request, osd_data);
+               BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
+               request_data_len = osd_data->pagelist->length;
+
+               osd_data = &src->cls.request_data;
+               data_length = ceph_osd_data_length(osd_data);
+               if (data_length) {
+                       BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+                       dst->cls.indata_len = cpu_to_le32(data_length);
+                       ceph_osdc_msg_data_add(req->r_request, osd_data);
+                       src->payload_len += data_length;
+                       request_data_len += data_length;
+               }
+               osd_data = &src->cls.response_data;
+               ceph_osdc_msg_data_add(req->r_reply, osd_data);
+               break;
+       case CEPH_OSD_OP_STARTSYNC:
+               break;
+       case CEPH_OSD_OP_NOTIFY_ACK:
+       case CEPH_OSD_OP_WATCH:
+               dst->watch.cookie = cpu_to_le64(src->watch.cookie);
+               dst->watch.ver = cpu_to_le64(src->watch.ver);
+               dst->watch.flag = src->watch.flag;
+               break;
+       default:
+               pr_err("unsupported osd opcode %s\n",
+                       ceph_osd_op_name(src->op));
+               WARN_ON(1);
+
+               return 0;
+       }
+       dst->op = cpu_to_le16(src->op);
+       dst->payload_len = cpu_to_le32(src->payload_len);
+
+       return request_data_len;
 }
-EXPORT_SYMBOL(ceph_osdc_build_request);
 
 /*
  * build new request AND message, calculate layout, and adjust file
@@ -448,52 +700,64 @@ EXPORT_SYMBOL(ceph_osdc_build_request);
 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
-                                              u64 off, u64 *plen,
+                                              u64 off, u64 *plen, int num_ops,
                                               int opcode, int flags,
                                               struct ceph_snap_context *snapc,
-                                              int do_sync,
                                               u32 truncate_seq,
                                               u64 truncate_size,
-                                              struct timespec *mtime,
                                               bool use_mempool)
 {
-       struct ceph_osd_req_op ops[2];
        struct ceph_osd_request *req;
-       unsigned int num_op = 1;
-       u64 bno = 0;
+       u64 objnum = 0;
+       u64 objoff = 0;
+       u64 objlen = 0;
+       u32 object_size;
+       u64 object_base;
        int r;
 
-       memset(&ops, 0, sizeof ops);
-
-       ops[0].op = opcode;
-       ops[0].extent.truncate_seq = truncate_seq;
-       ops[0].extent.truncate_size = truncate_size;
+       BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
 
-       if (do_sync) {
-               ops[1].op = CEPH_OSD_OP_STARTSYNC;
-               num_op++;
-       }
-
-       req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
+       req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
                                        GFP_NOFS);
        if (!req)
                return ERR_PTR(-ENOMEM);
+
        req->r_flags = flags;
 
        /* calculate max write size */
-       r = calc_layout(layout, off, plen, ops, &bno);
+       r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
        if (r < 0) {
                ceph_osdc_put_request(req);
                return ERR_PTR(r);
        }
+
+       object_size = le32_to_cpu(layout->fl_object_size);
+       object_base = off - objoff;
+       if (truncate_size <= object_base) {
+               truncate_size = 0;
+       } else {
+               truncate_size -= object_base;
+               if (truncate_size > object_size)
+                       truncate_size = object_size;
+       }
+
+       osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
+                               truncate_size, truncate_seq);
+
+       /*
+        * A second op in the ops array means the caller wants to
+        * also include a 'startsync' command so that the
+        * osd will flush data quickly.
+        */
+       if (num_ops > 1)
+               osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+
        req->r_file_layout = *layout;  /* keep a copy */
 
-       snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
+       snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
+               vino.ino, objnum);
        req->r_oid_len = strlen(req->r_oid);
 
-       ceph_osdc_build_request(req, off, num_op, ops,
-                               snapc, vino.snap, mtime);
-
        return req;
 }
 EXPORT_SYMBOL(ceph_osdc_new_request);
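
[Note: with ceph_osdc_build_request() now called separately (it appears later in this diff), a single-op read follows the pattern of ceph_osdc_readpages() below; a condensed sketch:]

	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, truncate_seq, truncate_size, false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	osd_req_op_extent_osd_data_pages(req, 0, pages, len,
					 page_align, false, false);
	ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);
	ceph_osdc_put_request(req);
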
@@ -570,21 +834,46 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
                                struct ceph_osd *osd)
 {
        struct ceph_osd_request *req, *nreq;
+       LIST_HEAD(resend);
        int err;
 
        dout("__kick_osd_requests osd%d\n", osd->o_osd);
        err = __reset_osd(osdc, osd);
        if (err)
                return;
-
+       /*
+        * Build up a list of requests to resend by traversing the
+        * osd's list of requests.  Requests for a given object are
+        * sent in tid order, and that is also the order they're
+        * kept on this list.  Therefore all requests that are in
+        * flight will be found first, followed by all requests that
+        * have not yet been sent.  And to resend requests while
+        * preserving this order we will want to put any sent
+        * requests back on the front of the osd client's unsent
+        * list.
+        *
+        * So we build a separate ordered list of already-sent
+        * requests for the affected osd and splice it onto the
+        * front of the osd client's unsent list.  Once we've seen a
+        * request that has not yet been sent we're done.  Those
+        * requests are already sitting right where they belong.
+        */
        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
-               list_move(&req->r_req_lru_item, &osdc->req_unsent);
-               dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
+               if (!req->r_sent)
+                       break;
+               list_move_tail(&req->r_req_lru_item, &resend);
+               dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
                     osd->o_osd);
                if (!req->r_linger)
                        req->r_flags |= CEPH_OSD_FLAG_RETRY;
        }
+       list_splice(&resend, &osdc->req_unsent);
 
+       /*
+        * Linger requests are re-registered before sending, which
+        * sets up a new tid for each.  We add them to the unsent
+        * list at the end to keep things in tid order.
+        */
        list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
                                 r_linger_osd) {
                /*
@@ -593,8 +882,8 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
                 */
                BUG_ON(!list_empty(&req->r_req_lru_item));
                __register_request(osdc, req);
-               list_add(&req->r_req_lru_item, &osdc->req_unsent);
-               list_add(&req->r_osd_item, &req->r_osd->o_requests);
+               list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
+               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
                __unregister_linger_request(osdc, req);
                dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
                     osd->o_osd);
@@ -915,6 +1204,7 @@ void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
        mutex_lock(&osdc->request_mutex);
        if (req->r_linger) {
                __unregister_linger_request(osdc, req);
+               req->r_linger = 0;
                ceph_osdc_put_request(req);
        }
        mutex_unlock(&osdc->request_mutex);
@@ -1010,10 +1300,10 @@ static int __map_request(struct ceph_osd_client *osdc,
 
        if (req->r_osd) {
                __remove_osd_from_lru(req->r_osd);
-               list_add(&req->r_osd_item, &req->r_osd->o_requests);
-               list_move(&req->r_req_lru_item, &osdc->req_unsent);
+               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
+               list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
        } else {
-               list_move(&req->r_req_lru_item, &osdc->req_notarget);
+               list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
        }
        err = 1;   /* osd or pg changed */
 
@@ -1048,8 +1338,14 @@ static void __send_request(struct ceph_osd_client *osdc,
        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
-       ceph_con_send(&req->r_osd->o_con, req->r_request);
+
+       /* Mark the request unsafe if this is the first time it's being sent. */
+
+       if (!req->r_sent && req->r_unsafe_callback)
+               req->r_unsafe_callback(req, true);
        req->r_sent = req->r_osd->o_incarnation;
+
+       ceph_con_send(&req->r_osd->o_con, req->r_request);
 }
 
 /*
@@ -1137,31 +1433,11 @@ static void handle_osds_timeout(struct work_struct *work)
 
 static void complete_request(struct ceph_osd_request *req)
 {
-       if (req->r_safe_callback)
-               req->r_safe_callback(req, NULL);
+       if (req->r_unsafe_callback)
+               req->r_unsafe_callback(req, false);
        complete_all(&req->r_safe_completion);  /* fsync waiter */
 }
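
[Note: r_unsafe_callback, which replaces r_safe_callback here, brackets the window in which a write is in flight but not yet committed: __send_request() invokes it with true on first transmission, complete_request() with false. A hypothetical callback:]

	static void my_unsafe_cb(struct ceph_osd_request *req, bool unsafe)
	{
		if (unsafe)
			my_account_start(req);	/* hypothetical: pin resources */
		else
			my_account_end(req);	/* hypothetical: release them */
	}
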
 
-static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
-{
-       __u8 v;
-
-       ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
-       v = ceph_decode_8(p);
-       if (v > 1) {
-               pr_warning("do not understand pg encoding %d > 1", v);
-               return -EINVAL;
-       }
-       pgid->pool = ceph_decode_64(p);
-       pgid->seed = ceph_decode_32(p);
-       *p += 4;
-       return 0;
-
-bad:
-       pr_warning("incomplete pg encoding");
-       return -EINVAL;
-}
-
 /*
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
@@ -1173,7 +1449,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        struct ceph_osd_request *req;
        u64 tid;
        int object_len;
-       int numops, payload_len, flags;
+       unsigned int numops;
+       int payload_len, flags;
        s32 result;
        s32 retry_attempt;
        struct ceph_pg pg;
@@ -1182,7 +1459,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        u64 reassert_version;
        u32 osdmap_epoch;
        int already_completed;
-       int i;
+       u32 bytes;
+       unsigned int i;
 
        tid = le64_to_cpu(msg->hdr.tid);
        dout("handle_reply %p tid %llu\n", msg, tid);
@@ -1195,7 +1473,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        ceph_decode_need(&p, end, object_len, bad);
        p += object_len;
 
-       err = __decode_pgid(&p, end, &pg);
+       err = ceph_decode_pgid(&p, end, &pg);
        if (err)
                goto bad;
 
@@ -1211,8 +1489,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        req = __lookup_request(osdc, tid);
        if (req == NULL) {
                dout("handle_reply tid %llu dne\n", tid);
-               mutex_unlock(&osdc->request_mutex);
-               return;
+               goto bad_mutex;
        }
        ceph_osdc_get_request(req);
 
@@ -1237,9 +1514,10 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                payload_len += len;
                p += sizeof(*op);
        }
-       if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
+       bytes = le32_to_cpu(msg->hdr.data_len);
+       if (payload_len != bytes) {
                pr_warning("sum of op payload lens %d != data_len %d",
-                          payload_len, le32_to_cpu(msg->hdr.data_len));
+                          payload_len, bytes);
                goto bad_put;
        }
 
@@ -1248,21 +1526,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        for (i = 0; i < numops; i++)
                req->r_reply_op_result[i] = ceph_decode_32(&p);
 
-       /*
-        * if this connection filled our message, drop our reference now, to
-        * avoid a (safe but slower) revoke later.
-        */
-       if (req->r_con_filling_msg == con && req->r_reply == msg) {
-               dout(" dropping con_filling_msg ref %p\n", con);
-               req->r_con_filling_msg = NULL;
-               con->ops->put(con);
-       }
-
        if (!req->r_got_reply) {
-               unsigned int bytes;
 
                req->r_result = result;
-               bytes = le32_to_cpu(msg->hdr.data_len);
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
                if (req->r_result == 0)
@@ -1311,6 +1577,8 @@ done:
 
 bad_put:
        ceph_osdc_put_request(req);
+bad_mutex:
+       mutex_unlock(&osdc->request_mutex);
 bad:
        pr_err("corrupt osd_op_reply got %d %d\n",
               (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
@@ -1743,25 +2011,103 @@ bad:
        return;
 }
 
-static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
-                               struct ceph_osd_data *osd_data)
+/*
+ * build new request AND message
+ *
+ */
+void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
+                               struct ceph_snap_context *snapc, u64 snap_id,
+                               struct timespec *mtime)
 {
-       if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
-               BUG_ON(osd_data->length > (u64) SIZE_MAX);
-               if (osd_data->length)
-                       ceph_msg_data_set_pages(msg, osd_data->pages,
-                               osd_data->length, osd_data->alignment);
-       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
-               BUG_ON(!osd_data->pagelist->length);
-               ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
-#ifdef CONFIG_BLOCK
-       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
-               ceph_msg_data_set_bio(msg, osd_data->bio);
-#endif
-       } else {
-               BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
+       struct ceph_msg *msg = req->r_request;
+       void *p;
+       size_t msg_size;
+       int flags = req->r_flags;
+       u64 data_len;
+       unsigned int i;
+
+       req->r_snapid = snap_id;
+       req->r_snapc = ceph_get_snap_context(snapc);
+
+       /* encode request */
+       msg->hdr.version = cpu_to_le16(4);
+
+       p = msg->front.iov_base;
+       ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
+       req->r_request_osdmap_epoch = p;
+       p += 4;
+       req->r_request_flags = p;
+       p += 4;
+       if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+               ceph_encode_timespec(p, mtime);
+       p += sizeof(struct ceph_timespec);
+       req->r_request_reassert_version = p;
+       p += sizeof(struct ceph_eversion); /* will get filled in */
+
+       /* oloc */
+       ceph_encode_8(&p, 4);
+       ceph_encode_8(&p, 4);
+       ceph_encode_32(&p, 8 + 4 + 4);
+       req->r_request_pool = p;
+       p += 8;
+       ceph_encode_32(&p, -1);  /* preferred */
+       ceph_encode_32(&p, 0);   /* key len */
+
+       ceph_encode_8(&p, 1);
+       req->r_request_pgid = p;
+       p += 8 + 4;
+       ceph_encode_32(&p, -1);  /* preferred */
+
+       /* oid */
+       ceph_encode_32(&p, req->r_oid_len);
+       memcpy(p, req->r_oid, req->r_oid_len);
+       dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
+       p += req->r_oid_len;
+
+       /* ops--can imply data */
+       ceph_encode_16(&p, (u16)req->r_num_ops);
+       data_len = 0;
+       for (i = 0; i < req->r_num_ops; i++) {
+               data_len += osd_req_encode_op(req, p, i);
+               p += sizeof(struct ceph_osd_op);
+       }
+
+       /* snaps */
+       ceph_encode_64(&p, req->r_snapid);
+       ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+       ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+       if (req->r_snapc) {
+               for (i = 0; i < snapc->num_snaps; i++) {
+                       ceph_encode_64(&p, req->r_snapc->snaps[i]);
+               }
        }
+
+       req->r_request_attempts = p;
+       p += 4;
+
+       /* data */
+       if (flags & CEPH_OSD_FLAG_WRITE) {
+               u16 data_off;
+
+               /*
+                * The header "data_off" is a hint to the receiver
+                * allowing it to align received data into its
+                * buffers such that there's no need to re-copy
+                * it before writing it to disk (direct I/O).
+                */
+               data_off = (u16) (off & 0xffff);
+               req->r_request->hdr.data_off = cpu_to_le16(data_off);
+       }
+       req->r_request->hdr.data_len = cpu_to_le32(data_len);
+
+       BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+       msg_size = p - msg->front.iov_base;
+       msg->front.iov_len = msg_size;
+       msg->hdr.front_len = cpu_to_le32(msg_size);
+
+       dout("build_request msg_size was %d\n", (int)msg_size);
 }
+EXPORT_SYMBOL(ceph_osdc_build_request);
 
 /*
  * Register request, send initial attempt.
@@ -1772,38 +2118,28 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 {
        int rc = 0;
 
-       /* Set up response incoming data and request outgoing data fields */
-
-       ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in);
-       ceph_osdc_msg_data_set(req->r_request, &req->r_data_out);
-
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
-       /*
-        * a racing kick_requests() may have sent the message for us
-        * while we dropped request_mutex above, so only send now if
-        * the request still han't been touched yet.
-        */
        __register_request(osdc, req);
-       if (req->r_sent == 0) {
-               rc = __map_request(osdc, req, 0);
-               if (rc < 0) {
-                       if (nofail) {
-                               dout("osdc_start_request failed map, "
-                                    " will retry %lld\n", req->r_tid);
-                               rc = 0;
-                       }
-                       goto out_unlock;
-               }
-               if (req->r_osd == NULL) {
-                       dout("send_request %p no up osds in pg\n", req);
-                       ceph_monc_request_next_osdmap(&osdc->client->monc);
-               } else {
-                       __send_request(osdc, req);
+       req->r_sent = 0;
+       req->r_got_reply = 0;
+       req->r_completed = 0;
+       rc = __map_request(osdc, req, 0);
+       if (rc < 0) {
+               if (nofail) {
+                       dout("osdc_start_request failed map, "
+                               " will retry %lld\n", req->r_tid);
+                       rc = 0;
                }
-               rc = 0;
+               goto out_unlock;
        }
-
+       if (req->r_osd == NULL) {
+               dout("send_request %p no up osds in pg\n", req);
+               ceph_monc_request_next_osdmap(&osdc->client->monc);
+       } else {
+               __send_queued(osdc);
+       }
+       rc = 0;
 out_unlock:
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
@@ -1961,28 +2297,26 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct page **pages, int num_pages, int page_align)
 {
        struct ceph_osd_request *req;
-       struct ceph_osd_data *osd_data;
        int rc = 0;
 
        dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
             vino.snap, off, *plen);
-       req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
+       req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
-                                   NULL, 0, truncate_seq, truncate_size, NULL,
+                                   NULL, truncate_seq, truncate_size,
                                    false);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
        /* it may be a short read due to an object boundary */
 
-       osd_data = &req->r_data_in;
-       osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
-       osd_data->pages = pages;
-       osd_data->length = *plen;
-       osd_data->alignment = page_align;
+       osd_req_op_extent_osd_data_pages(req, 0,
+                               pages, *plen, page_align, false, false);
 
        dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
-            off, *plen, osd_data->length, page_align);
+            off, *plen, *plen, page_align);
+
+       ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
 
        rc = ceph_osdc_start_request(osdc, req, false);
        if (!rc)
@@ -2006,27 +2340,24 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct page **pages, int num_pages)
 {
        struct ceph_osd_request *req;
-       struct ceph_osd_data *osd_data;
        int rc = 0;
        int page_align = off & ~PAGE_MASK;
 
-       BUG_ON(vino.snap != CEPH_NOSNAP);
-       req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+       BUG_ON(vino.snap != CEPH_NOSNAP);       /* snapshots aren't writeable */
+       req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
                                    CEPH_OSD_OP_WRITE,
                                    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-                                   snapc, 0,
-                                   truncate_seq, truncate_size, mtime,
+                                   snapc, truncate_seq, truncate_size,
                                    true);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
        /* it may be a short write due to an object boundary */
-       osd_data = &req->r_data_out;
-       osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
-       osd_data->pages = pages;
-       osd_data->length = len;
-       osd_data->alignment = page_align;
-       dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length);
+       osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
+                               false, false);
+       dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
+
+       ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
 
        rc = ceph_osdc_start_request(osdc, req, true);
        if (!rc)
@@ -2040,6 +2371,26 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 }
 EXPORT_SYMBOL(ceph_osdc_writepages);
 
+int ceph_osdc_setup(void)
+{
+       BUG_ON(ceph_osd_request_cache);
+       ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
+                                       sizeof (struct ceph_osd_request),
+                                       __alignof__(struct ceph_osd_request),
+                                       0, NULL);
+
+       return ceph_osd_request_cache ? 0 : -ENOMEM;
+}
+EXPORT_SYMBOL(ceph_osdc_setup);
+
+void ceph_osdc_cleanup(void)
+{
+       BUG_ON(!ceph_osd_request_cache);
+       kmem_cache_destroy(ceph_osd_request_cache);
+       ceph_osd_request_cache = NULL;
+}
+EXPORT_SYMBOL(ceph_osdc_cleanup);
+
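
[Note: these are meant to be called once from libceph's module init/exit path; a minimal standalone sketch, with the module name hypothetical:]

	#include <linux/module.h>
	#include <linux/ceph/osd_client.h>

	static int __init osdc_cache_init(void)
	{
		return ceph_osdc_setup();	/* creates the request slab cache */
	}

	static void __exit osdc_cache_exit(void)
	{
		ceph_osdc_cleanup();		/* BUGs if setup() never ran */
	}

	module_init(osdc_cache_init);
	module_exit(osdc_cache_exit);
	MODULE_LICENSE("GPL");
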
 /*
  * handle incoming message
  */
@@ -2099,13 +2450,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                goto out;
        }
 
-       if (req->r_con_filling_msg) {
+       if (req->r_reply->con)
                dout("%s revoking msg %p from old con %p\n", __func__,
-                    req->r_reply, req->r_con_filling_msg);
-               ceph_msg_revoke_incoming(req->r_reply);
-               req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
-               req->r_con_filling_msg = NULL;
-       }
+                    req->r_reply, req->r_reply->con);
+       ceph_msg_revoke_incoming(req->r_reply);
 
        if (front > req->r_reply->front.iov_len) {
                pr_warning("get_reply front %d > preallocated %d\n",
@@ -2119,8 +2467,14 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        m = ceph_msg_get(req->r_reply);
 
        if (data_len > 0) {
-               struct ceph_osd_data *osd_data = &req->r_data_in;
+               struct ceph_osd_data *osd_data;
 
+               /*
+                * XXX This is assuming there is only one op containing
+                * XXX page data.  Probably OK for reads, but this
+                * XXX ought to be done more generally.
+                */
+               osd_data = osd_req_op_extent_osd_data(req, 0);
                if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
                        if (osd_data->pages &&
                                unlikely(osd_data->length < data_len)) {
@@ -2136,7 +2490,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                }
        }
        *skip = 0;
-       req->r_con_filling_msg = con->ops->get(con);
        dout("get_reply tid %lld %p\n", tid, m);
 
 out: