Pileus Git - ~andy/linux/commitdiff
Merge tag 'fscache-fixes-for-ceph' into wip-fscache
author Milosz Tanski <milosz@adfin.com>
Fri, 6 Sep 2013 16:41:20 +0000 (16:41 +0000)
committer Milosz Tanski <milosz@adfin.com>
Fri, 6 Sep 2013 16:41:20 +0000 (16:41 +0000)
Patches for Ceph FS-Cache support

12 files changed:
drivers/block/rbd.c
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/dir.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/ioctl.c
fs/ceph/mds_client.c
fs/ceph/super.h
net/ceph/messenger.c
net/ceph/osd_client.c
net/ceph/osdmap.c

index 4ad2ad9a5bb01448d6d2206a3387575718c47b87..fef3687c1527767aa37bc57a56af1756b54dd79e 100644 (file)
@@ -1557,11 +1557,12 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
-        * ENOENT means a hole in the image.  We zero-fill the
-        * entire length of the request.  A short read also implies
-        * zero-fill to the end of the request.  Either way we
-        * update the xferred count to indicate the whole request
-        * was satisfied.
+        * ENOENT means a hole in the image.  We zero-fill the entire
+        * length of the request.  A short read also implies zero-fill
+        * to the end of the request.  An error requires the whole
+        * length of the request to be reported finished with an error
+        * to the block layer.  In each case we update the xferred
+        * count to indicate the whole request was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
@@ -1570,14 +1571,13 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
-               obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
-               obj_request->xferred = length;
        }
+       obj_request->xferred = length;
        obj_request_done_set(obj_request);
 }
 
@@ -2163,9 +2163,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
        struct rbd_obj_request *obj_request = NULL;
        struct rbd_obj_request *next_obj_request;
        bool write_request = img_request_write_test(img_request);
-       struct bio *bio_list = 0;
+       struct bio *bio_list = NULL;
        unsigned int bio_offset = 0;
-       struct page **pages = 0;
+       struct page **pages = NULL;
        u64 img_offset;
        u64 resid;
        u16 opcode;
@@ -2203,6 +2203,11 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                rbd_segment_name_free(object_name);
                if (!obj_request)
                        goto out_unwind;
+               /*
+                * set obj_request->img_request before creating the
+                * osd_request so that it gets the right snapc
+                */
+               rbd_img_obj_request_add(img_request, obj_request);
 
                if (type == OBJ_REQUEST_BIO) {
                        unsigned int clone_size;
@@ -2244,11 +2249,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                                        obj_request->pages, length,
                                        offset & ~PAGE_MASK, false, false);
 
-               /*
-                * set obj_request->img_request before formatting
-                * the osd_request so that it gets the right snapc
-                */
-               rbd_img_obj_request_add(img_request, obj_request);
                if (write_request)
                        rbd_osd_req_format_write(obj_request);
                else
@@ -3702,12 +3702,14 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
        if (ret < sizeof (size_buf))
                return -ERANGE;
 
-       if (order)
+       if (order) {
                *order = size_buf.order;
+               dout("  order %u", (unsigned int)*order);
+       }
        *snap_size = le64_to_cpu(size_buf.size);
 
-       dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
-               (unsigned long long)snap_id, (unsigned int)*order,
+       dout("  snap_id 0x%016llx snap_size = %llu\n",
+               (unsigned long long)snap_id,
                (unsigned long long)*snap_size);
 
        return 0;
index 5318a3b704f6d6f908520a9c1fc18b4dadc9a509..3bed7da383263a3ba967c6158b2ff6bd1bf5c4af 100644 (file)
@@ -70,15 +70,16 @@ static int ceph_set_page_dirty(struct page *page)
        struct address_space *mapping = page->mapping;
        struct inode *inode;
        struct ceph_inode_info *ci;
-       int undo = 0;
        struct ceph_snap_context *snapc;
+       int ret;
 
        if (unlikely(!mapping))
                return !TestSetPageDirty(page);
 
-       if (TestSetPageDirty(page)) {
+       if (PageDirty(page)) {
                dout("%p set_page_dirty %p idx %lu -- already dirty\n",
                     mapping->host, page, page->index);
+               BUG_ON(!PagePrivate(page));
                return 0;
        }
 
@@ -107,35 +108,19 @@ static int ceph_set_page_dirty(struct page *page)
             snapc, snapc->seq, snapc->num_snaps);
        spin_unlock(&ci->i_ceph_lock);
 
