X-Git-Url: http://pileus.org/git/?a=blobdiff_plain;f=net%2Fsunrpc%2Fsvcsock.c;h=3e65719f1ef698891d8e81ebea6c57e72b212717;hb=011fec7486028977e2b4012afb84235759c6ad82;hp=5fb537e4d63bc4cbc0a02811fbf2767645b61af9;hpb=a6046f71f2b598af241e7496a8ead90f2979224b;p=~andy%2Flinux diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 5fb537e4d63..3e65719f1ef 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -45,80 +46,44 @@ #include #include #include +#include #include #include -/* SMP locking strategy: - * - * svc_pool->sp_lock protects most of the fields of that pool. - * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt. - * when both need to be taken (rare), svc_serv->sv_lock is first. - * BKL protects svc_serv->sv_nrthread. - * svc_sock->sk_lock protects the svc_sock->sk_deferred list - * and the ->sk_info_authunix cache. - * svc_sock->sk_xprt.xpt_flags.XPT_BUSY prevents a svc_sock being - * enqueued multiply. - * - * Some flags can be set to certain values at any time - * providing that certain rules are followed: - * - * XPT_CONN, XPT_DATA, can be set or cleared at any time. - * after a set, svc_xprt_enqueue must be called. - * after a clear, the socket must be read/accepted - * if this succeeds, it must be set again. - * XPT_CLOSE can set at any time. It is never cleared. - * xpt_ref contains a bias of '1' until XPT_DEAD is set. - * so when xprt_ref hits zero, we know the transport is dead - * and no-one is using it. - * XPT_DEAD can only be set while XPT_BUSY is held which ensures - * no other thread will be using the socket or will try to - * set XPT_DEAD. - * - */ - #define RPCDBG_FACILITY RPCDBG_SVCXPRT static struct svc_sock *svc_setup_socket(struct svc_serv *, struct socket *, int *errp, int flags); -static void svc_delete_xprt(struct svc_xprt *xprt); static void svc_udp_data_ready(struct sock *, int); static int svc_udp_recvfrom(struct svc_rqst *); static int svc_udp_sendto(struct svc_rqst *); -static void svc_close_xprt(struct svc_xprt *xprt); static void svc_sock_detach(struct svc_xprt *); static void svc_sock_free(struct svc_xprt *); -static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk); -static int svc_deferred_recv(struct svc_rqst *rqstp); -static struct cache_deferred_req *svc_defer(struct cache_req *req); static struct svc_xprt *svc_create_socket(struct svc_serv *, int, struct sockaddr *, int, int); - -/* apparently the "standard" is that clients close - * idle connections after 5 minutes, servers after - * 6 minutes - * http://www.connectathon.org/talks96/nfstcp.pdf - */ -static int svc_conn_age_period = 6*60; - #ifdef CONFIG_DEBUG_LOCK_ALLOC static struct lock_class_key svc_key[2]; static struct lock_class_key svc_slock_key[2]; -static inline void svc_reclassify_socket(struct socket *sock) +static void svc_reclassify_socket(struct socket *sock) { struct sock *sk = sock->sk; BUG_ON(sock_owned_by_user(sk)); switch (sk->sk_family) { case AF_INET: sock_lock_init_class_and_name(sk, "slock-AF_INET-NFSD", - &svc_slock_key[0], "sk_lock-AF_INET-NFSD", &svc_key[0]); + &svc_slock_key[0], + "sk_xprt.xpt_lock-AF_INET-NFSD", + &svc_key[0]); break; case AF_INET6: sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFSD", - &svc_slock_key[1], "sk_lock-AF_INET6-NFSD", &svc_key[1]); + &svc_slock_key[1], + "sk_xprt.xpt_lock-AF_INET6-NFSD", + &svc_key[1]); break; default: @@ -126,67 +91,11 @@ static inline void svc_reclassify_socket(struct socket *sock) } } #else -static inline void svc_reclassify_socket(struct socket *sock) +static void svc_reclassify_socket(struct socket *sock) { } #endif -static char *__svc_print_addr(struct sockaddr *addr, char *buf, size_t len) -{ - switch (addr->sa_family) { - case AF_INET: - snprintf(buf, len, "%u.%u.%u.%u, port=%u", - NIPQUAD(((struct sockaddr_in *) addr)->sin_addr), - ntohs(((struct sockaddr_in *) addr)->sin_port)); - break; - - case AF_INET6: - snprintf(buf, len, "%x:%x:%x:%x:%x:%x:%x:%x, port=%u", - NIP6(((struct sockaddr_in6 *) addr)->sin6_addr), - ntohs(((struct sockaddr_in6 *) addr)->sin6_port)); - break; - - default: - snprintf(buf, len, "unknown address type: %d", addr->sa_family); - break; - } - return buf; -} - -/** - * svc_print_addr - Format rq_addr field for printing - * @rqstp: svc_rqst struct containing address to print - * @buf: target buffer for formatted address - * @len: length of target buffer - * - */ -char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len) -{ - return __svc_print_addr(svc_addr(rqstp), buf, len); -} -EXPORT_SYMBOL_GPL(svc_print_addr); - -/* - * Queue up an idle server thread. Must have pool->sp_lock held. - * Note: this is really a stack rather than a queue, so that we only - * use as many different threads as we need, and the rest don't pollute - * the cache. - */ -static inline void -svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp) -{ - list_add(&rqstp->rq_list, &pool->sp_threads); -} - -/* - * Dequeue an nfsd thread. Must have pool->sp_lock held. - */ -static inline void -svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp) -{ - list_del(&rqstp->rq_list); -} - /* * Release an skbuff after use */ @@ -196,10 +105,12 @@ static void svc_release_skb(struct svc_rqst *rqstp) struct svc_deferred_req *dr = rqstp->rq_deferred; if (skb) { + struct svc_sock *svsk = + container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); rqstp->rq_xprt_ctxt = NULL; dprintk("svc: service %p, releasing skb %p\n", rqstp, skb); - skb_free_datagram(rqstp->rq_sock->sk_sk, skb); + skb_free_datagram(svsk->sk_sk, skb); } if (dr) { rqstp->rq_deferred = NULL; @@ -207,223 +118,6 @@ static void svc_release_skb(struct svc_rqst *rqstp) } } -/* - * Queue up a socket with data pending. If there are idle nfsd - * processes, wake 'em up. - * - */ -void svc_xprt_enqueue(struct svc_xprt *xprt) -{ - struct svc_serv *serv = xprt->xpt_server; - struct svc_pool *pool; - struct svc_rqst *rqstp; - int cpu; - - if (!(xprt->xpt_flags & - ((1<xpt_flags)) - return; - - cpu = get_cpu(); - pool = svc_pool_for_cpu(xprt->xpt_server, cpu); - put_cpu(); - - spin_lock_bh(&pool->sp_lock); - - if (!list_empty(&pool->sp_threads) && - !list_empty(&pool->sp_sockets)) - printk(KERN_ERR - "svc_xprt_enqueue: " - "threads and transports both waiting??\n"); - - if (test_bit(XPT_DEAD, &xprt->xpt_flags)) { - /* Don't enqueue dead sockets */ - dprintk("svc: transport %p is dead, not enqueued\n", xprt); - goto out_unlock; - } - - /* Mark socket as busy. It will remain in this state until the - * server has processed all pending data and put the socket back - * on the idle list. We update XPT_BUSY atomically because - * it also guards against trying to enqueue the svc_sock twice. - */ - if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) { - /* Don't enqueue socket while already enqueued */ - dprintk("svc: transport %p busy, not enqueued\n", xprt); - goto out_unlock; - } - BUG_ON(xprt->xpt_pool != NULL); - xprt->xpt_pool = pool; - - /* Handle pending connection */ - if (test_bit(XPT_CONN, &xprt->xpt_flags)) - goto process; - - /* Handle close in-progress */ - if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) - goto process; - - /* Check if we have space to reply to a request */ - if (!xprt->xpt_ops->xpo_has_wspace(xprt)) { - /* Don't enqueue while not enough space for reply */ - dprintk("svc: no write space, transport %p not enqueued\n", - xprt); - xprt->xpt_pool = NULL; - clear_bit(XPT_BUSY, &xprt->xpt_flags); - goto out_unlock; - } - - process: - if (!list_empty(&pool->sp_threads)) { - rqstp = list_entry(pool->sp_threads.next, - struct svc_rqst, - rq_list); - dprintk("svc: transport %p served by daemon %p\n", - xprt, rqstp); - svc_thread_dequeue(pool, rqstp); - if (rqstp->rq_xprt) - printk(KERN_ERR - "svc_xprt_enqueue: server %p, rq_xprt=%p!\n", - rqstp, rqstp->rq_xprt); - rqstp->rq_xprt = xprt; - svc_xprt_get(xprt); - rqstp->rq_reserved = serv->sv_max_mesg; - atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved); - BUG_ON(xprt->xpt_pool != pool); - wake_up(&rqstp->rq_wait); - } else { - dprintk("svc: transport %p put into queue\n", xprt); - list_add_tail(&xprt->xpt_ready, &pool->sp_sockets); - BUG_ON(xprt->xpt_pool != pool); - } - -out_unlock: - spin_unlock_bh(&pool->sp_lock); -} -EXPORT_SYMBOL_GPL(svc_xprt_enqueue); - -/* - * Dequeue the first socket. Must be called with the pool->sp_lock held. - */ -static inline struct svc_sock * -svc_sock_dequeue(struct svc_pool *pool) -{ - struct svc_sock *svsk; - - if (list_empty(&pool->sp_sockets)) - return NULL; - - svsk = list_entry(pool->sp_sockets.next, - struct svc_sock, sk_xprt.xpt_ready); - list_del_init(&svsk->sk_xprt.xpt_ready); - - dprintk("svc: socket %p dequeued, inuse=%d\n", - svsk->sk_sk, atomic_read(&svsk->sk_xprt.xpt_ref.refcount)); - - return svsk; -} - -/* - * svc_xprt_received conditionally queues the transport for processing - * by another thread. The caller must hold the XPT_BUSY bit and must - * not thereafter touch transport data. - * - * Note: XPT_DATA only gets cleared when a read-attempt finds no (or - * insufficient) data. - */ -void svc_xprt_received(struct svc_xprt *xprt) -{ - BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags)); - xprt->xpt_pool = NULL; - clear_bit(XPT_BUSY, &xprt->xpt_flags); - svc_xprt_enqueue(xprt); -} -EXPORT_SYMBOL_GPL(svc_xprt_received); - -/** - * svc_reserve - change the space reserved for the reply to a request. - * @rqstp: The request in question - * @space: new max space to reserve - * - * Each request reserves some space on the output queue of the socket - * to make sure the reply fits. This function reduces that reserved - * space to be the amount of space used already, plus @space. - * - */ -void svc_reserve(struct svc_rqst *rqstp, int space) -{ - space += rqstp->rq_res.head[0].iov_len; - - if (space < rqstp->rq_reserved) { - struct svc_xprt *xprt = rqstp->rq_xprt; - atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved); - rqstp->rq_reserved = space; - - svc_xprt_enqueue(xprt); - } -} - -static void -svc_sock_release(struct svc_rqst *rqstp) -{ - struct svc_sock *svsk = rqstp->rq_sock; - - rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp); - - svc_free_res_pages(rqstp); - rqstp->rq_res.page_len = 0; - rqstp->rq_res.page_base = 0; - - - /* Reset response buffer and release - * the reservation. - * But first, check that enough space was reserved - * for the reply, otherwise we have a bug! - */ - if ((rqstp->rq_res.len) > rqstp->rq_reserved) - printk(KERN_ERR "RPC request reserved %d but used %d\n", - rqstp->rq_reserved, - rqstp->rq_res.len); - - rqstp->rq_res.head[0].iov_len = 0; - svc_reserve(rqstp, 0); - rqstp->rq_sock = NULL; - - svc_xprt_put(&svsk->sk_xprt); -} - -/* - * External function to wake up a server waiting for data - * This really only makes sense for services like lockd - * which have exactly one thread anyway. - */ -void -svc_wake_up(struct svc_serv *serv) -{ - struct svc_rqst *rqstp; - unsigned int i; - struct svc_pool *pool; - - for (i = 0; i < serv->sv_nrpools; i++) { - pool = &serv->sv_pools[i]; - - spin_lock_bh(&pool->sp_lock); - if (!list_empty(&pool->sp_threads)) { - rqstp = list_entry(pool->sp_threads.next, - struct svc_rqst, - rq_list); - dprintk("svc: daemon %p woken up.\n", rqstp); - /* - svc_thread_dequeue(pool, rqstp); - rqstp->rq_sock = NULL; - */ - wake_up(&rqstp->rq_wait); - } - spin_unlock_bh(&pool->sp_lock); - } -} - union svc_pktinfo_u { struct in_pktinfo pkti; struct in6_pktinfo pkti6; @@ -433,7 +127,9 @@ union svc_pktinfo_u { static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh) { - switch (rqstp->rq_sock->sk_sk->sk_family) { + struct svc_sock *svsk = + container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); + switch (svsk->sk_sk->sk_family) { case AF_INET: { struct in_pktinfo *pki = CMSG_DATA(cmh); @@ -463,10 +159,10 @@ static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh) /* * Generic sendto routine */ -static int -svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) +static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) { - struct svc_sock *svsk = rqstp->rq_sock; + struct svc_sock *svsk = + container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); struct socket *sock = svsk->sk_sock; int slen; union { @@ -481,7 +177,7 @@ svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) size_t base = xdr->page_base; unsigned int pglen = xdr->page_len; unsigned int flags = MSG_MORE; - char buf[RPC_MAX_ADDRBUFLEN]; + RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]); slen = xdr->len; @@ -539,7 +235,7 @@ svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) } out: dprintk("svc: socket %p sendto([%p %Zu... ], %d) = %d (addr %s)\n", - rqstp->rq_sock, xdr->head[0].iov_base, xdr->head[0].iov_len, + svsk, xdr->head[0].iov_base, xdr->head[0].iov_len, xdr->len, len, svc_print_addr(rqstp, buf, sizeof(buf))); return len; @@ -598,8 +294,7 @@ EXPORT_SYMBOL(svc_sock_names); /* * Check input queue length */ -static int -svc_recv_available(struct svc_sock *svsk) +static int svc_recv_available(struct svc_sock *svsk) { struct socket *sock = svsk->sk_sock; int avail, err; @@ -612,48 +307,31 @@ svc_recv_available(struct svc_sock *svsk) /* * Generic recvfrom routine. */ -static int -svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr, int buflen) +static int svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr, + int buflen) { - struct svc_sock *svsk = rqstp->rq_sock; + struct svc_sock *svsk = + container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); struct msghdr msg = { .msg_flags = MSG_DONTWAIT, }; - struct sockaddr *sin; int len; + rqstp->rq_xprt_hlen = 0; + len = kernel_recvmsg(svsk->sk_sock, &msg, iov, nr, buflen, msg.msg_flags); - /* sock_recvmsg doesn't fill in the name/namelen, so we must.. - */ - memcpy(&rqstp->rq_addr, &svsk->sk_remote, svsk->sk_remotelen); - rqstp->rq_addrlen = svsk->sk_remotelen; - - /* Destination address in request is needed for binding the - * source address in RPC callbacks later. - */ - sin = (struct sockaddr *)&svsk->sk_local; - switch (sin->sa_family) { - case AF_INET: - rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr; - break; - case AF_INET6: - rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr; - break; - } - dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n", svsk, iov[0].iov_base, iov[0].iov_len, len); - return len; } /* * Set socket snd and rcv buffer lengths */ -static inline void -svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv) +static void svc_sock_setbufsize(struct socket *sock, unsigned int snd, + unsigned int rcv) { #if 0 mm_segment_t oldfs; @@ -678,8 +356,7 @@ svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv) /* * INET callback when data has been received on the socket. */ -static void -svc_udp_data_ready(struct sock *sk, int count) +static void svc_udp_data_ready(struct sock *sk, int count) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; @@ -697,8 +374,7 @@ svc_udp_data_ready(struct sock *sk, int count) /* * INET callback when space is newly available on the socket. */ -static void -svc_write_space(struct sock *sk) +static void svc_write_space(struct sock *sk) { struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data); @@ -715,10 +391,19 @@ svc_write_space(struct sock *sk) } } -static inline void svc_udp_get_dest_address(struct svc_rqst *rqstp, - struct cmsghdr *cmh) +/* + * Copy the UDP datagram's destination address to the rqstp structure. + * The 'destination' address in this case is the address to which the + * peer sent the datagram, i.e. our local address. For multihomed + * hosts, this can change from msg to msg. Note that only the IP + * address changes, the port number should remain the same. + */ +static void svc_udp_get_dest_address(struct svc_rqst *rqstp, + struct cmsghdr *cmh) { - switch (rqstp->rq_sock->sk_sk->sk_family) { + struct svc_sock *svsk = + container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); + switch (svsk->sk_sk->sk_family) { case AF_INET: { struct in_pktinfo *pki = CMSG_DATA(cmh); rqstp->rq_daddr.addr.s_addr = pki->ipi_spec_dst.s_addr; @@ -735,10 +420,10 @@ static inline void svc_udp_get_dest_address(struct svc_rqst *rqstp, /* * Receive a datagram from a UDP socket. */ -static int -svc_udp_recvfrom(struct svc_rqst *rqstp) +static int svc_udp_recvfrom(struct svc_rqst *rqstp) { - struct svc_sock *svsk = rqstp->rq_sock; + struct svc_sock *svsk = + container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); struct svc_serv *serv = svsk->sk_xprt.xpt_server; struct sk_buff *skb; union { @@ -767,11 +452,6 @@ svc_udp_recvfrom(struct svc_rqst *rqstp) (serv->sv_nrthreads+3) * serv->sv_max_mesg, (serv->sv_nrthreads+3) * serv->sv_max_mesg); - if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) { - svc_xprt_received(&svsk->sk_xprt); - return svc_deferred_recv(rqstp); - } - clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); skb = NULL; err = kernel_recvmsg(svsk->sk_sock, &msg, NULL, @@ -788,7 +468,10 @@ svc_udp_recvfrom(struct svc_rqst *rqstp) svc_xprt_received(&svsk->sk_xprt); return -EAGAIN; } - rqstp->rq_addrlen = sizeof(rqstp->rq_addr); + len = svc_addr_len(svc_addr(rqstp)); + if (len < 0) + return len; + rqstp->rq_addrlen = len; if (skb->tstamp.tv64 == 0) { skb->tstamp = ktime_get_real(); /* Don't enable netstamp, sunrpc doesn't @@ -831,7 +514,8 @@ svc_udp_recvfrom(struct svc_rqst *rqstp) skb_free_datagram(svsk->sk_sk, skb); } else { /* we can use it in-place */ - rqstp->rq_arg.head[0].iov_base = skb->data + sizeof(struct udphdr); + rqstp->rq_arg.head[0].iov_base = skb->data + + sizeof(struct udphdr); rqstp->rq_arg.head[0].iov_len = len; if (skb_checksum_complete(skb)) { skb_free_datagram(svsk->sk_sk, skb); @@ -930,6 +614,7 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv) mm_segment_t oldfs; svc_xprt_init(&svc_udp_class, &svsk->sk_xprt, serv); + clear_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags); svsk->sk_sk->sk_data_ready = svc_udp_data_ready; svsk->sk_sk->sk_write_space = svc_write_space; @@ -941,7 +626,8 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv) 3 * svsk->sk_xprt.xpt_server->sv_max_mesg, 3 * svsk->sk_xprt.xpt_server->sv_max_mesg); - set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); /* might have come in before data_ready set up */ + /* data might have come in before data_ready set up */ + set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags); oldfs = get_fs(); @@ -956,8 +642,7 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv) * A data_ready event on a listening socket means there's a connection * pending. Do not use state_change as a substitute for it. */ -static void -svc_tcp_listen_data_ready(struct sock *sk, int count_unused) +static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; @@ -989,8 +674,7 @@ svc_tcp_listen_data_ready(struct sock *sk, int count_unused) /* * A state change on a connected socket means it's dying or dead. */ -static void -svc_tcp_state_change(struct sock *sk) +static void svc_tcp_state_change(struct sock *sk) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; @@ -1007,8 +691,7 @@ svc_tcp_state_change(struct sock *sk) wake_up_interruptible_all(sk->sk_sleep); } -static void -svc_tcp_data_ready(struct sock *sk, int count) +static void svc_tcp_data_ready(struct sock *sk, int count) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; @@ -1022,20 +705,6 @@ svc_tcp_data_ready(struct sock *sk, int count) wake_up_interruptible(sk->sk_sleep); } -static inline int svc_port_is_privileged(struct sockaddr *sin) -{ - switch (sin->sa_family) { - case AF_INET: - return ntohs(((struct sockaddr_in *)sin)->sin_port) - < PROT_SOCK; - case AF_INET6: - return ntohs(((struct sockaddr_in6 *)sin)->sin6_port) - < PROT_SOCK; - default: - return 0; - } -} - /* * Accept a TCP connection */ @@ -1049,7 +718,7 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt) struct socket *newsock; struct svc_sock *newsvsk; int err, slen; - char buf[RPC_MAX_ADDRBUFLEN]; + RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]); dprintk("svc: tcp_accept %p sock %p\n", svsk, sock); if (!sock) @@ -1097,16 +766,13 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt) if (!(newsvsk = svc_setup_socket(serv, newsock, &err, (SVC_SOCK_ANONYMOUS | SVC_SOCK_TEMPORARY)))) goto failed; - memcpy(&newsvsk->sk_remote, sin, slen); - newsvsk->sk_remotelen = slen; + svc_xprt_set_remote(&newsvsk->sk_xprt, sin, slen); err = kernel_getsockname(newsock, sin, &slen); if (unlikely(err < 0)) { dprintk("svc_tcp_accept: kernel_getsockname error %d\n", -err); slen = offsetof(struct sockaddr, sa_data); } - memcpy(&newsvsk->sk_local, sin, slen); - - svc_xprt_received(&newsvsk->sk_xprt); + svc_xprt_set_local(&newsvsk->sk_xprt, sin, slen); if (serv->sv_stats) serv->sv_stats->nettcpconn++; @@ -1121,10 +787,10 @@ failed: /* * Receive data from a TCP socket. */ -static int -svc_tcp_recvfrom(struct svc_rqst *rqstp) +static int svc_tcp_recvfrom(struct svc_rqst *rqstp) { - struct svc_sock *svsk = rqstp->rq_sock; + struct svc_sock *svsk = + container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); struct svc_serv *serv = svsk->sk_xprt.xpt_server; int len; struct kvec *vec; @@ -1135,11 +801,6 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags), test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags)); - if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) { - svc_xprt_received(&svsk->sk_xprt); - return svc_deferred_recv(rqstp); - } - if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags)) /* sndbuf needs to have room for one request * per thread, otherwise we can stall even when the @@ -1163,8 +824,8 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) * the next four bytes. Otherwise try to gobble up as much as * possible up to the complete record length. */ - if (svsk->sk_tcplen < 4) { - unsigned long want = 4 - svsk->sk_tcplen; + if (svsk->sk_tcplen < sizeof(rpc_fraghdr)) { + int want = sizeof(rpc_fraghdr) - svsk->sk_tcplen; struct kvec iov; iov.iov_base = ((char *) &svsk->sk_reclen) + svsk->sk_tcplen; @@ -1174,32 +835,31 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) svsk->sk_tcplen += len; if (len < want) { - dprintk("svc: short recvfrom while reading record length (%d of %lu)\n", - len, want); + dprintk("svc: short recvfrom while reading record " + "length (%d of %d)\n", len, want); svc_xprt_received(&svsk->sk_xprt); return -EAGAIN; /* record header not complete */ } svsk->sk_reclen = ntohl(svsk->sk_reclen); - if (!(svsk->sk_reclen & 0x80000000)) { + if (!(svsk->sk_reclen & RPC_LAST_STREAM_FRAGMENT)) { /* FIXME: technically, a record can be fragmented, * and non-terminal fragments will not have the top * bit set in the fragment length header. * But apparently no known nfs clients send fragmented * records. */ if (net_ratelimit()) - printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx" - " (non-terminal)\n", - (unsigned long) svsk->sk_reclen); + printk(KERN_NOTICE "RPC: multiple fragments " + "per record not supported\n"); goto err_delete; } - svsk->sk_reclen &= 0x7fffffff; + svsk->sk_reclen &= RPC_FRAGMENT_SIZE_MASK; dprintk("svc: TCP record, %d bytes\n", svsk->sk_reclen); if (svsk->sk_reclen > serv->sv_max_mesg) { if (net_ratelimit()) - printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx" - " (large)\n", - (unsigned long) svsk->sk_reclen); + printk(KERN_NOTICE "RPC: " + "fragment too large: 0x%08lx\n", + (unsigned long)svsk->sk_reclen); goto err_delete; } } @@ -1252,6 +912,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) svsk->sk_reclen = 0; svsk->sk_tcplen = 0; + svc_xprt_copy_addrs(rqstp, &svsk->sk_xprt); svc_xprt_received(&svsk->sk_xprt); if (serv->sv_stats) serv->sv_stats->nettcpcnt++; @@ -1278,8 +939,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp) /* * Send out data on TCP socket. */ -static int -svc_tcp_sendto(struct svc_rqst *rqstp) +static int svc_tcp_sendto(struct svc_rqst *rqstp) { struct xdr_buf *xbufp = &rqstp->rq_res; int sent; @@ -1292,16 +952,18 @@ svc_tcp_sendto(struct svc_rqst *rqstp) reclen = htonl(0x80000000|((xbufp->len ) - 4)); memcpy(xbufp->head[0].iov_base, &reclen, 4); - if (test_bit(XPT_DEAD, &rqstp->rq_sock->sk_xprt.xpt_flags)) + if (test_bit(XPT_DEAD, &rqstp->rq_xprt->xpt_flags)) return -ENOTCONN; sent = svc_sendto(rqstp, &rqstp->rq_res); if (sent != xbufp->len) { - printk(KERN_NOTICE "rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n", - rqstp->rq_sock->sk_xprt.xpt_server->sv_name, + printk(KERN_NOTICE + "rpc-srv/tcp: %s: %s %d when sending %d bytes " + "- shutting down socket\n", + rqstp->rq_xprt->xpt_server->sv_name, (sent<0)?"got error":"sent only", sent, xbufp->len); - set_bit(XPT_CLOSE, &rqstp->rq_sock->sk_xprt.xpt_flags); + set_bit(XPT_CLOSE, &rqstp->rq_xprt->xpt_flags); svc_xprt_enqueue(rqstp->rq_xprt); sent = -EAGAIN; } @@ -1321,7 +983,7 @@ static void svc_tcp_prep_reply_hdr(struct svc_rqst *rqstp) static int svc_tcp_has_wspace(struct svc_xprt *xprt) { - struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt); + struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt); struct svc_serv *serv = svsk->sk_xprt.xpt_server; int required; int wspace; @@ -1384,10 +1046,9 @@ void svc_cleanup_xprt_sock(void) static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv) { struct sock *sk = svsk->sk_sk; - struct tcp_sock *tp = tcp_sk(sk); svc_xprt_init(&svc_tcp_class, &svsk->sk_xprt, serv); - + set_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags); if (sk->sk_state == TCP_LISTEN) { dprintk("setting up TCP socket for listening\n"); set_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags); @@ -1402,7 +1063,7 @@ static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv) svsk->sk_reclen = 0; svsk->sk_tcplen = 0; - tp->nonagle = 1; /* disable Nagle's algorithm */ + tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; /* initialise setting must have enough space to * receive and respond to one request. @@ -1419,8 +1080,7 @@ static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv) } } -void -svc_sock_update_bufs(struct svc_serv *serv) +void svc_sock_update_bufs(struct svc_serv *serv) { /* * The number of server threads has changed. Update @@ -1441,282 +1101,7 @@ svc_sock_update_bufs(struct svc_serv *serv) } spin_unlock_bh(&serv->sv_lock); } - -/* - * Make sure that we don't have too many active connections. If we - * have, something must be dropped. - * - * There's no point in trying to do random drop here for DoS - * prevention. The NFS clients does 1 reconnect in 15 seconds. An - * attacker can easily beat that. - * - * The only somewhat efficient mechanism would be if drop old - * connections from the same IP first. But right now we don't even - * record the client IP in svc_sock. - */ -static void svc_check_conn_limits(struct svc_serv *serv) -{ - if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) { - struct svc_sock *svsk = NULL; - spin_lock_bh(&serv->sv_lock); - if (!list_empty(&serv->sv_tempsocks)) { - if (net_ratelimit()) { - /* Try to help the admin */ - printk(KERN_NOTICE "%s: too many open TCP " - "sockets, consider increasing the " - "number of nfsd threads\n", - serv->sv_name); - } - /* - * Always select the oldest socket. It's not fair, - * but so is life - */ - svsk = list_entry(serv->sv_tempsocks.prev, - struct svc_sock, - sk_xprt.xpt_list); - set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); - svc_xprt_get(&svsk->sk_xprt); - } - spin_unlock_bh(&serv->sv_lock); - - if (svsk) { - svc_xprt_enqueue(&svsk->sk_xprt); - svc_xprt_put(&svsk->sk_xprt); - } - } -} - -/* - * Receive the next request on any socket. This code is carefully - * organised not to touch any cachelines in the shared svc_serv - * structure, only cachelines in the local svc_pool. - */ -int -svc_recv(struct svc_rqst *rqstp, long timeout) -{ - struct svc_sock *svsk = NULL; - struct svc_serv *serv = rqstp->rq_server; - struct svc_pool *pool = rqstp->rq_pool; - int len, i; - int pages; - struct xdr_buf *arg; - DECLARE_WAITQUEUE(wait, current); - - dprintk("svc: server %p waiting for data (to = %ld)\n", - rqstp, timeout); - - if (rqstp->rq_sock) - printk(KERN_ERR - "svc_recv: service %p, socket not NULL!\n", - rqstp); - if (waitqueue_active(&rqstp->rq_wait)) - printk(KERN_ERR - "svc_recv: service %p, wait queue active!\n", - rqstp); - - - /* now allocate needed pages. If we get a failure, sleep briefly */ - pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE; - for (i=0; i < pages ; i++) - while (rqstp->rq_pages[i] == NULL) { - struct page *p = alloc_page(GFP_KERNEL); - if (!p) - schedule_timeout_uninterruptible(msecs_to_jiffies(500)); - rqstp->rq_pages[i] = p; - } - rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */ - BUG_ON(pages >= RPCSVC_MAXPAGES); - - /* Make arg->head point to first page and arg->pages point to rest */ - arg = &rqstp->rq_arg; - arg->head[0].iov_base = page_address(rqstp->rq_pages[0]); - arg->head[0].iov_len = PAGE_SIZE; - arg->pages = rqstp->rq_pages + 1; - arg->page_base = 0; - /* save at least one page for response */ - arg->page_len = (pages-2)*PAGE_SIZE; - arg->len = (pages-1)*PAGE_SIZE; - arg->tail[0].iov_len = 0; - - try_to_freeze(); - cond_resched(); - if (signalled()) - return -EINTR; - - spin_lock_bh(&pool->sp_lock); - if ((svsk = svc_sock_dequeue(pool)) != NULL) { - rqstp->rq_sock = svsk; - svc_xprt_get(&svsk->sk_xprt); - rqstp->rq_reserved = serv->sv_max_mesg; - atomic_add(rqstp->rq_reserved, &svsk->sk_xprt.xpt_reserved); - } else { - /* No data pending. Go to sleep */ - svc_thread_enqueue(pool, rqstp); - - /* - * We have to be able to interrupt this wait - * to bring down the daemons ... - */ - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue(&rqstp->rq_wait, &wait); - spin_unlock_bh(&pool->sp_lock); - - schedule_timeout(timeout); - - try_to_freeze(); - - spin_lock_bh(&pool->sp_lock); - remove_wait_queue(&rqstp->rq_wait, &wait); - - if (!(svsk = rqstp->rq_sock)) { - svc_thread_dequeue(pool, rqstp); - spin_unlock_bh(&pool->sp_lock); - dprintk("svc: server %p, no data yet\n", rqstp); - return signalled()? -EINTR : -EAGAIN; - } - } - spin_unlock_bh(&pool->sp_lock); - - len = 0; - if (test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags)) { - dprintk("svc_recv: found XPT_CLOSE\n"); - svc_delete_xprt(&svsk->sk_xprt); - } else if (test_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags)) { - struct svc_xprt *newxpt; - newxpt = svsk->sk_xprt.xpt_ops->xpo_accept(&svsk->sk_xprt); - if (newxpt) { - /* - * We know this module_get will succeed because the - * listener holds a reference too - */ - __module_get(newxpt->xpt_class->xcl_owner); - svc_check_conn_limits(svsk->sk_xprt.xpt_server); - } - svc_xprt_received(&svsk->sk_xprt); - } else { - dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n", - rqstp, pool->sp_id, svsk, - atomic_read(&svsk->sk_xprt.xpt_ref.refcount)); - len = svsk->sk_xprt.xpt_ops->xpo_recvfrom(rqstp); - dprintk("svc: got len=%d\n", len); - } - - /* No data, incomplete (TCP) read, or accept() */ - if (len == 0 || len == -EAGAIN) { - rqstp->rq_res.len = 0; - svc_sock_release(rqstp); - return -EAGAIN; - } - svsk->sk_lastrecv = get_seconds(); - clear_bit(XPT_OLD, &svsk->sk_xprt.xpt_flags); - - rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp)); - rqstp->rq_chandle.defer = svc_defer; - - if (serv->sv_stats) - serv->sv_stats->netcnt++; - return len; -} - -/* - * Drop request - */ -void -svc_drop(struct svc_rqst *rqstp) -{ - dprintk("svc: socket %p dropped request\n", rqstp->rq_sock); - svc_sock_release(rqstp); -} - -/* - * Return reply to client. - */ -int -svc_send(struct svc_rqst *rqstp) -{ - struct svc_xprt *xprt; - int len; - struct xdr_buf *xb; - - xprt = rqstp->rq_xprt; - if (!xprt) - return -EFAULT; - - /* release the receive skb before sending the reply */ - rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp); - - /* calculate over-all length */ - xb = & rqstp->rq_res; - xb->len = xb->head[0].iov_len + - xb->page_len + - xb->tail[0].iov_len; - - /* Grab mutex to serialize outgoing data. */ - mutex_lock(&xprt->xpt_mutex); - if (test_bit(XPT_DEAD, &xprt->xpt_flags)) - len = -ENOTCONN; - else - len = xprt->xpt_ops->xpo_sendto(rqstp); - mutex_unlock(&xprt->xpt_mutex); - svc_sock_release(rqstp); - - if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN) - return 0; - return len; -} - -/* - * Timer function to close old temporary sockets, using - * a mark-and-sweep algorithm. - */ -static void -svc_age_temp_sockets(unsigned long closure) -{ - struct svc_serv *serv = (struct svc_serv *)closure; - struct svc_sock *svsk; - struct list_head *le, *next; - LIST_HEAD(to_be_aged); - - dprintk("svc_age_temp_sockets\n"); - - if (!spin_trylock_bh(&serv->sv_lock)) { - /* busy, try again 1 sec later */ - dprintk("svc_age_temp_sockets: busy\n"); - mod_timer(&serv->sv_temptimer, jiffies + HZ); - return; - } - - list_for_each_safe(le, next, &serv->sv_tempsocks) { - svsk = list_entry(le, struct svc_sock, sk_xprt.xpt_list); - - if (!test_and_set_bit(XPT_OLD, &svsk->sk_xprt.xpt_flags)) - continue; - if (atomic_read(&svsk->sk_xprt.xpt_ref.refcount) > 1 - || test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags)) - continue; - svc_xprt_get(&svsk->sk_xprt); - list_move(le, &to_be_aged); - set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); - set_bit(XPT_DETACHED, &svsk->sk_xprt.xpt_flags); - } - spin_unlock_bh(&serv->sv_lock); - - while (!list_empty(&to_be_aged)) { - le = to_be_aged.next; - /* fiddling the sk_xprt.xpt_list node is safe 'cos we're XPT_DETACHED */ - list_del_init(le); - svsk = list_entry(le, struct svc_sock, sk_xprt.xpt_list); - - dprintk("queuing svsk %p for closing, %lu seconds old\n", - svsk, get_seconds() - svsk->sk_lastrecv); - - /* a thread will dequeue and close it soon */ - svc_xprt_enqueue(&svsk->sk_xprt); - svc_xprt_put(&svsk->sk_xprt); - } - - mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ); -} +EXPORT_SYMBOL(svc_sock_update_bufs); /* * Initialize socket for RPC use and create svc_sock struct @@ -1729,7 +1114,6 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv, struct svc_sock *svsk; struct sock *inet; int pmap_register = !(flags & SVC_SOCK_ANONYMOUS); - int is_temporary = flags & SVC_SOCK_TEMPORARY; dprintk("svc: svc_setup_socket %p\n", sock); if (!(svsk = kzalloc(sizeof(*svsk), GFP_KERNEL))) { @@ -1749,16 +1133,12 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv, return NULL; } - set_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags); inet->sk_user_data = svsk; svsk->sk_sock = sock; svsk->sk_sk = inet; svsk->sk_ostate = inet->sk_state_change; svsk->sk_odata = inet->sk_data_ready; svsk->sk_owspace = inet->sk_write_space; - svsk->sk_lastrecv = get_seconds(); - spin_lock_init(&svsk->sk_lock); - INIT_LIST_HEAD(&svsk->sk_deferred); /* Initialize the socket */ if (sock->type == SOCK_DGRAM) @@ -1766,24 +1146,6 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv, else svc_tcp_init(svsk, serv); - spin_lock_bh(&serv->sv_lock); - if (is_temporary) { - set_bit(XPT_TEMP, &svsk->sk_xprt.xpt_flags); - list_add(&svsk->sk_xprt.xpt_list, &serv->sv_tempsocks); - serv->sv_tmpcnt++; - if (serv->sv_temptimer.function == NULL) { - /* setup timer to age temp sockets */ - setup_timer(&serv->sv_temptimer, svc_age_temp_sockets, - (unsigned long)serv); - mod_timer(&serv->sv_temptimer, - jiffies + svc_conn_age_period * HZ); - } - } else { - clear_bit(XPT_TEMP, &svsk->sk_xprt.xpt_flags); - list_add(&svsk->sk_xprt.xpt_list, &serv->sv_permsocks); - } - spin_unlock_bh(&serv->sv_lock); - dprintk("svc: svc_setup_socket created %p (inet %p)\n", svsk, svsk->sk_sk); @@ -1811,6 +1173,15 @@ int svc_addsock(struct svc_serv *serv, else { svsk = svc_setup_socket(serv, so, &err, SVC_SOCK_DEFAULTS); if (svsk) { + struct sockaddr_storage addr; + struct sockaddr *sin = (struct sockaddr *)&addr; + int salen; + if (kernel_getsockname(svsk->sk_sock, sin, &salen) == 0) + svc_xprt_set_local(&svsk->sk_xprt, sin, salen); + clear_bit(XPT_TEMP, &svsk->sk_xprt.xpt_flags); + spin_lock_bh(&serv->sv_lock); + list_add(&svsk->sk_xprt.xpt_list, &serv->sv_permsocks); + spin_unlock_bh(&serv->sv_lock); svc_xprt_received(&svsk->sk_xprt); err = 0; } @@ -1836,7 +1207,10 @@ static struct svc_xprt *svc_create_socket(struct svc_serv *serv, struct socket *sock; int error; int type; - char buf[RPC_MAX_ADDRBUFLEN]; + struct sockaddr_storage addr; + struct sockaddr *newsin = (struct sockaddr *)&addr; + int newlen; + RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]); dprintk("svc: svc_create_socket(%s, %d, %s)\n", serv->sv_program->pg_name, protocol, @@ -1861,13 +1235,18 @@ static struct svc_xprt *svc_create_socket(struct svc_serv *serv, if (error < 0) goto bummer; + newlen = len; + error = kernel_getsockname(sock, newsin, &newlen); + if (error < 0) + goto bummer; + if (protocol == IPPROTO_TCP) { if ((error = kernel_listen(sock, 64)) < 0) goto bummer; } if ((svsk = svc_setup_socket(serv, sock, &error, flags)) != NULL) { - svc_xprt_received(&svsk->sk_xprt); + svc_xprt_set_local(&svsk->sk_xprt, newsin, newlen); return (struct svc_xprt *)svsk; } @@ -1902,169 +1281,9 @@ static void svc_sock_free(struct svc_xprt *xprt) struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt); dprintk("svc: svc_sock_free(%p)\n", svsk); - if (svsk->sk_info_authunix != NULL) - svcauth_unix_info_release(svsk->sk_info_authunix); if (svsk->sk_sock->file) sockfd_put(svsk->sk_sock); else sock_release(svsk->sk_sock); kfree(svsk); } - -/* - * Remove a dead transport - */ -static void svc_delete_xprt(struct svc_xprt *xprt) -{ - struct svc_serv *serv = xprt->xpt_server; - - dprintk("svc: svc_delete_xprt(%p)\n", xprt); - xprt->xpt_ops->xpo_detach(xprt); - - spin_lock_bh(&serv->sv_lock); - if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags)) - list_del_init(&xprt->xpt_list); - /* - * We used to delete the transport from whichever list - * it's sk_xprt.xpt_ready node was on, but we don't actually - * need to. This is because the only time we're called - * while still attached to a queue, the queue itself - * is about to be destroyed (in svc_destroy). - */ - if (!test_and_set_bit(XPT_DEAD, &xprt->xpt_flags)) { - BUG_ON(atomic_read(&xprt->xpt_ref.refcount) < 2); - if (test_bit(XPT_TEMP, &xprt->xpt_flags)) - serv->sv_tmpcnt--; - svc_xprt_put(xprt); - } - spin_unlock_bh(&serv->sv_lock); -} - -static void svc_close_xprt(struct svc_xprt *xprt) -{ - set_bit(XPT_CLOSE, &xprt->xpt_flags); - if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) - /* someone else will have to effect the close */ - return; - - svc_xprt_get(xprt); - svc_delete_xprt(xprt); - clear_bit(XPT_BUSY, &xprt->xpt_flags); - svc_xprt_put(xprt); -} - -void svc_close_all(struct list_head *xprt_list) -{ - struct svc_xprt *xprt; - struct svc_xprt *tmp; - - list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) { - set_bit(XPT_CLOSE, &xprt->xpt_flags); - if (test_bit(XPT_BUSY, &xprt->xpt_flags)) { - /* Waiting to be processed, but no threads left, - * So just remove it from the waiting list - */ - list_del_init(&xprt->xpt_ready); - clear_bit(XPT_BUSY, &xprt->xpt_flags); - } - svc_close_xprt(xprt); - } -} - -/* - * Handle defer and revisit of requests - */ - -static void svc_revisit(struct cache_deferred_req *dreq, int too_many) -{ - struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle); - struct svc_sock *svsk; - - if (too_many) { - svc_xprt_put(&dr->svsk->sk_xprt); - kfree(dr); - return; - } - dprintk("revisit queued\n"); - svsk = dr->svsk; - dr->svsk = NULL; - spin_lock(&svsk->sk_lock); - list_add(&dr->handle.recent, &svsk->sk_deferred); - spin_unlock(&svsk->sk_lock); - set_bit(XPT_DEFERRED, &svsk->sk_xprt.xpt_flags); - svc_xprt_enqueue(&svsk->sk_xprt); - svc_xprt_put(&svsk->sk_xprt); -} - -static struct cache_deferred_req * -svc_defer(struct cache_req *req) -{ - struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle); - int size = sizeof(struct svc_deferred_req) + (rqstp->rq_arg.len); - struct svc_deferred_req *dr; - - if (rqstp->rq_arg.page_len) - return NULL; /* if more than a page, give up FIXME */ - if (rqstp->rq_deferred) { - dr = rqstp->rq_deferred; - rqstp->rq_deferred = NULL; - } else { - int skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len; - /* FIXME maybe discard if size too large */ - dr = kmalloc(size, GFP_KERNEL); - if (dr == NULL) - return NULL; - - dr->handle.owner = rqstp->rq_server; - dr->prot = rqstp->rq_prot; - memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen); - dr->addrlen = rqstp->rq_addrlen; - dr->daddr = rqstp->rq_daddr; - dr->argslen = rqstp->rq_arg.len >> 2; - memcpy(dr->args, rqstp->rq_arg.head[0].iov_base-skip, dr->argslen<<2); - } - svc_xprt_get(rqstp->rq_xprt); - dr->svsk = rqstp->rq_sock; - - dr->handle.revisit = svc_revisit; - return &dr->handle; -} - -/* - * recv data from a deferred request into an active one - */ -static int svc_deferred_recv(struct svc_rqst *rqstp) -{ - struct svc_deferred_req *dr = rqstp->rq_deferred; - - rqstp->rq_arg.head[0].iov_base = dr->args; - rqstp->rq_arg.head[0].iov_len = dr->argslen<<2; - rqstp->rq_arg.page_len = 0; - rqstp->rq_arg.len = dr->argslen<<2; - rqstp->rq_prot = dr->prot; - memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen); - rqstp->rq_addrlen = dr->addrlen; - rqstp->rq_daddr = dr->daddr; - rqstp->rq_respages = rqstp->rq_pages; - return dr->argslen<<2; -} - - -static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk) -{ - struct svc_deferred_req *dr = NULL; - - if (!test_bit(XPT_DEFERRED, &svsk->sk_xprt.xpt_flags)) - return NULL; - spin_lock(&svsk->sk_lock); - clear_bit(XPT_DEFERRED, &svsk->sk_xprt.xpt_flags); - if (!list_empty(&svsk->sk_deferred)) { - dr = list_entry(svsk->sk_deferred.next, - struct svc_deferred_req, - handle.recent); - list_del_init(&dr->handle.recent); - set_bit(XPT_DEFERRED, &svsk->sk_xprt.xpt_flags); - } - spin_unlock(&svsk->sk_lock); - return dr; -}