]> Pileus Git - ~andy/linux/blob - net/sunrpc/clnt.c
0bb23e8e9d0c0714766dbd0ab46eb0e785076813
[~andy/linux] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */
23
24 #include <asm/system.h>
25
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/mm.h>
29 #include <linux/slab.h>
30 #include <linux/utsname.h>
31 #include <linux/workqueue.h>
32
33 #include <linux/sunrpc/clnt.h>
34 #include <linux/sunrpc/rpc_pipe_fs.h>
35 #include <linux/sunrpc/metrics.h>
36
37
/* Extra bytes call_allocate() adds to a procedure's p_bufsiz. */
#define RPC_SLACK_SPACE         (1024)  /* total overkill */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY        RPCDBG_CALL
#endif

/*
 * rpc_shutdown_client() sleeps on this queue; rpc_release_client()
 * wakes it when a client's cl_users count drops to zero.
 */
static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);


/*
 * Forward declarations: the call_* functions returning void are the
 * states of the RPC call state machine (installed in task->tk_action);
 * call_header() and call_verify() return a u32 pointer.
 */
static void     call_start(struct rpc_task *task);
static void     call_reserve(struct rpc_task *task);
static void     call_reserveresult(struct rpc_task *task);
static void     call_allocate(struct rpc_task *task);
static void     call_encode(struct rpc_task *task);
static void     call_decode(struct rpc_task *task);
static void     call_bind(struct rpc_task *task);
static void     call_bind_status(struct rpc_task *task);
static void     call_transmit(struct rpc_task *task);
static void     call_status(struct rpc_task *task);
static void     call_transmit_status(struct rpc_task *task);
static void     call_refresh(struct rpc_task *task);
static void     call_refreshresult(struct rpc_task *task);
static void     call_timeout(struct rpc_task *task);
static void     call_connect(struct rpc_task *task);
static void     call_connect_status(struct rpc_task *task);
static u32 *    call_header(struct rpc_task *task);
static u32 *    call_verify(struct rpc_task *task);
65
66
67 static int
68 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
69 {
70         static uint32_t clntid;
71         int error;
72
73         if (dir_name == NULL)
74                 return 0;
75         for (;;) {
76                 snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
77                                 "%s/clnt%x", dir_name,
78                                 (unsigned int)clntid++);
79                 clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
80                 clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
81                 if (!IS_ERR(clnt->cl_dentry))
82                         return 0;
83                 error = PTR_ERR(clnt->cl_dentry);
84                 if (error != -EEXIST) {
85                         printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
86                                         clnt->cl_pathname, error);
87                         return error;
88                 }
89         }
90 }
91
92 /*
93  * Create an RPC client
94  * FIXME: This should also take a flags argument (as in task->tk_flags).
95  * It's called (among others) from pmap_create_client, which may in
96  * turn be called by an async task. In this case, rpciod should not be
97  * made to sleep too long.
98  */
99 struct rpc_clnt *
100 rpc_new_client(struct rpc_xprt *xprt, char *servname,
101                   struct rpc_program *program, u32 vers,
102                   rpc_authflavor_t flavor)
103 {
104         struct rpc_version      *version;
105         struct rpc_clnt         *clnt = NULL;
106         struct rpc_auth         *auth;
107         int err;
108         int len;
109
110         dprintk("RPC: creating %s client for %s (xprt %p)\n",
111                 program->name, servname, xprt);
112
113         err = -EINVAL;
114         if (!xprt)
115                 goto out_no_xprt;
116         if (vers >= program->nrvers || !(version = program->version[vers]))
117                 goto out_err;
118
119         err = -ENOMEM;
120         clnt = kmalloc(sizeof(*clnt), GFP_KERNEL);
121         if (!clnt)
122                 goto out_err;
123         memset(clnt, 0, sizeof(*clnt));
124         atomic_set(&clnt->cl_users, 0);
125         atomic_set(&clnt->cl_count, 1);
126         clnt->cl_parent = clnt;
127
128         clnt->cl_server = clnt->cl_inline_name;
129         len = strlen(servname) + 1;
130         if (len > sizeof(clnt->cl_inline_name)) {
131                 char *buf = kmalloc(len, GFP_KERNEL);
132                 if (buf != 0)
133                         clnt->cl_server = buf;
134                 else
135                         len = sizeof(clnt->cl_inline_name);
136         }
137         strlcpy(clnt->cl_server, servname, len);
138
139         clnt->cl_xprt     = xprt;
140         clnt->cl_procinfo = version->procs;
141         clnt->cl_maxproc  = version->nrprocs;
142         clnt->cl_protname = program->name;
143         clnt->cl_pmap     = &clnt->cl_pmap_default;
144         clnt->cl_port     = xprt->addr.sin_port;
145         clnt->cl_prog     = program->number;
146         clnt->cl_vers     = version->number;
147         clnt->cl_prot     = xprt->prot;
148         clnt->cl_stats    = program->stats;
149         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
150         rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
151
152         if (!clnt->cl_port)
153                 clnt->cl_autobind = 1;
154
155         clnt->cl_rtt = &clnt->cl_rtt_default;
156         rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
157
158         err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
159         if (err < 0)
160                 goto out_no_path;
161
162         auth = rpcauth_create(flavor, clnt);
163         if (IS_ERR(auth)) {
164                 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
165                                 flavor);
166                 err = PTR_ERR(auth);
167                 goto out_no_auth;
168         }
169
170         /* save the nodename */
171         clnt->cl_nodelen = strlen(system_utsname.nodename);
172         if (clnt->cl_nodelen > UNX_MAXNODENAME)
173                 clnt->cl_nodelen = UNX_MAXNODENAME;
174         memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
175         return clnt;
176
177 out_no_auth:
178         rpc_rmdir(clnt->cl_pathname);
179 out_no_path:
180         if (clnt->cl_server != clnt->cl_inline_name)
181                 kfree(clnt->cl_server);
182         kfree(clnt);
183 out_err:
184         xprt_destroy(xprt);
185 out_no_xprt:
186         return ERR_PTR(err);
187 }
188
189 /**
190  * Create an RPC client
191  * @xprt - pointer to xprt struct
192  * @servname - name of server
193  * @info - rpc_program
194  * @version - rpc_program version
195  * @authflavor - rpc_auth flavour to use
196  *
197  * Creates an RPC client structure, then pings the server in order to
198  * determine if it is up, and if it supports this program and version.
199  *
200  * This function should never be called by asynchronous tasks such as
201  * the portmapper.
202  */
203 struct rpc_clnt *rpc_create_client(struct rpc_xprt *xprt, char *servname,
204                 struct rpc_program *info, u32 version, rpc_authflavor_t authflavor)
205 {
206         struct rpc_clnt *clnt;
207         int err;
208         
209         clnt = rpc_new_client(xprt, servname, info, version, authflavor);
210         if (IS_ERR(clnt))
211                 return clnt;
212         err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
213         if (err == 0)
214                 return clnt;
215         rpc_shutdown_client(clnt);
216         return ERR_PTR(err);
217 }
218
219 /*
220  * This function clones the RPC client structure. It allows us to share the
221  * same transport while varying parameters such as the authentication
222  * flavour.
223  */
224 struct rpc_clnt *
225 rpc_clone_client(struct rpc_clnt *clnt)
226 {
227         struct rpc_clnt *new;
228
229         new = kmalloc(sizeof(*new), GFP_KERNEL);
230         if (!new)
231                 goto out_no_clnt;
232         memcpy(new, clnt, sizeof(*new));
233         atomic_set(&new->cl_count, 1);
234         atomic_set(&new->cl_users, 0);
235         new->cl_parent = clnt;
236         atomic_inc(&clnt->cl_count);
237         /* Duplicate portmapper */
238         rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
239         /* Turn off autobind on clones */
240         new->cl_autobind = 0;
241         new->cl_oneshot = 0;
242         new->cl_dead = 0;
243         dget(new->cl_dentry);
244         rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
245         if (new->cl_auth)
246                 atomic_inc(&new->cl_auth->au_count);
247         new->cl_pmap            = &new->cl_pmap_default;
248         new->cl_metrics         = rpc_alloc_iostats(clnt);
249         rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
250         return new;
251 out_no_clnt:
252         printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
253         return ERR_PTR(-ENOMEM);
254 }
255
256 /*
257  * Properly shut down an RPC client, terminating all outstanding
258  * requests. Note that we must be certain that cl_oneshot and
259  * cl_dead are cleared, or else the client would be destroyed
260  * when the last task releases it.
261  */
262 int
263 rpc_shutdown_client(struct rpc_clnt *clnt)
264 {
265         dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
266                         clnt->cl_protname, clnt->cl_server,
267                         atomic_read(&clnt->cl_users));
268
269         while (atomic_read(&clnt->cl_users) > 0) {
270                 /* Don't let rpc_release_client destroy us */
271                 clnt->cl_oneshot = 0;
272                 clnt->cl_dead = 0;
273                 rpc_killall_tasks(clnt);
274                 wait_event_timeout(destroy_wait,
275                         !atomic_read(&clnt->cl_users), 1*HZ);
276         }
277
278         if (atomic_read(&clnt->cl_users) < 0) {
279                 printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
280                                 clnt, atomic_read(&clnt->cl_users));
281 #ifdef RPC_DEBUG
282                 rpc_show_tasks();
283 #endif
284                 BUG();
285         }
286
287         return rpc_destroy_client(clnt);
288 }
289
290 /*
291  * Delete an RPC client
292  */
293 int
294 rpc_destroy_client(struct rpc_clnt *clnt)
295 {
296         if (!atomic_dec_and_test(&clnt->cl_count))
297                 return 1;
298         BUG_ON(atomic_read(&clnt->cl_users) != 0);
299
300         dprintk("RPC: destroying %s client for %s\n",
301                         clnt->cl_protname, clnt->cl_server);
302         if (clnt->cl_auth) {
303                 rpcauth_destroy(clnt->cl_auth);
304                 clnt->cl_auth = NULL;
305         }
306         if (clnt->cl_parent != clnt) {
307                 rpc_destroy_client(clnt->cl_parent);
308                 goto out_free;
309         }
310         if (clnt->cl_pathname[0])
311                 rpc_rmdir(clnt->cl_pathname);
312         if (clnt->cl_xprt) {
313                 xprt_destroy(clnt->cl_xprt);
314                 clnt->cl_xprt = NULL;
315         }
316         if (clnt->cl_server != clnt->cl_inline_name)
317                 kfree(clnt->cl_server);
318 out_free:
319         rpc_free_iostats(clnt->cl_metrics);
320         clnt->cl_metrics = NULL;
321         if (clnt->cl_dentry)
322                 dput(clnt->cl_dentry);
323         kfree(clnt);
324         return 0;
325 }
326
327 /*
328  * Release an RPC client
329  */
330 void
331 rpc_release_client(struct rpc_clnt *clnt)
332 {
333         dprintk("RPC:      rpc_release_client(%p, %d)\n",
334                                 clnt, atomic_read(&clnt->cl_users));
335
336         if (!atomic_dec_and_test(&clnt->cl_users))
337                 return;
338         wake_up(&destroy_wait);
339         if (clnt->cl_oneshot || clnt->cl_dead)
340                 rpc_destroy_client(clnt);
341 }
342
343 /**
344  * rpc_bind_new_program - bind a new RPC program to an existing client
345  * @old - old rpc_client
346  * @program - rpc program to set
347  * @vers - rpc program version
348  *
349  * Clones the rpc client and sets up a new RPC program. This is mainly
350  * of use for enabling different RPC programs to share the same transport.
351  * The Sun NFSv2/v3 ACL protocol can do this.
352  */
353 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
354                                       struct rpc_program *program,
355                                       int vers)
356 {
357         struct rpc_clnt *clnt;
358         struct rpc_version *version;
359         int err;
360
361         BUG_ON(vers >= program->nrvers || !program->version[vers]);
362         version = program->version[vers];
363         clnt = rpc_clone_client(old);
364         if (IS_ERR(clnt))
365                 goto out;
366         clnt->cl_procinfo = version->procs;
367         clnt->cl_maxproc  = version->nrprocs;
368         clnt->cl_protname = program->name;
369         clnt->cl_prog     = program->number;
370         clnt->cl_vers     = version->number;
371         clnt->cl_stats    = program->stats;
372         err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
373         if (err != 0) {
374                 rpc_shutdown_client(clnt);
375                 clnt = ERR_PTR(err);
376         }
377 out:    
378         return clnt;
379 }
380
/*
 * Default callback for async RPC calls.
 * Deliberately does nothing; it exists so that rpc_default_ops has a
 * valid rpc_call_done hook.
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
}

/*
 * Call operations whose only hook is the no-op callback above;
 * rpc_call_sync() passes them to rpc_new_task().
 */
static const struct rpc_call_ops rpc_default_ops = {
        .rpc_call_done = rpc_default_callback,
};
392
393 /*
394  *      Export the signal mask handling for synchronous code that
395  *      sleeps on RPC calls
396  */
397 #define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))
398  
399 static void rpc_save_sigmask(sigset_t *oldset, int intr)
400 {
401         unsigned long   sigallow = sigmask(SIGKILL);
402         sigset_t sigmask;
403
404         /* Block all signals except those listed in sigallow */
405         if (intr)
406                 sigallow |= RPC_INTR_SIGNALS;
407         siginitsetinv(&sigmask, sigallow);
408         sigprocmask(SIG_BLOCK, &sigmask, oldset);
409 }
410
411 static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
412 {
413         rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
414 }
415
/* Reinstate the signal mask held in @oldset as the current mask. */
static inline void rpc_restore_sigmask(sigset_t *oldset)
{
        sigprocmask(SIG_SETMASK, oldset, NULL);
}
420
421 void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
422 {
423         rpc_save_sigmask(oldset, clnt->cl_intr);
424 }
425
/*
 * Restore the signal mask saved in @oldset by rpc_clnt_sigmask().
 * @clnt is not used.
 */
void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        rpc_restore_sigmask(oldset);
}
430
431 /*
432  * New rpc_call implementation
433  */
434 int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
435 {
436         struct rpc_task *task;
437         sigset_t        oldset;
438         int             status;
439
440         /* If this client is slain all further I/O fails */
441         if (clnt->cl_dead) 
442                 return -EIO;
443
444         BUG_ON(flags & RPC_TASK_ASYNC);
445
446         status = -ENOMEM;
447         task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
448         if (task == NULL)
449                 goto out;
450
451         /* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
452         rpc_task_sigmask(task, &oldset);
453
454         rpc_call_setup(task, msg, 0);
455
456         /* Set up the call info struct and execute the task */
457         status = task->tk_status;
458         if (status == 0) {
459                 atomic_inc(&task->tk_count);
460                 status = rpc_execute(task);
461                 if (status == 0)
462                         status = task->tk_status;
463         }
464         rpc_restore_sigmask(&oldset);
465         rpc_release_task(task);
466 out:
467         return status;
468 }
469
470 /*
471  * New rpc_call implementation
472  */
473 int
474 rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
475                const struct rpc_call_ops *tk_ops, void *data)
476 {
477         struct rpc_task *task;
478         sigset_t        oldset;
479         int             status;
480
481         /* If this client is slain all further I/O fails */
482         if (clnt->cl_dead) 
483                 return -EIO;
484
485         flags |= RPC_TASK_ASYNC;
486
487         /* Create/initialize a new RPC task */
488         status = -ENOMEM;
489         if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
490                 goto out;
491
492         /* Mask signals on GSS_AUTH upcalls */
493         rpc_task_sigmask(task, &oldset);                
494
495         rpc_call_setup(task, msg, 0);
496
497         /* Set up the call info struct and execute the task */
498         status = task->tk_status;
499         if (status == 0)
500                 rpc_execute(task);
501         else
502                 rpc_release_task(task);
503
504         rpc_restore_sigmask(&oldset);           
505 out:
506         return status;
507 }
508
509
510 void
511 rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
512 {
513         task->tk_msg   = *msg;
514         task->tk_flags |= flags;
515         /* Bind the user cred */
516         if (task->tk_msg.rpc_cred != NULL)
517                 rpcauth_holdcred(task);
518         else
519                 rpcauth_bindcred(task);
520
521         if (task->tk_status == 0)
522                 task->tk_action = call_start;
523         else
524                 task->tk_action = rpc_exit_task;
525 }
526
527 void
528 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
529 {
530         struct rpc_xprt *xprt = clnt->cl_xprt;
531         if (xprt->ops->set_buffer_size)
532                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
533 }
534
535 /*
536  * Return size of largest payload RPC client can support, in bytes
537  *
538  * For stream transports, this is one RPC record fragment (see RFC
539  * 1831), as we don't support multi-record requests yet.  For datagram
540  * transports, this is the size of an IP packet minus the IP, UDP, and
541  * RPC header sizes.
542  */
543 size_t rpc_max_payload(struct rpc_clnt *clnt)
544 {
545         return clnt->cl_xprt->max_payload;
546 }
547 EXPORT_SYMBOL(rpc_max_payload);
548
549 /**
550  * rpc_force_rebind - force transport to check that remote port is unchanged
551  * @clnt: client to rebind
552  *
553  */
554 void rpc_force_rebind(struct rpc_clnt *clnt)
555 {
556         if (clnt->cl_autobind)
557                 clnt->cl_port = 0;
558 }
559 EXPORT_SYMBOL(rpc_force_rebind);
560
561 /*
562  * Restart an (async) RPC call. Usually called from within the
563  * exit handler.
564  */
565 void
566 rpc_restart_call(struct rpc_task *task)
567 {
568         if (RPC_ASSASSINATED(task))
569                 return;
570
571         task->tk_action = call_start;
572 }
573
574 /*
575  * 0.  Initial state
576  *
577  *     Other FSM states can be visited zero or more times, but
578  *     this state is visited exactly once for each RPC.
579  */
580 static void
581 call_start(struct rpc_task *task)
582 {
583         struct rpc_clnt *clnt = task->tk_client;
584
585         dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
586                 clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
587                 (RPC_IS_ASYNC(task) ? "async" : "sync"));
588
589         /* Increment call count */
590         task->tk_msg.rpc_proc->p_count++;
591         clnt->cl_stats->rpccnt++;
592         task->tk_action = call_reserve;
593 }
594
595 /*
596  * 1.   Reserve an RPC call slot
597  */
598 static void
599 call_reserve(struct rpc_task *task)
600 {
601         dprintk("RPC: %4d call_reserve\n", task->tk_pid);
602
603         if (!rpcauth_uptodatecred(task)) {
604                 task->tk_action = call_refresh;
605                 return;
606         }
607
608         task->tk_status  = 0;
609         task->tk_action  = call_reserveresult;
610         xprt_reserve(task);
611 }
612
613 /*
614  * 1b.  Grok the result of xprt_reserve()
615  */
616 static void
617 call_reserveresult(struct rpc_task *task)
618 {
619         int status = task->tk_status;
620
621         dprintk("RPC: %4d call_reserveresult (status %d)\n",
622                                 task->tk_pid, task->tk_status);
623
624         /*
625          * After a call to xprt_reserve(), we must have either
626          * a request slot or else an error status.
627          */
628         task->tk_status = 0;
629         if (status >= 0) {
630                 if (task->tk_rqstp) {
631                         task->tk_action = call_allocate;
632                         return;
633                 }
634
635                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
636                                 __FUNCTION__, status);
637                 rpc_exit(task, -EIO);
638                 return;
639         }
640
641         /*
642          * Even though there was an error, we may have acquired
643          * a request slot somehow.  Make sure not to leak it.
644          */
645         if (task->tk_rqstp) {
646                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
647                                 __FUNCTION__, status);
648                 xprt_release(task);
649         }
650
651         switch (status) {
652         case -EAGAIN:   /* woken up; retry */
653                 task->tk_action = call_reserve;
654                 return;
655         case -EIO:      /* probably a shutdown */
656                 break;
657         default:
658                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
659                                 __FUNCTION__, status);
660                 break;
661         }
662         rpc_exit(task, status);
663 }
664
665 /*
666  * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
667  *      (Note: buffer memory is freed in xprt_release).
668  */
669 static void
670 call_allocate(struct rpc_task *task)
671 {
672         struct rpc_rqst *req = task->tk_rqstp;
673         struct rpc_xprt *xprt = task->tk_xprt;
674         unsigned int    bufsiz;
675
676         dprintk("RPC: %4d call_allocate (status %d)\n", 
677                                 task->tk_pid, task->tk_status);
678         task->tk_action = call_bind;
679         if (req->rq_buffer)
680                 return;
681
682         /* FIXME: compute buffer requirements more exactly using
683          * auth->au_wslack */
684         bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
685
686         if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
687                 return;
688         printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task); 
689
690         if (RPC_IS_ASYNC(task) || !signalled()) {
691                 xprt_release(task);
692                 task->tk_action = call_reserve;
693                 rpc_delay(task, HZ>>4);
694                 return;
695         }
696
697         rpc_exit(task, -ERESTARTSYS);
698 }
699
700 static inline int
701 rpc_task_need_encode(struct rpc_task *task)
702 {
703         return task->tk_rqstp->rq_snd_buf.len == 0;
704 }
705
706 static inline void
707 rpc_task_force_reencode(struct rpc_task *task)
708 {
709         task->tk_rqstp->rq_snd_buf.len = 0;
710 }
711
712 /*
713  * 3.   Encode arguments of an RPC call
714  */
715 static void
716 call_encode(struct rpc_task *task)
717 {
718         struct rpc_rqst *req = task->tk_rqstp;
719         struct xdr_buf *sndbuf = &req->rq_snd_buf;
720         struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
721         unsigned int    bufsiz;
722         kxdrproc_t      encode;
723         u32             *p;
724
725         dprintk("RPC: %4d call_encode (status %d)\n", 
726                                 task->tk_pid, task->tk_status);
727
728         /* Default buffer setup */
729         bufsiz = req->rq_bufsize >> 1;
730         sndbuf->head[0].iov_base = (void *)req->rq_buffer;
731         sndbuf->head[0].iov_len  = bufsiz;
732         sndbuf->tail[0].iov_len  = 0;
733         sndbuf->page_len         = 0;
734         sndbuf->len              = 0;
735         sndbuf->buflen           = bufsiz;
736         rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
737         rcvbuf->head[0].iov_len  = bufsiz;
738         rcvbuf->tail[0].iov_len  = 0;
739         rcvbuf->page_len         = 0;
740         rcvbuf->len              = 0;
741         rcvbuf->buflen           = bufsiz;
742
743         /* Encode header and provided arguments */
744         encode = task->tk_msg.rpc_proc->p_encode;
745         if (!(p = call_header(task))) {
746                 printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
747                 rpc_exit(task, -EIO);
748                 return;
749         }
750         if (encode == NULL)
751                 return;
752
753         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
754                         task->tk_msg.rpc_argp);
755         if (task->tk_status == -ENOMEM) {
756                 /* XXX: Is this sane? */
757                 rpc_delay(task, 3*HZ);
758                 task->tk_status = -EAGAIN;
759         }
760 }
761
762 /*
763  * 4.   Get the server port number if not yet set
764  */
765 static void
766 call_bind(struct rpc_task *task)
767 {
768         struct rpc_clnt *clnt = task->tk_client;
769
770         dprintk("RPC: %4d call_bind (status %d)\n",
771                                 task->tk_pid, task->tk_status);
772
773         task->tk_action = call_connect;
774         if (!clnt->cl_port) {
775                 task->tk_action = call_bind_status;
776                 task->tk_timeout = task->tk_xprt->bind_timeout;
777                 rpc_getport(task, clnt);
778         }
779 }
780
781 /*
782  * 4a.  Sort out bind result
783  */
784 static void
785 call_bind_status(struct rpc_task *task)
786 {
787         int status = -EACCES;
788
789         if (task->tk_status >= 0) {
790                 dprintk("RPC: %4d call_bind_status (status %d)\n",
791                                         task->tk_pid, task->tk_status);
792                 task->tk_status = 0;
793                 task->tk_action = call_connect;
794                 return;
795         }
796
797         switch (task->tk_status) {
798         case -EACCES:
799                 dprintk("RPC: %4d remote rpcbind: RPC program/version unavailable\n",
800                                 task->tk_pid);
801                 rpc_delay(task, 3*HZ);
802                 goto retry_bind;
803         case -ETIMEDOUT:
804                 dprintk("RPC: %4d rpcbind request timed out\n",
805                                 task->tk_pid);
806                 if (RPC_IS_SOFT(task)) {
807                         status = -EIO;
808                         break;
809                 }
810                 goto retry_bind;
811         case -EPFNOSUPPORT:
812                 dprintk("RPC: %4d remote rpcbind service unavailable\n",
813                                 task->tk_pid);
814                 break;
815         case -EPROTONOSUPPORT:
816                 dprintk("RPC: %4d remote rpcbind version 2 unavailable\n",
817                                 task->tk_pid);
818                 break;
819         default:
820                 dprintk("RPC: %4d unrecognized rpcbind error (%d)\n",
821                                 task->tk_pid, -task->tk_status);
822                 status = -EIO;
823                 break;
824         }
825
826         rpc_exit(task, status);
827         return;
828
829 retry_bind:
830         task->tk_status = 0;
831         task->tk_action = call_bind;
832         return;
833 }
834
835 /*
836  * 4b.  Connect to the RPC server
837  */
838 static void
839 call_connect(struct rpc_task *task)
840 {
841         struct rpc_xprt *xprt = task->tk_xprt;
842
843         dprintk("RPC: %4d call_connect xprt %p %s connected\n",
844                         task->tk_pid, xprt,
845                         (xprt_connected(xprt) ? "is" : "is not"));
846
847         task->tk_action = call_transmit;
848         if (!xprt_connected(xprt)) {
849                 task->tk_action = call_connect_status;
850                 if (task->tk_status < 0)
851                         return;
852                 xprt_connect(task);
853         }
854 }
855
856 /*
857  * 4c.  Sort out connect result
858  */
859 static void
860 call_connect_status(struct rpc_task *task)
861 {
862         struct rpc_clnt *clnt = task->tk_client;
863         int status = task->tk_status;
864
865         dprintk("RPC: %5u call_connect_status (status %d)\n", 
866                                 task->tk_pid, task->tk_status);
867
868         task->tk_status = 0;
869         if (status >= 0) {
870                 clnt->cl_stats->netreconn++;
871                 task->tk_action = call_transmit;
872                 return;
873         }
874
875         /* Something failed: remote service port may have changed */
876         rpc_force_rebind(clnt);
877
878         switch (status) {
879         case -ENOTCONN:
880         case -ETIMEDOUT:
881         case -EAGAIN:
882                 task->tk_action = call_bind;
883                 break;
884         default:
885                 rpc_exit(task, -EIO);
886                 break;
887         }
888 }
889
890 /*
891  * 5.   Transmit the RPC request, and wait for reply
892  */
893 static void
894 call_transmit(struct rpc_task *task)
895 {
896         dprintk("RPC: %4d call_transmit (status %d)\n", 
897                                 task->tk_pid, task->tk_status);
898
899         task->tk_action = call_status;
900         if (task->tk_status < 0)
901                 return;
902         task->tk_status = xprt_prepare_transmit(task);
903         if (task->tk_status != 0)
904                 return;
905         /* Encode here so that rpcsec_gss can use correct sequence number. */
906         if (rpc_task_need_encode(task)) {
907                 task->tk_rqstp->rq_bytes_sent = 0;
908                 call_encode(task);
909                 /* Did the encode result in an error condition? */
910                 if (task->tk_status != 0)
911                         goto out_nosend;
912         }
913         task->tk_action = call_transmit_status;
914         xprt_transmit(task);
915         if (task->tk_status < 0)
916                 return;
917         if (!task->tk_msg.rpc_proc->p_decode) {
918                 task->tk_action = rpc_exit_task;
919                 rpc_wake_up_task(task);
920         }
921         return;
922 out_nosend:
923         /* release socket write lock before attempting to handle error */
924         xprt_abort_transmit(task);
925         rpc_task_force_reencode(task);
926 }
927
/*
 * 6.   Sort out the RPC call status
 *
 *      If a reply has been received and no bytes of the request are
 *      recorded as sent, the received length becomes the task status.
 *      A non-negative status leads to call_decode.  Otherwise:
 *      -ETIMEDOUT goes to call_timeout; -ECONNREFUSED and -ENOTCONN
 *      force a rebind and go to call_bind; -EAGAIN retransmits; every
 *      other error exits the task (all but -EIO are also logged).
 */
static void
call_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        int             status;

        if (req->rq_received > 0 && !req->rq_bytes_sent)
                task->tk_status = req->rq_received;

        dprintk("RPC: %4d call_status (status %d)\n", 
                                task->tk_pid, task->tk_status);

        status = task->tk_status;
        if (status >= 0) {
                task->tk_action = call_decode;
                return;
        }

        task->tk_status = 0;
        switch (status) {
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
                break;
        case -ECONNREFUSED:
        case -ENOTCONN:
                rpc_force_rebind(clnt);
                task->tk_action = call_bind;
                break;
        case -EAGAIN:
                task->tk_action = call_transmit;
                break;
        default:
                /* -EIO (shutdown or soft timeout) exits quietly */
                if (status != -EIO)
                        printk("%s: RPC call returned error %d\n",
                                       clnt->cl_protname, -status);
                rpc_exit(task, status);
                break;
        }
}
974
975 /*
976  * 6a.  Handle transmission errors.
977  */
978 static void
979 call_transmit_status(struct rpc_task *task)
980 {
981         if (task->tk_status != -EAGAIN)
982                 rpc_task_force_reencode(task);
983         call_status(task);
984 }
985
986 /*
987  * 6b.  Handle RPC timeout
988  *      We do not release the request slot, so we keep using the
989  *      same XID for all retransmits.
990  */
991 static void
992 call_timeout(struct rpc_task *task)
993 {
994         struct rpc_clnt *clnt = task->tk_client;
995
996         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
997                 dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
998                 goto retry;
999         }
1000
1001         dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
1002         task->tk_timeouts++;
1003
1004         if (RPC_IS_SOFT(task)) {
1005                 printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1006                                 clnt->cl_protname, clnt->cl_server);
1007                 rpc_exit(task, -EIO);
1008                 return;
1009         }
1010
1011         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1012                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1013                 printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1014                         clnt->cl_protname, clnt->cl_server);
1015         }
1016         rpc_force_rebind(clnt);
1017
1018 retry:
1019         clnt->cl_stats->rpcretrans++;
1020         task->tk_action = call_bind;
1021         task->tk_status = 0;
1022 }
1023
1024 /*
1025  * 7.   Decode the RPC reply
1026  */
1027 static void
1028 call_decode(struct rpc_task *task)
1029 {
1030         struct rpc_clnt *clnt = task->tk_client;
1031         struct rpc_rqst *req = task->tk_rqstp;
1032         kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
1033         u32             *p;
1034
1035         dprintk("RPC: %4d call_decode (status %d)\n", 
1036                                 task->tk_pid, task->tk_status);
1037
1038         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1039                 printk(KERN_NOTICE "%s: server %s OK\n",
1040                         clnt->cl_protname, clnt->cl_server);
1041                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1042         }
1043
1044         if (task->tk_status < 12) {
1045                 if (!RPC_IS_SOFT(task)) {
1046                         task->tk_action = call_bind;
1047                         clnt->cl_stats->rpcretrans++;
1048                         goto out_retry;
1049                 }
1050                 printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n",
1051                         clnt->cl_protname, task->tk_status);
1052                 rpc_exit(task, -EIO);
1053                 return;
1054         }
1055
1056         req->rq_rcv_buf.len = req->rq_private_buf.len;
1057
1058         /* Check that the softirq receive buffer is valid */
1059         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1060                                 sizeof(req->rq_rcv_buf)) != 0);
1061
1062         /* Verify the RPC header */
1063         p = call_verify(task);
1064         if (IS_ERR(p)) {
1065                 if (p == ERR_PTR(-EAGAIN))
1066                         goto out_retry;
1067                 return;
1068         }
1069
1070         task->tk_action = rpc_exit_task;
1071
1072         if (decode)
1073                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1074                                                       task->tk_msg.rpc_resp);
1075         dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
1076                                         task->tk_status);
1077         return;
1078 out_retry:
1079         req->rq_received = req->rq_private_buf.len = 0;
1080         task->tk_status = 0;
1081 }
1082
1083 /*
1084  * 8.   Refresh the credentials if rejected by the server
1085  */
1086 static void
1087 call_refresh(struct rpc_task *task)
1088 {
1089         dprintk("RPC: %4d call_refresh\n", task->tk_pid);
1090
1091         xprt_release(task);     /* Must do to obtain new XID */
1092         task->tk_action = call_refreshresult;
1093         task->tk_status = 0;
1094         task->tk_client->cl_stats->rpcauthrefresh++;
1095         rpcauth_refreshcred(task);
1096 }
1097
1098 /*
1099  * 8a.  Process the results of a credential refresh
1100  */
1101 static void
1102 call_refreshresult(struct rpc_task *task)
1103 {
1104         int status = task->tk_status;
1105         dprintk("RPC: %4d call_refreshresult (status %d)\n", 
1106                                 task->tk_pid, task->tk_status);
1107
1108         task->tk_status = 0;
1109         task->tk_action = call_reserve;
1110         if (status >= 0 && rpcauth_uptodatecred(task))
1111                 return;
1112         if (status == -EACCES) {
1113                 rpc_exit(task, -EACCES);
1114                 return;
1115         }
1116         task->tk_action = call_refresh;
1117         if (status != -ETIMEDOUT)
1118                 rpc_delay(task, 3*HZ);
1119         return;
1120 }
1121
1122 /*
1123  * Call header serialization
1124  */
1125 static u32 *
1126 call_header(struct rpc_task *task)
1127 {
1128         struct rpc_clnt *clnt = task->tk_client;
1129         struct rpc_rqst *req = task->tk_rqstp;
1130         u32             *p = req->rq_svec[0].iov_base;
1131
1132         /* FIXME: check buffer size? */
1133
1134         p = xprt_skip_transport_header(task->tk_xprt, p);
1135         *p++ = req->rq_xid;             /* XID */
1136         *p++ = htonl(RPC_CALL);         /* CALL */
1137         *p++ = htonl(RPC_VERSION);      /* RPC version */
1138         *p++ = htonl(clnt->cl_prog);    /* program number */
1139         *p++ = htonl(clnt->cl_vers);    /* program version */
1140         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
1141         p = rpcauth_marshcred(task, p);
1142         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1143         return p;
1144 }
1145
1146 /*
1147  * Reply header verification
1148  */
1149 static u32 *
1150 call_verify(struct rpc_task *task)
1151 {
1152         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
1153         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
1154         u32     *p = iov->iov_base, n;
1155         int error = -EACCES;
1156
1157         if ((len -= 3) < 0)
1158                 goto out_overflow;
1159         p += 1; /* skip XID */
1160
1161         if ((n = ntohl(*p++)) != RPC_REPLY) {
1162                 printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
1163                 goto out_garbage;
1164         }
1165         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
1166                 if (--len < 0)
1167                         goto out_overflow;
1168                 switch ((n = ntohl(*p++))) {
1169                         case RPC_AUTH_ERROR:
1170                                 break;
1171                         case RPC_MISMATCH:
1172                                 dprintk("%s: RPC call version mismatch!\n", __FUNCTION__);
1173                                 error = -EPROTONOSUPPORT;
1174                                 goto out_err;
1175                         default:
1176                                 dprintk("%s: RPC call rejected, unknown error: %x\n", __FUNCTION__, n);
1177                                 goto out_eio;
1178                 }
1179                 if (--len < 0)
1180                         goto out_overflow;
1181                 switch ((n = ntohl(*p++))) {
1182                 case RPC_AUTH_REJECTEDCRED:
1183                 case RPC_AUTH_REJECTEDVERF:
1184                 case RPCSEC_GSS_CREDPROBLEM:
1185                 case RPCSEC_GSS_CTXPROBLEM:
1186                         if (!task->tk_cred_retry)
1187                                 break;
1188                         task->tk_cred_retry--;
1189                         dprintk("RPC: %4d call_verify: retry stale creds\n",
1190                                                         task->tk_pid);
1191                         rpcauth_invalcred(task);
1192                         task->tk_action = call_refresh;
1193                         goto out_retry;
1194                 case RPC_AUTH_BADCRED:
1195                 case RPC_AUTH_BADVERF:
1196                         /* possibly garbled cred/verf? */
1197                         if (!task->tk_garb_retry)
1198                                 break;
1199                         task->tk_garb_retry--;
1200                         dprintk("RPC: %4d call_verify: retry garbled creds\n",
1201                                                         task->tk_pid);
1202                         task->tk_action = call_bind;
1203                         goto out_retry;
1204                 case RPC_AUTH_TOOWEAK:
1205                         printk(KERN_NOTICE "call_verify: server %s requires stronger "
1206                                "authentication.\n", task->tk_client->cl_server);
1207                         break;
1208                 default:
1209                         printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
1210                         error = -EIO;
1211                 }
1212                 dprintk("RPC: %4d call_verify: call rejected %d\n",
1213                                                 task->tk_pid, n);
1214                 goto out_err;
1215         }
1216         if (!(p = rpcauth_checkverf(task, p))) {
1217                 printk(KERN_WARNING "call_verify: auth check failed\n");
1218                 goto out_garbage;               /* bad verifier, retry */
1219         }
1220         len = p - (u32 *)iov->iov_base - 1;
1221         if (len < 0)
1222                 goto out_overflow;
1223         switch ((n = ntohl(*p++))) {
1224         case RPC_SUCCESS:
1225                 return p;
1226         case RPC_PROG_UNAVAIL:
1227                 dprintk("RPC: call_verify: program %u is unsupported by server %s\n",
1228                                 (unsigned int)task->tk_client->cl_prog,
1229                                 task->tk_client->cl_server);
1230                 error = -EPFNOSUPPORT;
1231                 goto out_err;
1232         case RPC_PROG_MISMATCH:
1233                 dprintk("RPC: call_verify: program %u, version %u unsupported by server %s\n",
1234                                 (unsigned int)task->tk_client->cl_prog,
1235                                 (unsigned int)task->tk_client->cl_vers,
1236                                 task->tk_client->cl_server);
1237                 error = -EPROTONOSUPPORT;
1238                 goto out_err;
1239         case RPC_PROC_UNAVAIL:
1240                 dprintk("RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
1241                                 task->tk_msg.rpc_proc,
1242                                 task->tk_client->cl_prog,
1243                                 task->tk_client->cl_vers,
1244                                 task->tk_client->cl_server);
1245                 error = -EOPNOTSUPP;
1246                 goto out_err;
1247         case RPC_GARBAGE_ARGS:
1248                 dprintk("RPC: %4d %s: server saw garbage\n", task->tk_pid, __FUNCTION__);
1249                 break;                  /* retry */
1250         default:
1251                 printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
1252                 /* Also retry */
1253         }
1254
1255 out_garbage:
1256         task->tk_client->cl_stats->rpcgarbage++;
1257         if (task->tk_garb_retry) {
1258                 task->tk_garb_retry--;
1259                 dprintk("RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid);
1260                 task->tk_action = call_bind;
1261 out_retry:
1262                 return ERR_PTR(-EAGAIN);
1263         }
1264         printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
1265 out_eio:
1266         error = -EIO;
1267 out_err:
1268         rpc_exit(task, error);
1269         return ERR_PTR(error);
1270 out_overflow:
1271         printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
1272         goto out_garbage;
1273 }
1274
1275 static int rpcproc_encode_null(void *rqstp, u32 *data, void *obj)
1276 {
1277         return 0;
1278 }
1279
1280 static int rpcproc_decode_null(void *rqstp, u32 *data, void *obj)
1281 {
1282         return 0;
1283 }
1284
1285 static struct rpc_procinfo rpcproc_null = {
1286         .p_encode = rpcproc_encode_null,
1287         .p_decode = rpcproc_decode_null,
1288 };
1289
1290 int rpc_ping(struct rpc_clnt *clnt, int flags)
1291 {
1292         struct rpc_message msg = {
1293                 .rpc_proc = &rpcproc_null,
1294         };
1295         int err;
1296         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1297         err = rpc_call_sync(clnt, &msg, flags);
1298         put_rpccred(msg.rpc_cred);
1299         return err;
1300 }