]> Pileus Git - ~andy/linux/blob - drivers/staging/lustre/lustre/mdc/mdc_locks.c
Merge branch 'acpi-assorted'
[~andy/linux] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40 # include <linux/pagemap.h>
41 # include <linux/miscdevice.h>
42 # include <linux/init.h>
43
44 #include <lustre_acl.h>
45 #include <obd_class.h>
46 #include <lustre_dlm.h>
47 /* fid_res_name_eq() */
48 #include <lustre_fid.h>
49 #include <lprocfs_status.h>
50 #include "mdc_internal.h"
51
/*
 * Bundle of three pointers: an export, a metadata enqueue descriptor and
 * an LDLM enqueue descriptor.
 * NOTE(review): presumably handed to an asynchronous getattr completion
 * callback; the users are not visible in this part of the file -- confirm.
 */
struct mdc_getattr_args {
	struct obd_export		*ga_exp;
	struct md_enqueue_info		*ga_minfo;
	struct ldlm_enqueue_info	*ga_einfo;
};
57
58 int it_disposition(struct lookup_intent *it, int flag)
59 {
60         return it->d.lustre.it_disposition & flag;
61 }
62 EXPORT_SYMBOL(it_disposition);
63
64 void it_set_disposition(struct lookup_intent *it, int flag)
65 {
66         it->d.lustre.it_disposition |= flag;
67 }
68 EXPORT_SYMBOL(it_set_disposition);
69
70 void it_clear_disposition(struct lookup_intent *it, int flag)
71 {
72         it->d.lustre.it_disposition &= ~flag;
73 }
74 EXPORT_SYMBOL(it_clear_disposition);
75
76 int it_open_error(int phase, struct lookup_intent *it)
77 {
78         if (it_disposition(it, DISP_OPEN_OPEN)) {
79                 if (phase >= DISP_OPEN_OPEN)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_OPEN_CREATE)) {
86                 if (phase >= DISP_OPEN_CREATE)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91
92         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
93                 if (phase >= DISP_LOOKUP_EXECD)
94                         return it->d.lustre.it_status;
95                 else
96                         return 0;
97         }
98
99         if (it_disposition(it, DISP_IT_EXECD)) {
100                 if (phase >= DISP_IT_EXECD)
101                         return it->d.lustre.it_status;
102                 else
103                         return 0;
104         }
105         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
106                it->d.lustre.it_status);
107         LBUG();
108         return 0;
109 }
110 EXPORT_SYMBOL(it_open_error);
111
112 /* this must be called on a lockh that is known to have a referenced lock */
113 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
114                       __u64 *bits)
115 {
116         struct ldlm_lock *lock;
117         struct inode *new_inode = data;
118
119         if(bits)
120                 *bits = 0;
121
122         if (!*lockh)
123                 return 0;
124
125         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
126
127         LASSERT(lock != NULL);
128         lock_res_and_lock(lock);
129         if (lock->l_resource->lr_lvb_inode &&
130             lock->l_resource->lr_lvb_inode != data) {
131                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
132                 LASSERTF(old_inode->i_state & I_FREEING,
133                          "Found existing inode %p/%lu/%u state %lu in lock: "
134                          "setting data to %p/%lu/%u\n", old_inode,
135                          old_inode->i_ino, old_inode->i_generation,
136                          old_inode->i_state,
137                          new_inode, new_inode->i_ino, new_inode->i_generation);
138         }
139         lock->l_resource->lr_lvb_inode = new_inode;
140         if (bits)
141                 *bits = lock->l_policy_data.l_inodebits.bits;
142
143         unlock_res_and_lock(lock);
144         LDLM_LOCK_PUT(lock);
145
146         return 0;
147 }
148
149 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
150                            const struct lu_fid *fid, ldlm_type_t type,
151                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
152                            struct lustre_handle *lockh)
153 {
154         struct ldlm_res_id res_id;
155         ldlm_mode_t rc;
156
157         fid_build_reg_res_name(fid, &res_id);
158         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
159                              &res_id, type, policy, mode, lockh, 0);
160         return rc;
161 }
162
163 int mdc_cancel_unused(struct obd_export *exp,
164                       const struct lu_fid *fid,
165                       ldlm_policy_data_t *policy,
166                       ldlm_mode_t mode,
167                       ldlm_cancel_flags_t flags,
168                       void *opaque)
169 {
170         struct ldlm_res_id res_id;
171         struct obd_device *obd = class_exp2obd(exp);
172         int rc;
173
174         fid_build_reg_res_name(fid, &res_id);
175         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
176                                              policy, mode, flags, opaque);
177         return rc;
178 }
179
180 int mdc_null_inode(struct obd_export *exp,
181                    const struct lu_fid *fid)
182 {
183         struct ldlm_res_id res_id;
184         struct ldlm_resource *res;
185         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
186
187         LASSERTF(ns != NULL, "no namespace passed\n");
188
189         fid_build_reg_res_name(fid, &res_id);
190
191         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
192         if(res == NULL)
193                 return 0;
194
195         lock_res(res);
196         res->lr_lvb_inode = NULL;
197         unlock_res(res);
198
199         ldlm_resource_putref(res);
200         return 0;
201 }
202
203 /* find any ldlm lock of the inode in mdc
204  * return 0    not find
205  *      1    find one
206  *      < 0    error */
207 int mdc_find_cbdata(struct obd_export *exp,
208                     const struct lu_fid *fid,
209                     ldlm_iterator_t it, void *data)
210 {
211         struct ldlm_res_id res_id;
212         int rc = 0;
213
214         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
215         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
216                                    it, data);
217         if (rc == LDLM_ITER_STOP)
218                 return 1;
219         else if (rc == LDLM_ITER_CONTINUE)
220                 return 0;
221         return rc;
222 }
223
224 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
225 {
226         /* Don't hold error requests for replay. */
227         if (req->rq_replay) {
228                 spin_lock(&req->rq_lock);
229                 req->rq_replay = 0;
230                 spin_unlock(&req->rq_lock);
231         }
232         if (rc && req->rq_transno != 0) {
233                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
234                 LBUG();
235         }
236 }
237
238 /* Save a large LOV EA into the request buffer so that it is available
239  * for replay.  We don't do this in the initial request because the
240  * original request doesn't need this buffer (at most it sends just the
241  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
242  * buffer and may also be difficult to allocate and save a very large
243  * request buffer for each open. (bug 5707)
244  *
245  * OOM here may cause recovery failure if lmm is needed (only for the
246  * original open if the MDS crashed just when this client also OOM'd)
247  * but this is incredibly unlikely, and questionable whether the client
248  * could do MDS recovery under OOM anyways... */
249 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
250                                 struct mdt_body *body)
251 {
252         int     rc;
253
254         /* FIXME: remove this explicit offset. */
255         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
256                                         body->eadatasize);
257         if (rc) {
258                 CERROR("Can't enlarge segment %d size to %d\n",
259                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
260                 body->valid &= ~OBD_MD_FLEASIZE;
261                 body->eadatasize = 0;
262         }
263 }
264
265 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
266                                                    struct lookup_intent *it,
267                                                    struct md_op_data *op_data,
268                                                    void *lmm, int lmmsize,
269                                                    void *cb_data)
270 {
271         struct ptlrpc_request *req;
272         struct obd_device     *obddev = class_exp2obd(exp);
273         struct ldlm_intent    *lit;
274         LIST_HEAD(cancels);
275         int                 count = 0;
276         int                 mode;
277         int                 rc;
278
279         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
280
281         /* XXX: openlock is not cancelled for cross-refs. */
282         /* If inode is known, cancel conflicting OPEN locks. */
283         if (fid_is_sane(&op_data->op_fid2)) {
284                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
285                         mode = LCK_CW;
286 #ifdef FMODE_EXEC
287                 else if (it->it_flags & FMODE_EXEC)
288                         mode = LCK_PR;
289 #endif
290                 else
291                         mode = LCK_CR;
292                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
293                                                 &cancels, mode,
294                                                 MDS_INODELOCK_OPEN);
295         }
296
297         /* If CREATE, cancel parent's UPDATE lock. */
298         if (it->it_op & IT_CREAT)
299                 mode = LCK_EX;
300         else
301                 mode = LCK_CR;
302         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
303                                          &cancels, mode,
304                                          MDS_INODELOCK_UPDATE);
305
306         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
307                                    &RQF_LDLM_INTENT_OPEN);
308         if (req == NULL) {
309                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
310                 return ERR_PTR(-ENOMEM);
311         }
312
313         /* parent capability */
314         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
315         /* child capability, reserve the size according to parent capa, it will
316          * be filled after we get the reply */
317         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
318
319         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
320                              op_data->op_namelen + 1);
321         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
322                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
323
324         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
325         if (rc) {
326                 ptlrpc_request_free(req);
327                 return NULL;
328         }
329
330         spin_lock(&req->rq_lock);
331         req->rq_replay = req->rq_import->imp_replayable;
332         spin_unlock(&req->rq_lock);
333
334         /* pack the intent */
335         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
336         lit->opc = (__u64)it->it_op;
337
338         /* pack the intended request */
339         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
340                       lmmsize);
341
342         /* for remote client, fetch remote perm for current user */
343         if (client_is_remote(exp))
344                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
345                                      sizeof(struct mdt_remote_perm));
346         ptlrpc_request_set_replen(req);
347         return req;
348 }
349
350 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
351                                                      struct lookup_intent *it,
352                                                      struct md_op_data *op_data)
353 {
354         struct ptlrpc_request *req;
355         struct obd_device     *obddev = class_exp2obd(exp);
356         struct ldlm_intent    *lit;
357         int                 rc;
358
359         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
360                                    &RQF_LDLM_INTENT_UNLINK);
361         if (req == NULL)
362                 return ERR_PTR(-ENOMEM);
363
364         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
365         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
366                              op_data->op_namelen + 1);
367
368         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
369         if (rc) {
370                 ptlrpc_request_free(req);
371                 return ERR_PTR(rc);
372         }
373
374         /* pack the intent */
375         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
376         lit->opc = (__u64)it->it_op;
377
378         /* pack the intended request */
379         mdc_unlink_pack(req, op_data);
380
381         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
382                              obddev->u.cli.cl_max_mds_easize);
383         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
384                              obddev->u.cli.cl_max_mds_cookiesize);
385         ptlrpc_request_set_replen(req);
386         return req;
387 }
388
389 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
390                                                       struct lookup_intent *it,
391                                                       struct md_op_data *op_data)
392 {
393         struct ptlrpc_request *req;
394         struct obd_device     *obddev = class_exp2obd(exp);
395         obd_valid             valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
396                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
397                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
398                                        (client_is_remote(exp) ?
399                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
400         struct ldlm_intent    *lit;
401         int                 rc;
402
403         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
404                                    &RQF_LDLM_INTENT_GETATTR);
405         if (req == NULL)
406                 return ERR_PTR(-ENOMEM);
407
408         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
409         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
410                              op_data->op_namelen + 1);
411
412         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
413         if (rc) {
414                 ptlrpc_request_free(req);
415                 return ERR_PTR(rc);
416         }
417
418         /* pack the intent */
419         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
420         lit->opc = (__u64)it->it_op;
421
422         /* pack the intended request */
423         mdc_getattr_pack(req, valid, it->it_flags, op_data,
424                          obddev->u.cli.cl_max_mds_easize);
425
426         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
427                              obddev->u.cli.cl_max_mds_easize);
428         if (client_is_remote(exp))
429                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
430                                      sizeof(struct mdt_remote_perm));
431         ptlrpc_request_set_replen(req);
432         return req;
433 }
434
435 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
436                                                      struct lookup_intent *it,
437                                                      struct md_op_data *unused)
438 {
439         struct obd_device     *obd = class_exp2obd(exp);
440         struct ptlrpc_request *req;
441         struct ldlm_intent    *lit;
442         struct layout_intent  *layout;
443         int rc;
444
445         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
446                                 &RQF_LDLM_INTENT_LAYOUT);
447         if (req == NULL)
448                 return ERR_PTR(-ENOMEM);
449
450         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
451         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
452         if (rc) {
453                 ptlrpc_request_free(req);
454                 return ERR_PTR(rc);
455         }
456
457         /* pack the intent */
458         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
459         lit->opc = (__u64)it->it_op;
460
461         /* pack the layout intent request */
462         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
463         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
464          * set for replication */
465         layout->li_opc = LAYOUT_INTENT_ACCESS;
466
467         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
468                         obd->u.cli.cl_max_mds_easize);
469         ptlrpc_request_set_replen(req);
470         return req;
471 }
472
473 static struct ptlrpc_request *
474 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
475 {
476         struct ptlrpc_request *req;
477         int rc;
478
479         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
480         if (req == NULL)
481                 return ERR_PTR(-ENOMEM);
482
483         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
484         if (rc) {
485                 ptlrpc_request_free(req);
486                 return ERR_PTR(rc);
487         }
488
489         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
490         ptlrpc_request_set_replen(req);
491         return req;
492 }
493
/*
 * Post-process the reply of an enqueue request.
 *
 * In order, this function:
 *  - marks the request's lock flags LDLM_FL_INTENT_ONLY when the request
 *    has a transno or is flagged for replay;
 *  - on ELDLM_LOCK_ABORTED, zeroes @lockh and einfo->ei_mode and turns rc
 *    into 0; otherwise re-references the lock if its l_req_mode differs
 *    from the mode recorded in @einfo;
 *  - copies disposition, status, lock mode, lock cookie and the request
 *    pointer into the intent;
 *  - clears the replay flag for requests that must not be replayed;
 *  - checks that the reply buffers expected for the intent operation are
 *    present (body, EA, remote perm, capabilities, or layout LVB);
 *  - for a layout lock, attaches a private copy of the EA/LVB data to the
 *    lock if the lock has none yet.
 *
 * @rc is the result of the enqueue and must be >= 0.
 *
 * Returns rc (0 after ELDLM_LOCK_ABORTED), -EPROTO when an expected reply
 * buffer is missing, or -ENOMEM when the layout copy cannot be allocated.
 */
