]> Pileus Git - ~andy/linux/blobdiff - net/ceph/messenger.c
libceph: allocate ceph message data with a slab allocator
[~andy/linux] / net / ceph / messenger.c
index c74b5289778af38f3a9a01e6f6f732da1f2b835a..eb0a46a49bd42351d23878f118384ab09a8d2ee8 100644 (file)
@@ -21,6 +21,9 @@
 #include <linux/ceph/pagelist.h>
 #include <linux/export.h>
 
+#define list_entry_next(pos, member)                                   \
+       list_entry(pos->member.next, typeof(*pos), member)
+
 /*
  * Ceph uses the messenger to exchange ceph_msg messages with other
  * hosts in the system.  The messenger provides ordered and reliable
@@ -149,6 +152,11 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
        return test_and_set_bit(con_flag, &con->flags);
 }
 
+/* Slab caches for frequently-allocated structures */
+
+static struct kmem_cache       *ceph_msg_cache;
+static struct kmem_cache       *ceph_msg_data_cache;
+
 /* static tag bytes (protocol control messages) */
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -223,6 +231,41 @@ static void encode_my_addr(struct ceph_messenger *msgr)
  */
 static struct workqueue_struct *ceph_msgr_wq;
 
+static int ceph_msgr_slab_init(void)
+{
+       BUG_ON(ceph_msg_cache);
+       ceph_msg_cache = kmem_cache_create("ceph_msg",
+                                       sizeof (struct ceph_msg),
+                                       __alignof__(struct ceph_msg), 0, NULL);
+
+       if (!ceph_msg_cache)
+               return -ENOMEM;
+
+       BUG_ON(ceph_msg_data_cache);
+       ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
+                                       sizeof (struct ceph_msg_data),
+                                       __alignof__(struct ceph_msg_data),
+                                       0, NULL);
+       if (ceph_msg_data_cache)
+               return 0;
+
+       kmem_cache_destroy(ceph_msg_cache);
+       ceph_msg_cache = NULL;
+
+       return -ENOMEM;
+}
+
+static void ceph_msgr_slab_exit(void)
+{
+       BUG_ON(!ceph_msg_data_cache);
+       kmem_cache_destroy(ceph_msg_data_cache);
+       ceph_msg_data_cache = NULL;
+
+       BUG_ON(!ceph_msg_cache);
+       kmem_cache_destroy(ceph_msg_cache);
+       ceph_msg_cache = NULL;
+}
+
 static void _ceph_msgr_exit(void)
 {
        if (ceph_msgr_wq) {
@@ -230,6 +273,8 @@ static void _ceph_msgr_exit(void)
                ceph_msgr_wq = NULL;
        }
 
+       ceph_msgr_slab_exit();
+
        BUG_ON(zero_page == NULL);
        kunmap(zero_page);
        page_cache_release(zero_page);
@@ -242,6 +287,9 @@ int ceph_msgr_init(void)
        zero_page = ZERO_PAGE(0);
        page_cache_get(zero_page);
 
+       if (ceph_msgr_slab_init())
+               return -ENOMEM;
+
        ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
        if (ceph_msgr_wq)
                return 0;
@@ -713,49 +761,397 @@ static void con_out_kvec_add(struct ceph_connection *con,
 }
 
 #ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **bio_iter,
-                       unsigned int *bio_seg)
+
+/*
+ * For a bio data item, a piece is whatever remains of the next
+ * entry in the current bio iovec, or the first entry in the next
+ * bio in the list.
+ */
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
+                                       size_t length)
 {
-       if (!bio) {
-               *bio_iter = NULL;
-               *bio_seg = 0;
-               return;
+       struct ceph_msg_data *data = cursor->data;
+       struct bio *bio;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+       bio = data->bio;
+       BUG_ON(!bio);
+       BUG_ON(!bio->bi_vcnt);
+
+       cursor->resid = min(length, data->bio_length);
+       cursor->bio = bio;
+       cursor->vector_index = 0;
+       cursor->vector_offset = 0;
+       cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
+}
+
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
+                                               size_t *page_offset,
+                                               size_t *length)
+{
+       struct ceph_msg_data *data = cursor->data;
+       struct bio *bio;
+       struct bio_vec *bio_vec;
+       unsigned int index;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+       bio = cursor->bio;
+       BUG_ON(!bio);
+
+       index = cursor->vector_index;
+       BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+
+       bio_vec = &bio->bi_io_vec[index];
+       BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
+       *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
+       BUG_ON(*page_offset >= PAGE_SIZE);
+       if (cursor->last_piece) /* pagelist offset is always 0 */
+               *length = cursor->resid;
+       else
+               *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
+       BUG_ON(*length > cursor->resid);
+       BUG_ON(*page_offset + *length > PAGE_SIZE);
+
+       return bio_vec->bv_page;
+}
+
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
+                                       size_t bytes)
+{
+       struct bio *bio;
+       struct bio_vec *bio_vec;
+       unsigned int index;
+
+       BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
+
+       bio = cursor->bio;
+       BUG_ON(!bio);
+
+       index = cursor->vector_index;
+       BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+       bio_vec = &bio->bi_io_vec[index];
+
+       /* Advance the cursor offset */
+
+       BUG_ON(cursor->resid < bytes);
+       cursor->resid -= bytes;
+       cursor->vector_offset += bytes;
+       if (cursor->vector_offset < bio_vec->bv_len)
+               return false;   /* more bytes to process in this segment */
+       BUG_ON(cursor->vector_offset != bio_vec->bv_len);
+
+       /* Move on to the next segment, and possibly the next bio */
+
+       if (++index == (unsigned int) bio->bi_vcnt) {
+               bio = bio->bi_next;
+               index = 0;
+       }
+       cursor->bio = bio;
+       cursor->vector_index = index;
+       cursor->vector_offset = 0;
+
+       if (!cursor->last_piece) {
+               BUG_ON(!cursor->resid);
+               BUG_ON(!bio);
+               /* A short read is OK, so use <= rather than == */
+               if (cursor->resid <= bio->bi_io_vec[index].bv_len)
+                       cursor->last_piece = true;
        }
-       *bio_iter = bio;
-       *bio_seg = (unsigned int) bio->bi_idx;
+
+       return true;
 }
