4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_LOV
39 #include <linux/libcfs/libcfs.h>
41 #include <obd_class.h>
43 #include <lustre/lustre_idl.h>
45 #include "lov_internal.h"
/*
 * Bring a request set to its initial state: the completion, success and
 * finish-checked counters start at zero, the request list is empty, and the
 * set holds a single reference. Also initializes the wait queue and the
 * spinlock embedded in the set.
 */
47 static void lov_init_set(struct lov_request_set *set)
50 atomic_set(&set->set_completes, 0);
51 atomic_set(&set->set_success, 0);
52 atomic_set(&set->set_finish_checked, 0);
54 INIT_LIST_HEAD(&set->set_list);
55 atomic_set(&set->set_refcount, 1);
56 init_waitqueue_head(&set->set_waitq);
57 spin_lock_init(&set->set_lock);
/*
 * Release a request set and everything it owns.
 *
 * The request list is walked with the _safe iterator because entries are
 * unlinked and freed during the walk. For each request the obdo, the
 * stripe-md buffer (rq_buflen bytes) and the statfs buffer (when attached)
 * are freed, followed by the request itself. The page array (sized from
 * set_oabufs), the lock-handle reference and finally the set itself are
 * released afterwards.
 */
60 void lov_finish_set(struct lov_request_set *set)
62 struct list_head *pos, *n;
65 list_for_each_safe(pos, n, &set->set_list) {
66 struct lov_request *req = list_entry(pos,
69 list_del_init(&req->rq_link);
72 OBDO_FREE(req->rq_oi.oi_oa);
74 OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
75 if (req->rq_oi.oi_osfs)
76 OBD_FREE(req->rq_oi.oi_osfs,
77 sizeof(*req->rq_oi.oi_osfs));
78 OBD_FREE(req, sizeof(*req));
82 int len = set->set_oabufs * sizeof(*set->set_pga);
83 OBD_FREE_LARGE(set->set_pga, len);
86 lov_llh_put(set->set_lockh);
88 OBD_FREE(set, sizeof(*set));
/*
 * Test whether every request in the set has completed.
 *
 * Compares the set_completes counter against set_count and, once they match,
 * bumps set_finish_checked; the "== 1" test singles out the first caller to
 * observe completion.
 * NOTE(review): how `idempotent` influences the result should be confirmed
 * against the complete function body.
 */
91 int lov_set_finished(struct lov_request_set *set, int idempotent)
93 int completes = atomic_read(&set->set_completes);
95 CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
97 if (completes == set->set_count) {
100 if (atomic_inc_return(&set->set_finish_checked) == 1)
/*
 * Record the completion of one request: flag the request as complete, count
 * it in set_completes (and in set_success -- presumably only when rc is 0,
 * TODO confirm), then wake anyone sleeping on set_waitq.
 */
106 void lov_update_set(struct lov_request_set *set,
107 struct lov_request *req, int rc)
109 req->rq_complete = 1;
112 atomic_inc(&set->set_completes);
114 atomic_inc(&set->set_success);
116 wake_up(&set->set_waitq);
/*
 * Common per-request completion handler: account for the request via
 * lov_update_set(), then apply the "inactive OST" grace rule, which triggers
 * when rc is non-zero and the request's target slot is empty or not active.
 */
119 int lov_update_common_set(struct lov_request_set *set,
120 struct lov_request *req, int rc)
122 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
124 lov_update_set(set, req, rc);
126 /* grace error on inactive ost */
127 if (rc && !(lov->lov_tgts[req->rq_idx] &&
128 lov->lov_tgts[req->rq_idx]->ltd_active))
131 /* FIXME in raid1 regime, should return 0 */
/*
 * Queue `req` at the tail of the request list of `set`.
 */
135 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
137 list_add_tail(&req->rq_link, &set->set_list);
/*
 * Wait condition used by lov_check_and_wait_active().
 *
 * Under lov_lock, checks whether target `idx` no longer needs waiting for:
 * the slot is empty, the target is active, or its export's client import has
 * already attempted a connection (imp_connect_tried).
 */
142 static int lov_check_set(struct lov_obd *lov, int idx)
145 mutex_lock(&lov->lov_lock);
147 if (lov->lov_tgts[idx] == NULL ||
148 lov->lov_tgts[idx]->ltd_active ||
149 (lov->lov_tgts[idx]->ltd_exp != NULL &&
150 class_exp2cliimp(lov->lov_tgts[idx]->ltd_exp)->imp_connect_tried))
153 mutex_unlock(&lov->lov_lock);
157 /* Check if the OSC connection exists and is active.
158 * If the OSC has not yet had a chance to connect to the OST the first time,
159 * wait once for it to connect instead of returning an error. */
/*
 * lov:     LOV device whose target table is consulted
 * ost_idx: index into lov->lov_tgts
 *
 * The quick decisions are made under lov_lock: an empty slot, an already
 * active target, or a target whose import has already tried to connect.
 * Otherwise the lock is dropped and the caller sleeps on a private wait
 * queue until lov_check_set() holds; the wait is set up from obd_timeout
 * (in seconds) with a one second interval. Callers in this file treat a
 * zero return as "target inactive".
 *
 * NOTE(review): `tgt` is sampled under lov_lock but dereferenced again after
 * the wait, when the lock is no longer held -- confirm the target descriptor
 * cannot be freed in between.
 */
161 int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
163 wait_queue_head_t waitq;
164 struct l_wait_info lwi;
165 struct lov_tgt_desc *tgt;
168 mutex_lock(&lov->lov_lock);
170 tgt = lov->lov_tgts[ost_idx];
172 if (unlikely(tgt == NULL))
175 if (likely(tgt->ltd_active))
178 if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried)
181 mutex_unlock(&lov->lov_lock);
183 init_waitqueue_head(&waitq);
184 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
185 cfs_time_seconds(1), NULL, NULL);
187 rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
188 if (tgt != NULL && tgt->ltd_active)
194 mutex_unlock(&lov->lov_lock);
/*
 * Prototype for osc_update_enqueue(), which is not defined in this file; it
 * is called from lov_update_enqueue_set().
 * NOTE(review): a local extern can silently drift from the real definition;
 * consider pulling this declaration from a shared header.
 */
198 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
199 struct lov_oinfo *loi, int flags,
200 struct ost_lvb *lvb, __u32 mode, int rc);
/*
 * LOV-side bookkeeping for the result of one stripe's enqueue.
 *
 * An rc other than ELDLM_OK counts as a failure unless it is
 * ELDLM_LOCK_ABORTED on an intent request (LDLM_FL_HAS_INTENT). On failure
 * the stripe's lock handle is zeroed, and if the target at `idx` is still
 * active the error is logged, except for -EINTR and -EUSERS.
 *
 * NOTE(review): the activity check indexes lov_tgts by `idx` while the
 * message prints loi->loi_ost_idx -- confirm these always agree.
 */
202 static int lov_update_enqueue_lov(struct obd_export *exp,
203 struct lustre_handle *lov_lockhp,
204 struct lov_oinfo *loi, int flags, int idx,
205 struct ost_id *oi, int rc)
207 struct lov_obd *lov = &exp->exp_obd->u.lov;
209 if (rc != ELDLM_OK &&
210 !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
211 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
212 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
213 /* -EUSERS used by OST to report file contention */
214 if (rc != -EINTR && rc != -EUSERS)
215 CERROR("%s: enqueue objid "DOSTID" subobj"
216 DOSTID" on OST idx %d: rc %d\n",
217 exp->exp_obd->obd_name,
218 POSTID(oi), POSTID(&loi->loi_oi),
219 loi->loi_ost_idx, rc);
/*
 * Fold the result of one stripe's enqueue back into the LOV-level state.
 *
 * req:  the per-stripe request that finished
 * mode: lock mode, passed through to osc_update_enqueue()
 * rc:   result of the stripe enqueue
 *
 * With the stripe lock held, osc_update_enqueue() is given the request's
 * private lvb copy, the handle is cleared for an aborted intent lock, and
 * rc is replaced by the result of lov_update_enqueue_lov(). The final rc is
 * then accounted in the set through lov_update_set().
 */
226 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
228 struct lov_request_set *set = req->rq_rqset;
229 struct lustre_handle *lov_lockhp;
230 struct obd_info *oi = set->set_oi;
231 struct lov_oinfo *loi;
235 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
236 loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
238 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
239 * and that copy can be arbitrarily out of date.
241 * The LOV API is due for a serious rewriting anyways, and this
242 * can be addressed then. */
244 lov_stripe_lock(oi->oi_md);
245 osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
246 &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
247 if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
248 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
249 rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
250 req->rq_idx, &oi->oi_md->lsm_oi, rc);
251 lov_stripe_unlock(oi->oi_md);
252 lov_update_set(set, req, rc);
/*
 * `cookie` is the obd_info embedded as rq_oi in a lov_request; the owning
 * request is recovered with container_of() and the lock mode is taken from
 * the enqueue info stored on the request's set.
 */
256 /* The callback for osc_enqueue that updates lov info for every OSC request. */
257 static int cb_update_enqueue(void *cookie, int rc)
259 struct obd_info *oinfo = cookie;
260 struct ldlm_enqueue_info *einfo;
261 struct lov_request *lovreq;
263 lovreq = container_of(oinfo, struct lov_request, rq_oi);
264 einfo = lovreq->rq_rqset->set_ei;
265 return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
/*
 * Finish an enqueue/match pass over the set.
 *
 * When at least one request completed and the success count equals the
 * completion count there is nothing to undo. Otherwise the request list is
 * walked and locks that were obtained (completed request, no error, lock
 * handle in use) are cancelled on their targets via obd_cancel(). The
 * lock-handle reference is dropped at the end.
 *
 * NOTE(review): lov_tgts[req->rq_idx] is dereferenced for obd_cancel() before
 * the NULL test that guards the error message -- confirm the slot cannot be
 * empty at this point.
 * NOTE(review): the adjacent string literals in the CERROR join as
 * "on OSTidx" (no separating space), and "obdjid" looks like a typo.
 */
268 static int enqueue_done(struct lov_request_set *set, __u32 mode)
270 struct lov_request *req;
271 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
272 int completes = atomic_read(&set->set_completes);
275 /* enqueue/match success, just return */
276 if (completes && completes == atomic_read(&set->set_success))
279 /* cancel enqueued/matched locks */
280 list_for_each_entry(req, &set->set_list, rq_link) {
281 struct lustre_handle *lov_lockhp;
283 if (!req->rq_complete || req->rq_rc)
286 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
288 if (!lustre_handle_is_used(lov_lockhp))
291 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
292 req->rq_oi.oi_md, mode, lov_lockhp);
293 if (rc && lov->lov_tgts[req->rq_idx] &&
294 lov->lov_tgts[req->rq_idx]->ltd_active)
295 CERROR("%s: cancelling obdjid "DOSTID" on OST"
296 "idx %d error: rc = %d\n",
297 set->set_exp->exp_obd->obd_name,
298 POSTID(&req->rq_oi.oi_md->lsm_oi),
302 lov_llh_put(set->set_lockh);
/*
 * Finalize an enqueue request set.
 *
 * On one path set_completes is reset and enqueue_done() is run; on the
 * other, the lock-handle reference is dropped when a handle is attached.
 * The value handed back is rc when it is non-zero, otherwise the
 * enqueue_done() result.
 * NOTE(review): confirm which condition (presumably involving `rqset`)
 * selects between the two paths.
 */
306 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
307 struct ptlrpc_request_set *rqset)
313 LASSERT(set->set_exp);
314 /* Do enqueue_done only for sync requests and if any request
318 atomic_set(&set->set_completes, 0);
319 ret = enqueue_done(set, mode);
320 } else if (set->set_lockh)
321 lov_llh_put(set->set_lockh);
325 return rc ? rc : ret;
/*
 * hop_addref callback for LOV lock handles: takes one more reference on the
 * lov_lock_handles passed as an untyped pointer and logs the new count.
 */
328 static void lov_llh_addref(void *llhp)
330 struct lov_lock_handles *llh = llhp;
332 atomic_inc(&llh->llh_refcount);
333 CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
334 atomic_read(&llh->llh_refcount));
/* Handle operations passed to class_handle_hash() by lov_llh_new(). */
337 static struct portals_handle_ops lov_handle_ops = {
338 .hop_addref = lov_llh_addref,
/*
 * Allocate a lov_lock_handles with one trailing lustre_handle per stripe of
 * `lsm` and register it in the handle hash.
 *
 * The refcount starts at 2 -- presumably one reference for the caller and
 * one for the hash entry; TODO confirm.
 */
342 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
344 struct lov_lock_handles *llh;
346 OBD_ALLOC(llh, sizeof(*llh) +
347 sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
351 atomic_set(&llh->llh_refcount, 2);
352 llh->llh_stripe_count = lsm->lsm_stripe_count;
353 INIT_LIST_HEAD(&llh->llh_handle.h_link);
354 class_handle_hash(&llh->llh_handle, &lov_handle_ops);
/*
 * Build the request set for an enqueue across the stripes of oinfo->oi_md.
 *
 * exp:    LOV export; its obd supplies the target table
 * oinfo:  caller's info; supplies the striping md, extent policy and flags,
 *         and receives the cookie of the new LOV lock handle in oi_lockh
 * einfo:  enqueue info; its ei_mode is passed to lov_fini_enqueue_set() when
 *         the set has to be disposed of
 * reqset: presumably receives the new set on success -- TODO confirm
 *
 * Stripes are filtered by lov_stripe_intersects() against the requested
 * extent and by lov_check_and_wait_active(). Each request is allocated
 * together with a private md held in a single buffer laid out as the md,
 * one lsm_oinfo pointer slot and one lov_oinfo; the md carries the
 * sub-object id with lsm_stripe_count 0 and is seeded from the stripe's
 * kms/lvb. Allocation failures yield -ENOMEM via out_set.
 */
359 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
360 struct ldlm_enqueue_info *einfo,
361 struct lov_request_set **reqset)
363 struct lov_obd *lov = &exp->exp_obd->u.lov;
364 struct lov_request_set *set;
367 OBD_ALLOC(set, sizeof(*set));
375 set->set_lockh = lov_llh_new(oinfo->oi_md);
376 if (set->set_lockh == NULL)
377 GOTO(out_set, rc = -ENOMEM);
378 oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
380 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
381 struct lov_oinfo *loi;
382 struct lov_request *req;
385 loi = oinfo->oi_md->lsm_oinfo[i];
386 if (!lov_stripe_intersects(oinfo->oi_md, i,
387 oinfo->oi_policy.l_extent.start,
388 oinfo->oi_policy.l_extent.end,
392 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
393 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
397 OBD_ALLOC(req, sizeof(*req));
399 GOTO(out_set, rc = -ENOMEM);
401 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
402 sizeof(struct lov_oinfo *) +
403 sizeof(struct lov_oinfo);
404 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
405 if (req->rq_oi.oi_md == NULL) {
406 OBD_FREE(req, sizeof(*req));
407 GOTO(out_set, rc = -ENOMEM);
409 req->rq_oi.oi_md->lsm_oinfo[0] =
410 ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
411 sizeof(struct lov_oinfo *);
413 /* Set lov request specific parameters. */
414 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
415 req->rq_oi.oi_cb_up = cb_update_enqueue;
416 req->rq_oi.oi_flags = oinfo->oi_flags;
418 LASSERT(req->rq_oi.oi_lockh);
420 req->rq_oi.oi_policy.l_extent.gid =
421 oinfo->oi_policy.l_extent.gid;
422 req->rq_oi.oi_policy.l_extent.start = start;
423 req->rq_oi.oi_policy.l_extent.end = end;
425 req->rq_idx = loi->loi_ost_idx;
428 /* XXX LOV STACKING: submd should be from the subobj */
429 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
430 req->rq_oi.oi_md->lsm_stripe_count = 0;
431 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
433 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
434 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
436 lov_set_add_req(req, set);
439 GOTO(out_set, rc = -EIO);
443 lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
/*
 * Finalize a match request set: run enqueue_done() and, for a
 * LDLM_FL_TEST_LOCK match in which the success count equals set_count, drop
 * the lock-handle reference.
 */
447 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
453 LASSERT(set->set_exp);
454 rc = enqueue_done(set, mode);
455 if ((set->set_count == atomic_read(&set->set_success)) &&
456 (flags & LDLM_FL_TEST_LOCK))
457 lov_llh_put(set->set_lockh);
/*
 * Build the request set for a lock match across the stripes of `lsm`.
 *
 * A new LOV lock handle is created and its cookie stored in `lockh`. Stripes
 * are filtered by lov_stripe_intersects() against policy->l_extent; a target
 * that lov_check_and_wait_active() reports as unusable aborts the whole set
 * with -EIO. Each request carries the stripe-local extent, the caller's gid
 * and an md copy holding the sub-object id with lsm_stripe_count 0.
 * Allocation failures yield -ENOMEM via out_set.
 */
464 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
465 struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
466 __u32 mode, struct lustre_handle *lockh,
467 struct lov_request_set **reqset)
469 struct lov_obd *lov = &exp->exp_obd->u.lov;
470 struct lov_request_set *set;
473 OBD_ALLOC(set, sizeof(*set));
480 set->set_oi->oi_md = lsm;
481 set->set_lockh = lov_llh_new(lsm);
482 if (set->set_lockh == NULL)
483 GOTO(out_set, rc = -ENOMEM);
484 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
486 for (i = 0; i < lsm->lsm_stripe_count; i++){
487 struct lov_oinfo *loi;
488 struct lov_request *req;
491 loi = lsm->lsm_oinfo[i];
492 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
493 policy->l_extent.end, &start, &end))
496 /* FIXME raid1 should grace this error */
497 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
498 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
499 GOTO(out_set, rc = -EIO);
502 OBD_ALLOC(req, sizeof(*req));
504 GOTO(out_set, rc = -ENOMEM);
506 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
507 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
508 if (req->rq_oi.oi_md == NULL) {
509 OBD_FREE(req, sizeof(*req));
510 GOTO(out_set, rc = -ENOMEM);
513 req->rq_oi.oi_policy.l_extent.start = start;
514 req->rq_oi.oi_policy.l_extent.end = end;
515 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
517 req->rq_idx = loi->loi_ost_idx;
520 /* XXX LOV STACKING: submd should be from the subobj */
521 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
522 req->rq_oi.oi_md->lsm_stripe_count = 0;
524 lov_set_add_req(req, set);
527 GOTO(out_set, rc = -EIO);
531 lov_fini_match_set(set, mode, 0);
/*
 * Finalize a cancel request set. The set must have an export (asserted);
 * the lock-handle reference is dropped with lov_llh_put().
 */
535 int lov_fini_cancel_set(struct lov_request_set *set)
542 LASSERT(set->set_exp);
544 lov_llh_put(set->set_lockh);
/*
 * Build the request set for cancelling a LOV lock.
 *
 * `lockh` is resolved to its lov_lock_handles with lov_handle2llh(); a
 * handle that does not resolve is logged and yields -EINVAL. Stripes whose
 * per-stripe handle is unused are reported at D_INFO; for the others a
 * request with an md copy of the sub-object id (lsm_stripe_count 0) is
 * queued. Allocation failures yield -ENOMEM via out_set.
 */
551 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
552 struct lov_stripe_md *lsm, __u32 mode,
553 struct lustre_handle *lockh,
554 struct lov_request_set **reqset)
556 struct lov_request_set *set;
559 OBD_ALLOC(set, sizeof(*set));
566 set->set_oi->oi_md = lsm;
567 set->set_lockh = lov_handle2llh(lockh);
568 if (set->set_lockh == NULL) {
569 CERROR("LOV: invalid lov lock handle %p\n", lockh);
570 GOTO(out_set, rc = -EINVAL);
572 lockh->cookie = set->set_lockh->llh_handle.h_cookie;
574 for (i = 0; i < lsm->lsm_stripe_count; i++){
575 struct lov_request *req;
576 struct lustre_handle *lov_lockhp;
577 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
579 lov_lockhp = set->set_lockh->llh_handles + i;
580 if (!lustre_handle_is_used(lov_lockhp)) {
581 CDEBUG(D_INFO, "lov idx %d subobj "DOSTID" no lock\n",
582 loi->loi_ost_idx, POSTID(&loi->loi_oi));
586 OBD_ALLOC(req, sizeof(*req));
588 GOTO(out_set, rc = -ENOMEM);
590 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
591 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
592 if (req->rq_oi.oi_md == NULL) {
593 OBD_FREE(req, sizeof(*req));
594 GOTO(out_set, rc = -ENOMEM);
597 req->rq_idx = loi->loi_ost_idx;
600 /* XXX LOV STACKING: submd should be from the subobj */
601 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
602 req->rq_oi.oi_md->lsm_stripe_count = 0;
604 lov_set_add_req(req, set);
607 GOTO(out_set, rc = -EIO);
611 lov_fini_cancel_set(set);
/*
 * Merge the attributes returned by the individual stripes into the caller's
 * obdo (set->set_oi->oi_oa).
 *
 * Requests are tested for completion, error and o_valid == 0 (inactive
 * stripe) before their obdo is folded into a temporary obdo by
 * lov_merge_attrs(), which is also handed `attrset` (presumably a count of
 * contributing stripes -- it is compared with lsm_stripe_count below).
 * When the caller's obdo has OBD_MD_FLEPOCH set, every stripe must have
 * contributed, otherwise the result is -EIO. Finally the merged obdo is
 * copied over the caller's, keeping the caller's original o_oi.
 */
614 static int common_attr_done(struct lov_request_set *set)
616 struct list_head *pos;
617 struct lov_request *req;
619 int rc = 0, attrset = 0;
621 LASSERT(set->set_oi != NULL);
623 if (set->set_oi->oi_oa == NULL)
626 if (!atomic_read(&set->set_success))
631 GOTO(out, rc = -ENOMEM);
633 list_for_each (pos, &set->set_list) {
634 req = list_entry(pos, struct lov_request, rq_link);
636 if (!req->rq_complete || req->rq_rc)
638 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */
640 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
641 req->rq_oi.oi_oa->o_valid,
642 set->set_oi->oi_md, req->rq_stripe, &attrset);
645 CERROR("No stripes had valid attrs\n");
648 if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
649 (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
650 /* When we take attributes of some epoch, we require all the
651 * ost to be active. */
652 CERROR("Not all the stripes had valid attrs\n");
653 GOTO(out, rc = -EIO);
656 tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
657 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
/*
 * After a bulk read/write, walk the request list and, for requests whose
 * reply marks OBD_MD_FLBLOCKS valid, copy the reported block count into the
 * corresponding stripe's cached lvb. Requests are first tested for
 * completion and error.
 */
665 static int brw_done(struct lov_request_set *set)
667 struct lov_stripe_md *lsm = set->set_oi->oi_md;
668 struct lov_oinfo *loi = NULL;
669 struct list_head *pos;
670 struct lov_request *req;
672 list_for_each (pos, &set->set_list) {
673 req = list_entry(pos, struct lov_request, rq_link);
675 if (!req->rq_complete || req->rq_rc)
678 loi = lsm->lsm_oinfo[req->rq_stripe];
680 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
681 loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
/*
 * Finalize a brw request set. The set must have an export (asserted);
 * post-processing is gated on at least one request having completed.
 */
687 int lov_fini_brw_set(struct lov_request_set *set)
693 LASSERT(set->set_exp);
694 if (atomic_read(&set->set_completes)) {
696 /* FIXME update qos data here */
/*
 * Build the request set for a bulk read/write of `oa_bufs` pages.
 *
 * Pages are first counted per stripe (lov_stripe_number() of each page
 * offset). Stripes with a zero count are tested for and the others get a
 * request carrying a copy of the caller's obdo retargeted at the sub-object,
 * plus a window of rq_oabufs entries starting at rq_pgaidx in set->set_pga.
 * Finally the caller's page array is copied into set_pga grouped by stripe,
 * with each offset translated by lov_stripe_offset().
 *
 * A target that lov_check_and_wait_active() reports as unusable fails the
 * whole operation with -EIO; allocation failures yield -ENOMEM.
 *
 * NOTE(review): `shift` is declared without an initializer and is read when
 * the first request is set up; confirm it is zeroed before the stripe loop.
 */
703 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
704 obd_count oa_bufs, struct brw_page *pga,
705 struct obd_trans_info *oti,
706 struct lov_request_set **reqset)
713 struct lov_request_set *set;
714 struct lov_obd *lov = &exp->exp_obd->u.lov;
715 int rc = 0, i, shift;
717 OBD_ALLOC(set, sizeof(*set));
725 set->set_oabufs = oa_bufs;
726 OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
728 GOTO(out, rc = -ENOMEM);
730 OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
732 GOTO(out, rc = -ENOMEM);
734 /* calculate the page count for each stripe */
735 for (i = 0; i < oa_bufs; i++) {
736 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
737 info[stripe].count++;
740 /* alloc and initialize lov request */
742 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
743 struct lov_oinfo *loi = NULL;
744 struct lov_request *req;
746 if (info[i].count == 0)
749 loi = oinfo->oi_md->lsm_oinfo[i];
750 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
751 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
752 GOTO(out, rc = -EIO);
755 OBD_ALLOC(req, sizeof(*req));
757 GOTO(out, rc = -ENOMEM);
759 OBDO_ALLOC(req->rq_oi.oi_oa);
760 if (req->rq_oi.oi_oa == NULL) {
761 OBD_FREE(req, sizeof(*req));
762 GOTO(out, rc = -ENOMEM);
766 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
767 sizeof(*req->rq_oi.oi_oa));
769 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
770 req->rq_oi.oi_oa->o_stripe_idx = i;
772 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
773 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
774 if (req->rq_oi.oi_md == NULL) {
775 OBDO_FREE(req->rq_oi.oi_oa);
776 OBD_FREE(req, sizeof(*req));
777 GOTO(out, rc = -ENOMEM);
780 req->rq_idx = loi->loi_ost_idx;
783 /* XXX LOV STACKING */
784 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
785 req->rq_oabufs = info[i].count;
786 req->rq_pgaidx = shift;
787 shift += req->rq_oabufs;
789 /* remember the index for sort brw_page array */
790 info[i].index = req->rq_pgaidx;
792 req->rq_oi.oi_capa = oinfo->oi_capa;
794 lov_set_add_req(req, set);
797 GOTO(out, rc = -EIO);
799 /* rotate & sort the brw_page array */
800 for (i = 0; i < oa_bufs; i++) {
801 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
803 shift = info[stripe].index + info[stripe].off;
804 LASSERT(shift < oa_bufs);
805 set->set_pga[shift] = pga[i];
806 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
807 &set->set_pga[shift].off);
813 sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
818 lov_fini_brw_set(set);
/*
 * Finalize a getattr request set; once any request has completed, the
 * per-stripe attributes are merged by common_attr_done() and its result
 * becomes rc.
 */
823 int lov_fini_getattr_set(struct lov_request_set *set)
829 LASSERT(set->set_exp);
830 if (atomic_read(&set->set_completes))
831 rc = common_attr_done(set);
/*
 * `cookie` is the obd_info embedded as rq_oi in a lov_request; the return
 * value is whatever lov_update_common_set() yields for that request.
 */
838 /* The callback for osc_getattr_async that finalizes a request info when a
839 * response is received. */
840 static int cb_getattr_update(void *cookie, int rc)
842 struct obd_info *oinfo = cookie;
843 struct lov_request *lovreq;
844 lovreq = container_of(oinfo, struct lov_request, rq_oi);
845 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for a getattr across all stripes of oinfo->oi_md.
 *
 * Each request gets a copy of the caller's obdo retargeted at the
 * sub-object, the caller's capability, and cb_getattr_update as completion
 * callback. A target that lov_check_and_wait_active() reports as unusable
 * is fatal (-EIO) when the caller's obdo has OBD_MD_FLEPOCH set, because
 * SOM requires all OSTs to be active. Allocation failures yield -ENOMEM.
 */
848 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
849 struct lov_request_set **reqset)
851 struct lov_request_set *set;
852 struct lov_obd *lov = &exp->exp_obd->u.lov;
855 OBD_ALLOC(set, sizeof(*set));
863 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
864 struct lov_oinfo *loi;
865 struct lov_request *req;
867 loi = oinfo->oi_md->lsm_oinfo[i];
868 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
869 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
870 if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
871 /* SOM requires all the OSTs to be active. */
872 GOTO(out_set, rc = -EIO);
876 OBD_ALLOC(req, sizeof(*req));
878 GOTO(out_set, rc = -ENOMEM);
881 req->rq_idx = loi->loi_ost_idx;
883 OBDO_ALLOC(req->rq_oi.oi_oa);
884 if (req->rq_oi.oi_oa == NULL) {
885 OBD_FREE(req, sizeof(*req));
886 GOTO(out_set, rc = -ENOMEM);
888 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
889 sizeof(*req->rq_oi.oi_oa));
890 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
891 req->rq_oi.oi_cb_up = cb_getattr_update;
892 req->rq_oi.oi_capa = oinfo->oi_capa;
894 lov_set_add_req(req, set);
897 GOTO(out_set, rc = -EIO);
901 lov_fini_getattr_set(set);
/*
 * Finalize a destroy request set. The set must have an export (asserted);
 * post-processing is gated on at least one request having completed.
 */
905 int lov_fini_destroy_set(struct lov_request_set *set)
909 LASSERT(set->set_exp);
910 if (atomic_read(&set->set_completes)) {
911 /* FIXME update qos data here */
/*
 * Build the request set for destroying the objects behind `lsm`.
 *
 * src_oa: template obdo; copied into every per-stripe request and then
 *         retargeted at the stripe's sub-object id
 * oti:    optional; its log cookies are attached to the set when src_oa has
 *         OBD_MD_FLCOOKIE set
 *
 * Targets are checked with lov_check_and_wait_active(). Allocation failures
 * yield -ENOMEM via out_set.
 */
919 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
920 struct obdo *src_oa, struct lov_stripe_md *lsm,
921 struct obd_trans_info *oti,
922 struct lov_request_set **reqset)
924 struct lov_request_set *set;
925 struct lov_obd *lov = &exp->exp_obd->u.lov;
928 OBD_ALLOC(set, sizeof(*set));
935 set->set_oi->oi_md = lsm;
936 set->set_oi->oi_oa = src_oa;
938 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
939 set->set_cookies = oti->oti_logcookies;
941 for (i = 0; i < lsm->lsm_stripe_count; i++) {
942 struct lov_oinfo *loi;
943 struct lov_request *req;
945 loi = lsm->lsm_oinfo[i];
946 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
947 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
951 OBD_ALLOC(req, sizeof(*req));
953 GOTO(out_set, rc = -ENOMEM);
956 req->rq_idx = loi->loi_ost_idx;
958 OBDO_ALLOC(req->rq_oi.oi_oa);
959 if (req->rq_oi.oi_oa == NULL) {
960 OBD_FREE(req, sizeof(*req));
961 GOTO(out_set, rc = -ENOMEM);
963 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
964 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
965 lov_set_add_req(req, set);
968 GOTO(out_set, rc = -EIO);
972 lov_fini_destroy_set(set);
/*
 * Finalize a setattr request set; once any request has completed, the
 * per-stripe attributes are merged by common_attr_done() and its result
 * becomes rc.
 */
976 int lov_fini_setattr_set(struct lov_request_set *set)
982 LASSERT(set->set_exp);
983 if (atomic_read(&set->set_completes)) {
984 rc = common_attr_done(set);
985 /* FIXME update qos data here */
/*
 * Per-request completion handler for setattr.
 *
 * Accounts for the request via lov_update_set(), applies the "inactive OST"
 * grace rule (non-zero rc with an empty or inactive target slot), and
 * mirrors any ctime/mtime/atime the reply marks valid into the stripe's
 * cached lvb.
 */
992 int lov_update_setattr_set(struct lov_request_set *set,
993 struct lov_request *req, int rc)
995 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
996 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
998 lov_update_set(set, req, rc);
1000 /* grace error on inactive ost */
1001 if (rc && !(lov->lov_tgts[req->rq_idx] &&
1002 lov->lov_tgts[req->rq_idx]->ltd_active))
1006 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1007 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1008 req->rq_oi.oi_oa->o_ctime;
1009 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1010 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1011 req->rq_oi.oi_oa->o_mtime;
1012 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1013 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1014 req->rq_oi.oi_oa->o_atime;
/*
 * `cookie` is the obd_info embedded as rq_oi in a lov_request; completion is
 * delegated to lov_update_setattr_set().
 */
1020 /* The callback for osc_setattr_async that finalizes a request info when a
1021 * response is received. */
1022 static int cb_setattr_update(void *cookie, int rc)
1024 struct obd_info *oinfo = cookie;
1025 struct lov_request *lovreq;
1026 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1027 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for a setattr across all stripes of oinfo->oi_md.
 *
 * oti: optional; its log cookies are attached to the set when the caller's
 *      obdo has OBD_MD_FLCOOKIE set
 *
 * Each request carries a copy of the caller's obdo retargeted at the
 * sub-object, with cb_setattr_update as completion callback. When the size
 * is being set (OBD_MD_FLSIZE) the file size is translated into a per-stripe
 * size with lov_stripe_offset(); if that call returns a negative value and
 * the stripe size is non-zero, the stripe size is reduced by one. An empty
 * set yields -EIO; allocation failures yield -ENOMEM.
 */
1030 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1031 struct obd_trans_info *oti,
1032 struct lov_request_set **reqset)
1034 struct lov_request_set *set;
1035 struct lov_obd *lov = &exp->exp_obd->u.lov;
1038 OBD_ALLOC(set, sizeof(*set));
1045 set->set_oi = oinfo;
1046 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1047 set->set_cookies = oti->oti_logcookies;
1049 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1050 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1051 struct lov_request *req;
1053 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1054 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1058 OBD_ALLOC(req, sizeof(*req));
1060 GOTO(out_set, rc = -ENOMEM);
1062 req->rq_idx = loi->loi_ost_idx;
1064 OBDO_ALLOC(req->rq_oi.oi_oa);
1065 if (req->rq_oi.oi_oa == NULL) {
1066 OBD_FREE(req, sizeof(*req));
1067 GOTO(out_set, rc = -ENOMEM);
1069 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1070 sizeof(*req->rq_oi.oi_oa));
1071 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1072 req->rq_oi.oi_oa->o_stripe_idx = i;
1073 req->rq_oi.oi_cb_up = cb_setattr_update;
1074 req->rq_oi.oi_capa = oinfo->oi_capa;
1076 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1077 int off = lov_stripe_offset(oinfo->oi_md,
1078 oinfo->oi_oa->o_size, i,
1079 &req->rq_oi.oi_oa->o_size);
1081 if (off < 0 && req->rq_oi.oi_oa->o_size)
1082 req->rq_oi.oi_oa->o_size--;
1084 CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1085 i, req->rq_oi.oi_oa->o_size,
1086 oinfo->oi_oa->o_size);
1088 lov_set_add_req(req, set);
1090 if (!set->set_count)
1091 GOTO(out_set, rc = -EIO);
1095 lov_fini_setattr_set(set);
/*
 * Finalize a punch request set: if any request completed and at least one
 * succeeded, merge attributes via common_attr_done(); the set reference is
 * then dropped with lov_put_reqset().
 */
1099 int lov_fini_punch_set(struct lov_request_set *set)
1105 LASSERT(set->set_exp);
1106 if (atomic_read(&set->set_completes)) {
1108 /* FIXME update qos data here */
1109 if (atomic_read(&set->set_success))
1110 rc = common_attr_done(set);
1113 lov_put_reqset(set);
/*
 * Per-request completion handler for punch.
 *
 * Accounts for the request via lov_update_set(), applies the "inactive OST"
 * grace rule and, under the stripe lock, copies a block count the reply
 * marks valid (OBD_MD_FLBLOCKS) into the stripe's cached lvb.
 *
 * NOTE(review): unlike lov_update_common_set() and lov_update_setattr_set(),
 * the grace check dereferences lov_tgts[req->rq_idx] without first testing
 * the slot for NULL -- confirm the slot cannot be empty here, or add the
 * same guard.
 */
1118 int lov_update_punch_set(struct lov_request_set *set,
1119 struct lov_request *req, int rc)
1121 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1122 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1124 lov_update_set(set, req, rc);
1126 /* grace error on inactive ost */
1127 if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1131 lov_stripe_lock(lsm);
1132 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1133 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1134 req->rq_oi.oi_oa->o_blocks;
1137 lov_stripe_unlock(lsm);
1143 /* The callback for osc_punch that finalizes a request info when a response is received. */
/*
 * `cookie` is the obd_info embedded as rq_oi in a lov_request; completion is
 * delegated to lov_update_punch_set().
 */
1145 static int cb_update_punch(void *cookie, int rc)
1147 struct obd_info *oinfo = cookie;
1148 struct lov_request *lovreq;
1149 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1150 return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for a punch (truncate) over the extent in
 * oinfo->oi_policy.
 *
 * Stripes are filtered by lov_stripe_intersects(); a target that
 * lov_check_and_wait_active() reports as unusable aborts the set with -EIO.
 * Each request gets a copy of the caller's obdo retargeted at the
 * sub-object with OBD_MD_FLGROUP added to o_valid, the stripe-local extent
 * (rs, re) with gid -1, and cb_update_punch as completion callback. An
 * empty set yields -EIO; allocation failures yield -ENOMEM.
 */
1153 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1154 struct obd_trans_info *oti,
1155 struct lov_request_set **reqset)
1157 struct lov_request_set *set;
1158 struct lov_obd *lov = &exp->exp_obd->u.lov;
1161 OBD_ALLOC(set, sizeof(*set));
1166 set->set_oi = oinfo;
1169 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1170 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1171 struct lov_request *req;
1174 if (!lov_stripe_intersects(oinfo->oi_md, i,
1175 oinfo->oi_policy.l_extent.start,
1176 oinfo->oi_policy.l_extent.end,
1180 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1181 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1182 GOTO(out_set, rc = -EIO);
1185 OBD_ALLOC(req, sizeof(*req));
1187 GOTO(out_set, rc = -ENOMEM);
1189 req->rq_idx = loi->loi_ost_idx;
1191 OBDO_ALLOC(req->rq_oi.oi_oa);
1192 if (req->rq_oi.oi_oa == NULL) {
1193 OBD_FREE(req, sizeof(*req));
1194 GOTO(out_set, rc = -ENOMEM);
1196 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1197 sizeof(*req->rq_oi.oi_oa));
1198 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1199 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1201 req->rq_oi.oi_oa->o_stripe_idx = i;
1202 req->rq_oi.oi_cb_up = cb_update_punch;
1204 req->rq_oi.oi_policy.l_extent.start = rs;
1205 req->rq_oi.oi_policy.l_extent.end = re;
1206 req->rq_oi.oi_policy.l_extent.gid = -1;
1208 req->rq_oi.oi_capa = oinfo->oi_capa;
1210 lov_set_add_req(req, set);
1212 if (!set->set_count)
1213 GOTO(out_set, rc = -EIO);
1217 lov_fini_punch_set(set);
/*
 * Finalize a sync request set. When requests completed, the success counter
 * is consulted (a set with no successes is treated specially); the set
 * reference is then dropped with lov_put_reqset().
 */
1221 int lov_fini_sync_set(struct lov_request_set *set)
1227 LASSERT(set->set_exp);
1228 if (atomic_read(&set->set_completes)) {
1229 if (!atomic_read(&set->set_success))
1231 /* FIXME update qos data here */
1234 lov_put_reqset(set);
/*
 * `cookie` is the obd_info embedded as rq_oi in a lov_request; the return
 * value is whatever lov_update_common_set() yields for that request.
 */
1239 /* The callback for osc_sync that finalizes a request info when a
1240 * response is received. */
1241 static int cb_sync_update(void *cookie, int rc)
1243 struct obd_info *oinfo = cookie;
1244 struct lov_request *lovreq;
1246 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1247 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
/*
 * Build the request set for syncing the byte range [start, end] of the file
 * described by oinfo->oi_md.
 *
 * Targets are checked with lov_check_and_wait_active() and stripes are
 * filtered by lov_stripe_intersects(). Each request gets a struct copy of
 * the caller's obdo retargeted at the sub-object, the stripe-local extent
 * (rs, re) with gid -1, and cb_sync_update as completion callback. An empty
 * set yields -EIO; allocation failures yield -ENOMEM.
 */
1250 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1251 obd_off start, obd_off end,
1252 struct lov_request_set **reqset)
1254 struct lov_request_set *set;
1255 struct lov_obd *lov = &exp->exp_obd->u.lov;
1264 set->set_oi = oinfo;
1266 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1267 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1268 struct lov_request *req;
1271 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1272 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1276 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1282 GOTO(out_set, rc = -ENOMEM);
1284 req->rq_idx = loi->loi_ost_idx;
1286 OBDO_ALLOC(req->rq_oi.oi_oa);
1287 if (req->rq_oi.oi_oa == NULL) {
1288 OBD_FREE(req, sizeof(*req));
1289 GOTO(out_set, rc = -ENOMEM);
1291 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1292 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1293 req->rq_oi.oi_oa->o_stripe_idx = i;
1295 req->rq_oi.oi_policy.l_extent.start = rs;
1296 req->rq_oi.oi_policy.l_extent.end = re;
1297 req->rq_oi.oi_policy.l_extent.gid = -1;
1298 req->rq_oi.oi_cb_up = cb_sync_update;
1300 lov_set_add_req(req, set);
1302 if (!set->set_count)
1303 GOTO(out_set, rc = -EIO);
1307 lov_fini_sync_set(set);
/*
 * LOV_U64_MAX is the all-ones __u64 value. LOV_SUM_MAX detects wrap-around
 * of tot + add (the sum comparing below tot) and pins tot at LOV_U64_MAX in
 * that case. Both arguments are evaluated more than once, so they must be
 * free of side effects.
 */
1311 #define LOV_U64_MAX ((__u64)~0ULL)
1312 #define LOV_SUM_MAX(tot, add) \
1314 if ((tot) + (add) < (tot)) \
1315 (tot) = LOV_U64_MAX; \
/*
 * Post-process an aggregated statfs result.
 *
 * os_files and os_ffree are divided by the expected stripe count unless they
 * already sit at LOV_U64_MAX (the saturation value), and the result is
 * copied into obd->obd_osfs with a fresh obd_osfs_age, both under
 * obd_osfs_lock.
 */
1320 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1323 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1325 if (osfs->os_files != LOV_U64_MAX)
1326 lov_do_div64(osfs->os_files, expected_stripes);
1327 if (osfs->os_ffree != LOV_U64_MAX)
1328 lov_do_div64(osfs->os_ffree, expected_stripes);
1330 spin_lock(&obd->obd_osfs_lock);
1331 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1332 obd->obd_osfs_age = cfs_time_current_64();
1333 spin_unlock(&obd->obd_osfs_lock);
/*
 * Finalize a statfs request set: once any request has completed, hand the
 * aggregated result and the success count to lov_fini_statfs(); the set
 * reference is then dropped with lov_put_reqset().
 */
1340 int lov_fini_statfs_set(struct lov_request_set *set)
1347 if (atomic_read(&set->set_completes)) {
1348 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1349 atomic_read(&set->set_success));
1351 lov_put_reqset(set);
/*
 * Accumulate one target's statfs reply (lov_sfs) into the running total
 * (osfs).
 *
 * One path simply copies lov_sfs into osfs (presumably for the first
 * successful reply -- TODO confirm). On the accumulating path, differing
 * block sizes are reconciled first: a shift is derived from the two sizes
 * and the side with the smaller block size has bfree/bavail/blocks shifted
 * right, osfs adopting the larger block size. Note that lov_sfs itself may
 * be rescaled in place. Block counters are then summed, and
 * os_files/os_ffree are added with saturation via LOV_SUM_MAX.
 */
1355 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1358 int shift = 0, quit = 0;
1362 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1364 if (osfs->os_bsize != lov_sfs->os_bsize) {
1365 /* assume all block sizes are always powers of 2 */
1366 /* get the bits difference */
1367 tmp = osfs->os_bsize | lov_sfs->os_bsize;
1368 for (shift = 0; shift <= 64; ++shift) {
1380 if (osfs->os_bsize < lov_sfs->os_bsize) {
1381 osfs->os_bsize = lov_sfs->os_bsize;
1383 osfs->os_bfree >>= shift;
1384 osfs->os_bavail >>= shift;
1385 osfs->os_blocks >>= shift;
1386 } else if (shift != 0) {
1387 lov_sfs->os_bfree >>= shift;
1388 lov_sfs->os_bavail >>= shift;
1389 lov_sfs->os_blocks >>= shift;
1391 osfs->os_bfree += lov_sfs->os_bfree;
1392 osfs->os_bavail += lov_sfs->os_bavail;
1393 osfs->os_blocks += lov_sfs->os_blocks;
1394 /* XXX not sure about this one - depends on policy.
1395 * - could be minimum if we always stripe on all OBDs
1396 * (but that would be wrong for any other policy,
1397 * if one of the OBDs has no more objects left)
1398 * - could be sum if we stripe whole objects
1399 * - could be average, just to give a nice number
1401 * To give a "reasonable" (if not wholly accurate)
1402 * number, we divide the total number of free objects
1403 * by expected stripe count (watch out for overflow).
1405 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1406 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
/*
 * `cookie` is the obd_info embedded as rq_oi in a lov_request. The success
 * count is sampled before this request is accounted with lov_update_set(),
 * so lov_update_statfs() sees the number of earlier successes. For a target
 * that is present and active, the reply is also cached in the target obd's
 * obd_osfs under its obd_osfs_lock; the cache age is refreshed only when the
 * reply did not itself come from a cache (OBD_STATFS_FROM_CACHE clear).
 * For OBD_STATFS_PTLRPCD requests, lov_statfs_interpret() is invoked once
 * lov_set_finished() reports the set complete.
 */
1410 /* The callback for osc_statfs_async that finalizes a request info when a
1411 * response is received. */
1412 static int cb_statfs_update(void *cookie, int rc)
1414 struct obd_info *oinfo = cookie;
1415 struct lov_request *lovreq;
1416 struct lov_request_set *set;
1417 struct obd_statfs *osfs, *lov_sfs;
1418 struct lov_obd *lov;
1419 struct lov_tgt_desc *tgt;
1420 struct obd_device *lovobd, *tgtobd;
1423 lovreq = container_of(oinfo, struct lov_request, rq_oi);
1424 set = lovreq->rq_rqset;
1425 lovobd = set->set_obd;
1426 lov = &lovobd->u.lov;
1427 osfs = set->set_oi->oi_osfs;
1428 lov_sfs = oinfo->oi_osfs;
1429 success = atomic_read(&set->set_success);
1430 /* XXX: the same is done in lov_update_common_set, however
1431 lovset->set_exp is not initialized. */
1432 lov_update_set(set, lovreq, rc);
1437 tgt = lov->lov_tgts[lovreq->rq_idx];
1438 if (!tgt || !tgt->ltd_active)
1439 GOTO(out_update, rc);
1441 tgtobd = class_exp2obd(tgt->ltd_exp);
1442 spin_lock(&tgtobd->obd_osfs_lock);
1443 memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1444 if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1445 tgtobd->obd_osfs_age = cfs_time_current_64();
1446 spin_unlock(&tgtobd->obd_osfs_lock);
1449 lov_update_statfs(osfs, lov_sfs, success);
1453 if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1454 lov_set_finished(set, 0)) {
1455 lov_statfs_interpret(NULL, set, set->set_count !=
1456 atomic_read(&set->set_success));
/*
 * Build the request set for a statfs over every configured target of `obd`
 * (indices 0 .. desc.ld_tgt_count - 1).
 *
 * An index is reported as inactive when its slot is empty, or when
 * lov_check_and_wait_active() fails and the caller asked for
 * OBD_STATFS_NODELAY. Targets without an export are reported as
 * administratively disabled. Each request gets its own obd_statfs buffer,
 * the caller's flags, and cb_statfs_update as completion callback. An empty
 * set yields -EIO; allocation failures yield -ENOMEM.
 */
1462 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1463 struct lov_request_set **reqset)
1465 struct lov_request_set *set;
1466 struct lov_obd *lov = &obd->u.lov;
1469 OBD_ALLOC(set, sizeof(*set));
1475 set->set_oi = oinfo;
1477 /* We only get block data from the OBD */
1478 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1479 struct lov_request *req;
1481 if (lov->lov_tgts[i] == NULL ||
1482 (!lov_check_and_wait_active(lov, i) &&
1483 (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1484 CDEBUG(D_HA, "lov idx %d inactive\n", i);
1488 /* skip targets that have been explicitely disabled by the
1490 if (!lov->lov_tgts[i]->ltd_exp) {
1491 CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1495 OBD_ALLOC(req, sizeof(*req));
1497 GOTO(out_set, rc = -ENOMEM);
1499 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1500 if (req->rq_oi.oi_osfs == NULL) {
1501 OBD_FREE(req, sizeof(*req));
1502 GOTO(out_set, rc = -ENOMEM);
1506 req->rq_oi.oi_cb_up = cb_statfs_update;
1507 req->rq_oi.oi_flags = oinfo->oi_flags;
1509 lov_set_add_req(req, set);
1511 if (!set->set_count)
1512 GOTO(out_set, rc = -EIO);
1516 lov_fini_statfs_set(set);