]> Pileus Git - ~andy/linux/blobdiff - net/ceph/messenger.c
libceph: implement multiple data items in a message
[~andy/linux] / net / ceph / messenger.c
index 8bfe7d34bc85a540ee43b1b4aef5f70ec8400812..84703e550c26e3540b23122864bc9b0c5b68435a 100644 (file)
@@ -734,7 +734,7 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
        BUG_ON(!bio);
        BUG_ON(!bio->bi_vcnt);
 
-       cursor->resid = length;
+       cursor->resid = min(length, data->bio_length);
        cursor->bio = bio;
        cursor->vector_index = 0;
        cursor->vector_offset = 0;
@@ -833,9 +833,8 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
 
        BUG_ON(!data->pages);
        BUG_ON(!data->length);
-       BUG_ON(length > data->length);  /* short reads are OK */
 
-       cursor->resid = length;
+       cursor->resid = min(length, data->length);
        page_count = calc_pages_for(data->alignment, (u64)data->length);
        cursor->page_offset = data->alignment & ~PAGE_MASK;
        cursor->page_index = 0;
@@ -904,7 +903,6 @@ ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
 
        pagelist = data->pagelist;
        BUG_ON(!pagelist);
-       BUG_ON(length > pagelist->length);      /* short reads are OK */
 
        if (!length)
                return;         /* pagelist can be assigned but empty */
@@ -912,7 +910,7 @@ ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
        BUG_ON(list_empty(&pagelist->head));
        page = list_first_entry(&pagelist->head, struct page, lru);
 
-       cursor->resid = length;
+       cursor->resid = min(length, pagelist->length);
        cursor->page = page;
        cursor->offset = 0;
        cursor->last_piece = length <= PAGE_SIZE;
@@ -982,13 +980,10 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
  * be processed in that piece.  It also tracks whether the current
  * piece is the last one in the data item.
  */
-static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
 {
-       struct ceph_msg_data_cursor *cursor = &msg->cursor;
-       struct ceph_msg_data *data;
+       size_t length = cursor->total_resid;
 
-       data = list_first_entry(&msg->data, struct ceph_msg_data, links);
-       cursor->data = data;
        switch (cursor->data->type) {
        case CEPH_MSG_DATA_PAGELIST:
                ceph_msg_data_pagelist_cursor_init(cursor, length);
@@ -1009,6 +1004,25 @@ static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
        cursor->need_crc = true;
 }
 
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+       struct ceph_msg_data_cursor *cursor = &msg->cursor;
+       struct ceph_msg_data *data;
+
+       BUG_ON(!length);
+       BUG_ON(length > msg->data_length);
+       BUG_ON(list_empty(&msg->data));
+
+       data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+
+       cursor->data_head = &msg->data;
+       cursor->total_resid = length;
+       data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+       cursor->data = data;
+
+       __ceph_msg_data_cursor_init(cursor);
+}
+
 /*
  * Return the page containing the next piece to process for a given
  * data item, and supply the page offset and length of that piece.
@@ -1073,8 +1087,16 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
                BUG();
                break;
        }
+       cursor->total_resid -= bytes;
        cursor->need_crc = new_piece;
 
+       if (!cursor->resid && cursor->total_resid) {
+               WARN_ON(!cursor->last_piece);
+               BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+               cursor->data = list_entry_next(cursor->data, links);
+               __ceph_msg_data_cursor_init(cursor);
+       }
+
        return new_piece;
 }
 
@@ -2990,8 +3012,6 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
 
        BUG_ON(!pages);
        BUG_ON(!length);
-       BUG_ON(msg->data_length);
-       BUG_ON(!list_empty(&msg->data));
 
        data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
        BUG_ON(!data);
@@ -3012,8 +3032,6 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
 
        BUG_ON(!pagelist);
        BUG_ON(!pagelist->length);
-       BUG_ON(msg->data_length);
-       BUG_ON(!list_empty(&msg->data));
 
        data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
        BUG_ON(!data);
@@ -3031,8 +3049,6 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
        struct ceph_msg_data *data;
 
        BUG_ON(!bio);
-       BUG_ON(msg->data_length);
-       BUG_ON(!list_empty(&msg->data));
 
        data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
        BUG_ON(!data);