]> Pileus Git - ~andy/linux/blob - fs/dlm/member.c
Linux 3.14
[~andy/linux] / fs / dlm / member.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 #include "dlm_internal.h"
14 #include "lockspace.h"
15 #include "member.h"
16 #include "recoverd.h"
17 #include "recover.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "lowcomms.h"
21
22 int dlm_slots_version(struct dlm_header *h)
23 {
24         if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
25                 return 0;
26         return 1;
27 }
28
29 void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
30                    struct dlm_member *memb)
31 {
32         struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
33
34         if (!dlm_slots_version(&rc->rc_header))
35                 return;
36
37         memb->slot = le16_to_cpu(rf->rf_our_slot);
38         memb->generation = le32_to_cpu(rf->rf_generation);
39 }
40
41 void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
42 {
43         struct dlm_slot *slot;
44         struct rcom_slot *ro;
45         int i;
46
47         ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
48
49         /* ls_slots array is sparse, but not rcom_slots */
50
51         for (i = 0; i < ls->ls_slots_size; i++) {
52                 slot = &ls->ls_slots[i];
53                 if (!slot->nodeid)
54                         continue;
55                 ro->ro_nodeid = cpu_to_le32(slot->nodeid);
56                 ro->ro_slot = cpu_to_le16(slot->slot);
57                 ro++;
58         }
59 }
60
61 #define SLOT_DEBUG_LINE 128
62
63 static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
64                             struct rcom_slot *ro0, struct dlm_slot *array,
65                             int array_size)
66 {
67         char line[SLOT_DEBUG_LINE];
68         int len = SLOT_DEBUG_LINE - 1;
69         int pos = 0;
70         int ret, i;
71
72         if (!dlm_config.ci_log_debug)
73                 return;
74
75         memset(line, 0, sizeof(line));
76
77         if (array) {
78                 for (i = 0; i < array_size; i++) {
79                         if (!array[i].nodeid)
80                                 continue;
81
82                         ret = snprintf(line + pos, len - pos, " %d:%d",
83                                        array[i].slot, array[i].nodeid);
84                         if (ret >= len - pos)
85                                 break;
86                         pos += ret;
87                 }
88         } else if (ro0) {
89                 for (i = 0; i < num_slots; i++) {
90                         ret = snprintf(line + pos, len - pos, " %d:%d",
91                                        ro0[i].ro_slot, ro0[i].ro_nodeid);
92                         if (ret >= len - pos)
93                                 break;
94                         pos += ret;
95                 }
96         }
97
98         log_debug(ls, "generation %u slots %d%s", gen, num_slots, line);
99 }
100
101 int dlm_slots_copy_in(struct dlm_ls *ls)
102 {
103         struct dlm_member *memb;
104         struct dlm_rcom *rc = ls->ls_recover_buf;
105         struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
106         struct rcom_slot *ro0, *ro;
107         int our_nodeid = dlm_our_nodeid();
108         int i, num_slots;
109         uint32_t gen;
110
111         if (!dlm_slots_version(&rc->rc_header))
112                 return -1;
113
114         gen = le32_to_cpu(rf->rf_generation);
115         if (gen <= ls->ls_generation) {
116                 log_error(ls, "dlm_slots_copy_in gen %u old %u",
117                           gen, ls->ls_generation);
118         }
119         ls->ls_generation = gen;
120
121         num_slots = le16_to_cpu(rf->rf_num_slots);
122         if (!num_slots)
123                 return -1;
124
125         ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
126
127         for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
128                 ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
129                 ro->ro_slot = le16_to_cpu(ro->ro_slot);
130         }
131
132         log_debug_slots(ls, gen, num_slots, ro0, NULL, 0);
133
134         list_for_each_entry(memb, &ls->ls_nodes, list) {
135                 for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
136                         if (ro->ro_nodeid != memb->nodeid)
137                                 continue;
138                         memb->slot = ro->ro_slot;
139                         memb->slot_prev = memb->slot;
140                         break;
141                 }
142
143                 if (memb->nodeid == our_nodeid) {
144                         if (ls->ls_slot && ls->ls_slot != memb->slot) {
145                                 log_error(ls, "dlm_slots_copy_in our slot "
146                                           "changed %d %d", ls->ls_slot,
147                                           memb->slot);
148                                 return -1;
149                         }
150
151                         if (!ls->ls_slot)
152                                 ls->ls_slot = memb->slot;
153                 }
154
155                 if (!memb->slot) {
156                         log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
157                                    memb->nodeid);
158                         return -1;
159                 }
160         }
161
162         return 0;
163 }
164
165 /* for any nodes that do not support slots, we will not have set memb->slot
166    in wait_status_all(), so memb->slot will remain -1, and we will not
167    assign slots or set ls_num_slots here */
168
169 int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
170                      struct dlm_slot **slots_out, uint32_t *gen_out)
171 {
172         struct dlm_member *memb;
173         struct dlm_slot *array;
174         int our_nodeid = dlm_our_nodeid();
175         int array_size, max_slots, i;
176         int need = 0;
177         int max = 0;
178         int num = 0;
179         uint32_t gen = 0;
180
181         /* our own memb struct will have slot -1 gen 0 */
182
183         list_for_each_entry(memb, &ls->ls_nodes, list) {
184                 if (memb->nodeid == our_nodeid) {
185                         memb->slot = ls->ls_slot;
186                         memb->generation = ls->ls_generation;
187                         break;
188                 }
189         }
190
191         list_for_each_entry(memb, &ls->ls_nodes, list) {
192                 if (memb->generation > gen)
193                         gen = memb->generation;
194
195                 /* node doesn't support slots */
196
197                 if (memb->slot == -1)
198                         return -1;
199
200                 /* node needs a slot assigned */
201
202                 if (!memb->slot)
203                         need++;
204
205                 /* node has a slot assigned */
206
207                 num++;
208
209                 if (!max || max < memb->slot)
210                         max = memb->slot;
211
212                 /* sanity check, once slot is assigned it shouldn't change */
213
214                 if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
215                         log_error(ls, "nodeid %d slot changed %d %d",
216                                   memb->nodeid, memb->slot_prev, memb->slot);
217                         return -1;
218                 }
219                 memb->slot_prev = memb->slot;
220         }
221
222         array_size = max + need;
223
224         array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
225         if (!array)
226                 return -ENOMEM;
227
228         num = 0;
229
230         /* fill in slots (offsets) that are used */
231
232         list_for_each_entry(memb, &ls->ls_nodes, list) {
233                 if (!memb->slot)
234                         continue;
235
236                 if (memb->slot > array_size) {
237                         log_error(ls, "invalid slot number %d", memb->slot);
238                         kfree(array);
239                         return -1;
240                 }
241
242                 array[memb->slot - 1].nodeid = memb->nodeid;
243                 array[memb->slot - 1].slot = memb->slot;
244                 num++;
245         }
246
247         /* assign new slots from unused offsets */
248
249         list_for_each_entry(memb, &ls->ls_nodes, list) {
250                 if (memb->slot)
251                         continue;
252
253                 for (i = 0; i < array_size; i++) {
254                         if (array[i].nodeid)
255                                 continue;
256
257                         memb->slot = i + 1;
258                         memb->slot_prev = memb->slot;
259                         array[i].nodeid = memb->nodeid;
260                         array[i].slot = memb->slot;
261                         num++;
262
263                         if (!ls->ls_slot && memb->nodeid == our_nodeid)
264                                 ls->ls_slot = memb->slot;
265                         break;
266                 }
267
268                 if (!memb->slot) {
269                         log_error(ls, "no free slot found");
270                         kfree(array);
271                         return -1;
272                 }
273         }
274
275         gen++;
276
277         log_debug_slots(ls, gen, num, NULL, array, array_size);
278
279         max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
280                      sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
281
282         if (num > max_slots) {
283                 log_error(ls, "num_slots %d exceeds max_slots %d",
284                           num, max_slots);
285                 kfree(array);
286                 return -1;
287         }
288
289         *gen_out = gen;
290         *slots_out = array;
291         *slots_size = array_size;
292         *num_slots = num;
293         return 0;
294 }
295
296 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
297 {
298         struct dlm_member *memb = NULL;
299         struct list_head *tmp;
300         struct list_head *newlist = &new->list;
301         struct list_head *head = &ls->ls_nodes;
302
303         list_for_each(tmp, head) {
304                 memb = list_entry(tmp, struct dlm_member, list);
305                 if (new->nodeid < memb->nodeid)
306                         break;
307         }
308
309         if (!memb)
310                 list_add_tail(newlist, head);
311         else {
312                 /* FIXME: can use list macro here */
313                 newlist->prev = tmp->prev;
314                 newlist->next = tmp;
315                 tmp->prev->next = newlist;
316                 tmp->prev = newlist;
317         }
318 }
319
320 static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
321 {
322         struct dlm_member *memb;
323         int error;
324
325         memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
326         if (!memb)
327                 return -ENOMEM;
328
329         error = dlm_lowcomms_connect_node(node->nodeid);
330         if (error < 0) {
331                 kfree(memb);
332                 return error;
333         }
334
335         memb->nodeid = node->nodeid;
336         memb->weight = node->weight;
337         memb->comm_seq = node->comm_seq;
338         add_ordered_member(ls, memb);
339         ls->ls_num_nodes++;
340         return 0;
341 }
342
343 static struct dlm_member *find_memb(struct list_head *head, int nodeid)
344 {
345         struct dlm_member *memb;
346
347         list_for_each_entry(memb, head, list) {
348                 if (memb->nodeid == nodeid)
349                         return memb;
350         }
351         return NULL;
352 }
353
354 int dlm_is_member(struct dlm_ls *ls, int nodeid)
355 {
356         if (find_memb(&ls->ls_nodes, nodeid))
357                 return 1;
358         return 0;
359 }
360
361 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
362 {
363         if (find_memb(&ls->ls_nodes_gone, nodeid))
364                 return 1;
365         return 0;
366 }
367
368 static void clear_memb_list(struct list_head *head)
369 {
370         struct dlm_member *memb;
371
372         while (!list_empty(head)) {
373                 memb = list_entry(head->next, struct dlm_member, list);
374                 list_del(&memb->list);
375                 kfree(memb);
376         }
377 }
378
379 void dlm_clear_members(struct dlm_ls *ls)
380 {
381         clear_memb_list(&ls->ls_nodes);
382         ls->ls_num_nodes = 0;
383 }
384
385 void dlm_clear_members_gone(struct dlm_ls *ls)
386 {
387         clear_memb_list(&ls->ls_nodes_gone);
388 }
389
390 static void make_member_array(struct dlm_ls *ls)
391 {
392         struct dlm_member *memb;
393         int i, w, x = 0, total = 0, all_zero = 0, *array;
394
395         kfree(ls->ls_node_array);
396         ls->ls_node_array = NULL;
397
398         list_for_each_entry(memb, &ls->ls_nodes, list) {
399                 if (memb->weight)
400                         total += memb->weight;
401         }
402
403         /* all nodes revert to weight of 1 if all have weight 0 */
404
405         if (!total) {
406                 total = ls->ls_num_nodes;
407                 all_zero = 1;
408         }
409
410         ls->ls_total_weight = total;
411
412         array = kmalloc(sizeof(int) * total, GFP_NOFS);
413         if (!array)
414                 return;
415
416         list_for_each_entry(memb, &ls->ls_nodes, list) {
417                 if (!all_zero && !memb->weight)
418                         continue;
419
420                 if (all_zero)
421                         w = 1;
422                 else
423                         w = memb->weight;
424
425                 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
426
427                 for (i = 0; i < w; i++)
428                         array[x++] = memb->nodeid;
429         }
430
431         ls->ls_node_array = array;
432 }
433
434 /* send a status request to all members just to establish comms connections */
435
436 static int ping_members(struct dlm_ls *ls)
437 {
438         struct dlm_member *memb;
439         int error = 0;
440
441         list_for_each_entry(memb, &ls->ls_nodes, list) {
442                 error = dlm_recovery_stopped(ls);
443                 if (error)
444                         break;
445                 error = dlm_rcom_status(ls, memb->nodeid, 0);
446                 if (error)
447                         break;
448         }
449         if (error)
450                 log_debug(ls, "ping_members aborted %d last nodeid %d",
451                           error, ls->ls_recover_nodeid);
452         return error;
453 }
454
455 static void dlm_lsop_recover_prep(struct dlm_ls *ls)
456 {
457         if (!ls->ls_ops || !ls->ls_ops->recover_prep)
458                 return;
459         ls->ls_ops->recover_prep(ls->ls_ops_arg);
460 }
461
462 static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
463 {
464         struct dlm_slot slot;
465         uint32_t seq;
466         int error;
467
468         if (!ls->ls_ops || !ls->ls_ops->recover_slot)
469                 return;
470
471         /* if there is no comms connection with this node
472            or the present comms connection is newer
473            than the one when this member was added, then
474            we consider the node to have failed (versus
475            being removed due to dlm_release_lockspace) */
476
477         error = dlm_comm_seq(memb->nodeid, &seq);
478
479         if (!error && seq == memb->comm_seq)
480                 return;
481
482         slot.nodeid = memb->nodeid;
483         slot.slot = memb->slot;
484
485         ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
486 }
487
488 void dlm_lsop_recover_done(struct dlm_ls *ls)
489 {
490         struct dlm_member *memb;
491         struct dlm_slot *slots;
492         int i, num;
493
494         if (!ls->ls_ops || !ls->ls_ops->recover_done)
495                 return;
496
497         num = ls->ls_num_nodes;
498
499         slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL);
500         if (!slots)
501                 return;
502
503         i = 0;
504         list_for_each_entry(memb, &ls->ls_nodes, list) {
505                 if (i == num) {
506                         log_error(ls, "dlm_lsop_recover_done bad num %d", num);
507                         goto out;
508                 }
509                 slots[i].nodeid = memb->nodeid;
510                 slots[i].slot = memb->slot;
511                 i++;
512         }
513
514         ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
515                                  ls->ls_slot, ls->ls_generation);
516  out:
517         kfree(slots);
518 }
519
520 static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
521                                                 int nodeid)
522 {
523         int i;
524
525         for (i = 0; i < rv->nodes_count; i++) {
526                 if (rv->nodes[i].nodeid == nodeid)
527                         return &rv->nodes[i];
528         }
529         return NULL;
530 }
531
532 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
533 {
534         struct dlm_member *memb, *safe;
535         struct dlm_config_node *node;
536         int i, error, neg = 0, low = -1;
537
538         /* previously removed members that we've not finished removing need to
539            count as a negative change so the "neg" recovery steps will happen */
540
541         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
542                 log_debug(ls, "prev removed member %d", memb->nodeid);
543                 neg++;
544         }
545
546         /* move departed members from ls_nodes to ls_nodes_gone */
547
548         list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
549                 node = find_config_node(rv, memb->nodeid);
550                 if (node && !node->new)
551                         continue;
552
553                 if (!node) {
554                         log_debug(ls, "remove member %d", memb->nodeid);
555                 } else {
556                         /* removed and re-added */
557                         log_debug(ls, "remove member %d comm_seq %u %u",
558                                   memb->nodeid, memb->comm_seq, node->comm_seq);
559                 }
560
561                 neg++;
562                 list_move(&memb->list, &ls->ls_nodes_gone);
563                 ls->ls_num_nodes--;
564                 dlm_lsop_recover_slot(ls, memb);
565         }
566
567         /* add new members to ls_nodes */
568
569         for (i = 0; i < rv->nodes_count; i++) {
570                 node = &rv->nodes[i];
571                 if (dlm_is_member(ls, node->nodeid))
572                         continue;
573                 dlm_add_member(ls, node);
574                 log_debug(ls, "add member %d", node->nodeid);
575         }
576
577         list_for_each_entry(memb, &ls->ls_nodes, list) {
578                 if (low == -1 || memb->nodeid < low)
579                         low = memb->nodeid;
580         }
581         ls->ls_low_nodeid = low;
582
583         make_member_array(ls);
584         *neg_out = neg;
585
586         error = ping_members(ls);
587         if (!error || error == -EPROTO) {
588                 /* new_lockspace() may be waiting to know if the config
589                    is good or bad */
590                 ls->ls_members_result = error;
591                 complete(&ls->ls_members_done);
592         }
593
594         log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
595         return error;
596 }
597
598 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
599    dlm_ls_start() is called on any of them to start the new recovery. */
600
601 int dlm_ls_stop(struct dlm_ls *ls)
602 {
603         int new;
604
605         /*
606          * Prevent dlm_recv from being in the middle of something when we do
607          * the stop.  This includes ensuring dlm_recv isn't processing a
608          * recovery message (rcom), while dlm_recoverd is aborting and
609          * resetting things from an in-progress recovery.  i.e. we want
610          * dlm_recoverd to abort its recovery without worrying about dlm_recv
611          * processing an rcom at the same time.  Stopping dlm_recv also makes
612          * it easy for dlm_receive_message() to check locking stopped and add a
613          * message to the requestqueue without races.
614          */
615
616         down_write(&ls->ls_recv_active);
617
618         /*
619          * Abort any recovery that's in progress (see RECOVER_STOP,
620          * dlm_recovery_stopped()) and tell any other threads running in the
621          * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
622          */
623
624         spin_lock(&ls->ls_recover_lock);
625         set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
626         new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
627         ls->ls_recover_seq++;
628         spin_unlock(&ls->ls_recover_lock);
629
630         /*
631          * Let dlm_recv run again, now any normal messages will be saved on the
632          * requestqueue for later.
633          */
634
635         up_write(&ls->ls_recv_active);
636
637         /*
638          * This in_recovery lock does two things:
639          * 1) Keeps this function from returning until all threads are out
640          *    of locking routines and locking is truly stopped.
641          * 2) Keeps any new requests from being processed until it's unlocked
642          *    when recovery is complete.
643          */
644
645         if (new) {
646                 set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
647                 wake_up_process(ls->ls_recoverd_task);
648                 wait_event(ls->ls_recover_lock_wait,
649                            test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
650         }
651
652         /*
653          * The recoverd suspend/resume makes sure that dlm_recoverd (if
654          * running) has noticed RECOVER_STOP above and quit processing the
655          * previous recovery.
656          */
657
658         dlm_recoverd_suspend(ls);
659
660         spin_lock(&ls->ls_recover_lock);
661         kfree(ls->ls_slots);
662         ls->ls_slots = NULL;
663         ls->ls_num_slots = 0;
664         ls->ls_slots_size = 0;
665         ls->ls_recover_status = 0;
666         spin_unlock(&ls->ls_recover_lock);
667
668         dlm_recoverd_resume(ls);
669
670         if (!ls->ls_recover_begin)
671                 ls->ls_recover_begin = jiffies;
672
673         dlm_lsop_recover_prep(ls);
674         return 0;
675 }
676
677 int dlm_ls_start(struct dlm_ls *ls)
678 {
679         struct dlm_recover *rv = NULL, *rv_old;
680         struct dlm_config_node *nodes;
681         int error, count;
682
683         rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
684         if (!rv)
685                 return -ENOMEM;
686
687         error = dlm_config_nodes(ls->ls_name, &nodes, &count);
688         if (error < 0)
689                 goto fail;
690
691         spin_lock(&ls->ls_recover_lock);
692
693         /* the lockspace needs to be stopped before it can be started */
694
695         if (!dlm_locking_stopped(ls)) {
696                 spin_unlock(&ls->ls_recover_lock);
697                 log_error(ls, "start ignored: lockspace running");
698                 error = -EINVAL;
699                 goto fail;
700         }
701
702         rv->nodes = nodes;
703         rv->nodes_count = count;
704         rv->seq = ++ls->ls_recover_seq;
705         rv_old = ls->ls_recover_args;
706         ls->ls_recover_args = rv;
707         spin_unlock(&ls->ls_recover_lock);
708
709         if (rv_old) {
710                 log_error(ls, "unused recovery %llx %d",
711                           (unsigned long long)rv_old->seq, rv_old->nodes_count);
712                 kfree(rv_old->nodes);
713                 kfree(rv_old);
714         }
715
716         set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
717         wake_up_process(ls->ls_recoverd_task);
718         return 0;
719
720  fail:
721         kfree(rv);
722         kfree(nodes);
723         return error;
724 }
725