]> Pileus Git - ~andy/linux/blob - drivers/staging/lustre/lustre/lov/lov_request.c
staging: lustre: Use parenthesis around sizeof
[~andy/linux] / drivers / staging / lustre / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOV
38
39 #include <linux/libcfs/libcfs.h>
40
41 #include <obd_class.h>
42 #include <obd_lov.h>
43 #include <lustre/lustre_idl.h>
44
45 #include "lov_internal.h"
46
47 static void lov_init_set(struct lov_request_set *set)
48 {
49         set->set_count = 0;
50         atomic_set(&set->set_completes, 0);
51         atomic_set(&set->set_success, 0);
52         atomic_set(&set->set_finish_checked, 0);
53         set->set_cookies = 0;
54         INIT_LIST_HEAD(&set->set_list);
55         atomic_set(&set->set_refcount, 1);
56         init_waitqueue_head(&set->set_waitq);
57         spin_lock_init(&set->set_lock);
58 }
59
60 void lov_finish_set(struct lov_request_set *set)
61 {
62         struct list_head *pos, *n;
63
64         LASSERT(set);
65         list_for_each_safe(pos, n, &set->set_list) {
66                 struct lov_request *req = list_entry(pos,
67                                                          struct lov_request,
68                                                          rq_link);
69                 list_del_init(&req->rq_link);
70
71                 if (req->rq_oi.oi_oa)
72                         OBDO_FREE(req->rq_oi.oi_oa);
73                 if (req->rq_oi.oi_md)
74                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
75                 if (req->rq_oi.oi_osfs)
76                         OBD_FREE(req->rq_oi.oi_osfs,
77                                  sizeof(*req->rq_oi.oi_osfs));
78                 OBD_FREE(req, sizeof(*req));
79         }
80
81         if (set->set_pga) {
82                 int len = set->set_oabufs * sizeof(*set->set_pga);
83                 OBD_FREE_LARGE(set->set_pga, len);
84         }
85         if (set->set_lockh)
86                 lov_llh_put(set->set_lockh);
87
88         OBD_FREE(set, sizeof(*set));
89 }
90
91 int lov_set_finished(struct lov_request_set *set, int idempotent)
92 {
93         int completes = atomic_read(&set->set_completes);
94
95         CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
96
97         if (completes == set->set_count) {
98                 if (idempotent)
99                         return 1;
100                 if (atomic_inc_return(&set->set_finish_checked) == 1)
101                         return 1;
102         }
103         return 0;
104 }
105
106 void lov_update_set(struct lov_request_set *set,
107                     struct lov_request *req, int rc)
108 {
109         req->rq_complete = 1;
110         req->rq_rc = rc;
111
112         atomic_inc(&set->set_completes);
113         if (rc == 0)
114                 atomic_inc(&set->set_success);
115
116         wake_up(&set->set_waitq);
117 }
118
119 int lov_update_common_set(struct lov_request_set *set,
120                           struct lov_request *req, int rc)
121 {
122         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
123
124         lov_update_set(set, req, rc);
125
126         /* grace error on inactive ost */
127         if (rc && !(lov->lov_tgts[req->rq_idx] &&
128                     lov->lov_tgts[req->rq_idx]->ltd_active))
129                 rc = 0;
130
131         /* FIXME in raid1 regime, should return 0 */
132         return rc;
133 }
134
135 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
136 {
137         list_add_tail(&req->rq_link, &set->set_list);
138         set->set_count++;
139         req->rq_rqset = set;
140 }
141
142 static int lov_check_set(struct lov_obd *lov, int idx)
143 {
144         int rc = 0;
145         mutex_lock(&lov->lov_lock);
146
147         if (lov->lov_tgts[idx] == NULL ||
148             lov->lov_tgts[idx]->ltd_active ||
149             (lov->lov_tgts[idx]->ltd_exp != NULL &&
150              class_exp2cliimp(lov->lov_tgts[idx]->ltd_exp)->imp_connect_tried))
151                 rc = 1;
152
153         mutex_unlock(&lov->lov_lock);
154         return rc;
155 }
156
157 /* Check if the OSC connection exists and is active.
158  * If the OSC has not yet had a chance to connect to the OST the first time,
159  * wait once for it to connect instead of returning an error.
160  */
161 int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
162 {
163         wait_queue_head_t waitq;
164         struct l_wait_info lwi;
165         struct lov_tgt_desc *tgt;
166         int rc = 0;
167
168         mutex_lock(&lov->lov_lock);
169
170         tgt = lov->lov_tgts[ost_idx];
171
172         if (unlikely(tgt == NULL))
173                 GOTO(out, rc = 0);
174
175         if (likely(tgt->ltd_active))
176                 GOTO(out, rc = 1);
177
178         if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried)
179                 GOTO(out, rc = 0);
180
181         mutex_unlock(&lov->lov_lock);
182
183         init_waitqueue_head(&waitq);
184         lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
185                                    cfs_time_seconds(1), NULL, NULL);
186
187         rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
188         if (tgt != NULL && tgt->ltd_active)
189                 return 1;
190
191         return 0;
192
193 out:
194         mutex_unlock(&lov->lov_lock);
195         return rc;
196 }
197
198 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
199                                struct lov_oinfo *loi, int flags,
200                                struct ost_lvb *lvb, __u32 mode, int rc);
201
202 static int lov_update_enqueue_lov(struct obd_export *exp,
203                                   struct lustre_handle *lov_lockhp,
204                                   struct lov_oinfo *loi, int flags, int idx,
205                                   struct ost_id *oi, int rc)
206 {
207         struct lov_obd *lov = &exp->exp_obd->u.lov;
208
209         if (rc != ELDLM_OK &&
210             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
211                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
212                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
213                         /* -EUSERS used by OST to report file contention */
214                         if (rc != -EINTR && rc != -EUSERS)
215                                 CERROR("%s: enqueue objid "DOSTID" subobj"
216                                        DOSTID" on OST idx %d: rc %d\n",
217                                        exp->exp_obd->obd_name,
218                                        POSTID(oi), POSTID(&loi->loi_oi),
219                                        loi->loi_ost_idx, rc);
220                 } else
221                         rc = ELDLM_OK;
222         }
223         return rc;
224 }
225
226 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
227 {
228         struct lov_request_set *set = req->rq_rqset;
229         struct lustre_handle *lov_lockhp;
230         struct obd_info *oi = set->set_oi;
231         struct lov_oinfo *loi;
232
233         LASSERT(oi != NULL);
234
235         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
236         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
237
238         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
239          * and that copy can be arbitrarily out of date.
240          *
241          * The LOV API is due for a serious rewriting anyways, and this
242          * can be addressed then. */
243
244         lov_stripe_lock(oi->oi_md);
245         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
246                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
247         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
248                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
249         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
250                                     req->rq_idx, &oi->oi_md->lsm_oi, rc);
251         lov_stripe_unlock(oi->oi_md);
252         lov_update_set(set, req, rc);
253         return rc;
254 }
255
256 /* The callback for osc_enqueue that updates lov info for every OSC request. */
257 static int cb_update_enqueue(void *cookie, int rc)
258 {
259         struct obd_info *oinfo = cookie;
260         struct ldlm_enqueue_info *einfo;
261         struct lov_request *lovreq;
262
263         lovreq = container_of(oinfo, struct lov_request, rq_oi);
264         einfo = lovreq->rq_rqset->set_ei;
265         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
266 }
267
268 static int enqueue_done(struct lov_request_set *set, __u32 mode)
269 {
270         struct lov_request *req;
271         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
272         int completes = atomic_read(&set->set_completes);
273         int rc = 0;
274
275         /* enqueue/match success, just return */
276         if (completes && completes == atomic_read(&set->set_success))
277                 return 0;
278
279         /* cancel enqueued/matched locks */
280         list_for_each_entry(req, &set->set_list, rq_link) {
281                 struct lustre_handle *lov_lockhp;
282
283                 if (!req->rq_complete || req->rq_rc)
284                         continue;
285
286                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
287                 LASSERT(lov_lockhp);
288                 if (!lustre_handle_is_used(lov_lockhp))
289                         continue;
290
291                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
292                                 req->rq_oi.oi_md, mode, lov_lockhp);
293                 if (rc && lov->lov_tgts[req->rq_idx] &&
294                     lov->lov_tgts[req->rq_idx]->ltd_active)
295                         CERROR("%s: cancelling obdjid "DOSTID" on OST"
296                                "idx %d error: rc = %d\n",
297                                set->set_exp->exp_obd->obd_name,
298                                POSTID(&req->rq_oi.oi_md->lsm_oi),
299                                req->rq_idx, rc);
300         }
301         if (set->set_lockh)
302                 lov_llh_put(set->set_lockh);
303         return rc;
304 }
305
306 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
307                          struct ptlrpc_request_set *rqset)
308 {
309         int ret = 0;
310
311         if (set == NULL)
312                 return 0;
313         LASSERT(set->set_exp);
314         /* Do enqueue_done only for sync requests and if any request
315          * succeeded. */
316         if (!rqset) {
317                 if (rc)
318                         atomic_set(&set->set_completes, 0);
319                 ret = enqueue_done(set, mode);
320         } else if (set->set_lockh)
321                 lov_llh_put(set->set_lockh);
322
323         lov_put_reqset(set);
324
325         return rc ? rc : ret;
326 }
327
328 static void lov_llh_addref(void *llhp)
329 {
330         struct lov_lock_handles *llh = llhp;
331
332         atomic_inc(&llh->llh_refcount);
333         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
334                atomic_read(&llh->llh_refcount));
335 }
336
337 static struct portals_handle_ops lov_handle_ops = {
338         .hop_addref = lov_llh_addref,
339         .hop_free   = NULL,
340 };
341
342 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
343 {
344         struct lov_lock_handles *llh;
345
346         OBD_ALLOC(llh, sizeof(*llh) +
347                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
348         if (llh == NULL)
349                 return NULL;
350
351         atomic_set(&llh->llh_refcount, 2);
352         llh->llh_stripe_count = lsm->lsm_stripe_count;
353         INIT_LIST_HEAD(&llh->llh_handle.h_link);
354         class_handle_hash(&llh->llh_handle, &lov_handle_ops);
355
356         return llh;
357 }
358
359 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
360                          struct ldlm_enqueue_info *einfo,
361                          struct lov_request_set **reqset)
362 {
363         struct lov_obd *lov = &exp->exp_obd->u.lov;
364         struct lov_request_set *set;
365         int i, rc = 0;
366
367         OBD_ALLOC(set, sizeof(*set));
368         if (set == NULL)
369                 return -ENOMEM;
370         lov_init_set(set);
371
372         set->set_exp = exp;
373         set->set_oi = oinfo;
374         set->set_ei = einfo;
375         set->set_lockh = lov_llh_new(oinfo->oi_md);
376         if (set->set_lockh == NULL)
377                 GOTO(out_set, rc = -ENOMEM);
378         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
379
380         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
381                 struct lov_oinfo *loi;
382                 struct lov_request *req;
383                 obd_off start, end;
384
385                 loi = oinfo->oi_md->lsm_oinfo[i];
386                 if (!lov_stripe_intersects(oinfo->oi_md, i,
387                                            oinfo->oi_policy.l_extent.start,
388                                            oinfo->oi_policy.l_extent.end,
389                                            &start, &end))
390                         continue;
391
392                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
393                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
394                         continue;
395                 }
396
397                 OBD_ALLOC(req, sizeof(*req));
398                 if (req == NULL)
399                         GOTO(out_set, rc = -ENOMEM);
400
401                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
402                         sizeof(struct lov_oinfo *) +
403                         sizeof(struct lov_oinfo);
404                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
405                 if (req->rq_oi.oi_md == NULL) {
406                         OBD_FREE(req, sizeof(*req));
407                         GOTO(out_set, rc = -ENOMEM);
408                 }
409                 req->rq_oi.oi_md->lsm_oinfo[0] =
410                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
411                         sizeof(struct lov_oinfo *);
412
413                 /* Set lov request specific parameters. */
414                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
415                 req->rq_oi.oi_cb_up = cb_update_enqueue;
416                 req->rq_oi.oi_flags = oinfo->oi_flags;
417
418                 LASSERT(req->rq_oi.oi_lockh);
419
420                 req->rq_oi.oi_policy.l_extent.gid =
421                         oinfo->oi_policy.l_extent.gid;
422                 req->rq_oi.oi_policy.l_extent.start = start;
423                 req->rq_oi.oi_policy.l_extent.end = end;
424
425                 req->rq_idx = loi->loi_ost_idx;
426                 req->rq_stripe = i;
427
428                 /* XXX LOV STACKING: submd should be from the subobj */
429                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
430                 req->rq_oi.oi_md->lsm_stripe_count = 0;
431                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
432                         loi->loi_kms_valid;
433                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
434                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
435
436                 lov_set_add_req(req, set);
437         }
438         if (!set->set_count)
439                 GOTO(out_set, rc = -EIO);
440         *reqset = set;
441         return 0;
442 out_set:
443         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
444         return rc;
445 }
446
447 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
448 {
449         int rc = 0;
450
451         if (set == NULL)
452                 return 0;
453         LASSERT(set->set_exp);
454         rc = enqueue_done(set, mode);
455         if ((set->set_count == atomic_read(&set->set_success)) &&
456             (flags & LDLM_FL_TEST_LOCK))
457                 lov_llh_put(set->set_lockh);
458
459         lov_put_reqset(set);
460
461         return rc;
462 }
463
464 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
465                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
466                        __u32 mode, struct lustre_handle *lockh,
467                        struct lov_request_set **reqset)
468 {
469         struct lov_obd *lov = &exp->exp_obd->u.lov;
470         struct lov_request_set *set;
471         int i, rc = 0;
472
473         OBD_ALLOC(set, sizeof(*set));
474         if (set == NULL)
475                 return -ENOMEM;
476         lov_init_set(set);
477
478         set->set_exp = exp;
479         set->set_oi = oinfo;
480         set->set_oi->oi_md = lsm;
481         set->set_lockh = lov_llh_new(lsm);
482         if (set->set_lockh == NULL)
483                 GOTO(out_set, rc = -ENOMEM);
484         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
485
486         for (i = 0; i < lsm->lsm_stripe_count; i++){
487                 struct lov_oinfo *loi;
488                 struct lov_request *req;
489                 obd_off start, end;
490
491                 loi = lsm->lsm_oinfo[i];
492                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
493                                            policy->l_extent.end, &start, &end))
494                         continue;
495
496                 /* FIXME raid1 should grace this error */
497                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
498                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
499                         GOTO(out_set, rc = -EIO);
500                 }
501
502                 OBD_ALLOC(req, sizeof(*req));
503                 if (req == NULL)
504                         GOTO(out_set, rc = -ENOMEM);
505
506                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
507                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
508                 if (req->rq_oi.oi_md == NULL) {
509                         OBD_FREE(req, sizeof(*req));
510                         GOTO(out_set, rc = -ENOMEM);
511                 }
512
513                 req->rq_oi.oi_policy.l_extent.start = start;
514                 req->rq_oi.oi_policy.l_extent.end = end;
515                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
516
517                 req->rq_idx = loi->loi_ost_idx;
518                 req->rq_stripe = i;
519
520                 /* XXX LOV STACKING: submd should be from the subobj */
521                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
522                 req->rq_oi.oi_md->lsm_stripe_count = 0;
523
524                 lov_set_add_req(req, set);
525         }
526         if (!set->set_count)
527                 GOTO(out_set, rc = -EIO);
528         *reqset = set;
529         return rc;
530 out_set:
531         lov_fini_match_set(set, mode, 0);
532         return rc;
533 }
534
535 int lov_fini_cancel_set(struct lov_request_set *set)
536 {
537         int rc = 0;
538
539         if (set == NULL)
540                 return 0;
541
542         LASSERT(set->set_exp);
543         if (set->set_lockh)
544                 lov_llh_put(set->set_lockh);
545
546         lov_put_reqset(set);
547
548         return rc;
549 }
550
551 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
552                         struct lov_stripe_md *lsm, __u32 mode,
553                         struct lustre_handle *lockh,
554                         struct lov_request_set **reqset)
555 {
556         struct lov_request_set *set;
557         int i, rc = 0;
558
559         OBD_ALLOC(set, sizeof(*set));
560         if (set == NULL)
561                 return -ENOMEM;
562         lov_init_set(set);
563
564         set->set_exp = exp;
565         set->set_oi = oinfo;
566         set->set_oi->oi_md = lsm;
567         set->set_lockh = lov_handle2llh(lockh);
568         if (set->set_lockh == NULL) {
569                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
570                 GOTO(out_set, rc = -EINVAL);
571         }
572         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
573
574         for (i = 0; i < lsm->lsm_stripe_count; i++){
575                 struct lov_request *req;
576                 struct lustre_handle *lov_lockhp;
577                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
578
579                 lov_lockhp = set->set_lockh->llh_handles + i;
580                 if (!lustre_handle_is_used(lov_lockhp)) {
581                         CDEBUG(D_INFO, "lov idx %d subobj "DOSTID" no lock\n",
582                                loi->loi_ost_idx, POSTID(&loi->loi_oi));
583                         continue;
584                 }
585
586                 OBD_ALLOC(req, sizeof(*req));
587                 if (req == NULL)
588                         GOTO(out_set, rc = -ENOMEM);
589
590                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
591                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
592                 if (req->rq_oi.oi_md == NULL) {
593                         OBD_FREE(req, sizeof(*req));
594                         GOTO(out_set, rc = -ENOMEM);
595                 }
596
597                 req->rq_idx = loi->loi_ost_idx;
598                 req->rq_stripe = i;
599
600                 /* XXX LOV STACKING: submd should be from the subobj */
601                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
602                 req->rq_oi.oi_md->lsm_stripe_count = 0;
603
604                 lov_set_add_req(req, set);
605         }
606         if (!set->set_count)
607                 GOTO(out_set, rc = -EIO);
608         *reqset = set;
609         return rc;
610 out_set:
611         lov_fini_cancel_set(set);
612         return rc;
613 }
614 static int common_attr_done(struct lov_request_set *set)
615 {
616         struct list_head *pos;
617         struct lov_request *req;
618         struct obdo *tmp_oa;
619         int rc = 0, attrset = 0;
620
621         LASSERT(set->set_oi != NULL);
622
623         if (set->set_oi->oi_oa == NULL)
624                 return 0;
625
626         if (!atomic_read(&set->set_success))
627                 return -EIO;
628
629         OBDO_ALLOC(tmp_oa);
630         if (tmp_oa == NULL)
631                 GOTO(out, rc = -ENOMEM);
632
633         list_for_each (pos, &set->set_list) {
634                 req = list_entry(pos, struct lov_request, rq_link);
635
636                 if (!req->rq_complete || req->rq_rc)
637                         continue;
638                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
639                         continue;
640                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
641                                 req->rq_oi.oi_oa->o_valid,
642                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
643         }
644         if (!attrset) {
645                 CERROR("No stripes had valid attrs\n");
646                 rc = -EIO;
647         }
648         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
649             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
650                 /* When we take attributes of some epoch, we require all the
651                  * ost to be active. */
652                 CERROR("Not all the stripes had valid attrs\n");
653                 GOTO(out, rc = -EIO);
654         }
655
656         tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
657         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
658 out:
659         if (tmp_oa)
660                 OBDO_FREE(tmp_oa);
661         return rc;
662
663 }
664
665 static int brw_done(struct lov_request_set *set)
666 {
667         struct lov_stripe_md *lsm = set->set_oi->oi_md;
668         struct lov_oinfo     *loi = NULL;
669         struct list_head *pos;
670         struct lov_request *req;
671
672         list_for_each (pos, &set->set_list) {
673                 req = list_entry(pos, struct lov_request, rq_link);
674
675                 if (!req->rq_complete || req->rq_rc)
676                         continue;
677
678                 loi = lsm->lsm_oinfo[req->rq_stripe];
679
680                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
681                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
682         }
683
684         return 0;
685 }
686
687 int lov_fini_brw_set(struct lov_request_set *set)
688 {
689         int rc = 0;
690
691         if (set == NULL)
692                 return 0;
693         LASSERT(set->set_exp);
694         if (atomic_read(&set->set_completes)) {
695                 rc = brw_done(set);
696                 /* FIXME update qos data here */
697         }
698         lov_put_reqset(set);
699
700         return rc;
701 }
702
703 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
704                      obd_count oa_bufs, struct brw_page *pga,
705                      struct obd_trans_info *oti,
706                      struct lov_request_set **reqset)
707 {
708         struct {
709                 obd_count       index;
710                 obd_count       count;
711                 obd_count       off;
712         } *info = NULL;
713         struct lov_request_set *set;
714         struct lov_obd *lov = &exp->exp_obd->u.lov;
715         int rc = 0, i, shift;
716
717         OBD_ALLOC(set, sizeof(*set));
718         if (set == NULL)
719                 return -ENOMEM;
720         lov_init_set(set);
721
722         set->set_exp = exp;
723         set->set_oti = oti;
724         set->set_oi = oinfo;
725         set->set_oabufs = oa_bufs;
726         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
727         if (!set->set_pga)
728                 GOTO(out, rc = -ENOMEM);
729
730         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
731         if (!info)
732                 GOTO(out, rc = -ENOMEM);
733
734         /* calculate the page count for each stripe */
735         for (i = 0; i < oa_bufs; i++) {
736                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
737                 info[stripe].count++;
738         }
739
740         /* alloc and initialize lov request */
741         shift = 0;
742         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
743                 struct lov_oinfo *loi = NULL;
744                 struct lov_request *req;
745
746                 if (info[i].count == 0)
747                         continue;
748
749                 loi = oinfo->oi_md->lsm_oinfo[i];
750                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
751                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
752                         GOTO(out, rc = -EIO);
753                 }
754
755                 OBD_ALLOC(req, sizeof(*req));
756                 if (req == NULL)
757                         GOTO(out, rc = -ENOMEM);
758
759                 OBDO_ALLOC(req->rq_oi.oi_oa);
760                 if (req->rq_oi.oi_oa == NULL) {
761                         OBD_FREE(req, sizeof(*req));
762                         GOTO(out, rc = -ENOMEM);
763                 }
764
765                 if (oinfo->oi_oa) {
766                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
767                                sizeof(*req->rq_oi.oi_oa));
768                 }
769                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
770                 req->rq_oi.oi_oa->o_stripe_idx = i;
771
772                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
773                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
774                 if (req->rq_oi.oi_md == NULL) {
775                         OBDO_FREE(req->rq_oi.oi_oa);
776                         OBD_FREE(req, sizeof(*req));
777                         GOTO(out, rc = -ENOMEM);
778                 }
779
780                 req->rq_idx = loi->loi_ost_idx;
781                 req->rq_stripe = i;
782
783                 /* XXX LOV STACKING */
784                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
785                 req->rq_oabufs = info[i].count;
786                 req->rq_pgaidx = shift;
787                 shift += req->rq_oabufs;
788
789                 /* remember the index for sort brw_page array */
790                 info[i].index = req->rq_pgaidx;
791
792                 req->rq_oi.oi_capa = oinfo->oi_capa;
793
794                 lov_set_add_req(req, set);
795         }
796         if (!set->set_count)
797                 GOTO(out, rc = -EIO);
798
799         /* rotate & sort the brw_page array */
800         for (i = 0; i < oa_bufs; i++) {
801                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
802
803                 shift = info[stripe].index + info[stripe].off;
804                 LASSERT(shift < oa_bufs);
805                 set->set_pga[shift] = pga[i];
806                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
807                                   &set->set_pga[shift].off);
808                 info[stripe].off++;
809         }
810 out:
811         if (info)
812                 OBD_FREE_LARGE(info,
813                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
814
815         if (rc == 0)
816                 *reqset = set;
817         else
818                 lov_fini_brw_set(set);
819
820         return rc;
821 }
822
823 int lov_fini_getattr_set(struct lov_request_set *set)
824 {
825         int rc = 0;
826
827         if (set == NULL)
828                 return 0;
829         LASSERT(set->set_exp);
830         if (atomic_read(&set->set_completes))
831                 rc = common_attr_done(set);
832
833         lov_put_reqset(set);
834
835         return rc;
836 }
837
838 /* The callback for osc_getattr_async that finilizes a request info when a
839  * response is received. */
840 static int cb_getattr_update(void *cookie, int rc)
841 {
842         struct obd_info *oinfo = cookie;
843         struct lov_request *lovreq;
844         lovreq = container_of(oinfo, struct lov_request, rq_oi);
845         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
846 }
847
848 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
849                          struct lov_request_set **reqset)
850 {
851         struct lov_request_set *set;
852         struct lov_obd *lov = &exp->exp_obd->u.lov;
853         int rc = 0, i;
854
855         OBD_ALLOC(set, sizeof(*set));
856         if (set == NULL)
857                 return -ENOMEM;
858         lov_init_set(set);
859
860         set->set_exp = exp;
861         set->set_oi = oinfo;
862
863         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
864                 struct lov_oinfo *loi;
865                 struct lov_request *req;
866
867                 loi = oinfo->oi_md->lsm_oinfo[i];
868                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
869                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
870                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
871                                 /* SOM requires all the OSTs to be active. */
872                                 GOTO(out_set, rc = -EIO);
873                         continue;
874                 }
875
876                 OBD_ALLOC(req, sizeof(*req));
877                 if (req == NULL)
878                         GOTO(out_set, rc = -ENOMEM);
879
880                 req->rq_stripe = i;
881                 req->rq_idx = loi->loi_ost_idx;
882
883                 OBDO_ALLOC(req->rq_oi.oi_oa);
884                 if (req->rq_oi.oi_oa == NULL) {
885                         OBD_FREE(req, sizeof(*req));
886                         GOTO(out_set, rc = -ENOMEM);
887                 }
888                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
889                        sizeof(*req->rq_oi.oi_oa));
890                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
891                 req->rq_oi.oi_cb_up = cb_getattr_update;
892                 req->rq_oi.oi_capa = oinfo->oi_capa;
893
894                 lov_set_add_req(req, set);
895         }
896         if (!set->set_count)
897                 GOTO(out_set, rc = -EIO);
898         *reqset = set;
899         return rc;
900 out_set:
901         lov_fini_getattr_set(set);
902         return rc;
903 }
904
905 int lov_fini_destroy_set(struct lov_request_set *set)
906 {
907         if (set == NULL)
908                 return 0;
909         LASSERT(set->set_exp);
910         if (atomic_read(&set->set_completes)) {
911                 /* FIXME update qos data here */
912         }
913
914         lov_put_reqset(set);
915
916         return 0;
917 }
918
919 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
920                          struct obdo *src_oa, struct lov_stripe_md *lsm,
921                          struct obd_trans_info *oti,
922                          struct lov_request_set **reqset)
923 {
924         struct lov_request_set *set;
925         struct lov_obd *lov = &exp->exp_obd->u.lov;
926         int rc = 0, i;
927
928         OBD_ALLOC(set, sizeof(*set));
929         if (set == NULL)
930                 return -ENOMEM;
931         lov_init_set(set);
932
933         set->set_exp = exp;
934         set->set_oi = oinfo;
935         set->set_oi->oi_md = lsm;
936         set->set_oi->oi_oa = src_oa;
937         set->set_oti = oti;
938         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
939                 set->set_cookies = oti->oti_logcookies;
940
941         for (i = 0; i < lsm->lsm_stripe_count; i++) {
942                 struct lov_oinfo *loi;
943                 struct lov_request *req;
944
945                 loi = lsm->lsm_oinfo[i];
946                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
947                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
948                         continue;
949                 }
950
951                 OBD_ALLOC(req, sizeof(*req));
952                 if (req == NULL)
953                         GOTO(out_set, rc = -ENOMEM);
954
955                 req->rq_stripe = i;
956                 req->rq_idx = loi->loi_ost_idx;
957
958                 OBDO_ALLOC(req->rq_oi.oi_oa);
959                 if (req->rq_oi.oi_oa == NULL) {
960                         OBD_FREE(req, sizeof(*req));
961                         GOTO(out_set, rc = -ENOMEM);
962                 }
963                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
964                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
965                 lov_set_add_req(req, set);
966         }
967         if (!set->set_count)
968                 GOTO(out_set, rc = -EIO);
969         *reqset = set;
970         return rc;
971 out_set:
972         lov_fini_destroy_set(set);
973         return rc;
974 }
975
976 int lov_fini_setattr_set(struct lov_request_set *set)
977 {
978         int rc = 0;
979
980         if (set == NULL)
981                 return 0;
982         LASSERT(set->set_exp);
983         if (atomic_read(&set->set_completes)) {
984                 rc = common_attr_done(set);
985                 /* FIXME update qos data here */
986         }
987
988         lov_put_reqset(set);
989         return rc;
990 }
991
992 int lov_update_setattr_set(struct lov_request_set *set,
993                            struct lov_request *req, int rc)
994 {
995         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
996         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
997
998         lov_update_set(set, req, rc);
999
1000         /* grace error on inactive ost */
1001         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1002                     lov->lov_tgts[req->rq_idx]->ltd_active))
1003                 rc = 0;
1004
1005         if (rc == 0) {
1006                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1007                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1008                                 req->rq_oi.oi_oa->o_ctime;
1009                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1010                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1011                                 req->rq_oi.oi_oa->o_mtime;
1012                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1013                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1014                                 req->rq_oi.oi_oa->o_atime;
1015         }
1016
1017         return rc;
1018 }
1019
1020 /* The callback for osc_setattr_async that finilizes a request info when a
1021  * response is received. */
1022 static int cb_setattr_update(void *cookie, int rc)
1023 {
1024         struct obd_info *oinfo = cookie;
1025         struct lov_request *lovreq;
1026         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1027         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1028 }
1029
1030 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1031                          struct obd_trans_info *oti,
1032                          struct lov_request_set **reqset)
1033 {
1034         struct lov_request_set *set;
1035         struct lov_obd *lov = &exp->exp_obd->u.lov;
1036         int rc = 0, i;
1037
1038         OBD_ALLOC(set, sizeof(*set));
1039         if (set == NULL)
1040                 return -ENOMEM;
1041         lov_init_set(set);
1042
1043         set->set_exp = exp;
1044         set->set_oti = oti;
1045         set->set_oi = oinfo;
1046         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1047                 set->set_cookies = oti->oti_logcookies;
1048
1049         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1050                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1051                 struct lov_request *req;
1052
1053                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1054                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1055                         continue;
1056                 }
1057
1058                 OBD_ALLOC(req, sizeof(*req));
1059                 if (req == NULL)
1060                         GOTO(out_set, rc = -ENOMEM);
1061                 req->rq_stripe = i;
1062                 req->rq_idx = loi->loi_ost_idx;
1063
1064                 OBDO_ALLOC(req->rq_oi.oi_oa);
1065                 if (req->rq_oi.oi_oa == NULL) {
1066                         OBD_FREE(req, sizeof(*req));
1067                         GOTO(out_set, rc = -ENOMEM);
1068                 }
1069                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1070                        sizeof(*req->rq_oi.oi_oa));
1071                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1072                 req->rq_oi.oi_oa->o_stripe_idx = i;
1073                 req->rq_oi.oi_cb_up = cb_setattr_update;
1074                 req->rq_oi.oi_capa = oinfo->oi_capa;
1075
1076                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1077                         int off = lov_stripe_offset(oinfo->oi_md,
1078                                                     oinfo->oi_oa->o_size, i,
1079                                                     &req->rq_oi.oi_oa->o_size);
1080
1081                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1082                                 req->rq_oi.oi_oa->o_size--;
1083
1084                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1085                                i, req->rq_oi.oi_oa->o_size,
1086                                oinfo->oi_oa->o_size);
1087                 }
1088                 lov_set_add_req(req, set);
1089         }
1090         if (!set->set_count)
1091                 GOTO(out_set, rc = -EIO);
1092         *reqset = set;
1093         return rc;
1094 out_set:
1095         lov_fini_setattr_set(set);
1096         return rc;
1097 }
1098
1099 int lov_fini_punch_set(struct lov_request_set *set)
1100 {
1101         int rc = 0;
1102
1103         if (set == NULL)
1104                 return 0;
1105         LASSERT(set->set_exp);
1106         if (atomic_read(&set->set_completes)) {
1107                 rc = -EIO;
1108                 /* FIXME update qos data here */
1109                 if (atomic_read(&set->set_success))
1110                         rc = common_attr_done(set);
1111         }
1112
1113         lov_put_reqset(set);
1114
1115         return rc;
1116 }
1117
1118 int lov_update_punch_set(struct lov_request_set *set,
1119                          struct lov_request *req, int rc)
1120 {
1121         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1122         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1123
1124         lov_update_set(set, req, rc);
1125
1126         /* grace error on inactive ost */
1127         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1128                 rc = 0;
1129
1130         if (rc == 0) {
1131                 lov_stripe_lock(lsm);
1132                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1133                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1134                                 req->rq_oi.oi_oa->o_blocks;
1135                 }
1136
1137                 lov_stripe_unlock(lsm);
1138         }
1139
1140         return rc;
1141 }
1142
1143 /* The callback for osc_punch that finilizes a request info when a response
1144  * is received. */
1145 static int cb_update_punch(void *cookie, int rc)
1146 {
1147         struct obd_info *oinfo = cookie;
1148         struct lov_request *lovreq;
1149         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1150         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1151 }
1152
1153 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1154                        struct obd_trans_info *oti,
1155                        struct lov_request_set **reqset)
1156 {
1157         struct lov_request_set *set;
1158         struct lov_obd *lov = &exp->exp_obd->u.lov;
1159         int rc = 0, i;
1160
1161         OBD_ALLOC(set, sizeof(*set));
1162         if (set == NULL)
1163                 return -ENOMEM;
1164         lov_init_set(set);
1165
1166         set->set_oi = oinfo;
1167         set->set_exp = exp;
1168
1169         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1170                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1171                 struct lov_request *req;
1172                 obd_off rs, re;
1173
1174                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1175                                            oinfo->oi_policy.l_extent.start,
1176                                            oinfo->oi_policy.l_extent.end,
1177                                            &rs, &re))
1178                         continue;
1179
1180                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1181                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1182                         GOTO(out_set, rc = -EIO);
1183                 }
1184
1185                 OBD_ALLOC(req, sizeof(*req));
1186                 if (req == NULL)
1187                         GOTO(out_set, rc = -ENOMEM);
1188                 req->rq_stripe = i;
1189                 req->rq_idx = loi->loi_ost_idx;
1190
1191                 OBDO_ALLOC(req->rq_oi.oi_oa);
1192                 if (req->rq_oi.oi_oa == NULL) {
1193                         OBD_FREE(req, sizeof(*req));
1194                         GOTO(out_set, rc = -ENOMEM);
1195                 }
1196                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1197                        sizeof(*req->rq_oi.oi_oa));
1198                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1199                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1200
1201                 req->rq_oi.oi_oa->o_stripe_idx = i;
1202                 req->rq_oi.oi_cb_up = cb_update_punch;
1203
1204                 req->rq_oi.oi_policy.l_extent.start = rs;
1205                 req->rq_oi.oi_policy.l_extent.end = re;
1206                 req->rq_oi.oi_policy.l_extent.gid = -1;
1207
1208                 req->rq_oi.oi_capa = oinfo->oi_capa;
1209
1210                 lov_set_add_req(req, set);
1211         }
1212         if (!set->set_count)
1213                 GOTO(out_set, rc = -EIO);
1214         *reqset = set;
1215         return rc;
1216 out_set:
1217         lov_fini_punch_set(set);
1218         return rc;
1219 }
1220
1221 int lov_fini_sync_set(struct lov_request_set *set)
1222 {
1223         int rc = 0;
1224
1225         if (set == NULL)
1226                 return 0;
1227         LASSERT(set->set_exp);
1228         if (atomic_read(&set->set_completes)) {
1229                 if (!atomic_read(&set->set_success))
1230                         rc = -EIO;
1231                 /* FIXME update qos data here */
1232         }
1233
1234         lov_put_reqset(set);
1235
1236         return rc;
1237 }
1238
1239 /* The callback for osc_sync that finilizes a request info when a
1240  * response is recieved. */
1241 static int cb_sync_update(void *cookie, int rc)
1242 {
1243         struct obd_info *oinfo = cookie;
1244         struct lov_request *lovreq;
1245
1246         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1247         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1248 }
1249
1250 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1251                       obd_off start, obd_off end,
1252                       struct lov_request_set **reqset)
1253 {
1254         struct lov_request_set *set;
1255         struct lov_obd *lov = &exp->exp_obd->u.lov;
1256         int rc = 0, i;
1257
1258         OBD_ALLOC_PTR(set);
1259         if (set == NULL)
1260                 return -ENOMEM;
1261         lov_init_set(set);
1262
1263         set->set_exp = exp;
1264         set->set_oi = oinfo;
1265
1266         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1267                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1268                 struct lov_request *req;
1269                 obd_off rs, re;
1270
1271                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1272                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1273                         continue;
1274                 }
1275
1276                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1277                                            &re))
1278                         continue;
1279
1280                 OBD_ALLOC_PTR(req);
1281                 if (req == NULL)
1282                         GOTO(out_set, rc = -ENOMEM);
1283                 req->rq_stripe = i;
1284                 req->rq_idx = loi->loi_ost_idx;
1285
1286                 OBDO_ALLOC(req->rq_oi.oi_oa);
1287                 if (req->rq_oi.oi_oa == NULL) {
1288                         OBD_FREE(req, sizeof(*req));
1289                         GOTO(out_set, rc = -ENOMEM);
1290                 }
1291                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1292                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1293                 req->rq_oi.oi_oa->o_stripe_idx = i;
1294
1295                 req->rq_oi.oi_policy.l_extent.start = rs;
1296                 req->rq_oi.oi_policy.l_extent.end = re;
1297                 req->rq_oi.oi_policy.l_extent.gid = -1;
1298                 req->rq_oi.oi_cb_up = cb_sync_update;
1299
1300                 lov_set_add_req(req, set);
1301         }
1302         if (!set->set_count)
1303                 GOTO(out_set, rc = -EIO);
1304         *reqset = set;
1305         return rc;
1306 out_set:
1307         lov_fini_sync_set(set);
1308         return rc;
1309 }
1310
1311 #define LOV_U64_MAX ((__u64)~0ULL)
1312 #define LOV_SUM_MAX(tot, add)                                      \
1313         do {                                                        \
1314                 if ((tot) + (add) < (tot))                            \
1315                         (tot) = LOV_U64_MAX;                        \
1316                 else                                                \
1317                         (tot) += (add);                          \
1318         } while(0)
1319
1320 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1321 {
1322         if (success) {
1323                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1324                                                            LOV_MAGIC, 0);
1325                 if (osfs->os_files != LOV_U64_MAX)
1326                         lov_do_div64(osfs->os_files, expected_stripes);
1327                 if (osfs->os_ffree != LOV_U64_MAX)
1328                         lov_do_div64(osfs->os_ffree, expected_stripes);
1329
1330                 spin_lock(&obd->obd_osfs_lock);
1331                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1332                 obd->obd_osfs_age = cfs_time_current_64();
1333                 spin_unlock(&obd->obd_osfs_lock);
1334                 return 0;
1335         }
1336
1337         return -EIO;
1338 }
1339
1340 int lov_fini_statfs_set(struct lov_request_set *set)
1341 {
1342         int rc = 0;
1343
1344         if (set == NULL)
1345                 return 0;
1346
1347         if (atomic_read(&set->set_completes)) {
1348                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1349                                      atomic_read(&set->set_success));
1350         }
1351         lov_put_reqset(set);
1352         return rc;
1353 }
1354
1355 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1356                        int success)
1357 {
1358         int shift = 0, quit = 0;
1359         __u64 tmp;
1360
1361         if (success == 0) {
1362                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1363         } else {
1364                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1365                         /* assume all block sizes are always powers of 2 */
1366                         /* get the bits difference */
1367                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1368                         for (shift = 0; shift <= 64; ++shift) {
1369                                 if (tmp & 1) {
1370                                         if (quit)
1371                                                 break;
1372                                         else
1373                                                 quit = 1;
1374                                         shift = 0;
1375                                 }
1376                                 tmp >>= 1;
1377                         }
1378                 }
1379
1380                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1381                         osfs->os_bsize = lov_sfs->os_bsize;
1382
1383                         osfs->os_bfree  >>= shift;
1384                         osfs->os_bavail >>= shift;
1385                         osfs->os_blocks >>= shift;
1386                 } else if (shift != 0) {
1387                         lov_sfs->os_bfree  >>= shift;
1388                         lov_sfs->os_bavail >>= shift;
1389                         lov_sfs->os_blocks >>= shift;
1390                 }
1391                 osfs->os_bfree += lov_sfs->os_bfree;
1392                 osfs->os_bavail += lov_sfs->os_bavail;
1393                 osfs->os_blocks += lov_sfs->os_blocks;
1394                 /* XXX not sure about this one - depends on policy.
1395                  *   - could be minimum if we always stripe on all OBDs
1396                  *     (but that would be wrong for any other policy,
1397                  *     if one of the OBDs has no more objects left)
1398                  *   - could be sum if we stripe whole objects
1399                  *   - could be average, just to give a nice number
1400                  *
1401                  * To give a "reasonable" (if not wholly accurate)
1402                  * number, we divide the total number of free objects
1403                  * by expected stripe count (watch out for overflow).
1404                  */
1405                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1406                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1407         }
1408 }
1409
1410 /* The callback for osc_statfs_async that finilizes a request info when a
1411  * response is received. */
1412 static int cb_statfs_update(void *cookie, int rc)
1413 {
1414         struct obd_info *oinfo = cookie;
1415         struct lov_request *lovreq;
1416         struct lov_request_set *set;
1417         struct obd_statfs *osfs, *lov_sfs;
1418         struct lov_obd *lov;
1419         struct lov_tgt_desc *tgt;
1420         struct obd_device *lovobd, *tgtobd;
1421         int success;
1422
1423         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1424         set = lovreq->rq_rqset;
1425         lovobd = set->set_obd;
1426         lov = &lovobd->u.lov;
1427         osfs = set->set_oi->oi_osfs;
1428         lov_sfs = oinfo->oi_osfs;
1429         success = atomic_read(&set->set_success);
1430         /* XXX: the same is done in lov_update_common_set, however
1431            lovset->set_exp is not initialized. */
1432         lov_update_set(set, lovreq, rc);
1433         if (rc)
1434                 GOTO(out, rc);
1435
1436         obd_getref(lovobd);
1437         tgt = lov->lov_tgts[lovreq->rq_idx];
1438         if (!tgt || !tgt->ltd_active)
1439                 GOTO(out_update, rc);
1440
1441         tgtobd = class_exp2obd(tgt->ltd_exp);
1442         spin_lock(&tgtobd->obd_osfs_lock);
1443         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1444         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1445                 tgtobd->obd_osfs_age = cfs_time_current_64();
1446         spin_unlock(&tgtobd->obd_osfs_lock);
1447
1448 out_update:
1449         lov_update_statfs(osfs, lov_sfs, success);
1450         obd_putref(lovobd);
1451
1452 out:
1453         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1454             lov_set_finished(set, 0)) {
1455                 lov_statfs_interpret(NULL, set, set->set_count !=
1456                                      atomic_read(&set->set_success));
1457         }
1458
1459         return 0;
1460 }
1461
1462 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1463                         struct lov_request_set **reqset)
1464 {
1465         struct lov_request_set *set;
1466         struct lov_obd *lov = &obd->u.lov;
1467         int rc = 0, i;
1468
1469         OBD_ALLOC(set, sizeof(*set));
1470         if (set == NULL)
1471                 return -ENOMEM;
1472         lov_init_set(set);
1473
1474         set->set_obd = obd;
1475         set->set_oi = oinfo;
1476
1477         /* We only get block data from the OBD */
1478         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1479                 struct lov_request *req;
1480
1481                 if (lov->lov_tgts[i] == NULL ||
1482                     (!lov_check_and_wait_active(lov, i) &&
1483                      (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1484                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1485                         continue;
1486                 }
1487
1488                 /* skip targets that have been explicitely disabled by the
1489                  * administrator */
1490                 if (!lov->lov_tgts[i]->ltd_exp) {
1491                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1492                         continue;
1493                 }
1494
1495                 OBD_ALLOC(req, sizeof(*req));
1496                 if (req == NULL)
1497                         GOTO(out_set, rc = -ENOMEM);
1498
1499                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1500                 if (req->rq_oi.oi_osfs == NULL) {
1501                         OBD_FREE(req, sizeof(*req));
1502                         GOTO(out_set, rc = -ENOMEM);
1503                 }
1504
1505                 req->rq_idx = i;
1506                 req->rq_oi.oi_cb_up = cb_statfs_update;
1507                 req->rq_oi.oi_flags = oinfo->oi_flags;
1508
1509                 lov_set_add_req(req, set);
1510         }
1511         if (!set->set_count)
1512                 GOTO(out_set, rc = -EIO);
1513         *reqset = set;
1514         return rc;
1515 out_set:
1516         lov_fini_statfs_set(set);
1517         return rc;
1518 }