/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include <linux/libcfs/libcfs.h>
#include <linux/lustre_intent.h>

#include <obd_class.h>
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
        [0]             = "--",
        [LCK_EX]        = "EX",
        [LCK_PW]        = "PW",
        [LCK_PR]        = "PR",
        [LCK_CW]        = "CW",
        [LCK_CR]        = "CR",
        [LCK_NL]        = "NL",
        [LCK_GROUP]     = "GROUP",
        [LCK_COS]       = "COS",
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
        [LDLM_PLAIN]    = "PLN",
        [LDLM_EXTENT]   = "EXT",
        [LDLM_FLOCK]    = "FLK",
        [LDLM_IBITS]    = "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]    = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE]   = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]    = ldlm_flock_policy_wire18_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]    = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]    = ldlm_plain_policy_wire_to_local,
        [LDLM_EXTENT - LDLM_MIN_TYPE]   = ldlm_extent_policy_wire_to_local,
        [LDLM_FLOCK - LDLM_MIN_TYPE]    = ldlm_flock_policy_wire21_to_local,
        [LDLM_IBITS - LDLM_MIN_TYPE]    = ldlm_ibits_policy_wire_to_local,
};

static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
        [LDLM_PLAIN - LDLM_MIN_TYPE]    = ldlm_plain_policy_local_to_wire,
        [LDLM_EXTENT - LDLM_MIN_TYPE]   = ldlm_extent_policy_local_to_wire,
        [LDLM_FLOCK - LDLM_MIN_TYPE]    = ldlm_flock_policy_local_to_wire,
        [LDLM_IBITS - LDLM_MIN_TYPE]    = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from the local format to the on-the-wire lock_desc
 * format.
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
                                 const ldlm_policy_data_t *lpolicy,
                                 ldlm_wire_policy_data_t *wpolicy)
{
        ldlm_policy_local_to_wire_t convert;

        convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

        convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from the on-the-wire lock_desc format to the local
 * format.
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
                                  const ldlm_wire_policy_data_t *wpolicy,
                                  ldlm_policy_data_t *lpolicy)
{
        ldlm_policy_wire_to_local_t convert;
        int new_client;

        /* some badness for 2.0.0 clients, but 2.0.0 isn't supported */
        new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
        if (new_client)
                convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
        else
                convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

        convert(wpolicy, lpolicy);
}

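/*
 * Illustrative note (not part of the original file): the converters above
 * are plain dispatch tables indexed by lock type.  For an extent lock,
 * ldlm_convert_policy_to_wire() reduces to:
 *
 *      convert = ldlm_policy_local_to_wire[LDLM_EXTENT - LDLM_MIN_TYPE];
 *      convert(lpolicy, wpolicy);   (i.e. ldlm_extent_policy_local_to_wire)
 *
 * The wire18/wire21 tables differ only in their flock entry, because the
 * flock wire format changed between client generations; the
 * OBD_CONNECT_FULL20 check above picks the right table per export.
 */
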
char *ldlm_it2str(int it)
{
        switch (it) {
        case IT_OPEN:
                return "open";
        case IT_CREAT:
                return "creat";
        case (IT_OPEN | IT_CREAT):
                return "open|creat";
        case IT_READDIR:
                return "readdir";
        case IT_GETATTR:
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
        case IT_UNLINK:
                return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
                return "layout";
        case IT_SETXATTR:
                return "setxattr";
        default:
                CERROR("Unknown intent %d\n", it);
                return "UNKNOWN";
        }
}
EXPORT_SYMBOL(ldlm_it2str);

extern struct kmem_cache *ldlm_lock_slab;

void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
        ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */

/**
 * Get a reference on a lock.
 *
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
        atomic_inc(&lock->l_refc);
        return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);

/**
 * Release lock reference.
 *
 * Also frees the lock if it was the last reference.
 */
void ldlm_lock_put(struct ldlm_lock *lock)
{
        LASSERT(lock->l_resource != LP_POISON);
        LASSERT(atomic_read(&lock->l_refc) > 0);
        if (atomic_dec_and_test(&lock->l_refc)) {
                struct ldlm_resource *res;

                LDLM_DEBUG(lock,
                           "final lock_put on destroyed lock, freeing it.");

                res = lock->l_resource;
                LASSERT(lock->l_flags & LDLM_FL_DESTROYED);
                LASSERT(list_empty(&lock->l_res_link));
                LASSERT(list_empty(&lock->l_pending_chain));

                lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                     LDLM_NSS_LOCKS);
                lu_ref_del(&res->lr_reference, "lock", lock);
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                if (lock->l_export) {
                        class_export_lock_put(lock->l_export, lock);
                        lock->l_export = NULL;
                }

                if (lock->l_lvb_data != NULL)
                        OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

                ldlm_interval_free(ldlm_interval_detach(lock));
                lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
        }
}
EXPORT_SYMBOL(ldlm_lock_put);

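/*
 * Usage sketch (illustrative, not from the original file): every
 * LDLM_LOCK_GET()/ldlm_lock_get() must be balanced by a matching
 * LDLM_LOCK_PUT()/ldlm_lock_put(), and the final put frees the lock, so
 * no field of the lock may be touched afterwards:
 *
 *      struct ldlm_lock *lock = LDLM_LOCK_GET(some_lock);
 *
 *      (use lock here)
 *      LDLM_LOCK_PUT(lock);
 *
 * ("some_lock" is a hypothetical, already-referenced lock.)
 */
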
/**
 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
 */
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
        int rc = 0;

        if (!list_empty(&lock->l_lru)) {
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
                list_del_init(&lock->l_lru);
                if (lock->l_flags & LDLM_FL_SKIPPED)
                        lock->l_flags &= ~LDLM_FL_SKIPPED;
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
                rc = 1;
        }
        return rc;
}

/**
 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
 */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
        int rc;

        if (lock->l_flags & LDLM_FL_NS_SRV) {
                LASSERT(list_empty(&lock->l_lru));
                return 0;
        }

        spin_lock(&ns->ns_lock);
        rc = ldlm_lock_remove_from_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
        return rc;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
 */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        lock->l_last_used = cfs_time_current();
        LASSERT(list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
 * first.
 */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        spin_lock(&ns->ns_lock);
        ldlm_lock_add_to_lru_nolock(lock);
        spin_unlock(&ns->ns_lock);
}

/**
 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
 * the LRU. Performs necessary LRU locking.
 */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock)
{
        struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

        if (lock->l_flags & LDLM_FL_NS_SRV) {
                LASSERT(list_empty(&lock->l_lru));
                return;
        }

        spin_lock(&ns->ns_lock);
        if (!list_empty(&lock->l_lru)) {
                ldlm_lock_remove_from_lru_nolock(lock);
                ldlm_lock_add_to_lru_nolock(lock);
        }
        spin_unlock(&ns->ns_lock);
}

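/*
 * Summary note (illustrative, not from the original file): only client-side
 * locks with no readers or writers live in the namespace LRU.  The usual
 * lifecycle is: the final decref adds the lock with ldlm_lock_add_to_lru(),
 * a later successful match bumps it to the tail with
 * ldlm_lock_touch_in_lru(), and re-use or cancellation pulls it out with
 * ldlm_lock_remove_from_lru().  Server-side locks (LDLM_FL_NS_SRV) and
 * flock locks never enter the LRU, as the LASSERTs above enforce.
 */
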
/**
 * Helper to destroy a locked lock.
 *
 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock.
 * Must be called with l_lock and lr_lock held.
 *
 * Does not actually free the lock data, but rather marks the lock as
 * destroyed by setting the LDLM_FL_DESTROYED flag.  Destroys the
 * handle->lock association too, so that the lock can no longer be found,
 * and removes the lock from the LRU list.  Actual lock freeing occurs
 * when the last lock reference goes away.
 *
 * Original comment (of some historical value):
 * This used to have a 'strict' flag, which recovery would use to mark an
 * in-use lock as needing-to-die.  Lest I am ever tempted to put it back, I
 * shall explain why it's gone: with the new hash table scheme, once you call
 * ldlm_lock_destroy, you can never drop your final references on this lock.
 * Because it's not in the hash table anymore.  -phil
 */
int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
{
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        if (!list_empty(&lock->l_res_link)) {
                LDLM_ERROR(lock, "lock still on resource");
                LBUG();
        }

        if (lock->l_flags & LDLM_FL_DESTROYED) {
                LASSERT(list_empty(&lock->l_lru));
                return 0;
        }
        lock->l_flags |= LDLM_FL_DESTROYED;

        if (lock->l_export && lock->l_export->exp_lock_hash) {
                /* NB: it's safe to call cfs_hash_del() even if the lock
                 * isn't in exp_lock_hash. */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }

        ldlm_lock_remove_from_lru(lock);
        class_handle_unhash(&lock->l_handle);

#if 0
        /* Wake anyone waiting for this lock */
        /* FIXME: I should probably add yet another flag, instead of using
         * l_export to only call this on clients */
        if (lock->l_export)
                class_export_put(lock->l_export);
        lock->l_export = NULL;
        if (lock->l_export && lock->l_completion_ast)
                lock->l_completion_ast(lock, 0);
#endif
        return 1;
}

/**
 * Destroys a LDLM lock \a lock. Performs necessary locking first.
 */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
        int first;

        lock_res_and_lock(lock);
        first = ldlm_lock_destroy_internal(lock);
        unlock_res_and_lock(lock);

        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
}

/**
 * Destroys a LDLM lock \a lock that is already locked.
 */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
        int first;

        first = ldlm_lock_destroy_internal(lock);
        /* drop reference from hashtable only for first destroy */
        if (first) {
                lu_ref_del(&lock->l_reference, "hash", lock);
                LDLM_LOCK_RELEASE(lock);
        }
}

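/*
 * Usage note (illustrative, not from the original file): the two wrappers
 * differ only in locking; ldlm_lock_destroy() takes and drops the locks
 * itself, while the _nolock variant is for callers already holding them:
 *
 *      lock_res_and_lock(lock);
 *      (final manipulation of the lock)
 *      ldlm_lock_destroy_nolock(lock);
 *      unlock_res_and_lock(lock);
 */
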
/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
        LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

static void lock_handle_free(void *lock, int size)
{
        LASSERT(size == sizeof(struct ldlm_lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
        .hop_addref = lock_handle_addref,
        .hop_free   = lock_handle_free,
};

/**
 * Allocate and initialize a new lock structure.
 *
 * Usage: pass in a resource on which you have done ldlm_resource_get();
 *      the new lock will take over the refcount.
 * Returns: lock with refcount 2 - one for the current caller and one for
 *      the remote side.
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
        struct ldlm_lock *lock;

        if (resource == NULL)
                LBUG();

        OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, __GFP_IO);
        if (lock == NULL)
                return NULL;

        spin_lock_init(&lock->l_lock);
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);

        atomic_set(&lock->l_refc, 2);
        INIT_LIST_HEAD(&lock->l_res_link);
        INIT_LIST_HEAD(&lock->l_lru);
        INIT_LIST_HEAD(&lock->l_pending_chain);
        INIT_LIST_HEAD(&lock->l_bl_ast);
        INIT_LIST_HEAD(&lock->l_cp_ast);
        INIT_LIST_HEAD(&lock->l_rk_ast);
        init_waitqueue_head(&lock->l_waitq);
        lock->l_blocking_lock = NULL;
        INIT_LIST_HEAD(&lock->l_sl_mode);
        INIT_LIST_HEAD(&lock->l_sl_policy);
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);

        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                             LDLM_NSS_LOCKS);
        INIT_LIST_HEAD(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);

        lu_ref_init(&lock->l_reference);
        lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
        lock->l_exp_refs_nr = 0;
        lock->l_exp_refs_target = NULL;
#endif
        INIT_LIST_HEAD(&lock->l_exp_list);

        return lock;
}

/**
 * Moves LDLM lock \a lock to another resource.
 * This is used on the client when the server returns some other lock than
 * requested (typically as a result of an intent operation).
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                              const struct ldlm_res_id *new_resid)
{
        struct ldlm_resource *oldres = lock->l_resource;
        struct ldlm_resource *newres;
        int type;

        LASSERT(ns_is_client(ns));

        lock_res_and_lock(lock);
        if (memcmp(new_resid, &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) == 0) {
                /* Nothing to do */
                unlock_res_and_lock(lock);
                return 0;
        }

        LASSERT(new_resid->name[0] != 0);

        /* This function assumes that the lock isn't on any lists */
        LASSERT(list_empty(&lock->l_res_link));

        type = oldres->lr_type;
        unlock_res_and_lock(lock);

        newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
        if (newres == NULL)
                return -ENOMEM;

        lu_ref_add(&newres->lr_reference, "lock", lock);
        /*
         * To flip the lock from the old to the new resource, lock, oldres and
         * newres have to be locked. Resource spin-locks are nested within
         * lock->l_lock, and are taken in the memory address order to avoid
         * dead-locks.
         */
        spin_lock(&lock->l_lock);
        oldres = lock->l_resource;
        if (oldres < newres) {
                lock_res(oldres);
                lock_res_nested(newres, LRT_NEW);
        } else {
                lock_res(newres);
                lock_res_nested(oldres, LRT_NEW);
        }
        LASSERT(memcmp(new_resid, &oldres->lr_name,
                       sizeof(oldres->lr_name)) != 0);
        lock->l_resource = newres;
        unlock_res(oldres);
        unlock_res_and_lock(lock);

        /* ...and the flowers are still standing! */
        lu_ref_del(&oldres->lr_reference, "lock", lock);
        ldlm_resource_putref(oldres);

        return 0;
}
EXPORT_SYMBOL(ldlm_lock_change_resource);

/** \defgroup ldlm_handles LDLM HANDLES
 * Ways to get hold of locks without any addresses.
 * @{
 */

/**
 * Fills in the handle for LDLM lock \a lock into the supplied \a lockh.
 * Does not take any references.
 */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
        lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

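/*
 * Round-trip sketch (illustrative, not from the original file): a handle is
 * just the lock's opaque cookie, so it can be stored or sent over the wire
 * and later resolved back to a referenced lock:
 *
 *      struct lustre_handle lockh;
 *
 *      ldlm_lock2handle(lock, &lockh);
 *      ...
 *      lock = ldlm_handle2lock(&lockh);   (takes a reference; may be NULL)
 *      if (lock != NULL) {
 *              (use lock)
 *              LDLM_LOCK_PUT(lock);
 *      }
 */
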
/**
 * Obtain a lock reference by handle.
 *
 * If \a flags is nonzero: atomically get the lock and set the flags.
 * Returns NULL if any of the flags is already set.
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
                                     __u64 flags)
{
        struct ldlm_lock *lock;

        LASSERT(handle);

        lock = class_handle2object(handle->cookie);
        if (lock == NULL)
                return NULL;

        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if (flags == 0 && ((lock->l_flags & LDLM_FL_DESTROYED) == 0)) {
                lu_ref_add(&lock->l_reference, "handle", current);
                return lock;
        }

        lock_res_and_lock(lock);

        LASSERT(lock->l_resource != NULL);

        lu_ref_add_atomic(&lock->l_reference, "handle", current);
        if (unlikely(lock->l_flags & LDLM_FL_DESTROYED)) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
                LDLM_LOCK_PUT(lock);
                return NULL;
        }

        if (flags && (lock->l_flags & flags)) {
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
                return NULL;
        }

        if (flags)
                lock->l_flags |= flags;

        unlock_res_and_lock(lock);
        return lock;
}
EXPORT_SYMBOL(__ldlm_handle2lock);
/** @} ldlm_handles */

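/*
 * Flags sketch (illustrative, not from the original file): a nonzero
 * \a flags value turns the lookup into an atomic test-and-set, so a lock
 * can be claimed for an operation exactly once, e.g. (hypothetical caller):
 *
 *      lock = __ldlm_handle2lock(&lockh, LDLM_FL_CANCELING);
 *      if (lock == NULL)
 *              return;    (stale handle, or someone else is canceling)
 */
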
/**
 * Fill in the "on the wire" representation for given LDLM lock into the
 * supplied lock descriptor \a desc structure.
 */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
        struct obd_export *exp = lock->l_export ?: lock->l_conn_export;

        /* INODEBITS_INTEROP: If the other side does not support
         * inodebits, reply with a plain lock descriptor. */
        if ((lock->l_resource->lr_type == LDLM_IBITS) &&
            (exp && !(exp_connect_flags(exp) & OBD_CONNECT_IBITS))) {
                /* Make sure all the right bits are set in this lock we
                   are going to pass to client */
                LASSERTF(lock->l_policy_data.l_inodebits.bits ==
                         (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
                          MDS_INODELOCK_LAYOUT),
                         "Inappropriate inode lock bits during "
                         "conversion " LPU64 "\n",
                         lock->l_policy_data.l_inodebits.bits);

                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_resource.lr_type = LDLM_PLAIN;

                /* Convert "new" lock mode to something old client can
                   understand */
                if ((lock->l_req_mode == LCK_CR) ||
                    (lock->l_req_mode == LCK_CW))
                        desc->l_req_mode = LCK_PR;
                else
                        desc->l_req_mode = lock->l_req_mode;
                if ((lock->l_granted_mode == LCK_CR) ||
                    (lock->l_granted_mode == LCK_CW)) {
                        desc->l_granted_mode = LCK_PR;
                } else {
                        /* We never grant PW/EX locks to clients */
                        LASSERT((lock->l_granted_mode != LCK_PW) &&
                                (lock->l_granted_mode != LCK_EX));
                        desc->l_granted_mode = lock->l_granted_mode;
                }

                /* We do not copy policy here, because there is no
                   policy for plain locks */
        } else {
                ldlm_res2desc(lock->l_resource, &desc->l_resource);
                desc->l_req_mode = lock->l_req_mode;
                desc->l_granted_mode = lock->l_granted_mode;
                ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
                                            &lock->l_policy_data,
                                            &desc->l_policy_data);
        }
}
EXPORT_SYMBOL(ldlm_lock2desc);

/**
 * Add a lock to the list of conflicting locks to send AST to.
 *
 * Only add if we have not sent a blocking AST to the lock yet.
 */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                           struct list_head *work_list)
{
        if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
                LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
                lock->l_flags |= LDLM_FL_AST_SENT;
                /* If the enqueuing client said so, tell the AST recipient to
                 * discard dirty data, rather than writing back. */
                if (new->l_flags & LDLM_FL_AST_DISCARD_DATA)
                        lock->l_flags |= LDLM_FL_DISCARD_DATA;
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, work_list);
                LDLM_LOCK_GET(lock);
                LASSERT(lock->l_blocking_lock == NULL);
                lock->l_blocking_lock = LDLM_LOCK_GET(new);
        }
}

/**
 * Add a lock to the list of just granted locks to send completion AST to.
 */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
{
        if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
                lock->l_flags |= LDLM_FL_CP_REQD;
                LDLM_DEBUG(lock, "lock granted; sending completion AST.");
                LASSERT(list_empty(&lock->l_cp_ast));
                list_add(&lock->l_cp_ast, work_list);
                LDLM_LOCK_GET(lock);
        }
}

/**
 * Aggregator function to add AST work items into a list. Determines
 * what sort of AST work needs to be done and calls the proper
 * adding function.
 * Must be called with lr_lock held.
 */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            struct list_head *work_list)
{
        check_res_locked(lock->l_resource);
        if (new)
                ldlm_add_bl_work_item(lock, new, work_list);
        else
                ldlm_add_cp_work_item(lock, work_list);
}

/**
 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
 * r/w reference type is determined by \a mode.
 * Calls ldlm_lock_addref_internal.
 */
void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;

        lock = ldlm_handle2lock(lockh);
        LASSERT(lock != NULL);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_addref);

/**
 * Helper function.
 * Add specified reader/writer reference to LDLM lock \a lock.
 * r/w reference type is determined by \a mode.
 * Removes lock from LRU if it is there.
 * Assumes the LDLM lock is already locked.
 */
void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        ldlm_lock_remove_from_lru(lock);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                lock->l_readers++;
                lu_ref_add_atomic(&lock->l_reference, "reader", lock);
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                lock->l_writers++;
                lu_ref_add_atomic(&lock->l_reference, "writer", lock);
        }
        LDLM_LOCK_GET(lock);
        lu_ref_add_atomic(&lock->l_reference, "user", lock);
        LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}

/**
 * Attempts to add reader/writer reference to a lock with handle \a lockh, and
 * fails if lock is already LDLM_FL_CBPENDING or destroyed.
 *
 * \retval 0 success, lock was addref-ed
 *
 * \retval -EAGAIN lock is being canceled.
 */
int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock;
        int result;

        result = -EAGAIN;
        lock = ldlm_handle2lock(lockh);
        if (lock != NULL) {
                lock_res_and_lock(lock);
                if (lock->l_readers != 0 || lock->l_writers != 0 ||
                    !(lock->l_flags & LDLM_FL_CBPENDING)) {
                        ldlm_lock_addref_internal_nolock(lock, mode);
                        result = 0;
                }
                unlock_res_and_lock(lock);
                LDLM_LOCK_PUT(lock);
        }
        return result;
}
EXPORT_SYMBOL(ldlm_lock_addref_try);

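/*
 * Usage sketch (illustrative, not from the original file):
 * ldlm_lock_addref_try() is the non-blocking pin for callers that must not
 * hold up a lock which is already being canceled:
 *
 *      if (ldlm_lock_addref_try(&lockh, LCK_PR) == 0) {
 *              (lock is pinned; use it)
 *              ldlm_lock_decref(&lockh, LCK_PR);
 *      } else {
 *              (-EAGAIN: the lock is going away; enqueue a fresh one)
 *      }
 */
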
/**
 * Add specified reader/writer reference to LDLM lock \a lock.
 * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work.
 * Only called for local locks.
 */
void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
{
        lock_res_and_lock(lock);
        ldlm_lock_addref_internal_nolock(lock, mode);
        unlock_res_and_lock(lock);
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Assumes LDLM lock is already locked.
 * Only called in ldlm_flock_destroy and for local locks.
 * Does NOT add lock to LRU if no r/w references are left, to accommodate
 * flock locks that cannot be placed in the LRU.
 */
void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode)
{
        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                LASSERT(lock->l_readers > 0);
                lu_ref_del(&lock->l_reference, "reader", lock);
                lock->l_readers--;
        }
        if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
                LASSERT(lock->l_writers > 0);
                lu_ref_del(&lock->l_reference, "writer", lock);
                lock->l_writers--;
        }

        lu_ref_del(&lock->l_reference, "user", lock);
        LDLM_LOCK_RELEASE(lock);    /* matches the LDLM_LOCK_GET() in addref */
}

/**
 * Removes reader/writer reference for LDLM lock \a lock.
 * Locks LDLM lock first.
 * If the lock is a client lock, its r/w refcount drops to zero, and it is
 * not blocked, the lock is added to the LRU of its namespace.
 * For blocked LDLM locks, if the r/w count drops to zero, the blocking AST
 * is run.
 */
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
        struct ldlm_namespace *ns;

        lock_res_and_lock(lock);

        ns = ldlm_lock_to_ns(lock);

        ldlm_lock_decref_internal_nolock(lock, mode);

        if (lock->l_flags & LDLM_FL_LOCAL &&
            !lock->l_readers && !lock->l_writers) {
                /* If this is a local lock on a server namespace and this was
                 * the last reference, cancel the lock. */
                CDEBUG(D_INFO, "forcing cancel of local lock\n");
                lock->l_flags |= LDLM_FL_CBPENDING;
        }

        if (!lock->l_readers && !lock->l_writers &&
            (lock->l_flags & LDLM_FL_CBPENDING)) {
                /* If we received a blocking AST and this was the last
                 * reference, run the callback. */
                if ((lock->l_flags & LDLM_FL_NS_SRV) && lock->l_export)
                        CERROR("FL_CBPENDING set on non-local lock--just a "
                               "warning\n");

                LDLM_DEBUG(lock, "final decref done on cbpending lock");

                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                ldlm_lock_remove_from_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                    ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                        ldlm_handle_bl_callback(ns, NULL, lock);
        } else if (ns_is_client(ns) &&
                   !lock->l_readers && !lock->l_writers &&
                   !(lock->l_flags & LDLM_FL_NO_LRU) &&
                   !(lock->l_flags & LDLM_FL_BL_AST)) {

                LDLM_DEBUG(lock, "add lock into lru list");

                /* If this is a client-side namespace and this was the last
                 * reference, put it on the LRU. */
                ldlm_lock_add_to_lru(lock);
                unlock_res_and_lock(lock);

                if (lock->l_flags & LDLM_FL_FAIL_LOC)
                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
                 * are not supported by the server; otherwise, it is done on
                 * enqueue. */
                if (!exp_connect_cancelset(lock->l_conn_export) &&
                    !ns_connect_lru_resize(ns))
                        ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
        } else {
                LDLM_DEBUG(lock, "do not add lock into lru list");
                unlock_res_and_lock(lock);
        }
}

/**
 * Decrease reader/writer refcount for LDLM lock with handle \a lockh.
 */
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);

        LASSERTF(lock != NULL, "Non-existing lock: " LPX64 "\n", lockh->cookie);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref);

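/*
 * Pairing sketch (illustrative, not from the original file): user
 * references are symmetric per mode; each addref must be matched by a
 * decref in the same mode:
 *
 *      ldlm_lock_addref(&lockh, LCK_PW);
 *      (do work under the lock)
 *      ldlm_lock_decref(&lockh, LCK_PW);
 *
 * On a client, the final decref of an unblocked lock parks it in the
 * namespace LRU (see ldlm_lock_decref_internal() above) rather than
 * canceling it immediately.
 */
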
/**
 * Decrease reader/writer refcount for LDLM lock with handle
 * \a lockh and mark it for subsequent cancellation once r/w refcount
 * drops to zero, instead of putting it into the LRU.
 *
 * Typical usage is for GROUP locks which we cannot allow to be cached.
 */
void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode)
{
        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);

        LASSERT(lock != NULL);

        LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
        ldlm_lock_decref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
}
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);

struct sl_insert_point {
        struct list_head *res_link;
        struct list_head *mode_link;
        struct list_head *policy_link;
};

/**
 * Finds a position to insert the new lock into granted lock list.
 *
 * Used for locks eligible for skiplist optimization.
 *
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      prev [output]:  positions within 3 lists to insert @req to
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(struct list_head *queue,
                                struct ldlm_lock *req,
                                struct sl_insert_point *prev)
{
        struct list_head *tmp;
        struct ldlm_lock *lock, *mode_end, *policy_end;

        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                mode_end = list_entry(lock->l_sl_mode.prev,
                                      struct ldlm_lock, l_sl_mode);

                if (lock->l_req_mode != req->l_req_mode) {
                        /* jump to last lock of mode group */
                        tmp = &mode_end->l_res_link;
                        continue;
                }

                /* suitable mode group is found */
                if (lock->l_resource->lr_type == LDLM_PLAIN) {
                        /* insert point is last lock of the mode group */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        return;
                } else if (lock->l_resource->lr_type == LDLM_IBITS) {
                        for (;;) {
                                policy_end =
                                        list_entry(lock->l_sl_policy.prev,
                                                   struct ldlm_lock,
                                                   l_sl_policy);

                                if (lock->l_policy_data.l_inodebits.bits ==
                                    req->l_policy_data.l_inodebits.bits) {
                                        /* insert point is last lock of
                                         * the policy group */
                                        prev->res_link =
                                                &policy_end->l_res_link;
                                        prev->mode_link =
                                                &policy_end->l_sl_mode;
                                        prev->policy_link =
                                                &policy_end->l_sl_policy;
                                        return;
                                }

                                if (policy_end == mode_end)
                                        /* done with mode group */
                                        break;

                                /* go to next policy group within mode group */
                                tmp = policy_end->l_res_link.next;
                                lock = list_entry(tmp, struct ldlm_lock,
                                                  l_res_link);
                        }  /* loop over policy groups within the mode group */

                        /* insert point is last lock of the mode group,
                         * new policy group is started */
                        prev->res_link = &mode_end->l_res_link;
                        prev->mode_link = &mode_end->l_sl_mode;
                        prev->policy_link = &req->l_sl_policy;
                        return;
                } else {
                        LDLM_ERROR(lock, "is not LDLM_PLAIN or LDLM_IBITS lock");
                        LBUG();
                }
        }

        /* insert point is last lock on the queue,
         * new mode group and new policy group are started */
        prev->res_link = queue->prev;
        prev->mode_link = &req->l_sl_mode;
        prev->policy_link = &req->l_sl_policy;
}

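/*
 * Layout sketch (illustrative, not from the original file): the granted
 * list is kept grouped so whole groups can be skipped in one hop through
 * the l_sl_mode/l_sl_policy links.  For an IBITS resource it might look
 * like:
 *
 *      lr_granted: [PR lookup] [PR lookup] [PR update] [PW update]
 *                  \ policy group /
 *                  \_______ mode group PR _______/     \ mode PW /
 *
 * search_granted_lock() walks mode groups, then policy groups inside the
 * matching mode group, and fills in the three insertion points that
 * ldlm_granted_list_add_lock() below uses to splice the new lock in.
 */
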
/**
 * Add a lock into resource granted list after a position described by
 * \a prev.
 */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
                                       struct sl_insert_point *prev)
{
        struct ldlm_resource *res = lock->l_resource;

        check_res_locked(res);

        ldlm_resource_dump(D_INFO, res);
        LDLM_DEBUG(lock, "About to add lock:");

        if (lock->l_flags & LDLM_FL_DESTROYED) {
                CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
                return;
        }

        LASSERT(list_empty(&lock->l_res_link));
        LASSERT(list_empty(&lock->l_sl_mode));
        LASSERT(list_empty(&lock->l_sl_policy));

        /*
         * lock->link == prev->link means lock is the first lock in the group.
         * Don't re-add it to itself, to suppress kernel warnings.
         */
        if (&lock->l_res_link != prev->res_link)
                list_add(&lock->l_res_link, prev->res_link);
        if (&lock->l_sl_mode != prev->mode_link)
                list_add(&lock->l_sl_mode, prev->mode_link);
        if (&lock->l_sl_policy != prev->policy_link)
                list_add(&lock->l_sl_policy, prev->policy_link);
}

/**
 * Add a lock to the granted list on a resource, maintaining skiplist
 * correctness.
 */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
        struct sl_insert_point prev;

        LASSERT(lock->l_req_mode == lock->l_granted_mode);

        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
}

/**
 * Perform lock granting bookkeeping.
 *
 * Includes putting the lock into granted list and updating lock mode.
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
        struct ldlm_resource *res = lock->l_resource;

        check_res_locked(res);

        lock->l_granted_mode = lock->l_req_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                ldlm_grant_lock_with_skiplist(lock);
        else if (res->lr_type == LDLM_EXTENT)
                ldlm_extent_add_lock(res, lock);
        else
                ldlm_resource_add_lock(res, &res->lr_granted, lock);

        if (lock->l_granted_mode < res->lr_most_restr)
                res->lr_most_restr = lock->l_granted_mode;

        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);

        ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
}

/**
 * Search for a lock with given properties in a queue.
 *
 * \retval a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match
 */
static struct ldlm_lock *search_queue(struct list_head *queue,
                                      ldlm_mode_t *mode,
                                      ldlm_policy_data_t *policy,
                                      struct ldlm_lock *old_lock,
                                      __u64 flags, int unref)
{
        struct ldlm_lock *lock;
        struct list_head *tmp;

        list_for_each(tmp, queue) {
                ldlm_mode_t match;

                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                if (lock == old_lock)
                        break;

                /* Check if this lock can be matched.
                 * Used by LU-2919 (exclusive open) for open lease lock */
                if (ldlm_is_excl(lock))
                        continue;

                /* llite sometimes wants to match locks that will be
                 * canceled when their users drop, but we allow it to match
                 * if it passes in CBPENDING and the lock still has users.
                 * this is generally only going to be used by children
                 * whose parents already hold a lock so forward progress
                 * can still happen. */
                if (lock->l_flags & LDLM_FL_CBPENDING &&
                    !(flags & LDLM_FL_CBPENDING))
                        continue;
                if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
                    lock->l_readers == 0 && lock->l_writers == 0)
                        continue;

                if (!(lock->l_req_mode & *mode))
                        continue;
                match = lock->l_req_mode;

                if (lock->l_resource->lr_type == LDLM_EXTENT &&
                    (lock->l_policy_data.l_extent.start >
                     policy->l_extent.start ||
                     lock->l_policy_data.l_extent.end < policy->l_extent.end))
                        continue;

                if (unlikely(match == LCK_GROUP) &&
                    lock->l_resource->lr_type == LDLM_EXTENT &&
                    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
                        continue;

                /* We match if we have existing lock with same or wider set
                   of bits. */
                if (lock->l_resource->lr_type == LDLM_IBITS &&
                     ((lock->l_policy_data.l_inodebits.bits &
                      policy->l_inodebits.bits) !=
                      policy->l_inodebits.bits))
                        continue;

                if (!unref && (lock->l_flags & LDLM_FL_GONE_MASK))
                        continue;

                if ((flags & LDLM_FL_LOCAL_ONLY) &&
                    !(lock->l_flags & LDLM_FL_LOCAL))
                        continue;

                if (flags & LDLM_FL_TEST_LOCK) {
                        LDLM_LOCK_GET(lock);
                        ldlm_lock_touch_in_lru(lock);
                } else {
                        ldlm_lock_addref_internal_nolock(lock, match);
                }
                *mode = match;
                return lock;
        }

        return NULL;
}

void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
                wake_up_all(&lock->l_waitq);
        }
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_fail_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

/**
 * Mark lock as "matchable" by OST.
 *
 * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
 * is not yet valid.
 * Assumes LDLM lock is already locked.
 */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
        lock->l_flags |= LDLM_FL_LVB_READY;
        wake_up_all(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
        lock_res_and_lock(lock);
        ldlm_lock_allow_match_locked(lock);
        unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/**
 * Attempt to find a lock with specified properties.
 *
 * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
 * set in \a flags.
 *
 * Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (i.e., connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * \retval 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with an addref()ed lock
 *
 * We also check security context, and if that fails we simply return 0 (to
 * keep caller code unchanged); the context failure will be discovered by
 * the caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
                            const struct ldlm_res_id *res_id, ldlm_type_t type,
                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
                            struct lustre_handle *lockh, int unref)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock, *old_lock = NULL;
        int rc = 0;

        if (ns == NULL) {
                old_lock = ldlm_handle2lock(lockh);
                LASSERT(old_lock);

                ns = ldlm_lock_to_ns(old_lock);
                res_id = &old_lock->l_resource->lr_name;
                type = old_lock->l_resource->lr_type;
                mode = old_lock->l_req_mode;
        }

        res = ldlm_resource_get(ns, NULL, res_id, type, 0);
        if (res == NULL) {
                LASSERT(old_lock == NULL);
                return 0;
        }

        LDLM_RESOURCE_ADDREF(res);
        lock_res(res);

        lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        if (flags & LDLM_FL_BLOCK_GRANTED)
                GOTO(out, rc = 0);
        lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);
        lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
                            flags, unref);
        if (lock != NULL)
                GOTO(out, rc = 1);

 out:
        unlock_res(res);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);

        if (lock) {
                ldlm_lock2handle(lock, lockh);
                if ((flags & LDLM_FL_LVB_READY) &&
                    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
                        __u64 wait_flags = LDLM_FL_LVB_READY |
                                LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
                        struct l_wait_info lwi;

                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
                                                          LDLM_FL_WAIT_NOREPROC,
                                                                 NULL);
                                if (err) {
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_RELEASE(lock);
                                        else
                                                ldlm_lock_decref_internal(lock,
                                                                          mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                                               NULL, LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     lock->l_flags & wait_flags,
                                     &lwi);
                        if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                if (flags & LDLM_FL_TEST_LOCK)
                                        LDLM_LOCK_RELEASE(lock);
                                else
                                        ldlm_lock_decref_internal(lock, mode);
                                rc = 0;
                        }
                }
        }
 out2:
        if (rc) {
                LDLM_DEBUG(lock, "matched (" LPU64 " " LPU64 ")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);

                /* check user's security context */
                if (lock->l_conn_export &&
                    sptlrpc_import_check_ctx(
                                class_exp2cliimp(lock->l_conn_export))) {
                        if (!(flags & LDLM_FL_TEST_LOCK))
                                ldlm_lock_decref_internal(lock, mode);
                        rc = 0;
                }

                if (flags & LDLM_FL_TEST_LOCK)
                        LDLM_LOCK_RELEASE(lock);

        } else if (!(flags & LDLM_FL_TEST_LOCK)) { /* less verbose for test-only */
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64 "/" LPU64 " (" LPU64 " " LPU64 ")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);

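/*
 * Match sketch (illustrative, not from the original file): a typical
 * client-side probe for a cached extent lock covering [start, end]:
 *
 *      ldlm_policy_data_t policy = { .l_extent = { .start = start,
 *                                                  .end = end } };
 *      struct lustre_handle lockh;
 *      ldlm_mode_t mode;
 *
 *      mode = ldlm_lock_match(ns, LDLM_FL_LVB_READY, res_id, LDLM_EXTENT,
 *                             &policy, LCK_PR | LCK_PW, &lockh, 0);
 *      if (mode != 0) {
 *              (cache hit; the lock is addref-ed in the matched mode)
 *              ldlm_lock_decref(&lockh, mode);
 *      }
 *
 * With LDLM_FL_TEST_LOCK set, no reference is retained and no decref is
 * needed.
 */
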
1390 ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
1391                                         __u64 *bits)
1392 {
1393         struct ldlm_lock *lock;
1394         ldlm_mode_t mode = 0;
1395
1396         lock = ldlm_handle2lock(lockh);
1397         if (lock != NULL) {
1398                 lock_res_and_lock(lock);
1399                 if (lock->l_flags & LDLM_FL_GONE_MASK)
1400                         GOTO(out, mode);
1401
1402                 if (lock->l_flags & LDLM_FL_CBPENDING &&
1403                     lock->l_readers == 0 && lock->l_writers == 0)
1404                         GOTO(out, mode);
1405
1406                 if (bits)
1407                         *bits = lock->l_policy_data.l_inodebits.bits;
1408                 mode = lock->l_granted_mode;
1409                 ldlm_lock_addref_internal_nolock(lock, mode);
1410         }
1411
1412 out:
1413         if (lock != NULL) {
1414                 unlock_res_and_lock(lock);
1415                 LDLM_LOCK_PUT(lock);
1416         }
1417         return mode;
1418 }
1419 EXPORT_SYMBOL(ldlm_revalidate_lock_handle);
1420
1421 /** The caller must guarantee that the buffer is large enough. */
1422 int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
1423                   enum req_location loc, void *data, int size)
1424 {
1425         void *lvb;
1426
1427         LASSERT(data != NULL);
1428         LASSERT(size >= 0);
1429
1430         switch (lock->l_lvb_type) {
1431         case LVB_T_OST:
1432                 if (size == sizeof(struct ost_lvb)) {
1433                         if (loc == RCL_CLIENT)
1434                                 lvb = req_capsule_client_swab_get(pill,
1435                                                 &RMF_DLM_LVB,
1436                                                 lustre_swab_ost_lvb);
1437                         else
1438                                 lvb = req_capsule_server_swab_get(pill,
1439                                                 &RMF_DLM_LVB,
1440                                                 lustre_swab_ost_lvb);
1441                         if (unlikely(lvb == NULL)) {
1442                                 LDLM_ERROR(lock, "no LVB");
1443                                 return -EPROTO;
1444                         }
1445
1446                         memcpy(data, lvb, size);
1447                 } else if (size == sizeof(struct ost_lvb_v1)) {
1448                         struct ost_lvb *olvb = data;
1449
1450                         if (loc == RCL_CLIENT)
1451                                 lvb = req_capsule_client_swab_get(pill,
1452                                                 &RMF_DLM_LVB,
1453                                                 lustre_swab_ost_lvb_v1);
1454                         else
1455                                 lvb = req_capsule_server_sized_swab_get(pill,
1456                                                 &RMF_DLM_LVB, size,
1457                                                 lustre_swab_ost_lvb_v1);
1458                         if (unlikely(lvb == NULL)) {
1459                                 LDLM_ERROR(lock, "no LVB");
1460                                 return -EPROTO;
1461                         }
1462
1463                         memcpy(data, lvb, size);
1464                         olvb->lvb_mtime_ns = 0;
1465                         olvb->lvb_atime_ns = 0;
1466                         olvb->lvb_ctime_ns = 0;
1467                 } else {
1468                         LDLM_ERROR(lock, "Replied with unexpected ost LVB size %d",
1469                                    size);
1470                         return -EINVAL;
1471                 }
1472                 break;
1473         case LVB_T_LQUOTA:
1474                 if (size == sizeof(struct lquota_lvb)) {
1475                         if (loc == RCL_CLIENT)
1476                                 lvb = req_capsule_client_swab_get(pill,
1477                                                 &RMF_DLM_LVB,
1478                                                 lustre_swab_lquota_lvb);
1479                         else
1480                                 lvb = req_capsule_server_swab_get(pill,
1481                                                 &RMF_DLM_LVB,
1482                                                 lustre_swab_lquota_lvb);
1483                         if (unlikely(lvb == NULL)) {
1484                                 LDLM_ERROR(lock, "no LVB");
1485                                 return -EPROTO;
1486                         }
1487
1488                         memcpy(data, lvb, size);
1489                 } else {
1490                         LDLM_ERROR(lock, "Replied with unexpected lquota LVB size %d",
1491                                    size);
1492                         return -EINVAL;
1493                 }
1494                 break;
1495         case LVB_T_LAYOUT:
1496                 if (size == 0)
1497                         break;
1498
1499                 if (loc == RCL_CLIENT)
1500                         lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
1501                 else
1502                         lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
1503                 if (unlikely(lvb == NULL)) {
1504                         LDLM_ERROR(lock, "no LVB");
1505                         return -EPROTO;
1506                 }
1507
1508                 memcpy(data, lvb, size);
1509                 break;
1510         default:
1511                 LDLM_ERROR(lock, "Unknown LVB type: %d", lock->l_lvb_type);
1512                 dump_stack();
1513                 return -EINVAL;
1514         }
1515
1516         return 0;
1517 }
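
/*
 * Illustrative sketch, not part of the original file: a client unpacking the
 * LVB carried in an enqueue reply.  `req` is a hypothetical ptlrpc request
 * whose reply buffer contains an RMF_DLM_LVB field.
 *
 *      struct ost_lvb lvb;
 *      int rc;
 *
 *      rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
 *                         &lvb, sizeof(lvb));
 *      if (rc != 0)
 *              bail out; -EPROTO means the LVB was missing, -EINVAL
 *              that its size did not match l_lvb_type.
 */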
1518
1519 /**
1520  * Create and fill in a new LDLM lock with specified properties.
1521  * Returns a referenced lock.
1522  */
1523 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
1524                                    const struct ldlm_res_id *res_id,
1525                                    ldlm_type_t type,
1526                                    ldlm_mode_t mode,
1527                                    const struct ldlm_callback_suite *cbs,
1528                                    void *data, __u32 lvb_len,
1529                                    enum lvb_type lvb_type)
1530 {
1531         struct ldlm_lock *lock;
1532         struct ldlm_resource *res;
1533
1534         res = ldlm_resource_get(ns, NULL, res_id, type, 1);
1535         if (res == NULL)
1536                 return NULL;
1537
1538         lock = ldlm_lock_new(res);
1539
        if (lock == NULL) {
                ldlm_resource_putref(res); /* avoid leaking the resource ref */
                return NULL;
        }
1542
1543         lock->l_req_mode = mode;
1544         lock->l_ast_data = data;
1545         lock->l_pid = current_pid();
1546         if (ns_is_server(ns))
1547                 lock->l_flags |= LDLM_FL_NS_SRV;
1548         if (cbs) {
1549                 lock->l_blocking_ast = cbs->lcs_blocking;
1550                 lock->l_completion_ast = cbs->lcs_completion;
1551                 lock->l_glimpse_ast = cbs->lcs_glimpse;
1552         }
1553
1554         lock->l_tree_node = NULL;
1555         /* if this is an extent lock, allocate the interval tree node */
1556         if (type == LDLM_EXTENT) {
1557                 if (ldlm_interval_alloc(lock) == NULL)
1558                         GOTO(out, 0);
1559         }
1560
1561         if (lvb_len) {
1562                 lock->l_lvb_len = lvb_len;
1563                 OBD_ALLOC(lock->l_lvb_data, lvb_len);
1564                 if (lock->l_lvb_data == NULL)
1565                         GOTO(out, 0);
1566         }
1567
1568         lock->l_lvb_type = lvb_type;
1569         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK))
1570                 GOTO(out, 0);
1571
1572         return lock;
1573
1574 out:
1575         ldlm_lock_destroy(lock);
1576         LDLM_LOCK_RELEASE(lock);
1577         return NULL;
1578 }
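
/*
 * Illustrative sketch, not part of the original file: creating a plain
 * client lock with a callback suite.  `my_blocking_ast` and
 * `my_completion_ast` are hypothetical handlers; `ns` and `res_id` are
 * placeholders.
 *
 *      const struct ldlm_callback_suite cbs = {
 *              .lcs_blocking   = my_blocking_ast,
 *              .lcs_completion = my_completion_ast,
 *      };
 *      struct ldlm_lock *lock;
 *
 *      lock = ldlm_lock_create(ns, &res_id, LDLM_PLAIN, LCK_PR, &cbs,
 *                              NULL, 0, LVB_T_NONE);
 *      if (lock == NULL)
 *              return -ENOMEM;
 */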
1579
1580 /**
1581  * Enqueue (request) a lock.
1582  *
1583  * Does not block. As a result of the enqueue the lock is put on
1584  * the granted or waiting list.
1585  *
1586  * If the namespace has an intent policy set and the lock has the
1587  * LDLM_FL_HAS_INTENT flag set, skip all the enqueueing and delegate
1588  * lock processing to the intent policy function.
1589  */
1590 ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
1591                                struct ldlm_lock **lockp,
1592                                void *cookie, __u64 *flags)
1593 {
1594         struct ldlm_lock *lock = *lockp;
1595         struct ldlm_resource *res = lock->l_resource;
1596         int local = ns_is_client(ldlm_res_to_ns(res));
1597         ldlm_error_t rc = ELDLM_OK;
1598         struct ldlm_interval *node = NULL;
1599
1600         lock->l_last_activity = cfs_time_current_sec();
1601         /* policies are not executed on the client or during replay */
1602         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
1603             && !local && ns->ns_policy) {
1604                 rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
1605                                    NULL);
1606                 if (rc == ELDLM_LOCK_REPLACED) {
1607                         /* The lock that was returned has already been granted,
1608                          * and placed into lockp.  If it's not the same as the
1609                          * one we passed in, then destroy the old one and our
1610                          * work here is done. */
1611                         if (lock != *lockp) {
1612                                 ldlm_lock_destroy(lock);
1613                                 LDLM_LOCK_RELEASE(lock);
1614                         }
1615                         *flags |= LDLM_FL_LOCK_CHANGED;
1616                         return 0;
1617                 } else if (rc != ELDLM_OK ||
1618                            (*flags & LDLM_FL_INTENT_ONLY)) {
1619                         ldlm_lock_destroy(lock);
1620                         return rc;
1621                 }
1622         }
1623
1624         /* For a replaying lock, it might already be in the granted list, so
1625          * unlinking the lock will cause the interval node to be freed. We
1626          * have to allocate the interval node early; otherwise we can't regrant
1627          * this lock in the future. - jay */
1628         if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
1629                 OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, __GFP_IO);
1630
1631         lock_res_and_lock(lock);
1632         if (local && lock->l_req_mode == lock->l_granted_mode) {
1633                 /* The server returned a blocked lock, but it was granted
1634                  * before we got a chance to actually enqueue it.  We don't
1635                  * need to do anything else. */
1636                 *flags &= ~(LDLM_FL_BLOCK_GRANTED |
1637                             LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
1638                 GOTO(out, ELDLM_OK);
1639         }
1640
1641         ldlm_resource_unlink_lock(lock);
1642         if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
1643                 if (node == NULL) {
1644                         ldlm_lock_destroy_nolock(lock);
1645                         GOTO(out, rc = -ENOMEM);
1646                 }
1647
1648                 INIT_LIST_HEAD(&node->li_group);
1649                 ldlm_interval_attach(node, lock);
1650                 node = NULL;
1651         }
1652
1653         /* Some flags from the enqueue want to make it into the AST, via the
1654          * lock's l_flags. */
1655         lock->l_flags |= *flags & LDLM_FL_AST_DISCARD_DATA;
1656
1657         /* This distinction between local lock trees is very important; a client
1658          * namespace only has information about locks taken by that client, and
1659          * thus doesn't have enough information to decide for itself if it can
1660          * be granted (below).  In this case, we do exactly what the server
1661          * tells us to do, as dictated by the 'flags'.
1662          *
1663          * We do exactly the same thing during recovery, when the server is
1664          * more or less trusting the clients not to lie.
1665          *
1666          * FIXME (bug 268): Detect obvious lies by checking compatibility in
1667          * granted/converting queues. */
1668         if (local) {
1669                 if (*flags & LDLM_FL_BLOCK_CONV)
1670                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
1671                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
1672                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
1673                 else
1674                         ldlm_grant_lock(lock, NULL);
1675                 GOTO(out, ELDLM_OK);
1676         } else {
1677                 CERROR("This is a client-side-only module, cannot handle "
1678                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1679                 LBUG();
1680         }
1681
1682 out:
1683         unlock_res_and_lock(lock);
1684         if (node)
1685                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
1686         return rc;
1687 }
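
/*
 * Illustrative sketch, not part of the original file: enqueueing a lock
 * created with ldlm_lock_create() into a client namespace.  `ns` and `lock`
 * are placeholders; the flags returned in `flags` tell the caller whether
 * the lock was granted immediately or parked on a waiting list.
 *
 *      __u64 flags = 0;
 *      ldlm_error_t err;
 *
 *      err = ldlm_lock_enqueue(ns, &lock, NULL, &flags);
 *      if (err == ELDLM_OK &&
 *          !(flags & (LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV |
 *                     LDLM_FL_BLOCK_WAIT)))
 *              the lock is now on the granted list;
 */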
1688
1690 /**
1691  * Process a call to blocking AST callback for a lock in ast_work list
1692  */
1693 static int
1694 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1695 {
1696         struct ldlm_cb_set_arg *arg = opaq;
1697         struct ldlm_lock_desc   d;
1698         int                  rc;
1699         struct ldlm_lock       *lock;
1700
1701         if (list_empty(arg->list))
1702                 return -ENOENT;
1703
1704         lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
1705
1706         /* nobody should touch l_bl_ast */
1707         lock_res_and_lock(lock);
1708         list_del_init(&lock->l_bl_ast);
1709
1710         LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1711         LASSERT(lock->l_bl_ast_run == 0);
1712         LASSERT(lock->l_blocking_lock);
1713         lock->l_bl_ast_run++;
1714         unlock_res_and_lock(lock);
1715
1716         ldlm_lock2desc(lock->l_blocking_lock, &d);
1717
1718         rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
1719         LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1720         lock->l_blocking_lock = NULL;
1721         LDLM_LOCK_RELEASE(lock);
1722
1723         return rc;
1724 }
1725
1726 /**
1727  * Process a call to completion AST callback for a lock in ast_work list
1728  */
1729 static int
1730 ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1731 {
1732         struct ldlm_cb_set_arg  *arg = opaq;
1733         int                   rc = 0;
1734         struct ldlm_lock        *lock;
1735         ldlm_completion_callback completion_callback;
1736
1737         if (list_empty(arg->list))
1738                 return -ENOENT;
1739
1740         lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
1741
1742         /* It's possible to receive a completion AST before we've set
1743          * the l_completion_ast pointer: either because the AST arrived
1744          * before the reply, or simply because there's a small race
1745          * window between receiving the reply and finishing the local
1746          * enqueue. (bug 842)
1747          *
1748          * This can't happen with the blocking_ast, however, because we
1749          * will never call the local blocking_ast until we drop our
1750          * reader/writer reference, which we won't do until we get the
1751          * reply and finish enqueueing. */
1752
1753         /* nobody should touch l_cp_ast */
1754         lock_res_and_lock(lock);
1755         list_del_init(&lock->l_cp_ast);
1756         LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
1757         /* save l_completion_ast since it can be changed by
1758          * mds_intent_policy(), see bug 14225 */
1759         completion_callback = lock->l_completion_ast;
1760         lock->l_flags &= ~LDLM_FL_CP_REQD;
1761         unlock_res_and_lock(lock);
1762
1763         if (completion_callback != NULL)
1764                 rc = completion_callback(lock, 0, (void *)arg);
1765         LDLM_LOCK_RELEASE(lock);
1766
1767         return rc;
1768 }
1769
1770 /**
1771  * Process a call to revocation AST callback for a lock in ast_work list
1772  */
1773 static int
1774 ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1775 {
1776         struct ldlm_cb_set_arg *arg = opaq;
1777         struct ldlm_lock_desc   desc;
1778         int                  rc;
1779         struct ldlm_lock       *lock;
1780
1781         if (list_empty(arg->list))
1782                 return -ENOENT;
1783
1784         lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
1785         list_del_init(&lock->l_rk_ast);
1786
1787         /* the desc just pretends the lock is exclusive */
1788         ldlm_lock2desc(lock, &desc);
1789         desc.l_req_mode = LCK_EX;
1790         desc.l_granted_mode = 0;
1791
1792         rc = lock->l_blocking_ast(lock, &desc, (void *)arg, LDLM_CB_BLOCKING);
1793         LDLM_LOCK_RELEASE(lock);
1794
1795         return rc;
1796 }
1797
1798 /**
1799  * Process a call to glimpse AST callback for a lock in ast_work list
1800  */
1801 int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
1802 {
1803         struct ldlm_cb_set_arg          *arg = opaq;
1804         struct ldlm_glimpse_work        *gl_work;
1805         struct ldlm_lock                *lock;
1806         int                              rc = 0;
1807
1808         if (list_empty(arg->list))
1809                 return -ENOENT;
1810
1811         gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work,
1812                                  gl_list);
1813         list_del_init(&gl_work->gl_list);
1814
1815         lock = gl_work->gl_lock;
1816
1817         /* transfer the glimpse descriptor to ldlm_cb_set_arg */
1818         arg->gl_desc = gl_work->gl_desc;
1819
1820         /* invoke the actual glimpse callback */
1821         if (lock->l_glimpse_ast(lock, (void *)arg) == 0)
1822                 rc = 1;
1823
1824         LDLM_LOCK_RELEASE(lock);
1825
1826         if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
1827                 OBD_FREE_PTR(gl_work);
1828
1829         return rc;
1830 }
1831
1832 /**
1833  * Process a list of locks in need of ASTs being sent.
1834  *
1835  * Used on the server to send multiple ASTs together instead of sending
1836  * them one by one.
1837  */
1838 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
1839                       ldlm_desc_ast_t ast_type)
1840 {
1841         struct ldlm_cb_set_arg *arg;
1842         set_producer_func       work_ast_lock;
1843         int                  rc;
1844
1845         if (list_empty(rpc_list))
1846                 return 0;
1847
1848         OBD_ALLOC_PTR(arg);
1849         if (arg == NULL)
1850                 return -ENOMEM;
1851
1852         atomic_set(&arg->restart, 0);
1853         arg->list = rpc_list;
1854
1855         switch (ast_type) {
1856         case LDLM_WORK_BL_AST:
1857                 arg->type = LDLM_BL_CALLBACK;
1858                 work_ast_lock = ldlm_work_bl_ast_lock;
1859                 break;
1860         case LDLM_WORK_CP_AST:
1861                 arg->type = LDLM_CP_CALLBACK;
1862                 work_ast_lock = ldlm_work_cp_ast_lock;
1863                 break;
1864         case LDLM_WORK_REVOKE_AST:
1865                 arg->type = LDLM_BL_CALLBACK;
1866                 work_ast_lock = ldlm_work_revoke_ast_lock;
1867                 break;
1868         case LDLM_WORK_GL_AST:
1869                 arg->type = LDLM_GL_CALLBACK;
1870                 work_ast_lock = ldlm_work_gl_ast_lock;
1871                 break;
1872         default:
1873                 LBUG();
1874         }
1875
1876         /* We create a ptlrpc request set with flow control extension.
1877          * This request set will use the work_ast_lock function to produce new
1878          * requests and will send a new request each time one completes in order
1879          * to keep the number of requests in flight at ns_max_parallel_ast. */
1880         arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX,
1881                                      work_ast_lock, arg);
1882         if (arg->set == NULL)
1883                 GOTO(out, rc = -ENOMEM);
1884
1885         ptlrpc_set_wait(arg->set);
1886         ptlrpc_set_destroy(arg->set);
1887
1888         rc = atomic_read(&arg->restart) ? -ERESTART : 0;
1889         GOTO(out, rc);
1890 out:
1891         OBD_FREE_PTR(arg);
1892         return rc;
1893 }
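
/*
 * Illustrative sketch, not part of the original file: locks needing an AST
 * are first collected on a private list (the ldlm_add_*_work_item() helpers
 * earlier in this file queue them on l_bl_ast/l_cp_ast) and then flushed in
 * one batch:
 *
 *      LIST_HEAD(rpc_list);
 *      int rc;
 *
 *      collect locks needing completion ASTs onto rpc_list, then:
 *      rc = ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
 *      if (rc == -ERESTART)
 *              at least one AST could not be sent; reprocess the queues.
 */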
1894
1895 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
1896 {
1897         ldlm_reprocess_all(res);
1898         return LDLM_ITER_CONTINUE;
1899 }
1900
1901 static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1902                               struct hlist_node *hnode, void *arg)
1903 {
1904         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
1905         int    rc;
1906
1907         rc = reprocess_one_queue(res, arg);
1908
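        /* cfs_hash_for_each_nolock() stops the walk when the callback
         * returns non-zero, so translate LDLM_ITER_STOP into 1 here */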
1909         return rc == LDLM_ITER_STOP;
1910 }
1911
1912 /**
1913  * Iterate through all resources on a namespace attempting to grant waiting
1914  * locks.
1915  */
1916 void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
1917 {
1918         if (ns != NULL) {
1919                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
1920                                          ldlm_reprocess_res, NULL);
1921         }
1922 }
1923 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
1924
1925 /**
1926  * Try to grant all waiting locks on a resource.
1927  *
1928  * On a server this would reprocess the converting and waiting queues; in
1929  * this client-side-only module it is a no-op for client namespaces.
1930  *
1931  * Typically called after cancellations, to see if anything can now be granted.
1932  */
1933 void ldlm_reprocess_all(struct ldlm_resource *res)
1934 {
1935         LIST_HEAD(rpc_list);
1936
1937         if (!ns_is_client(ldlm_res_to_ns(res))) {
1938                 CERROR("This is a client-side-only module, cannot handle "
1939                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
1940                 LBUG();
1941         }
1942 }
1943
1944 /**
1945  * Helper function to call blocking AST for LDLM lock \a lock in a
1946  * "cancelling" mode.
1947  */
1948 void ldlm_cancel_callback(struct ldlm_lock *lock)
1949 {
1950         check_res_locked(lock->l_resource);
1951         if (!(lock->l_flags & LDLM_FL_CANCEL)) {
1952                 lock->l_flags |= LDLM_FL_CANCEL;
1953                 if (lock->l_blocking_ast) {
1954                         unlock_res_and_lock(lock);
1955                         lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
1956                                              LDLM_CB_CANCELING);
1957                         lock_res_and_lock(lock);
1958                 } else {
1959                         LDLM_DEBUG(lock, "no blocking ast");
1960                 }
1961         }
1962         lock->l_flags |= LDLM_FL_BL_DONE;
1963 }
1964
1965 /**
1966  * Remove skiplist-enabled LDLM lock \a req from granted list
1967  */
1968 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
1969 {
1970         if (req->l_resource->lr_type != LDLM_PLAIN &&
1971             req->l_resource->lr_type != LDLM_IBITS)
1972                 return;
1973
1974         list_del_init(&req->l_sl_policy);
1975         list_del_init(&req->l_sl_mode);
1976 }
1977
1978 /**
1979  * Attempts to cancel LDLM lock \a lock that has no reader/writer references.
1980  */
1981 void ldlm_lock_cancel(struct ldlm_lock *lock)
1982 {
1983         struct ldlm_resource *res;
1984         struct ldlm_namespace *ns;
1985
1986         lock_res_and_lock(lock);
1987
1988         res = lock->l_resource;
1989         ns  = ldlm_res_to_ns(res);
1990
1991         /* Please do not, no matter how tempting, remove this LBUG without
1992          * talking to me first. -phik */
1993         if (lock->l_readers || lock->l_writers) {
1994                 LDLM_ERROR(lock, "lock still has references");
1995                 LBUG();
1996         }
1997
1998         if (lock->l_flags & LDLM_FL_WAITED)
1999                 ldlm_del_waiting_lock(lock);
2000
2001         /* Run the cancel callback (blocking AST in "cancelling" mode) */
2002         ldlm_cancel_callback(lock);
2003
2004         /* Yes, a second time, just in case it was added again while we were
2005          * running without the res lock in ldlm_cancel_callback */
2006         if (lock->l_flags & LDLM_FL_WAITED)
2007                 ldlm_del_waiting_lock(lock);
2008
2009         ldlm_resource_unlink_lock(lock);
2010         ldlm_lock_destroy_nolock(lock);
2011
2012         if (lock->l_granted_mode == lock->l_req_mode)
2013                 ldlm_pool_del(&ns->ns_pool, lock);
2014
2015         /* Make sure we will not be called again for the same lock, which is
2016          * possible if lock->l_granted_mode is not zeroed out */
2017         lock->l_granted_mode = LCK_MINMODE;
2018         unlock_res_and_lock(lock);
2019 }
2020 EXPORT_SYMBOL(ldlm_lock_cancel);
2021
2022 /**
2023  * Set opaque data into the lock that only makes sense to upper layer.
2024  */
2025 int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
2026 {
2027         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2028         int rc = -EINVAL;
2029
2030         if (lock) {
2031                 if (lock->l_ast_data == NULL)
2032                         lock->l_ast_data = data;
2033                 if (lock->l_ast_data == data)
2034                         rc = 0;
2035                 LDLM_LOCK_PUT(lock);
2036         }
2037         return rc;
2038 }
2039 EXPORT_SYMBOL(ldlm_lock_set_data);
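
/*
 * Illustrative sketch, not part of the original file: an upper layer
 * typically stashes its own object in l_ast_data so that the blocking AST
 * can find it later.  `lockh` and `inode` are hypothetical.
 *
 *      if (ldlm_lock_set_data(&lockh, inode) != 0)
 *              the lock already carries different ast_data;
 */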
2040
2041 struct export_cl_data {
2042         struct obd_export       *ecl_exp;
2043         int                     ecl_loop;
2044 };
2045
2046 /**
2047  * Iterator function for ldlm_cancel_locks_for_export.
2048  * Cancels passed locks.
2049  */
2050 int ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2051                                     struct hlist_node *hnode, void *data)
2053 {
2054         struct export_cl_data   *ecl = (struct export_cl_data *)data;
2055         struct obd_export       *exp  = ecl->ecl_exp;
2056         struct ldlm_lock     *lock = cfs_hash_object(hs, hnode);
2057         struct ldlm_resource *res;
2058
2059         res = ldlm_resource_getref(lock->l_resource);
2060         LDLM_LOCK_GET(lock);
2061
2062         LDLM_DEBUG(lock, "export %p", exp);
2063         ldlm_res_lvbo_update(res, NULL, 1);
2064         ldlm_lock_cancel(lock);
2065         ldlm_reprocess_all(res);
2066         ldlm_resource_putref(res);
2067         LDLM_LOCK_RELEASE(lock);
2068
2069         ecl->ecl_loop++;
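        /* (x & -x) == x only for powers of two, so progress is logged on
         * iterations 1, 2, 4, 8, ... to limit console noise */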
2070         if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
2071                 CDEBUG(D_INFO,
2072                        "Cancel lock %p for export %p (loop %d), still have "
2073                        "%d locks left on hash table.\n",
2074                        lock, exp, ecl->ecl_loop,
2075                        atomic_read(&hs->hs_count));
2076         }
2077
2078         return 0;
2079 }
2080
2081 /**
2082  * Cancel all locks for given export.
2083  *
2084  * Typically called on client disconnection/eviction
2085  */
2086 void ldlm_cancel_locks_for_export(struct obd_export *exp)
2087 {
2088         struct export_cl_data   ecl = {
2089                 .ecl_exp        = exp,
2090                 .ecl_loop       = 0,
2091         };
2092
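        /* cfs_hash_for_each_empty() rescans the hash until it is empty;
         * each callback invocation cancels, and thereby removes, one lock */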
2093         cfs_hash_for_each_empty(exp->exp_lock_hash,
2094                                 ldlm_cancel_locks_for_export_cb, &ecl);
2095 }
2096
2097 /**
2098  * Downgrade an exclusive lock.
2099  *
2100  * A fast variant of ldlm_lock_convert for conversion of exclusive
2101  * locks. The conversion is always successful.
2102  * Used by Commit on Sharing (COS) code.
2103  *
2104  * \param lock A lock to convert
2105  * \param new_mode new lock mode
2106  */
2107 void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
2108 {
2109         LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
2110         LASSERT(new_mode == LCK_COS);
2111
2112         lock_res_and_lock(lock);
2113         ldlm_resource_unlink_lock(lock);
2114         /*
2115          * Remove the lock from pool as it will be added again in
2116          * ldlm_grant_lock() called below.
2117          */
2118         ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
2119
2120         lock->l_req_mode = new_mode;
2121         ldlm_grant_lock(lock, NULL);
2122         unlock_res_and_lock(lock);
2123         ldlm_reprocess_all(lock->l_resource);
2124 }
2125 EXPORT_SYMBOL(ldlm_lock_downgrade);
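
/*
 * Illustrative sketch, not part of the original file: Commit on Sharing
 * downgrades a granted PW/EX lock instead of cancelling it outright.
 *
 *      if (lock->l_granted_mode & (LCK_PW | LCK_EX))
 *              ldlm_lock_downgrade(lock, LCK_COS);
 */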
2126
2127 /**
2128  * Attempt to convert already granted lock to a different mode.
2129  *
2130  * While lock conversion is not currently used, future client-side
2131  * optimizations could take advantage of it to avoid discarding cached
2132  * pages on a file.
2133  */
2134 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
2135                                         __u32 *flags)
2136 {
2137         LIST_HEAD(rpc_list);
2138         struct ldlm_resource *res;
2139         struct ldlm_namespace *ns;
2140         int granted = 0;
2141         struct ldlm_interval *node;
2142
2143         /* Just return if mode is unchanged. */
2144         if (new_mode == lock->l_granted_mode) {
2145                 *flags |= LDLM_FL_BLOCK_GRANTED;
2146                 return lock->l_resource;
2147         }
2148
2149         /* I can't check the type of lock here because the bitlock of lock
2150          * is not held here, so do the allocation blindly. -jay */
2151         OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, __GFP_IO);
2152         if (node == NULL)
2153                 /* Actually, this causes EDEADLOCK to be returned */
2154                 return NULL;
2155
2156         LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
2157                  "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
2158
2159         lock_res_and_lock(lock);
2160
2161         res = lock->l_resource;
2162         ns  = ldlm_res_to_ns(res);
2163
2164         lock->l_req_mode = new_mode;
2165         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
2166                 ldlm_resource_unlink_lock(lock);
2167         } else {
2168                 ldlm_resource_unlink_lock(lock);
2169                 if (res->lr_type == LDLM_EXTENT) {
2170                         /* FIXME: ugly code, I have to attach the lock to an
2171                          * interval node again since perhaps it will be granted
2172                          * soon */
2173                         INIT_LIST_HEAD(&node->li_group);
2174                         ldlm_interval_attach(node, lock);
2175                         node = NULL;
2176                 }
2177         }
2178
2179         /*
2180          * Remove old lock from the pool before adding the lock with new
2181          * mode below in ->policy()
2182          */
2183         ldlm_pool_del(&ns->ns_pool, lock);
2184
2185         /* If this is a local resource, put it on the appropriate list. */
2186         if (ns_is_client(ldlm_res_to_ns(res))) {
2187                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
2188                         ldlm_resource_add_lock(res, &res->lr_converting, lock);
2189                 } else {
2190                         /* This should never happen, because of the way the
2191                          * server handles conversions. */
2192                         LDLM_ERROR(lock, "Erroneous flags %x on local lock",
2193                                    *flags);
2194                         LBUG();
2195
2196                         ldlm_grant_lock(lock, &rpc_list);
2197                         granted = 1;
2198                         /* FIXME: completion handling not with lr_lock held ! */
2199                         if (lock->l_completion_ast)
2200                                 lock->l_completion_ast(lock, 0, NULL);
2201                 }
2202         } else {
2203                 CERROR("This is a client-side-only module, cannot handle "
2204                        "LDLM_NAMESPACE_SERVER resource type lock.\n");
2205                 LBUG();
2206         }
2207         unlock_res_and_lock(lock);
2208
2209         if (granted)
2210                 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
2211         if (node)
2212                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
2213         return res;
2214 }
2215 EXPORT_SYMBOL(ldlm_lock_convert);
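
/*
 * Illustrative sketch, not part of the original file: per the LASSERTF
 * above, the only conversion currently exercised is PR -> PW on an already
 * granted lock.  `lock` is a hypothetical granted PR lock.
 *
 *      __u32 flags = 0;
 *      struct ldlm_resource *res;
 *
 *      res = ldlm_lock_convert(lock, LCK_PW, &flags);
 *      if (res == NULL)
 *              allocation failed; the caller reports EDEADLOCK.
 */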
2216
2217 /**
2218  * Print lock with lock handle \a lockh description into debug log.
2219  *
2220  * Used when printing all locks on a resource for debug purposes.
2221  */
2222 void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
2223 {
2224         struct ldlm_lock *lock;
2225
2226         if (!((libcfs_debug | D_ERROR) & level))
2227                 return;
2228
2229         lock = ldlm_handle2lock(lockh);
2230         if (lock == NULL)
2231                 return;
2232
2233         LDLM_DEBUG_LIMIT(level, lock, "###");
2234
2235         LDLM_LOCK_PUT(lock);
2236 }
2237 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2238
2239 /**
2240  * Print lock information with custom message into debug log.
2241  * Helper function.
2242  */
2243 void _ldlm_lock_debug(struct ldlm_lock *lock,
2244                       struct libcfs_debug_msg_data *msgdata,
2245                       const char *fmt, ...)
2246 {
2247         va_list args;
2248         struct obd_export *exp = lock->l_export;
2249         struct ldlm_resource *resource = lock->l_resource;
2250         char *nid = "local";
2251
2252         va_start(args, fmt);
2253
2254         if (exp && exp->exp_connection) {
2255                 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
2256         } else if (exp && exp->exp_obd != NULL) {
2257                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
2258                 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
2259         }
2260
2261         if (resource == NULL) {
2262                 libcfs_debug_vmsg2(msgdata, fmt, args,
2263                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2264                        "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
2265                        "remote: "LPX64" expref: %d pid: %u timeout: %lu "
2266                        "lvb_type: %d\n",
2267                        lock,
2268                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2269                        lock->l_readers, lock->l_writers,
2270                        ldlm_lockname[lock->l_granted_mode],
2271                        ldlm_lockname[lock->l_req_mode],
2272                        lock->l_flags, nid, lock->l_remote_handle.cookie,
2273                        exp ? atomic_read(&exp->exp_refcount) : -99,
2274                        lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
2275                 va_end(args);
2276                 return;
2277         }
2278
2279         switch (resource->lr_type) {
2280         case LDLM_EXTENT:
2281                 libcfs_debug_vmsg2(msgdata, fmt, args,
2282                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2283                         "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
2284                         "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
2285                         LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
2286                         ldlm_lock_to_ns_name(lock), lock,
2287                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2288                         lock->l_readers, lock->l_writers,
2289                         ldlm_lockname[lock->l_granted_mode],
2290                         ldlm_lockname[lock->l_req_mode],
2291                         PLDLMRES(resource),
2292                         atomic_read(&resource->lr_refcount),
2293                         ldlm_typename[resource->lr_type],
2294                         lock->l_policy_data.l_extent.start,
2295                         lock->l_policy_data.l_extent.end,
2296                         lock->l_req_extent.start, lock->l_req_extent.end,
2297                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2298                         exp ? atomic_read(&exp->exp_refcount) : -99,
2299                         lock->l_pid, lock->l_callback_timeout,
2300                         lock->l_lvb_type);
2301                 break;
2302
2303         case LDLM_FLOCK:
2304                 libcfs_debug_vmsg2(msgdata, fmt, args,
2305                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2306                         "res: "DLDLMRES" rrc: %d type: %s pid: %d "
2307                         "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
2308                         "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
2309                         ldlm_lock_to_ns_name(lock), lock,
2310                         lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
2311                         lock->l_readers, lock->l_writers,
2312                         ldlm_lockname[lock->l_granted_mode],
2313                         ldlm_lockname[lock->l_req_mode],
2314                         PLDLMRES(resource),
2315                         atomic_read(&resource->lr_refcount),
2316                         ldlm_typename[resource->lr_type],
2317                         lock->l_policy_data.l_flock.pid,
2318                         lock->l_policy_data.l_flock.start,
2319                         lock->l_policy_data.l_flock.end,
2320                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2321                         exp ? atomic_read(&exp->exp_refcount) : -99,
2322                         lock->l_pid, lock->l_callback_timeout);
2323                 break;
2324
2325         case LDLM_IBITS:
2326                 libcfs_debug_vmsg2(msgdata, fmt, args,
2327                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2328                         "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
2329                         "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
2330                         "pid: %u timeout: %lu lvb_type: %d\n",
2331                         ldlm_lock_to_ns_name(lock),
2332                         lock, lock->l_handle.h_cookie,
2333                         atomic_read(&lock->l_refc),
2334                         lock->l_readers, lock->l_writers,
2335                         ldlm_lockname[lock->l_granted_mode],
2336                         ldlm_lockname[lock->l_req_mode],
2337                         PLDLMRES(resource),
2338                         lock->l_policy_data.l_inodebits.bits,
2339                         atomic_read(&resource->lr_refcount),
2340                         ldlm_typename[resource->lr_type],
2341                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2342                         exp ? atomic_read(&exp->exp_refcount) : -99,
2343                         lock->l_pid, lock->l_callback_timeout,
2344                         lock->l_lvb_type);
2345                 break;
2346
2347         default:
2348                 libcfs_debug_vmsg2(msgdata, fmt, args,
2349                         " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
2350                         "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
2351                         "nid: %s remote: "LPX64" expref: %d pid: %u "
2352                         "timeout: %lu lvb_type: %d\n",
2353                         ldlm_lock_to_ns_name(lock),
2354                         lock, lock->l_handle.h_cookie,
2355                         atomic_read(&lock->l_refc),
2356                         lock->l_readers, lock->l_writers,
2357                         ldlm_lockname[lock->l_granted_mode],
2358                         ldlm_lockname[lock->l_req_mode],
2359                         PLDLMRES(resource),
2360                         atomic_read(&resource->lr_refcount),
2361                         ldlm_typename[resource->lr_type],
2362                         lock->l_flags, nid, lock->l_remote_handle.cookie,
2363                         exp ? atomic_read(&exp->exp_refcount) : -99,
2364                         lock->l_pid, lock->l_callback_timeout,
2365                         lock->l_lvb_type);
2366                 break;
2367         }
2368         va_end(args);
2369 }
2370 EXPORT_SYMBOL(_ldlm_lock_debug);
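
/*
 * Illustrative sketch, not part of the original file: callers do not invoke
 * _ldlm_lock_debug() directly; the LDLM_DEBUG()/LDLM_ERROR() macros in the
 * ldlm headers fill in the libcfs_debug_msg_data and forward to it:
 *
 *      LDLM_DEBUG(lock, "matched, granted mode %s",
 *                 ldlm_lockname[lock->l_granted_mode]);
 */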