]> Pileus Git - ~andy/linux/blobdiff - net/ceph/messenger.c
libceph: allocate ceph messages with a slab allocator
[~andy/linux] / net / ceph / messenger.c
index d4e46d8a088ccefe94cba6a40de8a3012b12b7b1..bc1ba4c2605dcd44023ee981dc84406dc1137399 100644 (file)
@@ -152,6 +152,10 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
        return test_and_set_bit(con_flag, &con->flags);
 }
 
+/* Slab caches for frequently-allocated structures */
+
+static struct kmem_cache       *ceph_msg_cache;
+
 /* static tag bytes (protocol control messages) */
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -226,6 +230,22 @@ static void encode_my_addr(struct ceph_messenger *msgr)
  */
 static struct workqueue_struct *ceph_msgr_wq;
 
+static int ceph_msgr_slab_init(void)
+{
+       BUG_ON(ceph_msg_cache);
+       ceph_msg_cache = kmem_cache_create("ceph_msg",
+                                       sizeof (struct ceph_msg),
+                                       __alignof__(struct ceph_msg), 0, NULL);
+       return ceph_msg_cache ? 0 : -ENOMEM;
+}
+
+static void ceph_msgr_slab_exit(void)
+{
+       BUG_ON(!ceph_msg_cache);
+       kmem_cache_destroy(ceph_msg_cache);
+       ceph_msg_cache = NULL;
+}
+
 static void _ceph_msgr_exit(void)
 {
        if (ceph_msgr_wq) {
@@ -233,6 +253,8 @@ static void _ceph_msgr_exit(void)
                ceph_msgr_wq = NULL;
        }
 
+       ceph_msgr_slab_exit();
+
        BUG_ON(zero_page == NULL);
        kunmap(zero_page);
        page_cache_release(zero_page);
@@ -245,6 +267,9 @@ int ceph_msgr_init(void)
        zero_page = ZERO_PAGE(0);
        page_cache_get(zero_page);
 
+       if (ceph_msgr_slab_init())
+               return -ENOMEM;
+
        ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
        if (ceph_msgr_wq)
                return 0;
@@ -722,10 +747,10 @@ static void con_out_kvec_add(struct ceph_connection *con,
  * entry in the current bio iovec, or the first entry in the next
  * bio in the list.
  */
-static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
                                        size_t length)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_msg_data *data = cursor->data;
        struct bio *bio;
 
        BUG_ON(data->type != CEPH_MSG_DATA_BIO);
@@ -734,18 +759,18 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
        BUG_ON(!bio);
        BUG_ON(!bio->bi_vcnt);
 
-       cursor->resid = length;
+       cursor->resid = min(length, data->bio_length);
        cursor->bio = bio;
        cursor->vector_index = 0;
        cursor->vector_offset = 0;
        cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
 }
 
-static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
                                                size_t *page_offset,
                                                size_t *length)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_msg_data *data = cursor->data;
        struct bio *bio;
        struct bio_vec *bio_vec;
        unsigned int index;
@@ -766,20 +791,20 @@ static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
                *length = cursor->resid;
        else
                *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
-       BUG_ON(*length > PAGE_SIZE);
        BUG_ON(*length > cursor->resid);
+       BUG_ON(*page_offset + *length > PAGE_SIZE);
 
        return bio_vec->bv_page;
 }
 
-static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
+                                       size_t bytes)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
        struct bio *bio;
        struct bio_vec *bio_vec;
        unsigned int index;
 
-       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+       BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
 
        bio = cursor->bio;
        BUG_ON(!bio);
@@ -817,38 +842,38 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
 
        return true;
 }
-#endif
+#endif /* CONFIG_BLOCK */
 
 /*
  * For a page array, a piece comes from the first page in the array
  * that has not already been fully consumed.
  */
-static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
                                        size_t length)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_msg_data *data = cursor->data;
        int page_count;
 
        BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
 
        BUG_ON(!data->pages);
        BUG_ON(!data->length);
-       BUG_ON(length != data->length);
 
-       cursor->resid = length;
+       cursor->resid = min(length, data->length);
        page_count = calc_pages_for(data->alignment, (u64)data->length);
        cursor->page_offset = data->alignment & ~PAGE_MASK;
        cursor->page_index = 0;
