+
#include <linux/ceph/ceph_debug.h>
#include <linux/module.h>
#define OSD_OP_FRONT_LEN 4096
#define OSD_OPREPLY_FRONT_LEN 512
+static struct kmem_cache *ceph_osd_request_cache;
+
static const struct ceph_connection_operations osd_con_ops;
static void __send_queued(struct ceph_osd_client *osdc);
osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}
-void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
+static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
struct page **pages, u64 length, u32 alignment,
bool pages_from_pool, bool own_pages)
{
osd_data->pages_from_pool = pages_from_pool;
osd_data->own_pages = own_pages;
}
-EXPORT_SYMBOL(ceph_osd_data_pages_init);
-void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
+static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
struct ceph_pagelist *pagelist)
{
osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
osd_data->pagelist = pagelist;
}
-EXPORT_SYMBOL(ceph_osd_data_pagelist_init);
#ifdef CONFIG_BLOCK
-void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
+static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
struct bio *bio, size_t bio_length)
{
osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
osd_data->bio = bio;
osd_data->bio_length = bio_length;
}
-EXPORT_SYMBOL(ceph_osd_data_bio_init);
#endif /* CONFIG_BLOCK */
+#define osd_req_op_data(oreq, whch, typ, fld) \
+ ({ \
+ BUG_ON(whch >= (oreq)->r_num_ops); \
+ &(oreq)->r_ops[whch].typ.fld; \
+ })
+
+static struct ceph_osd_data *
+osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
+{
+ BUG_ON(which >= osd_req->r_num_ops);
+
+ return &osd_req->r_ops[which].raw_data_in;
+}
+
+struct ceph_osd_data *
+osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
+ unsigned int which)
+{
+ return osd_req_op_data(osd_req, which, extent, osd_data);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data);
+
+struct ceph_osd_data *
+osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
+ unsigned int which)
+{
+ return osd_req_op_data(osd_req, which, cls, response_data);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? */
+
+void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages,
+ u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_raw_data_in(osd_req, which);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
+
+void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages,
+ u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
+
+void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
+ unsigned int which, struct ceph_pagelist *pagelist)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
+
+#ifdef CONFIG_BLOCK
+void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
+ unsigned int which, struct bio *bio, size_t bio_length)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_bio_init(osd_data, bio, bio_length);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
+#endif /* CONFIG_BLOCK */
+
+static void osd_req_op_cls_request_info_pagelist(
+ struct ceph_osd_request *osd_req,
+ unsigned int which, struct ceph_pagelist *pagelist)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_info);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+
+void osd_req_op_cls_request_data_pagelist(
+ struct ceph_osd_request *osd_req,
+ unsigned int which, struct ceph_pagelist *pagelist)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
+
+void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages, u64 length,
+ u32 alignment, bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
+
+void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages, u64 length,
+ u32 alignment, bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, response_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
+
static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
switch (osd_data->type) {
static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
- if (osd_data->type != CEPH_OSD_DATA_TYPE_PAGES)
- return;
-
- if (osd_data->own_pages) {
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
int num_pages;
num_pages = calc_pages_for((u64)osd_data->alignment,
(u64)osd_data->length);
ceph_release_page_vector(osd_data->pages, num_pages);
}
+ ceph_osd_data_init(osd_data);
+}
+
+static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
+ unsigned int which)
+{
+ struct ceph_osd_req_op *op;
+
+ BUG_ON(which >= osd_req->r_num_ops);
+ op = &osd_req->r_ops[which];
+
+ switch (op->op) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_WRITE:
+ ceph_osd_data_release(&op->extent.osd_data);
+ break;
+ case CEPH_OSD_OP_CALL:
+ ceph_osd_data_release(&op->cls.request_info);
+ ceph_osd_data_release(&op->cls.request_data);
+ ceph_osd_data_release(&op->cls.response_data);
+ break;
+ default:
+ break;
+ }
}
/*
void ceph_osdc_release_request(struct kref *kref)
{
struct ceph_osd_request *req;
+ unsigned int which;
req = container_of(kref, struct ceph_osd_request, r_kref);
if (req->r_request)
ceph_msg_put(req->r_reply);
}
- ceph_osd_data_release(&req->r_data_in);
- ceph_osd_data_release(&req->r_data_out);
+ for (which = 0; which < req->r_num_ops; which++)
+ osd_req_op_data_release(req, which);
ceph_put_snap_context(req->r_snapc);
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
else
- kfree(req);
+ kmem_cache_free(ceph_osd_request_cache, req);
+
}
EXPORT_SYMBOL(ceph_osdc_release_request);
req = mempool_alloc(osdc->req_mempool, gfp_flags);
memset(req, 0, sizeof(*req));
} else {
- req = kzalloc(sizeof(*req), gfp_flags);
+ req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
}
if (req == NULL)
return NULL;
}
req->r_reply = msg;
- ceph_osd_data_init(&req->r_data_in);
- ceph_osd_data_init(&req->r_data_out);
-
/* create request message; allow space for oid */
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
* common init routine for all the other init functions, below.
*/
static struct ceph_osd_req_op *
-osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
+_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode)
{
struct ceph_osd_req_op *op;
return op;
}
+void osd_req_op_init(struct ceph_osd_request *osd_req,
+ unsigned int which, u16 opcode)
+{
+ (void)_osd_req_op_init(osd_req, which, opcode);
+}
+EXPORT_SYMBOL(osd_req_op_init);
+
void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 offset, u64 length,
u64 truncate_size, u32 truncate_seq)
{
- struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
size_t payload_len = 0;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
}
EXPORT_SYMBOL(osd_req_op_extent_update);
-void osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct ceph_osd_data *osd_data)
-{
- BUG_ON(which >= osd_req->r_num_ops);
- osd_req->r_ops[which].extent.osd_data = osd_data;
-}
-EXPORT_SYMBOL(osd_req_op_extent_osd_data);
-
void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
- u16 opcode, const char *class, const char *method,
- const void *request_data, size_t request_data_size)
+ u16 opcode, const char *class, const char *method)
{
- struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+ struct ceph_pagelist *pagelist;
size_t payload_len = 0;
size_t size;
BUG_ON(opcode != CEPH_OSD_OP_CALL);
+ pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+ BUG_ON(!pagelist);
+ ceph_pagelist_init(pagelist);
+
op->cls.class_name = class;
size = strlen(class);
BUG_ON(size > (size_t) U8_MAX);
op->cls.class_len = size;
+ ceph_pagelist_append(pagelist, class, size);
payload_len += size;
op->cls.method_name = method;
size = strlen(method);
BUG_ON(size > (size_t) U8_MAX);
op->cls.method_len = size;
+ ceph_pagelist_append(pagelist, method, size);
payload_len += size;
- op->cls.request_data = request_data;
- BUG_ON(request_data_size > (size_t) U32_MAX);
- op->cls.request_data_len = (u32) request_data_size;
- payload_len += request_data_size;
+ osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
op->cls.argc = 0; /* currently unused */
op->payload_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
-void osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
- unsigned int which,
- struct ceph_osd_data *response_data)
-{
- BUG_ON(which >= osd_req->r_num_ops);
- osd_req->r_ops[which].cls.response_data = response_data;
-}
-EXPORT_SYMBOL(osd_req_op_cls_response_data);
void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag)
{
- struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
op->watch.cookie = cookie;
- /* op->watch.ver = version; */ /* XXX 3847 */
- op->watch.ver = cpu_to_le64(version);
+ op->watch.ver = version;
if (opcode == CEPH_OSD_OP_WATCH && flag)
op->watch.flag = (u8)1;
}
EXPORT_SYMBOL(osd_req_op_watch_init);
+static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
+ struct ceph_osd_data *osd_data)
+{
+ u64 length = ceph_osd_data_length(osd_data);
+
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
+ BUG_ON(length > (u64) SIZE_MAX);
+ if (length)
+ ceph_msg_data_add_pages(msg, osd_data->pages,
+ length, osd_data->alignment);
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+ BUG_ON(!length);
+ ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
+#ifdef CONFIG_BLOCK
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
+ ceph_msg_data_add_bio(msg, osd_data->bio, length);
+#endif
+ } else {
+ BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
+ }
+}
+
static u64 osd_req_encode_op(struct ceph_osd_request *req,
struct ceph_osd_op *dst, unsigned int which)
{
struct ceph_osd_req_op *src;
+ struct ceph_osd_data *osd_data;
u64 request_data_len = 0;
- struct ceph_pagelist *pagelist;
+ u64 data_length;
BUG_ON(which >= req->r_num_ops);
src = &req->r_ops[which];
switch (src->op) {
case CEPH_OSD_OP_STAT:
+ osd_data = &src->raw_data_in;
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
cpu_to_le64(src->extent.truncate_size);
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
+ osd_data = &src->extent.osd_data;
if (src->op == CEPH_OSD_OP_WRITE)
- WARN_ON(src->extent.osd_data != &req->r_data_out);
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
else
- WARN_ON(src->extent.osd_data != &req->r_data_in);
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_CALL:
- pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
- BUG_ON(!pagelist);
- ceph_pagelist_init(pagelist);
-
dst->cls.class_len = src->cls.class_len;
dst->cls.method_len = src->cls.method_len;
- dst->cls.indata_len = cpu_to_le32(src->cls.request_data_len);
- ceph_pagelist_append(pagelist, src->cls.class_name,
- src->cls.class_len);
- ceph_pagelist_append(pagelist, src->cls.method_name,
- src->cls.method_len);
- ceph_pagelist_append(pagelist, src->cls.request_data,
- src->cls.request_data_len);
- ceph_osd_data_pagelist_init(&req->r_data_out, pagelist);
-
- WARN_ON(src->cls.response_data != &req->r_data_in);
- request_data_len = pagelist->length;
+ osd_data = &src->cls.request_info;
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
+ request_data_len = osd_data->pagelist->length;
+
+ osd_data = &src->cls.request_data;
+ data_length = ceph_osd_data_length(osd_data);
+ if (data_length) {
+ BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+ dst->cls.indata_len = cpu_to_le32(data_length);
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ src->payload_len += data_length;
+ request_data_len += data_length;
+ }
+ osd_data = &src->cls.response_data;
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_STARTSYNC:
break;
return request_data_len;
}
-/*
- * build new request AND message
- *
- */
-void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
- struct ceph_snap_context *snapc, u64 snap_id,
- struct timespec *mtime)
-{
- struct ceph_msg *msg = req->r_request;
- void *p;
- size_t msg_size;
- int flags = req->r_flags;
- u64 data_len;
- unsigned int i;
-
- req->r_snapid = snap_id;
- req->r_snapc = ceph_get_snap_context(snapc);
-
- /* encode request */
- msg->hdr.version = cpu_to_le16(4);
-
- p = msg->front.iov_base;
- ceph_encode_32(&p, 1); /* client_inc is always 1 */
- req->r_request_osdmap_epoch = p;
- p += 4;
- req->r_request_flags = p;
- p += 4;
- if (req->r_flags & CEPH_OSD_FLAG_WRITE)
- ceph_encode_timespec(p, mtime);
- p += sizeof(struct ceph_timespec);
- req->r_request_reassert_version = p;
- p += sizeof(struct ceph_eversion); /* will get filled in */
-
- /* oloc */
- ceph_encode_8(&p, 4);
- ceph_encode_8(&p, 4);
- ceph_encode_32(&p, 8 + 4 + 4);
- req->r_request_pool = p;
- p += 8;
- ceph_encode_32(&p, -1); /* preferred */
- ceph_encode_32(&p, 0); /* key len */
-
- ceph_encode_8(&p, 1);
- req->r_request_pgid = p;
- p += 8 + 4;
- ceph_encode_32(&p, -1); /* preferred */
-
- /* oid */
- ceph_encode_32(&p, req->r_oid_len);
- memcpy(p, req->r_oid, req->r_oid_len);
- dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
- p += req->r_oid_len;
-
- /* ops--can imply data */
- ceph_encode_16(&p, (u16)req->r_num_ops);
- data_len = 0;
- for (i = 0; i < req->r_num_ops; i++) {
- data_len += osd_req_encode_op(req, p, i);
- p += sizeof(struct ceph_osd_op);
- }
-
- /* snaps */
- ceph_encode_64(&p, req->r_snapid);
- ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
- ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
- if (req->r_snapc) {
- for (i = 0; i < snapc->num_snaps; i++) {
- ceph_encode_64(&p, req->r_snapc->snaps[i]);
- }
- }
-
- req->r_request_attempts = p;
- p += 4;
-
- /* data */
- if (flags & CEPH_OSD_FLAG_WRITE) {
- u16 data_off;
-
- /*
- * The header "data_off" is a hint to the receiver
- * allowing it to align received data into its
- * buffers such that there's no need to re-copy
- * it before writing it to disk (direct I/O).
- */
- data_off = (u16) (off & 0xffff);
- req->r_request->hdr.data_off = cpu_to_le16(data_off);
- }
- req->r_request->hdr.data_len = cpu_to_le32(data_len);
-
- BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
- msg_size = p - msg->front.iov_base;
- msg->front.iov_len = msg_size;
- msg->hdr.front_len = cpu_to_le32(msg_size);
-
- dout("build_request msg_size was %d\n", (int)msg_size);
-}
-EXPORT_SYMBOL(ceph_osdc_build_request);
-
/*
* build new request AND message, calculate layout, and adjust file
* extent as needed.
bool use_mempool)
{
struct ceph_osd_request *req;
- struct ceph_osd_data *osd_data;
u64 objnum = 0;
u64 objoff = 0;
u64 objlen = 0;
GFP_NOFS);
if (!req)
return ERR_PTR(-ENOMEM);
- osd_data = opcode == CEPH_OSD_OP_WRITE ? &req->r_data_out
- : &req->r_data_in;
req->r_flags = flags;
osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
truncate_size, truncate_seq);
- osd_req_op_extent_osd_data(req, 0, osd_data);
/*
* A second op in the ops array means the caller wants to
mutex_lock(&osdc->request_mutex);
if (req->r_linger) {
__unregister_linger_request(osdc, req);
+ req->r_linger = 0;
ceph_osdc_put_request(req);
}
mutex_unlock(&osdc->request_mutex);
list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
ceph_msg_get(req->r_request); /* send consumes a ref */
- ceph_con_send(&req->r_osd->o_con, req->r_request);
+
+ /* Mark the request unsafe if this is the first timet's being sent. */
+
+ if (!req->r_sent && req->r_unsafe_callback)
+ req->r_unsafe_callback(req, true);
req->r_sent = req->r_osd->o_incarnation;
+
+ ceph_con_send(&req->r_osd->o_con, req->r_request);
}
/*
static void complete_request(struct ceph_osd_request *req)
{
- if (req->r_safe_callback)
- req->r_safe_callback(req, NULL);
+ if (req->r_unsafe_callback)
+ req->r_unsafe_callback(req, false);
complete_all(&req->r_safe_completion); /* fsync waiter */
}
return;
}
-static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
- struct ceph_osd_data *osd_data)
+/*
+ * build new request AND message
+ *
+ */
+void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
+ struct ceph_snap_context *snapc, u64 snap_id,
+ struct timespec *mtime)
{
- u64 length = ceph_osd_data_length(osd_data);
+ struct ceph_msg *msg = req->r_request;
+ void *p;
+ size_t msg_size;
+ int flags = req->r_flags;
+ u64 data_len;
+ unsigned int i;
- if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
- BUG_ON(length > (u64) SIZE_MAX);
- if (length)
- ceph_msg_data_set_pages(msg, osd_data->pages,
- length, osd_data->alignment);
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
- BUG_ON(!length);
- ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
-#ifdef CONFIG_BLOCK
- } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
- ceph_msg_data_set_bio(msg, osd_data->bio, length);
-#endif
- } else {
- BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
+ req->r_snapid = snap_id;
+ req->r_snapc = ceph_get_snap_context(snapc);
+
+ /* encode request */
+ msg->hdr.version = cpu_to_le16(4);
+
+ p = msg->front.iov_base;
+ ceph_encode_32(&p, 1); /* client_inc is always 1 */
+ req->r_request_osdmap_epoch = p;
+ p += 4;
+ req->r_request_flags = p;
+ p += 4;
+ if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+ ceph_encode_timespec(p, mtime);
+ p += sizeof(struct ceph_timespec);
+ req->r_request_reassert_version = p;
+ p += sizeof(struct ceph_eversion); /* will get filled in */
+
+ /* oloc */
+ ceph_encode_8(&p, 4);
+ ceph_encode_8(&p, 4);
+ ceph_encode_32(&p, 8 + 4 + 4);
+ req->r_request_pool = p;
+ p += 8;
+ ceph_encode_32(&p, -1); /* preferred */
+ ceph_encode_32(&p, 0); /* key len */
+
+ ceph_encode_8(&p, 1);
+ req->r_request_pgid = p;
+ p += 8 + 4;
+ ceph_encode_32(&p, -1); /* preferred */
+
+ /* oid */
+ ceph_encode_32(&p, req->r_oid_len);
+ memcpy(p, req->r_oid, req->r_oid_len);
+ dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
+ p += req->r_oid_len;
+
+ /* ops--can imply data */
+ ceph_encode_16(&p, (u16)req->r_num_ops);
+ data_len = 0;
+ for (i = 0; i < req->r_num_ops; i++) {
+ data_len += osd_req_encode_op(req, p, i);
+ p += sizeof(struct ceph_osd_op);
+ }
+
+ /* snaps */
+ ceph_encode_64(&p, req->r_snapid);
+ ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+ ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+ if (req->r_snapc) {
+ for (i = 0; i < snapc->num_snaps; i++) {
+ ceph_encode_64(&p, req->r_snapc->snaps[i]);
+ }
+ }
+
+ req->r_request_attempts = p;
+ p += 4;
+
+ /* data */
+ if (flags & CEPH_OSD_FLAG_WRITE) {
+ u16 data_off;
+
+ /*
+ * The header "data_off" is a hint to the receiver
+ * allowing it to align received data into its
+ * buffers such that there's no need to re-copy
+ * it before writing it to disk (direct I/O).
+ */
+ data_off = (u16) (off & 0xffff);
+ req->r_request->hdr.data_off = cpu_to_le16(data_off);
}
+ req->r_request->hdr.data_len = cpu_to_le32(data_len);
+
+ BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+ msg_size = p - msg->front.iov_base;
+ msg->front.iov_len = msg_size;
+ msg->hdr.front_len = cpu_to_le32(msg_size);
+
+ dout("build_request msg_size was %d\n", (int)msg_size);
}
+EXPORT_SYMBOL(ceph_osdc_build_request);
/*
* Register request, send initial attempt.
{
int rc = 0;
- /* Set up response incoming data and request outgoing data fields */
-
- ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in);
- ceph_osdc_msg_data_set(req->r_request, &req->r_data_out);
-
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
__register_request(osdc, req);
- WARN_ON(req->r_sent);
+ req->r_sent = 0;
+ req->r_got_reply = 0;
+ req->r_completed = 0;
rc = __map_request(osdc, req, 0);
if (rc < 0) {
if (nofail) {
/* it may be a short read due to an object boundary */
- ceph_osd_data_pages_init(&req->r_data_in, pages, *plen, page_align,
- false, false);
+ osd_req_op_extent_osd_data_pages(req, 0,
+ pages, *plen, page_align, false, false);
dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
off, *plen, *plen, page_align);
return PTR_ERR(req);
/* it may be a short write due to an object boundary */
- ceph_osd_data_pages_init(&req->r_data_out, pages, len, page_align,
+ osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
false, false);
dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
}
EXPORT_SYMBOL(ceph_osdc_writepages);
+int ceph_osdc_setup(void)
+{
+ BUG_ON(ceph_osd_request_cache);
+ ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
+ sizeof (struct ceph_osd_request),
+ __alignof__(struct ceph_osd_request),
+ 0, NULL);
+
+ return ceph_osd_request_cache ? 0 : -ENOMEM;
+}
+EXPORT_SYMBOL(ceph_osdc_setup);
+
+void ceph_osdc_cleanup(void)
+{
+ BUG_ON(!ceph_osd_request_cache);
+ kmem_cache_destroy(ceph_osd_request_cache);
+ ceph_osd_request_cache = NULL;
+}
+EXPORT_SYMBOL(ceph_osdc_cleanup);
+
/*
* handle incoming message
*/
m = ceph_msg_get(req->r_reply);
if (data_len > 0) {
- struct ceph_osd_data *osd_data = &req->r_data_in;
+ struct ceph_osd_data *osd_data;
+ /*
+ * XXX This is assuming there is only one op containing
+ * XXX page data. Probably OK for reads, but this
+ * XXX ought to be done more generally.
+ */
+ osd_data = osd_req_op_extent_osd_data(req, 0);
if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
if (osd_data->pages &&
unlikely(osd_data->length < data_len)) {