+#endif /* CONFIG_BLOCK */
 
-static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
+/*
+ * For a page array, a piece comes from the first page in the array
+ * that has not already been fully consumed.
+ */
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
+                                       size_t length)
 {
-       if (*bio_iter == NULL)
-               return;
+       struct ceph_msg_data *data = cursor->data;
+       int page_count;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
 
-       BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+       BUG_ON(!data->pages);
+       BUG_ON(!data->length);
 
-       (*seg)++;
-       if (*seg == (*bio_iter)->bi_vcnt)
-               init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+       cursor->resid = min(length, data->length);
+       page_count = calc_pages_for(data->alignment, (u64)data->length);
+       cursor->page_offset = data->alignment & ~PAGE_MASK;
+       cursor->page_index = 0;
+       BUG_ON(page_count > (int)USHRT_MAX);
+       cursor->page_count = (unsigned short)page_count;
+       BUG_ON(length > SIZE_MAX - cursor->page_offset);
+       cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
 }
-#endif
 
-static void prepare_message_data(struct ceph_msg *msg,
-                               struct ceph_msg_pos *msg_pos)
+static struct page *
+ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
+                                       size_t *page_offset, size_t *length)
 {
-       BUG_ON(!msg);
-       BUG_ON(!msg->hdr.data_len);
+       struct ceph_msg_data *data = cursor->data;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
 
-       /* initialize page iterator */
-       msg_pos->page = 0;
-       if (ceph_msg_has_pages(msg))
-               msg_pos->page_pos = msg->page_alignment;
+       BUG_ON(cursor->page_index >= cursor->page_count);
+       BUG_ON(cursor->page_offset >= PAGE_SIZE);
+
+       *page_offset = cursor->page_offset;
+       if (cursor->last_piece)
+               *length = cursor->resid;
        else
-               msg_pos->page_pos = 0;
+               *length = PAGE_SIZE - *page_offset;
+
+       return data->pages[cursor->page_index];
+}
+
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
+                                               size_t bytes)
+{
+       BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
+
+       BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
+
+       /* Advance the cursor page offset */
+
+       cursor->resid -= bytes;
+       cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
+       if (!bytes || cursor->page_offset)
+               return false;   /* more bytes to process in the current page */
+
+       /* Move on to the next page; offset is already at 0 */
+
+       BUG_ON(cursor->page_index >= cursor->page_count);
+       cursor->page_index++;
+       cursor->last_piece = cursor->resid <= PAGE_SIZE;
+
+       return true;
+}
+
+/*
+ * For a pagelist, a piece is whatever remains to be consumed in the
+ * first page in the list, or the front of the next page.
+ */
+static void
+ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
+                                       size_t length)
+{
+       struct ceph_msg_data *data = cursor->data;
+       struct ceph_pagelist *pagelist;
+       struct page *page;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+       pagelist = data->pagelist;
+       BUG_ON(!pagelist);
+
+       if (!length)
+               return;         /* pagelist can be assigned but empty */
+
+       BUG_ON(list_empty(&pagelist->head));
+       page = list_first_entry(&pagelist->head, struct page, lru);
+
+       cursor->resid = min(length, pagelist->length);
+       cursor->page = page;
+       cursor->offset = 0;
+       cursor->last_piece = cursor->resid <= PAGE_SIZE;
+}
+
+static struct page *
+ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
+                               size_t *page_offset, size_t *length)
+{
+       struct ceph_msg_data *data = cursor->data;
+       struct ceph_pagelist *pagelist;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+       pagelist = data->pagelist;
+       BUG_ON(!pagelist);
+
+       BUG_ON(!cursor->page);
+       BUG_ON(cursor->offset + cursor->resid != pagelist->length);
+
+       /* offset of first page in pagelist is always 0 */
+       *page_offset = cursor->offset & ~PAGE_MASK;
+       if (cursor->last_piece)
+               *length = cursor->resid;
+       else
+               *length = PAGE_SIZE - *page_offset;
+
+       return cursor->page;
+}
+
+static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
+                                               size_t bytes)
+{
+       struct ceph_msg_data *data = cursor->data;
+       struct ceph_pagelist *pagelist;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+       pagelist = data->pagelist;
+       BUG_ON(!pagelist);
+
+       BUG_ON(cursor->offset + cursor->resid != pagelist->length);
+       BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
+
+       /* Advance the cursor offset */
+
+       cursor->resid -= bytes;
+       cursor->offset += bytes;
+       /* offset of first page in pagelist is always 0 */
+       if (!bytes || cursor->offset & ~PAGE_MASK)
+               return false;   /* more bytes to process in the current page */
+
+       /* Move on to the next page */
+
+       BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
+       cursor->page = list_entry_next(cursor->page, lru);
+       cursor->last_piece = cursor->resid <= PAGE_SIZE;
+
+       return true;
+}
+
+/*
+ * Message data is handled (sent or received) in pieces, where each
+ * piece resides on a single page.  The network layer might not
+ * consume an entire piece at once.  A data item's cursor keeps
+ * track of which piece is next to process and how much remains to
+ * be processed in that piece.  It also tracks whether the current
+ * piece is the last one in the data item.
+ */
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
+{
+       size_t length = cursor->total_resid;
+
+       switch (cursor->data->type) {
+       case CEPH_MSG_DATA_PAGELIST:
+               ceph_msg_data_pagelist_cursor_init(cursor, length);
+               break;
+       case CEPH_MSG_DATA_PAGES:
+               ceph_msg_data_pages_cursor_init(cursor, length);
+               break;
 #ifdef CONFIG_BLOCK
-       if (ceph_msg_has_bio(msg))
-               init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
-#endif
-       msg_pos->data_pos = 0;
-       msg_pos->did_page_crc = false;
+       case CEPH_MSG_DATA_BIO:
+               ceph_msg_data_bio_cursor_init(cursor, length);
+               break;
+#endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_NONE:
+       default:
+               /* BUG(); */
+               break;
+       }
+       cursor->need_crc = true;
+}
+
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+       struct ceph_msg_data_cursor *cursor = &msg->cursor;
+       struct ceph_msg_data *data;
+
+       BUG_ON(!length);
+       BUG_ON(length > msg->data_length);
+       BUG_ON(list_empty(&msg->data));
+
+       cursor->data_head = &msg->data;
+       cursor->total_resid = length;
+       data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+       cursor->data = data;
+
+       __ceph_msg_data_cursor_init(cursor);
+}
+
+/*
+ * Return the page containing the next piece to process for a given
+ * data item, and supply the page offset and length of that piece.
+ * Indicate whether this is the last piece in this data item.
+ */
+static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
+                                       size_t *page_offset, size_t *length,
+                                       bool *last_piece)
+{
+       struct page *page;
+
+       switch (cursor->data->type) {
+       case CEPH_MSG_DATA_PAGELIST:
+               page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
+               break;
+       case CEPH_MSG_DATA_PAGES:
+               page = ceph_msg_data_pages_next(cursor, page_offset, length);
+               break;
+#ifdef CONFIG_BLOCK
+       case CEPH_MSG_DATA_BIO:
+               page = ceph_msg_data_bio_next(cursor, page_offset, length);
+               break;
+#endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_NONE:
+       default:
+               page = NULL;
+               break;
+       }
+       BUG_ON(!page);
+       BUG_ON(*page_offset + *length > PAGE_SIZE);
+       BUG_ON(!*length);
+       if (last_piece)
+               *last_piece = cursor->last_piece;
+
+       return page;
+}
+
+/*
+ * Returns true if the result moves the cursor on to the next piece
+ * of the data item.
+ */
+static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
+                               size_t bytes)
+{
+       bool new_piece;
+
+       BUG_ON(bytes > cursor->resid);
+       switch (cursor->data->type) {
+       case CEPH_MSG_DATA_PAGELIST:
+               new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
+               break;
+       case CEPH_MSG_DATA_PAGES:
+               new_piece = ceph_msg_data_pages_advance(cursor, bytes);
+               break;
+#ifdef CONFIG_BLOCK
+       case CEPH_MSG_DATA_BIO:
+               new_piece = ceph_msg_data_bio_advance(cursor, bytes);
+               break;
+#endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_NONE:
+       default:
+               BUG();
+               break;
+       }
+       cursor->total_resid -= bytes;
+
+       if (!cursor->resid && cursor->total_resid) {
+               WARN_ON(!cursor->last_piece);
+               BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+               cursor->data = list_entry_next(cursor->data, links);
+               __ceph_msg_data_cursor_init(cursor);
+               new_piece = true;
+       }
+       cursor->need_crc = new_piece;
+
+       return new_piece;
+}
+
+static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
+{
+       BUG_ON(!msg);
+       BUG_ON(!data_len);
+
+       /* Initialize data cursor */
+
+       ceph_msg_data_cursor_init(msg, (size_t)data_len);
 }
 
 /*
@@ -818,11 +1214,12 @@ static void prepare_write_message(struct ceph_connection *con)
                m->hdr.seq = cpu_to_le64(++con->out_seq);
                m->needs_out_seq = false;
        }
+       WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
 
-       dout("prepare_write_message %p seq %lld type %d len %d+%d+%d (%zd)\n",
+       dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
             m, con->out_seq, le16_to_cpu(m->hdr.type),
             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
-            le32_to_cpu(m->hdr.data_len), m->length);
+            m->data_length);
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
        /* tag + hdr + front + middle */
