]> Pileus Git - ~andy/linux/blobdiff - net/9p/trans_rdma.c
Merge branch 'slab/for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/penber...
[~andy/linux] / net / 9p / trans_rdma.c
index 2c69ddd691a16a0056ed071695fb3ebc6293a917..928f2bb9bf8d5d4d43d6324c3ea989a9daf12728 100644 (file)
@@ -57,9 +57,7 @@
 #define P9_RDMA_IRD            0
 #define P9_RDMA_ORD            0
 #define P9_RDMA_TIMEOUT                30000           /* 30 seconds */
-#define P9_RDMA_MAXSIZE                (4*4096)        /* Min SGE is 4, so we can
-                                                * safely advertise a maxsize
-                                                * of 64k */
+#define P9_RDMA_MAXSIZE                (1024*1024)     /* 1MB */
 
 /**
  * struct p9_trans_rdma - RDMA transport instance
@@ -75,7 +73,9 @@
  * @sq_depth: The depth of the Send Queue
  * @sq_sem: Semaphore for the SQ
  * @rq_depth: The depth of the Receive Queue.
- * @rq_count: Count of requests in the Receive Queue.
+ * @rq_sem: Semaphore for the RQ
+ * @excess_rc : Amount of posted Receive Contexts without a pending request.
+ *             See rdma_request()
  * @addr: The remote peer's address
  * @req_lock: Protects the active request list
  * @cm_done: Completion event for connection management tracking
@@ -100,7 +100,8 @@ struct p9_trans_rdma {
        int sq_depth;
        struct semaphore sq_sem;
        int rq_depth;
-       atomic_t rq_count;
+       struct semaphore rq_sem;
+       atomic_t excess_rc;
        struct sockaddr_in addr;
        spinlock_t req_lock;
 
@@ -296,6 +297,13 @@ handle_recv(struct p9_client *client, struct p9_trans_rdma *rdma,
        if (!req)
                goto err_out;
 
+       /* Check that we have not yet received a reply for this request.
+        */
+       if (unlikely(req->rc)) {
+               pr_err("Duplicate reply for request %d", tag);
+               goto err_out;
+       }
+
        req->rc = c->rc;
        req->status = REQ_STATUS_RCVD;
        p9_client_cb(client, req);
@@ -336,8 +344,8 @@ static void cq_comp_handler(struct ib_cq *cq, void *cq_context)
 
                switch (c->wc_op) {
                case IB_WC_RECV:
-                       atomic_dec(&rdma->rq_count);
                        handle_recv(client, rdma, c, wc.status, wc.byte_len);
+                       up(&rdma->rq_sem);
                        break;
 
                case IB_WC_SEND:
@@ -421,32 +429,33 @@ static int rdma_request(struct p9_client *client, struct p9_req_t *req)
        struct p9_rdma_context *c = NULL;
        struct p9_rdma_context *rpl_context = NULL;
 
+       /* When an error occurs between posting the recv and the send,
+        * there will be a receive context posted without a pending request.
+        * Since there is no way to "un-post" it, we remember it and skip
+        * post_recv() for the next request.
+        * So here,
+        * see if we are this `next request' and need to absorb an excess rc.
+        * If yes, then drop and free our own, and do not recv_post().
+        **/
+       if (unlikely(atomic_read(&rdma->excess_rc) > 0)) {
+               if ((atomic_sub_return(1, &rdma->excess_rc) >= 0)) {
+                       /* Got one ! */
+                       kfree(req->rc);
+                       req->rc = NULL;
+                       goto dont_need_post_recv;
+               } else {
+                       /* We raced and lost. */
+                       atomic_inc(&rdma->excess_rc);
+               }
+       }
+
        /* Allocate an fcall for the reply */
        rpl_context = kmalloc(sizeof *rpl_context, GFP_NOFS);
        if (!rpl_context) {
                err = -ENOMEM;
-               goto err_close;
-       }
-
-       /*
-        * If the request has a buffer, steal it, otherwise
-        * allocate a new one.  Typically, requests should already
-        * have receive buffers allocated and just swap them around
-        */
-       if (!req->rc) {
-               req->rc = kmalloc(sizeof(struct p9_fcall)+client->msize,
-                                 GFP_NOFS);
-               if (req->rc) {
-                       req->rc->sdata = (char *) req->rc +
-                                               sizeof(struct p9_fcall);
-                       req->rc->capacity = client->msize;
-               }
+               goto recv_error;
        }
        rpl_context->rc = req->rc;
-       if (!rpl_context->rc) {
-               err = -ENOMEM;
-               goto err_free2;
-       }
 
        /*
         * Post a receive buffer for this request. We need to ensure
@@ -455,29 +464,35 @@ static int rdma_request(struct p9_client *client, struct p9_req_t *req)
         * outstanding request, so we must keep a count to avoid
         * overflowing the RQ.
         */
-       if (atomic_inc_return(&rdma->rq_count) <= rdma->rq_depth) {
-               err = post_recv(client, rpl_context);
-               if (err)
-                       goto err_free1;
-       } else
-               atomic_dec(&rdma->rq_count);
+       if (down_interruptible(&rdma->rq_sem)) {
+               err = -EINTR;
+               goto recv_error;
+       }
 
+       err = post_recv(client, rpl_context);
+       if (err) {
+               p9_debug(P9_DEBUG_FCALL, "POST RECV failed\n");
+               goto recv_error;
+       }
        /* remove posted receive buffer from request structure */
        req->rc = NULL;
 
+dont_need_post_recv:
        /* Post the request */
        c = kmalloc(sizeof *c, GFP_NOFS);
        if (!c) {
                err = -ENOMEM;
-               goto err_free1;
+               goto send_error;
        }
        c->req = req;
 
        c->busa = ib_dma_map_single(rdma->cm_id->device,
                                    c->req->tc->sdata, c->req->tc->size,
                                    DMA_TO_DEVICE);
-       if (ib_dma_mapping_error(rdma->cm_id->device, c->busa))
-               goto error;
+       if (ib_dma_mapping_error(rdma->cm_id->device, c->busa)) {
+               err = -EIO;
+               goto send_error;
+       }
 
        sge.addr = c->busa;
        sge.length = c->req->tc->size;
@@ -491,22 +506,32 @@ static int rdma_request(struct p9_client *client, struct p9_req_t *req)
        wr.sg_list = &sge;
        wr.num_sge = 1;
 
-       if (down_interruptible(&rdma->sq_sem))
-               goto error;
+       if (down_interruptible(&rdma->sq_sem)) {
+               err = -EINTR;
+               goto send_error;
+       }
 
-       return ib_post_send(rdma->qp, &wr, &bad_wr);
+       err = ib_post_send(rdma->qp, &wr, &bad_wr);
+       if (err)
+               goto send_error;
 
- error:
+       /* Success */
+       return 0;
+
+ /* Handle errors that happened during or while preparing the send: */
+ send_error:
        kfree(c);
-       kfree(rpl_context->rc);
-       kfree(rpl_context);
-       p9_debug(P9_DEBUG_ERROR, "EIO\n");
-       return -EIO;
- err_free1:
-       kfree(rpl_context->rc);
- err_free2:
+       p9_debug(P9_DEBUG_ERROR, "Error %d in rdma_request()\n", err);
+
+       /* Ach.
+        *  We did recv_post(), but not send. We have one recv_post in excess.
+        */
+       atomic_inc(&rdma->excess_rc);
+       return err;
+
+ /* Handle errors that happened during or while preparing post_recv(): */
+ recv_error:
        kfree(rpl_context);
- err_close:
        spin_lock_irqsave(&rdma->req_lock, flags);
        if (rdma->state < P9_RDMA_CLOSING) {
                rdma->state = P9_RDMA_CLOSING;
@@ -551,7 +576,8 @@ static struct p9_trans_rdma *alloc_rdma(struct p9_rdma_opts *opts)
        spin_lock_init(&rdma->req_lock);
        init_completion(&rdma->cm_done);
        sema_init(&rdma->sq_sem, rdma->sq_depth);
-       atomic_set(&rdma->rq_count, 0);
+       sema_init(&rdma->rq_sem, rdma->rq_depth);
+       atomic_set(&rdma->excess_rc, 0);
 
        return rdma;
 }
@@ -562,6 +588,17 @@ static int rdma_cancel(struct p9_client *client, struct p9_req_t *req)
        return 1;
 }
 
+/* A request has been fully flushed without a reply.
+ * That means we have posted one buffer in excess.
+ */
+static int rdma_cancelled(struct p9_client *client, struct p9_req_t *req)
+{
+       struct p9_trans_rdma *rdma = client->trans;
+
+       atomic_inc(&rdma->excess_rc);
+       return 0;
+}
+
 /**
  * trans_create_rdma - Transport method for creating atransport instance
  * @client: client instance