]> Pileus Git - ~andy/linux/blobdiff - net/ceph/osd_client.c
libceph: init sent and completed when starting
[~andy/linux] / net / ceph / osd_client.c
index 69ef6539ca14458b2989e301a311e65135bf334f..d5953b87918c072daaa1427187246f9f9cfcad3e 100644 (file)
@@ -1,3 +1,4 @@
+
 #include <linux/ceph/ceph_debug.h>
 
 #include <linux/module.h>
@@ -21,6 +22,8 @@
 #define OSD_OP_FRONT_LEN       4096
 #define OSD_OPREPLY_FRONT_LEN  512
 
+static struct kmem_cache       *ceph_osd_request_cache;
+
 static const struct ceph_connection_operations osd_con_ops;
 
 static void __send_queued(struct ceph_osd_client *osdc);
@@ -79,46 +82,243 @@ static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
        return 0;
 }
 
+static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
+{
+       memset(osd_data, 0, sizeof (*osd_data));
+       osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
+}
+
+static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
+                       struct page **pages, u64 length, u32 alignment,
+                       bool pages_from_pool, bool own_pages)
+{
+       osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
+       osd_data->pages = pages;
+       osd_data->length = length;
+       osd_data->alignment = alignment;
+       osd_data->pages_from_pool = pages_from_pool;
+       osd_data->own_pages = own_pages;
+}
+
+static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
+                       struct ceph_pagelist *pagelist)
+{
+       osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
+       osd_data->pagelist = pagelist;
+}
+
+#ifdef CONFIG_BLOCK
+static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
+                       struct bio *bio, size_t bio_length)
+{
+       osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
+       osd_data->bio = bio;
+       osd_data->bio_length = bio_length;
+}
+#endif /* CONFIG_BLOCK */
+
+#define osd_req_op_data(oreq, whch, typ, fld)  \
+       ({                                              \
+               BUG_ON(whch >= (oreq)->r_num_ops);      \
+               &(oreq)->r_ops[whch].typ.fld;           \
+       })
+
+static struct ceph_osd_data *
+osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
+{
+       BUG_ON(which >= osd_req->r_num_ops);
+
+       return &osd_req->r_ops[which].raw_data_in;
+}
+
+struct ceph_osd_data *
+osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
+                       unsigned int which)
+{
+       return osd_req_op_data(osd_req, which, extent, osd_data);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data);
+
+struct ceph_osd_data *
+osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
+                       unsigned int which)
+{
+       return osd_req_op_data(osd_req, which, cls, response_data);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data);   /* ??? */
+
+void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct page **pages,
+                       u64 length, u32 alignment,
+                       bool pages_from_pool, bool own_pages)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_raw_data_in(osd_req, which);
+       ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+                               pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
+
+void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct page **pages,
+                       u64 length, u32 alignment,
+                       bool pages_from_pool, bool own_pages)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+       ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+                               pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
+
+void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct ceph_pagelist *pagelist)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+       ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
+
+#ifdef CONFIG_BLOCK
+void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct bio *bio, size_t bio_length)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+       ceph_osd_data_bio_init(osd_data, bio, bio_length);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
+#endif /* CONFIG_BLOCK */
+
+static void osd_req_op_cls_request_info_pagelist(
+                       struct ceph_osd_request *osd_req,
+                       unsigned int which, struct ceph_pagelist *pagelist)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, cls, request_info);
+       ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+
+void osd_req_op_cls_request_data_pagelist(
+                       struct ceph_osd_request *osd_req,
+                       unsigned int which, struct ceph_pagelist *pagelist)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+       ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
+
+void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct page **pages, u64 length,
+                       u32 alignment, bool pages_from_pool, bool own_pages)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+       ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+                               pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
+
+void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
+                       unsigned int which, struct page **pages, u64 length,
+                       u32 alignment, bool pages_from_pool, bool own_pages)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, cls, response_data);
+       ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+                               pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
+
+static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
+{
+       switch (osd_data->type) {
+       case CEPH_OSD_DATA_TYPE_NONE:
+               return 0;
+       case CEPH_OSD_DATA_TYPE_PAGES:
+               return osd_data->length;
+       case CEPH_OSD_DATA_TYPE_PAGELIST:
+               return (u64)osd_data->pagelist->length;
+#ifdef CONFIG_BLOCK
+       case CEPH_OSD_DATA_TYPE_BIO:
+               return (u64)osd_data->bio_length;
+#endif /* CONFIG_BLOCK */
+       default:
+               WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
+               return 0;
+       }
+}
+
+static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
+{
+       if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
+               int num_pages;
+
+               num_pages = calc_pages_for((u64)osd_data->alignment,
+                                               (u64)osd_data->length);
+               ceph_release_page_vector(osd_data->pages, num_pages);
+       }
+       ceph_osd_data_init(osd_data);
+}
+
+static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
+                       unsigned int which)
+{
+       struct ceph_osd_req_op *op;
+
+       BUG_ON(which >= osd_req->r_num_ops);
+       op = &osd_req->r_ops[which];
+
+       switch (op->op) {
+       case CEPH_OSD_OP_READ:
+       case CEPH_OSD_OP_WRITE:
+               ceph_osd_data_release(&op->extent.osd_data);
+               break;
+       case CEPH_OSD_OP_CALL:
+               ceph_osd_data_release(&op->cls.request_info);
+               ceph_osd_data_release(&op->cls.request_data);
+               ceph_osd_data_release(&op->cls.response_data);
+               break;
+       default:
+               break;
+       }
+}
+
 /*
  * requests
  */
 void ceph_osdc_release_request(struct kref *kref)
 {
-       int num_pages;
-       struct ceph_osd_request *req = container_of(kref,
-                                                   struct ceph_osd_request,
-                                                   r_kref);
+       struct ceph_osd_request *req;
+       unsigned int which;
 
+       req = container_of(kref, struct ceph_osd_request, r_kref);
        if (req->r_request)
                ceph_msg_put(req->r_request);
-       if (req->r_con_filling_msg) {
-               dout("%s revoking msg %p from con %p\n", __func__,
-                    req->r_reply, req->r_con_filling_msg);
+       if (req->r_reply) {
                ceph_msg_revoke_incoming(req->r_reply);
-               req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
-               req->r_con_filling_msg = NULL;
-       }
-       if (req->r_reply)
                ceph_msg_put(req->r_reply);
-
-       if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES &&
-                       req->r_data_in.own_pages) {
-               num_pages = calc_pages_for((u64)req->r_data_in.alignment,
-                                               (u64)req->r_data_in.length);
-               ceph_release_page_vector(req->r_data_in.pages, num_pages);
-       }
-       if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES &&
-                       req->r_data_out.own_pages) {
-               num_pages = calc_pages_for((u64)req->r_data_out.alignment,
-                                               (u64)req->r_data_out.length);
-               ceph_release_page_vector(req->r_data_out.pages, num_pages);
        }
 
