]> Pileus Git - ~andy/linux/blobdiff - net/ceph/osd_client.c
Merge remote-tracking branches 'spi/fix/doc', 'spi/fix/nuc900' and 'spi/fix/rspi...
[~andy/linux] / net / ceph / osd_client.c
index 3997a87c4f5104d46dd704216e3e9956106be360..0676f2b199d672eaf61157cdf78b75ee74afb434 100644 (file)
@@ -369,6 +369,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        INIT_LIST_HEAD(&req->r_osd_item);
 
        req->r_base_oloc.pool = -1;
+       req->r_target_oloc.pool = -1;
 
        /* create reply message */
        if (use_mempool)
@@ -1256,23 +1257,36 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
                             struct ceph_osd_request *req,
                             struct ceph_pg *pg_out)
 {
-       if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+       bool need_check_tiering;
+
+       need_check_tiering = false;
+       if (req->r_target_oloc.pool == -1) {
+               req->r_target_oloc = req->r_base_oloc; /* struct */
+               need_check_tiering = true;
+       }
+       if (req->r_target_oid.name_len == 0) {
+               ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
+               need_check_tiering = true;
+       }
+
+       if (need_check_tiering &&
+           (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
                struct ceph_pg_pool_info *pi;
 
-               pi = ceph_pg_pool_by_id(osdmap, req->r_base_oloc.pool);
+               pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
                if (pi) {
                        if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
                            pi->read_tier >= 0)
-                               req->r_base_oloc.pool = pi->read_tier;
+                               req->r_target_oloc.pool = pi->read_tier;
                        if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
                            pi->write_tier >= 0)
-                               req->r_base_oloc.pool = pi->write_tier;
+                               req->r_target_oloc.pool = pi->write_tier;
                }
                /* !pi is caught in ceph_oloc_oid_to_pg() */
        }
 
-       return ceph_oloc_oid_to_pg(osdmap, &req->r_base_oloc,
-                                  &req->r_base_oid, pg_out);
+       return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
+                                  &req->r_target_oid, pg_out);
 }
 
 /*
@@ -1382,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
        /* fill in message content that changes each time we send it */
        put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
        put_unaligned_le32(req->r_flags, req->r_request_flags);
-       put_unaligned_le64(req->r_base_oloc.pool, req->r_request_pool);
+       put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
        p = req->r_request_pgid;
        ceph_encode_64(&p, req->r_pgid.pool);
        ceph_encode_32(&p, req->r_pgid.seed);
@@ -1412,6 +1426,40 @@ static void __send_queued(struct ceph_osd_client *osdc)
                __send_request(osdc, req);
 }
 
+/*
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
+                                    struct ceph_osd_request *req,
+                                    bool nofail)
+{
+       int rc;
+
+       __register_request(osdc, req);
+       req->r_sent = 0;
+       req->r_got_reply = 0;
+       rc = __map_request(osdc, req, 0);
+       if (rc < 0) {
+               if (nofail) {
+                       dout("osdc_start_request failed map, "
+                               " will retry %lld\n", req->r_tid);
+                       rc = 0;
+               } else {
+                       __unregister_request(osdc, req);
+               }
+               return rc;
+       }
+
+       if (req->r_osd == NULL) {
+               dout("send_request %p no up osds in pg\n", req);
+               ceph_monc_request_next_osdmap(&osdc->client->monc);
+       } else {
+               __send_queued(osdc);
+       }
+
+       return 0;
+}
+
 /*
  * Timeout callback, called every N seconds when 1 or more osd
  * requests has been active for more than N seconds.  When this
@@ -1483,6 +1531,109 @@ static void handle_osds_timeout(struct work_struct *work)
                              round_jiffies_relative(delay));
 }
 
+static int ceph_oloc_decode(void **p, void *end,
+                           struct ceph_object_locator *oloc)
+{
+       u8 struct_v, struct_cv;
+       u32 len;
+       void *struct_end;
+       int ret = 0;
+
+       ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+       struct_v = ceph_decode_8(p);
+       struct_cv = ceph_decode_8(p);
+       if (struct_v < 3) {
+               pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
+                       struct_v, struct_cv);
+               goto e_inval;
+       }
+       if (struct_cv > 6) {
+               pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
+                       struct_v, struct_cv);
+               goto e_inval;
+       }
+       len = ceph_decode_32(p);
+       ceph_decode_need(p, end, len, e_inval);
+       struct_end = *p + len;
+
+       oloc->pool = ceph_decode_64(p);
+       *p += 4; /* skip preferred */
+
+       len = ceph_decode_32(p);
+       if (len > 0) {
+               pr_warn("ceph_object_locator::key is set\n");
+               goto e_inval;
+       }
+
+       if (struct_v >= 5) {
+               len = ceph_decode_32(p);
+               if (len > 0) {
+                       pr_warn("ceph_object_locator::nspace is set\n");
+                       goto e_inval;
+               }
+       }
+
+       if (struct_v >= 6) {
+               s64 hash = ceph_decode_64(p);
+               if (hash != -1) {
+                       pr_warn("ceph_object_locator::hash is set\n");
+                       goto e_inval;
+               }
+       }
+
+       /* skip the rest */
+       *p = struct_end;
+out:
+       return ret;
+
+e_inval:
+       ret = -EINVAL;
+       goto out;
+}
+
+static int ceph_redirect_decode(void **p, void *end,
+                               struct ceph_request_redirect *redir)
+{
+       u8 struct_v, struct_cv;
+       u32 len;
+       void *struct_end;
+       int ret;
+
+       ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+       struct_v = ceph_decode_8(p);
+       struct_cv = ceph_decode_8(p);
+       if (struct_cv > 1) {
+               pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
+                       struct_v, struct_cv);
+               goto e_inval;
+       }
+       len = ceph_decode_32(p);
+       ceph_decode_need(p, end, len, e_inval);
+       struct_end = *p + len;
+
+       ret = ceph_oloc_decode(p, end, &redir->oloc);
+       if (ret)
+               goto out;
+
+       len = ceph_decode_32(p);
+       if (len > 0) {
+               pr_warn("ceph_request_redirect::object_name is set\n");
+               goto e_inval;
+       }
+
+       len = ceph_decode_32(p);
+       *p += len; /* skip osd_instructions */
+
+       /* skip the rest */
+       *p = struct_end;
+out:
+       return ret;
+
+e_inval:
+       ret = -EINVAL;
+       goto out;
+}
+
 static void complete_request(struct ceph_osd_request *req)
 {
        complete_all(&req->r_safe_completion);  /* fsync waiter */
@@ -1497,6 +1648,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 {
        void *p, *end;
        struct ceph_osd_request *req;
+       struct ceph_request_redirect redir;
        u64 tid;
        int object_len;
        unsigned int numops;
@@ -1535,6 +1687,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        osdmap_epoch = ceph_decode_32(&p);
 
        /* lookup */
+       down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (req == NULL) {
@@ -1576,10 +1729,40 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        for (i = 0; i < numops; i++)
                req->r_reply_op_result[i] = ceph_decode_32(&p);
 
-       already_completed = req->r_got_reply;
+       if (le16_to_cpu(msg->hdr.version) >= 6) {
+               p += 8 + 4; /* skip replay_version */
+               p += 8; /* skip user_version */
 
-       if (!req->r_got_reply) {
+               err = ceph_redirect_decode(&p, end, &redir);
+               if (err)
+                       goto bad_put;
+       } else {
+               redir.oloc.pool = -1;
+       }
+
+       if (redir.oloc.pool != -1) {
+               dout("redirect pool %lld\n", redir.oloc.pool);
+
+               __unregister_request(osdc, req);
+
+               req->r_target_oloc = redir.oloc; /* struct */
 
+               /*
+                * Start redirect requests with nofail=true.  If
+                * mapping fails, request will end up on the notarget
+                * list, waiting for the new osdmap (which can take
+                * a while), even though the original request mapped
+                * successfully.  In the future we might want to follow
+                * original request's nofail setting here.
+                */
+               err = __ceph_osdc_start_request(osdc, req, true);
+               BUG_ON(err);
+
+               goto out_unlock;
+       }
+
+       already_completed = req->r_got_reply;
+       if (!req->r_got_reply) {
                req->r_result = result;
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
@@ -1593,8 +1776,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
                dout("handle_reply tid %llu dup ack\n", tid);
-               mutex_unlock(&osdc->request_mutex);
-               goto done;
+               goto out_unlock;
        }
 
        dout("handle_reply tid %llu flags %d\n", tid, flags);
@@ -1609,6 +1791,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                __unregister_request(osdc, req);
 
        mutex_unlock(&osdc->request_mutex);
+       up_read(&osdc->map_sem);
 
        if (!already_completed) {
                if (req->r_unsafe_callback &&
@@ -1626,10 +1809,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                complete_request(req);
        }
 
-done:
+out:
        dout("req=%p req->r_linger=%d\n", req, req->r_linger);
        ceph_osdc_put_request(req);
        return;
+out_unlock:
+       mutex_unlock(&osdc->request_mutex);
+       up_read(&osdc->map_sem);
+       goto out;
 
 bad_put:
        req->r_result = -EIO;
@@ -1642,6 +1829,7 @@ bad_put:
        ceph_osdc_put_request(req);
 bad_mutex:
        mutex_unlock(&osdc->request_mutex);
+       up_read(&osdc->map_sem);
 bad:
        pr_err("corrupt osd_op_reply got %d %d\n",
               (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
@@ -2202,34 +2390,16 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                            struct ceph_osd_request *req,
                            bool nofail)
 {
-       int rc = 0;
+       int rc;
 
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
-       __register_request(osdc, req);
-       req->r_sent = 0;
-       req->r_got_reply = 0;
-       rc = __map_request(osdc, req, 0);
-       if (rc < 0) {
-               if (nofail) {
-                       dout("osdc_start_request failed map, "
-                               " will retry %lld\n", req->r_tid);
-                       rc = 0;
-               } else {
-                       __unregister_request(osdc, req);
-               }
-               goto out_unlock;
-       }
-       if (req->r_osd == NULL) {
-               dout("send_request %p no up osds in pg\n", req);
-               ceph_monc_request_next_osdmap(&osdc->client->monc);
-       } else {
-               __send_queued(osdc);
-       }
-       rc = 0;
-out_unlock:
+
+       rc = __ceph_osdc_start_request(osdc, req, nofail);
+
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
+
        return rc;
 }
 EXPORT_SYMBOL(ceph_osdc_start_request);
@@ -2355,9 +2525,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        err = -ENOMEM;
        osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
        if (!osdc->notify_wq)
-               goto out_msgpool;
+               goto out_msgpool_reply;
+
        return 0;
 
+out_msgpool_reply:
+       ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 out_msgpool:
        ceph_msgpool_destroy(&osdc->msgpool_op);
 out_mempool: