]> Pileus Git - ~andy/linux/blobdiff - fs/dlm/recover.c
Merge tag 'devicetree-for-linus' of git://git.secretlab.ca/git/linux-2.6
[~andy/linux] / fs / dlm / recover.c
index 34d5adf1fce7d2022679d42e1e00673af0825dae..7554e4dac6bbbc0290ac26a1a75438a1551d5060 100644 (file)
@@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid)
 {
        struct dlm_lkb *lkb;
 
-       list_for_each_entry(lkb, queue, lkb_statequeue)
-               if (!(lkb->lkb_flags & DLM_IFL_MSTCPY))
+       list_for_each_entry(lkb, queue, lkb_statequeue) {
+               if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
                        lkb->lkb_nodeid = nodeid;
+                       lkb->lkb_remid = 0;
+               }
+       }
 }
 
 static void set_master_lkbs(struct dlm_rsb *r)
@@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r)
 /*
  * Propagate the new master nodeid to locks
  * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
- * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
+ * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
  * rsb's to consider.
  */
 
 static void set_new_master(struct dlm_rsb *r, int nodeid)
 {
-       lock_rsb(r);
        r->res_nodeid = nodeid;
        set_master_lkbs(r);
        rsb_set_flag(r, RSB_NEW_MASTER);
        rsb_set_flag(r, RSB_NEW_MASTER2);
-       unlock_rsb(r);
 }
 
 /*
@@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
 static int recover_master(struct dlm_rsb *r)
 {
        struct dlm_ls *ls = r->res_ls;
-       int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
-
-       dir_nodeid = dlm_dir_nodeid(r);
+       int error, ret_nodeid;
+       int our_nodeid = dlm_our_nodeid();
+       int dir_nodeid = dlm_dir_nodeid(r);
 
        if (dir_nodeid == our_nodeid) {
                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
@@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r)
 
                if (ret_nodeid == our_nodeid)
                        ret_nodeid = 0;
+               lock_rsb(r);
                set_new_master(r, ret_nodeid);
+               unlock_rsb(r);
        } else {
                recover_list_add(r);
                error = dlm_send_rcom_lookup(r, dir_nodeid);
@@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r)
 }
 
 /*
- * When not using a directory, most resource names will hash to a new static
- * master nodeid and the resource will need to be remastered.
+ * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
+ * This is necessary because recovery can be started, aborted and restarted,
+ * causing the master nodeid to briefly change during the aborted recovery, and
+ * change back to the original value in the second recovery.  The MSTCPY locks
+ * may or may not have been purged during the aborted recovery.  Another node
+ * with an outstanding request in waiters list and a request reply saved in the
+ * requestqueue, cannot know whether it should ignore the reply and resend the
+ * request, or accept the reply and complete the request.  It must do the
+ * former if the remote node purged MSTCPY locks, and it must do the later if
+ * the remote node did not.  This is solved by always purging MSTCPY locks, in
+ * which case, the request reply would always be ignored and the request
+ * resent.
  */
 
 static int recover_master_static(struct dlm_rsb *r)
 {
-       int master = dlm_dir_nodeid(r);
+       int dir_nodeid = dlm_dir_nodeid(r);
+       int new_master = dir_nodeid;
 
-       if (master == dlm_our_nodeid())
-               master = 0;
+       if (dir_nodeid == dlm_our_nodeid())
+               new_master = 0;
 
-       if (r->res_nodeid != master) {
-               if (is_master(r))
-                       dlm_purge_mstcpy_locks(r);
-               set_new_master(r, master);
-               return 1;
-       }
-       return 0;
+       lock_rsb(r);
+       dlm_purge_mstcpy_locks(r);
+       set_new_master(r, new_master);
+       unlock_rsb(r);
+       return 1;
 }
 
 /*
@@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
        if (nodeid == dlm_our_nodeid())
                nodeid = 0;
 
+       lock_rsb(r);
        set_new_master(r, nodeid);
+       unlock_rsb(r);
        recover_list_del(r);
 
        if (recover_list_empty(ls))
@@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
        struct dlm_rsb *r;
        int error, count = 0;
 
-       log_debug(ls, "dlm_recover_locks");
-
        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                if (is_master(r)) {
@@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
        }
        up_read(&ls->ls_root_sem);
 
-       log_debug(ls, "dlm_recover_locks %d locks", count);
+       log_debug(ls, "dlm_recover_locks %d out", count);
 
        error = dlm_wait_function(ls, &recover_list_empty);
  out:
@@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r)
 }
 
 /* We've become the new master for this rsb and waiting/converting locks may
-   need to be granted in dlm_grant_after_purge() due to locks that may have
+   need to be granted in dlm_recover_grant() due to locks that may have
    existed from a removed node. */
 
-static void set_locks_purged(struct dlm_rsb *r)
+static void recover_grant(struct dlm_rsb *r)
 {
        if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
-               rsb_set_flag(r, RSB_LOCKS_PURGED);
+               rsb_set_flag(r, RSB_RECOVER_GRANT);
 }
 
 void dlm_recover_rsbs(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
-       int count = 0;
-
-       log_debug(ls, "dlm_recover_rsbs");
+       unsigned int count = 0;
 
        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
@@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
                        if (rsb_flag(r, RSB_RECOVER_CONVERT))
                                recover_conversion(r);
                        if (rsb_flag(r, RSB_NEW_MASTER2))
-                               set_locks_purged(r);
+                               recover_grant(r);
                        recover_lvb(r);
                        count++;
                }
@@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
        }
        up_read(&ls->ls_root_sem);
 
-       log_debug(ls, "dlm_recover_rsbs %d rsbs", count);
+       if (count)
+               log_debug(ls, "dlm_recover_rsbs %d done", count);
 }
 
 /* Create a single list of all root rsb's to be used during recovery */