@@ -853,8 +1250,8 @@ static void prepare_write_message(struct ceph_connection *con)
 
        /* is there a data payload? */
        con->out_msg->footer.data_crc = 0;
-       if (m->hdr.data_len) {
-               prepare_message_data(con->out_msg, &con->out_msg_pos);
+       if (m->data_length) {
+               prepare_message_data(con->out_msg, m->data_length);
                con->out_more = 1;  /* data + footer will follow */
        } else {
                /* no, queue up footer too and be done */
@@ -885,6 +1282,24 @@ static void prepare_write_ack(struct ceph_connection *con)
        con_flag_set(con, CON_FLAG_WRITE_PENDING);
 }
 
+/*
+ * Prepare to share the seq during handshake
+ */
+static void prepare_write_seq(struct ceph_connection *con)
+{
+       dout("prepare_write_seq %p %llu -> %llu\n", con,
+            con->in_seq_acked, con->in_seq);
+       con->in_seq_acked = con->in_seq;
+
+       con_out_kvec_reset(con);
+
+       con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
+       con_out_kvec_add(con, sizeof (con->out_temp_ack),
+                        &con->out_temp_ack);
+
+       con_flag_set(con, CON_FLAG_WRITE_PENDING);
+}
+
 /*
  * Prepare to write keepalive byte.
  */
@@ -1034,59 +1449,6 @@ out:
        return ret;  /* done! */
 }
 
