/* net/ceph/osd_client.c */
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN        4096
#define OSD_OPREPLY_FRONT_LEN   512

static const struct ceph_connection_operations osd_con_ops;

static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);

static int op_has_extent(int op)
{
        return (op == CEPH_OSD_OP_READ ||
                op == CEPH_OSD_OP_WRITE);
}

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_vino vino,
                       struct ceph_file_layout *layout,
                       u64 off, u64 *plen,
                       struct ceph_osd_request *req,
                       struct ceph_osd_req_op *op, u64 *bno)
{
        u64 orig_len = *plen;
        u64 objoff = 0;
        u64 objlen = 0;
        int r;

        /* object extent? */
        r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
                                          &objoff, &objlen);
        if (r < 0)
                return r;
        if (objlen < orig_len) {
                *plen = objlen;
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
        }

        if (op_has_extent(op->op)) {
                u32 osize = le32_to_cpu(layout->fl_object_size);
                op->extent.offset = objoff;
                op->extent.length = objlen;
                if (op->extent.truncate_size <= off - objoff) {
                        op->extent.truncate_size = 0;
                } else {
                        op->extent.truncate_size -= off - objoff;
                        if (op->extent.truncate_size > osize)
                                op->extent.truncate_size = osize;
                }
        }
        req->r_num_pages = calc_pages_for(off, *plen);
        req->r_page_alignment = off & ~PAGE_MASK;
        if (op->op == CEPH_OSD_OP_WRITE)
                op->payload_len = *plen;

        dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
             *bno, objoff, objlen, req->r_num_pages);

        snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, *bno);
        req->r_oid_len = strlen(req->r_oid);

        return 0;
}
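
/*
 * Worked example (assuming the common default layout of 4 MB objects
 * with a single stripe): a read of off=5M, *plen=4M maps to object
 * number bno=1 with objoff=1M and objlen=3M, so *plen is trimmed to
 * 3M and the object is named "<ino-hex>.00000001".
 */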

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
        struct ceph_osd_request *req = container_of(kref,
                                                    struct ceph_osd_request,
                                                    r_kref);

        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_con_filling_msg) {
                dout("%s revoking msg %p from con %p\n", __func__,
                     req->r_reply, req->r_con_filling_msg);
                ceph_msg_revoke_incoming(req->r_reply);
                req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
                req->r_con_filling_msg = NULL;
        }
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
                                         req->r_num_pages);
        ceph_put_snap_context(req->r_snapc);
        ceph_pagelist_release(&req->r_trail);
        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else
                kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);
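
/*
 * This is the kref release function: callers never invoke it
 * directly, they drop their reference with ceph_osdc_put_request(),
 * which does kref_put(&req->r_kref, ceph_osdc_release_request).
 */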

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                                               struct ceph_snap_context *snapc,
                                               unsigned int num_ops,
                                               bool use_mempool,
                                               gfp_t gfp_flags)
{
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
        size_t msg_size;

        msg_size = 4 + 4 + 8 + 8 + 4 + 8;
        msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
        msg_size += 1 + 8 + 4 + 4;     /* pg_t */
        msg_size += 4 + MAX_OBJ_NAME_SIZE;
        msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
        msg_size += 8;  /* snapid */
        msg_size += 8;  /* snap_seq */
        msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
        msg_size += 4;

        if (use_mempool) {
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
                memset(req, 0, sizeof(*req));
        } else {
                req = kzalloc(sizeof(*req), gfp_flags);
        }
        if (req == NULL)
                return NULL;

        req->r_osdc = osdc;
        req->r_mempool = use_mempool;

        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        RB_CLEAR_NODE(&req->r_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
        INIT_LIST_HEAD(&req->r_linger_osd);
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);

        /* create reply message */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
                                   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
        if (!msg) {
                ceph_osdc_put_request(req);
                return NULL;
        }
        req->r_reply = msg;

        ceph_pagelist_init(&req->r_trail);

        /* create request message; allow space for oid */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
        if (!msg) {
                ceph_osdc_put_request(req);
                return NULL;
        }

        memset(msg->front.iov_base, 0, msg->front.iov_len);

        req->r_request = msg;

        return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
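
/*
 * A minimal usage sketch (most callers go through the
 * ceph_osdc_new_request() helper below rather than calling these
 * steps directly):
 *
 *	req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOFS);
 *	ceph_osdc_build_request(req, off, len, 1, ops, snapc, snapid, &mtime);
 *	ceph_osdc_start_request(osdc, req, false);
 *	ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */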

static void osd_req_encode_op(struct ceph_osd_request *req,
                              struct ceph_osd_op *dst,
                              struct ceph_osd_req_op *src)
{
        dst->op = cpu_to_le16(src->op);

        switch (src->op) {
        case CEPH_OSD_OP_STAT:
                break;
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
                dst->extent.offset =
                        cpu_to_le64(src->extent.offset);
                dst->extent.length =
                        cpu_to_le64(src->extent.length);
                dst->extent.truncate_size =
                        cpu_to_le64(src->extent.truncate_size);
                dst->extent.truncate_seq =
                        cpu_to_le32(src->extent.truncate_seq);
                break;
        case CEPH_OSD_OP_CALL:
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

                ceph_pagelist_append(&req->r_trail, src->cls.class_name,
                                     src->cls.class_len);
                ceph_pagelist_append(&req->r_trail, src->cls.method_name,
                                     src->cls.method_len);
                ceph_pagelist_append(&req->r_trail, src->cls.indata,
                                     src->cls.indata_len);
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                dst->watch.cookie = cpu_to_le64(src->watch.cookie);
                dst->watch.ver = cpu_to_le64(src->watch.ver);
                dst->watch.flag = src->watch.flag;
                break;
        default:
                pr_err("unrecognized osd opcode %d\n", dst->op);
                WARN_ON(1);
                break;
        case CEPH_OSD_OP_MAPEXT:
        case CEPH_OSD_OP_MASKTRUNC:
        case CEPH_OSD_OP_SPARSE_READ:
        case CEPH_OSD_OP_NOTIFY:
        case CEPH_OSD_OP_ASSERT_VER:
        case CEPH_OSD_OP_WRITEFULL:
        case CEPH_OSD_OP_TRUNCATE:
        case CEPH_OSD_OP_ZERO:
        case CEPH_OSD_OP_DELETE:
        case CEPH_OSD_OP_APPEND:
        case CEPH_OSD_OP_SETTRUNC:
        case CEPH_OSD_OP_TRIMTRUNC:
        case CEPH_OSD_OP_TMAPUP:
        case CEPH_OSD_OP_TMAPPUT:
        case CEPH_OSD_OP_TMAPGET:
        case CEPH_OSD_OP_CREATE:
        case CEPH_OSD_OP_ROLLBACK:
        case CEPH_OSD_OP_OMAPGETKEYS:
        case CEPH_OSD_OP_OMAPGETVALS:
        case CEPH_OSD_OP_OMAPGETHEADER:
        case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
        case CEPH_OSD_OP_MODE_RD:
        case CEPH_OSD_OP_OMAPSETVALS:
        case CEPH_OSD_OP_OMAPSETHEADER:
        case CEPH_OSD_OP_OMAPCLEAR:
        case CEPH_OSD_OP_OMAPRMKEYS:
        case CEPH_OSD_OP_OMAP_CMP:
        case CEPH_OSD_OP_CLONERANGE:
        case CEPH_OSD_OP_ASSERT_SRC_VERSION:
        case CEPH_OSD_OP_SRC_CMPXATTR:
        case CEPH_OSD_OP_GETXATTR:
        case CEPH_OSD_OP_GETXATTRS:
        case CEPH_OSD_OP_CMPXATTR:
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_SETXATTRS:
        case CEPH_OSD_OP_RESETXATTRS:
        case CEPH_OSD_OP_RMXATTR:
        case CEPH_OSD_OP_PULL:
        case CEPH_OSD_OP_PUSH:
        case CEPH_OSD_OP_BALANCEREADS:
        case CEPH_OSD_OP_UNBALANCEREADS:
        case CEPH_OSD_OP_SCRUB:
        case CEPH_OSD_OP_SCRUB_RESERVE:
        case CEPH_OSD_OP_SCRUB_UNRESERVE:
        case CEPH_OSD_OP_SCRUB_STOP:
        case CEPH_OSD_OP_SCRUB_MAP:
        case CEPH_OSD_OP_WRLOCK:
        case CEPH_OSD_OP_WRUNLOCK:
        case CEPH_OSD_OP_RDLOCK:
        case CEPH_OSD_OP_RDUNLOCK:
        case CEPH_OSD_OP_UPLOCK:
        case CEPH_OSD_OP_DNLOCK:
        case CEPH_OSD_OP_PGLS:
        case CEPH_OSD_OP_PGLS_FILTER:
                pr_err("unsupported osd opcode %s\n",
                        ceph_osd_op_name(dst->op));
                WARN_ON(1);
                break;
        }
        dst->payload_len = cpu_to_le32(src->payload_len);
}
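
/*
 * Note that dst points into the request message front, so everything
 * is converted to little-endian wire format here.  For
 * CEPH_OSD_OP_CALL the class/method names and input data do not fit
 * in the fixed-size ceph_osd_op, so they are appended to the r_trail
 * pagelist instead and sent as message data.
 */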

/*
 * build new request AND message
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
                             u64 off, u64 len, unsigned int num_ops,
                             struct ceph_osd_req_op *src_ops,
                             struct ceph_snap_context *snapc, u64 snap_id,
                             struct timespec *mtime)
{
        struct ceph_msg *msg = req->r_request;
        struct ceph_osd_req_op *src_op;
        void *p;
        size_t msg_size;
        int flags = req->r_flags;
        u64 data_len;
        int i;

        req->r_num_ops = num_ops;
        req->r_snapid = snap_id;
        req->r_snapc = ceph_get_snap_context(snapc);

        /* encode request */
        msg->hdr.version = cpu_to_le16(4);

        p = msg->front.iov_base;
        ceph_encode_32(&p, 1);   /* client_inc is always 1 */
        req->r_request_osdmap_epoch = p;
        p += 4;
        req->r_request_flags = p;
        p += 4;
        if (req->r_flags & CEPH_OSD_FLAG_WRITE)
                ceph_encode_timespec(p, mtime);
        p += sizeof(struct ceph_timespec);
        req->r_request_reassert_version = p;
        p += sizeof(struct ceph_eversion); /* will get filled in */

        /* oloc */
        ceph_encode_8(&p, 4);
        ceph_encode_8(&p, 4);
        ceph_encode_32(&p, 8 + 4 + 4);
        req->r_request_pool = p;
        p += 8;
        ceph_encode_32(&p, -1);  /* preferred */
        ceph_encode_32(&p, 0);   /* key len */

        ceph_encode_8(&p, 1);
        req->r_request_pgid = p;
        p += 8 + 4;
        ceph_encode_32(&p, -1);  /* preferred */

        /* oid */
        ceph_encode_32(&p, req->r_oid_len);
        memcpy(p, req->r_oid, req->r_oid_len);
        dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
        p += req->r_oid_len;

        /* ops */
        ceph_encode_16(&p, num_ops);
        src_op = src_ops;
        req->r_request_ops = p;
        for (i = 0; i < num_ops; i++, src_op++) {
                osd_req_encode_op(req, p, src_op);
                p += sizeof(struct ceph_osd_op);
        }

        /* snaps */
        ceph_encode_64(&p, req->r_snapid);
        ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
        ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
        if (req->r_snapc) {
                for (i = 0; i < snapc->num_snaps; i++) {
                        ceph_encode_64(&p, req->r_snapc->snaps[i]);
                }
        }

        req->r_request_attempts = p;
        p += 4;

        data_len = req->r_trail.length;
        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
                data_len += len;
        }
        req->r_request->hdr.data_len = cpu_to_le32(data_len);
        req->r_request->page_alignment = req->r_page_alignment;

        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
        msg_size = p - msg->front.iov_base;
        msg->front.iov_len = msg_size;
        msg->hdr.front_len = cpu_to_le32(msg_size);

        dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
             num_ops);
        return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);
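
/*
 * The encoded (v4) request front, in order: u32 client_inc (always 1);
 * u32 osdmap epoch and u32 flags (both filled in later by
 * __send_request()); ceph_timespec mtime; ceph_eversion
 * reassert_version; the object locator (two version bytes, a struct
 * length, le64 pool, le32 preferred, le32 key length); the pg_t
 * (u8 version, le64 pool, le32 seed, le32 preferred); the
 * length-prefixed oid; u16 num_ops plus one ceph_osd_op per op;
 * le64 snapid, le64 snap_seq, le32 num_snaps and the snap list; and
 * finally le32 attempts (also filled in at send time).
 */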

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
                                               u64 off, u64 *plen,
                                               int opcode, int flags,
                                               struct ceph_snap_context *snapc,
                                               int do_sync,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               struct timespec *mtime,
                                               bool use_mempool,
                                               int page_align)
{
        struct ceph_osd_req_op ops[2];
        struct ceph_osd_request *req;
        unsigned int num_op = 1;
        u64 bno = 0;
        int r;

        memset(&ops, 0, sizeof ops);

        ops[0].op = opcode;
        ops[0].extent.truncate_seq = truncate_seq;
        ops[0].extent.truncate_size = truncate_size;

        if (do_sync) {
                ops[1].op = CEPH_OSD_OP_STARTSYNC;
                num_op++;
        }

        req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
                                        GFP_NOFS);
        if (!req)
                return ERR_PTR(-ENOMEM);
        req->r_flags = flags;

        /* calculate max write size */
        r = calc_layout(vino, layout, off, plen, req, ops, &bno);
        if (r < 0) {
                ceph_osdc_put_request(req);
                return ERR_PTR(r);
        }

        req->r_file_layout = *layout;  /* keep a copy */

        /* in case it differs from natural (file) alignment that
           calc_layout filled in for us */
        req->r_num_pages = calc_pages_for(page_align, *plen);
        req->r_page_alignment = page_align;

        ceph_osdc_build_request(req, off, *plen, num_op, ops,
                                snapc, vino.snap, mtime);

        return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);
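
/*
 * Example call (a sketch): issuing a two-page read at file offset 0.
 * Note that *plen is in/out; it comes back trimmed if the extent
 * crossed an object boundary.
 *
 *	u64 len = 2 * PAGE_SIZE;
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, 0, &len,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, truncate_seq, truncate_size,
 *				    NULL, false, 0);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 */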

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
                             struct ceph_osd_request *new)
{
        struct rb_node **p = &osdc->requests.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_osd_request *req = NULL;

        while (*p) {
                parent = *p;
                req = rb_entry(parent, struct ceph_osd_request, r_node);
                if (new->r_tid < req->r_tid)
                        p = &(*p)->rb_left;
                else if (new->r_tid > req->r_tid)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->r_node, parent, p);
        rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
                                                 u64 tid)
{
        struct ceph_osd_request *req;
        struct rb_node *n = osdc->requests.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_osd_request, r_node);
                if (tid < req->r_tid)
                        n = n->rb_left;
                else if (tid > req->r_tid)
                        n = n->rb_right;
                else
                        return req;
        }
        return NULL;
}

static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
                    u64 tid)
{
        struct ceph_osd_request *req;
        struct rb_node *n = osdc->requests.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_osd_request, r_node);
                if (tid < req->r_tid) {
                        if (!n->rb_left)
                                return req;
                        n = n->rb_left;
                } else if (tid > req->r_tid) {
                        n = n->rb_right;
                } else {
                        return req;
                }
        }
        return NULL;
}
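
/*
 * __lookup_request_ge() returns the registered request with the
 * smallest tid that is >= the given tid, or NULL if every registered
 * request has a smaller tid; e.g. with tids {3, 7, 9} in the tree, a
 * lookup for 5 yields the tid-7 request.
 */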

/*
 * Resubmit requests pending on the given osd.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
                                struct ceph_osd *osd)
{
        struct ceph_osd_request *req, *nreq;
        int err;

        dout("__kick_osd_requests osd%d\n", osd->o_osd);
        err = __reset_osd(osdc, osd);
        if (err)
                return;

        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
                list_move(&req->r_req_lru_item, &osdc->req_unsent);
                dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
                     osd->o_osd);
                if (!req->r_linger)
                        req->r_flags |= CEPH_OSD_FLAG_RETRY;
        }

        list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
                                 r_linger_osd) {
                /*
                 * reregister request prior to unregistering linger so
                 * that r_osd is preserved.
                 */
                BUG_ON(!list_empty(&req->r_req_lru_item));
                __register_request(osdc, req);
                list_add(&req->r_req_lru_item, &osdc->req_unsent);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
                __unregister_linger_request(osdc, req);
                dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
                     osd->o_osd);
        }
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc;

        if (!osd)
                return;
        dout("osd_reset osd%d\n", osd->o_osd);
        osdc = osd->o_osdc;
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        __kick_osd_requests(osdc, osd);
        __send_queued(osdc);
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
        struct ceph_osd *osd;

        osd = kzalloc(sizeof(*osd), GFP_NOFS);
        if (!osd)
                return NULL;

        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        osd->o_osd = onum;
        RB_CLEAR_NODE(&osd->o_node);
        INIT_LIST_HEAD(&osd->o_requests);
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;

        ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

        INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
        if (atomic_inc_not_zero(&osd->o_ref)) {
                dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
                     atomic_read(&osd->o_ref));
                return osd;
        } else {
                dout("get_osd %p FAIL\n", osd);
                return NULL;
        }
}
static void put_osd(struct ceph_osd *osd)
{
        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
             atomic_read(&osd->o_ref) - 1);
        if (atomic_dec_and_test(&osd->o_ref)) {
                struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

                /* the final put must free the osd even if it never
                 * acquired an authorizer */
                if (ac->ops && ac->ops->destroy_authorizer &&
                    osd->o_auth.authorizer)
                        ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
                kfree(osd);
        }
}
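
/*
 * An osd is created with o_ref == 1 (the reference held by the
 * osdc->osds rbtree).  get_osd() uses atomic_inc_not_zero() so that a
 * lookup racing with the final put_osd() safely fails instead of
 * resurrecting a dying osd.
 */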

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
        dout("__remove_osd %p\n", osd);
        BUG_ON(!list_empty(&osd->o_requests));
        rb_erase(&osd->o_node, &osdc->osds);
        list_del_init(&osd->o_osd_lru);
        ceph_con_close(&osd->o_con);
        put_osd(osd);
}

static void remove_all_osds(struct ceph_osd_client *osdc)
{
        dout("%s %p\n", __func__, osdc);
        mutex_lock(&osdc->request_mutex);
        while (!RB_EMPTY_ROOT(&osdc->osds)) {
                struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
                                                struct ceph_osd, o_node);
                __remove_osd(osdc, osd);
        }
        mutex_unlock(&osdc->request_mutex);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
                              struct ceph_osd *osd)
{
        dout("__move_osd_to_lru %p\n", osd);
        BUG_ON(!list_empty(&osd->o_osd_lru));
        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
        dout("__remove_osd_from_lru %p\n", osd);
        if (!list_empty(&osd->o_osd_lru))
                list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc)
{
        struct ceph_osd *osd, *nosd;

        dout("__remove_old_osds %p\n", osdc);
        mutex_lock(&osdc->request_mutex);
        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
                if (time_before(jiffies, osd->lru_ttl))
                        break;
                __remove_osd(osdc, osd);
        }
        mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
        struct ceph_entity_addr *peer_addr;

        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);

                return -ENODEV;
        }

        peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
        if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
                        !ceph_con_opened(&osd->o_con)) {
                struct ceph_osd_request *req;

                dout(" osd addr hasn't changed and connection never opened,"
                     " letting msgr retry\n");
                /* touch each r_stamp for handle_timeout()'s benefit */
                list_for_each_entry(req, &osd->o_requests, r_osd_item)
                        req->r_stamp = jiffies;

                return -EAGAIN;
        }

        ceph_con_close(&osd->o_con);
        ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
        osd->o_incarnation++;

        return 0;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
        struct rb_node **p = &osdc->osds.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_osd *osd = NULL;

        dout("__insert_osd %p osd%d\n", new, new->o_osd);
        while (*p) {
                parent = *p;
                osd = rb_entry(parent, struct ceph_osd, o_node);
                if (new->o_osd < osd->o_osd)
                        p = &(*p)->rb_left;
                else if (new->o_osd > osd->o_osd)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->o_node, parent, p);
        rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
        struct ceph_osd *osd;
        struct rb_node *n = osdc->osds.rb_node;

        while (n) {
                osd = rb_entry(n, struct ceph_osd, o_node);
                if (o < osd->o_osd)
                        n = n->rb_left;
                else if (o > osd->o_osd)
                        n = n->rb_right;
                else
                        return osd;
        }
        return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
        schedule_delayed_work(&osdc->timeout_work,
                        osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
        cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req)
{
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        __insert_request(osdc, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
        if (osdc->num_requests == 1) {
                dout(" first request, scheduling timeout\n");
                __schedule_osd_timeout(osdc);
        }
}

static void register_request(struct ceph_osd_client *osdc,
                             struct ceph_osd_request *req)
{
        mutex_lock(&osdc->request_mutex);
        __register_request(osdc, req);
        mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
                                 struct ceph_osd_request *req)
{
        if (RB_EMPTY_NODE(&req->r_node)) {
                dout("__unregister_request %p tid %lld not registered\n",
                        req, req->r_tid);
                return;
        }

        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        rb_erase(&req->r_node, &osdc->requests);
        osdc->num_requests--;

        if (req->r_osd) {
                /* make sure the original request isn't in flight. */
                ceph_msg_revoke(req->r_request);

                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests) &&
                    list_empty(&req->r_osd->o_linger_requests)) {
                        dout("moving osd %p to lru\n", req->r_osd);
                        __move_osd_to_lru(osdc, req->r_osd);
                }
                if (list_empty(&req->r_linger_item))
                        req->r_osd = NULL;
        }

        list_del_init(&req->r_req_lru_item);
        ceph_osdc_put_request(req);

        if (osdc->num_requests == 0) {
                dout(" no requests, canceling timeout\n");
                __cancel_osd_timeout(osdc);
        }
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
        if (req->r_sent && req->r_osd) {
                ceph_msg_revoke(req->r_request);
                req->r_sent = 0;
        }
}

static void __register_linger_request(struct ceph_osd_client *osdc,
                                      struct ceph_osd_request *req)
{
        dout("__register_linger_request %p\n", req);
        list_add_tail(&req->r_linger_item, &osdc->req_linger);
        if (req->r_osd)
                list_add_tail(&req->r_linger_osd,
                              &req->r_osd->o_linger_requests);
}

static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req)
{
        dout("__unregister_linger_request %p\n", req);
        list_del_init(&req->r_linger_item);
        if (req->r_osd) {
                list_del_init(&req->r_linger_osd);

                if (list_empty(&req->r_osd->o_requests) &&
                    list_empty(&req->r_osd->o_linger_requests)) {
                        dout("moving osd %p to lru\n", req->r_osd);
                        __move_osd_to_lru(osdc, req->r_osd);
                }
                if (list_empty(&req->r_osd_item))
                        req->r_osd = NULL;
        }
}

void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
                                         struct ceph_osd_request *req)
{
        mutex_lock(&osdc->request_mutex);
        if (req->r_linger) {
                __unregister_linger_request(osdc, req);
                ceph_osdc_put_request(req);
        }
        mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
                                  struct ceph_osd_request *req)
{
        if (!req->r_linger) {
                dout("set_request_linger %p\n", req);
                req->r_linger = 1;
                /*
                 * caller is now responsible for calling
                 * unregister_linger_request
                 */
                ceph_osdc_get_request(req);
        }
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);
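
/*
 * Lingering requests stay registered on the linger list and are
 * resent via kick_requests() or __kick_osd_requests() whenever an
 * osdmap change or connection reset would otherwise lose them, rather
 * than being completed once and forgotten (e.g. the watch machinery
 * relies on this).  Marking a request linger takes an extra reference
 * that is dropped by ceph_osdc_unregister_linger_request().
 */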

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 * (unsent, homeless) or leave on in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
                         struct ceph_osd_request *req, int force_resend)
{
        struct ceph_pg pgid;
        int acting[CEPH_PG_MAX_SIZE];
        int o = -1, num = 0;
        int err;

        dout("map_request %p tid %lld\n", req, req->r_tid);
        err = ceph_calc_object_layout(&pgid, req->r_oid,
                                      &req->r_file_layout, osdc->osdmap);
        if (err) {
                list_move(&req->r_req_lru_item, &osdc->req_notarget);
                return err;
        }
        req->r_pgid = pgid;

        err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
        if (err > 0) {
                o = acting[0];
                num = err;
        }

        if ((!force_resend &&
             req->r_osd && req->r_osd->o_osd == o &&
             req->r_sent >= req->r_osd->o_incarnation &&
             req->r_num_pg_osds == num &&
             memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
            (req->r_osd == NULL && o == -1))
                return 0;  /* no change */

        dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
             req->r_tid, pgid.pool, pgid.seed, o,
             req->r_osd ? req->r_osd->o_osd : -1);

        /* record full pg acting set */
        memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
        req->r_num_pg_osds = num;

        if (req->r_osd) {
                __cancel_request(req);
                list_del_init(&req->r_osd_item);
                req->r_osd = NULL;
        }

        req->r_osd = __lookup_osd(osdc, o);
        if (!req->r_osd && o >= 0) {
                err = -ENOMEM;
                req->r_osd = create_osd(osdc, o);
                if (!req->r_osd) {
                        list_move(&req->r_req_lru_item, &osdc->req_notarget);
                        goto out;
                }

                dout("map_request osd %p is osd%d\n", req->r_osd, o);
                __insert_osd(osdc, req->r_osd);

                ceph_con_open(&req->r_osd->o_con,
                              CEPH_ENTITY_TYPE_OSD, o,
                              &osdc->osdmap->osd_addr[o]);
        }

        if (req->r_osd) {
                __remove_osd_from_lru(req->r_osd);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
                list_move(&req->r_req_lru_item, &osdc->req_unsent);
        } else {
                list_move(&req->r_req_lru_item, &osdc->req_notarget);
        }
        err = 1;   /* osd or pg changed */

out:
        return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req)
{
        void *p;

        dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
             req, req->r_tid, req->r_osd->o_osd, req->r_flags,
             (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);

        /* fill in message content that changes each time we send it */
        put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
        put_unaligned_le32(req->r_flags, req->r_request_flags);
        put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
        p = req->r_request_pgid;
        ceph_encode_64(&p, req->r_pgid.pool);
        ceph_encode_32(&p, req->r_pgid.seed);
        /* attempts is a 32-bit slot (see ceph_osdc_build_request()) */
        put_unaligned_le32(1, req->r_request_attempts);  /* FIXME */
        memcpy(req->r_request_reassert_version, &req->r_reassert_version,
               sizeof(req->r_reassert_version));

        req->r_stamp = jiffies;
        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
        req->r_sent = req->r_osd->o_incarnation;
}
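
/*
 * r_sent records the connection incarnation at send time.  When a
 * session is re-established, __reset_osd() bumps o_incarnation, so
 * the r_sent >= o_incarnation test in __map_request() recognizes that
 * the request must be resent.
 */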

/*
 * Send any requests in the queue (req_unsent).
 */
static void __send_queued(struct ceph_osd_client *osdc)
{
        struct ceph_osd_request *req, *tmp;

        dout("__send_queued\n");
        list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
                __send_request(osdc, req);
}

/*
 * Timeout callback, called every N seconds when one or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping the OSDs whose requests have timed out to ensure
 * any communications channel reset is detected.  Reset the request
 * timeouts another N seconds in the future as we go, and reschedule
 * the timeout event another N seconds in the future (unless there are
 * no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_osd_request *req;
        struct ceph_osd *osd;
        unsigned long keepalive =
                osdc->client->options->osd_keepalive_timeout * HZ;
        struct list_head slow_osds;

        dout("timeout\n");
        down_read(&osdc->map_sem);

        ceph_monc_request_next_osdmap(&osdc->client->monc);

        mutex_lock(&osdc->request_mutex);

        /*
         * ping osds that are a bit slow.  this ensures that if there
         * is a break in the TCP connection we will notice, and reopen
         * a connection with that osd (from the fault callback).
         */
        INIT_LIST_HEAD(&slow_osds);
        list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
                if (time_before(jiffies, req->r_stamp + keepalive))
                        break;

                osd = req->r_osd;
                BUG_ON(!osd);
                dout(" tid %llu is slow, will send keepalive on osd%d\n",
                     req->r_tid, osd->o_osd);
                list_move_tail(&osd->o_keepalive_item, &slow_osds);
        }
        while (!list_empty(&slow_osds)) {
                osd = list_entry(slow_osds.next, struct ceph_osd,
                                 o_keepalive_item);
                list_del_init(&osd->o_keepalive_item);
                ceph_con_keepalive(&osd->o_con);
        }

        __schedule_osd_timeout(osdc);
        __send_queued(osdc);
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client,
                             osds_timeout_work.work);
        unsigned long delay =
                osdc->client->options->osd_idle_ttl * HZ >> 2;

        dout("osds timeout\n");
        down_read(&osdc->map_sem);
        remove_old_osds(osdc);
        up_read(&osdc->map_sem);

        schedule_delayed_work(&osdc->osds_timeout_work,
                              round_jiffies_relative(delay));
}

static void complete_request(struct ceph_osd_request *req)
{
        if (req->r_safe_callback)
                req->r_safe_callback(req, NULL);
        complete_all(&req->r_safe_completion);  /* fsync waiter */
}

static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
{
        __u8 v;

        ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
        v = ceph_decode_8(p);
        if (v > 1) {
                pr_warning("do not understand pg encoding %d > 1\n", v);
                return -EINVAL;
        }
        pgid->pool = ceph_decode_64(p);
        pgid->seed = ceph_decode_32(p);
        *p += 4;
        return 0;

bad:
        pr_warning("incomplete pg encoding\n");
        return -EINVAL;
}
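
/*
 * On the wire the pgid is encoded as a u8 version (currently 1), an
 * le64 pool, an le32 seed, and an le32 "preferred" osd that is
 * deprecated and simply skipped above (the final "*p += 4").
 */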

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                         struct ceph_connection *con)
{
        void *p, *end;
        struct ceph_osd_request *req;
        u64 tid;
        int object_len;
        int numops, payload_len, flags;
        s32 result;
        s32 retry_attempt;
        struct ceph_pg pg;
        int err;
        u32 reassert_epoch;
        u64 reassert_version;
        u32 osdmap_epoch;
        int i;

        tid = le64_to_cpu(msg->hdr.tid);
        dout("handle_reply %p tid %llu\n", msg, tid);

        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        ceph_decode_need(&p, end, 4, bad);
        object_len = ceph_decode_32(&p);
        ceph_decode_need(&p, end, object_len, bad);
        p += object_len;

        err = __decode_pgid(&p, end, &pg);
        if (err)
                goto bad;

        ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
        flags = ceph_decode_64(&p);
        result = ceph_decode_32(&p);
        reassert_epoch = ceph_decode_32(&p);
        reassert_version = ceph_decode_64(&p);
        osdmap_epoch = ceph_decode_32(&p);

        /* lookup */
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (req == NULL) {
                dout("handle_reply tid %llu dne\n", tid);
                mutex_unlock(&osdc->request_mutex);
                return;
        }
        ceph_osdc_get_request(req);

        dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
             req, result);

        ceph_decode_need(&p, end, 4, bad_put);
        numops = ceph_decode_32(&p);
        if (numops > CEPH_OSD_MAX_OP)
                goto bad_put;
        if (numops != req->r_num_ops)
                goto bad_put;
        payload_len = 0;
        ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
        for (i = 0; i < numops; i++) {
                struct ceph_osd_op *op = p;
                int len;

                len = le32_to_cpu(op->payload_len);
                req->r_reply_op_len[i] = len;
                dout(" op %d has %d bytes\n", i, len);
                payload_len += len;
                p += sizeof(*op);
        }
        if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
                pr_warning("sum of op payload lens %d != data_len %d\n",
                           payload_len, le32_to_cpu(msg->hdr.data_len));
                goto bad_put;
        }

        ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
        retry_attempt = ceph_decode_32(&p);
        for (i = 0; i < numops; i++)
                req->r_reply_op_result[i] = ceph_decode_32(&p);

        /*
         * if this connection filled our message, drop our reference now, to
         * avoid a (safe but slower) revoke later.
         */
        if (req->r_con_filling_msg == con && req->r_reply == msg) {
                dout(" dropping con_filling_msg ref %p\n", con);
                req->r_con_filling_msg = NULL;
                con->ops->put(con);
        }

        if (!req->r_got_reply) {
                unsigned int bytes;

                req->r_result = result;
                bytes = le32_to_cpu(msg->hdr.data_len);
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
                if (req->r_result == 0)
                        req->r_result = bytes;

                /* in case this is a write and we need to replay, */
                req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
                req->r_reassert_version.version = cpu_to_le64(reassert_version);

                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
                dout("handle_reply tid %llu dup ack\n", tid);
                mutex_unlock(&osdc->request_mutex);
                goto done;
        }

        dout("handle_reply tid %llu flags %d\n", tid, flags);

        if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
                __register_linger_request(osdc, req);

        /* either this is a read, or we got the safe response */
        if (result < 0 ||
            (flags & CEPH_OSD_FLAG_ONDISK) ||
            ((flags & CEPH_OSD_FLAG_WRITE) == 0))
                __unregister_request(osdc, req);

        mutex_unlock(&osdc->request_mutex);

        if (req->r_callback)
                req->r_callback(req, msg);
        else
                complete_all(&req->r_completion);

        if (flags & CEPH_OSD_FLAG_ONDISK)
                complete_request(req);

done:
        dout("req=%p req->r_linger=%d\n", req, req->r_linger);
        ceph_osdc_put_request(req);
        return;

bad_put:
        ceph_osdc_put_request(req);
        mutex_unlock(&osdc->request_mutex);
bad:
        pr_err("corrupt osd_op_reply got %d %d\n",
               (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
        ceph_msg_dump(msg);
}
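
/*
 * Reply front layout, as decoded above: length-prefixed oid; pgid
 * (see __decode_pgid()); le64 flags; le32 result; le32 reassert
 * epoch and le64 reassert version; le32 osdmap epoch; le32 numops
 * followed by one ceph_osd_op per op; then le32 retry_attempt and a
 * le32 return value per op.
 */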
1314
1315 static void reset_changed_osds(struct ceph_osd_client *osdc)
1316 {
1317         struct rb_node *p, *n;
1318
1319         for (p = rb_first(&osdc->osds); p; p = n) {
1320                 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1321
1322                 n = rb_next(p);
1323                 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1324                     memcmp(&osd->o_con.peer_addr,
1325                            ceph_osd_addr(osdc->osdmap,
1326                                          osd->o_osd),
1327                            sizeof(struct ceph_entity_addr)) != 0)
1328                         __reset_osd(osdc, osd);
1329         }
1330 }
1331
1332 /*
1333  * Requeue requests whose mapping to an OSD has changed.  If requests map to
1334  * no osd, request a new map.
1335  *
1336  * Caller should hold map_sem for read.
1337  */
1338 static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1339 {
1340         struct ceph_osd_request *req, *nreq;
1341         struct rb_node *p;
1342         int needmap = 0;
1343         int err;
1344
1345         dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
1346         mutex_lock(&osdc->request_mutex);
1347         for (p = rb_first(&osdc->requests); p; ) {
1348                 req = rb_entry(p, struct ceph_osd_request, r_node);
1349                 p = rb_next(p);
1350
1351                 /*
1352                  * For linger requests that have not yet been
1353                  * registered, move them to the linger list; they'll
1354                  * be sent to the osd in the loop below.  Unregister
1355                  * the request before re-registering it as a linger
1356                  * request to ensure the __map_request() below
1357                  * will decide it needs to be sent.
1358                  */
1359                 if (req->r_linger && list_empty(&req->r_linger_item)) {
1360                         dout("%p tid %llu restart on osd%d\n",
1361                              req, req->r_tid,
1362                              req->r_osd ? req->r_osd->o_osd : -1);
1363                         __unregister_request(osdc, req);
1364                         __register_linger_request(osdc, req);
1365                         continue;
1366                 }
1367
1368                 err = __map_request(osdc, req, force_resend);
1369                 if (err < 0)
1370                         continue;  /* error */
1371                 if (req->r_osd == NULL) {
1372                         dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1373                         needmap++;  /* request a newer map */
1374                 } else if (err > 0) {
1375                         if (!req->r_linger) {
1376                                 dout("%p tid %llu requeued on osd%d\n", req,
1377                                      req->r_tid,
1378                                      req->r_osd ? req->r_osd->o_osd : -1);
1379                                 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1380                         }
1381                 }
1382         }
1383
1384         list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1385                                  r_linger_item) {
1386                 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1387
1388                 err = __map_request(osdc, req, force_resend);
1389                 dout("__map_request returned %d\n", err);
1390                 if (err == 0)
1391                         continue;  /* no change and no osd was specified */
1392                 if (err < 0)
1393                         continue;  /* hrm! */
1394                 if (req->r_osd == NULL) {
1395                         dout("tid %llu maps to no valid osd\n", req->r_tid);
1396                         needmap++;  /* request a newer map */
1397                         continue;
1398                 }
1399
1400                 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1401                      req->r_osd ? req->r_osd->o_osd : -1);
1402                 __register_request(osdc, req);
1403                 __unregister_linger_request(osdc, req);
1404         }
1405         mutex_unlock(&osdc->request_mutex);
1406
1407         if (needmap) {
1408                 dout("%d requests for down osds, need new map\n", needmap);
1409                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1410         }
1411         reset_changed_osds(osdc);
1412 }
1413
1414
1415 /*
1416  * Process updated osd map.
1417  *
1418  * The message contains any number of incremental and full maps, normally
1419  * indicating some sort of topology change in the cluster.  Kick requests
1420  * off to different OSDs as needed.
1421  */
1422 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1423 {
1424         void *p, *end, *next;
1425         u32 nr_maps, maplen;
1426         u32 epoch;
1427         struct ceph_osdmap *newmap = NULL, *oldmap;
1428         int err;
1429         struct ceph_fsid fsid;
1430
1431         dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1432         p = msg->front.iov_base;
1433         end = p + msg->front.iov_len;
1434
1435         /* verify fsid */
1436         ceph_decode_need(&p, end, sizeof(fsid), bad);
1437         ceph_decode_copy(&p, &fsid, sizeof(fsid));
1438         if (ceph_check_fsid(osdc->client, &fsid) < 0)
1439                 return;
1440
1441         down_write(&osdc->map_sem);
1442
1443         /* incremental maps */
1444         ceph_decode_32_safe(&p, end, nr_maps, bad);
1445         dout(" %d inc maps\n", nr_maps);
1446         while (nr_maps > 0) {
1447                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1448                 epoch = ceph_decode_32(&p);
1449                 maplen = ceph_decode_32(&p);
1450                 ceph_decode_need(&p, end, maplen, bad);
1451                 next = p + maplen;
1452                 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1453                         dout("applying incremental map %u len %d\n",
1454                              epoch, maplen);
1455                         newmap = osdmap_apply_incremental(&p, next,
1456                                                           osdc->osdmap,
1457                                                           &osdc->client->msgr);
1458                         if (IS_ERR(newmap)) {
1459                                 err = PTR_ERR(newmap);
1460                                 goto bad;
1461                         }
1462                         BUG_ON(!newmap);
1463                         if (newmap != osdc->osdmap) {
1464                                 ceph_osdmap_destroy(osdc->osdmap);
1465                                 osdc->osdmap = newmap;
1466                         }
1467                         kick_requests(osdc, 0);
1468                 } else {
1469                         dout("ignoring incremental map %u len %d\n",
1470                              epoch, maplen);
1471                 }
1472                 p = next;
1473                 nr_maps--;
1474         }
1475         if (newmap)
1476                 goto done;
1477
1478         /* full maps */
1479         ceph_decode_32_safe(&p, end, nr_maps, bad);
1480         dout(" %d full maps\n", nr_maps);
1481         while (nr_maps) {
1482                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1483                 epoch = ceph_decode_32(&p);
1484                 maplen = ceph_decode_32(&p);
1485                 ceph_decode_need(&p, end, maplen, bad);
1486                 if (nr_maps > 1) {
1487                         dout("skipping non-latest full map %u len %d\n",
1488                              epoch, maplen);
1489                 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1490                         dout("skipping full map %u len %d, "
1491                              "older than our %u\n", epoch, maplen,
1492                              osdc->osdmap->epoch);
1493                 } else {
1494                         int skipped_map = 0;
1495
1496                         dout("taking full map %u len %d\n", epoch, maplen);
1497                         newmap = osdmap_decode(&p, p+maplen);
1498                         if (IS_ERR(newmap)) {
1499                                 err = PTR_ERR(newmap);
1500                                 goto bad;
1501                         }
1502                         BUG_ON(!newmap);
1503                         oldmap = osdc->osdmap;
1504                         osdc->osdmap = newmap;
1505                         if (oldmap) {
1506                                 if (oldmap->epoch + 1 < newmap->epoch)
1507                                         skipped_map = 1;
1508                                 ceph_osdmap_destroy(oldmap);
1509                         }
1510                         kick_requests(osdc, skipped_map);
1511                 }
1512                 p += maplen;
1513                 nr_maps--;
1514         }
1515
1516 done:
1517         downgrade_write(&osdc->map_sem);
1518         ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1519
1520         /*
1521          * subscribe to subsequent osdmap updates if full to ensure
1522          * we find out when we are no longer full and stop returning
1523          * ENOSPC.
1524          */
1525         if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1526                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1527
1528         mutex_lock(&osdc->request_mutex);
1529         __send_queued(osdc);
1530         mutex_unlock(&osdc->request_mutex);
1531         up_read(&osdc->map_sem);
1532         wake_up_all(&osdc->client->auth_wq);
1533         return;
1534
1535 bad:
1536         pr_err("osdc handle_map corrupt msg\n");
1537         ceph_msg_dump(msg);
1538         up_write(&osdc->map_sem);
1539         return;
1540 }
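
/*
 * For reference, the OSDMAP message front decoded above is laid out as:
 *
 *      ceph_fsid       fsid
 *      u32             nr_inc_maps
 *      { u32 epoch; u32 len; u8 data[len]; }  repeated nr_inc_maps times
 *      u32             nr_full_maps
 *      { u32 epoch; u32 len; u8 data[len]; }  repeated nr_full_maps times
 *
 * This is only a summary of the ceph_decode_*() calls made by
 * ceph_osdc_handle_map(), not an authoritative wire format definition.
 */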
1541
1542 /*
1543  * watch/notify callback event infrastructure
1544  *
1545  * These callbacks are used both for watch and notify operations.
1546  */
1547 static void __release_event(struct kref *kref)
1548 {
1549         struct ceph_osd_event *event =
1550                 container_of(kref, struct ceph_osd_event, kref);
1551
1552         dout("__release_event %p\n", event);
1553         kfree(event);
1554 }
1555
1556 static void get_event(struct ceph_osd_event *event)
1557 {
1558         kref_get(&event->kref);
1559 }
1560
1561 void ceph_osdc_put_event(struct ceph_osd_event *event)
1562 {
1563         kref_put(&event->kref, __release_event);
1564 }
1565 EXPORT_SYMBOL(ceph_osdc_put_event);
1566
1567 static void __insert_event(struct ceph_osd_client *osdc,
1568                              struct ceph_osd_event *new)
1569 {
1570         struct rb_node **p = &osdc->event_tree.rb_node;
1571         struct rb_node *parent = NULL;
1572         struct ceph_osd_event *event = NULL;
1573
1574         while (*p) {
1575                 parent = *p;
1576                 event = rb_entry(parent, struct ceph_osd_event, node);
1577                 if (new->cookie < event->cookie)
1578                         p = &(*p)->rb_left;
1579                 else if (new->cookie > event->cookie)
1580                         p = &(*p)->rb_right;
1581                 else
1582                         BUG();
1583         }
1584
1585         rb_link_node(&new->node, parent, p);
1586         rb_insert_color(&new->node, &osdc->event_tree);
1587 }
1588
1589 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1590                                                 u64 cookie)
1591 {
1592         struct rb_node **p = &osdc->event_tree.rb_node;
1593         struct rb_node *parent = NULL;
1594         struct ceph_osd_event *event = NULL;
1595
1596         while (*p) {
1597                 parent = *p;
1598                 event = rb_entry(parent, struct ceph_osd_event, node);
1599                 if (cookie < event->cookie)
1600                         p = &(*p)->rb_left;
1601                 else if (cookie > event->cookie)
1602                         p = &(*p)->rb_right;
1603                 else
1604                         return event;
1605         }
1606         return NULL;
1607 }
1608
1609 static void __remove_event(struct ceph_osd_event *event)
1610 {
1611         struct ceph_osd_client *osdc = event->osdc;
1612
1613         if (!RB_EMPTY_NODE(&event->node)) {
1614                 dout("__remove_event removed %p\n", event);
1615                 rb_erase(&event->node, &osdc->event_tree);
1616                 ceph_osdc_put_event(event);
1617         } else {
1618                 dout("__remove_event didn't remove %p\n", event);
1619         }
1620 }
1621
1622 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1623                            void (*event_cb)(u64, u64, u8, void *),
1624                            void *data, struct ceph_osd_event **pevent)
1625 {
1626         struct ceph_osd_event *event;
1627
1628         event = kmalloc(sizeof(*event), GFP_NOIO);
1629         if (!event)
1630                 return -ENOMEM;
1631
1632         dout("create_event %p\n", event);
1633         event->cb = event_cb;
1634         event->one_shot = 0;
1635         event->data = data;
1636         event->osdc = osdc;
1637         INIT_LIST_HEAD(&event->osd_node);
1638         RB_CLEAR_NODE(&event->node);
1639         kref_init(&event->kref);   /* one ref for us */
1640         kref_get(&event->kref);    /* one ref for the caller */
1641
1642         spin_lock(&osdc->event_lock);
1643         event->cookie = ++osdc->event_count;
1644         __insert_event(osdc, event);
1645         spin_unlock(&osdc->event_lock);
1646
1647         *pevent = event;
1648         return 0;
1649 }
1650 EXPORT_SYMBOL(ceph_osdc_create_event);
1651
1652 void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1653 {
1654         struct ceph_osd_client *osdc = event->osdc;
1655
1656         dout("cancel_event %p\n", event);
1657         spin_lock(&osdc->event_lock);
1658         __remove_event(event);
1659         spin_unlock(&osdc->event_lock);
1660         ceph_osdc_put_event(event); /* caller's */
1661 }
1662 EXPORT_SYMBOL(ceph_osdc_cancel_event);
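
/*
 * A hedged usage sketch for the event API above; the helper and callback
 * names are hypothetical (in-tree, rbd's watch support is the real
 * consumer).  The callback runs from osdc->notify_wq via do_event_work()
 * below.
 */
static void example_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        /* react to the notify here */
}

static int __maybe_unused example_event_usage(struct ceph_osd_client *osdc)
{
        struct ceph_osd_event *event;
        int ret;

        ret = ceph_osdc_create_event(osdc, example_watch_cb, NULL, &event);
        if (ret)
                return ret;
        /* ... hand event->cookie to a watch osd op, wait for notifies ... */
        ceph_osdc_cancel_event(event);  /* drops the caller's reference */
        return 0;
}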
1663
1664
1665 static void do_event_work(struct work_struct *work)
1666 {
1667         struct ceph_osd_event_work *event_work =
1668                 container_of(work, struct ceph_osd_event_work, work);
1669         struct ceph_osd_event *event = event_work->event;
1670         u64 ver = event_work->ver;
1671         u64 notify_id = event_work->notify_id;
1672         u8 opcode = event_work->opcode;
1673
1674         dout("do_event_work completing %p\n", event);
1675         event->cb(ver, notify_id, opcode, event->data);
1676         dout("do_event_work completed %p\n", event);
1677         ceph_osdc_put_event(event);
1678         kfree(event_work);
1679 }
1680
1681
1682 /*
1683  * Process osd watch notifications
1684  */
1685 static void handle_watch_notify(struct ceph_osd_client *osdc,
1686                                 struct ceph_msg *msg)
1687 {
1688         void *p, *end;
1689         u8 proto_ver;
1690         u64 cookie, ver, notify_id;
1691         u8 opcode;
1692         struct ceph_osd_event *event;
1693         struct ceph_osd_event_work *event_work;
1694
1695         p = msg->front.iov_base;
1696         end = p + msg->front.iov_len;
1697
1698         ceph_decode_8_safe(&p, end, proto_ver, bad);
1699         ceph_decode_8_safe(&p, end, opcode, bad);
1700         ceph_decode_64_safe(&p, end, cookie, bad);
1701         ceph_decode_64_safe(&p, end, ver, bad);
1702         ceph_decode_64_safe(&p, end, notify_id, bad);
1703
1704         spin_lock(&osdc->event_lock);
1705         event = __find_event(osdc, cookie);
1706         if (event) {
1707                 BUG_ON(event->one_shot);
1708                 get_event(event);
1709         }
1710         spin_unlock(&osdc->event_lock);
1711         dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1712              cookie, ver, event);
1713         if (event) {
1714                 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1715                 if (!event_work) {
1716                         dout("ERROR: could not allocate event_work\n");
1717                         goto done_err;
1718                 }
1719                 INIT_WORK(&event_work->work, do_event_work);
1720                 event_work->event = event;
1721                 event_work->ver = ver;
1722                 event_work->notify_id = notify_id;
1723                 event_work->opcode = opcode;
1724                 if (!queue_work(osdc->notify_wq, &event_work->work)) {
1725                         dout("WARNING: failed to queue notify event work\n");
1726                         goto done_err;
1727                 }
1728         }
1729
1730         return;
1731
done_err:
        ceph_osdc_put_event(event);
        kfree(event_work);      /* NULL on allocation failure */
        return;
1735
1736 bad:
1737         pr_err("osdc handle_watch_notify corrupt msg\n");
1738         return;
1739 }
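
/*
 * For reference, the WATCH_NOTIFY front decoded above is simply:
 *
 *      u8 proto_ver, u8 opcode, u64 cookie, u64 ver, u64 notify_id
 *
 * (a summary of the ceph_decode_*_safe() calls, not an authoritative
 * wire format definition).
 */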
1740
1741 /*
1742  * Register request, send initial attempt.
1743  */
1744 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1745                             struct ceph_osd_request *req,
1746                             bool nofail)
1747 {
1748         int rc = 0;
1749
1750         req->r_request->pages = req->r_pages;
1751         req->r_request->nr_pages = req->r_num_pages;
1752 #ifdef CONFIG_BLOCK
1753         req->r_request->bio = req->r_bio;
1754 #endif
1755         req->r_request->trail = &req->r_trail;
1756
1757         register_request(osdc, req);
1758
1759         down_read(&osdc->map_sem);
1760         mutex_lock(&osdc->request_mutex);
1761         /*
1762          * a racing kick_requests() may have sent the message for us
1763          * while we dropped request_mutex above, so only send now if
1764          * the request hasn't been touched yet.
1765          */
1766         if (req->r_sent == 0) {
1767                 rc = __map_request(osdc, req, 0);
1768                 if (rc < 0) {
1769                         if (nofail) {
1770                                 dout("osdc_start_request failed map, "
1771                                      "will retry %lld\n", req->r_tid);
1772                                 rc = 0;
1773                         }
1774                         goto out_unlock;
1775                 }
1776                 if (req->r_osd == NULL) {
1777                         dout("send_request %p no up osds in pg\n", req);
1778                         ceph_monc_request_next_osdmap(&osdc->client->monc);
1779                 } else {
1780                         __send_request(osdc, req);
1781                 }
1782                 rc = 0;
1783         }
1784
1785 out_unlock:
1786         mutex_unlock(&osdc->request_mutex);
1787         up_read(&osdc->map_sem);
1788         return rc;
1789 }
1790 EXPORT_SYMBOL(ceph_osdc_start_request);
1791
1792 /*
1793  * wait for a request to complete
1794  */
1795 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1796                            struct ceph_osd_request *req)
1797 {
1798         int rc;
1799
1800         rc = wait_for_completion_interruptible(&req->r_completion);
1801         if (rc < 0) {
1802                 mutex_lock(&osdc->request_mutex);
1803                 __cancel_request(req);
1804                 __unregister_request(osdc, req);
1805                 mutex_unlock(&osdc->request_mutex);
1806                 complete_request(req);
1807                 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1808                 return rc;
1809         }
1810
1811         dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1812         return req->r_result;
1813 }
1814 EXPORT_SYMBOL(ceph_osdc_wait_request);
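
/*
 * A minimal submit-and-wait sketch (hypothetical helper), mirroring what
 * ceph_osdc_readpages() and ceph_osdc_writepages() below do.  With
 * nofail=true a mapping failure is swallowed and the registered request
 * is simply retried on a later osdmap; with false, as here, the error is
 * returned to the caller.
 */
static int __maybe_unused example_submit_and_wait(struct ceph_osd_client *osdc,
                                                  struct ceph_osd_request *req)
{
        int rc = ceph_osdc_start_request(osdc, req, false);

        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);  /* req->r_result */
        return rc;
}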
1815
1816 /*
1817  * sync - wait for in-flight writes to commit; snapshot last_tid to avoid starvation.
1818  */
1819 void ceph_osdc_sync(struct ceph_osd_client *osdc)
1820 {
1821         struct ceph_osd_request *req;
1822         u64 last_tid, next_tid = 0;
1823
1824         mutex_lock(&osdc->request_mutex);
1825         last_tid = osdc->last_tid;
1826         while (1) {
1827                 req = __lookup_request_ge(osdc, next_tid);
1828                 if (!req)
1829                         break;
1830                 if (req->r_tid > last_tid)
1831                         break;
1832
1833                 next_tid = req->r_tid + 1;
1834                 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1835                         continue;
1836
1837                 ceph_osdc_get_request(req);
1838                 mutex_unlock(&osdc->request_mutex);
1839                 dout("sync waiting on tid %llu (last is %llu)\n",
1840                      req->r_tid, last_tid);
1841                 wait_for_completion(&req->r_safe_completion);
1842                 mutex_lock(&osdc->request_mutex);
1843                 ceph_osdc_put_request(req);
1844         }
1845         mutex_unlock(&osdc->request_mutex);
1846         dout("sync done (thru tid %llu)\n", last_tid);
1847 }
1848 EXPORT_SYMBOL(ceph_osdc_sync);
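
/*
 * In-tree, ceph_osdc_sync() is driven from the filesystem sync path;
 * once it returns, every write submitted before the call is known to be
 * safe on disk (its r_safe_completion has fired).
 */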
1849
1850 /*
1851  * init, shutdown
1852  */
1853 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1854 {
1855         int err;
1856
1857         dout("init\n");
1858         osdc->client = client;
1859         osdc->osdmap = NULL;
1860         init_rwsem(&osdc->map_sem);
1861         init_completion(&osdc->map_waiters);
1862         osdc->last_requested_map = 0;
1863         mutex_init(&osdc->request_mutex);
1864         osdc->last_tid = 0;
1865         osdc->osds = RB_ROOT;
1866         INIT_LIST_HEAD(&osdc->osd_lru);
1867         osdc->requests = RB_ROOT;
1868         INIT_LIST_HEAD(&osdc->req_lru);
1869         INIT_LIST_HEAD(&osdc->req_unsent);
1870         INIT_LIST_HEAD(&osdc->req_notarget);
1871         INIT_LIST_HEAD(&osdc->req_linger);
1872         osdc->num_requests = 0;
1873         INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1874         INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1875         spin_lock_init(&osdc->event_lock);
1876         osdc->event_tree = RB_ROOT;
1877         osdc->event_count = 0;
1878
1879         schedule_delayed_work(&osdc->osds_timeout_work,
1880            round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1881
1882         err = -ENOMEM;
1883         osdc->req_mempool = mempool_create_kmalloc_pool(10,
1884                                         sizeof(struct ceph_osd_request));
1885         if (!osdc->req_mempool)
1886                 goto out;
1887
1888         err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
1889                                 OSD_OP_FRONT_LEN, 10, true,
1890                                 "osd_op");
1891         if (err < 0)
1892                 goto out_mempool;
1893         err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
1894                                 OSD_OPREPLY_FRONT_LEN, 10, true,
1895                                 "osd_op_reply");
1896         if (err < 0)
1897                 goto out_msgpool;
1898
1899         osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1900         if (!osdc->notify_wq) {
1901                 /* create_singlethread_workqueue() returns NULL on failure */
1902                 err = -ENOMEM;
1903                 goto out_msgpool_reply;
1904         }
1905         return 0;
1906
out_msgpool_reply:
        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
        ceph_msgpool_destroy(&osdc->msgpool_op);
1909 out_mempool:
1910         mempool_destroy(osdc->req_mempool);
1911 out:
1912         return err;
1913 }
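
/*
 * A hedged lifecycle sketch: ceph_osdc_init() pairs with ceph_osdc_stop().
 * In-tree, both are driven from ceph_create_client()/ceph_destroy_client();
 * the helper below is hypothetical.
 */
static int __maybe_unused example_osdc_lifecycle(struct ceph_client *client)
{
        int err = ceph_osdc_init(&client->osdc, client);

        if (err)
                return err;
        /* ... submit requests against &client->osdc ... */
        ceph_osdc_stop(&client->osdc);
        return 0;
}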
1914
1915 void ceph_osdc_stop(struct ceph_osd_client *osdc)
1916 {
1917         flush_workqueue(osdc->notify_wq);
1918         destroy_workqueue(osdc->notify_wq);
1919         cancel_delayed_work_sync(&osdc->timeout_work);
1920         cancel_delayed_work_sync(&osdc->osds_timeout_work);
1921         if (osdc->osdmap) {
1922                 ceph_osdmap_destroy(osdc->osdmap);
1923                 osdc->osdmap = NULL;
1924         }
1925         remove_all_osds(osdc);
1926         mempool_destroy(osdc->req_mempool);
1927         ceph_msgpool_destroy(&osdc->msgpool_op);
1928         ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1929 }
1930
1931 /*
1932  * Read some contiguous pages.  If we cross an object boundary, shorten
1933  * *plen.  Return number of bytes read, or error.
1934  */
1935 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1936                         struct ceph_vino vino, struct ceph_file_layout *layout,
1937                         u64 off, u64 *plen,
1938                         u32 truncate_seq, u64 truncate_size,
1939                         struct page **pages, int num_pages, int page_align)
1940 {
1941         struct ceph_osd_request *req;
1942         int rc = 0;
1943
1944         dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1945              vino.snap, off, *plen);
1946         req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1947                                     CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1948                                     NULL, 0, truncate_seq, truncate_size, NULL,
1949                                     false, page_align);
1950         if (IS_ERR(req))
1951                 return PTR_ERR(req);
1952
1953         /* it may be a short read due to an object boundary */
1954         req->r_pages = pages;
1955
1956         dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
1957              off, *plen, req->r_num_pages, page_align);
1958
1959         rc = ceph_osdc_start_request(osdc, req, false);
1960         if (!rc)
1961                 rc = ceph_osdc_wait_request(osdc, req);
1962
1963         ceph_osdc_put_request(req);
1964         dout("readpages result %d\n", rc);
1965         return rc;
1966 }
1967 EXPORT_SYMBOL(ceph_osdc_readpages);
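
/*
 * A hedged caller sketch for ceph_osdc_readpages() (hypothetical helper;
 * fs/ceph's address-space code is the real caller).  len may be
 * shortened at an object boundary, so the returned byte count matters.
 */
static int __maybe_unused example_read(struct ceph_osd_client *osdc,
                                       struct ceph_vino vino,
                                       struct ceph_file_layout *layout,
                                       u64 off, u64 len)
{
        int num_pages = calc_pages_for(off, len);
        struct page **pages;
        int rc;

        pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        /* truncate_seq/truncate_size of 0 assume no racing truncate */
        rc = ceph_osdc_readpages(osdc, vino, layout, off, &len, 0, 0,
                                 pages, num_pages, off & ~PAGE_MASK);

        /* on success rc is the number of bytes read into pages */
        ceph_release_page_vector(pages, num_pages);
        return rc;
}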
1968
1969 /*
1970  * do a synchronous write on N pages
1971  */
1972 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1973                          struct ceph_file_layout *layout,
1974                          struct ceph_snap_context *snapc,
1975                          u64 off, u64 len,
1976                          u32 truncate_seq, u64 truncate_size,
1977                          struct timespec *mtime,
1978                          struct page **pages, int num_pages)
1979 {
1980         struct ceph_osd_request *req;
1981         int rc = 0;
1982         int page_align = off & ~PAGE_MASK;
1983
1984         BUG_ON(vino.snap != CEPH_NOSNAP);
1985         req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1986                                     CEPH_OSD_OP_WRITE,
1987                                     CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1988                                     snapc, 0,
1989                                     truncate_seq, truncate_size, mtime,
1990                                     true, page_align);
1991         if (IS_ERR(req))
1992                 return PTR_ERR(req);
1993
1994         /* it may be a short write due to an object boundary */
1995         req->r_pages = pages;
1996         dout("writepages %llu~%llu (%d pages)\n", off, len,
1997              req->r_num_pages);
1998
1999         rc = ceph_osdc_start_request(osdc, req, true);
2000         if (!rc)
2001                 rc = ceph_osdc_wait_request(osdc, req);
2002
2003         ceph_osdc_put_request(req);
2004         if (rc == 0)
2005                 rc = len;
2006         dout("writepages result %d\n", rc);
2007         return rc;
2008 }
2009 EXPORT_SYMBOL(ceph_osdc_writepages);
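
/*
 * Usage notes for ceph_osdc_writepages(): vino.snap must be CEPH_NOSNAP
 * (snapshots are read-only, hence the BUG_ON above), and the return
 * value is the number of bytes written -- possibly clipped at an object
 * boundary -- or a negative error.
 */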
2010
2011 /*
2012  * handle incoming message
2013  */
2014 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2015 {
2016         struct ceph_osd *osd = con->private;
2017         struct ceph_osd_client *osdc;
2018         int type = le16_to_cpu(msg->hdr.type);
2019
2020         if (!osd)
2021                 goto out;
2022         osdc = osd->o_osdc;
2023
2024         switch (type) {
2025         case CEPH_MSG_OSD_MAP:
2026                 ceph_osdc_handle_map(osdc, msg);
2027                 break;
2028         case CEPH_MSG_OSD_OPREPLY:
2029                 handle_reply(osdc, msg, con);
2030                 break;
2031         case CEPH_MSG_WATCH_NOTIFY:
2032                 handle_watch_notify(osdc, msg);
2033                 break;
2034
2035         default:
2036                 pr_err("received unknown message type %d %s\n", type,
2037                        ceph_msg_type_name(type));
2038         }
2039 out:
2040         ceph_msg_put(msg);
2041 }
2042
2043 /*
2044  * lookup and return message for incoming reply.  set up reply message
2045  * pages.
2046  */
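/*
 * If the tid is unknown, or the reply carries more data than the pages
 * we have ready, *skip is set so the messenger reads and discards the
 * message rather than delivering it.
 */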
2047 static struct ceph_msg *get_reply(struct ceph_connection *con,
2048                                   struct ceph_msg_header *hdr,
2049                                   int *skip)
2050 {
2051         struct ceph_osd *osd = con->private;
2052         struct ceph_osd_client *osdc = osd->o_osdc;
2053         struct ceph_msg *m;
2054         struct ceph_osd_request *req;
2055         int front = le32_to_cpu(hdr->front_len);
2056         int data_len = le32_to_cpu(hdr->data_len);
2057         u64 tid;
2058
2059         tid = le64_to_cpu(hdr->tid);
2060         mutex_lock(&osdc->request_mutex);
2061         req = __lookup_request(osdc, tid);
2062         if (!req) {
2063                 *skip = 1;
2064                 m = NULL;
2065                 dout("get_reply unknown tid %llu from osd%d\n", tid,
2066                      osd->o_osd);
2067                 goto out;
2068         }
2069
2070         if (req->r_con_filling_msg) {
2071                 dout("%s revoking msg %p from old con %p\n", __func__,
2072                      req->r_reply, req->r_con_filling_msg);
2073                 ceph_msg_revoke_incoming(req->r_reply);
2074                 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
2075                 req->r_con_filling_msg = NULL;
2076         }
2077
2078         if (front > req->r_reply->front.iov_len) {
2079                 pr_warning("get_reply front %d > preallocated %d\n",
2080                            front, (int)req->r_reply->front.iov_len);
2081                 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
2082                 if (!m)
2083                         goto out;
2084                 ceph_msg_put(req->r_reply);
2085                 req->r_reply = m;
2086         }
2087         m = ceph_msg_get(req->r_reply);
2088
2089         if (data_len > 0) {
2090                 int want = calc_pages_for(req->r_page_alignment, data_len);
2091
2092                 if (req->r_pages && unlikely(req->r_num_pages < want)) {
2093                         pr_warning("tid %lld reply has %d bytes %d pages, we"
2094                                    " had only %d pages ready\n", tid, data_len,
2095                                    want, req->r_num_pages);
2096                         *skip = 1;
2097                         ceph_msg_put(m);
2098                         m = NULL;
2099                         goto out;
2100                 }
2101                 m->pages = req->r_pages;
2102                 m->nr_pages = req->r_num_pages;
2103                 m->page_alignment = req->r_page_alignment;
2104 #ifdef CONFIG_BLOCK
2105                 m->bio = req->r_bio;
2106 #endif
2107         }
2108         *skip = 0;
2109         req->r_con_filling_msg = con->ops->get(con);
2110         dout("get_reply tid %lld %p\n", tid, m);
2111
2112 out:
2113         mutex_unlock(&osdc->request_mutex);
2114         return m;
2116 }
2117
2118 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2119                                   struct ceph_msg_header *hdr,
2120                                   int *skip)
2121 {
2122         struct ceph_osd *osd = con->private;
2123         int type = le16_to_cpu(hdr->type);
2124         int front = le32_to_cpu(hdr->front_len);
2125
2126         *skip = 0;
2127         switch (type) {
2128         case CEPH_MSG_OSD_MAP:
2129         case CEPH_MSG_WATCH_NOTIFY:
2130                 return ceph_msg_new(type, front, GFP_NOFS, false);
2131         case CEPH_MSG_OSD_OPREPLY:
2132                 return get_reply(con, hdr, skip);
2133         default:
2134                 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2135                         osd->o_osd);
2136                 *skip = 1;
2137                 return NULL;
2138         }
2139 }
2140
2141 /*
2142  * Wrappers to refcount containing ceph_osd struct
2143  */
2144 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2145 {
2146         struct ceph_osd *osd = con->private;
2147         if (get_osd(osd))
2148                 return con;
2149         return NULL;
2150 }
2151
2152 static void put_osd_con(struct ceph_connection *con)
2153 {
2154         struct ceph_osd *osd = con->private;
2155         put_osd(osd);
2156 }
2157
2158 /*
2159  * authentication
2160  */
2161 /*
2162  * Note: returned pointer is the address of a structure that's
2163  * managed separately.  Caller must *not* attempt to free it.
2164  */
2165 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2166                                         int *proto, int force_new)
2167 {
2168         struct ceph_osd *o = con->private;
2169         struct ceph_osd_client *osdc = o->o_osdc;
2170         struct ceph_auth_client *ac = osdc->client->monc.auth;
2171         struct ceph_auth_handshake *auth = &o->o_auth;
2172
2173         if (force_new && auth->authorizer) {
2174                 if (ac->ops && ac->ops->destroy_authorizer)
2175                         ac->ops->destroy_authorizer(ac, auth->authorizer);
2176                 auth->authorizer = NULL;
2177         }
2178         if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
2179                 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2180                                                         auth);
2181                 if (ret)
2182                         return ERR_PTR(ret);
2183         }
2184         *proto = ac->protocol;
2185
2186         return auth;
2187 }
2188
2189
2190 static int verify_authorizer_reply(struct ceph_connection *con, int len)
2191 {
2192         struct ceph_osd *o = con->private;
2193         struct ceph_osd_client *osdc = o->o_osdc;
2194         struct ceph_auth_client *ac = osdc->client->monc.auth;
2195
2196         /*
2197          * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2198          * XXX which do we do:  succeed or fail?
2199          */
2200         return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2201 }
2202
2203 static int invalidate_authorizer(struct ceph_connection *con)
2204 {
2205         struct ceph_osd *o = con->private;
2206         struct ceph_osd_client *osdc = o->o_osdc;
2207         struct ceph_auth_client *ac = osdc->client->monc.auth;
2208
2209         if (ac->ops && ac->ops->invalidate_authorizer)
2210                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2211
2212         return ceph_monc_validate_auth(&osdc->client->monc);
2213 }
2214
2215 static const struct ceph_connection_operations osd_con_ops = {
2216         .get = get_osd_con,
2217         .put = put_osd_con,
2218         .dispatch = dispatch,
2219         .get_authorizer = get_authorizer,
2220         .verify_authorizer_reply = verify_authorizer_reply,
2221         .invalidate_authorizer = invalidate_authorizer,
2222         .alloc_msg = alloc_msg,
2223         .fault = osd_reset,
2224 };