4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 # include <linux/module.h>
40 # include <linux/pagemap.h>
41 # include <linux/miscdevice.h>
42 # include <linux/init.h>
44 #include <lustre_acl.h>
45 #include <obd_class.h>
46 #include <lustre_dlm.h>
47 /* fid_res_name_eq() */
48 #include <lustre_fid.h>
49 #include <lprocfs_status.h>
50 #include "mdc_internal.h"
/*
 * Context carried across an asynchronous intent-getattr RPC.  Packed into
 * the request's async args in mdc_intent_getattr_async() and unpacked in
 * mdc_intent_getattr_async_interpret().
 * NOTE(review): the closing "};" is not visible in this extract.
 */
52 struct mdc_getattr_args {
53 struct obd_export *ga_exp;	/* export the enqueue was sent through */
54 struct md_enqueue_info *ga_minfo;	/* caller's enqueue bookkeeping (lock handle, callback) */
55 struct ldlm_enqueue_info *ga_einfo;	/* DLM enqueue parameters; freed in the interpret callback */
/*
 * Test whether the disposition bit(s) in @flag were set by the server in
 * the intent reply.  Returns the masked bits (non-zero iff any are set).
 */
58 int it_disposition(struct lookup_intent *it, int flag)
60 return it->d.lustre.it_disposition & flag;
62 EXPORT_SYMBOL(it_disposition);
/* Set the given disposition bit(s) in the intent's reply state. */
64 void it_set_disposition(struct lookup_intent *it, int flag)
66 it->d.lustre.it_disposition |= flag;
68 EXPORT_SYMBOL(it_set_disposition);
/* Clear the given disposition bit(s) in the intent's reply state. */
70 void it_clear_disposition(struct lookup_intent *it, int flag)
72 it->d.lustre.it_disposition &= ~flag;
74 EXPORT_SYMBOL(it_clear_disposition);
/*
 * Return the server-reported status for the given open @phase
 * (DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD or DISP_IT_EXECD).
 * The checks run from the deepest phase outward: for the first phase the
 * server actually executed, it_status is returned if the caller asked about
 * that phase or a deeper one.
 * NOTE(review): the "return 0" paths for phases that completed before the
 * requested one are elided in this extract — verify against upstream.
 */
76 int it_open_error(int phase, struct lookup_intent *it)
78 if (it_disposition(it, DISP_OPEN_OPEN)) {
79 if (phase >= DISP_OPEN_OPEN)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_OPEN_CREATE)) {
86 if (phase >= DISP_OPEN_CREATE)
87 return it->d.lustre.it_status;
92 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
93 if (phase >= DISP_LOOKUP_EXECD)
94 return it->d.lustre.it_status;
99 if (it_disposition(it, DISP_IT_EXECD)) {
100 if (phase >= DISP_IT_EXECD)
101 return it->d.lustre.it_status;
/* No disposition bits at all: the reply state is inconsistent. */
105 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
106 it->d.lustre.it_status);
110 EXPORT_SYMBOL(it_open_error);
112 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach the inode (@data) to the DLM lock's resource as its LVB inode and
 * optionally report the lock's inodebits via @bits.
 * NOTE(review): the trailing parameter (presumably "__u64 *bits") and the
 * "if (bits)" guard before the *bits assignment are elided in this extract.
 */
113 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
116 struct ldlm_lock *lock;
117 struct inode *new_inode = data;
125 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
/* Caller guarantees the handle refers to a referenced lock. */
127 LASSERT(lock != NULL);
128 lock_res_and_lock(lock);
129 if (lock->l_resource->lr_lvb_inode &&
130 lock->l_resource->lr_lvb_inode != data) {
131 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* A different inode may only still be attached if it is being freed. */
132 LASSERTF(old_inode->i_state & I_FREEING,
133 "Found existing inode %p/%lu/%u state %lu in lock: "
134 "setting data to %p/%lu/%u\n", old_inode,
135 old_inode->i_ino, old_inode->i_generation,
137 new_inode, new_inode->i_ino, new_inode->i_generation);
139 lock->l_resource->lr_lvb_inode = new_inode;
141 *bits = lock->l_policy_data.l_inodebits.bits;
143 unlock_res_and_lock(lock);
/*
 * Look for an already-granted lock on the resource derived from @fid that
 * is compatible with @type/@policy/@mode.  On success the matched lock's
 * handle is stored in @lockh; the matched mode is returned (0 if none).
 */
149 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
150 const struct lu_fid *fid, ldlm_type_t type,
151 ldlm_policy_data_t *policy, ldlm_mode_t mode,
152 struct lustre_handle *lockh)
154 struct ldlm_res_id res_id;
/* MDC resource names are built directly from the FID. */
157 fid_build_reg_res_name(fid, &res_id);
158 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
159 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused locks matching @policy on the resource derived from
 * @fid.  Thin wrapper around ldlm_cli_cancel_unused_resource().
 * NOTE(review): the "mode" and "opaque" parameter lines are elided here.
 */
163 int mdc_cancel_unused(struct obd_export *exp,
164 const struct lu_fid *fid,
165 ldlm_policy_data_t *policy,
167 ldlm_cancel_flags_t flags,
170 struct ldlm_res_id res_id;
171 struct obd_device *obd = class_exp2obd(exp);
174 fid_build_reg_res_name(fid, &res_id);
175 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
176 policy, mode, flags, opaque);
/*
 * Detach any inode pointer cached on the DLM resource for @fid (the LVB
 * inode), e.g. when the inode is going away.  A missing resource simply
 * means there are no locks for this FID.
 */
180 int mdc_null_inode(struct obd_export *exp,
181 const struct lu_fid *fid)
183 struct ldlm_res_id res_id;
184 struct ldlm_resource *res;
185 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
187 LASSERTF(ns != NULL, "no namespace passed\n");
189 fid_build_reg_res_name(fid, &res_id);
191 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* NOTE(review): the lock_res()/unlock_res() lines around this store are
 * elided in this extract — verify locking against upstream. */
196 res->lr_lvb_inode = NULL;
199 ldlm_resource_putref(res);
203 /* find any ldlm lock of the inode in mdc
/*
 * Iterate the locks on @fid's resource with iterator @it/@data.  The
 * iterator's LDLM_ITER_STOP / LDLM_ITER_CONTINUE result is translated to
 * the caller (return values themselves are elided in this extract).
 */
207 int mdc_find_cbdata(struct obd_export *exp,
208 const struct lu_fid *fid,
209 ldlm_iterator_t it, void *data)
211 struct ldlm_res_id res_id;
/* Cast drops const: fid_build_reg_res_name() only reads the fid. */
214 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
215 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
217 if (rc == LDLM_ITER_STOP)
219 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Drop the replay flag from a request that completed with an error so it
 * is not replayed after recovery.  Also warn if an error reply carried a
 * transaction number, which should not happen.
 * NOTE(review): the "req->rq_replay = 0" store between the spin_lock/
 * spin_unlock pair is elided in this extract.
 */
224 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
226 /* Don't hold error requests for replay. */
227 if (req->rq_replay) {
228 spin_lock(&req->rq_lock);
230 spin_unlock(&req->rq_lock);
232 if (rc && req->rq_transno != 0) {
233 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
238 /* Save a large LOV EA into the request buffer so that it is available
239 * for replay. We don't do this in the initial request because the
240 * original request doesn't need this buffer (at most it sends just the
241 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
242 * buffer and may also be difficult to allocate and save a very large
243 * request buffer for each open. (bug 5707)
245 * OOM here may cause recovery failure if lmm is needed (only for the
246 * original open if the MDS crashed just when this client also OOM'd)
247 * but this is incredibly unlikely, and questionable whether the client
248 * could do MDS recovery under OOM anyways... */
249 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
250 struct mdt_body *body)
254 /* FIXME: remove this explicit offset. */
255 sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* On enlarge failure, degrade gracefully: pretend there is no EA so the
 * replay path simply skips saving it instead of corrupting the buffer. */
258 CERROR("Can't enlarge segment %d size to %d\n",
259 DLM_INTENT_REC_OFF + 4, body->eadatasize);
260 body->valid &= ~OBD_MD_FLEASIZE;
261 body->eadatasize = 0;
/*
 * Build an LDLM_INTENT_OPEN request: cancel locks that conflict with the
 * open (and the parent's UPDATE lock for creates), allocate the request,
 * reserve buffer sizes, and pack the ldlm_intent + open body.
 * Returns the prepared request or an ERR_PTR.
 * NOTE(review): several lines (the "cancels" list declaration, lock-mode
 * selection bodies, error gotos) are elided in this extract.
 */
265 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
266 struct lookup_intent *it,
267 struct md_op_data *op_data,
268 void *lmm, int lmmsize,
271 struct ptlrpc_request *req;
272 struct obd_device *obddev = class_exp2obd(exp);
273 struct ldlm_intent *lit;
/* Only regular files may be opened; force S_IFREG into the create mode. */
279 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
281 /* XXX: openlock is not cancelled for cross-refs. */
282 /* If inode is known, cancel conflicting OPEN locks. */
283 if (fid_is_sane(&op_data->op_fid2)) {
284 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
287 else if (it->it_flags & FMODE_EXEC)
292 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
297 /* If CREATE, cancel parent's UPDATE lock. */
298 if (it->it_op & IT_CREAT)
302 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
304 MDS_INODELOCK_UPDATE);
306 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
307 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the locks gathered for cancellation. */
309 ldlm_lock_list_put(&cancels, l_bl_ast, count);
310 return ERR_PTR(-ENOMEM);
313 /* parent capability */
314 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
315 /* child capability, reserve the size according to parent capa, it will
316 * be filled after we get the reply */
317 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
319 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
320 op_data->op_namelen + 1);
/* Reserve at least the default EA size so a replayed open can carry it. */
321 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
322 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
324 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
326 ptlrpc_request_free(req);
/* Open requests are replayed unconditionally on a replayable import. */
330 spin_lock(&req->rq_lock);
331 req->rq_replay = req->rq_import->imp_replayable;
332 spin_unlock(&req->rq_lock);
334 /* pack the intent */
335 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
336 lit->opc = (__u64)it->it_op;
338 /* pack the intended request */
339 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
342 /* for remote client, fetch remote perm for current user */
343 if (client_is_remote(exp))
344 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
345 sizeof(struct mdt_remote_perm));
346 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: pack the ldlm_intent and the unlink
 * body, and reserve reply room for the victim's EA and unlink cookies.
 * Returns the prepared request or an ERR_PTR.
 */
350 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
351 struct lookup_intent *it,
352 struct md_op_data *op_data)
354 struct ptlrpc_request *req;
355 struct obd_device *obddev = class_exp2obd(exp);
356 struct ldlm_intent *lit;
359 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
360 &RQF_LDLM_INTENT_UNLINK);
362 return ERR_PTR(-ENOMEM);
364 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
365 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
366 op_data->op_namelen + 1);
368 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
370 ptlrpc_request_free(req);
374 /* pack the intent */
375 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
376 lit->opc = (__u64)it->it_op;
378 /* pack the intended request */
379 mdc_unlink_pack(req, op_data);
/* Reply may carry the unlinked object's striping and destroy cookies. */
381 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
382 obddev->u.cli.cl_max_mds_easize);
383 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
384 obddev->u.cli.cl_max_mds_cookiesize);
385 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request asking for attributes, EA, MDS
 * capability and ACL (or remote permissions for a remote client).
 * Returns the prepared request or an ERR_PTR.
 */
389 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
390 struct lookup_intent *it,
391 struct md_op_data *op_data)
393 struct ptlrpc_request *req;
394 struct obd_device *obddev = class_exp2obd(exp);
/* Remote clients get server-computed permissions instead of raw ACLs. */
395 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
396 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
397 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
398 (client_is_remote(exp) ?
399 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
400 struct ldlm_intent *lit;
403 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
404 &RQF_LDLM_INTENT_GETATTR);
406 return ERR_PTR(-ENOMEM);
408 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
409 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
410 op_data->op_namelen + 1);
412 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
414 ptlrpc_request_free(req);
418 /* pack the intent */
419 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
420 lit->opc = (__u64)it->it_op;
422 /* pack the intended request */
423 mdc_getattr_pack(req, valid, it->it_flags, op_data,
424 obddev->u.cli.cl_max_mds_easize);
426 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
427 obddev->u.cli.cl_max_mds_easize);
428 if (client_is_remote(exp))
429 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
430 sizeof(struct mdt_remote_perm));
431 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request.  @unused: op_data is not needed for
 * layout intents.  The reply's DLM LVB buffer is sized for the largest
 * possible layout.  Returns the prepared request or an ERR_PTR.
 */
435 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
436 struct lookup_intent *it,
437 struct md_op_data *unused)
439 struct obd_device *obd = class_exp2obd(exp);
440 struct ptlrpc_request *req;
441 struct ldlm_intent *lit;
442 struct layout_intent *layout;
445 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
446 &RQF_LDLM_INTENT_LAYOUT);
448 return ERR_PTR(-ENOMEM);
/* No client-side EA payload is sent with a layout intent. */
450 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
451 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
453 ptlrpc_request_free(req);
457 /* pack the intent */
458 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
459 lit->opc = (__u64)it->it_op;
461 /* pack the layout intent request */
462 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
463 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
464 * set for replication */
465 layout->li_opc = LAYOUT_INTENT_ACCESS;
467 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
468 obd->u.cli.cl_max_mds_easize);
469 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM_ENQUEUE request, reserving @lvb_len
 * bytes of reply room for the lock value block.  Used e.g. for IT_READDIR.
 * Returns the prepared request or an ERR_PTR.
 */
473 static struct ptlrpc_request *
474 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
476 struct ptlrpc_request *req;
479 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
481 return ERR_PTR(-ENOMEM);
483 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
485 ptlrpc_request_free(req);
489 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
490 ptlrpc_request_set_replen(req);
/*
 * Post-process a completed intent enqueue: copy the server's disposition
 * and status into the lookup_intent, fix up lock mode/replay state, swab
 * the reply mdt_body, save the LOV EA for open replay, and install layout
 * LVB data on a layout lock.
 * NOTE(review): many interior lines (error gotos, "else" branches, some
 * declarations such as lvb_len/eadata/lmm) are elided in this extract;
 * the comments below describe only what the visible lines establish.
 */
494 static int mdc_finish_enqueue(struct obd_export *exp,
495 struct ptlrpc_request *req,
496 struct ldlm_enqueue_info *einfo,
497 struct lookup_intent *it,
498 struct lustre_handle *lockh,
501 struct req_capsule *pill = &req->rq_pill;
502 struct ldlm_request *lockreq;
503 struct ldlm_reply *lockrep;
504 struct lustre_intent_data *intent = &it->d.lustre;
505 struct ldlm_lock *lock;
506 void *lvb_data = NULL;
510 /* Similarly, if we're going to replay this request, we don't want to
511 * actually get a lock, just perform the intent. */
512 if (req->rq_transno || req->rq_replay) {
513 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
514 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Server executed the intent but granted no lock: clear the handle. */
517 if (rc == ELDLM_LOCK_ABORTED) {
519 memset(lockh, 0, sizeof(*lockh));
521 } else { /* rc = 0 */
522 lock = ldlm_handle2lock(lockh);
523 LASSERT(lock != NULL);
525 /* If the server gave us back a different lock mode, we should
526 * fix up our variables. */
527 if (lock->l_req_mode != einfo->ei_mode) {
528 ldlm_lock_addref(lockh, lock->l_req_mode);
529 ldlm_lock_decref(lockh, einfo->ei_mode);
530 einfo->ei_mode = lock->l_req_mode;
/* Mirror the DLM reply into the intent for upper layers. */
535 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
536 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
538 intent->it_disposition = (int)lockrep->lock_policy_res1;
539 intent->it_status = (int)lockrep->lock_policy_res2;
540 intent->it_lock_mode = einfo->ei_mode;
541 intent->it_lock_handle = lockh->cookie;
542 intent->it_data = req;
544 /* Technically speaking rq_transno must already be zero if
545 * it_status is in error, so the check is a bit redundant */
546 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
547 mdc_clear_replay_flag(req, intent->it_status);
549 /* If we're doing an IT_OPEN which did not result in an actual
550 * successful open, then we need to remove the bit which saves
551 * this request for unconditional replay.
553 * It's important that we do this first! Otherwise we might exit the
554 * function without doing so, and try to replay a failed create
556 if (it->it_op & IT_OPEN && req->rq_replay &&
557 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
558 mdc_clear_replay_flag(req, intent->it_status);
560 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
561 it->it_op, intent->it_disposition, intent->it_status);
563 /* We know what to expect, so we do any byte flipping required here */
564 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
565 struct mdt_body *body;
567 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
569 CERROR ("Can't swab mdt_body\n");
573 if (it_disposition(it, DISP_OPEN_OPEN) &&
574 !it_open_error(DISP_OPEN_OPEN, it)) {
576 * If this is a successful OPEN request, we need to set
577 * replay handler and data early, so that if replay
578 * happens immediately after swabbing below, new reply
579 * is swabbed by that handler correctly.
581 mdc_set_open_replay_data(NULL, NULL, req);
584 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
587 mdc_update_max_ea_from_body(exp, body);
590 * The eadata is opaque; just check that it is there.
591 * Eventually, obd_unpackmd() will check the contents.
593 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
598 /* save lvb data and length in case this is for layout
601 lvb_len = body->eadatasize;
604 * We save the reply LOV EA in case we have to replay a
605 * create for recovery. If we didn't allocate a large
606 * enough request buffer above we need to reallocate it
607 * here to hold the actual LOV EA.
609 * To not save LOV EA if request is not going to replay
610 * (for example error one).
612 if ((it->it_op & IT_OPEN) && req->rq_replay) {
614 if (req_capsule_get_size(pill, &RMF_EADATA,
617 mdc_realloc_openmsg(req, body);
619 req_capsule_shrink(pill, &RMF_EADATA,
623 req_capsule_set_size(pill, &RMF_EADATA,
627 lmm = req_capsule_client_get(pill, &RMF_EADATA);
/* Stash the reply EA into the request buffer for replay. */
629 memcpy(lmm, eadata, body->eadatasize);
633 if (body->valid & OBD_MD_FLRMTPERM) {
634 struct mdt_remote_perm *perm;
636 LASSERT(client_is_remote(exp));
637 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
638 lustre_swab_mdt_remote_perm);
642 if (body->valid & OBD_MD_FLMDSCAPA) {
643 struct lustre_capa *capa, *p;
645 capa = req_capsule_server_get(pill, &RMF_CAPA1);
649 if (it->it_op & IT_OPEN) {
650 /* client fid capa will be checked in replay */
651 p = req_capsule_client_get(pill, &RMF_CAPA2);
656 if (body->valid & OBD_MD_FLOSSCAPA) {
657 struct lustre_capa *capa;
659 capa = req_capsule_server_get(pill, &RMF_CAPA2);
663 } else if (it->it_op & IT_LAYOUT) {
664 /* maybe the lock was granted right away and layout
665 * is packed into RMF_DLM_LVB of req */
666 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
668 lvb_data = req_capsule_server_sized_get(pill,
669 &RMF_DLM_LVB, lvb_len);
670 if (lvb_data == NULL)
675 /* fill in stripe data for layout lock */
676 lock = ldlm_handle2lock(lockh);
677 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
680 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
681 ldlm_it2str(it->it_op), lvb_len);
683 OBD_ALLOC_LARGE(lmm, lvb_len);
688 memcpy(lmm, lvb_data, lvb_len);
690 /* install lvb_data */
691 lock_res_and_lock(lock);
/* Only the first installer wins; a concurrent install keeps its copy
 * and ours is freed below. */
692 if (lock->l_lvb_data == NULL) {
693 lock->l_lvb_data = lmm;
694 lock->l_lvb_len = lvb_len;
697 unlock_res_and_lock(lock);
699 OBD_FREE_LARGE(lmm, lvb_len);
707 /* We always reserve enough space in the reply packet for a stripe MD, because
708 * we don't know in advance the file type. */
/*
 * Enqueue a metadata DLM lock, optionally carrying an intent (@it).  Picks
 * the inodebits policy from the intent op, packs the matching intent
 * request, takes the MDC rpc_lock and in-flight slot, performs the
 * enqueue, and retries forever on -EINPROGRESS for creates (new quota
 * design).  On success the result is handed to mdc_finish_enqueue().
 * NOTE(review): resend labels, several "if (rc)"/goto lines, and the
 * FLOCK/no-intent branches' surrounding structure are elided here.
 */
709 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
710 struct lookup_intent *it, struct md_op_data *op_data,
711 struct lustre_handle *lockh, void *lmm, int lmmsize,
712 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
714 struct obd_device *obddev = class_exp2obd(exp);
715 struct ptlrpc_request *req = NULL;
716 __u64 flags, saved_flags = extra_lock_flags;
718 struct ldlm_res_id res_id;
719 static const ldlm_policy_data_t lookup_policy =
720 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
721 static const ldlm_policy_data_t update_policy =
722 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
723 static const ldlm_policy_data_t layout_policy =
724 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
725 ldlm_policy_data_t const *policy = &lookup_policy;
726 int generation, resends = 0;
727 struct ldlm_reply *lockrep;
728 enum lvb_type lvb_type = 0;
730 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
733 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* Intent enqueues always advertise LDLM_FL_HAS_INTENT to the server. */
736 saved_flags |= LDLM_FL_HAS_INTENT;
737 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
738 policy = &update_policy;
739 else if (it->it_op & IT_LAYOUT)
740 policy = &layout_policy;
743 LASSERT(reqp == NULL);
/* Remember the import generation to detect eviction across resends. */
745 generation = obddev->u.cli.cl_import->imp_generation;
749 /* The only way right now is FLOCK, in this case we hide flock
750 policy as lmm, but lmmsize is 0 */
751 LASSERT(lmm && lmmsize == 0);
752 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
754 policy = (ldlm_policy_data_t *)lmm;
755 res_id.name[3] = LDLM_FLOCK;
756 } else if (it->it_op & IT_OPEN) {
757 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
759 policy = &update_policy;
760 einfo->ei_cbdata = NULL;
762 } else if (it->it_op & IT_UNLINK) {
763 req = mdc_intent_unlink_pack(exp, it, op_data);
764 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
765 req = mdc_intent_getattr_pack(exp, it, op_data);
766 } else if (it->it_op & IT_READDIR) {
767 req = mdc_enqueue_pack(exp, 0);
768 } else if (it->it_op & IT_LAYOUT) {
/* Layout intents need a server that understands LVB types. */
769 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
772 req = mdc_intent_layout_pack(exp, it, op_data);
773 lvb_type = LVB_T_LAYOUT;
782 if (req != NULL && it && it->it_op & IT_CREAT)
783 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
785 req->rq_no_retry_einprogress = 1;
/* Resend path: pin the request to the recorded import generation. */
788 req->rq_generation_set = 1;
789 req->rq_import_generation = generation;
790 req->rq_sent = cfs_time_current_sec() + resends;
793 /* It is important to obtain rpc_lock first (if applicable), so that
794 * threads that are serialised with rpc_lock are not polluting our
795 * rpcs in flight counter. We do not do flock request limiting, though*/
797 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
798 rc = mdc_enter_request(&obddev->u.cli);
800 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
801 mdc_clear_replay_flag(req, 0);
802 ptlrpc_req_finished(req);
807 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
808 0, lvb_type, lockh, 0);
810 /* For flock requests we immediatelly return without further
811 delay and let caller deal with the rest, since rest of
812 this function metadata processing makes no sense for flock
813 requests anyway. But in case of problem during comms with
814 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
815 can not rely on caller and this mainly for F_UNLCKs
816 (explicits or automatically generated by Kernel to clean
817 current FLocks upon exit) that can't be trashed */
818 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
823 mdc_exit_request(&obddev->u.cli);
824 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
827 CERROR("ldlm_cli_enqueue: %d\n", rc);
828 mdc_clear_replay_flag(req, rc);
829 ptlrpc_req_finished(req);
833 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
834 LASSERT(lockrep != NULL);
/* Intent status travels in lock_policy_res2; convert from wire errno. */
836 lockrep->lock_policy_res2 =
837 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
839 /* Retry the create infinitely when we get -EINPROGRESS from
840 * server. This is required by the new quota design. */
841 if (it && it->it_op & IT_CREAT &&
842 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
843 mdc_clear_replay_flag(req, rc)
844 ptlrpc_req_finished(req);
847 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
848 obddev->obd_name, resends, it->it_op,
849 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
851 if (generation == obddev->u.cli.cl_import->imp_generation) {
854 CDEBUG(D_HA, "resend cross eviction\n");
859 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* On finish failure, drop any lock reference we are still holding. */
861 if (lustre_handle_is_used(lockh)) {
862 ldlm_lock_decref(lockh, einfo->ei_mode);
863 memset(lockh, 0, sizeof(*lockh));
865 ptlrpc_req_finished(req);
/*
 * Translate a finished intent request into the state upper layers expect:
 * propagate per-phase intent errors, detect stale fid/name revalidations,
 * add request references for successful CREATE/OPEN (released later by
 * llite), and collapse a duplicate lock against an already-held one.
 * NOTE(review): early returns, gotos and some closing braces between the
 * visible lines are elided in this extract.
 */
870 static int mdc_finish_intent_lock(struct obd_export *exp,
871 struct ptlrpc_request *request,
872 struct md_op_data *op_data,
873 struct lookup_intent *it,
874 struct lustre_handle *lockh)
876 struct lustre_handle old_lock;
877 struct mdt_body *mdt_body;
878 struct ldlm_lock *lock;
881 LASSERT(request != NULL);
882 LASSERT(request != LP_POISON);
883 LASSERT(request->rq_repmsg != LP_POISON);
885 if (!it_disposition(it, DISP_IT_EXECD)) {
886 /* The server failed before it even started executing the
887 * intent, i.e. because it couldn't unpack the request. */
888 LASSERT(it->d.lustre.it_status != 0);
889 return it->d.lustre.it_status;
891 rc = it_open_error(DISP_IT_EXECD, it);
895 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
896 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
898 /* If we were revalidating a fid/name pair, mark the intent in
899 * case we fail and get called again from lookup */
900 if (fid_is_sane(&op_data->op_fid2) &&
901 it->it_create_mode & M_CHECK_STALE &&
902 it->it_op != IT_GETATTR) {
903 it_set_disposition(it, DISP_ENQ_COMPLETE);
905 /* Also: did we find the same inode? */
906 /* sever can return one of two fids:
907 * op_fid2 - new allocated fid - if file is created.
908 * op_fid3 - existent fid - if file only open.
909 * op_fid3 is saved in lmv_intent_open */
910 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
911 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
912 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
913 "\n", PFID(&op_data->op_fid2),
914 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
919 rc = it_open_error(DISP_LOOKUP_EXECD, it);
923 /* keep requests around for the multiple phases of the call
924 * this shows the DISP_XX must guarantee we make it into the call
926 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
927 it_disposition(it, DISP_OPEN_CREATE) &&
928 !it_open_error(DISP_OPEN_CREATE, it)) {
929 it_set_disposition(it, DISP_ENQ_CREATE_REF);
930 ptlrpc_request_addref(request); /* balanced in ll_create_node */
932 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
933 it_disposition(it, DISP_OPEN_OPEN) &&
934 !it_open_error(DISP_OPEN_OPEN, it)) {
935 it_set_disposition(it, DISP_ENQ_OPEN_REF);
936 ptlrpc_request_addref(request); /* balanced in ll_file_open */
937 /* BUG 11546 - eviction in the middle of open rpc processing */
938 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
941 if (it->it_op & IT_CREAT) {
942 /* XXX this belongs in ll_create_it */
943 } else if (it->it_op == IT_OPEN) {
944 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
946 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
949 /* If we already have a matching lock, then cancel the new
950 * one. We have to set the data here instead of in
951 * mdc_enqueue, because we need to use the child's inode as
952 * the l_ast_data to match, and that's not available until
953 * intent_finish has performed the iget().) */
954 lock = ldlm_handle2lock(lockh);
956 ldlm_policy_data_t policy = lock->l_policy_data;
957 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the granted lock must live on the resource named by the
 * fid the server returned. */
959 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
960 &lock->l_resource->lr_name),
961 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
962 (unsigned long)lock->l_resource->lr_name.name[0],
963 (unsigned long)lock->l_resource->lr_name.name[1],
964 (unsigned long)lock->l_resource->lr_name.name[2],
965 (unsigned long)fid_seq(&mdt_body->fid1),
966 (unsigned long)fid_oid(&mdt_body->fid1),
967 (unsigned long)fid_ver(&mdt_body->fid1));
970 memcpy(&old_lock, lockh, sizeof(*lockh));
971 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
972 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate: keep the pre-existing lock, drop the new one. */
973 ldlm_lock_decref_and_cancel(lockh,
974 it->d.lustre.it_lock_mode);
975 memcpy(lockh, &old_lock, sizeof(old_lock));
976 it->d.lustre.it_lock_handle = lockh->cookie;
979 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
980 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
981 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether a usable lock already protects @fid for this intent.
 * First try the handle cached in the intent; otherwise match by inodebits
 * chosen from it_op.  On success the intent's lock handle/mode are filled
 * in; on failure they are zeroed.  Optionally returns the bits via @bits.
 * NOTE(review): the switch-case labels and return statements are elided
 * in this extract; the case bodies below imply IT_GETATTR-like ops map to
 * UPDATE, IT_LAYOUT to LAYOUT, and the default to LOOKUP bits.
 */
985 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
986 struct lu_fid *fid, __u64 *bits)
988 /* We could just return 1 immediately, but since we should only
989 * be called in revalidate_it if we already have a lock, let's
991 struct ldlm_res_id res_id;
992 struct lustre_handle lockh;
993 ldlm_policy_data_t policy;
996 if (it->d.lustre.it_lock_handle) {
997 lockh.cookie = it->d.lustre.it_lock_handle;
998 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1000 fid_build_reg_res_name(fid, &res_id);
1001 switch (it->it_op) {
1003 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1006 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1009 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1012 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1013 LDLM_FL_BLOCK_GRANTED, &res_id,
1014 LDLM_IBITS, &policy,
1015 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1019 it->d.lustre.it_lock_handle = lockh.cookie;
1020 it->d.lustre.it_lock_mode = mode;
1022 it->d.lustre.it_lock_handle = 0;
1023 it->d.lustre.it_lock_mode = 0;
1030 * This long block is all about fixing up the lock and request state
1031 * so that it is correct as of the moment _before_ the operation was
1032 * applied; that way, the VFS will think that everything is normal and
1033 * call Lustre's regular VFS methods.
1035 * If we're performing a creation, that means that unless the creation
1036 * failed with EEXIST, we should fake up a negative dentry.
1038 * For everything else, we want to lookup to succeed.
1040 * One additional note: if CREATE or OPEN succeeded, we add an extra
1041 * reference to the request because we need to keep it around until
1042 * ll_create/ll_open gets called.
1044 * The server will return to us, in it_disposition, an indication of
1045 * exactly what d.lustre.it_status refers to.
1047 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1048 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1049 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1050 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1053 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Entry point for intent-based lookup from LMV/llite: try to revalidate
 * an existing lock for LOOKUP/GETATTR; otherwise (unless a previous
 * enqueue already completed, DISP_ENQ_COMPLETE) allocate the child fid if
 * needed, enqueue the intent lock, and finish via mdc_finish_intent_lock.
 */
1056 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1057 void *lmm, int lmmsize, struct lookup_intent *it,
1058 int lookup_flags, struct ptlrpc_request **reqp,
1059 ldlm_blocking_callback cb_blocking,
1060 __u64 extra_lock_flags)
1062 struct lustre_handle lockh;
1067 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1068 ", intent: %s flags %#o\n", op_data->op_namelen,
1069 op_data->op_name, PFID(&op_data->op_fid2),
1070 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1074 if (fid_is_sane(&op_data->op_fid2) &&
1075 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1076 /* We could just return 1 immediately, but since we should only
1077 * be called in revalidate_it if we already have a lock, let's
1079 it->d.lustre.it_lock_handle = 0;
1080 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1081 /* Only return failure if it was not GETATTR by cfid
1082 (from inode_revalidate) */
1083 if (rc || op_data->op_namelen != 0)
1087 /* lookup_it may be called only after revalidate_it has run, because
1088 * revalidate_it cannot return errors, only zero. Returning zero causes
1089 * this call to lookup, which *can* return an error.
1091 * We only want to execute the request associated with the intent one
1092 * time, however, so don't send the request again. Instead, skip past
1093 * this and use the request from revalidate. In this case, revalidate
1094 * never dropped its reference, so the refcounts are all OK */
1095 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1096 struct ldlm_enqueue_info einfo = {
1097 .ei_type = LDLM_IBITS,
1098 .ei_mode = it_to_lock_mode(it),
1099 .ei_cb_bl = cb_blocking,
1100 .ei_cb_cp = ldlm_completion_ast,
1103 /* For case if upper layer did not alloc fid, do it now. */
1104 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1105 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1107 CERROR("Can't alloc new fid, rc %d\n", rc);
1111 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1112 lmm, lmmsize, NULL, extra_lock_flags);
1115 } else if (!fid_is_sane(&op_data->op_fid2) ||
1116 !(it->it_create_mode & M_CHECK_STALE)) {
1117 /* DISP_ENQ_COMPLETE set means there is extra reference on
1118 * request referenced from this intent, saved for subsequent
1119 * lookup. This path is executed when we proceed to this
1120 * lookup, so we clear DISP_ENQ_COMPLETE */
1121 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* The saved request from the earlier enqueue is handed to the caller. */
1123 *reqp = it->d.lustre.it_data;
1124 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for an async intent-getattr enqueue: release
 * the in-flight slot, complete the DLM enqueue, post-process via
 * mdc_finish_enqueue()/mdc_finish_intent_lock(), free the enqueue info
 * and invoke the caller's mi_cb with the final rc.
 * NOTE(review): the "rc" parameter of this callback and the assignment of
 * "it" from minfo are on lines elided in this extract.
 */
1128 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1129 struct ptlrpc_request *req,
1132 struct mdc_getattr_args *ga = args;
1133 struct obd_export *exp = ga->ga_exp;
1134 struct md_enqueue_info *minfo = ga->ga_minfo;
1135 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1136 struct lookup_intent *it;
1137 struct lustre_handle *lockh;
1138 struct obd_device *obddev;
1139 struct ldlm_reply *lockrep;
1140 __u64 flags = LDLM_FL_HAS_INTENT;
1143 lockh = &minfo->mi_lockh;
1145 obddev = class_exp2obd(exp);
/* Release the rpcs-in-flight slot taken in mdc_intent_getattr_async(). */
1147 mdc_exit_request(&obddev->u.cli);
1148 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1151 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1152 &flags, NULL, 0, lockh, rc);
1154 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1155 mdc_clear_replay_flag(req, rc);
1159 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1160 LASSERT(lockrep != NULL);
/* Intent status travels in lock_policy_res2; convert from wire errno. */
1162 lockrep->lock_policy_res2 =
1163 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1165 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1169 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1172 OBD_FREE_PTR(einfo);
1173 minfo->mi_cb(req, minfo, rc);
1177 int mdc_intent_getattr_async(struct obd_export *exp,
1178 struct md_enqueue_info *minfo,
1179 struct ldlm_enqueue_info *einfo)
1181 struct md_op_data *op_data = &minfo->mi_data;
1182 struct lookup_intent *it = &minfo->mi_it;
1183 struct ptlrpc_request *req;
1184 struct mdc_getattr_args *ga;
1185 struct obd_device *obddev = class_exp2obd(exp);
1186 struct ldlm_res_id res_id;
1187 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1188 * for statahead currently. Consider CMD in future, such two bits
1189 * maybe managed by different MDS, should be adjusted then. */
1190 ldlm_policy_data_t policy = {
1191 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1192 MDS_INODELOCK_UPDATE }
1195 __u64 flags = LDLM_FL_HAS_INTENT;
1197 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1198 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1199 ldlm_it2str(it->it_op), it->it_flags);
1201 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1202 req = mdc_intent_getattr_pack(exp, it, op_data);
1206 rc = mdc_enter_request(&obddev->u.cli);
1208 ptlrpc_req_finished(req);
1212 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1213 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1215 mdc_exit_request(&obddev->u.cli);
1216 ptlrpc_req_finished(req);
1220 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1221 ga = ptlrpc_req_async_args(req);
1223 ga->ga_minfo = minfo;
1224 ga->ga_einfo = einfo;
1226 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1227 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);