return test_and_set_bit(con_flag, &con->flags);
}
+/* Slab caches for frequently-allocated structures */
+
+static struct kmem_cache *ceph_msg_cache;
+
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
*/
static struct workqueue_struct *ceph_msgr_wq;
+/*
+ * Create the slab cache used for struct ceph_msg allocations.  It is
+ * a bug to call this while a cache already exists.  Returns 0 on
+ * success, or -ENOMEM if the cache could not be created.
+ */
+static int ceph_msgr_slab_init(void)
+{
+	BUG_ON(ceph_msg_cache);
+
+	/* Same name, size and alignment as spelling out kmem_cache_create() */
+	ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
+	if (!ceph_msg_cache)
+		return -ENOMEM;
+
+	return 0;
+}
+
+/*
+ * Destroy the ceph_msg slab cache and clear the pointer to it.  The
+ * cache must currently exist.
+ */
+static void ceph_msgr_slab_exit(void)
+{
+	struct kmem_cache *cache = ceph_msg_cache;
+
+	BUG_ON(cache == NULL);
+	kmem_cache_destroy(cache);
+	ceph_msg_cache = NULL;
+}
+
static void _ceph_msgr_exit(void)
{
if (ceph_msgr_wq) {
ceph_msgr_wq = NULL;
}
+ ceph_msgr_slab_exit();
+
BUG_ON(zero_page == NULL);
kunmap(zero_page);
page_cache_release(zero_page);
zero_page = ZERO_PAGE(0);
page_cache_get(zero_page);
+ if (ceph_msgr_slab_init())
+ return -ENOMEM;
+
ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
if (ceph_msgr_wq)
return 0;
* entry in the current bio iovec, or the first entry in the next
* bio in the list.
*/
-static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{
-	struct ceph_msg_data_cursor *cursor = data->cursor;
+	struct ceph_msg_data *data = cursor->data;
struct bio *bio;
BUG_ON(data->type != CEPH_MSG_DATA_BIO);
BUG_ON(!bio);
BUG_ON(!bio->bi_vcnt);
-	cursor->resid = length;
+	cursor->resid = min(length, data->bio_length);
cursor->bio = bio;
cursor->vector_index = 0;
cursor->vector_offset = 0;
-	cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
+	/*
+	 * Judge "last piece" against the residual clamped to this data
+	 * item, not the caller's length: the caller's length may also
+	 * cover data items that follow this one.
+	 */
+	cursor->last_piece = cursor->resid <= bio->bi_io_vec[0].bv_len;
}
-static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
size_t *page_offset,
size_t *length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;
return bio_vec->bv_page;
}
-static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;
- BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
bio = cursor->bio;
BUG_ON(!bio);
* For a page array, a piece comes from the first page in the array
* that has not already been fully consumed.
*/
-static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{
-	struct ceph_msg_data_cursor *cursor = data->cursor;
+	struct ceph_msg_data *data = cursor->data;
int page_count;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(!data->pages);
BUG_ON(!data->length);
-	BUG_ON(length > data->length);	/* short reads are OK */
-	cursor->resid = length;
+	cursor->resid = min(length, data->length);
page_count = calc_pages_for(data->alignment, (u64)data->length);
cursor->page_offset = data->alignment & ~PAGE_MASK;
cursor->page_index = 0;
-	cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
+	/*
+	 * Judge "last piece" against the residual clamped to this data
+	 * item, not the caller's length: the caller's length may also
+	 * cover data items that follow this one.
+	 */
+	cursor->last_piece =
+		(size_t)cursor->page_offset + cursor->resid <= PAGE_SIZE;
}
-static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
- size_t *page_offset,
- size_t *length)
+static struct page *
+ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
return data->pages[cursor->page_index];
}
-static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
-
- BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
* For a pagelist, a piece is whatever remains to be consumed in the
* first page in the list, or the front of the next page.
*/
-static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
+static void
+ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist;
struct page *page;
pagelist = data->pagelist;
BUG_ON(!pagelist);
- BUG_ON(length > pagelist->length); /* short reads are OK */
if (!length)
return; /* pagelist can be assigned but empty */
BUG_ON(list_empty(&pagelist->head));
page = list_first_entry(&pagelist->head, struct page, lru);
- cursor->resid = length;
+ cursor->resid = min(length, pagelist->length);
cursor->page = page;
cursor->offset = 0;
- cursor->last_piece = length <= PAGE_SIZE;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
}
-static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
- size_t *page_offset,
- size_t *length)
+static struct page *
+ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
else
*length = PAGE_SIZE - *page_offset;
- return data->cursor->page;
+ return cursor->page;
}
-static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
+static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
-static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
- size_t length)
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
- switch (data->type) {
+ size_t length = cursor->total_resid;
+
+ switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- ceph_msg_data_pagelist_cursor_init(data, length);
+ ceph_msg_data_pagelist_cursor_init(cursor, length);
break;
case CEPH_MSG_DATA_PAGES:
- ceph_msg_data_pages_cursor_init(data, length);
+ ceph_msg_data_pages_cursor_init(cursor, length);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- ceph_msg_data_bio_cursor_init(data, length);
+ ceph_msg_data_bio_cursor_init(cursor, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
/* BUG(); */
break;
}
- data->cursor->need_crc = true;
+ cursor->need_crc = true;
+}
+
+/*
+ * Aim the message's cursor at its first data item and prepare it to
+ * walk "length" bytes of message data.  The length must be non-zero
+ * and no larger than the message's data length, and the message must
+ * have at least one data item attached.
+ */
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+	struct list_head *items = &msg->data;
+	struct ceph_msg_data_cursor *cursor = &msg->cursor;
+
+	BUG_ON(!length);
+	BUG_ON(length > msg->data_length);
+	BUG_ON(list_empty(items));
+
+	cursor->total_resid = length;
+	cursor->data_head = items;
+	cursor->data = list_first_entry(items, struct ceph_msg_data, links);
+
+	/* Set up the per-item state for the item just selected */
+	__ceph_msg_data_cursor_init(cursor);
}
/*
* data item, and supply the page offset and length of that piece.
* Indicate whether this is the last piece in this data item.
*/
-static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
- size_t *page_offset,
- size_t *length,
+static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length,
bool *last_piece)
{
struct page *page;
- switch (data->type) {
+ switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- page = ceph_msg_data_pagelist_next(data, page_offset, length);
+ page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
break;
case CEPH_MSG_DATA_PAGES:
- page = ceph_msg_data_pages_next(data, page_offset, length);
+ page = ceph_msg_data_pages_next(cursor, page_offset, length);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- page = ceph_msg_data_bio_next(data, page_offset, length);
+ page = ceph_msg_data_bio_next(cursor, page_offset, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
BUG_ON(*page_offset + *length > PAGE_SIZE);
BUG_ON(!*length);
if (last_piece)
- *last_piece = data->cursor->last_piece;
+ *last_piece = cursor->last_piece;
return page;
}
* Returns true if the result moves the cursor on to the next piece
* of the data item.
*/
-static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
+static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
bool new_piece;
BUG_ON(bytes > cursor->resid);
- switch (data->type) {
+ switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- new_piece = ceph_msg_data_pagelist_advance(data, bytes);
+ new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
break;
case CEPH_MSG_DATA_PAGES:
- new_piece = ceph_msg_data_pages_advance(data, bytes);
+ new_piece = ceph_msg_data_pages_advance(cursor, bytes);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- new_piece = ceph_msg_data_bio_advance(data, bytes);
+ new_piece = ceph_msg_data_bio_advance(cursor, bytes);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
BUG();
break;
}
- data->cursor->need_crc = new_piece;
+ cursor->total_resid -= bytes; /* message-wide count, kept alongside the per-item resid */
+
+ if (!cursor->resid && cursor->total_resid) { /* this item is done but the message is not */
+ WARN_ON(!cursor->last_piece);
+ BUG_ON(list_is_last(&cursor->data->links, cursor->data_head)); /* bytes remain, so another item must exist */
+ cursor->data = list_entry_next(cursor->data, links); /* step to the next data item */
+ __ceph_msg_data_cursor_init(cursor); /* re-initialize per-item state using total_resid as the length */
+ new_piece = true; /* the first piece of the next item is always a new piece */
+ }
+ cursor->need_crc = new_piece;
return new_piece;
}
/* Initialize data cursor */
- ceph_msg_data_cursor_init(msg->data, (size_t)data_len);
+ ceph_msg_data_cursor_init(msg, (size_t)data_len);
}
/*
static int write_partial_message_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
- struct ceph_msg_data_cursor *cursor = msg->data->cursor;
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !con->msgr->nocrc;
u32 crc;
dout("%s %p msg %p\n", __func__, con, msg);
- if (WARN_ON(!msg->data))
+ if (list_empty(&msg->data))
return -EINVAL;
/*
bool need_crc;
int ret;
- page = ceph_msg_data_next(msg->data, &page_offset, &length,
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
&last_piece);
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
length, last_piece);
}
if (do_datacrc && cursor->need_crc)
crc = ceph_crc32c_page(crc, page, page_offset, length);
- need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
+ need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
dout("%s %p msg %p done\n", __func__, con, msg);
static int read_partial_msg_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
- struct ceph_msg_data_cursor *cursor = msg->data->cursor;
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
const bool do_datacrc = !con->msgr->nocrc;
struct page *page;
size_t page_offset;
int ret;
BUG_ON(!msg);
- if (!msg->data)
+ if (list_empty(&msg->data))
return -EIO;
if (do_datacrc)
crc = con->in_data_crc;
while (cursor->resid) {
- page = ceph_msg_data_next(msg->data, &page_offset, &length,
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
NULL);
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
if (ret <= 0) {
if (do_datacrc)
crc = ceph_crc32c_page(crc, page, page_offset, ret);
- (void) ceph_msg_data_advance(msg->data, (size_t)ret);
+ (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
if (do_datacrc)
con->in_data_crc = crc;
data = kzalloc(sizeof (*data), GFP_NOFS);
-	if (data)
+	/*
+	 * kzalloc() can fail.  Only touch the object when it exists so
+	 * that callers get NULL back instead of a NULL dereference here.
+	 */
+	if (data) {
data->type = type;
+		INIT_LIST_HEAD(&data->links);
+	}
return data;
}
if (!data)
return;
+ WARN_ON(!list_empty(&data->links));
if (data->type == CEPH_MSG_DATA_PAGELIST) {
ceph_pagelist_release(data->pagelist);
kfree(data->pagelist);
kfree(data);
}
-void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
+void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment)
{
struct ceph_msg_data *data;
BUG_ON(!pages);
BUG_ON(!length);
- BUG_ON(msg->data_length);
- BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
BUG_ON(!data);
- data->cursor = &msg->cursor;
data->pages = pages;
data->length = length;
data->alignment = alignment & ~PAGE_MASK;
- msg->data = data;
- msg->data_length = length;
+ list_add_tail(&data->links, &msg->data); /* append behind any items already on the message */
+ msg->data_length += length; /* running total across all attached items */
}
-EXPORT_SYMBOL(ceph_msg_data_set_pages);
+EXPORT_SYMBOL(ceph_msg_data_add_pages);
-void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
+void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist)
{
struct ceph_msg_data *data;
BUG_ON(!pagelist);
BUG_ON(!pagelist->length);
- BUG_ON(msg->data_length);
- BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
BUG_ON(!data);
- data->cursor = &msg->cursor;
data->pagelist = pagelist;
- msg->data = data;
- msg->data_length = pagelist->length;
+ list_add_tail(&data->links, &msg->data); /* append behind any items already on the message */
+ msg->data_length += pagelist->length; /* running total across all attached items */
}
-EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
+EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
#ifdef CONFIG_BLOCK
-void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
size_t length)
{
struct ceph_msg_data *data;
BUG_ON(!bio);
- BUG_ON(msg->data_length);
- BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data);
- data->cursor = &msg->cursor;
data->bio = bio;
data->bio_length = length;
- msg->data = data;
- msg->data_length = length;
+ list_add_tail(&data->links, &msg->data); /* append behind any items already on the message */
+ msg->data_length += length; /* running total across all attached items */
}
-EXPORT_SYMBOL(ceph_msg_data_set_bio);
+EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif /* CONFIG_BLOCK */
/*
{
struct ceph_msg *m;
- m = kzalloc(sizeof(*m), flags);
+ m = kmem_cache_zalloc(ceph_msg_cache, flags);
if (m == NULL)
goto out;
INIT_LIST_HEAD(&m->list_head);
kref_init(&m->kref);
+ INIT_LIST_HEAD(&m->data);
/* front */
m->front_max = front_len;
vfree(m->front.iov_base);
else
kfree(m->front.iov_base);
- kfree(m);
+ kmem_cache_free(ceph_msg_cache, m);
}
/*
void ceph_msg_last_put(struct kref *kref)
{
struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
+ LIST_HEAD(data);
+ struct list_head *links;
+ struct list_head *next;
dout("ceph_msg_put last one on %p\n", m);
WARN_ON(!list_empty(&m->list_head));
ceph_buffer_put(m->middle);
m->middle = NULL;
}
- ceph_msg_data_destroy(m->data);
- m->data = NULL;
+
+	/*
+	 * Move every data item onto the local list, then unlink and
+	 * destroy them one at a time.  The loop variable is named
+	 * "item" so it does not shadow the local list head "data".
+	 */
+	list_splice_init(&m->data, &data);
+	list_for_each_safe(links, next, &data) {
+		struct ceph_msg_data *item;
+
+		item = list_entry(links, struct ceph_msg_data, links);
+		list_del_init(links);
+		ceph_msg_data_destroy(item);
+	}
m->data_length = 0;
if (m->pool)
void ceph_msg_dump(struct ceph_msg *msg)
{
pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
- msg->front_max, msg->data->length);
+ msg->front_max, msg->data_length);
print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true);