#define OSD_OP_FRONT_LEN 4096
#define OSD_OPREPLY_FRONT_LEN 512
+static struct kmem_cache *ceph_osd_request_cache;
+
static const struct ceph_connection_operations osd_con_ops;
static void __send_queued(struct ceph_osd_client *osdc);
}
#endif /* CONFIG_BLOCK */
-struct ceph_osd_data *
-osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
- unsigned int which, bool write_request)
+/*
+ * Yield the address of field "fld" inside the "typ" member of op
+ * number "whch" of request "oreq".  BUG()s when the index is not
+ * below r_num_ops.  The request and index arguments are captured in
+ * locals so each is evaluated exactly once and an expression such as
+ * "i + 1" cannot be mis-parsed inside the bounds check.
+ */
+#define osd_req_op_data(oreq, whch, typ, fld)				\
+	({								\
+		struct ceph_osd_request *__oreq = (oreq);		\
+		unsigned int __whch = (whch);				\
+		BUG_ON(__whch >= __oreq->r_num_ops);			\
+		&__oreq->r_ops[__whch].typ.fld;				\
+	})
+
+static struct ceph_osd_data *
+osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
BUG_ON(which >= osd_req->r_num_ops);
- /* return &osd_req->r_ops[which].extent.osd_data; */
- return write_request ? &osd_req->r_data_out : &osd_req->r_data_in;
+ return &osd_req->r_ops[which].raw_data_in;
}
-EXPORT_SYMBOL(osd_req_op_extent_osd_data);
struct ceph_osd_data *
-osd_req_op_cls_request_info(struct ceph_osd_request *osd_req,
+osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
unsigned int which)
{
- BUG_ON(which >= osd_req->r_num_ops);
-
- /* return &osd_req->r_ops[which].cls.request_info; */
- return &osd_req->r_data_out; /* Request data is outgoing */
+ return osd_req_op_data(osd_req, which, extent, osd_data);
}
-EXPORT_SYMBOL(osd_req_op_cls_request_info); /* ??? */
+EXPORT_SYMBOL(osd_req_op_extent_osd_data);
struct ceph_osd_data *
osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
unsigned int which)
{
- BUG_ON(which >= osd_req->r_num_ops);
-
- /* return &osd_req->r_ops[which].cls.response_data; */
- return &osd_req->r_data_in; /* Response data is incoming */
+ return osd_req_op_data(osd_req, which, cls, response_data);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? */
-void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
- unsigned int which, bool write_request,
- struct page **pages, u64 length, u32 alignment,
+void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages,
+ u64 length, u32 alignment,
bool pages_from_pool, bool own_pages)
{
struct ceph_osd_data *osd_data;
- osd_data = osd_req_op_extent_osd_data(osd_req, which, write_request);
+ osd_data = osd_req_op_raw_data_in(osd_req, which);
ceph_osd_data_pages_init(osd_data, pages, length, alignment,
pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
+
+void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages,
+ u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
- osd_req->r_ops[which].extent.osd_data =
- osd_req_op_extent_osd_data(osd_req, which, write_request);
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
- unsigned int which, bool write_request,
- struct ceph_pagelist *pagelist)
+ unsigned int which, struct ceph_pagelist *pagelist)
{
struct ceph_osd_data *osd_data;
- osd_data = osd_req_op_extent_osd_data(osd_req, which, write_request);
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
ceph_osd_data_pagelist_init(osd_data, pagelist);
-
- osd_req->r_ops[which].extent.osd_data =
- osd_req_op_extent_osd_data(osd_req, which, write_request);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
- unsigned int which, bool write_request,
- struct bio *bio, size_t bio_length)
+ unsigned int which, struct bio *bio, size_t bio_length)
{
struct ceph_osd_data *osd_data;
- osd_data = osd_req_op_extent_osd_data(osd_req, which, write_request);
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
ceph_osd_data_bio_init(osd_data, bio, bio_length);
-
- osd_req->r_ops[which].extent.osd_data =
- osd_req_op_extent_osd_data(osd_req, which, write_request);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */
{
struct ceph_osd_data *osd_data;
- osd_data = osd_req_op_cls_request_info(osd_req, which);
+ osd_data = osd_req_op_data(osd_req, which, cls, request_info);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+
+/*
+ * Record "pagelist" as the cls.request_data item of op number "which"
+ * in the given request.
+ */
+void osd_req_op_cls_request_data_pagelist(
+			struct ceph_osd_request *osd_req,
+			unsigned int which, struct ceph_pagelist *pagelist)
+{
+	struct ceph_osd_data *data =
+		osd_req_op_data(osd_req, which, cls, request_data);
+
+	ceph_osd_data_pagelist_init(data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
- osd_req->r_ops[which].cls.request_info =
- osd_req_op_cls_request_info(osd_req, which);
+/*
+ * Record a page vector as the cls.request_data item of op number
+ * "which" in the given request.  The length, alignment, pool and
+ * ownership arguments are passed straight to ceph_osd_data_pages_init().
+ */
+void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
+			unsigned int which, struct page **pages, u64 length,
+			u32 alignment, bool pages_from_pool, bool own_pages)
+{
+	struct ceph_osd_data *data =
+		osd_req_op_data(osd_req, which, cls, request_data);
+
+	ceph_osd_data_pages_init(data, pages, length, alignment,
+				 pages_from_pool, own_pages);
}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages, u64 length,
{
struct ceph_osd_data *osd_data;
- osd_data = osd_req_op_cls_response_data(osd_req, which);
+ osd_data = osd_req_op_data(osd_req, which, cls, response_data);
ceph_osd_data_pages_init(osd_data, pages, length, alignment,
pages_from_pool, own_pages);
-
- osd_req->r_ops[which].cls.response_data =
- osd_req_op_cls_response_data(osd_req, which);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
- if (osd_data->type != CEPH_OSD_DATA_TYPE_PAGES)
- return;
-
- if (osd_data->own_pages) {
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
int num_pages;
num_pages = calc_pages_for((u64)osd_data->alignment,
(u64)osd_data->length);
ceph_release_page_vector(osd_data->pages, num_pages);
}
+ ceph_osd_data_init(osd_data);
+}
+
+/*
+ * Release the data items attached to op number "which" of a request.
+ * Which items exist depends on the opcode, so dispatch on it and hand
+ * each one to ceph_osd_data_release().  Opcodes that carry no data
+ * items need no work.
+ */
+static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
+			unsigned int which)
+{
+	struct ceph_osd_req_op *op;
+
+	BUG_ON(which >= osd_req->r_num_ops);
+	op = &osd_req->r_ops[which];
+
+	switch (op->op) {
+	case CEPH_OSD_OP_STAT:
+		/*
+		 * STAT ops use raw_data_in, which can be set up with
+		 * own_pages; release it so those pages are not leaked.
+		 */
+		ceph_osd_data_release(&op->raw_data_in);
+		break;
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_WRITE:
+		ceph_osd_data_release(&op->extent.osd_data);
+		break;
+	case CEPH_OSD_OP_CALL:
+		ceph_osd_data_release(&op->cls.request_info);
+		ceph_osd_data_release(&op->cls.request_data);
+		ceph_osd_data_release(&op->cls.response_data);
+		break;
+	default:
+		break;
+	}
}
/*
void ceph_osdc_release_request(struct kref *kref)
{
struct ceph_osd_request *req;
+ unsigned int which;
req = container_of(kref, struct ceph_osd_request, r_kref);
if (req->r_request)
ceph_msg_put(req->r_reply);
}
- ceph_osd_data_release(&req->r_data_in);
- ceph_osd_data_release(&req->r_data_out);
+ for (which = 0; which < req->r_num_ops; which++)
+ osd_req_op_data_release(req, which);
ceph_put_snap_context(req->r_snapc);
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
else
- kfree(req);
+ kmem_cache_free(ceph_osd_request_cache, req);
+
}
EXPORT_SYMBOL(ceph_osdc_release_request);
req = mempool_alloc(osdc->req_mempool, gfp_flags);
memset(req, 0, sizeof(*req));
} else {
- req = kzalloc(sizeof(*req), gfp_flags);
+ req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
}
if (req == NULL)
return NULL;
}
req->r_reply = msg;
- ceph_osd_data_init(&req->r_data_in);
- ceph_osd_data_init(&req->r_data_out);
-
/* create request message; allow space for oid */
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
* common init routine for all the other init functions, below.
*/
static struct ceph_osd_req_op *
-osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
+_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode)
{
struct ceph_osd_req_op *op;
return op;
}
+/*
+ * Exported entry point for initializing op number "which" with
+ * "opcode".  It defers to _osd_req_op_init() and discards the op
+ * pointer that helper returns.
+ */
+void osd_req_op_init(struct ceph_osd_request *osd_req,
+		     unsigned int which, u16 opcode)
+{
+	_osd_req_op_init(osd_req, which, opcode);
+}
+EXPORT_SYMBOL(osd_req_op_init);
+
void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 offset, u64 length,
u64 truncate_size, u32 truncate_seq)
{
- struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
size_t payload_len = 0;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
EXPORT_SYMBOL(osd_req_op_extent_update);
void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
- u16 opcode, const char *class, const char *method,
- const void *request_data, size_t request_data_size)
+ u16 opcode, const char *class, const char *method)
{
- struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
struct ceph_pagelist *pagelist;
size_t payload_len = 0;
size_t size;
ceph_pagelist_append(pagelist, method, size);
payload_len += size;
- op->cls.request_data = request_data;
- BUG_ON(request_data_size > (size_t) U32_MAX);
- op->cls.request_data_len = (u32) request_data_size;
- ceph_pagelist_append(pagelist, request_data, request_data_size);
- payload_len += request_data_size;
-
osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
op->cls.argc = 0; /* currently unused */
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag)
{
- struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
op->watch.cookie = cookie;
- /* op->watch.ver = version; */ /* XXX 3847 */
- op->watch.ver = cpu_to_le64(version);
+ op->watch.ver = version;
if (opcode == CEPH_OSD_OP_WATCH && flag)
op->watch.flag = (u8)1;
}
EXPORT_SYMBOL(osd_req_op_watch_init);
+/*
+ * Hand the data described by "osd_data" to "msg" using the
+ * ceph_msg_data_add_*() helper that matches its type.  A page vector
+ * of length zero is silently skipped, an empty pagelist is a bug, and
+ * any type other than the handled ones must be TYPE_NONE.
+ */
+static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
+				struct ceph_osd_data *osd_data)
+{
+	u64 length = ceph_osd_data_length(osd_data);
+
+	switch (osd_data->type) {
+	case CEPH_OSD_DATA_TYPE_PAGES:
+		BUG_ON(length > (u64) SIZE_MAX);
+		if (!length)
+			break;
+		ceph_msg_data_add_pages(msg, osd_data->pages,
+					length, osd_data->alignment);
+		break;
+	case CEPH_OSD_DATA_TYPE_PAGELIST:
+		BUG_ON(!length);
+		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
+		break;
+#ifdef CONFIG_BLOCK
+	case CEPH_OSD_DATA_TYPE_BIO:
+		ceph_msg_data_add_bio(msg, osd_data->bio, length);
+		break;
+#endif
+	default:
+		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
+		break;
+	}
+}
+
static u64 osd_req_encode_op(struct ceph_osd_request *req,
struct ceph_osd_op *dst, unsigned int which)
{
struct ceph_osd_req_op *src;
+ struct ceph_osd_data *osd_data;
u64 request_data_len = 0;
+ u64 data_length;
BUG_ON(which >= req->r_num_ops);
src = &req->r_ops[which];
switch (src->op) {
case CEPH_OSD_OP_STAT:
+ osd_data = &src->raw_data_in;
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
cpu_to_le64(src->extent.truncate_size);
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
+ osd_data = &src->extent.osd_data;
if (src->op == CEPH_OSD_OP_WRITE)
- WARN_ON(src->extent.osd_data != &req->r_data_out);
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
else
- WARN_ON(src->extent.osd_data != &req->r_data_in);
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_CALL:
dst->cls.class_len = src->cls.class_len;
dst->cls.method_len = src->cls.method_len;
- dst->cls.indata_len = cpu_to_le32(src->cls.request_data_len);
- WARN_ON(src->cls.response_data != &req->r_data_in);
- WARN_ON(src->cls.request_info != &req->r_data_out);
- BUG_ON(src->cls.request_info->type !=
- CEPH_OSD_DATA_TYPE_PAGELIST);
- request_data_len = src->cls.request_info->pagelist->length;
+ osd_data = &src->cls.request_info;
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
+ request_data_len = osd_data->pagelist->length;
+
+ osd_data = &src->cls.request_data;
+ data_length = ceph_osd_data_length(osd_data);
+ if (data_length) {
+ BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+ dst->cls.indata_len = cpu_to_le32(data_length);
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ src->payload_len += data_length;
+ request_data_len += data_length;
+ }
+ osd_data = &src->cls.response_data;
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_STARTSYNC:
break;
mutex_lock(&osdc->request_mutex);
if (req->r_linger) {
__unregister_linger_request(osdc, req);
+ req->r_linger = 0;
ceph_osdc_put_request(req);
}
mutex_unlock(&osdc->request_mutex);
list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
ceph_msg_get(req->r_request); /* send consumes a ref */
- ceph_con_send(&req->r_osd->o_con, req->r_request);
+
+ /* Mark the request unsafe if this is the first time it's being sent. */
+
+ if (!req->r_sent && req->r_unsafe_callback)
+ req->r_unsafe_callback(req, true);
req->r_sent = req->r_osd->o_incarnation;
+
+ ceph_con_send(&req->r_osd->o_con, req->r_request);
}
/*
static void complete_request(struct ceph_osd_request *req)
{
- if (req->r_safe_callback)
- req->r_safe_callback(req, NULL);
+ if (req->r_unsafe_callback)
+ req->r_unsafe_callback(req, false);
complete_all(&req->r_safe_completion); /* fsync waiter */
}
return;
}
-static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
- struct ceph_osd_data *osd_data)
-{
- u64 length = ceph_osd_data_length(osd_data);
-
- if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
- BUG_ON(length > (u64) SIZE_MAX);
- if (length)
- ceph_msg_data_set_pages(msg, osd_data->pages,
- length, osd_data->alignment);
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
- BUG_ON(!length);
- ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
-#ifdef CONFIG_BLOCK
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
- ceph_msg_data_set_bio(msg, osd_data->bio, length);
-#endif
- } else {
- BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
- }
-}
-
/*
* build new request AND message
*
u64 data_len;
unsigned int i;
- /* Set up response incoming data and request outgoing data fields */
-
- ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in);
- ceph_osdc_msg_data_set(req->r_request, &req->r_data_out);
-
req->r_snapid = snap_id;
req->r_snapc = ceph_get_snap_context(snapc);
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
__register_request(osdc, req);
- WARN_ON(req->r_sent);
+ req->r_sent = 0;
+ req->r_got_reply = 0;
+ req->r_completed = 0;
rc = __map_request(osdc, req, 0);
if (rc < 0) {
if (nofail) {
/* it may be a short read due to an object boundary */
- osd_req_op_extent_osd_data_pages(req, 0, false,
+ osd_req_op_extent_osd_data_pages(req, 0,
pages, *plen, page_align, false, false);
dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
return PTR_ERR(req);
/* it may be a short write due to an object boundary */
- osd_req_op_extent_osd_data_pages(req, 0, true, pages, len, page_align,
+ osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
false, false);
dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
}
EXPORT_SYMBOL(ceph_osdc_writepages);
+/*
+ * Create the "ceph_osd_request" slab cache and store it in
+ * ceph_osd_request_cache.  It is a bug to call this while a cache
+ * already exists.  Returns 0 on success, -ENOMEM if creation failed.
+ */
+int ceph_osdc_setup(void)
+{
+	struct kmem_cache *cache;
+
+	BUG_ON(ceph_osd_request_cache);
+
+	cache = kmem_cache_create("ceph_osd_request",
+				  sizeof (struct ceph_osd_request),
+				  __alignof__(struct ceph_osd_request),
+				  0, NULL);
+	ceph_osd_request_cache = cache;
+	if (!cache)
+		return -ENOMEM;
+
+	return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_setup);
+
+/*
+ * Destroy the slab cache held in ceph_osd_request_cache and reset the
+ * pointer to NULL.  It is a bug to call this when no cache exists.
+ */
+void ceph_osdc_cleanup(void)
+{
+	struct kmem_cache *cache = ceph_osd_request_cache;
+
+	BUG_ON(!cache);
+	kmem_cache_destroy(cache);
+	ceph_osd_request_cache = NULL;
+}
+EXPORT_SYMBOL(ceph_osdc_cleanup);
+
/*
* handle incoming message
*/
* XXX page data. Probably OK for reads, but this
* XXX ought to be done more generally.
*/
- osd_data = osd_req_op_extent_osd_data(req, 0, false);
+ osd_data = osd_req_op_extent_osd_data(req, 0);
if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
if (osd_data->pages &&
unlikely(osd_data->length < data_len)) {