]> Pileus Git - ~andy/linux/blobdiff - fs/ceph/caps.c
Linux 3.14
[~andy/linux] / fs / ceph / caps.c
index 3c0a4bd7499645ca8bf90fd1a6ba16f6831c164c..17543383545c162f58425fbc8629a99f1c5a717e 100644 (file)
@@ -555,21 +555,34 @@ retry:
                cap->ci = ci;
                __insert_cap_node(ci, cap);
 
-               /* clear out old exporting info?  (i.e. on cap import) */
-               if (ci->i_cap_exporting_mds == mds) {
-                       ci->i_cap_exporting_issued = 0;
-                       ci->i_cap_exporting_mseq = 0;
-                       ci->i_cap_exporting_mds = -1;
-               }
-
                /* add to session cap list */
                cap->session = session;
                spin_lock(&session->s_cap_lock);
                list_add_tail(&cap->session_caps, &session->s_caps);
                session->s_nr_caps++;
                spin_unlock(&session->s_cap_lock);
-       } else if (new_cap)
-               ceph_put_cap(mdsc, new_cap);
+       } else {
+               if (new_cap)
+                       ceph_put_cap(mdsc, new_cap);
+
+               /*
+                * auth mds of the inode changed. we received the cap export
+                * message, but still haven't received the cap import message.
+                * handle_cap_export() updated the new auth MDS' cap.
+                *
+                * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
+                * a message that was send before the cap import message. So
+                * don't remove caps.
+                */
+               if (ceph_seq_cmp(seq, cap->seq) <= 0) {
+                       WARN_ON(cap != ci->i_auth_cap);
+                       WARN_ON(cap->cap_id != cap_id);
+                       seq = cap->seq;
+                       mseq = cap->mseq;
+                       issued |= cap->issued;
+                       flags |= CEPH_CAP_FLAG_AUTH;
+               }
+       }
 
        if (!ci->i_snap_realm) {
                /*
@@ -611,15 +624,9 @@ retry:
                if (ci->i_auth_cap == NULL ||
                    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
                        ci->i_auth_cap = cap;
-       } else if (ci->i_auth_cap == cap) {
-               ci->i_auth_cap = NULL;
-               spin_lock(&mdsc->cap_dirty_lock);
-               if (!list_empty(&ci->i_dirty_item)) {
-                       dout(" moving %p to cap_dirty_migrating\n", inode);
-                       list_move(&ci->i_dirty_item,
-                                 &mdsc->cap_dirty_migrating);
-               }
-               spin_unlock(&mdsc->cap_dirty_lock);
+               ci->i_cap_exporting_issued = 0;
+       } else {
+               WARN_ON(ci->i_auth_cap == cap);
        }
 
        dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
@@ -628,7 +635,7 @@ retry:
        cap->cap_id = cap_id;
        cap->issued = issued;
        cap->implemented |= issued;
-       if (mseq > cap->mseq)
+       if (ceph_seq_cmp(mseq, cap->mseq) > 0)
                cap->mds_wanted = wanted;
        else
                cap->mds_wanted |= wanted;
@@ -816,7 +823,7 @@ int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
 
        for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
                cap = rb_entry(p, struct ceph_cap, ci_node);
-               if (cap != ocap && __cap_is_valid(cap) &&
+               if (cap != ocap &&
                    (cap->implemented & ~cap->issued & mask))
                        return 1;
        }
@@ -888,7 +895,19 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
  */
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
-       return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0;
+       return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
+}
+
+int ceph_is_any_caps(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int ret;
+
+       spin_lock(&ci->i_ceph_lock);
+       ret = __ceph_is_any_caps(ci);
+       spin_unlock(&ci->i_ceph_lock);
+
+       return ret;
 }
 
 /*
@@ -1383,13 +1402,10 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
                                ci->i_snap_realm->cached_context);
                dout(" inode %p now dirty snapc %p auth cap %p\n",
                     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
+               WARN_ON(!ci->i_auth_cap);
                BUG_ON(!list_empty(&ci->i_dirty_item));
                spin_lock(&mdsc->cap_dirty_lock);
-               if (ci->i_auth_cap)
-                       list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
-               else
-                       list_add(&ci->i_dirty_item,
-                                &mdsc->cap_dirty_migrating);
+               list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
                spin_unlock(&mdsc->cap_dirty_lock);
                if (ci->i_flushing_caps == 0) {
                        ihold(inode);
@@ -1735,13 +1751,12 @@ ack:
 /*
  * Try to flush dirty caps back to the auth mds.
  */
-static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session,
-                         unsigned *flush_tid)
+static int try_flush_caps(struct inode *inode, unsigned *flush_tid)
 {
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int unlock_session = session ? 0 : 1;
        int flushing = 0;
+       struct ceph_mds_session *session = NULL;
 
 retry:
        spin_lock(&ci->i_ceph_lock);
@@ -1755,13 +1770,14 @@ retry:
                int want = __ceph_caps_wanted(ci);
                int delayed;
 
-               if (!session) {
+               if (!session || session != cap->session) {
                        spin_unlock(&ci->i_ceph_lock);
+                       if (session)
+                               mutex_unlock(&session->s_mutex);
                        session = cap->session;
                        mutex_lock(&session->s_mutex);
                        goto retry;
                }
-               BUG_ON(session != cap->session);
                if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
                        goto out;
 
@@ -1780,7 +1796,7 @@ retry:
 out:
        spin_unlock(&ci->i_ceph_lock);
 out_unlocked:
-       if (session && unlock_session)
+       if (session)
                mutex_unlock(&session->s_mutex);
        return flushing;
 }
@@ -1865,7 +1881,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
                return ret;
        mutex_lock(&inode->i_mutex);
 
-       dirty = try_flush_caps(inode, NULL, &flush_tid);
+       dirty = try_flush_caps(inode, &flush_tid);
        dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
        /*
@@ -1900,7 +1916,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 
        dout("write_inode %p wait=%d\n", inode, wait);
        if (wait) {
-               dirty = try_flush_caps(inode, NULL, &flush_tid);
+               dirty = try_flush_caps(inode, &flush_tid);
                if (dirty)
                        err = wait_event_interruptible(ci->i_cap_wq,
                                       caps_are_flushed(inode, flush_tid));
@@ -2350,11 +2366,11 @@ static void invalidate_aliases(struct inode *inode)
        d_prune_aliases(inode);
        /*
         * For non-directory inode, d_find_alias() only returns
-        * connected dentry. After calling d_invalidate(), the
-        * dentry become disconnected.
+        * hashed dentry. After calling d_invalidate(), the
+        * dentry becomes unhashed.
         *
         * For directory inode, d_find_alias() can return
-        * disconnected dentry. But directory inode should have
+        * unhashed dentry. But directory inode should have
         * one alias at most.
         */
        while ((dn = d_find_alias(inode))) {
@@ -2408,6 +2424,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
                inode->i_size);
 
+
+       /*
+        * auth mds of the inode changed. we received the cap export message,
+        * but still haven't received the cap import message. handle_cap_export
+        * updated the new auth MDS' cap.
+        *
+        * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
+        * that was sent before the cap import message. So don't remove caps.
+        */
+       if (ceph_seq_cmp(seq, cap->seq) <= 0) {
+               WARN_ON(cap != ci->i_auth_cap);
+               WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
+               seq = cap->seq;
+               newcaps |= cap->issued;
+       }
+
        /*
         * If CACHE is being revoked, and we have no dirty buffers,
         * try to invalidate (once).  (If there are dirty buffers, we
@@ -2434,6 +2466,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        issued |= implemented | __ceph_caps_dirty(ci);
 
        cap->cap_gen = session->s_cap_gen;
+       cap->seq = seq;
 
        __check_cap_issue(ci, cap, newcaps);
 
@@ -2464,6 +2497,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                                ceph_buffer_put(ci->i_xattrs.blob);
                        ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
                        ci->i_xattrs.version = version;
+                       ceph_forget_all_cached_acls(inode);
                }
        }
 
@@ -2483,6 +2517,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                            le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
                            &atime);
 
+
+       /* file layout may have changed */
+       ci->i_layout = grant->layout;
+
        /* max size increase? */
        if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
                dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
@@ -2511,11 +2549,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                        check_caps = 1;
        }
 
