]> Pileus Git - ~andy/linux/blob - fs/dlm/lock.c
Merge tag 'tags/disintegrate-tile-20121009' into for-linus
[~andy/linux] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94
95 /*
96  * Lock compatibilty matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
101  */
102
103 static const int __dlm_compat_matrix[8][8] = {
104       /* UN NL CR CW PR PW EX PD */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
106         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
107         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
108         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
109         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
110         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
111         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
112         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
113 };
114
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123
124 const int dlm_lvb_operations[8][8] = {
125         /* UN   NL  CR  CW  PR  PW  EX  PD*/
126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
134 };
135
136 #define modes_compat(gr, rq) \
137         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149
150 static const int __quecvt_compat_matrix[8][8] = {
151       /* UN NL CR CW PR PW EX PD */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
153         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
154         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
155         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
156         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
157         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
158         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
159         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
160 };
161
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169                (unsigned long long)lkb->lkb_recover_seq);
170 }
171
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175                "rlc %d name %s\n",
176                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178                r->res_name);
179 }
180
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183         struct dlm_lkb *lkb;
184
185         dlm_print_rsb(r);
186
187         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189         printk(KERN_ERR "rsb lookup list\n");
190         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb grant queue:\n");
193         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195         printk(KERN_ERR "rsb convert queue:\n");
196         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197                 dlm_print_lkb(lkb);
198         printk(KERN_ERR "rsb wait queue:\n");
199         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200                 dlm_print_lkb(lkb);
201 }
202
203 /* Threads cannot use the lockspace while it's being recovered */
204
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207         down_read(&ls->ls_in_recovery);
208 }
209
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212         up_read(&ls->ls_in_recovery);
213 }
214
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217         return down_read_trylock(&ls->ls_in_recovery);
218 }
219
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248         return !!r->res_nodeid;
249 }
250
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287                                   DLM_IFL_OVERLAP_CANCEL));
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         del_timeout(lkb);
296
297         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298
299         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300            timeout caused the cancel then return -ETIMEDOUT */
301         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303                 rv = -ETIMEDOUT;
304         }
305
306         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308                 rv = -EDEADLK;
309         }
310
311         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         if (is_master_copy(lkb)) {
323                 send_bast(r, lkb, rqmode);
324         } else {
325                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 /* This is only called to add a reference when the code already holds
334    a valid reference to the rsb, so there's no need for locking. */
335
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338         kref_get(&r->res_ref);
339 }
340
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343         hold_rsb(r);
344 }
345
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348
349 static void put_rsb(struct dlm_rsb *r)
350 {
351         struct dlm_ls *ls = r->res_ls;
352         uint32_t bucket = r->res_bucket;
353
354         spin_lock(&ls->ls_rsbtbl[bucket].lock);
355         kref_put(&r->res_ref, toss_rsb);
356         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361         put_rsb(r);
362 }
363
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366         struct dlm_rsb *r1, *r2;
367         int count = 0;
368
369         spin_lock(&ls->ls_new_rsb_spin);
370         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371                 spin_unlock(&ls->ls_new_rsb_spin);
372                 return 0;
373         }
374         spin_unlock(&ls->ls_new_rsb_spin);
375
376         r1 = dlm_allocate_rsb(ls);
377         r2 = dlm_allocate_rsb(ls);
378
379         spin_lock(&ls->ls_new_rsb_spin);
380         if (r1) {
381                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382                 ls->ls_new_rsb_count++;
383         }
384         if (r2) {
385                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386                 ls->ls_new_rsb_count++;
387         }
388         count = ls->ls_new_rsb_count;
389         spin_unlock(&ls->ls_new_rsb_spin);
390
391         if (!count)
392                 return -ENOMEM;
393         return 0;
394 }
395
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397    unlock any spinlocks, go back and call pre_rsb_struct again.
398    Otherwise, take an rsb off the list and return it. */
399
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401                           struct dlm_rsb **r_ret)
402 {
403         struct dlm_rsb *r;
404         int count;
405
406         spin_lock(&ls->ls_new_rsb_spin);
407         if (list_empty(&ls->ls_new_rsb)) {
408                 count = ls->ls_new_rsb_count;
409                 spin_unlock(&ls->ls_new_rsb_spin);
410                 log_debug(ls, "find_rsb retry %d %d %s",
411                           count, dlm_config.ci_new_rsb_count, name);
412                 return -EAGAIN;
413         }
414
415         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416         list_del(&r->res_hashchain);
417         /* Convert the empty list_head to a NULL rb_node for tree usage: */
418         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419         ls->ls_new_rsb_count--;
420         spin_unlock(&ls->ls_new_rsb_spin);
421
422         r->res_ls = ls;
423         r->res_length = len;
424         memcpy(r->res_name, name, len);
425         mutex_init(&r->res_mutex);
426
427         INIT_LIST_HEAD(&r->res_lookup);
428         INIT_LIST_HEAD(&r->res_grantqueue);
429         INIT_LIST_HEAD(&r->res_convertqueue);
430         INIT_LIST_HEAD(&r->res_waitqueue);
431         INIT_LIST_HEAD(&r->res_root_list);
432         INIT_LIST_HEAD(&r->res_recover_list);
433
434         *r_ret = r;
435         return 0;
436 }
437
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440         char maxname[DLM_RESNAME_MAXLEN];
441
442         memset(maxname, 0, DLM_RESNAME_MAXLEN);
443         memcpy(maxname, name, nlen);
444         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
446
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448                         struct dlm_rsb **r_ret)
449 {
450         struct rb_node *node = tree->rb_node;
451         struct dlm_rsb *r;
452         int rc;
453
454         while (node) {
455                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
456                 rc = rsb_cmp(r, name, len);
457                 if (rc < 0)
458                         node = node->rb_left;
459                 else if (rc > 0)
460                         node = node->rb_right;
461                 else
462                         goto found;
463         }
464         *r_ret = NULL;
465         return -EBADR;
466
467  found:
468         *r_ret = r;
469         return 0;
470 }
471
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474         struct rb_node **newn = &tree->rb_node;
475         struct rb_node *parent = NULL;
476         int rc;
477
478         while (*newn) {
479                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480                                                res_hashnode);
481
482                 parent = *newn;
483                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484                 if (rc < 0)
485                         newn = &parent->rb_left;
486                 else if (rc > 0)
487                         newn = &parent->rb_right;
488                 else {
489                         log_print("rsb_insert match");
490                         dlm_dump_rsb(rsb);
491                         dlm_dump_rsb(cur);
492                         return -EEXIST;
493                 }
494         }
495
496         rb_link_node(&rsb->res_hashnode, parent, newn);
497         rb_insert_color(&rsb->res_hashnode, tree);
498         return 0;
499 }
500
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused.  Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list.  When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  *   moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  *   (we are the dir node for the res, but are not using the res right now,
522  *   but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups.  rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list who's dir_nodeid is not local can have stale
538  * name/master mappings.  So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546                         uint32_t hash, uint32_t b,
547                         int dir_nodeid, int from_nodeid,
548                         unsigned int flags, struct dlm_rsb **r_ret)
549 {
550         struct dlm_rsb *r = NULL;
551         int our_nodeid = dlm_our_nodeid();
552         int from_local = 0;
553         int from_other = 0;
554         int from_dir = 0;
555         int create = 0;
556         int error;
557
558         if (flags & R_RECEIVE_REQUEST) {
559                 if (from_nodeid == dir_nodeid)
560                         from_dir = 1;
561                 else
562                         from_other = 1;
563         } else if (flags & R_REQUEST) {
564                 from_local = 1;
565         }
566
567         /*
568          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569          * from_nodeid has sent us a lock in dlm_recover_locks, believing
570          * we're the new master.  Our local recovery may not have set
571          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
572          * create the rsb; dlm_recover_process_copy() will handle EBADR
573          * by resending.
574          *
575          * If someone sends us a request, we are the dir node, and we do
576          * not find the rsb anywhere, then recreate it.  This happens if
577          * someone sends us a request after we have removed/freed an rsb
578          * from our toss list.  (They sent a request instead of lookup
579          * because they are using an rsb from their toss list.)
580          */
581
582         if (from_local || from_dir ||
583             (from_other && (dir_nodeid == our_nodeid))) {
584                 create = 1;
585         }
586
587  retry:
588         if (create) {
589                 error = pre_rsb_struct(ls);
590                 if (error < 0)
591                         goto out;
592         }
593
594         spin_lock(&ls->ls_rsbtbl[b].lock);
595
596         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597         if (error)
598                 goto do_toss;
599         
600         /*
601          * rsb is active, so we can't check master_nodeid without lock_rsb.
602          */
603
604         kref_get(&r->res_ref);
605         error = 0;
606         goto out_unlock;
607
608
609  do_toss:
610         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611         if (error)
612                 goto do_new;
613
614         /*
615          * rsb found inactive (master_nodeid may be out of date unless
616          * we are the dir_nodeid or were the master)  No other thread
617          * is using this rsb because it's on the toss list, so we can
618          * look at or update res_master_nodeid without lock_rsb.
619          */
620
621         if ((r->res_master_nodeid != our_nodeid) && from_other) {
622                 /* our rsb was not master, and another node (not the dir node)
623                    has sent us a request */
624                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625                           from_nodeid, r->res_master_nodeid, dir_nodeid,
626                           r->res_name);
627                 error = -ENOTBLK;
628                 goto out_unlock;
629         }
630
631         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632                 /* don't think this should ever happen */
633                 log_error(ls, "find_rsb toss from_dir %d master %d",
634                           from_nodeid, r->res_master_nodeid);
635                 dlm_print_rsb(r);
636                 /* fix it and go on */
637                 r->res_master_nodeid = our_nodeid;
638                 r->res_nodeid = 0;
639                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640                 r->res_first_lkid = 0;
641         }
642
643         if (from_local && (r->res_master_nodeid != our_nodeid)) {
644                 /* Because we have held no locks on this rsb,
645                    res_master_nodeid could have become stale. */
646                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647                 r->res_first_lkid = 0;
648         }
649
650         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652         goto out_unlock;
653
654
655  do_new:
656         /*
657          * rsb not found
658          */
659
660         if (error == -EBADR && !create)
661                 goto out_unlock;
662
663         error = get_rsb_struct(ls, name, len, &r);
664         if (error == -EAGAIN) {
665                 spin_unlock(&ls->ls_rsbtbl[b].lock);
666                 goto retry;
667         }
668         if (error)
669                 goto out_unlock;
670
671         r->res_hash = hash;
672         r->res_bucket = b;
673         r->res_dir_nodeid = dir_nodeid;
674         kref_init(&r->res_ref);
675
676         if (from_dir) {
677                 /* want to see how often this happens */
678                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679                           from_nodeid, r->res_name);
680                 r->res_master_nodeid = our_nodeid;
681                 r->res_nodeid = 0;
682                 goto out_add;
683         }
684
685         if (from_other && (dir_nodeid != our_nodeid)) {
686                 /* should never happen */
687                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689                 dlm_free_rsb(r);
690                 error = -ENOTBLK;
691                 goto out_unlock;
692         }
693
694         if (from_other) {
695                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
696                           from_nodeid, dir_nodeid, r->res_name);
697         }
698
699         if (dir_nodeid == our_nodeid) {
700                 /* When we are the dir nodeid, we can set the master
701                    node immediately */
702                 r->res_master_nodeid = our_nodeid;
703                 r->res_nodeid = 0;
704         } else {
705                 /* set_master will send_lookup to dir_nodeid */
706                 r->res_master_nodeid = 0;
707                 r->res_nodeid = -1;
708         }
709
710  out_add:
711         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
712  out_unlock:
713         spin_unlock(&ls->ls_rsbtbl[b].lock);
714  out:
715         *r_ret = r;
716         return error;
717 }
718
719 /* During recovery, other nodes can send us new MSTCPY locks (from
720    dlm_recover_locks) before we've made ourself master (in
721    dlm_recover_masters). */
722
723 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
724                           uint32_t hash, uint32_t b,
725                           int dir_nodeid, int from_nodeid,
726                           unsigned int flags, struct dlm_rsb **r_ret)
727 {
728         struct dlm_rsb *r = NULL;
729         int our_nodeid = dlm_our_nodeid();
730         int recover = (flags & R_RECEIVE_RECOVER);
731         int error;
732
733  retry:
734         error = pre_rsb_struct(ls);
735         if (error < 0)
736                 goto out;
737
738         spin_lock(&ls->ls_rsbtbl[b].lock);
739
740         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
741         if (error)
742                 goto do_toss;
743
744         /*
745          * rsb is active, so we can't check master_nodeid without lock_rsb.
746          */
747
748         kref_get(&r->res_ref);
749         goto out_unlock;
750
751
752  do_toss:
753         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
754         if (error)
755                 goto do_new;
756
757         /*
758          * rsb found inactive. No other thread is using this rsb because
759          * it's on the toss list, so we can look at or update
760          * res_master_nodeid without lock_rsb.
761          */
762
763         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
764                 /* our rsb is not master, and another node has sent us a
765                    request; this should never happen */
766                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
767                           from_nodeid, r->res_master_nodeid, dir_nodeid);
768                 dlm_print_rsb(r);
769                 error = -ENOTBLK;
770                 goto out_unlock;
771         }
772
773         if (!recover && (r->res_master_nodeid != our_nodeid) &&
774             (dir_nodeid == our_nodeid)) {
775                 /* our rsb is not master, and we are dir; may as well fix it;
776                    this should never happen */
777                 log_error(ls, "find_rsb toss our %d master %d dir %d",
778                           our_nodeid, r->res_master_nodeid, dir_nodeid);
779                 dlm_print_rsb(r);
780                 r->res_master_nodeid = our_nodeid;
781                 r->res_nodeid = 0;
782         }
783
784         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
785         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
786         goto out_unlock;
787
788
789  do_new:
790         /*
791          * rsb not found
792          */
793
794         error = get_rsb_struct(ls, name, len, &r);
795         if (error == -EAGAIN) {
796                 spin_unlock(&ls->ls_rsbtbl[b].lock);
797                 goto retry;
798         }
799         if (error)
800                 goto out_unlock;
801
802         r->res_hash = hash;
803         r->res_bucket = b;
804         r->res_dir_nodeid = dir_nodeid;
805         r->res_master_nodeid = dir_nodeid;
806         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
807         kref_init(&r->res_ref);
808
809         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
810  out_unlock:
811         spin_unlock(&ls->ls_rsbtbl[b].lock);
812  out:
813         *r_ret = r;
814         return error;
815 }
816
817 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
818                     unsigned int flags, struct dlm_rsb **r_ret)
819 {
820         uint32_t hash, b;
821         int dir_nodeid;
822
823         if (len > DLM_RESNAME_MAXLEN)
824                 return -EINVAL;
825
826         hash = jhash(name, len, 0);
827         b = hash & (ls->ls_rsbtbl_size - 1);
828
829         dir_nodeid = dlm_hash2nodeid(ls, hash);
830
831         if (dlm_no_directory(ls))
832                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833                                       from_nodeid, flags, r_ret);
834         else
835                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836                                       from_nodeid, flags, r_ret);
837 }
838
839 /* we have received a request and found that res_master_nodeid != our_nodeid,
840    so we need to return an error or make ourself the master */
841
842 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
843                                   int from_nodeid)
844 {
845         if (dlm_no_directory(ls)) {
846                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
847                           from_nodeid, r->res_master_nodeid,
848                           r->res_dir_nodeid);
849                 dlm_print_rsb(r);
850                 return -ENOTBLK;
851         }
852
853         if (from_nodeid != r->res_dir_nodeid) {
854                 /* our rsb is not master, and another node (not the dir node)
855                    has sent us a request.  this is much more common when our
856                    master_nodeid is zero, so limit debug to non-zero.  */
857
858                 if (r->res_master_nodeid) {
859                         log_debug(ls, "validate master from_other %d master %d "
860                                   "dir %d first %x %s", from_nodeid,
861                                   r->res_master_nodeid, r->res_dir_nodeid,
862                                   r->res_first_lkid, r->res_name);
863                 }
864                 return -ENOTBLK;
865         } else {
866                 /* our rsb is not master, but the dir nodeid has sent us a
867                    request; this could happen with master 0 / res_nodeid -1 */
868
869                 if (r->res_master_nodeid) {
870                         log_error(ls, "validate master from_dir %d master %d "
871                                   "first %x %s",
872                                   from_nodeid, r->res_master_nodeid,
873                                   r->res_first_lkid, r->res_name);
874                 }
875
876                 r->res_master_nodeid = dlm_our_nodeid();
877                 r->res_nodeid = 0;
878                 return 0;
879         }
880 }
881
882 /*
883  * We're the dir node for this res and another node wants to know the
884  * master nodeid.  During normal operation (non recovery) this is only
885  * called from receive_lookup(); master lookups when the local node is
886  * the dir node are done by find_rsb().
887  *
888  * normal operation, we are the dir node for a resource
889  * . _request_lock
890  * . set_master
891  * . send_lookup
892  * . receive_lookup
893  * . dlm_master_lookup flags 0
894  *
895  * recover directory, we are rebuilding dir for all resources
896  * . dlm_recover_directory
897  * . dlm_rcom_names
898  *   remote node sends back the rsb names it is master of and we are dir of
899  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
900  *   we either create new rsb setting remote node as master, or find existing
901  *   rsb and set master to be the remote node.
902  *
903  * recover masters, we are finding the new master for resources
904  * . dlm_recover_masters
905  * . recover_master
906  * . dlm_send_rcom_lookup
907  * . receive_rcom_lookup
908  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
909  */
910
911 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
912                       unsigned int flags, int *r_nodeid, int *result)
913 {
914         struct dlm_rsb *r = NULL;
915         uint32_t hash, b;
916         int from_master = (flags & DLM_LU_RECOVER_DIR);
917         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
918         int our_nodeid = dlm_our_nodeid();
919         int dir_nodeid, error, toss_list = 0;
920
921         if (len > DLM_RESNAME_MAXLEN)
922                 return -EINVAL;
923
924         if (from_nodeid == our_nodeid) {
925                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
926                           our_nodeid, flags);
927                 return -EINVAL;
928         }
929
930         hash = jhash(name, len, 0);
931         b = hash & (ls->ls_rsbtbl_size - 1);
932
933         dir_nodeid = dlm_hash2nodeid(ls, hash);
934         if (dir_nodeid != our_nodeid) {
935                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936                           from_nodeid, dir_nodeid, our_nodeid, hash,
937                           ls->ls_num_nodes);
938                 *r_nodeid = -1;
939                 return -EINVAL;
940         }
941
942  retry:
943         error = pre_rsb_struct(ls);
944         if (error < 0)
945                 return error;
946
947         spin_lock(&ls->ls_rsbtbl[b].lock);
948         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
949         if (!error) {
950                 /* because the rsb is active, we need to lock_rsb before
951                    checking/changing re_master_nodeid */
952
953                 hold_rsb(r);
954                 spin_unlock(&ls->ls_rsbtbl[b].lock);
955                 lock_rsb(r);
956                 goto found;
957         }
958
959         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
960         if (error)
961                 goto not_found;
962
963         /* because the rsb is inactive (on toss list), it's not refcounted
964            and lock_rsb is not used, but is protected by the rsbtbl lock */
965
966         toss_list = 1;
967  found:
968         if (r->res_dir_nodeid != our_nodeid) {
969                 /* should not happen, but may as well fix it and carry on */
970                 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
971                           r->res_dir_nodeid, our_nodeid, r->res_name);
972                 r->res_dir_nodeid = our_nodeid;
973         }
974
975         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
976                 /* Recovery uses this function to set a new master when
977                    the previous master failed.  Setting NEW_MASTER will
978                    force dlm_recover_masters to call recover_master on this
979                    rsb even though the res_nodeid is no longer removed. */
980
981                 r->res_master_nodeid = from_nodeid;
982                 r->res_nodeid = from_nodeid;
983                 rsb_set_flag(r, RSB_NEW_MASTER);
984
985                 if (toss_list) {
986                         /* I don't think we should ever find it on toss list. */
987                         log_error(ls, "dlm_master_lookup fix_master on toss");
988                         dlm_dump_rsb(r);
989                 }
990         }
991
992         if (from_master && (r->res_master_nodeid != from_nodeid)) {
993                 /* this will happen if from_nodeid became master during
994                    a previous recovery cycle, and we aborted the previous
995                    cycle before recovering this master value */
996
997                 log_limit(ls, "dlm_master_lookup from_master %d "
998                           "master_nodeid %d res_nodeid %d first %x %s",
999                           from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000                           r->res_first_lkid, r->res_name);
1001
1002                 if (r->res_master_nodeid == our_nodeid) {
1003                         log_error(ls, "from_master %d our_master", from_nodeid);
1004                         dlm_dump_rsb(r);
1005                         dlm_send_rcom_lookup_dump(r, from_nodeid);
1006                         goto out_found;
1007                 }
1008
1009                 r->res_master_nodeid = from_nodeid;
1010                 r->res_nodeid = from_nodeid;
1011                 rsb_set_flag(r, RSB_NEW_MASTER);
1012         }
1013
1014         if (!r->res_master_nodeid) {
1015                 /* this will happen if recovery happens while we're looking
1016                    up the master for this rsb */
1017
1018                 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019                           from_nodeid, r->res_first_lkid, r->res_name);
1020                 r->res_master_nodeid = from_nodeid;
1021                 r->res_nodeid = from_nodeid;
1022         }
1023
1024         if (!from_master && !fix_master &&
1025             (r->res_master_nodeid == from_nodeid)) {
1026                 /* this can happen when the master sends remove, the dir node
1027                    finds the rsb on the keep list and ignores the remove,
1028                    and the former master sends a lookup */
1029
1030                 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031                           "first %x %s", from_nodeid, flags,
1032                           r->res_first_lkid, r->res_name);
1033         }
1034
1035  out_found:
1036         *r_nodeid = r->res_master_nodeid;
1037         if (result)
1038                 *result = DLM_LU_MATCH;
1039
1040         if (toss_list) {
1041                 r->res_toss_time = jiffies;
1042                 /* the rsb was inactive (on toss list) */
1043                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1044         } else {
1045                 /* the rsb was active */
1046                 unlock_rsb(r);
1047                 put_rsb(r);
1048         }
1049         return 0;
1050
1051  not_found:
1052         error = get_rsb_struct(ls, name, len, &r);
1053         if (error == -EAGAIN) {
1054                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1055                 goto retry;
1056         }
1057         if (error)
1058                 goto out_unlock;
1059
1060         r->res_hash = hash;
1061         r->res_bucket = b;
1062         r->res_dir_nodeid = our_nodeid;
1063         r->res_master_nodeid = from_nodeid;
1064         r->res_nodeid = from_nodeid;
1065         kref_init(&r->res_ref);
1066         r->res_toss_time = jiffies;
1067
1068         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1069         if (error) {
1070                 /* should never happen */
1071                 dlm_free_rsb(r);
1072                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1073                 goto retry;
1074         }
1075
1076         if (result)
1077                 *result = DLM_LU_ADD;
1078         *r_nodeid = from_nodeid;
1079         error = 0;
1080  out_unlock:
1081         spin_unlock(&ls->ls_rsbtbl[b].lock);
1082         return error;
1083 }
1084
1085 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1086 {
1087         struct rb_node *n;
1088         struct dlm_rsb *r;
1089         int i;
1090
1091         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1092                 spin_lock(&ls->ls_rsbtbl[i].lock);
1093                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1094                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1095                         if (r->res_hash == hash)
1096                                 dlm_dump_rsb(r);
1097                 }
1098                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1099         }
1100 }
1101
1102 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1103 {
1104         struct dlm_rsb *r = NULL;
1105         uint32_t hash, b;
1106         int error;
1107
1108         hash = jhash(name, len, 0);
1109         b = hash & (ls->ls_rsbtbl_size - 1);
1110
1111         spin_lock(&ls->ls_rsbtbl[b].lock);
1112         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113         if (!error)
1114                 goto out_dump;
1115
1116         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117         if (error)
1118                 goto out;
1119  out_dump:
1120         dlm_dump_rsb(r);
1121  out:
1122         spin_unlock(&ls->ls_rsbtbl[b].lock);
1123 }
1124
1125 static void toss_rsb(struct kref *kref)
1126 {
1127         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1128         struct dlm_ls *ls = r->res_ls;
1129
1130         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1131         kref_init(&r->res_ref);
1132         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1133         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1134         r->res_toss_time = jiffies;
1135         if (r->res_lvbptr) {
1136                 dlm_free_lvb(r->res_lvbptr);
1137                 r->res_lvbptr = NULL;
1138         }
1139 }
1140
1141 /* See comment for unhold_lkb */
1142
1143 static void unhold_rsb(struct dlm_rsb *r)
1144 {
1145         int rv;
1146         rv = kref_put(&r->res_ref, toss_rsb);
1147         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1148 }
1149
1150 static void kill_rsb(struct kref *kref)
1151 {
1152         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1153
1154         /* All work is done after the return from kref_put() so we
1155            can release the write_lock before the remove and free. */
1156
1157         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1158         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1159         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1160         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1161         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1162         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1163 }
1164
1165 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1166    The rsb must exist as long as any lkb's for it do. */
1167
1168 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1169 {
1170         hold_rsb(r);
1171         lkb->lkb_resource = r;
1172 }
1173
1174 static void detach_lkb(struct dlm_lkb *lkb)
1175 {
1176         if (lkb->lkb_resource) {
1177                 put_rsb(lkb->lkb_resource);
1178                 lkb->lkb_resource = NULL;
1179         }
1180 }
1181
1182 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1183 {
1184         struct dlm_lkb *lkb;
1185         int rv, id;
1186
1187         lkb = dlm_allocate_lkb(ls);
1188         if (!lkb)
1189                 return -ENOMEM;
1190
1191         lkb->lkb_nodeid = -1;
1192         lkb->lkb_grmode = DLM_LOCK_IV;
1193         kref_init(&lkb->lkb_ref);
1194         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1195         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1196         INIT_LIST_HEAD(&lkb->lkb_time_list);
1197         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1198         mutex_init(&lkb->lkb_cb_mutex);
1199         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1200
1201  retry:
1202         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
1203         if (!rv)
1204                 return -ENOMEM;
1205
1206         spin_lock(&ls->ls_lkbidr_spin);
1207         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
1208         if (!rv)
1209                 lkb->lkb_id = id;
1210         spin_unlock(&ls->ls_lkbidr_spin);
1211
1212         if (rv == -EAGAIN)
1213                 goto retry;
1214
1215         if (rv < 0) {
1216                 log_error(ls, "create_lkb idr error %d", rv);
1217                 return rv;
1218         }
1219
1220         *lkb_ret = lkb;
1221         return 0;
1222 }
1223
1224 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1225 {
1226         struct dlm_lkb *lkb;
1227
1228         spin_lock(&ls->ls_lkbidr_spin);
1229         lkb = idr_find(&ls->ls_lkbidr, lkid);
1230         if (lkb)
1231                 kref_get(&lkb->lkb_ref);
1232         spin_unlock(&ls->ls_lkbidr_spin);
1233
1234         *lkb_ret = lkb;
1235         return lkb ? 0 : -ENOENT;
1236 }
1237
1238 static void kill_lkb(struct kref *kref)
1239 {
1240         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1241
1242         /* All work is done after the return from kref_put() so we
1243            can release the write_lock before the detach_lkb */
1244
1245         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1246 }
1247
1248 /* __put_lkb() is used when an lkb may not have an rsb attached to
1249    it so we need to provide the lockspace explicitly */
1250
1251 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1252 {
1253         uint32_t lkid = lkb->lkb_id;
1254
1255         spin_lock(&ls->ls_lkbidr_spin);
1256         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1257                 idr_remove(&ls->ls_lkbidr, lkid);
1258                 spin_unlock(&ls->ls_lkbidr_spin);
1259
1260                 detach_lkb(lkb);
1261
1262                 /* for local/process lkbs, lvbptr points to caller's lksb */
1263                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1264                         dlm_free_lvb(lkb->lkb_lvbptr);
1265                 dlm_free_lkb(lkb);
1266                 return 1;
1267         } else {
1268                 spin_unlock(&ls->ls_lkbidr_spin);
1269                 return 0;
1270         }
1271 }
1272
1273 int dlm_put_lkb(struct dlm_lkb *lkb)
1274 {
1275         struct dlm_ls *ls;
1276
1277         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1278         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1279
1280         ls = lkb->lkb_resource->res_ls;
1281         return __put_lkb(ls, lkb);
1282 }
1283
1284 /* This is only called to add a reference when the code already holds
1285    a valid reference to the lkb, so there's no need for locking. */
1286
1287 static inline void hold_lkb(struct dlm_lkb *lkb)
1288 {
1289         kref_get(&lkb->lkb_ref);
1290 }
1291
1292 /* This is called when we need to remove a reference and are certain
1293    it's not the last ref.  e.g. del_lkb is always called between a
1294    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1295    put_lkb would work fine, but would involve unnecessary locking */
1296
1297 static inline void unhold_lkb(struct dlm_lkb *lkb)
1298 {
1299         int rv;
1300         rv = kref_put(&lkb->lkb_ref, kill_lkb);
1301         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1302 }
1303
1304 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1305                             int mode)
1306 {
1307         struct dlm_lkb *lkb = NULL;
1308
1309         list_for_each_entry(lkb, head, lkb_statequeue)
1310                 if (lkb->lkb_rqmode < mode)
1311                         break;
1312
1313         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1314 }
1315
1316 /* add/remove lkb to rsb's grant/convert/wait queue */
1317
1318 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1319 {
1320         kref_get(&lkb->lkb_ref);
1321
1322         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1323
1324         lkb->lkb_timestamp = ktime_get();
1325
1326         lkb->lkb_status = status;
1327
1328         switch (status) {
1329         case DLM_LKSTS_WAITING:
1330                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1331                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1332                 else
1333                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1334                 break;
1335         case DLM_LKSTS_GRANTED:
1336                 /* convention says granted locks kept in order of grmode */
1337                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1338                                 lkb->lkb_grmode);
1339                 break;
1340         case DLM_LKSTS_CONVERT:
1341                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1342                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1343                 else
1344                         list_add_tail(&lkb->lkb_statequeue,
1345                                       &r->res_convertqueue);
1346                 break;
1347         default:
1348                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1349         }
1350 }
1351
1352 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1353 {
1354         lkb->lkb_status = 0;
1355         list_del(&lkb->lkb_statequeue);
1356         unhold_lkb(lkb);
1357 }
1358
1359 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1360 {
1361         hold_lkb(lkb);
1362         del_lkb(r, lkb);
1363         add_lkb(r, lkb, sts);
1364         unhold_lkb(lkb);
1365 }
1366
1367 static int msg_reply_type(int mstype)
1368 {
1369         switch (mstype) {
1370         case DLM_MSG_REQUEST:
1371                 return DLM_MSG_REQUEST_REPLY;
1372         case DLM_MSG_CONVERT:
1373                 return DLM_MSG_CONVERT_REPLY;
1374         case DLM_MSG_UNLOCK:
1375                 return DLM_MSG_UNLOCK_REPLY;
1376         case DLM_MSG_CANCEL:
1377                 return DLM_MSG_CANCEL_REPLY;
1378         case DLM_MSG_LOOKUP:
1379                 return DLM_MSG_LOOKUP_REPLY;
1380         }
1381         return -1;
1382 }
1383
1384 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1385 {
1386         int i;
1387
1388         for (i = 0; i < num_nodes; i++) {
1389                 if (!warned[i]) {
1390                         warned[i] = nodeid;
1391                         return 0;
1392                 }
1393                 if (warned[i] == nodeid)
1394                         return 1;
1395         }
1396         return 0;
1397 }
1398
1399 void dlm_scan_waiters(struct dlm_ls *ls)
1400 {
1401         struct dlm_lkb *lkb;
1402         ktime_t zero = ktime_set(0, 0);
1403         s64 us;
1404         s64 debug_maxus = 0;
1405         u32 debug_scanned = 0;
1406         u32 debug_expired = 0;
1407         int num_nodes = 0;
1408         int *warned = NULL;
1409
1410         if (!dlm_config.ci_waitwarn_us)
1411                 return;
1412
1413         mutex_lock(&ls->ls_waiters_mutex);
1414
1415         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1416                 if (ktime_equal(lkb->lkb_wait_time, zero))
1417                         continue;
1418
1419                 debug_scanned++;
1420
1421                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1422
1423                 if (us < dlm_config.ci_waitwarn_us)
1424                         continue;
1425
1426                 lkb->lkb_wait_time = zero;
1427
1428                 debug_expired++;
1429                 if (us > debug_maxus)
1430                         debug_maxus = us;
1431
1432                 if (!num_nodes) {
1433                         num_nodes = ls->ls_num_nodes;
1434                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1435                 }
1436                 if (!warned)
1437                         continue;
1438                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1439                         continue;
1440
1441                 log_error(ls, "waitwarn %x %lld %d us check connection to "
1442                           "node %d", lkb->lkb_id, (long long)us,
1443                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1444         }
1445         mutex_unlock(&ls->ls_waiters_mutex);
1446         kfree(warned);
1447
1448         if (debug_expired)
1449                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1450                           debug_scanned, debug_expired,
1451                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1452 }
1453
1454 /* add/remove lkb from global waiters list of lkb's waiting for
1455    a reply from a remote node */
1456
1457 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1458 {
1459         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1460         int error = 0;
1461
1462         mutex_lock(&ls->ls_waiters_mutex);
1463
1464         if (is_overlap_unlock(lkb) ||
1465             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1466                 error = -EINVAL;
1467                 goto out;
1468         }
1469
1470         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1471                 switch (mstype) {
1472                 case DLM_MSG_UNLOCK:
1473                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1474                         break;
1475                 case DLM_MSG_CANCEL:
1476                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1477                         break;
1478                 default:
1479                         error = -EBUSY;
1480                         goto out;
1481                 }
1482                 lkb->lkb_wait_count++;
1483                 hold_lkb(lkb);
1484
1485                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1486                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1487                           lkb->lkb_wait_count, lkb->lkb_flags);
1488                 goto out;
1489         }
1490
1491         DLM_ASSERT(!lkb->lkb_wait_count,
1492                    dlm_print_lkb(lkb);
1493                    printk("wait_count %d\n", lkb->lkb_wait_count););
1494
1495         lkb->lkb_wait_count++;
1496         lkb->lkb_wait_type = mstype;
1497         lkb->lkb_wait_time = ktime_get();
1498         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1499         hold_lkb(lkb);
1500         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1501  out:
1502         if (error)
1503                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1504                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1505                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1506         mutex_unlock(&ls->ls_waiters_mutex);
1507         return error;
1508 }
1509
1510 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1511    list as part of process_requestqueue (e.g. a lookup that has an optimized
1512    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1513    set RESEND and dlm_recover_waiters_post() */
1514
1515 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1516                                 struct dlm_message *ms)
1517 {
1518         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1519         int overlap_done = 0;
1520
1521         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1522                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1523                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1524                 overlap_done = 1;
1525                 goto out_del;
1526         }
1527
1528         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1529                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1530                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1531                 overlap_done = 1;
1532                 goto out_del;
1533         }
1534
1535         /* Cancel state was preemptively cleared by a successful convert,
1536            see next comment, nothing to do. */
1537
1538         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1539             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1540                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1541                           lkb->lkb_id, lkb->lkb_wait_type);
1542                 return -1;
1543         }
1544
1545         /* Remove for the convert reply, and premptively remove for the
1546            cancel reply.  A convert has been granted while there's still
1547            an outstanding cancel on it (the cancel is moot and the result
1548            in the cancel reply should be 0).  We preempt the cancel reply
1549            because the app gets the convert result and then can follow up
1550            with another op, like convert.  This subsequent op would see the
1551            lingering state of the cancel and fail with -EBUSY. */
1552
1553         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1554             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1555             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1556                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1557                           lkb->lkb_id);
1558                 lkb->lkb_wait_type = 0;
1559                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1560                 lkb->lkb_wait_count--;
1561                 goto out_del;
1562         }
1563
1564         /* N.B. type of reply may not always correspond to type of original
1565            msg due to lookup->request optimization, verify others? */
1566
1567         if (lkb->lkb_wait_type) {
1568                 lkb->lkb_wait_type = 0;
1569                 goto out_del;
1570         }
1571
1572         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1573                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1574                   mstype, lkb->lkb_flags);
1575         return -1;
1576
1577  out_del:
1578         /* the force-unlock/cancel has completed and we haven't recvd a reply
1579            to the op that was in progress prior to the unlock/cancel; we
1580            give up on any reply to the earlier op.  FIXME: not sure when/how
1581            this would happen */
1582
1583         if (overlap_done && lkb->lkb_wait_type) {
1584                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1585                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1586                 lkb->lkb_wait_count--;
1587                 lkb->lkb_wait_type = 0;
1588         }
1589
1590         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1591
1592         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1593         lkb->lkb_wait_count--;
1594         if (!lkb->lkb_wait_count)
1595                 list_del_init(&lkb->lkb_wait_reply);
1596         unhold_lkb(lkb);
1597         return 0;
1598 }
1599
1600 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1601 {
1602         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1603         int error;
1604
1605         mutex_lock(&ls->ls_waiters_mutex);
1606         error = _remove_from_waiters(lkb, mstype, NULL);
1607         mutex_unlock(&ls->ls_waiters_mutex);
1608         return error;
1609 }
1610
1611 /* Handles situations where we might be processing a "fake" or "stub" reply in
1612    which we can't try to take waiters_mutex again. */
1613
1614 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1615 {
1616         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1617         int error;
1618
1619         if (ms->m_flags != DLM_IFL_STUB_MS)
1620                 mutex_lock(&ls->ls_waiters_mutex);
1621         error = _remove_from_waiters(lkb, ms->m_type, ms);
1622         if (ms->m_flags != DLM_IFL_STUB_MS)
1623                 mutex_unlock(&ls->ls_waiters_mutex);
1624         return error;
1625 }
1626
1627 /* If there's an rsb for the same resource being removed, ensure
1628    that the remove message is sent before the new lookup message.
1629    It should be rare to need a delay here, but if not, then it may
1630    be worthwhile to add a proper wait mechanism rather than a delay. */
1631
1632 static void wait_pending_remove(struct dlm_rsb *r)
1633 {
1634         struct dlm_ls *ls = r->res_ls;
1635  restart:
1636         spin_lock(&ls->ls_remove_spin);
1637         if (ls->ls_remove_len &&
1638             !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1639                 log_debug(ls, "delay lookup for remove dir %d %s",
1640                           r->res_dir_nodeid, r->res_name);
1641                 spin_unlock(&ls->ls_remove_spin);
1642                 msleep(1);
1643                 goto restart;
1644         }
1645         spin_unlock(&ls->ls_remove_spin);
1646 }
1647
1648 /*
1649  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1650  * read by other threads in wait_pending_remove.  ls_remove_names
1651  * and ls_remove_lens are only used by the scan thread, so they do
1652  * not need protection.
1653  */
1654
1655 static void shrink_bucket(struct dlm_ls *ls, int b)
1656 {
1657         struct rb_node *n, *next;
1658         struct dlm_rsb *r;
1659         char *name;
1660         int our_nodeid = dlm_our_nodeid();
1661         int remote_count = 0;
1662         int i, len, rv;
1663
1664         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1665
1666         spin_lock(&ls->ls_rsbtbl[b].lock);
1667         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1668                 next = rb_next(n);
1669                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1670
1671                 /* If we're the directory record for this rsb, and
1672                    we're not the master of it, then we need to wait
1673                    for the master node to send us a dir remove for
1674                    before removing the dir record. */
1675
1676                 if (!dlm_no_directory(ls) &&
1677                     (r->res_master_nodeid != our_nodeid) &&
1678                     (dlm_dir_nodeid(r) == our_nodeid)) {
1679                         continue;
1680                 }
1681
1682                 if (!time_after_eq(jiffies, r->res_toss_time +
1683                                    dlm_config.ci_toss_secs * HZ)) {
1684                         continue;
1685                 }
1686
1687                 if (!dlm_no_directory(ls) &&
1688                     (r->res_master_nodeid == our_nodeid) &&
1689                     (dlm_dir_nodeid(r) != our_nodeid)) {
1690
1691                         /* We're the master of this rsb but we're not
1692                            the directory record, so we need to tell the
1693                            dir node to remove the dir record. */
1694
1695                         ls->ls_remove_lens[remote_count] = r->res_length;
1696                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1697                                DLM_RESNAME_MAXLEN);
1698                         remote_count++;
1699
1700                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1701                                 break;
1702                         continue;
1703                 }
1704
1705                 if (!kref_put(&r->res_ref, kill_rsb)) {
1706                         log_error(ls, "tossed rsb in use %s", r->res_name);
1707                         continue;
1708                 }
1709
1710                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1711                 dlm_free_rsb(r);
1712         }
1713         spin_unlock(&ls->ls_rsbtbl[b].lock);
1714
1715         /*
1716          * While searching for rsb's to free, we found some that require
1717          * remote removal.  We leave them in place and find them again here
1718          * so there is a very small gap between removing them from the toss
1719          * list and sending the removal.  Keeping this gap small is
1720          * important to keep us (the master node) from being out of sync
1721          * with the remote dir node for very long.
1722          *
1723          * From the time the rsb is removed from toss until just after
1724          * send_remove, the rsb name is saved in ls_remove_name.  A new
1725          * lookup checks this to ensure that a new lookup message for the
1726          * same resource name is not sent just before the remove message.
1727          */
1728
1729         for (i = 0; i < remote_count; i++) {
1730                 name = ls->ls_remove_names[i];
1731                 len = ls->ls_remove_lens[i];
1732
1733                 spin_lock(&ls->ls_rsbtbl[b].lock);
1734                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1735                 if (rv) {
1736                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1737                         log_debug(ls, "remove_name not toss %s", name);
1738                         continue;
1739                 }
1740
1741                 if (r->res_master_nodeid != our_nodeid) {
1742                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1743                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1744                                   r->res_master_nodeid, r->res_dir_nodeid,
1745                                   our_nodeid, name);
1746                         continue;
1747                 }
1748
1749                 if (r->res_dir_nodeid == our_nodeid) {
1750                         /* should never happen */
1751                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1752                         log_error(ls, "remove_name dir %d master %d our %d %s",
1753                                   r->res_dir_nodeid, r->res_master_nodeid,
1754                                   our_nodeid, name);
1755                         continue;
1756                 }
1757
1758                 if (!time_after_eq(jiffies, r->res_toss_time +
1759                                    dlm_config.ci_toss_secs * HZ)) {
1760                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1761                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1762                                   r->res_toss_time, jiffies, name);
1763                         continue;
1764                 }
1765
1766                 if (!kref_put(&r->res_ref, kill_rsb)) {
1767                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1768                         log_error(ls, "remove_name in use %s", name);
1769                         continue;
1770                 }
1771
1772                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1773
1774                 /* block lookup of same name until we've sent remove */
1775                 spin_lock(&ls->ls_remove_spin);
1776                 ls->ls_remove_len = len;
1777                 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1778                 spin_unlock(&ls->ls_remove_spin);
1779                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1780
1781                 send_remove(r);
1782
1783                 /* allow lookup of name again */
1784                 spin_lock(&ls->ls_remove_spin);
1785                 ls->ls_remove_len = 0;
1786                 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1787                 spin_unlock(&ls->ls_remove_spin);
1788
1789                 dlm_free_rsb(r);
1790         }
1791 }
1792
1793 void dlm_scan_rsbs(struct dlm_ls *ls)
1794 {
1795         int i;
1796
1797         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1798                 shrink_bucket(ls, i);
1799                 if (dlm_locking_stopped(ls))
1800                         break;
1801                 cond_resched();
1802         }
1803 }
1804
1805 static void add_timeout(struct dlm_lkb *lkb)
1806 {
1807         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1808
1809         if (is_master_copy(lkb))
1810                 return;
1811
1812         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1813             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1814                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1815                 goto add_it;
1816         }
1817         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1818                 goto add_it;
1819         return;
1820
1821  add_it:
1822         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1823         mutex_lock(&ls->ls_timeout_mutex);
1824         hold_lkb(lkb);
1825         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1826         mutex_unlock(&ls->ls_timeout_mutex);
1827 }
1828
1829 static void del_timeout(struct dlm_lkb *lkb)
1830 {
1831         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1832
1833         mutex_lock(&ls->ls_timeout_mutex);
1834         if (!list_empty(&lkb->lkb_time_list)) {
1835                 list_del_init(&lkb->lkb_time_list);
1836                 unhold_lkb(lkb);
1837         }
1838         mutex_unlock(&ls->ls_timeout_mutex);
1839 }
1840
1841 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1842    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1843    and then lock rsb because of lock ordering in add_timeout.  We may need
1844    to specify some special timeout-related bits in the lkb that are just to
1845    be accessed under the timeout_mutex. */
1846
1847 void dlm_scan_timeout(struct dlm_ls *ls)
1848 {
1849         struct dlm_rsb *r;
1850         struct dlm_lkb *lkb;
1851         int do_cancel, do_warn;
1852         s64 wait_us;
1853
1854         for (;;) {
1855                 if (dlm_locking_stopped(ls))
1856                         break;
1857
1858                 do_cancel = 0;
1859                 do_warn = 0;
1860                 mutex_lock(&ls->ls_timeout_mutex);
1861                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1862
1863                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1864                                                         lkb->lkb_timestamp));
1865
1866                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1867                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1868                                 do_cancel = 1;
1869
1870                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1871                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1872                                 do_warn = 1;
1873
1874                         if (!do_cancel && !do_warn)
1875                                 continue;
1876                         hold_lkb(lkb);
1877                         break;
1878                 }
1879                 mutex_unlock(&ls->ls_timeout_mutex);
1880
1881                 if (!do_cancel && !do_warn)
1882                         break;
1883
1884                 r = lkb->lkb_resource;
1885                 hold_rsb(r);
1886                 lock_rsb(r);
1887
1888                 if (do_warn) {
1889                         /* clear flag so we only warn once */
1890                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1891                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1892                                 del_timeout(lkb);
1893                         dlm_timeout_warn(lkb);
1894                 }
1895
1896                 if (do_cancel) {
1897                         log_debug(ls, "timeout cancel %x node %d %s",
1898                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1899                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1900                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1901                         del_timeout(lkb);
1902                         _cancel_lock(r, lkb);
1903                 }
1904
1905                 unlock_rsb(r);
1906                 unhold_rsb(r);
1907                 dlm_put_lkb(lkb);
1908         }
1909 }
1910
1911 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1912    dlm_recoverd before checking/setting ls_recover_begin. */
1913
1914 void dlm_adjust_timeouts(struct dlm_ls *ls)
1915 {
1916         struct dlm_lkb *lkb;
1917         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1918
1919         ls->ls_recover_begin = 0;
1920         mutex_lock(&ls->ls_timeout_mutex);
1921         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1922                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1923         mutex_unlock(&ls->ls_timeout_mutex);
1924
1925         if (!dlm_config.ci_waitwarn_us)
1926                 return;
1927
1928         mutex_lock(&ls->ls_waiters_mutex);
1929         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1930                 if (ktime_to_us(lkb->lkb_wait_time))
1931                         lkb->lkb_wait_time = ktime_get();
1932         }
1933         mutex_unlock(&ls->ls_waiters_mutex);
1934 }
1935
1936 /* lkb is master or local copy */
1937
1938 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1939 {
1940         int b, len = r->res_ls->ls_lvblen;
1941
1942         /* b=1 lvb returned to caller
1943            b=0 lvb written to rsb or invalidated
1944            b=-1 do nothing */
1945
1946         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1947
1948         if (b == 1) {
1949                 if (!lkb->lkb_lvbptr)
1950                         return;
1951
1952                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1953                         return;
1954
1955                 if (!r->res_lvbptr)
1956                         return;
1957
1958                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1959                 lkb->lkb_lvbseq = r->res_lvbseq;
1960
1961         } else if (b == 0) {
1962                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1963                         rsb_set_flag(r, RSB_VALNOTVALID);
1964                         return;
1965                 }
1966
1967                 if (!lkb->lkb_lvbptr)
1968                         return;
1969
1970                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1971                         return;
1972
1973                 if (!r->res_lvbptr)
1974                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1975
1976                 if (!r->res_lvbptr)
1977                         return;
1978
1979                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1980                 r->res_lvbseq++;
1981                 lkb->lkb_lvbseq = r->res_lvbseq;
1982                 rsb_clear_flag(r, RSB_VALNOTVALID);
1983         }
1984
1985         if (rsb_flag(r, RSB_VALNOTVALID))
1986                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1987 }
1988
1989 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1990 {
1991         if (lkb->lkb_grmode < DLM_LOCK_PW)
1992                 return;
1993
1994         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1995                 rsb_set_flag(r, RSB_VALNOTVALID);
1996                 return;
1997         }
1998
1999         if (!lkb->lkb_lvbptr)
2000                 return;
2001
2002         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2003                 return;
2004
2005         if (!r->res_lvbptr)
2006                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2007
2008         if (!r->res_lvbptr)
2009                 return;
2010
2011         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2012         r->res_lvbseq++;
2013         rsb_clear_flag(r, RSB_VALNOTVALID);
2014 }
2015
2016 /* lkb is process copy (pc) */
2017
2018 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2019                             struct dlm_message *ms)
2020 {
2021         int b;
2022
2023         if (!lkb->lkb_lvbptr)
2024                 return;
2025
2026         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2027                 return;
2028
2029         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2030         if (b == 1) {
2031                 int len = receive_extralen(ms);
2032                 if (len > DLM_RESNAME_MAXLEN)
2033                         len = DLM_RESNAME_MAXLEN;
2034                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2035                 lkb->lkb_lvbseq = ms->m_lvbseq;
2036         }
2037 }
2038
2039 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2040    remove_lock -- used for unlock, removes lkb from granted
2041    revert_lock -- used for cancel, moves lkb from convert to granted
2042    grant_lock  -- used for request and convert, adds lkb to granted or
2043                   moves lkb from convert or waiting to granted
2044
2045    Each of these is used for master or local copy lkb's.  There is
2046    also a _pc() variation used to make the corresponding change on
2047    a process copy (pc) lkb. */
2048
2049 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2050 {
2051         del_lkb(r, lkb);
2052         lkb->lkb_grmode = DLM_LOCK_IV;
2053         /* this unhold undoes the original ref from create_lkb()
2054            so this leads to the lkb being freed */
2055         unhold_lkb(lkb);
2056 }
2057
2058 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2059 {
2060         set_lvb_unlock(r, lkb);
2061         _remove_lock(r, lkb);
2062 }
2063
2064 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2065 {
2066         _remove_lock(r, lkb);
2067 }
2068
2069 /* returns: 0 did nothing
2070             1 moved lock to granted
2071            -1 removed lock */
2072
2073 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2074 {
2075         int rv = 0;
2076
2077         lkb->lkb_rqmode = DLM_LOCK_IV;
2078
2079         switch (lkb->lkb_status) {
2080         case DLM_LKSTS_GRANTED:
2081                 break;
2082         case DLM_LKSTS_CONVERT:
2083                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2084                 rv = 1;
2085                 break;
2086         case DLM_LKSTS_WAITING:
2087                 del_lkb(r, lkb);
2088                 lkb->lkb_grmode = DLM_LOCK_IV;
2089                 /* this unhold undoes the original ref from create_lkb()
2090                    so this leads to the lkb being freed */
2091                 unhold_lkb(lkb);
2092                 rv = -1;
2093                 break;
2094         default:
2095                 log_print("invalid status for revert %d", lkb->lkb_status);
2096         }
2097         return rv;
2098 }
2099
2100 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2101 {
2102         return revert_lock(r, lkb);
2103 }
2104
2105 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2106 {
2107         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2108                 lkb->lkb_grmode = lkb->lkb_rqmode;
2109                 if (lkb->lkb_status)
2110                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2111                 else
2112                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2113         }
2114
2115         lkb->lkb_rqmode = DLM_LOCK_IV;
2116         lkb->lkb_highbast = 0;
2117 }
2118
2119 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2120 {
2121         set_lvb_lock(r, lkb);
2122         _grant_lock(r, lkb);
2123 }
2124
2125 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2126                           struct dlm_message *ms)
2127 {
2128         set_lvb_lock_pc(r, lkb, ms);
2129         _grant_lock(r, lkb);
2130 }
2131
2132 /* called by grant_pending_locks() which means an async grant message must
2133    be sent to the requesting node in addition to granting the lock if the
2134    lkb belongs to a remote node. */
2135
2136 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2137 {
2138         grant_lock(r, lkb);
2139         if (is_master_copy(lkb))
2140                 send_grant(r, lkb);
2141         else
2142                 queue_cast(r, lkb, 0);
2143 }
2144
2145 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2146    change the granted/requested modes.  We're munging things accordingly in
2147    the process copy.
2148    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2149    conversion deadlock
2150    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2151    compatible with other granted locks */
2152
2153 static void munge_demoted(struct dlm_lkb *lkb)
2154 {
2155         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2156                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2157                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2158                 return;
2159         }
2160
2161         lkb->lkb_grmode = DLM_LOCK_NL;
2162 }
2163
2164 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2165 {
2166         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2167             ms->m_type != DLM_MSG_GRANT) {
2168                 log_print("munge_altmode %x invalid reply type %d",
2169                           lkb->lkb_id, ms->m_type);
2170                 return;
2171         }
2172
2173         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2174                 lkb->lkb_rqmode = DLM_LOCK_PR;
2175         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2176                 lkb->lkb_rqmode = DLM_LOCK_CW;
2177         else {
2178                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2179                 dlm_print_lkb(lkb);
2180         }
2181 }
2182
2183 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2184 {
2185         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2186                                            lkb_statequeue);
2187         if (lkb->lkb_id == first->lkb_id)
2188                 return 1;
2189
2190         return 0;
2191 }
2192
2193 /* Check if the given lkb conflicts with another lkb on the queue. */
2194
2195 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2196 {
2197         struct dlm_lkb *this;
2198
2199         list_for_each_entry(this, head, lkb_statequeue) {
2200                 if (this == lkb)
2201                         continue;
2202                 if (!modes_compat(this, lkb))
2203                         return 1;
2204         }
2205         return 0;
2206 }
2207
2208 /*
2209  * "A conversion deadlock arises with a pair of lock requests in the converting
2210  * queue for one resource.  The granted mode of each lock blocks the requested
2211  * mode of the other lock."
2212  *
2213  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2214  * convert queue from being granted, then deadlk/demote lkb.
2215  *
2216  * Example:
2217  * Granted Queue: empty
2218  * Convert Queue: NL->EX (first lock)
2219  *                PR->EX (second lock)
2220  *
2221  * The first lock can't be granted because of the granted mode of the second
2222  * lock and the second lock can't be granted because it's not first in the
2223  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2224  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2225  * flag set and return DEMOTED in the lksb flags.
2226  *
2227  * Originally, this function detected conv-deadlk in a more limited scope:
2228  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2229  * - if lkb1 was the first entry in the queue (not just earlier), and was
2230  *   blocked by the granted mode of lkb2, and there was nothing on the
2231  *   granted queue preventing lkb1 from being granted immediately, i.e.
2232  *   lkb2 was the only thing preventing lkb1 from being granted.
2233  *
2234  * That second condition meant we'd only say there was conv-deadlk if
2235  * resolving it (by demotion) would lead to the first lock on the convert
2236  * queue being granted right away.  It allowed conversion deadlocks to exist
2237  * between locks on the convert queue while they couldn't be granted anyway.
2238  *
2239  * Now, we detect and take action on conversion deadlocks immediately when
2240  * they're created, even if they may not be immediately consequential.  If
2241  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2242  * mode that would prevent lkb1's conversion from being granted, we do a
2243  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2244  * I think this means that the lkb_is_ahead condition below should always
2245  * be zero, i.e. there will never be conv-deadlk between two locks that are
2246  * both already on the convert queue.
2247  */
2248
2249 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2250 {
2251         struct dlm_lkb *lkb1;
2252         int lkb_is_ahead = 0;
2253
2254         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2255                 if (lkb1 == lkb2) {
2256                         lkb_is_ahead = 1;
2257                         continue;
2258                 }
2259
2260                 if (!lkb_is_ahead) {
2261                         if (!modes_compat(lkb2, lkb1))
2262                                 return 1;
2263                 } else {
2264                         if (!modes_compat(lkb2, lkb1) &&
2265                             !modes_compat(lkb1, lkb2))
2266                                 return 1;
2267                 }
2268         }
2269         return 0;
2270 }
2271
2272 /*
2273  * Return 1 if the lock can be granted, 0 otherwise.
2274  * Also detect and resolve conversion deadlocks.
2275  *
2276  * lkb is the lock to be granted
2277  *
2278  * now is 1 if the function is being called in the context of the
2279  * immediate request, it is 0 if called later, after the lock has been
2280  * queued.
2281  *
2282  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2283  * after recovery.
2284  *
2285  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2286  */
2287
2288 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2289                            int recover)
2290 {
2291         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2292
2293         /*
2294          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2295          * a new request for a NL mode lock being blocked.
2296          *
2297          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2298          * request, then it would be granted.  In essence, the use of this flag
2299          * tells the Lock Manager to expedite theis request by not considering
2300          * what may be in the CONVERTING or WAITING queues...  As of this
2301          * writing, the EXPEDITE flag can be used only with new requests for NL
2302          * mode locks.  This flag is not valid for conversion requests.
2303          *
2304          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2305          * conversion or used with a non-NL requested mode.  We also know an
2306          * EXPEDITE request is always granted immediately, so now must always
2307          * be 1.  The full condition to grant an expedite request: (now &&
2308          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2309          * therefore be shortened to just checking the flag.
2310          */
2311
2312         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2313                 return 1;
2314
2315         /*
2316          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2317          * added to the remaining conditions.
2318          */
2319
2320         if (queue_conflict(&r->res_grantqueue, lkb))
2321                 return 0;
2322
2323         /*
2324          * 6-3: By default, a conversion request is immediately granted if the
2325          * requested mode is compatible with the modes of all other granted
2326          * locks
2327          */
2328
2329         if (queue_conflict(&r->res_convertqueue, lkb))
2330                 return 0;
2331
2332         /*
2333          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2334          * locks for a recovered rsb, on which lkb's have been rebuilt.
2335          * The lkb's may have been rebuilt on the queues in a different
2336          * order than they were in on the previous master.  So, granting
2337          * queued conversions in order after recovery doesn't make sense
2338          * since the order hasn't been preserved anyway.  The new order
2339          * could also have created a new "in place" conversion deadlock.
2340          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2341          * After recovery, there would be no granted locks, and possibly
2342          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2343          * recovery, grant conversions without considering order.
2344          */
2345
2346         if (conv && recover)
2347                 return 1;
2348
2349         /*
2350          * 6-5: But the default algorithm for deciding whether to grant or
2351          * queue conversion requests does not by itself guarantee that such
2352          * requests are serviced on a "first come first serve" basis.  This, in
2353          * turn, can lead to a phenomenon known as "indefinate postponement".
2354          *
2355          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2356          * the system service employed to request a lock conversion.  This flag
2357          * forces certain conversion requests to be queued, even if they are
2358          * compatible with the granted modes of other locks on the same
2359          * resource.  Thus, the use of this flag results in conversion requests
2360          * being ordered on a "first come first servce" basis.
2361          *
2362          * DCT: This condition is all about new conversions being able to occur
2363          * "in place" while the lock remains on the granted queue (assuming
2364          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2365          * doesn't _have_ to go onto the convert queue where it's processed in
2366          * order.  The "now" variable is necessary to distinguish converts
2367          * being received and processed for the first time now, because once a
2368          * convert is moved to the conversion queue the condition below applies
2369          * requiring fifo granting.
2370          */
2371
2372         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2373                 return 1;
2374
2375         /*
2376          * Even if the convert is compat with all granted locks,
2377          * QUECVT forces it behind other locks on the convert queue.
2378          */
2379
2380         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2381                 if (list_empty(&r->res_convertqueue))
2382                         return 1;
2383                 else
2384                         return 0;
2385         }
2386
2387         /*
2388          * The NOORDER flag is set to avoid the standard vms rules on grant
2389          * order.
2390          */
2391
2392         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2393                 return 1;
2394
2395         /*
2396          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2397          * granted until all other conversion requests ahead of it are granted
2398          * and/or canceled.
2399          */
2400
2401         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2402                 return 1;
2403
2404         /*
2405          * 6-4: By default, a new request is immediately granted only if all
2406          * three of the following conditions are satisfied when the request is
2407          * issued:
2408          * - The queue of ungranted conversion requests for the resource is
2409          *   empty.
2410          * - The queue of ungranted new requests for the resource is empty.
2411          * - The mode of the new request is compatible with the most
2412          *   restrictive mode of all granted locks on the resource.
2413          */
2414
2415         if (now && !conv && list_empty(&r->res_convertqueue) &&
2416             list_empty(&r->res_waitqueue))
2417                 return 1;
2418
2419         /*
2420          * 6-4: Once a lock request is in the queue of ungranted new requests,
2421          * it cannot be granted until the queue of ungranted conversion
2422          * requests is empty, all ungranted new requests ahead of it are
2423          * granted and/or canceled, and it is compatible with the granted mode
2424          * of the most restrictive lock granted on the resource.
2425          */
2426
2427         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2428             first_in_list(lkb, &r->res_waitqueue))
2429                 return 1;
2430
2431         return 0;
2432 }
2433
2434 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2435                           int recover, int *err)
2436 {
2437         int rv;
2438         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2439         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2440
2441         if (err)
2442                 *err = 0;
2443
2444         rv = _can_be_granted(r, lkb, now, recover);
2445         if (rv)
2446                 goto out;
2447
2448         /*
2449          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2450          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2451          * cancels one of the locks.
2452          */
2453
2454         if (is_convert && can_be_queued(lkb) &&
2455             conversion_deadlock_detect(r, lkb)) {
2456                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2457                         lkb->lkb_grmode = DLM_LOCK_NL;
2458                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2459                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2460                         if (err)
2461                                 *err = -EDEADLK;
2462                         else {
2463                                 log_print("can_be_granted deadlock %x now %d",
2464                                           lkb->lkb_id, now);
2465                                 dlm_dump_rsb(r);
2466                         }
2467                 }
2468                 goto out;
2469         }
2470
2471         /*
2472          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2473          * to grant a request in a mode other than the normal rqmode.  It's a
2474          * simple way to provide a big optimization to applications that can
2475          * use them.
2476          */
2477
2478         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2479                 alt = DLM_LOCK_PR;
2480         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2481                 alt = DLM_LOCK_CW;
2482
2483         if (alt) {
2484                 lkb->lkb_rqmode = alt;
2485                 rv = _can_be_granted(r, lkb, now, 0);
2486                 if (rv)
2487                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2488                 else
2489                         lkb->lkb_rqmode = rqmode;
2490         }
2491  out:
2492         return rv;
2493 }
2494
2495 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2496    for locks pending on the convert list.  Once verified (watch for these
2497    log_prints), we should be able to just call _can_be_granted() and not
2498    bother with the demote/deadlk cases here (and there's no easy way to deal
2499    with a deadlk here, we'd have to generate something like grant_lock with
2500    the deadlk error.) */
2501
2502 /* Returns the highest requested mode of all blocked conversions; sets
2503    cw if there's a blocked conversion to DLM_LOCK_CW. */
2504
2505 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2506                                  unsigned int *count)
2507 {
2508         struct dlm_lkb *lkb, *s;
2509         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2510         int hi, demoted, quit, grant_restart, demote_restart;
2511         int deadlk;
2512
2513         quit = 0;
2514  restart:
2515         grant_restart = 0;
2516         demote_restart = 0;
2517         hi = DLM_LOCK_IV;
2518
2519         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2520                 demoted = is_demoted(lkb);
2521                 deadlk = 0;
2522
2523                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2524                         grant_lock_pending(r, lkb);
2525                         grant_restart = 1;
2526                         if (count)
2527                                 (*count)++;
2528                         continue;
2529                 }
2530
2531                 if (!demoted && is_demoted(lkb)) {
2532                         log_print("WARN: pending demoted %x node %d %s",
2533                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2534                         demote_restart = 1;
2535                         continue;
2536                 }
2537
2538                 if (deadlk) {
2539                         log_print("WARN: pending deadlock %x node %d %s",
2540                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2541                         dlm_dump_rsb(r);
2542                         continue;
2543                 }
2544
2545                 hi = max_t(int, lkb->lkb_rqmode, hi);
2546
2547                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2548                         *cw = 1;
2549         }
2550
2551         if (grant_restart)
2552                 goto restart;
2553         if (demote_restart && !quit) {
2554                 quit = 1;
2555                 goto restart;
2556         }
2557
2558         return max_t(int, high, hi);
2559 }
2560
2561 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2562                               unsigned int *count)
2563 {
2564         struct dlm_lkb *lkb, *s;
2565
2566         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2567                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2568                         grant_lock_pending(r, lkb);
2569                         if (count)
2570                                 (*count)++;
2571                 } else {
2572                         high = max_t(int, lkb->lkb_rqmode, high);
2573                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2574                                 *cw = 1;
2575                 }
2576         }
2577
2578         return high;
2579 }
2580
2581 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2582    on either the convert or waiting queue.
2583    high is the largest rqmode of all locks blocked on the convert or
2584    waiting queue. */
2585
2586 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2587 {
2588         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2589                 if (gr->lkb_highbast < DLM_LOCK_EX)
2590                         return 1;
2591                 return 0;
2592         }
2593
2594         if (gr->lkb_highbast < high &&
2595             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2596                 return 1;
2597         return 0;
2598 }
2599
2600 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2601 {
2602         struct dlm_lkb *lkb, *s;
2603         int high = DLM_LOCK_IV;
2604         int cw = 0;
2605
2606         if (!is_master(r)) {
2607                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2608                 dlm_dump_rsb(r);
2609                 return;
2610         }
2611
2612         high = grant_pending_convert(r, high, &cw, count);
2613         high = grant_pending_wait(r, high, &cw, count);
2614
2615         if (high == DLM_LOCK_IV)
2616                 return;
2617
2618         /*
2619          * If there are locks left on the wait/convert queue then send blocking
2620          * ASTs to granted locks based on the largest requested mode (high)
2621          * found above.
2622          */
2623
2624         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2625                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2626                         if (cw && high == DLM_LOCK_PR &&
2627                             lkb->lkb_grmode == DLM_LOCK_PR)
2628                                 queue_bast(r, lkb, DLM_LOCK_CW);
2629                         else
2630                                 queue_bast(r, lkb, high);
2631                         lkb->lkb_highbast = high;
2632                 }
2633         }
2634 }
2635
2636 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2637 {
2638         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2639             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2640                 if (gr->lkb_highbast < DLM_LOCK_EX)
2641                         return 1;
2642                 return 0;
2643         }
2644
2645         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2646                 return 1;
2647         return 0;
2648 }
2649
2650 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2651                             struct dlm_lkb *lkb)
2652 {
2653         struct dlm_lkb *gr;
2654
2655         list_for_each_entry(gr, head, lkb_statequeue) {
2656                 /* skip self when sending basts to convertqueue */
2657                 if (gr == lkb)
2658                         continue;
2659                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2660                         queue_bast(r, gr, lkb->lkb_rqmode);
2661                         gr->lkb_highbast = lkb->lkb_rqmode;
2662                 }
2663         }
2664 }
2665
2666 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2667 {
2668         send_bast_queue(r, &r->res_grantqueue, lkb);
2669 }
2670
2671 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2672 {
2673         send_bast_queue(r, &r->res_grantqueue, lkb);
2674         send_bast_queue(r, &r->res_convertqueue, lkb);
2675 }
2676
2677 /* set_master(r, lkb) -- set the master nodeid of a resource
2678
2679    The purpose of this function is to set the nodeid field in the given
2680    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2681    known, it can just be copied to the lkb and the function will return
2682    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2683    before it can be copied to the lkb.
2684
2685    When the rsb nodeid is being looked up remotely, the initial lkb
2686    causing the lookup is kept on the ls_waiters list waiting for the
2687    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2688    on the rsb's res_lookup list until the master is verified.
2689
2690    Return values:
2691    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2692    1: the rsb master is not available and the lkb has been placed on
2693       a wait queue
2694 */
2695
2696 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2697 {
2698         int our_nodeid = dlm_our_nodeid();
2699
2700         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2701                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2702                 r->res_first_lkid = lkb->lkb_id;
2703                 lkb->lkb_nodeid = r->res_nodeid;
2704                 return 0;
2705         }
2706
2707         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2708                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2709                 return 1;
2710         }
2711
2712         if (r->res_master_nodeid == our_nodeid) {
2713                 lkb->lkb_nodeid = 0;
2714                 return 0;
2715         }
2716
2717         if (r->res_master_nodeid) {
2718                 lkb->lkb_nodeid = r->res_master_nodeid;
2719                 return 0;
2720         }
2721
2722         if (dlm_dir_nodeid(r) == our_nodeid) {
2723                 /* This is a somewhat unusual case; find_rsb will usually
2724                    have set res_master_nodeid when dir nodeid is local, but
2725                    there are cases where we become the dir node after we've
2726                    past find_rsb and go through _request_lock again.
2727                    confirm_master() or process_lookup_list() needs to be
2728                    called after this. */
2729                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2730                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2731                           r->res_name);
2732                 r->res_master_nodeid = our_nodeid;
2733                 r->res_nodeid = 0;
2734                 lkb->lkb_nodeid = 0;
2735                 return 0;
2736         }
2737
2738         wait_pending_remove(r);
2739
2740         r->res_first_lkid = lkb->lkb_id;
2741         send_lookup(r, lkb);
2742         return 1;
2743 }
2744
2745 static void process_lookup_list(struct dlm_rsb *r)
2746 {
2747         struct dlm_lkb *lkb, *safe;
2748
2749         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2750                 list_del_init(&lkb->lkb_rsb_lookup);
2751                 _request_lock(r, lkb);
2752                 schedule();
2753         }
2754 }
2755
2756 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2757
2758 static void confirm_master(struct dlm_rsb *r, int error)
2759 {
2760         struct dlm_lkb *lkb;
2761
2762         if (!r->res_first_lkid)
2763                 return;
2764
2765         switch (error) {
2766         case 0:
2767         case -EINPROGRESS:
2768                 r->res_first_lkid = 0;
2769                 process_lookup_list(r);
2770                 break;
2771
2772         case -EAGAIN:
2773         case -EBADR:
2774         case -ENOTBLK:
2775                 /* the remote request failed and won't be retried (it was
2776                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2777                    lkb the first_lkid */
2778
2779                 r->res_first_lkid = 0;
2780
2781                 if (!list_empty(&r->res_lookup)) {
2782                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2783                                          lkb_rsb_lookup);
2784                         list_del_init(&lkb->lkb_rsb_lookup);
2785                         r->res_first_lkid = lkb->lkb_id;
2786                         _request_lock(r, lkb);
2787                 }
2788                 break;
2789
2790         default:
2791                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2792         }
2793 }
2794
2795 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2796                          int namelen, unsigned long timeout_cs,
2797                          void (*ast) (void *astparam),
2798                          void *astparam,
2799                          void (*bast) (void *astparam, int mode),
2800                          struct dlm_args *args)
2801 {
2802         int rv = -EINVAL;
2803
2804         /* check for invalid arg usage */
2805
2806         if (mode < 0 || mode > DLM_LOCK_EX)
2807                 goto out;
2808
2809         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2810                 goto out;
2811
2812         if (flags & DLM_LKF_CANCEL)
2813                 goto out;
2814
2815         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2816                 goto out;
2817
2818         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2819                 goto out;
2820
2821         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2822                 goto out;
2823
2824         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2825                 goto out;
2826
2827         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2828                 goto out;
2829
2830         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2831                 goto out;
2832
2833         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2834                 goto out;
2835
2836         if (!ast || !lksb)
2837                 goto out;
2838
2839         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2840                 goto out;
2841
2842         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2843                 goto out;
2844
2845         /* these args will be copied to the lkb in validate_lock_args,
2846            it cannot be done now because when converting locks, fields in
2847            an active lkb cannot be modified before locking the rsb */
2848
2849         args->flags = flags;
2850         args->astfn = ast;
2851         args->astparam = astparam;
2852         args->bastfn = bast;
2853         args->timeout = timeout_cs;
2854         args->mode = mode;
2855         args->lksb = lksb;
2856         rv = 0;
2857  out:
2858         return rv;
2859 }
2860
2861 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2862 {
2863         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2864                       DLM_LKF_FORCEUNLOCK))
2865                 return -EINVAL;
2866
2867         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2868                 return -EINVAL;
2869
2870         args->flags = flags;
2871         args->astparam = astarg;
2872         return 0;
2873 }
2874
2875 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2876                               struct dlm_args *args)
2877 {
2878         int rv = -EINVAL;
2879
2880         if (args->flags & DLM_LKF_CONVERT) {
2881                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2882                         goto out;
2883
2884                 if (args->flags & DLM_LKF_QUECVT &&
2885                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2886                         goto out;
2887
2888                 rv = -EBUSY;
2889                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2890                         goto out;
2891
2892                 if (lkb->lkb_wait_type)
2893                         goto out;
2894
2895                 if (is_overlap(lkb))
2896                         goto out;
2897         }
2898
2899         lkb->lkb_exflags = args->flags;
2900         lkb->lkb_sbflags = 0;
2901         lkb->lkb_astfn = args->astfn;
2902         lkb->lkb_astparam = args->astparam;
2903         lkb->lkb_bastfn = args->bastfn;
2904         lkb->lkb_rqmode = args->mode;
2905         lkb->lkb_lksb = args->lksb;
2906         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2907         lkb->lkb_ownpid = (int) current->pid;
2908         lkb->lkb_timeout_cs = args->timeout;
2909         rv = 0;
2910  out:
2911         if (rv)
2912                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2913                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2914                           lkb->lkb_status, lkb->lkb_wait_type,
2915                           lkb->lkb_resource->res_name);
2916         return rv;
2917 }
2918
2919 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2920    for success */
2921
2922 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2923    because there may be a lookup in progress and it's valid to do
2924    cancel/unlockf on it */
2925
2926 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2927 {
2928         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2929         int rv = -EINVAL;
2930
2931         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2932                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2933                 dlm_print_lkb(lkb);
2934                 goto out;
2935         }
2936
2937         /* an lkb may still exist even though the lock is EOL'ed due to a
2938            cancel, unlock or failed noqueue request; an app can't use these
2939            locks; return same error as if the lkid had not been found at all */
2940
2941         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2942                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2943                 rv = -ENOENT;
2944                 goto out;
2945         }
2946
2947         /* an lkb may be waiting for an rsb lookup to complete where the
2948            lookup was initiated by another lock */
2949
2950         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2951                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2952                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2953                         list_del_init(&lkb->lkb_rsb_lookup);
2954                         queue_cast(lkb->lkb_resource, lkb,
2955                                    args->flags & DLM_LKF_CANCEL ?
2956                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2957                         unhold_lkb(lkb); /* undoes create_lkb() */
2958                 }
2959                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2960                 rv = -EBUSY;
2961                 goto out;
2962         }
2963
2964         /* cancel not allowed with another cancel/unlock in progress */
2965
2966         if (args->flags & DLM_LKF_CANCEL) {
2967                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2968                         goto out;
2969
2970                 if (is_overlap(lkb))
2971                         goto out;
2972
2973                 /* don't let scand try to do a cancel */
2974                 del_timeout(lkb);
2975
2976                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2977                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2978                         rv = -EBUSY;
2979                         goto out;
2980                 }
2981
2982                 /* there's nothing to cancel */
2983                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2984                     !lkb->lkb_wait_type) {
2985                         rv = -EBUSY;
2986                         goto out;
2987                 }
2988
2989                 switch (lkb->lkb_wait_type) {
2990                 case DLM_MSG_LOOKUP:
2991                 case DLM_MSG_REQUEST:
2992                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2993                         rv = -EBUSY;
2994                         goto out;
2995                 case DLM_MSG_UNLOCK:
2996                 case DLM_MSG_CANCEL:
2997                         goto out;
2998                 }
2999                 /* add_to_waiters() will set OVERLAP_CANCEL */
3000                 goto out_ok;
3001         }
3002
3003         /* do we need to allow a force-unlock if there's a normal unlock
3004            already in progress?  in what conditions could the normal unlock
3005            fail such that we'd want to send a force-unlock to be sure? */
3006
3007         if (args->flags & DLM_LKF_FORCEUNLOCK) {
3008                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3009                         goto out;
3010
3011                 if (is_overlap_unlock(lkb))
3012                         goto out;
3013
3014                 /* don't let scand try to do a cancel */
3015                 del_timeout(lkb);
3016
3017                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3018                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3019                         rv = -EBUSY;
3020                         goto out;
3021                 }
3022
3023                 switch (lkb->lkb_wait_type) {
3024                 case DLM_MSG_LOOKUP:
3025                 case DLM_MSG_REQUEST:
3026                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3027                         rv = -EBUSY;
3028                         goto out;
3029                 case DLM_MSG_UNLOCK:
3030                         goto out;
3031                 }
3032                 /* add_to_waiters() will set OVERLAP_UNLOCK */
3033                 goto out_ok;
3034         }
3035
3036         /* normal unlock not allowed if there's any op in progress */
3037         rv = -EBUSY;
3038         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3039                 goto out;
3040
3041  out_ok:
3042         /* an overlapping op shouldn't blow away exflags from other op */
3043         lkb->lkb_exflags |= args->flags;
3044         lkb->lkb_sbflags = 0;
3045         lkb->lkb_astparam = args->astparam;
3046         rv = 0;
3047  out:
3048         if (rv)
3049                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3050                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3051                           args->flags, lkb->lkb_wait_type,
3052                           lkb->lkb_resource->res_name);
3053         return rv;
3054 }
3055
3056 /*
3057  * Four stage 4 varieties:
3058  * do_request(), do_convert(), do_unlock(), do_cancel()
3059  * These are called on the master node for the given lock and
3060  * from the central locking logic.
3061  */
3062
3063 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3064 {
3065         int error = 0;
3066
3067         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3068                 grant_lock(r, lkb);
3069                 queue_cast(r, lkb, 0);
3070                 goto out;
3071         }
3072
3073         if (can_be_queued(lkb)) {
3074                 error = -EINPROGRESS;
3075                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3076                 add_timeout(lkb);
3077                 goto out;
3078         }
3079
3080         error = -EAGAIN;
3081         queue_cast(r, lkb, -EAGAIN);
3082  out:
3083         return error;
3084 }
3085
3086 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3087                                int error)
3088 {
3089         switch (error) {
3090         case -EAGAIN:
3091                 if (force_blocking_asts(lkb))
3092                         send_blocking_asts_all(r, lkb);
3093                 break;
3094         case -EINPROGRESS:
3095                 send_blocking_asts(r, lkb);
3096                 break;
3097         }
3098 }
3099
3100 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3101 {
3102         int error = 0;
3103         int deadlk = 0;
3104
3105         /* changing an existing lock may allow others to be granted */
3106
3107         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3108                 grant_lock(r, lkb);
3109                 queue_cast(r, lkb, 0);
3110                 goto out;
3111         }
3112
3113         /* can_be_granted() detected that this lock would block in a conversion
3114            deadlock, so we leave it on the granted queue and return EDEADLK in
3115            the ast for the convert. */
3116
3117         if (deadlk) {
3118                 /* it's left on the granted queue */
3119                 revert_lock(r, lkb);
3120                 queue_cast(r, lkb, -EDEADLK);
3121                 error = -EDEADLK;
3122                 goto out;
3123         }
3124
3125         /* is_demoted() means the can_be_granted() above set the grmode
3126            to NL, and left us on the granted queue.  This auto-demotion
3127            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3128            now grantable.  We have to try to grant other converting locks
3129            before we try again to grant this one. */
3130
3131         if (is_demoted(lkb)) {
3132                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3133                 if (_can_be_granted(r, lkb, 1, 0)) {
3134                         grant_lock(r, lkb);
3135                         queue_cast(r, lkb, 0);
3136                         goto out;
3137                 }
3138                 /* else fall through and move to convert queue */
3139         }
3140
3141         if (can_be_queued(lkb)) {
3142                 error = -EINPROGRESS;
3143                 del_lkb(r, lkb);
3144                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3145                 add_timeout(lkb);
3146                 goto out;
3147         }
3148
3149         error = -EAGAIN;
3150         queue_cast(r, lkb, -EAGAIN);
3151  out:
3152         return error;
3153 }
3154
3155 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3156                                int error)
3157 {
3158         switch (error) {
3159         case 0:
3160                 grant_pending_locks(r, NULL);
3161                 /* grant_pending_locks also sends basts */
3162                 break;
3163         case -EAGAIN:
3164                 if (force_blocking_asts(lkb))
3165                         send_blocking_asts_all(r, lkb);
3166                 break;
3167         case -EINPROGRESS:
3168                 send_blocking_asts(r, lkb);
3169                 break;
3170         }
3171 }
3172
3173 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3174 {
3175         remove_lock(r, lkb);
3176         queue_cast(r, lkb, -DLM_EUNLOCK);
3177         return -DLM_EUNLOCK;
3178 }
3179
3180 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3181                               int error)
3182 {
3183         grant_pending_locks(r, NULL);
3184 }
3185
3186 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3187
3188 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3189 {
3190         int error;
3191
3192         error = revert_lock(r, lkb);
3193         if (error) {
3194                 queue_cast(r, lkb, -DLM_ECANCEL);
3195                 return -DLM_ECANCEL;
3196         }
3197         return 0;
3198 }
3199
3200 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3201                               int error)
3202 {
3203         if (error)
3204                 grant_pending_locks(r, NULL);
3205 }
3206
3207 /*
3208  * Four stage 3 varieties:
3209  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3210  */
3211
3212 /* add a new lkb to a possibly new rsb, called by requesting process */
3213
3214 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3215 {
3216         int error;
3217
3218         /* set_master: sets lkb nodeid from r */
3219
3220         error = set_master(r, lkb);
3221         if (error < 0)
3222                 goto out;
3223         if (error) {
3224                 error = 0;
3225                 goto out;
3226         }
3227
3228         if (is_remote(r)) {
3229                 /* receive_request() calls do_request() on remote node */
3230                 error = send_request(r, lkb);
3231         } else {
3232                 error = do_request(r, lkb);
3233                 /* for remote locks the request_reply is sent
3234                    between do_request and do_request_effects */
3235                 do_request_effects(r, lkb, error);
3236         }
3237  out:
3238         return error;
3239 }
3240
3241 /* change some property of an existing lkb, e.g. mode */
3242
3243 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3244 {
3245         int error;
3246
3247         if (is_remote(r)) {
3248                 /* receive_convert() calls do_convert() on remote node */
3249                 error = send_convert(r, lkb);
3250         } else {
3251                 error = do_convert(r, lkb);
3252                 /* for remote locks the convert_reply is sent
3253                    between do_convert and do_convert_effects */
3254                 do_convert_effects(r, lkb, error);
3255         }
3256
3257         return error;
3258 }
3259
3260 /* remove an existing lkb from the granted queue */
3261
3262 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3263 {
3264         int error;
3265
3266         if (is_remote(r)) {
3267                 /* receive_unlock() calls do_unlock() on remote node */
3268                 error = send_unlock(r, lkb);
3269         } else {
3270                 error = do_unlock(r, lkb);
3271                 /* for remote locks the unlock_reply is sent
3272                    between do_unlock and do_unlock_effects */
3273                 do_unlock_effects(r, lkb, error);
3274         }
3275
3276         return error;
3277 }
3278
3279 /* remove an existing lkb from the convert or wait queue */
3280
3281 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3282 {
3283         int error;
3284
3285         if (is_remote(r)) {
3286                 /* receive_cancel() calls do_cancel() on remote node */
3287                 error = send_cancel(r, lkb);
3288         } else {
3289                 error = do_cancel(r, lkb);
3290                 /* for remote locks the cancel_reply is sent
3291                    between do_cancel and do_cancel_effects */
3292                 do_cancel_effects(r, lkb, error);
3293         }
3294
3295         return error;
3296 }
3297
3298 /*
3299  * Four stage 2 varieties:
3300  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3301  */
3302
3303 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3304                         int len, struct dlm_args *args)
3305 {
3306         struct dlm_rsb *r;
3307         int error;
3308
3309         error = validate_lock_args(ls, lkb, args);
3310         if (error)
3311                 return error;
3312
3313         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3314         if (error)
3315                 return error;
3316
3317         lock_rsb(r);
3318
3319         attach_lkb(r, lkb);
3320         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3321
3322         error = _request_lock(r, lkb);
3323
3324         unlock_rsb(r);
3325         put_rsb(r);
3326         return error;
3327 }
3328
3329 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3330                         struct dlm_args *args)
3331 {
3332         struct dlm_rsb *r;
3333         int error;
3334
3335         r = lkb->lkb_resource;
3336
3337         hold_rsb(r);
3338         lock_rsb(r);
3339
3340         error = validate_lock_args(ls, lkb, args);
3341         if (error)
3342                 goto out;
3343
3344         error = _convert_lock(r, lkb);
3345  out:
3346         unlock_rsb(r);
3347         put_rsb(r);
3348         return error;
3349 }
3350
3351 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3352                        struct dlm_args *args)
3353 {
3354         struct dlm_rsb *r;
3355         int error;
3356
3357         r = lkb->lkb_resource;
3358
3359         hold_rsb(r);
3360         lock_rsb(r);
3361
3362         error = validate_unlock_args(lkb, args);
3363         if (error)
3364                 goto out;
3365
3366         error = _unlock_lock(r, lkb);
3367  out:
3368         unlock_rsb(r);
3369         put_rsb(r);
3370         return error;
3371 }
3372
3373 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3374                        struct dlm_args *args)
3375 {
3376         struct dlm_rsb *r;
3377         int error;
3378
3379         r = lkb->lkb_resource;
3380
3381         hold_rsb(r);
3382         lock_rsb(r);
3383
3384         error = validate_unlock_args(lkb, args);
3385         if (error)
3386                 goto out;
3387
3388         error = _cancel_lock(r, lkb);
3389  out:
3390         unlock_rsb(r);
3391         put_rsb(r);
3392         return error;
3393 }
3394
3395 /*
3396  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3397  */
3398
3399 int dlm_lock(dlm_lockspace_t *lockspace,
3400              int mode,
3401              struct dlm_lksb *lksb,
3402              uint32_t flags,
3403              void *name,
3404              unsigned int namelen,
3405              uint32_t parent_lkid,
3406              void (*ast) (void *astarg),
3407              void *astarg,
3408              void (*bast) (void *astarg, int mode))
3409 {
3410         struct dlm_ls *ls;
3411         struct dlm_lkb *lkb;
3412         struct dlm_args args;
3413         int error, convert = flags & DLM_LKF_CONVERT;
3414
3415         ls = dlm_find_lockspace_local(lockspace);
3416         if (!ls)
3417                 return -EINVAL;
3418
3419         dlm_lock_recovery(ls);
3420
3421         if (convert)
3422                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3423         else
3424                 error = create_lkb(ls, &lkb);
3425
3426         if (error)
3427                 goto out;
3428
3429         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3430                               astarg, bast, &args);
3431         if (error)
3432                 goto out_put;
3433
3434         if (convert)
3435                 error = convert_lock(ls, lkb, &args);
3436         else
3437                 error = request_lock(ls, lkb, name, namelen, &args);
3438
3439         if (error == -EINPROGRESS)
3440                 error = 0;
3441  out_put:
3442         if (convert || error)
3443                 __put_lkb(ls, lkb);
3444         if (error == -EAGAIN || error == -EDEADLK)
3445                 error = 0;
3446  out:
3447         dlm_unlock_recovery(ls);
3448         dlm_put_lockspace(ls);
3449         return error;
3450 }
3451
3452 int dlm_unlock(dlm_lockspace_t *lockspace,
3453                uint32_t lkid,
3454                uint32_t flags,
3455                struct dlm_lksb *lksb,
3456                void *astarg)
3457 {
3458         struct dlm_ls *ls;
3459         struct dlm_lkb *lkb;
3460         struct dlm_args args;
3461         int error;
3462
3463         ls = dlm_find_lockspace_local(lockspace);
3464         if (!ls)
3465                 return -EINVAL;
3466
3467         dlm_lock_recovery(ls);
3468
3469         error = find_lkb(ls, lkid, &lkb);
3470         if (error)
3471                 goto out;
3472
3473         error = set_unlock_args(flags, astarg, &args);
3474         if (error)
3475                 goto out_put;
3476
3477         if (flags & DLM_LKF_CANCEL)
3478                 error = cancel_lock(ls, lkb, &args);
3479         else
3480                 error = unlock_lock(ls, lkb, &args);
3481
3482         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3483                 error = 0;
3484         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3485                 error = 0;
3486  out_put:
3487         dlm_put_lkb(lkb);
3488  out:
3489         dlm_unlock_recovery(ls);
3490         dlm_put_lockspace(ls);
3491         return error;
3492 }
3493
3494 /*
3495  * send/receive routines for remote operations and replies
3496  *
3497  * send_args
3498  * send_common
3499  * send_request                 receive_request
3500  * send_convert                 receive_convert
3501  * send_unlock                  receive_unlock
3502  * send_cancel                  receive_cancel
3503  * send_grant                   receive_grant
3504  * send_bast                    receive_bast
3505  * send_lookup                  receive_lookup
3506  * send_remove                  receive_remove
3507  *
3508  *                              send_common_reply
3509  * receive_request_reply        send_request_reply
3510  * receive_convert_reply        send_convert_reply
3511  * receive_unlock_reply         send_unlock_reply
3512  * receive_cancel_reply         send_cancel_reply
3513  * receive_lookup_reply         send_lookup_reply
3514  */
3515
3516 static int _create_message(struct dlm_ls *ls, int mb_len,
3517                            int to_nodeid, int mstype,
3518                            struct dlm_message **ms_ret,
3519                            struct dlm_mhandle **mh_ret)
3520 {
3521         struct dlm_message *ms;
3522         struct dlm_mhandle *mh;
3523         char *mb;
3524
3525         /* get_buffer gives us a message handle (mh) that we need to
3526            pass into lowcomms_commit and a message buffer (mb) that we
3527            write our data into */
3528
3529         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3530         if (!mh)
3531                 return -ENOBUFS;
3532
3533         memset(mb, 0, mb_len);
3534
3535         ms = (struct dlm_message *) mb;
3536
3537         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3538         ms->m_header.h_lockspace = ls->ls_global_id;
3539         ms->m_header.h_nodeid = dlm_our_nodeid();
3540         ms->m_header.h_length = mb_len;
3541         ms->m_header.h_cmd = DLM_MSG;
3542
3543         ms->m_type = mstype;
3544
3545         *mh_ret = mh;
3546         *ms_ret = ms;
3547         return 0;
3548 }
3549
3550 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3551                           int to_nodeid, int mstype,
3552                           struct dlm_message **ms_ret,
3553                           struct dlm_mhandle **mh_ret)
3554 {
3555         int mb_len = sizeof(struct dlm_message);
3556
3557         switch (mstype) {
3558         case DLM_MSG_REQUEST:
3559         case DLM_MSG_LOOKUP:
3560         case DLM_MSG_REMOVE:
3561                 mb_len += r->res_length;
3562                 break;
3563         case DLM_MSG_CONVERT:
3564         case DLM_MSG_UNLOCK:
3565         case DLM_MSG_REQUEST_REPLY:
3566         case DLM_MSG_CONVERT_REPLY:
3567         case DLM_MSG_GRANT:
3568                 if (lkb && lkb->lkb_lvbptr)
3569                         mb_len += r->res_ls->ls_lvblen;
3570                 break;
3571         }
3572
3573         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3574                                ms_ret, mh_ret);
3575 }
3576
3577 /* further lowcomms enhancements or alternate implementations may make
3578    the return value from this function useful at some point */
3579
3580 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3581 {
3582         dlm_message_out(ms);
3583         dlm_lowcomms_commit_buffer(mh);
3584         return 0;
3585 }
3586
3587 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3588                       struct dlm_message *ms)
3589 {
3590         ms->m_nodeid   = lkb->lkb_nodeid;
3591         ms->m_pid      = lkb->lkb_ownpid;
3592         ms->m_lkid     = lkb->lkb_id;
3593         ms->m_remid    = lkb->lkb_remid;
3594         ms->m_exflags  = lkb->lkb_exflags;
3595         ms->m_sbflags  = lkb->lkb_sbflags;
3596         ms->m_flags    = lkb->lkb_flags;
3597         ms->m_lvbseq   = lkb->lkb_lvbseq;
3598         ms->m_status   = lkb->lkb_status;
3599         ms->m_grmode   = lkb->lkb_grmode;
3600         ms->m_rqmode   = lkb->lkb_rqmode;
3601         ms->m_hash     = r->res_hash;
3602
3603         /* m_result and m_bastmode are set from function args,
3604            not from lkb fields */
3605
3606         if (lkb->lkb_bastfn)
3607                 ms->m_asts |= DLM_CB_BAST;
3608         if (lkb->lkb_astfn)
3609                 ms->m_asts |= DLM_CB_CAST;
3610
3611         /* compare with switch in create_message; send_remove() doesn't
3612            use send_args() */
3613
3614         switch (ms->m_type) {
3615         case DLM_MSG_REQUEST:
3616         case DLM_MSG_LOOKUP:
3617                 memcpy(ms->m_extra, r->res_name, r->res_length);
3618                 break;
3619         case DLM_MSG_CONVERT:
3620         case DLM_MSG_UNLOCK:
3621         case DLM_MSG_REQUEST_REPLY:
3622         case DLM_MSG_CONVERT_REPLY:
3623         case DLM_MSG_GRANT:
3624                 if (!lkb->lkb_lvbptr)
3625                         break;
3626                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3627                 break;
3628         }
3629 }
3630
3631 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3632 {
3633         struct dlm_message *ms;
3634         struct dlm_mhandle *mh;
3635         int to_nodeid, error;
3636
3637         to_nodeid = r->res_nodeid;
3638
3639         error = add_to_waiters(lkb, mstype, to_nodeid);
3640         if (error)
3641                 return error;
3642
3643         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3644         if (error)
3645                 goto fail;
3646
3647         send_args(r, lkb, ms);
3648
3649         error = send_message(mh, ms);
3650         if (error)
3651                 goto fail;
3652         return 0;
3653
3654  fail:
3655         remove_from_waiters(lkb, msg_reply_type(mstype));
3656         return error;
3657 }
3658
3659 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3660 {
3661         return send_common(r, lkb, DLM_MSG_REQUEST);
3662 }
3663
3664 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3665 {
3666         int error;
3667
3668         error = send_common(r, lkb, DLM_MSG_CONVERT);
3669
3670         /* down conversions go without a reply from the master */
3671         if (!error && down_conversion(lkb)) {
3672                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3673                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3674                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3675                 r->res_ls->ls_stub_ms.m_result = 0;
3676                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3677         }
3678
3679         return error;
3680 }
3681
3682 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3683    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3684    that the master is still correct. */
3685
3686 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3687 {
3688         return send_common(r, lkb, DLM_MSG_UNLOCK);
3689 }
3690
3691 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3692 {
3693         return send_common(r, lkb, DLM_MSG_CANCEL);
3694 }
3695
3696 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3697 {
3698         struct dlm_message *ms;
3699         struct dlm_mhandle *mh;
3700         int to_nodeid, error;
3701
3702         to_nodeid = lkb->lkb_nodeid;
3703
3704         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3705         if (error)
3706                 goto out;
3707
3708         send_args(r, lkb, ms);
3709
3710         ms->m_result = 0;
3711
3712         error = send_message(mh, ms);
3713  out:
3714         return error;
3715 }
3716
3717 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3718 {
3719         struct dlm_message *ms;
3720         struct dlm_mhandle *mh;
3721         int to_nodeid, error;
3722
3723         to_nodeid = lkb->lkb_nodeid;
3724
3725         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3726         if (error)
3727                 goto out;
3728
3729         send_args(r, lkb, ms);
3730
3731         ms->m_bastmode = mode;
3732
3733         error = send_message(mh, ms);
3734  out:
3735         return error;
3736 }
3737
3738 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3739 {
3740         struct dlm_message *ms;
3741         struct dlm_mhandle *mh;
3742         int to_nodeid, error;
3743
3744         to_nodeid = dlm_dir_nodeid(r);
3745
3746         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3747         if (error)
3748                 return error;
3749
3750         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3751         if (error)
3752                 goto fail;
3753
3754         send_args(r, lkb, ms);
3755
3756         error = send_message(mh, ms);
3757         if (error)
3758                 goto fail;
3759         return 0;
3760
3761  fail:
3762         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3763         return error;
3764 }
3765
3766 static int send_remove(struct dlm_rsb *r)
3767 {
3768         struct dlm_message *ms;
3769         struct dlm_mhandle *mh;
3770         int to_nodeid, error;
3771
3772         to_nodeid = dlm_dir_nodeid(r);
3773
3774         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3775         if (error)
3776                 goto out;
3777
3778         memcpy(ms->m_extra, r->res_name, r->res_length);
3779         ms->m_hash = r->res_hash;
3780
3781         error = send_message(mh, ms);
3782  out:
3783         return error;
3784 }
3785
3786 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3787                              int mstype, int rv)
3788 {
3789         struct dlm_message *ms;
3790         struct dlm_mhandle *mh;
3791         int to_nodeid, error;
3792
3793         to_nodeid = lkb->lkb_nodeid;
3794
3795         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3796         if (error)
3797                 goto out;
3798
3799         send_args(r, lkb, ms);
3800
3801         ms->m_result = rv;
3802
3803         error = send_message(mh, ms);
3804  out:
3805         return error;
3806 }
3807
3808 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3809 {
3810         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3811 }
3812
3813 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3814 {
3815         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3816 }
3817
3818 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3819 {
3820         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3821 }
3822
3823 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3824 {
3825         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3826 }
3827
3828 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3829                              int ret_nodeid, int rv)
3830 {
3831         struct dlm_rsb *r = &ls->ls_stub_rsb;
3832         struct dlm_message *ms;
3833         struct dlm_mhandle *mh;
3834         int error, nodeid = ms_in->m_header.h_nodeid;
3835
3836         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3837         if (error)
3838                 goto out;
3839
3840         ms->m_lkid = ms_in->m_lkid;
3841         ms->m_result = rv;
3842         ms->m_nodeid = ret_nodeid;
3843
3844         error = send_message(mh, ms);
3845  out:
3846         return error;
3847 }
3848
3849 /* which args we save from a received message depends heavily on the type
3850    of message, unlike the send side where we can safely send everything about
3851    the lkb for any type of message */
3852
3853 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3854 {
3855         lkb->lkb_exflags = ms->m_exflags;
3856         lkb->lkb_sbflags = ms->m_sbflags;
3857         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3858                          (ms->m_flags & 0x0000FFFF);
3859 }
3860
3861 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3862 {
3863         if (ms->m_flags == DLM_IFL_STUB_MS)
3864                 return;
3865
3866         lkb->lkb_sbflags = ms->m_sbflags;
3867         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3868                          (ms->m_flags & 0x0000FFFF);
3869 }
3870
3871 static int receive_extralen(struct dlm_message *ms)
3872 {
3873         return (ms->m_header.h_length - sizeof(struct dlm_message));
3874 }
3875
3876 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3877                        struct dlm_message *ms)
3878 {
3879         int len;
3880
3881         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3882                 if (!lkb->lkb_lvbptr)
3883                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3884                 if (!lkb->lkb_lvbptr)
3885                         return -ENOMEM;
3886                 len = receive_extralen(ms);
3887                 if (len > DLM_RESNAME_MAXLEN)
3888                         len = DLM_RESNAME_MAXLEN;
3889                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3890         }
3891         return 0;
3892 }
3893
3894 static void fake_bastfn(void *astparam, int mode)
3895 {
3896         log_print("fake_bastfn should not be called");
3897 }
3898
3899 static void fake_astfn(void *astparam)
3900 {
3901         log_print("fake_astfn should not be called");
3902 }
3903
3904 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3905                                 struct dlm_message *ms)
3906 {
3907         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3908         lkb->lkb_ownpid = ms->m_pid;
3909         lkb->lkb_remid = ms->m_lkid;
3910         lkb->lkb_grmode = DLM_LOCK_IV;
3911         lkb->lkb_rqmode = ms->m_rqmode;
3912
3913         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3914         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3915
3916         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3917                 /* lkb was just created so there won't be an lvb yet */
3918                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3919                 if (!lkb->lkb_lvbptr)
3920                         return -ENOMEM;
3921         }
3922
3923         return 0;
3924 }
3925
3926 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3927                                 struct dlm_message *ms)
3928 {
3929         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3930                 return -EBUSY;
3931
3932         if (receive_lvb(ls, lkb, ms))
3933                 return -ENOMEM;
3934
3935         lkb->lkb_rqmode = ms->m_rqmode;
3936         lkb->lkb_lvbseq = ms->m_lvbseq;
3937
3938         return 0;
3939 }
3940
3941 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3942                                struct dlm_message *ms)
3943 {
3944         if (receive_lvb(ls, lkb, ms))
3945                 return -ENOMEM;
3946         return 0;
3947 }
3948
3949 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3950    uses to send a reply and that the remote end uses to process the reply. */
3951
3952 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3953 {
3954         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3955         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3956         lkb->lkb_remid = ms->m_lkid;
3957 }
3958
3959 /* This is called after the rsb is locked so that we can safely inspect
3960    fields in the lkb. */
3961
3962 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3963 {
3964         int from = ms->m_header.h_nodeid;
3965         int error = 0;
3966
3967         switch (ms->m_type) {
3968         case DLM_MSG_CONVERT:
3969         case DLM_MSG_UNLOCK:
3970         case DLM_MSG_CANCEL:
3971                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3972                         error = -EINVAL;
3973                 break;
3974
3975         case DLM_MSG_CONVERT_REPLY:
3976         case DLM_MSG_UNLOCK_REPLY:
3977         case DLM_MSG_CANCEL_REPLY:
3978         case DLM_MSG_GRANT:
3979         case DLM_MSG_BAST:
3980                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3981                         error = -EINVAL;
3982                 break;
3983
3984         case DLM_MSG_REQUEST_REPLY:
3985                 if (!is_process_copy(lkb))
3986                         error = -EINVAL;
3987                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3988                         error = -EINVAL;
3989                 break;
3990
3991         default:
3992                 error = -EINVAL;
3993         }
3994
3995         if (error)
3996                 log_error(lkb->lkb_resource->res_ls,
3997                           "ignore invalid message %d from %d %x %x %x %d",
3998                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3999                           lkb->lkb_flags, lkb->lkb_nodeid);
4000         return error;
4001 }
4002
4003 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4004 {
4005         char name[DLM_RESNAME_MAXLEN + 1];
4006         struct dlm_message *ms;
4007         struct dlm_mhandle *mh;
4008         struct dlm_rsb *r;
4009         uint32_t hash, b;
4010         int rv, dir_nodeid;
4011
4012         memset(name, 0, sizeof(name));
4013         memcpy(name, ms_name, len);
4014
4015         hash = jhash(name, len, 0);
4016         b = hash & (ls->ls_rsbtbl_size - 1);
4017
4018         dir_nodeid = dlm_hash2nodeid(ls, hash);
4019
4020         log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4021
4022         spin_lock(&ls->ls_rsbtbl[b].lock);
4023         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4024         if (!rv) {
4025                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4026                 log_error(ls, "repeat_remove on keep %s", name);
4027                 return;
4028         }
4029
4030         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4031         if (!rv) {
4032                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4033                 log_error(ls, "repeat_remove on toss %s", name);
4034                 return;
4035         }
4036
4037         /* use ls->remove_name2 to avoid conflict with shrink? */
4038
4039         spin_lock(&ls->ls_remove_spin);
4040         ls->ls_remove_len = len;
4041         memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4042         spin_unlock(&ls->ls_remove_spin);
4043         spin_unlock(&ls->ls_rsbtbl[b].lock);
4044
4045         rv = _create_message(ls, sizeof(struct dlm_message) + len,
4046                              dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4047         if (rv)
4048                 return;
4049
4050         memcpy(ms->m_extra, name, len);
4051         ms->m_hash = hash;
4052
4053         send_message(mh, ms);
4054
4055         spin_lock(&ls->ls_remove_spin);
4056         ls->ls_remove_len = 0;
4057         memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4058         spin_unlock(&ls->ls_remove_spin);
4059 }
4060
4061 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4062 {
4063         struct dlm_lkb *lkb;
4064         struct dlm_rsb *r;
4065         int from_nodeid;
4066         int error, namelen = 0;
4067
4068         from_nodeid = ms->m_header.h_nodeid;
4069
4070         error = create_lkb(ls, &lkb);
4071         if (error)
4072                 goto fail;
4073
4074         receive_flags(lkb, ms);
4075         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4076         error = receive_request_args(ls, lkb, ms);
4077         if (error) {
4078                 __put_lkb(ls, lkb);
4079                 goto fail;
4080         }
4081
4082         /* The dir node is the authority on whether we are the master
4083            for this rsb or not, so if the master sends us a request, we should
4084            recreate the rsb if we've destroyed it.   This race happens when we
4085            send a remove message to the dir node at the same time that the dir
4086            node sends us a request for the rsb. */
4087
4088         namelen = receive_extralen(ms);
4089
4090         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4091                          R_RECEIVE_REQUEST, &r);
4092         if (error) {
4093                 __put_lkb(ls, lkb);
4094                 goto fail;
4095         }
4096
4097         lock_rsb(r);
4098
4099         if (r->res_master_nodeid != dlm_our_nodeid()) {
4100                 error = validate_master_nodeid(ls, r, from_nodeid);
4101                 if (error) {
4102                         unlock_rsb(r);
4103                         put_rsb(r);
4104                         __put_lkb(ls, lkb);
4105                         goto fail;
4106                 }
4107         }
4108
4109         attach_lkb(r, lkb);
4110         error = do_request(r, lkb);
4111         send_request_reply(r, lkb, error);
4112         do_request_effects(r, lkb, error);
4113
4114         unlock_rsb(r);
4115         put_rsb(r);
4116
4117         if (error == -EINPROGRESS)
4118                 error = 0;
4119         if (error)
4120                 dlm_put_lkb(lkb);
4121         return 0;
4122
4123  fail:
4124         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4125            and do this receive_request again from process_lookup_list once
4126            we get the lookup reply.  This would avoid a many repeated
4127            ENOTBLK request failures when the lookup reply designating us
4128            as master is delayed. */
4129
4130         /* We could repeatedly return -EBADR here if our send_remove() is
4131            delayed in being sent/arriving/being processed on the dir node.
4132            Another node would repeatedly lookup up the master, and the dir
4133            node would continue returning our nodeid until our send_remove
4134            took effect.
4135
4136            We send another remove message in case our previous send_remove
4137            was lost/ignored/missed somehow. */
4138
4139         if (error != -ENOTBLK) {
4140                 log_limit(ls, "receive_request %x from %d %d",
4141                           ms->m_lkid, from_nodeid, error);
4142         }
4143
4144         if (namelen && error == -EBADR) {
4145                 send_repeat_remove(ls, ms->m_extra, namelen);
4146                 msleep(1000);
4147         }
4148
4149         setup_stub_lkb(ls, ms);
4150         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4151         return error;
4152 }
4153
4154 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4155 {
4156         struct dlm_lkb *lkb;
4157         struct dlm_rsb *r;
4158         int error, reply = 1;
4159
4160         error = find_lkb(ls, ms->m_remid, &lkb);
4161         if (error)
4162                 goto fail;
4163
4164         if (lkb->lkb_remid != ms->m_lkid) {
4165                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4166                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4167                           (unsigned long long)lkb->lkb_recover_seq,
4168                           ms->m_header.h_nodeid, ms->m_lkid);
4169                 error = -ENOENT;
4170                 goto fail;
4171         }
4172
4173         r = lkb->lkb_resource;
4174
4175         hold_rsb(r);
4176         lock_rsb(r);
4177
4178         error = validate_message(lkb, ms);
4179         if (error)
4180                 goto out;
4181
4182         receive_flags(lkb, ms);
4183
4184         error = receive_convert_args(ls, lkb, ms);
4185         if (error) {
4186                 send_convert_reply(r, lkb, error);
4187                 goto out;
4188         }
4189
4190         reply = !down_conversion(lkb);
4191
4192         error = do_convert(r, lkb);
4193         if (reply)
4194                 send_convert_reply(r, lkb, error);
4195         do_convert_effects(r, lkb, error);
4196  out:
4197         unlock_rsb(r);
4198         put_rsb(r);
4199         dlm_put_lkb(lkb);
4200         return 0;
4201
4202  fail:
4203         setup_stub_lkb(ls, ms);
4204         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4205         return error;
4206 }
4207
4208 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4209 {
4210         struct dlm_lkb *lkb;
4211         struct dlm_rsb *r;
4212         int error;
4213
4214         error = find_lkb(ls, ms->m_remid, &lkb);
4215         if (error)
4216                 goto fail;
4217
4218         if (lkb->lkb_remid != ms->m_lkid) {
4219                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4220                           lkb->lkb_id, lkb->lkb_remid,
4221                           ms->m_header.h_nodeid, ms->m_lkid);
4222                 error = -ENOENT;
4223                 goto fail;
4224         }
4225
4226         r = lkb->lkb_resource;
4227
4228         hold_rsb(r);
4229         lock_rsb(r);
4230
4231         error = validate_message(lkb, ms);
4232         if (error)
4233                 goto out;
4234
4235         receive_flags(lkb, ms);
4236
4237         error = receive_unlock_args(ls, lkb, ms);
4238         if (error) {
4239                 send_unlock_reply(r, lkb, error);
4240                 goto out;
4241         }
4242
4243         error = do_unlock(r, lkb);
4244         send_unlock_reply(r, lkb, error);
4245         do_unlock_effects(r, lkb, error);
4246  out:
4247         unlock_rsb(r);
4248         put_rsb(r);
4249         dlm_put_lkb(lkb);
4250         return 0;
4251
4252  fail:
4253         setup_stub_lkb(ls, ms);
4254         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4255         return error;
4256 }
4257
4258 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4259 {
4260         struct dlm_lkb *lkb;
4261         struct dlm_rsb *r;
4262         int error;
4263
4264         error = find_lkb(ls, ms->m_remid, &lkb);
4265         if (error)
4266                 goto fail;
4267
4268         receive_flags(lkb, ms);
4269
4270         r = lkb->lkb_resource;
4271
4272         hold_rsb(r);
4273         lock_rsb(r);
4274
4275         error = validate_message(lkb, ms);
4276         if (error)
4277                 goto out;
4278
4279         error = do_cancel(r, lkb);
4280         send_cancel_reply(r, lkb, error);
4281         do_cancel_effects(r, lkb, error);
4282  out:
4283         unlock_rsb(r);
4284         put_rsb(r);
4285         dlm_put_lkb(lkb);
4286         return 0;
4287
4288  fail:
4289         setup_stub_lkb(ls, ms);
4290         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4291         return error;
4292 }
4293
4294 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4295 {
4296         struct dlm_lkb *lkb;
4297         struct dlm_rsb *r;
4298         int error;
4299
4300         error = find_lkb(ls, ms->m_remid, &lkb);
4301         if (error)
4302                 return error;
4303
4304         r = lkb->lkb_resource;
4305
4306         hold_rsb(r);
4307         lock_rsb(r);
4308
4309         error = validate_message(lkb, ms);
4310         if (error)
4311                 goto out;
4312
4313         receive_flags_reply(lkb, ms);
4314         if (is_altmode(lkb))
4315                 munge_altmode(lkb, ms);
4316         grant_lock_pc(r, lkb, ms);
4317         queue_cast(r, lkb, 0);
4318  out:
4319         unlock_rsb(r);
4320         put_rsb(r);
4321         dlm_put_lkb(lkb);
4322         return 0;
4323 }
4324
4325 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4326 {
4327         struct dlm_lkb *lkb;
4328         struct dlm_rsb *r;
4329         int error;
4330
4331         error = find_lkb(ls, ms->m_remid, &lkb);
4332         if (error)
4333                 return error;
4334
4335         r = lkb->lkb_resource;
4336
4337         hold_rsb(r);
4338         lock_rsb(r);
4339
4340         error = validate_message(lkb, ms);
4341         if (error)
4342                 goto out;
4343
4344         queue_bast(r, lkb, ms->m_bastmode);
4345         lkb->lkb_highbast = ms->m_bastmode;
4346  out:
4347         unlock_rsb(r);
4348         put_rsb(r);
4349         dlm_put_lkb(lkb);
4350         return 0;
4351 }
4352
4353 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4354 {
4355         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4356
4357         from_nodeid = ms->m_header.h_nodeid;
4358         our_nodeid = dlm_our_nodeid();
4359
4360         len = receive_extralen(ms);
4361
4362         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4363                                   &ret_nodeid, NULL);
4364
4365         /* Optimization: we're master so treat lookup as a request */
4366         if (!error && ret_nodeid == our_nodeid) {
4367                 receive_request(ls, ms);
4368                 return;
4369         }
4370         send_lookup_reply(ls, ms, ret_nodeid, error);
4371 }
4372
4373 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4374 {
4375         char name[DLM_RESNAME_MAXLEN+1];
4376         struct dlm_rsb *r;
4377         uint32_t hash, b;
4378         int rv, len, dir_nodeid, from_nodeid;
4379
4380         from_nodeid = ms->m_header.h_nodeid;
4381
4382         len = receive_extralen(ms);
4383
4384         if (len > DLM_RESNAME_MAXLEN) {
4385                 log_error(ls, "receive_remove from %d bad len %d",
4386                           from_nodeid, len);
4387                 return;
4388         }
4389
4390         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4391         if (dir_nodeid != dlm_our_nodeid()) {
4392                 log_error(ls, "receive_remove from %d bad nodeid %d",
4393                           from_nodeid, dir_nodeid);
4394                 return;
4395         }
4396
4397         /* Look for name on rsbtbl.toss, if it's there, kill it.
4398            If it's on rsbtbl.keep, it's being used, and we should ignore this
4399            message.  This is an expected race between the dir node sending a
4400            request to the master node at the same time as the master node sends
4401            a remove to the dir node.  The resolution to that race is for the
4402            dir node to ignore the remove message, and the master node to
4403            recreate the master rsb when it gets a request from the dir node for
4404            an rsb it doesn't have. */
4405
4406         memset(name, 0, sizeof(name));
4407         memcpy(name, ms->m_extra, len);
4408
4409         hash = jhash(name, len, 0);
4410         b = hash & (ls->ls_rsbtbl_size - 1);
4411
4412         spin_lock(&ls->ls_rsbtbl[b].lock);
4413
4414         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4415         if (rv) {
4416                 /* verify the rsb is on keep list per comment above */
4417                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4418                 if (rv) {
4419                         /* should not happen */
4420                         log_error(ls, "receive_remove from %d not found %s",
4421                                   from_nodeid, name);
4422                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4423                         return;
4424                 }
4425                 if (r->res_master_nodeid != from_nodeid) {
4426                         /* should not happen */
4427                         log_error(ls, "receive_remove keep from %d master %d",
4428                                   from_nodeid, r->res_master_nodeid);
4429                         dlm_print_rsb(r);
4430                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4431                         return;
4432                 }
4433
4434                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4435                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4436                           name);
4437                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4438                 return;
4439         }
4440
4441         if (r->res_master_nodeid != from_nodeid) {
4442                 log_error(ls, "receive_remove toss from %d master %d",
4443                           from_nodeid, r->res_master_nodeid);
4444                 dlm_print_rsb(r);
4445                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4446                 return;
4447         }
4448
4449         if (kref_put(&r->res_ref, kill_rsb)) {
4450                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4451                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4452                 dlm_free_rsb(r);
4453         } else {
4454                 log_error(ls, "receive_remove from %d rsb ref error",
4455                           from_nodeid);
4456                 dlm_print_rsb(r);
4457                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4458         }
4459 }
4460
4461 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4462 {
4463         do_purge(ls, ms->m_nodeid, ms->m_pid);
4464 }
4465
4466 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4467 {
4468         struct dlm_lkb *lkb;
4469         struct dlm_rsb *r;
4470         int error, mstype, result;
4471         int from_nodeid = ms->m_header.h_nodeid;
4472
4473         error = find_lkb(ls, ms->m_remid, &lkb);
4474         if (error)
4475                 return error;
4476
4477         r = lkb->lkb_resource;
4478         hold_rsb(r);
4479         lock_rsb(r);
4480
4481         error = validate_message(lkb, ms);
4482         if (error)
4483                 goto out;
4484
4485         mstype = lkb->lkb_wait_type;
4486         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4487         if (error) {
4488                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4489                           lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4490                 dlm_dump_rsb(r);
4491                 goto out;
4492         }
4493
4494         /* Optimization: the dir node was also the master, so it took our
4495            lookup as a request and sent request reply instead of lookup reply */
4496         if (mstype == DLM_MSG_LOOKUP) {
4497                 r->res_master_nodeid = from_nodeid;
4498                 r->res_nodeid = from_nodeid;
4499                 lkb->lkb_nodeid = from_nodeid;
4500         }
4501
4502         /* this is the value returned from do_request() on the master */
4503         result = ms->m_result;
4504
4505         switch (result) {
4506         case -EAGAIN:
4507                 /* request would block (be queued) on remote master */
4508                 queue_cast(r, lkb, -EAGAIN);
4509                 confirm_master(r, -EAGAIN);
4510                 unhold_lkb(lkb); /* undoes create_lkb() */
4511                 break;
4512
4513         case -EINPROGRESS:
4514         case 0:
4515                 /* request was queued or granted on remote master */
4516                 receive_flags_reply(lkb, ms);
4517                 lkb->lkb_remid = ms->m_lkid;
4518                 if (is_altmode(lkb))
4519                         munge_altmode(lkb, ms);
4520                 if (result) {
4521                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4522                         add_timeout(lkb);
4523                 } else {
4524                         grant_lock_pc(r, lkb, ms);
4525                         queue_cast(r, lkb, 0);
4526                 }
4527                 confirm_master(r, result);
4528                 break;
4529
4530         case -EBADR:
4531         case -ENOTBLK:
4532                 /* find_rsb failed to find rsb or rsb wasn't master */
4533                 log_limit(ls, "receive_request_reply %x from %d %d "
4534                           "master %d dir %d first %x %s", lkb->lkb_id,
4535                           from_nodeid, result, r->res_master_nodeid,
4536                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4537
4538                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4539                     r->res_master_nodeid != dlm_our_nodeid()) {
4540                         /* cause _request_lock->set_master->send_lookup */
4541                         r->res_master_nodeid = 0;
4542                         r->res_nodeid = -1;
4543                         lkb->lkb_nodeid = -1;
4544                 }
4545
4546                 if (is_overlap(lkb)) {
4547                         /* we'll ignore error in cancel/unlock reply */
4548                         queue_cast_overlap(r, lkb);
4549                         confirm_master(r, result);
4550                         unhold_lkb(lkb); /* undoes create_lkb() */
4551                 } else {
4552                         _request_lock(r, lkb);
4553
4554                         if (r->res_master_nodeid == dlm_our_nodeid())
4555                                 confirm_master(r, 0);
4556                 }
4557                 break;
4558
4559         default:
4560                 log_error(ls, "receive_request_reply %x error %d",
4561                           lkb->lkb_id, result);
4562         }
4563
4564         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4565                 log_debug(ls, "receive_request_reply %x result %d unlock",
4566                           lkb->lkb_id, result);
4567                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4568                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4569                 send_unlock(r, lkb);
4570         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4571                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4572                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4573                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4574                 send_cancel(r, lkb);
4575         } else {
4576                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4577                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4578         }
4579  out:
4580         unlock_rsb(r);
4581         put_rsb(r);
4582         dlm_put_lkb(lkb);
4583         return 0;
4584 }
4585
4586 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4587                                     struct dlm_message *ms)
4588 {
4589         /* this is the value returned from do_convert() on the master */
4590         switch (ms->m_result) {
4591         case -EAGAIN:
4592                 /* convert would block (be queued) on remote master */
4593                 queue_cast(r, lkb, -EAGAIN);
4594                 break;
4595
4596         case -EDEADLK:
4597                 receive_flags_reply(lkb, ms);
4598                 revert_lock_pc(r, lkb);
4599                 queue_cast(r, lkb, -EDEADLK);
4600                 break;
4601
4602         case -EINPROGRESS:
4603                 /* convert was queued on remote master */
4604                 receive_flags_reply(lkb, ms);
4605                 if (is_demoted(lkb))
4606                         munge_demoted(lkb);
4607                 del_lkb(r, lkb);
4608                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4609                 add_timeout(lkb);
4610                 break;
4611
4612         case 0:
4613                 /* convert was granted on remote master */
4614                 receive_flags_reply(lkb, ms);
4615                 if (is_demoted(lkb))
4616                         munge_demoted(lkb);
4617                 grant_lock_pc(r, lkb, ms);
4618                 queue_cast(r, lkb, 0);
4619                 break;
4620
4621         default:
4622                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4623                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4624                           ms->m_result);
4625                 dlm_print_rsb(r);
4626                 dlm_print_lkb(lkb);
4627         }
4628 }
4629
4630 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4631 {
4632         struct dlm_rsb *r = lkb->lkb_resource;
4633         int error;
4634
4635         hold_rsb(r);
4636         lock_rsb(r);
4637
4638         error = validate_message(lkb, ms);
4639         if (error)
4640                 goto out;
4641
4642         /* stub reply can happen with waiters_mutex held */
4643         error = remove_from_waiters_ms(lkb, ms);
4644         if (error)
4645                 goto out;
4646
4647         __receive_convert_reply(r, lkb, ms);
4648  out:
4649         unlock_rsb(r);
4650         put_rsb(r);
4651 }
4652
4653 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4654 {
4655         struct dlm_lkb *lkb;
4656         int error;
4657
4658         error = find_lkb(ls, ms->m_remid, &lkb);
4659         if (error)
4660                 return error;
4661
4662         _receive_convert_reply(lkb, ms);
4663         dlm_put_lkb(lkb);
4664         return 0;
4665 }
4666
4667 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4668 {
4669         struct dlm_rsb *r = lkb->lkb_resource;
4670         int error;
4671
4672         hold_rsb(r);
4673         lock_rsb(r);
4674
4675         error = validate_message(lkb, ms);
4676         if (error)
4677                 goto out;
4678
4679         /* stub reply can happen with waiters_mutex held */
4680         error = remove_from_waiters_ms(lkb, ms);
4681         if (error)
4682                 goto out;
4683
4684         /* this is the value returned from do_unlock() on the master */
4685
4686         switch (ms->m_result) {
4687         case -DLM_EUNLOCK:
4688                 receive_flags_reply(lkb, ms);
4689                 remove_lock_pc(r, lkb);
4690                 queue_cast(r, lkb, -DLM_EUNLOCK);
4691                 break;
4692         case -ENOENT:
4693                 break;
4694         default:
4695                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4696                           lkb->lkb_id, ms->m_result);
4697         }
4698  out:
4699         unlock_rsb(r);
4700         put_rsb(r);
4701 }
4702
4703 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4704 {
4705         struct dlm_lkb *lkb;
4706         int error;
4707
4708         error = find_lkb(ls, ms->m_remid, &lkb);
4709         if (error)
4710                 return error;
4711
4712         _receive_unlock_reply(lkb, ms);
4713         dlm_put_lkb(lkb);
4714         return 0;
4715 }
4716
4717 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4718 {
4719         struct dlm_rsb *r = lkb->lkb_resource;
4720         int error;
4721
4722         hold_rsb(r);
4723         lock_rsb(r);
4724
4725         error = validate_message(lkb, ms);
4726         if (error)
4727                 goto out;
4728
4729         /* stub reply can happen with waiters_mutex held */
4730         error = remove_from_waiters_ms(lkb, ms);
4731         if (error)
4732                 goto out;
4733
4734         /* this is the value returned from do_cancel() on the master */
4735
4736         switch (ms->m_result) {
4737         case -DLM_ECANCEL:
4738                 receive_flags_reply(lkb, ms);
4739                 revert_lock_pc(r, lkb);
4740                 queue_cast(r, lkb, -DLM_ECANCEL);
4741                 break;
4742         case 0:
4743                 break;
4744         default:
4745                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4746                           lkb->lkb_id, ms->m_result);
4747         }
4748  out:
4749         unlock_rsb(r);
4750         put_rsb(r);
4751 }
4752
4753 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4754 {
4755         struct dlm_lkb *lkb;
4756         int error;
4757
4758         error = find_lkb(ls, ms->m_remid, &lkb);
4759         if (error)
4760                 return error;
4761
4762         _receive_cancel_reply(lkb, ms);
4763         dlm_put_lkb(lkb);
4764         return 0;
4765 }
4766
4767 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4768 {
4769         struct dlm_lkb *lkb;
4770         struct dlm_rsb *r;
4771         int error, ret_nodeid;
4772         int do_lookup_list = 0;
4773
4774         error = find_lkb(ls, ms->m_lkid, &lkb);
4775         if (error) {
4776                 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4777                 return;
4778         }
4779
4780         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4781            FIXME: will a non-zero error ever be returned? */
4782
4783         r = lkb->lkb_resource;
4784         hold_rsb(r);
4785         lock_rsb(r);
4786
4787         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4788         if (error)
4789                 goto out;
4790
4791         ret_nodeid = ms->m_nodeid;
4792
4793         /* We sometimes receive a request from the dir node for this
4794            rsb before we've received the dir node's loookup_reply for it.
4795            The request from the dir node implies we're the master, so we set
4796            ourself as master in receive_request_reply, and verify here that
4797            we are indeed the master. */
4798
4799         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4800                 /* This should never happen */
4801                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4802                           "master %d dir %d our %d first %x %s",
4803                           lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4804                           r->res_master_nodeid, r->res_dir_nodeid,
4805                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4806         }
4807
4808         if (ret_nodeid == dlm_our_nodeid()) {
4809                 r->res_master_nodeid = ret_nodeid;
4810                 r->res_nodeid = 0;
4811                 do_lookup_list = 1;
4812                 r->res_first_lkid = 0;
4813         } else if (ret_nodeid == -1) {
4814                 /* the remote node doesn't believe it's the dir node */
4815                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4816                           lkb->lkb_id, ms->m_header.h_nodeid);
4817                 r->res_master_nodeid = 0;
4818                 r->res_nodeid = -1;
4819                 lkb->lkb_nodeid = -1;
4820         } else {
4821                 /* set_master() will set lkb_nodeid from r */
4822                 r->res_master_nodeid = ret_nodeid;
4823                 r->res_nodeid = ret_nodeid;
4824         }
4825
4826         if (is_overlap(lkb)) {
4827                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4828                           lkb->lkb_id, lkb->lkb_flags);
4829                 queue_cast_overlap(r, lkb);
4830                 unhold_lkb(lkb); /* undoes create_lkb() */
4831                 goto out_list;
4832         }
4833
4834         _request_lock(r, lkb);
4835
4836  out_list:
4837         if (do_lookup_list)
4838                 process_lookup_list(r);
4839  out:
4840         unlock_rsb(r);
4841         put_rsb(r);
4842         dlm_put_lkb(lkb);
4843 }
4844
4845 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4846                              uint32_t saved_seq)
4847 {
4848         int error = 0, noent = 0;
4849
4850         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4851                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4852                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4853                           ms->m_remid, ms->m_result);
4854                 return;
4855         }
4856
4857         switch (ms->m_type) {
4858
4859         /* messages sent to a master node */
4860
4861         case DLM_MSG_REQUEST:
4862                 error = receive_request(ls, ms);
4863                 break;
4864
4865         case DLM_MSG_CONVERT:
4866                 error = receive_convert(ls, ms);
4867                 break;
4868
4869         case DLM_MSG_UNLOCK:
4870                 error = receive_unlock(ls, ms);
4871                 break;
4872
4873         case DLM_MSG_CANCEL:
4874                 noent = 1;
4875                 error = receive_cancel(ls, ms);
4876                 break;
4877
4878         /* messages sent from a master node (replies to above) */
4879
4880         case DLM_MSG_REQUEST_REPLY:
4881                 error = receive_request_reply(ls, ms);
4882                 break;
4883
4884         case DLM_MSG_CONVERT_REPLY:
4885                 error = receive_convert_reply(ls, ms);
4886                 break;
4887
4888         case DLM_MSG_UNLOCK_REPLY:
4889                 error = receive_unlock_reply(ls, ms);
4890                 break;
4891
4892         case DLM_MSG_CANCEL_REPLY:
4893                 error = receive_cancel_reply(ls, ms);
4894                 break;
4895
4896         /* messages sent from a master node (only two types of async msg) */
4897
4898         case DLM_MSG_GRANT:
4899                 noent = 1;
4900                 error = receive_grant(ls, ms);
4901                 break;
4902
4903         case DLM_MSG_BAST:
4904                 noent = 1;
4905                 error = receive_bast(ls, ms);
4906                 break;
4907
4908         /* messages sent to a dir node */
4909
4910         case DLM_MSG_LOOKUP:
4911                 receive_lookup(ls, ms);
4912                 break;
4913
4914         case DLM_MSG_REMOVE:
4915                 receive_remove(ls, ms);
4916                 break;
4917
4918         /* messages sent from a dir node (remove has no reply) */
4919
4920         case DLM_MSG_LOOKUP_REPLY:
4921                 receive_lookup_reply(ls, ms);
4922                 break;
4923
4924         /* other messages */
4925
4926         case DLM_MSG_PURGE:
4927                 receive_purge(ls, ms);
4928                 break;
4929
4930         default:
4931                 log_error(ls, "unknown message type %d", ms->m_type);
4932         }
4933
4934         /*
4935          * When checking for ENOENT, we're checking the result of
4936          * find_lkb(m_remid):
4937          *
4938          * The lock id referenced in the message wasn't found.  This may
4939          * happen in normal usage for the async messages and cancel, so
4940          * only use log_debug for them.
4941          *
4942          * Some errors are expected and normal.
4943          */
4944
4945         if (error == -ENOENT && noent) {
4946                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4947                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4948                           ms->m_lkid, saved_seq);
4949         } else if (error == -ENOENT) {
4950                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4951                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4952                           ms->m_lkid, saved_seq);
4953
4954                 if (ms->m_type == DLM_MSG_CONVERT)
4955                         dlm_dump_rsb_hash(ls, ms->m_hash);
4956         }
4957
4958         if (error == -EINVAL) {
4959                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4960                           "saved_seq %u",
4961                           ms->m_type, ms->m_header.h_nodeid,
4962                           ms->m_lkid, ms->m_remid, saved_seq);
4963         }
4964 }
4965
4966 /* If the lockspace is in recovery mode (locking stopped), then normal
4967    messages are saved on the requestqueue for processing after recovery is
4968    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4969    messages off the requestqueue before we process new ones. This occurs right
4970    after recovery completes when we transition from saving all messages on
4971    requestqueue, to processing all the saved messages, to processing new
4972    messages as they arrive. */
4973
4974 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4975                                 int nodeid)
4976 {
4977         if (dlm_locking_stopped(ls)) {
4978                 /* If we were a member of this lockspace, left, and rejoined,
4979                    other nodes may still be sending us messages from the
4980                    lockspace generation before we left. */
4981                 if (!ls->ls_generation) {
4982                         log_limit(ls, "receive %d from %d ignore old gen",
4983                                   ms->m_type, nodeid);
4984                         return;
4985                 }
4986
4987                 dlm_add_requestqueue(ls, nodeid, ms);
4988         } else {
4989                 dlm_wait_requestqueue(ls);
4990                 _receive_message(ls, ms, 0);
4991         }
4992 }
4993
4994 /* This is called by dlm_recoverd to process messages that were saved on
4995    the requestqueue. */
4996
4997 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4998                                uint32_t saved_seq)
4999 {
5000         _receive_message(ls, ms, saved_seq);
5001 }
5002
5003 /* This is called by the midcomms layer when something is received for
5004    the lockspace.  It could be either a MSG (normal message sent as part of
5005    standard locking activity) or an RCOM (recovery message sent as part of
5006    lockspace recovery). */
5007
5008 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5009 {
5010         struct dlm_header *hd = &p->header;
5011         struct dlm_ls *ls;
5012         int type = 0;
5013
5014         switch (hd->h_cmd) {
5015         case DLM_MSG:
5016                 dlm_message_in(&p->message);
5017                 type = p->message.m_type;
5018                 break;
5019         case DLM_RCOM:
5020                 dlm_rcom_in(&p->rcom);
5021                 type = p->rcom.rc_type;
5022                 break;
5023         default:
5024                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5025                 return;
5026         }
5027
5028         if (hd->h_nodeid != nodeid) {
5029                 log_print("invalid h_nodeid %d from %d lockspace %x",
5030                           hd->h_nodeid, nodeid, hd->h_lockspace);
5031                 return;
5032         }
5033
5034         ls = dlm_find_lockspace_global(hd->h_lockspace);
5035         if (!ls) {
5036                 if (dlm_config.ci_log_debug) {
5037                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5038                                 "%u from %d cmd %d type %d\n",
5039                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
5040                 }
5041
5042                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5043                         dlm_send_ls_not_ready(nodeid, &p->rcom);
5044                 return;
5045         }
5046
5047         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5048            be inactive (in this ls) before transitioning to recovery mode */
5049
5050         down_read(&ls->ls_recv_active);
5051         if (hd->h_cmd == DLM_MSG)
5052                 dlm_receive_message(ls, &p->message, nodeid);
5053         else
5054                 dlm_receive_rcom(ls, &p->rcom, nodeid);
5055         up_read(&ls->ls_recv_active);
5056
5057         dlm_put_lockspace(ls);
5058 }
5059
5060 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5061                                    struct dlm_message *ms_stub)
5062 {
5063         if (middle_conversion(lkb)) {
5064                 hold_lkb(lkb);
5065                 memset(ms_stub, 0, sizeof(struct dlm_message));
5066                 ms_stub->m_flags = DLM_IFL_STUB_MS;
5067                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5068                 ms_stub->m_result = -EINPROGRESS;
5069                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5070                 _receive_convert_reply(lkb, ms_stub);
5071
5072                 /* Same special case as in receive_rcom_lock_args() */
5073                 lkb->lkb_grmode = DLM_LOCK_IV;
5074                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5075                 unhold_lkb(lkb);
5076
5077         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5078                 lkb->lkb_flags |= DLM_IFL_RESEND;
5079         }
5080
5081         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5082            conversions are async; there's no reply from the remote master */
5083 }
5084
5085 /* A waiting lkb needs recovery if the master node has failed, or
5086    the master node is changing (only when no directory is used) */
5087
5088 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5089                                  int dir_nodeid)
5090 {
5091         if (dlm_no_directory(ls))
5092                 return 1;
5093
5094         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5095                 return 1;
5096
5097         return 0;
5098 }
5099
5100 /* Recovery for locks that are waiting for replies from nodes that are now
5101    gone.  We can just complete unlocks and cancels by faking a reply from the
5102    dead node.  Requests and up-conversions we flag to be resent after
5103    recovery.  Down-conversions can just be completed with a fake reply like
5104    unlocks.  Conversions between PR and CW need special attention. */
5105
5106 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5107 {
5108         struct dlm_lkb *lkb, *safe;
5109         struct dlm_message *ms_stub;
5110         int wait_type, stub_unlock_result, stub_cancel_result;
5111         int dir_nodeid;
5112
5113         ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
5114         if (!ms_stub) {
5115                 log_error(ls, "dlm_recover_waiters_pre no mem");
5116                 return;
5117         }
5118
5119         mutex_lock(&ls->ls_waiters_mutex);
5120
5121         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5122
5123                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5124
5125                 /* exclude debug messages about unlocks because there can be so
5126                    many and they aren't very interesting */
5127
5128                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5129                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5130                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5131                                   lkb->lkb_id,
5132                                   lkb->lkb_remid,
5133                                   lkb->lkb_wait_type,
5134                                   lkb->lkb_resource->res_nodeid,
5135                                   lkb->lkb_nodeid,
5136                                   lkb->lkb_wait_nodeid,
5137                                   dir_nodeid);
5138                 }
5139
5140                 /* all outstanding lookups, regardless of destination  will be
5141                    resent after recovery is done */
5142
5143                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5144                         lkb->lkb_flags |= DLM_IFL_RESEND;
5145                         continue;
5146                 }
5147
5148                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5149                         continue;
5150
5151                 wait_type = lkb->lkb_wait_type;
5152                 stub_unlock_result = -DLM_EUNLOCK;
5153                 stub_cancel_result = -DLM_ECANCEL;
5154
5155                 /* Main reply may have been received leaving a zero wait_type,
5156                    but a reply for the overlapping op may not have been
5157                    received.  In that case we need to fake the appropriate
5158                    reply for the overlap op. */
5159
5160                 if (!wait_type) {
5161                         if (is_overlap_cancel(lkb)) {
5162                                 wait_type = DLM_MSG_CANCEL;
5163                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5164                                         stub_cancel_result = 0;
5165                         }
5166                         if (is_overlap_unlock(lkb)) {
5167                                 wait_type = DLM_MSG_UNLOCK;
5168                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5169                                         stub_unlock_result = -ENOENT;
5170                         }
5171
5172                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5173                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
5174                                   stub_cancel_result, stub_unlock_result);
5175                 }
5176
5177                 switch (wait_type) {
5178
5179                 case DLM_MSG_REQUEST:
5180                         lkb->lkb_flags |= DLM_IFL_RESEND;
5181                         break;
5182
5183                 case DLM_MSG_CONVERT:
5184                         recover_convert_waiter(ls, lkb, ms_stub);
5185                         break;
5186
5187                 case DLM_MSG_UNLOCK:
5188                         hold_lkb(lkb);
5189                         memset(ms_stub, 0, sizeof(struct dlm_message));
5190                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5191                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5192                         ms_stub->m_result = stub_unlock_result;
5193                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5194                         _receive_unlock_reply(lkb, ms_stub);
5195                         dlm_put_lkb(lkb);
5196                         break;
5197
5198                 case DLM_MSG_CANCEL:
5199                         hold_lkb(lkb);
5200                         memset(ms_stub, 0, sizeof(struct dlm_message));
5201                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5202                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5203                         ms_stub->m_result = stub_cancel_result;
5204                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5205                         _receive_cancel_reply(lkb, ms_stub);
5206                         dlm_put_lkb(lkb);
5207                         break;
5208
5209                 default:
5210                         log_error(ls, "invalid lkb wait_type %d %d",
5211                                   lkb->lkb_wait_type, wait_type);
5212                 }
5213                 schedule();
5214         }
5215         mutex_unlock(&ls->ls_waiters_mutex);
5216         kfree(ms_stub);
5217 }
5218
5219 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5220 {
5221         struct dlm_lkb *lkb;
5222         int found = 0;
5223
5224         mutex_lock(&ls->ls_waiters_mutex);
5225         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5226                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
5227                         hold_lkb(lkb);
5228                         found = 1;
5229                         break;
5230                 }
5231         }
5232         mutex_unlock(&ls->ls_waiters_mutex);
5233
5234         if (!found)
5235                 lkb = NULL;
5236         return lkb;
5237 }
5238
5239 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5240    master or dir-node for r.  Processing the lkb may result in it being placed
5241    back on waiters. */
5242
5243 /* We do this after normal locking has been enabled and any saved messages
5244    (in requestqueue) have been processed.  We should be confident that at
5245    this point we won't get or process a reply to any of these waiting
5246    operations.  But, new ops may be coming in on the rsbs/locks here from
5247    userspace or remotely. */
5248
5249 /* there may have been an overlap unlock/cancel prior to recovery or after
5250    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5251    overlap flag would just have been set and nothing new sent.  we can be
5252    confident here than any replies to either the initial op or overlap ops
5253    prior to recovery have been received. */
5254
5255 int dlm_recover_waiters_post(struct dlm_ls *ls)
5256 {
5257         struct dlm_lkb *lkb;
5258         struct dlm_rsb *r;
5259         int error = 0, mstype, err, oc, ou;
5260
5261         while (1) {
5262                 if (dlm_locking_stopped(ls)) {
5263                         log_debug(ls, "recover_waiters_post aborted");
5264                         error = -EINTR;
5265                         break;
5266                 }
5267
5268                 lkb = find_resend_waiter(ls);
5269                 if (!lkb)
5270                         break;
5271
5272                 r = lkb->lkb_resource;
5273                 hold_rsb(r);
5274                 lock_rsb(r);
5275
5276                 mstype = lkb->lkb_wait_type;
5277                 oc = is_overlap_cancel(lkb);
5278                 ou = is_overlap_unlock(lkb);
5279                 err = 0;
5280
5281                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5282                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5283                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5284                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5285                           dlm_dir_nodeid(r), oc, ou);
5286
5287                 /* At this point we assume that we won't get a reply to any
5288                    previous op or overlap op on this lock.  First, do a big
5289                    remove_from_waiters() for all previous ops. */
5290
5291                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5292                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5293                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5294                 lkb->lkb_wait_type = 0;
5295                 lkb->lkb_wait_count = 0;
5296                 mutex_lock(&ls->ls_waiters_mutex);
5297                 list_del_init(&lkb->lkb_wait_reply);
5298                 mutex_unlock(&ls->ls_waiters_mutex);
5299                 unhold_lkb(lkb); /* for waiters list */
5300
5301                 if (oc || ou) {
5302                         /* do an unlock or cancel instead of resending */
5303                         switch (mstype) {
5304                         case DLM_MSG_LOOKUP:
5305                         case DLM_MSG_REQUEST:
5306                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5307                                                         -DLM_ECANCEL);
5308                                 unhold_lkb(lkb); /* undoes create_lkb() */
5309                                 break;
5310                         case DLM_MSG_CONVERT:
5311                                 if (oc) {
5312                                         queue_cast(r, lkb, -DLM_ECANCEL);
5313                                 } else {
5314                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5315                                         _unlock_lock(r, lkb);
5316                                 }
5317                                 break;
5318                         default:
5319                                 err = 1;
5320                         }
5321                 } else {
5322                         switch (mstype) {
5323                         case DLM_MSG_LOOKUP:
5324                         case DLM_MSG_REQUEST:
5325                                 _request_lock(r, lkb);
5326                                 if (is_master(r))
5327                                         confirm_master(r, 0);
5328                                 break;
5329                         case DLM_MSG_CONVERT:
5330                                 _convert_lock(r, lkb);
5331                                 break;
5332                         default:
5333                                 err = 1;
5334                         }
5335                 }
5336
5337                 if (err) {
5338                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5339                                   "dir_nodeid %d overlap %d %d",
5340                                   lkb->lkb_id, mstype, r->res_nodeid,
5341                                   dlm_dir_nodeid(r), oc, ou);
5342                 }
5343                 unlock_rsb(r);
5344                 put_rsb(r);
5345                 dlm_put_lkb(lkb);
5346         }
5347
5348         return error;
5349 }
5350
5351 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5352                               struct list_head *list)
5353 {
5354         struct dlm_lkb *lkb, *safe;
5355
5356         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5357                 if (!is_master_copy(lkb))
5358                         continue;
5359
5360                 /* don't purge lkbs we've added in recover_master_copy for
5361                    the current recovery seq */
5362
5363                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5364                         continue;
5365
5366                 del_lkb(r, lkb);
5367
5368                 /* this put should free the lkb */
5369                 if (!dlm_put_lkb(lkb))
5370                         log_error(ls, "purged mstcpy lkb not released");
5371         }
5372 }
5373
5374 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5375 {
5376         struct dlm_ls *ls = r->res_ls;
5377
5378         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5379         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5380         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5381 }
5382
5383 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5384                             struct list_head *list,
5385                             int nodeid_gone, unsigned int *count)
5386 {
5387         struct dlm_lkb *lkb, *safe;
5388
5389         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5390                 if (!is_master_copy(lkb))
5391                         continue;
5392
5393                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5394                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5395
5396                         del_lkb(r, lkb);
5397
5398                         /* this put should free the lkb */
5399                         if (!dlm_put_lkb(lkb))
5400                                 log_error(ls, "purged dead lkb not released");
5401
5402                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5403
5404                         (*count)++;
5405                 }
5406         }
5407 }
5408
5409 /* Get rid of locks held by nodes that are gone. */
5410
5411 void dlm_recover_purge(struct dlm_ls *ls)
5412 {
5413         struct dlm_rsb *r;
5414         struct dlm_member *memb;
5415         int nodes_count = 0;
5416         int nodeid_gone = 0;
5417         unsigned int lkb_count = 0;
5418
5419         /* cache one removed nodeid to optimize the common
5420            case of a single node removed */
5421
5422         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5423                 nodes_count++;
5424                 nodeid_gone = memb->nodeid;
5425         }
5426
5427         if (!nodes_count)
5428                 return;
5429
5430         down_write(&ls->ls_root_sem);
5431         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5432                 hold_rsb(r);
5433                 lock_rsb(r);
5434                 if (is_master(r)) {
5435                         purge_dead_list(ls, r, &r->res_grantqueue,
5436                                         nodeid_gone, &lkb_count);
5437                         purge_dead_list(ls, r, &r->res_convertqueue,
5438                                         nodeid_gone, &lkb_count);
5439                         purge_dead_list(ls, r, &r->res_waitqueue,
5440                                         nodeid_gone, &lkb_count);
5441                 }
5442                 unlock_rsb(r);
5443                 unhold_rsb(r);
5444                 cond_resched();
5445         }
5446         up_write(&ls->ls_root_sem);
5447
5448         if (lkb_count)
5449                 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
5450                           lkb_count, nodes_count);
5451 }
5452
5453 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5454 {
5455         struct rb_node *n;
5456         struct dlm_rsb *r;
5457
5458         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5459         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5460                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5461
5462                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5463                         continue;
5464                 if (!is_master(r)) {
5465                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5466                         continue;
5467                 }
5468                 hold_rsb(r);
5469                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5470                 return r;
5471         }
5472         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5473         return NULL;
5474 }
5475
5476 /*
5477  * Attempt to grant locks on resources that we are the master of.
5478  * Locks may have become grantable during recovery because locks
5479  * from departed nodes have been purged (or not rebuilt), allowing
5480  * previously blocked locks to now be granted.  The subset of rsb's
5481  * we are interested in are those with lkb's on either the convert or
5482  * waiting queues.
5483  *
5484  * Simplest would be to go through each master rsb and check for non-empty
5485  * convert or waiting queues, and attempt to grant on those rsbs.
5486  * Checking the queues requires lock_rsb, though, for which we'd need
5487  * to release the rsbtbl lock.  This would make iterating through all
5488  * rsb's very inefficient.  So, we rely on earlier recovery routines
5489  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5490  * locks for.
5491  */
5492
5493 void dlm_recover_grant(struct dlm_ls *ls)
5494 {
5495         struct dlm_rsb *r;
5496         int bucket = 0;
5497         unsigned int count = 0;
5498         unsigned int rsb_count = 0;
5499         unsigned int lkb_count = 0;
5500
5501         while (1) {
5502                 r = find_grant_rsb(ls, bucket);
5503                 if (!r) {
5504                         if (bucket == ls->ls_rsbtbl_size - 1)
5505                                 break;
5506                         bucket++;
5507                         continue;
5508                 }
5509                 rsb_count++;
5510                 count = 0;
5511                 lock_rsb(r);
5512                 /* the RECOVER_GRANT flag is checked in the grant path */
5513                 grant_pending_locks(r, &count);
5514                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5515                 lkb_count += count;
5516                 confirm_master(r, 0);
5517                 unlock_rsb(r);
5518                 put_rsb(r);
5519                 cond_resched();
5520         }
5521
5522         if (lkb_count)
5523                 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
5524                           lkb_count, rsb_count);
5525 }
5526
5527 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5528                                          uint32_t remid)
5529 {
5530         struct dlm_lkb *lkb;
5531
5532         list_for_each_entry(lkb, head, lkb_statequeue) {
5533                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5534                         return lkb;
5535         }
5536         return NULL;
5537 }
5538
5539 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5540                                     uint32_t remid)
5541 {
5542         struct dlm_lkb *lkb;
5543
5544         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5545         if (lkb)
5546                 return lkb;
5547         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5548         if (lkb)
5549                 return lkb;
5550         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5551         if (lkb)
5552                 return lkb;
5553         return NULL;
5554 }
5555
5556 /* needs at least dlm_rcom + rcom_lock */
5557 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5558                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5559 {
5560         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5561
5562         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5563         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5564         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5565         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5566         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5567         lkb->lkb_flags |= DLM_IFL_MSTCPY;
5568         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5569         lkb->lkb_rqmode = rl->rl_rqmode;
5570         lkb->lkb_grmode = rl->rl_grmode;
5571         /* don't set lkb_status because add_lkb wants to itself */
5572
5573         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5574         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5575
5576         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5577                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5578                          sizeof(struct rcom_lock);
5579                 if (lvblen > ls->ls_lvblen)
5580                         return -EINVAL;
5581                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5582                 if (!lkb->lkb_lvbptr)
5583                         return -ENOMEM;
5584                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5585         }
5586
5587         /* Conversions between PR and CW (middle modes) need special handling.
5588            The real granted mode of these converting locks cannot be determined
5589            until all locks have been rebuilt on the rsb (recover_conversion) */
5590
5591         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5592             middle_conversion(lkb)) {
5593                 rl->rl_status = DLM_LKSTS_CONVERT;
5594                 lkb->lkb_grmode = DLM_LOCK_IV;
5595                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5596         }
5597
5598         return 0;
5599 }
5600
5601 /* This lkb may have been recovered in a previous aborted recovery so we need
5602    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5603    If so we just send back a standard reply.  If not, we create a new lkb with
5604    the given values and send back our lkid.  We send back our lkid by sending
5605    back the rcom_lock struct we got but with the remid field filled in. */
5606
5607 /* needs at least dlm_rcom + rcom_lock */
5608 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5609 {
5610         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5611         struct dlm_rsb *r;
5612         struct dlm_lkb *lkb;
5613         uint32_t remid = 0;
5614         int from_nodeid = rc->rc_header.h_nodeid;
5615         int error;
5616
5617         if (rl->rl_parent_lkid) {
5618                 error = -EOPNOTSUPP;
5619                 goto out;
5620         }
5621
5622         remid = le32_to_cpu(rl->rl_lkid);
5623
5624         /* In general we expect the rsb returned to be R_MASTER, but we don't
5625            have to require it.  Recovery of masters on one node can overlap
5626            recovery of locks on another node, so one node can send us MSTCPY
5627            locks before we've made ourselves master of this rsb.  We can still
5628            add new MSTCPY locks that we receive here without any harm; when
5629            we make ourselves master, dlm_recover_masters() won't touch the
5630            MSTCPY locks we've received early. */
5631
5632         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5633                          from_nodeid, R_RECEIVE_RECOVER, &r);
5634         if (error)
5635                 goto out;
5636
5637         lock_rsb(r);
5638
5639         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5640                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5641                           from_nodeid, remid);
5642                 error = -EBADR;
5643                 goto out_unlock;
5644         }
5645
5646         lkb = search_remid(r, from_nodeid, remid);
5647         if (lkb) {
5648                 error = -EEXIST;
5649                 goto out_remid;
5650         }
5651
5652         error = create_lkb(ls, &lkb);
5653         if (error)
5654                 goto out_unlock;
5655
5656         error = receive_rcom_lock_args(ls, lkb, r, rc);
5657         if (error) {
5658                 __put_lkb(ls, lkb);
5659                 goto out_unlock;
5660         }
5661
5662         attach_lkb(r, lkb);
5663         add_lkb(r, lkb, rl->rl_status);
5664         error = 0;
5665         ls->ls_recover_locks_in++;
5666
5667         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5668                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5669
5670  out_remid:
5671         /* this is the new value returned to the lock holder for
5672            saving in its process-copy lkb */
5673         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5674
5675         lkb->lkb_recover_seq = ls->ls_recover_seq;
5676
5677  out_unlock:
5678         unlock_rsb(r);
5679         put_rsb(r);
5680  out:
5681         if (error && error != -EEXIST)
5682                 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
5683                           from_nodeid, remid, error);
5684         rl->rl_result = cpu_to_le32(error);
5685         return error;
5686 }
5687
5688 /* needs at least dlm_rcom + rcom_lock */
5689 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5690 {
5691         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5692         struct dlm_rsb *r;
5693         struct dlm_lkb *lkb;
5694         uint32_t lkid, remid;
5695         int error, result;
5696
5697         lkid = le32_to_cpu(rl->rl_lkid);
5698         remid = le32_to_cpu(rl->rl_remid);
5699         result = le32_to_cpu(rl->rl_result);
5700
5701         error = find_lkb(ls, lkid, &lkb);
5702         if (error) {
5703                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5704                           lkid, rc->rc_header.h_nodeid, remid, result);
5705                 return error;
5706         }
5707
5708         r = lkb->lkb_resource;
5709         hold_rsb(r);
5710         lock_rsb(r);
5711
5712         if (!is_process_copy(lkb)) {
5713                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5714                           lkid, rc->rc_header.h_nodeid, remid, result);
5715                 dlm_dump_rsb(r);
5716                 unlock_rsb(r);
5717                 put_rsb(r);
5718                 dlm_put_lkb(lkb);
5719                 return -EINVAL;
5720         }
5721
5722         switch (result) {
5723         case -EBADR:
5724                 /* There's a chance the new master received our lock before
5725                    dlm_recover_master_reply(), this wouldn't happen if we did
5726                    a barrier between recover_masters and recover_locks. */
5727
5728                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5729                           lkid, rc->rc_header.h_nodeid, remid, result);
5730         
5731                 dlm_send_rcom_lock(r, lkb);
5732                 goto out;
5733         case -EEXIST:
5734         case 0:
5735                 lkb->lkb_remid = remid;
5736                 break;
5737         default:
5738                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5739                           lkid, rc->rc_header.h_nodeid, remid, result);
5740         }
5741
5742         /* an ack for dlm_recover_locks() which waits for replies from
5743            all the locks it sends to new masters */
5744         dlm_recovered_lock(r);
5745  out:
5746         unlock_rsb(r);
5747         put_rsb(r);
5748         dlm_put_lkb(lkb);
5749
5750         return 0;
5751 }
5752
5753 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5754                      int mode, uint32_t flags, void *name, unsigned int namelen,
5755                      unsigned long timeout_cs)
5756 {
5757         struct dlm_lkb *lkb;
5758         struct dlm_args args;
5759         int error;
5760
5761         dlm_lock_recovery(ls);
5762
5763         error = create_lkb(ls, &lkb);
5764         if (error) {
5765                 kfree(ua);
5766                 goto out;
5767         }
5768
5769         if (flags & DLM_LKF_VALBLK) {
5770                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5771                 if (!ua->lksb.sb_lvbptr) {
5772                         kfree(ua);
5773                         __put_lkb(ls, lkb);
5774                         error = -ENOMEM;
5775                         goto out;
5776                 }
5777         }
5778
5779         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5780            When DLM_IFL_USER is set, the dlm knows that this is a userspace
5781            lock and that lkb_astparam is the dlm_user_args structure. */
5782
5783         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5784                               fake_astfn, ua, fake_bastfn, &args);
5785         lkb->lkb_flags |= DLM_IFL_USER;
5786
5787         if (error) {
5788                 __put_lkb(ls, lkb);
5789                 goto out;
5790         }
5791
5792         error = request_lock(ls, lkb, name, namelen, &args);
5793
5794         switch (error) {
5795         case 0:
5796                 break;
5797         case -EINPROGRESS:
5798                 error = 0;
5799                 break;
5800         case -EAGAIN:
5801                 error = 0;
5802                 /* fall through */
5803         default:
5804                 __put_lkb(ls, lkb);
5805                 goto out;
5806         }
5807
5808         /* add this new lkb to the per-process list of locks */
5809         spin_lock(&ua->proc->locks_spin);
5810         hold_lkb(lkb);
5811         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5812         spin_unlock(&ua->proc->locks_spin);
5813  out:
5814         dlm_unlock_recovery(ls);
5815         return error;
5816 }
5817
5818 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5819                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5820                      unsigned long timeout_cs)
5821 {
5822         struct dlm_lkb *lkb;
5823         struct dlm_args args;
5824         struct dlm_user_args *ua;
5825         int error;
5826
5827         dlm_lock_recovery(ls);
5828
5829         error = find_lkb(ls, lkid, &lkb);
5830         if (error)
5831                 goto out;
5832
5833         /* user can change the params on its lock when it converts it, or
5834            add an lvb that didn't exist before */
5835
5836         ua = lkb->lkb_ua;
5837
5838         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5839                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5840                 if (!ua->lksb.sb_lvbptr) {
5841                         error = -ENOMEM;
5842                         goto out_put;
5843                 }
5844         }
5845         if (lvb_in && ua->lksb.sb_lvbptr)
5846                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5847
5848         ua->xid = ua_tmp->xid;
5849         ua->castparam = ua_tmp->castparam;
5850         ua->castaddr = ua_tmp->castaddr;
5851         ua->bastparam = ua_tmp->bastparam;
5852         ua->bastaddr = ua_tmp->bastaddr;
5853         ua->user_lksb = ua_tmp->user_lksb;
5854
5855         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5856                               fake_astfn, ua, fake_bastfn, &args);
5857         if (error)
5858                 goto out_put;
5859
5860         error = convert_lock(ls, lkb, &args);
5861
5862         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5863                 error = 0;
5864  out_put:
5865         dlm_put_lkb(lkb);
5866  out:
5867         dlm_unlock_recovery(ls);
5868         kfree(ua_tmp);
5869         return error;
5870 }
5871
5872 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5873                     uint32_t flags, uint32_t lkid, char *lvb_in)
5874 {
5875         struct dlm_lkb *lkb;
5876         struct dlm_args args;
5877         struct dlm_user_args *ua;
5878         int error;
5879
5880         dlm_lock_recovery(ls);
5881
5882         error = find_lkb(ls, lkid, &lkb);
5883         if (error)
5884                 goto out;
5885
5886         ua = lkb->lkb_ua;
5887
5888         if (lvb_in && ua->lksb.sb_lvbptr)
5889                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5890         if (ua_tmp->castparam)
5891                 ua->castparam = ua_tmp->castparam;
5892         ua->user_lksb = ua_tmp->user_lksb;
5893
5894         error = set_unlock_args(flags, ua, &args);
5895         if (error)
5896                 goto out_put;
5897
5898         error = unlock_lock(ls, lkb, &args);
5899
5900         if (error == -DLM_EUNLOCK)
5901                 error = 0;
5902         /* from validate_unlock_args() */
5903         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5904                 error = 0;
5905         if (error)
5906                 goto out_put;
5907
5908         spin_lock(&ua->proc->locks_spin);
5909         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5910         if (!list_empty(&lkb->lkb_ownqueue))
5911                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5912         spin_unlock(&ua->proc->locks_spin);
5913  out_put:
5914         dlm_put_lkb(lkb);
5915  out:
5916         dlm_unlock_recovery(ls);
5917         kfree(ua_tmp);
5918         return error;
5919 }
5920
5921 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5922                     uint32_t flags, uint32_t lkid)
5923 {
5924         struct dlm_lkb *lkb;
5925         struct dlm_args args;
5926         struct dlm_user_args *ua;
5927         int error;
5928
5929         dlm_lock_recovery(ls);
5930
5931         error = find_lkb(ls, lkid, &lkb);
5932         if (error)
5933                 goto out;
5934
5935         ua = lkb->lkb_ua;
5936         if (ua_tmp->castparam)
5937                 ua->castparam = ua_tmp->castparam;
5938         ua->user_lksb = ua_tmp->user_lksb;
5939
5940         error = set_unlock_args(flags, ua, &args);
5941         if (error)
5942                 goto out_put;
5943
5944         error = cancel_lock(ls, lkb, &args);
5945
5946         if (error == -DLM_ECANCEL)
5947                 error = 0;
5948         /* from validate_unlock_args() */
5949         if (error == -EBUSY)
5950                 error = 0;
5951  out_put:
5952         dlm_put_lkb(lkb);
5953  out:
5954         dlm_unlock_recovery(ls);
5955         kfree(ua_tmp);
5956         return error;
5957 }
5958
5959 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5960 {
5961         struct dlm_lkb *lkb;
5962         struct dlm_args args;
5963         struct dlm_user_args *ua;
5964         struct dlm_rsb *r;
5965         int error;
5966
5967         dlm_lock_recovery(ls);
5968
5969         error = find_lkb(ls, lkid, &lkb);
5970         if (error)
5971                 goto out;
5972
5973         ua = lkb->lkb_ua;
5974
5975         error = set_unlock_args(flags, ua, &args);
5976         if (error)
5977                 goto out_put;
5978
5979         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5980
5981         r = lkb->lkb_resource;
5982         hold_rsb(r);
5983         lock_rsb(r);
5984
5985         error = validate_unlock_args(lkb, &args);
5986         if (error)
5987                 goto out_r;
5988         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
5989
5990         error = _cancel_lock(r, lkb);
5991  out_r:
5992         unlock_rsb(r);
5993         put_rsb(r);
5994
5995         if (error == -DLM_ECANCEL)
5996                 error = 0;
5997         /* from validate_unlock_args() */
5998         if (error == -EBUSY)
5999                 error = 0;
6000  out_put:
6001         dlm_put_lkb(lkb);
6002  out:
6003         dlm_unlock_recovery(ls);
6004         return error;
6005 }
6006
6007 /* lkb's that are removed from the waiters list by revert are just left on the
6008    orphans list with the granted orphan locks, to be freed by purge */
6009
6010 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6011 {
6012         struct dlm_args args;
6013         int error;
6014
6015         hold_lkb(lkb);
6016         mutex_lock(&ls->ls_orphans_mutex);
6017         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6018         mutex_unlock(&ls->ls_orphans_mutex);
6019
6020         set_unlock_args(0, lkb->lkb_ua, &args);
6021
6022         error = cancel_lock(ls, lkb, &args);
6023         if (error == -DLM_ECANCEL)
6024                 error = 0;
6025         return error;
6026 }
6027
6028 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
6029    Regardless of what rsb queue the lock is on, it's removed and freed. */
6030
6031 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6032 {
6033         struct dlm_args args;
6034         int error;
6035
6036         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
6037
6038         error = unlock_lock(ls, lkb, &args);
6039         if (error == -DLM_EUNLOCK)
6040                 error = 0;
6041         return error;
6042 }
6043
6044 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6045    (which does lock_rsb) due to deadlock with receiving a message that does
6046    lock_rsb followed by dlm_user_add_cb() */
6047
6048 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6049                                      struct dlm_user_proc *proc)
6050 {
6051         struct dlm_lkb *lkb = NULL;
6052
6053         mutex_lock(&ls->ls_clear_proc_locks);
6054         if (list_empty(&proc->locks))
6055                 goto out;
6056
6057         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6058         list_del_init(&lkb->lkb_ownqueue);
6059
6060         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6061                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6062         else
6063                 lkb->lkb_flags |= DLM_IFL_DEAD;
6064  out:
6065         mutex_unlock(&ls->ls_clear_proc_locks);
6066         return lkb;
6067 }
6068
6069 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6070    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6071    which we clear here. */
6072
6073 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6074    list, and no more device_writes should add lkb's to proc->locks list; so we
6075    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6076    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6077    them ourself. */
6078
6079 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6080 {
6081         struct dlm_lkb *lkb, *safe;
6082
6083         dlm_lock_recovery(ls);
6084
6085         while (1) {
6086                 lkb = del_proc_lock(ls, proc);
6087                 if (!lkb)
6088                         break;
6089                 del_timeout(lkb);
6090                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6091                         orphan_proc_lock(ls, lkb);
6092                 else
6093                         unlock_proc_lock(ls, lkb);
6094
6095                 /* this removes the reference for the proc->locks list
6096                    added by dlm_user_request, it may result in the lkb
6097                    being freed */
6098
6099                 dlm_put_lkb(lkb);
6100         }
6101
6102         mutex_lock(&ls->ls_clear_proc_locks);
6103
6104         /* in-progress unlocks */
6105         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6106                 list_del_init(&lkb->lkb_ownqueue);
6107                 lkb->lkb_flags |= DLM_IFL_DEAD;
6108                 dlm_put_lkb(lkb);
6109         }
6110
6111         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6112                 memset(&lkb->lkb_callbacks, 0,
6113                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6114                 list_del_init(&lkb->lkb_cb_list);
6115                 dlm_put_lkb(lkb);
6116         }
6117
6118         mutex_unlock(&ls->ls_clear_proc_locks);
6119         dlm_unlock_recovery(ls);
6120 }
6121
6122 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6123 {
6124         struct dlm_lkb *lkb, *safe;
6125
6126         while (1) {
6127                 lkb = NULL;
6128                 spin_lock(&proc->locks_spin);
6129                 if (!list_empty(&proc->locks)) {
6130                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6131                                          lkb_ownqueue);
6132                         list_del_init(&lkb->lkb_ownqueue);
6133                 }
6134                 spin_unlock(&proc->locks_spin);
6135
6136                 if (!lkb)
6137                         break;
6138
6139                 lkb->lkb_flags |= DLM_IFL_DEAD;
6140                 unlock_proc_lock(ls, lkb);
6141                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6142         }
6143
6144         spin_lock(&proc->locks_spin);
6145         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6146                 list_del_init(&lkb->lkb_ownqueue);
6147                 lkb->lkb_flags |= DLM_IFL_DEAD;
6148                 dlm_put_lkb(lkb);
6149         }
6150         spin_unlock(&proc->locks_spin);
6151
6152         spin_lock(&proc->asts_spin);
6153         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6154                 memset(&lkb->lkb_callbacks, 0,
6155                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6156                 list_del_init(&lkb->lkb_cb_list);
6157                 dlm_put_lkb(lkb);
6158         }
6159         spin_unlock(&proc->asts_spin);
6160 }
6161
6162 /* pid of 0 means purge all orphans */
6163
6164 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6165 {
6166         struct dlm_lkb *lkb, *safe;
6167
6168         mutex_lock(&ls->ls_orphans_mutex);
6169         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6170                 if (pid && lkb->lkb_ownpid != pid)
6171                         continue;
6172                 unlock_proc_lock(ls, lkb);
6173                 list_del_init(&lkb->lkb_ownqueue);
6174                 dlm_put_lkb(lkb);
6175         }
6176         mutex_unlock(&ls->ls_orphans_mutex);
6177 }
6178
6179 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6180 {
6181         struct dlm_message *ms;
6182         struct dlm_mhandle *mh;
6183         int error;
6184
6185         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6186                                 DLM_MSG_PURGE, &ms, &mh);
6187         if (error)
6188                 return error;
6189         ms->m_nodeid = nodeid;
6190         ms->m_pid = pid;
6191
6192         return send_message(mh, ms);
6193 }
6194
6195 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6196                    int nodeid, int pid)
6197 {
6198         int error = 0;
6199
6200         if (nodeid != dlm_our_nodeid()) {
6201                 error = send_purge(ls, nodeid, pid);
6202         } else {
6203                 dlm_lock_recovery(ls);
6204                 if (pid == current->pid)
6205                         purge_proc_locks(ls, proc);
6206                 else
6207                         do_purge(ls, nodeid, pid);
6208                 dlm_unlock_recovery(ls);
6209         }
6210         return error;
6211 }
6212