-       BUG_ON(page_count > (int) USHRT_MAX);
-       cursor->page_count = (unsigned short) page_count;
-       cursor->last_piece = length <= PAGE_SIZE;
+       BUG_ON(page_count > (int)USHRT_MAX);
+       cursor->page_count = (unsigned short)page_count;
+       BUG_ON(length > SIZE_MAX - cursor->page_offset);
+       cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
 }
 
-static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
-                                       size_t *page_offset,
-                                       size_t *length)
+static struct page *
+ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
+                                       size_t *page_offset, size_t *length)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_msg_data *data = cursor->data;
 
        BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
 
@@ -864,26 +889,23 @@ static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
        return data->pages[cursor->page_index];
 }
 
-static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
                                                size_t bytes)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
-
-       BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+       BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
 
        BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
 
        /* Advance the cursor page offset */
 
        cursor->resid -= bytes;
-       cursor->page_offset += bytes;
-       if (!bytes || cursor->page_offset & ~PAGE_MASK)
+       cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
+       if (!bytes || cursor->page_offset)
                return false;   /* more bytes to process in the current page */
 
-       /* Move on to the next page */
+       /* Move on to the next page; offset is already at 0 */
 
        BUG_ON(cursor->page_index >= cursor->page_count);
-       cursor->page_offset = 0;
        cursor->page_index++;
        cursor->last_piece = cursor->resid <= PAGE_SIZE;
 
@@ -894,10 +916,11 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
  * For a pagelist, a piece is whatever remains to be consumed in the
  * first page in the list, or the front of the next page.
  */
-static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
+static void
+ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
                                        size_t length)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_msg_data *data = cursor->data;
        struct ceph_pagelist *pagelist;
        struct page *page;
 
@@ -905,7 +928,6 @@ static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
 
        pagelist = data->pagelist;
        BUG_ON(!pagelist);
-       BUG_ON(length != pagelist->length);
 
        if (!length)
                return;         /* pagelist can be assigned but empty */
@@ -913,17 +935,17 @@ static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
        BUG_ON(list_empty(&pagelist->head));
        page = list_first_entry(&pagelist->head, struct page, lru);
 
-       cursor->resid = length;
+       cursor->resid = min(length, pagelist->length);
        cursor->page = page;
        cursor->offset = 0;
-       cursor->last_piece = length <= PAGE_SIZE;
+       cursor->last_piece = cursor->resid <= PAGE_SIZE;
 }
 
-static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
-                                               size_t *page_offset,
-                                               size_t *length)
+static struct page *
+ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
+                               size_t *page_offset, size_t *length)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_msg_data *data = cursor->data;
        struct ceph_pagelist *pagelist;
 
        BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@@ -934,19 +956,20 @@ static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
        BUG_ON(!cursor->page);
        BUG_ON(cursor->offset + cursor->resid != pagelist->length);
 
+       /* offset of first page in pagelist is always 0 */
        *page_offset = cursor->offset & ~PAGE_MASK;
-       if (cursor->last_piece) /* pagelist offset is always 0 */
+       if (cursor->last_piece)
                *length = cursor->resid;
        else
                *length = PAGE_SIZE - *page_offset;
 
-       return data->cursor.page;
+       return cursor->page;
 }
 
-static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
+static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
                                                size_t bytes)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_msg_data *data = cursor->data;
        struct ceph_pagelist *pagelist;
 
        BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@@ -961,7 +984,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
 
        cursor->resid -= bytes;
        cursor->offset += bytes;
-       /* pagelist offset is always 0 */
+       /* offset of first page in pagelist is always 0 */
        if (!bytes || cursor->offset & ~PAGE_MASK)
                return false;   /* more bytes to process in the current page */
 
@@ -982,19 +1005,20 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
  * be processed in that piece.  It also tracks whether the current
  * piece is the last one in the data item.
  */
-static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
-                                       size_t length)
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
 {
-       switch (data->type) {
+       size_t length = cursor->total_resid;
+
+       switch (cursor->data->type) {
        case CEPH_MSG_DATA_PAGELIST:
-               ceph_msg_data_pagelist_cursor_init(data, length);
+               ceph_msg_data_pagelist_cursor_init(cursor, length);
                break;
        case CEPH_MSG_DATA_PAGES:
-               ceph_msg_data_pages_cursor_init(data, length);
+               ceph_msg_data_pages_cursor_init(cursor, length);
                break;
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
-               ceph_msg_data_bio_cursor_init(data, length);
+               ceph_msg_data_bio_cursor_init(cursor, length);
                break;
 #endif /* CONFIG_BLOCK */
        case CEPH_MSG_DATA_NONE:
@@ -1002,7 +1026,24 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
                /* BUG(); */
                break;
        }
-       data->cursor.need_crc = true;
+       cursor->need_crc = true;
+}
+
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+       struct ceph_msg_data_cursor *cursor = &msg->cursor;
+       struct ceph_msg_data *data;
+
+       BUG_ON(!length);
+       BUG_ON(length > msg->data_length);
+       BUG_ON(list_empty(&msg->data));
+
+       cursor->data_head = &msg->data;
+       cursor->total_resid = length;
+       data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+       cursor->data = data;
+
+       __ceph_msg_data_cursor_init(cursor);
 }
 
 /*
@@ -1010,23 +1051,22 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
  * data item, and supply the page offset and length of that piece.
  * Indicate whether this is the last piece in this data item.
  */
-static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
-                                       size_t *page_offset,
-                                       size_t *length,
+static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
+                                       size_t *page_offset, size_t *length,
                                        bool *last_piece)
 {
        struct page *page;
 
-       switch (data->type) {
+       switch (cursor->data->type) {
        case CEPH_MSG_DATA_PAGELIST:
-               page = ceph_msg_data_pagelist_next(data, page_offset, length);
+               page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
                break;
        case CEPH_MSG_DATA_PAGES:
-               page = ceph_msg_data_pages_next(data, page_offset, length);
+               page = ceph_msg_data_pages_next(cursor, page_offset, length);
                break;
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
-               page = ceph_msg_data_bio_next(data, page_offset, length);
+               page = ceph_msg_data_bio_next(cursor, page_offset, length);
                break;
 #endif /* CONFIG_BLOCK */
        case CEPH_MSG_DATA_NONE:
@@ -1038,7 +1078,7 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
        BUG_ON(*page_offset + *length > PAGE_SIZE);
        BUG_ON(!*length);
        if (last_piece)
-               *last_piece = data->cursor.last_piece;
+               *last_piece = cursor->last_piece;
 
        return page;
 }
@@ -1047,22 +1087,22 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
  * Returns true if the result moves the cursor on to the next piece
  * of the data item.
  */
-static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
+static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
+                               size_t bytes)
 {
-       struct ceph_msg_data_cursor *cursor = &data->cursor;
        bool new_piece;
 
        BUG_ON(bytes > cursor->resid);
-       switch (data->type) {
+       switch (cursor->data->type) {
        case CEPH_MSG_DATA_PAGELIST:
-               new_piece = ceph_msg_data_pagelist_advance(data, bytes);
+               new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
                break;
        case CEPH_MSG_DATA_PAGES:
-               new_piece = ceph_msg_data_pages_advance(data, bytes);
+               new_piece = ceph_msg_data_pages_advance(cursor, bytes);
                break;
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
-               new_piece = ceph_msg_data_bio_advance(data, bytes);
+               new_piece = ceph_msg_data_bio_advance(cursor, bytes);
                break;
 #endif /* CONFIG_BLOCK */
        case CEPH_MSG_DATA_NONE:
@@ -1070,23 +1110,28 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
                BUG();
                break;
        }
-       data->cursor.need_crc = new_piece;
+       cursor->total_resid -= bytes;
+
+       if (!cursor->resid && cursor->total_resid) {
+               WARN_ON(!cursor->last_piece);
+               BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+               cursor->data = list_entry_next(cursor->data, links);
+               __ceph_msg_data_cursor_init(cursor);
+               new_piece = true;
+       }
+       cursor->need_crc = new_piece;
 
        return new_piece;
 }
 
-static void prepare_message_data(struct ceph_msg *msg)
+static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 {
-       size_t data_len;
-
        BUG_ON(!msg);
-
-       data_len = le32_to_cpu(msg->hdr.data_len);
        BUG_ON(!data_len);
 
        /* Initialize data cursor */
 
-       ceph_msg_data_cursor_init(msg->data, data_len);
+       ceph_msg_data_cursor_init(msg, (size_t)data_len);
 }
 
 /*
@@ -1149,11 +1194,12 @@ static void prepare_write_message(struct ceph_connection *con)
                m->hdr.seq = cpu_to_le64(++con->out_seq);
                m->needs_out_seq = false;
        }
+       WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
 
-       dout("prepare_write_message %p seq %lld type %d len %d+%d+%d\n",
+       dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
             m, con->out_seq, le16_to_cpu(m->hdr.type),
             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
-            le32_to_cpu(m->hdr.data_len));
+            m->data_length);
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
        /* tag + hdr + front + middle */
@@ -1184,8 +1230,8 @@ static void prepare_write_message(struct ceph_connection *con)
 
        /* is there a data payload? */
        con->out_msg->footer.data_crc = 0;
-       if (m->hdr.data_len) {
-               prepare_message_data(con->out_msg);
+       if (m->data_length) {
+               prepare_message_data(con->out_msg, m->data_length);
                con->out_more = 1;  /* data + footer will follow */
        } else {
                /* no, queue up footer too and be done */
@@ -1406,13 +1452,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
 static int write_partial_message_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
-       struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
+       struct ceph_msg_data_cursor *cursor = &msg->cursor;
        bool do_datacrc = !con->msgr->nocrc;
        u32 crc;
 
        dout("%s %p msg %p\n", __func__, con, msg);
 
-       if (WARN_ON(!msg->data))
+       if (list_empty(&msg->data))
                return -EINVAL;
 
        /*
@@ -1432,7 +1478,7 @@ static int write_partial_message_data(struct ceph_connection *con)
                bool need_crc;
                int ret;
 
-               page = ceph_msg_data_next(msg->data, &page_offset, &length,
+               page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
                                                        &last_piece);
                ret = ceph_tcp_sendpage(con->sock, page, page_offset,
                                      length, last_piece);
@@ -1444,7 +1490,7 @@ static int write_partial_message_data(struct ceph_connection *con)
                }
                if (do_datacrc && cursor->need_crc)
                        crc = ceph_crc32c_page(crc, page, page_offset, length);
-               need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
+               need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
        }
 
        dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2104,7 +2150,7 @@ static int read_partial_message_section(struct ceph_connection *con,
 static int read_partial_msg_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->in_msg;
-       struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
+       struct ceph_msg_data_cursor *cursor = &msg->cursor;
        const bool do_datacrc = !con->msgr->nocrc;
        struct page *page;
        size_t page_offset;
@@ -2113,13 +2159,13 @@ static int read_partial_msg_data(struct ceph_connection *con)
        int ret;
 
        BUG_ON(!msg);
-       if (!msg->data)
+       if (list_empty(&msg->data))
                return -EIO;
 
        if (do_datacrc)
                crc = con->in_data_crc;
        while (cursor->resid) {
-               page = ceph_msg_data_next(msg->data, &page_offset, &length,
+               page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
                                                        NULL);
                ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
                if (ret <= 0) {
@@ -2131,7 +2177,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
 
                if (do_datacrc)
                        crc = ceph_crc32c_page(crc, page, page_offset, ret);
-               (void) ceph_msg_data_advance(msg->data, (size_t)ret);
+               (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
        }
        if (do_datacrc)
                con->in_data_crc = crc;
@@ -2209,10 +2255,18 @@ static int read_partial_message(struct ceph_connection *con)
                ret = ceph_con_in_msg_alloc(con, &skip);
                if (ret < 0)
                        return ret;
+
+               BUG_ON(!con->in_msg ^ skip);
+               if (con->in_msg && data_len > con->in_msg->data_length) {
+                       pr_warning("%s skipping long message (%u > %zd)\n",
+                               __func__, data_len, con->in_msg->data_length);
+                       ceph_msg_put(con->in_msg);
+                       con->in_msg = NULL;
+                       skip = 1;
+               }
                if (skip) {
                        /* skip this message */
                        dout("alloc_msg said skip message\n");
-                       BUG_ON(con->in_msg);
                        con->in_base_pos = -front_len - middle_len - data_len -
                                sizeof(m->footer);
                        con->in_tag = CEPH_MSGR_TAG_READY;
@@ -2230,7 +2284,7 @@ static int read_partial_message(struct ceph_connection *con)
                /* prepare for data payload, if any */
 
                if (data_len)
-                       prepare_message_data(con->in_msg);
+                       prepare_message_data(con->in_msg, data_len);
        }
 
        /* front */
@@ -2957,6 +3011,7 @@ static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
        data = kzalloc(sizeof (*data), GFP_NOFS);
        if (data)
                data->type = type;
+       INIT_LIST_HEAD(&data->links);
 
        return data;
 }
@@ -2966,6 +3021,7 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
        if (!data)
                return;
 
+       WARN_ON(!list_empty(&data->links));
        if (data->type == CEPH_MSG_DATA_PAGELIST) {
                ceph_pagelist_release(data->pagelist);
                kfree(data->pagelist);
@@ -2973,14 +3029,13 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
        kfree(data);
 }
 
-void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
+void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
                size_t length, size_t alignment)
 {
        struct ceph_msg_data *data;
 
        BUG_ON(!pages);
        BUG_ON(!length);
-       BUG_ON(msg->data != NULL);
 
        data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
        BUG_ON(!data);
@@ -2988,41 +3043,46 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
        data->length = length;
        data->alignment = alignment & ~PAGE_MASK;
 
-       msg->data = data;
+       list_add_tail(&data->links, &msg->data);
+       msg->data_length += length;
 }
-EXPORT_SYMBOL(ceph_msg_data_set_pages);
+EXPORT_SYMBOL(ceph_msg_data_add_pages);
 
-void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
+void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
                                struct ceph_pagelist *pagelist)
 {
        struct ceph_msg_data *data;
 
        BUG_ON(!pagelist);
        BUG_ON(!pagelist->length);
-       BUG_ON(msg->data != NULL);
 
        data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
        BUG_ON(!data);
        data->pagelist = pagelist;
 
-       msg->data = data;
+       list_add_tail(&data->links, &msg->data);
+       msg->data_length += pagelist->length;
 }
-EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
+EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
 
-void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
+#ifdef CONFIG_BLOCK
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
+               size_t length)
 {
        struct ceph_msg_data *data;
 
        BUG_ON(!bio);
-       BUG_ON(msg->data != NULL);
 
        data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
        BUG_ON(!data);
        data->bio = bio;
+       data->bio_length = length;
 
-       msg->data = data;
+       list_add_tail(&data->links, &msg->data);
+       msg->data_length += length;
 }
-EXPORT_SYMBOL(ceph_msg_data_set_bio);
+EXPORT_SYMBOL(ceph_msg_data_add_bio);
+#endif /* CONFIG_BLOCK */
 
 /*
  * construct a new message with given type, size
@@ -3033,7 +3093,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 {
        struct ceph_msg *m;
 
-       m = kzalloc(sizeof(*m), flags);
+       m = kmem_cache_zalloc(ceph_msg_cache, flags);
        if (m == NULL)
                goto out;
 
@@ -3043,6 +3103,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 
        INIT_LIST_HEAD(&m->list_head);
        kref_init(&m->kref);
+       INIT_LIST_HEAD(&m->data);
 
        /* front */
        m->front_max = front_len;
@@ -3179,7 +3240,7 @@ void ceph_msg_kfree(struct ceph_msg *m)
                vfree(m->front.iov_base);
        else
                kfree(m->front.iov_base);
-       kfree(m);
+       kmem_cache_free(ceph_msg_cache, m);
 }
 
 /*
@@ -3188,6 +3249,9 @@ void ceph_msg_kfree(struct ceph_msg *m)
 void ceph_msg_last_put(struct kref *kref)
 {
        struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
+       LIST_HEAD(data);
+       struct list_head *links;
+       struct list_head *next;
 
        dout("ceph_msg_put last one on %p\n", m);
        WARN_ON(!list_empty(&m->list_head));
@@ -3197,8 +3261,16 @@ void ceph_msg_last_put(struct kref *kref)
                ceph_buffer_put(m->middle);
                m->middle = NULL;
        }
-       ceph_msg_data_destroy(m->data);
-       m->data = NULL;
+
+       list_splice_init(&m->data, &data);
+       list_for_each_safe(links, next, &data) {
+               struct ceph_msg_data *data;
+
+               data = list_entry(links, struct ceph_msg_data, links);
+               list_del_init(links);
+               ceph_msg_data_destroy(data);
+       }
+       m->data_length = 0;
 
        if (m->pool)
                ceph_msgpool_put(m->pool, m);
@@ -3210,7 +3282,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
 void ceph_msg_dump(struct ceph_msg *msg)
 {
        pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
-                msg->front_max, msg->data->length);
+                msg->front_max, msg->data_length);
        print_hex_dump(KERN_DEBUG, "header: ",
                       DUMP_PREFIX_OFFSET, 16, 1,
                       &msg->hdr, sizeof(msg->hdr), true);