]> Pileus Git - ~andy/linux/blob - net/ceph/osd_client.c
Merge branch 'master' of git://git.kernel.org/pub/scm/linux/kernel/git/klassert/ipsec
[~andy/linux] / net / ceph / osd_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/err.h>
5 #include <linux/highmem.h>
6 #include <linux/mm.h>
7 #include <linux/pagemap.h>
8 #include <linux/slab.h>
9 #include <linux/uaccess.h>
10 #ifdef CONFIG_BLOCK
11 #include <linux/bio.h>
12 #endif
13
14 #include <linux/ceph/libceph.h>
15 #include <linux/ceph/osd_client.h>
16 #include <linux/ceph/messenger.h>
17 #include <linux/ceph/decode.h>
18 #include <linux/ceph/auth.h>
19 #include <linux/ceph/pagelist.h>
20
21 #define OSD_OP_FRONT_LEN        4096
22 #define OSD_OPREPLY_FRONT_LEN   512
23
24 static const struct ceph_connection_operations osd_con_ops;
25
26 static void __send_queued(struct ceph_osd_client *osdc);
27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28 static void __register_request(struct ceph_osd_client *osdc,
29                                struct ceph_osd_request *req);
30 static void __unregister_linger_request(struct ceph_osd_client *osdc,
31                                         struct ceph_osd_request *req);
32 static void __send_request(struct ceph_osd_client *osdc,
33                            struct ceph_osd_request *req);
34
35 static int op_has_extent(int op)
36 {
37         return (op == CEPH_OSD_OP_READ ||
38                 op == CEPH_OSD_OP_WRITE);
39 }
40
41 /*
42  * Implement client access to distributed object storage cluster.
43  *
44  * All data objects are stored within a cluster/cloud of OSDs, or
45  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
46  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
47  * remote daemons serving up and coordinating consistent and safe
48  * access to storage.
49  *
50  * Cluster membership and the mapping of data objects onto storage devices
51  * are described by the osd map.
52  *
53  * We keep track of pending OSD requests (read, write), resubmit
54  * requests to different OSDs when the cluster topology/data layout
55  * change, or retry the affected requests when the communications
56  * channel with an OSD is reset.
57  */
58
59 /*
60  * calculate the mapping of a file extent onto an object, and fill out the
61  * request accordingly.  shorten extent as necessary if it crosses an
62  * object boundary.
63  *
64  * fill osd op in request message.
65  */
66 static int calc_layout(struct ceph_vino vino,
67                        struct ceph_file_layout *layout,
68                        u64 off, u64 *plen,
69                        struct ceph_osd_request *req,
70                        struct ceph_osd_req_op *op)
71 {
72         u64 orig_len = *plen;
73         u64 bno = 0;
74         u64 objoff = 0;
75         u64 objlen = 0;
76         int r;
77
78         /* object extent? */
79         r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno,
80                                           &objoff, &objlen);
81         if (r < 0)
82                 return r;
83         if (objlen < orig_len) {
84                 *plen = objlen;
85                 dout(" skipping last %llu, final file extent %llu~%llu\n",
86                      orig_len - *plen, off, *plen);
87         }
88
89         if (op_has_extent(op->op)) {
90                 u32 osize = le32_to_cpu(layout->fl_object_size);
91                 op->extent.offset = objoff;
92                 op->extent.length = objlen;
93                 if (op->extent.truncate_size <= off - objoff) {
94                         op->extent.truncate_size = 0;
95                 } else {
96                         op->extent.truncate_size -= off - objoff;
97                         if (op->extent.truncate_size > osize)
98                                 op->extent.truncate_size = osize;
99                 }
100         }
101         req->r_num_pages = calc_pages_for(off, *plen);
102         req->r_page_alignment = off & ~PAGE_MASK;
103         if (op->op == CEPH_OSD_OP_WRITE)
104                 op->payload_len = *plen;
105
106         dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
107              bno, objoff, objlen, req->r_num_pages);
108
109         snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
110         req->r_oid_len = strlen(req->r_oid);
111
112         return r;
113 }
114
115 /*
116  * requests
117  */
118 void ceph_osdc_release_request(struct kref *kref)
119 {
120         struct ceph_osd_request *req = container_of(kref,
121                                                     struct ceph_osd_request,
122                                                     r_kref);
123
124         if (req->r_request)
125                 ceph_msg_put(req->r_request);
126         if (req->r_con_filling_msg) {
127                 dout("%s revoking msg %p from con %p\n", __func__,
128                      req->r_reply, req->r_con_filling_msg);
129                 ceph_msg_revoke_incoming(req->r_reply);
130                 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
131                 req->r_con_filling_msg = NULL;
132         }
133         if (req->r_reply)
134                 ceph_msg_put(req->r_reply);
135         if (req->r_own_pages)
136                 ceph_release_page_vector(req->r_pages,
137                                          req->r_num_pages);
138         ceph_put_snap_context(req->r_snapc);
139         ceph_pagelist_release(&req->r_trail);
140         if (req->r_mempool)
141                 mempool_free(req, req->r_osdc->req_mempool);
142         else
143                 kfree(req);
144 }
145 EXPORT_SYMBOL(ceph_osdc_release_request);
146
147 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
148                                                struct ceph_snap_context *snapc,
149                                                unsigned int num_ops,
150                                                bool use_mempool,
151                                                gfp_t gfp_flags)
152 {
153         struct ceph_osd_request *req;
154         struct ceph_msg *msg;
155         size_t msg_size;
156
157         msg_size = 4 + 4 + 8 + 8 + 4+8;
158         msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
159         msg_size += 1 + 8 + 4 + 4;     /* pg_t */
160         msg_size += 4 + MAX_OBJ_NAME_SIZE;
161         msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
162         msg_size += 8;  /* snapid */
163         msg_size += 8;  /* snap_seq */
164         msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
165         msg_size += 4;
166
167         if (use_mempool) {
168                 req = mempool_alloc(osdc->req_mempool, gfp_flags);
169                 memset(req, 0, sizeof(*req));
170         } else {
171                 req = kzalloc(sizeof(*req), gfp_flags);
172         }
173         if (req == NULL)
174                 return NULL;
175
176         req->r_osdc = osdc;
177         req->r_mempool = use_mempool;
178
179         kref_init(&req->r_kref);
180         init_completion(&req->r_completion);
181         init_completion(&req->r_safe_completion);
182         RB_CLEAR_NODE(&req->r_node);
183         INIT_LIST_HEAD(&req->r_unsafe_item);
184         INIT_LIST_HEAD(&req->r_linger_item);
185         INIT_LIST_HEAD(&req->r_linger_osd);
186         INIT_LIST_HEAD(&req->r_req_lru_item);
187         INIT_LIST_HEAD(&req->r_osd_item);
188
189         /* create reply message */
190         if (use_mempool)
191                 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
192         else
193                 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
194                                    OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
195         if (!msg) {
196                 ceph_osdc_put_request(req);
197                 return NULL;
198         }
199         req->r_reply = msg;
200
201         ceph_pagelist_init(&req->r_trail);
202
203         /* create request message; allow space for oid */
204         if (use_mempool)
205                 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
206         else
207                 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
208         if (!msg) {
209                 ceph_osdc_put_request(req);
210                 return NULL;
211         }
212
213         memset(msg->front.iov_base, 0, msg->front.iov_len);
214
215         req->r_request = msg;
216
217         return req;
218 }
219 EXPORT_SYMBOL(ceph_osdc_alloc_request);
220
221 static void osd_req_encode_op(struct ceph_osd_request *req,
222                               struct ceph_osd_op *dst,
223                               struct ceph_osd_req_op *src)
224 {
225         dst->op = cpu_to_le16(src->op);
226
227         switch (src->op) {
228         case CEPH_OSD_OP_STAT:
229                 break;
230         case CEPH_OSD_OP_READ:
231         case CEPH_OSD_OP_WRITE:
232                 dst->extent.offset =
233                         cpu_to_le64(src->extent.offset);
234                 dst->extent.length =
235                         cpu_to_le64(src->extent.length);
236                 dst->extent.truncate_size =
237                         cpu_to_le64(src->extent.truncate_size);
238                 dst->extent.truncate_seq =
239                         cpu_to_le32(src->extent.truncate_seq);
240                 break;
241         case CEPH_OSD_OP_CALL:
242                 dst->cls.class_len = src->cls.class_len;
243                 dst->cls.method_len = src->cls.method_len;
244                 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
245
246                 ceph_pagelist_append(&req->r_trail, src->cls.class_name,
247                                      src->cls.class_len);
248                 ceph_pagelist_append(&req->r_trail, src->cls.method_name,
249                                      src->cls.method_len);
250                 ceph_pagelist_append(&req->r_trail, src->cls.indata,
251                                      src->cls.indata_len);
252                 break;
253         case CEPH_OSD_OP_STARTSYNC:
254                 break;
255         case CEPH_OSD_OP_NOTIFY_ACK:
256         case CEPH_OSD_OP_WATCH:
257                 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
258                 dst->watch.ver = cpu_to_le64(src->watch.ver);
259                 dst->watch.flag = src->watch.flag;
260                 break;
261         default:
262                 pr_err("unrecognized osd opcode %d\n", dst->op);
263                 WARN_ON(1);
264                 break;
265         case CEPH_OSD_OP_MAPEXT:
266         case CEPH_OSD_OP_MASKTRUNC:
267         case CEPH_OSD_OP_SPARSE_READ:
268         case CEPH_OSD_OP_NOTIFY:
269         case CEPH_OSD_OP_ASSERT_VER:
270         case CEPH_OSD_OP_WRITEFULL:
271         case CEPH_OSD_OP_TRUNCATE:
272         case CEPH_OSD_OP_ZERO:
273         case CEPH_OSD_OP_DELETE:
274         case CEPH_OSD_OP_APPEND:
275         case CEPH_OSD_OP_SETTRUNC:
276         case CEPH_OSD_OP_TRIMTRUNC:
277         case CEPH_OSD_OP_TMAPUP:
278         case CEPH_OSD_OP_TMAPPUT:
279         case CEPH_OSD_OP_TMAPGET:
280         case CEPH_OSD_OP_CREATE:
281         case CEPH_OSD_OP_ROLLBACK:
282         case CEPH_OSD_OP_OMAPGETKEYS:
283         case CEPH_OSD_OP_OMAPGETVALS:
284         case CEPH_OSD_OP_OMAPGETHEADER:
285         case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
286         case CEPH_OSD_OP_MODE_RD:
287         case CEPH_OSD_OP_OMAPSETVALS:
288         case CEPH_OSD_OP_OMAPSETHEADER:
289         case CEPH_OSD_OP_OMAPCLEAR:
290         case CEPH_OSD_OP_OMAPRMKEYS:
291         case CEPH_OSD_OP_OMAP_CMP:
292         case CEPH_OSD_OP_CLONERANGE:
293         case CEPH_OSD_OP_ASSERT_SRC_VERSION:
294         case CEPH_OSD_OP_SRC_CMPXATTR:
295         case CEPH_OSD_OP_GETXATTR:
296         case CEPH_OSD_OP_GETXATTRS:
297         case CEPH_OSD_OP_CMPXATTR:
298         case CEPH_OSD_OP_SETXATTR:
299         case CEPH_OSD_OP_SETXATTRS:
300         case CEPH_OSD_OP_RESETXATTRS:
301         case CEPH_OSD_OP_RMXATTR:
302         case CEPH_OSD_OP_PULL:
303         case CEPH_OSD_OP_PUSH:
304         case CEPH_OSD_OP_BALANCEREADS:
305         case CEPH_OSD_OP_UNBALANCEREADS:
306         case CEPH_OSD_OP_SCRUB:
307         case CEPH_OSD_OP_SCRUB_RESERVE:
308         case CEPH_OSD_OP_SCRUB_UNRESERVE:
309         case CEPH_OSD_OP_SCRUB_STOP:
310         case CEPH_OSD_OP_SCRUB_MAP:
311         case CEPH_OSD_OP_WRLOCK:
312         case CEPH_OSD_OP_WRUNLOCK:
313         case CEPH_OSD_OP_RDLOCK:
314         case CEPH_OSD_OP_RDUNLOCK:
315         case CEPH_OSD_OP_UPLOCK:
316         case CEPH_OSD_OP_DNLOCK:
317         case CEPH_OSD_OP_PGLS:
318         case CEPH_OSD_OP_PGLS_FILTER:
319                 pr_err("unsupported osd opcode %s\n",
320                         ceph_osd_op_name(dst->op));
321                 WARN_ON(1);
322                 break;
323         }
324         dst->payload_len = cpu_to_le32(src->payload_len);
325 }
326
327 /*
328  * build new request AND message
329  *
330  */
331 void ceph_osdc_build_request(struct ceph_osd_request *req,
332                              u64 off, u64 len, unsigned int num_ops,
333                              struct ceph_osd_req_op *src_ops,
334                              struct ceph_snap_context *snapc, u64 snap_id,
335                              struct timespec *mtime)
336 {
337         struct ceph_msg *msg = req->r_request;
338         struct ceph_osd_req_op *src_op;
339         void *p;
340         size_t msg_size;
341         int flags = req->r_flags;
342         u64 data_len;
343         int i;
344
345         req->r_num_ops = num_ops;
346         req->r_snapid = snap_id;
347         req->r_snapc = ceph_get_snap_context(snapc);
348
349         /* encode request */
350         msg->hdr.version = cpu_to_le16(4);
351
352         p = msg->front.iov_base;
353         ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
354         req->r_request_osdmap_epoch = p;
355         p += 4;
356         req->r_request_flags = p;
357         p += 4;
358         if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359                 ceph_encode_timespec(p, mtime);
360         p += sizeof(struct ceph_timespec);
361         req->r_request_reassert_version = p;
362         p += sizeof(struct ceph_eversion); /* will get filled in */
363
364         /* oloc */
365         ceph_encode_8(&p, 4);
366         ceph_encode_8(&p, 4);
367         ceph_encode_32(&p, 8 + 4 + 4);
368         req->r_request_pool = p;
369         p += 8;
370         ceph_encode_32(&p, -1);  /* preferred */
371         ceph_encode_32(&p, 0);   /* key len */
372
373         ceph_encode_8(&p, 1);
374         req->r_request_pgid = p;
375         p += 8 + 4;
376         ceph_encode_32(&p, -1);  /* preferred */
377
378         /* oid */
379         ceph_encode_32(&p, req->r_oid_len);
380         memcpy(p, req->r_oid, req->r_oid_len);
381         dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
382         p += req->r_oid_len;
383
384         /* ops */
385         ceph_encode_16(&p, num_ops);
386         src_op = src_ops;
387         req->r_request_ops = p;
388         for (i = 0; i < num_ops; i++, src_op++) {
389                 osd_req_encode_op(req, p, src_op);
390                 p += sizeof(struct ceph_osd_op);
391         }
392
393         /* snaps */
394         ceph_encode_64(&p, req->r_snapid);
395         ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
396         ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
397         if (req->r_snapc) {
398                 for (i = 0; i < snapc->num_snaps; i++) {
399                         ceph_encode_64(&p, req->r_snapc->snaps[i]);
400                 }
401         }
402
403         req->r_request_attempts = p;
404         p += 4;
405
406         data_len = req->r_trail.length;
407         if (flags & CEPH_OSD_FLAG_WRITE) {
408                 req->r_request->hdr.data_off = cpu_to_le16(off);
409                 data_len += len;
410         }
411         req->r_request->hdr.data_len = cpu_to_le32(data_len);
412         req->r_request->page_alignment = req->r_page_alignment;
413
414         BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
415         msg_size = p - msg->front.iov_base;
416         msg->front.iov_len = msg_size;
417         msg->hdr.front_len = cpu_to_le32(msg_size);
418
419         dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
420              num_ops);
421         return;
422 }
423 EXPORT_SYMBOL(ceph_osdc_build_request);
424
425 /*
426  * build new request AND message, calculate layout, and adjust file
427  * extent as needed.
428  *
429  * if the file was recently truncated, we include information about its
430  * old and new size so that the object can be updated appropriately.  (we
431  * avoid synchronously deleting truncated objects because it's slow.)
432  *
433  * if @do_sync, include a 'startsync' command so that the osd will flush
434  * data quickly.
435  */
436 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
437                                                struct ceph_file_layout *layout,
438                                                struct ceph_vino vino,
439                                                u64 off, u64 *plen,
440                                                int opcode, int flags,
441                                                struct ceph_snap_context *snapc,
442                                                int do_sync,
443                                                u32 truncate_seq,
444                                                u64 truncate_size,
445                                                struct timespec *mtime,
446                                                bool use_mempool,
447                                                int page_align)
448 {
449         struct ceph_osd_req_op ops[2];
450         struct ceph_osd_request *req;
451         unsigned int num_op = 1;
452         int r;
453
454         memset(&ops, 0, sizeof ops);
455
456         ops[0].op = opcode;
457         ops[0].extent.truncate_seq = truncate_seq;
458         ops[0].extent.truncate_size = truncate_size;
459
460         if (do_sync) {
461                 ops[1].op = CEPH_OSD_OP_STARTSYNC;
462                 num_op++;
463         }
464
465         req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
466                                         GFP_NOFS);
467         if (!req)
468                 return ERR_PTR(-ENOMEM);
469         req->r_flags = flags;
470
471         /* calculate max write size */
472         r = calc_layout(vino, layout, off, plen, req, ops);
473         if (r < 0)
474                 return ERR_PTR(r);
475         req->r_file_layout = *layout;  /* keep a copy */
476
477         /* in case it differs from natural (file) alignment that
478            calc_layout filled in for us */
479         req->r_num_pages = calc_pages_for(page_align, *plen);
480         req->r_page_alignment = page_align;
481
482         ceph_osdc_build_request(req, off, *plen, num_op, ops,
483                                 snapc, vino.snap, mtime);
484
485         return req;
486 }
487 EXPORT_SYMBOL(ceph_osdc_new_request);
488
489 /*
490  * We keep osd requests in an rbtree, sorted by ->r_tid.
491  */
492 static void __insert_request(struct ceph_osd_client *osdc,
493                              struct ceph_osd_request *new)
494 {
495         struct rb_node **p = &osdc->requests.rb_node;
496         struct rb_node *parent = NULL;
497         struct ceph_osd_request *req = NULL;
498
499         while (*p) {
500                 parent = *p;
501                 req = rb_entry(parent, struct ceph_osd_request, r_node);
502                 if (new->r_tid < req->r_tid)
503                         p = &(*p)->rb_left;
504                 else if (new->r_tid > req->r_tid)
505                         p = &(*p)->rb_right;
506                 else
507                         BUG();
508         }
509
510         rb_link_node(&new->r_node, parent, p);
511         rb_insert_color(&new->r_node, &osdc->requests);
512 }
513
514 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
515                                                  u64 tid)
516 {
517         struct ceph_osd_request *req;
518         struct rb_node *n = osdc->requests.rb_node;
519
520         while (n) {
521                 req = rb_entry(n, struct ceph_osd_request, r_node);
522                 if (tid < req->r_tid)
523                         n = n->rb_left;
524                 else if (tid > req->r_tid)
525                         n = n->rb_right;
526                 else
527                         return req;
528         }
529         return NULL;
530 }
531
532 static struct ceph_osd_request *
533 __lookup_request_ge(struct ceph_osd_client *osdc,
534                     u64 tid)
535 {
536         struct ceph_osd_request *req;
537         struct rb_node *n = osdc->requests.rb_node;
538
539         while (n) {
540                 req = rb_entry(n, struct ceph_osd_request, r_node);
541                 if (tid < req->r_tid) {
542                         if (!n->rb_left)
543                                 return req;
544                         n = n->rb_left;
545                 } else if (tid > req->r_tid) {
546                         n = n->rb_right;
547                 } else {
548                         return req;
549                 }
550         }
551         return NULL;
552 }
553
554 /*
555  * Resubmit requests pending on the given osd.
556  */
557 static void __kick_osd_requests(struct ceph_osd_client *osdc,
558                                 struct ceph_osd *osd)
559 {
560         struct ceph_osd_request *req, *nreq;
561         int err;
562
563         dout("__kick_osd_requests osd%d\n", osd->o_osd);
564         err = __reset_osd(osdc, osd);
565         if (err)
566                 return;
567
568         list_for_each_entry(req, &osd->o_requests, r_osd_item) {
569                 list_move(&req->r_req_lru_item, &osdc->req_unsent);
570                 dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
571                      osd->o_osd);
572                 if (!req->r_linger)
573                         req->r_flags |= CEPH_OSD_FLAG_RETRY;
574         }
575
576         list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
577                                  r_linger_osd) {
578                 /*
579                  * reregister request prior to unregistering linger so
580                  * that r_osd is preserved.
581                  */
582                 BUG_ON(!list_empty(&req->r_req_lru_item));
583                 __register_request(osdc, req);
584                 list_add(&req->r_req_lru_item, &osdc->req_unsent);
585                 list_add(&req->r_osd_item, &req->r_osd->o_requests);
586                 __unregister_linger_request(osdc, req);
587                 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
588                      osd->o_osd);
589         }
590 }
591
592 /*
593  * If the osd connection drops, we need to resubmit all requests.
594  */
595 static void osd_reset(struct ceph_connection *con)
596 {
597         struct ceph_osd *osd = con->private;
598         struct ceph_osd_client *osdc;
599
600         if (!osd)
601                 return;
602         dout("osd_reset osd%d\n", osd->o_osd);
603         osdc = osd->o_osdc;
604         down_read(&osdc->map_sem);
605         mutex_lock(&osdc->request_mutex);
606         __kick_osd_requests(osdc, osd);
607         __send_queued(osdc);
608         mutex_unlock(&osdc->request_mutex);
609         up_read(&osdc->map_sem);
610 }
611
612 /*
613  * Track open sessions with osds.
614  */
615 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
616 {
617         struct ceph_osd *osd;
618
619         osd = kzalloc(sizeof(*osd), GFP_NOFS);
620         if (!osd)
621                 return NULL;
622
623         atomic_set(&osd->o_ref, 1);
624         osd->o_osdc = osdc;
625         osd->o_osd = onum;
626         RB_CLEAR_NODE(&osd->o_node);
627         INIT_LIST_HEAD(&osd->o_requests);
628         INIT_LIST_HEAD(&osd->o_linger_requests);
629         INIT_LIST_HEAD(&osd->o_osd_lru);
630         osd->o_incarnation = 1;
631
632         ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
633
634         INIT_LIST_HEAD(&osd->o_keepalive_item);
635         return osd;
636 }
637
638 static struct ceph_osd *get_osd(struct ceph_osd *osd)
639 {
640         if (atomic_inc_not_zero(&osd->o_ref)) {
641                 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
642                      atomic_read(&osd->o_ref));
643                 return osd;
644         } else {
645                 dout("get_osd %p FAIL\n", osd);
646                 return NULL;
647         }
648 }
649
650 static void put_osd(struct ceph_osd *osd)
651 {
652         dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
653              atomic_read(&osd->o_ref) - 1);
654         if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
655                 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
656
657                 if (ac->ops && ac->ops->destroy_authorizer)
658                         ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
659                 kfree(osd);
660         }
661 }
662
663 /*
664  * remove an osd from our map
665  */
666 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
667 {
668         dout("__remove_osd %p\n", osd);
669         BUG_ON(!list_empty(&osd->o_requests));
670         rb_erase(&osd->o_node, &osdc->osds);
671         list_del_init(&osd->o_osd_lru);
672         ceph_con_close(&osd->o_con);
673         put_osd(osd);
674 }
675
676 static void remove_all_osds(struct ceph_osd_client *osdc)
677 {
678         dout("%s %p\n", __func__, osdc);
679         mutex_lock(&osdc->request_mutex);
680         while (!RB_EMPTY_ROOT(&osdc->osds)) {
681                 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
682                                                 struct ceph_osd, o_node);
683                 __remove_osd(osdc, osd);
684         }
685         mutex_unlock(&osdc->request_mutex);
686 }
687
688 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
689                               struct ceph_osd *osd)
690 {
691         dout("__move_osd_to_lru %p\n", osd);
692         BUG_ON(!list_empty(&osd->o_osd_lru));
693         list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
694         osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
695 }
696
697 static void __remove_osd_from_lru(struct ceph_osd *osd)
698 {
699         dout("__remove_osd_from_lru %p\n", osd);
700         if (!list_empty(&osd->o_osd_lru))
701                 list_del_init(&osd->o_osd_lru);
702 }
703
704 static void remove_old_osds(struct ceph_osd_client *osdc)
705 {
706         struct ceph_osd *osd, *nosd;
707
708         dout("__remove_old_osds %p\n", osdc);
709         mutex_lock(&osdc->request_mutex);
710         list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
711                 if (time_before(jiffies, osd->lru_ttl))
712                         break;
713                 __remove_osd(osdc, osd);
714         }
715         mutex_unlock(&osdc->request_mutex);
716 }
717
718 /*
719  * reset osd connect
720  */
721 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
722 {
723         struct ceph_entity_addr *peer_addr;
724
725         dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
726         if (list_empty(&osd->o_requests) &&
727             list_empty(&osd->o_linger_requests)) {
728                 __remove_osd(osdc, osd);
729
730                 return -ENODEV;
731         }
732
733         peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
734         if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
735                         !ceph_con_opened(&osd->o_con)) {
736                 struct ceph_osd_request *req;
737
738                 dout(" osd addr hasn't changed and connection never opened,"
739                      " letting msgr retry");
740                 /* touch each r_stamp for handle_timeout()'s benfit */
741                 list_for_each_entry(req, &osd->o_requests, r_osd_item)
742                         req->r_stamp = jiffies;
743
744                 return -EAGAIN;
745         }
746
747         ceph_con_close(&osd->o_con);
748         ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
749         osd->o_incarnation++;
750
751         return 0;
752 }
753
754 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
755 {
756         struct rb_node **p = &osdc->osds.rb_node;
757         struct rb_node *parent = NULL;
758         struct ceph_osd *osd = NULL;
759
760         dout("__insert_osd %p osd%d\n", new, new->o_osd);
761         while (*p) {
762                 parent = *p;
763                 osd = rb_entry(parent, struct ceph_osd, o_node);
764                 if (new->o_osd < osd->o_osd)
765                         p = &(*p)->rb_left;
766                 else if (new->o_osd > osd->o_osd)
767                         p = &(*p)->rb_right;
768                 else
769                         BUG();
770         }
771
772         rb_link_node(&new->o_node, parent, p);
773         rb_insert_color(&new->o_node, &osdc->osds);
774 }
775
776 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
777 {
778         struct ceph_osd *osd;
779         struct rb_node *n = osdc->osds.rb_node;
780
781         while (n) {
782                 osd = rb_entry(n, struct ceph_osd, o_node);
783                 if (o < osd->o_osd)
784                         n = n->rb_left;
785                 else if (o > osd->o_osd)
786                         n = n->rb_right;
787                 else
788                         return osd;
789         }
790         return NULL;
791 }
792
793 static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
794 {
795         schedule_delayed_work(&osdc->timeout_work,
796                         osdc->client->options->osd_keepalive_timeout * HZ);
797 }
798
799 static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
800 {
801         cancel_delayed_work(&osdc->timeout_work);
802 }
803
804 /*
805  * Register request, assign tid.  If this is the first request, set up
806  * the timeout event.
807  */
808 static void __register_request(struct ceph_osd_client *osdc,
809                                struct ceph_osd_request *req)
810 {
811         req->r_tid = ++osdc->last_tid;
812         req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
813         dout("__register_request %p tid %lld\n", req, req->r_tid);
814         __insert_request(osdc, req);
815         ceph_osdc_get_request(req);
816         osdc->num_requests++;
817         if (osdc->num_requests == 1) {
818                 dout(" first request, scheduling timeout\n");
819                 __schedule_osd_timeout(osdc);
820         }
821 }
822
823 static void register_request(struct ceph_osd_client *osdc,
824                              struct ceph_osd_request *req)
825 {
826         mutex_lock(&osdc->request_mutex);
827         __register_request(osdc, req);
828         mutex_unlock(&osdc->request_mutex);
829 }
830
831 /*
832  * called under osdc->request_mutex
833  */
834 static void __unregister_request(struct ceph_osd_client *osdc,
835                                  struct ceph_osd_request *req)
836 {
837         if (RB_EMPTY_NODE(&req->r_node)) {
838                 dout("__unregister_request %p tid %lld not registered\n",
839                         req, req->r_tid);
840                 return;
841         }
842
843         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
844         rb_erase(&req->r_node, &osdc->requests);
845         osdc->num_requests--;
846
847         if (req->r_osd) {
848                 /* make sure the original request isn't in flight. */
849                 ceph_msg_revoke(req->r_request);
850
851                 list_del_init(&req->r_osd_item);
852                 if (list_empty(&req->r_osd->o_requests) &&
853                     list_empty(&req->r_osd->o_linger_requests)) {
854                         dout("moving osd to %p lru\n", req->r_osd);
855                         __move_osd_to_lru(osdc, req->r_osd);
856                 }
857                 if (list_empty(&req->r_linger_item))
858                         req->r_osd = NULL;
859         }
860
861         list_del_init(&req->r_req_lru_item);
862         ceph_osdc_put_request(req);
863
864         if (osdc->num_requests == 0) {
865                 dout(" no requests, canceling timeout\n");
866                 __cancel_osd_timeout(osdc);
867         }
868 }
869
870 /*
871  * Cancel a previously queued request message
872  */
873 static void __cancel_request(struct ceph_osd_request *req)
874 {
875         if (req->r_sent && req->r_osd) {
876                 ceph_msg_revoke(req->r_request);
877                 req->r_sent = 0;
878         }
879 }
880
881 static void __register_linger_request(struct ceph_osd_client *osdc,
882                                     struct ceph_osd_request *req)
883 {
884         dout("__register_linger_request %p\n", req);
885         list_add_tail(&req->r_linger_item, &osdc->req_linger);
886         if (req->r_osd)
887                 list_add_tail(&req->r_linger_osd,
888                               &req->r_osd->o_linger_requests);
889 }
890
891 static void __unregister_linger_request(struct ceph_osd_client *osdc,
892                                         struct ceph_osd_request *req)
893 {
894         dout("__unregister_linger_request %p\n", req);
895         list_del_init(&req->r_linger_item);
896         if (req->r_osd) {
897                 list_del_init(&req->r_linger_osd);
898
899                 if (list_empty(&req->r_osd->o_requests) &&
900                     list_empty(&req->r_osd->o_linger_requests)) {
901                         dout("moving osd to %p lru\n", req->r_osd);
902                         __move_osd_to_lru(osdc, req->r_osd);
903                 }
904                 if (list_empty(&req->r_osd_item))
905                         req->r_osd = NULL;
906         }
907 }
908
909 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
910                                          struct ceph_osd_request *req)
911 {
912         mutex_lock(&osdc->request_mutex);
913         if (req->r_linger) {
914                 __unregister_linger_request(osdc, req);
915                 ceph_osdc_put_request(req);
916         }
917         mutex_unlock(&osdc->request_mutex);
918 }
919 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
920
921 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
922                                   struct ceph_osd_request *req)
923 {
924         if (!req->r_linger) {
925                 dout("set_request_linger %p\n", req);
926                 req->r_linger = 1;
927                 /*
928                  * caller is now responsible for calling
929                  * unregister_linger_request
930                  */
931                 ceph_osdc_get_request(req);
932         }
933 }
934 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
935
936 /*
937  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
938  * (as needed), and set the request r_osd appropriately.  If there is
939  * no up osd, set r_osd to NULL.  Move the request to the appropriate list
940  * (unsent, homeless) or leave on in-flight lru.
941  *
942  * Return 0 if unchanged, 1 if changed, or negative on error.
943  *
944  * Caller should hold map_sem for read and request_mutex.
945  */
946 static int __map_request(struct ceph_osd_client *osdc,
947                          struct ceph_osd_request *req, int force_resend)
948 {
949         struct ceph_pg pgid;
950         int acting[CEPH_PG_MAX_SIZE];
951         int o = -1, num = 0;
952         int err;
953
954         dout("map_request %p tid %lld\n", req, req->r_tid);
955         err = ceph_calc_object_layout(&pgid, req->r_oid,
956                                       &req->r_file_layout, osdc->osdmap);
957         if (err) {
958                 list_move(&req->r_req_lru_item, &osdc->req_notarget);
959                 return err;
960         }
961         req->r_pgid = pgid;
962
963         err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
964         if (err > 0) {
965                 o = acting[0];
966                 num = err;
967         }
968
969         if ((!force_resend &&
970              req->r_osd && req->r_osd->o_osd == o &&
971              req->r_sent >= req->r_osd->o_incarnation &&
972              req->r_num_pg_osds == num &&
973              memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
974             (req->r_osd == NULL && o == -1))
975                 return 0;  /* no change */
976
977         dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
978              req->r_tid, pgid.pool, pgid.seed, o,
979              req->r_osd ? req->r_osd->o_osd : -1);
980
981         /* record full pg acting set */
982         memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
983         req->r_num_pg_osds = num;
984
985         if (req->r_osd) {
986                 __cancel_request(req);
987                 list_del_init(&req->r_osd_item);
988                 req->r_osd = NULL;
989         }
990
991         req->r_osd = __lookup_osd(osdc, o);
992         if (!req->r_osd && o >= 0) {
993                 err = -ENOMEM;
994                 req->r_osd = create_osd(osdc, o);
995                 if (!req->r_osd) {
996                         list_move(&req->r_req_lru_item, &osdc->req_notarget);
997                         goto out;
998                 }
999
1000                 dout("map_request osd %p is osd%d\n", req->r_osd, o);
1001                 __insert_osd(osdc, req->r_osd);
1002
1003                 ceph_con_open(&req->r_osd->o_con,
1004                               CEPH_ENTITY_TYPE_OSD, o,
1005                               &osdc->osdmap->osd_addr[o]);
1006         }
1007
1008         if (req->r_osd) {
1009                 __remove_osd_from_lru(req->r_osd);
1010                 list_add(&req->r_osd_item, &req->r_osd->o_requests);
1011                 list_move(&req->r_req_lru_item, &osdc->req_unsent);
1012         } else {
1013                 list_move(&req->r_req_lru_item, &osdc->req_notarget);
1014         }
1015         err = 1;   /* osd or pg changed */
1016
1017 out:
1018         return err;
1019 }
1020
1021 /*
1022  * caller should hold map_sem (for read) and request_mutex
1023  */
1024 static void __send_request(struct ceph_osd_client *osdc,
1025                            struct ceph_osd_request *req)
1026 {
1027         void *p;
1028
1029         dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1030              req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1031              (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1032
1033         /* fill in message content that changes each time we send it */
1034         put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1035         put_unaligned_le32(req->r_flags, req->r_request_flags);
1036         put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
1037         p = req->r_request_pgid;
1038         ceph_encode_64(&p, req->r_pgid.pool);
1039         ceph_encode_32(&p, req->r_pgid.seed);
1040         put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
1041         memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1042                sizeof(req->r_reassert_version));
1043
1044         req->r_stamp = jiffies;
1045         list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1046
1047         ceph_msg_get(req->r_request); /* send consumes a ref */
1048         ceph_con_send(&req->r_osd->o_con, req->r_request);
1049         req->r_sent = req->r_osd->o_incarnation;
1050 }
1051
1052 /*
1053  * Send any requests in the queue (req_unsent).
1054  */
1055 static void __send_queued(struct ceph_osd_client *osdc)
1056 {
1057         struct ceph_osd_request *req, *tmp;
1058
1059         dout("__send_queued\n");
1060         list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
1061                 __send_request(osdc, req);
1062 }
1063
1064 /*
1065  * Timeout callback, called every N seconds when 1 or more osd
1066  * requests has been active for more than N seconds.  When this
1067  * happens, we ping all OSDs with requests who have timed out to
1068  * ensure any communications channel reset is detected.  Reset the
1069  * request timeouts another N seconds in the future as we go.
1070  * Reschedule the timeout event another N seconds in future (unless
1071  * there are no open requests).
1072  */
1073 static void handle_timeout(struct work_struct *work)
1074 {
1075         struct ceph_osd_client *osdc =
1076                 container_of(work, struct ceph_osd_client, timeout_work.work);
1077         struct ceph_osd_request *req;
1078         struct ceph_osd *osd;
1079         unsigned long keepalive =
1080                 osdc->client->options->osd_keepalive_timeout * HZ;
1081         struct list_head slow_osds;
1082         dout("timeout\n");
1083         down_read(&osdc->map_sem);
1084
1085         ceph_monc_request_next_osdmap(&osdc->client->monc);
1086
1087         mutex_lock(&osdc->request_mutex);
1088
1089         /*
1090          * ping osds that are a bit slow.  this ensures that if there
1091          * is a break in the TCP connection we will notice, and reopen
1092          * a connection with that osd (from the fault callback).
1093          */
1094         INIT_LIST_HEAD(&slow_osds);
1095         list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
1096                 if (time_before(jiffies, req->r_stamp + keepalive))
1097                         break;
1098
1099                 osd = req->r_osd;
1100                 BUG_ON(!osd);
1101                 dout(" tid %llu is slow, will send keepalive on osd%d\n",
1102                      req->r_tid, osd->o_osd);
1103                 list_move_tail(&osd->o_keepalive_item, &slow_osds);
1104         }
1105         while (!list_empty(&slow_osds)) {
1106                 osd = list_entry(slow_osds.next, struct ceph_osd,
1107                                  o_keepalive_item);
1108                 list_del_init(&osd->o_keepalive_item);
1109                 ceph_con_keepalive(&osd->o_con);
1110         }
1111
1112         __schedule_osd_timeout(osdc);
1113         __send_queued(osdc);
1114         mutex_unlock(&osdc->request_mutex);
1115         up_read(&osdc->map_sem);
1116 }
1117
1118 static void handle_osds_timeout(struct work_struct *work)
1119 {
1120         struct ceph_osd_client *osdc =
1121                 container_of(work, struct ceph_osd_client,
1122                              osds_timeout_work.work);
1123         unsigned long delay =
1124                 osdc->client->options->osd_idle_ttl * HZ >> 2;
1125
1126         dout("osds timeout\n");
1127         down_read(&osdc->map_sem);
1128         remove_old_osds(osdc);
1129         up_read(&osdc->map_sem);
1130
1131         schedule_delayed_work(&osdc->osds_timeout_work,
1132                               round_jiffies_relative(delay));
1133 }
1134
1135 static void complete_request(struct ceph_osd_request *req)
1136 {
1137         if (req->r_safe_callback)
1138                 req->r_safe_callback(req, NULL);
1139         complete_all(&req->r_safe_completion);  /* fsync waiter */
1140 }
1141
1142 static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143 {
1144         __u8 v;
1145
1146         ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147         v = ceph_decode_8(p);
1148         if (v > 1) {
1149                 pr_warning("do not understand pg encoding %d > 1", v);
1150                 return -EINVAL;
1151         }
1152         pgid->pool = ceph_decode_64(p);
1153         pgid->seed = ceph_decode_32(p);
1154         *p += 4;
1155         return 0;
1156
1157 bad:
1158         pr_warning("incomplete pg encoding");
1159         return -EINVAL;
1160 }
1161
1162 /*
1163  * handle osd op reply.  either call the callback if it is specified,
1164  * or do the completion to wake up the waiting thread.
1165  */
1166 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1167                          struct ceph_connection *con)
1168 {
1169         void *p, *end;
1170         struct ceph_osd_request *req;
1171         u64 tid;
1172         int object_len;
1173         int numops, payload_len, flags;
1174         s32 result;
1175         s32 retry_attempt;
1176         struct ceph_pg pg;
1177         int err;
1178         u32 reassert_epoch;
1179         u64 reassert_version;
1180         u32 osdmap_epoch;
1181         int i;
1182
1183         tid = le64_to_cpu(msg->hdr.tid);
1184         dout("handle_reply %p tid %llu\n", msg, tid);
1185
1186         p = msg->front.iov_base;
1187         end = p + msg->front.iov_len;
1188
1189         ceph_decode_need(&p, end, 4, bad);
1190         object_len = ceph_decode_32(&p);
1191         ceph_decode_need(&p, end, object_len, bad);
1192         p += object_len;
1193
1194         err = __decode_pgid(&p, end, &pg);
1195         if (err)
1196                 goto bad;
1197
1198         ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1199         flags = ceph_decode_64(&p);
1200         result = ceph_decode_32(&p);
1201         reassert_epoch = ceph_decode_32(&p);
1202         reassert_version = ceph_decode_64(&p);
1203         osdmap_epoch = ceph_decode_32(&p);
1204
1205         /* lookup */
1206         mutex_lock(&osdc->request_mutex);
1207         req = __lookup_request(osdc, tid);
1208         if (req == NULL) {
1209                 dout("handle_reply tid %llu dne\n", tid);
1210                 mutex_unlock(&osdc->request_mutex);
1211                 return;
1212         }
1213         ceph_osdc_get_request(req);
1214
1215         dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1216              req, result);
1217
1218         ceph_decode_need(&p, end, 4, bad);
1219         numops = ceph_decode_32(&p);
1220         if (numops > CEPH_OSD_MAX_OP)
1221                 goto bad_put;
1222         if (numops != req->r_num_ops)
1223                 goto bad_put;
1224         payload_len = 0;
1225         ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
1226         for (i = 0; i < numops; i++) {
1227                 struct ceph_osd_op *op = p;
1228                 int len;
1229
1230                 len = le32_to_cpu(op->payload_len);
1231                 req->r_reply_op_len[i] = len;
1232                 dout(" op %d has %d bytes\n", i, len);
1233                 payload_len += len;
1234                 p += sizeof(*op);
1235         }
1236         if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
1237                 pr_warning("sum of op payload lens %d != data_len %d",
1238                            payload_len, le32_to_cpu(msg->hdr.data_len));
1239                 goto bad_put;
1240         }
1241
1242         ceph_decode_need(&p, end, 4 + numops * 4, bad);
1243         retry_attempt = ceph_decode_32(&p);
1244         for (i = 0; i < numops; i++)
1245                 req->r_reply_op_result[i] = ceph_decode_32(&p);
1246
1247         /*
1248          * if this connection filled our message, drop our reference now, to
1249          * avoid a (safe but slower) revoke later.
1250          */
1251         if (req->r_con_filling_msg == con && req->r_reply == msg) {
1252                 dout(" dropping con_filling_msg ref %p\n", con);
1253                 req->r_con_filling_msg = NULL;
1254                 con->ops->put(con);
1255         }
1256
1257         if (!req->r_got_reply) {
1258                 unsigned int bytes;
1259
1260                 req->r_result = result;
1261                 bytes = le32_to_cpu(msg->hdr.data_len);
1262                 dout("handle_reply result %d bytes %d\n", req->r_result,
1263                      bytes);
1264                 if (req->r_result == 0)
1265                         req->r_result = bytes;
1266
1267                 /* in case this is a write and we need to replay, */
1268                 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1269                 req->r_reassert_version.version = cpu_to_le64(reassert_version);
1270
1271                 req->r_got_reply = 1;
1272         } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1273                 dout("handle_reply tid %llu dup ack\n", tid);
1274                 mutex_unlock(&osdc->request_mutex);
1275                 goto done;
1276         }
1277
1278         dout("handle_reply tid %llu flags %d\n", tid, flags);
1279
1280         if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1281                 __register_linger_request(osdc, req);
1282
1283         /* either this is a read, or we got the safe response */
1284         if (result < 0 ||
1285             (flags & CEPH_OSD_FLAG_ONDISK) ||
1286             ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1287                 __unregister_request(osdc, req);
1288
1289         mutex_unlock(&osdc->request_mutex);
1290
1291         if (req->r_callback)
1292                 req->r_callback(req, msg);
1293         else
1294                 complete_all(&req->r_completion);
1295
1296         if (flags & CEPH_OSD_FLAG_ONDISK)
1297                 complete_request(req);
1298
1299 done:
1300         dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1301         ceph_osdc_put_request(req);
1302         return;
1303
1304 bad_put:
1305         ceph_osdc_put_request(req);
1306 bad:
1307         pr_err("corrupt osd_op_reply got %d %d\n",
1308                (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1309         ceph_msg_dump(msg);
1310 }
1311
1312 static void reset_changed_osds(struct ceph_osd_client *osdc)
1313 {
1314         struct rb_node *p, *n;
1315
1316         for (p = rb_first(&osdc->osds); p; p = n) {
1317                 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1318
1319                 n = rb_next(p);
1320                 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1321                     memcmp(&osd->o_con.peer_addr,
1322                            ceph_osd_addr(osdc->osdmap,
1323                                          osd->o_osd),
1324                            sizeof(struct ceph_entity_addr)) != 0)
1325                         __reset_osd(osdc, osd);
1326         }
1327 }
1328
1329 /*
1330  * Requeue requests whose mapping to an OSD has changed.  If requests map to
1331  * no osd, request a new map.
1332  *
1333  * Caller should hold map_sem for read.
1334  */
1335 static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1336 {
1337         struct ceph_osd_request *req, *nreq;
1338         struct rb_node *p;
1339         int needmap = 0;
1340         int err;
1341
1342         dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
1343         mutex_lock(&osdc->request_mutex);
1344         for (p = rb_first(&osdc->requests); p; ) {
1345                 req = rb_entry(p, struct ceph_osd_request, r_node);
1346                 p = rb_next(p);
1347
1348                 /*
1349                  * For linger requests that have not yet been
1350                  * registered, move them to the linger list; they'll
1351                  * be sent to the osd in the loop below.  Unregister
1352                  * the request before re-registering it as a linger
1353                  * request to ensure the __map_request() below
1354                  * will decide it needs to be sent.
1355                  */
1356                 if (req->r_linger && list_empty(&req->r_linger_item)) {
1357                         dout("%p tid %llu restart on osd%d\n",
1358                              req, req->r_tid,
1359                              req->r_osd ? req->r_osd->o_osd : -1);
1360                         __unregister_request(osdc, req);
1361                         __register_linger_request(osdc, req);
1362                         continue;
1363                 }
1364
1365                 err = __map_request(osdc, req, force_resend);
1366                 if (err < 0)
1367                         continue;  /* error */
1368                 if (req->r_osd == NULL) {
1369                         dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1370                         needmap++;  /* request a newer map */
1371                 } else if (err > 0) {
1372                         if (!req->r_linger) {
1373                                 dout("%p tid %llu requeued on osd%d\n", req,
1374                                      req->r_tid,
1375                                      req->r_osd ? req->r_osd->o_osd : -1);
1376                                 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1377                         }
1378                 }
1379         }
1380
1381         list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1382                                  r_linger_item) {
1383                 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1384
1385                 err = __map_request(osdc, req, force_resend);
1386                 dout("__map_request returned %d\n", err);
1387                 if (err == 0)
1388                         continue;  /* no change and no osd was specified */
1389                 if (err < 0)
1390                         continue;  /* hrm! */
1391                 if (req->r_osd == NULL) {
1392                         dout("tid %llu maps to no valid osd\n", req->r_tid);
1393                         needmap++;  /* request a newer map */
1394                         continue;
1395                 }
1396
1397                 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1398                      req->r_osd ? req->r_osd->o_osd : -1);
1399                 __register_request(osdc, req);
1400                 __unregister_linger_request(osdc, req);
1401         }
1402         mutex_unlock(&osdc->request_mutex);
1403
1404         if (needmap) {
1405                 dout("%d requests for down osds, need new map\n", needmap);
1406                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1407         }
1408         reset_changed_osds(osdc);
1409 }
1410
1411
1412 /*
1413  * Process updated osd map.
1414  *
1415  * The message contains any number of incremental and full maps, normally
1416  * indicating some sort of topology change in the cluster.  Kick requests
1417  * off to different OSDs as needed.
1418  */
1419 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1420 {
1421         void *p, *end, *next;
1422         u32 nr_maps, maplen;
1423         u32 epoch;
1424         struct ceph_osdmap *newmap = NULL, *oldmap;
1425         int err;
1426         struct ceph_fsid fsid;
1427
1428         dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1429         p = msg->front.iov_base;
1430         end = p + msg->front.iov_len;
1431
1432         /* verify fsid */
1433         ceph_decode_need(&p, end, sizeof(fsid), bad);
1434         ceph_decode_copy(&p, &fsid, sizeof(fsid));
1435         if (ceph_check_fsid(osdc->client, &fsid) < 0)
1436                 return;
1437
1438         down_write(&osdc->map_sem);
1439
1440         /* incremental maps */
1441         ceph_decode_32_safe(&p, end, nr_maps, bad);
1442         dout(" %d inc maps\n", nr_maps);
1443         while (nr_maps > 0) {
1444                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1445                 epoch = ceph_decode_32(&p);
1446                 maplen = ceph_decode_32(&p);
1447                 ceph_decode_need(&p, end, maplen, bad);
1448                 next = p + maplen;
1449                 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1450                         dout("applying incremental map %u len %d\n",
1451                              epoch, maplen);
1452                         newmap = osdmap_apply_incremental(&p, next,
1453                                                           osdc->osdmap,
1454                                                           &osdc->client->msgr);
1455                         if (IS_ERR(newmap)) {
1456                                 err = PTR_ERR(newmap);
1457                                 goto bad;
1458                         }
1459                         BUG_ON(!newmap);
1460                         if (newmap != osdc->osdmap) {
1461                                 ceph_osdmap_destroy(osdc->osdmap);
1462                                 osdc->osdmap = newmap;
1463                         }
1464                         kick_requests(osdc, 0);
1465                 } else {
1466                         dout("ignoring incremental map %u len %d\n",
1467                              epoch, maplen);
1468                 }
1469                 p = next;
1470                 nr_maps--;
1471         }
1472         if (newmap)
1473                 goto done;
1474
1475         /* full maps */
1476         ceph_decode_32_safe(&p, end, nr_maps, bad);
1477         dout(" %d full maps\n", nr_maps);
1478         while (nr_maps) {
1479                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1480                 epoch = ceph_decode_32(&p);
1481                 maplen = ceph_decode_32(&p);
1482                 ceph_decode_need(&p, end, maplen, bad);
1483                 if (nr_maps > 1) {
1484                         dout("skipping non-latest full map %u len %d\n",
1485                              epoch, maplen);
1486                 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1487                         dout("skipping full map %u len %d, "
1488                              "older than our %u\n", epoch, maplen,
1489                              osdc->osdmap->epoch);
1490                 } else {
1491                         int skipped_map = 0;
1492
1493                         dout("taking full map %u len %d\n", epoch, maplen);
1494                         newmap = osdmap_decode(&p, p+maplen);
1495                         if (IS_ERR(newmap)) {
1496                                 err = PTR_ERR(newmap);
1497                                 goto bad;
1498                         }
1499                         BUG_ON(!newmap);
1500                         oldmap = osdc->osdmap;
1501                         osdc->osdmap = newmap;
1502                         if (oldmap) {
1503                                 if (oldmap->epoch + 1 < newmap->epoch)
1504                                         skipped_map = 1;
1505                                 ceph_osdmap_destroy(oldmap);
1506                         }
1507                         kick_requests(osdc, skipped_map);
1508                 }
1509                 p += maplen;
1510                 nr_maps--;
1511         }
1512
1513 done:
1514         downgrade_write(&osdc->map_sem);
1515         ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1516
1517         /*
1518          * subscribe to subsequent osdmap updates if full to ensure
1519          * we find out when we are no longer full and stop returning
1520          * ENOSPC.
1521          */
1522         if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1523                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1524
1525         mutex_lock(&osdc->request_mutex);
1526         __send_queued(osdc);
1527         mutex_unlock(&osdc->request_mutex);
1528         up_read(&osdc->map_sem);
1529         wake_up_all(&osdc->client->auth_wq);
1530         return;
1531
1532 bad:
1533         pr_err("osdc handle_map corrupt msg\n");
1534         ceph_msg_dump(msg);
1535         up_write(&osdc->map_sem);
1536         return;
1537 }
1538
1539 /*
1540  * watch/notify callback event infrastructure
1541  *
1542  * These callbacks are used both for watch and notify operations.
1543  */
1544 static void __release_event(struct kref *kref)
1545 {
1546         struct ceph_osd_event *event =
1547                 container_of(kref, struct ceph_osd_event, kref);
1548
1549         dout("__release_event %p\n", event);
1550         kfree(event);
1551 }
1552
1553 static void get_event(struct ceph_osd_event *event)
1554 {
1555         kref_get(&event->kref);
1556 }
1557
1558 void ceph_osdc_put_event(struct ceph_osd_event *event)
1559 {
1560         kref_put(&event->kref, __release_event);
1561 }
1562 EXPORT_SYMBOL(ceph_osdc_put_event);
1563
1564 static void __insert_event(struct ceph_osd_client *osdc,
1565                              struct ceph_osd_event *new)
1566 {
1567         struct rb_node **p = &osdc->event_tree.rb_node;
1568         struct rb_node *parent = NULL;
1569         struct ceph_osd_event *event = NULL;
1570
1571         while (*p) {
1572                 parent = *p;
1573                 event = rb_entry(parent, struct ceph_osd_event, node);
1574                 if (new->cookie < event->cookie)
1575                         p = &(*p)->rb_left;
1576                 else if (new->cookie > event->cookie)
1577                         p = &(*p)->rb_right;
1578                 else
1579                         BUG();
1580         }
1581
1582         rb_link_node(&new->node, parent, p);
1583         rb_insert_color(&new->node, &osdc->event_tree);
1584 }
1585
1586 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1587                                                 u64 cookie)
1588 {
1589         struct rb_node **p = &osdc->event_tree.rb_node;
1590         struct rb_node *parent = NULL;
1591         struct ceph_osd_event *event = NULL;
1592
1593         while (*p) {
1594                 parent = *p;
1595                 event = rb_entry(parent, struct ceph_osd_event, node);
1596                 if (cookie < event->cookie)
1597                         p = &(*p)->rb_left;
1598                 else if (cookie > event->cookie)
1599                         p = &(*p)->rb_right;
1600                 else
1601                         return event;
1602         }
1603         return NULL;
1604 }
1605
1606 static void __remove_event(struct ceph_osd_event *event)
1607 {
1608         struct ceph_osd_client *osdc = event->osdc;
1609
1610         if (!RB_EMPTY_NODE(&event->node)) {
1611                 dout("__remove_event removed %p\n", event);
1612                 rb_erase(&event->node, &osdc->event_tree);
1613                 ceph_osdc_put_event(event);
1614         } else {
1615                 dout("__remove_event didn't remove %p\n", event);
1616         }
1617 }
1618
1619 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1620                            void (*event_cb)(u64, u64, u8, void *),
1621                            void *data, struct ceph_osd_event **pevent)
1622 {
1623         struct ceph_osd_event *event;
1624
1625         event = kmalloc(sizeof(*event), GFP_NOIO);
1626         if (!event)
1627                 return -ENOMEM;
1628
1629         dout("create_event %p\n", event);
1630         event->cb = event_cb;
1631         event->one_shot = 0;
1632         event->data = data;
1633         event->osdc = osdc;
1634         INIT_LIST_HEAD(&event->osd_node);
1635         RB_CLEAR_NODE(&event->node);
1636         kref_init(&event->kref);   /* one ref for us */
1637         kref_get(&event->kref);    /* one ref for the caller */
1638
1639         spin_lock(&osdc->event_lock);
1640         event->cookie = ++osdc->event_count;
1641         __insert_event(osdc, event);
1642         spin_unlock(&osdc->event_lock);
1643
1644         *pevent = event;
1645         return 0;
1646 }
1647 EXPORT_SYMBOL(ceph_osdc_create_event);
1648
1649 void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1650 {
1651         struct ceph_osd_client *osdc = event->osdc;
1652
1653         dout("cancel_event %p\n", event);
1654         spin_lock(&osdc->event_lock);
1655         __remove_event(event);
1656         spin_unlock(&osdc->event_lock);
1657         ceph_osdc_put_event(event); /* caller's */
1658 }
1659 EXPORT_SYMBOL(ceph_osdc_cancel_event);
1660
1661
1662 static void do_event_work(struct work_struct *work)
1663 {
1664         struct ceph_osd_event_work *event_work =
1665                 container_of(work, struct ceph_osd_event_work, work);
1666         struct ceph_osd_event *event = event_work->event;
1667         u64 ver = event_work->ver;
1668         u64 notify_id = event_work->notify_id;
1669         u8 opcode = event_work->opcode;
1670
1671         dout("do_event_work completing %p\n", event);
1672         event->cb(ver, notify_id, opcode, event->data);
1673         dout("do_event_work completed %p\n", event);
1674         ceph_osdc_put_event(event);
1675         kfree(event_work);
1676 }
1677
1678
1679 /*
1680  * Process osd watch notifications
1681  */
1682 static void handle_watch_notify(struct ceph_osd_client *osdc,
1683                                 struct ceph_msg *msg)
1684 {
1685         void *p, *end;
1686         u8 proto_ver;
1687         u64 cookie, ver, notify_id;
1688         u8 opcode;
1689         struct ceph_osd_event *event;
1690         struct ceph_osd_event_work *event_work;
1691
1692         p = msg->front.iov_base;
1693         end = p + msg->front.iov_len;
1694
1695         ceph_decode_8_safe(&p, end, proto_ver, bad);
1696         ceph_decode_8_safe(&p, end, opcode, bad);
1697         ceph_decode_64_safe(&p, end, cookie, bad);
1698         ceph_decode_64_safe(&p, end, ver, bad);
1699         ceph_decode_64_safe(&p, end, notify_id, bad);
1700
1701         spin_lock(&osdc->event_lock);
1702         event = __find_event(osdc, cookie);
1703         if (event) {
1704                 BUG_ON(event->one_shot);
1705                 get_event(event);
1706         }
1707         spin_unlock(&osdc->event_lock);
1708         dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1709              cookie, ver, event);
1710         if (event) {
1711                 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1712                 if (!event_work) {
1713                         dout("ERROR: could not allocate event_work\n");
1714                         goto done_err;
1715                 }
1716                 INIT_WORK(&event_work->work, do_event_work);
1717                 event_work->event = event;
1718                 event_work->ver = ver;
1719                 event_work->notify_id = notify_id;
1720                 event_work->opcode = opcode;
1721                 if (!queue_work(osdc->notify_wq, &event_work->work)) {
1722                         dout("WARNING: failed to queue notify event work\n");
1723                         goto done_err;
1724                 }
1725         }
1726
1727         return;
1728
1729 done_err:
1730         ceph_osdc_put_event(event);
1731         return;
1732
1733 bad:
1734         pr_err("osdc handle_watch_notify corrupt msg\n");
1735         return;
1736 }
1737
1738 /*
1739  * Register request, send initial attempt.
1740  */
1741 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1742                             struct ceph_osd_request *req,
1743                             bool nofail)
1744 {
1745         int rc = 0;
1746
1747         req->r_request->pages = req->r_pages;
1748         req->r_request->nr_pages = req->r_num_pages;
1749 #ifdef CONFIG_BLOCK
1750         req->r_request->bio = req->r_bio;
1751 #endif
1752         req->r_request->trail = &req->r_trail;
1753
1754         register_request(osdc, req);
1755
1756         down_read(&osdc->map_sem);
1757         mutex_lock(&osdc->request_mutex);
1758         /*
1759          * a racing kick_requests() may have sent the message for us
1760          * while we dropped request_mutex above, so only send now if
1761          * the request still han't been touched yet.
1762          */
1763         if (req->r_sent == 0) {
1764                 rc = __map_request(osdc, req, 0);
1765                 if (rc < 0) {
1766                         if (nofail) {
1767                                 dout("osdc_start_request failed map, "
1768                                      " will retry %lld\n", req->r_tid);
1769                                 rc = 0;
1770                         }
1771                         goto out_unlock;
1772                 }
1773                 if (req->r_osd == NULL) {
1774                         dout("send_request %p no up osds in pg\n", req);
1775                         ceph_monc_request_next_osdmap(&osdc->client->monc);
1776                 } else {
1777                         __send_request(osdc, req);
1778                 }
1779                 rc = 0;
1780         }
1781
1782 out_unlock:
1783         mutex_unlock(&osdc->request_mutex);
1784         up_read(&osdc->map_sem);
1785         return rc;
1786 }
1787 EXPORT_SYMBOL(ceph_osdc_start_request);
1788
1789 /*
1790  * wait for a request to complete
1791  */
1792 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1793                            struct ceph_osd_request *req)
1794 {
1795         int rc;
1796
1797         rc = wait_for_completion_interruptible(&req->r_completion);
1798         if (rc < 0) {
1799                 mutex_lock(&osdc->request_mutex);
1800                 __cancel_request(req);
1801                 __unregister_request(osdc, req);
1802                 mutex_unlock(&osdc->request_mutex);
1803                 complete_request(req);
1804                 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1805                 return rc;
1806         }
1807
1808         dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1809         return req->r_result;
1810 }
1811 EXPORT_SYMBOL(ceph_osdc_wait_request);
1812
1813 /*
1814  * sync - wait for all in-flight requests to flush.  avoid starvation.
1815  */
1816 void ceph_osdc_sync(struct ceph_osd_client *osdc)
1817 {
1818         struct ceph_osd_request *req;
1819         u64 last_tid, next_tid = 0;
1820
1821         mutex_lock(&osdc->request_mutex);
1822         last_tid = osdc->last_tid;
1823         while (1) {
1824                 req = __lookup_request_ge(osdc, next_tid);
1825                 if (!req)
1826                         break;
1827                 if (req->r_tid > last_tid)
1828                         break;
1829
1830                 next_tid = req->r_tid + 1;
1831                 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1832                         continue;
1833
1834                 ceph_osdc_get_request(req);
1835                 mutex_unlock(&osdc->request_mutex);
1836                 dout("sync waiting on tid %llu (last is %llu)\n",
1837                      req->r_tid, last_tid);
1838                 wait_for_completion(&req->r_safe_completion);
1839                 mutex_lock(&osdc->request_mutex);
1840                 ceph_osdc_put_request(req);
1841         }
1842         mutex_unlock(&osdc->request_mutex);
1843         dout("sync done (thru tid %llu)\n", last_tid);
1844 }
1845 EXPORT_SYMBOL(ceph_osdc_sync);
1846
1847 /*
1848  * init, shutdown
1849  */
1850 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1851 {
1852         int err;
1853
1854         dout("init\n");
1855         osdc->client = client;
1856         osdc->osdmap = NULL;
1857         init_rwsem(&osdc->map_sem);
1858         init_completion(&osdc->map_waiters);
1859         osdc->last_requested_map = 0;
1860         mutex_init(&osdc->request_mutex);
1861         osdc->last_tid = 0;
1862         osdc->osds = RB_ROOT;
1863         INIT_LIST_HEAD(&osdc->osd_lru);
1864         osdc->requests = RB_ROOT;
1865         INIT_LIST_HEAD(&osdc->req_lru);
1866         INIT_LIST_HEAD(&osdc->req_unsent);
1867         INIT_LIST_HEAD(&osdc->req_notarget);
1868         INIT_LIST_HEAD(&osdc->req_linger);
1869         osdc->num_requests = 0;
1870         INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1871         INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1872         spin_lock_init(&osdc->event_lock);
1873         osdc->event_tree = RB_ROOT;
1874         osdc->event_count = 0;
1875
1876         schedule_delayed_work(&osdc->osds_timeout_work,
1877            round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1878
1879         err = -ENOMEM;
1880         osdc->req_mempool = mempool_create_kmalloc_pool(10,
1881                                         sizeof(struct ceph_osd_request));
1882         if (!osdc->req_mempool)
1883                 goto out;
1884
1885         err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
1886                                 OSD_OP_FRONT_LEN, 10, true,
1887                                 "osd_op");
1888         if (err < 0)
1889                 goto out_mempool;
1890         err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
1891                                 OSD_OPREPLY_FRONT_LEN, 10, true,
1892                                 "osd_op_reply");
1893         if (err < 0)
1894                 goto out_msgpool;
1895
1896         osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1897         if (IS_ERR(osdc->notify_wq)) {
1898                 err = PTR_ERR(osdc->notify_wq);
1899                 osdc->notify_wq = NULL;
1900                 goto out_msgpool;
1901         }
1902         return 0;
1903
1904 out_msgpool:
1905         ceph_msgpool_destroy(&osdc->msgpool_op);
1906 out_mempool:
1907         mempool_destroy(osdc->req_mempool);
1908 out:
1909         return err;
1910 }
1911
1912 void ceph_osdc_stop(struct ceph_osd_client *osdc)
1913 {
1914         flush_workqueue(osdc->notify_wq);
1915         destroy_workqueue(osdc->notify_wq);
1916         cancel_delayed_work_sync(&osdc->timeout_work);
1917         cancel_delayed_work_sync(&osdc->osds_timeout_work);
1918         if (osdc->osdmap) {
1919                 ceph_osdmap_destroy(osdc->osdmap);
1920                 osdc->osdmap = NULL;
1921         }
1922         remove_all_osds(osdc);
1923         mempool_destroy(osdc->req_mempool);
1924         ceph_msgpool_destroy(&osdc->msgpool_op);
1925         ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1926 }
1927
1928 /*
1929  * Read some contiguous pages.  If we cross a stripe boundary, shorten
1930  * *plen.  Return number of bytes read, or error.
1931  */
1932 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1933                         struct ceph_vino vino, struct ceph_file_layout *layout,
1934                         u64 off, u64 *plen,
1935                         u32 truncate_seq, u64 truncate_size,
1936                         struct page **pages, int num_pages, int page_align)
1937 {
1938         struct ceph_osd_request *req;
1939         int rc = 0;
1940
1941         dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1942              vino.snap, off, *plen);
1943         req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1944                                     CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1945                                     NULL, 0, truncate_seq, truncate_size, NULL,
1946                                     false, page_align);
1947         if (IS_ERR(req))
1948                 return PTR_ERR(req);
1949
1950         /* it may be a short read due to an object boundary */
1951         req->r_pages = pages;
1952
1953         dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
1954              off, *plen, req->r_num_pages, page_align);
1955
1956         rc = ceph_osdc_start_request(osdc, req, false);
1957         if (!rc)
1958                 rc = ceph_osdc_wait_request(osdc, req);
1959
1960         ceph_osdc_put_request(req);
1961         dout("readpages result %d\n", rc);
1962         return rc;
1963 }
1964 EXPORT_SYMBOL(ceph_osdc_readpages);
1965
1966 /*
1967  * do a synchronous write on N pages
1968  */
1969 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1970                          struct ceph_file_layout *layout,
1971                          struct ceph_snap_context *snapc,
1972                          u64 off, u64 len,
1973                          u32 truncate_seq, u64 truncate_size,
1974                          struct timespec *mtime,
1975                          struct page **pages, int num_pages)
1976 {
1977         struct ceph_osd_request *req;
1978         int rc = 0;
1979         int page_align = off & ~PAGE_MASK;
1980
1981         BUG_ON(vino.snap != CEPH_NOSNAP);
1982         req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1983                                     CEPH_OSD_OP_WRITE,
1984                                     CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1985                                     snapc, 0,
1986                                     truncate_seq, truncate_size, mtime,
1987                                     true, page_align);
1988         if (IS_ERR(req))
1989                 return PTR_ERR(req);
1990
1991         /* it may be a short write due to an object boundary */
1992         req->r_pages = pages;
1993         dout("writepages %llu~%llu (%d pages)\n", off, len,
1994              req->r_num_pages);
1995
1996         rc = ceph_osdc_start_request(osdc, req, true);
1997         if (!rc)
1998                 rc = ceph_osdc_wait_request(osdc, req);
1999
2000         ceph_osdc_put_request(req);
2001         if (rc == 0)
2002                 rc = len;
2003         dout("writepages result %d\n", rc);
2004         return rc;
2005 }
2006 EXPORT_SYMBOL(ceph_osdc_writepages);
2007
2008 /*
2009  * handle incoming message
2010  */
2011 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2012 {
2013         struct ceph_osd *osd = con->private;
2014         struct ceph_osd_client *osdc;
2015         int type = le16_to_cpu(msg->hdr.type);
2016
2017         if (!osd)
2018                 goto out;
2019         osdc = osd->o_osdc;
2020
2021         switch (type) {
2022         case CEPH_MSG_OSD_MAP:
2023                 ceph_osdc_handle_map(osdc, msg);
2024                 break;
2025         case CEPH_MSG_OSD_OPREPLY:
2026                 handle_reply(osdc, msg, con);
2027                 break;
2028         case CEPH_MSG_WATCH_NOTIFY:
2029                 handle_watch_notify(osdc, msg);
2030                 break;
2031
2032         default:
2033                 pr_err("received unknown message type %d %s\n", type,
2034                        ceph_msg_type_name(type));
2035         }
2036 out:
2037         ceph_msg_put(msg);
2038 }
2039
2040 /*
2041  * lookup and return message for incoming reply.  set up reply message
2042  * pages.
2043  */
2044 static struct ceph_msg *get_reply(struct ceph_connection *con,
2045                                   struct ceph_msg_header *hdr,
2046                                   int *skip)
2047 {
2048         struct ceph_osd *osd = con->private;
2049         struct ceph_osd_client *osdc = osd->o_osdc;
2050         struct ceph_msg *m;
2051         struct ceph_osd_request *req;
2052         int front = le32_to_cpu(hdr->front_len);
2053         int data_len = le32_to_cpu(hdr->data_len);
2054         u64 tid;
2055
2056         tid = le64_to_cpu(hdr->tid);
2057         mutex_lock(&osdc->request_mutex);
2058         req = __lookup_request(osdc, tid);
2059         if (!req) {
2060                 *skip = 1;
2061                 m = NULL;
2062                 dout("get_reply unknown tid %llu from osd%d\n", tid,
2063                      osd->o_osd);
2064                 goto out;
2065         }
2066
2067         if (req->r_con_filling_msg) {
2068                 dout("%s revoking msg %p from old con %p\n", __func__,
2069                      req->r_reply, req->r_con_filling_msg);
2070                 ceph_msg_revoke_incoming(req->r_reply);
2071                 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
2072                 req->r_con_filling_msg = NULL;
2073         }
2074
2075         if (front > req->r_reply->front.iov_len) {
2076                 pr_warning("get_reply front %d > preallocated %d\n",
2077                            front, (int)req->r_reply->front.iov_len);
2078                 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
2079                 if (!m)
2080                         goto out;
2081                 ceph_msg_put(req->r_reply);
2082                 req->r_reply = m;
2083         }
2084         m = ceph_msg_get(req->r_reply);
2085
2086         if (data_len > 0) {
2087                 int want = calc_pages_for(req->r_page_alignment, data_len);
2088
2089                 if (req->r_pages && unlikely(req->r_num_pages < want)) {
2090                         pr_warning("tid %lld reply has %d bytes %d pages, we"
2091                                    " had only %d pages ready\n", tid, data_len,
2092                                    want, req->r_num_pages);
2093                         *skip = 1;
2094                         ceph_msg_put(m);
2095                         m = NULL;
2096                         goto out;
2097                 }
2098                 m->pages = req->r_pages;
2099                 m->nr_pages = req->r_num_pages;
2100                 m->page_alignment = req->r_page_alignment;
2101 #ifdef CONFIG_BLOCK
2102                 m->bio = req->r_bio;
2103 #endif
2104         }
2105         *skip = 0;
2106         req->r_con_filling_msg = con->ops->get(con);
2107         dout("get_reply tid %lld %p\n", tid, m);
2108
2109 out:
2110         mutex_unlock(&osdc->request_mutex);
2111         return m;
2112
2113 }
2114
2115 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2116                                   struct ceph_msg_header *hdr,
2117                                   int *skip)
2118 {
2119         struct ceph_osd *osd = con->private;
2120         int type = le16_to_cpu(hdr->type);
2121         int front = le32_to_cpu(hdr->front_len);
2122
2123         *skip = 0;
2124         switch (type) {
2125         case CEPH_MSG_OSD_MAP:
2126         case CEPH_MSG_WATCH_NOTIFY:
2127                 return ceph_msg_new(type, front, GFP_NOFS, false);
2128         case CEPH_MSG_OSD_OPREPLY:
2129                 return get_reply(con, hdr, skip);
2130         default:
2131                 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2132                         osd->o_osd);
2133                 *skip = 1;
2134                 return NULL;
2135         }
2136 }
2137
2138 /*
2139  * Wrappers to refcount containing ceph_osd struct
2140  */
2141 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2142 {
2143         struct ceph_osd *osd = con->private;
2144         if (get_osd(osd))
2145                 return con;
2146         return NULL;
2147 }
2148
2149 static void put_osd_con(struct ceph_connection *con)
2150 {
2151         struct ceph_osd *osd = con->private;
2152         put_osd(osd);
2153 }
2154
2155 /*
2156  * authentication
2157  */
2158 /*
2159  * Note: returned pointer is the address of a structure that's
2160  * managed separately.  Caller must *not* attempt to free it.
2161  */
2162 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2163                                         int *proto, int force_new)
2164 {
2165         struct ceph_osd *o = con->private;
2166         struct ceph_osd_client *osdc = o->o_osdc;
2167         struct ceph_auth_client *ac = osdc->client->monc.auth;
2168         struct ceph_auth_handshake *auth = &o->o_auth;
2169
2170         if (force_new && auth->authorizer) {
2171                 if (ac->ops && ac->ops->destroy_authorizer)
2172                         ac->ops->destroy_authorizer(ac, auth->authorizer);
2173                 auth->authorizer = NULL;
2174         }
2175         if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
2176                 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2177                                                         auth);
2178                 if (ret)
2179                         return ERR_PTR(ret);
2180         }
2181         *proto = ac->protocol;
2182
2183         return auth;
2184 }
2185
2186
2187 static int verify_authorizer_reply(struct ceph_connection *con, int len)
2188 {
2189         struct ceph_osd *o = con->private;
2190         struct ceph_osd_client *osdc = o->o_osdc;
2191         struct ceph_auth_client *ac = osdc->client->monc.auth;
2192
2193         /*
2194          * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2195          * XXX which do we do:  succeed or fail?
2196          */
2197         return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2198 }
2199
2200 static int invalidate_authorizer(struct ceph_connection *con)
2201 {
2202         struct ceph_osd *o = con->private;
2203         struct ceph_osd_client *osdc = o->o_osdc;
2204         struct ceph_auth_client *ac = osdc->client->monc.auth;
2205
2206         if (ac->ops && ac->ops->invalidate_authorizer)
2207                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2208
2209         return ceph_monc_validate_auth(&osdc->client->monc);
2210 }
2211
2212 static const struct ceph_connection_operations osd_con_ops = {
2213         .get = get_osd_con,
2214         .put = put_osd_con,
2215         .dispatch = dispatch,
2216         .get_authorizer = get_authorizer,
2217         .verify_authorizer_reply = verify_authorizer_reply,
2218         .invalidate_authorizer = invalidate_authorizer,
2219         .alloc_msg = alloc_msg,
2220         .fault = osd_reset,
2221 };