+       for (which = 0; which < req->r_num_ops; which++)
+               osd_req_op_data_release(req, which);
+
        ceph_put_snap_context(req->r_snapc);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
-               kfree(req);
+               kmem_cache_free(ceph_osd_request_cache, req);
+
 }
 EXPORT_SYMBOL(ceph_osdc_release_request);
 
@@ -132,6 +332,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        struct ceph_msg *msg;
        size_t msg_size;
 
+       BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
+       BUG_ON(num_ops > CEPH_OSD_MAX_OP);
+
        msg_size = 4 + 4 + 8 + 8 + 4+8;
        msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
        msg_size += 1 + 8 + 4 + 4;     /* pg_t */
@@ -146,13 +349,14 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
                memset(req, 0, sizeof(*req));
        } else {
-               req = kzalloc(sizeof(*req), gfp_flags);
+               req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
        }
        if (req == NULL)
                return NULL;
 
        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
+       req->r_num_ops = num_ops;
 
        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
@@ -176,9 +380,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        }
        req->r_reply = msg;
 
-       req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE;
-       req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE;
-
        /* create request message; allow space for oid */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -271,25 +472,39 @@ static bool osd_req_opcode_valid(u16 opcode)
  * other information associated with them.  It also serves as a
  * common init routine for all the other init functions, below.
  */