-       /* now adjust page */
-       spin_lock_irq(&mapping->tree_lock);
-       if (page->mapping) {    /* Race with truncate? */
-               WARN_ON_ONCE(!PageUptodate(page));
-               account_page_dirtied(page, page->mapping);
-               radix_tree_tag_set(&mapping->page_tree,
-                               page_index(page), PAGECACHE_TAG_DIRTY);
-
-               /*
-                * Reference snap context in page->private.  Also set
-                * PagePrivate so that we get invalidatepage callback.
-                */
-               page->private = (unsigned long)snapc;
-               SetPagePrivate(page);
-       } else {
-               dout("ANON set_page_dirty %p (raced truncate?)\n", page);
-               undo = 1;
-       }
-
-       spin_unlock_irq(&mapping->tree_lock);
-
-       if (undo)
-               /* whoops, we failed to dirty the page */
-               ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
+       /*
+        * Reference snap context in page->private.  Also set
+        * PagePrivate so that we get invalidatepage callback.
+        */
+       BUG_ON(PagePrivate(page));
+       page->private = (unsigned long)snapc;
+       SetPagePrivate(page);
 
-       __mark_inode_dirty(mapping->host, I_DIRTY_PAGES);
+       ret = __set_page_dirty_nobuffers(page);
+       WARN_ON(!PageLocked(page));
+       WARN_ON(!page->mapping);
 
-       BUG_ON(!PageDirty(page));
-       return 1;
+       return ret;
 }
 
 /*
@@ -150,11 +135,14 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
        struct ceph_inode_info *ci;
        struct ceph_snap_context *snapc = page_snap_context(page);
 
-       BUG_ON(!PageLocked(page));
-       BUG_ON(!PagePrivate(page));
-       BUG_ON(!page->mapping);
-
        inode = page->mapping->host;
+       ci = ceph_inode(inode);
+
+       if (offset != 0 || length != PAGE_CACHE_SIZE) {
+               dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
+                    inode, page, page->index, offset, length);
+               return;
+       }
 
        /*
         * We can get non-dirty pages here due to races between
@@ -164,21 +152,15 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
        if (!PageDirty(page))
                pr_err("%p invalidatepage %p page not dirty\n", inode, page);
 
-       if (offset == 0 && length == PAGE_CACHE_SIZE)
-               ClearPageChecked(page);
+       ClearPageChecked(page);
 
-       ci = ceph_inode(inode);
-       if (offset == 0 && length == PAGE_CACHE_SIZE) {
-               dout("%p invalidatepage %p idx %lu full dirty page\n",
-                    inode, page, page->index);
-               ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
-               ceph_put_snap_context(snapc);
-               page->private = 0;
-               ClearPagePrivate(page);
-       } else {
-               dout("%p invalidatepage %p idx %lu partial dirty page %u(%u)\n",
-                    inode, page, page->index, offset, length);
-       }
+       dout("%p invalidatepage %p idx %lu full dirty page\n",
+            inode, page, page->index);
+
+       ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
+       ceph_put_snap_context(snapc);
+       page->private = 0;
+       ClearPagePrivate(page);
 }
 
 /* just a sanity check */
index 25442b40c25a71761596e071612140f01279fb69..5a26bc1dd799adc45a75f0b477c25a84c19844bb 100644 (file)
@@ -2072,19 +2072,17 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
        /* finish pending truncate */
        while (ci->i_truncate_pending) {
                spin_unlock(&ci->i_ceph_lock);
-               if (!(need & CEPH_CAP_FILE_WR))
-                       mutex_lock(&inode->i_mutex);
                __ceph_do_pending_vmtruncate(inode);
-               if (!(need & CEPH_CAP_FILE_WR))
-                       mutex_unlock(&inode->i_mutex);
                spin_lock(&ci->i_ceph_lock);
        }
 
-       if (need & CEPH_CAP_FILE_WR) {
+       have = __ceph_caps_issued(ci, &implemented);
+
+       if (have & need & CEPH_CAP_FILE_WR) {
                if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
                        dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
                             inode, endoff, ci->i_max_size);
-                       if (endoff > ci->i_wanted_max_size) {
+                       if (endoff > ci->i_requested_max_size) {
                                *check_max = 1;
                                ret = 1;
                        }
@@ -2099,7 +2097,6 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
                        goto out;
                }
        }
