4 * Copyright (c) 2011-2012, Dan Magenheimer, Oracle Corp.
6 * Ramster_r2net provides an interface between zcache and r2net.
8 * FIXME: support more than two nodes
11 #include <linux/list.h>
13 #include "nodemanager.h"
15 #include "../zcache.h"
/* Enables the code under "#ifdef RAMSTER_TESTING" in this file. */
18 #define RAMSTER_TESTING
/*
 * Key supplied together with each ramster message type, both when handlers
 * are registered and when messages are sent from this file.
 */
20 #define RMSTR_KEY 0x77347734
/* ramster message type numbers; the first one is pinned at 100. */
23 RMSTR_TMEM_PUT_EPH = 100,
25 RMSTR_TMEM_ASYNC_GET_REQUEST,
26 RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
27 RMSTR_TMEM_ASYNC_GET_REPLY,
30 RMSTR_TMEM_DESTROY_POOL,
/*
 * R2NET_MAX_PAYLOAD_BYTES less the size of the struct tmem_xhandle that
 * precedes the data in ramster messages; used in this file as the upper
 * bound on the data portion of a put or get reply.
 */
33 #define RMSTR_R2NET_MAX_LEN \
34 (R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
36 #include "tcp_internal.h"
/*
 * Remote node currently selected as the target, and its node number.
 * Both are written by r2net_remote_target_node_set() and read by
 * r2net_remote_async_get() and r2net_remote_put().
 */
38 static struct r2nm_node *r2net_target_node;
39 static int r2net_target_nodenum;
/*
 * Select the remote node that r2net_target_node / r2net_target_nodenum
 * refer to.
 *
 * node_num is looked up with r2nm_get_node_by_num(); when a node is found
 * its number is cached in r2net_target_nodenum.
 *
 * NOTE(review): r2nm_node_put() is called on the node while the pointer
 * remains cached in r2net_target_node -- confirm the node cannot go away
 * while it is still referenced through that pointer.
 */
41 int r2net_remote_target_node_set(int node_num)
45 r2net_target_node = r2nm_get_node_by_num(node_num);
46 if (r2net_target_node != NULL) {
47 r2net_target_nodenum = node_num;
48 r2nm_node_put(r2net_target_node);
54 /* FIXME following buffer should be per-cpu, protected by preempt_disable */
/*
 * Staging area used by ramster_remote_async_get_request_handler(): it is
 * filled with a struct tmem_xhandle followed by the fetched data and then
 * handed back through *ret_data. It is a single file-scope buffer, so all
 * invocations of that handler write to the same memory.
 */
55 static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
/*
 * r2net handler registered for RMSTR_TMEM_ASYNC_GET_REQUEST and
 * RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST.
 *
 * The tmem_xhandle carried in the message is copied to the front of
 * ramster_async_get_buf, zcache_get_page() is asked to place the data
 * directly behind it, and the buffer is passed back through *ret_data.
 * The final return shown yields the data size plus
 * sizeof(struct tmem_xhandle).
 */
57 static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
58 u32 len, void *data, void **ret_data)
61 struct tmem_xhandle xh;
/* size is in/out for zcache_get_page(): capacity going in, data size coming out */
63 size_t size = RMSTR_R2NET_MAX_LEN;
64 u16 msgtype = be16_to_cpu(msg->msg_type);
/* the two request flavours differ only in the message type */
65 bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
/* work on a local copy of the handle found at the start of the message */
68 xh = *(struct tmem_xhandle *)msg->buf;
/* the size announced by the requester must fit into a single reply */
69 if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
71 pdata = ramster_async_get_buf;
/* reply layout: xhandle first, data immediately after */
72 *(struct tmem_xhandle *)pdata = xh;
73 pdata += sizeof(struct tmem_xhandle);
/*
 * zcache_get_page() runs with local interrupts disabled; its last argument
 * is 1 for get-and-free and -1 otherwise (meaning defined by zcache).
 */
74 local_irq_save(flags);
75 found = zcache_get_page(xh.client_id, xh.pool_id, &xh.oid, xh.index,
76 pdata, &size, true, get_and_free ? 1 : -1);
77 local_irq_restore(flags);
79 /* a zero size indicates the get failed */
82 if (size > RMSTR_R2NET_MAX_LEN)
/* step back to the xhandle so the reply starts at the head of the buffer */
84 *ret_data = pdata - sizeof(struct tmem_xhandle);
85 /* now make caller (r2net_process_message) handle specially */
86 r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY);
87 return size + sizeof(struct tmem_xhandle);
/*
 * r2net handler registered for RMSTR_TMEM_ASYNC_GET_REPLY.
 *
 * The message buffer is read as a struct tmem_xhandle followed by datalen
 * bytes of data; the handle fields and the data are passed on to
 * ramster_localify().
 */