-static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
-                       size_t len, size_t sent, bool in_trail)
-{
-       struct ceph_msg *msg = con->out_msg;
-       struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
-
-       BUG_ON(!msg);
-       BUG_ON(!sent);
-
-       msg_pos->data_pos += sent;
-       msg_pos->page_pos += sent;
-       if (sent < len)
-               return;
-
-       BUG_ON(sent != len);
-       msg_pos->page_pos = 0;
-       msg_pos->page++;
-       msg_pos->did_page_crc = false;
-       if (in_trail) {
-               BUG_ON(!ceph_msg_has_trail(msg));
-               list_rotate_left(&msg->trail->head);
-       } else if (ceph_msg_has_pagelist(msg)) {
-               list_rotate_left(&msg->pagelist->head);
-#ifdef CONFIG_BLOCK
-       } else if (ceph_msg_has_bio(msg)) {
-               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-#endif
-       }
-}
-
-static void in_msg_pos_next(struct ceph_connection *con, size_t len,
-                               size_t received)
-{
-       struct ceph_msg *msg = con->in_msg;
-       struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
-
-       BUG_ON(!msg);
-       BUG_ON(!received);
-
-       msg_pos->data_pos += received;
-       msg_pos->page_pos += received;
-       if (received < len)
-               return;
-
-       BUG_ON(received != len);
-       msg_pos->page_pos = 0;
-       msg_pos->page++;
-#ifdef CONFIG_BLOCK
-       if (msg->bio)
-               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-#endif /* CONFIG_BLOCK */
-}
-
 static u32 ceph_crc32c_page(u32 crc, struct page *page,
                                unsigned int page_offset,
                                unsigned int length)
