]> Pileus Git - ~andy/linux/blob - drivers/staging/lustre/lustre/obdclass/genops.c
Linux 3.14
[~andy/linux] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_ost.h>
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
46
/* List of registered obd types; defined in another translation unit. */
extern struct list_head obd_types;
/* Held in this file around every walk or modification of obd_types. */
spinlock_t obd_types_lock;

/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
struct kmem_cache *import_cachep;

/* NOTE(review): presumably queues of imports/exports waiting for deferred
 * destruction, protected by obd_zombie_impexp_lock -- their users are not
 * visible in this part of the file; confirm. */
struct list_head      obd_zombie_imports;
struct list_head      obd_zombie_exports;
spinlock_t  obd_zombie_impexp_lock;
/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Hook called by class_export_destroy() to drop an export's connection.
 * NOTE(review): presumably installed by the ptlrpc module -- confirm. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66
67 /*
68  * support functions: we could use inter-module communication, but this
69  * is more portable to other OS's
70  */
71 static struct obd_device *obd_device_alloc(void)
72 {
73         struct obd_device *obd;
74
75         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
76         if (obd != NULL) {
77                 obd->obd_magic = OBD_DEVICE_MAGIC;
78         }
79         return obd;
80 }
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112 EXPORT_SYMBOL(class_search_type);
113
114 struct obd_type *class_get_type(const char *name)
115 {
116         struct obd_type *type = class_search_type(name);
117
118         if (!type) {
119                 const char *modname = name;
120
121                 if (strcmp(modname, "obdfilter") == 0)
122                         modname = "ofd";
123
124                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125                         modname = LUSTRE_OSP_NAME;
126
127                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128                         modname = LUSTRE_MDT_NAME;
129
130                 if (!request_module("%s", modname)) {
131                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132                         type = class_search_type(name);
133                 } else {
134                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135                                            modname);
136                 }
137         }
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143         }
144         return type;
145 }
146 EXPORT_SYMBOL(class_get_type);
147
148 void class_put_type(struct obd_type *type)
149 {
150         LASSERT(type);
151         spin_lock(&type->obd_type_lock);
152         type->typ_refcnt--;
153         module_put(type->typ_dt_ops->o_owner);
154         spin_unlock(&type->obd_type_lock);
155 }
156 EXPORT_SYMBOL(class_put_type);
157
158 #define CLASS_MAX_NAME 1024
159
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161                         struct lprocfs_vars *vars, const char *name,
162                         struct lu_device_type *ldt)
163 {
164         struct obd_type *type;
165         int rc = 0;
166
167         /* sanity check */
168         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
169
170         if (class_search_type(name)) {
171                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172                 return -EEXIST;
173         }
174
175         rc = -ENOMEM;
176         OBD_ALLOC(type, sizeof(*type));
177         if (type == NULL)
178                 return rc;
179
180         OBD_ALLOC_PTR(type->typ_dt_ops);
181         OBD_ALLOC_PTR(type->typ_md_ops);
182         OBD_ALLOC(type->typ_name, strlen(name) + 1);
183
184         if (type->typ_dt_ops == NULL ||
185             type->typ_md_ops == NULL ||
186             type->typ_name == NULL)
187                 GOTO (failed, rc);
188
189         *(type->typ_dt_ops) = *dt_ops;
190         /* md_ops is optional */
191         if (md_ops)
192                 *(type->typ_md_ops) = *md_ops;
193         strcpy(type->typ_name, name);
194         spin_lock_init(&type->obd_type_lock);
195
196         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
197                                               vars, type);
198         if (IS_ERR(type->typ_procroot)) {
199                 rc = PTR_ERR(type->typ_procroot);
200                 type->typ_procroot = NULL;
201                 GOTO (failed, rc);
202         }
203
204         if (ldt != NULL) {
205                 type->typ_lu = ldt;
206                 rc = lu_device_type_init(ldt);
207                 if (rc != 0)
208                         GOTO (failed, rc);
209         }
210
211         spin_lock(&obd_types_lock);
212         list_add(&type->typ_chain, &obd_types);
213         spin_unlock(&obd_types_lock);
214
215         return 0;
216
217  failed:
218         if (type->typ_name != NULL)
219                 OBD_FREE(type->typ_name, strlen(name) + 1);
220         if (type->typ_md_ops != NULL)
221                 OBD_FREE_PTR(type->typ_md_ops);
222         if (type->typ_dt_ops != NULL)
223                 OBD_FREE_PTR(type->typ_dt_ops);
224         OBD_FREE(type, sizeof(*type));
225         return rc;
226 }
227 EXPORT_SYMBOL(class_register_type);
228
229 int class_unregister_type(const char *name)
230 {
231         struct obd_type *type = class_search_type(name);
232
233         if (!type) {
234                 CERROR("unknown obd type\n");
235                 return -EINVAL;
236         }
237
238         if (type->typ_refcnt) {
239                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
240                 /* This is a bad situation, let's make the best of it */
241                 /* Remove ops, but leave the name for debugging */
242                 OBD_FREE_PTR(type->typ_dt_ops);
243                 OBD_FREE_PTR(type->typ_md_ops);
244                 return -EBUSY;
245         }
246
247         if (type->typ_procroot) {
248                 lprocfs_remove(&type->typ_procroot);
249         }
250
251         if (type->typ_lu)
252                 lu_device_type_fini(type->typ_lu);
253
254         spin_lock(&obd_types_lock);
255         list_del(&type->typ_chain);
256         spin_unlock(&obd_types_lock);
257         OBD_FREE(type->typ_name, strlen(name) + 1);
258         if (type->typ_dt_ops != NULL)
259                 OBD_FREE_PTR(type->typ_dt_ops);
260         if (type->typ_md_ops != NULL)
261                 OBD_FREE_PTR(type->typ_md_ops);
262         OBD_FREE(type, sizeof(*type));
263         return 0;
264 } /* class_unregister_type */
265 EXPORT_SYMBOL(class_unregister_type);
266
267 /**
268  * Create a new obd device.
269  *
270  * Find an empty slot in ::obd_devs[], create a new obd device in it.
271  *
272  * \param[in] type_name obd device type string.
273  * \param[in] name      obd device name.
274  *
275  * \retval NULL if create fails, otherwise return the obd device
276  *       pointer created.
277  */
278 struct obd_device *class_newdev(const char *type_name, const char *name)
279 {
280         struct obd_device *result = NULL;
281         struct obd_device *newdev;
282         struct obd_type *type = NULL;
283         int i;
284         int new_obd_minor = 0;
285
286         if (strlen(name) >= MAX_OBD_NAME) {
287                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288                 return ERR_PTR(-EINVAL);
289         }
290
291         type = class_get_type(type_name);
292         if (type == NULL){
293                 CERROR("OBD: unknown type: %s\n", type_name);
294                 return ERR_PTR(-ENODEV);
295         }
296
297         newdev = obd_device_alloc();
298         if (newdev == NULL)
299                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
300
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
308                         CERROR("Device %s already exists at %d, won't add\n",
309                                name, i);
310                         if (result) {
311                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312                                          "%p obd_magic %08x != %08x\n", result,
313                                          result->obd_magic, OBD_DEVICE_MAGIC);
314                                 LASSERTF(result->obd_minor == new_obd_minor,
315                                          "%p obd_minor %d != %d\n", result,
316                                          result->obd_minor, new_obd_minor);
317
318                                 obd_devs[result->obd_minor] = NULL;
319                                 result->obd_name[0]='\0';
320                          }
321                         result = ERR_PTR(-EEXIST);
322                         break;
323                 }
324                 if (!result && !obd) {
325                         result = newdev;
326                         result->obd_minor = i;
327                         new_obd_minor = i;
328                         result->obd_type = type;
329                         strncpy(result->obd_name, name,
330                                 sizeof(result->obd_name) - 1);
331                         obd_devs[i] = result;
332                 }
333         }
334         write_unlock(&obd_dev_lock);
335
336         if (result == NULL && i >= class_devno_max()) {
337                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338                        class_devno_max());
339                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
340         }
341
342         if (IS_ERR(result))
343                 GOTO(out, result);
344
345         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
346                result->obd_name, result);
347
348         return result;
349 out:
350         obd_device_free(newdev);
351 out_type:
352         class_put_type(type);
353         return result;
354 }
355
356 void class_release_dev(struct obd_device *obd)
357 {
358         struct obd_type *obd_type = obd->obd_type;
359
360         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
361                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
362         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
363                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
364         LASSERT(obd_type != NULL);
365
366         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
367                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
368
369         write_lock(&obd_dev_lock);
370         obd_devs[obd->obd_minor] = NULL;
371         write_unlock(&obd_dev_lock);
372         obd_device_free(obd);
373
374         class_put_type(obd_type);
375 }
376
377 int class_name2dev(const char *name)
378 {
379         int i;
380
381         if (!name)
382                 return -1;
383
384         read_lock(&obd_dev_lock);
385         for (i = 0; i < class_devno_max(); i++) {
386                 struct obd_device *obd = class_num2obd(i);
387
388                 if (obd && strcmp(name, obd->obd_name) == 0) {
389                         /* Make sure we finished attaching before we give
390                            out any references */
391                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
392                         if (obd->obd_attached) {
393                                 read_unlock(&obd_dev_lock);
394                                 return i;
395                         }
396                         break;
397                 }
398         }
399         read_unlock(&obd_dev_lock);
400
401         return -1;
402 }
403 EXPORT_SYMBOL(class_name2dev);
404
405 struct obd_device *class_name2obd(const char *name)
406 {
407         int dev = class_name2dev(name);
408
409         if (dev < 0 || dev > class_devno_max())
410                 return NULL;
411         return class_num2obd(dev);
412 }
413 EXPORT_SYMBOL(class_name2obd);
414
415 int class_uuid2dev(struct obd_uuid *uuid)
416 {
417         int i;
418
419         read_lock(&obd_dev_lock);
420         for (i = 0; i < class_devno_max(); i++) {
421                 struct obd_device *obd = class_num2obd(i);
422
423                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
424                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
425                         read_unlock(&obd_dev_lock);
426                         return i;
427                 }
428         }
429         read_unlock(&obd_dev_lock);
430
431         return -1;
432 }
433 EXPORT_SYMBOL(class_uuid2dev);
434
435 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
436 {
437         int dev = class_uuid2dev(uuid);
438         if (dev < 0)
439                 return NULL;
440         return class_num2obd(dev);
441 }
442 EXPORT_SYMBOL(class_uuid2obd);
443
444 /**
445  * Get obd device from ::obd_devs[]
446  *
447  * \param num [in] array index
448  *
449  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
450  *       otherwise return the obd device there.
451  */
452 struct obd_device *class_num2obd(int num)
453 {
454         struct obd_device *obd = NULL;
455
456         if (num < class_devno_max()) {
457                 obd = obd_devs[num];
458                 if (obd == NULL)
459                         return NULL;
460
461                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
462                          "%p obd_magic %08x != %08x\n",
463                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
464                 LASSERTF(obd->obd_minor == num,
465                          "%p obd_minor %0d != %0d\n",
466                          obd, obd->obd_minor, num);
467         }
468
469         return obd;
470 }
471 EXPORT_SYMBOL(class_num2obd);
472
473 /**
474  * Get obd devices count. Device in any
475  *    state are counted
476  * \retval obd device count
477  */
478 int get_devices_count(void)
479 {
480         int index, max_index = class_devno_max(), dev_count = 0;
481
482         read_lock(&obd_dev_lock);
483         for (index = 0; index <= max_index; index++) {
484                 struct obd_device *obd = class_num2obd(index);
485                 if (obd != NULL)
486                         dev_count++;
487         }
488         read_unlock(&obd_dev_lock);
489
490         return dev_count;
491 }
492 EXPORT_SYMBOL(get_devices_count);
493
494 void class_obd_list(void)
495 {
496         char *status;
497         int i;
498
499         read_lock(&obd_dev_lock);
500         for (i = 0; i < class_devno_max(); i++) {
501                 struct obd_device *obd = class_num2obd(i);
502
503                 if (obd == NULL)
504                         continue;
505                 if (obd->obd_stopping)
506                         status = "ST";
507                 else if (obd->obd_set_up)
508                         status = "UP";
509                 else if (obd->obd_attached)
510                         status = "AT";
511                 else
512                         status = "--";
513                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
514                          i, status, obd->obd_type->typ_name,
515                          obd->obd_name, obd->obd_uuid.uuid,
516                          atomic_read(&obd->obd_refcount));
517         }
518         read_unlock(&obd_dev_lock);
519         return;
520 }
521
522 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
523    specified, then only the client with that uuid is returned,
524    otherwise any client connected to the tgt is returned. */
525 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
526                                           const char * typ_name,
527                                           struct obd_uuid *grp_uuid)
528 {
529         int i;
530
531         read_lock(&obd_dev_lock);
532         for (i = 0; i < class_devno_max(); i++) {
533                 struct obd_device *obd = class_num2obd(i);
534
535                 if (obd == NULL)
536                         continue;
537                 if ((strncmp(obd->obd_type->typ_name, typ_name,
538                              strlen(typ_name)) == 0)) {
539                         if (obd_uuid_equals(tgt_uuid,
540                                             &obd->u.cli.cl_target_uuid) &&
541                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
542                                                          &obd->obd_uuid) : 1)) {
543                                 read_unlock(&obd_dev_lock);
544                                 return obd;
545                         }
546                 }
547         }
548         read_unlock(&obd_dev_lock);
549
550         return NULL;
551 }
552 EXPORT_SYMBOL(class_find_client_obd);
553
554 /* Iterate the obd_device list looking devices have grp_uuid. Start
555    searching at *next, and if a device is found, the next index to look
556    at is saved in *next. If next is NULL, then the first matching device
557    will always be returned. */
558 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
559 {
560         int i;
561
562         if (next == NULL)
563                 i = 0;
564         else if (*next >= 0 && *next < class_devno_max())
565                 i = *next;
566         else
567                 return NULL;
568
569         read_lock(&obd_dev_lock);
570         for (; i < class_devno_max(); i++) {
571                 struct obd_device *obd = class_num2obd(i);
572
573                 if (obd == NULL)
574                         continue;
575                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
576                         if (next != NULL)
577                                 *next = i+1;
578                         read_unlock(&obd_dev_lock);
579                         return obd;
580                 }
581         }
582         read_unlock(&obd_dev_lock);
583
584         return NULL;
585 }
586 EXPORT_SYMBOL(class_devices_in_group);
587
588 /**
589  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
590  * adjust sptlrpc settings accordingly.
591  */
592 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
593 {
594         struct obd_device  *obd;
595         const char       *type;
596         int              i, rc = 0, rc2;
597
598         LASSERT(namelen > 0);
599
600         read_lock(&obd_dev_lock);
601         for (i = 0; i < class_devno_max(); i++) {
602                 obd = class_num2obd(i);
603
604                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
605                         continue;
606
607                 /* only notify mdc, osc, mdt, ost */
608                 type = obd->obd_type->typ_name;
609                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
610                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
611                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
612                     strcmp(type, LUSTRE_OST_NAME) != 0)
613                         continue;
614
615                 if (strncmp(obd->obd_name, fsname, namelen))
616                         continue;
617
618                 class_incref(obd, __FUNCTION__, obd);
619                 read_unlock(&obd_dev_lock);
620                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
621                                          sizeof(KEY_SPTLRPC_CONF),
622                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
623                 rc = rc ? rc : rc2;
624                 class_decref(obd, __FUNCTION__, obd);
625                 read_lock(&obd_dev_lock);
626         }
627         read_unlock(&obd_dev_lock);
628         return rc;
629 }
630 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
631
632 void obd_cleanup_caches(void)
633 {
634         if (obd_device_cachep) {
635                 kmem_cache_destroy(obd_device_cachep);
636                 obd_device_cachep = NULL;
637         }
638         if (obdo_cachep) {
639                 kmem_cache_destroy(obdo_cachep);
640                 obdo_cachep = NULL;
641         }
642         if (import_cachep) {
643                 kmem_cache_destroy(import_cachep);
644                 import_cachep = NULL;
645         }
646         if (capa_cachep) {
647                 kmem_cache_destroy(capa_cachep);
648                 capa_cachep = NULL;
649         }
650 }
651
652 int obd_init_caches(void)
653 {
654         LASSERT(obd_device_cachep == NULL);
655         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
656                                                  sizeof(struct obd_device),
657                                                  0, 0, NULL);
658         if (!obd_device_cachep)
659                 GOTO(out, -ENOMEM);
660
661         LASSERT(obdo_cachep == NULL);
662         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
663                                            0, 0, NULL);
664         if (!obdo_cachep)
665                 GOTO(out, -ENOMEM);
666
667         LASSERT(import_cachep == NULL);
668         import_cachep = kmem_cache_create("ll_import_cache",
669                                              sizeof(struct obd_import),
670                                              0, 0, NULL);
671         if (!import_cachep)
672                 GOTO(out, -ENOMEM);
673
674         LASSERT(capa_cachep == NULL);
675         capa_cachep = kmem_cache_create("capa_cache",
676                                            sizeof(struct obd_capa), 0, 0, NULL);
677         if (!capa_cachep)
678                 GOTO(out, -ENOMEM);
679
680         return 0;
681  out:
682         obd_cleanup_caches();
683         return -ENOMEM;
684
685 }
686
687 /* map connection to client */
688 struct obd_export *class_conn2export(struct lustre_handle *conn)
689 {
690         struct obd_export *export;
691
692         if (!conn) {
693                 CDEBUG(D_CACHE, "looking for null handle\n");
694                 return NULL;
695         }
696
697         if (conn->cookie == -1) {  /* this means assign a new connection */
698                 CDEBUG(D_CACHE, "want a new connection\n");
699                 return NULL;
700         }
701
702         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
703         export = class_handle2object(conn->cookie);
704         return export;
705 }
706 EXPORT_SYMBOL(class_conn2export);
707
708 struct obd_device *class_exp2obd(struct obd_export *exp)
709 {
710         if (exp)
711                 return exp->exp_obd;
712         return NULL;
713 }
714 EXPORT_SYMBOL(class_exp2obd);
715
716 struct obd_device *class_conn2obd(struct lustre_handle *conn)
717 {
718         struct obd_export *export;
719         export = class_conn2export(conn);
720         if (export) {
721                 struct obd_device *obd = export->exp_obd;
722                 class_export_put(export);
723                 return obd;
724         }
725         return NULL;
726 }
727 EXPORT_SYMBOL(class_conn2obd);
728
729 struct obd_import *class_exp2cliimp(struct obd_export *exp)
730 {
731         struct obd_device *obd = exp->exp_obd;
732         if (obd == NULL)
733                 return NULL;
734         return obd->u.cli.cl_import;
735 }
736 EXPORT_SYMBOL(class_exp2cliimp);
737
738 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
739 {
740         struct obd_device *obd = class_conn2obd(conn);
741         if (obd == NULL)
742                 return NULL;
743         return obd->u.cli.cl_import;
744 }
745 EXPORT_SYMBOL(class_conn2cliimp);
746
747 /* Export management functions */
748 static void class_export_destroy(struct obd_export *exp)
749 {
750         struct obd_device *obd = exp->exp_obd;
751
752         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
753         LASSERT(obd != NULL);
754
755         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
756                exp->exp_client_uuid.uuid, obd->obd_name);
757
758         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
759         if (exp->exp_connection)
760                 ptlrpc_put_connection_superhack(exp->exp_connection);
761
762         LASSERT(list_empty(&exp->exp_outstanding_replies));
763         LASSERT(list_empty(&exp->exp_uncommitted_replies));
764         LASSERT(list_empty(&exp->exp_req_replay_queue));
765         LASSERT(list_empty(&exp->exp_hp_rpcs));
766         obd_destroy_export(exp);
767         class_decref(obd, "export", exp);
768
769         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
770 }
771
772 static void export_handle_addref(void *export)
773 {
774         class_export_get(export);
775 }
776
777 static struct portals_handle_ops export_handle_ops = {
778         .hop_addref = export_handle_addref,
779         .hop_free   = NULL,
780 };
781
782 struct obd_export *class_export_get(struct obd_export *exp)
783 {
784         atomic_inc(&exp->exp_refcount);
785         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
786                atomic_read(&exp->exp_refcount));
787         return exp;
788 }
789 EXPORT_SYMBOL(class_export_get);
790
791 void class_export_put(struct obd_export *exp)
792 {
793         LASSERT(exp != NULL);
794         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
795         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
796                atomic_read(&exp->exp_refcount) - 1);
797
798         if (atomic_dec_and_test(&exp->exp_refcount)) {
799                 LASSERT(!list_empty(&exp->exp_obd_chain));
800                 CDEBUG(D_IOCTL, "final put %p/%s\n",
801                        exp, exp->exp_client_uuid.uuid);
802
803                 /* release nid stat refererence */
804                 lprocfs_exp_cleanup(exp);
805
806                 obd_zombie_export_add(exp);
807         }
808 }
809 EXPORT_SYMBOL(class_export_put);
810
811 /* Creates a new export, adds it to the hash table, and returns a
812  * pointer to it. The refcount is 2: one for the hash reference, and
813  * one for the pointer returned by this function. */
814 struct obd_export *class_new_export(struct obd_device *obd,
815                                     struct obd_uuid *cluuid)
816 {
817         struct obd_export *export;
818         struct cfs_hash *hash = NULL;
819         int rc = 0;
820
821         OBD_ALLOC_PTR(export);
822         if (!export)
823                 return ERR_PTR(-ENOMEM);
824
825         export->exp_conn_cnt = 0;
826         export->exp_lock_hash = NULL;
827         export->exp_flock_hash = NULL;
828         atomic_set(&export->exp_refcount, 2);
829         atomic_set(&export->exp_rpc_count, 0);
830         atomic_set(&export->exp_cb_count, 0);
831         atomic_set(&export->exp_locks_count, 0);
832 #if LUSTRE_TRACKS_LOCK_EXP_REFS
833         INIT_LIST_HEAD(&export->exp_locks_list);
834         spin_lock_init(&export->exp_locks_list_guard);
835 #endif
836         atomic_set(&export->exp_replay_count, 0);
837         export->exp_obd = obd;
838         INIT_LIST_HEAD(&export->exp_outstanding_replies);
839         spin_lock_init(&export->exp_uncommitted_replies_lock);
840         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
841         INIT_LIST_HEAD(&export->exp_req_replay_queue);
842         INIT_LIST_HEAD(&export->exp_handle.h_link);
843         INIT_LIST_HEAD(&export->exp_hp_rpcs);
844         class_handle_hash(&export->exp_handle, &export_handle_ops);
845         export->exp_last_request_time = cfs_time_current_sec();
846         spin_lock_init(&export->exp_lock);
847         spin_lock_init(&export->exp_rpc_lock);
848         INIT_HLIST_NODE(&export->exp_uuid_hash);
849         INIT_HLIST_NODE(&export->exp_nid_hash);
850         spin_lock_init(&export->exp_bl_list_lock);
851         INIT_LIST_HEAD(&export->exp_bl_list);
852
853         export->exp_sp_peer = LUSTRE_SP_ANY;
854         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
855         export->exp_client_uuid = *cluuid;
856         obd_init_export(export);
857
858         spin_lock(&obd->obd_dev_lock);
859         /* shouldn't happen, but might race */
860         if (obd->obd_stopping)
861                 GOTO(exit_unlock, rc = -ENODEV);
862
863         hash = cfs_hash_getref(obd->obd_uuid_hash);
864         if (hash == NULL)
865                 GOTO(exit_unlock, rc = -ENODEV);
866         spin_unlock(&obd->obd_dev_lock);
867
868         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
869                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
870                 if (rc != 0) {
871                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
872                                       obd->obd_name, cluuid->uuid, rc);
873                         GOTO(exit_err, rc = -EALREADY);
874                 }
875         }
876
877         spin_lock(&obd->obd_dev_lock);
878         if (obd->obd_stopping) {
879                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
880                 GOTO(exit_unlock, rc = -ENODEV);
881         }
882
883         class_incref(obd, "export", export);
884         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
885         list_add_tail(&export->exp_obd_chain_timed,
886                           &export->exp_obd->obd_exports_timed);
887         export->exp_obd->obd_num_exports++;
888         spin_unlock(&obd->obd_dev_lock);
889         cfs_hash_putref(hash);
890         return export;
891
892 exit_unlock:
893         spin_unlock(&obd->obd_dev_lock);
894 exit_err:
895         if (hash)
896                 cfs_hash_putref(hash);
897         class_handle_unhash(&export->exp_handle);
898         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
899         obd_destroy_export(export);
900         OBD_FREE_PTR(export);
901         return ERR_PTR(rc);
902 }
903 EXPORT_SYMBOL(class_new_export);
904
905 void class_unlink_export(struct obd_export *exp)
906 {
907         class_handle_unhash(&exp->exp_handle);
908
909         spin_lock(&exp->exp_obd->obd_dev_lock);
910         /* delete an uuid-export hashitem from hashtables */
911         if (!hlist_unhashed(&exp->exp_uuid_hash))
912                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
913                              &exp->exp_client_uuid,
914                              &exp->exp_uuid_hash);
915
916         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
917         list_del_init(&exp->exp_obd_chain_timed);
918         exp->exp_obd->obd_num_exports--;
919         spin_unlock(&exp->exp_obd->obd_dev_lock);
920         class_export_put(exp);
921 }
922 EXPORT_SYMBOL(class_unlink_export);
923
924 /* Import management functions */
925 void class_import_destroy(struct obd_import *imp)
926 {
927         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
928                 imp->imp_obd->obd_name);
929
930         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
931
932         ptlrpc_put_connection_superhack(imp->imp_connection);
933
934         while (!list_empty(&imp->imp_conn_list)) {
935                 struct obd_import_conn *imp_conn;
936
937                 imp_conn = list_entry(imp->imp_conn_list.next,
938                                           struct obd_import_conn, oic_item);
939                 list_del_init(&imp_conn->oic_item);
940                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
941                 OBD_FREE(imp_conn, sizeof(*imp_conn));
942         }
943
944         LASSERT(imp->imp_sec == NULL);
945         class_decref(imp->imp_obd, "import", imp);
946         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
947 }
948
/*
 * hop_addref callback for import handles: the opaque pointer is the
 * import itself, so simply take a reference on it.
 */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
953
954 static struct portals_handle_ops import_handle_ops = {
955         .hop_addref = import_handle_addref,
956         .hop_free   = NULL,
957 };
958
959 struct obd_import *class_import_get(struct obd_import *import)
960 {
961         atomic_inc(&import->imp_refcount);
962         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
963                atomic_read(&import->imp_refcount),
964                import->imp_obd->obd_name);
965         return import;
966 }
967 EXPORT_SYMBOL(class_import_get);
968
969 void class_import_put(struct obd_import *imp)
970 {
971         LASSERT(list_empty(&imp->imp_zombie_chain));
972         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
973
974         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
975                atomic_read(&imp->imp_refcount) - 1,
976                imp->imp_obd->obd_name);
977
978         if (atomic_dec_and_test(&imp->imp_refcount)) {
979                 CDEBUG(D_INFO, "final put import %p\n", imp);
980                 obd_zombie_import_add(imp);
981         }
982
983         /* catch possible import put race */
984         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
985 }
986 EXPORT_SYMBOL(class_import_put);
987
988 static void init_imp_at(struct imp_at *at) {
989         int i;
990         at_init(&at->iat_net_latency, 0, 0);
991         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
992                 /* max service estimates are tracked on the server side, so
993                    don't use the AT history here, just use the last reported
994                    val. (But keep hist for proc histogram, worst_ever) */
995                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
996                         AT_FLG_NOHIST);
997         }
998 }
999
1000 struct obd_import *class_new_import(struct obd_device *obd)
1001 {
1002         struct obd_import *imp;
1003
1004         OBD_ALLOC(imp, sizeof(*imp));
1005         if (imp == NULL)
1006                 return NULL;
1007
1008         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1009         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1010         INIT_LIST_HEAD(&imp->imp_replay_list);
1011         INIT_LIST_HEAD(&imp->imp_sending_list);
1012         INIT_LIST_HEAD(&imp->imp_delayed_list);
1013         spin_lock_init(&imp->imp_lock);
1014         imp->imp_last_success_conn = 0;
1015         imp->imp_state = LUSTRE_IMP_NEW;
1016         imp->imp_obd = class_incref(obd, "import", imp);
1017         mutex_init(&imp->imp_sec_mutex);
1018         init_waitqueue_head(&imp->imp_recovery_waitq);
1019
1020         atomic_set(&imp->imp_refcount, 2);
1021         atomic_set(&imp->imp_unregistering, 0);
1022         atomic_set(&imp->imp_inflight, 0);
1023         atomic_set(&imp->imp_replay_inflight, 0);
1024         atomic_set(&imp->imp_inval_count, 0);
1025         INIT_LIST_HEAD(&imp->imp_conn_list);
1026         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1027         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1028         init_imp_at(&imp->imp_at);
1029
1030         /* the default magic is V2, will be used in connect RPC, and
1031          * then adjusted according to the flags in request/reply. */
1032         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1033
1034         return imp;
1035 }
1036 EXPORT_SYMBOL(class_new_import);
1037
1038 void class_destroy_import(struct obd_import *import)
1039 {
1040         LASSERT(import != NULL);
1041         LASSERT(import != LP_POISON);
1042
1043         class_handle_unhash(&import->imp_handle);
1044
1045         spin_lock(&import->imp_lock);
1046         import->imp_generation++;
1047         spin_unlock(&import->imp_lock);
1048         class_import_put(import);
1049 }
1050 EXPORT_SYMBOL(class_destroy_import);
1051
1052 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1053
1054 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1055 {
1056         spin_lock(&exp->exp_locks_list_guard);
1057
1058         LASSERT(lock->l_exp_refs_nr >= 0);
1059
1060         if (lock->l_exp_refs_target != NULL &&
1061             lock->l_exp_refs_target != exp) {
1062                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1063                               exp, lock, lock->l_exp_refs_target);
1064         }
1065         if ((lock->l_exp_refs_nr ++) == 0) {
1066                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1067                 lock->l_exp_refs_target = exp;
1068         }
1069         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1070                lock, exp, lock->l_exp_refs_nr);
1071         spin_unlock(&exp->exp_locks_list_guard);
1072 }
1073 EXPORT_SYMBOL(__class_export_add_lock_ref);
1074
1075 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1076 {
1077         spin_lock(&exp->exp_locks_list_guard);
1078         LASSERT(lock->l_exp_refs_nr > 0);
1079         if (lock->l_exp_refs_target != exp) {
1080                 LCONSOLE_WARN("lock %p, "
1081                               "mismatching export pointers: %p, %p\n",
1082                               lock, lock->l_exp_refs_target, exp);
1083         }
1084         if (-- lock->l_exp_refs_nr == 0) {
1085                 list_del_init(&lock->l_exp_refs_link);
1086                 lock->l_exp_refs_target = NULL;
1087         }
1088         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1089                lock, exp, lock->l_exp_refs_nr);
1090         spin_unlock(&exp->exp_locks_list_guard);
1091 }
1092 EXPORT_SYMBOL(__class_export_del_lock_ref);
1093 #endif
1094
1095 /* A connection defines an export context in which preallocation can
1096    be managed. This releases the export pointer reference, and returns
1097    the export handle, so the export refcount is 1 when this function
1098    returns. */
1099 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1100                   struct obd_uuid *cluuid)
1101 {
1102         struct obd_export *export;
1103         LASSERT(conn != NULL);
1104         LASSERT(obd != NULL);
1105         LASSERT(cluuid != NULL);
1106
1107         export = class_new_export(obd, cluuid);
1108         if (IS_ERR(export))
1109                 return PTR_ERR(export);
1110
1111         conn->cookie = export->exp_handle.h_cookie;
1112         class_export_put(export);
1113
1114         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1115                cluuid->uuid, conn->cookie);
1116         return 0;
1117 }
1118 EXPORT_SYMBOL(class_connect);
1119
1120 /* if export is involved in recovery then clean up related things */
1121 void class_export_recovery_cleanup(struct obd_export *exp)
1122 {
1123         struct obd_device *obd = exp->exp_obd;
1124
1125         spin_lock(&obd->obd_recovery_task_lock);
1126         if (exp->exp_delayed)
1127                 obd->obd_delayed_clients--;
1128         if (obd->obd_recovering) {
1129                 if (exp->exp_in_recovery) {
1130                         spin_lock(&exp->exp_lock);
1131                         exp->exp_in_recovery = 0;
1132                         spin_unlock(&exp->exp_lock);
1133                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1134                         atomic_dec(&obd->obd_connected_clients);
1135                 }
1136
1137                 /* if called during recovery then should update
1138                  * obd_stale_clients counter,
1139                  * lightweight exports are not counted */
1140                 if (exp->exp_failed &&
1141                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1142                         exp->exp_obd->obd_stale_clients++;
1143         }
1144         spin_unlock(&obd->obd_recovery_task_lock);
1145         /** Cleanup req replay fields */
1146         if (exp->exp_req_replay_needed) {
1147                 spin_lock(&exp->exp_lock);
1148                 exp->exp_req_replay_needed = 0;
1149                 spin_unlock(&exp->exp_lock);
1150                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1151                 atomic_dec(&obd->obd_req_replay_clients);
1152         }
1153         /** Cleanup lock replay data */
1154         if (exp->exp_lock_replay_needed) {
1155                 spin_lock(&exp->exp_lock);
1156                 exp->exp_lock_replay_needed = 0;
1157                 spin_unlock(&exp->exp_lock);
1158                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1159                 atomic_dec(&obd->obd_lock_replay_clients);
1160         }
1161 }
1162
1163 /* This function removes 1-3 references from the export:
1164  * 1 - for export pointer passed
1165  * and if disconnect really need
1166  * 2 - removing from hash
1167  * 3 - in client_unlink_export
1168  * The export pointer passed to this function can destroyed */
1169 int class_disconnect(struct obd_export *export)
1170 {
1171         int already_disconnected;
1172
1173         if (export == NULL) {
1174                 CWARN("attempting to free NULL export %p\n", export);
1175                 return -EINVAL;
1176         }
1177
1178         spin_lock(&export->exp_lock);
1179         already_disconnected = export->exp_disconnected;
1180         export->exp_disconnected = 1;
1181         spin_unlock(&export->exp_lock);
1182
1183         /* class_cleanup(), abort_recovery(), and class_fail_export()
1184          * all end up in here, and if any of them race we shouldn't
1185          * call extra class_export_puts(). */
1186         if (already_disconnected) {
1187                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1188                 GOTO(no_disconn, already_disconnected);
1189         }
1190
1191         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1192                export->exp_handle.h_cookie);
1193
1194         if (!hlist_unhashed(&export->exp_nid_hash))
1195                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1196                              &export->exp_connection->c_peer.nid,
1197                              &export->exp_nid_hash);
1198
1199         class_export_recovery_cleanup(export);
1200         class_unlink_export(export);
1201 no_disconn:
1202         class_export_put(export);
1203         return 0;
1204 }
1205 EXPORT_SYMBOL(class_disconnect);
1206
1207 /* Return non-zero for a fully connected export */
1208 int class_connected_export(struct obd_export *exp)
1209 {
1210         if (exp) {
1211                 int connected;
1212                 spin_lock(&exp->exp_lock);
1213                 connected = (exp->exp_conn_cnt > 0);
1214                 spin_unlock(&exp->exp_lock);
1215                 return connected;
1216         }
1217         return 0;
1218 }
1219 EXPORT_SYMBOL(class_connected_export);
1220
1221 static void class_disconnect_export_list(struct list_head *list,
1222                                          enum obd_option flags)
1223 {
1224         int rc;
1225         struct obd_export *exp;
1226
1227         /* It's possible that an export may disconnect itself, but
1228          * nothing else will be added to this list. */
1229         while (!list_empty(list)) {
1230                 exp = list_entry(list->next, struct obd_export,
1231                                      exp_obd_chain);
1232                 /* need for safe call CDEBUG after obd_disconnect */
1233                 class_export_get(exp);
1234
1235                 spin_lock(&exp->exp_lock);
1236                 exp->exp_flags = flags;
1237                 spin_unlock(&exp->exp_lock);
1238
1239                 if (obd_uuid_equals(&exp->exp_client_uuid,
1240                                     &exp->exp_obd->obd_uuid)) {
1241                         CDEBUG(D_HA,
1242                                "exp %p export uuid == obd uuid, don't discon\n",
1243                                exp);
1244                         /* Need to delete this now so we don't end up pointing
1245                          * to work_list later when this export is cleaned up. */
1246                         list_del_init(&exp->exp_obd_chain);
1247                         class_export_put(exp);
1248                         continue;
1249                 }
1250
1251                 class_export_get(exp);
1252                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1253                        "last request at "CFS_TIME_T"\n",
1254                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1255                        exp, exp->exp_last_request_time);
1256                 /* release one export reference anyway */
1257                 rc = obd_disconnect(exp);
1258
1259                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1260                        obd_export_nid2str(exp), exp, rc);
1261                 class_export_put(exp);
1262         }
1263 }
1264
1265 void class_disconnect_exports(struct obd_device *obd)
1266 {
1267         struct list_head work_list;
1268
1269         /* Move all of the exports from obd_exports to a work list, en masse. */
1270         INIT_LIST_HEAD(&work_list);
1271         spin_lock(&obd->obd_dev_lock);
1272         list_splice_init(&obd->obd_exports, &work_list);
1273         list_splice_init(&obd->obd_delayed_exports, &work_list);
1274         spin_unlock(&obd->obd_dev_lock);
1275
1276         if (!list_empty(&work_list)) {
1277                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1278                        "disconnecting them\n", obd->obd_minor, obd);
1279                 class_disconnect_export_list(&work_list,
1280                                              exp_flags_from_obd(obd));
1281         } else
1282                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1283                        obd->obd_minor, obd);
1284 }
1285 EXPORT_SYMBOL(class_disconnect_exports);
1286
1287 /* Remove exports that have not completed recovery.
1288  */
1289 void class_disconnect_stale_exports(struct obd_device *obd,
1290                                     int (*test_export)(struct obd_export *))
1291 {
1292         struct list_head work_list;
1293         struct obd_export *exp, *n;
1294         int evicted = 0;
1295
1296         INIT_LIST_HEAD(&work_list);
1297         spin_lock(&obd->obd_dev_lock);
1298         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1299                                      exp_obd_chain) {
1300                 /* don't count self-export as client */
1301                 if (obd_uuid_equals(&exp->exp_client_uuid,
1302                                     &exp->exp_obd->obd_uuid))
1303                         continue;
1304
1305                 /* don't evict clients which have no slot in last_rcvd
1306                  * (e.g. lightweight connection) */
1307                 if (exp->exp_target_data.ted_lr_idx == -1)
1308                         continue;
1309
1310                 spin_lock(&exp->exp_lock);
1311                 if (exp->exp_failed || test_export(exp)) {
1312                         spin_unlock(&exp->exp_lock);
1313                         continue;
1314                 }
1315                 exp->exp_failed = 1;
1316                 spin_unlock(&exp->exp_lock);
1317
1318                 list_move(&exp->exp_obd_chain, &work_list);
1319                 evicted++;
1320                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1321                        obd->obd_name, exp->exp_client_uuid.uuid,
1322                        exp->exp_connection == NULL ? "<unknown>" :
1323                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1324                 print_export_data(exp, "EVICTING", 0);
1325         }
1326         spin_unlock(&obd->obd_dev_lock);
1327
1328         if (evicted)
1329                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1330                               obd->obd_name, evicted);
1331
1332         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1333                                                  OBD_OPT_ABORT_RECOV);
1334 }
1335 EXPORT_SYMBOL(class_disconnect_stale_exports);
1336
1337 void class_fail_export(struct obd_export *exp)
1338 {
1339         int rc, already_failed;
1340
1341         spin_lock(&exp->exp_lock);
1342         already_failed = exp->exp_failed;
1343         exp->exp_failed = 1;
1344         spin_unlock(&exp->exp_lock);
1345
1346         if (already_failed) {
1347                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1348                        exp, exp->exp_client_uuid.uuid);
1349                 return;
1350         }
1351
1352         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1353                exp, exp->exp_client_uuid.uuid);
1354
1355         if (obd_dump_on_timeout)
1356                 libcfs_debug_dumplog();
1357
1358         /* need for safe call CDEBUG after obd_disconnect */
1359         class_export_get(exp);
1360
1361         /* Most callers into obd_disconnect are removing their own reference
1362          * (request, for example) in addition to the one from the hash table.
1363          * We don't have such a reference here, so make one. */
1364         class_export_get(exp);
1365         rc = obd_disconnect(exp);
1366         if (rc)
1367                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1368         else
1369                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1370                        exp, exp->exp_client_uuid.uuid);
1371         class_export_put(exp);
1372 }
1373 EXPORT_SYMBOL(class_fail_export);
1374
1375 char *obd_export_nid2str(struct obd_export *exp)
1376 {
1377         if (exp->exp_connection != NULL)
1378                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1379
1380         return "(no nid)";
1381 }
1382 EXPORT_SYMBOL(obd_export_nid2str);
1383
1384 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1385 {
1386         struct cfs_hash *nid_hash;
1387         struct obd_export *doomed_exp = NULL;
1388         int exports_evicted = 0;
1389
1390         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1391
1392         spin_lock(&obd->obd_dev_lock);
1393         /* umount has run already, so evict thread should leave
1394          * its task to umount thread now */
1395         if (obd->obd_stopping) {
1396                 spin_unlock(&obd->obd_dev_lock);
1397                 return exports_evicted;
1398         }
1399         nid_hash = obd->obd_nid_hash;
1400         cfs_hash_getref(nid_hash);
1401         spin_unlock(&obd->obd_dev_lock);
1402
1403         do {
1404                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1405                 if (doomed_exp == NULL)
1406                         break;
1407
1408                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1409                          "nid %s found, wanted nid %s, requested nid %s\n",
1410                          obd_export_nid2str(doomed_exp),
1411                          libcfs_nid2str(nid_key), nid);
1412                 LASSERTF(doomed_exp != obd->obd_self_export,
1413                          "self-export is hashed by NID?\n");
1414                 exports_evicted++;
1415                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1416                               "request\n", obd->obd_name,
1417                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1418                               obd_export_nid2str(doomed_exp));
1419                 class_fail_export(doomed_exp);
1420                 class_export_put(doomed_exp);
1421         } while (1);
1422
1423         cfs_hash_putref(nid_hash);
1424
1425         if (!exports_evicted)
1426                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1427                        obd->obd_name, nid);
1428         return exports_evicted;
1429 }
1430 EXPORT_SYMBOL(obd_export_evict_by_nid);
1431
1432 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1433 {
1434         struct cfs_hash *uuid_hash;
1435         struct obd_export *doomed_exp = NULL;
1436         struct obd_uuid doomed_uuid;
1437         int exports_evicted = 0;
1438
1439         spin_lock(&obd->obd_dev_lock);
1440         if (obd->obd_stopping) {
1441                 spin_unlock(&obd->obd_dev_lock);
1442                 return exports_evicted;
1443         }
1444         uuid_hash = obd->obd_uuid_hash;
1445         cfs_hash_getref(uuid_hash);
1446         spin_unlock(&obd->obd_dev_lock);
1447
1448         obd_str2uuid(&doomed_uuid, uuid);
1449         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1450                 CERROR("%s: can't evict myself\n", obd->obd_name);
1451                 cfs_hash_putref(uuid_hash);
1452                 return exports_evicted;
1453         }
1454
1455         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1456
1457         if (doomed_exp == NULL) {
1458                 CERROR("%s: can't disconnect %s: no exports found\n",
1459                        obd->obd_name, uuid);
1460         } else {
1461                 CWARN("%s: evicting %s at administrative request\n",
1462                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1463                 class_fail_export(doomed_exp);
1464                 class_export_put(doomed_exp);
1465                 exports_evicted++;
1466         }
1467         cfs_hash_putref(uuid_hash);
1468
1469         return exports_evicted;
1470 }
1471 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1472
1473 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback invoked by print_export_data() when lock dumping is
 * requested; stays NULL until something installs a hook.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1476 #endif
1477
1478 static void print_export_data(struct obd_export *exp, const char *status,
1479                               int locks)
1480 {
1481         struct ptlrpc_reply_state *rs;
1482         struct ptlrpc_reply_state *first_reply = NULL;
1483         int nreplies = 0;
1484
1485         spin_lock(&exp->exp_lock);
1486         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1487                                 rs_exp_list) {
1488                 if (nreplies == 0)
1489                         first_reply = rs;
1490                 nreplies++;
1491         }
1492         spin_unlock(&exp->exp_lock);
1493
1494         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1495                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1496                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1497                atomic_read(&exp->exp_rpc_count),
1498                atomic_read(&exp->exp_cb_count),
1499                atomic_read(&exp->exp_locks_count),
1500                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1501                nreplies, first_reply, nreplies > 3 ? "..." : "",
1502                exp->exp_last_committed);
1503 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1504         if (locks && class_export_dump_hook != NULL)
1505                 class_export_dump_hook(exp);
1506 #endif
1507 }
1508
1509 void dump_exports(struct obd_device *obd, int locks)
1510 {
1511         struct obd_export *exp;
1512
1513         spin_lock(&obd->obd_dev_lock);
1514         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1515                 print_export_data(exp, "ACTIVE", locks);
1516         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1517                 print_export_data(exp, "UNLINKED", locks);
1518         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1519                 print_export_data(exp, "DELAYED", locks);
1520         spin_unlock(&obd->obd_dev_lock);
1521         spin_lock(&obd_zombie_impexp_lock);
1522         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1523                 print_export_data(exp, "ZOMBIE", locks);
1524         spin_unlock(&obd_zombie_impexp_lock);
1525 }
1526 EXPORT_SYMBOL(dump_exports);
1527
1528 void obd_exports_barrier(struct obd_device *obd)
1529 {
1530         int waited = 2;
1531         LASSERT(list_empty(&obd->obd_exports));
1532         spin_lock(&obd->obd_dev_lock);
1533         while (!list_empty(&obd->obd_unlinked_exports)) {
1534                 spin_unlock(&obd->obd_dev_lock);
1535                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1536                                                    cfs_time_seconds(waited));
1537                 if (waited > 5 && IS_PO2(waited)) {
1538                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1539                                       "more than %d seconds. "
1540                                       "The obd refcount = %d. Is it stuck?\n",
1541                                       obd->obd_name, waited,
1542                                       atomic_read(&obd->obd_refcount));
1543                         dump_exports(obd, 1);
1544                 }
1545                 waited *= 2;
1546                 spin_lock(&obd->obd_dev_lock);
1547         }
1548         spin_unlock(&obd->obd_dev_lock);
1549 }
1550 EXPORT_SYMBOL(obd_exports_barrier);
1551
/*
 * Number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock.
 */
static int zombies_count;
1554
1555 /**
1556  * kill zombie imports and exports
1557  */
1558 void obd_zombie_impexp_cull(void)
1559 {
1560         struct obd_import *import;
1561         struct obd_export *export;
1562
1563         do {
1564                 spin_lock(&obd_zombie_impexp_lock);
1565
1566                 import = NULL;
1567                 if (!list_empty(&obd_zombie_imports)) {
1568                         import = list_entry(obd_zombie_imports.next,
1569                                                 struct obd_import,
1570                                                 imp_zombie_chain);
1571                         list_del_init(&import->imp_zombie_chain);
1572                 }
1573
1574                 export = NULL;
1575                 if (!list_empty(&obd_zombie_exports)) {
1576                         export = list_entry(obd_zombie_exports.next,
1577                                                 struct obd_export,
1578                                                 exp_obd_chain);
1579                         list_del_init(&export->exp_obd_chain);
1580                 }
1581
1582                 spin_unlock(&obd_zombie_impexp_lock);
1583
1584                 if (import != NULL) {
1585                         class_import_destroy(import);
1586                         spin_lock(&obd_zombie_impexp_lock);
1587                         zombies_count--;
1588                         spin_unlock(&obd_zombie_impexp_lock);
1589                 }
1590
1591                 if (export != NULL) {
1592                         class_export_destroy(export);
1593                         spin_lock(&obd_zombie_impexp_lock);
1594                         zombies_count--;
1595                         spin_unlock(&obd_zombie_impexp_lock);
1596                 }
1597
1598                 cond_resched();
1599         } while (import != NULL || export != NULL);
1600 }
1601
1602 static struct completion        obd_zombie_start;
1603 static struct completion        obd_zombie_stop;
1604 static unsigned long            obd_zombie_flags;
1605 static wait_queue_head_t                obd_zombie_waitq;
1606 static pid_t                    obd_zombie_pid;
1607
1608 enum {
1609         OBD_ZOMBIE_STOP         = 0x0001,
1610 };
1611
1612 /**
1613  * check for work for kill zombie import/export thread.
1614  */
1615 static int obd_zombie_impexp_check(void *arg)
1616 {
1617         int rc;
1618
1619         spin_lock(&obd_zombie_impexp_lock);
1620         rc = (zombies_count == 0) &&
1621              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1622         spin_unlock(&obd_zombie_impexp_lock);
1623
1624         return rc;
1625 }
1626
1627 /**
1628  * Add export to the obd_zombe thread and notify it.
1629  */
1630 static void obd_zombie_export_add(struct obd_export *exp) {
1631         spin_lock(&exp->exp_obd->obd_dev_lock);
1632         LASSERT(!list_empty(&exp->exp_obd_chain));
1633         list_del_init(&exp->exp_obd_chain);
1634         spin_unlock(&exp->exp_obd->obd_dev_lock);
1635         spin_lock(&obd_zombie_impexp_lock);
1636         zombies_count++;
1637         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1638         spin_unlock(&obd_zombie_impexp_lock);
1639
1640         obd_zombie_impexp_notify();
1641 }
1642
1643 /**
1644  * Add import to the obd_zombe thread and notify it.
1645  */
1646 static void obd_zombie_import_add(struct obd_import *imp) {
1647         LASSERT(imp->imp_sec == NULL);
1648         LASSERT(imp->imp_rq_pool == NULL);
1649         spin_lock(&obd_zombie_impexp_lock);
1650         LASSERT(list_empty(&imp->imp_zombie_chain));
1651         zombies_count++;
1652         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1653         spin_unlock(&obd_zombie_impexp_lock);
1654
1655         obd_zombie_impexp_notify();
1656 }
1657
1658 /**
1659  * notify import/export destroy thread about new zombie.
1660  */
1661 static void obd_zombie_impexp_notify(void)
1662 {
1663         /*
1664          * Make sure obd_zomebie_impexp_thread get this notification.
1665          * It is possible this signal only get by obd_zombie_barrier, and
1666          * barrier gulps this notification and sleeps away and hangs ensues
1667          */
1668         wake_up_all(&obd_zombie_waitq);
1669 }
1670
1671 /**
1672  * check whether obd_zombie is idle
1673  */
1674 static int obd_zombie_is_idle(void)
1675 {
1676         int rc;
1677
1678         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1679         spin_lock(&obd_zombie_impexp_lock);
1680         rc = (zombies_count == 0);
1681         spin_unlock(&obd_zombie_impexp_lock);
1682         return rc;
1683 }
1684
1685 /**
1686  * wait when obd_zombie import/export queues become empty
1687  */
1688 void obd_zombie_barrier(void)
1689 {
1690         struct l_wait_info lwi = { 0 };
1691
1692         if (obd_zombie_pid == current_pid())
1693                 /* don't wait for myself */
1694                 return;
1695         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1696 }
1697 EXPORT_SYMBOL(obd_zombie_barrier);
1698
1699
1700 /**
1701  * destroy zombie export/import thread.
1702  */
1703 static int obd_zombie_impexp_thread(void *unused)
1704 {
1705         unshare_fs_struct();
1706         complete(&obd_zombie_start);
1707
1708         obd_zombie_pid = current_pid();
1709
1710         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1711                 struct l_wait_info lwi = { 0 };
1712
1713                 l_wait_event(obd_zombie_waitq,
1714                              !obd_zombie_impexp_check(NULL), &lwi);
1715                 obd_zombie_impexp_cull();
1716
1717                 /*
1718                  * Notify obd_zombie_barrier callers that queues
1719                  * may be empty.
1720                  */
1721                 wake_up(&obd_zombie_waitq);
1722         }
1723
1724         complete(&obd_zombie_stop);
1725
1726         return 0;
1727 }
1728
1729
1730 /**
1731  * start destroy zombie import/export thread
1732  */
1733 int obd_zombie_impexp_init(void)
1734 {
1735         struct task_struct *task;
1736
1737         INIT_LIST_HEAD(&obd_zombie_imports);
1738         INIT_LIST_HEAD(&obd_zombie_exports);
1739         spin_lock_init(&obd_zombie_impexp_lock);
1740         init_completion(&obd_zombie_start);
1741         init_completion(&obd_zombie_stop);
1742         init_waitqueue_head(&obd_zombie_waitq);
1743         obd_zombie_pid = 0;
1744
1745         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1746         if (IS_ERR(task))
1747                 return PTR_ERR(task);
1748
1749         wait_for_completion(&obd_zombie_start);
1750         return 0;
1751 }
1752 /**
1753  * stop destroy zombie import/export thread
1754  */
1755 void obd_zombie_impexp_stop(void)
1756 {
1757         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1758         obd_zombie_impexp_notify();
1759         wait_for_completion(&obd_zombie_stop);
1760 }
1761
1762 /***** Kernel-userspace comm helpers *******/
1763
1764 /* Get length of entire message, including header */
1765 int kuc_len(int payload_len)
1766 {
1767         return sizeof(struct kuc_hdr) + payload_len;
1768 }
1769 EXPORT_SYMBOL(kuc_len);
1770
1771 /* Get a pointer to kuc header, given a ptr to the payload
1772  * @param p Pointer to payload area
1773  * @returns Pointer to kuc header
1774  */
1775 struct kuc_hdr * kuc_ptr(void *p)
1776 {
1777         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1778         LASSERT(lh->kuc_magic == KUC_MAGIC);
1779         return lh;
1780 }
1781 EXPORT_SYMBOL(kuc_ptr);
1782
1783 /* Test if payload is part of kuc message
1784  * @param p Pointer to payload area
1785  * @returns boolean
1786  */
1787 int kuc_ispayload(void *p)
1788 {
1789         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1790
1791         if (kh->kuc_magic == KUC_MAGIC)
1792                 return 1;
1793         else
1794                 return 0;
1795 }
1796 EXPORT_SYMBOL(kuc_ispayload);
1797
1798 /* Alloc space for a message, and fill in header
1799  * @return Pointer to payload area
1800  */
1801 void *kuc_alloc(int payload_len, int transport, int type)
1802 {
1803         struct kuc_hdr *lh;
1804         int len = kuc_len(payload_len);
1805
1806         OBD_ALLOC(lh, len);
1807         if (lh == NULL)
1808                 return ERR_PTR(-ENOMEM);
1809
1810         lh->kuc_magic = KUC_MAGIC;
1811         lh->kuc_transport = transport;
1812         lh->kuc_msgtype = type;
1813         lh->kuc_msglen = len;
1814
1815         return (void *)(lh + 1);
1816 }
1817 EXPORT_SYMBOL(kuc_alloc);
1818
/* Free a message allocated by kuc_alloc()
 * @param p Pointer to payload area (as returned by kuc_alloc)
 * @param payload_len Payload length the message was allocated with
 *
 * Not declared inline: this function is exported, and a plain "inline"
 * definition does not provide an external definition under C99 and
 * later inline semantics.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);