-void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode)
+static struct ceph_osd_req_op *
+_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
+                               u16 opcode)
 {
+       struct ceph_osd_req_op *op;
+
+       BUG_ON(which >= osd_req->r_num_ops);
        BUG_ON(!osd_req_opcode_valid(opcode));
 
+       op = &osd_req->r_ops[which];
        memset(op, 0, sizeof (*op));
-
        op->op = opcode;
+
+       return op;
+}
+
+void osd_req_op_init(struct ceph_osd_request *osd_req,
+                               unsigned int which, u16 opcode)
+{
+       (void)_osd_req_op_init(osd_req, which, opcode);
 }
+EXPORT_SYMBOL(osd_req_op_init);
 
-void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
+void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
+                               unsigned int which, u16 opcode,
                                u64 offset, u64 length,
                                u64 truncate_size, u32 truncate_seq)
 {
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
        size_t payload_len = 0;
 
        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
 
-       osd_req_op_init(op, opcode);
-
        op->extent.offset = offset;
        op->extent.length = length;
        op->extent.truncate_size = truncate_size;
@@ -301,33 +516,54 @@ void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
 }
 EXPORT_SYMBOL(osd_req_op_extent_init);
 
-void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
-                       const char *class, const char *method,
-                       const void *request_data, size_t request_data_size)
+void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
+                               unsigned int which, u64 length)
+{
+       struct ceph_osd_req_op *op;
+       u64 previous;
+
+       BUG_ON(which >= osd_req->r_num_ops);
+       op = &osd_req->r_ops[which];
+       previous = op->extent.length;
+
+       if (length == previous)
+               return;         /* Nothing to do */
+       BUG_ON(length > previous);
+
+       op->extent.length = length;
+       op->payload_len -= previous - length;
+}
+EXPORT_SYMBOL(osd_req_op_extent_update);
+
+void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
+                       u16 opcode, const char *class, const char *method)
 {
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+       struct ceph_pagelist *pagelist;
        size_t payload_len = 0;
        size_t size;
 
        BUG_ON(opcode != CEPH_OSD_OP_CALL);
 
-       osd_req_op_init(op, opcode);
+       pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+       BUG_ON(!pagelist);
+       ceph_pagelist_init(pagelist);
 
        op->cls.class_name = class;
        size = strlen(class);
        BUG_ON(size > (size_t) U8_MAX);
        op->cls.class_len = size;
+       ceph_pagelist_append(pagelist, class, size);
        payload_len += size;
 
        op->cls.method_name = method;
        size = strlen(method);
        BUG_ON(size > (size_t) U8_MAX);
        op->cls.method_len = size;
+       ceph_pagelist_append(pagelist, method, size);
        payload_len += size;
 
-       op->cls.indata = request_data;
-       BUG_ON(request_data_size > (size_t) U32_MAX);
-       op->cls.indata_len = (u32) request_data_size;
-       payload_len += request_data_size;
+       osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
 
        op->cls.argc = 0;       /* currently unused */
 
@@ -335,28 +571,53 @@ void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
 }
 EXPORT_SYMBOL(osd_req_op_cls_init);
 
-void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode,
+void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
+                               unsigned int which, u16 opcode,
                                u64 cookie, u64 version, int flag)
 {
-       BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
 
-       osd_req_op_init(op, opcode);
+       BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
 
        op->watch.cookie = cookie;
-       /* op->watch.ver = version; */  /* XXX 3847 */
-       op->watch.ver = cpu_to_le64(version);
+       op->watch.ver = version;
        if (opcode == CEPH_OSD_OP_WATCH && flag)
-               op->watch.flag = (u8) 1;
+               op->watch.flag = (u8)1;
 }
 EXPORT_SYMBOL(osd_req_op_watch_init);
 