@@ -1110,22 +1472,14 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
 static int write_partial_message_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
-       struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
-       unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
+       struct ceph_msg_data_cursor *cursor = &msg->cursor;
        bool do_datacrc = !con->msgr->nocrc;
-       int ret;
-       int total_max_write;
-       bool in_trail = false;
-       size_t trail_len = 0;
-       size_t trail_off = data_len;
+       u32 crc;
 
-       if (ceph_msg_has_trail(msg)) {
-               trail_len = msg->trail->length;
-               trail_off -= trail_len;
-       }
+       dout("%s %p msg %p\n", __func__, con, msg);
 
-       dout("%s %p msg %p page %d offset %d\n", __func__,
-            con, msg, msg_pos->page, msg_pos->page_pos);
+       if (list_empty(&msg->data))
+               return -EINVAL;
 
        /*
         * Iterate through each page that contains data to be
@@ -1135,68 +1489,41 @@ static int write_partial_message_data(struct ceph_connection *con)
         * need to map the page.  If we have no pages, they have
         * been revoked, so use the zero page.
         */
-       while (data_len > msg_pos->data_pos) {
-               struct page *page = NULL;
+       crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
+       while (cursor->resid) {
+               struct page *page;
                size_t page_offset;
                size_t length;
-               int max_write = PAGE_SIZE;
-               int bio_offset = 0;
-
-               in_trail = in_trail || msg_pos->data_pos >= trail_off;
-               if (!in_trail)
-                       total_max_write = trail_off - msg_pos->data_pos;
-
-               if (in_trail) {
-                       BUG_ON(!ceph_msg_has_trail(msg));
-                       total_max_write = data_len - msg_pos->data_pos;
-                       page = list_first_entry(&msg->trail->head,
-                                               struct page, lru);
-               } else if (ceph_msg_has_pages(msg)) {
-                       page = msg->pages[msg_pos->page];
-               } else if (ceph_msg_has_pagelist(msg)) {
-                       page = list_first_entry(&msg->pagelist->head,
-                                               struct page, lru);
-#ifdef CONFIG_BLOCK
-               } else if (ceph_msg_has_bio(msg)) {
-                       struct bio_vec *bv;
-
-                       bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
-                       page = bv->bv_page;
-                       bio_offset = bv->bv_offset;
-                       max_write = bv->bv_len;
-#endif
-               } else {
-                       page = zero_page;
-               }
-               length = min_t(int, max_write - msg_pos->page_pos,
-                           total_max_write);
-
-               page_offset = msg_pos->page_pos + bio_offset;
-               if (do_datacrc && !msg_pos->did_page_crc) {
-                       u32 crc = le32_to_cpu(msg->footer.data_crc);
+               bool last_piece;
+               bool need_crc;
+               int ret;
 
-                       crc = ceph_crc32c_page(crc, page, page_offset, length);
-                       msg->footer.data_crc = cpu_to_le32(crc);
-                       msg_pos->did_page_crc = true;
-               }
+               page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
+                                                       &last_piece);
                ret = ceph_tcp_sendpage(con->sock, page, page_offset,
-                                     length, true);
-               if (ret <= 0)
-                       goto out;
+                                     length, last_piece);
+               if (ret <= 0) {
+                       if (do_datacrc)
+                               msg->footer.data_crc = cpu_to_le32(crc);
 
-               out_msg_pos_next(con, page, length, (size_t) ret, in_trail);
+                       return ret;
+               }
+               if (do_datacrc && cursor->need_crc)
+                       crc = ceph_crc32c_page(crc, page, page_offset, length);
+               need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
        }
 
        dout("%s %p msg %p done\n", __func__, con, msg);
 
        /* prepare and queue up footer, too */
-       if (!do_datacrc)
+       if (do_datacrc)
+               msg->footer.data_crc = cpu_to_le32(crc);
+       else
                msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
        con_out_kvec_reset(con);
        prepare_write_message_footer(con);
