]> Pileus Git - ~andy/linux/blobdiff - net/ceph/osd_client.c
Merge remote-tracking branches 'spi/fix/doc', 'spi/fix/nuc900' and 'spi/fix/rspi...
[~andy/linux] / net / ceph / osd_client.c
index 7130c5c83d7962b82e2c8d8c875e1fabf142bdbb..0676f2b199d672eaf61157cdf78b75ee74afb434 100644 (file)
@@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        msg_size = 4 + 4 + 8 + 8 + 4+8;
        msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
        msg_size += 1 + 8 + 4 + 4;     /* pg_t */
-       msg_size += 4 + MAX_OBJ_NAME_SIZE;
+       msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
        msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
        msg_size += 8;  /* snapid */
        msg_size += 8;  /* snap_seq */
@@ -368,7 +368,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);
 
-       req->r_oloc.pool = -1;
+       req->r_base_oloc.pool = -1;
+       req->r_target_oloc.pool = -1;
 
        /* create reply message */
        if (use_mempool)
@@ -763,11 +764,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (num_ops > 1)
                osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
 
-       req->r_oloc.pool = ceph_file_layout_pg_pool(*layout);
+       req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
 
-       snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
-               vino.ino, objnum);
-       req->r_oid_len = strlen(req->r_oid);
+       snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
+                "%llx.%08llx", vino.ino, objnum);
+       req->r_base_oid.name_len = strlen(req->r_base_oid.name);
 
        return req;
 }
@@ -1249,6 +1250,45 @@ static bool __req_should_be_paused(struct ceph_osd_client *osdc,
                (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
 }
 
+/*
+ * Calculate mapping of a request to a PG.  Takes tiering into account.
+ */
+static int __calc_request_pg(struct ceph_osdmap *osdmap,
+                            struct ceph_osd_request *req,
+                            struct ceph_pg *pg_out)
+{
+       bool need_check_tiering;
+
+       need_check_tiering = false;
+       if (req->r_target_oloc.pool == -1) {
+               req->r_target_oloc = req->r_base_oloc; /* struct */
+               need_check_tiering = true;
+       }
+       if (req->r_target_oid.name_len == 0) {
+               ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
+               need_check_tiering = true;
+       }
+
+       if (need_check_tiering &&
+           (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+               struct ceph_pg_pool_info *pi;
+
+               pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
+               if (pi) {
+                       if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
+                           pi->read_tier >= 0)
+                               req->r_target_oloc.pool = pi->read_tier;
+                       if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+                           pi->write_tier >= 0)
+                               req->r_target_oloc.pool = pi->write_tier;
+               }
+               /* !pi is caught in ceph_oloc_oid_to_pg() */
+       }
+
+       return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
+                                  &req->r_target_oid, pg_out);
+}
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
@@ -1269,8 +1309,8 @@ static int __map_request(struct ceph_osd_client *osdc,
        bool was_paused;
 
        dout("map_request %p tid %lld\n", req, req->r_tid);
-       err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
-                               req->r_oloc.pool);
+
+       err = __calc_request_pg(osdc->osdmap, req, &pgid);
        if (err) {
                list_move(&req->r_req_lru_item, &osdc->req_notarget);
                return err;
@@ -1356,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
        /* fill in message content that changes each time we send it */
        put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
        put_unaligned_le32(req->r_flags, req->r_request_flags);
-       put_unaligned_le64(req->r_oloc.pool, req->r_request_pool);
+       put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
        p = req->r_request_pgid;
        ceph_encode_64(&p, req->r_pgid.pool);
        ceph_encode_32(&p, req->r_pgid.seed);
@@ -1386,6 +1426,40 @@ static void __send_queued(struct ceph_osd_client *osdc)
                __send_request(osdc, req);
 }
 