494 static int mdc_finish_enqueue(struct obd_export *exp,
495                               struct ptlrpc_request *req,
496                               struct ldlm_enqueue_info *einfo,
497                               struct lookup_intent *it,
498                               struct lustre_handle *lockh,
499                               int rc)
500 {
501         struct req_capsule  *pill = &req->rq_pill;
502         struct ldlm_request *lockreq;
503         struct ldlm_reply   *lockrep;
504         struct lustre_intent_data *intent = &it->d.lustre;
505         struct ldlm_lock    *lock;
506         void            *lvb_data = NULL;
507         int               lvb_len = 0;
508
509         LASSERT(rc >= 0);
510         /* Similarly, if we're going to replay this request, we don't want to
511          * actually get a lock, just perform the intent. */
512         if (req->rq_transno || req->rq_replay) {
513                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
514                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
515         }
516
517         if (rc == ELDLM_LOCK_ABORTED) {
/* No lock was obtained: report "no lock" through einfo and lockh. */
518                 einfo->ei_mode = 0;
519                 memset(lockh, 0, sizeof(*lockh));
520                 rc = 0;
521         } else { /* rc = 0 */
522                 lock = ldlm_handle2lock(lockh);
523                 LASSERT(lock != NULL);
524
525                 /* If the server gave us back a different lock mode, we should
526                  * fix up our variables. */
527                 if (lock->l_req_mode != einfo->ei_mode) {
528                         ldlm_lock_addref(lockh, lock->l_req_mode);
529                         ldlm_lock_decref(lockh, einfo->ei_mode);
530                         einfo->ei_mode = lock->l_req_mode;
531                 }
532                 LDLM_LOCK_PUT(lock);
533         }
534
535         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
536         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
537
538         intent->it_disposition = (int)lockrep->lock_policy_res1;
539         intent->it_status = (int)lockrep->lock_policy_res2;
540         intent->it_lock_mode = einfo->ei_mode;
541         intent->it_lock_handle = lockh->cookie;
542         intent->it_data = req;
543
544         /* Technically speaking rq_transno must already be zero if
545          * it_status is in error, so the check is a bit redundant */
546         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
547                 mdc_clear_replay_flag(req, intent->it_status);
548
549         /* If we're doing an IT_OPEN which did not result in an actual
550          * successful open, then we need to remove the bit which saves
551          * this request for unconditional replay.
552          *
553          * It's important that we do this first!  Otherwise we might exit the
554          * function without doing so, and try to replay a failed create
555          * (bug 3440) */
556         if (it->it_op & IT_OPEN && req->rq_replay &&
557             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
558                 mdc_clear_replay_flag(req, intent->it_status);
559
560         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
561                   it->it_op, intent->it_disposition, intent->it_status);
562
563         /* We know what to expect, so we do any byte flipping required here */
564         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
565                 struct mdt_body *body;
566
567                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
568                 if (body == NULL) {
569                         CERROR ("Can't swab mdt_body\n");
570                         return -EPROTO;
571                 }
572
573                 if (it_disposition(it, DISP_OPEN_OPEN) &&
574                     !it_open_error(DISP_OPEN_OPEN, it)) {
575                         /*
576                          * If this is a successful OPEN request, we need to set
577                          * replay handler and data early, so that if replay
578                          * happens immediately after swabbing below, new reply
579                          * is swabbed by that handler correctly.
580                          */
581                         mdc_set_open_replay_data(NULL, NULL, req);
582                 }
583
584                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
585                         void *eadata;
586
587                         mdc_update_max_ea_from_body(exp, body);
588
589                         /*
590                          * The eadata is opaque; just check that it is there.
591                          * Eventually, obd_unpackmd() will check the contents.
592                          */
593                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
594                                                               body->eadatasize);
595                         if (eadata == NULL)
596                                 return -EPROTO;
597
598                         /* save lvb data and length in case this is for layout
599                          * lock */
600                         lvb_data = eadata;
601                         lvb_len = body->eadatasize;
602
603                         /*
604                          * We save the reply LOV EA in case we have to replay a
605                          * create for recovery.  If we didn't allocate a large
606                          * enough request buffer above we need to reallocate it
607                          * here to hold the actual LOV EA.
608                          *
609                          * To not save LOV EA if request is not going to replay
610                          * (for example error one).
611                          */
612                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
613                                 void *lmm;
/* Grow or shrink the request's EA segment to match the reply's EA size.
 * If growing fails, mdc_realloc_openmsg() zeroes body->eadatasize, so the
 * calls below then operate on a size of 0. */