-       cap->seq = seq;
-
-       /* file layout may have changed */
-       ci->i_layout = grant->layout;
-
        /* revocation, grant, or no-op? */
        if (cap->issued & ~newcaps) {
                int revoking = cap->issued & ~newcaps;
@@ -2741,65 +2774,114 @@ static void handle_cap_trunc(struct inode *inode,
  * caller holds s_mutex
  */
 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
-                             struct ceph_mds_session *session,
-                             int *open_target_sessions)
+                             struct ceph_mds_cap_peer *ph,
+                             struct ceph_mds_session *session)
 {
        struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+       struct ceph_mds_session *tsession = NULL;
+       struct ceph_cap *cap, *tcap;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int mds = session->s_mds;
+       u64 t_cap_id;
        unsigned mseq = le32_to_cpu(ex->migrate_seq);
-       struct ceph_cap *cap = NULL, *t;
-       struct rb_node *p;
-       int remember = 1;
+       unsigned t_seq, t_mseq;
+       int target, issued;
+       int mds = session->s_mds;
 
-       dout("handle_cap_export inode %p ci %p mds%d mseq %d\n",
-            inode, ci, mds, mseq);
+       if (ph) {
+               t_cap_id = le64_to_cpu(ph->cap_id);
+               t_seq = le32_to_cpu(ph->seq);
+               t_mseq = le32_to_cpu(ph->mseq);
+               target = le32_to_cpu(ph->mds);
+       } else {
+               t_cap_id = t_seq = t_mseq = 0;
+               target = -1;
+       }
 
+       dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n",
+            inode, ci, mds, mseq, target);
+retry:
        spin_lock(&ci->i_ceph_lock);
+       cap = __get_cap_for_mds(ci, mds);
+       if (!cap)
+               goto out_unlock;
 
-       /* make sure we haven't seen a higher mseq */
-       for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
-               t = rb_entry(p, struct ceph_cap, ci_node);
-               if (ceph_seq_cmp(t->mseq, mseq) > 0) {
-                       dout(" higher mseq on cap from mds%d\n",
-                            t->session->s_mds);
-                       remember = 0;
-               }
-               if (t->session->s_mds == mds)
-                       cap = t;
+       if (target < 0) {
+               __ceph_remove_cap(cap, false);
+               goto out_unlock;
        }
 
-       if (cap) {
-               if (remember) {
-                       /* make note */
-                       ci->i_cap_exporting_mds = mds;
-                       ci->i_cap_exporting_mseq = mseq;
-                       ci->i_cap_exporting_issued = cap->issued;
-
-                       /*
-                        * make sure we have open sessions with all possible
-                        * export targets, so that we get the matching IMPORT
-                        */
-                       *open_target_sessions = 1;
+       /*
+        * now we know we haven't received the cap import message yet
+        * because the exported cap still exist.
+        */
 
-                       /*
-                        * we can't flush dirty caps that we've seen the
-                        * EXPORT but no IMPORT for
-                        */
-                       spin_lock(&mdsc->cap_dirty_lock);
-                       if (!list_empty(&ci->i_dirty_item)) {
-                               dout(" moving %p to cap_dirty_migrating\n",
-                                    inode);
-                               list_move(&ci->i_dirty_item,
-                                         &mdsc->cap_dirty_migrating);
+       issued = cap->issued;
+       WARN_ON(issued != cap->implemented);
+
+       tcap = __get_cap_for_mds(ci, target);
+       if (tcap) {
+               /* already have caps from the target */
+               if (tcap->cap_id != t_cap_id ||
+                   ceph_seq_cmp(tcap->seq, t_seq) < 0) {
+                       dout(" updating import cap %p mds%d\n", tcap, target);
+                       tcap->cap_id = t_cap_id;
+                       tcap->seq = t_seq - 1;
+                       tcap->issue_seq = t_seq - 1;
+                       tcap->mseq = t_mseq;
+                       tcap->issued |= issued;
+                       tcap->implemented |= issued;
+                       if (cap == ci->i_auth_cap)
+                               ci->i_auth_cap = tcap;
+                       if (ci->i_flushing_caps && ci->i_auth_cap == tcap) {
+                               spin_lock(&mdsc->cap_dirty_lock);
+                               list_move_tail(&ci->i_flushing_item,
+                                              &tcap->session->s_cap_flushing);
+                               spin_unlock(&mdsc->cap_dirty_lock);
                        }
-                       spin_unlock(&mdsc->cap_dirty_lock);
                }
                __ceph_remove_cap(cap, false);
+               goto out_unlock;
        }
-       /* else, we already released it */
 
+       if (tsession) {
+               int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
+               spin_unlock(&ci->i_ceph_lock);
+               /* add placeholder for the export tagert */
+               ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
+                            t_seq - 1, t_mseq, (u64)-1, flag, NULL);
+               goto retry;
+       }
+
+       spin_unlock(&ci->i_ceph_lock);
+       mutex_unlock(&session->s_mutex);
+
+       /* open target session */
+       tsession = ceph_mdsc_open_export_target_session(mdsc, target);
+       if (!IS_ERR(tsession)) {
+               if (mds > target) {
+                       mutex_lock(&session->s_mutex);
+                       mutex_lock_nested(&tsession->s_mutex,
+                                         SINGLE_DEPTH_NESTING);
+               } else {
+                       mutex_lock(&tsession->s_mutex);
+                       mutex_lock_nested(&session->s_mutex,
+                                         SINGLE_DEPTH_NESTING);
+               }
+               ceph_add_cap_releases(mdsc, tsession);
+       } else {
+               WARN_ON(1);
+               tsession = NULL;
+               target = -1;
+       }
+       goto retry;
+
+out_unlock:
        spin_unlock(&ci->i_ceph_lock);
+       mutex_unlock(&session->s_mutex);
+       if (tsession) {
+               mutex_unlock(&tsession->s_mutex);
+               ceph_put_mds_session(tsession);
+       }
 }
 
 /*
@@ -2810,10 +2892,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
  */
 static void handle_cap_import(struct ceph_mds_client *mdsc,
                              struct inode *inode, struct ceph_mds_caps *im,
+                             struct ceph_mds_cap_peer *ph,
                              struct ceph_mds_session *session,
                              void *snaptrace, int snaptrace_len)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_cap *cap;
        int mds = session->s_mds;
        unsigned issued = le32_to_cpu(im->caps);
        unsigned wanted = le32_to_cpu(im->wanted);
@@ -2821,28 +2905,44 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
        unsigned mseq = le32_to_cpu(im->migrate_seq);
        u64 realmino = le64_to_cpu(im->realm);
        u64 cap_id = le64_to_cpu(im->cap_id);
+       u64 p_cap_id;
+       int peer;
 
-       if (ci->i_cap_exporting_mds >= 0 &&
-           ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) {
-               dout("handle_cap_import inode %p ci %p mds%d mseq %d"
-                    " - cleared exporting from mds%d\n",
-                    inode, ci, mds, mseq,
-                    ci->i_cap_exporting_mds);
-               ci->i_cap_exporting_issued = 0;
-               ci->i_cap_exporting_mseq = 0;
-               ci->i_cap_exporting_mds = -1;
+       if (ph) {
+               p_cap_id = le64_to_cpu(ph->cap_id);
+               peer = le32_to_cpu(ph->mds);
+       } else {
+               p_cap_id = 0;
+               peer = -1;
+       }
 
-               spin_lock(&mdsc->cap_dirty_lock);
-               if (!list_empty(&ci->i_dirty_item)) {
-                       dout(" moving %p back to cap_dirty\n", inode);
-                       list_move(&ci->i_dirty_item, &mdsc->cap_dirty);
+       dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
+            inode, ci, mds, mseq, peer);
+
+       spin_lock(&ci->i_ceph_lock);
+       cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
+       if (cap && cap->cap_id == p_cap_id) {
+               dout(" remove export cap %p mds%d flags %d\n",
+                    cap, peer, ph->flags);
+               if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
+                   (cap->seq != le32_to_cpu(ph->seq) ||
+                    cap->mseq != le32_to_cpu(ph->mseq))) {
+                       pr_err("handle_cap_import: mismatched seq/mseq: "
+                              "ino (%llx.%llx) mds%d seq %d mseq %d "
+                              "importer mds%d has peer seq %d mseq %d\n",
+                              ceph_vinop(inode), peer, cap->seq,
+                              cap->mseq, mds, le32_to_cpu(ph->seq),
+                              le32_to_cpu(ph->mseq));
                }
-               spin_unlock(&mdsc->cap_dirty_lock);
-       } else {
-               dout("handle_cap_import inode %p ci %p mds%d mseq %d\n",
-                    inode, ci, mds, mseq);
+               ci->i_cap_exporting_issued = cap->issued;
+               __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
        }
 
+       /* make sure we re-request max_size, if necessary */
+       ci->i_wanted_max_size = 0;
+       ci->i_requested_max_size = 0;
+       spin_unlock(&ci->i_ceph_lock);
+
        down_write(&mdsc->snap_rwsem);
        ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
                               false);
@@ -2853,11 +2953,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
        kick_flushing_inode_caps(mdsc, session, inode);
        up_read(&mdsc->snap_rwsem);
 
-       /* make sure we re-request max_size, if necessary */
-       spin_lock(&ci->i_ceph_lock);
-       ci->i_wanted_max_size = 0;  /* reset */
-       ci->i_requested_max_size = 0;
-       spin_unlock(&ci->i_ceph_lock);
 }
 
 /*
@@ -2875,6 +2970,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        struct ceph_mds_caps *h;
+       struct ceph_mds_cap_peer *peer = NULL;
        int mds = session->s_mds;
        int op;
        u32 seq, mseq;
@@ -2885,12 +2981,13 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        void *snaptrace;
        size_t snaptrace_len;
        void *flock;
+       void *end;
        u32 flock_len;
-       int open_target_sessions = 0;
 
        dout("handle_caps from mds%d\n", mds);
 
        /* decode */
+       end = msg->front.iov_base + msg->front.iov_len;
        tid = le64_to_cpu(msg->hdr.tid);
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
@@ -2908,17 +3005,28 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        snaptrace_len = le32_to_cpu(h->snap_trace_len);
 
        if (le16_to_cpu(msg->hdr.version) >= 2) {
-               void *p, *end;
-
-               p = snaptrace + snaptrace_len;
-               end = msg->front.iov_base + msg->front.iov_len;
+               void *p = snaptrace + snaptrace_len;
                ceph_decode_32_safe(&p, end, flock_len, bad);
+               if (p + flock_len > end)
+                       goto bad;
                flock = p;
        } else {
                flock = NULL;
                flock_len = 0;
        }
 
+       if (le16_to_cpu(msg->hdr.version) >= 3) {
+               if (op == CEPH_CAP_OP_IMPORT) {
+                       void *p = flock + flock_len;
+                       if (p + sizeof(*peer) > end)
+                               goto bad;
+                       peer = p;
+               } else if (op == CEPH_CAP_OP_EXPORT) {
+                       /* recorded in unused fields */
+                       peer = (void *)&h->size;
+               }
+       }
+
        mutex_lock(&session->s_mutex);
        session->s_seq++;
        dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2951,11 +3059,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                goto done;
 
        case CEPH_CAP_OP_EXPORT:
-               handle_cap_export(inode, h, session, &open_target_sessions);
-               goto done;
+               handle_cap_export(inode, h, peer, session);
+               goto done_unlocked;
 
        case CEPH_CAP_OP_IMPORT:
-               handle_cap_import(mdsc, inode, h, session,
+               handle_cap_import(mdsc, inode, h, peer, session,
                                  snaptrace, snaptrace_len);
        }
 
@@ -3007,8 +3115,6 @@ done:
 done_unlocked:
        if (inode)
                iput(inode);
-       if (open_target_sessions)
-               ceph_mdsc_open_export_target_sessions(mdsc, session);
        return;
 
 bad: