]> Pileus Git - ~andy/linux/blob - fs/ocfs2/dlmglue.c
ocfs2: combine inode and generic AST functions
[~andy/linux] / fs / ocfs2 / dlmglue.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/smp_lock.h>
31 #include <linux/crc32.h>
32 #include <linux/kthread.h>
33 #include <linux/pagemap.h>
34 #include <linux/debugfs.h>
35 #include <linux/seq_file.h>
36
37 #include <cluster/heartbeat.h>
38 #include <cluster/nodemanager.h>
39 #include <cluster/tcp.h>
40
41 #include <dlm/dlmapi.h>
42
43 #define MLOG_MASK_PREFIX ML_DLM_GLUE
44 #include <cluster/masklog.h>
45
46 #include "ocfs2.h"
47
48 #include "alloc.h"
49 #include "dcache.h"
50 #include "dlmglue.h"
51 #include "extent_map.h"
52 #include "heartbeat.h"
53 #include "inode.h"
54 #include "journal.h"
55 #include "slot_map.h"
56 #include "super.h"
57 #include "uptodate.h"
58 #include "vote.h"
59
60 #include "buffer_head_io.h"
61
/*
 * A mask waiter lets a task sleep until the lockres' l_flags word,
 * masked by mw_mask, equals mw_goal (woken from lockres_set_flags()).
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* linked on lockres->l_mask_waiters */
	int			mw_status;	/* result for the waiter; 0 on normal wakeup */
	struct completion	mw_complete;	/* waiter blocks here */
	unsigned long		mw_mask;	/* which l_flags bits we watch */
	unsigned long		mw_goal;	/* desired value of those bits */
};
69
/* Blocking-AST callbacks handed to the DLM, one per lock class.  Each
 * is invoked with the lockres as 'opaque' when another node requests a
 * conflicting 'level'. */
static void ocfs2_inode_bast_func(void *opaque,
				  int level);
static void ocfs2_dentry_bast_func(void *opaque,
				  int level);
static void ocfs2_super_bast_func(void *opaque,
				  int level);
static void ocfs2_rename_bast_func(void *opaque,
				   int level);
78
/*
 * Return value from ocfs2_convert_worker_t functions.
 *
 * These control the precise actions of ocfs2_generic_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/* Out-parameters filled in by the per-type ->unblock() handlers. */
struct ocfs2_unblock_ctl {
	int requeue;	/* nonzero: lockres needs another pass later */
	enum ocfs2_unblock_action unblock_action;
};
98
/* so far, all locks have gotten along with the same unlock ast */
static void ocfs2_unlock_ast_func(void *opaque,
				  enum dlm_status status);

/* Per-lock-type ->unblock() handlers: decide whether and how a blocked
 * lock may be downconverted (results reported via ocfs2_unblock_ctl). */
static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
				    struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
				     struct ocfs2_unblock_ctl *ctl);
static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
				  struct ocfs2_unblock_ctl *ctl);

/* ->post_unlock hook of the dentry lock ops (see ocfs2_dentry_lops). */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);
115
/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 */
struct ocfs2_lock_res_ops {
	/* Blocking AST passed to dlmlock(); the void * is the lockres. */
	void (*bast)(void *, int);
	/* Completion AST for unlock requests. */
	void (*unlock_ast)(void *, enum dlm_status);
	/* Downconvert policy for a blocked lock; fills in the ctl. */
	int  (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);
	/* Optional: fired after unlock when ->unblock() requested it. */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};
133
/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/* Worker run during downconvert; returns an ocfs2_unblock_action
 * (see the enum's comment above). */
typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
				      struct ocfs2_lock_res *lockres,
				      struct ocfs2_unblock_ctl *ctl,
				      ocfs2_convert_worker_t *worker);
149
/* Static ops tables, one per lock class.  Every class shares the same
 * unlock AST; only the inode meta and super locks need a refresh after
 * the first meaningful grant (LOCK_TYPE_REQUIRES_REFRESH). */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.bast		= ocfs2_inode_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_inode_lock,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.bast		= ocfs2_inode_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_meta,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.bast		= ocfs2_inode_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_data,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.bast		= ocfs2_super_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.bast		= ocfs2_rename_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= 0,
};

/* The only class with a post-unlock hook. */
static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.bast		= ocfs2_dentry_bast_func,
	.unlock_ast	= ocfs2_unlock_ast_func,
	.unblock	= ocfs2_unblock_dentry_lock,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.flags		= 0,
};
192
193 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
194 {
195         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
196                 lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
197                 lockres->l_type == OCFS2_LOCK_TYPE_RW;
198 }
199
200 static inline int ocfs2_is_super_lock(struct ocfs2_lock_res *lockres)
201 {
202         return lockres->l_type == OCFS2_LOCK_TYPE_SUPER;
203 }
204
205 static inline int ocfs2_is_rename_lock(struct ocfs2_lock_res *lockres)
206 {
207         return lockres->l_type == OCFS2_LOCK_TYPE_RENAME;
208 }
209
210 static inline struct ocfs2_super *ocfs2_lock_res_super(struct ocfs2_lock_res *lockres)
211 {
212         BUG_ON(!ocfs2_is_super_lock(lockres)
213                && !ocfs2_is_rename_lock(lockres));
214
215         return (struct ocfs2_super *) lockres->l_priv;
216 }
217
218 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
219 {
220         BUG_ON(!ocfs2_is_inode_lock(lockres));
221
222         return (struct inode *) lockres->l_priv;
223 }
224
225 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
226 {
227         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
228
229         return (struct ocfs2_dentry_lock *)lockres->l_priv;
230 }
231
/* Forward declarations for the generic lock machinery below. */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level);
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
/* Log (but do not BUG on) a DLM API failure for the given lockres. */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
		"resource %s: %s\n", dlm_errname(_stat), _func,	\
		_lockres->l_name, dlm_errmsg(_stat));		\
} while (0)
static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres);
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
						  struct ocfs2_lock_res *lockres,
						  int new_level);
263
264 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
265                                   u64 blkno,
266                                   u32 generation,
267                                   char *name)
268 {
269         int len;
270
271         mlog_entry_void();
272
273         BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
274
275         len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
276                        ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
277                        (long long)blkno, generation);
278
279         BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
280
281         mlog(0, "built lock resource with name: %s\n", name);
282
283         mlog_exit_void();
284 }
285
/* Protects the per-osb lockres tracking lists below. */
static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

/* Register 'res' on the osb's debug tracking list. */
static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

/* Unhook 'res' from the tracking list.  The list_empty() check plus
 * list_del_init() makes this safe to call repeatedly. */
static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
305
/* Shared tail of lockres setup: record type/ops/private data, reset
 * all DLM state to the invalid modes, mark the lockres initialized
 * and add it to the debug tracking list. */
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	/* No lock is held yet and no request is in flight. */
	res->l_level         = LKM_IVMODE;
	res->l_requested     = LKM_IVMODE;
	res->l_blocking      = LKM_IVMODE;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}
326
327 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
328 {
329         /* This also clears out the lock status block */
330         memset(res, 0, sizeof(struct ocfs2_lock_res));
331         spin_lock_init(&res->l_lock);
332         init_waitqueue_head(&res->l_event);
333         INIT_LIST_HEAD(&res->l_blocked_list);
334         INIT_LIST_HEAD(&res->l_mask_waiters);
335 }
336
337 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
338                                enum ocfs2_lock_type type,
339                                unsigned int generation,
340                                struct inode *inode)
341 {
342         struct ocfs2_lock_res_ops *ops;
343
344         switch(type) {
345                 case OCFS2_LOCK_TYPE_RW:
346                         ops = &ocfs2_inode_rw_lops;
347                         break;
348                 case OCFS2_LOCK_TYPE_META:
349                         ops = &ocfs2_inode_meta_lops;
350                         break;
351                 case OCFS2_LOCK_TYPE_DATA:
352                         ops = &ocfs2_inode_data_lops;
353                         break;
354                 default:
355                         mlog_bug_on_msg(1, "type: %d\n", type);
356                         ops = NULL; /* thanks, gcc */
357                         break;
358         };
359
360         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
361                               generation, res->l_name);
362         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
363 }
364
365 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
366 {
367         __be64 inode_blkno_be;
368
369         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
370                sizeof(__be64));
371
372         return be64_to_cpu(inode_blkno_be);
373 }
374
375 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
376                                 u64 parent, struct inode *inode)
377 {
378         int len;
379         u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
380         __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
381         struct ocfs2_lock_res *lockres = &dl->dl_lockres;
382
383         ocfs2_lock_res_init_once(lockres);
384
385         /*
386          * Unfortunately, the standard lock naming scheme won't work
387          * here because we have two 16 byte values to use. Instead,
388          * we'll stuff the inode number as a binary value. We still
389          * want error prints to show something without garbling the
390          * display, so drop a null byte in there before the inode
391          * number. A future version of OCFS2 will likely use all
392          * binary lock names. The stringified names have been a
393          * tremendous aid in debugging, but now that the debugfs
394          * interface exists, we can mangle things there if need be.
395          *
396          * NOTE: We also drop the standard "pad" value (the total lock
397          * name size stays the same though - the last part is all
398          * zeros due to the memset in ocfs2_lock_res_init_once()
399          */
400         len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
401                        "%c%016llx",
402                        ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
403                        (long long)parent);
404
405         BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
406
407         memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
408                sizeof(__be64));
409
410         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
411                                    OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
412                                    dl);
413 }
414
/* Initialize the mount-global superblock lockres, named after the
 * fixed super block number with generation 0. */
static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}
426
/* Initialize the mount-global rename lockres (block number and
 * generation are both 0 - there is only one). */
static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}
437
/* Tear down a lockres before its memory is released.  Sanity-checks
 * that it is idle (no blocked-list entry, no waiters, no holders),
 * removes debug tracking and clears l_flags - which drops
 * OCFS2_LOCK_INITIALIZED, so calling this twice is a no-op. */
void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}
469
470 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
471                                      int level)
472 {
473         mlog_entry_void();
474
475         BUG_ON(!lockres);
476
477         switch(level) {
478         case LKM_EXMODE:
479                 lockres->l_ex_holders++;
480                 break;
481         case LKM_PRMODE:
482                 lockres->l_ro_holders++;
483                 break;
484         default:
485                 BUG();
486         }
487
488         mlog_exit_void();
489 }
490
491 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
492                                      int level)
493 {
494         mlog_entry_void();
495
496         BUG_ON(!lockres);
497
498         switch(level) {
499         case LKM_EXMODE:
500                 BUG_ON(!lockres->l_ex_holders);
501                 lockres->l_ex_holders--;
502                 break;
503         case LKM_PRMODE:
504                 BUG_ON(!lockres->l_ro_holders);
505                 lockres->l_ro_holders--;
506                 break;
507         default:
508                 BUG();
509         }
510         mlog_exit_void();
511 }
512
513 /* WARNING: This function lives in a world where the only three lock
514  * levels are EX, PR, and NL. It *will* have to be adjusted when more
515  * lock types are added. */
516 static inline int ocfs2_highest_compat_lock_level(int level)
517 {
518         int new_level = LKM_EXMODE;
519
520         if (level == LKM_EXMODE)
521                 new_level = LKM_NLMODE;
522         else if (level == LKM_PRMODE)
523                 new_level = LKM_PRMODE;
524         return new_level;
525 }
526
527 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
528                               unsigned long newflags)
529 {
530         struct list_head *pos, *tmp;
531         struct ocfs2_mask_waiter *mw;
532
533         assert_spin_locked(&lockres->l_lock);
534
535         lockres->l_flags = newflags;
536
537         list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
538                 mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
539                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
540                         continue;
541
542                 list_del_init(&mw->mw_item);
543                 mw->mw_status = 0;
544                 complete(&mw->mw_complete);
545         }
546 }
/* Set bits in l_flags (and wake matching waiters). */
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
/* Clear bits in l_flags (and wake matching waiters). */
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}
556
/* AST handler for a completed downconvert: drop to the requested
 * level and, if the new level no longer conflicts with the blocking
 * request, clear the BLOCKED state. */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	/* If the granted level is now compatible with the blocking
	 * request, nothing blocks us any more. */
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
576
/* AST handler for a completed (up)convert: adopt the requested level,
 * flagging a refresh when we came up from NL on a refresh-requiring
 * lock type. */
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to data. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
597
598 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
599 {
600         mlog_entry_void();
601
602         BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
603         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
604
605         if (lockres->l_requested > LKM_NLMODE &&
606             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
607             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
608                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
609
610         lockres->l_level = lockres->l_requested;
611         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
612         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
613
614         mlog_exit_void();
615 }
616
/* Record a BAST at 'level' on this lockres (caller holds l_lock).
 * Returns nonzero when the caller must schedule a downconvert, i.e.
 * this BAST demands a lower level than anything already scheduled. */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}
642
/* Common BAST body shared by all lock classes: note the blocking
 * request, queue the lockres for downconvert processing if needed,
 * and kick the vote thread to do the work. */
static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
				    struct ocfs2_lock_res *lockres,
				    int level)
{
	int needs_downconvert;
	unsigned long flags;

	mlog_entry_void();

	/* A BAST at NL would mean the other node wants nothing. */
	BUG_ON(level <= LKM_NLMODE);

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_kick_vote_thread(osb);

	mlog_exit_void();
}
666
/* BAST entry point for the inode RW/META/DATA locks: log and hand off
 * to the generic BAST handler. */
static void ocfs2_inode_bast_func(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct inode *inode;
	struct ocfs2_super *osb;

	mlog_entry_void();

	BUG_ON(!ocfs2_is_inode_lock(lockres));

	inode = ocfs2_lock_res_inode(lockres);
	osb = OCFS2_SB(inode->i_sb);

	mlog(0, "BAST fired for inode %llu, blocking %d, level %d type %s\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno, level,
	     lockres->l_level, ocfs2_lock_type_string(lockres->l_type));

	ocfs2_generic_bast_func(osb, lockres, level);

	mlog_exit_void();
}
688
/* DLM completion AST for attach/convert/downconvert requests: apply
 * the state transition recorded in l_action, then wake anyone waiting
 * on l_event.  Runs in DLM callback context. */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	/* On DLM error, log and return without updating any state. */
	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		/* LOCAL only describes the initial create request. */
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
730
/* BAST entry point for the superblock lock. */
static void ocfs2_super_bast_func(void *opaque,
				  int level)
{
	struct ocfs2_lock_res *lockres = opaque;

	mlog_entry_void();
	mlog(0, "Superblock BAST fired\n");

	BUG_ON(!ocfs2_is_super_lock(lockres));

	/* l_priv of the super lock is the ocfs2_super itself. */
	ocfs2_generic_bast_func(ocfs2_lock_res_super(lockres),
				lockres, level);

	mlog_exit_void();
}
746
/* BAST entry point for the global rename lock. */
static void ocfs2_rename_bast_func(void *opaque,
				   int level)
{
	struct ocfs2_lock_res *lockres = opaque;

	mlog_entry_void();

	mlog(0, "Rename BAST fired\n");

	BUG_ON(!ocfs2_is_rename_lock(lockres));

	/* l_priv of the rename lock is the ocfs2_super. */
	ocfs2_generic_bast_func(ocfs2_lock_res_super(lockres),
				lockres, level);

	mlog_exit_void();
}
764
765 static void ocfs2_dentry_bast_func(void *opaque, int level)
766 {
767         struct ocfs2_lock_res *lockres = opaque;
768         struct ocfs2_dentry_lock *dl = lockres->l_priv;
769         struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
770
771         mlog(0, "Dentry bast: level: %d, name: %s\n", level,
772              lockres->l_name);
773
774         ocfs2_generic_bast_func(osb, lockres, level);
775 }
776
/* Undo the BUSY marking after a failed dlmlock/dlmunlock call.
 * 'convert' nonzero: a lock/convert request failed, reset l_action;
 * zero: an unlock failed, reset l_unlock_action. */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}
794
/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 *
 * Issues an asynchronous attach request to the DLM at @level; the
 * result is delivered via ocfs2_locking_ast. Returns 0 on successful
 * submission (or if the lock is already attached/busy), -EINVAL if
 * dlmlock() rejected the request.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
                             int dlm_flags)
{
        int ret = 0;
        enum dlm_status status;
        unsigned long flags;

        mlog_entry_void();

        mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
             dlm_flags);

        spin_lock_irqsave(&lockres->l_lock, flags);
        /* Nothing to do if the lock already exists, or bail if
         * another process is mid-call into the DLM (see above). */
        if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
            (lockres->l_flags & OCFS2_LOCK_BUSY)) {
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                goto bail;
        }

        /* Record what was requested so the AST handler can complete
         * the attach when it fires. BUSY serializes against other
         * callers until then. */
        lockres->l_action = OCFS2_AST_ATTACH;
        lockres->l_requested = level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        status = dlmlock(osb->dlm,
                         level,
                         &lockres->l_lksb,
                         dlm_flags,
                         lockres->l_name,
                         OCFS2_LOCK_ID_MAX_LEN - 1,
                         ocfs2_locking_ast,
                         lockres,
                         lockres->l_ops->bast);
        if (status != DLM_NORMAL) {
                ocfs2_log_dlm_error("dlmlock", status, lockres);
                ret = -EINVAL;
                /* convert == 1: undo the pending l_action set above. */
                ocfs2_recover_from_dlm_error(lockres, 1);
        }

        mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);

bail:
        mlog_exit(ret);
        return ret;
}
846
847 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
848                                         int flag)
849 {
850         unsigned long flags;
851         int ret;
852
853         spin_lock_irqsave(&lockres->l_lock, flags);
854         ret = lockres->l_flags & flag;
855         spin_unlock_irqrestore(&lockres->l_lock, flags);
856
857         return ret;
858 }
859
/* Sleep until no other process is inside the DLM on this lockres
 * (i.e. until OCFS2_LOCK_BUSY clears). */
static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)

{
        wait_event(lockres->l_event,
                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}
866
/* Sleep until the node currently refreshing this lockres finishes
 * (i.e. until OCFS2_LOCK_REFRESHING clears). */
static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)

{
        wait_event(lockres->l_event,
                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}
873
/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it.
 *
 * Only valid while OCFS2_LOCK_BLOCKED is set (l_blocking holds the
 * level the other node is asking for). */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted)
{
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

        return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}
884
885 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
886 {
887         INIT_LIST_HEAD(&mw->mw_item);
888         init_completion(&mw->mw_complete);
889 }
890
/* Block until the waiter is completed (the lockres flags reached the
 * mask/goal set in lockres_add_mask_waiter), then return its status. */
static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
        wait_for_completion(&mw->mw_complete);
        /* Re-arm the completion in case we want to wait on it again */
        INIT_COMPLETION(mw->mw_complete);
        return mw->mw_status;
}
898
899 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
900                                     struct ocfs2_mask_waiter *mw,
901                                     unsigned long mask,
902                                     unsigned long goal)
903 {
904         BUG_ON(!list_empty(&mw->mw_item));
905
906         assert_spin_locked(&lockres->l_lock);
907
908         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
909         mw->mw_mask = mask;
910         mw->mw_goal = goal;
911 }
912
913 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
914  * if the mask still hadn't reached its goal */
915 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
916                                       struct ocfs2_mask_waiter *mw)
917 {
918         unsigned long flags;
919         int ret = 0;
920
921         spin_lock_irqsave(&lockres->l_lock, flags);
922         if (!list_empty(&mw->mw_item)) {
923                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
924                         ret = -EBUSY;
925
926                 list_del_init(&mw->mw_item);
927                 init_completion(&mw->mw_complete);
928         }
929         spin_unlock_irqrestore(&lockres->l_lock, flags);
930
931         return ret;
932
933 }
934
/*
 * Take a cluster lock on @lockres at @level, creating the DLM lock at
 * NL first if it doesn't exist yet and converting upward as needed.
 * On success the holder count for @level is incremented; pair with
 * ocfs2_cluster_unlock().
 *
 * @lkm_flags: extra LKM_* flags for the convert (e.g. LKM_NOQUEUE,
 *             which turns a would-block into -EAGAIN).
 * @arg_flags: OCFS2_LOCK_* behavior flags; OCFS2_LOCK_NONBLOCK makes
 *             us return -EAGAIN instead of sleeping (see the lock
 *             inversion comment near the bottom).
 *
 * Returns 0 on success, -ERESTARTSYS if interrupted before committing
 * to the DLM, -EAGAIN / -EINVAL on DLM errors.
 */
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
                              struct ocfs2_lock_res *lockres,
                              int level,
                              int lkm_flags,
                              int arg_flags)
{
        struct ocfs2_mask_waiter mw;
        enum dlm_status status;
        int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
        int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
        unsigned long flags;

        mlog_entry_void();

        ocfs2_init_mask_waiter(&mw);

again:
        wait = 0;

        if (catch_signals && signal_pending(current)) {
                ret = -ERESTARTSYS;
                goto out;
        }

        spin_lock_irqsave(&lockres->l_lock, flags);

        mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
                        "Cluster lock called on freeing lockres %s! flags "
                        "0x%lx\n", lockres->l_name, lockres->l_flags);

        /* We only compare against the currently granted level
         * here. If the lock is blocked waiting on a downconvert,
         * we'll get caught below. */
        if (lockres->l_flags & OCFS2_LOCK_BUSY &&
            level > lockres->l_level) {
                /* is someone sitting in dlm_lock? If so, wait on
                 * them. */
                lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
                wait = 1;
                goto unlock;
        }

        if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
                /* lock has not been created yet. */
                spin_unlock_irqrestore(&lockres->l_lock, flags);

                /* Create at NL and retry; the loop will convert us up
                 * to the requested level. */
                ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
                if (ret < 0) {
                        mlog_errno(ret);
                        goto out;
                }
                goto again;
        }

        if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
            !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
                /* is the lock is currently blocked on behalf of
                 * another node */
                lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
                wait = 1;
                goto unlock;
        }

        if (level > lockres->l_level) {
                /* A pending action here means the AST for a previous
                 * request hasn't fired yet -- shouldn't happen. */
                if (lockres->l_action != OCFS2_AST_INVALID)
                        mlog(ML_ERROR, "lockres %s has action %u pending\n",
                             lockres->l_name, lockres->l_action);

                lockres->l_action = OCFS2_AST_CONVERT;
                lockres->l_requested = level;
                lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
                spin_unlock_irqrestore(&lockres->l_lock, flags);

                BUG_ON(level == LKM_IVMODE);
                BUG_ON(level == LKM_NLMODE);

                mlog(0, "lock %s, convert from %d to level = %d\n",
                     lockres->l_name, lockres->l_level, level);

                /* call dlm_lock to upgrade lock now */
                status = dlmlock(osb->dlm,
                                 level,
                                 &lockres->l_lksb,
                                 lkm_flags|LKM_CONVERT|LKM_VALBLK,
                                 lockres->l_name,
                                 OCFS2_LOCK_ID_MAX_LEN - 1,
                                 ocfs2_locking_ast,
                                 lockres,
                                 lockres->l_ops->bast);
                if (status != DLM_NORMAL) {
                        if ((lkm_flags & LKM_NOQUEUE) &&
                            (status == DLM_NOTQUEUED))
                                ret = -EAGAIN;
                        else {
                                ocfs2_log_dlm_error("dlmlock", status,
                                                    lockres);
                                ret = -EINVAL;
                        }
                        ocfs2_recover_from_dlm_error(lockres, 1);
                        goto out;
                }

                mlog(0, "lock %s, successfull return from dlmlock\n",
                     lockres->l_name);

                /* At this point we've gone inside the dlm and need to
                 * complete our work regardless. */
                catch_signals = 0;

                /* wait for busy to clear and carry on */
                goto again;
        }

        /* Ok, if we get here then we're good to go. */
        ocfs2_inc_holders(lockres, level);

        ret = 0;
unlock:
        spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
        /*
         * This is helping work around a lock inversion between the page lock
         * and dlm locks.  One path holds the page lock while calling aops
         * which block acquiring dlm locks.  The voting thread holds dlm
         * locks while acquiring page locks while down converting data locks.
         * This block is helping an aop path notice the inversion and back
         * off to unlock its page lock before trying the dlm lock again.
         */
        if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
            mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
                wait = 0;
                if (lockres_remove_mask_waiter(lockres, &mw))
                        ret = -EAGAIN;
                else
                        goto again;
        }
        if (wait) {
                ret = ocfs2_wait_for_mask(&mw);
                if (ret == 0)
                        goto again;
                mlog_errno(ret);
        }

        mlog_exit(ret);
        return ret;
}
1081
/*
 * Drop one holder reference at @level. The DLM lock itself is not
 * released here; the vote thread may be kicked to downconvert if
 * another node is blocked on us (see ocfs2_vote_on_unlock).
 */
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
                                 struct ocfs2_lock_res *lockres,
                                 int level)
{
        unsigned long flags;

        mlog_entry_void();
        spin_lock_irqsave(&lockres->l_lock, flags);
        ocfs2_dec_holders(lockres, level);
        ocfs2_vote_on_unlock(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        mlog_exit_void();
}
1095
1096 int ocfs2_create_new_lock(struct ocfs2_super *osb,
1097                           struct ocfs2_lock_res *lockres,
1098                           int ex,
1099                           int local)
1100 {
1101         int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1102         unsigned long flags;
1103         int lkm_flags = local ? LKM_LOCAL : 0;
1104
1105         spin_lock_irqsave(&lockres->l_lock, flags);
1106         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1107         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1108         spin_unlock_irqrestore(&lockres->l_lock, flags);
1109
1110         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1111 }
1112
/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
        int ret;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

        BUG_ON(!inode);
        BUG_ON(!ocfs2_inode_is_new(inode));

        mlog_entry_void();

        mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

        /* NOTE: That we don't increment any of the holder counts, nor
         * do we add anything to a journal handle. Since this is
         * supposed to be a new inode which the cluster doesn't know
         * about yet, there is no need to.  As far as the LVB handling
         * is concerned, this is basically like acquiring an EX lock
         * on a resource which has an invalid one -- we'll set it
         * valid when we release the EX. */

        /* rw lock: local mastery is fine, names include a generation. */
        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }

        /*
         * We don't want to use LKM_LOCAL on a meta data lock as they
         * don't use a generation in their lock names.
         */
        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }

        ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }

bail:
        mlog_exit(ret);
        return ret;
}
1165
1166 int ocfs2_rw_lock(struct inode *inode, int write)
1167 {
1168         int status, level;
1169         struct ocfs2_lock_res *lockres;
1170
1171         BUG_ON(!inode);
1172
1173         mlog_entry_void();
1174
1175         mlog(0, "inode %llu take %s RW lock\n",
1176              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1177              write ? "EXMODE" : "PRMODE");
1178
1179         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1180
1181         level = write ? LKM_EXMODE : LKM_PRMODE;
1182
1183         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1184                                     0);
1185         if (status < 0)
1186                 mlog_errno(status);
1187
1188         mlog_exit(status);
1189         return status;
1190 }
1191
1192 void ocfs2_rw_unlock(struct inode *inode, int write)
1193 {
1194         int level = write ? LKM_EXMODE : LKM_PRMODE;
1195         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1196
1197         mlog_entry_void();
1198
1199         mlog(0, "inode %llu drop %s RW lock\n",
1200              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1201              write ? "EXMODE" : "PRMODE");
1202
1203         ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1204
1205         mlog_exit_void();
1206 }
1207
1208 int ocfs2_data_lock_full(struct inode *inode,
1209                          int write,
1210                          int arg_flags)
1211 {
1212         int status = 0, level;
1213         struct ocfs2_lock_res *lockres;
1214
1215         BUG_ON(!inode);
1216
1217         mlog_entry_void();
1218
1219         mlog(0, "inode %llu take %s DATA lock\n",
1220              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1221              write ? "EXMODE" : "PRMODE");
1222
1223         /* We'll allow faking a readonly data lock for
1224          * rodevices. */
1225         if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1226                 if (write) {
1227                         status = -EROFS;
1228                         mlog_errno(status);
1229                 }
1230                 goto out;
1231         }
1232
1233         lockres = &OCFS2_I(inode)->ip_data_lockres;
1234
1235         level = write ? LKM_EXMODE : LKM_PRMODE;
1236
1237         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1238                                     0, arg_flags);
1239         if (status < 0 && status != -EAGAIN)
1240                 mlog_errno(status);
1241
1242 out:
1243         mlog_exit(status);
1244         return status;
1245 }
1246
1247 /* see ocfs2_meta_lock_with_page() */
1248 int ocfs2_data_lock_with_page(struct inode *inode,
1249                               int write,
1250                               struct page *page)
1251 {
1252         int ret;
1253
1254         ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1255         if (ret == -EAGAIN) {
1256                 unlock_page(page);
1257                 if (ocfs2_data_lock(inode, write) == 0)
1258                         ocfs2_data_unlock(inode, write);
1259                 ret = AOP_TRUNCATED_PAGE;
1260         }
1261
1262         return ret;
1263 }
1264
1265 static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1266                                  struct ocfs2_lock_res *lockres)
1267 {
1268         int kick = 0;
1269
1270         mlog_entry_void();
1271
1272         /* If we know that another node is waiting on our lock, kick
1273          * the vote thread * pre-emptively when we reach a release
1274          * condition. */
1275         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1276                 switch(lockres->l_blocking) {
1277                 case LKM_EXMODE:
1278                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1279                                 kick = 1;
1280                         break;
1281                 case LKM_PRMODE:
1282                         if (!lockres->l_ex_holders)
1283                                 kick = 1;
1284                         break;
1285                 default:
1286                         BUG();
1287                 }
1288         }
1289
1290         if (kick)
1291                 ocfs2_kick_vote_thread(osb);
1292
1293         mlog_exit_void();
1294 }
1295
1296 void ocfs2_data_unlock(struct inode *inode,
1297                        int write)
1298 {
1299         int level = write ? LKM_EXMODE : LKM_PRMODE;
1300         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1301
1302         mlog_entry_void();
1303
1304         mlog(0, "inode %llu drop %s DATA lock\n",
1305              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1306              write ? "EXMODE" : "PRMODE");
1307
1308         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1309                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1310
1311         mlog_exit_void();
1312 }
1313
1314 #define OCFS2_SEC_BITS   34
1315 #define OCFS2_SEC_SHIFT  (64 - 34)
1316 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1317
1318 /* LVB only has room for 64 bits of time here so we pack it for
1319  * now. */
1320 static u64 ocfs2_pack_timespec(struct timespec *spec)
1321 {
1322         u64 res;
1323         u64 sec = spec->tv_sec;
1324         u32 nsec = spec->tv_nsec;
1325
1326         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1327
1328         return res;
1329 }
1330
/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_meta_lock right now.
 *
 * Packs the in-memory inode state into the meta lock's LVB (big-endian
 * on the wire) so other nodes can refresh from it without hitting
 * disk. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
        struct ocfs2_inode_info *oi = OCFS2_I(inode);
        struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
        struct ocfs2_meta_lvb *lvb;

        mlog_entry_void();

        lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

        /*
         * Invalidate the LVB of a deleted inode - this way other
         * nodes are forced to go to disk and discover the new inode
         * status.
         */
        if (oi->ip_flags & OCFS2_INODE_DELETED) {
                lvb->lvb_version = 0;
                goto out;
        }

        lvb->lvb_version   = OCFS2_LVB_VERSION;
        lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
        lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
        lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
        lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
        lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
        lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
        /* Timestamps are packed into 64 bits; see ocfs2_pack_timespec. */
        lvb->lvb_iatime_packed  =
                cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
        lvb->lvb_ictime_packed =
                cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
        lvb->lvb_imtime_packed =
                cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
        lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
        lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
        mlog_meta_lvb(0, lockres);

        mlog_exit_void();
}
1375
1376 static void ocfs2_unpack_timespec(struct timespec *spec,
1377                                   u64 packed_time)
1378 {
1379         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1380         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1381 }
1382
/* Populate the in-memory inode from the fields another node stuffed
 * into the meta lock's LVB (see __ocfs2_stuff_meta_lvb), avoiding a
 * disk read. Caller has already validated the LVB via
 * ocfs2_meta_lvb_is_trustable(). */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
        struct ocfs2_inode_info *oi = OCFS2_I(inode);
        struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
        struct ocfs2_meta_lvb *lvb;

        mlog_entry_void();

        mlog_meta_lvb(0, lockres);

        lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

        /* We're safe here without the lockres lock... */
        spin_lock(&oi->ip_lock);
        oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
        i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

        oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
        ocfs2_set_inode_flags(inode);

        /* fast-symlinks are a special case: their data lives inside
         * the inode itself, so no blocks are allocated. */
        if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
                inode->i_blocks = 0;
        else
                inode->i_blocks =
                        ocfs2_align_bytes_to_sectors(i_size_read(inode));

        inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
        inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
        inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
        inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
        ocfs2_unpack_timespec(&inode->i_atime,
                              be64_to_cpu(lvb->lvb_iatime_packed));
        ocfs2_unpack_timespec(&inode->i_mtime,
                              be64_to_cpu(lvb->lvb_imtime_packed));
        ocfs2_unpack_timespec(&inode->i_ctime,
                              be64_to_cpu(lvb->lvb_ictime_packed));
        spin_unlock(&oi->ip_lock);

        mlog_exit_void();
}
1424
1425 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1426                                               struct ocfs2_lock_res *lockres)
1427 {
1428         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1429
1430         if (lvb->lvb_version == OCFS2_LVB_VERSION
1431             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1432                 return 1;
1433         return 0;
1434 }
1435
/* Determine whether a lock resource needs to be refreshed, and
 * arbitrate who gets to refresh it.
 *
 *   0 means no refresh needed.
 *
 *   > 0 means you need to refresh this and you MUST call
 *   ocfs2_complete_lock_res_refresh afterwards. */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
        unsigned long flags;
        int status = 0;

        mlog_entry_void();

refresh_check:
        spin_lock_irqsave(&lockres->l_lock, flags);
        if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                goto bail;
        }

        /* Someone else is already refreshing; wait for them to finish
         * and re-check, since they may fail and leave NEEDS_REFRESH
         * set. */
        if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
                spin_unlock_irqrestore(&lockres->l_lock, flags);

                ocfs2_wait_on_refreshing_lock(lockres);
                goto refresh_check;
        }

        /* Ok, I'll be the one to refresh this lock. */
        lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        status = 1;