+static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
+                               struct ceph_osd_data *osd_data)
+{
+       u64 length = ceph_osd_data_length(osd_data);
+
+       if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
+               BUG_ON(length > (u64) SIZE_MAX);
+               if (length)
+                       ceph_msg_data_add_pages(msg, osd_data->pages,
+                                       length, osd_data->alignment);
+       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+               BUG_ON(!length);
+               ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
+#ifdef CONFIG_BLOCK
+       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
+               ceph_msg_data_add_bio(msg, osd_data->bio, length);
+#endif
+       } else {
+               BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
+       }
+}
+
 static u64 osd_req_encode_op(struct ceph_osd_request *req,
-                             struct ceph_osd_op *dst,
-                             struct ceph_osd_req_op *src)
+                             struct ceph_osd_op *dst, unsigned int which)
 {
-       u64 out_data_len = 0;
-       struct ceph_pagelist *pagelist;
+       struct ceph_osd_req_op *src;
+       struct ceph_osd_data *osd_data;
+       u64 request_data_len = 0;
+       u64 data_length;
 
+       BUG_ON(which >= req->r_num_ops);
+       src = &req->r_ops[which];
        if (WARN_ON(!osd_req_opcode_valid(src->op))) {
                pr_err("unrecognized osd opcode %d\n", src->op);
 
@@ -365,36 +626,44 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 
        switch (src->op) {
        case CEPH_OSD_OP_STAT:
+               osd_data = &src->raw_data_in;
+               ceph_osdc_msg_data_add(req->r_reply, osd_data);
                break;
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
                if (src->op == CEPH_OSD_OP_WRITE)
-                       out_data_len = src->extent.length;
+                       request_data_len = src->extent.length;
                dst->extent.offset = cpu_to_le64(src->extent.offset);
                dst->extent.length = cpu_to_le64(src->extent.length);
                dst->extent.truncate_size =
                        cpu_to_le64(src->extent.truncate_size);
                dst->extent.truncate_seq =
                        cpu_to_le32(src->extent.truncate_seq);
+               osd_data = &src->extent.osd_data;
+               if (src->op == CEPH_OSD_OP_WRITE)
+                       ceph_osdc_msg_data_add(req->r_request, osd_data);
+               else
+                       ceph_osdc_msg_data_add(req->r_reply, osd_data);
                break;
        case CEPH_OSD_OP_CALL:
-               pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
-               BUG_ON(!pagelist);
-               ceph_pagelist_init(pagelist);
-
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
-               dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
-               ceph_pagelist_append(pagelist, src->cls.class_name,
-                                    src->cls.class_len);
-               ceph_pagelist_append(pagelist, src->cls.method_name,
-                                    src->cls.method_len);
-               ceph_pagelist_append(pagelist, src->cls.indata,
-                                    src->cls.indata_len);
-
-               req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST;
-               req->r_data_out.pagelist = pagelist;
-               out_data_len = pagelist->length;
+               osd_data = &src->cls.request_info;
+               ceph_osdc_msg_data_add(req->r_request, osd_data);
+               BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
+               request_data_len = osd_data->pagelist->length;
+
+               osd_data = &src->cls.request_data;
+               data_length = ceph_osd_data_length(osd_data);
+               if (data_length) {
+                       BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+                       dst->cls.indata_len = cpu_to_le32(data_length);
+                       ceph_osdc_msg_data_add(req->r_request, osd_data);
+                       src->payload_len += data_length;
+                       request_data_len += data_length;
+               }
+               osd_data = &src->cls.response_data;
+               ceph_osdc_msg_data_add(req->r_reply, osd_data);
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
@@ -414,115 +683,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
        dst->op = cpu_to_le16(src->op);
        dst->payload_len = cpu_to_le32(src->payload_len);
 
-       return out_data_len;
+       return request_data_len;
 }
 
-/*
- * build new request AND message
- *
- */
-void ceph_osdc_build_request(struct ceph_osd_request *req,
-                            u64 off, unsigned int num_ops,
-                            struct ceph_osd_req_op *src_ops,
-                            struct ceph_snap_context *snapc, u64 snap_id,
-                            struct timespec *mtime)
-{
-       struct ceph_msg *msg = req->r_request;
-       struct ceph_osd_req_op *src_op;
-       void *p;
-       size_t msg_size;
-       int flags = req->r_flags;
-       u64 data_len;
-       int i;
-
-       req->r_num_ops = num_ops;
-       req->r_snapid = snap_id;
-       req->r_snapc = ceph_get_snap_context(snapc);
-
-       /* encode request */
-       msg->hdr.version = cpu_to_le16(4);
-
-       p = msg->front.iov_base;
-       ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
-       req->r_request_osdmap_epoch = p;
-       p += 4;
-       req->r_request_flags = p;
-       p += 4;
-       if (req->r_flags & CEPH_OSD_FLAG_WRITE)
-               ceph_encode_timespec(p, mtime);
-       p += sizeof(struct ceph_timespec);
-       req->r_request_reassert_version = p;
-       p += sizeof(struct ceph_eversion); /* will get filled in */
-
-       /* oloc */
-       ceph_encode_8(&p, 4);
-       ceph_encode_8(&p, 4);
-       ceph_encode_32(&p, 8 + 4 + 4);
-       req->r_request_pool = p;
-       p += 8;
-       ceph_encode_32(&p, -1);  /* preferred */
-       ceph_encode_32(&p, 0);   /* key len */
-
-       ceph_encode_8(&p, 1);
-       req->r_request_pgid = p;
-       p += 8 + 4;
-       ceph_encode_32(&p, -1);  /* preferred */
-
-       /* oid */
-       ceph_encode_32(&p, req->r_oid_len);
-       memcpy(p, req->r_oid, req->r_oid_len);
-       dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
-       p += req->r_oid_len;
-
-       /* ops--can imply data */
-       ceph_encode_16(&p, num_ops);
-       src_op = src_ops;
-       req->r_request_ops = p;
-       data_len = 0;
-       for (i = 0; i < num_ops; i++, src_op++) {
-               data_len += osd_req_encode_op(req, p, src_op);
-               p += sizeof(struct ceph_osd_op);
-       }
-
-       /* snaps */
-       ceph_encode_64(&p, req->r_snapid);
-       ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
-       ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
-       if (req->r_snapc) {
-               for (i = 0; i < snapc->num_snaps; i++) {
-                       ceph_encode_64(&p, req->r_snapc->snaps[i]);
-               }
-       }
-
-       req->r_request_attempts = p;
-       p += 4;
-
-       /* data */
-       if (flags & CEPH_OSD_FLAG_WRITE) {
-               u16 data_off;
-
-               /*
-                * The header "data_off" is a hint to the receiver
-                * allowing it to align received data into its
-                * buffers such that there's no need to re-copy
-                * it before writing it to disk (direct I/O).
-                */
-               data_off = (u16) (off & 0xffff);
-               req->r_request->hdr.data_off = cpu_to_le16(data_off);
-       }
-       req->r_request->hdr.data_len = cpu_to_le32(data_len);
-
-       BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
-       msg_size = p - msg->front.iov_base;
-       msg->front.iov_len = msg_size;
-       msg->hdr.front_len = cpu_to_le32(msg_size);
-
-       dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
-            num_ops);
-       return;
-}
-EXPORT_SYMBOL(ceph_osdc_build_request);
-
 /*
  * build new request AND message, calculate layout, and adjust file
  * extent as needed.
@@ -537,18 +700,14 @@ EXPORT_SYMBOL(ceph_osdc_build_request);
 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
-                                              u64 off, u64 *plen,
+                                              u64 off, u64 *plen, int num_ops,
                                               int opcode, int flags,
                                               struct ceph_snap_context *snapc,
-                                              int do_sync,
                                               u32 truncate_seq,
                                               u64 truncate_size,
-                                              struct timespec *mtime,
                                               bool use_mempool)
 {
-       struct ceph_osd_req_op ops[2];
        struct ceph_osd_request *req;
-       unsigned int num_op = do_sync ? 2 : 1;
        u64 objnum = 0;
        u64 objoff = 0;
        u64 objlen = 0;
@@ -558,10 +717,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
 
-       req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
+       req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
                                        GFP_NOFS);
        if (!req)
                return ERR_PTR(-ENOMEM);
+
        req->r_flags = flags;
 
        /* calculate max write size */
@@ -581,10 +741,16 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                        truncate_size = object_size;
        }
 
-       osd_req_op_extent_init(&ops[0], opcode, objoff, objlen,
+       osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
                                truncate_size, truncate_seq);
-       if (do_sync)
-               osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC);
+
+       /*
+        * A second op in the ops array means the caller wants to
+        * also include a 'startsync' command so that the
+        * osd will flush data quickly.
+        */
+       if (num_ops > 1)
+               osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
 
        req->r_file_layout = *layout;  /* keep a copy */
 
@@ -592,9 +758,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                vino.ino, objnum);
        req->r_oid_len = strlen(req->r_oid);
 
-       ceph_osdc_build_request(req, off, num_op, ops,
-                               snapc, vino.snap, mtime);
-
        return req;
 }
 EXPORT_SYMBOL(ceph_osdc_new_request);
@@ -1041,6 +1204,7 @@ void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
        mutex_lock(&osdc->request_mutex);
        if (req->r_linger) {
                __unregister_linger_request(osdc, req);
+               req->r_linger = 0;
                ceph_osdc_put_request(req);
        }
        mutex_unlock(&osdc->request_mutex);
@@ -1174,8 +1338,14 @@ static void __send_request(struct ceph_osd_client *osdc,
        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
-       ceph_con_send(&req->r_osd->o_con, req->r_request);
+
+       /* Mark the request unsafe if this is the first time it's being sent. */
+
+       if (!req->r_sent && req->r_unsafe_callback)
+               req->r_unsafe_callback(req, true);
        req->r_sent = req->r_osd->o_incarnation;
+
+       ceph_con_send(&req->r_osd->o_con, req->r_request);
 }
 
 /*
@@ -1263,31 +1433,11 @@ static void handle_osds_timeout(struct work_struct *work)
 
 static void complete_request(struct ceph_osd_request *req)
 {
-       if (req->r_safe_callback)
-               req->r_safe_callback(req, NULL);
+       if (req->r_unsafe_callback)
+               req->r_unsafe_callback(req, false);
        complete_all(&req->r_safe_completion);  /* fsync waiter */
 }
 
-static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
-{
-       __u8 v;
-
-       ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
-       v = ceph_decode_8(p);
-       if (v > 1) {
-               pr_warning("do not understand pg encoding %d > 1", v);
-               return -EINVAL;
-       }
-       pgid->pool = ceph_decode_64(p);
-       pgid->seed = ceph_decode_32(p);
-       *p += 4;
-       return 0;
-
-bad:
-       pr_warning("incomplete pg encoding");
-       return -EINVAL;
-}
-
 /*
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
@@ -1299,7 +1449,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        struct ceph_osd_request *req;
        u64 tid;
        int object_len;
-       int numops, payload_len, flags;
+       unsigned int numops;
+       int payload_len, flags;
        s32 result;
        s32 retry_attempt;
        struct ceph_pg pg;
@@ -1308,7 +1459,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        u64 reassert_version;
        u32 osdmap_epoch;
        int already_completed;
-       int i;
+       u32 bytes;
+       unsigned int i;
 
        tid = le64_to_cpu(msg->hdr.tid);
        dout("handle_reply %p tid %llu\n", msg, tid);
@@ -1321,7 +1473,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        ceph_decode_need(&p, end, object_len, bad);
        p += object_len;
 
-       err = __decode_pgid(&p, end, &pg);
+       err = ceph_decode_pgid(&p, end, &pg);
        if (err)
                goto bad;
 
@@ -1362,9 +1514,10 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                payload_len += len;
                p += sizeof(*op);
        }
-       if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
+       bytes = le32_to_cpu(msg->hdr.data_len);
+       if (payload_len != bytes) {
                pr_warning("sum of op payload lens %d != data_len %d",
-                          payload_len, le32_to_cpu(msg->hdr.data_len));
+                          payload_len, bytes);
                goto bad_put;
        }
 
@@ -1373,21 +1526,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        for (i = 0; i < numops; i++)
                req->r_reply_op_result[i] = ceph_decode_32(&p);
 
-       /*
-        * if this connection filled our message, drop our reference now, to
-        * avoid a (safe but slower) revoke later.
-        */
-       if (req->r_con_filling_msg == con && req->r_reply == msg) {
-               dout(" dropping con_filling_msg ref %p\n", con);
-               req->r_con_filling_msg = NULL;
-               con->ops->put(con);
-       }
-
        if (!req->r_got_reply) {
-               unsigned int bytes;
 
                req->r_result = result;
-               bytes = le32_to_cpu(msg->hdr.data_len);
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
                if (req->r_result == 0)
@@ -1870,25 +2011,103 @@ bad:
        return;
 }
 