-       ret = 1;
-out:
-       return ret;
+
+       return 1;       /* must return > 0 to indicate success */
 }
 
 /*
@@ -1240,6 +1567,13 @@ static void prepare_read_ack(struct ceph_connection *con)
        con->in_base_pos = 0;
 }
 
+static void prepare_read_seq(struct ceph_connection *con)
+{
+       dout("prepare_read_seq %p\n", con);
+       con->in_base_pos = 0;
+       con->in_tag = CEPH_MSGR_TAG_SEQ;
+}
+
 static void prepare_read_tag(struct ceph_connection *con)
 {
        dout("prepare_read_tag %p\n", con);
@@ -1646,7 +1980,6 @@ static int process_connect(struct ceph_connection *con)
                        con->error_msg = "connect authorization failure";
                        return -1;
                }
-               con->auth_retry = 1;
                con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
@@ -1717,6 +2050,7 @@ static int process_connect(struct ceph_connection *con)
                prepare_read_connect(con);
                break;
 
+       case CEPH_MSGR_TAG_SEQ:
        case CEPH_MSGR_TAG_READY:
                if (req_feat & ~server_feat) {
                        pr_err("%s%lld %s protocol feature mismatch,"
@@ -1731,7 +2065,7 @@ static int process_connect(struct ceph_connection *con)
 
                WARN_ON(con->state != CON_STATE_NEGOTIATING);
                con->state = CON_STATE_OPEN;
-
+               con->auth_retry = 0;    /* we authenticated; clear flag */
                con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
                con->connect_seq++;
                con->peer_features = server_feat;
@@ -1747,7 +2081,12 @@ static int process_connect(struct ceph_connection *con)
 
                con->delay = 0;      /* reset backoff memory */
 
-               prepare_read_tag(con);
+               if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
+                       prepare_write_seq(con);
+                       prepare_read_seq(con);
+               } else {
+                       prepare_read_tag(con);
+               }
                break;
 
        case CEPH_MSGR_TAG_WAIT:
@@ -1781,7 +2120,6 @@ static int read_partial_ack(struct ceph_connection *con)
        return read_partial(con, end, size, &con->in_temp_ack);
 }
 
-
 /*
  * We can finally discard anything that's been acked.
  */
@@ -1806,8 +2144,6 @@ static void process_ack(struct ceph_connection *con)
 }
 
 
-
-
 static int read_partial_message_section(struct ceph_connection *con,
                                        struct kvec *section,
                                        unsigned int sec_len, u32 *crc)
@@ -1831,106 +2167,40 @@ static int read_partial_message_section(struct ceph_connection *con,
        return 1;
 }
 
-static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
-
-static int read_partial_message_pages(struct ceph_connection *con,
-                                     struct page **pages,
-                                     unsigned int data_len, bool do_datacrc)
-{
-       struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
-       struct page *page;
-       size_t page_offset;
-       size_t length;
-       unsigned int left;
-       int ret;
-
-       /* (page) data */
-       BUG_ON(pages == NULL);
-       page = pages[msg_pos->page];
-       page_offset = msg_pos->page_pos;
-       BUG_ON(msg_pos->data_pos >= data_len);
-       left = data_len - msg_pos->data_pos;
-       BUG_ON(page_offset >= PAGE_SIZE);
-       length = min_t(unsigned int, PAGE_SIZE - page_offset, left);
-
-       ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
-       if (ret <= 0)
-               return ret;
-
-       if (do_datacrc)
-               con->in_data_crc = ceph_crc32c_page(con->in_data_crc, page,
-                                                       page_offset, ret);
-
-       in_msg_pos_next(con, length, ret);
-
-       return ret;
-}
-
-#ifdef CONFIG_BLOCK
-static int read_partial_message_bio(struct ceph_connection *con,
-                                   unsigned int data_len, bool do_datacrc)
+static int read_partial_msg_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->in_msg;
-       struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
-       struct bio_vec *bv;
+       struct ceph_msg_data_cursor *cursor = &msg->cursor;
+       const bool do_datacrc = !con->msgr->nocrc;
        struct page *page;
        size_t page_offset;
        size_t length;
-       unsigned int left;
+       u32 crc = 0;
        int ret;
 
        BUG_ON(!msg);
-       BUG_ON(!msg->bio_iter);
-       bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
-       page = bv->bv_page;
-       page_offset = bv->bv_offset + msg_pos->page_pos;
-       BUG_ON(msg_pos->data_pos >= data_len);
-       left = data_len - msg_pos->data_pos;
-       BUG_ON(msg_pos->page_pos >= bv->bv_len);
-       length = min_t(unsigned int, bv->bv_len - msg_pos->page_pos, left);
-
-       ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
-       if (ret <= 0)
-               return ret;
+       if (list_empty(&msg->data))
+               return -EIO;
 
        if (do_datacrc)