614                                 if (req_capsule_get_size(pill, &RMF_EADATA,
615                                                          RCL_CLIENT) <
616                                     body->eadatasize)
617                                         mdc_realloc_openmsg(req, body);
618                                 else
619                                         req_capsule_shrink(pill, &RMF_EADATA,
620                                                            body->eadatasize,
621                                                            RCL_CLIENT);
622
623                                 req_capsule_set_size(pill, &RMF_EADATA,
624                                                      RCL_CLIENT,
625                                                      body->eadatasize);
626
627                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
628                                 if (lmm)
629                                         memcpy(lmm, eadata, body->eadatasize);
630                         }
631                 }
632
633                 if (body->valid & OBD_MD_FLRMTPERM) {
634                         struct mdt_remote_perm *perm;
635
636                         LASSERT(client_is_remote(exp));
637                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
638                                                 lustre_swab_mdt_remote_perm);
639                         if (perm == NULL)
640                                 return -EPROTO;
641                 }
642                 if (body->valid & OBD_MD_FLMDSCAPA) {
643                         struct lustre_capa *capa, *p;
644
645                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
646                         if (capa == NULL)
647                                 return -EPROTO;
648
649                         if (it->it_op & IT_OPEN) {
650                                 /* client fid capa will be checked in replay */
651                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
652                                 LASSERT(p);
653                                 *p = *capa;
654                         }
655                 }
656                 if (body->valid & OBD_MD_FLOSSCAPA) {
657                         struct lustre_capa *capa;
658
/* Presence check only: the returned pointer is not used further here. */
659                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
660                         if (capa == NULL)
661                                 return -EPROTO;
662                 }
663         } else if (it->it_op & IT_LAYOUT) {
664                 /* maybe the lock was granted right away and layout
665                  * is packed into RMF_DLM_LVB of req */
666                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
667                 if (lvb_len > 0) {
668                         lvb_data = req_capsule_server_sized_get(pill,
669                                                         &RMF_DLM_LVB, lvb_len);
670                         if (lvb_data == NULL)
671                                 return -EPROTO;
672                 }
673         }
674
675         /* fill in stripe data for layout lock */
/* NOTE(review): lockh was zeroed above on ELDLM_LOCK_ABORTED, so the lookup
 * presumably yields NULL in that case -- hence the NULL checks below. */