90 static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
91 u32 len, void *data, void **ret_data)
93 char *in = (char *)msg->buf;
/* start from len minus the r2net_msg header; the xhandle is subtracted below */
94 int datalen = len - sizeof(struct r2net_msg);
96 struct tmem_xhandle *xh = (struct tmem_xhandle *)in;
/* skip over the xhandle so that in/datalen describe only the data */
98 in += sizeof(struct tmem_xhandle);
99 datalen -= sizeof(struct tmem_xhandle);
/*
 * NOTE(review): datalen is derived from the length of a received message,
 * yet an out-of-range value triggers BUG_ON -- consider whether such a
 * message should be rejected instead.
 */
100 BUG_ON(datalen < 0 || datalen > PAGE_SIZE);
101 ret = ramster_localify(xh->pool_id, &xh->oid, xh->index,
102 in, datalen, xh->extra);
103 #ifdef RAMSTER_TESTING
105 pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
/*
 * r2net handler registered for RMSTR_TMEM_PUT_EPH and RMSTR_TMEM_PUT_PERS.
 *
 * The message buffer is read as a struct tmem_xhandle followed by the data,
 * which is handed to zcache_put_page() under the client, pool, object and
 * index named in the handle.
 */
110 int ramster_remote_put_handler(struct r2net_msg *msg,
111 u32 len, void *data, void **ret_data)
113 struct tmem_xhandle *xh;
114 char *p = (char *)msg->buf;
/*
 * Data length: len minus the r2net_msg header and the xhandle.
 * NOTE(review): the value comes from the received length and is passed to
 * zcache_put_page() as is -- confirm it is bounded (non-negative, within
 * RMSTR_R2NET_MAX_LEN) before it gets here.
 */
115 int datalen = len - sizeof(struct r2net_msg) -
116 sizeof(struct tmem_xhandle);
117 u16 msgtype = be16_to_cpu(msg->msg_type);
/* any message type other than RMSTR_TMEM_PUT_EPH is treated as non-ephemeral */
118 bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH);
/* split the buffer: xh views the handle, p ends up pointing at the data */
122 xh = (struct tmem_xhandle *)p;
123 p += sizeof(struct tmem_xhandle);
/* presumably creates the pool on first use -- see zcache_autocreate_pool() */
124 zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);
/* the put itself runs with local interrupts disabled */
125 local_irq_save(flags);
126 ret = zcache_put_page(xh->client_id, xh->pool_id, &xh->oid, xh->index,
127 p, datalen, true, ephemeral);
128 local_irq_restore(flags);
/*
 * r2net handler registered for RMSTR_TMEM_FLUSH.
 *
 * The start of the message buffer is read as a struct tmem_xhandle and the
 * page it names (client, pool, object, index) is passed to
 * zcache_flush_page().
 */
132 int ramster_remote_flush_handler(struct r2net_msg *msg,
133 u32 len, void *data, void **ret_data)
135 struct tmem_xhandle *xh;
136 char *p = (char *)msg->buf;
138 xh = (struct tmem_xhandle *)p;
139 p += sizeof(struct tmem_xhandle);
/* the result of the flush is deliberately discarded */
140 (void)zcache_flush_page(xh->client_id, xh->pool_id,
141 &xh->oid, xh->index);
/*
 * r2net handler registered for RMSTR_TMEM_FLOBJ.
 *
 * The start of the message buffer is read as a struct tmem_xhandle and the
 * object it names (client, pool, object id) is passed to
 * zcache_flush_object().
 */