-               con->in_data_crc = ceph_crc32c_page(con->in_data_crc, page,
-                                                       page_offset, ret);
-
-       in_msg_pos_next(con, length, ret);
-
-       return ret;
-}
-#endif
-
-static int read_partial_msg_data(struct ceph_connection *con)
-{
-       struct ceph_msg *msg = con->in_msg;
-       struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
-       const bool do_datacrc = !con->msgr->nocrc;
-       unsigned int data_len;
-       int ret;
-
-       BUG_ON(!msg);
+               crc = con->in_data_crc;
+       while (cursor->resid) {
+               page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
+                                                       NULL);
+               ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
+               if (ret <= 0) {
+                       if (do_datacrc)
+                               con->in_data_crc = crc;
 
-       data_len = le32_to_cpu(con->in_hdr.data_len);
-       while (msg_pos->data_pos < data_len) {
-               if (ceph_msg_has_pages(msg)) {
-                       ret = read_partial_message_pages(con, msg->pages,
-                                                data_len, do_datacrc);
-                       if (ret <= 0)
-                               return ret;
-#ifdef CONFIG_BLOCK
-               } else if (ceph_msg_has_bio(msg)) {
-                       ret = read_partial_message_bio(con,
-                                                data_len, do_datacrc);
-                       if (ret <= 0)
-                               return ret;
-#endif
-               } else {
-                       BUG_ON(1);
+                       return ret;
                }
+
+               if (do_datacrc)
+                       crc = ceph_crc32c_page(crc, page, page_offset, ret);
+               (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
        }
+       if (do_datacrc)
+               con->in_data_crc = crc;
 
        return 1;       /* must return > 0 to indicate success */
 }
@@ -1938,6 +2208,8 @@ static int read_partial_msg_data(struct ceph_connection *con)
 /*
  * read (part of) a message.
  */
+static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
+
 static int read_partial_message(struct ceph_connection *con)
 {
        struct ceph_msg *m = con->in_msg;
@@ -2003,10 +2275,18 @@ static int read_partial_message(struct ceph_connection *con)
                ret = ceph_con_in_msg_alloc(con, &skip);
                if (ret < 0)
                        return ret;
+
+               BUG_ON(!con->in_msg ^ skip);
+               if (con->in_msg && data_len > con->in_msg->data_length) {
+                       pr_warning("%s skipping long message (%u > %zd)\n",
+                               __func__, data_len, con->in_msg->data_length);
+                       ceph_msg_put(con->in_msg);
+                       con->in_msg = NULL;
+                       skip = 1;
+               }
                if (skip) {
                        /* skip this message */
                        dout("alloc_msg said skip message\n");
-                       BUG_ON(con->in_msg);
                        con->in_base_pos = -front_len - middle_len - data_len -
                                sizeof(m->footer);
                        con->in_tag = CEPH_MSGR_TAG_READY;
@@ -2024,7 +2304,7 @@ static int read_partial_message(struct ceph_connection *con)
                /* prepare for data payload, if any */
 
                if (data_len)
-                       prepare_message_data(con->in_msg, &con->in_msg_pos);
+                       prepare_message_data(con->in_msg, data_len);
        }
 
        /* front */
@@ -2330,7 +2610,12 @@ more:
                        prepare_read_tag(con);
                goto more;
        }
-       if (con->in_tag == CEPH_MSGR_TAG_ACK) {
+       if (con->in_tag == CEPH_MSGR_TAG_ACK ||
+           con->in_tag == CEPH_MSGR_TAG_SEQ) {
+               /*
+                * the final handshake seq exchange is semantically
+                * equivalent to an ACK
+                */
                ret = read_partial_ack(con);
                if (ret <= 0)
                        goto out;
@@ -2736,49 +3021,88 @@ void ceph_con_keepalive(struct ceph_connection *con)
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
 
-void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
+static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
+{
+       struct ceph_msg_data *data;
+
+       if (WARN_ON(!ceph_msg_data_type_valid(type)))
+               return NULL;
+
+       data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
+       if (data)
+               data->type = type;
+       INIT_LIST_HEAD(&data->links);
+
+       return data;
+}
+
+static void ceph_msg_data_destroy(struct ceph_msg_data *data)
+{
+       if (!data)
+               return;
+
+       WARN_ON(!list_empty(&data->links));
+       if (data->type == CEPH_MSG_DATA_PAGELIST) {
+               ceph_pagelist_release(data->pagelist);
+               kfree(data->pagelist);
+       }
+       kmem_cache_free(ceph_msg_data_cache, data);
+}
+
+void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
                size_t length, size_t alignment)
 {
+       struct ceph_msg_data *data;
+
        BUG_ON(!pages);
        BUG_ON(!length);
-       BUG_ON(msg->pages);
-       BUG_ON(msg->length);
 
-       msg->pages = pages;
-       msg->length = length;
-       msg->page_alignment = alignment & ~PAGE_MASK;
+       data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
+       BUG_ON(!data);
+       data->pages = pages;
+       data->length = length;
+       data->alignment = alignment & ~PAGE_MASK;
+
+       list_add_tail(&data->links, &msg->data);
+       msg->data_length += length;
 }
-EXPORT_SYMBOL(ceph_msg_data_set_pages);
+EXPORT_SYMBOL(ceph_msg_data_add_pages);
 
-void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
+void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
                                struct ceph_pagelist *pagelist)
 {
+       struct ceph_msg_data *data;
+
        BUG_ON(!pagelist);
        BUG_ON(!pagelist->length);
-       BUG_ON(msg->pagelist);
 
-       msg->pagelist = pagelist;
+       data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
+       BUG_ON(!data);
+       data->pagelist = pagelist;
+
+       list_add_tail(&data->links, &msg->data);
+       msg->data_length += pagelist->length;
 }
-EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
+EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
 
-void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
+#ifdef CONFIG_BLOCK
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
+               size_t length)
 {
-       BUG_ON(!bio);
-       BUG_ON(msg->bio);
+       struct ceph_msg_data *data;
 
-       msg->bio = bio;
-}
-EXPORT_SYMBOL(ceph_msg_data_set_bio);
+       BUG_ON(!bio);
 
-void ceph_msg_data_set_trail(struct ceph_msg *msg, struct ceph_pagelist *trail)
-{
-       BUG_ON(!trail);
-       BUG_ON(!trail->length);
-       BUG_ON(msg->trail);
+       data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
+       BUG_ON(!data);
+       data->bio = bio;
+       data->bio_length = length;
 
-       msg->trail = trail;
+       list_add_tail(&data->links, &msg->data);
+       msg->data_length += length;
 }
-EXPORT_SYMBOL(ceph_msg_data_set_trail);
+EXPORT_SYMBOL(ceph_msg_data_add_bio);
+#endif /* CONFIG_BLOCK */
 
 /*
  * construct a new message with given type, size
@@ -2789,7 +3113,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 {
        struct ceph_msg *m;
 
-       m = kzalloc(sizeof(*m), flags);
+       m = kmem_cache_zalloc(ceph_msg_cache, flags);
        if (m == NULL)
                goto out;
 
@@ -2799,6 +3123,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 
        INIT_LIST_HEAD(&m->list_head);
        kref_init(&m->kref);
+       INIT_LIST_HEAD(&m->data);
 
        /* front */
        m->front_max = front_len;
@@ -2935,7 +3260,7 @@ void ceph_msg_kfree(struct ceph_msg *m)
                vfree(m->front.iov_base);
        else
                kfree(m->front.iov_base);
-       kfree(m);
+       kmem_cache_free(ceph_msg_cache, m);
 }
 
 /*
@@ -2944,6 +3269,9 @@ void ceph_msg_kfree(struct ceph_msg *m)
 void ceph_msg_last_put(struct kref *kref)
 {
        struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
+       LIST_HEAD(data);
+       struct list_head *links;
+       struct list_head *next;
 
        dout("ceph_msg_put last one on %p\n", m);
        WARN_ON(!list_empty(&m->list_head));
@@ -2953,19 +3281,16 @@ void ceph_msg_last_put(struct kref *kref)
                ceph_buffer_put(m->middle);
                m->middle = NULL;
        }
-       if (ceph_msg_has_pages(m)) {
-               m->length = 0;
-               m->pages = NULL;
-       }
 
-       if (ceph_msg_has_pagelist(m)) {
-               ceph_pagelist_release(m->pagelist);
-               kfree(m->pagelist);
-               m->pagelist = NULL;
-       }
+       list_splice_init(&m->data, &data);
+       list_for_each_safe(links, next, &data) {
+               struct ceph_msg_data *data;
 
-       if (ceph_msg_has_trail(m))
-               m->trail = NULL;
+               data = list_entry(links, struct ceph_msg_data, links);
+               list_del_init(links);
+               ceph_msg_data_destroy(data);
+       }
+       m->data_length = 0;
 
        if (m->pool)
                ceph_msgpool_put(m->pool, m);
@@ -2977,7 +3302,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
 void ceph_msg_dump(struct ceph_msg *msg)
 {
        pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
-                msg->front_max, msg->length);
+                msg->front_max, msg->data_length);
        print_hex_dump(KERN_DEBUG, "header: ",
                       DUMP_PREFIX_OFFSET, 16, 1,
                       &msg->hdr, sizeof(msg->hdr), true);