/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */
/*
 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/obdclass/genops.c
 *
 * These are the only exported functions, they provide some generic
 * infrastructure for managing object devices
 */
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
47 extern struct list_head obd_types;
48 spinlock_t obd_types_lock;
50 struct kmem_cache *obd_device_cachep;
51 struct kmem_cache *obdo_cachep;
52 EXPORT_SYMBOL(obdo_cachep);
53 struct kmem_cache *import_cachep;
55 struct list_head obd_zombie_imports;
56 struct list_head obd_zombie_exports;
57 spinlock_t obd_zombie_impexp_lock;
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62 const char *status, int locks);
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
71 static struct obd_device *obd_device_alloc(void)
73 struct obd_device *obd;
75 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
77 obd->obd_magic = OBD_DEVICE_MAGIC;
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
112 EXPORT_SYMBOL(class_search_type);
114 struct obd_type *class_get_type(const char *name)
116 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
121 if (strcmp(modname, "obdfilter") == 0)
124 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125 modname = LUSTRE_OSP_NAME;
127 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128 modname = LUSTRE_MDT_NAME;
130 if (!request_module("%s", modname)) {
131 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132 type = class_search_type(name);
134 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 spin_lock(&type->obd_type_lock);
141 try_module_get(type->typ_dt_ops->o_owner);
142 spin_unlock(&type->obd_type_lock);
146 EXPORT_SYMBOL(class_get_type);
148 void class_put_type(struct obd_type *type)
151 spin_lock(&type->obd_type_lock);
153 module_put(type->typ_dt_ops->o_owner);
154 spin_unlock(&type->obd_type_lock);
156 EXPORT_SYMBOL(class_put_type);
158 #define CLASS_MAX_NAME 1024
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161 struct lprocfs_vars *vars, const char *name,
162 struct lu_device_type *ldt)
164 struct obd_type *type;
168 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
170 if (class_search_type(name)) {
171 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
176 OBD_ALLOC(type, sizeof(*type));
180 OBD_ALLOC_PTR(type->typ_dt_ops);
181 OBD_ALLOC_PTR(type->typ_md_ops);
182 OBD_ALLOC(type->typ_name, strlen(name) + 1);
184 if (type->typ_dt_ops == NULL ||
185 type->typ_md_ops == NULL ||
186 type->typ_name == NULL)
189 *(type->typ_dt_ops) = *dt_ops;
190 /* md_ops is optional */
192 *(type->typ_md_ops) = *md_ops;
193 strcpy(type->typ_name, name);
194 spin_lock_init(&type->obd_type_lock);
196 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
198 if (IS_ERR(type->typ_procroot)) {
199 rc = PTR_ERR(type->typ_procroot);
200 type->typ_procroot = NULL;
206 rc = lu_device_type_init(ldt);
211 spin_lock(&obd_types_lock);
212 list_add(&type->typ_chain, &obd_types);
213 spin_unlock(&obd_types_lock);
218 if (type->typ_name != NULL)
219 OBD_FREE(type->typ_name, strlen(name) + 1);
220 if (type->typ_md_ops != NULL)
221 OBD_FREE_PTR(type->typ_md_ops);
222 if (type->typ_dt_ops != NULL)
223 OBD_FREE_PTR(type->typ_dt_ops);
224 OBD_FREE(type, sizeof(*type));
227 EXPORT_SYMBOL(class_register_type);
229 int class_unregister_type(const char *name)
231 struct obd_type *type = class_search_type(name);
234 CERROR("unknown obd type\n");
238 if (type->typ_refcnt) {
239 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
240 /* This is a bad situation, let's make the best of it */
241 /* Remove ops, but leave the name for debugging */
242 OBD_FREE_PTR(type->typ_dt_ops);
243 OBD_FREE_PTR(type->typ_md_ops);
247 if (type->typ_procroot) {
248 lprocfs_remove(&type->typ_procroot);
252 lu_device_type_fini(type->typ_lu);
254 spin_lock(&obd_types_lock);
255 list_del(&type->typ_chain);
256 spin_unlock(&obd_types_lock);
257 OBD_FREE(type->typ_name, strlen(name) + 1);
258 if (type->typ_dt_ops != NULL)
259 OBD_FREE_PTR(type->typ_dt_ops);
260 if (type->typ_md_ops != NULL)
261 OBD_FREE_PTR(type->typ_md_ops);
262 OBD_FREE(type, sizeof(*type));
264 } /* class_unregister_type */
265 EXPORT_SYMBOL(class_unregister_type);
268 * Create a new obd device.
270 * Find an empty slot in ::obd_devs[], create a new obd device in it.
272 * \param[in] type_name obd device type string.
273 * \param[in] name obd device name.
275 * \retval NULL if create fails, otherwise return the obd device
278 struct obd_device *class_newdev(const char *type_name, const char *name)
280 struct obd_device *result = NULL;
281 struct obd_device *newdev;
282 struct obd_type *type = NULL;
284 int new_obd_minor = 0;
286 if (strlen(name) >= MAX_OBD_NAME) {
287 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288 return ERR_PTR(-EINVAL);
291 type = class_get_type(type_name);
293 CERROR("OBD: unknown type: %s\n", type_name);
294 return ERR_PTR(-ENODEV);
297 newdev = obd_device_alloc();
299 GOTO(out_type, result = ERR_PTR(-ENOMEM));
301 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303 write_lock(&obd_dev_lock);
304 for (i = 0; i < class_devno_max(); i++) {
305 struct obd_device *obd = class_num2obd(i);
307 if (obd && (strcmp(name, obd->obd_name) == 0)) {
308 CERROR("Device %s already exists at %d, won't add\n",
311 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312 "%p obd_magic %08x != %08x\n", result,
313 result->obd_magic, OBD_DEVICE_MAGIC);
314 LASSERTF(result->obd_minor == new_obd_minor,
315 "%p obd_minor %d != %d\n", result,
316 result->obd_minor, new_obd_minor);
318 obd_devs[result->obd_minor] = NULL;
319 result->obd_name[0]='\0';
321 result = ERR_PTR(-EEXIST);
324 if (!result && !obd) {
326 result->obd_minor = i;
328 result->obd_type = type;
329 strncpy(result->obd_name, name,
330 sizeof(result->obd_name) - 1);
331 obd_devs[i] = result;
334 write_unlock(&obd_dev_lock);
336 if (result == NULL && i >= class_devno_max()) {
337 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 GOTO(out, result = ERR_PTR(-EOVERFLOW));
345 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
346 result->obd_name, result);
350 obd_device_free(newdev);
352 class_put_type(type);
356 void class_release_dev(struct obd_device *obd)
358 struct obd_type *obd_type = obd->obd_type;
360 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
361 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
362 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
363 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
364 LASSERT(obd_type != NULL);
366 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
367 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
369 write_lock(&obd_dev_lock);
370 obd_devs[obd->obd_minor] = NULL;
371 write_unlock(&obd_dev_lock);
372 obd_device_free(obd);
374 class_put_type(obd_type);
377 int class_name2dev(const char *name)
384 read_lock(&obd_dev_lock);
385 for (i = 0; i < class_devno_max(); i++) {
386 struct obd_device *obd = class_num2obd(i);
388 if (obd && strcmp(name, obd->obd_name) == 0) {
389 /* Make sure we finished attaching before we give
390 out any references */
391 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
392 if (obd->obd_attached) {
393 read_unlock(&obd_dev_lock);
399 read_unlock(&obd_dev_lock);
403 EXPORT_SYMBOL(class_name2dev);
405 struct obd_device *class_name2obd(const char *name)
407 int dev = class_name2dev(name);
409 if (dev < 0 || dev > class_devno_max())
411 return class_num2obd(dev);
413 EXPORT_SYMBOL(class_name2obd);
415 int class_uuid2dev(struct obd_uuid *uuid)
419 read_lock(&obd_dev_lock);
420 for (i = 0; i < class_devno_max(); i++) {
421 struct obd_device *obd = class_num2obd(i);
423 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
424 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
425 read_unlock(&obd_dev_lock);
429 read_unlock(&obd_dev_lock);
433 EXPORT_SYMBOL(class_uuid2dev);
435 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
437 int dev = class_uuid2dev(uuid);
440 return class_num2obd(dev);
442 EXPORT_SYMBOL(class_uuid2obd);
445 * Get obd device from ::obd_devs[]
447 * \param num [in] array index
449 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
450 * otherwise return the obd device there.
452 struct obd_device *class_num2obd(int num)
454 struct obd_device *obd = NULL;
456 if (num < class_devno_max()) {
461 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
462 "%p obd_magic %08x != %08x\n",
463 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
464 LASSERTF(obd->obd_minor == num,
465 "%p obd_minor %0d != %0d\n",
466 obd, obd->obd_minor, num);
471 EXPORT_SYMBOL(class_num2obd);
474 * Get obd devices count. Device in any
476 * \retval obd device count
478 int get_devices_count(void)
480 int index, max_index = class_devno_max(), dev_count = 0;
482 read_lock(&obd_dev_lock);
483 for (index = 0; index <= max_index; index++) {
484 struct obd_device *obd = class_num2obd(index);
488 read_unlock(&obd_dev_lock);
492 EXPORT_SYMBOL(get_devices_count);
494 void class_obd_list(void)
499 read_lock(&obd_dev_lock);
500 for (i = 0; i < class_devno_max(); i++) {
501 struct obd_device *obd = class_num2obd(i);
505 if (obd->obd_stopping)
507 else if (obd->obd_set_up)
509 else if (obd->obd_attached)
513 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
514 i, status, obd->obd_type->typ_name,
515 obd->obd_name, obd->obd_uuid.uuid,
516 atomic_read(&obd->obd_refcount));
518 read_unlock(&obd_dev_lock);
522 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
523 specified, then only the client with that uuid is returned,
524 otherwise any client connected to the tgt is returned. */
525 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
526 const char * typ_name,
527 struct obd_uuid *grp_uuid)
531 read_lock(&obd_dev_lock);
532 for (i = 0; i < class_devno_max(); i++) {
533 struct obd_device *obd = class_num2obd(i);
537 if ((strncmp(obd->obd_type->typ_name, typ_name,
538 strlen(typ_name)) == 0)) {
539 if (obd_uuid_equals(tgt_uuid,
540 &obd->u.cli.cl_target_uuid) &&
541 ((grp_uuid)? obd_uuid_equals(grp_uuid,
542 &obd->obd_uuid) : 1)) {
543 read_unlock(&obd_dev_lock);
548 read_unlock(&obd_dev_lock);
552 EXPORT_SYMBOL(class_find_client_obd);
554 /* Iterate the obd_device list looking devices have grp_uuid. Start
555 searching at *next, and if a device is found, the next index to look
556 at is saved in *next. If next is NULL, then the first matching device
557 will always be returned. */
558 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
564 else if (*next >= 0 && *next < class_devno_max())
569 read_lock(&obd_dev_lock);
570 for (; i < class_devno_max(); i++) {
571 struct obd_device *obd = class_num2obd(i);
575 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
578 read_unlock(&obd_dev_lock);
582 read_unlock(&obd_dev_lock);
586 EXPORT_SYMBOL(class_devices_in_group);
589 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
590 * adjust sptlrpc settings accordingly.
592 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
594 struct obd_device *obd;
598 LASSERT(namelen > 0);
600 read_lock(&obd_dev_lock);
601 for (i = 0; i < class_devno_max(); i++) {
602 obd = class_num2obd(i);
604 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
607 /* only notify mdc, osc, mdt, ost */
608 type = obd->obd_type->typ_name;
609 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
610 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
611 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
612 strcmp(type, LUSTRE_OST_NAME) != 0)
615 if (strncmp(obd->obd_name, fsname, namelen))
618 class_incref(obd, __FUNCTION__, obd);
619 read_unlock(&obd_dev_lock);
620 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
621 sizeof(KEY_SPTLRPC_CONF),
622 KEY_SPTLRPC_CONF, 0, NULL, NULL);
624 class_decref(obd, __FUNCTION__, obd);
625 read_lock(&obd_dev_lock);
627 read_unlock(&obd_dev_lock);
630 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
632 void obd_cleanup_caches(void)
634 if (obd_device_cachep) {
635 kmem_cache_destroy(obd_device_cachep);
636 obd_device_cachep = NULL;
639 kmem_cache_destroy(obdo_cachep);
643 kmem_cache_destroy(import_cachep);
644 import_cachep = NULL;
647 kmem_cache_destroy(capa_cachep);
652 int obd_init_caches(void)
654 LASSERT(obd_device_cachep == NULL);
655 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
656 sizeof(struct obd_device),
658 if (!obd_device_cachep)
661 LASSERT(obdo_cachep == NULL);
662 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
667 LASSERT(import_cachep == NULL);
668 import_cachep = kmem_cache_create("ll_import_cache",
669 sizeof(struct obd_import),
674 LASSERT(capa_cachep == NULL);
675 capa_cachep = kmem_cache_create("capa_cache",
676 sizeof(struct obd_capa), 0, 0, NULL);
682 obd_cleanup_caches();
687 /* map connection to client */
688 struct obd_export *class_conn2export(struct lustre_handle *conn)
690 struct obd_export *export;
693 CDEBUG(D_CACHE, "looking for null handle\n");
697 if (conn->cookie == -1) { /* this means assign a new connection */
698 CDEBUG(D_CACHE, "want a new connection\n");
702 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
703 export = class_handle2object(conn->cookie);
706 EXPORT_SYMBOL(class_conn2export);
708 struct obd_device *class_exp2obd(struct obd_export *exp)
714 EXPORT_SYMBOL(class_exp2obd);
716 struct obd_device *class_conn2obd(struct lustre_handle *conn)
718 struct obd_export *export;
719 export = class_conn2export(conn);
721 struct obd_device *obd = export->exp_obd;
722 class_export_put(export);
727 EXPORT_SYMBOL(class_conn2obd);
729 struct obd_import *class_exp2cliimp(struct obd_export *exp)
731 struct obd_device *obd = exp->exp_obd;
734 return obd->u.cli.cl_import;
736 EXPORT_SYMBOL(class_exp2cliimp);
738 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
740 struct obd_device *obd = class_conn2obd(conn);
743 return obd->u.cli.cl_import;
745 EXPORT_SYMBOL(class_conn2cliimp);
747 /* Export management functions */
748 static void class_export_destroy(struct obd_export *exp)
750 struct obd_device *obd = exp->exp_obd;
752 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
753 LASSERT(obd != NULL);
755 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
756 exp->exp_client_uuid.uuid, obd->obd_name);
758 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
759 if (exp->exp_connection)
760 ptlrpc_put_connection_superhack(exp->exp_connection);
762 LASSERT(list_empty(&exp->exp_outstanding_replies));
763 LASSERT(list_empty(&exp->exp_uncommitted_replies));
764 LASSERT(list_empty(&exp->exp_req_replay_queue));
765 LASSERT(list_empty(&exp->exp_hp_rpcs));
766 obd_destroy_export(exp);
767 class_decref(obd, "export", exp);
769 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table callback: take an export reference on handle lookup. */
static void export_handle_addref(void *export)
{
	class_export_get(export);
}
777 static struct portals_handle_ops export_handle_ops = {
778 .hop_addref = export_handle_addref,
782 struct obd_export *class_export_get(struct obd_export *exp)
784 atomic_inc(&exp->exp_refcount);
785 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
786 atomic_read(&exp->exp_refcount));
789 EXPORT_SYMBOL(class_export_get);
791 void class_export_put(struct obd_export *exp)
793 LASSERT(exp != NULL);
794 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
795 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
796 atomic_read(&exp->exp_refcount) - 1);
798 if (atomic_dec_and_test(&exp->exp_refcount)) {
799 LASSERT(!list_empty(&exp->exp_obd_chain));
800 CDEBUG(D_IOCTL, "final put %p/%s\n",
801 exp, exp->exp_client_uuid.uuid);
803 /* release nid stat refererence */
804 lprocfs_exp_cleanup(exp);
806 obd_zombie_export_add(exp);
809 EXPORT_SYMBOL(class_export_put);
811 /* Creates a new export, adds it to the hash table, and returns a
812 * pointer to it. The refcount is 2: one for the hash reference, and
813 * one for the pointer returned by this function. */
814 struct obd_export *class_new_export(struct obd_device *obd,
815 struct obd_uuid *cluuid)
817 struct obd_export *export;
818 struct cfs_hash *hash = NULL;
821 OBD_ALLOC_PTR(export);
823 return ERR_PTR(-ENOMEM);
825 export->exp_conn_cnt = 0;
826 export->exp_lock_hash = NULL;
827 export->exp_flock_hash = NULL;
828 atomic_set(&export->exp_refcount, 2);
829 atomic_set(&export->exp_rpc_count, 0);
830 atomic_set(&export->exp_cb_count, 0);
831 atomic_set(&export->exp_locks_count, 0);
832 #if LUSTRE_TRACKS_LOCK_EXP_REFS
833 INIT_LIST_HEAD(&export->exp_locks_list);
834 spin_lock_init(&export->exp_locks_list_guard);
836 atomic_set(&export->exp_replay_count, 0);
837 export->exp_obd = obd;
838 INIT_LIST_HEAD(&export->exp_outstanding_replies);
839 spin_lock_init(&export->exp_uncommitted_replies_lock);
840 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
841 INIT_LIST_HEAD(&export->exp_req_replay_queue);
842 INIT_LIST_HEAD(&export->exp_handle.h_link);
843 INIT_LIST_HEAD(&export->exp_hp_rpcs);
844 class_handle_hash(&export->exp_handle, &export_handle_ops);
845 export->exp_last_request_time = cfs_time_current_sec();
846 spin_lock_init(&export->exp_lock);
847 spin_lock_init(&export->exp_rpc_lock);
848 INIT_HLIST_NODE(&export->exp_uuid_hash);
849 INIT_HLIST_NODE(&export->exp_nid_hash);
850 spin_lock_init(&export->exp_bl_list_lock);
851 INIT_LIST_HEAD(&export->exp_bl_list);
853 export->exp_sp_peer = LUSTRE_SP_ANY;
854 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
855 export->exp_client_uuid = *cluuid;
856 obd_init_export(export);
858 spin_lock(&obd->obd_dev_lock);
859 /* shouldn't happen, but might race */
860 if (obd->obd_stopping)
861 GOTO(exit_unlock, rc = -ENODEV);
863 hash = cfs_hash_getref(obd->obd_uuid_hash);
865 GOTO(exit_unlock, rc = -ENODEV);
866 spin_unlock(&obd->obd_dev_lock);
868 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
869 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
871 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
872 obd->obd_name, cluuid->uuid, rc);
873 GOTO(exit_err, rc = -EALREADY);
877 spin_lock(&obd->obd_dev_lock);
878 if (obd->obd_stopping) {
879 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
880 GOTO(exit_unlock, rc = -ENODEV);
883 class_incref(obd, "export", export);
884 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
885 list_add_tail(&export->exp_obd_chain_timed,
886 &export->exp_obd->obd_exports_timed);
887 export->exp_obd->obd_num_exports++;
888 spin_unlock(&obd->obd_dev_lock);
889 cfs_hash_putref(hash);
893 spin_unlock(&obd->obd_dev_lock);
896 cfs_hash_putref(hash);
897 class_handle_unhash(&export->exp_handle);
898 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
899 obd_destroy_export(export);
900 OBD_FREE_PTR(export);
903 EXPORT_SYMBOL(class_new_export);
905 void class_unlink_export(struct obd_export *exp)
907 class_handle_unhash(&exp->exp_handle);
909 spin_lock(&exp->exp_obd->obd_dev_lock);
910 /* delete an uuid-export hashitem from hashtables */
911 if (!hlist_unhashed(&exp->exp_uuid_hash))
912 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
913 &exp->exp_client_uuid,
914 &exp->exp_uuid_hash);
916 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
917 list_del_init(&exp->exp_obd_chain_timed);
918 exp->exp_obd->obd_num_exports--;
919 spin_unlock(&exp->exp_obd->obd_dev_lock);
920 class_export_put(exp);
922 EXPORT_SYMBOL(class_unlink_export);
924 /* Import management functions */
925 void class_import_destroy(struct obd_import *imp)
927 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
928 imp->imp_obd->obd_name);
930 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
932 ptlrpc_put_connection_superhack(imp->imp_connection);
934 while (!list_empty(&imp->imp_conn_list)) {
935 struct obd_import_conn *imp_conn;
937 imp_conn = list_entry(imp->imp_conn_list.next,
938 struct obd_import_conn, oic_item);
939 list_del_init(&imp_conn->oic_item);
940 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
941 OBD_FREE(imp_conn, sizeof(*imp_conn));
944 LASSERT(imp->imp_sec == NULL);
945 class_decref(imp->imp_obd, "import", imp);
946 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table callback: take an import reference on handle lookup. */
static void import_handle_addref(void *import)
{
	class_import_get(import);
}
954 static struct portals_handle_ops import_handle_ops = {
955 .hop_addref = import_handle_addref,
959 struct obd_import *class_import_get(struct obd_import *import)
961 atomic_inc(&import->imp_refcount);
962 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
963 atomic_read(&import->imp_refcount),
964 import->imp_obd->obd_name);
967 EXPORT_SYMBOL(class_import_get);
969 void class_import_put(struct obd_import *imp)
971 LASSERT(list_empty(&imp->imp_zombie_chain));
972 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
974 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
975 atomic_read(&imp->imp_refcount) - 1,
976 imp->imp_obd->obd_name);
978 if (atomic_dec_and_test(&imp->imp_refcount)) {
979 CDEBUG(D_INFO, "final put import %p\n", imp);
980 obd_zombie_import_add(imp);
983 /* catch possible import put race */
984 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
986 EXPORT_SYMBOL(class_import_put);
988 static void init_imp_at(struct imp_at *at) {
990 at_init(&at->iat_net_latency, 0, 0);
991 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
992 /* max service estimates are tracked on the server side, so
993 don't use the AT history here, just use the last reported
994 val. (But keep hist for proc histogram, worst_ever) */
995 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1000 struct obd_import *class_new_import(struct obd_device *obd)
1002 struct obd_import *imp;
1004 OBD_ALLOC(imp, sizeof(*imp));
1008 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1009 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1010 INIT_LIST_HEAD(&imp->imp_replay_list);
1011 INIT_LIST_HEAD(&imp->imp_sending_list);
1012 INIT_LIST_HEAD(&imp->imp_delayed_list);
1013 spin_lock_init(&imp->imp_lock);
1014 imp->imp_last_success_conn = 0;
1015 imp->imp_state = LUSTRE_IMP_NEW;
1016 imp->imp_obd = class_incref(obd, "import", imp);
1017 mutex_init(&imp->imp_sec_mutex);
1018 init_waitqueue_head(&imp->imp_recovery_waitq);
1020 atomic_set(&imp->imp_refcount, 2);
1021 atomic_set(&imp->imp_unregistering, 0);
1022 atomic_set(&imp->imp_inflight, 0);
1023 atomic_set(&imp->imp_replay_inflight, 0);
1024 atomic_set(&imp->imp_inval_count, 0);
1025 INIT_LIST_HEAD(&imp->imp_conn_list);
1026 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1027 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1028 init_imp_at(&imp->imp_at);
1030 /* the default magic is V2, will be used in connect RPC, and
1031 * then adjusted according to the flags in request/reply. */
1032 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1036 EXPORT_SYMBOL(class_new_import);
1038 void class_destroy_import(struct obd_import *import)
1040 LASSERT(import != NULL);
1041 LASSERT(import != LP_POISON);
1043 class_handle_unhash(&import->imp_handle);
1045 spin_lock(&import->imp_lock);
1046 import->imp_generation++;
1047 spin_unlock(&import->imp_lock);
1048 class_import_put(import);
1050 EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/*
 * Record that \a lock holds a reference on \a exp.  The first reference
 * links the lock onto the export's exp_locks_list.  A lock already
 * tracked against a different export only triggers a console warning.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	if (lock->l_exp_refs_target != NULL &&
	    lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);
	}
	if ((lock->l_exp_refs_nr++) == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/*
 * Drop one reference recorded by __class_export_add_lock_ref().  The
 * last reference unlinks the lock from the export's exp_locks_list.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);
	LASSERT(lock->l_exp_refs_nr > 0);
	if (lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("lock %p, "
			      "mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);
	}
	if (--lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
1095 /* A connection defines an export context in which preallocation can
1096 be managed. This releases the export pointer reference, and returns
1097 the export handle, so the export refcount is 1 when this function
1099 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1100 struct obd_uuid *cluuid)
1102 struct obd_export *export;
1103 LASSERT(conn != NULL);
1104 LASSERT(obd != NULL);
1105 LASSERT(cluuid != NULL);
1107 export = class_new_export(obd, cluuid);
1109 return PTR_ERR(export);
1111 conn->cookie = export->exp_handle.h_cookie;
1112 class_export_put(export);
1114 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1115 cluuid->uuid, conn->cookie);
1118 EXPORT_SYMBOL(class_connect);
1120 /* if export is involved in recovery then clean up related things */
1121 void class_export_recovery_cleanup(struct obd_export *exp)
1123 struct obd_device *obd = exp->exp_obd;
1125 spin_lock(&obd->obd_recovery_task_lock);
1126 if (exp->exp_delayed)
1127 obd->obd_delayed_clients--;
1128 if (obd->obd_recovering) {
1129 if (exp->exp_in_recovery) {
1130 spin_lock(&exp->exp_lock);
1131 exp->exp_in_recovery = 0;
1132 spin_unlock(&exp->exp_lock);
1133 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1134 atomic_dec(&obd->obd_connected_clients);
1137 /* if called during recovery then should update
1138 * obd_stale_clients counter,
1139 * lightweight exports are not counted */
1140 if (exp->exp_failed &&
1141 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1142 exp->exp_obd->obd_stale_clients++;
1144 spin_unlock(&obd->obd_recovery_task_lock);
1145 /** Cleanup req replay fields */
1146 if (exp->exp_req_replay_needed) {
1147 spin_lock(&exp->exp_lock);
1148 exp->exp_req_replay_needed = 0;
1149 spin_unlock(&exp->exp_lock);
1150 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1151 atomic_dec(&obd->obd_req_replay_clients);
1153 /** Cleanup lock replay data */
1154 if (exp->exp_lock_replay_needed) {
1155 spin_lock(&exp->exp_lock);
1156 exp->exp_lock_replay_needed = 0;
1157 spin_unlock(&exp->exp_lock);
1158 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1159 atomic_dec(&obd->obd_lock_replay_clients);
1163 /* This function removes 1-3 references from the export:
1164 * 1 - for export pointer passed
1165 * and if disconnect really need
1166 * 2 - removing from hash
1167 * 3 - in client_unlink_export
1168 * The export pointer passed to this function can destroyed */
1169 int class_disconnect(struct obd_export *export)
1171 int already_disconnected;
1173 if (export == NULL) {
1174 CWARN("attempting to free NULL export %p\n", export);
1178 spin_lock(&export->exp_lock);
1179 already_disconnected = export->exp_disconnected;
1180 export->exp_disconnected = 1;
1181 spin_unlock(&export->exp_lock);
1183 /* class_cleanup(), abort_recovery(), and class_fail_export()
1184 * all end up in here, and if any of them race we shouldn't
1185 * call extra class_export_puts(). */
1186 if (already_disconnected) {
1187 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1188 GOTO(no_disconn, already_disconnected);
1191 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1192 export->exp_handle.h_cookie);
1194 if (!hlist_unhashed(&export->exp_nid_hash))
1195 cfs_hash_del(export->exp_obd->obd_nid_hash,
1196 &export->exp_connection->c_peer.nid,
1197 &export->exp_nid_hash);
1199 class_export_recovery_cleanup(export);
1200 class_unlink_export(export);
1202 class_export_put(export);
1205 EXPORT_SYMBOL(class_disconnect);
1207 /* Return non-zero for a fully connected export */
1208 int class_connected_export(struct obd_export *exp)
1212 spin_lock(&exp->exp_lock);
1213 connected = (exp->exp_conn_cnt > 0);
1214 spin_unlock(&exp->exp_lock);
1219 EXPORT_SYMBOL(class_connected_export);
1221 static void class_disconnect_export_list(struct list_head *list,
1222 enum obd_option flags)
1225 struct obd_export *exp;
1227 /* It's possible that an export may disconnect itself, but
1228 * nothing else will be added to this list. */
1229 while (!list_empty(list)) {
1230 exp = list_entry(list->next, struct obd_export,
1232 /* need for safe call CDEBUG after obd_disconnect */
1233 class_export_get(exp);
1235 spin_lock(&exp->exp_lock);
1236 exp->exp_flags = flags;
1237 spin_unlock(&exp->exp_lock);
1239 if (obd_uuid_equals(&exp->exp_client_uuid,
1240 &exp->exp_obd->obd_uuid)) {
1242 "exp %p export uuid == obd uuid, don't discon\n",
1244 /* Need to delete this now so we don't end up pointing
1245 * to work_list later when this export is cleaned up. */
1246 list_del_init(&exp->exp_obd_chain);
1247 class_export_put(exp);
1251 class_export_get(exp);
1252 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1253 "last request at "CFS_TIME_T"\n",
1254 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1255 exp, exp->exp_last_request_time);
1256 /* release one export reference anyway */
1257 rc = obd_disconnect(exp);
1259 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1260 obd_export_nid2str(exp), exp, rc);
1261 class_export_put(exp);
/*
 * Disconnect all exports of @obd.
 *
 * Both obd->obd_exports and obd->obd_delayed_exports are spliced onto a
 * local work list while holding obd->obd_dev_lock. The disconnects
 * themselves run after that lock has been released, through
 * class_disconnect_export_list() with the flags returned by
 * exp_flags_from_obd(obd). When nothing was collected only a debug
 * message is emitted.
 */
1265 void class_disconnect_exports(struct obd_device *obd)
1267 struct list_head work_list;
1269 /* Move all of the exports from obd_exports to a work list, en masse. */
1270 INIT_LIST_HEAD(&work_list);
1271 spin_lock(&obd->obd_dev_lock);
1272 list_splice_init(&obd->obd_exports, &work_list);
1273 list_splice_init(&obd->obd_delayed_exports, &work_list);
1274 spin_unlock(&obd->obd_dev_lock);
1276 if (!list_empty(&work_list)) {
1277 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1278 "disconnecting them\n", obd->obd_minor, obd);
1279 class_disconnect_export_list(&work_list,
1280 exp_flags_from_obd(obd));
1282 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1283 obd->obd_minor, obd);
1285 EXPORT_SYMBOL(class_disconnect_exports);
1287 /* Remove exports that have not completed recovery. */
/*
 * Evict the exports of @obd that @test_export does not vouch for.
 *
 * obd->obd_exports is walked under obd->obd_dev_lock. An export is taken
 * into consideration only if it is not the self-export (client UUID equal
 * to the obd UUID) and has a last_rcvd slot (ted_lr_idx != -1). Among
 * those, an export that is already marked exp_failed, or for which
 * @test_export returns non-zero, is left alone. Every remaining export is
 * marked exp_failed, moved onto a private work list and logged.
 *
 * After the device lock is dropped the collected exports are disconnected
 * through class_disconnect_export_list() with
 * exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV.
 *
 * @obd:         device whose export list is scanned
 * @test_export: predicate called with exp->exp_lock held
 *
 * NOTE(review): the statements that skip an export after each test, and
 * the declaration/update of "evicted", are not visible in this listing -
 * confirm them against the full file.
 */
1289 void class_disconnect_stale_exports(struct obd_device *obd,
1290 int (*test_export)(struct obd_export *))
1292 struct list_head work_list;
1293 struct obd_export *exp, *n;
1296 INIT_LIST_HEAD(&work_list);
1297 spin_lock(&obd->obd_dev_lock);
1298 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1300 /* don't count self-export as client */
1301 if (obd_uuid_equals(&exp->exp_client_uuid,
1302 &exp->exp_obd->obd_uuid))
1305 /* don't evict clients which have no slot in last_rcvd
1306 * (e.g. lightweight connection) */
1307 if (exp->exp_target_data.ted_lr_idx == -1)
1310 spin_lock(&exp->exp_lock);
1311 if (exp->exp_failed || test_export(exp)) {
1312 spin_unlock(&exp->exp_lock);
/* Mark as failed while still holding exp_lock. */
1315 exp->exp_failed = 1;
1316 spin_unlock(&exp->exp_lock);
/* The _safe iterator is what permits moving the current entry. */
1318 list_move(&exp->exp_obd_chain, &work_list);
1320 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1321 obd->obd_name, exp->exp_client_uuid.uuid,
1322 exp->exp_connection == NULL ? "<unknown>" :
1323 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1324 print_export_data(exp, "EVICTING", 0);
1326 spin_unlock(&obd->obd_dev_lock);
1329 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1330 obd->obd_name, evicted);
1332 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1333 OBD_OPT_ABORT_RECOV);
1335 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp->exp_failed is read and set to 1 in one critical section under
 * exp->exp_lock, so the previous value tells whether some earlier caller
 * already failed this export; in that case only a debug message is
 * logged here. Otherwise the debug log is dumped when obd_dump_on_timeout
 * is set, two references are taken (one to keep @exp usable for logging
 * after the disconnect, one for obd_disconnect() itself), obd_disconnect()
 * is called, the outcome is logged and one reference is dropped.
 *
 * NOTE(review): the statement that ends the already-failed branch and the
 * condition choosing between the CERROR and the final CDEBUG are not
 * visible in this listing - confirm them against the full file.
 */
1337 void class_fail_export(struct obd_export *exp)
1339 int rc, already_failed;
1341 spin_lock(&exp->exp_lock);
1342 already_failed = exp->exp_failed;
1343 exp->exp_failed = 1;
1344 spin_unlock(&exp->exp_lock);
1346 if (already_failed) {
1347 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1348 exp, exp->exp_client_uuid.uuid);
1352 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1353 exp, exp->exp_client_uuid.uuid);
1355 if (obd_dump_on_timeout)
1356 libcfs_debug_dumplog();
1358 /* need for safe call CDEBUG after obd_disconnect */
1359 class_export_get(exp);
1361 /* Most callers into obd_disconnect are removing their own reference
1362 * (request, for example) in addition to the one from the hash table.
1363 * We don't have such a reference here, so make one. */
1364 class_export_get(exp);
1365 rc = obd_disconnect(exp);
1367 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1369 CDEBUG(D_HA, "disconnected export %p/%s\n",
1370 exp, exp->exp_client_uuid.uuid);
/* Releases the reference that kept exp valid for the messages above. */
1371 class_export_put(exp);
1373 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of @exp.
 *
 * When exp->exp_connection is set, the result is libcfs_nid2str() applied
 * to the NID of the connection peer.
 *
 * NOTE(review): the value returned when exp->exp_connection is NULL is not
 * visible in this listing - confirm it against the full file.
 */
1375 char *obd_export_nid2str(struct obd_export *exp)
1377 if (exp->exp_connection != NULL)
1378 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1382 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of @obd by peer NID, on administrative request.
 *
 * @nid is parsed with libcfs_str2nid() and used as the key into
 * obd->obd_nid_hash. A matching export is reported on the console, failed
 * through class_fail_export() and then has a reference dropped.
 *
 * Returns exports_evicted. That is 0 when the device is already stopping
 * (checked under obd->obd_dev_lock); a debug message is logged when the
 * final count is zero.
 *
 * NOTE(review): the loop around the lookup, the statement taken when the
 * lookup returns NULL and the increment of exports_evicted are not visible
 * in this listing - confirm them against the full file.
 */
1384 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1386 struct cfs_hash *nid_hash;
1387 struct obd_export *doomed_exp = NULL;
1388 int exports_evicted = 0;
/* The cast only removes const for the libcfs_str2nid() call. */
1390 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1392 spin_lock(&obd->obd_dev_lock);
1393 /* umount has run already, so evict thread should leave
1394 * its task to umount thread now */
1395 if (obd->obd_stopping) {
1396 spin_unlock(&obd->obd_dev_lock);
1397 return exports_evicted;
1399 nid_hash = obd->obd_nid_hash;
/* Hash reference taken under the device lock; released by putref below. */
1400 cfs_hash_getref(nid_hash);
1401 spin_unlock(&obd->obd_dev_lock);
1404 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1405 if (doomed_exp == NULL)
1408 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1409 "nid %s found, wanted nid %s, requested nid %s\n",
1410 obd_export_nid2str(doomed_exp),
1411 libcfs_nid2str(nid_key), nid);
1412 LASSERTF(doomed_exp != obd->obd_self_export,
1413 "self-export is hashed by NID?\n");
1415 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1416 "request\n", obd->obd_name,
1417 obd_uuid2str(&doomed_exp->exp_client_uuid),
1418 obd_export_nid2str(doomed_exp));
1419 class_fail_export(doomed_exp);
/* Presumably drops the reference returned by cfs_hash_lookup() - confirm. */
1420 class_export_put(doomed_exp);
1423 cfs_hash_putref(nid_hash);
1425 if (!exports_evicted)
1426 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1427 obd->obd_name, nid);
1428 return exports_evicted;
1430 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client UUID equals @uuid, on
 * administrative request.
 *
 * @uuid is converted with obd_str2uuid() and looked up in
 * obd->obd_uuid_hash. A request naming the UUID of the device itself is
 * refused with an error message. A matching export is failed through
 * class_fail_export() and then has a reference dropped; a failed lookup is
 * reported with CERROR.
 *
 * Returns exports_evicted. That is 0 when the device is already stopping
 * (checked under obd->obd_dev_lock) and when the request names the device
 * itself.
 *
 * NOTE(review): the else-branch delimiter and the increment of
 * exports_evicted are not visible in this listing - confirm them against
 * the full file.
 */
1432 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1434 struct cfs_hash *uuid_hash;
1435 struct obd_export *doomed_exp = NULL;
1436 struct obd_uuid doomed_uuid;
1437 int exports_evicted = 0;
1439 spin_lock(&obd->obd_dev_lock);
1440 if (obd->obd_stopping) {
1441 spin_unlock(&obd->obd_dev_lock);
1442 return exports_evicted;
1444 uuid_hash = obd->obd_uuid_hash;
/* Hash reference taken under the device lock; every exit path below puts it. */
1445 cfs_hash_getref(uuid_hash);
1446 spin_unlock(&obd->obd_dev_lock);
1448 obd_str2uuid(&doomed_uuid, uuid);
1449 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1450 CERROR("%s: can't evict myself\n", obd->obd_name);
1451 cfs_hash_putref(uuid_hash);
1452 return exports_evicted;
1455 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1457 if (doomed_exp == NULL) {
1458 CERROR("%s: can't disconnect %s: no exports found\n",
1459 obd->obd_name, uuid);
1461 CWARN("%s: evicting %s at administrative request\n",
1462 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1463 class_fail_export(doomed_exp);
/* Presumably drops the reference returned by cfs_hash_lookup() - confirm. */
1464 class_export_put(doomed_exp);
1467 cfs_hash_putref(uuid_hash);
1469 return exports_evicted;
1471 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1473 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional per-export dump callback, only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled. It starts out NULL;
 * print_export_data() calls it for an export when its "locks" argument is
 * non-zero and the hook has been set.
 */
1474 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1475 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one D_HA debug line describing @exp, tagged with @status.
 *
 * Fields, in the order of the format string: obd name, @status, export
 * pointer, client UUID, peer NID string, exp_refcount, (exp_rpc_count,
 * exp_cb_count, exp_locks_count), exp_disconnected, exp_delayed,
 * exp_failed, nreplies, first_reply pointer, "..." when nreplies > 3, and
 * exp_last_committed.
 *
 * exp->exp_outstanding_replies is walked under exp->exp_lock before the
 * line is printed. When LUSTRE_TRACKS_LOCK_EXP_REFS is enabled and the
 * locks argument is non-zero, class_export_dump_hook is called as well if
 * it is set.
 *
 * NOTE(review): the last parameter of the signature, the declaration of
 * nreplies and the body of the reply loop are not visible in this
 * listing - confirm them against the full file.
 */
1478 static void print_export_data(struct obd_export *exp, const char *status,
1481 struct ptlrpc_reply_state *rs;
1482 struct ptlrpc_reply_state *first_reply = NULL;
1485 spin_lock(&exp->exp_lock);
1486 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1492 spin_unlock(&exp->exp_lock);
1494 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1495 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1496 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1497 atomic_read(&exp->exp_rpc_count),
1498 atomic_read(&exp->exp_cb_count),
1499 atomic_read(&exp->exp_locks_count),
1500 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1501 nreplies, first_reply, nreplies > 3 ? "..." : "",
1502 exp->exp_last_committed);
1503 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1504 if (locks && class_export_dump_hook != NULL)
1505 class_export_dump_hook(exp);
1509 void dump_exports(struct obd_device *obd, int locks)
1511 struct obd_export *exp;
1513 spin_lock(&obd->obd_dev_lock);
1514 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1515 print_export_data(exp, "ACTIVE", locks);
1516 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1517 print_export_data(exp, "UNLINKED", locks);
1518 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1519 print_export_data(exp, "DELAYED", locks);
1520 spin_unlock(&obd->obd_dev_lock);
1521 spin_lock(&obd_zombie_impexp_lock);
1522 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1523 print_export_data(exp, "ZOMBIE", locks);
1524 spin_unlock(&obd_zombie_impexp_lock);
1526 EXPORT_SYMBOL(dump_exports);
/*
 * Block until obd->obd_unlinked_exports of @obd is empty.
 *
 * The caller must already have emptied obd->obd_exports (asserted). The
 * unlinked list is tested under obd->obd_dev_lock; while it is non-empty
 * the lock is dropped and the task sleeps uninterruptibly for
 * cfs_time_seconds(waited). Whenever "waited" is above 5 and a power of
 * two, a console warning with the obd refcount is printed and all exports
 * are dumped with lock details.
 *
 * NOTE(review): the declaration, initial value and update of "waited" are
 * not visible in this listing - confirm them against the full file.
 */
1528 void obd_exports_barrier(struct obd_device *obd)
1531 LASSERT(list_empty(&obd->obd_exports));
1532 spin_lock(&obd->obd_dev_lock);
1533 while (!list_empty(&obd->obd_unlinked_exports)) {
/* Never sleep with the spinlock held. */
1534 spin_unlock(&obd->obd_dev_lock);
1535 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1536 cfs_time_seconds(waited));
1537 if (waited > 5 && IS_PO2(waited)) {
1538 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1539 "more than %d seconds. "
1540 "The obd refcount = %d. Is it stuck?\n",
1541 obd->obd_name, waited,
1542 atomic_read(&obd->obd_refcount));
1543 dump_exports(obd, 1);
/* Re-take the lock before the loop condition is evaluated again. */
1546 spin_lock(&obd->obd_dev_lock);
1548 spin_unlock(&obd->obd_dev_lock);
1550 EXPORT_SYMBOL(obd_exports_barrier);
/*
 * Total number of zombies waiting to be destroyed.
 *
 * Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). Static storage is zero-initialised, so no explicit
 * initialiser is needed.
 */
static int zombies_count;
1556 /* kill zombie imports and exports */
/*
 * Destroy queued zombie imports and exports.
 *
 * Each pass takes obd_zombie_impexp_lock, detaches the first entry of
 * obd_zombie_imports and the first entry of obd_zombie_exports when those
 * lists are non-empty, and releases the lock. The detached objects are
 * then destroyed without the lock held, through class_import_destroy()
 * and class_export_destroy(). Passes repeat while either an import or an
 * export was found.
 *
 * NOTE(review): the loop head, the resetting of "import"/"export" and the
 * statements inside the short re-locked sections after each destroy are
 * not visible in this listing (presumably zombies_count bookkeeping) -
 * confirm them against the full file.
 */
1558 void obd_zombie_impexp_cull(void)
1560 struct obd_import *import;
1561 struct obd_export *export;
1564 spin_lock(&obd_zombie_impexp_lock);
1567 if (!list_empty(&obd_zombie_imports)) {
1568 import = list_entry(obd_zombie_imports.next,
1571 list_del_init(&import->imp_zombie_chain);
1575 if (!list_empty(&obd_zombie_exports)) {
1576 export = list_entry(obd_zombie_exports.next,
1579 list_del_init(&export->exp_obd_chain);
1582 spin_unlock(&obd_zombie_impexp_lock);
1584 if (import != NULL) {
/* Destruction runs outside the spinlock. */
1585 class_import_destroy(import);
1586 spin_lock(&obd_zombie_impexp_lock);
1588 spin_unlock(&obd_zombie_impexp_lock);
1591 if (export != NULL) {
/* Destruction runs outside the spinlock. */
1592 class_export_destroy(export);
1593 spin_lock(&obd_zombie_impexp_lock);
1595 spin_unlock(&obd_zombie_impexp_lock);
1599 } while (import != NULL || export != NULL);
/* Completed by the zombie thread once it runs; awaited by obd_zombie_impexp_init(). */
1602 static struct completion obd_zombie_start;
/* Completed by the zombie thread when its loop ends; awaited by obd_zombie_impexp_stop(). */
1603 static struct completion obd_zombie_stop;
/* Bit flags for the zombie thread; see OBD_ZOMBIE_STOP. */
1604 static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier() callers. */
1605 static wait_queue_head_t obd_zombie_waitq;
/* Pid recorded by the zombie thread; lets obd_zombie_barrier() detect a self-wait. */
1606 static pid_t obd_zombie_pid;
/*
 * Stop request for the zombie thread.
 * NOTE(review): this value is handed to set_bit()/test_bit() as a bit
 * number, so it selects bit 1 of obd_zombie_flags rather than acting as
 * the mask 0x0001. All visible users agree with each other.
 */
1609 OBD_ZOMBIE_STOP = 0x0001,
1613 /* check for work for the zombie import/export kill thread. */
/*
 * Wait-condition helper for the zombie thread.
 *
 * Under obd_zombie_impexp_lock, rc is set to non-zero when zombies_count
 * is zero AND the stop flag is not set, i.e. when there is nothing to do.
 * obd_zombie_impexp_thread() waits on the negation of this function.
 * @arg is not used in the visible lines.
 *
 * NOTE(review): the declaration and the return of rc are not visible in
 * this listing - confirm them against the full file.
 */
1615 static int obd_zombie_impexp_check(void *arg)
1619 spin_lock(&obd_zombie_impexp_lock);
1620 rc = (zombies_count == 0) &&
1621 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1622 spin_unlock(&obd_zombie_impexp_lock);
1628 /* Add export to the obd_zombie thread and notify it. */
/*
 * Queue @exp for destruction by the zombie thread.
 *
 * The export must currently be chained on a list (asserted). It is
 * unlinked under the obd_dev_lock of its device, then added to
 * obd_zombie_exports under obd_zombie_impexp_lock, and finally the waiters
 * on the zombie wait queue are woken.
 *
 * NOTE(review): one short line between taking obd_zombie_impexp_lock and
 * list_add() is not visible in this listing (presumably the zombies_count
 * update) - confirm it against the full file.
 */
1630 static void obd_zombie_export_add(struct obd_export *exp) {
1631 spin_lock(&exp->exp_obd->obd_dev_lock);
1632 LASSERT(!list_empty(&exp->exp_obd_chain));
1633 list_del_init(&exp->exp_obd_chain);
1634 spin_unlock(&exp->exp_obd->obd_dev_lock);
1635 spin_lock(&obd_zombie_impexp_lock);
/* exp_obd_chain is reused as the link into the zombie list. */
1637 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1638 spin_unlock(&obd_zombie_impexp_lock);
1640 obd_zombie_impexp_notify();
1644 /* Add import to the obd_zombie thread and notify it. */
/*
 * Queue @imp for destruction by the zombie thread.
 *
 * Preconditions (asserted): imp->imp_sec and imp->imp_rq_pool are NULL,
 * and the import is not yet chained through imp_zombie_chain. The import
 * is added to obd_zombie_imports under obd_zombie_impexp_lock, then the
 * waiters on the zombie wait queue are woken.
 *
 * NOTE(review): one short line between the list_empty() assertion and
 * list_add() is not visible in this listing (presumably the zombies_count
 * update) - confirm it against the full file.
 */
1646 static void obd_zombie_import_add(struct obd_import *imp) {
1647 LASSERT(imp->imp_sec == NULL);
1648 LASSERT(imp->imp_rq_pool == NULL);
1649 spin_lock(&obd_zombie_impexp_lock);
1650 LASSERT(list_empty(&imp->imp_zombie_chain));
1652 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1653 spin_unlock(&obd_zombie_impexp_lock);
1655 obd_zombie_impexp_notify();
1659 /* notify the import/export destroy thread about a new zombie. */
/*
 * Wake every waiter on obd_zombie_waitq.
 *
 * wake_up_all() is used rather than waking a single waiter because the
 * queue is shared: both the zombie thread and obd_zombie_barrier() callers
 * sleep on it, and the zombie thread must not miss the notification.
 */
1661 static void obd_zombie_impexp_notify(void)
1664 * Make sure obd_zomebie_impexp_thread get this notification.
1665 * It is possible this signal only get by obd_zombie_barrier, and
1666 * barrier gulps this notification and sleeps away and hangs ensues
1668 wake_up_all(&obd_zombie_waitq);
1672 /* check whether obd_zombie is idle */
/*
 * Wait-condition helper for obd_zombie_barrier().
 *
 * Asserts that the stop flag is not set, then samples zombies_count under
 * obd_zombie_impexp_lock; rc is non-zero when no zombies are pending.
 *
 * NOTE(review): the declaration and the return of rc are not visible in
 * this listing - confirm them against the full file.
 */
1674 static int obd_zombie_is_idle(void)
1678 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1679 spin_lock(&obd_zombie_impexp_lock);
1680 rc = (zombies_count == 0);
1681 spin_unlock(&obd_zombie_impexp_lock);
1686 /* wait until the obd_zombie import/export queues become empty */
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies are pending.
 *
 * The pid comparison detects a call made from the zombie thread itself,
 * which must not wait for its own work to finish.
 *
 * NOTE(review): the statement executed when the pids match is not visible
 * in this listing - confirm it against the full file.
 */
1688 void obd_zombie_barrier(void)
1690 struct l_wait_info lwi = { 0 };
1692 if (obd_zombie_pid == current_pid())
1693 /* don't wait for myself */
1695 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1697 EXPORT_SYMBOL(obd_zombie_barrier);
1701 /* thread that destroys zombie exports/imports. */
/*
 * Main function of the zombie destroy thread (started by
 * obd_zombie_impexp_init()).
 *
 * After signalling obd_zombie_start and recording its pid, the thread
 * loops until the stop flag is set: it sleeps until
 * obd_zombie_impexp_check() returns zero (work pending or stop requested),
 * culls the zombie lists, and wakes the wait queue so that
 * obd_zombie_barrier() callers re-evaluate their condition. On exit it
 * signals obd_zombie_stop.
 *
 * @unused: thread argument, not referenced in the visible lines.
 *
 * NOTE(review): obd_zombie_pid is assigned after
 * complete(&obd_zombie_start), so the starter may resume before the pid is
 * recorded. The return statement is not visible in this listing.
 */
1703 static int obd_zombie_impexp_thread(void *unused)
1705 unshare_fs_struct();
1706 complete(&obd_zombie_start);
1708 obd_zombie_pid = current_pid();
1710 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1711 struct l_wait_info lwi = { 0 };
1713 l_wait_event(obd_zombie_waitq,
1714 !obd_zombie_impexp_check(NULL), &lwi);
1715 obd_zombie_impexp_cull();
1718 * Notify obd_zombie_barrier callers that queues
1721 wake_up(&obd_zombie_waitq);
1724 complete(&obd_zombie_stop);
1731 /* start the thread that destroys zombie imports/exports */
/*
 * Initialise the zombie import/export machinery and start its thread.
 *
 * Both zombie lists, the lock, the two completions and the wait queue are
 * initialised before the "obd_zombid" kernel thread is created with
 * kthread_run(). The function then waits for the thread to signal
 * obd_zombie_start.
 *
 * Returns PTR_ERR(task) on the error path.
 *
 * NOTE(review): the condition guarding the PTR_ERR() return (presumably an
 * IS_ERR() check on task) and the success return value are not visible in
 * this listing - confirm them against the full file.
 */
1733 int obd_zombie_impexp_init(void)
1735 struct task_struct *task;
1737 INIT_LIST_HEAD(&obd_zombie_imports);
1738 INIT_LIST_HEAD(&obd_zombie_exports);
1739 spin_lock_init(&obd_zombie_impexp_lock);
1740 init_completion(&obd_zombie_start);
1741 init_completion(&obd_zombie_stop);
1742 init_waitqueue_head(&obd_zombie_waitq);
1745 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1747 return PTR_ERR(task);
1749 wait_for_completion(&obd_zombie_start);
1753 /* stop the thread that destroys zombie imports/exports */
/*
 * Ask the zombie thread to exit and wait until it has done so.
 *
 * Order matters: the stop flag is set first, then the wait queue is woken
 * so the thread re-evaluates its wait condition and loop test, and only
 * then does the caller block on obd_zombie_stop, which the thread
 * completes when its loop ends.
 */
1755 void obd_zombie_impexp_stop(void)
1757 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1758 obd_zombie_impexp_notify();
1759 wait_for_completion(&obd_zombie_stop);
1762 /***** Kernel-userspace comm helpers *******/
1764 /* Get length of entire message, including header */
1765 int kuc_len(int payload_len)
1767 return sizeof(struct kuc_hdr) + payload_len;
1769 EXPORT_SYMBOL(kuc_len);
1771 /* Get a pointer to kuc header, given a ptr to the payload
1772 * @param p Pointer to payload area
1773 * @returns Pointer to kuc header */
/*
 * Map a payload pointer back to the KUC header that precedes it.
 *
 * The header is taken to sit immediately before @p (one struct kuc_hdr
 * earlier), matching the layout produced by kuc_alloc(), which returns
 * lh + 1. The magic field is asserted, so passing a pointer that is not a
 * KUC payload trips the LASSERT.
 *
 * NOTE(review): the return statement is not visible in this listing -
 * confirm it against the full file.
 */
1775 struct kuc_hdr * kuc_ptr(void *p)
1777 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1778 LASSERT(lh->kuc_magic == KUC_MAGIC);
1781 EXPORT_SYMBOL(kuc_ptr);
1783 /* Test if payload is part of kuc message
1784 * @param p Pointer to payload area */
/*
 * Non-asserting variant of the check done in kuc_ptr(): looks at the
 * struct kuc_hdr that would sit immediately before @p and compares its
 * magic field with KUC_MAGIC.
 *
 * Note that the memory just before @p is read unconditionally, so @p must
 * point at least sizeof(struct kuc_hdr) bytes into readable memory.
 *
 * NOTE(review): the values returned for a match and for a mismatch are
 * not visible in this listing - confirm them against the full file.
 */
1787 int kuc_ispayload(void *p)
1789 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1791 if (kh->kuc_magic == KUC_MAGIC)
1796 EXPORT_SYMBOL(kuc_ispayload);
1798 /* Alloc space for a message, and fill in header
1799 * @return Pointer to payload area */
/*
 * Allocate a KUC message and initialise its header.
 *
 * @payload_len: number of payload bytes wanted
 * @transport:   stored in kuc_transport
 * @type:        stored in kuc_msgtype
 *
 * The total length is kuc_len(payload_len), which is also what is stored
 * in kuc_msglen (header included). On success the returned pointer is the
 * payload area, i.e. the address right after the header (lh + 1). On the
 * error path ERR_PTR(-ENOMEM) is returned, so callers must test the result
 * with IS_ERR() rather than against NULL.
 *
 * NOTE(review): the declaration of lh, the allocation call and the
 * condition guarding the ERR_PTR() return are not visible in this
 * listing - confirm them against the full file.
 */
1801 void *kuc_alloc(int payload_len, int transport, int type)
1804 int len = kuc_len(payload_len);
1808 return ERR_PTR(-ENOMEM);
1810 lh->kuc_magic = KUC_MAGIC;
1811 lh->kuc_transport = transport;
1812 lh->kuc_msgtype = type;
1813 lh->kuc_msglen = len;
1815 return (void *)(lh + 1);
1817 EXPORT_SYMBOL(kuc_alloc);
/*
 * Free a KUC message, given a pointer to its payload area.
 *
 * @p:           payload pointer (the header is located through kuc_ptr(),
 *               which asserts the header magic)
 * @payload_len: payload size; the freed size is kuc_len(payload_len), i.e.
 *               header plus payload
 *
 * This function has external linkage and is exported to other modules, so
 * it is deliberately not marked "inline": a bare "inline" on an exported
 * definition gives no benefit to out-of-file callers and, depending on the
 * inline semantics in effect, can leave the file without an external
 * definition for EXPORT_SYMBOL to refer to.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);