-static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
-                               struct ceph_osd_data *osd_data)
+/*
+ * build new request AND message
+ *
+ */
+void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
+                               struct ceph_snap_context *snapc, u64 snap_id,
+                               struct timespec *mtime)
 {
-       if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
-               BUG_ON(osd_data->length > (u64) SIZE_MAX);
-               if (osd_data->length)
-                       ceph_msg_data_set_pages(msg, osd_data->pages,
-                               osd_data->length, osd_data->alignment);
-       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
-               BUG_ON(!osd_data->pagelist->length);
-               ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
-#ifdef CONFIG_BLOCK
-       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
-               ceph_msg_data_set_bio(msg, osd_data->bio);
-#endif
-       } else {
-               BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
+       struct ceph_msg *msg = req->r_request;
+       void *p;
+       size_t msg_size;
+       int flags = req->r_flags;
+       u64 data_len;
+       unsigned int i;
+
+       req->r_snapid = snap_id;
+       req->r_snapc = ceph_get_snap_context(snapc);
+
+       /* encode request */
+       msg->hdr.version = cpu_to_le16(4);
+
+       p = msg->front.iov_base;
+       ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
+       req->r_request_osdmap_epoch = p;
+       p += 4;
+       req->r_request_flags = p;
+       p += 4;
+       if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+               ceph_encode_timespec(p, mtime);
+       p += sizeof(struct ceph_timespec);
+       req->r_request_reassert_version = p;
+       p += sizeof(struct ceph_eversion); /* will get filled in */
+
+       /* oloc */
+       ceph_encode_8(&p, 4);
+       ceph_encode_8(&p, 4);
+       ceph_encode_32(&p, 8 + 4 + 4);
+       req->r_request_pool = p;
+       p += 8;
+       ceph_encode_32(&p, -1);  /* preferred */
+       ceph_encode_32(&p, 0);   /* key len */
+
+       ceph_encode_8(&p, 1);
+       req->r_request_pgid = p;
+       p += 8 + 4;
+       ceph_encode_32(&p, -1);  /* preferred */
+
+       /* oid */
+       ceph_encode_32(&p, req->r_oid_len);
+       memcpy(p, req->r_oid, req->r_oid_len);
+       dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
+       p += req->r_oid_len;
+
+       /* ops--can imply data */
+       ceph_encode_16(&p, (u16)req->r_num_ops);
+       data_len = 0;
+       for (i = 0; i < req->r_num_ops; i++) {
+               data_len += osd_req_encode_op(req, p, i);
+               p += sizeof(struct ceph_osd_op);
+       }
+
+       /* snaps */
+       ceph_encode_64(&p, req->r_snapid);
+       ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+       ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+       if (req->r_snapc) {
+               for (i = 0; i < snapc->num_snaps; i++) {
+                       ceph_encode_64(&p, req->r_snapc->snaps[i]);
+               }
        }
+
+       req->r_request_attempts = p;
+       p += 4;
+
+       /* data */
+       if (flags & CEPH_OSD_FLAG_WRITE) {
+               u16 data_off;
+
+               /*
+                * The header "data_off" is a hint to the receiver
+                * allowing it to align received data into its
+                * buffers such that there's no need to re-copy
+                * it before writing it to disk (direct I/O).
+                */
+               data_off = (u16) (off & 0xffff);
+               req->r_request->hdr.data_off = cpu_to_le16(data_off);
+       }
+       req->r_request->hdr.data_len = cpu_to_le32(data_len);
+
+       BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+       msg_size = p - msg->front.iov_base;
+       msg->front.iov_len = msg_size;
+       msg->hdr.front_len = cpu_to_le32(msg_size);
+
+       dout("build_request msg_size was %d\n", (int)msg_size);
 }
+EXPORT_SYMBOL(ceph_osdc_build_request);
 
 /*
  * Register request, send initial attempt.
@@ -1899,15 +2118,12 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 {
        int rc = 0;
 
-       /* Set up response incoming data and request outgoing data fields */
-
-       ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in);
-       ceph_osdc_msg_data_set(req->r_request, &req->r_data_out);
-
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        __register_request(osdc, req);
-       WARN_ON(req->r_sent);
+       req->r_sent = 0;
+       req->r_got_reply = 0;
+       req->r_completed = 0;
        rc = __map_request(osdc, req, 0);
        if (rc < 0) {
                if (nofail) {
@@ -2081,28 +2297,26 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct page **pages, int num_pages, int page_align)
 {
        struct ceph_osd_request *req;
-       struct ceph_osd_data *osd_data;
        int rc = 0;
 
        dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
             vino.snap, off, *plen);
-       req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
+       req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
-                                   NULL, 0, truncate_seq, truncate_size, NULL,
+                                   NULL, truncate_seq, truncate_size,
                                    false);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
        /* it may be a short read due to an object boundary */
 