676         lock = ldlm_handle2lock(lockh);
677         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
678                 void *lmm;
679
680                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
681                         ldlm_it2str(it->it_op), lvb_len);
682
683                 OBD_ALLOC_LARGE(lmm, lvb_len);
684                 if (lmm == NULL) {
685                         LDLM_LOCK_PUT(lock);
686                         return -ENOMEM;
687                 }
688                 memcpy(lmm, lvb_data, lvb_len);
689
690                 /* install lvb_data */
691                 lock_res_and_lock(lock);
/* Hand the copy over only if the lock has no LVB data yet; lmm is set to
 * NULL to mark the transfer, otherwise the copy is freed below. */
692                 if (lock->l_lvb_data == NULL) {
693                         lock->l_lvb_data = lmm;
694                         lock->l_lvb_len = lvb_len;
695                         lmm = NULL;
696                 }
697                 unlock_res_and_lock(lock);
698                 if (lmm != NULL)
699                         OBD_FREE_LARGE(lmm, lvb_len);
700         }
701         if (lock != NULL)
702                 LDLM_LOCK_PUT(lock);
703
704         return rc;
705 }
706
707 /* We always reserve enough space in the reply packet for a stripe MD, because
708  * we don't know in advance the file type. */
/*
 * Enqueue an LDLM lock on the resource built from op_data->op_fid1,
 * optionally carrying a metadata intent.
 *
 * exp:              export the request is sent through.
 * einfo:            enqueue description; ei_type must be LDLM_IBITS when an
 *                   intent is given and LDLM_FLOCK otherwise (both asserted).
 *                   ei_cbdata is consumed and cleared on the IT_OPEN path.
 * it:               intent, or NULL for a flock enqueue.
 * op_data:          operation data; op_fid1 names the lock resource.
 * lockh:            lock handle passed to ldlm_cli_enqueue().
 * lmm, lmmsize:     passed to the open pack for IT_OPEN; for flock, lmm is
 *                   the flock policy and lmmsize must be 0 (asserted).
 * reqp:             must be NULL (asserted).
 * extra_lock_flags: initial lock flags; LDLM_FL_HAS_INTENT is ORed in when
 *                   an intent is given.
 *
 * Returns a negative errno on failure (-EOPNOTSUPP, PTR_ERR() of the packed
 * request, the mdc_enter_request()/ldlm_cli_enqueue() error, or -EIO when a
 * create resend crosses an import generation change); otherwise the value
 * returned by ldlm_cli_enqueue() (flock) or mdc_finish_enqueue() (intent).
 */
