]> Pileus Git - ~andy/linux/blob - net/ceph/osd_client.c
libceph: define osd data initialization helpers
[~andy/linux] / net / ceph / osd_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/err.h>
5 #include <linux/highmem.h>
6 #include <linux/mm.h>
7 #include <linux/pagemap.h>
8 #include <linux/slab.h>
9 #include <linux/uaccess.h>
10 #ifdef CONFIG_BLOCK
11 #include <linux/bio.h>
12 #endif
13
14 #include <linux/ceph/libceph.h>
15 #include <linux/ceph/osd_client.h>
16 #include <linux/ceph/messenger.h>
17 #include <linux/ceph/decode.h>
18 #include <linux/ceph/auth.h>
19 #include <linux/ceph/pagelist.h>
20
21 #define OSD_OP_FRONT_LEN        4096
22 #define OSD_OPREPLY_FRONT_LEN   512
23
24 static const struct ceph_connection_operations osd_con_ops;
25
26 static void __send_queued(struct ceph_osd_client *osdc);
27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28 static void __register_request(struct ceph_osd_client *osdc,
29                                struct ceph_osd_request *req);
30 static void __unregister_linger_request(struct ceph_osd_client *osdc,
31                                         struct ceph_osd_request *req);
32 static void __send_request(struct ceph_osd_client *osdc,
33                            struct ceph_osd_request *req);
34
35 /*
36  * Implement client access to distributed object storage cluster.
37  *
38  * All data objects are stored within a cluster/cloud of OSDs, or
39  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
40  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
41  * remote daemons serving up and coordinating consistent and safe
42  * access to storage.
43  *
44  * Cluster membership and the mapping of data objects onto storage devices
45  * are described by the osd map.
46  *
47  * We keep track of pending OSD requests (read, write), resubmit
48  * requests to different OSDs when the cluster topology/data layout
49  * change, or retry the affected requests when the communications
50  * channel with an OSD is reset.
51  */
52
53 /*
54  * calculate the mapping of a file extent onto an object, and fill out the
55  * request accordingly.  shorten extent as necessary if it crosses an
56  * object boundary.
57  *
58  * fill osd op in request message.
59  */
60 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
61                         u64 *objnum, u64 *objoff, u64 *objlen)
62 {
63         u64 orig_len = *plen;
64         int r;
65
66         /* object extent? */
67         r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
68                                           objoff, objlen);
69         if (r < 0)
70                 return r;
71         if (*objlen < orig_len) {
72                 *plen = *objlen;
73                 dout(" skipping last %llu, final file extent %llu~%llu\n",
74                      orig_len - *plen, off, *plen);
75         }
76
77         dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
78
79         return 0;
80 }
81
82 void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
83                         struct page **pages, u64 length, u32 alignment,
84                         bool pages_from_pool, bool own_pages)
85 {
86         osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
87         osd_data->pages = pages;
88         osd_data->length = length;
89         osd_data->alignment = alignment;
90         osd_data->pages_from_pool = pages_from_pool;
91         osd_data->own_pages = own_pages;
92 }
93 EXPORT_SYMBOL(ceph_osd_data_pages_init);
94
95 void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
96                         struct ceph_pagelist *pagelist)
97 {
98         osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
99         osd_data->pagelist = pagelist;
100 }
101 EXPORT_SYMBOL(ceph_osd_data_pagelist_init);
102
103 #ifdef CONFIG_BLOCK
104 void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
105                         struct bio *bio, size_t bio_length)
106 {
107         osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
108         osd_data->bio = bio;
109         osd_data->bio_length = bio_length;
110 }
111 EXPORT_SYMBOL(ceph_osd_data_bio_init);
112 #endif /* CONFIG_BLOCK */
113
114 /*
115  * requests
116  */
117 void ceph_osdc_release_request(struct kref *kref)
118 {
119         int num_pages;
120         struct ceph_osd_request *req = container_of(kref,
121                                                     struct ceph_osd_request,
122                                                     r_kref);
123
124         if (req->r_request)
125                 ceph_msg_put(req->r_request);
126         if (req->r_reply) {
127                 ceph_msg_revoke_incoming(req->r_reply);
128                 ceph_msg_put(req->r_reply);
129         }
130
131         if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES &&
132                         req->r_data_in.own_pages) {
133                 num_pages = calc_pages_for((u64)req->r_data_in.alignment,
134                                                 (u64)req->r_data_in.length);
135                 ceph_release_page_vector(req->r_data_in.pages, num_pages);
136         }
137         if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES &&
138                         req->r_data_out.own_pages) {
139                 num_pages = calc_pages_for((u64)req->r_data_out.alignment,
140                                                 (u64)req->r_data_out.length);
141                 ceph_release_page_vector(req->r_data_out.pages, num_pages);
142         }
143
144         ceph_put_snap_context(req->r_snapc);
145         if (req->r_mempool)
146                 mempool_free(req, req->r_osdc->req_mempool);
147         else
148                 kfree(req);
149 }
150 EXPORT_SYMBOL(ceph_osdc_release_request);
151
152 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
153                                                struct ceph_snap_context *snapc,
154                                                unsigned int num_ops,
155                                                bool use_mempool,
156                                                gfp_t gfp_flags)
157 {
158         struct ceph_osd_request *req;
159         struct ceph_msg *msg;
160         size_t msg_size;
161
162         msg_size = 4 + 4 + 8 + 8 + 4+8;
163         msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
164         msg_size += 1 + 8 + 4 + 4;     /* pg_t */
165         msg_size += 4 + MAX_OBJ_NAME_SIZE;
166         msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
167         msg_size += 8;  /* snapid */
168         msg_size += 8;  /* snap_seq */
169         msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
170         msg_size += 4;
171
172         if (use_mempool) {
173                 req = mempool_alloc(osdc->req_mempool, gfp_flags);
174                 memset(req, 0, sizeof(*req));
175         } else {
176                 req = kzalloc(sizeof(*req), gfp_flags);
177         }
178         if (req == NULL)
179                 return NULL;
180
181         req->r_osdc = osdc;
182         req->r_mempool = use_mempool;
183
184         kref_init(&req->r_kref);
185         init_completion(&req->r_completion);
186         init_completion(&req->r_safe_completion);
187         RB_CLEAR_NODE(&req->r_node);
188         INIT_LIST_HEAD(&req->r_unsafe_item);
189         INIT_LIST_HEAD(&req->r_linger_item);
190         INIT_LIST_HEAD(&req->r_linger_osd);
191         INIT_LIST_HEAD(&req->r_req_lru_item);
192         INIT_LIST_HEAD(&req->r_osd_item);
193
194         /* create reply message */
195         if (use_mempool)
196                 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
197         else
198                 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
199                                    OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
200         if (!msg) {
201                 ceph_osdc_put_request(req);
202                 return NULL;
203         }
204         req->r_reply = msg;
205
206         req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE;
207         req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE;
208
209         /* create request message; allow space for oid */
210         if (use_mempool)
211                 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
212         else
213                 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
214         if (!msg) {
215                 ceph_osdc_put_request(req);
216                 return NULL;
217         }
218
219         memset(msg->front.iov_base, 0, msg->front.iov_len);
220
221         req->r_request = msg;
222
223         return req;
224 }
225 EXPORT_SYMBOL(ceph_osdc_alloc_request);
226
227 static bool osd_req_opcode_valid(u16 opcode)
228 {
229         switch (opcode) {
230         case CEPH_OSD_OP_READ:
231         case CEPH_OSD_OP_STAT:
232         case CEPH_OSD_OP_MAPEXT:
233         case CEPH_OSD_OP_MASKTRUNC:
234         case CEPH_OSD_OP_SPARSE_READ:
235         case CEPH_OSD_OP_NOTIFY:
236         case CEPH_OSD_OP_NOTIFY_ACK:
237         case CEPH_OSD_OP_ASSERT_VER:
238         case CEPH_OSD_OP_WRITE:
239         case CEPH_OSD_OP_WRITEFULL:
240         case CEPH_OSD_OP_TRUNCATE:
241         case CEPH_OSD_OP_ZERO:
242         case CEPH_OSD_OP_DELETE:
243         case CEPH_OSD_OP_APPEND:
244         case CEPH_OSD_OP_STARTSYNC:
245         case CEPH_OSD_OP_SETTRUNC:
246         case CEPH_OSD_OP_TRIMTRUNC:
247         case CEPH_OSD_OP_TMAPUP:
248         case CEPH_OSD_OP_TMAPPUT:
249         case CEPH_OSD_OP_TMAPGET:
250         case CEPH_OSD_OP_CREATE:
251         case CEPH_OSD_OP_ROLLBACK:
252         case CEPH_OSD_OP_WATCH:
253         case CEPH_OSD_OP_OMAPGETKEYS:
254         case CEPH_OSD_OP_OMAPGETVALS:
255         case CEPH_OSD_OP_OMAPGETHEADER:
256         case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
257         case CEPH_OSD_OP_OMAPSETVALS:
258         case CEPH_OSD_OP_OMAPSETHEADER:
259         case CEPH_OSD_OP_OMAPCLEAR:
260         case CEPH_OSD_OP_OMAPRMKEYS:
261         case CEPH_OSD_OP_OMAP_CMP:
262         case CEPH_OSD_OP_CLONERANGE:
263         case CEPH_OSD_OP_ASSERT_SRC_VERSION:
264         case CEPH_OSD_OP_SRC_CMPXATTR:
265         case CEPH_OSD_OP_GETXATTR:
266         case CEPH_OSD_OP_GETXATTRS:
267         case CEPH_OSD_OP_CMPXATTR:
268         case CEPH_OSD_OP_SETXATTR:
269         case CEPH_OSD_OP_SETXATTRS:
270         case CEPH_OSD_OP_RESETXATTRS:
271         case CEPH_OSD_OP_RMXATTR:
272         case CEPH_OSD_OP_PULL:
273         case CEPH_OSD_OP_PUSH:
274         case CEPH_OSD_OP_BALANCEREADS:
275         case CEPH_OSD_OP_UNBALANCEREADS:
276         case CEPH_OSD_OP_SCRUB:
277         case CEPH_OSD_OP_SCRUB_RESERVE:
278         case CEPH_OSD_OP_SCRUB_UNRESERVE:
279         case CEPH_OSD_OP_SCRUB_STOP:
280         case CEPH_OSD_OP_SCRUB_MAP:
281         case CEPH_OSD_OP_WRLOCK:
282         case CEPH_OSD_OP_WRUNLOCK:
283         case CEPH_OSD_OP_RDLOCK:
284         case CEPH_OSD_OP_RDUNLOCK:
285         case CEPH_OSD_OP_UPLOCK:
286         case CEPH_OSD_OP_DNLOCK:
287         case CEPH_OSD_OP_CALL:
288         case CEPH_OSD_OP_PGLS:
289         case CEPH_OSD_OP_PGLS_FILTER:
290                 return true;
291         default:
292                 return false;
293         }
294 }
295
296 /*
297  * This is an osd op init function for opcodes that have no data or
298  * other information associated with them.  It also serves as a
299  * common init routine for all the other init functions, below.
300  */
301 void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode)
302 {
303         BUG_ON(!osd_req_opcode_valid(opcode));
304
305         memset(op, 0, sizeof (*op));
306
307         op->op = opcode;
308 }
309
310 void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
311                                 u64 offset, u64 length,
312                                 u64 truncate_size, u32 truncate_seq)
313 {
314         size_t payload_len = 0;
315
316         BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
317
318         osd_req_op_init(op, opcode);
319
320         op->extent.offset = offset;
321         op->extent.length = length;
322         op->extent.truncate_size = truncate_size;
323         op->extent.truncate_seq = truncate_seq;
324         if (opcode == CEPH_OSD_OP_WRITE)
325                 payload_len += length;
326
327         op->payload_len = payload_len;
328 }
329 EXPORT_SYMBOL(osd_req_op_extent_init);
330
331 void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length)
332 {
333         u64 previous = op->extent.length;
334
335         if (length == previous)
336                 return;         /* Nothing to do */
337         BUG_ON(length > previous);
338
339         op->extent.length = length;
340         op->payload_len -= previous - length;
341 }
342 EXPORT_SYMBOL(osd_req_op_extent_update);
343
344 void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
345                         const char *class, const char *method,
346                         const void *request_data, size_t request_data_size)
347 {
348         size_t payload_len = 0;
349         size_t size;
350
351         BUG_ON(opcode != CEPH_OSD_OP_CALL);
352
353         osd_req_op_init(op, opcode);
354
355         op->cls.class_name = class;
356         size = strlen(class);
357         BUG_ON(size > (size_t) U8_MAX);
358         op->cls.class_len = size;
359         payload_len += size;
360
361         op->cls.method_name = method;
362         size = strlen(method);
363         BUG_ON(size > (size_t) U8_MAX);
364         op->cls.method_len = size;
365         payload_len += size;
366
367         op->cls.indata = request_data;
368         BUG_ON(request_data_size > (size_t) U32_MAX);
369         op->cls.indata_len = (u32) request_data_size;
370         payload_len += request_data_size;
371
372         op->cls.argc = 0;       /* currently unused */
373
374         op->payload_len = payload_len;
375 }
376 EXPORT_SYMBOL(osd_req_op_cls_init);
377
378 void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode,
379                                 u64 cookie, u64 version, int flag)
380 {
381         BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
382
383         osd_req_op_init(op, opcode);
384
385         op->watch.cookie = cookie;
386         /* op->watch.ver = version; */  /* XXX 3847 */
387         op->watch.ver = cpu_to_le64(version);
388         if (opcode == CEPH_OSD_OP_WATCH && flag)
389                 op->watch.flag = (u8) 1;
390 }
391 EXPORT_SYMBOL(osd_req_op_watch_init);
392
393 static u64 osd_req_encode_op(struct ceph_osd_request *req,
394                               struct ceph_osd_op *dst,
395                               struct ceph_osd_req_op *src)
396 {
397         u64 out_data_len = 0;
398         struct ceph_pagelist *pagelist;
399
400         if (WARN_ON(!osd_req_opcode_valid(src->op))) {
401                 pr_err("unrecognized osd opcode %d\n", src->op);
402
403                 return 0;
404         }
405
406         switch (src->op) {
407         case CEPH_OSD_OP_STAT:
408                 break;
409         case CEPH_OSD_OP_READ:
410         case CEPH_OSD_OP_WRITE:
411                 if (src->op == CEPH_OSD_OP_WRITE)
412                         out_data_len = src->extent.length;
413                 dst->extent.offset = cpu_to_le64(src->extent.offset);
414                 dst->extent.length = cpu_to_le64(src->extent.length);
415                 dst->extent.truncate_size =
416                         cpu_to_le64(src->extent.truncate_size);
417                 dst->extent.truncate_seq =
418                         cpu_to_le32(src->extent.truncate_seq);
419                 break;
420         case CEPH_OSD_OP_CALL:
421                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
422                 BUG_ON(!pagelist);
423                 ceph_pagelist_init(pagelist);
424
425                 dst->cls.class_len = src->cls.class_len;
426                 dst->cls.method_len = src->cls.method_len;
427                 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
428                 ceph_pagelist_append(pagelist, src->cls.class_name,
429                                      src->cls.class_len);
430                 ceph_pagelist_append(pagelist, src->cls.method_name,
431                                      src->cls.method_len);
432                 ceph_pagelist_append(pagelist, src->cls.indata,
433                                      src->cls.indata_len);
434
435                 ceph_osd_data_pagelist_init(&req->r_data_out, pagelist);
436                 out_data_len = pagelist->length;
437                 break;
438         case CEPH_OSD_OP_STARTSYNC:
439                 break;
440         case CEPH_OSD_OP_NOTIFY_ACK:
441         case CEPH_OSD_OP_WATCH:
442                 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
443                 dst->watch.ver = cpu_to_le64(src->watch.ver);
444                 dst->watch.flag = src->watch.flag;
445                 break;
446         default:
447                 pr_err("unsupported osd opcode %s\n",
448                         ceph_osd_op_name(src->op));
449                 WARN_ON(1);
450
451                 return 0;
452         }
453         dst->op = cpu_to_le16(src->op);
454         dst->payload_len = cpu_to_le32(src->payload_len);
455
456         return out_data_len;
457 }
458
459 /*
460  * build new request AND message
461  *
462  */
463 void ceph_osdc_build_request(struct ceph_osd_request *req,
464                              u64 off, unsigned int num_ops,
465                              struct ceph_osd_req_op *src_ops,
466                              struct ceph_snap_context *snapc, u64 snap_id,
467                              struct timespec *mtime)
468 {
469         struct ceph_msg *msg = req->r_request;
470         struct ceph_osd_req_op *src_op;
471         void *p;
472         size_t msg_size;
473         int flags = req->r_flags;
474         u64 data_len;
475         int i;
476
477         req->r_num_ops = num_ops;
478         req->r_snapid = snap_id;
479         req->r_snapc = ceph_get_snap_context(snapc);
480
481         /* encode request */
482         msg->hdr.version = cpu_to_le16(4);
483
484         p = msg->front.iov_base;
485         ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
486         req->r_request_osdmap_epoch = p;
487         p += 4;
488         req->r_request_flags = p;
489         p += 4;
490         if (req->r_flags & CEPH_OSD_FLAG_WRITE)
491                 ceph_encode_timespec(p, mtime);
492         p += sizeof(struct ceph_timespec);
493         req->r_request_reassert_version = p;
494         p += sizeof(struct ceph_eversion); /* will get filled in */
495
496         /* oloc */
497         ceph_encode_8(&p, 4);
498         ceph_encode_8(&p, 4);
499         ceph_encode_32(&p, 8 + 4 + 4);
500         req->r_request_pool = p;
501         p += 8;
502         ceph_encode_32(&p, -1);  /* preferred */
503         ceph_encode_32(&p, 0);   /* key len */
504
505         ceph_encode_8(&p, 1);
506         req->r_request_pgid = p;
507         p += 8 + 4;
508         ceph_encode_32(&p, -1);  /* preferred */
509
510         /* oid */
511         ceph_encode_32(&p, req->r_oid_len);
512         memcpy(p, req->r_oid, req->r_oid_len);
513         dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
514         p += req->r_oid_len;
515
516         /* ops--can imply data */
517         ceph_encode_16(&p, num_ops);
518         src_op = src_ops;
519         req->r_request_ops = p;
520         data_len = 0;
521         for (i = 0; i < num_ops; i++, src_op++) {
522                 data_len += osd_req_encode_op(req, p, src_op);
523                 p += sizeof(struct ceph_osd_op);
524         }
525
526         /* snaps */
527         ceph_encode_64(&p, req->r_snapid);
528         ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
529         ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
530         if (req->r_snapc) {
531                 for (i = 0; i < snapc->num_snaps; i++) {
532                         ceph_encode_64(&p, req->r_snapc->snaps[i]);
533                 }
534         }
535
536         req->r_request_attempts = p;
537         p += 4;
538
539         /* data */
540         if (flags & CEPH_OSD_FLAG_WRITE) {
541                 u16 data_off;
542
543                 /*
544                  * The header "data_off" is a hint to the receiver
545                  * allowing it to align received data into its
546                  * buffers such that there's no need to re-copy
547                  * it before writing it to disk (direct I/O).
548                  */
549                 data_off = (u16) (off & 0xffff);
550                 req->r_request->hdr.data_off = cpu_to_le16(data_off);
551         }
552         req->r_request->hdr.data_len = cpu_to_le32(data_len);
553
554         BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
555         msg_size = p - msg->front.iov_base;
556         msg->front.iov_len = msg_size;
557         msg->hdr.front_len = cpu_to_le32(msg_size);
558
559         dout("build_request msg_size was %d\n", (int)msg_size);
560 }
561 EXPORT_SYMBOL(ceph_osdc_build_request);
562
563 /*
564  * build new request AND message, calculate layout, and adjust file
565  * extent as needed.
566  *
567  * if the file was recently truncated, we include information about its
568  * old and new size so that the object can be updated appropriately.  (we
569  * avoid synchronously deleting truncated objects because it's slow.)
570  *
571  * if @do_sync, include a 'startsync' command so that the osd will flush
572  * data quickly.
573  */
574 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
575                                                struct ceph_file_layout *layout,
576                                                struct ceph_vino vino,
577                                                u64 off, u64 *plen, int num_ops,
578                                                struct ceph_osd_req_op *ops,
579                                                int opcode, int flags,
580                                                struct ceph_snap_context *snapc,
581                                                u32 truncate_seq,
582                                                u64 truncate_size,
583                                                bool use_mempool)
584 {
585         struct ceph_osd_request *req;
586         u64 objnum = 0;
587         u64 objoff = 0;
588         u64 objlen = 0;
589         u32 object_size;
590         u64 object_base;
591         int r;
592
593         BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
594
595         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
596                                         GFP_NOFS);
597         if (!req)
598                 return ERR_PTR(-ENOMEM);
599         req->r_flags = flags;
600
601         /* calculate max write size */
602         r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
603         if (r < 0) {
604                 ceph_osdc_put_request(req);
605                 return ERR_PTR(r);
606         }
607
608         object_size = le32_to_cpu(layout->fl_object_size);
609         object_base = off - objoff;
610         if (truncate_size <= object_base) {
611                 truncate_size = 0;
612         } else {
613                 truncate_size -= object_base;
614                 if (truncate_size > object_size)
615                         truncate_size = object_size;
616         }
617
618         osd_req_op_extent_init(&ops[0], opcode, objoff, objlen,
619                                 truncate_size, truncate_seq);
620         /*
621          * A second op in the ops array means the caller wants to
622          * also issue a include a 'startsync' command so that the
623          * osd will flush data quickly.
624          */
625         if (num_ops > 1)
626                 osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC);
627
628         req->r_file_layout = *layout;  /* keep a copy */
629
630         snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
631                 vino.ino, objnum);
632         req->r_oid_len = strlen(req->r_oid);
633
634         return req;
635 }
636 EXPORT_SYMBOL(ceph_osdc_new_request);
637
638 /*
639  * We keep osd requests in an rbtree, sorted by ->r_tid.
640  */
641 static void __insert_request(struct ceph_osd_client *osdc,
642                              struct ceph_osd_request *new)
643 {
644         struct rb_node **p = &osdc->requests.rb_node;
645         struct rb_node *parent = NULL;
646         struct ceph_osd_request *req = NULL;
647
648         while (*p) {
649                 parent = *p;
650                 req = rb_entry(parent, struct ceph_osd_request, r_node);
651                 if (new->r_tid < req->r_tid)
652                         p = &(*p)->rb_left;
653                 else if (new->r_tid > req->r_tid)
654                         p = &(*p)->rb_right;
655                 else
656                         BUG();
657         }
658
659         rb_link_node(&new->r_node, parent, p);
660         rb_insert_color(&new->r_node, &osdc->requests);
661 }
662
663 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
664                                                  u64 tid)
665 {
666         struct ceph_osd_request *req;
667         struct rb_node *n = osdc->requests.rb_node;
668
669         while (n) {
670                 req = rb_entry(n, struct ceph_osd_request, r_node);
671                 if (tid < req->r_tid)
672                         n = n->rb_left;
673                 else if (tid > req->r_tid)
674                         n = n->rb_right;
675                 else
676                         return req;
677         }
678         return NULL;
679 }
680
681 static struct ceph_osd_request *
682 __lookup_request_ge(struct ceph_osd_client *osdc,
683                     u64 tid)
684 {
685         struct ceph_osd_request *req;
686         struct rb_node *n = osdc->requests.rb_node;
687
688         while (n) {
689                 req = rb_entry(n, struct ceph_osd_request, r_node);
690                 if (tid < req->r_tid) {
691                         if (!n->rb_left)
692                                 return req;
693                         n = n->rb_left;
694                 } else if (tid > req->r_tid) {
695                         n = n->rb_right;
696                 } else {
697                         return req;
698                 }
699         }
700         return NULL;
701 }
702
703 /*
704  * Resubmit requests pending on the given osd.
705  */
706 static void __kick_osd_requests(struct ceph_osd_client *osdc,
707                                 struct ceph_osd *osd)
708 {
709         struct ceph_osd_request *req, *nreq;
710         LIST_HEAD(resend);
711         int err;
712
713         dout("__kick_osd_requests osd%d\n", osd->o_osd);
714         err = __reset_osd(osdc, osd);
715         if (err)
716                 return;
717         /*
718          * Build up a list of requests to resend by traversing the
719          * osd's list of requests.  Requests for a given object are
720          * sent in tid order, and that is also the order they're
721          * kept on this list.  Therefore all requests that are in
722          * flight will be found first, followed by all requests that
723          * have not yet been sent.  And to resend requests while
724          * preserving this order we will want to put any sent
725          * requests back on the front of the osd client's unsent
726          * list.
727          *
728          * So we build a separate ordered list of already-sent
729          * requests for the affected osd and splice it onto the
730          * front of the osd client's unsent list.  Once we've seen a
731          * request that has not yet been sent we're done.  Those
732          * requests are already sitting right where they belong.
733          */
734         list_for_each_entry(req, &osd->o_requests, r_osd_item) {
735                 if (!req->r_sent)
736                         break;
737                 list_move_tail(&req->r_req_lru_item, &resend);
738                 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
739                      osd->o_osd);
740                 if (!req->r_linger)
741                         req->r_flags |= CEPH_OSD_FLAG_RETRY;
742         }
743         list_splice(&resend, &osdc->req_unsent);
744
745         /*
746          * Linger requests are re-registered before sending, which
747          * sets up a new tid for each.  We add them to the unsent
748          * list at the end to keep things in tid order.
749          */
750         list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
751                                  r_linger_osd) {
752                 /*
753                  * reregister request prior to unregistering linger so
754                  * that r_osd is preserved.
755                  */
756                 BUG_ON(!list_empty(&req->r_req_lru_item));
757                 __register_request(osdc, req);
758                 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
759                 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
760                 __unregister_linger_request(osdc, req);
761                 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
762                      osd->o_osd);
763         }
764 }
765
766 /*
767  * If the osd connection drops, we need to resubmit all requests.
768  */
769 static void osd_reset(struct ceph_connection *con)
770 {
771         struct ceph_osd *osd = con->private;
772         struct ceph_osd_client *osdc;
773
774         if (!osd)
775                 return;
776         dout("osd_reset osd%d\n", osd->o_osd);
777         osdc = osd->o_osdc;
778         down_read(&osdc->map_sem);
779         mutex_lock(&osdc->request_mutex);
780         __kick_osd_requests(osdc, osd);
781         __send_queued(osdc);
782         mutex_unlock(&osdc->request_mutex);
783         up_read(&osdc->map_sem);
784 }
785
786 /*
787  * Track open sessions with osds.
788  */
789 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
790 {
791         struct ceph_osd *osd;
792
793         osd = kzalloc(sizeof(*osd), GFP_NOFS);
794         if (!osd)
795                 return NULL;
796
797         atomic_set(&osd->o_ref, 1);
798         osd->o_osdc = osdc;
799         osd->o_osd = onum;
800         RB_CLEAR_NODE(&osd->o_node);
801         INIT_LIST_HEAD(&osd->o_requests);
802         INIT_LIST_HEAD(&osd->o_linger_requests);
803         INIT_LIST_HEAD(&osd->o_osd_lru);
804         osd->o_incarnation = 1;
805
806         ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
807
808         INIT_LIST_HEAD(&osd->o_keepalive_item);
809         return osd;
810 }
811
812 static struct ceph_osd *get_osd(struct ceph_osd *osd)
813 {
814         if (atomic_inc_not_zero(&osd->o_ref)) {
815                 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
816                      atomic_read(&osd->o_ref));
817                 return osd;
818         } else {
819                 dout("get_osd %p FAIL\n", osd);
820                 return NULL;
821         }
822 }
823
824 static void put_osd(struct ceph_osd *osd)
825 {
826         dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
827              atomic_read(&osd->o_ref) - 1);
828         if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
829                 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
830
831                 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
832                 kfree(osd);
833         }
834 }
835
836 /*
837  * remove an osd from our map
838  */
839 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
840 {
841         dout("__remove_osd %p\n", osd);
842         BUG_ON(!list_empty(&osd->o_requests));
843         rb_erase(&osd->o_node, &osdc->osds);
844         list_del_init(&osd->o_osd_lru);
845         ceph_con_close(&osd->o_con);
846         put_osd(osd);
847 }
848
849 static void remove_all_osds(struct ceph_osd_client *osdc)
850 {
851         dout("%s %p\n", __func__, osdc);
852         mutex_lock(&osdc->request_mutex);
853         while (!RB_EMPTY_ROOT(&osdc->osds)) {
854                 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
855                                                 struct ceph_osd, o_node);
856                 __remove_osd(osdc, osd);
857         }
858         mutex_unlock(&osdc->request_mutex);
859 }
860
861 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
862                               struct ceph_osd *osd)
863 {
864         dout("__move_osd_to_lru %p\n", osd);
865         BUG_ON(!list_empty(&osd->o_osd_lru));
866         list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
867         osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
868 }
869
870 static void __remove_osd_from_lru(struct ceph_osd *osd)
871 {
872         dout("__remove_osd_from_lru %p\n", osd);
873         if (!list_empty(&osd->o_osd_lru))
874                 list_del_init(&osd->o_osd_lru);
875 }
876
877 static void remove_old_osds(struct ceph_osd_client *osdc)
878 {
879         struct ceph_osd *osd, *nosd;
880
881         dout("__remove_old_osds %p\n", osdc);
882         mutex_lock(&osdc->request_mutex);
883         list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
884                 if (time_before(jiffies, osd->lru_ttl))
885                         break;
886                 __remove_osd(osdc, osd);
887         }
888         mutex_unlock(&osdc->request_mutex);
889 }
890
891 /*
892  * reset osd connect
893  */
894 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
895 {
896         struct ceph_entity_addr *peer_addr;
897
898         dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
899         if (list_empty(&osd->o_requests) &&
900             list_empty(&osd->o_linger_requests)) {
901                 __remove_osd(osdc, osd);
902
903                 return -ENODEV;
904         }
905
906         peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
907         if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
908                         !ceph_con_opened(&osd->o_con)) {
909                 struct ceph_osd_request *req;
910
911                 dout(" osd addr hasn't changed and connection never opened,"
912                      " letting msgr retry");
913                 /* touch each r_stamp for handle_timeout()'s benfit */
914                 list_for_each_entry(req, &osd->o_requests, r_osd_item)
915                         req->r_stamp = jiffies;
916
917                 return -EAGAIN;
918         }
919
920         ceph_con_close(&osd->o_con);
921         ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
922         osd->o_incarnation++;
923
924         return 0;
925 }
926
927 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
928 {
929         struct rb_node **p = &osdc->osds.rb_node;
930         struct rb_node *parent = NULL;
931         struct ceph_osd *osd = NULL;
932
933         dout("__insert_osd %p osd%d\n", new, new->o_osd);
934         while (*p) {
935                 parent = *p;
936                 osd = rb_entry(parent, struct ceph_osd, o_node);
937                 if (new->o_osd < osd->o_osd)
938                         p = &(*p)->rb_left;
939                 else if (new->o_osd > osd->o_osd)
940                         p = &(*p)->rb_right;
941                 else
942                         BUG();
943         }
944
945         rb_link_node(&new->o_node, parent, p);
946         rb_insert_color(&new->o_node, &osdc->osds);
947 }
948
949 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
950 {
951         struct ceph_osd *osd;
952         struct rb_node *n = osdc->osds.rb_node;
953
954         while (n) {
955                 osd = rb_entry(n, struct ceph_osd, o_node);
956                 if (o < osd->o_osd)
957                         n = n->rb_left;
958                 else if (o > osd->o_osd)
959                         n = n->rb_right;
960                 else
961                         return osd;
962         }
963         return NULL;
964 }
965
966 static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
967 {
968         schedule_delayed_work(&osdc->timeout_work,
969                         osdc->client->options->osd_keepalive_timeout * HZ);
970 }
971
972 static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
973 {
974         cancel_delayed_work(&osdc->timeout_work);
975 }
976
977 /*
978  * Register request, assign tid.  If this is the first request, set up
979  * the timeout event.
980  */
981 static void __register_request(struct ceph_osd_client *osdc,
982                                struct ceph_osd_request *req)
983 {
984         req->r_tid = ++osdc->last_tid;
985         req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
986         dout("__register_request %p tid %lld\n", req, req->r_tid);
987         __insert_request(osdc, req);
988         ceph_osdc_get_request(req);
989         osdc->num_requests++;
990         if (osdc->num_requests == 1) {
991                 dout(" first request, scheduling timeout\n");
992                 __schedule_osd_timeout(osdc);
993         }
994 }
995
996 /*
997  * called under osdc->request_mutex
998  */
999 static void __unregister_request(struct ceph_osd_client *osdc,
1000                                  struct ceph_osd_request *req)
1001 {
1002         if (RB_EMPTY_NODE(&req->r_node)) {
1003                 dout("__unregister_request %p tid %lld not registered\n",
1004                         req, req->r_tid);
1005                 return;
1006         }
1007
1008         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
1009         rb_erase(&req->r_node, &osdc->requests);
1010         osdc->num_requests--;
1011
1012         if (req->r_osd) {
1013                 /* make sure the original request isn't in flight. */
1014                 ceph_msg_revoke(req->r_request);
1015
1016                 list_del_init(&req->r_osd_item);
1017                 if (list_empty(&req->r_osd->o_requests) &&
1018                     list_empty(&req->r_osd->o_linger_requests)) {
1019                         dout("moving osd to %p lru\n", req->r_osd);
1020                         __move_osd_to_lru(osdc, req->r_osd);
1021                 }
1022                 if (list_empty(&req->r_linger_item))
1023                         req->r_osd = NULL;
1024         }
1025
1026         list_del_init(&req->r_req_lru_item);
1027         ceph_osdc_put_request(req);
1028
1029         if (osdc->num_requests == 0) {
1030                 dout(" no requests, canceling timeout\n");
1031                 __cancel_osd_timeout(osdc);
1032         }
1033 }
1034
1035 /*
1036  * Cancel a previously queued request message
1037  */
1038 static void __cancel_request(struct ceph_osd_request *req)
1039 {
1040         if (req->r_sent && req->r_osd) {
1041                 ceph_msg_revoke(req->r_request);
1042                 req->r_sent = 0;
1043         }
1044 }
1045
1046 static void __register_linger_request(struct ceph_osd_client *osdc,
1047                                     struct ceph_osd_request *req)
1048 {
1049         dout("__register_linger_request %p\n", req);
1050         list_add_tail(&req->r_linger_item, &osdc->req_linger);
1051         if (req->r_osd)
1052                 list_add_tail(&req->r_linger_osd,
1053                               &req->r_osd->o_linger_requests);
1054 }
1055
1056 static void __unregister_linger_request(struct ceph_osd_client *osdc,
1057                                         struct ceph_osd_request *req)
1058 {
1059         dout("__unregister_linger_request %p\n", req);
1060         list_del_init(&req->r_linger_item);
1061         if (req->r_osd) {
1062                 list_del_init(&req->r_linger_osd);
1063
1064                 if (list_empty(&req->r_osd->o_requests) &&
1065                     list_empty(&req->r_osd->o_linger_requests)) {
1066                         dout("moving osd to %p lru\n", req->r_osd);
1067                         __move_osd_to_lru(osdc, req->r_osd);
1068                 }
1069                 if (list_empty(&req->r_osd_item))
1070                         req->r_osd = NULL;
1071         }
1072 }
1073
1074 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
1075                                          struct ceph_osd_request *req)
1076 {
1077         mutex_lock(&osdc->request_mutex);
1078         if (req->r_linger) {
1079                 __unregister_linger_request(osdc, req);
1080                 ceph_osdc_put_request(req);
1081         }
1082         mutex_unlock(&osdc->request_mutex);
1083 }
1084 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
1085
1086 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
1087                                   struct ceph_osd_request *req)
1088 {
1089         if (!req->r_linger) {
1090                 dout("set_request_linger %p\n", req);
1091                 req->r_linger = 1;
1092                 /*
1093                  * caller is now responsible for calling
1094                  * unregister_linger_request
1095                  */
1096                 ceph_osdc_get_request(req);
1097         }
1098 }
1099 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
1100
1101 /*
1102  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
1103  * (as needed), and set the request r_osd appropriately.  If there is
1104  * no up osd, set r_osd to NULL.  Move the request to the appropriate list
1105  * (unsent, homeless) or leave on in-flight lru.
1106  *
1107  * Return 0 if unchanged, 1 if changed, or negative on error.
1108  *
1109  * Caller should hold map_sem for read and request_mutex.
1110  */
1111 static int __map_request(struct ceph_osd_client *osdc,
1112                          struct ceph_osd_request *req, int force_resend)
1113 {
1114         struct ceph_pg pgid;
1115         int acting[CEPH_PG_MAX_SIZE];
1116         int o = -1, num = 0;
1117         int err;
1118
1119         dout("map_request %p tid %lld\n", req, req->r_tid);
1120         err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
1121                                 ceph_file_layout_pg_pool(req->r_file_layout));
1122         if (err) {
1123                 list_move(&req->r_req_lru_item, &osdc->req_notarget);
1124                 return err;
1125         }
1126         req->r_pgid = pgid;
1127
1128         err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
1129         if (err > 0) {
1130                 o = acting[0];
1131                 num = err;
1132         }
1133
1134         if ((!force_resend &&
1135              req->r_osd && req->r_osd->o_osd == o &&
1136              req->r_sent >= req->r_osd->o_incarnation &&
1137              req->r_num_pg_osds == num &&
1138              memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
1139             (req->r_osd == NULL && o == -1))
1140                 return 0;  /* no change */
1141
1142         dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
1143              req->r_tid, pgid.pool, pgid.seed, o,
1144              req->r_osd ? req->r_osd->o_osd : -1);
1145
1146         /* record full pg acting set */
1147         memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
1148         req->r_num_pg_osds = num;
1149
1150         if (req->r_osd) {
1151                 __cancel_request(req);
1152                 list_del_init(&req->r_osd_item);
1153                 req->r_osd = NULL;
1154         }
1155
1156         req->r_osd = __lookup_osd(osdc, o);
1157         if (!req->r_osd && o >= 0) {
1158                 err = -ENOMEM;
1159                 req->r_osd = create_osd(osdc, o);
1160                 if (!req->r_osd) {
1161                         list_move(&req->r_req_lru_item, &osdc->req_notarget);
1162                         goto out;
1163                 }
1164
1165                 dout("map_request osd %p is osd%d\n", req->r_osd, o);
1166                 __insert_osd(osdc, req->r_osd);
1167
1168                 ceph_con_open(&req->r_osd->o_con,
1169                               CEPH_ENTITY_TYPE_OSD, o,
1170                               &osdc->osdmap->osd_addr[o]);
1171         }
1172
1173         if (req->r_osd) {
1174                 __remove_osd_from_lru(req->r_osd);
1175                 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
1176                 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
1177         } else {
1178                 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
1179         }
1180         err = 1;   /* osd or pg changed */
1181
1182 out:
1183         return err;
1184 }
1185
1186 /*
1187  * caller should hold map_sem (for read) and request_mutex
1188  */
1189 static void __send_request(struct ceph_osd_client *osdc,
1190                            struct ceph_osd_request *req)
1191 {
1192         void *p;
1193
1194         dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1195              req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1196              (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1197
1198         /* fill in message content that changes each time we send it */
1199         put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1200         put_unaligned_le32(req->r_flags, req->r_request_flags);
1201         put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
1202         p = req->r_request_pgid;
1203         ceph_encode_64(&p, req->r_pgid.pool);
1204         ceph_encode_32(&p, req->r_pgid.seed);
1205         put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
1206         memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1207                sizeof(req->r_reassert_version));
1208
1209         req->r_stamp = jiffies;
1210         list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1211
1212         ceph_msg_get(req->r_request); /* send consumes a ref */
1213         ceph_con_send(&req->r_osd->o_con, req->r_request);
1214         req->r_sent = req->r_osd->o_incarnation;
1215 }
1216
1217 /*
1218  * Send any requests in the queue (req_unsent).
1219  */
1220 static void __send_queued(struct ceph_osd_client *osdc)
1221 {
1222         struct ceph_osd_request *req, *tmp;
1223
1224         dout("__send_queued\n");
1225         list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
1226                 __send_request(osdc, req);
1227 }
1228
1229 /*
1230  * Timeout callback, called every N seconds when 1 or more osd
1231  * requests has been active for more than N seconds.  When this
1232  * happens, we ping all OSDs with requests who have timed out to
1233  * ensure any communications channel reset is detected.  Reset the
1234  * request timeouts another N seconds in the future as we go.
1235  * Reschedule the timeout event another N seconds in future (unless
1236  * there are no open requests).
1237  */
1238 static void handle_timeout(struct work_struct *work)
1239 {
1240         struct ceph_osd_client *osdc =
1241                 container_of(work, struct ceph_osd_client, timeout_work.work);
1242         struct ceph_osd_request *req;
1243         struct ceph_osd *osd;
1244         unsigned long keepalive =
1245                 osdc->client->options->osd_keepalive_timeout * HZ;
1246         struct list_head slow_osds;
1247         dout("timeout\n");
1248         down_read(&osdc->map_sem);
1249
1250         ceph_monc_request_next_osdmap(&osdc->client->monc);
1251
1252         mutex_lock(&osdc->request_mutex);
1253
1254         /*
1255          * ping osds that are a bit slow.  this ensures that if there
1256          * is a break in the TCP connection we will notice, and reopen
1257          * a connection with that osd (from the fault callback).
1258          */
1259         INIT_LIST_HEAD(&slow_osds);
1260         list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
1261                 if (time_before(jiffies, req->r_stamp + keepalive))
1262                         break;
1263
1264                 osd = req->r_osd;
1265                 BUG_ON(!osd);
1266                 dout(" tid %llu is slow, will send keepalive on osd%d\n",
1267                      req->r_tid, osd->o_osd);
1268                 list_move_tail(&osd->o_keepalive_item, &slow_osds);
1269         }
1270         while (!list_empty(&slow_osds)) {
1271                 osd = list_entry(slow_osds.next, struct ceph_osd,
1272                                  o_keepalive_item);
1273                 list_del_init(&osd->o_keepalive_item);
1274                 ceph_con_keepalive(&osd->o_con);
1275         }
1276
1277         __schedule_osd_timeout(osdc);
1278         __send_queued(osdc);
1279         mutex_unlock(&osdc->request_mutex);
1280         up_read(&osdc->map_sem);
1281 }
1282
1283 static void handle_osds_timeout(struct work_struct *work)
1284 {
1285         struct ceph_osd_client *osdc =
1286                 container_of(work, struct ceph_osd_client,
1287                              osds_timeout_work.work);
1288         unsigned long delay =
1289                 osdc->client->options->osd_idle_ttl * HZ >> 2;
1290
1291         dout("osds timeout\n");
1292         down_read(&osdc->map_sem);
1293         remove_old_osds(osdc);
1294         up_read(&osdc->map_sem);
1295
1296         schedule_delayed_work(&osdc->osds_timeout_work,
1297                               round_jiffies_relative(delay));
1298 }
1299
1300 static void complete_request(struct ceph_osd_request *req)
1301 {
1302         if (req->r_safe_callback)
1303                 req->r_safe_callback(req, NULL);
1304         complete_all(&req->r_safe_completion);  /* fsync waiter */
1305 }
1306
1307 /*
1308  * handle osd op reply.  either call the callback if it is specified,
1309  * or do the completion to wake up the waiting thread.
1310  */
1311 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1312                          struct ceph_connection *con)
1313 {
1314         void *p, *end;
1315         struct ceph_osd_request *req;
1316         u64 tid;
1317         int object_len;
1318         int numops, payload_len, flags;
1319         s32 result;
1320         s32 retry_attempt;
1321         struct ceph_pg pg;
1322         int err;
1323         u32 reassert_epoch;
1324         u64 reassert_version;
1325         u32 osdmap_epoch;
1326         int already_completed;
1327         u32 bytes;
1328         int i;
1329
1330         tid = le64_to_cpu(msg->hdr.tid);
1331         dout("handle_reply %p tid %llu\n", msg, tid);
1332
1333         p = msg->front.iov_base;
1334         end = p + msg->front.iov_len;
1335
1336         ceph_decode_need(&p, end, 4, bad);
1337         object_len = ceph_decode_32(&p);
1338         ceph_decode_need(&p, end, object_len, bad);
1339         p += object_len;
1340
1341         err = ceph_decode_pgid(&p, end, &pg);
1342         if (err)
1343                 goto bad;
1344
1345         ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1346         flags = ceph_decode_64(&p);
1347         result = ceph_decode_32(&p);
1348         reassert_epoch = ceph_decode_32(&p);
1349         reassert_version = ceph_decode_64(&p);
1350         osdmap_epoch = ceph_decode_32(&p);
1351
1352         /* lookup */
1353         mutex_lock(&osdc->request_mutex);
1354         req = __lookup_request(osdc, tid);
1355         if (req == NULL) {
1356                 dout("handle_reply tid %llu dne\n", tid);
1357                 goto bad_mutex;
1358         }
1359         ceph_osdc_get_request(req);
1360
1361         dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1362              req, result);
1363
1364         ceph_decode_need(&p, end, 4, bad);
1365         numops = ceph_decode_32(&p);
1366         if (numops > CEPH_OSD_MAX_OP)
1367                 goto bad_put;
1368         if (numops != req->r_num_ops)
1369                 goto bad_put;
1370         payload_len = 0;
1371         ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
1372         for (i = 0; i < numops; i++) {
1373                 struct ceph_osd_op *op = p;
1374                 int len;
1375
1376                 len = le32_to_cpu(op->payload_len);
1377                 req->r_reply_op_len[i] = len;
1378                 dout(" op %d has %d bytes\n", i, len);
1379                 payload_len += len;
1380                 p += sizeof(*op);
1381         }
1382         bytes = le32_to_cpu(msg->hdr.data_len);
1383         if (payload_len != bytes) {
1384                 pr_warning("sum of op payload lens %d != data_len %d",
1385                            payload_len, bytes);
1386                 goto bad_put;
1387         }
1388
1389         ceph_decode_need(&p, end, 4 + numops * 4, bad);
1390         retry_attempt = ceph_decode_32(&p);
1391         for (i = 0; i < numops; i++)
1392                 req->r_reply_op_result[i] = ceph_decode_32(&p);
1393
1394         if (!req->r_got_reply) {
1395
1396                 req->r_result = result;
1397                 dout("handle_reply result %d bytes %d\n", req->r_result,
1398                      bytes);
1399                 if (req->r_result == 0)
1400                         req->r_result = bytes;
1401
1402                 /* in case this is a write and we need to replay, */
1403                 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1404                 req->r_reassert_version.version = cpu_to_le64(reassert_version);
1405
1406                 req->r_got_reply = 1;
1407         } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1408                 dout("handle_reply tid %llu dup ack\n", tid);
1409                 mutex_unlock(&osdc->request_mutex);
1410                 goto done;
1411         }
1412
1413         dout("handle_reply tid %llu flags %d\n", tid, flags);
1414
1415         if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1416                 __register_linger_request(osdc, req);
1417
1418         /* either this is a read, or we got the safe response */
1419         if (result < 0 ||
1420             (flags & CEPH_OSD_FLAG_ONDISK) ||
1421             ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1422                 __unregister_request(osdc, req);
1423
1424         already_completed = req->r_completed;
1425         req->r_completed = 1;
1426         mutex_unlock(&osdc->request_mutex);
1427         if (already_completed)
1428                 goto done;
1429
1430         if (req->r_callback)
1431                 req->r_callback(req, msg);
1432         else
1433                 complete_all(&req->r_completion);
1434
1435         if (flags & CEPH_OSD_FLAG_ONDISK)
1436                 complete_request(req);
1437
1438 done:
1439         dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1440         ceph_osdc_put_request(req);
1441         return;
1442
1443 bad_put:
1444         ceph_osdc_put_request(req);
1445 bad_mutex:
1446         mutex_unlock(&osdc->request_mutex);
1447 bad:
1448         pr_err("corrupt osd_op_reply got %d %d\n",
1449                (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1450         ceph_msg_dump(msg);
1451 }
1452
1453 static void reset_changed_osds(struct ceph_osd_client *osdc)
1454 {
1455         struct rb_node *p, *n;
1456
1457         for (p = rb_first(&osdc->osds); p; p = n) {
1458                 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1459
1460                 n = rb_next(p);
1461                 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1462                     memcmp(&osd->o_con.peer_addr,
1463                            ceph_osd_addr(osdc->osdmap,
1464                                          osd->o_osd),
1465                            sizeof(struct ceph_entity_addr)) != 0)
1466                         __reset_osd(osdc, osd);
1467         }
1468 }
1469
1470 /*
1471  * Requeue requests whose mapping to an OSD has changed.  If requests map to
1472  * no osd, request a new map.
1473  *
1474  * Caller should hold map_sem for read.
1475  */
1476 static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1477 {
1478         struct ceph_osd_request *req, *nreq;
1479         struct rb_node *p;
1480         int needmap = 0;
1481         int err;
1482
1483         dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
1484         mutex_lock(&osdc->request_mutex);
1485         for (p = rb_first(&osdc->requests); p; ) {
1486                 req = rb_entry(p, struct ceph_osd_request, r_node);
1487                 p = rb_next(p);
1488
1489                 /*
1490                  * For linger requests that have not yet been
1491                  * registered, move them to the linger list; they'll
1492                  * be sent to the osd in the loop below.  Unregister
1493                  * the request before re-registering it as a linger
1494                  * request to ensure the __map_request() below
1495                  * will decide it needs to be sent.
1496                  */
1497                 if (req->r_linger && list_empty(&req->r_linger_item)) {
1498                         dout("%p tid %llu restart on osd%d\n",
1499                              req, req->r_tid,
1500                              req->r_osd ? req->r_osd->o_osd : -1);
1501                         __unregister_request(osdc, req);
1502                         __register_linger_request(osdc, req);
1503                         continue;
1504                 }
1505
1506                 err = __map_request(osdc, req, force_resend);
1507                 if (err < 0)
1508                         continue;  /* error */
1509                 if (req->r_osd == NULL) {
1510                         dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1511                         needmap++;  /* request a newer map */
1512                 } else if (err > 0) {
1513                         if (!req->r_linger) {
1514                                 dout("%p tid %llu requeued on osd%d\n", req,
1515                                      req->r_tid,
1516                                      req->r_osd ? req->r_osd->o_osd : -1);
1517                                 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1518                         }
1519                 }
1520         }
1521
1522         list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1523                                  r_linger_item) {
1524                 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1525
1526                 err = __map_request(osdc, req, force_resend);
1527                 dout("__map_request returned %d\n", err);
1528                 if (err == 0)
1529                         continue;  /* no change and no osd was specified */
1530                 if (err < 0)
1531                         continue;  /* hrm! */
1532                 if (req->r_osd == NULL) {
1533                         dout("tid %llu maps to no valid osd\n", req->r_tid);
1534                         needmap++;  /* request a newer map */
1535                         continue;
1536                 }
1537
1538                 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1539                      req->r_osd ? req->r_osd->o_osd : -1);
1540                 __register_request(osdc, req);
1541                 __unregister_linger_request(osdc, req);
1542         }
1543         mutex_unlock(&osdc->request_mutex);
1544
1545         if (needmap) {
1546                 dout("%d requests for down osds, need new map\n", needmap);
1547                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1548         }
1549         reset_changed_osds(osdc);
1550 }
1551
1552
1553 /*
1554  * Process updated osd map.
1555  *
1556  * The message contains any number of incremental and full maps, normally
1557  * indicating some sort of topology change in the cluster.  Kick requests
1558  * off to different OSDs as needed.
1559  */
1560 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1561 {
1562         void *p, *end, *next;
1563         u32 nr_maps, maplen;
1564         u32 epoch;
1565         struct ceph_osdmap *newmap = NULL, *oldmap;
1566         int err;
1567         struct ceph_fsid fsid;
1568
1569         dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1570         p = msg->front.iov_base;
1571         end = p + msg->front.iov_len;
1572
1573         /* verify fsid */
1574         ceph_decode_need(&p, end, sizeof(fsid), bad);
1575         ceph_decode_copy(&p, &fsid, sizeof(fsid));
1576         if (ceph_check_fsid(osdc->client, &fsid) < 0)
1577                 return;
1578
1579         down_write(&osdc->map_sem);
1580
1581         /* incremental maps */
1582         ceph_decode_32_safe(&p, end, nr_maps, bad);
1583         dout(" %d inc maps\n", nr_maps);
1584         while (nr_maps > 0) {
1585                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1586                 epoch = ceph_decode_32(&p);
1587                 maplen = ceph_decode_32(&p);
1588                 ceph_decode_need(&p, end, maplen, bad);
1589                 next = p + maplen;
1590                 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1591                         dout("applying incremental map %u len %d\n",
1592                              epoch, maplen);
1593                         newmap = osdmap_apply_incremental(&p, next,
1594                                                           osdc->osdmap,
1595                                                           &osdc->client->msgr);
1596                         if (IS_ERR(newmap)) {
1597                                 err = PTR_ERR(newmap);
1598                                 goto bad;
1599                         }
1600                         BUG_ON(!newmap);
1601                         if (newmap != osdc->osdmap) {
1602                                 ceph_osdmap_destroy(osdc->osdmap);
1603                                 osdc->osdmap = newmap;
1604                         }
1605                         kick_requests(osdc, 0);
1606                 } else {
1607                         dout("ignoring incremental map %u len %d\n",
1608                              epoch, maplen);
1609                 }
1610                 p = next;
1611                 nr_maps--;
1612         }
1613         if (newmap)
1614                 goto done;
1615
1616         /* full maps */
1617         ceph_decode_32_safe(&p, end, nr_maps, bad);
1618         dout(" %d full maps\n", nr_maps);
1619         while (nr_maps) {
1620                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1621                 epoch = ceph_decode_32(&p);
1622                 maplen = ceph_decode_32(&p);
1623                 ceph_decode_need(&p, end, maplen, bad);
1624                 if (nr_maps > 1) {
1625                         dout("skipping non-latest full map %u len %d\n",
1626                              epoch, maplen);
1627                 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1628                         dout("skipping full map %u len %d, "
1629                              "older than our %u\n", epoch, maplen,
1630                              osdc->osdmap->epoch);
1631                 } else {
1632                         int skipped_map = 0;
1633
1634                         dout("taking full map %u len %d\n", epoch, maplen);
1635                         newmap = osdmap_decode(&p, p+maplen);
1636                         if (IS_ERR(newmap)) {
1637                                 err = PTR_ERR(newmap);
1638                                 goto bad;
1639                         }
1640                         BUG_ON(!newmap);
1641                         oldmap = osdc->osdmap;
1642                         osdc->osdmap = newmap;
1643                         if (oldmap) {
1644                                 if (oldmap->epoch + 1 < newmap->epoch)
1645                                         skipped_map = 1;
1646                                 ceph_osdmap_destroy(oldmap);
1647                         }
1648                         kick_requests(osdc, skipped_map);
1649                 }
1650                 p += maplen;
1651                 nr_maps--;
1652         }
1653
1654 done:
1655         downgrade_write(&osdc->map_sem);
1656         ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1657
1658         /*
1659          * subscribe to subsequent osdmap updates if full to ensure
1660          * we find out when we are no longer full and stop returning
1661          * ENOSPC.
1662          */
1663         if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1664                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1665
1666         mutex_lock(&osdc->request_mutex);
1667         __send_queued(osdc);
1668         mutex_unlock(&osdc->request_mutex);
1669         up_read(&osdc->map_sem);
1670         wake_up_all(&osdc->client->auth_wq);
1671         return;
1672
1673 bad:
1674         pr_err("osdc handle_map corrupt msg\n");
1675         ceph_msg_dump(msg);
1676         up_write(&osdc->map_sem);
1677         return;
1678 }
1679
1680 /*
1681  * watch/notify callback event infrastructure
1682  *
1683  * These callbacks are used both for watch and notify operations.
1684  */
1685 static void __release_event(struct kref *kref)
1686 {
1687         struct ceph_osd_event *event =
1688                 container_of(kref, struct ceph_osd_event, kref);
1689
1690         dout("__release_event %p\n", event);
1691         kfree(event);
1692 }
1693
1694 static void get_event(struct ceph_osd_event *event)
1695 {
1696         kref_get(&event->kref);
1697 }
1698
1699 void ceph_osdc_put_event(struct ceph_osd_event *event)
1700 {
1701         kref_put(&event->kref, __release_event);
1702 }
1703 EXPORT_SYMBOL(ceph_osdc_put_event);
1704
1705 static void __insert_event(struct ceph_osd_client *osdc,
1706                              struct ceph_osd_event *new)
1707 {
1708         struct rb_node **p = &osdc->event_tree.rb_node;
1709         struct rb_node *parent = NULL;
1710         struct ceph_osd_event *event = NULL;
1711
1712         while (*p) {
1713                 parent = *p;
1714                 event = rb_entry(parent, struct ceph_osd_event, node);
1715                 if (new->cookie < event->cookie)
1716                         p = &(*p)->rb_left;
1717                 else if (new->cookie > event->cookie)
1718                         p = &(*p)->rb_right;
1719                 else
1720                         BUG();
1721         }
1722
1723         rb_link_node(&new->node, parent, p);
1724         rb_insert_color(&new->node, &osdc->event_tree);
1725 }
1726
1727 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1728                                                 u64 cookie)
1729 {
1730         struct rb_node **p = &osdc->event_tree.rb_node;
1731         struct rb_node *parent = NULL;
1732         struct ceph_osd_event *event = NULL;
1733
1734         while (*p) {
1735                 parent = *p;
1736                 event = rb_entry(parent, struct ceph_osd_event, node);
1737                 if (cookie < event->cookie)
1738                         p = &(*p)->rb_left;
1739                 else if (cookie > event->cookie)
1740                         p = &(*p)->rb_right;
1741                 else
1742                         return event;
1743         }
1744         return NULL;
1745 }
1746
1747 static void __remove_event(struct ceph_osd_event *event)
1748 {
1749         struct ceph_osd_client *osdc = event->osdc;
1750
1751         if (!RB_EMPTY_NODE(&event->node)) {
1752                 dout("__remove_event removed %p\n", event);
1753                 rb_erase(&event->node, &osdc->event_tree);
1754                 ceph_osdc_put_event(event);
1755         } else {
1756                 dout("__remove_event didn't remove %p\n", event);
1757         }
1758 }
1759
1760 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1761                            void (*event_cb)(u64, u64, u8, void *),
1762                            void *data, struct ceph_osd_event **pevent)
1763 {
1764         struct ceph_osd_event *event;
1765
1766         event = kmalloc(sizeof(*event), GFP_NOIO);
1767         if (!event)
1768                 return -ENOMEM;
1769
1770         dout("create_event %p\n", event);
1771         event->cb = event_cb;
1772         event->one_shot = 0;
1773         event->data = data;
1774         event->osdc = osdc;
1775         INIT_LIST_HEAD(&event->osd_node);
1776         RB_CLEAR_NODE(&event->node);
1777         kref_init(&event->kref);   /* one ref for us */
1778         kref_get(&event->kref);    /* one ref for the caller */
1779
1780         spin_lock(&osdc->event_lock);
1781         event->cookie = ++osdc->event_count;
1782         __insert_event(osdc, event);
1783         spin_unlock(&osdc->event_lock);
1784
1785         *pevent = event;
1786         return 0;
1787 }
1788 EXPORT_SYMBOL(ceph_osdc_create_event);
1789
1790 void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1791 {
1792         struct ceph_osd_client *osdc = event->osdc;
1793
1794         dout("cancel_event %p\n", event);
1795         spin_lock(&osdc->event_lock);
1796         __remove_event(event);
1797         spin_unlock(&osdc->event_lock);
1798         ceph_osdc_put_event(event); /* caller's */
1799 }
1800 EXPORT_SYMBOL(ceph_osdc_cancel_event);
1801
1802
1803 static void do_event_work(struct work_struct *work)
1804 {
1805         struct ceph_osd_event_work *event_work =
1806                 container_of(work, struct ceph_osd_event_work, work);
1807         struct ceph_osd_event *event = event_work->event;
1808         u64 ver = event_work->ver;
1809         u64 notify_id = event_work->notify_id;
1810         u8 opcode = event_work->opcode;
1811
1812         dout("do_event_work completing %p\n", event);
1813         event->cb(ver, notify_id, opcode, event->data);
1814         dout("do_event_work completed %p\n", event);
1815         ceph_osdc_put_event(event);
1816         kfree(event_work);
1817 }
1818
1819
1820 /*
1821  * Process osd watch notifications
1822  */
1823 static void handle_watch_notify(struct ceph_osd_client *osdc,
1824                                 struct ceph_msg *msg)
1825 {
1826         void *p, *end;
1827         u8 proto_ver;
1828         u64 cookie, ver, notify_id;
1829         u8 opcode;
1830         struct ceph_osd_event *event;
1831         struct ceph_osd_event_work *event_work;
1832
1833         p = msg->front.iov_base;
1834         end = p + msg->front.iov_len;
1835
1836         ceph_decode_8_safe(&p, end, proto_ver, bad);
1837         ceph_decode_8_safe(&p, end, opcode, bad);
1838         ceph_decode_64_safe(&p, end, cookie, bad);
1839         ceph_decode_64_safe(&p, end, ver, bad);
1840         ceph_decode_64_safe(&p, end, notify_id, bad);
1841
1842         spin_lock(&osdc->event_lock);
1843         event = __find_event(osdc, cookie);
1844         if (event) {
1845                 BUG_ON(event->one_shot);
1846                 get_event(event);
1847         }
1848         spin_unlock(&osdc->event_lock);
1849         dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1850              cookie, ver, event);
1851         if (event) {
1852                 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1853                 if (!event_work) {
1854                         dout("ERROR: could not allocate event_work\n");
1855                         goto done_err;
1856                 }
1857                 INIT_WORK(&event_work->work, do_event_work);
1858                 event_work->event = event;
1859                 event_work->ver = ver;
1860                 event_work->notify_id = notify_id;
1861                 event_work->opcode = opcode;
1862                 if (!queue_work(osdc->notify_wq, &event_work->work)) {
1863                         dout("WARNING: failed to queue notify event work\n");
1864                         goto done_err;
1865                 }
1866         }
1867
1868         return;
1869
1870 done_err:
1871         ceph_osdc_put_event(event);
1872         return;
1873
1874 bad:
1875         pr_err("osdc handle_watch_notify corrupt msg\n");
1876         return;
1877 }
1878
1879 static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
1880                                 struct ceph_osd_data *osd_data)
1881 {
1882         if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
1883                 BUG_ON(osd_data->length > (u64) SIZE_MAX);
1884                 if (osd_data->length)
1885                         ceph_msg_data_set_pages(msg, osd_data->pages,
1886                                 osd_data->length, osd_data->alignment);
1887         } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
1888                 BUG_ON(!osd_data->pagelist->length);
1889                 ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
1890 #ifdef CONFIG_BLOCK
1891         } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
1892                 ceph_msg_data_set_bio(msg, osd_data->bio, osd_data->bio_length);
1893 #endif
1894         } else {
1895                 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
1896         }
1897 }
1898
1899 /*
1900  * Register request, send initial attempt.
1901  */
1902 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1903                             struct ceph_osd_request *req,
1904                             bool nofail)
1905 {
1906         int rc = 0;
1907
1908         /* Set up response incoming data and request outgoing data fields */
1909
1910         ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in);
1911         ceph_osdc_msg_data_set(req->r_request, &req->r_data_out);
1912
1913         down_read(&osdc->map_sem);
1914         mutex_lock(&osdc->request_mutex);
1915         __register_request(osdc, req);
1916         WARN_ON(req->r_sent);
1917         rc = __map_request(osdc, req, 0);
1918         if (rc < 0) {
1919                 if (nofail) {
1920                         dout("osdc_start_request failed map, "
1921                                 " will retry %lld\n", req->r_tid);
1922                         rc = 0;
1923                 }
1924                 goto out_unlock;
1925         }
1926         if (req->r_osd == NULL) {
1927                 dout("send_request %p no up osds in pg\n", req);
1928                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1929         } else {
1930                 __send_queued(osdc);
1931         }
1932         rc = 0;
1933 out_unlock:
1934         mutex_unlock(&osdc->request_mutex);
1935         up_read(&osdc->map_sem);
1936         return rc;
1937 }
1938 EXPORT_SYMBOL(ceph_osdc_start_request);
1939
1940 /*
1941  * wait for a request to complete
1942  */
1943 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1944                            struct ceph_osd_request *req)
1945 {
1946         int rc;
1947
1948         rc = wait_for_completion_interruptible(&req->r_completion);
1949         if (rc < 0) {
1950                 mutex_lock(&osdc->request_mutex);
1951                 __cancel_request(req);
1952                 __unregister_request(osdc, req);
1953                 mutex_unlock(&osdc->request_mutex);
1954                 complete_request(req);
1955                 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1956                 return rc;
1957         }
1958
1959         dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1960         return req->r_result;
1961 }
1962 EXPORT_SYMBOL(ceph_osdc_wait_request);
1963
1964 /*
1965  * sync - wait for all in-flight requests to flush.  avoid starvation.
1966  */
1967 void ceph_osdc_sync(struct ceph_osd_client *osdc)
1968 {
1969         struct ceph_osd_request *req;
1970         u64 last_tid, next_tid = 0;
1971
1972         mutex_lock(&osdc->request_mutex);
1973         last_tid = osdc->last_tid;
1974         while (1) {
1975                 req = __lookup_request_ge(osdc, next_tid);
1976                 if (!req)
1977                         break;
1978                 if (req->r_tid > last_tid)
1979                         break;
1980
1981                 next_tid = req->r_tid + 1;
1982                 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1983                         continue;
1984
1985                 ceph_osdc_get_request(req);
1986                 mutex_unlock(&osdc->request_mutex);
1987                 dout("sync waiting on tid %llu (last is %llu)\n",
1988                      req->r_tid, last_tid);
1989                 wait_for_completion(&req->r_safe_completion);
1990                 mutex_lock(&osdc->request_mutex);
1991                 ceph_osdc_put_request(req);
1992         }
1993         mutex_unlock(&osdc->request_mutex);
1994         dout("sync done (thru tid %llu)\n", last_tid);
1995 }
1996 EXPORT_SYMBOL(ceph_osdc_sync);
1997
1998 /*
1999  * init, shutdown
2000  */
2001 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
2002 {
2003         int err;
2004
2005         dout("init\n");
2006         osdc->client = client;
2007         osdc->osdmap = NULL;
2008         init_rwsem(&osdc->map_sem);
2009         init_completion(&osdc->map_waiters);
2010         osdc->last_requested_map = 0;
2011         mutex_init(&osdc->request_mutex);
2012         osdc->last_tid = 0;
2013         osdc->osds = RB_ROOT;
2014         INIT_LIST_HEAD(&osdc->osd_lru);
2015         osdc->requests = RB_ROOT;
2016         INIT_LIST_HEAD(&osdc->req_lru);
2017         INIT_LIST_HEAD(&osdc->req_unsent);
2018         INIT_LIST_HEAD(&osdc->req_notarget);
2019         INIT_LIST_HEAD(&osdc->req_linger);
2020         osdc->num_requests = 0;
2021         INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
2022         INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
2023         spin_lock_init(&osdc->event_lock);
2024         osdc->event_tree = RB_ROOT;
2025         osdc->event_count = 0;
2026
2027         schedule_delayed_work(&osdc->osds_timeout_work,
2028            round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
2029
2030         err = -ENOMEM;
2031         osdc->req_mempool = mempool_create_kmalloc_pool(10,
2032                                         sizeof(struct ceph_osd_request));
2033         if (!osdc->req_mempool)
2034                 goto out;
2035
2036         err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
2037                                 OSD_OP_FRONT_LEN, 10, true,
2038                                 "osd_op");
2039         if (err < 0)
2040                 goto out_mempool;
2041         err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
2042                                 OSD_OPREPLY_FRONT_LEN, 10, true,
2043                                 "osd_op_reply");
2044         if (err < 0)
2045                 goto out_msgpool;
2046
2047         osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
2048         if (IS_ERR(osdc->notify_wq)) {
2049                 err = PTR_ERR(osdc->notify_wq);
2050                 osdc->notify_wq = NULL;
2051                 goto out_msgpool;
2052         }
2053         return 0;
2054
2055 out_msgpool:
2056         ceph_msgpool_destroy(&osdc->msgpool_op);
2057 out_mempool:
2058         mempool_destroy(osdc->req_mempool);
2059 out:
2060         return err;
2061 }
2062
2063 void ceph_osdc_stop(struct ceph_osd_client *osdc)
2064 {
2065         flush_workqueue(osdc->notify_wq);
2066         destroy_workqueue(osdc->notify_wq);
2067         cancel_delayed_work_sync(&osdc->timeout_work);
2068         cancel_delayed_work_sync(&osdc->osds_timeout_work);
2069         if (osdc->osdmap) {
2070                 ceph_osdmap_destroy(osdc->osdmap);
2071                 osdc->osdmap = NULL;
2072         }
2073         remove_all_osds(osdc);
2074         mempool_destroy(osdc->req_mempool);
2075         ceph_msgpool_destroy(&osdc->msgpool_op);
2076         ceph_msgpool_destroy(&osdc->msgpool_op_reply);
2077 }
2078
2079 /*
2080  * Read some contiguous pages.  If we cross a stripe boundary, shorten
2081  * *plen.  Return number of bytes read, or error.
2082  */
2083 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
2084                         struct ceph_vino vino, struct ceph_file_layout *layout,
2085                         u64 off, u64 *plen,
2086                         u32 truncate_seq, u64 truncate_size,
2087                         struct page **pages, int num_pages, int page_align)
2088 {
2089         struct ceph_osd_request *req;
2090         struct ceph_osd_req_op op;
2091         int rc = 0;
2092
2093         dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
2094              vino.snap, off, *plen);
2095         req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, &op,
2096                                     CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
2097                                     NULL, truncate_seq, truncate_size,
2098                                     false);
2099         if (IS_ERR(req))
2100                 return PTR_ERR(req);
2101
2102         /* it may be a short read due to an object boundary */
2103
2104         ceph_osd_data_pages_init(&req->r_data_in, pages, *plen, page_align,
2105                                 false, false);
2106
2107         dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
2108              off, *plen, *plen, page_align);
2109
2110         ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL);
2111
2112         rc = ceph_osdc_start_request(osdc, req, false);
2113         if (!rc)
2114                 rc = ceph_osdc_wait_request(osdc, req);
2115
2116         ceph_osdc_put_request(req);
2117         dout("readpages result %d\n", rc);
2118         return rc;
2119 }
2120 EXPORT_SYMBOL(ceph_osdc_readpages);
2121
2122 /*
2123  * do a synchronous write on N pages
2124  */
2125 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
2126                          struct ceph_file_layout *layout,
2127                          struct ceph_snap_context *snapc,
2128                          u64 off, u64 len,
2129                          u32 truncate_seq, u64 truncate_size,
2130                          struct timespec *mtime,
2131                          struct page **pages, int num_pages)
2132 {
2133         struct ceph_osd_request *req;
2134         struct ceph_osd_req_op op;
2135         int rc = 0;
2136         int page_align = off & ~PAGE_MASK;
2137
2138         BUG_ON(vino.snap != CEPH_NOSNAP);       /* snapshots aren't writeable */
2139         req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, &op,
2140                                     CEPH_OSD_OP_WRITE,
2141                                     CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
2142                                     snapc, truncate_seq, truncate_size,
2143                                     true);
2144         if (IS_ERR(req))
2145                 return PTR_ERR(req);
2146
2147         /* it may be a short write due to an object boundary */
2148         ceph_osd_data_pages_init(&req->r_data_out, pages, len, page_align,
2149                                 false, false);
2150         dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
2151
2152         ceph_osdc_build_request(req, off, 1, &op, snapc, CEPH_NOSNAP, mtime);
2153
2154         rc = ceph_osdc_start_request(osdc, req, true);
2155         if (!rc)
2156                 rc = ceph_osdc_wait_request(osdc, req);
2157
2158         ceph_osdc_put_request(req);
2159         if (rc == 0)
2160                 rc = len;
2161         dout("writepages result %d\n", rc);
2162         return rc;
2163 }
2164 EXPORT_SYMBOL(ceph_osdc_writepages);
2165
2166 /*
2167  * handle incoming message
2168  */
2169 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2170 {
2171         struct ceph_osd *osd = con->private;
2172         struct ceph_osd_client *osdc;
2173         int type = le16_to_cpu(msg->hdr.type);
2174
2175         if (!osd)
2176                 goto out;
2177         osdc = osd->o_osdc;
2178
2179         switch (type) {
2180         case CEPH_MSG_OSD_MAP:
2181                 ceph_osdc_handle_map(osdc, msg);
2182                 break;
2183         case CEPH_MSG_OSD_OPREPLY:
2184                 handle_reply(osdc, msg, con);
2185                 break;
2186         case CEPH_MSG_WATCH_NOTIFY:
2187                 handle_watch_notify(osdc, msg);
2188                 break;
2189
2190         default:
2191                 pr_err("received unknown message type %d %s\n", type,
2192                        ceph_msg_type_name(type));
2193         }
2194 out:
2195         ceph_msg_put(msg);
2196 }
2197
2198 /*
2199  * lookup and return message for incoming reply.  set up reply message
2200  * pages.
2201  */
2202 static struct ceph_msg *get_reply(struct ceph_connection *con,
2203                                   struct ceph_msg_header *hdr,
2204                                   int *skip)
2205 {
2206         struct ceph_osd *osd = con->private;
2207         struct ceph_osd_client *osdc = osd->o_osdc;
2208         struct ceph_msg *m;
2209         struct ceph_osd_request *req;
2210         int front = le32_to_cpu(hdr->front_len);
2211         int data_len = le32_to_cpu(hdr->data_len);
2212         u64 tid;
2213
2214         tid = le64_to_cpu(hdr->tid);
2215         mutex_lock(&osdc->request_mutex);
2216         req = __lookup_request(osdc, tid);
2217         if (!req) {
2218                 *skip = 1;
2219                 m = NULL;
2220                 dout("get_reply unknown tid %llu from osd%d\n", tid,
2221                      osd->o_osd);
2222                 goto out;
2223         }
2224
2225         if (req->r_reply->con)
2226                 dout("%s revoking msg %p from old con %p\n", __func__,
2227                      req->r_reply, req->r_reply->con);
2228         ceph_msg_revoke_incoming(req->r_reply);
2229
2230         if (front > req->r_reply->front.iov_len) {
2231                 pr_warning("get_reply front %d > preallocated %d\n",
2232                            front, (int)req->r_reply->front.iov_len);
2233                 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
2234                 if (!m)
2235                         goto out;
2236                 ceph_msg_put(req->r_reply);
2237                 req->r_reply = m;
2238         }
2239         m = ceph_msg_get(req->r_reply);
2240
2241         if (data_len > 0) {
2242                 struct ceph_osd_data *osd_data = &req->r_data_in;
2243
2244                 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
2245                         if (osd_data->pages &&
2246                                 unlikely(osd_data->length < data_len)) {
2247
2248                                 pr_warning("tid %lld reply has %d bytes "
2249                                         "we had only %llu bytes ready\n",
2250                                         tid, data_len, osd_data->length);
2251                                 *skip = 1;
2252                                 ceph_msg_put(m);
2253                                 m = NULL;
2254                                 goto out;
2255                         }
2256                 }
2257         }
2258         *skip = 0;
2259         dout("get_reply tid %lld %p\n", tid, m);
2260
2261 out:
2262         mutex_unlock(&osdc->request_mutex);
2263         return m;
2264
2265 }
2266
2267 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2268                                   struct ceph_msg_header *hdr,
2269                                   int *skip)
2270 {
2271         struct ceph_osd *osd = con->private;
2272         int type = le16_to_cpu(hdr->type);
2273         int front = le32_to_cpu(hdr->front_len);
2274
2275         *skip = 0;
2276         switch (type) {
2277         case CEPH_MSG_OSD_MAP:
2278         case CEPH_MSG_WATCH_NOTIFY:
2279                 return ceph_msg_new(type, front, GFP_NOFS, false);
2280         case CEPH_MSG_OSD_OPREPLY:
2281                 return get_reply(con, hdr, skip);
2282         default:
2283                 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2284                         osd->o_osd);
2285                 *skip = 1;
2286                 return NULL;
2287         }
2288 }
2289
2290 /*
2291  * Wrappers to refcount containing ceph_osd struct
2292  */
2293 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2294 {
2295         struct ceph_osd *osd = con->private;
2296         if (get_osd(osd))
2297                 return con;
2298         return NULL;
2299 }
2300
2301 static void put_osd_con(struct ceph_connection *con)
2302 {
2303         struct ceph_osd *osd = con->private;
2304         put_osd(osd);
2305 }
2306
2307 /*
2308  * authentication
2309  */
2310 /*
2311  * Note: returned pointer is the address of a structure that's
2312  * managed separately.  Caller must *not* attempt to free it.
2313  */
2314 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2315                                         int *proto, int force_new)
2316 {
2317         struct ceph_osd *o = con->private;
2318         struct ceph_osd_client *osdc = o->o_osdc;
2319         struct ceph_auth_client *ac = osdc->client->monc.auth;
2320         struct ceph_auth_handshake *auth = &o->o_auth;
2321
2322         if (force_new && auth->authorizer) {
2323                 ceph_auth_destroy_authorizer(ac, auth->authorizer);
2324                 auth->authorizer = NULL;
2325         }
2326         if (!auth->authorizer) {
2327                 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2328                                                       auth);
2329                 if (ret)
2330                         return ERR_PTR(ret);
2331         } else {
2332                 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2333                                                      auth);
2334                 if (ret)
2335                         return ERR_PTR(ret);
2336         }
2337         *proto = ac->protocol;
2338
2339         return auth;
2340 }
2341
2342
2343 static int verify_authorizer_reply(struct ceph_connection *con, int len)
2344 {
2345         struct ceph_osd *o = con->private;
2346         struct ceph_osd_client *osdc = o->o_osdc;
2347         struct ceph_auth_client *ac = osdc->client->monc.auth;
2348
2349         return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2350 }
2351
2352 static int invalidate_authorizer(struct ceph_connection *con)
2353 {
2354         struct ceph_osd *o = con->private;
2355         struct ceph_osd_client *osdc = o->o_osdc;
2356         struct ceph_auth_client *ac = osdc->client->monc.auth;
2357
2358         ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2359         return ceph_monc_validate_auth(&osdc->client->monc);
2360 }
2361
2362 static const struct ceph_connection_operations osd_con_ops = {
2363         .get = get_osd_con,
2364         .put = put_osd_con,
2365         .dispatch = dispatch,
2366         .get_authorizer = get_authorizer,
2367         .verify_authorizer_reply = verify_authorizer_reply,
2368         .invalidate_authorizer = invalidate_authorizer,
2369         .alloc_msg = alloc_msg,
2370         .fault = osd_reset,
2371 };