]> Pileus Git - ~andy/linux/blob - fs/dlm/dir.c
Merge tag 'fbdev-fixes-for-3.3-1' of git://github.com/schandinat/linux-2.6
[~andy/linux] / fs / dlm / dir.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25
26
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29         spin_lock(&ls->ls_recover_list_lock);
30         list_add(&de->list, &ls->ls_recover_list);
31         spin_unlock(&ls->ls_recover_list_lock);
32 }
33
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36         int found = 0;
37         struct dlm_direntry *de;
38
39         spin_lock(&ls->ls_recover_list_lock);
40         list_for_each_entry(de, &ls->ls_recover_list, list) {
41                 if (de->length == len) {
42                         list_del(&de->list);
43                         de->master_nodeid = 0;
44                         memset(de->name, 0, len);
45                         found = 1;
46                         break;
47                 }
48         }
49         spin_unlock(&ls->ls_recover_list_lock);
50
51         if (!found)
52                 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
53         return de;
54 }
55
56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58         struct dlm_direntry *de;
59
60         spin_lock(&ls->ls_recover_list_lock);
61         while (!list_empty(&ls->ls_recover_list)) {
62                 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63                                 list);
64                 list_del(&de->list);
65                 kfree(de);
66         }
67         spin_unlock(&ls->ls_recover_list_lock);
68 }
69
70 /*
71  * We use the upper 16 bits of the hash value to select the directory node.
72  * Low bits are used for distribution of rsb's among hash buckets on each node.
73  *
74  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75  * num_nodes to the hash value.  This value in the desired range is used as an
76  * offset into the sorted list of nodeid's to give the particular nodeid.
77  */
78
79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81         struct list_head *tmp;
82         struct dlm_member *memb = NULL;
83         uint32_t node, n = 0;
84         int nodeid;
85
86         if (ls->ls_num_nodes == 1) {
87                 nodeid = dlm_our_nodeid();
88                 goto out;
89         }
90
91         if (ls->ls_node_array) {
92                 node = (hash >> 16) % ls->ls_total_weight;
93                 nodeid = ls->ls_node_array[node];
94                 goto out;
95         }
96
97         /* make_member_array() failed to kmalloc ls_node_array... */
98
99         node = (hash >> 16) % ls->ls_num_nodes;
100
101         list_for_each(tmp, &ls->ls_nodes) {
102                 if (n++ != node)
103                         continue;
104                 memb = list_entry(tmp, struct dlm_member, list);
105                 break;
106         }
107
108         DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109                                  ls->ls_num_nodes, n, node););
110         nodeid = memb->nodeid;
111  out:
112         return nodeid;
113 }
114
115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117         return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119
120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122         uint32_t val;
123
124         val = jhash(name, len, 0);
125         val &= (ls->ls_dirtbl_size - 1);
126
127         return val;
128 }
129
130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132         uint32_t bucket;
133
134         bucket = dir_hash(ls, de->name, de->length);
135         list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137
138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139                                           int namelen, uint32_t bucket)
140 {
141         struct dlm_direntry *de;
142
143         list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144                 if (de->length == namelen && !memcmp(name, de->name, namelen))
145                         goto out;
146         }
147         de = NULL;
148  out:
149         return de;
150 }
151
152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154         struct dlm_direntry *de;
155         uint32_t bucket;
156
157         bucket = dir_hash(ls, name, namelen);
158
159         spin_lock(&ls->ls_dirtbl[bucket].lock);
160
161         de = search_bucket(ls, name, namelen, bucket);
162
163         if (!de) {
164                 log_error(ls, "remove fr %u none", nodeid);
165                 goto out;
166         }
167
168         if (de->master_nodeid != nodeid) {
169                 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170                 goto out;
171         }
172
173         list_del(&de->list);
174         kfree(de);
175  out:
176         spin_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178
179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181         struct list_head *head;
182         struct dlm_direntry *de;
183         int i;
184
185         DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187         for (i = 0; i < ls->ls_dirtbl_size; i++) {
188                 spin_lock(&ls->ls_dirtbl[i].lock);
189                 head = &ls->ls_dirtbl[i].list;
190                 while (!list_empty(head)) {
191                         de = list_entry(head->next, struct dlm_direntry, list);
192                         list_del(&de->list);
193                         put_free_de(ls, de);
194                 }
195                 spin_unlock(&ls->ls_dirtbl[i].lock);
196         }
197 }
198
199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201         struct dlm_member *memb;
202         struct dlm_direntry *de;
203         char *b, *last_name = NULL;
204         int error = -ENOMEM, last_len, count = 0;
205         uint16_t namelen;
206
207         log_debug(ls, "dlm_recover_directory");
208
209         if (dlm_no_directory(ls))
210                 goto out_status;
211
212         dlm_dir_clear(ls);
213
214         last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215         if (!last_name)
216                 goto out;
217
218         list_for_each_entry(memb, &ls->ls_nodes, list) {
219                 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220                 last_len = 0;
221
222                 for (;;) {
223                         int left;
224                         error = dlm_recovery_stopped(ls);
225                         if (error)
226                                 goto out_free;
227
228                         error = dlm_rcom_names(ls, memb->nodeid,
229                                                last_name, last_len);
230                         if (error)
231                                 goto out_free;
232
233                         schedule();
234
235                         /*
236                          * pick namelen/name pairs out of received buffer
237                          */
238
239                         b = ls->ls_recover_buf->rc_buf;
240                         left = ls->ls_recover_buf->rc_header.h_length;
241                         left -= sizeof(struct dlm_rcom);
242
243                         for (;;) {
244                                 __be16 v;
245
246                                 error = -EINVAL;
247                                 if (left < sizeof(__be16))
248                                         goto out_free;
249
250                                 memcpy(&v, b, sizeof(__be16));
251                                 namelen = be16_to_cpu(v);
252                                 b += sizeof(__be16);
253                                 left -= sizeof(__be16);
254
255                                 /* namelen of 0xFFFFF marks end of names for
256                                    this node; namelen of 0 marks end of the
257                                    buffer */
258
259                                 if (namelen == 0xFFFF)
260                                         goto done;
261                                 if (!namelen)
262                                         break;
263
264                                 if (namelen > left)
265                                         goto out_free;
266
267                                 if (namelen > DLM_RESNAME_MAXLEN)
268                                         goto out_free;
269
270                                 error = -ENOMEM;
271                                 de = get_free_de(ls, namelen);
272                                 if (!de)
273                                         goto out_free;
274
275                                 de->master_nodeid = memb->nodeid;
276                                 de->length = namelen;
277                                 last_len = namelen;
278                                 memcpy(de->name, b, namelen);
279                                 memcpy(last_name, b, namelen);
280                                 b += namelen;
281                                 left -= namelen;
282
283                                 add_entry_to_hash(ls, de);
284                                 count++;
285                         }
286                 }
287          done:
288                 ;
289         }
290
291  out_status:
292         error = 0;
293         log_debug(ls, "dlm_recover_directory %d entries", count);
294  out_free:
295         kfree(last_name);
296  out:
297         dlm_clear_free_entries(ls);
298         return error;
299 }
300
301 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
302                      int namelen, int *r_nodeid)
303 {
304         struct dlm_direntry *de, *tmp;
305         uint32_t bucket;
306
307         bucket = dir_hash(ls, name, namelen);
308
309         spin_lock(&ls->ls_dirtbl[bucket].lock);
310         de = search_bucket(ls, name, namelen, bucket);
311         if (de) {
312                 *r_nodeid = de->master_nodeid;
313                 spin_unlock(&ls->ls_dirtbl[bucket].lock);
314                 if (*r_nodeid == nodeid)
315                         return -EEXIST;
316                 return 0;
317         }
318
319         spin_unlock(&ls->ls_dirtbl[bucket].lock);
320
321         if (namelen > DLM_RESNAME_MAXLEN)
322                 return -EINVAL;
323
324         de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
325         if (!de)
326                 return -ENOMEM;
327
328         de->master_nodeid = nodeid;
329         de->length = namelen;
330         memcpy(de->name, name, namelen);
331
332         spin_lock(&ls->ls_dirtbl[bucket].lock);
333         tmp = search_bucket(ls, name, namelen, bucket);
334         if (tmp) {
335                 kfree(de);
336                 de = tmp;
337         } else {
338                 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
339         }
340         *r_nodeid = de->master_nodeid;
341         spin_unlock(&ls->ls_dirtbl[bucket].lock);
342         return 0;
343 }
344
/*
 * Public entry point for directory lookups; forwards all arguments to
 * get_entry() and returns its result unchanged.
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
		   int *r_nodeid)
{
	int rv = get_entry(ls, nodeid, name, namelen, r_nodeid);

	return rv;
}
350
351 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
352 {
353         struct dlm_rsb *r;
354
355         down_read(&ls->ls_root_sem);
356         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
357                 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
358                         up_read(&ls->ls_root_sem);
359                         return r;
360                 }
361         }
362         up_read(&ls->ls_root_sem);
363         return NULL;
364 }
365
366 /* Find the rsb where we left off (or start again), then send rsb names
367    for rsb's we're master of and whose directory node matches the requesting
368    node.  inbuf is the rsb name last sent, inlen is the name's length */
369
370 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
371                            char *outbuf, int outlen, int nodeid)
372 {
373         struct list_head *list;
374         struct dlm_rsb *r;
375         int offset = 0, dir_nodeid;
376         __be16 be_namelen;
377
378         down_read(&ls->ls_root_sem);
379
380         if (inlen > 1) {
381                 r = find_rsb_root(ls, inbuf, inlen);
382                 if (!r) {
383                         inbuf[inlen - 1] = '\0';
384                         log_error(ls, "copy_master_names from %d start %d %s",
385                                   nodeid, inlen, inbuf);
386                         goto out;
387                 }
388                 list = r->res_root_list.next;
389         } else {
390                 list = ls->ls_root_list.next;
391         }
392
393         for (offset = 0; list != &ls->ls_root_list; list = list->next) {
394                 r = list_entry(list, struct dlm_rsb, res_root_list);
395                 if (r->res_nodeid)
396                         continue;
397
398                 dir_nodeid = dlm_dir_nodeid(r);
399                 if (dir_nodeid != nodeid)
400                         continue;
401
402                 /*
403                  * The block ends when we can't fit the following in the
404                  * remaining buffer space:
405                  * namelen (uint16_t) +
406                  * name (r->res_length) +
407                  * end-of-block record 0x0000 (uint16_t)
408                  */
409
410                 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
411                         /* Write end-of-block record */
412                         be_namelen = cpu_to_be16(0);
413                         memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
414                         offset += sizeof(__be16);
415                         goto out;
416                 }
417
418                 be_namelen = cpu_to_be16(r->res_length);
419                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
420                 offset += sizeof(__be16);
421                 memcpy(outbuf + offset, r->res_name, r->res_length);
422                 offset += r->res_length;
423         }
424
425         /*
426          * If we've reached the end of the list (and there's room) write a
427          * terminating record.
428          */
429
430         if ((list == &ls->ls_root_list) &&
431             (offset + sizeof(uint16_t) <= outlen)) {
432                 be_namelen = cpu_to_be16(0xFFFF);
433                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
434                 offset += sizeof(__be16);
435         }
436
437  out:
438         up_read(&ls->ls_root_sem);
439 }
440