709 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
710                 struct lookup_intent *it, struct md_op_data *op_data,
711                 struct lustre_handle *lockh, void *lmm, int lmmsize,
712                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
713 {
714         struct obd_device     *obddev = class_exp2obd(exp);
715         struct ptlrpc_request *req = NULL;
716         __u64             flags, saved_flags = extra_lock_flags;
717         int                 rc;
718         struct ldlm_res_id res_id;
719         static const ldlm_policy_data_t lookup_policy =
720                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
721         static const ldlm_policy_data_t update_policy =
722                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
723         static const ldlm_policy_data_t layout_policy =
724                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
725         ldlm_policy_data_t const *policy = &lookup_policy;
726         int                 generation, resends = 0;
727         struct ldlm_reply     *lockrep;
728         enum lvb_type          lvb_type = 0;
729
730         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
731                  einfo->ei_type);
732
733         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
734
            /* Pick the inodebits policy from the intent; LOOKUP is the
             * default when none of the operations below is requested. */
735         if (it) {
736                 saved_flags |= LDLM_FL_HAS_INTENT;
737                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
738                         policy = &update_policy;
739                 else if (it->it_op & IT_LAYOUT)
740                         policy = &layout_policy;
741         }
742
743         LASSERT(reqp == NULL);
744
            /* Sample the import generation once, before the first attempt;
             * the create-resend path below compares against it. */
745         generation = obddev->u.cli.cl_import->imp_generation;
746 resend:
            /* flags is handed to ldlm_cli_enqueue() by pointer, so each
             * attempt restarts from the saved value. */
747         flags = saved_flags;
748         if (!it) {
749                 /* The only way right now is FLOCK, in this case we hide flock
750                    policy as lmm, but lmmsize is 0 */
751                 LASSERT(lmm && lmmsize == 0);
752                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
753                          einfo->ei_type);
754                 policy = (ldlm_policy_data_t *)lmm;
755                 res_id.name[3] = LDLM_FLOCK;
756         } else if (it->it_op & IT_OPEN) {
757                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
758                                            einfo->ei_cbdata);
759                 policy = &update_policy;
                    /* ei_cbdata and lmm were given to the open pack above;
                     * clear them so they are not passed again (this also
                     * applies to any later resend iteration). */
760                 einfo->ei_cbdata = NULL;
761                 lmm = NULL;
762         } else if (it->it_op & IT_UNLINK) {
763                 req = mdc_intent_unlink_pack(exp, it, op_data);
764         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
765                 req = mdc_intent_getattr_pack(exp, it, op_data);
766         } else if (it->it_op & IT_READDIR) {
767                 req = mdc_enqueue_pack(exp, 0);
768         } else if (it->it_op & IT_LAYOUT) {
769                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
770                         return -EOPNOTSUPP;
771
772                 req = mdc_intent_layout_pack(exp, it, op_data);
773                 lvb_type = LVB_T_LAYOUT;
774         } else {
775                 LBUG();
776                 return -EINVAL;
777         }
778
            /* The pack helpers report failure as an ERR_PTR; req stays NULL
             * only on the flock path. */