bail:
        mlog_exit(status);
        return status;
}
1473
/* If status is non zero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs refresh flag -- a failed
 * refresh leaves NEEDS_REFRESH set so the next caller retries. */
static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
                                                   int status)
{
        unsigned long flags;
        mlog_entry_void();

        spin_lock_irqsave(&lockres->l_lock, flags);
        lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
        if (!status)
                lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        /* Wake anyone in ocfs2_wait_on_refreshing_lock(). */
        wake_up(&lockres->l_event);

        mlog_exit_void();
}
1492
/* may or may not return a bh if it went to disk.
 *
 * After winning the meta lock, bring the in-memory inode up to date:
 * from the LVB if it is trustable, otherwise by reading the dinode
 * from disk (in which case *bh may be set for the caller to reuse).
 * Returns 0 on success, -ENOENT if the inode was deleted while we
 * waited, or a read error. */
static int ocfs2_meta_lock_update(struct inode *inode,
                                  struct buffer_head **bh)
{
        int status = 0;
        struct ocfs2_inode_info *oi = OCFS2_I(inode);
        struct ocfs2_lock_res *lockres;
        struct ocfs2_dinode *fe;

        mlog_entry_void();

        spin_lock(&oi->ip_lock);
        if (oi->ip_flags & OCFS2_INODE_DELETED) {
                mlog(0, "Orphaned inode %llu was deleted while we "
                     "were waiting on a lock. ip_flags = 0x%x\n",
                     (unsigned long long)oi->ip_blkno, oi->ip_flags);
                spin_unlock(&oi->ip_lock);
                status = -ENOENT;
                goto bail;
        }
        spin_unlock(&oi->ip_lock);

        lockres = &oi->ip_meta_lockres;

        /* Only one node-local caller wins the right to refresh; the
         * rest see 0 here and return with a current inode. */
        if (!ocfs2_should_refresh_lock_res(lockres))
                goto bail;

        /* This will discard any caching information we might have had
         * for the inode metadata. */
        ocfs2_metadata_cache_purge(inode);

        /* will do nothing for inode types that don't use the extent
         * map (directories, bitmap files, etc) */
        ocfs2_extent_map_trunc(inode, 0);

        if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
                mlog(0, "Trusting LVB on inode %llu\n",
                     (unsigned long long)oi->ip_blkno);
                ocfs2_refresh_inode_from_lvb(inode);
        } else {
                /* Boo, we have to go to disk. */
                /* read bh, cast, ocfs2_refresh_inode */
                status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
                                          bh, OCFS2_BH_CACHED, inode);
                if (status < 0) {
                        mlog_errno(status);
                        goto bail_refresh;
                }
                fe = (struct ocfs2_dinode *) (*bh)->b_data;

                /* This is a good chance to make sure we're not
                 * locking an invalid object.
                 *
                 * We bug on a stale inode here because we checked
                 * above whether it was wiped from disk. The wiping
                 * node provides a guarantee that we receive that
                 * message and can mark the inode before dropping any
                 * locks associated with it. */
                if (!OCFS2_IS_VALID_DINODE(fe)) {
                        OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
                        status = -EIO;
                        goto bail_refresh;
                }
                mlog_bug_on_msg(inode->i_generation !=
                                le32_to_cpu(fe->i_generation),
                                "Invalid dinode %llu disk generation: %u "
                                "inode->i_generation: %u\n",
                                (unsigned long long)oi->ip_blkno,
                                le32_to_cpu(fe->i_generation),
                                inode->i_generation);
                mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
                                !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
                                "Stale dinode %llu dtime: %llu flags: 0x%x\n",
                                (unsigned long long)oi->ip_blkno,
                                (unsigned long long)le64_to_cpu(fe->i_dtime),
                                le32_to_cpu(fe->i_flags));

                ocfs2_refresh_inode(inode, fe);
        }

        status = 0;
bail_refresh:
        /* On error NEEDS_REFRESH stays set so a later caller retries. */
        ocfs2_complete_lock_res_refresh(lockres, status);
bail:
        mlog_exit(status);
        return status;
}
1580
1581 static int ocfs2_assign_bh(struct inode *inode,
1582                            struct buffer_head **ret_bh,
1583                            struct buffer_head *passed_bh)
1584 {
1585         int status;
1586
1587         if (passed_bh) {
1588                 /* Ok, the update went to disk for us, use the
1589                  * returned bh. */
1590                 *ret_bh = passed_bh;
1591                 get_bh(*ret_bh);
1592
1593                 return 0;
1594         }
1595
1596         status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1597                                   OCFS2_I(inode)->ip_blkno,
1598                                   ret_bh,
1599                                   OCFS2_BH_CACHED,
1600                                   inode);
1601         if (status < 0)
1602                 mlog_errno(status);
1603
1604         return status;
1605 }
1606
/*
 * Take the inode metadata cluster lock, optionally returning the
 * inode's buffer_head through @ret_bh.
 *
 * Returns < 0 if the callback will never be called, otherwise
 * the result of the lock will be communicated via the callback.
 *
 * @handle: if non-NULL, the acquired lock is recorded on the journal
 *          handle via ocfs2_handle_add_lock().
 * @ex: nonzero for LKM_EXMODE, zero for LKM_PRMODE.
 * @arg_flags: OCFS2_META_LOCK_* flags; NOQUEUE maps to LKM_NOQUEUE,
 *             RECOVERY skips the recovery-map waits below.
 */
int ocfs2_meta_lock_full(struct inode *inode,
			 struct ocfs2_journal_handle *handle,
			 struct buffer_head **ret_bh,
			 int ex,
			 int arg_flags)
{
	int status, level, dlm_flags, acquired;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
	struct buffer_head *local_bh = NULL;

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu, take %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	status = 0;
	acquired = 0;
	/* We'll allow faking a readonly metadata lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(osb)) {
		if (ex)
			status = -EROFS;
		goto bail;
	}

	/* Don't race recovery: wait until the recovery map is empty
	 * unless we ARE the recovery path. */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

	/* NOTE(review): redundant — acquired was already zeroed above. */
	acquired = 0;
	lockres = &OCFS2_I(inode)->ip_meta_lockres;
	level = ex ? LKM_EXMODE : LKM_PRMODE;
	dlm_flags = 0;
	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
		dlm_flags |= LKM_NOQUEUE;

	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
	if (status < 0) {
		/* -EAGAIN (NOQUEUE miss) and -EIOCBRETRY are expected
		 * outcomes, not errors worth logging. */
		if (status != -EAGAIN && status != -EIOCBRETRY)
			mlog_errno(status);
		goto bail;
	}

	/* Notify the error cleanup path to drop the cluster lock. */
	acquired = 1;

	/* We wait twice because a node may have died while we were in
	 * the lower dlm layers. The second time though, we've
	 * committed to owning this lock so we don't allow signals to
	 * abort the operation. */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

	/*
	 * We only see this flag if we're being called from
	 * ocfs2_read_locked_inode(). It means we're locking an inode
	 * which hasn't been populated yet, so clear the refresh flag
	 * and let the caller handle it.
	 */
	if (inode->i_state & I_NEW) {
		status = 0;
		ocfs2_complete_lock_res_refresh(lockres, 0);
		goto bail;
	}

	/* This is fun. The caller may want a bh back, or it may
	 * not. ocfs2_meta_lock_update definitely wants one in, but
	 * may or may not read one, depending on what's in the
	 * LVB. The result of all of this is that we've *only* gone to
	 * disk if we have to, so the complexity is worthwhile. */
	status = ocfs2_meta_lock_update(inode, &local_bh);
	if (status < 0) {
		if (status != -ENOENT)
			mlog_errno(status);
		goto bail;
	}

	if (ret_bh) {
		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

	if (handle) {
		status = ocfs2_handle_add_lock(handle, inode);
		if (status < 0)
			mlog_errno(status);
	}

bail:
	/* On error, hand nothing back and undo the cluster lock if we
	 * got as far as taking it. */
	if (status < 0) {
		if (ret_bh && (*ret_bh)) {
			brelse(*ret_bh);
			*ret_bh = NULL;
		}
		if (acquired)
			ocfs2_meta_unlock(inode, ex);
	}

	if (local_bh)
		brelse(local_bh);

	mlog_exit(status);
	return status;
}
1723
1724 /*
1725  * This is working around a lock inversion between tasks acquiring DLM locks
1726  * while holding a page lock and the vote thread which blocks dlm lock acquiry
1727  * while acquiring page locks.
1728  *
1729  * ** These _with_page variants are only intended to be called from aop
1730  * methods that hold page locks and return a very specific *positive* error
1731  * code that aop methods pass up to the VFS -- test for errors with != 0. **
1732  *
1733  * The DLM is called such that it returns -EAGAIN if it would have blocked
1734  * waiting for the vote thread.  In that case we unlock our page so the vote
1735  * thread can make progress.  Once we've done this we have to return
1736  * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1737  * into the VFS who will then immediately retry the aop call.
1738  *
1739  * We do a blocking lock and immediate unlock before returning, though, so that
1740  * the lock has a great chance of being cached on this node by the time the VFS
1741  * calls back to retry the aop.    This has a potential to livelock as nodes
1742  * ping locks back and forth, but that's a risk we're willing to take to avoid
1743  * the lock inversion simply.
1744  */
1745 int ocfs2_meta_lock_with_page(struct inode *inode,
1746                               struct ocfs2_journal_handle *handle,
1747                               struct buffer_head **ret_bh,
1748                               int ex,
1749                               struct page *page)
1750 {
1751         int ret;
1752
1753         ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1754                                    OCFS2_LOCK_NONBLOCK);
1755         if (ret == -EAGAIN) {
1756                 unlock_page(page);
1757                 if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1758                         ocfs2_meta_unlock(inode, ex);
1759                 ret = AOP_TRUNCATED_PAGE;
1760         }
1761
1762         return ret;
1763 }
1764
1765 void ocfs2_meta_unlock(struct inode *inode,
1766                        int ex)
1767 {
1768         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1769         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1770
1771         mlog_entry_void();
1772
1773         mlog(0, "inode %llu drop %s META lock\n",
1774              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1775              ex ? "EXMODE" : "PRMODE");
1776
1777         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1778                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1779
1780         mlog_exit_void();
1781 }
1782
/*
 * Take the global superblock cluster lock, refreshing the in-memory
 * slot map from disk when the lock resource indicates our cached
 * state may be stale.
 *
 * @ex: nonzero for LKM_EXMODE, zero for LKM_PRMODE.
 * Returns 0 on success, negative error otherwise.
 */
int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	/* Positive return means we won the refresh and must re-read
	 * the slot map, then signal refresh completion to waiters. */
	if (status) {
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		/* Completed regardless of read outcome; the status is
		 * propagated so blocked refreshers see any error. */
		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}
1828
1829 void ocfs2_super_unlock(struct ocfs2_super *osb,
1830                         int ex)
1831 {
1832         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1833         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1834
1835         ocfs2_cluster_unlock(osb, lockres, level);
1836 }
1837
1838 int ocfs2_rename_lock(struct ocfs2_super *osb)
1839 {
1840         int status;
1841         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1842
1843         if (ocfs2_is_hard_readonly(osb))
1844                 return -EROFS;
1845
1846         status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1847         if (status < 0)
1848                 mlog_errno(status);
1849
1850         return status;
1851 }
1852
1853 void ocfs2_rename_unlock(struct ocfs2_super *osb)
1854 {
1855         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1856
1857         ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1858 }
1859
1860 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1861 {
1862         int ret;
1863         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1864         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1865         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1866
1867         BUG_ON(!dl);
1868
1869         if (ocfs2_is_hard_readonly(osb))
1870                 return -EROFS;
1871
1872         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1873         if (ret < 0)
1874                 mlog_errno(ret);
1875
1876         return ret;
1877 }
1878
1879 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1880 {
1881         int level = ex ? LKM_EXMODE : LKM_PRMODE;
1882         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1883         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1884
1885         ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1886 }
1887
/* Reference counting of the dlm debug structure. We want this because
 * open references on the debug inodes can live on after a mount, so
 * we can't rely on the ocfs2_super to always exist. */

/* kref release callback: frees the debug structure once the last
 * reference (superblock or open debugfs file) is dropped. */
static void ocfs2_dlm_debug_free(struct kref *kref)
{
	struct ocfs2_dlm_debug *dlm_debug;

	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);

	kfree(dlm_debug);
}
1899
1900 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1901 {
1902         if (dlm_debug)
1903                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1904 }
1905
/* Take an additional reference on @debug; paired with
 * ocfs2_put_dlm_debug().  @debug must be non-NULL. */
static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
{
	kref_get(&debug->d_refcnt);
}
1910
1911 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1912 {
1913         struct ocfs2_dlm_debug *dlm_debug;
1914
1915         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1916         if (!dlm_debug) {
1917                 mlog_errno(-ENOMEM);
1918                 goto out;
1919         }
1920
1921         kref_init(&dlm_debug->d_refcnt);
1922         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1923         dlm_debug->d_locking_state = NULL;
1924 out:
1925         return dlm_debug;
1926 }
1927
/* Access to this is arbitrated for us via seq_file->sem. */
struct ocfs2_dlm_seq_priv {
	struct ocfs2_dlm_debug *p_dlm_debug;	/* referenced debug state for this open */
	struct ocfs2_lock_res p_iter_res;	/* dummy node marking our position in the tracking list */
	struct ocfs2_lock_res p_tmp_res;	/* snapshot of the lockres being shown */
};
1934
/*
 * Starting after @start on the global tracking list, return the next
 * real lockres, or NULL once the list head is reached.  Dummy
 * iteration nodes (inserted by open seq_files) are skipped.
 *
 * Caller must hold ocfs2_dlm_tracking_lock.
 */
static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
						 struct ocfs2_dlm_seq_priv *priv)
{
	struct ocfs2_lock_res *iter, *ret = NULL;
	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;

	assert_spin_locked(&ocfs2_dlm_tracking_lock);

	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
		/* discover the head of the list */
		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
			mlog(0, "End of list found, %p\n", ret);
			break;
		}

		/* We track our "dummy" iteration lockres' by a NULL
		 * l_ops field. */
		if (iter->l_ops != NULL) {
			ret = iter;
			break;
		}
	}

	return ret;
}
1960
/* seq_file .start: find the first real lockres after our dummy node
 * and return a private snapshot of it (or NULL at end). */
static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
	if (iter) {
		/* Since lockres' have the lifetime of their container
		 * (which can be inodes, ocfs2_supers, etc) we want to
		 * copy this out to a temporary lockres while still
		 * under the spinlock. Obviously after this we can't
		 * trust any pointers on the copy returned, but that's
		 * ok as the information we want isn't typically held
		 * in them. */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
1983
/* seq_file .stop: nothing to tear down — the tracking spinlock is
 * only held within start/next, never across iterations. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}
1987
/* seq_file .next: advance past the element we last showed, moving our
 * dummy placeholder to sit just after the new element so a later call
 * resumes from the right spot.  Returns a snapshot, or NULL at end. */
static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter = v;
	struct ocfs2_lock_res *dummy = &priv->p_iter_res;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(iter, priv);
	/* Unhook the dummy; if there is a next element, re-insert the
	 * dummy after it and hand back a stable copy. */
	list_del_init(&dummy->l_debug_list);
	if (iter) {
		list_add(&dummy->l_debug_list, &iter->l_debug_list);
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
2006
2007 /* So that debugfs.ocfs2 can determine which format is being used */
2008 #define OCFS2_DLM_DEBUG_STR_VERSION 1
2009 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2010 {
2011         int i;
2012         char *lvb;
2013         struct ocfs2_lock_res *lockres = v;
2014
2015         if (!lockres)
2016                 return -EINVAL;
2017
2018         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2019
2020         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2021                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2022                            lockres->l_name,
2023                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2024         else
2025                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2026
2027         seq_printf(m, "%d\t"
2028                    "0x%lx\t"
2029                    "0x%x\t"
2030                    "0x%x\t"
2031                    "%u\t"
2032                    "%u\t"
2033                    "%d\t"
2034                    "%d\t",
2035                    lockres->l_level,
2036                    lockres->l_flags,
2037                    lockres->l_action,
2038                    lockres->l_unlock_action,
2039                    lockres->l_ro_holders,
2040                    lockres->l_ex_holders,
2041                    lockres->l_requested,
2042                    lockres->l_blocking);
2043
2044         /* Dump the raw LVB */
2045         lvb = lockres->l_lksb.lvb;
2046         for(i = 0; i < DLM_LVB_LEN; i++)
2047                 seq_printf(m, "0x%x\t", lvb[i]);
2048
2049         /* End the line */
2050         seq_printf(m, "\n");
2051         return 0;
2052 }
2053
/* Iterator over every tracked lock resource; backs the
 * "locking_state" debugfs file. */
static struct seq_operations ocfs2_dlm_seq_ops = {
	.start =        ocfs2_dlm_seq_start,
	.stop =         ocfs2_dlm_seq_stop,
	.next =         ocfs2_dlm_seq_next,
	.show =         ocfs2_dlm_seq_show,
};
2060
2061 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2062 {
2063         struct seq_file *seq = (struct seq_file *) file->private_data;
2064         struct ocfs2_dlm_seq_priv *priv = seq->private;
2065         struct ocfs2_lock_res *res = &priv->p_iter_res;
2066
2067         ocfs2_remove_lockres_tracking(res);
2068         ocfs2_put_dlm_debug(priv->p_dlm_debug);
2069         return seq_release_private(inode, file);
2070 }
2071
2072 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2073 {
2074         int ret;
2075         struct ocfs2_dlm_seq_priv *priv;
2076         struct seq_file *seq;
2077         struct ocfs2_super *osb;
2078
2079         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2080         if (!priv) {
2081                 ret = -ENOMEM;
2082                 mlog_errno(ret);
2083                 goto out;
2084         }
2085         osb = (struct ocfs2_super *) inode->u.generic_ip;
2086         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2087         priv->p_dlm_debug = osb->osb_dlm_debug;
2088         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2089
2090         ret = seq_open(file, &ocfs2_dlm_seq_ops);
2091         if (ret) {
2092                 kfree(priv);
2093                 mlog_errno(ret);
2094                 goto out;
2095         }
2096
2097         seq = (struct seq_file *) file->private_data;
2098         seq->private = priv;
2099
2100         ocfs2_add_lockres_tracking(&priv->p_iter_res,
2101                                    priv->p_dlm_debug);
2102
2103 out:
2104         return ret;
2105 }
2106
/* File operations for the "locking_state" debugfs file; reads are
 * served by the seq_file machinery via ocfs2_dlm_seq_ops. */
static const struct file_operations ocfs2_dlm_debug_fops = {
	.open =         ocfs2_dlm_debug_open,
	.release =      ocfs2_dlm_debug_release,
	.read =         seq_read,
	.llseek =       seq_lseek,
};
2113
2114 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2115 {
2116         int ret = 0;
2117         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2118
2119         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2120                                                          S_IFREG|S_IRUSR,
2121                                                          osb->osb_debug_root,
2122                                                          osb,
2123                                                          &ocfs2_dlm_debug_fops);
2124         if (!dlm_debug->d_locking_state) {
2125                 ret = -EINVAL;
2126                 mlog(ML_ERROR,
2127                      "Unable to create locking state debugfs file.\n");
2128                 goto out;
2129         }
2130
2131         ocfs2_get_dlm_debug(dlm_debug);
2132 out:
2133         return ret;
2134 }
2135
2136 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2137 {
2138         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2139
2140         if (dlm_debug) {
2141                 debugfs_remove(dlm_debug->d_locking_state);
2142                 ocfs2_put_dlm_debug(dlm_debug);
2143         }
2144 }
2145
/*
 * Mount-time dlmglue bring-up: debugfs state file, the vote thread,
 * and registration with our DLM domain.  On failure, any partial
 * setup is torn down before returning a negative error.
 */
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
	int status;
	u32 dlm_key;
	struct dlm_ctxt *dlm;

	mlog_entry_void();

	status = ocfs2_dlm_init_debug(osb);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* launch vote thread */
	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
	if (IS_ERR(osb->vote_task)) {
		status = PTR_ERR(osb->vote_task);
		/* NULL out so the error path below doesn't try to
		 * stop a thread that never started. */
		osb->vote_task = NULL;
		mlog_errno(status);
		goto bail;
	}

	/* used by the dlm code to make message headers unique, each
	 * node in this domain must agree on this. */
	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));

	/* for now, uuid == domain */
	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
	if (IS_ERR(dlm)) {
		status = PTR_ERR(dlm);
		mlog_errno(status);
		goto bail;
	}

	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);

	osb->dlm = dlm;

	status = 0;
bail:
	/* Undo partial setup on any failure. */
	if (status < 0) {
		ocfs2_dlm_shutdown_debug(osb);
		if (osb->vote_task)
			kthread_stop(osb->vote_task);
	}

	mlog_exit(status);
	return status;
}
2199
/*
 * Unmount-time teardown, mirroring ocfs2_dlm_init(): unhook the
 * eviction callback, drop the filesystem-global locks, stop the vote
 * thread, free the osb lock resources, then leave the DLM domain and
 * remove the debugfs state.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	ocfs2_drop_osb_locks(osb);

	if (osb->vote_task) {
		kthread_stop(osb->vote_task);
		osb->vote_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}
2223
/*
 * Unlock AST: invoked by the DLM when an unlock or cancel-convert
 * request completes.  Clears the lockres' unlock state according to
 * l_unlock_action and wakes anyone sleeping on l_event.
 */
static void ocfs2_unlock_ast_func(void *opaque, enum dlm_status status)
{
	struct ocfs2_lock_res *lockres = opaque;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
	     lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* We tried to cancel a convert request, but it was already
	 * granted. All we want to do here is clear our unlock
	 * state. The wake_up call done at the bottom is redundant
	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
	 * hurt anything anyway */
	if (status == DLM_CANCELGRANT &&
	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

		/* We don't clear the busy flag in this case as it
		 * should have been cleared by the ast which the dlm
		 * has called. */
		goto complete_unlock;
	}

	/* Any other non-normal status leaves the lockres state
	 * untouched (busy stays set, unlock_action stays pending). */
	if (status != DLM_NORMAL) {
		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
		     "unlock_action %d\n", status, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		/* The in-flight convert is dead; reset the AST state. */
		lockres->l_action = OCFS2_AST_INVALID;
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		/* Fully unlocked: we no longer hold any level. */
		lockres->l_level = LKM_IVMODE;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}
2279
/* Optional callback run by ocfs2_drop_lock() just before the lockres
 * is torn down, while lockres->l_lock is held. */
typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

struct drop_lock_cb {
	ocfs2_pre_drop_cb_t     *drop_func;	/* invoked with (lockres, drop_data) */
	void                    *drop_data;	/* opaque cookie passed to drop_func */
};
2286
/*
 * Tear down a lock resource's DLM lock.  Waits out any in-flight
 * request (busy), runs the optional pre-drop callback @dcb under the
 * spinlock, then issues dlmunlock() and waits for the unlock AST.
 *
 * The lockres must already be flagged OCFS2_LOCK_FREEING (see
 * ocfs2_mark_lockres_freeing()).  Always returns 0.
 */
static int ocfs2_drop_lock(struct ocfs2_super *osb,
			   struct ocfs2_lock_res *lockres,
			   struct drop_lock_cb *dcb)
{
	enum dlm_status status;
	unsigned long flags;

	/* We didn't get anywhere near actually using this lockres. */
	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
		goto out;

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
			"lockres %s, flags 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	/* Let any pending lock/convert finish before we tear down;
	 * the spinlock is dropped around each sleep. */
	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
		     "%u, unlock_action = %u\n",
		     lockres->l_name, lockres->l_flags, lockres->l_action,
		     lockres->l_unlock_action);

		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* XXX: Today we just wait on any busy
		 * locks... Perhaps we need to cancel converts in the
		 * future? */
		ocfs2_wait_on_busy_lock(lockres);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	if (dcb)
		dcb->drop_func(lockres, dcb->drop_data);

	if (lockres->l_flags & OCFS2_LOCK_BUSY)
		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
		     lockres->l_name);
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);

	/* No DLM-side lock to drop if we were never attached. */
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto out;
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);

	/* make sure we never get here while waiting for an ast to
	 * fire. */
	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);

	/* is this necessary? */
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "lock %s\n", lockres->l_name);

	/* LKM_VALBLK: push our LVB out with the final unlock. */
	status = dlmunlock(osb->dlm, &lockres->l_lksb, LKM_VALBLK,
			   lockres->l_ops->unlock_ast, lockres);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmunlock", status, lockres);
		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
		dlm_print_one_lock(lockres->l_lksb.lockid);
		BUG();
	}
	mlog(0, "lock %s, successfull return from dlmunlock\n",
	     lockres->l_name);

	/* Wait for the unlock AST to clear OCFS2_LOCK_BUSY. */
	ocfs2_wait_on_busy_lock(lockres);
out:
	mlog_exit(0);
	return 0;
}
2363
/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the vote thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	/* Sleep until the vote thread has dequeued us (QUEUED flag
	 * cleared); re-check under the lock each time we wake. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
2394
2395 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2396                                struct ocfs2_lock_res *lockres)
2397 {
2398         int ret;
2399
2400         ocfs2_mark_lockres_freeing(lockres);
2401         ret = ocfs2_drop_lock(osb, lockres, NULL);
2402         if (ret)
2403                 mlog_errno(ret);
2404 }
2405
2406 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2407 {
2408         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2409         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2410 }
2411
/* Pre-drop callback for the inode metadata lock: if we hold an
 * attached EX lock with no refresh pending, populate the LVB from the
 * inode (__ocfs2_stuff_meta_lvb) before the lock is dropped. */
static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
{
	struct inode *inode = data;

	/* the metadata lock requires a bit more work as we have an
	 * LVB to worry about. */
	if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
	    lockres->l_level == LKM_EXMODE &&
	    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
		__ocfs2_stuff_meta_lvb(inode);
}
2423
/*
 * Drop all three per-inode cluster locks (data, meta, rw).  All
 * drops are attempted even if one fails; the first error seen is
 * returned.  The meta drop gets a pre-drop callback so the LVB can
 * be stuffed before the lock goes away.
 */
int ocfs2_drop_inode_locks(struct inode *inode)
{
	int status, err;
	struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };

	mlog_entry_void();

	/* No need to call ocfs2_mark_lockres_freeing here -
	 * ocfs2_clear_inode has done it for us. */

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_data_lockres,
			      NULL);
	if (err < 0)
		mlog_errno(err);

	status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_meta_lockres,
			      &meta_dcb);
	if (err < 0)
		mlog_errno(err);
	/* Keep the first error; don't overwrite it with a later one. */
	if (err < 0 && !status)
		status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_rw_lockres,
			      NULL);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	mlog_exit(status);
	return status;
}
2461
/*
 * Record a pending downconvert to @new_level on @lockres: set the
 * AST action, the requested level, and the busy flag.  Caller holds
 * lockres->l_lock and issues the actual LKM_CONVERT afterwards.
 */
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	/* A downconvert only makes sense when something is blocking
	 * above NL. */
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}
2482
/*
 * Issue the DLM convert that lowers @lockres to @new_level.  If @lvb
 * is set, LKM_VALBLK is added so our LVB contents go out with the
 * convert.  On DLM error the lockres state set up by
 * ocfs2_prepare_downconvert() is rolled back.
 */
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb)
{
	int ret, dlm_flags = LKM_CONVERT;
	enum dlm_status status;

	mlog_entry_void();

	if (lvb)
		dlm_flags |= LKM_VALBLK;

	status = dlmlock(osb->dlm,
			 new_level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 OCFS2_LOCK_ID_MAX_LEN - 1,
			 ocfs2_locking_ast,
			 lockres,
			 lockres->l_ops->bast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 1);
		goto bail;
	}

	ret = 0;
bail:
	mlog_exit(ret);
	return ret;
}
2517
/*
 * Decide whether an in-flight convert on @lockres should be cancelled.
 * Caller must hold ->l_lock.
 *
 * returns 1 when the caller should unlock and call dlmunlock
 * (ocfs2_cancel_convert()); returns 0 when a cancel is already in
 * flight, in which case the caller simply requeues the lock.
 */
2518 /* returns 1 when the caller should unlock and call dlmunlock */
2519 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2520                                         struct ocfs2_lock_res *lockres)
2521 {
2522         assert_spin_locked(&lockres->l_lock);
2523
2524         mlog_entry_void();
2525         mlog(0, "lock %s\n", lockres->l_name);
2526
2527         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2528                 /* If we're already trying to cancel a lock conversion
2529                  * then just drop the spinlock and allow the caller to
2530                  * requeue this lock. */
2531
2532                 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2533                 return 0;
2534         }
2535
2536         /* were we in a convert when we got the bast fire? */
2537         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2538                lockres->l_action != OCFS2_AST_DOWNCONVERT);
2539         /* set things up for the unlockast to know to just
2540          * clear out the ast_action and unset busy, etc. */
2541         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2542
             /* A convert in flight implies BUSY is set; anything else is
              * a state machine violation. */
2543         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2544                         "lock %s, invalid flags: 0x%lx\n",
2545                         lockres->l_name, lockres->l_flags);
2546
2547         return 1;
2548 }
2549
/*
 * Ask the DLM to cancel the convert in flight on @lockres (prepared by
 * ocfs2_prepare_cancel_convert()).  Called without ->l_lock held; the
 * result is delivered asynchronously to the lockres' unlock_ast.
 * Returns 0 on successful submission, -EINVAL on DLM error.
 */
2550 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2551                                 struct ocfs2_lock_res *lockres)
2552 {
2553         int ret;
2554         enum dlm_status status;
2555
2556         mlog_entry_void();
2557         mlog(0, "lock %s\n", lockres->l_name);
2558
2559         ret = 0;
2560         status = dlmunlock(osb->dlm,
2561                            &lockres->l_lksb,
2562                            LKM_CANCEL,
2563                            lockres->l_ops->unlock_ast,
2564                            lockres);
2565         if (status != DLM_NORMAL) {
2566                 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2567                 ret = -EINVAL;
                     /* Second arg 0: undo the pending unlock action set
                      * up by the prepare step. */
2568                 ocfs2_recover_from_dlm_error(lockres, 0);
2569         }
2570
2571         mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2572
2573         mlog_exit(ret);
2574         return ret;
2575 }
2576
/*
 * Predicate: may the meta lock on @inode be downconverted to
 * @new_level right now?  Requires that any dirty metadata be fully
 * checkpointed and that no holders incompatible with @new_level remain.
 * A lockres mid-refresh always refuses so the refresher isn't raced.
 * Caller is expected to hold ->l_lock (uses holder counts and flags).
 */
2577 static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
2578                                                   struct ocfs2_lock_res *lockres,
2579                                                   int new_level)
2580 {
2581         int ret;
2582
2583         mlog_entry_void();
2584
             /* Meta locks only ever downconvert to PR or NL. */
2585         BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2586
2587         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2588                 ret = 0;
2589                 mlog(0, "lockres %s currently being refreshed -- backing "
2590                      "off!\n", lockres->l_name);
2591         } else if (new_level == LKM_PRMODE)
                     /* Dropping EX->PR: readers may stay, writers must go. */
2592                 ret = !lockres->l_ex_holders &&
2593                         ocfs2_inode_fully_checkpointed(inode);
2594         else /* Must be NLMODE we're converting to. */
2595                 ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
2596                         ocfs2_inode_fully_checkpointed(inode);
2597
2598         mlog_exit(ret);
2599         return ret;
2600 }
2601
/*
 * Unblock path for the inode meta lock.  Either cancels a busy
 * convert, downconverts (stuffing the LVB with our cached inode values
 * when dropping from EX), or kicks off a checkpoint and requeues when
 * the journal isn't ready yet.  *@requeue is set when the lock must be
 * looked at again by the vote thread.
 */
2602 static int ocfs2_do_unblock_meta(struct inode *inode,
2603                                  int *requeue)
2604 {
2605         int new_level;
2606         int set_lvb = 0;
2607         int ret = 0;
2608         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
2609         unsigned long flags;
2610
2611         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2612
2613         mlog_entry_void();
2614
2615         spin_lock_irqsave(&lockres->l_lock, flags);
2616
2617         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2618
2619         mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
2620              lockres->l_blocking);
2621
2622         BUG_ON(lockres->l_level != LKM_EXMODE &&
2623                lockres->l_level != LKM_PRMODE);
2624
             /* A convert is already in flight: try to cancel it and let
              * the vote thread revisit us. */
2625         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2626                 *requeue = 1;
2627                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2628                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2629                 if (ret) {
2630                         ret = ocfs2_cancel_convert(osb, lockres);
2631                         if (ret < 0)
2632                                 mlog_errno(ret);
2633                 }
2634                 goto leave;
2635         }
2636
2637         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2638
2639         mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
2640              lockres->l_level, lockres->l_blocking, new_level);
2641
2642         if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
2643                 if (lockres->l_level == LKM_EXMODE)
2644                         set_lvb = 1;
2645
2646                 /* If the lock hasn't been refreshed yet (rare), then
2647                  * our memory inode values are old and we skip
2648                  * stuffing the lvb. There's no need to actually clear
2649                  * out the lvb here as it's value is still valid. */
2650                 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2651                         if (set_lvb)
2652                                 __ocfs2_stuff_meta_lvb(inode);
2653                 } else
2654                         mlog(0, "lockres %s: downconverting stale lock!\n",
2655                              lockres->l_name);
2656
2657                 mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
2658                      "l_blocking=%d, new_level=%d\n",
2659                      lockres->l_level, lockres->l_blocking, new_level);
2660
                     /* Arm under the spinlock, then drop it before the
                      * (potentially blocking) DLM call. */
2661                 ocfs2_prepare_downconvert(lockres, new_level);
2662                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2663                 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2664                 goto leave;
2665         }
             /* Can't downconvert yet -- usually because the journal isn't
              * checkpointed. Start one so a later pass can succeed. */
2666         if (!ocfs2_inode_fully_checkpointed(inode))
2667                 ocfs2_start_checkpoint(osb);
2668
2669         *requeue = 1;
2670         spin_unlock_irqrestore(&lockres->l_lock, flags);
2671         ret = 0;
2672 leave:
2673         mlog_exit(ret);
2674         return ret;
2675 }
2676
/*
 * Generic unblock path shared by most lock types.  Cancels a busy
 * convert, requeues while incompatible holders remain, optionally runs
 * a per-type @worker (which may sleep, so the spinlock is dropped
 * around it), and finally downconverts to the highest level compatible
 * with what's blocking us.  Results are reported through @ctl:
 * ->requeue asks the vote thread to revisit, ->unblock_action carries
 * the worker's verdict (e.g. UNBLOCK_STOP_POST skips the downconvert).
 */
2677 static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
2678                                       struct ocfs2_lock_res *lockres,
2679                                       struct ocfs2_unblock_ctl *ctl,
2680                                       ocfs2_convert_worker_t *worker)
2681 {
2682         unsigned long flags;
2683         int blocking;
2684         int new_level;
2685         int ret = 0;
2686
2687         mlog_entry_void();
2688
2689         spin_lock_irqsave(&lockres->l_lock, flags);
2690
2691         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2692
2693 recheck:
2694         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2695                 ctl->requeue = 1;
2696                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2697                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2698                 if (ret) {
2699                         ret = ocfs2_cancel_convert(osb, lockres);
2700                         if (ret < 0)
2701                                 mlog_errno(ret);
2702                 }
2703                 goto leave;
2704         }
2705
2706         /* if we're blocking an exclusive and we have *any* holders,
2707          * then requeue. */
2708         if ((lockres->l_blocking == LKM_EXMODE)
2709             && (lockres->l_ex_holders || lockres->l_ro_holders)) {
2710                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2711                 ctl->requeue = 1;
2712                 ret = 0;
2713                 goto leave;
2714         }
2715
2716         /* If it's a PR we're blocking, then only
2717          * requeue if we've got any EX holders */
2718         if (lockres->l_blocking == LKM_PRMODE &&
2719             lockres->l_ex_holders) {
2720                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2721                 ctl->requeue = 1;
2722                 ret = 0;
2723                 goto leave;
2724         }
2725
2726         /* If we get here, then we know that there are no more
2727          * incompatible holders (and anyone asking for an incompatible
2728          * lock is blocked). We can now downconvert the lock */
2729         if (!worker)
2730                 goto downconvert;
2731
2732         /* Some lockres types want to do a bit of work before
2733          * downconverting a lock. Allow that here. The worker function
2734          * may sleep, so we save off a copy of what we're blocking as
2735          * it may change while we're not holding the spin lock. */
2736         blocking = lockres->l_blocking;
2737         spin_unlock_irqrestore(&lockres->l_lock, flags);
2738
2739         ctl->unblock_action = worker(lockres, blocking);
2740
             /* Worker says skip the downconvert entirely; the caller's
              * post_unlock handler takes it from here. */
2741         if (ctl->unblock_action == UNBLOCK_STOP_POST)
2742                 goto leave;
2743
2744         spin_lock_irqsave(&lockres->l_lock, flags);
2745         if (blocking != lockres->l_blocking) {
2746                 /* If this changed underneath us, then we can't drop
2747                  * it just yet. */
2748                 goto recheck;
2749         }
2750
2751 downconvert:
2752         ctl->requeue = 0;
2753         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2754
2755         ocfs2_prepare_downconvert(lockres, new_level);
2756         spin_unlock_irqrestore(&lockres->l_lock, flags);
2757         ret = ocfs2_downconvert_lock(osb, lockres, new_level, 0);
2758 leave:
2759         mlog_exit(ret);
2760         return ret;
2761 }
2762
/*
 * Convert worker for data locks: flush the inode's dirty pages before
 * the downconvert.  When another node wants EX (we drop to NL), the
 * page cache and all mappings are torn down completely; when it only
 * wants PR, we just force and wait on writeback so cached pages stay
 * usable for continued reads.
 */
2763 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2764                                      int blocking)
2765 {
2766         struct inode *inode;
2767         struct address_space *mapping;
2768
2769         inode = ocfs2_lock_res_inode(lockres);
2770         mapping = inode->i_mapping;
2771
2772         if (filemap_fdatawrite(mapping)) {
2773                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2774                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
2775         }
2776         sync_mapping_buffers(mapping);
2777         if (blocking == LKM_EXMODE) {
2778                 truncate_inode_pages(mapping, 0);
2779                 unmap_mapping_range(mapping, 0, 0, 0);
2780         } else {
2781                 /* We only need to wait on the I/O if we're not also
2782                  * truncating pages because truncate_inode_pages waits
2783                  * for us above. We don't truncate pages if we're
2784                  * blocking anything < EXMODE because we want to keep
2785                  * them around in that case. */
2786                 filemap_fdatawait(mapping);
2787         }
2788
2789         return UNBLOCK_CONTINUE;
2790 }
2791
2792 int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2793                        struct ocfs2_unblock_ctl *ctl)
2794 {
2795         int status;
2796         struct inode *inode;
2797         struct ocfs2_super *osb;
2798
2799         mlog_entry_void();
2800
2801         inode = ocfs2_lock_res_inode(lockres);
2802         osb = OCFS2_SB(inode->i_sb);
2803
2804         mlog(0, "unblock inode %llu\n",
2805              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2806
2807         status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
2808                                             ocfs2_data_convert_worker);
2809         if (status < 0)
2810                 mlog_errno(status);
2811
2812         mlog(0, "inode %llu, requeue = %d\n",
2813              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2814
2815         mlog_exit(status);
2816         return status;
2817 }
2818
2819 static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2820                                     struct ocfs2_unblock_ctl *ctl)
2821 {
2822         int status;
2823         struct inode *inode;
2824
2825         mlog_entry_void();
2826
2827         mlog(0, "Unblock lockres %s\n", lockres->l_name);
2828
2829         inode  = ocfs2_lock_res_inode(lockres);
2830
2831         status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2832                                             lockres, ctl, NULL);
2833         if (status < 0)
2834                 mlog_errno(status);
2835
2836         mlog_exit(status);
2837         return status;
2838 }
2839
2840 static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2841                               struct ocfs2_unblock_ctl *ctl)
2842 {
2843         int status;
2844         struct inode *inode;
2845
2846         mlog_entry_void();
2847
2848         inode = ocfs2_lock_res_inode(lockres);
2849
2850         mlog(0, "unblock inode %llu\n",
2851              (unsigned long long)OCFS2_I(inode)->ip_blkno);
2852
2853         status = ocfs2_do_unblock_meta(inode, &ctl->requeue);
2854         if (status < 0)
2855                 mlog_errno(status);
2856
2857         mlog(0, "inode %llu, requeue = %d\n",
2858              (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2859
2860         mlog_exit(status);
2861         return status;
2862 }
2863
2864 /*
2865  * Does the final reference drop on our dentry lock. Right now this
2866  * happens in the vote thread, but we could choose to simplify the
2867  * dlmglue API and push these off to the ocfs2_wq in the future.
2868  */
2869 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
2870                                      struct ocfs2_lock_res *lockres)
2871 {
2872         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
             /* Drops the elevated reference taken by
              * ocfs2_dentry_convert_worker() during unblock processing. */
2873         ocfs2_dentry_lock_put(osb, dl);
2874 }
2875
2876 /*
2877  * d_delete() matching dentries before the lock downconvert.
2878  *
2879  * At this point, any process waiting to destroy the
2880  * dentry_lock due to last ref count is stopped by the
2881  * OCFS2_LOCK_QUEUED flag.
2882  *
2883  * We have two potential problems
2884  *
2885  * 1) If we do the last reference drop on our dentry_lock (via dput)
2886  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2887  *    the downconvert to finish. Instead we take an elevated
2888  *    reference and push the drop until after we've completed our
2889  *    unblock processing.
2890  *
2891  * 2) There might be another process with a final reference,
2892  *    waiting on us to finish processing. If this is the case, we
2893  *    detect it and exit out - there's no more dentries anyway.
2894  */
2895 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2896                                        int blocking)
2897 {
2898         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2899         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2900         struct dentry *dentry;
2901         unsigned long flags;
2902         int extra_ref = 0;
2903
2904         /*
2905          * This node is blocking another node from getting a read
2906          * lock. This happens when we've renamed within a
2907          * directory. We've forced the other nodes to d_delete(), but
2908          * we never actually dropped our lock because it's still
2909          * valid. The downconvert code will retain a PR for this node,
2910          * so there's no further work to do.
2911          */
2912         if (blocking == LKM_PRMODE)
2913                 return UNBLOCK_CONTINUE;
2914
2915         /*
2916          * Mark this inode as potentially orphaned. The code in
2917          * ocfs2_delete_inode() will figure out whether it actually
2918          * needs to be freed or not.
2919          */
2920         spin_lock(&oi->ip_lock);
2921         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2922         spin_unlock(&oi->ip_lock);
2923
2924         /*
2925          * Yuck. We need to make sure however that the check of
2926          * OCFS2_LOCK_FREEING and the extra reference are atomic with
2927          * respect to a reference decrement or the setting of that
2928          * flag.
2929          */
2930         spin_lock_irqsave(&lockres->l_lock, flags);
2931         spin_lock(&dentry_attach_lock);
2932         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2933             && dl->dl_count) {
                     /* Elevated ref is dropped later by
                      * ocfs2_dentry_post_unlock(). */
2934                 dl->dl_count++;
2935                 extra_ref = 1;
2936         }
2937         spin_unlock(&dentry_attach_lock);
2938         spin_unlock_irqrestore(&lockres->l_lock, flags);
2939
2940         mlog(0, "extra_ref = %d\n", extra_ref);
2941
2942         /*
2943          * We have a process waiting on us in ocfs2_dentry_iput(),
2944          * which means we can't have any more outstanding
2945          * aliases. There's no need to do any more work.
2946          */
2947         if (!extra_ref)
2948                 return UNBLOCK_CONTINUE;
2949
             /* Walk and d_delete() every local alias of this inode under
              * the parent directory recorded in the lock. */
2950         spin_lock(&dentry_attach_lock);
2951         while (1) {
2952                 dentry = ocfs2_find_local_alias(dl->dl_inode,
2953                                                 dl->dl_parent_blkno, 1);
2954                 if (!dentry)
2955                         break;
2956                 spin_unlock(&dentry_attach_lock);
2957
2958                 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2959                      dentry->d_name.name);
2960
2961                 /*
2962                  * The following dcache calls may do an
2963                  * iput(). Normally we don't want that from the
2964                  * downconverting thread, but in this case it's ok
2965                  * because the requesting node already has an
2966                  * exclusive lock on the inode, so it can't be queued
2967                  * for a downconvert.
2968                  */
2969                 d_delete(dentry);
2970                 dput(dentry);
2971
2972                 spin_lock(&dentry_attach_lock);
2973         }
2974         spin_unlock(&dentry_attach_lock);
2975
2976         /*
2977          * If we are the last holder of this dentry lock, there is no
2978          * reason to downconvert so skip straight to the unlock.
2979          */
             /* dl_count == 1 here means only our elevated ref remains. */
2980         if (dl->dl_count == 1)
2981                 return UNBLOCK_STOP_POST;
2982
2983         return UNBLOCK_CONTINUE_POST;
2984 }
2985
2986 static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
2987                                      struct ocfs2_unblock_ctl *ctl)
2988 {
2989         int ret;
2990         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2991         struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
2992
2993         mlog(0, "unblock dentry lock: %llu\n",
2994              (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
2995
2996         ret = ocfs2_generic_unblock_lock(osb,
2997                                          lockres,
2998                                          ctl,
2999                                          ocfs2_dentry_convert_worker);
3000         if (ret < 0)
3001                 mlog_errno(ret);
3002
3003         mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
3004
3005         return ret;
3006 }
3007
3008 /* Generic unblock function for any lockres whose private data is an
3009  * ocfs2_super pointer. */
3010 static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
3011                                   struct ocfs2_unblock_ctl *ctl)
3012 {
3013         int status;
3014         struct ocfs2_super *osb;
3015
3016         mlog_entry_void();
3017
3018         mlog(0, "Unblock lockres %s\n", lockres->l_name);
3019
3020         osb = ocfs2_lock_res_super(lockres);
3021
3022         status = ocfs2_generic_unblock_lock(osb,
3023                                             lockres,
3024                                             ctl,
3025                                             NULL);
3026         if (status < 0)
3027                 mlog_errno(status);
3028
3029         mlog_exit(status);
3030         return status;
3031 }
3032
/*
 * Vote-thread entry point for a queued blocked lock: dispatches to the
 * lockres' ->unblock handler, then either clears OCFS2_LOCK_QUEUED or
 * reschedules it, and finally runs ->post_unlock when the convert
 * worker asked for post-processing.
 */
3033 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3034                                 struct ocfs2_lock_res *lockres)
3035 {
3036         int status;
3037         struct ocfs2_unblock_ctl ctl = {0, 0,};
3038         unsigned long flags;
3039
3040         /* Our reference to the lockres in this function can be
3041          * considered valid until we remove the OCFS2_LOCK_QUEUED
3042          * flag. */
3043
3044         mlog_entry_void();
3045
3046         BUG_ON(!lockres);
3047         BUG_ON(!lockres->l_ops);
3048         BUG_ON(!lockres->l_ops->unblock);
3049
3050         mlog(0, "lockres %s blocked.\n", lockres->l_name);
3051
3052         /* Detect whether a lock has been marked as going away while
3053          * the vote thread was processing other things. A lock can
3054          * still be marked with OCFS2_LOCK_FREEING after this check,
3055          * but short circuiting here will still save us some
3056          * performance. */
3057         spin_lock_irqsave(&lockres->l_lock, flags);
3058         if (lockres->l_flags & OCFS2_LOCK_FREEING)
3059                 goto unqueue;
3060         spin_unlock_irqrestore(&lockres->l_lock, flags);
3061
3062         status = lockres->l_ops->unblock(lockres, &ctl);
3063         if (status < 0)
3064                 mlog_errno(status);
3065
3066         spin_lock_irqsave(&lockres->l_lock, flags);
3067 unqueue:
             /* FREEING locks are never requeued; otherwise honor the
              * handler's requeue request. */
3068         if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3069                 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3070         } else
3071                 ocfs2_schedule_blocked_lock(osb, lockres);
3072
3073         mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3074              ctl.requeue ? "yes" : "no");
3075         spin_unlock_irqrestore(&lockres->l_lock, flags);
3076
3077         if (ctl.unblock_action != UNBLOCK_CONTINUE
3078             && lockres->l_ops->post_unlock)
3079                 lockres->l_ops->post_unlock(osb, lockres);
3080
3081         mlog_exit_void();
3082 }
3083
/*
 * Queue @lockres on the vote thread's blocked-lock list for (re)
 * processing.  Caller must hold ->l_lock.  Locks marked FREEING are
 * never queued, and a lock already on the list is not added twice.
 */
3084 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3085                                         struct ocfs2_lock_res *lockres)
3086 {
3087         mlog_entry_void();
3088
3089         assert_spin_locked(&lockres->l_lock);
3090
3091         if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3092                 /* Do not schedule a lock for downconvert when it's on
3093                  * the way to destruction - any nodes wanting access
3094                  * to the resource will get it soon. */
3095                 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3096                      lockres->l_name, lockres->l_flags);
3097                 return;
3098         }
3099
3100         lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3101
3102         spin_lock(&osb->vote_task_lock);
             /* list_empty() guards against double-insertion when the lock
              * is already pending. */
3103         if (list_empty(&lockres->l_blocked_list)) {
3104                 list_add_tail(&lockres->l_blocked_list,
3105                               &osb->blocked_lock_list);
3106                 osb->blocked_lock_count++;
3107         }
3108         spin_unlock(&osb->vote_task_lock);
3109
3110         mlog_exit_void();
3111 }
3112
3113 /* This aids in debugging situations where a bad LVB might be involved. */
/*
 * Dump the meta-lock LVB contents at the given mlog @level.  Fields in
 * the LVB are stored big-endian (hence the be*_to_cpu conversions);
 * @function/@line identify the call site for the log.
 */
3114 void ocfs2_dump_meta_lvb_info(u64 level,
3115                               const char *function,
3116                               unsigned int line,
3117                               struct ocfs2_lock_res *lockres)
3118 {
3119         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
3120
3121         mlog(level, "LVB information for %s (called from %s:%u):\n",
3122              lockres->l_name, function, line);
3123         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
3124              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
3125              be32_to_cpu(lvb->lvb_igeneration));
3126         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
3127              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
3128              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
3129              be16_to_cpu(lvb->lvb_imode));
3130         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
3131              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
3132              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
3133              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
3134              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
3135              be32_to_cpu(lvb->lvb_iattr));
3136 }