1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
/*
 * put_free_de - park a directory entry on the lockspace's ls_recover_list.
 * @ls: lockspace whose ls_recover_list receives the entry
 * @de: directory entry to add (linked through de->list)
 *
 * The list is only touched while ls_recover_list_lock is held.
 */
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
29 spin_lock(&ls->ls_recover_list_lock);
/* list_add() links the entry in right after the list head */
30 list_add(&de->list, &ls->ls_recover_list);
31 spin_unlock(&ls->ls_recover_list_lock);
/*
 * get_free_de - obtain a directory entry able to hold a name of @len bytes.
 * @ls:  lockspace whose ls_recover_list is searched
 * @len: name length the entry must hold
 *
 * ls_recover_list is scanned under ls_recover_list_lock for an entry whose
 * recorded length equals @len; a matching entry has master_nodeid and the
 * first @len bytes of its name cleared.  A zeroed allocation of
 * sizeof(struct dlm_direntry) + len is made after the lock is released.
 *
 * NOTE(review): the statements that unlink a matched entry, decide whether
 * the allocation is needed, and return the result are not visible in this
 * excerpt -- presumably the allocation is the fallback when nothing on the
 * list matched, and NULL is returned if it fails; confirm.
 */
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
37 struct dlm_direntry *de;
39 spin_lock(&ls->ls_recover_list_lock);
40 list_for_each_entry(de, &ls->ls_recover_list, list) {
/* only an entry of exactly the requested length qualifies */
41 if (de->length == len) {
/* scrub the fields the caller is going to fill in again */
43 de->master_nodeid = 0;
44 memset(de->name, 0, len);
49 spin_unlock(&ls->ls_recover_list_lock);
/* GFP_KERNEL may sleep; the spinlock has already been dropped above */
52 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_KERNEL);
/*
 * dlm_clear_free_entries - drain the lockspace's ls_recover_list.
 * @ls: lockspace whose list is emptied
 *
 * With ls_recover_list_lock held, the loop keeps taking the first entry of
 * ls_recover_list until the list is empty.
 *
 * NOTE(review): the statements that unlink and dispose of each entry are not
 * visible in this excerpt -- presumably list_del() followed by kfree();
 * confirm.
 */
56 void dlm_clear_free_entries(struct dlm_ls *ls)
58 struct dlm_direntry *de;
60 spin_lock(&ls->ls_recover_list_lock);
61 while (!list_empty(&ls->ls_recover_list)) {
/* always operate on the current first element of the list */
62 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
67 spin_unlock(&ls->ls_recover_list_lock);
71 * We use the upper 16 bits of the hash value to select the directory node.
72 * Low bits are used for distribution of rsb's among hash buckets on each node.
74 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75 * num_nodes to the hash value. This value in the desired range is used as an
76 * offset into the sorted list of nodeid's to give the particular nodeid.
/*
 * dlm_hash2nodeid - map a resource hash value to a nodeid.
 * @ls:   lockspace supplying the membership information
 * @hash: 32-bit hash of the resource name; only the upper 16 bits are used
 *
 * Three cases are handled:
 *  - ls_num_nodes == 1: the local nodeid from dlm_our_nodeid() is used;
 *  - ls_node_array is set: it is indexed with (hash >> 16) % ls_total_weight;
 *  - otherwise: (hash >> 16) % ls_num_nodes selects a position in the
 *    ls_nodes member list and that member's nodeid is used.
 *
 * NOTE(review): both modulus operations divide by a lockspace field; this
 * excerpt does not show what guarantees ls_total_weight / ls_num_nodes are
 * non-zero -- confirm with the membership code.
 * NOTE(review): the counter `n` used to stop the list walk at position
 * `node`, and the final return, are not visible in this excerpt.
 */
79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
81 struct list_head *tmp;
82 struct dlm_member *memb = NULL;
/* single-node lockspace: no table lookup is needed */
86 if (ls->ls_num_nodes == 1) {
87 nodeid = dlm_our_nodeid();
/* normal case: pre-built array of nodeids, indexed modulo ls_total_weight */
91 if (ls->ls_node_array) {
92 node = (hash >> 16) % ls->ls_total_weight;
93 nodeid = ls->ls_node_array[node];
97 /* make_member_array() failed to kmalloc ls_node_array... */
/* fallback: pick by position in the member list instead of the array */
99 node = (hash >> 16) % ls->ls_num_nodes;
101 list_for_each(tmp, &ls->ls_nodes) {
104 memb = list_entry(tmp, struct dlm_member, list);
/* memb stays NULL if the walk never reached the selected position */
108 DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 ls->ls_num_nodes, n, node););
110 nodeid = memb->nodeid;
/*
 * dlm_dir_nodeid - nodeid selected by dlm_hash2nodeid() for a resource.
 * @r: resource block; its lockspace (res_ls) and hash (res_hash) are used
 *
 * Returns whatever dlm_hash2nodeid() returns for that lockspace/hash pair.
 */
115 int dlm_dir_nodeid(struct dlm_rsb *r)
117 return dlm_hash2nodeid(r->res_ls, r->res_hash);
/*
 * dir_hash - compute the directory-table bucket for a resource name.
 * @ls:   lockspace owning the directory table
 * @name: resource name bytes
 * @len:  number of bytes of @name to hash
 *
 * The name is hashed with jhash() (initial value 0) and the result is
 * masked with ls_dirtbl_size - 1.
 *
 * NOTE(review): masking only covers every bucket if ls_dirtbl_size is a
 * power of two -- assumed here, confirm where the table is allocated.
 */
120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
124 val = jhash(name, len, 0);
125 val &= (ls->ls_dirtbl_size - 1);
/*
 * add_entry_to_hash - append a directory entry to its hash bucket.
 * @ls: lockspace owning the directory table
 * @de: entry to insert; its name and length select the bucket
 *
 * NOTE(review): no bucket lock is taken in the visible lines, and no check
 * for an existing entry with the same name is made -- presumably callers
 * guarantee exclusive access (e.g. during recovery); confirm.
 */
130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
134 bucket = dir_hash(ls, de->name, de->length);
135 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
/*
 * search_bucket - look for a name in one directory-table bucket.
 * @ls:      lockspace owning the directory table
 * @name:    resource name to look for
 * @namelen: length of @name in bytes
 * @bucket:  index of the bucket to scan
 *
 * Entries are compared by length first and then byte-wise with memcmp().
 * No locking is done here; the callers in this file take the bucket lock
 * before calling.
 *
 * NOTE(review): the return statements are not visible in this excerpt --
 * presumably the matching entry, or NULL when nothing matches; confirm.
 */
138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139 int namelen, uint32_t bucket)
141 struct dlm_direntry *de;
143 list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
/* cheap length comparison first, memcmp only on equal lengths */
144 if (de->length == namelen && !memcmp(name, de->name, namelen))
/*
 * dlm_dir_remove_entry - handle a request from @nodeid to drop a name.
 * @ls:      lockspace owning the directory table
 * @nodeid:  node the removal request is attributed to
 * @name:    resource name
 * @namelen: length of @name in bytes
 *
 * The bucket for the name is write-locked and searched.  Two error
 * conditions are logged: "remove fr ... none" and a mismatch between the
 * entry's master_nodeid and @nodeid.
 *
 * NOTE(review): the conditions guarding the first log message and the
 * statements that unlink/free the entry are not visible in this excerpt.
 * NOTE(review): @nodeid is an int but is printed with %u in both messages.
 */
152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
154 struct dlm_direntry *de;
157 bucket = dir_hash(ls, name, namelen);
159 write_lock(&ls->ls_dirtbl[bucket].lock);
161 de = search_bucket(ls, name, namelen, bucket);
164 log_error(ls, "remove fr %u none", nodeid);
/* the entry's recorded master must match the node named in the request */
168 if (de->master_nodeid != nodeid) {
169 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
176 write_unlock(&ls->ls_dirtbl[bucket].lock);
/*
 * dlm_dir_clear - empty every bucket of the lockspace's directory table.
 * @ls: lockspace whose directory table is cleared
 *
 * Asserts that ls_recover_list is empty on entry, then visits each of the
 * ls_dirtbl_size buckets in turn: the bucket is write-locked and its first
 * entry is taken repeatedly until the bucket list is empty.
 *
 * NOTE(review): what happens to each entry is not visible in this excerpt
 * -- presumably it is unlinked and handed to put_free_de(); confirm.
 */
179 void dlm_dir_clear(struct dlm_ls *ls)
181 struct list_head *head;
182 struct dlm_direntry *de;
185 DLM_ASSERT(list_empty(&ls->ls_recover_list), );
187 for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 write_lock(&ls->ls_dirtbl[i].lock);
189 head = &ls->ls_dirtbl[i].list;
190 while (!list_empty(head)) {
/* always operate on the current first entry of the bucket */
191 de = list_entry(head->next, struct dlm_direntry, list);
195 write_unlock(&ls->ls_dirtbl[i].lock);
/*
 * dlm_recover_directory - rebuild the resource directory from all members.
 * @ls: lockspace being recovered
 *
 * Visible flow:
 *  - a DLM_RESNAME_MAXLEN scratch buffer (last_name) is allocated;
 *  - for every member on ls_nodes, last_name is zeroed, then
 *    dlm_recovery_stopped() is checked and dlm_rcom_names() is called for
 *    that member with the last name received so far;
 *  - the reply in ls_recover_buf is parsed as a sequence of records, each a
 *    big-endian 16-bit length followed by that many name bytes; a length of
 *    0xFFFF is tested for explicitly and lengths above DLM_RESNAME_MAXLEN
 *    are checked;
 *  - every name gets a directory entry from get_free_de() with
 *    master_nodeid set to the sending member, is remembered in last_name,
 *    and is inserted with add_entry_to_hash();
 *  - afterwards DLM_RS_DIR is set via dlm_set_recover_status(), the entry
 *    count is logged and dlm_clear_free_entries() is called.
 *
 * The branch targets, loop framing, pointer/length advancement and the
 * return value are not visible in this excerpt.
 *
 * NOTE(review): `left < sizeof(__be16)` compares against a size_t.  If
 * `left` is a signed int (its declaration is not visible), a negative
 * value -- e.g. h_length smaller than sizeof(struct dlm_rcom) -- is
 * converted to a huge unsigned value and passes the check; confirm and
 * consider casting sizeof to int.
 * NOTE(review): the inline comment in the parse loop says "0xFFFFF" but
 * the code compares namelen against 0xFFFF.
 */
199 int dlm_recover_directory(struct dlm_ls *ls)
201 struct dlm_member *memb;
202 struct dlm_direntry *de;
203 char *b, *last_name = NULL;
/* error starts out as -ENOMEM */
204 int error = -ENOMEM, last_len, count = 0;
207 log_debug(ls, "dlm_recover_directory");
209 if (dlm_no_directory(ls))
/* scratch copy of the most recent name, passed to dlm_rcom_names() */
214 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL);
218 list_for_each_entry(memb, &ls->ls_nodes, list) {
/* start each member with an all-zero "last name" */
219 memset(last_name, 0, DLM_RESNAME_MAXLEN);
224 error = dlm_recovery_stopped(ls);
228 error = dlm_rcom_names(ls, memb->nodeid,
229 last_name, last_len);
236 * pick namelen/name pairs out of received buffer
239 b = ls->ls_recover_buf->rc_buf;
/* payload size = total message length minus the rcom header */
240 left = ls->ls_recover_buf->rc_header.h_length;
241 left -= sizeof(struct dlm_rcom);
/* need at least the 2-byte length field of the next record */
247 if (left < sizeof(__be16))
/* memcpy avoids an unaligned 16-bit load from the byte buffer */
250 memcpy(&v, b, sizeof(__be16));
251 namelen = be16_to_cpu(v);
253 left -= sizeof(__be16);
255 /* namelen of 0xFFFFF marks end of names for
256 this node; namelen of 0 marks end of the
259 if (namelen == 0xFFFF)
267 if (namelen > DLM_RESNAME_MAXLEN)
271 de = get_free_de(ls, namelen);
275 de->master_nodeid = memb->nodeid;
276 de->length = namelen;
278 memcpy(de->name, b, namelen);
279 memcpy(last_name, b, namelen);
283 add_entry_to_hash(ls, de);
293 dlm_set_recover_status(ls, DLM_RS_DIR);
294 log_debug(ls, "dlm_recover_directory %d entries", count);
298 dlm_clear_free_entries(ls);
/*
 * get_entry - look up a name in the directory, adding it if it is missing.
 * @ls:       lockspace owning the directory table
 * @nodeid:   node making the request; recorded as master of a new entry
 * @name:     resource name
 * @namelen:  length of @name in bytes
 * @r_nodeid: out parameter, receives the entry's master_nodeid
 *
 * Existing entry: *r_nodeid is set from the entry under the bucket lock,
 * and after unlocking it is compared with @nodeid.
 * No entry: the lock is dropped, a new zeroed entry is allocated and filled
 * in for @nodeid, the lock is retaken and the bucket is searched again
 * before list_add_tail() and the final *r_nodeid assignment.
 *
 * NOTE(review): the return values (including the one chosen when
 * *r_nodeid == nodeid), the allocation-failure path and the handling of a
 * hit on the second search are not visible in this excerpt.
 */
302 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
303 int namelen, int *r_nodeid)
305 struct dlm_direntry *de, *tmp;
308 bucket = dir_hash(ls, name, namelen);
310 write_lock(&ls->ls_dirtbl[bucket].lock);
311 de = search_bucket(ls, name, namelen, bucket);
/* report the recorded master while the bucket is still locked */
313 *r_nodeid = de->master_nodeid;
314 write_unlock(&ls->ls_dirtbl[bucket].lock);
/* the requester is already recorded as the master of this name */
315 if (*r_nodeid == nodeid)
/* drop the lock before the GFP_KERNEL allocation below, which may sleep */
320 write_unlock(&ls->ls_dirtbl[bucket].lock);
322 de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_KERNEL);
326 de->master_nodeid = nodeid;
327 de->length = namelen;
328 memcpy(de->name, name, namelen);
330 write_lock(&ls->ls_dirtbl[bucket].lock);
/*
 * The bucket was unlocked during the allocation, so it is searched again;
 * presumably to catch an entry inserted for the same name meanwhile.
 */
331 tmp = search_bucket(ls, name, namelen, bucket);
336 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
338 *r_nodeid = de->master_nodeid;
339 write_unlock(&ls->ls_dirtbl[bucket].lock);
/*
 * dlm_dir_lookup - directory lookup entry point.
 * @ls:      lockspace owning the directory table
 * @nodeid:  node making the request
 * @name:    resource name
 * @namelen: length of @name in bytes
 *
 * Thin wrapper: forwards all arguments, including the r_nodeid out
 * parameter, to get_entry() and returns its result unchanged.
 * NOTE(review): the rest of the parameter list is not visible in this
 * excerpt.
 */
343 int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
346 return get_entry(ls, nodeid, name, namelen, r_nodeid);
/*
 * find_rsb_root - find a resource on ls_root_list by name.
 * @ls:   lockspace whose ls_root_list is scanned
 * @name: resource name to match
 * @len:  length of @name in bytes
 *
 * ls_root_sem is taken for read around the scan and released both on a
 * match and after an unsuccessful walk.  Entries match when res_length
 * equals @len and the name bytes compare equal.
 *
 * NOTE(review): dlm_copy_master_names() calls this while it already holds
 * ls_root_sem for read, so the semaphore is read-acquired twice by the
 * same task.  Nested down_read() on an rw_semaphore can deadlock if a
 * writer queues between the two acquisitions -- confirm and consider a
 * variant that expects the caller to hold the lock.
 * NOTE(review): the return statements are not visible in this excerpt --
 * presumably the matching rsb, or NULL; confirm.
 */
349 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
353 down_read(&ls->ls_root_sem);
354 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
355 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
/* match: release the semaphore before leaving the loop */
356 up_read(&ls->ls_root_sem);
360 up_read(&ls->ls_root_sem);
364 /* Find the rsb where we left off (or start again), then send rsb names
365 for rsb's we're master of and whose directory node matches the requesting
366 node. inbuf is the rsb name last sent, inlen is the name's length */
/*
 * dlm_copy_master_names - pack resource names into @outbuf for @nodeid.
 * @ls:     lockspace whose ls_root_list is walked
 * @inbuf:  name of the last resource sent previously (resume point)
 * @inlen:  length of @inbuf in bytes
 * @outbuf: destination buffer for the packed records
 * @outlen: size of @outbuf in bytes
 * @nodeid: requesting node; only resources whose dlm_dir_nodeid() equals
 *          this value are written
 *
 * Each record is a big-endian 16-bit name length followed by the name
 * bytes.  A record is only written when it, plus a trailing 16-bit
 * end-of-block marker, still fits in @outbuf; if the whole list was walked
 * and two more bytes fit, a terminating record is appended.  The whole
 * operation runs with ls_root_sem held for read.
 *
 * NOTE(review): find_rsb_root() takes ls_root_sem for read itself, so the
 * call below nests down_read() on the same semaphore, which can deadlock
 * if a writer queues in between -- confirm.
 * NOTE(review): the space checks mix size_t arithmetic with the int
 * @outlen, so a negative @outlen would be treated as a very large size.
 * NOTE(review): the values stored in be_namelen for the end-of-block and
 * terminating records, the other skip conditions in the loop, and the exit
 * labels are not visible in this excerpt.
 */
368 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
369 char *outbuf, int outlen, int nodeid)
371 struct list_head *list;
373 int offset = 0, dir_nodeid;
376 down_read(&ls->ls_root_sem);
/* locate the resume point named by the caller */
379 r = find_rsb_root(ls, inbuf, inlen);
/* overwrite the last byte of the caller's buffer so it prints as a string */
381 inbuf[inlen - 1] = '\0';
382 log_error(ls, "copy_master_names from %d start %d %s",
383 nodeid, inlen, inbuf);
/* continue with the element following the one that was sent last */
386 list = r->res_root_list.next;
/* otherwise start from the beginning of the root list */
388 list = ls->ls_root_list.next;
391 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
392 r = list_entry(list, struct dlm_rsb, res_root_list);
/* only resources whose directory node is the requester are sent */
396 dir_nodeid = dlm_dir_nodeid(r);
397 if (dir_nodeid != nodeid)
401 * The block ends when we can't fit the following in the
402 * remaining buffer space:
403 * namelen (uint16_t) +
404 * name (r->res_length) +
405 * end-of-block record 0x0000 (uint16_t)
408 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
409 /* Write end-of-block record */
411 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
412 offset += sizeof(uint16_t);
/* record = big-endian length, then the raw name bytes */
416 be_namelen = cpu_to_be16(r->res_length);
417 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
418 offset += sizeof(uint16_t);
419 memcpy(outbuf + offset, r->res_name, r->res_length);
420 offset += r->res_length;
424 * If we've reached the end of the list (and there's room) write a
425 * terminating record.
428 if ((list == &ls->ls_root_list) &&
429 (offset + sizeof(uint16_t) <= outlen)) {
431 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
432 offset += sizeof(uint16_t);
436 up_read(&ls->ls_root_sem);