779         if (IS_ERR(req))
780                 return PTR_ERR(req);
781
782         if (req != NULL && it && it->it_op & IT_CREAT)
783                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
784                  * retry logic */
785                 req->rq_no_retry_einprogress = 1;
786
            /* resends is only incremented on the IT_CREAT retry path below,
             * where req has just been re-packed. The request is tied to the
             * generation sampled before the first attempt, and rq_sent is
             * set to "now + number of resends" (presumably delaying the
             * send; confirm against ptlrpc). */
787         if (resends) {
788                 req->rq_generation_set = 1;
789                 req->rq_import_generation = generation;
790                 req->rq_sent = cfs_time_current_sec() + resends;
791         }
792
793         /* It is important to obtain rpc_lock first (if applicable), so that
794          * threads that are serialised with rpc_lock are not polluting our
795          * rpcs in flight counter. We do not do flock request limiting, though*/
796         if (it) {
797                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
798                 rc = mdc_enter_request(&obddev->u.cli);
799                 if (rc != 0) {
800                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
801                         mdc_clear_replay_flag(req, 0);
802                         ptlrpc_req_finished(req);
803                         return rc;
804                 }
805         }
806
807         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
808                               0, lvb_type, lockh, 0);
809         if (!it) {
810                 /* For flock requests we immediately return without further
811                    delay and let caller deal with the rest, since rest of
812                    this function metadata processing makes no sense for flock
813                    requests anyway. But in case of problem during comms with
814                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
815                    can not rely on caller and this mainly for F_UNLCKs
816                    (explicits or automatically generated by Kernel to clean
817                    current FLocks upon exit) that can't be trashed */
818                 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
819                         goto resend;
820                 return rc;
821         }
822
            /* Intent path: undo mdc_enter_request() and release the rpc lock
             * taken before the enqueue. */
823         mdc_exit_request(&obddev->u.cli);
824         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
825
826         if (rc < 0) {
827                 CERROR("ldlm_cli_enqueue: %d\n", rc);
828                 mdc_clear_replay_flag(req, rc);
829                 ptlrpc_req_finished(req);
830                 return rc;
831         }
832
833         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
834         LASSERT(lockrep != NULL);
835
            /* Convert the status carried in the reply with
             * ptlrpc_status_ntoh() before it is compared with local errno
             * values below. */
836         lockrep->lock_policy_res2 =
837                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
838
839         /* Retry the create infinitely when we get -EINPROGRESS from
840          * server. This is required by the new quota design. */
841         if (it && it->it_op & IT_CREAT &&
842             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
843                 mdc_clear_replay_flag(req, rc);
844                 ptlrpc_req_finished(req);
845                 resends++;
846
847                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
848                        obddev->obd_name, resends, it->it_op,
849                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
850
                    /* Only retry while the import generation is unchanged;
                     * otherwise give up with -EIO. */
851                 if (generation == obddev->u.cli.cl_import->imp_generation) {
852                         goto resend;
853                 } else {
854                         CDEBUG(D_HA, "resend cross eviction\n");
855                         return -EIO;
856                 }
857         }
858
859         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
860         if (rc < 0) {
                    /* On failure, drop the lock reference (if a lock was
                     * obtained), zero the caller's handle and release the
                     * request. */
861                 if (lustre_handle_is_used(lockh)) {
862                         ldlm_lock_decref(lockh, einfo->ei_mode);
863                         memset(lockh, 0, sizeof(*lockh));
864                 }
865                 ptlrpc_req_finished(req);
866         }
867         return rc;
868 }
869
870 static int mdc_finish_intent_lock(struct obd_export *exp,
871                                   struct ptlrpc_request *request,
872                                   struct md_op_data *op_data,
873                                   struct lookup_intent *it,
874                                   struct lustre_handle *lockh)
875 {
876         struct lustre_handle old_lock;
877         struct mdt_body *mdt_body;
878         struct ldlm_lock *lock;
879         int rc;
880
881         LASSERT(request != NULL);
882         LASSERT(request != LP_POISON);
883         LASSERT(request->rq_repmsg != LP_POISON);
884
885         if (!it_disposition(it, DISP_IT_EXECD)) {
886                 /* The server failed before it even started executing the
887                  * intent, i.e. because it couldn't unpack the request. */
888                 LASSERT(it->d.lustre.it_status != 0);
889                 return it->d.lustre.it_status;
890         }
891         rc = it_open_error(DISP_IT_EXECD, it);
892         if (rc)
893                 return rc;
894
895         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
896         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
897
898         /* If we were revalidating a fid/name pair, mark the intent in
899          * case we fail and get called again from lookup */
900         if (fid_is_sane(&op_data->op_fid2) &&
901             it->it_create_mode & M_CHECK_STALE &&
902             it->it_op != IT_GETATTR) {
903                 it_set_disposition(it, DISP_ENQ_COMPLETE);
904
905                 /* Also: did we find the same inode? */
906                 /* sever can return one of two fids:
907                  * op_fid2 - new allocated fid - if file is created.
908                  * op_fid3 - existent fid - if file only open.
909                  * op_fid3 is saved in lmv_intent_open */
910                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
911                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
912                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
913                                "\n", PFID(&op_data->op_fid2),
914                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
915                         return -ESTALE;
916                 }
917         }
918
919         rc = it_open_error(DISP_LOOKUP_EXECD, it);
920         if (rc)
921                 return rc;
922
923         /* keep requests around for the multiple phases of the call
924          * this shows the DISP_XX must guarantee we make it into the call
925          */
926         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
927             it_disposition(it, DISP_OPEN_CREATE) &&
928             !it_open_error(DISP_OPEN_CREATE, it)) {
929                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
930                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
931         }
932         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
933             it_disposition(it, DISP_OPEN_OPEN) &&
934             !it_open_error(DISP_OPEN_OPEN, it)) {
935                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
936                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
937                 /* BUG 11546 - eviction in the middle of open rpc processing */
938                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
939         }
940
941         if (it->it_op & IT_CREAT) {
942                 /* XXX this belongs in ll_create_it */
943         } else if (it->it_op == IT_OPEN) {
944                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
945         } else {
946                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
947         }
948
949         /* If we already have a matching lock, then cancel the new
950          * one.  We have to set the data here instead of in
951          * mdc_enqueue, because we need to use the child's inode as
952          * the l_ast_data to match, and that's not available until
953          * intent_finish has performed the iget().) */
954         lock = ldlm_handle2lock(lockh);
955         if (lock) {
956                 ldlm_policy_data_t policy = lock->l_policy_data;
957                 LDLM_DEBUG(lock, "matching against this");
958
959                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
960                                          &lock->l_resource->lr_name),
961                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
962                          (unsigned long)lock->l_resource->lr_name.name[0],
963                          (unsigned long)lock->l_resource->lr_name.name[1],
964                          (unsigned long)lock->l_resource->lr_name.name[2],
965                          (unsigned long)fid_seq(&mdt_body->fid1),
966                          (unsigned long)fid_oid(&mdt_body->fid1),
967                          (unsigned long)fid_ver(&mdt_body->fid1));
968                 LDLM_LOCK_PUT(lock);
969
970                 memcpy(&old_lock, lockh, sizeof(*lockh));
971                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
972                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
973                         ldlm_lock_decref_and_cancel(lockh,
974                                                     it->d.lustre.it_lock_mode);
975                         memcpy(lockh, &old_lock, sizeof(old_lock));
976                         it->d.lustre.it_lock_handle = lockh->cookie;
977                 }
978         }
979         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
980                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
981                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
982         return rc;
983 }
984
985 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
986                         struct lu_fid *fid, __u64 *bits)
987 {
988         /* We could just return 1 immediately, but since we should only
989          * be called in revalidate_it if we already have a lock, let's
990          * verify that. */
991         struct ldlm_res_id res_id;
992         struct lustre_handle lockh;
993         ldlm_policy_data_t policy;
994         ldlm_mode_t mode;
995
996         if (it->d.lustre.it_lock_handle) {
997                 lockh.cookie = it->d.lustre.it_lock_handle;
998                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
999         } else {
1000                 fid_build_reg_res_name(fid, &res_id);
1001                 switch (it->it_op) {
1002                 case IT_GETATTR:
1003                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1004                         break;
1005                 case IT_LAYOUT:
1006                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1007                         break;
1008                 default:
1009                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1010                         break;
1011                 }
1012                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1013                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1014                                        LDLM_IBITS, &policy,
1015                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1016         }
1017
1018         if (mode) {
1019                 it->d.lustre.it_lock_handle = lockh.cookie;
1020                 it->d.lustre.it_lock_mode = mode;
1021         } else {
1022                 it->d.lustre.it_lock_handle = 0;
1023                 it->d.lustre.it_lock_mode = 0;
1024         }
1025
1026         return !!mode;
1027 }
1028
1029 /*
1030  * This long block is all about fixing up the lock and request state
1031  * so that it is correct as of the moment _before_ the operation was
1032  * applied; that way, the VFS will think that everything is normal and
1033  * call Lustre's regular VFS methods.
1034  *
1035  * If we're performing a creation, that means that unless the creation
1036  * failed with EEXIST, we should fake up a negative dentry.
1037  *
1038  * For everything else, we want to lookup to succeed.
1039  *
1040  * One additional note: if CREATE or OPEN succeeded, we add an extra
1041  * reference to the request because we need to keep it around until
1042  * ll_create/ll_open gets called.
1043  *
1044  * The server will return to us, in it_disposition, an indication of
1045  * exactly what d.lustre.it_status refers to.
1046  *
1047  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1048  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1049  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1050  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1051  * was successful.
1052  *
1053  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1054  * child lookup.
1055  */
1056 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1057                     void *lmm, int lmmsize, struct lookup_intent *it,
1058                     int lookup_flags, struct ptlrpc_request **reqp,
1059                     ldlm_blocking_callback cb_blocking,
1060                     __u64 extra_lock_flags)
1061 {
1062         struct lustre_handle lockh;
1063         int rc = 0;
1064
1065         LASSERT(it);
1066
1067         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1068                ", intent: %s flags %#o\n", op_data->op_namelen,
1069                op_data->op_name, PFID(&op_data->op_fid2),
1070                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1071                it->it_flags);
1072
1073         lockh.cookie = 0;
1074         if (fid_is_sane(&op_data->op_fid2) &&
1075             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1076                 /* We could just return 1 immediately, but since we should only
1077                  * be called in revalidate_it if we already have a lock, let's
1078                  * verify that. */
1079                 it->d.lustre.it_lock_handle = 0;
1080                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1081                 /* Only return failure if it was not GETATTR by cfid
1082                    (from inode_revalidate) */
1083                 if (rc || op_data->op_namelen != 0)
1084                         return rc;
1085         }
1086
1087         /* lookup_it may be called only after revalidate_it has run, because
1088          * revalidate_it cannot return errors, only zero.  Returning zero causes
1089          * this call to lookup, which *can* return an error.
1090          *
1091          * We only want to execute the request associated with the intent one
1092          * time, however, so don't send the request again.  Instead, skip past
1093          * this and use the request from revalidate.  In this case, revalidate
1094          * never dropped its reference, so the refcounts are all OK */
1095         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1096                 struct ldlm_enqueue_info einfo = {
1097                         .ei_type        = LDLM_IBITS,
1098                         .ei_mode        = it_to_lock_mode(it),
1099                         .ei_cb_bl       = cb_blocking,
1100                         .ei_cb_cp       = ldlm_completion_ast,
1101                 };
1102
1103                 /* For case if upper layer did not alloc fid, do it now. */
1104                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1105                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1106                         if (rc < 0) {
1107                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1108                                 return rc;
1109                         }
1110                 }
1111                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1112                                  lmm, lmmsize, NULL, extra_lock_flags);
1113                 if (rc < 0)
1114                         return rc;
1115         } else if (!fid_is_sane(&op_data->op_fid2) ||
1116                    !(it->it_create_mode & M_CHECK_STALE)) {
1117                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1118                  * request referenced from this intent, saved for subsequent
1119                  * lookup.  This path is executed when we proceed to this
1120                  * lookup, so we clear DISP_ENQ_COMPLETE */
1121                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1122         }
1123         *reqp = it->d.lustre.it_data;
1124         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1125         return rc;
1126 }
1127
1128 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1129                                               struct ptlrpc_request *req,
1130                                               void *args, int rc)
1131 {
1132         struct mdc_getattr_args  *ga = args;
1133         struct obd_export       *exp = ga->ga_exp;
1134         struct md_enqueue_info   *minfo = ga->ga_minfo;
1135         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1136         struct lookup_intent     *it;
1137         struct lustre_handle     *lockh;
1138         struct obd_device       *obddev;
1139         struct ldlm_reply        *lockrep;
1140         __u64                flags = LDLM_FL_HAS_INTENT;
1141
1142         it    = &minfo->mi_it;
1143         lockh = &minfo->mi_lockh;
1144
1145         obddev = class_exp2obd(exp);
1146
1147         mdc_exit_request(&obddev->u.cli);
1148         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1149                 rc = -ETIMEDOUT;
1150
1151         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1152                                    &flags, NULL, 0, lockh, rc);
1153         if (rc < 0) {
1154                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1155                 mdc_clear_replay_flag(req, rc);
1156                 GOTO(out, rc);
1157         }
1158
1159         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1160         LASSERT(lockrep != NULL);
1161
1162         lockrep->lock_policy_res2 =
1163                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1164
1165         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1166         if (rc)
1167                 GOTO(out, rc);
1168
1169         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1170
1171 out:
1172         OBD_FREE_PTR(einfo);
1173         minfo->mi_cb(req, minfo, rc);
1174         return 0;
1175 }
1176
1177 int mdc_intent_getattr_async(struct obd_export *exp,
1178                              struct md_enqueue_info *minfo,
1179                              struct ldlm_enqueue_info *einfo)
1180 {
1181         struct md_op_data       *op_data = &minfo->mi_data;
1182         struct lookup_intent    *it = &minfo->mi_it;
1183         struct ptlrpc_request   *req;
1184         struct mdc_getattr_args *ga;
1185         struct obd_device       *obddev = class_exp2obd(exp);
1186         struct ldlm_res_id       res_id;
1187         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1188          *     for statahead currently. Consider CMD in future, such two bits
1189          *     maybe managed by different MDS, should be adjusted then. */
1190         ldlm_policy_data_t       policy = {
1191                                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1192                                                          MDS_INODELOCK_UPDATE }
1193                                  };
1194         int                   rc = 0;
1195         __u64               flags = LDLM_FL_HAS_INTENT;
1196
1197         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1198                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1199                ldlm_it2str(it->it_op), it->it_flags);
1200
1201         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1202         req = mdc_intent_getattr_pack(exp, it, op_data);
1203         if (!req)
1204                 return -ENOMEM;
1205
1206         rc = mdc_enter_request(&obddev->u.cli);
1207         if (rc != 0) {
1208                 ptlrpc_req_finished(req);
1209                 return rc;
1210         }
1211
1212         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1213                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1214         if (rc < 0) {
1215                 mdc_exit_request(&obddev->u.cli);
1216                 ptlrpc_req_finished(req);
1217                 return rc;
1218         }
1219
1220         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1221         ga = ptlrpc_req_async_args(req);
1222         ga->ga_exp = exp;
1223         ga->ga_minfo = minfo;
1224         ga->ga_einfo = einfo;
1225
1226         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1227         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1228
1229         return 0;
1230 }