/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"


/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled.

   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
   enables locking and clears the requestqueue between a and b. */

static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
        int error = -EINTR;

        down_write(&ls->ls_recv_active);

        spin_lock(&ls->ls_recover_lock);
        if (ls->ls_recover_seq == seq) {
                set_bit(LSFL_RUNNING, &ls->ls_flags);
                /* unblocks processes waiting to enter the dlm */
                up_write(&ls->ls_in_recovery);
                error = 0;
        }
        spin_unlock(&ls->ls_recover_lock);

        up_write(&ls->ls_recv_active);
        return error;
}

static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
        unsigned long start;
        int error, neg = 0;

        log_debug(ls, "dlm_recover %llu", (unsigned long long)rv->seq);

        mutex_lock(&ls->ls_recoverd_active);

        dlm_callback_suspend(ls);

        /*
         * Free non-master tossed rsb's.  Master rsb's are kept on toss
         * list and put on root list to be included in resdir recovery.
         */

        dlm_clear_toss_list(ls);

        /*
         * This list of root rsb's will be the basis of most of the recovery
         * routines.
         */

        dlm_create_root_list(ls);

        /*
         * Add or remove nodes from the lockspace's ls_nodes list.
         */

        error = dlm_recover_members(ls, rv, &neg);
        if (error) {
                log_debug(ls, "dlm_recover_members error %d", error);
                goto fail;
        }

        ls->ls_recover_locks_in = 0;

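        /*
         * Recovery proceeds through a series of status barriers: each node
         * sets a DLM_RS_* bit for the step it has completed, then the
         * corresponding *_wait() call blocks until every lockspace member
         * reports the same bit before moving on to the next step.
         */
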
        dlm_set_recover_status(ls, DLM_RS_NODES);

        error = dlm_recover_members_wait(ls);
        if (error) {
                log_debug(ls, "dlm_recover_members_wait error %d", error);
                goto fail;
        }

        start = jiffies;

        /*
         * Rebuild our own share of the directory by collecting from all other
         * nodes their master rsb names that hash to us.
         */

        error = dlm_recover_directory(ls);
        if (error) {
                log_debug(ls, "dlm_recover_directory error %d", error);
                goto fail;
        }

        dlm_set_recover_status(ls, DLM_RS_DIR);

        error = dlm_recover_directory_wait(ls);
        if (error) {
                log_debug(ls, "dlm_recover_directory_wait error %d", error);
                goto fail;
        }

        /*
         * We may have outstanding operations that are waiting for a reply from
         * a failed node.  Mark these to be resent after recovery.  Unlock and
         * cancel ops can just be completed.
         */

        dlm_recover_waiters_pre(ls);

        error = dlm_recovery_stopped(ls);
        if (error)
                goto fail;

        if (neg || dlm_no_directory(ls)) {
                /*
                 * Clear lkb's for departed nodes.
                 */

                dlm_recover_purge(ls);

                /*
                 * Get new master nodeid's for rsb's that were mastered on
                 * departed nodes.
                 */

                error = dlm_recover_masters(ls);
                if (error) {
                        log_debug(ls, "dlm_recover_masters error %d", error);
                        goto fail;
                }

                /*
                 * Send our locks on remastered rsb's to the new masters.
                 */

                error = dlm_recover_locks(ls);
                if (error) {
                        log_debug(ls, "dlm_recover_locks error %d", error);
                        goto fail;
                }

                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls);
                if (error) {
                        log_debug(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail;
                }

                log_debug(ls, "dlm_recover_locks %u in",
                          ls->ls_recover_locks_in);

                /*
                 * Finalize state in master rsb's now that all locks can be
                 * checked.  This includes conversion resolution and lvb
                 * settings.
                 */

                dlm_recover_rsbs(ls);
        } else {
                /*
                 * Other lockspace members may be going through the "neg" steps
                 * while also adding us to the lockspace, in which case they'll
                 * be doing the recover_locks (RS_LOCKS) barrier.
                 */
                dlm_set_recover_status(ls, DLM_RS_LOCKS);

                error = dlm_recover_locks_wait(ls);
                if (error) {
                        log_debug(ls, "dlm_recover_locks_wait error %d", error);
                        goto fail;
                }
        }

        dlm_release_root_list(ls);

        /*
         * Purge directory-related requests that are saved in requestqueue.
         * All dir requests from before recovery are invalid now due to the dir
         * rebuild and will be resent by the requesting nodes.
         */

        dlm_purge_requestqueue(ls);

        dlm_set_recover_status(ls, DLM_RS_DONE);

        error = dlm_recover_done_wait(ls);
        if (error) {
                log_debug(ls, "dlm_recover_done_wait error %d", error);
                goto fail;
        }

        dlm_clear_members_gone(ls);

        dlm_adjust_timeouts(ls);

        dlm_callback_resume(ls);

        error = enable_locking(ls, rv->seq);
        if (error) {
                log_debug(ls, "enable_locking error %d", error);
                goto fail;
        }

        error = dlm_process_requestqueue(ls);
        if (error) {
                log_debug(ls, "dlm_process_requestqueue error %d", error);
                goto fail;
        }

        error = dlm_recover_waiters_post(ls);
        if (error) {
                log_debug(ls, "dlm_recover_waiters_post error %d", error);
                goto fail;
        }

        dlm_recover_grant(ls);

        log_debug(ls, "dlm_recover %llu generation %u done: %u ms",
                  (unsigned long long)rv->seq, ls->ls_generation,
                  jiffies_to_msecs(jiffies - start));
        mutex_unlock(&ls->ls_recoverd_active);

        dlm_lsop_recover_done(ls);
        return 0;

 fail:
        dlm_release_root_list(ls);
        log_debug(ls, "dlm_recover %llu error %d",
                  (unsigned long long)rv->seq, error);
        mutex_unlock(&ls->ls_recoverd_active);
        return error;
}

/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
   flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
        struct dlm_recover *rv = NULL;

        spin_lock(&ls->ls_recover_lock);
        rv = ls->ls_recover_args;
        ls->ls_recover_args = NULL;
        if (rv && ls->ls_recover_seq == rv->seq)
                clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
        spin_unlock(&ls->ls_recover_lock);

        if (rv) {
                ls_recover(ls, rv);
                kfree(rv->nodes);
                kfree(rv);
        }
}

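/* The per-lockspace recovery thread: sleep until dlm_recoverd_kick() sets
   LSFL_WORK, run do_ls_recovery() for the queued recovery event, and repeat
   until dlm_recoverd_stop() stops the thread. */
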
static int dlm_recoverd(void *arg)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(arg);
        if (!ls) {
                log_print("dlm_recoverd: no lockspace %p", arg);
                return -1;
        }

        while (!kthread_should_stop()) {
                set_current_state(TASK_INTERRUPTIBLE);
                if (!test_bit(LSFL_WORK, &ls->ls_flags))
                        schedule();
                set_current_state(TASK_RUNNING);

                if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags))
                        do_ls_recovery(ls);
        }

        dlm_put_lockspace(ls);
        return 0;
}

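/* Mark recovery work as pending and wake the recovery thread so it picks up
   the dlm_recover event that dlm_ls_start() queued in ls_recover_args. */
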
void dlm_recoverd_kick(struct dlm_ls *ls)
{
        set_bit(LSFL_WORK, &ls->ls_flags);
        wake_up_process(ls->ls_recoverd_task);
}

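/* Create and stop the "dlm_recoverd" kthread for a lockspace.  kthread_run()
   starts the thread immediately; on failure the PTR_ERR value is returned and
   ls_recoverd_task is not assigned. */
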
int dlm_recoverd_start(struct dlm_ls *ls)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                ls->ls_recoverd_task = p;
        return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
        kthread_stop(ls->ls_recoverd_task);
}

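/* Suspend/resume keep dlm_recoverd from starting a new ls_recover() pass by
   holding ls_recoverd_active; the wake_up lets a recovery step waiting on
   ls_wait_general re-check its exit condition (e.g. a stop request). */
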
void dlm_recoverd_suspend(struct dlm_ls *ls)
{
        wake_up(&ls->ls_wait_general);
        mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
        mutex_unlock(&ls->ls_recoverd_active);
}