1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_lockd.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43
44 # include <linux/libcfs/libcfs.h>
45
46 #include <lustre_dlm.h>
47 #include <obd_class.h>
48 #include <linux/list.h>
49 #include "ldlm_internal.h"
50
51 static int ldlm_num_threads;
52 module_param(ldlm_num_threads, int, 0444);
53 MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
54
55 static char *ldlm_cpts;
56 module_param(ldlm_cpts, charp, 0444);
57 MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
58
59 extern struct kmem_cache *ldlm_resource_slab;
60 extern struct kmem_cache *ldlm_lock_slab;
61 static struct mutex     ldlm_ref_mutex;
62 static int ldlm_refcount;
63
64 struct ldlm_cb_async_args {
65         struct ldlm_cb_set_arg *ca_set_arg;
66         struct ldlm_lock       *ca_lock;
67 };
68
69 /* LDLM state */
70
71 static struct ldlm_state *ldlm_state;
72
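/*
 * Round a timeout up to the next whole second: convert the cfs_time_t
 * interval to seconds, add one, and convert back with cfs_time_seconds().
 */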
73 inline cfs_time_t round_timeout(cfs_time_t timeout)
74 {
75         return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
76 }
77
78 /* timeout for initial callback (AST) reply (bz10399) */
79 static inline unsigned int ldlm_get_rq_timeout(void)
80 {
81         /* Non-AT value */
82         unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);
83
84         return timeout < 1 ? 1 : timeout;
85 }
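/*
 * Rough worked example (assuming the common defaults of obd_timeout = 100s
 * and ldlm_timeout = 20s, which are tuned elsewhere): min(20, 100 / 3) = 20,
 * so the initial AST reply timeout would be 20 seconds; the floor of one
 * second only matters for very small tunable values.
 */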
86
87 #define ELT_STOPPED   0
88 #define ELT_READY     1
89 #define ELT_TERMINATE 2
90
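/*
 * Pool of blocking-AST worker threads.  Work items are queued on
 * blp_prio_list or blp_list below, worker threads sleep on blp_waitq, and
 * blp_num_threads/blp_busy_threads let ldlm_bl_thread_main() decide when to
 * spawn additional threads up to blp_max_threads.
 */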
91 struct ldlm_bl_pool {
92         spinlock_t              blp_lock;
93
94         /*
95          * blp_prio_list is used for callbacks that should be handled
96          * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
97          * see bug 13843
98          */
99         struct list_head              blp_prio_list;
100
101         /*
102          * blp_list is used for all other callbacks which are likely
103          * to take longer to process.
104          */
105         struct list_head              blp_list;
106
107         wait_queue_head_t            blp_waitq;
108         struct completion       blp_comp;
109         atomic_t            blp_num_threads;
110         atomic_t            blp_busy_threads;
111         int                  blp_min_threads;
112         int                  blp_max_threads;
113 };
114
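/*
 * A single unit of work for the blocking threads: either one lock
 * (blwi_lock plus its descriptor blwi_ld) or a list of blwi_count locks
 * already selected for cancellation (blwi_head); init_blwi() fills in
 * exactly one of the two forms.
 */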
115 struct ldlm_bl_work_item {
116         struct list_head              blwi_entry;
117         struct ldlm_namespace  *blwi_ns;
118         struct ldlm_lock_desc   blwi_ld;
119         struct ldlm_lock       *blwi_lock;
120         struct list_head              blwi_head;
121         int                  blwi_count;
122         struct completion       blwi_comp;
123         ldlm_cancel_flags_t     blwi_flags;
124         int                  blwi_mem_pressure;
125 };
126
127
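/*
 * Client-side stubs: there is no list of waiting (callback-pending) locks
 * to maintain here, so deleting or refreshing a waiting lock is a no-op
 * that simply reports success.
 */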
128 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
129 {
130         return 0;
131 }
132
133 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
134 {
135         return 0;
136 }
137
138
139
140 /**
141  * Callback handler for receiving incoming blocking ASTs.
142  *
143  * This can only happen on the client side.
144  */
145 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
146                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
147 {
148         int do_ast;
149
150         LDLM_DEBUG(lock, "client blocking AST callback handler");
151
152         lock_res_and_lock(lock);
153         lock->l_flags |= LDLM_FL_CBPENDING;
154
155         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
156                 lock->l_flags |= LDLM_FL_CANCEL;
157
158         do_ast = (!lock->l_readers && !lock->l_writers);
159         unlock_res_and_lock(lock);
160
161         if (do_ast) {
162                 CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
163                        lock, lock->l_blocking_ast);
164                 if (lock->l_blocking_ast != NULL)
165                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
166                                              LDLM_CB_BLOCKING);
167         } else {
168                 CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
169                        lock);
170         }
171
172         LDLM_DEBUG(lock, "client blocking callback handler END");
173         LDLM_LOCK_RELEASE(lock);
174 }
175
176 /**
177  * Callback handler for receiving incoming completion ASTs.
178  *
179  * This can only happen on the client side.
180  */
181 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
182                                     struct ldlm_namespace *ns,
183                                     struct ldlm_request *dlm_req,
184                                     struct ldlm_lock *lock)
185 {
186         int lvb_len;
187         LIST_HEAD(ast_list);
188         int rc = 0;
189
190         LDLM_DEBUG(lock, "client completion callback handler START");
191
192         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
193                 int to = cfs_time_seconds(1);
194                 while (to > 0) {
195                         schedule_timeout_and_set_state(
196                                 TASK_INTERRUPTIBLE, to);
197                         if (lock->l_granted_mode == lock->l_req_mode ||
198                             lock->l_flags & LDLM_FL_DESTROYED)
199                                 break;
200                 }
201         }
202
203         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
204         if (lvb_len < 0) {
205                 LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
206                 GOTO(out, rc = lvb_len);
207         } else if (lvb_len > 0) {
208                 if (lock->l_lvb_len > 0) {
209                         /* for extent lock, lvb contains ost_lvb{}. */
210                         LASSERT(lock->l_lvb_data != NULL);
211
212                         if (unlikely(lock->l_lvb_len < lvb_len)) {
213                                 LDLM_ERROR(lock, "Replied LVB is larger than "
214                                            "expected, expected = %d, "
215                                            "replied = %d",
216                                            lock->l_lvb_len, lvb_len);
217                                 GOTO(out, rc = -EINVAL);
218                         }
219                 } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
220                                                      * variable length */
221                         void *lvb_data;
222
223                         OBD_ALLOC(lvb_data, lvb_len);
224                         if (lvb_data == NULL) {
225                                 LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
226                                 GOTO(out, rc = -ENOMEM);
227                         }
228
229                         lock_res_and_lock(lock);
230                         LASSERT(lock->l_lvb_data == NULL);
231                         lock->l_lvb_data = lvb_data;
232                         lock->l_lvb_len = lvb_len;
233                         unlock_res_and_lock(lock);
234                 }
235         }
236
237         lock_res_and_lock(lock);
238         if ((lock->l_flags & LDLM_FL_DESTROYED) ||
239             lock->l_granted_mode == lock->l_req_mode) {
240                 /* bug 11300: the lock has already been granted */
241                 unlock_res_and_lock(lock);
242                 LDLM_DEBUG(lock, "Double grant race happened");
243                 GOTO(out, rc = 0);
244         }
245
246         /* If we receive the completion AST before the actual enqueue returned,
247          * then we might need to switch lock modes, resources, or extents. */
248         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
249                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
250                 LDLM_DEBUG(lock, "completion AST, new lock mode");
251         }
252
253         if (lock->l_resource->lr_type != LDLM_PLAIN) {
254                 ldlm_convert_policy_to_local(req->rq_export,
255                                           dlm_req->lock_desc.l_resource.lr_type,
256                                           &dlm_req->lock_desc.l_policy_data,
257                                           &lock->l_policy_data);
258                 LDLM_DEBUG(lock, "completion AST, new policy data");
259         }
260
261         ldlm_resource_unlink_lock(lock);
262         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
263                    &lock->l_resource->lr_name,
264                    sizeof(lock->l_resource->lr_name)) != 0) {
265                 unlock_res_and_lock(lock);
266                 rc = ldlm_lock_change_resource(ns, lock,
267                                 &dlm_req->lock_desc.l_resource.lr_name);
268                 if (rc < 0) {
269                         LDLM_ERROR(lock, "Failed to allocate resource");
270                         GOTO(out, rc);
271                 }
272                 LDLM_DEBUG(lock, "completion AST, new resource");
273                 CERROR("change resource!\n");
274                 lock_res_and_lock(lock);
275         }
276
277         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
278                 /* BL_AST locks are not needed in LRU.
279                  * Let ldlm_cancel_lru() be fast. */
280                 ldlm_lock_remove_from_lru(lock);
281                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
282                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
283         }
284
285         if (lock->l_lvb_len > 0) {
286                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
287                                    lock->l_lvb_data, lvb_len);
288                 if (rc < 0) {
289                         unlock_res_and_lock(lock);
290                         GOTO(out, rc);
291                 }
292         }
293
294         ldlm_grant_lock(lock, &ast_list);
295         unlock_res_and_lock(lock);
296
297         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
298
299         /* Let the enqueue path call osc_lock_upcall() and initialize
300          * l_ast_data */
301         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
302
303         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
304
305         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
306                           lock);
307         GOTO(out, rc);
308
309 out:
310         if (rc < 0) {
311                 lock_res_and_lock(lock);
312                 lock->l_flags |= LDLM_FL_FAILED;
313                 unlock_res_and_lock(lock);
314                 wake_up(&lock->l_waitq);
315         }
316         LDLM_LOCK_RELEASE(lock);
317 }
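/*
 * Summary of the completion-AST path above: make sure the LVB buffer is
 * large enough, fold the server-supplied mode, policy data and (if it
 * changed) resource name into the local lock, copy in the LVB, grant the
 * lock and run the resulting completion-AST work list; on any failure the
 * lock is marked LDLM_FL_FAILED and its waiters are woken.
 */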
318
319 /**
320  * Callback handler for receiving incoming glimpse ASTs.
321  *
322  * This can only happen on the client side.  After handling the glimpse
323  * AST we also consider dropping the lock here if it has been unused
324  * locally for a long time.
325  */
326 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
327                                     struct ldlm_namespace *ns,
328                                     struct ldlm_request *dlm_req,
329                                     struct ldlm_lock *lock)
330 {
331         int rc = -ENOSYS;
332
333         LDLM_DEBUG(lock, "client glimpse AST callback handler");
334
335         if (lock->l_glimpse_ast != NULL)
336                 rc = lock->l_glimpse_ast(lock, req);
337
338         if (req->rq_repmsg != NULL) {
339                 ptlrpc_reply(req);
340         } else {
341                 req->rq_status = rc;
342                 ptlrpc_error(req);
343         }
344
345         lock_res_and_lock(lock);
346         if (lock->l_granted_mode == LCK_PW &&
347             !lock->l_readers && !lock->l_writers &&
348             cfs_time_after(cfs_time_current(),
349                            cfs_time_add(lock->l_last_used,
350                                         cfs_time_seconds(10)))) {
351                 unlock_res_and_lock(lock);
352                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
353                         ldlm_handle_bl_callback(ns, NULL, lock);
354
355                 return;
356         }
357         unlock_res_and_lock(lock);
358         LDLM_LOCK_RELEASE(lock);
359 }
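/*
 * Note on the glimpse path above: after replying with the result of
 * ->l_glimpse_ast, a PW lock with no readers or writers that has been idle
 * for more than ten seconds is handed to a blocking thread (or handled
 * synchronously if queueing fails) so its blocking AST runs and the lock
 * can be dropped early instead of lingering in the LRU.
 */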
360
361 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
362 {
363         if (req->rq_no_reply)
364                 return 0;
365
366         req->rq_status = rc;
367         if (!req->rq_packed_final) {
368                 rc = lustre_pack_reply(req, 1, NULL, NULL);
369                 if (rc)
370                         return rc;
371         }
372         return ptlrpc_reply(req);
373 }
374
375 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
376                                ldlm_cancel_flags_t cancel_flags)
377 {
378         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
379
380         spin_lock(&blp->blp_lock);
381         if (blwi->blwi_lock &&
382             blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
383                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
384                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
385         } else {
386                 /* other blocking callbacks are added to the regular list */
387                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
388         }
389         spin_unlock(&blp->blp_lock);
390
391         wake_up(&blp->blp_waitq);
392
393         /* cannot check blwi->blwi_flags as blwi could already have been
394            freed in LCF_ASYNC mode */
395         if (!(cancel_flags & LCF_ASYNC))
396                 wait_for_completion(&blwi->blwi_comp);
397
398         return 0;
399 }
400
401 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
402                              struct ldlm_namespace *ns,
403                              struct ldlm_lock_desc *ld,
404                              struct list_head *cancels, int count,
405                              struct ldlm_lock *lock,
406                              ldlm_cancel_flags_t cancel_flags)
407 {
408         init_completion(&blwi->blwi_comp);
409         INIT_LIST_HEAD(&blwi->blwi_head);
410
411         if (memory_pressure_get())
412                 blwi->blwi_mem_pressure = 1;
413
414         blwi->blwi_ns = ns;
415         blwi->blwi_flags = cancel_flags;
416         if (ld != NULL)
417                 blwi->blwi_ld = *ld;
418         if (count) {
419                 list_add(&blwi->blwi_head, cancels);
420                 list_del_init(cancels);
421                 blwi->blwi_count = count;
422         } else {
423                 blwi->blwi_lock = lock;
424         }
425 }
426
427 /**
428  * Queues a list of locks \a cancels containing \a count locks
429  * for later processing by a blocking thread.  If \a count is zero,
430  * then the lock referenced as \a lock is queued instead.
431  *
432  * The blocking thread then invokes the lock's ->l_blocking_ast callback.
433  * If queueing the request fails an error is returned and the caller is
434  * expected to call ->l_blocking_ast itself.
435  */
436 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
437                              struct ldlm_lock_desc *ld,
438                              struct ldlm_lock *lock,
439                              struct list_head *cancels, int count,
440                              ldlm_cancel_flags_t cancel_flags)
441 {
442         if (cancels && count == 0)
443                 return 0;
444
445         if (cancel_flags & LCF_ASYNC) {
446                 struct ldlm_bl_work_item *blwi;
447
448                 OBD_ALLOC(blwi, sizeof(*blwi));
449                 if (blwi == NULL)
450                         return -ENOMEM;
451                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
452
453                 return __ldlm_bl_to_thread(blwi, cancel_flags);
454         } else {
455                 /* for a synchronous call do minimal memory allocation, as it
456                  * could be triggered from the kernel shrinker
457                  */
458                 struct ldlm_bl_work_item blwi;
459
460                 memset(&blwi, 0, sizeof(blwi));
461                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
462                 return __ldlm_bl_to_thread(&blwi, cancel_flags);
463         }
464 }
465
466
467 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
468                            struct ldlm_lock *lock)
469 {
470         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
471 }
472
473 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
474                            struct list_head *cancels, int count,
475                            ldlm_cancel_flags_t cancel_flags)
476 {
477         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
478 }
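/*
 * Illustrative (hypothetical) caller of the two helpers above: given a list
 * of locks already selected for cancellation, hand it to the blocking
 * threads without waiting for them:
 *
 *      LIST_HEAD(cancels);
 *      int count = <number of locks collected on &cancels>;
 *
 *      if (ldlm_bl_to_thread_list(ns, NULL, &cancels, count, LCF_ASYNC))
 *              <cancel the locks in the caller's own context instead>;
 */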
479
480 /* Setinfo coming from the server (e.g. MDT) to the client (e.g. MDC)! */
481 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
482 {
483         struct obd_device *obd = req->rq_export->exp_obd;
484         char *key;
485         void *val;
486         int keylen, vallen;
487         int rc = -ENOSYS;
488
489         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
490
491         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
492
493         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
494         if (key == NULL) {
495                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
496                 return -EFAULT;
497         }
498         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
499                                       RCL_CLIENT);
500         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
501         if (val == NULL) {
502                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
503                 return -EFAULT;
504         }
505         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
506                                       RCL_CLIENT);
507
508         /* We are responsible for swabbing contents of val */
509
510         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
511                 /* Pass it on to mdc (the "export" in this case) */
512                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
513                                         req->rq_export,
514                                         sizeof(KEY_HSM_COPYTOOL_SEND),
515                                         KEY_HSM_COPYTOOL_SEND,
516                                         vallen, val, NULL);
517         else
518                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
519
520         return rc;
521 }
522
523 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
524                                         const char *msg, int rc,
525                                         struct lustre_handle *handle)
526 {
527         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
528                   "%s: [nid %s] [rc %d] [lock "LPX64"]",
529                   msg, libcfs_id2str(req->rq_peer), rc,
530                   handle ? handle->cookie : 0);
531         if (req->rq_no_reply)
532                 CWARN("No reply was sent, possibly due to bug 21636.\n");
533         else if (rc)
534                 CWARN("Send reply failed, possibly due to bug 21636.\n");
535 }
536
537 static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
538 {
539         struct obd_quotactl *oqctl;
540         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
541
542         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
543         if (oqctl == NULL) {
544                 CERROR("Can't unpack obd_quotactl\n");
545                 return -EPROTO;
546         }
547
548         oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
549
550         cli->cl_qchk_stat = oqctl->qc_stat;
551         return 0;
552 }
553
554 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
555 static int ldlm_callback_handler(struct ptlrpc_request *req)
556 {
557         struct ldlm_namespace *ns;
558         struct ldlm_request *dlm_req;
559         struct ldlm_lock *lock;
560         int rc;
561
562         /* Requests arrive in sender's byte order.  The ptlrpc service
563          * handler has already checked and, if necessary, byte-swapped the
564          * incoming request message body, but I am responsible for the
565          * message buffers. */
566
567         /* do nothing for sec context finalize */
568         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
569                 return 0;
570
571         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
572
573         if (req->rq_export == NULL) {
574                 rc = ldlm_callback_reply(req, -ENOTCONN);
575                 ldlm_callback_errmsg(req, "Operate on unconnected server",
576                                      rc, NULL);
577                 return 0;
578         }
579
580         LASSERT(req->rq_export != NULL);
581         LASSERT(req->rq_export->exp_obd != NULL);
582
583         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
584         case LDLM_BL_CALLBACK:
585                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
586                         return 0;
587                 break;
588         case LDLM_CP_CALLBACK:
589                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
590                         return 0;
591                 break;
592         case LDLM_GL_CALLBACK:
593                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
594                         return 0;
595                 break;
596         case LDLM_SET_INFO:
597                 rc = ldlm_handle_setinfo(req);
598                 ldlm_callback_reply(req, rc);
599                 return 0;
600         case OBD_QC_CALLBACK:
601                 req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
602                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
603                         return 0;
604                 rc = ldlm_handle_qc_callback(req);
605                 ldlm_callback_reply(req, rc);
606                 return 0;
607         default:
608                 CERROR("unknown opcode %u\n",
609                        lustre_msg_get_opc(req->rq_reqmsg));
610                 ldlm_callback_reply(req, -EPROTO);
611                 return 0;
612         }
613
614         ns = req->rq_export->exp_obd->obd_namespace;
615         LASSERT(ns != NULL);
616
617         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
618
619         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
620         if (dlm_req == NULL) {
621                 rc = ldlm_callback_reply(req, -EPROTO);
622                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
623                                      NULL);
624                 return 0;
625         }
626
627         /* Force a known safe race, send a cancel to the server for a lock
628          * which the server has already started a blocking callback on. */
629         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
630             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
631                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
632                 if (rc < 0)
633                         CERROR("ldlm_cli_cancel: %d\n", rc);
634         }
635
636         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
637         if (!lock) {
638                 CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
639                        "disappeared\n", dlm_req->lock_handle[0].cookie);
640                 rc = ldlm_callback_reply(req, -EINVAL);
641                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
642                                      &dlm_req->lock_handle[0]);
643                 return 0;
644         }
645
646         if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
647             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
648                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
649
650         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
651         lock_res_and_lock(lock);
652         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
653                                               LDLM_AST_FLAGS);
654         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
655                 /* If somebody cancels the lock and the cache is already
656                  * dropped, or the lock failed before the cp_ast was received
657                  * on the client, we can tell the server we have no lock.
658                  * Otherwise, send the cancel after dropping the cache. */
659                 if (((lock->l_flags & LDLM_FL_CANCELING) &&
660                     (lock->l_flags & LDLM_FL_BL_DONE)) ||
661                     (lock->l_flags & LDLM_FL_FAILED)) {
662                         LDLM_DEBUG(lock, "callback on lock "
663                                    LPX64" - lock disappeared\n",
664                                    dlm_req->lock_handle[0].cookie);
665                         unlock_res_and_lock(lock);
666                         LDLM_LOCK_RELEASE(lock);
667                         rc = ldlm_callback_reply(req, -EINVAL);
668                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
669                                              &dlm_req->lock_handle[0]);
670                         return 0;
671                 }
672                 /* BL_AST locks are not needed in LRU.
673                  * Let ldlm_cancel_lru() be fast. */
674                 ldlm_lock_remove_from_lru(lock);
675                 lock->l_flags |= LDLM_FL_BL_AST;
676         }
677         unlock_res_and_lock(lock);
678
679         /* We want the ost thread to get this reply so that it can respond
680          * to ost requests (write cache writeback) that might be triggered
681          * in the callback.
682          *
683          * But we'd also like to be able to indicate in the reply that we're
684          * cancelling right now, because it's unused, or have an intent result
685          * in the reply, so we might have to push the responsibility for sending
686          * the reply down into the AST handlers, alas. */
687
688         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
689         case LDLM_BL_CALLBACK:
690                 CDEBUG(D_INODE, "blocking ast\n");
691                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
692                 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
693                         rc = ldlm_callback_reply(req, 0);
694                         if (req->rq_no_reply || rc)
695                                 ldlm_callback_errmsg(req, "Normal process", rc,
696                                                      &dlm_req->lock_handle[0]);
697                 }
698                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
699                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
700                 break;
701         case LDLM_CP_CALLBACK:
702                 CDEBUG(D_INODE, "completion ast\n");
703                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
704                 ldlm_callback_reply(req, 0);
705                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
706                 break;
707         case LDLM_GL_CALLBACK:
708                 CDEBUG(D_INODE, "glimpse ast\n");
709                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
710                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
711                 break;
712         default:
713                 LBUG();                  /* checked above */
714         }
715
716         return 0;
717 }
718
719
720 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
721 {
722         struct ldlm_bl_work_item *blwi = NULL;
723         static unsigned int num_bl = 0;
724
725         spin_lock(&blp->blp_lock);
726         /* process a request from blp_list at least once every blp_num_threads picks */
727         if (!list_empty(&blp->blp_list) &&
728             (list_empty(&blp->blp_prio_list) || num_bl == 0))
729                 blwi = list_entry(blp->blp_list.next,
730                                       struct ldlm_bl_work_item, blwi_entry);
731         else
732                 if (!list_empty(&blp->blp_prio_list))
733                         blwi = list_entry(blp->blp_prio_list.next,
734                                               struct ldlm_bl_work_item,
735                                               blwi_entry);
736
737         if (blwi) {
738                 if (++num_bl >= atomic_read(&blp->blp_num_threads))
739                         num_bl = 0;
740                 list_del(&blwi->blwi_entry);
741         }
742         spin_unlock(&blp->blp_lock);
743
744         return blwi;
745 }
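/*
 * The static num_bl counter above acts as a simple round-robin guard: at
 * least one pick in every blp_num_threads picks comes from the regular
 * blp_list, so a steady stream of priority (LDLM_FL_DISCARD_DATA) work
 * cannot starve the ordinary callbacks.
 */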
746
747 /* This only contains temporary data until the thread starts */
748 struct ldlm_bl_thread_data {
749         char                    bltd_name[CFS_CURPROC_COMM_MAX];
750         struct ldlm_bl_pool     *bltd_blp;
751         struct completion       bltd_comp;
752         int                     bltd_num;
753 };
754
755 static int ldlm_bl_thread_main(void *arg);
756
757 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
758 {
759         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
760         struct task_struct *task;
761
762         init_completion(&bltd.bltd_comp);
763         bltd.bltd_num = atomic_read(&blp->blp_num_threads);
764         snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
765                 "ldlm_bl_%02d", bltd.bltd_num);
766         task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
767         if (IS_ERR(task)) {
768                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
769                        atomic_read(&blp->blp_num_threads), PTR_ERR(task));
770                 return PTR_ERR(task);
771         }
772         wait_for_completion(&bltd.bltd_comp);
773
774         return 0;
775 }
776
777 /**
778  * Main blocking requests processing thread.
779  *
780  * Callers put locks into its queue by calling ldlm_bl_to_thread.
781  * This thread in the end ends up doing actual call to ->l_blocking_ast
782  * for queued locks.
783  */
784 static int ldlm_bl_thread_main(void *arg)
785 {
786         struct ldlm_bl_pool *blp;
787
788         {
789                 struct ldlm_bl_thread_data *bltd = arg;
790
791                 blp = bltd->bltd_blp;
792
793                 atomic_inc(&blp->blp_num_threads);
794                 atomic_inc(&blp->blp_busy_threads);
795
796                 complete(&bltd->bltd_comp);
797                 /* cannot use bltd after this, it is only on caller's stack */
798         }
799
800         while (1) {
801                 struct l_wait_info lwi = { 0 };
802                 struct ldlm_bl_work_item *blwi = NULL;
803                 int busy;
804
805                 blwi = ldlm_bl_get_work(blp);
806
807                 if (blwi == NULL) {
808                         atomic_dec(&blp->blp_busy_threads);
809                         l_wait_event_exclusive(blp->blp_waitq,
810                                          (blwi = ldlm_bl_get_work(blp)) != NULL,
811                                          &lwi);
812                         busy = atomic_inc_return(&blp->blp_busy_threads);
813                 } else {
814                         busy = atomic_read(&blp->blp_busy_threads);
815                 }
816
817                 if (blwi->blwi_ns == NULL)
818                         /* added by ldlm_cleanup() */
819                         break;
820
821                 /* Not fatal if this races and we end up with a few too many threads */
822                 if (unlikely(busy < blp->blp_max_threads &&
823                              busy >= atomic_read(&blp->blp_num_threads) &&
824                              !blwi->blwi_mem_pressure))
825                         /* discard the return value, we tried */
826                         ldlm_bl_thread_start(blp);
827
828                 if (blwi->blwi_mem_pressure)
829                         memory_pressure_set();
830
831                 if (blwi->blwi_count) {
832                         int count;
833                         /* In the special case where we cancel LRU locks
834                          * asynchronously, the list of locks is passed in here.
835                          * The locks are thus marked LDLM_FL_CANCELING, but NOT
836                          * yet canceled locally. */
837                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
838                                                            blwi->blwi_count,
839                                                            LCF_BL_AST);
840                         ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
841                                              blwi->blwi_flags);
842                 } else {
843                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
844                                                 blwi->blwi_lock);
845                 }
846                 if (blwi->blwi_mem_pressure)
847                         memory_pressure_clr();
848
849                 if (blwi->blwi_flags & LCF_ASYNC)
850                         OBD_FREE(blwi, sizeof(*blwi));
851                 else
852                         complete(&blwi->blwi_comp);
853         }
854
855         atomic_dec(&blp->blp_busy_threads);
856         atomic_dec(&blp->blp_num_threads);
857         complete(&blp->blp_comp);
858         return 0;
859 }
860
861
862 static int ldlm_setup(void);
863 static int ldlm_cleanup(void);
864
865 int ldlm_get_ref(void)
866 {
867         int rc = 0;
868
869         mutex_lock(&ldlm_ref_mutex);
870         if (++ldlm_refcount == 1) {
871                 rc = ldlm_setup();
872                 if (rc)
873                         ldlm_refcount--;
874         }
875         mutex_unlock(&ldlm_ref_mutex);
876
877         return rc;
878 }
879 EXPORT_SYMBOL(ldlm_get_ref);
880
881 void ldlm_put_ref(void)
882 {
883         mutex_lock(&ldlm_ref_mutex);
884         if (ldlm_refcount == 1) {
885                 int rc = ldlm_cleanup();
886                 if (rc)
887                         CERROR("ldlm_cleanup failed: %d\n", rc);
888                 else
889                         ldlm_refcount--;
890         } else {
891                 ldlm_refcount--;
892         }
893         mutex_unlock(&ldlm_ref_mutex);
894 }
895 EXPORT_SYMBOL(ldlm_put_ref);
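/*
 * Sketch of typical use (hypothetical caller): the first reference sets up
 * the shared LDLM state via ldlm_setup(), the last one tears it down:
 *
 *      rc = ldlm_get_ref();
 *      if (rc)
 *              return rc;
 *      ...
 *      ldlm_put_ref();
 */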
896
897 /*
898  * Export handle<->lock hash operations.
899  */
900 static unsigned
901 ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
902 {
903         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
904 }
905
906 static void *
907 ldlm_export_lock_key(struct hlist_node *hnode)
908 {
909         struct ldlm_lock *lock;
910
911         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
912         return &lock->l_remote_handle;
913 }
914
915 static void
916 ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
917 {
918         struct ldlm_lock     *lock;
919
920         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
921         lock->l_remote_handle = *(struct lustre_handle *)key;
922 }
923
924 static int
925 ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
926 {
927         return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
928 }
929
930 static void *
931 ldlm_export_lock_object(struct hlist_node *hnode)
932 {
933         return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
934 }
935
936 static void
937 ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
938 {
939         struct ldlm_lock *lock;
940
941         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
942         LDLM_LOCK_GET(lock);
943 }
944
945 static void
946 ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
947 {
948         struct ldlm_lock *lock;
949
950         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
951         LDLM_LOCK_RELEASE(lock);
952 }
953
954 static cfs_hash_ops_t ldlm_export_lock_ops = {
955         .hs_hash        = ldlm_export_lock_hash,
956         .hs_key  = ldlm_export_lock_key,
957         .hs_keycmp      = ldlm_export_lock_keycmp,
958         .hs_keycpy      = ldlm_export_lock_keycpy,
959         .hs_object      = ldlm_export_lock_object,
960         .hs_get  = ldlm_export_lock_get,
961         .hs_put  = ldlm_export_lock_put,
962         .hs_put_locked  = ldlm_export_lock_put,
963 };
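/*
 * These hash operations let an export index its locks by the remote
 * (server-side) lustre_handle cookie; hs_get/hs_put wrap LDLM_LOCK_GET()
 * and LDLM_LOCK_RELEASE() so a lock keeps a reference for as long as it is
 * reachable through exp_lock_hash.
 */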
964
965 int ldlm_init_export(struct obd_export *exp)
966 {
967         int rc;
968         exp->exp_lock_hash =
969                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
970                                 HASH_EXP_LOCK_CUR_BITS,
971                                 HASH_EXP_LOCK_MAX_BITS,
972                                 HASH_EXP_LOCK_BKT_BITS, 0,
973                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
974                                 &ldlm_export_lock_ops,
975                                 CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
976                                 CFS_HASH_NBLK_CHANGE);
977
978         if (!exp->exp_lock_hash)
979                 return -ENOMEM;
980
981         rc = ldlm_init_flock_export(exp);
982         if (rc)
983                 GOTO(err, rc);
984
985         return 0;
986 err:
987         ldlm_destroy_export(exp);
988         return rc;
989 }
990 EXPORT_SYMBOL(ldlm_init_export);
991
992 void ldlm_destroy_export(struct obd_export *exp)
993 {
994         cfs_hash_putref(exp->exp_lock_hash);
995         exp->exp_lock_hash = NULL;
996
997         ldlm_destroy_flock_export(exp);
998 }
999 EXPORT_SYMBOL(ldlm_destroy_export);
1000
1001 static int ldlm_setup(void)
1002 {
1003         static struct ptlrpc_service_conf       conf;
1004         struct ldlm_bl_pool                     *blp = NULL;
1005         int rc = 0;
1006         int i;
1007
1008         if (ldlm_state != NULL)
1009                 return -EALREADY;
1010
1011         OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
1012         if (ldlm_state == NULL)
1013                 return -ENOMEM;
1014
1015         rc = ldlm_proc_setup();
1016         if (rc != 0)
1017                 GOTO(out, rc);
1018
1019         memset(&conf, 0, sizeof(conf));
1020         conf = (typeof(conf)) {
1021                 .psc_name               = "ldlm_cbd",
1022                 .psc_watchdog_factor    = 2,
1023                 .psc_buf                = {
1024                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
1025                         .bc_buf_size            = LDLM_BUFSIZE,
1026                         .bc_req_max_size        = LDLM_MAXREQSIZE,
1027                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
1028                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
1029                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
1030                 },
1031                 .psc_thr                = {
1032                         .tc_thr_name            = "ldlm_cb",
1033                         .tc_thr_factor          = LDLM_THR_FACTOR,
1034                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
1035                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
1036                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
1037                         .tc_nthrs_user          = ldlm_num_threads,
1038                         .tc_cpu_affinity        = 1,
1039                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
1040                 },
1041                 .psc_cpt                = {
1042                         .cc_pattern             = ldlm_cpts,
1043                 },
1044                 .psc_ops                = {
1045                         .so_req_handler         = ldlm_callback_handler,
1046                 },
1047         };
1048         ldlm_state->ldlm_cb_service =
1049                         ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
1050         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
1051                 CERROR("failed to start service\n");
1052                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
1053                 ldlm_state->ldlm_cb_service = NULL;
1054                 GOTO(out, rc);
1055         }
1056
1057
1058         OBD_ALLOC(blp, sizeof(*blp));
1059         if (blp == NULL)
1060                 GOTO(out, rc = -ENOMEM);
1061         ldlm_state->ldlm_bl_pool = blp;
1062
1063         spin_lock_init(&blp->blp_lock);
1064         INIT_LIST_HEAD(&blp->blp_list);
1065         INIT_LIST_HEAD(&blp->blp_prio_list);
1066         init_waitqueue_head(&blp->blp_waitq);
1067         atomic_set(&blp->blp_num_threads, 0);
1068         atomic_set(&blp->blp_busy_threads, 0);
1069
1070         if (ldlm_num_threads == 0) {
1071                 blp->blp_min_threads = LDLM_NTHRS_INIT;
1072                 blp->blp_max_threads = LDLM_NTHRS_MAX;
1073         } else {
1074                 blp->blp_min_threads = blp->blp_max_threads =
1075                         min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
1076                                                          ldlm_num_threads));
1077         }
1078
1079         for (i = 0; i < blp->blp_min_threads; i++) {
1080                 rc = ldlm_bl_thread_start(blp);
1081                 if (rc < 0)
1082                         GOTO(out, rc);
1083         }
1084
1085
1086         rc = ldlm_pools_init();
1087         if (rc) {
1088                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
1089                 GOTO(out, rc);
1090         }
1091         return 0;
1092
1093  out:
1094         ldlm_cleanup();
1095         return rc;
1096 }
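/*
 * Thread sizing in ldlm_setup(): with the default ldlm_num_threads of 0 the
 * blocking pool starts LDLM_NTHRS_INIT threads and may grow on demand up to
 * LDLM_NTHRS_MAX; a non-zero module parameter (e.g. ldlm_num_threads=8 on
 * the module command line) pins both minimum and maximum to that value,
 * clamped into the [LDLM_NTHRS_INIT, LDLM_NTHRS_MAX] range.
 */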
1097
1098 static int ldlm_cleanup(void)
1099 {
1100         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
1101             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
1102                 CERROR("ldlm still has namespaces; clean these up first.\n");
1103                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
1104                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
1105                 return -EBUSY;
1106         }
1107
1108         ldlm_pools_fini();
1109
1110         if (ldlm_state->ldlm_bl_pool != NULL) {
1111                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1112
1113                 while (atomic_read(&blp->blp_num_threads) > 0) {
1114                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1115
1116                         init_completion(&blp->blp_comp);
1117
1118                         spin_lock(&blp->blp_lock);
1119                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1120                         wake_up(&blp->blp_waitq);
1121                         spin_unlock(&blp->blp_lock);
1122
1123                         wait_for_completion(&blp->blp_comp);
1124                 }
1125
1126                 OBD_FREE(blp, sizeof(*blp));
1127         }
1128
1129         if (ldlm_state->ldlm_cb_service != NULL)
1130                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1131
1132         ldlm_proc_cleanup();
1133
1134
1135         OBD_FREE(ldlm_state, sizeof(*ldlm_state));
1136         ldlm_state = NULL;
1137
1138         return 0;
1139 }
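/*
 * Shutdown handshake used above: for every remaining blocking thread,
 * ldlm_cleanup() queues a "poison" work item with blwi_ns == NULL;
 * ldlm_bl_thread_main() treats that as its signal to exit and completes
 * blp_comp, so the loop reaps the worker threads one at a time.
 */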
1140
1141 int ldlm_init(void)
1142 {
1143         mutex_init(&ldlm_ref_mutex);
1144         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1145         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1146         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1147                                                sizeof(struct ldlm_resource), 0,
1148                                                SLAB_HWCACHE_ALIGN, NULL);
1149         if (ldlm_resource_slab == NULL)
1150                 return -ENOMEM;
1151
1152         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1153                               sizeof(struct ldlm_lock), 0,
1154                               SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
1155         if (ldlm_lock_slab == NULL) {
1156                 kmem_cache_destroy(ldlm_resource_slab);
1157                 return -ENOMEM;
1158         }
1159
1160         ldlm_interval_slab = kmem_cache_create("interval_node",
1161                                         sizeof(struct ldlm_interval),
1162                                         0, SLAB_HWCACHE_ALIGN, NULL);
1163         if (ldlm_interval_slab == NULL) {
1164                 kmem_cache_destroy(ldlm_resource_slab);
1165                 kmem_cache_destroy(ldlm_lock_slab);
1166                 return -ENOMEM;
1167         }
1168 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1169         class_export_dump_hook = ldlm_dump_export_locks;
1170 #endif
1171         return 0;
1172 }
1173
1174 void ldlm_exit(void)
1175 {
1176         if (ldlm_refcount)
1177                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1178         kmem_cache_destroy(ldlm_resource_slab);
1179         /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to
1180          * call synchronize_rcu() to wait for a grace period to elapse, so
1181          * that ldlm_lock_free() gets a chance to be called. */
1182         synchronize_rcu();
1183         kmem_cache_destroy(ldlm_lock_slab);
1184         kmem_cache_destroy(ldlm_interval_slab);
1185 }