145 int ramster_remote_flobj_handler(struct r2net_msg *msg,
146 u32 len, void *data, void **ret_data)
148 struct tmem_xhandle *xh;
149 char *p = (char *)msg->buf;
151 xh = (struct tmem_xhandle *)p;
152 p += sizeof(struct tmem_xhandle);
/* the result of the flush is deliberately discarded */
153 (void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
/*
 * Send an asynchronous get request for the page described by xh to node
 * remotenode.
 *
 * Before sending, xh is stamped with this node's number (client_id) and
 * with the checksum and size the caller expects. The message type is
 * RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST or RMSTR_TMEM_ASYNC_GET_REQUEST
 * (presumably chosen by the free argument) and the handle is carried in
 * vec[0].
 *
 * NOTE(review): after the lookup of remotenode, node is reassigned from
 * r2net_target_node and the connection state examined is that of
 * r2net_target_nodenum, while the message is sent to remotenode --
 * confirm the two always name the same node.
 */
157 int r2net_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
158 size_t expect_size, uint8_t expect_cksum,
161 int nodenum, ret = -1, status;
162 struct r2nm_node *node = NULL;
166 struct r2net_node *nn;
168 node = r2nm_get_node_by_num(remotenode);
171 xh->client_id = r2nm_this_node(); /* which node is getting */
172 xh->xh_data_cksum = expect_cksum;
173 xh->xh_data_size = expect_size;
/* the handle is the first iovec element of the message */
175 vec[0].iov_len = sizeof(*xh);
176 vec[0].iov_base = xh;
178 node = r2net_target_node;
182 nodenum = r2net_target_nodenum;
/* inspect the connection state kept by r2net for that node number */
185 nn = r2net_nn_from_num(nodenum);
186 if (nn->nn_persistent_error || !nn->nn_sc_valid) {
193 msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST;
195 msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
196 ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
197 vec, veclen, remotenode, &status);
/* -ENOTCONN and -EHOSTDOWN are singled out from other send failures */
200 if (ret == -ENOTCONN || ret == -EHOSTDOWN)
204 /* FIXME handle bad message possibilities here? */
205 pr_err("UNTESTED ret<0 in ramster_remote_async_get: ret=%d\n",
213 #ifdef RAMSTER_TESTING
214 /* leave me here to see if it catches a weird crash */
/*
 * Testing aid, called from r2net_remote_put(). Remembers, in function-static
 * variables, the largest hardirq, softirq and preempt counts observed so
 * far. Whenever one of them reaches a new maximum that is a power of two,
 * a pr_err() line is emitted for it.
 */
215 static void ramster_check_irq_counts(void)
217 static int last_hardirq_cnt, last_softirq_cnt, last_preempt_cnt;
218 int cur_hardirq_cnt, cur_softirq_cnt, cur_preempt_cnt;
220 cur_hardirq_cnt = hardirq_count() >> HARDIRQ_SHIFT;
221 if (cur_hardirq_cnt > last_hardirq_cnt) {
222 last_hardirq_cnt = cur_hardirq_cnt;
/* x & (x - 1) is zero only when x has a single bit set */
223 if (!(last_hardirq_cnt&(last_hardirq_cnt-1)))
224 pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
227 cur_softirq_cnt = softirq_count() >> SOFTIRQ_SHIFT;
228 if (cur_softirq_cnt > last_softirq_cnt) {
229 last_softirq_cnt = cur_softirq_cnt;
230 if (!(last_softirq_cnt&(last_softirq_cnt-1)))
231 pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
234 cur_preempt_cnt = preempt_count() & PREEMPT_MASK;
235 if (cur_preempt_cnt > last_preempt_cnt) {
236 last_preempt_cnt = cur_preempt_cnt;
237 if (!(last_preempt_cnt&(last_preempt_cnt-1)))
238 pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
/*
 * Send size bytes at data, described by xh, as a put to the cached target
 * node (r2net_target_node / r2net_target_nodenum).
 *
 * xh is stamped with this node's number (client_id) and sent as the first
 * iovec element, with the data as the second. The message type is
 * RMSTR_TMEM_PUT_EPH or RMSTR_TMEM_PUT_PERS (presumably chosen by the
 * ephemeral argument). The node number used is reported back through
 * *remotenode.
 *
 * size must not exceed RMSTR_R2NET_MAX_LEN; a larger value hits BUG_ON.
 */
244 int r2net_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
245 bool ephemeral, int *remotenode)
247 int nodenum, ret = -1, status;
248 struct r2nm_node *node = NULL;
252 struct r2net_node *nn;
254 BUG_ON(size > RMSTR_R2NET_MAX_LEN);
255 xh->client_id = r2nm_this_node(); /* which node is putting */
/* message layout: handle first, data second */
256 vec[0].iov_len = sizeof(*xh);
257 vec[0].iov_base = xh;
258 vec[1].iov_len = size;
259 vec[1].iov_base = data;
261 node = r2net_target_node;
265 nodenum = r2net_target_nodenum;
/* inspect the connection state kept by r2net for the target node */
269 nn = r2net_nn_from_num(nodenum);
270 if (nn->nn_persistent_error || !nn->nn_sc_valid) {
277 msg_type = RMSTR_TMEM_PUT_EPH;
279 msg_type = RMSTR_TMEM_PUT_PERS;
280 #ifdef RAMSTER_TESTING
281 /* leave me here to see if it catches a weird crash */
282 ramster_check_irq_counts();
285 ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen,
/* tell the caller which node number was used */
291 *remotenode = nodenum;
/*
 * Send an RMSTR_TMEM_FLUSH message for the page described by xh to node
 * remotenode.
 *
 * xh is stamped with this node's number (client_id) and is the only
 * iovec element set up here. An unknown remotenode, being called with
 * interrupts disabled, or being called in softirq context each hit a
 * BUG_ON.
 */
299 int r2net_remote_flush(struct tmem_xhandle *xh, int remotenode)
301 int ret = -1, status;
302 struct r2nm_node *node = NULL;
306 node = r2nm_get_node_by_num(remotenode);
307 BUG_ON(node == NULL);
308 xh->client_id = r2nm_this_node(); /* which node is flushing */
309 vec[0].iov_len = sizeof(*xh);
310 vec[0].iov_base = xh;
311 BUG_ON(irqs_disabled());
312 BUG_ON(in_softirq());
313 ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY,
314 vec, veclen, remotenode, &status);
/*
 * Send an RMSTR_TMEM_FLOBJ message for the object described by xh to node
 * remotenode.
 *
 * xh is stamped with this node's number (client_id) and is the only
 * iovec element set up here. An unknown remotenode hits a BUG_ON.
 */
319 int r2net_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
321 int ret = -1, status;
322 struct r2nm_node *node = NULL;
326 node = r2nm_get_node_by_num(remotenode);
327 BUG_ON(node == NULL);
328 xh->client_id = r2nm_this_node(); /* which node is flobjing */
329 vec[0].iov_len = sizeof(*xh);
330 vec[0].iov_base = xh;
331 ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
332 vec, veclen, remotenode, &status);
338 * Handler registration
/* List handed to r2net_register_handler() and later to r2net_unregister_handler_list(). */
341 static LIST_HEAD(r2net_unreg_list);
/* Unregister everything recorded on r2net_unreg_list. */
343 static void r2net_unregister_handlers(void)
345 r2net_unregister_handler_list(&r2net_unreg_list);
/*
 * Register ramster's r2net message handlers, all under RMSTR_KEY:
 *   - ramster_remote_put_handler for RMSTR_TMEM_PUT_EPH and
 *     RMSTR_TMEM_PUT_PERS
 *   - ramster_remote_async_get_request_handler for
 *     RMSTR_TMEM_ASYNC_GET_REQUEST and
 *     RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST
 *   - ramster_remote_async_get_reply_handler for RMSTR_TMEM_ASYNC_GET_REPLY
 *   - ramster_remote_flush_handler for RMSTR_TMEM_FLUSH
 *   - ramster_remote_flobj_handler for RMSTR_TMEM_FLOBJ
 *
 * Success is logged with pr_info(); the failure path unregisters the
 * handlers again and logs with pr_err().
 */
348 int r2net_register_handlers(void)
352 status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY,
354 ramster_remote_put_handler,
355 NULL, NULL, &r2net_unreg_list);
359 status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY,
361 ramster_remote_put_handler,
362 NULL, NULL, &r2net_unreg_list);
366 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY,
368 ramster_remote_async_get_request_handler,
374 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
375 RMSTR_KEY, RMSTR_R2NET_MAX_LEN,
376 ramster_remote_async_get_request_handler,
382 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY,
384 ramster_remote_async_get_reply_handler,
390 status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY,
392 ramster_remote_flush_handler,
398 status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
400 ramster_remote_flobj_handler,
406 pr_info("ramster: r2net handlers registered\n");
/* failure path: drop whatever was registered, then report */
410 r2net_unregister_handlers();
411 pr_err("ramster: couldn't register r2net handlers\n");