]> Pileus Git - ~andy/linux/blobdiff - fs/ocfs2/journal.c
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-2.6
[~andy/linux] / fs / ocfs2 / journal.c
index a20a0f1e37fd18e898ade8803d19ff7489df7e85..f033760ecbeaae0dde0944d190828aedb7b9c5d0 100644 (file)
@@ -28,6 +28,8 @@
 #include <linux/slab.h>
 #include <linux/highmem.h>
 #include <linux/kthread.h>
+#include <linux/time.h>
+#include <linux/random.h>
 
 #define MLOG_MASK_PREFIX ML_JOURNAL
 #include <cluster/masklog.h>
@@ -52,6 +54,8 @@
 
 DEFINE_SPINLOCK(trans_inc_lock);
 
+#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
+
 static int ocfs2_force_read_journal(struct inode *inode);
 static int ocfs2_recover_node(struct ocfs2_super *osb,
                              int node_num, int slot_num);
@@ -1841,6 +1845,128 @@ bail:
        return status;
 }
 
+/*
+ * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some
+ * randomness to the timeout to minimize multple nodes firing the timer at the
+ * same time.
+ */
+static inline unsigned long ocfs2_orphan_scan_timeout(void)
+{
+       unsigned long time;
+
+       get_random_bytes(&time, sizeof(time));
+       time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000);
+       return msecs_to_jiffies(time);
+}
+
+/*
+ * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for
+ * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This
+ * is done to catch any orphans that are left over in orphan directories.
+ *
+ * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
+ * seconds.  It gets an EX lock on os_lockres and checks sequence number
+ * stored in LVB. If the sequence number has changed, it means some other
+ * node has done the scan.  This node skips the scan and tracks the
+ * sequence number.  If the sequence number didn't change, it means a scan
+ * hasn't happened.  The node queues a scan and increments the
+ * sequence number in the LVB.
+ */
+void ocfs2_queue_orphan_scan(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+       int status, i;
+       u32 seqno = 0;
+
+       os = &osb->osb_orphan_scan;
+
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
+               goto out;
+
+       status = ocfs2_orphan_scan_lock(osb, &seqno);
+       if (status < 0) {
+               if (status != -EAGAIN)
+                       mlog_errno(status);
+               goto out;
+       }
+
+       /* Do no queue the tasks if the volume is being umounted */
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE)
+               goto unlock;
+
+       if (os->os_seqno != seqno) {
+               os->os_seqno = seqno;
+               goto unlock;
+       }
+
+       for (i = 0; i < osb->max_slots; i++)
+               ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL,
+                                               NULL);
+       /*
+        * We queued a recovery on orphan slots, increment the sequence
+        * number and update LVB so other node will skip the scan for a while
+        */
+       seqno++;
+       os->os_count++;
+       os->os_scantime = CURRENT_TIME;
+unlock:
+       ocfs2_orphan_scan_unlock(osb, seqno);
+out:
+       return;
+}
+
+/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */
+void ocfs2_orphan_scan_work(struct work_struct *work)
+{
+       struct ocfs2_orphan_scan *os;
+       struct ocfs2_super *osb;
+
+       os = container_of(work, struct ocfs2_orphan_scan,
+                         os_orphan_scan_work.work);
+       osb = os->os_osb;
+
+       mutex_lock(&os->os_lock);
+       ocfs2_queue_orphan_scan(osb);
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE)
+               schedule_delayed_work(&os->os_orphan_scan_work,
+                                     ocfs2_orphan_scan_timeout());
+       mutex_unlock(&os->os_lock);
+}
+
+void ocfs2_orphan_scan_stop(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+
+       os = &osb->osb_orphan_scan;
+       if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) {
+               atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
+               mutex_lock(&os->os_lock);
+               cancel_delayed_work(&os->os_orphan_scan_work);
+               mutex_unlock(&os->os_lock);
+       }
+}
+
+void ocfs2_orphan_scan_init(struct ocfs2_super *osb)
+{
+       struct ocfs2_orphan_scan *os;
+
+       os = &osb->osb_orphan_scan;
+       os->os_osb = osb;
+       os->os_count = 0;
+       os->os_seqno = 0;
+       os->os_scantime = CURRENT_TIME;
+       mutex_init(&os->os_lock);
+       INIT_DELAYED_WORK(&os->os_orphan_scan_work, ocfs2_orphan_scan_work);
+
+       if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
+               atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE);
+       else {
+               atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE);
+               schedule_delayed_work(&os->os_orphan_scan_work,
+                                     ocfs2_orphan_scan_timeout());
+       }
+}
+
 struct ocfs2_orphan_filldir_priv {
        struct inode            *head;
        struct ocfs2_super      *osb;