-       have = __ceph_caps_issued(ci, &implemented);
 
        if ((have & need) == need) {
                /*
@@ -2141,14 +2138,17 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 
        /* do we need to explicitly request a larger max_size? */
        spin_lock(&ci->i_ceph_lock);
-       if ((endoff >= ci->i_max_size ||
-            endoff > (inode->i_size << 1)) &&
-           endoff > ci->i_wanted_max_size) {
+       if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
                dout("write %p at large endoff %llu, req max_size\n",
                     inode, endoff);
                ci->i_wanted_max_size = endoff;
-               check = 1;
        }
+       /* duplicate ceph_check_caps()'s logic */
+       if (ci->i_auth_cap &&
+           (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
+           ci->i_wanted_max_size > ci->i_max_size &&
+           ci->i_wanted_max_size > ci->i_requested_max_size)
+               check = 1;
        spin_unlock(&ci->i_ceph_lock);
        if (check)
                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
@@ -2333,6 +2333,38 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                iput(inode);
 }
 
+/*
+ * Invalidate unlinked inode's aliases, so we can drop the inode ASAP.
+ */
+static void invalidate_aliases(struct inode *inode)
+{
+       struct dentry *dn, *prev = NULL;
+
+       dout("invalidate_aliases inode %p\n", inode);
+       d_prune_aliases(inode);
+       /*
+        * For a non-directory inode, d_find_alias() only returns a
+        * connected dentry. After calling d_delete(), the dentry
+        * becomes disconnected.
+        *
+        * For directory inode, d_find_alias() only can return
+        * disconnected dentry. But directory inode should have
+        * one alias at most.
+        */
+       while ((dn = d_find_alias(inode))) {
+               if (dn == prev) {
+                       dput(dn);
+                       break;
+               }
+               d_delete(dn);
+               if (prev)
+                       dput(prev);
+               prev = dn;
+       }
+       if (prev)
+               dput(prev);
+}
+
 /*
  * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
  * actually be a revocation if it specifies a smaller cap set.)
@@ -2361,8 +2393,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        int check_caps = 0;
        int wake = 0;
        int writeback = 0;
-       int revoked_rdcache = 0;
        int queue_invalidate = 0;
+       int deleted_inode = 0;
 
        dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
             inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2377,9 +2409,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
            (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
            !ci->i_wrbuffer_ref) {
-               if (try_nonblocking_invalidate(inode) == 0) {
-                       revoked_rdcache = 1;
-               } else {
+               if (try_nonblocking_invalidate(inode)) {
                        /* there were locked pages.. invalidate later
                           in a separate thread. */
                        if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
@@ -2407,8 +2437,12 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                     from_kgid(&init_user_ns, inode->i_gid));
        }
 
-       if ((issued & CEPH_CAP_LINK_EXCL) == 0)
+       if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
                set_nlink(inode, le32_to_cpu(grant->nlink));
+               if (inode->i_nlink == 0 &&
+                   (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
+                       deleted_inode = 1;
+       }
 
        if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
                int len = le32_to_cpu(grant->xattr_len);
@@ -2517,6 +2551,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                ceph_queue_writeback(inode);
        if (queue_invalidate)
                ceph_queue_invalidate(inode);
+       if (deleted_inode)
+               invalidate_aliases(inode);
        if (wake)
                wake_up_all(&ci->i_cap_wq);
 
index a40ceda47a3218ee53c2167d8844899c5de3e9cf..868b61d56cac77f3a8328d5ba4851ec7947fe827 100644 (file)
@@ -793,6 +793,8 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
        req->r_locked_dir = dir;
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+       /* release LINK_SHARED on source inode (mds will lock it) */
+       req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (err) {
                d_drop(dentry);
index 2ddf061c1c4af730885365b07dcb9388d7af98f9..20d0222c2e76b892d933a841d26ed6912d4c84f9 100644 (file)
@@ -8,6 +8,7 @@
 #include <linux/namei.h>
 #include <linux/writeback.h>
 #include <linux/aio.h>
+#include <linux/falloc.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -313,9 +314,9 @@ static int striped_read(struct inode *inode,
 {
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
-       u64 pos, this_len;
+       u64 pos, this_len, left;
        int io_align, page_align;
-       int left, pages_left;
+       int pages_left;
        int read;
        struct page **page_pos;
        int ret;
@@ -346,47 +347,40 @@ more:
                ret = 0;
        hit_stripe = this_len < left;
        was_short = ret >= 0 && ret < this_len;
-       dout("striped_read %llu~%u (read %u) got %d%s%s\n", pos, left, read,
+       dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
             ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
 
-       if (ret > 0) {
-               int didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
-
-               if (read < pos - off) {
-                       dout(" zero gap %llu to %llu\n", off + read, pos);
-                       ceph_zero_page_vector_range(page_align + read,
-                                                   pos - off - read, pages);
+       if (ret >= 0) {
+               int didpages;
+               if (was_short && (pos + ret < inode->i_size)) {
+                       u64 tmp = min(this_len - ret,
+                                       inode->i_size - pos - ret);
+                       dout(" zero gap %llu to %llu\n",
+                               pos + ret, pos + ret + tmp);
+                       ceph_zero_page_vector_range(page_align + read + ret,
+                                                       tmp, pages);
+                       ret += tmp;
                }
+
+               didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
                pos += ret;
                read = pos - off;
                left -= ret;
                page_pos += didpages;
                pages_left -= didpages;
 
-               /* hit stripe*/
-               if (left && hit_stripe)
+               /* hit stripe and need to continue */
+               if (left && hit_stripe && pos < inode->i_size)
                        goto more;
        }
 
-       if (was_short) {
+       if (read > 0) {
+               ret = read;
                /* did we bounce off eof? */
                if (pos + left > inode->i_size)
                        *checkeof = 1;
-
-               /* zero trailing bytes (inside i_size) */
-               if (left > 0 && pos < inode->i_size) {
-                       if (pos + left > inode->i_size)
-                               left = inode->i_size - pos;
-
-                       dout("zero tail %d\n", left);
-                       ceph_zero_page_vector_range(page_align + read, left,
-                                                   pages);
-                       read += left;
-               }
        }
 
-       if (ret >= 0)
-               ret = read;
        dout("striped_read returns %d\n", ret);
        return ret;
 }
@@ -618,6 +612,8 @@ out:
                if (check_caps)
                        ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
                                        NULL);
+       } else if (ret != -EOLDSNAPC && written > 0) {
+               ret = written;
        }
        return ret;
 }
@@ -659,7 +655,6 @@ again:
 
        if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_filp->f_flags & O_DIRECT) ||
-           (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
            (fi->flags & CEPH_F_SYNC))
                /* hmm, this isn't really async... */
                ret = ceph_sync_read(filp, base, len, ppos, &checkeof);
@@ -711,13 +706,11 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
                &ceph_sb_to_client(inode->i_sb)->client->osdc;
        ssize_t count, written = 0;
        int err, want, got;
-       bool hold_mutex;
 
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
 
        mutex_lock(&inode->i_mutex);
-       hold_mutex = true;
 
        err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ);
        if (err)
@@ -763,18 +756,31 @@ retry_snap:
 
        if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_filp->f_flags & O_DIRECT) ||
-           (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
            (fi->flags & CEPH_F_SYNC)) {
                mutex_unlock(&inode->i_mutex);
                written = ceph_sync_write(file, iov->iov_base, count,
                                          pos, &iocb->ki_pos);
+               if (written == -EOLDSNAPC) {
+                       dout("aio_write %p %llx.%llx %llu~%u"
+                               "got EOLDSNAPC, retrying\n",
+                               inode, ceph_vinop(inode),
+                               pos, (unsigned)iov->iov_len);
+                       mutex_lock(&inode->i_mutex);
+                       goto retry_snap;
+               }
        } else {
+               /*
+                * No need to acquire the i_truncate_mutex, because
+                * the MDS revokes Fwb caps before sending a truncate
+                * message to us. We can't get the Fwb cap while a
+                * vmtruncate is pending, so write and vmtruncate
+                * cannot run at the same time.
+                */
                written = generic_file_buffered_write(iocb, iov, nr_segs,
                                                      pos, &iocb->ki_pos,
                                                      count, 0);
                mutex_unlock(&inode->i_mutex);
        }
-       hold_mutex = false;
 
        if (written >= 0) {
                int dirty;
@@ -798,18 +804,12 @@ retry_snap:
                        written = err;
        }
 
-       if (written == -EOLDSNAPC) {
-               dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
-                    inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len);
-               mutex_lock(&inode->i_mutex);
-               hold_mutex = true;
-               goto retry_snap;
-       }
+       goto out_unlocked;
+
 out:
-       if (hold_mutex)
-               mutex_unlock(&inode->i_mutex);
+       mutex_unlock(&inode->i_mutex);
+out_unlocked:
        current->backing_dev_info = NULL;
-
        return written ? written : err;
 }
 
@@ -822,7 +822,6 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
        int ret;
 
        mutex_lock(&inode->i_mutex);
-       __ceph_do_pending_vmtruncate(inode);
 
        if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
                ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
@@ -871,6 +870,204 @@ out:
        return offset;
 }
 
+static inline void ceph_zero_partial_page(
+       struct inode *inode, loff_t offset, unsigned size)
+{
+       struct page *page;
+       pgoff_t index = offset >> PAGE_CACHE_SHIFT;
+
+       page = find_lock_page(inode->i_mapping, index);
+       if (page) {
+               wait_on_page_writeback(page);
+               zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size);
+               unlock_page(page);
+               page_cache_release(page);
+       }
+}
+
+static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
+                                     loff_t length)
+{
+       loff_t nearly = round_up(offset, PAGE_CACHE_SIZE);
+       if (offset < nearly) {
+               loff_t size = nearly - offset;
+               if (length < size)
+                       size = length;
+               ceph_zero_partial_page(inode, offset, size);
+               offset += size;
+               length -= size;
+       }
+       if (length >= PAGE_CACHE_SIZE) {
+               loff_t size = round_down(length, PAGE_CACHE_SIZE);
+               truncate_pagecache_range(inode, offset, offset + size - 1);
+               offset += size;
+               length -= size;
+       }
+       if (length)
+               ceph_zero_partial_page(inode, offset, length);
+}
+
+static int ceph_zero_partial_object(struct inode *inode,
+                                   loff_t offset, loff_t *length)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_osd_request *req;
+       int ret = 0;
+       loff_t zero = 0;
+       int op;
+
+       if (!length) {
+               op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE;
+               length = &zero;
+       } else {
+               op = CEPH_OSD_OP_ZERO;
+       }
+
+       req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+                                       ceph_vino(inode),
+                                       offset, length,
+                                       1, op,
+                                       CEPH_OSD_FLAG_WRITE |
+                                       CEPH_OSD_FLAG_ONDISK,
+                                       NULL, 0, 0, false);
+       if (IS_ERR(req)) {
+               ret = PTR_ERR(req);
+               goto out;
+       }
+
+       ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap,
+                               &inode->i_mtime);
+
+       ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+       if (!ret) {
+               ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+               if (ret == -ENOENT)
+                       ret = 0;
+       }
+       ceph_osdc_put_request(req);
+
+out:
+       return ret;
+}
+
+static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
+{
+       int ret = 0;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       s32 stripe_unit = ceph_file_layout_su(ci->i_layout);
+       s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
+       s32 object_size = ceph_file_layout_object_size(ci->i_layout);
+       u64 object_set_size = object_size * stripe_count;
+       u64 nearly, t;
+
+       /* round offset up to next period boundary */
+       nearly = offset + object_set_size - 1;
+       t = nearly;
+       nearly -= do_div(t, object_set_size);
+
+       while (length && offset < nearly) {
+               loff_t size = length;
+               ret = ceph_zero_partial_object(inode, offset, &size);
+               if (ret < 0)
+                       return ret;
+               offset += size;
+               length -= size;
+       }
+       while (length >= object_set_size) {
+               int i;
+               loff_t pos = offset;
+               for (i = 0; i < stripe_count; ++i) {
+                       ret = ceph_zero_partial_object(inode, pos, NULL);
+                       if (ret < 0)
+                               return ret;
+                       pos += stripe_unit;
+               }
+               offset += object_set_size;
+               length -= object_set_size;
+       }
+       while (length) {
+               loff_t size = length;
+               ret = ceph_zero_partial_object(inode, offset, &size);
+               if (ret < 0)
+                       return ret;
+               offset += size;
+               length -= size;
+       }
+       return ret;
+}
+
+static long ceph_fallocate(struct file *file, int mode,
+                               loff_t offset, loff_t length)
+{
+       struct ceph_file_info *fi = file->private_data;
+       struct inode *inode = file->f_dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_osd_client *osdc =
+               &ceph_inode_to_client(inode)->client->osdc;
+       int want, got = 0;
+       int dirty;
+       int ret = 0;
+       loff_t endoff = 0;
+       loff_t size;
+
+       if (!S_ISREG(inode->i_mode))
+               return -EOPNOTSUPP;
+
+       if (IS_SWAPFILE(inode))
+               return -ETXTBSY;
+
+       mutex_lock(&inode->i_mutex);
+
+       if (ceph_snap(inode) != CEPH_NOSNAP) {
+               ret = -EROFS;
+               goto unlock;
+       }
+
+       if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) &&
+               !(mode & FALLOC_FL_PUNCH_HOLE)) {
+               ret = -ENOSPC;
+               goto unlock;
+       }
+
+       size = i_size_read(inode);
+       if (!(mode & FALLOC_FL_KEEP_SIZE))
+               endoff = offset + length;
+
+       if (fi->fmode & CEPH_FILE_MODE_LAZY)
+               want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+       else
+               want = CEPH_CAP_FILE_BUFFER;
+
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
+       if (ret < 0)
+               goto unlock;
+
+       if (mode & FALLOC_FL_PUNCH_HOLE) {
+               if (offset < size)
+                       ceph_zero_pagecache_range(inode, offset, length);
+               ret = ceph_zero_objects(inode, offset, length);
+       } else if (endoff > size) {
+               truncate_pagecache_range(inode, size, -1);
+               if (ceph_inode_set_size(inode, endoff))
+                       ceph_check_caps(ceph_inode(inode),
+                               CHECK_CAPS_AUTHONLY, NULL);
+       }
+
+       if (!ret) {
+               spin_lock(&ci->i_ceph_lock);
+               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+               spin_unlock(&ci->i_ceph_lock);
+               if (dirty)
+                       __mark_inode_dirty(inode, dirty);
+       }
+
+       ceph_put_cap_refs(ci, got);
+unlock:
+       mutex_unlock(&inode->i_mutex);
+       return ret;
+}
+
 const struct file_operations ceph_file_fops = {
        .open = ceph_open,
        .release = ceph_release,
@@ -887,5 +1084,6 @@ const struct file_operations ceph_file_fops = {
        .splice_write = generic_file_splice_write,
        .unlocked_ioctl = ceph_ioctl,
        .compat_ioctl   = ceph_ioctl,
+       .fallocate      = ceph_fallocate,
 };
 
index f3a2abf28a77df362faf5c38dc471a64dcbfdffc..602ccd8e06b7a595f1bfe11e0646b2d5b7947d34 100644 (file)
@@ -61,6 +61,14 @@ struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
        return inode;
 }
 
+struct inode *ceph_lookup_inode(struct super_block *sb, struct ceph_vino vino)
+{
+       struct inode *inode;
+       ino_t t = ceph_vino_to_ino(vino);
+       inode = ilookup5_nowait(sb, t, ceph_ino_compare, &vino);
+       return inode;
+}
+
 /*
  * get/constuct snapdir inode for a given directory
  */
@@ -344,6 +352,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
                ci->i_nr_by_mode[i] = 0;
 
+       mutex_init(&ci->i_truncate_mutex);
        ci->i_truncate_seq = 0;
        ci->i_truncate_size = 0;
        ci->i_truncate_pending = 0;
@@ -455,16 +464,20 @@ int ceph_fill_file_size(struct inode *inode, int issued,
                        dout("truncate_seq %u -> %u\n",
                             ci->i_truncate_seq, truncate_seq);
                        ci->i_truncate_seq = truncate_seq;
+
+                       /* the MDS should have revoked these caps */
+                       WARN_ON_ONCE(issued & (CEPH_CAP_FILE_EXCL |
+                                              CEPH_CAP_FILE_RD |
+                                              CEPH_CAP_FILE_WR |
+                                              CEPH_CAP_FILE_LAZYIO));
                        /*
                         * If we hold relevant caps, or in the case where we're
                         * not the only client referencing this file and we
                         * don't hold those caps, then we need to check whether
                         * the file is either opened or mmaped
                         */
-                       if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
-                                      CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
-                                      CEPH_CAP_FILE_EXCL|
-                                      CEPH_CAP_FILE_LAZYIO)) ||
+                       if ((issued & (CEPH_CAP_FILE_CACHE|
+                                      CEPH_CAP_FILE_BUFFER)) ||
                            mapping_mapped(inode->i_mapping) ||
                            __ceph_caps_file_wanted(ci)) {
                                ci->i_truncate_pending++;
@@ -1419,18 +1432,20 @@ static void ceph_invalidate_work(struct work_struct *work)
        u32 orig_gen;
        int check = 0;
 
+       mutex_lock(&ci->i_truncate_mutex);
        spin_lock(&ci->i_ceph_lock);
        dout("invalidate_pages %p gen %d revoking %d\n", inode,
             ci->i_rdcache_gen, ci->i_rdcache_revoking);
        if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
                /* nevermind! */
                spin_unlock(&ci->i_ceph_lock);
+               mutex_unlock(&ci->i_truncate_mutex);
                goto out;
        }
        orig_gen = ci->i_rdcache_gen;
        spin_unlock(&ci->i_ceph_lock);
 
-       truncate_inode_pages(&inode->i_data, 0);
+       truncate_inode_pages(inode->i_mapping, 0);
 
        spin_lock(&ci->i_ceph_lock);
        if (orig_gen == ci->i_rdcache_gen &&
@@ -1445,6 +1460,7 @@ static void ceph_invalidate_work(struct work_struct *work)
                     ci->i_rdcache_revoking);
        }
        spin_unlock(&ci->i_ceph_lock);
+       mutex_unlock(&ci->i_truncate_mutex);
 
        if (check)
                ceph_check_caps(ci, 0, NULL);
@@ -1465,9 +1481,7 @@ static void ceph_vmtruncate_work(struct work_struct *work)
        struct inode *inode = &ci->vfs_inode;
 
        dout("vmtruncate_work %p\n", inode);
-       mutex_lock(&inode->i_mutex);
        __ceph_do_pending_vmtruncate(inode);
-       mutex_unlock(&inode->i_mutex);
        iput(inode);
 }
 
@@ -1500,11 +1514,13 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
        u64 to;
        int wrbuffer_refs, finish = 0;
 
+       mutex_lock(&ci->i_truncate_mutex);
 retry:
        spin_lock(&ci->i_ceph_lock);
        if (ci->i_truncate_pending == 0) {
                dout("__do_pending_vmtruncate %p none pending\n", inode);
                spin_unlock(&ci->i_ceph_lock);
+               mutex_unlock(&ci->i_truncate_mutex);
                return;
        }
 
@@ -1521,6 +1537,9 @@ retry:
                goto retry;
        }
 
+       /* there should be no reader or writer */
+       WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref);
+
        to = ci->i_truncate_size;
        wrbuffer_refs = ci->i_wrbuffer_ref;
        dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode,
@@ -1538,6 +1557,8 @@ retry:
        if (!finish)
                goto retry;
 
+       mutex_unlock(&ci->i_truncate_mutex);
+
        if (wrbuffer_refs == 0)
                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 
@@ -1586,8 +1607,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
 
-       __ceph_do_pending_vmtruncate(inode);
-
        err = inode_change_ok(inode, attr);
        if (err != 0)
                return err;
@@ -1768,7 +1787,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
             ceph_cap_string(dirtied), mask);
 
        ceph_mdsc_put_request(req);
-       __ceph_do_pending_vmtruncate(inode);
+       if (mask & CEPH_SETATTR_SIZE)
+               __ceph_do_pending_vmtruncate(inode);
        return err;
 out:
        spin_unlock(&ci->i_ceph_lock);
index e0b4ef31d3c870c9e73fecad303e9f9957542385..669622fd1ae3d52af418cc4c283a5f22513bca73 100644 (file)
@@ -196,8 +196,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
        r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
                                          &dl.object_no, &dl.object_offset,
                                          &olen);
-       if (r < 0)
+       if (r < 0) {
+               up_read(&osdc->map_sem);
                return -EIO;
+       }
        dl.file_offset -= dl.object_offset;
        dl.object_size = ceph_file_layout_object_size(ci->i_layout);
        dl.block_size = ceph_file_layout_su(ci->i_layout);
@@ -209,8 +211,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
        snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
                 ceph_ino(inode), dl.object_no);
 
-       ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap,
-               ceph_file_layout_pg_pool(ci->i_layout));
+       r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap,
+                               ceph_file_layout_pg_pool(ci->i_layout));
+       if (r < 0) {
+               up_read(&osdc->map_sem);
+               return r;
+       }
 
        dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
        if (dl.osd >= 0) {
index 187bf214444da8c8fc9c6a8603b699a258f773f8..603786b564bed08e2591761ae79aa5ef0c926849 100644 (file)
@@ -414,6 +414,9 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 {
        struct ceph_mds_session *s;
 
+       if (mds >= mdsc->mdsmap->m_max_mds)
+               return ERR_PTR(-EINVAL);
+
        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);
@@ -1028,6 +1031,37 @@ static void remove_session_caps(struct ceph_mds_session *session)
 {
        dout("remove_session_caps on %p\n", session);
        iterate_session_caps(session, remove_session_caps_cb, NULL);
+
+       spin_lock(&session->s_cap_lock);
+       if (session->s_nr_caps > 0) {
+               struct super_block *sb = session->s_mdsc->fsc->sb;
+               struct inode *inode;
+               struct ceph_cap *cap, *prev = NULL;
+               struct ceph_vino vino;
+               /*
+                * iterate_session_caps() skips inodes that are being
+                * deleted, we need to wait until deletions are complete.
+                * __wait_on_freeing_inode() is designed for the job,
+                * but it is not exported, so use lookup inode function
+                * to access it.
+                */
+               while (!list_empty(&session->s_caps)) {
+                       cap = list_entry(session->s_caps.next,
+                                        struct ceph_cap, session_caps);
+                       if (cap == prev)
+                               break;
+                       prev = cap;
+                       vino = cap->ci->i_vino;
+                       spin_unlock(&session->s_cap_lock);
+
+                       inode = ceph_lookup_inode(sb, vino);
+                       iput(inode);
+
+                       spin_lock(&session->s_cap_lock);
+               }
+       }
+       spin_unlock(&session->s_cap_lock);
+
        BUG_ON(session->s_nr_caps > 0);
        BUG_ON(!list_empty(&session->s_cap_flushing));
        cleanup_cap_releases(session);
index cbded572345e77a107e539aa4e433d6f6f7964c0..f1e4e4766ea2d8cabf6126cd59a98f4568517895 100644 (file)
@@ -288,6 +288,7 @@ struct ceph_inode_info {
 
        int i_nr_by_mode[CEPH_FILE_MODE_NUM];  /* open file counts */
 
+       struct mutex i_truncate_mutex;
        u32 i_truncate_seq;        /* last truncate to smaller size */
        u64 i_truncate_size;       /*  and the size we last truncated down to */
        int i_truncate_pending;    /*  still need to call vmtruncate */
@@ -677,6 +678,8 @@ extern void ceph_destroy_inode(struct inode *inode);
 
 extern struct inode *ceph_get_inode(struct super_block *sb,
                                    struct ceph_vino vino);
+extern struct inode *ceph_lookup_inode(struct super_block *sb,
+                                      struct ceph_vino vino);
 extern struct inode *ceph_get_snapdir(struct inode *parent);
 extern int ceph_fill_file_size(struct inode *inode, int issued,
                               u32 truncate_seq, u64 truncate_size, u64 size);
index eb0a46a49bd42351d23878f118384ab09a8d2ee8..dd9b5857ef5cdad15dd50f69455d2b5929f3f8c8 100644 (file)
@@ -290,7 +290,7 @@ int ceph_msgr_init(void)
        if (ceph_msgr_slab_init())
                return -ENOMEM;
 
-       ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
+       ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0);
        if (ceph_msgr_wq)
                return 0;
 
index dd47889adc4aec94941d6f17105878ebe235db8f..1606f740d6ae0d1b7ea8aeb1ba8126fd7693888d 100644 (file)
@@ -503,7 +503,9 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
        size_t payload_len = 0;
 
-       BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+       BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+              opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
+              opcode != CEPH_OSD_OP_TRUNCATE);
 
        op->extent.offset = offset;
        op->extent.length = length;
@@ -631,6 +633,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
                break;
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
+       case CEPH_OSD_OP_ZERO:
+       case CEPH_OSD_OP_DELETE:
+       case CEPH_OSD_OP_TRUNCATE:
                if (src->op == CEPH_OSD_OP_WRITE)
                        request_data_len = src->extent.length;
                dst->extent.offset = cpu_to_le64(src->extent.offset);
@@ -715,7 +720,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        u64 object_base;
        int r;
 
-       BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+       BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+              opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
+              opcode != CEPH_OSD_OP_TRUNCATE);
 
        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
                                        GFP_NOFS);
@@ -1488,14 +1495,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
             req, result);
 
-       ceph_decode_need(&p, end, 4, bad);
+       ceph_decode_need(&p, end, 4, bad_put);
        numops = ceph_decode_32(&p);
        if (numops > CEPH_OSD_MAX_OP)
                goto bad_put;
        if (numops != req->r_num_ops)
                goto bad_put;
        payload_len = 0;
-       ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
+       ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
        for (i = 0; i < numops; i++) {
                struct ceph_osd_op *op = p;
                int len;
@@ -1513,7 +1520,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                goto bad_put;
        }
 
-       ceph_decode_need(&p, end, 4 + numops * 4, bad);
+       ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
        retry_attempt = ceph_decode_32(&p);
        for (i = 0; i < numops; i++)
                req->r_reply_op_result[i] = ceph_decode_32(&p);
@@ -1786,6 +1793,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                nr_maps--;
        }
 
+       if (!osdc->osdmap)
+               goto bad;
 done:
        downgrade_write(&osdc->map_sem);
        ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
@@ -2129,6 +2138,8 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                        dout("osdc_start_request failed map, "
                                " will retry %lld\n", req->r_tid);
                        rc = 0;
+               } else {
+                       __unregister_request(osdc, req);
                }
                goto out_unlock;
        }
@@ -2253,12 +2264,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        if (err < 0)
                goto out_msgpool;
 
+       err = -ENOMEM;
        osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
-       if (IS_ERR(osdc->notify_wq)) {
-               err = PTR_ERR(osdc->notify_wq);
-               osdc->notify_wq = NULL;
+       if (!osdc->notify_wq)
                goto out_msgpool;
-       }
        return 0;
 
 out_msgpool:
index 603ddd92db1965e16fac61a420c4cdb5c8330bbb..dbd9a4792427455e0a2dfdd7d2249c4abcbbb798 100644 (file)
@@ -1129,7 +1129,7 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 
        /* pg_temp? */
        pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
-                                   pool->pgp_num_mask);
+                                   pool->pg_num_mask);
        pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
        if (pg) {
                *num = pg->len;