]> Pileus Git - ~andy/linux/blob - drivers/staging/lustre/lustre/obdclass/cl_object.c
Linux 3.14
[~andy/linux] / drivers / staging / lustre / lustre / obdclass / cl_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Lustre Object.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 /*
42  * Locking.
43  *
44  *  i_mutex
45  *      PG_locked
46  *        ->coh_page_guard
47  *        ->coh_lock_guard
48  *        ->coh_attr_guard
49  *        ->ls_guard
50  */
51
52 #define DEBUG_SUBSYSTEM S_CLASS
53
54 #include <linux/libcfs/libcfs.h>
55 /* class_put_type() */
56 #include <obd_class.h>
57 #include <obd_support.h>
58 #include <lustre_fid.h>
59 #include <linux/list.h>
60 #include <linux/libcfs/libcfs_hash.h> /* for cfs_hash stuff */
61 #include <cl_object.h>
62 #include "cl_internal.h"
63
64 static struct kmem_cache *cl_env_kmem;
65
66 /** Lock class of cl_object_header::coh_page_guard */
67 static struct lock_class_key cl_page_guard_class;
68 /** Lock class of cl_object_header::coh_lock_guard */
69 static struct lock_class_key cl_lock_guard_class;
70 /** Lock class of cl_object_header::coh_attr_guard */
71 static struct lock_class_key cl_attr_guard_class;
72
73 extern __u32 lu_context_tags_default;
74 extern __u32 lu_session_tags_default;
75 /**
76  * Initialize cl_object_header.
77  */
78 int cl_object_header_init(struct cl_object_header *h)
79 {
80         int result;
81
82         result = lu_object_header_init(&h->coh_lu);
83         if (result == 0) {
84                 spin_lock_init(&h->coh_page_guard);
85                 spin_lock_init(&h->coh_lock_guard);
86                 spin_lock_init(&h->coh_attr_guard);
87                 lockdep_set_class(&h->coh_page_guard, &cl_page_guard_class);
88                 lockdep_set_class(&h->coh_lock_guard, &cl_lock_guard_class);
89                 lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
90                 h->coh_pages = 0;
91                 /* XXX hard coded GFP_* mask. */
92                 INIT_RADIX_TREE(&h->coh_tree, GFP_ATOMIC);
93                 INIT_LIST_HEAD(&h->coh_locks);
94                 h->coh_page_bufsize = ALIGN(sizeof(struct cl_page), 8);
95         }
96         return result;
97 }
98 EXPORT_SYMBOL(cl_object_header_init);
99
100 /**
101  * Finalize cl_object_header.
102  */
103 void cl_object_header_fini(struct cl_object_header *h)
104 {
105         LASSERT(list_empty(&h->coh_locks));
106         lu_object_header_fini(&h->coh_lu);
107 }
108 EXPORT_SYMBOL(cl_object_header_fini);
109
110 /**
111  * Returns a cl_object with a given \a fid.
112  *
113  * Returns either cached or newly created object. Additional reference on the
114  * returned object is acquired.
115  *
116  * \see lu_object_find(), cl_page_find(), cl_lock_find()
117  */
118 struct cl_object *cl_object_find(const struct lu_env *env,
119                                  struct cl_device *cd, const struct lu_fid *fid,
120                                  const struct cl_object_conf *c)
121 {
122         might_sleep();
123         return lu2cl(lu_object_find_slice(env, cl2lu_dev(cd), fid, &c->coc_lu));
124 }
125 EXPORT_SYMBOL(cl_object_find);
126
127 /**
128  * Releases a reference on \a o.
129  *
130  * When last reference is released object is returned to the cache, unless
131  * lu_object_header_flags::LU_OBJECT_HEARD_BANSHEE bit is set in its header.
132  *
133  * \see cl_page_put(), cl_lock_put().
134  */
135 void cl_object_put(const struct lu_env *env, struct cl_object *o)
136 {
137         lu_object_put(env, &o->co_lu);
138 }
139 EXPORT_SYMBOL(cl_object_put);
140
141 /**
142  * Acquire an additional reference to the object \a o.
143  *
144  * This can only be used to acquire _additional_ reference, i.e., caller
145  * already has to possess at least one reference to \a o before calling this.
146  *
147  * \see cl_page_get(), cl_lock_get().
148  */
149 void cl_object_get(struct cl_object *o)
150 {
151         lu_object_get(&o->co_lu);
152 }
153 EXPORT_SYMBOL(cl_object_get);
154
155 /**
156  * Returns the top-object for a given \a o.
157  *
158  * \see cl_page_top(), cl_io_top()
159  */
160 struct cl_object *cl_object_top(struct cl_object *o)
161 {
162         struct cl_object_header *hdr = cl_object_header(o);
163         struct cl_object *top;
164
165         while (hdr->coh_parent != NULL)
166                 hdr = hdr->coh_parent;
167
168         top = lu2cl(lu_object_top(&hdr->coh_lu));
169         CDEBUG(D_TRACE, "%p -> %p\n", o, top);
170         return top;
171 }
172 EXPORT_SYMBOL(cl_object_top);
173
174 /**
175  * Returns pointer to the lock protecting data-attributes for the given object
176  * \a o.
177  *
178  * Data-attributes are protected by the cl_object_header::coh_attr_guard
179  * spin-lock in the top-object.
180  *
181  * \see cl_attr, cl_object_attr_lock(), cl_object_operations::coo_attr_get().
182  */
183 static spinlock_t *cl_object_attr_guard(struct cl_object *o)
184 {
185         return &cl_object_header(cl_object_top(o))->coh_attr_guard;
186 }
187
188 /**
189  * Locks data-attributes.
190  *
191  * Prevents data-attributes from changing, until lock is released by
192  * cl_object_attr_unlock(). This has to be called before calls to
193  * cl_object_attr_get(), cl_object_attr_set().
194  */
195 void cl_object_attr_lock(struct cl_object *o)
196 {
197         spin_lock(cl_object_attr_guard(o));
198 }
199 EXPORT_SYMBOL(cl_object_attr_lock);
200
201 /**
202  * Releases data-attributes lock, acquired by cl_object_attr_lock().
203  */
204 void cl_object_attr_unlock(struct cl_object *o)
205 {
206         spin_unlock(cl_object_attr_guard(o));
207 }
208 EXPORT_SYMBOL(cl_object_attr_unlock);
209
210 /**
211  * Returns data-attributes of an object \a obj.
212  *
213  * Every layer is asked (by calling cl_object_operations::coo_attr_get())
214  * top-to-bottom to fill in parts of \a attr that this layer is responsible
215  * for.
216  */
217 int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj,
218                        struct cl_attr *attr)
219 {
220         struct lu_object_header *top;
221         int result;
222
223         LASSERT(spin_is_locked(cl_object_attr_guard(obj)));
224
225         top = obj->co_lu.lo_header;
226         result = 0;
227         list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
228                 if (obj->co_ops->coo_attr_get != NULL) {
229                         result = obj->co_ops->coo_attr_get(env, obj, attr);
230                         if (result != 0) {
231                                 if (result > 0)
232                                         result = 0;
233                                 break;
234                         }
235                 }
236         }
237         return result;
238 }
239 EXPORT_SYMBOL(cl_object_attr_get);
240
241 /**
242  * Updates data-attributes of an object \a obj.
243  *
244  * Only attributes, mentioned in a validness bit-mask \a v are
245  * updated. Calls cl_object_operations::coo_attr_set() on every layer, bottom
246  * to top.
247  */
248 int cl_object_attr_set(const struct lu_env *env, struct cl_object *obj,
249                        const struct cl_attr *attr, unsigned v)
250 {
251         struct lu_object_header *top;
252         int result;
253
254         LASSERT(spin_is_locked(cl_object_attr_guard(obj)));
255
256         top = obj->co_lu.lo_header;
257         result = 0;
258         list_for_each_entry_reverse(obj, &top->loh_layers,
259                                         co_lu.lo_linkage) {
260                 if (obj->co_ops->coo_attr_set != NULL) {
261                         result = obj->co_ops->coo_attr_set(env, obj, attr, v);
262                         if (result != 0) {
263                                 if (result > 0)
264                                         result = 0;
265                                 break;
266                         }
267                 }
268         }
269         return result;
270 }
271 EXPORT_SYMBOL(cl_object_attr_set);
272
273 /**
274  * Notifies layers (bottom-to-top) that glimpse AST was received.
275  *
276  * Layers have to fill \a lvb fields with information that will be shipped
277  * back to glimpse issuer.
278  *
279  * \see cl_lock_operations::clo_glimpse()
280  */
281 int cl_object_glimpse(const struct lu_env *env, struct cl_object *obj,
282                       struct ost_lvb *lvb)
283 {
284         struct lu_object_header *top;
285         int result;
286
287         top = obj->co_lu.lo_header;
288         result = 0;
289         list_for_each_entry_reverse(obj, &top->loh_layers,
290                                         co_lu.lo_linkage) {
291                 if (obj->co_ops->coo_glimpse != NULL) {
292                         result = obj->co_ops->coo_glimpse(env, obj, lvb);
293                         if (result != 0)
294                                 break;
295                 }
296         }
297         LU_OBJECT_HEADER(D_DLMTRACE, env, lu_object_top(top),
298                          "size: "LPU64" mtime: "LPU64" atime: "LPU64" "
299                          "ctime: "LPU64" blocks: "LPU64"\n",
300                          lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime,
301                          lvb->lvb_ctime, lvb->lvb_blocks);
302         return result;
303 }
304 EXPORT_SYMBOL(cl_object_glimpse);
305
306 /**
307  * Updates a configuration of an object \a obj.
308  */
309 int cl_conf_set(const struct lu_env *env, struct cl_object *obj,
310                 const struct cl_object_conf *conf)
311 {
312         struct lu_object_header *top;
313         int result;
314
315         top = obj->co_lu.lo_header;
316         result = 0;
317         list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
318                 if (obj->co_ops->coo_conf_set != NULL) {
319                         result = obj->co_ops->coo_conf_set(env, obj, conf);
320                         if (result != 0)
321                                 break;
322                 }
323         }
324         return result;
325 }
326 EXPORT_SYMBOL(cl_conf_set);
327
328 /**
329  * Helper function removing all object locks, and marking object for
330  * deletion. All object pages must have been deleted at this point.
331  *
332  * This is called by cl_inode_fini() and lov_object_delete() to destroy top-
333  * and sub- objects respectively.
334  */
335 void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
336 {
337         struct cl_object_header *hdr;
338
339         hdr = cl_object_header(obj);
340         LASSERT(hdr->coh_tree.rnode == NULL);
341         LASSERT(hdr->coh_pages == 0);
342
343         set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
344         /*
345          * Destroy all locks. Object destruction (including cl_inode_fini())
346          * cannot cancel the locks, because in the case of a local client,
347          * where client and server share the same thread running
348          * prune_icache(), this can dead-lock with ldlm_cancel_handler()
349          * waiting on __wait_on_freeing_inode().
350          */
351         cl_locks_prune(env, obj, 0);
352 }
353 EXPORT_SYMBOL(cl_object_kill);
354
355 /**
356  * Prunes caches of pages and locks for this object.
357  */
358 void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
359 {
360         cl_pages_prune(env, obj);
361         cl_locks_prune(env, obj, 1);
362 }
363 EXPORT_SYMBOL(cl_object_prune);
364
365 /**
366  * Check if the object has locks.
367  */
368 int cl_object_has_locks(struct cl_object *obj)
369 {
370         struct cl_object_header *head = cl_object_header(obj);
371         int has;
372
373         spin_lock(&head->coh_lock_guard);
374         has = list_empty(&head->coh_locks);
375         spin_unlock(&head->coh_lock_guard);
376
377         return (has == 0);
378 }
379 EXPORT_SYMBOL(cl_object_has_locks);
380
381 void cache_stats_init(struct cache_stats *cs, const char *name)
382 {
383         int i;
384
385         cs->cs_name = name;
386         for (i = 0; i < CS_NR; i++)
387                 atomic_set(&cs->cs_stats[i], 0);
388 }
389
390 int cache_stats_print(const struct cache_stats *cs, struct seq_file *m, int h)
391 {
392         int i;
393         /*
394          *   lookup    hit    total  cached create
395          * env: ...... ...... ...... ...... ......
396          */
397         if (h) {
398                 const char *names[CS_NR] = CS_NAMES;
399
400                 seq_printf(m, "%6s", " ");
401                 for (i = 0; i < CS_NR; i++)
402                         seq_printf(m, "%8s", names[i]);
403                 seq_printf(m, "\n");
404         }
405
406         seq_printf(m, "%5.5s:", cs->cs_name);
407         for (i = 0; i < CS_NR; i++)
408                 seq_printf(m, "%8u", atomic_read(&cs->cs_stats[i]));
409         return 0;
410 }
411
412 /**
413  * Initialize client site.
414  *
415  * Perform common initialization (lu_site_init()), and initialize statistical
416  * counters. Also perform global initializations on the first call.
417  */
418 int cl_site_init(struct cl_site *s, struct cl_device *d)
419 {
420         int i;
421         int result;
422
423         result = lu_site_init(&s->cs_lu, &d->cd_lu_dev);
424         if (result == 0) {
425                 cache_stats_init(&s->cs_pages, "pages");
426                 cache_stats_init(&s->cs_locks, "locks");
427                 for (i = 0; i < ARRAY_SIZE(s->cs_pages_state); ++i)
428                         atomic_set(&s->cs_pages_state[0], 0);
429                 for (i = 0; i < ARRAY_SIZE(s->cs_locks_state); ++i)
430                         atomic_set(&s->cs_locks_state[i], 0);
431         }
432         return result;
433 }
434 EXPORT_SYMBOL(cl_site_init);
435
436 /**
437  * Finalize client site. Dual to cl_site_init().
438  */
439 void cl_site_fini(struct cl_site *s)
440 {
441         lu_site_fini(&s->cs_lu);
442 }
443 EXPORT_SYMBOL(cl_site_fini);
444
445 static struct cache_stats cl_env_stats = {
446         .cs_name    = "envs",
447         .cs_stats = { ATOMIC_INIT(0), }
448 };
449
450 /**
451  * Outputs client site statistical counters into a buffer. Suitable for
452  * ll_rd_*()-style functions.
453  */
454 int cl_site_stats_print(const struct cl_site *site, struct seq_file *m)
455 {
456         int i;
457         static const char *pstate[] = {
458                 [CPS_CACHED]  = "c",
459                 [CPS_OWNED]   = "o",
460                 [CPS_PAGEOUT] = "w",
461                 [CPS_PAGEIN]  = "r",
462                 [CPS_FREEING] = "f"
463         };
464         static const char *lstate[] = {
465                 [CLS_NEW]       = "n",
466                 [CLS_QUEUING]   = "q",
467                 [CLS_ENQUEUED]  = "e",
468                 [CLS_HELD]      = "h",
469                 [CLS_INTRANSIT] = "t",
470                 [CLS_CACHED]    = "c",
471                 [CLS_FREEING]   = "f"
472         };
473 /*
474        lookup    hit  total   busy create
475 pages: ...... ...... ...... ...... ...... [...... ...... ...... ......]
476 locks: ...... ...... ...... ...... ...... [...... ...... ...... ...... ......]
477   env: ...... ...... ...... ...... ......
478  */
479         lu_site_stats_print(&site->cs_lu, m);
480         cache_stats_print(&site->cs_pages, m, 1);
481         seq_printf(m, " [");
482         for (i = 0; i < ARRAY_SIZE(site->cs_pages_state); ++i)
483                 seq_printf(m, "%s: %u ", pstate[i],
484                                 atomic_read(&site->cs_pages_state[i]));
485         seq_printf(m, "]\n");
486         cache_stats_print(&site->cs_locks, m, 0);
487         seq_printf(m, " [");
488         for (i = 0; i < ARRAY_SIZE(site->cs_locks_state); ++i)
489                 seq_printf(m, "%s: %u ", lstate[i],
490                                 atomic_read(&site->cs_locks_state[i]));
491         seq_printf(m, "]\n");
492         cache_stats_print(&cl_env_stats, m, 0);
493         seq_printf(m, "\n");
494         return 0;
495 }
496 EXPORT_SYMBOL(cl_site_stats_print);
497
498 /*****************************************************************************
499  *
500  * lu_env handling on client.
501  *
502  */
503
504 /**
505  * The most efficient way is to store cl_env pointer in task specific
506  * structures. On Linux, it wont' be easy to use task_struct->journal_info
507  * because Lustre code may call into other fs which has certain assumptions
508  * about journal_info. Currently following fields in task_struct are identified
509  * can be used for this purpose:
510  *  - cl_env: for liblustre.
511  *  - tux_info: ony on RedHat kernel.
512  *  - ...
513  * \note As long as we use task_struct to store cl_env, we assume that once
514  * called into Lustre, we'll never call into the other part of the kernel
515  * which will use those fields in task_struct without explicitly exiting
516  * Lustre.
517  *
518  * If there's no space in task_struct is available, hash will be used.
519  * bz20044, bz22683.
520  */
521
522 struct cl_env {
523         void         *ce_magic;
524         struct lu_env     ce_lu;
525         struct lu_context ce_ses;
526
527         /**
528          * This allows cl_env to be entered into cl_env_hash which implements
529          * the current thread -> client environment lookup.
530          */
531         struct hlist_node  ce_node;
532         /**
533          * Owner for the current cl_env.
534          *
535          * If LL_TASK_CL_ENV is defined, this point to the owning current,
536          * only for debugging purpose ;
537          * Otherwise hash is used, and this is the key for cfs_hash.
538          * Now current thread pid is stored. Note using thread pointer would
539          * lead to unbalanced hash because of its specific allocation locality
540          * and could be varied for different platforms and OSes, even different
541          * OS versions.
542          */
543         void         *ce_owner;
544
545         /*
546          * Linkage into global list of all client environments. Used for
547          * garbage collection.
548          */
549         struct list_head        ce_linkage;
550         /*
551          *
552          */
553         int            ce_ref;
554         /*
555          * Debugging field: address of the caller who made original
556          * allocation.
557          */
558         void         *ce_debug;
559 };
560
561 #define CL_ENV_INC(counter)
562 #define CL_ENV_DEC(counter)
563
564 static void cl_env_init0(struct cl_env *cle, void *debug)
565 {
566         LASSERT(cle->ce_ref == 0);
567         LASSERT(cle->ce_magic == &cl_env_init0);
568         LASSERT(cle->ce_debug == NULL && cle->ce_owner == NULL);
569
570         cle->ce_ref = 1;
571         cle->ce_debug = debug;
572         CL_ENV_INC(busy);
573 }
574
575
576 /*
577  * The implementation of using hash table to connect cl_env and thread
578  */
579
580 static struct cfs_hash *cl_env_hash;
581
582 static unsigned cl_env_hops_hash(struct cfs_hash *lh,
583                                  const void *key, unsigned mask)
584 {
585 #if BITS_PER_LONG == 64
586         return cfs_hash_u64_hash((__u64)key, mask);
587 #else
588         return cfs_hash_u32_hash((__u32)key, mask);
589 #endif
590 }
591
592 static void *cl_env_hops_obj(struct hlist_node *hn)
593 {
594         struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
595         LASSERT(cle->ce_magic == &cl_env_init0);
596         return (void *)cle;
597 }
598
599 static int cl_env_hops_keycmp(const void *key, struct hlist_node *hn)
600 {
601         struct cl_env *cle = cl_env_hops_obj(hn);
602
603         LASSERT(cle->ce_owner != NULL);
604         return (key == cle->ce_owner);
605 }
606
607 static void cl_env_hops_noop(struct cfs_hash *hs, struct hlist_node *hn)
608 {
609         struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
610         LASSERT(cle->ce_magic == &cl_env_init0);
611 }
612
613 static cfs_hash_ops_t cl_env_hops = {
614         .hs_hash        = cl_env_hops_hash,
615         .hs_key  = cl_env_hops_obj,
616         .hs_keycmp      = cl_env_hops_keycmp,
617         .hs_object      = cl_env_hops_obj,
618         .hs_get  = cl_env_hops_noop,
619         .hs_put_locked  = cl_env_hops_noop,
620 };
621
622 static inline struct cl_env *cl_env_fetch(void)
623 {
624         struct cl_env *cle;
625
626         cle = cfs_hash_lookup(cl_env_hash, (void *) (long) current->pid);
627         LASSERT(ergo(cle, cle->ce_magic == &cl_env_init0));
628         return cle;
629 }
630
631 static inline void cl_env_attach(struct cl_env *cle)
632 {
633         if (cle) {
634                 int rc;
635
636                 LASSERT(cle->ce_owner == NULL);
637                 cle->ce_owner = (void *) (long) current->pid;
638                 rc = cfs_hash_add_unique(cl_env_hash, cle->ce_owner,
639                                          &cle->ce_node);
640                 LASSERT(rc == 0);
641         }
642 }
643
644 static inline void cl_env_do_detach(struct cl_env *cle)
645 {
646         void *cookie;
647
648         LASSERT(cle->ce_owner == (void *) (long) current->pid);
649         cookie = cfs_hash_del(cl_env_hash, cle->ce_owner,
650                               &cle->ce_node);
651         LASSERT(cookie == cle);
652         cle->ce_owner = NULL;
653 }
654
655 static int cl_env_store_init(void) {
656         cl_env_hash = cfs_hash_create("cl_env",
657                                       HASH_CL_ENV_BITS, HASH_CL_ENV_BITS,
658                                       HASH_CL_ENV_BKT_BITS, 0,
659                                       CFS_HASH_MIN_THETA,
660                                       CFS_HASH_MAX_THETA,
661                                       &cl_env_hops,
662                                       CFS_HASH_RW_BKTLOCK);
663         return cl_env_hash != NULL ? 0 :-ENOMEM;
664 }
665
666 static void cl_env_store_fini(void) {
667         cfs_hash_putref(cl_env_hash);
668 }
669
670
671 static inline struct cl_env *cl_env_detach(struct cl_env *cle)
672 {
673         if (cle == NULL)
674                 cle = cl_env_fetch();
675
676         if (cle && cle->ce_owner)
677                 cl_env_do_detach(cle);
678
679         return cle;
680 }
681
682 static struct lu_env *cl_env_new(__u32 ctx_tags, __u32 ses_tags, void *debug)
683 {
684         struct lu_env *env;
685         struct cl_env *cle;
686
687         OBD_SLAB_ALLOC_PTR_GFP(cle, cl_env_kmem, __GFP_IO);
688         if (cle != NULL) {
689                 int rc;
690
691                 INIT_LIST_HEAD(&cle->ce_linkage);
692                 cle->ce_magic = &cl_env_init0;
693                 env = &cle->ce_lu;
694                 rc = lu_env_init(env, LCT_CL_THREAD|ctx_tags);
695                 if (rc == 0) {
696                         rc = lu_context_init(&cle->ce_ses,
697                                              LCT_SESSION | ses_tags);
698                         if (rc == 0) {
699                                 lu_context_enter(&cle->ce_ses);
700                                 env->le_ses = &cle->ce_ses;
701                                 cl_env_init0(cle, debug);
702                         } else
703                                 lu_env_fini(env);
704                 }
705                 if (rc != 0) {
706                         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
707                         env = ERR_PTR(rc);
708                 } else {
709                         CL_ENV_INC(create);
710                         CL_ENV_INC(total);
711                 }
712         } else
713                 env = ERR_PTR(-ENOMEM);
714         return env;
715 }
716
717 static void cl_env_fini(struct cl_env *cle)
718 {
719         CL_ENV_DEC(total);
720         lu_context_fini(&cle->ce_lu.le_ctx);
721         lu_context_fini(&cle->ce_ses);
722         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
723 }
724
725 static inline struct cl_env *cl_env_container(struct lu_env *env)
726 {
727         return container_of(env, struct cl_env, ce_lu);
728 }
729
730 struct lu_env *cl_env_peek(int *refcheck)
731 {
732         struct lu_env *env;
733         struct cl_env *cle;
734
735         CL_ENV_INC(lookup);
736
737         /* check that we don't go far from untrusted pointer */
738         CLASSERT(offsetof(struct cl_env, ce_magic) == 0);
739
740         env = NULL;
741         cle = cl_env_fetch();
742         if (cle != NULL) {
743                 CL_ENV_INC(hit);
744                 env = &cle->ce_lu;
745                 *refcheck = ++cle->ce_ref;
746         }
747         CDEBUG(D_OTHER, "%d@%p\n", cle ? cle->ce_ref : 0, cle);
748         return env;
749 }
750 EXPORT_SYMBOL(cl_env_peek);
751
752 /**
753  * Returns lu_env: if there already is an environment associated with the
754  * current thread, it is returned, otherwise, new environment is allocated.
755  *
756  * \param refcheck pointer to a counter used to detect environment leaks. In
757  * the usual case cl_env_get() and cl_env_put() are called in the same lexical
758  * scope and pointer to the same integer is passed as \a refcheck. This is
759  * used to detect missed cl_env_put().
760  *
761  * \see cl_env_put()
762  */
763 struct lu_env *cl_env_get(int *refcheck)
764 {
765         struct lu_env *env;
766
767         env = cl_env_peek(refcheck);
768         if (env == NULL) {
769                 env = cl_env_new(lu_context_tags_default,
770                                  lu_session_tags_default,
771                                  __builtin_return_address(0));
772
773                 if (!IS_ERR(env)) {
774                         struct cl_env *cle;
775
776                         cle = cl_env_container(env);
777                         cl_env_attach(cle);
778                         *refcheck = cle->ce_ref;
779                         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
780                 }
781         }
782         return env;
783 }
784 EXPORT_SYMBOL(cl_env_get);
785
786 /**
787  * Forces an allocation of a fresh environment with given tags.
788  *
789  * \see cl_env_get()
790  */
791 struct lu_env *cl_env_alloc(int *refcheck, __u32 tags)
792 {
793         struct lu_env *env;
794
795         LASSERT(cl_env_peek(refcheck) == NULL);
796         env = cl_env_new(tags, tags, __builtin_return_address(0));
797         if (!IS_ERR(env)) {
798                 struct cl_env *cle;
799
800                 cle = cl_env_container(env);
801                 *refcheck = cle->ce_ref;
802                 CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
803         }
804         return env;
805 }
806 EXPORT_SYMBOL(cl_env_alloc);
807
808 static void cl_env_exit(struct cl_env *cle)
809 {
810         LASSERT(cle->ce_owner == NULL);
811         lu_context_exit(&cle->ce_lu.le_ctx);
812         lu_context_exit(&cle->ce_ses);
813 }
814
815 /**
816  * Release an environment.
817  *
818  * Decrement \a env reference counter. When counter drops to 0, nothing in
819  * this thread is using environment and it is returned to the allocation
820  * cache, or freed straight away, if cache is large enough.
821  */
822 void cl_env_put(struct lu_env *env, int *refcheck)
823 {
824         struct cl_env *cle;
825
826         cle = cl_env_container(env);
827
828         LASSERT(cle->ce_ref > 0);
829         LASSERT(ergo(refcheck != NULL, cle->ce_ref == *refcheck));
830
831         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
832         if (--cle->ce_ref == 0) {
833                 CL_ENV_DEC(busy);
834                 cl_env_detach(cle);
835                 cle->ce_debug = NULL;
836                 cl_env_exit(cle);
837                 cl_env_fini(cle);
838         }
839 }
840 EXPORT_SYMBOL(cl_env_put);
841
842 /**
843  * Declares a point of re-entrancy.
844  *
845  * \see cl_env_reexit()
846  */
847 void *cl_env_reenter(void)
848 {
849         return cl_env_detach(NULL);
850 }
851 EXPORT_SYMBOL(cl_env_reenter);
852
853 /**
854  * Exits re-entrancy.
855  */
856 void cl_env_reexit(void *cookie)
857 {
858         cl_env_detach(NULL);
859         cl_env_attach(cookie);
860 }
861 EXPORT_SYMBOL(cl_env_reexit);
862
863 /**
864  * Setup user-supplied \a env as a current environment. This is to be used to
865  * guaranteed that environment exists even when cl_env_get() fails. It is up
866  * to user to ensure proper concurrency control.
867  *
868  * \see cl_env_unplant()
869  */
870 void cl_env_implant(struct lu_env *env, int *refcheck)
871 {
872         struct cl_env *cle = cl_env_container(env);
873
874         LASSERT(cle->ce_ref > 0);
875
876         cl_env_attach(cle);
877         cl_env_get(refcheck);
878         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
879 }
880 EXPORT_SYMBOL(cl_env_implant);
881
882 /**
883  * Detach environment installed earlier by cl_env_implant().
884  */
885 void cl_env_unplant(struct lu_env *env, int *refcheck)
886 {
887         struct cl_env *cle = cl_env_container(env);
888
889         LASSERT(cle->ce_ref > 1);
890
891         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
892
893         cl_env_detach(cle);
894         cl_env_put(env, refcheck);
895 }
896 EXPORT_SYMBOL(cl_env_unplant);
897
898 struct lu_env *cl_env_nested_get(struct cl_env_nest *nest)
899 {
900         struct lu_env *env;
901
902         nest->cen_cookie = NULL;
903         env = cl_env_peek(&nest->cen_refcheck);
904         if (env != NULL) {
905                 if (!cl_io_is_going(env))
906                         return env;
907                 else {
908                         cl_env_put(env, &nest->cen_refcheck);
909                         nest->cen_cookie = cl_env_reenter();
910                 }
911         }
912         env = cl_env_get(&nest->cen_refcheck);
913         if (IS_ERR(env)) {
914                 cl_env_reexit(nest->cen_cookie);
915                 return env;
916         }
917
918         LASSERT(!cl_io_is_going(env));
919         return env;
920 }
921 EXPORT_SYMBOL(cl_env_nested_get);
922
923 void cl_env_nested_put(struct cl_env_nest *nest, struct lu_env *env)
924 {
925         cl_env_put(env, &nest->cen_refcheck);
926         cl_env_reexit(nest->cen_cookie);
927 }
928 EXPORT_SYMBOL(cl_env_nested_put);
929
930 /**
931  * Converts struct cl_attr to struct ost_lvb.
932  *
933  * \see cl_lvb2attr
934  */
935 void cl_attr2lvb(struct ost_lvb *lvb, const struct cl_attr *attr)
936 {
937         lvb->lvb_size   = attr->cat_size;
938         lvb->lvb_mtime  = attr->cat_mtime;
939         lvb->lvb_atime  = attr->cat_atime;
940         lvb->lvb_ctime  = attr->cat_ctime;
941         lvb->lvb_blocks = attr->cat_blocks;
942 }
943 EXPORT_SYMBOL(cl_attr2lvb);
944
945 /**
946  * Converts struct ost_lvb to struct cl_attr.
947  *
948  * \see cl_attr2lvb
949  */
950 void cl_lvb2attr(struct cl_attr *attr, const struct ost_lvb *lvb)
951 {
952         attr->cat_size   = lvb->lvb_size;
953         attr->cat_mtime  = lvb->lvb_mtime;
954         attr->cat_atime  = lvb->lvb_atime;
955         attr->cat_ctime  = lvb->lvb_ctime;
956         attr->cat_blocks = lvb->lvb_blocks;
957 }
958 EXPORT_SYMBOL(cl_lvb2attr);
959
960 /*****************************************************************************
961  *
962  * Temporary prototype thing: mirror obd-devices into cl devices.
963  *
964  */
965
966 struct cl_device *cl_type_setup(const struct lu_env *env, struct lu_site *site,
967                                 struct lu_device_type *ldt,
968                                 struct lu_device *next)
969 {
970         const char       *typename;
971         struct lu_device *d;
972
973         LASSERT(ldt != NULL);
974
975         typename = ldt->ldt_name;
976         d = ldt->ldt_ops->ldto_device_alloc(env, ldt, NULL);
977         if (!IS_ERR(d)) {
978                 int rc;
979
980                 if (site != NULL)
981                         d->ld_site = site;
982                 rc = ldt->ldt_ops->ldto_device_init(env, d, typename, next);
983                 if (rc == 0) {
984                         lu_device_get(d);
985                         lu_ref_add(&d->ld_reference,
986                                    "lu-stack", &lu_site_init);
987                 } else {
988                         ldt->ldt_ops->ldto_device_free(env, d);
989                         CERROR("can't init device '%s', %d\n", typename, rc);
990                         d = ERR_PTR(rc);
991                 }
992         } else
993                 CERROR("Cannot allocate device: '%s'\n", typename);
994         return lu2cl_dev(d);
995 }
996 EXPORT_SYMBOL(cl_type_setup);
997
998 /**
999  * Finalize device stack by calling lu_stack_fini().
1000  */
1001 void cl_stack_fini(const struct lu_env *env, struct cl_device *cl)
1002 {
1003         lu_stack_fini(env, cl2lu_dev(cl));
1004 }
1005 EXPORT_SYMBOL(cl_stack_fini);
1006
1007 int  cl_lock_init(void);
1008 void cl_lock_fini(void);
1009
1010 int  cl_page_init(void);
1011 void cl_page_fini(void);
1012
1013 static struct lu_context_key cl_key;
1014
1015 struct cl_thread_info *cl_env_info(const struct lu_env *env)
1016 {
1017         return lu_context_key_get(&env->le_ctx, &cl_key);
1018 }
1019
1020 /* defines cl0_key_{init,fini}() */
1021 LU_KEY_INIT_FINI(cl0, struct cl_thread_info);
1022
1023 static void *cl_key_init(const struct lu_context *ctx,
1024                          struct lu_context_key *key)
1025 {
1026         struct cl_thread_info *info;
1027
1028         info = cl0_key_init(ctx, key);
1029         if (!IS_ERR(info)) {
1030                 int i;
1031
1032                 for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1033                         lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1034         }
1035         return info;
1036 }
1037
1038 static void cl_key_fini(const struct lu_context *ctx,
1039                         struct lu_context_key *key, void *data)
1040 {
1041         struct cl_thread_info *info;
1042         int i;
1043
1044         info = data;
1045         for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1046                 lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1047         cl0_key_fini(ctx, key, data);
1048 }
1049
1050 static void cl_key_exit(const struct lu_context *ctx,
1051                         struct lu_context_key *key, void *data)
1052 {
1053         struct cl_thread_info *info = data;
1054         int i;
1055
1056         for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i) {
1057                 LASSERT(info->clt_counters[i].ctc_nr_held == 0);
1058                 LASSERT(info->clt_counters[i].ctc_nr_used == 0);
1059                 LASSERT(info->clt_counters[i].ctc_nr_locks_acquired == 0);
1060                 LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
1061                 lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1062                 lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1063         }
1064 }
1065
1066 static struct lu_context_key cl_key = {
1067         .lct_tags = LCT_CL_THREAD,
1068         .lct_init = cl_key_init,
1069         .lct_fini = cl_key_fini,
1070         .lct_exit = cl_key_exit
1071 };
1072
1073 static struct lu_kmem_descr cl_object_caches[] = {
1074         {
1075                 .ckd_cache = &cl_env_kmem,
1076                 .ckd_name  = "cl_env_kmem",
1077                 .ckd_size  = sizeof (struct cl_env)
1078         },
1079         {
1080                 .ckd_cache = NULL
1081         }
1082 };
1083
1084 /**
1085  * Global initialization of cl-data. Create kmem caches, register
1086  * lu_context_key's, etc.
1087  *
1088  * \see cl_global_fini()
1089  */
1090 int cl_global_init(void)
1091 {
1092         int result;
1093
1094         result = cl_env_store_init();
1095         if (result)
1096                 return result;
1097
1098         result = lu_kmem_init(cl_object_caches);
1099         if (result)
1100                 goto out_store;
1101
1102         LU_CONTEXT_KEY_INIT(&cl_key);
1103         result = lu_context_key_register(&cl_key);
1104         if (result)
1105                 goto out_kmem;
1106
1107         result = cl_lock_init();
1108         if (result)
1109                 goto out_context;
1110
1111         result = cl_page_init();
1112         if (result)
1113                 goto out_lock;
1114
1115         return 0;
1116 out_lock:
1117         cl_lock_fini();
1118 out_context:
1119         lu_context_key_degister(&cl_key);
1120 out_kmem:
1121         lu_kmem_fini(cl_object_caches);
1122 out_store:
1123         cl_env_store_fini();
1124         return result;
1125 }
1126
1127 /**
1128  * Finalization of global cl-data. Dual to cl_global_init().
1129  */
1130 void cl_global_fini(void)
1131 {
1132         cl_lock_fini();
1133         cl_page_fini();
1134         lu_context_key_degister(&cl_key);
1135         lu_kmem_fini(cl_object_caches);
1136         cl_env_store_fini();
1137 }