+/*
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
+                                    struct ceph_osd_request *req,
+                                    bool nofail)
+{
+       int rc;
+
+       __register_request(osdc, req);
+       req->r_sent = 0;
+       req->r_got_reply = 0;
+       rc = __map_request(osdc, req, 0);
+       if (rc < 0) {
+               if (nofail) {
+                       dout("osdc_start_request failed map, "
+                               " will retry %lld\n", req->r_tid);
+                       rc = 0;
+               } else {
+                       __unregister_request(osdc, req);
+               }
+               return rc;
+       }
+
+       if (req->r_osd == NULL) {
+               dout("send_request %p no up osds in pg\n", req);
+               ceph_monc_request_next_osdmap(&osdc->client->monc);
+       } else {
+               __send_queued(osdc);
+       }
+
+       return 0;
+}
+
 /*
  * Timeout callback, called every N seconds when 1 or more osd
  * requests has been active for more than N seconds.  When this
@@ -1457,6 +1531,109 @@ static void handle_osds_timeout(struct work_struct *work)
                              round_jiffies_relative(delay));
 }
 
+static int ceph_oloc_decode(void **p, void *end,
+                           struct ceph_object_locator *oloc)
+{
+       u8 struct_v, struct_cv;
+       u32 len;
+       void *struct_end;
+       int ret = 0;
+
+       ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+       struct_v = ceph_decode_8(p);
+       struct_cv = ceph_decode_8(p);
+       if (struct_v < 3) {
+               pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
+                       struct_v, struct_cv);
+               goto e_inval;
+       }
+       if (struct_cv > 6) {
+               pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
+                       struct_v, struct_cv);
+               goto e_inval;
+       }
+       len = ceph_decode_32(p);
+       ceph_decode_need(p, end, len, e_inval);
+       struct_end = *p + len;
+
+       oloc->pool = ceph_decode_64(p);
+       *p += 4; /* skip preferred */
+
+       len = ceph_decode_32(p);
+       if (len > 0) {
+               pr_warn("ceph_object_locator::key is set\n");
+               goto e_inval;
+       }
+
+       if (struct_v >= 5) {
+               len = ceph_decode_32(p);
+               if (len > 0) {
+                       pr_warn("ceph_object_locator::nspace is set\n");
+                       goto e_inval;
+               }
+       }
+
+       if (struct_v >= 6) {
+               s64 hash = ceph_decode_64(p);
+               if (hash != -1) {
+                       pr_warn("ceph_object_locator::hash is set\n");
+                       goto e_inval;
+               }
+       }
+
+       /* skip the rest */
+       *p = struct_end;
+out:
+       return ret;
+
+e_inval:
+       ret = -EINVAL;
+       goto out;
+}
+
+static int ceph_redirect_decode(void **p, void *end,
+                               struct ceph_request_redirect *redir)
+{
+       u8 struct_v, struct_cv;
+       u32 len;
+       void *struct_end;
+       int ret;
+
+       ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+       struct_v = ceph_decode_8(p);
+       struct_cv = ceph_decode_8(p);
+       if (struct_cv > 1) {
+               pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
+                       struct_v, struct_cv);
+               goto e_inval;
+       }
+       len = ceph_decode_32(p);
+       ceph_decode_need(p, end, len, e_inval);
+       struct_end = *p + len;
+
+       ret = ceph_oloc_decode(p, end, &redir->oloc);
+       if (ret)
+               goto out;
+
+       len = ceph_decode_32(p);
+       if (len > 0) {
+               pr_warn("ceph_request_redirect::object_name is set\n");
+               goto e_inval;
+       }
+
+       len = ceph_decode_32(p);
+       *p += len; /* skip osd_instructions */
+
+       /* skip the rest */
+       *p = struct_end;
+out:
+       return ret;
+
+e_inval:
+       ret = -EINVAL;
+       goto out;
+}
+
 static void complete_request(struct ceph_osd_request *req)
 {
        complete_all(&req->r_safe_completion);  /* fsync waiter */
@@ -1471,6 +1648,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 {
        void *p, *end;
        struct ceph_osd_request *req;
+       struct ceph_request_redirect redir;
        u64 tid;
        int object_len;
        unsigned int numops;
@@ -1509,6 +1687,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        osdmap_epoch = ceph_decode_32(&p);
 
        /* lookup */
+       down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (req == NULL) {
@@ -1550,10 +1729,40 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        for (i = 0; i < numops; i++)
                req->r_reply_op_result[i] = ceph_decode_32(&p);
 
-       already_completed = req->r_got_reply;
+       if (le16_to_cpu(msg->hdr.version) >= 6) {
+               p += 8 + 4; /* skip replay_version */
+               p += 8; /* skip user_version */
 
-       if (!req->r_got_reply) {
+               err = ceph_redirect_decode(&p, end, &redir);
+               if (err)
+                       goto bad_put;
+       } else {
+               redir.oloc.pool = -1;
+       }
+
+       if (redir.oloc.pool != -1) {
+               dout("redirect pool %lld\n", redir.oloc.pool);
+
+               __unregister_request(osdc, req);
+
+               req->r_target_oloc = redir.oloc; /* struct */
 
+               /*
+                * Start redirect requests with nofail=true.  If
+                * mapping fails, request will end up on the notarget
+                * list, waiting for the new osdmap (which can take
+                * a while), even though the original request mapped
+                * successfully.  In the future we might want to follow
+                * original request's nofail setting here.
+                */
+               err = __ceph_osdc_start_request(osdc, req, true);
+               BUG_ON(err);
+
+               goto out_unlock;
+       }
+
+       already_completed = req->r_got_reply;
+       if (!req->r_got_reply) {
                req->r_result = result;
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
@@ -1567,8 +1776,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
                dout("handle_reply tid %llu dup ack\n", tid);
-               mutex_unlock(&osdc->request_mutex);
-               goto done;
+               goto out_unlock;
        }
 
        dout("handle_reply tid %llu flags %d\n", tid, flags);
@@ -1583,6 +1791,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                __unregister_request(osdc, req);
 
        mutex_unlock(&osdc->request_mutex);
+       up_read(&osdc->map_sem);
 
        if (!already_completed) {
                if (req->r_unsafe_callback &&
@@ -1600,10 +1809,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                complete_request(req);
        }
 
-done:
+out:
        dout("req=%p req->r_linger=%d\n", req, req->r_linger);
        ceph_osdc_put_request(req);
        return;
+out_unlock:
+       mutex_unlock(&osdc->request_mutex);
+       up_read(&osdc->map_sem);
+       goto out;
 
 bad_put:
        req->r_result = -EIO;
@@ -1616,6 +1829,7 @@ bad_put:
        ceph_osdc_put_request(req);
 bad_mutex:
        mutex_unlock(&osdc->request_mutex);
+       up_read(&osdc->map_sem);
 bad:
        pr_err("corrupt osd_op_reply got %d %d\n",
               (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
@@ -2118,10 +2332,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
        ceph_encode_32(&p, -1);  /* preferred */
 
        /* oid */
-       ceph_encode_32(&p, req->r_oid_len);
-       memcpy(p, req->r_oid, req->r_oid_len);
-       dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
-       p += req->r_oid_len;
+       ceph_encode_32(&p, req->r_base_oid.name_len);
+       memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
+       dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
+            req->r_base_oid.name, req->r_base_oid.name_len);
+       p += req->r_base_oid.name_len;
 
        /* ops--can imply data */
        ceph_encode_16(&p, (u16)req->r_num_ops);
@@ -2175,34 +2390,16 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                            struct ceph_osd_request *req,
                            bool nofail)
 {
-       int rc = 0;
+       int rc;
 
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
-       __register_request(osdc, req);
-       req->r_sent = 0;
-       req->r_got_reply = 0;
-       rc = __map_request(osdc, req, 0);
-       if (rc < 0) {
-               if (nofail) {
-                       dout("osdc_start_request failed map, "
-                               " will retry %lld\n", req->r_tid);
-                       rc = 0;
-               } else {
-                       __unregister_request(osdc, req);
-               }
-               goto out_unlock;
-       }
-       if (req->r_osd == NULL) {
-               dout("send_request %p no up osds in pg\n", req);
-               ceph_monc_request_next_osdmap(&osdc->client->monc);
-       } else {
-               __send_queued(osdc);
-       }
-       rc = 0;
-out_unlock:
+
+       rc = __ceph_osdc_start_request(osdc, req, nofail);
+
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
+
        return rc;
 }
 EXPORT_SYMBOL(ceph_osdc_start_request);
@@ -2328,9 +2525,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        err = -ENOMEM;
        osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
        if (!osdc->notify_wq)
-               goto out_msgpool;
+               goto out_msgpool_reply;
+
        return 0;
 
+out_msgpool_reply:
+       ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 out_msgpool:
        ceph_msgpool_destroy(&osdc->msgpool_op);
 out_mempool: