return test_and_set_bit(con_flag, &con->flags);
}
+/* Slab caches for frequently-allocated structures */
+
+static struct kmem_cache *ceph_msg_cache;
+static struct kmem_cache *ceph_msg_data_cache;
+
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
*/
static struct workqueue_struct *ceph_msgr_wq;
+/*
+ * Create the slab caches used for struct ceph_msg and struct
+ * ceph_msg_data.  On return either both caches exist (0) or neither
+ * does (-ENOMEM).
+ */
+static int ceph_msgr_slab_init(void)
+{
+	/* The message cache must not already exist */
+	BUG_ON(ceph_msg_cache);
+	ceph_msg_cache = kmem_cache_create("ceph_msg",
+					sizeof (struct ceph_msg),
+					__alignof__(struct ceph_msg), 0, NULL);
+	if (!ceph_msg_cache)
+		goto out_nomem;
+
+	/* Nor may the data item cache */
+	BUG_ON(ceph_msg_data_cache);
+	ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
+					sizeof (struct ceph_msg_data),
+					__alignof__(struct ceph_msg_data),
+					0, NULL);
+	if (!ceph_msg_data_cache)
+		goto out_destroy_msg_cache;
+
+	return 0;
+
+out_destroy_msg_cache:
+	/* Second cache failed; undo the first so nothing is left behind */
+	kmem_cache_destroy(ceph_msg_cache);
+	ceph_msg_cache = NULL;
+out_nomem:
+	return -ENOMEM;
+}
+
+/*
+ * Destroy both slab caches, in the reverse of the order in which
+ * ceph_msgr_slab_init() creates them.  Each cache must exist when
+ * its turn comes; each pointer is reset to NULL once destroyed.
+ */
+static void ceph_msgr_slab_exit(void)
+{
+	/* Data item cache goes first */
+	BUG_ON(ceph_msg_data_cache == NULL);
+	kmem_cache_destroy(ceph_msg_data_cache);
+	ceph_msg_data_cache = NULL;
+
+	/* Then the message cache */
+	BUG_ON(ceph_msg_cache == NULL);
+	kmem_cache_destroy(ceph_msg_cache);
+	ceph_msg_cache = NULL;
+}
+
static void _ceph_msgr_exit(void)
{
if (ceph_msgr_wq) {
ceph_msgr_wq = NULL;
}
+ ceph_msgr_slab_exit();
+
BUG_ON(zero_page == NULL);
kunmap(zero_page);
page_cache_release(zero_page);
zero_page = ZERO_PAGE(0);
page_cache_get(zero_page);
+ if (ceph_msgr_slab_init())
+ return -ENOMEM;
+
ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
if (ceph_msgr_wq)
return 0;
}
#ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **bio_iter,
- unsigned int *bio_seg)
-{
- if (!bio) {
- *bio_iter = NULL;
- *bio_seg = 0;
- return;
- }
- *bio_iter = bio;
- *bio_seg = (unsigned int) bio->bi_idx;
-}
-
-static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
-{
- if (*bio_iter == NULL)
- return;
-
- BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
-
- (*seg)++;
- if (*seg == (*bio_iter)->bi_vcnt)
- init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
-}
/*
* For a bio data item, a piece is whatever remains of the next
* entry in the current bio iovec, or the first entry in the next
* bio in the list.
*/
-static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct bio *bio;
BUG_ON(data->type != CEPH_MSG_DATA_BIO);
bio = data->bio;
BUG_ON(!bio);
BUG_ON(!bio->bi_vcnt);
- /* resid = bio->bi_size */
+ cursor->resid = min(length, data->bio_length);
cursor->bio = bio;
cursor->vector_index = 0;
cursor->vector_offset = 0;
- cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1;
+ cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
}
-static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
size_t *page_offset,
size_t *length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;
BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
*page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
BUG_ON(*page_offset >= PAGE_SIZE);
- *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
- BUG_ON(*length > PAGE_SIZE);
+ if (cursor->last_piece) /* pagelist offset is always 0 */
+ *length = cursor->resid;
+ else
+ *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
+ BUG_ON(*length > cursor->resid);
+ BUG_ON(*page_offset + *length > PAGE_SIZE);
return bio_vec->bv_page;
}
-static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;
- BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
bio = cursor->bio;
BUG_ON(!bio);
index = cursor->vector_index;
BUG_ON(index >= (unsigned int) bio->bi_vcnt);
bio_vec = &bio->bi_io_vec[index];
- BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);
/* Advance the cursor offset */
+ BUG_ON(cursor->resid < bytes);
+ cursor->resid -= bytes;
cursor->vector_offset += bytes;
if (cursor->vector_offset < bio_vec->bv_len)
return false; /* more bytes to process in this segment */
+ BUG_ON(cursor->vector_offset != bio_vec->bv_len);
/* Move on to the next segment, and possibly the next bio */
- if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) {
+ if (++index == (unsigned int) bio->bi_vcnt) {
bio = bio->bi_next;
- cursor->bio = bio;
- cursor->vector_index = 0;
+ index = 0;
}
+ cursor->bio = bio;
+ cursor->vector_index = index;
cursor->vector_offset = 0;
- if (!cursor->last_piece && bio && !bio->bi_next)
- if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1)
+ if (!cursor->last_piece) {
+ BUG_ON(!cursor->resid);
+ BUG_ON(!bio);
+ /* A short read is OK, so use <= rather than == */
+ if (cursor->resid <= bio->bi_io_vec[index].bv_len)
cursor->last_piece = true;
+ }
return true;
}
-#endif
+#endif /* CONFIG_BLOCK */
/*
* For a page array, a piece comes from the first page in the array
* that has not already been fully consumed.
*/
-static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data)
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data *data = cursor->data;
int page_count;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(!data->pages);
BUG_ON(!data->length);
+ cursor->resid = min(length, data->length);
page_count = calc_pages_for(data->alignment, (u64)data->length);
- BUG_ON(page_count > (int) USHRT_MAX);
- cursor->resid = data->length;
cursor->page_offset = data->alignment & ~PAGE_MASK;
cursor->page_index = 0;
- cursor->page_count = (unsigned short) page_count;
- cursor->last_piece = cursor->page_count == 1;
+ BUG_ON(page_count > (int)USHRT_MAX);
+ cursor->page_count = (unsigned short)page_count;
+ BUG_ON(length > SIZE_MAX - cursor->page_offset);
+ cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
}
-static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
- size_t *page_offset,
- size_t *length)
+static struct page *
+ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data *data = cursor->data;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(cursor->page_index >= cursor->page_count);
BUG_ON(cursor->page_offset >= PAGE_SIZE);
- BUG_ON(!cursor->resid);
*page_offset = cursor->page_offset;
- if (cursor->last_piece) {
- BUG_ON(*page_offset + cursor->resid > PAGE_SIZE);
+ if (cursor->last_piece)
*length = cursor->resid;
- } else {
+ else
*length = PAGE_SIZE - *page_offset;
- }
return data->pages[cursor->page_index];
}
-static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
-
- BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
- BUG_ON(bytes > cursor->resid);
/* Advance the cursor page offset */
cursor->resid -= bytes;
- cursor->page_offset += bytes;
- if (!bytes || cursor->page_offset & ~PAGE_MASK)
+ cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
+ if (!bytes || cursor->page_offset)
return false; /* more bytes to process in the current page */
- /* Move on to the next page */
+ /* Move on to the next page; offset is already at 0 */
BUG_ON(cursor->page_index >= cursor->page_count);
- cursor->page_offset = 0;
cursor->page_index++;
- cursor->last_piece = cursor->page_index == cursor->page_count - 1;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
return true;
}
* For a pagelist, a piece is whatever remains to be consumed in the
* first page in the list, or the front of the next page.
*/
-static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data)
+static void
+ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist;
struct page *page;
pagelist = data->pagelist;
BUG_ON(!pagelist);
- if (!pagelist->length)
+
+ if (!length)
return; /* pagelist can be assigned but empty */
BUG_ON(list_empty(&pagelist->head));
page = list_first_entry(&pagelist->head, struct page, lru);
+ cursor->resid = min(length, pagelist->length);
cursor->page = page;
cursor->offset = 0;
- cursor->last_piece = pagelist->length <= PAGE_SIZE;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
}
-static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
- size_t *page_offset,
- size_t *length)
+static struct page *
+ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist;
- size_t piece_end;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
BUG_ON(!pagelist);
BUG_ON(!cursor->page);
- BUG_ON(cursor->offset >= pagelist->length);
+ BUG_ON(cursor->offset + cursor->resid != pagelist->length);
- if (cursor->last_piece) {
- /* pagelist offset is always 0 */
- piece_end = pagelist->length & ~PAGE_MASK;
- if (!piece_end)
- piece_end = PAGE_SIZE;
- } else {
- piece_end = PAGE_SIZE;
- }
+ /* offset of first page in pagelist is always 0 */
*page_offset = cursor->offset & ~PAGE_MASK;
- *length = piece_end - *page_offset;
+ if (cursor->last_piece)
+ *length = cursor->resid;
+ else
+ *length = PAGE_SIZE - *page_offset;
- return data->cursor.page;
+ return cursor->page;
}
-static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
+static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
pagelist = data->pagelist;
BUG_ON(!pagelist);
- BUG_ON(!cursor->page);
- BUG_ON(cursor->offset + bytes > pagelist->length);
+
+ BUG_ON(cursor->offset + cursor->resid != pagelist->length);
BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
/* Advance the cursor offset */
+ cursor->resid -= bytes;
cursor->offset += bytes;
- /* pagelist offset is always 0 */
+ /* offset of first page in pagelist is always 0 */
if (!bytes || cursor->offset & ~PAGE_MASK)
return false; /* more bytes to process in the current page */
BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
cursor->page = list_entry_next(cursor->page, lru);
-
- /* cursor offset is at page boundary; pagelist offset is always 0 */
- if (pagelist->length - cursor->offset <= PAGE_SIZE)
- cursor->last_piece = true;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
return true;
}
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
-static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
- switch (data->type) {
+ size_t length = cursor->total_resid;
+
+ switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- ceph_msg_data_pagelist_cursor_init(data);
+ ceph_msg_data_pagelist_cursor_init(cursor, length);
break;
case CEPH_MSG_DATA_PAGES:
- ceph_msg_data_pages_cursor_init(data);
+ ceph_msg_data_pages_cursor_init(cursor, length);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- ceph_msg_data_bio_cursor_init(data);
+ ceph_msg_data_bio_cursor_init(cursor, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
/* BUG(); */
break;
}
+ cursor->need_crc = true;
+}
+
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ struct ceph_msg_data *data;
+
+ BUG_ON(!length);
+ BUG_ON(length > msg->data_length);
+ BUG_ON(list_empty(&msg->data));
+
+ cursor->data_head = &msg->data;
+ cursor->total_resid = length;
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+ cursor->data = data;
+
+ __ceph_msg_data_cursor_init(cursor);
}
/*
* data item, and supply the page offset and length of that piece.
* Indicate whether this is the last piece in this data item.
*/
-static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
- size_t *page_offset,
- size_t *length,
+static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length,
bool *last_piece)
{
struct page *page;
- switch (data->type) {
+ switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- page = ceph_msg_data_pagelist_next(data, page_offset, length);
+ page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
break;
case CEPH_MSG_DATA_PAGES:
- page = ceph_msg_data_pages_next(data, page_offset, length);
+ page = ceph_msg_data_pages_next(cursor, page_offset, length);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- page = ceph_msg_data_bio_next(data, page_offset, length);
+ page = ceph_msg_data_bio_next(cursor, page_offset, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
BUG_ON(*page_offset + *length > PAGE_SIZE);
BUG_ON(!*length);
if (last_piece)
- *last_piece = data->cursor.last_piece;
+ *last_piece = cursor->last_piece;
return page;
}
* Returns true if the result moves the cursor on to the next piece
* of the data item.
*/
-static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
+static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
{
bool new_piece;
- switch (data->type) {
+ BUG_ON(bytes > cursor->resid);
+ switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- new_piece = ceph_msg_data_pagelist_advance(data, bytes);
+ new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
break;
case CEPH_MSG_DATA_PAGES:
- new_piece = ceph_msg_data_pages_advance(data, bytes);
+ new_piece = ceph_msg_data_pages_advance(cursor, bytes);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- new_piece = ceph_msg_data_bio_advance(data, bytes);
+ new_piece = ceph_msg_data_bio_advance(cursor, bytes);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
BUG();
break;
}
+ cursor->total_resid -= bytes;
+
+ if (!cursor->resid && cursor->total_resid) {
+ WARN_ON(!cursor->last_piece);
+ BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+ cursor->data = list_entry_next(cursor->data, links);
+ __ceph_msg_data_cursor_init(cursor);
+ new_piece = true;
+ }
+ cursor->need_crc = new_piece;
return new_piece;
}
-static void prepare_message_data(struct ceph_msg *msg,
- struct ceph_msg_pos *msg_pos)
+static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
BUG_ON(!msg);
- BUG_ON(!msg->hdr.data_len);
+ BUG_ON(!data_len);
- /* initialize page iterator */
- msg_pos->page = 0;
- if (ceph_msg_has_pages(msg))
- msg_pos->page_pos = msg->p.alignment;
- else
- msg_pos->page_pos = 0;
-#ifdef CONFIG_BLOCK
- if (ceph_msg_has_bio(msg))
- init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg);
-#endif
- msg_pos->data_pos = 0;
+ /* Initialize data cursor */
- /* Initialize data cursors */
-
-#ifdef CONFIG_BLOCK
- if (ceph_msg_has_bio(msg))
- ceph_msg_data_cursor_init(&msg->b);
-#endif /* CONFIG_BLOCK */
- if (ceph_msg_has_pages(msg))
- ceph_msg_data_cursor_init(&msg->p);
- if (ceph_msg_has_pagelist(msg))
- ceph_msg_data_cursor_init(&msg->l);
-
- msg_pos->did_page_crc = false;
+ ceph_msg_data_cursor_init(msg, (size_t)data_len);
}
/*
m->hdr.seq = cpu_to_le64(++con->out_seq);
m->needs_out_seq = false;
}
+ WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
- dout("prepare_write_message %p seq %lld type %d len %d+%d+%d (%zd)\n",
+ dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
- le32_to_cpu(m->hdr.data_len), m->p.length);
+ m->data_length);
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
/* tag + hdr + front + middle */
/* is there a data payload? */
con->out_msg->footer.data_crc = 0;
- if (m->hdr.data_len) {
- prepare_message_data(con->out_msg, &con->out_msg_pos);
+ if (m->data_length) {
+ prepare_message_data(con->out_msg, m->data_length);
con->out_more = 1; /* data + footer will follow */
} else {
/* no, queue up footer too and be done */
return ret; /* done! */
}
-static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
- size_t len, size_t sent)
-{
- struct ceph_msg *msg = con->out_msg;
- struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
- bool need_crc = false;
-
- BUG_ON(!msg);
- BUG_ON(!sent);
-
- msg_pos->data_pos += sent;
- msg_pos->page_pos += sent;
- if (ceph_msg_has_pages(msg))
- need_crc = ceph_msg_data_advance(&msg->p, sent);
- else if (ceph_msg_has_pagelist(msg))
- need_crc = ceph_msg_data_advance(&msg->l, sent);
-#ifdef CONFIG_BLOCK
- else if (ceph_msg_has_bio(msg))
- need_crc = ceph_msg_data_advance(&msg->b, sent);
-#endif /* CONFIG_BLOCK */
- BUG_ON(need_crc && sent != len);
-
- if (sent < len)
- return;
-
- BUG_ON(sent != len);
- msg_pos->page_pos = 0;
- msg_pos->page++;
- msg_pos->did_page_crc = false;
-}
-
-static void in_msg_pos_next(struct ceph_connection *con, size_t len,
- size_t received)
-{
- struct ceph_msg *msg = con->in_msg;
- struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
-
- BUG_ON(!msg);
- BUG_ON(!received);
-
- msg_pos->data_pos += received;
- msg_pos->page_pos += received;
- if (received < len)
- return;
-
- BUG_ON(received != len);
- msg_pos->page_pos = 0;
- msg_pos->page++;
-#ifdef CONFIG_BLOCK
- if (msg->b.bio)
- iter_bio_next(&msg->b.bio_iter, &msg->b.bio_seg);
-#endif /* CONFIG_BLOCK */
-}
-
static u32 ceph_crc32c_page(u32 crc, struct page *page,
unsigned int page_offset,
unsigned int length)
static int write_partial_message_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
- struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
- unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !con->msgr->nocrc;
- int ret;
+ u32 crc;
+
+ dout("%s %p msg %p\n", __func__, con, msg);
- dout("%s %p msg %p page %d offset %d\n", __func__,
- con, msg, msg_pos->page, msg_pos->page_pos);
+ if (list_empty(&msg->data))
+ return -EINVAL;
/*
* Iterate through each page that contains data to be
* need to map the page. If we have no pages, they have
* been revoked, so use the zero page.
*/
- while (data_len > msg_pos->data_pos) {
+ crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
+ while (cursor->resid) {
struct page *page;
size_t page_offset;
size_t length;
bool last_piece;
+ bool need_crc;
+ int ret;
- if (ceph_msg_has_pages(msg)) {
- page = ceph_msg_data_next(&msg->p, &page_offset,
- &length, &last_piece);
- } else if (ceph_msg_has_pagelist(msg)) {
- page = ceph_msg_data_next(&msg->l, &page_offset,
- &length, &last_piece);
-#ifdef CONFIG_BLOCK
- } else if (ceph_msg_has_bio(msg)) {
- page = ceph_msg_data_next(&msg->b, &page_offset,
- &length, &last_piece);
-#endif
- } else {
- size_t resid = data_len - msg_pos->data_pos;
-
- page = zero_page;
- page_offset = msg_pos->page_pos;
- length = PAGE_SIZE - page_offset;
- length = min(resid, length);
- last_piece = length == resid;
- }
- if (do_datacrc && !msg_pos->did_page_crc) {
- u32 crc = le32_to_cpu(msg->footer.data_crc);
-
- crc = ceph_crc32c_page(crc, page, page_offset, length);
- msg->footer.data_crc = cpu_to_le32(crc);
- msg_pos->did_page_crc = true;
- }
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
+ &last_piece);
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
length, last_piece);
- if (ret <= 0)
- goto out;
+ if (ret <= 0) {
+ if (do_datacrc)
+ msg->footer.data_crc = cpu_to_le32(crc);
- out_msg_pos_next(con, page, length, (size_t) ret);
+ return ret;
+ }
+ if (do_datacrc && cursor->need_crc)
+ crc = ceph_crc32c_page(crc, page, page_offset, length);
+ need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
dout("%s %p msg %p done\n", __func__, con, msg);
/* prepare and queue up footer, too */
- if (!do_datacrc)
+ if (do_datacrc)
+ msg->footer.data_crc = cpu_to_le32(crc);
+ else
msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
con_out_kvec_reset(con);
prepare_write_message_footer(con);
- ret = 1;
-out:
- return ret;
+
+ return 1; /* must return > 0 to indicate success */
}
/*
con->error_msg = "connect authorization failure";
return -1;
}
- con->auth_retry = 1;
con_out_kvec_reset(con);
ret = prepare_write_connect(con);
if (ret < 0)
WARN_ON(con->state != CON_STATE_NEGOTIATING);
con->state = CON_STATE_OPEN;
-
+ con->auth_retry = 0; /* we authenticated; clear flag */
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++;
con->peer_features = server_feat;
return 1;
}
-static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
-
-static int read_partial_message_pages(struct ceph_connection *con,
- struct page **pages,
- unsigned int data_len, bool do_datacrc)
-{
- struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
- struct page *page;
- size_t page_offset;
- size_t length;
- unsigned int left;
- int ret;
-
- /* (page) data */
- BUG_ON(pages == NULL);
- page = pages[msg_pos->page];
- page_offset = msg_pos->page_pos;
- BUG_ON(msg_pos->data_pos >= data_len);
- left = data_len - msg_pos->data_pos;
- BUG_ON(page_offset >= PAGE_SIZE);
- length = min_t(unsigned int, PAGE_SIZE - page_offset, left);
-
- ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
- if (ret <= 0)
- return ret;
-
- if (do_datacrc)
- con->in_data_crc = ceph_crc32c_page(con->in_data_crc, page,
- page_offset, ret);
-
- in_msg_pos_next(con, length, ret);
-
- return ret;
-}
-
-#ifdef CONFIG_BLOCK
-static int read_partial_message_bio(struct ceph_connection *con,
- unsigned int data_len, bool do_datacrc)
+static int read_partial_msg_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
- struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
- struct bio_vec *bv;
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ const bool do_datacrc = !con->msgr->nocrc;
struct page *page;
size_t page_offset;
size_t length;
- unsigned int left;
+ u32 crc = 0;
int ret;
BUG_ON(!msg);
- BUG_ON(!msg->b.bio_iter);
- bv = bio_iovec_idx(msg->b.bio_iter, msg->b.bio_seg);
- page = bv->bv_page;
- page_offset = bv->bv_offset + msg_pos->page_pos;
- BUG_ON(msg_pos->data_pos >= data_len);
- left = data_len - msg_pos->data_pos;
- BUG_ON(msg_pos->page_pos >= bv->bv_len);
- length = min_t(unsigned int, bv->bv_len - msg_pos->page_pos, left);
-
- ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
- if (ret <= 0)
- return ret;
+ if (list_empty(&msg->data))
+ return -EIO;
if (do_datacrc)
- con->in_data_crc = ceph_crc32c_page(con->in_data_crc, page,
- page_offset, ret);
-
- in_msg_pos_next(con, length, ret);
-
- return ret;
-}
-#endif
-
-static int read_partial_msg_data(struct ceph_connection *con)
-{
- struct ceph_msg *msg = con->in_msg;
- struct ceph_msg_pos *msg_pos = &con->in_msg_pos;
- const bool do_datacrc = !con->msgr->nocrc;
- unsigned int data_len;
- int ret;
-
- BUG_ON(!msg);
+ crc = con->in_data_crc;
+ while (cursor->resid) {
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
+ NULL);
+ ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
+ if (ret <= 0) {
+ if (do_datacrc)
+ con->in_data_crc = crc;
- data_len = le32_to_cpu(con->in_hdr.data_len);
- while (msg_pos->data_pos < data_len) {
- if (ceph_msg_has_pages(msg)) {
- ret = read_partial_message_pages(con, msg->p.pages,
- data_len, do_datacrc);
- if (ret <= 0)
- return ret;
-#ifdef CONFIG_BLOCK
- } else if (ceph_msg_has_bio(msg)) {
- ret = read_partial_message_bio(con,
- data_len, do_datacrc);
- if (ret <= 0)
- return ret;
-#endif
- } else {
- BUG_ON(1);
+ return ret;
}
+
+ if (do_datacrc)
+ crc = ceph_crc32c_page(crc, page, page_offset, ret);
+ (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
+ if (do_datacrc)
+ con->in_data_crc = crc;
return 1; /* must return > 0 to indicate success */
}
/*
* read (part of) a message.
*/
+static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
+
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
ret = ceph_con_in_msg_alloc(con, &skip);
if (ret < 0)
return ret;
+
+ BUG_ON(!con->in_msg ^ skip);
+ if (con->in_msg && data_len > con->in_msg->data_length) {
+ pr_warning("%s skipping long message (%u > %zd)\n",
+ __func__, data_len, con->in_msg->data_length);
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ skip = 1;
+ }
if (skip) {
/* skip this message */
dout("alloc_msg said skip message\n");
- BUG_ON(con->in_msg);
con->in_base_pos = -front_len - middle_len - data_len -
sizeof(m->footer);
con->in_tag = CEPH_MSGR_TAG_READY;
/* prepare for data payload, if any */
if (data_len)
- prepare_message_data(con->in_msg, &con->in_msg_pos);
+ prepare_message_data(con->in_msg, data_len);
}
/* front */
}
EXPORT_SYMBOL(ceph_con_keepalive);
-static void ceph_msg_data_init(struct ceph_msg_data *data)
+/*
+ * Allocate a zero-filled data item of the given type from
+ * ceph_msg_data_cache (GFP_NOFS) and initialize its list links.
+ *
+ * Returns the new item, or NULL if the type is not valid (a warning
+ * is issued in that case) or if the allocation fails.
+ */
+static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
+{
+	struct ceph_msg_data *data;
+
+	if (WARN_ON(!ceph_msg_data_type_valid(type)))
+		return NULL;
+
+	data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
+	if (!data)
+		return NULL;	/* nothing to initialize; don't touch ->links */
+
+	data->type = type;
+	INIT_LIST_HEAD(&data->links);
+
+	return data;
+}
+
+static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
- data->type = CEPH_MSG_DATA_NONE;
+ if (!data)
+ return;
+
+ WARN_ON(!list_empty(&data->links));
+ if (data->type == CEPH_MSG_DATA_PAGELIST) {
+ ceph_pagelist_release(data->pagelist);
+ kfree(data->pagelist);
+ }
+ kmem_cache_free(ceph_msg_data_cache, data);
}
-void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
+void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment)
{
+ struct ceph_msg_data *data;
+
BUG_ON(!pages);
BUG_ON(!length);
- BUG_ON(msg->p.type != CEPH_MSG_DATA_NONE);
- msg->p.type = CEPH_MSG_DATA_PAGES;
- msg->p.pages = pages;
- msg->p.length = length;
- msg->p.alignment = alignment & ~PAGE_MASK;
+ data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
+ BUG_ON(!data);
+ data->pages = pages;
+ data->length = length;
+ data->alignment = alignment & ~PAGE_MASK;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += length;
}
-EXPORT_SYMBOL(ceph_msg_data_set_pages);
+EXPORT_SYMBOL(ceph_msg_data_add_pages);
-void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
+void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist)
{
+ struct ceph_msg_data *data;
+
BUG_ON(!pagelist);
BUG_ON(!pagelist->length);
- BUG_ON(msg->l.type != CEPH_MSG_DATA_NONE);
- msg->l.type = CEPH_MSG_DATA_PAGELIST;
- msg->l.pagelist = pagelist;
+ data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
+ BUG_ON(!data);
+ data->pagelist = pagelist;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += pagelist->length;
}
-EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
+EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
-void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
+#ifdef CONFIG_BLOCK
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
+ size_t length)
{
+ struct ceph_msg_data *data;
+
BUG_ON(!bio);
- BUG_ON(msg->b.type != CEPH_MSG_DATA_NONE);
- msg->b.type = CEPH_MSG_DATA_BIO;
- msg->b.bio = bio;
+ data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
+ BUG_ON(!data);
+ data->bio = bio;
+ data->bio_length = length;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += length;
}
-EXPORT_SYMBOL(ceph_msg_data_set_bio);
+EXPORT_SYMBOL(ceph_msg_data_add_bio);
+#endif /* CONFIG_BLOCK */
/*
* construct a new message with given type, size
{
struct ceph_msg *m;
- m = kzalloc(sizeof(*m), flags);
+ m = kmem_cache_zalloc(ceph_msg_cache, flags);
if (m == NULL)
goto out;
INIT_LIST_HEAD(&m->list_head);
kref_init(&m->kref);
-
- ceph_msg_data_init(&m->p);
- ceph_msg_data_init(&m->l);
- ceph_msg_data_init(&m->b);
+ INIT_LIST_HEAD(&m->data);
/* front */
m->front_max = front_len;
vfree(m->front.iov_base);
else
kfree(m->front.iov_base);
- kfree(m);
+ kmem_cache_free(ceph_msg_cache, m);
}
/*
void ceph_msg_last_put(struct kref *kref)
{
struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
+ LIST_HEAD(data);
+ struct list_head *links;
+ struct list_head *next;
dout("ceph_msg_put last one on %p\n", m);
WARN_ON(!list_empty(&m->list_head));
ceph_buffer_put(m->middle);
m->middle = NULL;
}
- if (ceph_msg_has_pages(m)) {
- m->p.length = 0;
- m->p.pages = NULL;
- }
- if (ceph_msg_has_pagelist(m)) {
- ceph_pagelist_release(m->l.pagelist);
- kfree(m->l.pagelist);
- m->l.pagelist = NULL;
+ list_splice_init(&m->data, &data);
+ list_for_each_safe(links, next, &data) {
+ struct ceph_msg_data *data;
+
+ data = list_entry(links, struct ceph_msg_data, links);
+ list_del_init(links);
+ ceph_msg_data_destroy(data);
}
+ m->data_length = 0;
if (m->pool)
ceph_msgpool_put(m->pool, m);
void ceph_msg_dump(struct ceph_msg *msg)
{
pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
- msg->front_max, msg->p.length);
+ msg->front_max, msg->data_length);
print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true);