-       osd_data = &req->r_data_in;
-       osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
-       osd_data->pages = pages;
-       osd_data->length = *plen;
-       osd_data->alignment = page_align;
+       osd_req_op_extent_osd_data_pages(req, 0,
+                               pages, *plen, page_align, false, false);
 
        dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
-            off, *plen, osd_data->length, page_align);
+            off, *plen, *plen, page_align);
+
+       ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
 
        rc = ceph_osdc_start_request(osdc, req, false);
        if (!rc)
@@ -2126,27 +2340,24 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct page **pages, int num_pages)
 {
        struct ceph_osd_request *req;
-       struct ceph_osd_data *osd_data;
        int rc = 0;
        int page_align = off & ~PAGE_MASK;
 
-       BUG_ON(vino.snap != CEPH_NOSNAP);
-       req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+       BUG_ON(vino.snap != CEPH_NOSNAP);       /* snapshots aren't writeable */
+       req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
                                    CEPH_OSD_OP_WRITE,
                                    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-                                   snapc, 0,
-                                   truncate_seq, truncate_size, mtime,
+                                   snapc, truncate_seq, truncate_size,
                                    true);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
        /* it may be a short write due to an object boundary */
-       osd_data = &req->r_data_out;
-       osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
-       osd_data->pages = pages;
-       osd_data->length = len;
-       osd_data->alignment = page_align;
-       dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length);
+       osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
+                               false, false);
+       dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
+
+       ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
 
        rc = ceph_osdc_start_request(osdc, req, true);
        if (!rc)
@@ -2160,6 +2371,26 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 }
 EXPORT_SYMBOL(ceph_osdc_writepages);
 
+int ceph_osdc_setup(void)
+{
+       BUG_ON(ceph_osd_request_cache);
+       ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
+                                       sizeof (struct ceph_osd_request),
+                                       __alignof__(struct ceph_osd_request),
+                                       0, NULL);
+
+       return ceph_osd_request_cache ? 0 : -ENOMEM;
+}
+EXPORT_SYMBOL(ceph_osdc_setup);
+
+void ceph_osdc_cleanup(void)
+{
+       BUG_ON(!ceph_osd_request_cache);
+       kmem_cache_destroy(ceph_osd_request_cache);
+       ceph_osd_request_cache = NULL;
+}
+EXPORT_SYMBOL(ceph_osdc_cleanup);
+
 /*
  * handle incoming message
  */
@@ -2219,13 +2450,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                goto out;
        }
 
-       if (req->r_con_filling_msg) {
+       if (req->r_reply->con)
                dout("%s revoking msg %p from old con %p\n", __func__,
-                    req->r_reply, req->r_con_filling_msg);
-               ceph_msg_revoke_incoming(req->r_reply);
-               req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
-               req->r_con_filling_msg = NULL;
-       }
+                    req->r_reply, req->r_reply->con);
+       ceph_msg_revoke_incoming(req->r_reply);
 
        if (front > req->r_reply->front.iov_len) {
                pr_warning("get_reply front %d > preallocated %d\n",
@@ -2239,8 +2467,14 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        m = ceph_msg_get(req->r_reply);
 
        if (data_len > 0) {
-               struct ceph_osd_data *osd_data = &req->r_data_in;
+               struct ceph_osd_data *osd_data;
 
+               /*
+                * XXX This is assuming there is only one op containing
+                * XXX page data.  Probably OK for reads, but this
+                * XXX ought to be done more generally.
+                */
+               osd_data = osd_req_op_extent_osd_data(req, 0);
                if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
                        if (osd_data->pages &&
                                unlikely(osd_data->length < data_len)) {
@@ -2256,7 +2490,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                }
        }
        *skip = 0;
-       req->r_con_filling_msg = con->ops->get(con);
        dout("get_reply tid %lld %p\n", tid, m);
 
 out: