]> Pileus Git - ~andy/linux/blob - net/tipc/socket.c
tipc: Remove unneeded parameter to tipc_createport_raw()
[~andy/linux] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, Ericsson AB
5  * Copyright (c) 2004-2008, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/module.h>
38 #include <linux/types.h>
39 #include <linux/net.h>
40 #include <linux/socket.h>
41 #include <linux/errno.h>
42 #include <linux/mm.h>
43 #include <linux/slab.h>
44 #include <linux/poll.h>
45 #include <linux/fcntl.h>
46 #include <asm/string.h>
47 #include <asm/atomic.h>
48 #include <net/sock.h>
49
50 #include <linux/tipc.h>
51 #include <linux/tipc_config.h>
52 #include <net/tipc/tipc_msg.h>
53 #include <net/tipc/tipc_port.h>
54
55 #include "core.h"
56
57 #define SS_LISTENING    -1      /* socket is listening */
58 #define SS_READY        -2      /* socket is connectionless */
59
60 #define OVERLOAD_LIMIT_BASE     5000
61 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
62
63 struct tipc_sock {
64         struct sock sk;
65         struct tipc_port *p;
66 };
67
68 #define tipc_sk(sk) ((struct tipc_sock *)(sk))
69 #define tipc_sk_port(sk) ((struct tipc_port *)(tipc_sk(sk)->p))
70
71 static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
72 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
73 static void wakeupdispatch(struct tipc_port *tport);
74
75 static const struct proto_ops packet_ops;
76 static const struct proto_ops stream_ops;
77 static const struct proto_ops msg_ops;
78
79 static struct proto tipc_proto;
80
81 static int sockets_enabled = 0;
82
83 static atomic_t tipc_queue_size = ATOMIC_INIT(0);
84
85 /*
86  * Revised TIPC socket locking policy:
87  *
88  * Most socket operations take the standard socket lock when they start
89  * and hold it until they finish (or until they need to sleep).  Acquiring
90  * this lock grants the owner exclusive access to the fields of the socket
91  * data structures, with the exception of the backlog queue.  A few socket
92  * operations can be done without taking the socket lock because they only
93  * read socket information that never changes during the life of the socket.
94  *
95  * Socket operations may acquire the lock for the associated TIPC port if they
96  * need to perform an operation on the port.  If any routine needs to acquire
97  * both the socket lock and the port lock it must take the socket lock first
98  * to avoid the risk of deadlock.
99  *
100  * The dispatcher handling incoming messages cannot grab the socket lock in
101  * the standard fashion, since invoked it runs at the BH level and cannot block.
102  * Instead, it checks to see if the socket lock is currently owned by someone,
103  * and either handles the message itself or adds it to the socket's backlog
104  * queue; in the latter case the queued message is processed once the process
105  * owning the socket lock releases it.
106  *
107  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
108  * the problem of a blocked socket operation preventing any other operations
109  * from occurring.  However, applications must be careful if they have
110  * multiple threads trying to send (or receive) on the same socket, as these
111  * operations might interfere with each other.  For example, doing a connect
112  * and a receive at the same time might allow the receive to consume the
113  * ACK message meant for the connect.  While additional work could be done
114  * to try and overcome this, it doesn't seem to be worthwhile at the present.
115  *
116  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
117  * that another operation that must be performed in a non-blocking manner is
118  * not delayed for very long because the lock has already been taken.
119  *
120  * NOTE: This code assumes that certain fields of a port/socket pair are
121  * constant over its lifetime; such fields can be examined without taking
122  * the socket lock and/or port lock, and do not need to be re-read even
123  * after resuming processing after waiting.  These fields include:
124  *   - socket type
125  *   - pointer to socket sk structure (aka tipc_sock structure)
126  *   - pointer to port structure
127  *   - port reference
128  */
129
130 /**
131  * advance_rx_queue - discard first buffer in socket receive queue
132  *
133  * Caller must hold socket lock
134  */
135
136 static void advance_rx_queue(struct sock *sk)
137 {
138         buf_discard(__skb_dequeue(&sk->sk_receive_queue));
139         atomic_dec(&tipc_queue_size);
140 }
141
142 /**
143  * discard_rx_queue - discard all buffers in socket receive queue
144  *
145  * Caller must hold socket lock
146  */
147
148 static void discard_rx_queue(struct sock *sk)
149 {
150         struct sk_buff *buf;
151
152         while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
153                 atomic_dec(&tipc_queue_size);
154                 buf_discard(buf);
155         }
156 }
157
158 /**
159  * reject_rx_queue - reject all buffers in socket receive queue
160  *
161  * Caller must hold socket lock
162  */
163
164 static void reject_rx_queue(struct sock *sk)
165 {
166         struct sk_buff *buf;
167
168         while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
169                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
170                 atomic_dec(&tipc_queue_size);
171         }
172 }
173
174 /**
175  * tipc_create - create a TIPC socket
176  * @net: network namespace (must be default network)
177  * @sock: pre-allocated socket structure
178  * @protocol: protocol indicator (must be 0)
179  *
180  * This routine creates additional data structures used by the TIPC socket,
181  * initializes them, and links them together.
182  *
183  * Returns 0 on success, errno otherwise
184  */
185
186 static int tipc_create(struct net *net, struct socket *sock, int protocol)
187 {
188         const struct proto_ops *ops;
189         socket_state state;
190         struct sock *sk;
191         struct tipc_port *tp_ptr;
192
193         /* Validate arguments */
194
195         if (net != &init_net)
196                 return -EAFNOSUPPORT;
197
198         if (unlikely(protocol != 0))
199                 return -EPROTONOSUPPORT;
200
201         switch (sock->type) {
202         case SOCK_STREAM:
203                 ops = &stream_ops;
204                 state = SS_UNCONNECTED;
205                 break;
206         case SOCK_SEQPACKET:
207                 ops = &packet_ops;
208                 state = SS_UNCONNECTED;
209                 break;
210         case SOCK_DGRAM:
211         case SOCK_RDM:
212                 ops = &msg_ops;
213                 state = SS_READY;
214                 break;
215         default:
216                 return -EPROTOTYPE;
217         }
218
219         /* Allocate socket's protocol area */
220
221         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
222         if (sk == NULL)
223                 return -ENOMEM;
224
225         /* Allocate TIPC port for socket to use */
226
227         tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
228                                      TIPC_LOW_IMPORTANCE);
229         if (unlikely(!tp_ptr)) {
230                 sk_free(sk);
231                 return -ENOMEM;
232         }
233
234         /* Finish initializing socket data structures */
235
236         sock->ops = ops;
237         sock->state = state;
238
239         sock_init_data(sock, sk);
240         sk->sk_rcvtimeo = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
241         sk->sk_backlog_rcv = backlog_rcv;
242         tipc_sk(sk)->p = tp_ptr;
243
244         spin_unlock_bh(tp_ptr->lock);
245
246         if (sock->state == SS_READY) {
247                 tipc_set_portunreturnable(tp_ptr->ref, 1);
248                 if (sock->type == SOCK_DGRAM)
249                         tipc_set_portunreliable(tp_ptr->ref, 1);
250         }
251
252         atomic_inc(&tipc_user_count);
253         return 0;
254 }
255
256 /**
257  * release - destroy a TIPC socket
258  * @sock: socket to destroy
259  *
260  * This routine cleans up any messages that are still queued on the socket.
261  * For DGRAM and RDM socket types, all queued messages are rejected.
262  * For SEQPACKET and STREAM socket types, the first message is rejected
263  * and any others are discarded.  (If the first message on a STREAM socket
264  * is partially-read, it is discarded and the next one is rejected instead.)
265  *
266  * NOTE: Rejected messages are not necessarily returned to the sender!  They
267  * are returned or discarded according to the "destination droppable" setting
268  * specified for the message by the sender.
269  *
270  * Returns 0 on success, errno otherwise
271  */
272
273 static int release(struct socket *sock)
274 {
275         struct sock *sk = sock->sk;
276         struct tipc_port *tport;
277         struct sk_buff *buf;
278         int res;
279
280         /*
281          * Exit if socket isn't fully initialized (occurs when a failed accept()
282          * releases a pre-allocated child socket that was never used)
283          */
284
285         if (sk == NULL)
286                 return 0;
287
288         tport = tipc_sk_port(sk);
289         lock_sock(sk);
290
291         /*
292          * Reject all unreceived messages, except on an active connection
293          * (which disconnects locally & sends a 'FIN+' to peer)
294          */
295
296         while (sock->state != SS_DISCONNECTING) {
297                 buf = __skb_dequeue(&sk->sk_receive_queue);
298                 if (buf == NULL)
299                         break;
300                 atomic_dec(&tipc_queue_size);
301                 if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf)))
302                         buf_discard(buf);
303                 else {
304                         if ((sock->state == SS_CONNECTING) ||
305                             (sock->state == SS_CONNECTED)) {
306                                 sock->state = SS_DISCONNECTING;
307                                 tipc_disconnect(tport->ref);
308                         }
309                         tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
310                 }
311         }
312
313         /*
314          * Delete TIPC port; this ensures no more messages are queued
315          * (also disconnects an active connection & sends a 'FIN-' to peer)
316          */
317
318         res = tipc_deleteport(tport->ref);
319
320         /* Discard any remaining (connection-based) messages in receive queue */
321
322         discard_rx_queue(sk);
323
324         /* Reject any messages that accumulated in backlog queue */
325
326         sock->state = SS_DISCONNECTING;
327         release_sock(sk);
328
329         sock_put(sk);
330         sock->sk = NULL;
331
332         atomic_dec(&tipc_user_count);
333         return res;
334 }
335
336 /**
337  * bind - associate or disassocate TIPC name(s) with a socket
338  * @sock: socket structure
339  * @uaddr: socket address describing name(s) and desired operation
340  * @uaddr_len: size of socket address data structure
341  *
342  * Name and name sequence binding is indicated using a positive scope value;
343  * a negative scope value unbinds the specified name.  Specifying no name
344  * (i.e. a socket address length of 0) unbinds all names from the socket.
345  *
346  * Returns 0 on success, errno otherwise
347  *
348  * NOTE: This routine doesn't need to take the socket lock since it doesn't
349  *       access any non-constant socket information.
350  */
351
352 static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
353 {
354         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
355         u32 portref = tipc_sk_port(sock->sk)->ref;
356
357         if (unlikely(!uaddr_len))
358                 return tipc_withdraw(portref, 0, NULL);
359
360         if (uaddr_len < sizeof(struct sockaddr_tipc))
361                 return -EINVAL;
362         if (addr->family != AF_TIPC)
363                 return -EAFNOSUPPORT;
364
365         if (addr->addrtype == TIPC_ADDR_NAME)
366                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
367         else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
368                 return -EAFNOSUPPORT;
369
370         return (addr->scope > 0) ?
371                 tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
372                 tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
373 }
374
375 /**
376  * get_name - get port ID of socket or peer socket
377  * @sock: socket structure
378  * @uaddr: area for returned socket address
379  * @uaddr_len: area for returned length of socket address
380  * @peer: 0 to obtain socket name, 1 to obtain peer socket name
381  *
382  * Returns 0 on success, errno otherwise
383  *
384  * NOTE: This routine doesn't need to take the socket lock since it doesn't
385  *       access any non-constant socket information.
386  */
387
388 static int get_name(struct socket *sock, struct sockaddr *uaddr,
389                     int *uaddr_len, int peer)
390 {
391         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
392         u32 portref = tipc_sk_port(sock->sk)->ref;
393         u32 res;
394
395         if (peer) {
396                 res = tipc_peer(portref, &addr->addr.id);
397                 if (res)
398                         return res;
399         } else {
400                 tipc_ownidentity(portref, &addr->addr.id);
401         }
402
403         *uaddr_len = sizeof(*addr);
404         addr->addrtype = TIPC_ADDR_ID;
405         addr->family = AF_TIPC;
406         addr->scope = 0;
407         addr->addr.name.domain = 0;
408
409         return 0;
410 }
411
412 /**
413  * poll - read and possibly block on pollmask
414  * @file: file structure associated with the socket
415  * @sock: socket for which to calculate the poll bits
416  * @wait: ???
417  *
418  * Returns pollmask value
419  *
420  * COMMENTARY:
421  * It appears that the usual socket locking mechanisms are not useful here
422  * since the pollmask info is potentially out-of-date the moment this routine
423  * exits.  TCP and other protocols seem to rely on higher level poll routines
424  * to handle any preventable race conditions, so TIPC will do the same ...
425  *
426  * TIPC sets the returned events as follows:
427  * a) POLLRDNORM and POLLIN are set if the socket's receive queue is non-empty
428  *    or if a connection-oriented socket is does not have an active connection
429  *    (i.e. a read operation will not block).
430  * b) POLLOUT is set except when a socket's connection has been terminated
431  *    (i.e. a write operation will not block).
432  * c) POLLHUP is set when a socket's connection has been terminated.
433  *
434  * IMPORTANT: The fact that a read or write operation will not block does NOT
435  * imply that the operation will succeed!
436  */
437
438 static unsigned int poll(struct file *file, struct socket *sock,
439                          poll_table *wait)
440 {
441         struct sock *sk = sock->sk;
442         u32 mask;
443
444         poll_wait(file, sk->sk_sleep, wait);
445
446         if (!skb_queue_empty(&sk->sk_receive_queue) ||
447             (sock->state == SS_UNCONNECTED) ||
448             (sock->state == SS_DISCONNECTING))
449                 mask = (POLLRDNORM | POLLIN);
450         else
451                 mask = 0;
452
453         if (sock->state == SS_DISCONNECTING)
454                 mask |= POLLHUP;
455         else
456                 mask |= POLLOUT;
457
458         return mask;
459 }
460
461 /**
462  * dest_name_check - verify user is permitted to send to specified port name
463  * @dest: destination address
464  * @m: descriptor for message to be sent
465  *
466  * Prevents restricted configuration commands from being issued by
467  * unauthorized users.
468  *
469  * Returns 0 if permission is granted, otherwise errno
470  */
471
472 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
473 {
474         struct tipc_cfg_msg_hdr hdr;
475
476         if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
477                 return 0;
478         if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
479                 return 0;
480         if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
481                 return -EACCES;
482
483         if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
484                 return -EFAULT;
485         if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
486                 return -EACCES;
487
488         return 0;
489 }
490
491 /**
492  * send_msg - send message in connectionless manner
493  * @iocb: if NULL, indicates that socket lock is already held
494  * @sock: socket structure
495  * @m: message to send
496  * @total_len: length of message
497  *
498  * Message must have an destination specified explicitly.
499  * Used for SOCK_RDM and SOCK_DGRAM messages,
500  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
501  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
502  *
503  * Returns the number of bytes sent on success, or errno otherwise
504  */
505
506 static int send_msg(struct kiocb *iocb, struct socket *sock,
507                     struct msghdr *m, size_t total_len)
508 {
509         struct sock *sk = sock->sk;
510         struct tipc_port *tport = tipc_sk_port(sk);
511         struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
512         int needs_conn;
513         int res = -EINVAL;
514
515         if (unlikely(!dest))
516                 return -EDESTADDRREQ;
517         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
518                      (dest->family != AF_TIPC)))
519                 return -EINVAL;
520
521         if (iocb)
522                 lock_sock(sk);
523
524         needs_conn = (sock->state != SS_READY);
525         if (unlikely(needs_conn)) {
526                 if (sock->state == SS_LISTENING) {
527                         res = -EPIPE;
528                         goto exit;
529                 }
530                 if (sock->state != SS_UNCONNECTED) {
531                         res = -EISCONN;
532                         goto exit;
533                 }
534                 if ((tport->published) ||
535                     ((sock->type == SOCK_STREAM) && (total_len != 0))) {
536                         res = -EOPNOTSUPP;
537                         goto exit;
538                 }
539                 if (dest->addrtype == TIPC_ADDR_NAME) {
540                         tport->conn_type = dest->addr.name.name.type;
541                         tport->conn_instance = dest->addr.name.name.instance;
542                 }
543
544                 /* Abort any pending connection attempts (very unlikely) */
545
546                 reject_rx_queue(sk);
547         }
548
549         do {
550                 if (dest->addrtype == TIPC_ADDR_NAME) {
551                         if ((res = dest_name_check(dest, m)))
552                                 break;
553                         res = tipc_send2name(tport->ref,
554                                              &dest->addr.name.name,
555                                              dest->addr.name.domain,
556                                              m->msg_iovlen,
557                                              m->msg_iov);
558                 }
559                 else if (dest->addrtype == TIPC_ADDR_ID) {
560                         res = tipc_send2port(tport->ref,
561                                              &dest->addr.id,
562                                              m->msg_iovlen,
563                                              m->msg_iov);
564                 }
565                 else if (dest->addrtype == TIPC_ADDR_MCAST) {
566                         if (needs_conn) {
567                                 res = -EOPNOTSUPP;
568                                 break;
569                         }
570                         if ((res = dest_name_check(dest, m)))
571                                 break;
572                         res = tipc_multicast(tport->ref,
573                                              &dest->addr.nameseq,
574                                              0,
575                                              m->msg_iovlen,
576                                              m->msg_iov);
577                 }
578                 if (likely(res != -ELINKCONG)) {
579                         if (needs_conn && (res >= 0)) {
580                                 sock->state = SS_CONNECTING;
581                         }
582                         break;
583                 }
584                 if (m->msg_flags & MSG_DONTWAIT) {
585                         res = -EWOULDBLOCK;
586                         break;
587                 }
588                 release_sock(sk);
589                 res = wait_event_interruptible(*sk->sk_sleep,
590                                                !tport->congested);
591                 lock_sock(sk);
592                 if (res)
593                         break;
594         } while (1);
595
596 exit:
597         if (iocb)
598                 release_sock(sk);
599         return res;
600 }
601
602 /**
603  * send_packet - send a connection-oriented message
604  * @iocb: if NULL, indicates that socket lock is already held
605  * @sock: socket structure
606  * @m: message to send
607  * @total_len: length of message
608  *
609  * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
610  *
611  * Returns the number of bytes sent on success, or errno otherwise
612  */
613
614 static int send_packet(struct kiocb *iocb, struct socket *sock,
615                        struct msghdr *m, size_t total_len)
616 {
617         struct sock *sk = sock->sk;
618         struct tipc_port *tport = tipc_sk_port(sk);
619         struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
620         int res;
621
622         /* Handle implied connection establishment */
623
624         if (unlikely(dest))
625                 return send_msg(iocb, sock, m, total_len);
626
627         if (iocb)
628                 lock_sock(sk);
629
630         do {
631                 if (unlikely(sock->state != SS_CONNECTED)) {
632                         if (sock->state == SS_DISCONNECTING)
633                                 res = -EPIPE;
634                         else
635                                 res = -ENOTCONN;
636                         break;
637                 }
638
639                 res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov);
640                 if (likely(res != -ELINKCONG)) {
641                         break;
642                 }
643                 if (m->msg_flags & MSG_DONTWAIT) {
644                         res = -EWOULDBLOCK;
645                         break;
646                 }
647                 release_sock(sk);
648                 res = wait_event_interruptible(*sk->sk_sleep,
649                         (!tport->congested || !tport->connected));
650                 lock_sock(sk);
651                 if (res)
652                         break;
653         } while (1);
654
655         if (iocb)
656                 release_sock(sk);
657         return res;
658 }
659
660 /**
661  * send_stream - send stream-oriented data
662  * @iocb: (unused)
663  * @sock: socket structure
664  * @m: data to send
665  * @total_len: total length of data to be sent
666  *
667  * Used for SOCK_STREAM data.
668  *
669  * Returns the number of bytes sent on success (or partial success),
670  * or errno if no data sent
671  */
672
673 static int send_stream(struct kiocb *iocb, struct socket *sock,
674                        struct msghdr *m, size_t total_len)
675 {
676         struct sock *sk = sock->sk;
677         struct tipc_port *tport = tipc_sk_port(sk);
678         struct msghdr my_msg;
679         struct iovec my_iov;
680         struct iovec *curr_iov;
681         int curr_iovlen;
682         char __user *curr_start;
683         u32 hdr_size;
684         int curr_left;
685         int bytes_to_send;
686         int bytes_sent;
687         int res;
688
689         lock_sock(sk);
690
691         /* Handle special cases where there is no connection */
692
693         if (unlikely(sock->state != SS_CONNECTED)) {
694                 if (sock->state == SS_UNCONNECTED) {
695                         res = send_packet(NULL, sock, m, total_len);
696                         goto exit;
697                 } else if (sock->state == SS_DISCONNECTING) {
698                         res = -EPIPE;
699                         goto exit;
700                 } else {
701                         res = -ENOTCONN;
702                         goto exit;
703                 }
704         }
705
706         if (unlikely(m->msg_name)) {
707                 res = -EISCONN;
708                 goto exit;
709         }
710
711         /*
712          * Send each iovec entry using one or more messages
713          *
714          * Note: This algorithm is good for the most likely case
715          * (i.e. one large iovec entry), but could be improved to pass sets
716          * of small iovec entries into send_packet().
717          */
718
719         curr_iov = m->msg_iov;
720         curr_iovlen = m->msg_iovlen;
721         my_msg.msg_iov = &my_iov;
722         my_msg.msg_iovlen = 1;
723         my_msg.msg_flags = m->msg_flags;
724         my_msg.msg_name = NULL;
725         bytes_sent = 0;
726
727         hdr_size = msg_hdr_sz(&tport->phdr);
728
729         while (curr_iovlen--) {
730                 curr_start = curr_iov->iov_base;
731                 curr_left = curr_iov->iov_len;
732
733                 while (curr_left) {
734                         bytes_to_send = tport->max_pkt - hdr_size;
735                         if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
736                                 bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
737                         if (curr_left < bytes_to_send)
738                                 bytes_to_send = curr_left;
739                         my_iov.iov_base = curr_start;
740                         my_iov.iov_len = bytes_to_send;
741                         if ((res = send_packet(NULL, sock, &my_msg, 0)) < 0) {
742                                 if (bytes_sent)
743                                         res = bytes_sent;
744                                 goto exit;
745                         }
746                         curr_left -= bytes_to_send;
747                         curr_start += bytes_to_send;
748                         bytes_sent += bytes_to_send;
749                 }
750
751                 curr_iov++;
752         }
753         res = bytes_sent;
754 exit:
755         release_sock(sk);
756         return res;
757 }
758
759 /**
760  * auto_connect - complete connection setup to a remote port
761  * @sock: socket structure
762  * @msg: peer's response message
763  *
764  * Returns 0 on success, errno otherwise
765  */
766
767 static int auto_connect(struct socket *sock, struct tipc_msg *msg)
768 {
769         struct tipc_port *tport = tipc_sk_port(sock->sk);
770         struct tipc_portid peer;
771
772         if (msg_errcode(msg)) {
773                 sock->state = SS_DISCONNECTING;
774                 return -ECONNREFUSED;
775         }
776
777         peer.ref = msg_origport(msg);
778         peer.node = msg_orignode(msg);
779         tipc_connect2port(tport->ref, &peer);
780         tipc_set_portimportance(tport->ref, msg_importance(msg));
781         sock->state = SS_CONNECTED;
782         return 0;
783 }
784
785 /**
786  * set_orig_addr - capture sender's address for received message
787  * @m: descriptor for message info
788  * @msg: received message header
789  *
790  * Note: Address is not captured if not requested by receiver.
791  */
792
793 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
794 {
795         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name;
796
797         if (addr) {
798                 addr->family = AF_TIPC;
799                 addr->addrtype = TIPC_ADDR_ID;
800                 addr->addr.id.ref = msg_origport(msg);
801                 addr->addr.id.node = msg_orignode(msg);
802                 addr->addr.name.domain = 0;     /* could leave uninitialized */
803                 addr->scope = 0;                /* could leave uninitialized */
804                 m->msg_namelen = sizeof(struct sockaddr_tipc);
805         }
806 }
807
808 /**
809  * anc_data_recv - optionally capture ancillary data for received message
810  * @m: descriptor for message info
811  * @msg: received message header
812  * @tport: TIPC port associated with message
813  *
814  * Note: Ancillary data is not captured if not requested by receiver.
815  *
816  * Returns 0 if successful, otherwise errno
817  */
818
819 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
820                                 struct tipc_port *tport)
821 {
822         u32 anc_data[3];
823         u32 err;
824         u32 dest_type;
825         int has_name;
826         int res;
827
828         if (likely(m->msg_controllen == 0))
829                 return 0;
830
831         /* Optionally capture errored message object(s) */
832
833         err = msg ? msg_errcode(msg) : 0;
834         if (unlikely(err)) {
835                 anc_data[0] = err;
836                 anc_data[1] = msg_data_sz(msg);
837                 if ((res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data)))
838                         return res;
839                 if (anc_data[1] &&
840                     (res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
841                                     msg_data(msg))))
842                         return res;
843         }
844
845         /* Optionally capture message destination object */
846
847         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
848         switch (dest_type) {
849         case TIPC_NAMED_MSG:
850                 has_name = 1;
851                 anc_data[0] = msg_nametype(msg);
852                 anc_data[1] = msg_namelower(msg);
853                 anc_data[2] = msg_namelower(msg);
854                 break;
855         case TIPC_MCAST_MSG:
856                 has_name = 1;
857                 anc_data[0] = msg_nametype(msg);
858                 anc_data[1] = msg_namelower(msg);
859                 anc_data[2] = msg_nameupper(msg);
860                 break;
861         case TIPC_CONN_MSG:
862                 has_name = (tport->conn_type != 0);
863                 anc_data[0] = tport->conn_type;
864                 anc_data[1] = tport->conn_instance;
865                 anc_data[2] = tport->conn_instance;
866                 break;
867         default:
868                 has_name = 0;
869         }
870         if (has_name &&
871             (res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data)))
872                 return res;
873
874         return 0;
875 }
876
877 /**
878  * recv_msg - receive packet-oriented message
879  * @iocb: (unused)
880  * @m: descriptor for message info
881  * @buf_len: total size of user buffer area
882  * @flags: receive flags
883  *
884  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
885  * If the complete message doesn't fit in user area, truncate it.
886  *
887  * Returns size of returned message data, errno otherwise
888  */
889
890 static int recv_msg(struct kiocb *iocb, struct socket *sock,
891                     struct msghdr *m, size_t buf_len, int flags)
892 {
893         struct sock *sk = sock->sk;
894         struct tipc_port *tport = tipc_sk_port(sk);
895         struct sk_buff *buf;
896         struct tipc_msg *msg;
897         unsigned int sz;
898         u32 err;
899         int res;
900
901         /* Catch invalid receive requests */
902
903         if (m->msg_iovlen != 1)
904                 return -EOPNOTSUPP;   /* Don't do multiple iovec entries yet */
905
906         if (unlikely(!buf_len))
907                 return -EINVAL;
908
909         lock_sock(sk);
910
911         if (unlikely(sock->state == SS_UNCONNECTED)) {
912                 res = -ENOTCONN;
913                 goto exit;
914         }
915
916 restart:
917
918         /* Look for a message in receive queue; wait if necessary */
919
920         while (skb_queue_empty(&sk->sk_receive_queue)) {
921                 if (sock->state == SS_DISCONNECTING) {
922                         res = -ENOTCONN;
923                         goto exit;
924                 }
925                 if (flags & MSG_DONTWAIT) {
926                         res = -EWOULDBLOCK;
927                         goto exit;
928                 }
929                 release_sock(sk);
930                 res = wait_event_interruptible(*sk->sk_sleep,
931                         (!skb_queue_empty(&sk->sk_receive_queue) ||
932                          (sock->state == SS_DISCONNECTING)));
933                 lock_sock(sk);
934                 if (res)
935                         goto exit;
936         }
937
938         /* Look at first message in receive queue */
939
940         buf = skb_peek(&sk->sk_receive_queue);
941         msg = buf_msg(buf);
942         sz = msg_data_sz(msg);
943         err = msg_errcode(msg);
944
945         /* Complete connection setup for an implied connect */
946
947         if (unlikely(sock->state == SS_CONNECTING)) {
948                 res = auto_connect(sock, msg);
949                 if (res)
950                         goto exit;
951         }
952
953         /* Discard an empty non-errored message & try again */
954
955         if ((!sz) && (!err)) {
956                 advance_rx_queue(sk);
957                 goto restart;
958         }
959
960         /* Capture sender's address (optional) */
961
962         set_orig_addr(m, msg);
963
964         /* Capture ancillary data (optional) */
965
966         res = anc_data_recv(m, msg, tport);
967         if (res)
968                 goto exit;
969
970         /* Capture message data (if valid) & compute return value (always) */
971
972         if (!err) {
973                 if (unlikely(buf_len < sz)) {
974                         sz = buf_len;
975                         m->msg_flags |= MSG_TRUNC;
976                 }
977                 if (unlikely(copy_to_user(m->msg_iov->iov_base, msg_data(msg),
978                                           sz))) {
979                         res = -EFAULT;
980                         goto exit;
981                 }
982                 res = sz;
983         } else {
984                 if ((sock->state == SS_READY) ||
985                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
986                         res = 0;
987                 else
988                         res = -ECONNRESET;
989         }
990
991         /* Consume received message (optional) */
992
993         if (likely(!(flags & MSG_PEEK))) {
994                 if ((sock->state != SS_READY) &&
995                     (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
996                         tipc_acknowledge(tport->ref, tport->conn_unacked);
997                 advance_rx_queue(sk);
998         }
999 exit:
1000         release_sock(sk);
1001         return res;
1002 }
1003
1004 /**
1005  * recv_stream - receive stream-oriented data
1006  * @iocb: (unused)
1007  * @m: descriptor for message info
1008  * @buf_len: total size of user buffer area
1009  * @flags: receive flags
1010  *
1011  * Used for SOCK_STREAM messages only.  If not enough data is available
1012  * will optionally wait for more; never truncates data.
1013  *
1014  * Returns size of returned message data, errno otherwise
1015  */
1016
1017 static int recv_stream(struct kiocb *iocb, struct socket *sock,
1018                        struct msghdr *m, size_t buf_len, int flags)
1019 {
1020         struct sock *sk = sock->sk;
1021         struct tipc_port *tport = tipc_sk_port(sk);
1022         struct sk_buff *buf;
1023         struct tipc_msg *msg;
1024         unsigned int sz;
1025         int sz_to_copy;
1026         int sz_copied = 0;
1027         int needed;
1028         char __user *crs = m->msg_iov->iov_base;
1029         unsigned char *buf_crs;
1030         u32 err;
1031         int res = 0;
1032
1033         /* Catch invalid receive attempts */
1034
1035         if (m->msg_iovlen != 1)
1036                 return -EOPNOTSUPP;   /* Don't do multiple iovec entries yet */
1037
1038         if (unlikely(!buf_len))
1039                 return -EINVAL;
1040
1041         lock_sock(sk);
1042
1043         if (unlikely((sock->state == SS_UNCONNECTED) ||
1044                      (sock->state == SS_CONNECTING))) {
1045                 res = -ENOTCONN;
1046                 goto exit;
1047         }
1048
1049 restart:
1050
1051         /* Look for a message in receive queue; wait if necessary */
1052
1053         while (skb_queue_empty(&sk->sk_receive_queue)) {
1054                 if (sock->state == SS_DISCONNECTING) {
1055                         res = -ENOTCONN;
1056                         goto exit;
1057                 }
1058                 if (flags & MSG_DONTWAIT) {
1059                         res = -EWOULDBLOCK;
1060                         goto exit;
1061                 }
1062                 release_sock(sk);
1063                 res = wait_event_interruptible(*sk->sk_sleep,
1064                         (!skb_queue_empty(&sk->sk_receive_queue) ||
1065                          (sock->state == SS_DISCONNECTING)));
1066                 lock_sock(sk);
1067                 if (res)
1068                         goto exit;
1069         }
1070
1071         /* Look at first message in receive queue */
1072
1073         buf = skb_peek(&sk->sk_receive_queue);
1074         msg = buf_msg(buf);
1075         sz = msg_data_sz(msg);
1076         err = msg_errcode(msg);
1077
1078         /* Discard an empty non-errored message & try again */
1079
1080         if ((!sz) && (!err)) {
1081                 advance_rx_queue(sk);
1082                 goto restart;
1083         }
1084
1085         /* Optionally capture sender's address & ancillary data of first msg */
1086
1087         if (sz_copied == 0) {
1088                 set_orig_addr(m, msg);
1089                 res = anc_data_recv(m, msg, tport);
1090                 if (res)
1091                         goto exit;
1092         }
1093
1094         /* Capture message data (if valid) & compute return value (always) */
1095
1096         if (!err) {
1097                 buf_crs = (unsigned char *)(TIPC_SKB_CB(buf)->handle);
1098                 sz = (unsigned char *)msg + msg_size(msg) - buf_crs;
1099
1100                 needed = (buf_len - sz_copied);
1101                 sz_to_copy = (sz <= needed) ? sz : needed;
1102                 if (unlikely(copy_to_user(crs, buf_crs, sz_to_copy))) {
1103                         res = -EFAULT;
1104                         goto exit;
1105                 }
1106                 sz_copied += sz_to_copy;
1107
1108                 if (sz_to_copy < sz) {
1109                         if (!(flags & MSG_PEEK))
1110                                 TIPC_SKB_CB(buf)->handle = buf_crs + sz_to_copy;
1111                         goto exit;
1112                 }
1113
1114                 crs += sz_to_copy;
1115         } else {
1116                 if (sz_copied != 0)
1117                         goto exit; /* can't add error msg to valid data */
1118
1119                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1120                         res = 0;
1121                 else
1122                         res = -ECONNRESET;
1123         }
1124
1125         /* Consume received message (optional) */
1126
1127         if (likely(!(flags & MSG_PEEK))) {
1128                 if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1129                         tipc_acknowledge(tport->ref, tport->conn_unacked);
1130                 advance_rx_queue(sk);
1131         }
1132
1133         /* Loop around if more data is required */
1134
1135         if ((sz_copied < buf_len)    /* didn't get all requested data */
1136             && (!skb_queue_empty(&sock->sk->sk_receive_queue) ||
1137                 (flags & MSG_WAITALL))
1138                                      /* ... and more is ready or required */
1139             && (!(flags & MSG_PEEK)) /* ... and aren't just peeking at data */
1140             && (!err)                /* ... and haven't reached a FIN */
1141             )
1142                 goto restart;
1143
1144 exit:
1145         release_sock(sk);
1146         return sz_copied ? sz_copied : res;
1147 }
1148
1149 /**
1150  * rx_queue_full - determine if receive queue can accept another message
1151  * @msg: message to be added to queue
1152  * @queue_size: current size of queue
1153  * @base: nominal maximum size of queue
1154  *
1155  * Returns 1 if queue is unable to accept message, 0 otherwise
1156  */
1157
1158 static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1159 {
1160         u32 threshold;
1161         u32 imp = msg_importance(msg);
1162
1163         if (imp == TIPC_LOW_IMPORTANCE)
1164                 threshold = base;
1165         else if (imp == TIPC_MEDIUM_IMPORTANCE)
1166                 threshold = base * 2;
1167         else if (imp == TIPC_HIGH_IMPORTANCE)
1168                 threshold = base * 100;
1169         else
1170                 return 0;
1171
1172         if (msg_connected(msg))
1173                 threshold *= 4;
1174
1175         return (queue_size >= threshold);
1176 }
1177
1178 /**
1179  * filter_rcv - validate incoming message
1180  * @sk: socket
1181  * @buf: message
1182  *
1183  * Enqueues message on receive queue if acceptable; optionally handles
1184  * disconnect indication for a connected socket.
1185  *
1186  * Called with socket lock already taken; port lock may also be taken.
1187  *
1188  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1189  */
1190
1191 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1192 {
1193         struct socket *sock = sk->sk_socket;
1194         struct tipc_msg *msg = buf_msg(buf);
1195         u32 recv_q_len;
1196
1197         /* Reject message if it is wrong sort of message for socket */
1198
1199         /*
1200          * WOULD IT BE BETTER TO JUST DISCARD THESE MESSAGES INSTEAD?
1201          * "NO PORT" ISN'T REALLY THE RIGHT ERROR CODE, AND THERE MAY
1202          * BE SECURITY IMPLICATIONS INHERENT IN REJECTING INVALID TRAFFIC
1203          */
1204
1205         if (sock->state == SS_READY) {
1206                 if (msg_connected(msg)) {
1207                         msg_dbg(msg, "dispatch filter 1\n");
1208                         return TIPC_ERR_NO_PORT;
1209                 }
1210         } else {
1211                 if (msg_mcast(msg)) {
1212                         msg_dbg(msg, "dispatch filter 2\n");
1213                         return TIPC_ERR_NO_PORT;
1214                 }
1215                 if (sock->state == SS_CONNECTED) {
1216                         if (!msg_connected(msg)) {
1217                                 msg_dbg(msg, "dispatch filter 3\n");
1218                                 return TIPC_ERR_NO_PORT;
1219                         }
1220                 }
1221                 else if (sock->state == SS_CONNECTING) {
1222                         if (!msg_connected(msg) && (msg_errcode(msg) == 0)) {
1223                                 msg_dbg(msg, "dispatch filter 4\n");
1224                                 return TIPC_ERR_NO_PORT;
1225                         }
1226                 }
1227                 else if (sock->state == SS_LISTENING) {
1228                         if (msg_connected(msg) || msg_errcode(msg)) {
1229                                 msg_dbg(msg, "dispatch filter 5\n");
1230                                 return TIPC_ERR_NO_PORT;
1231                         }
1232                 }
1233                 else if (sock->state == SS_DISCONNECTING) {
1234                         msg_dbg(msg, "dispatch filter 6\n");
1235                         return TIPC_ERR_NO_PORT;
1236                 }
1237                 else /* (sock->state == SS_UNCONNECTED) */ {
1238                         if (msg_connected(msg) || msg_errcode(msg)) {
1239                                 msg_dbg(msg, "dispatch filter 7\n");
1240                                 return TIPC_ERR_NO_PORT;
1241                         }
1242                 }
1243         }
1244
1245         /* Reject message if there isn't room to queue it */
1246
1247         recv_q_len = (u32)atomic_read(&tipc_queue_size);
1248         if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
1249                 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1250                         return TIPC_ERR_OVERLOAD;
1251         }
1252         recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1253         if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1254                 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1255                         return TIPC_ERR_OVERLOAD;
1256         }
1257
1258         /* Enqueue message (finally!) */
1259
1260         msg_dbg(msg, "<DISP<: ");
1261         TIPC_SKB_CB(buf)->handle = msg_data(msg);
1262         atomic_inc(&tipc_queue_size);
1263         __skb_queue_tail(&sk->sk_receive_queue, buf);
1264
1265         /* Initiate connection termination for an incoming 'FIN' */
1266
1267         if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1268                 sock->state = SS_DISCONNECTING;
1269                 tipc_disconnect_port(tipc_sk_port(sk));
1270         }
1271
1272         if (waitqueue_active(sk->sk_sleep))
1273                 wake_up_interruptible(sk->sk_sleep);
1274         return TIPC_OK;
1275 }
1276
1277 /**
1278  * backlog_rcv - handle incoming message from backlog queue
1279  * @sk: socket
1280  * @buf: message
1281  *
1282  * Caller must hold socket lock, but not port lock.
1283  *
1284  * Returns 0
1285  */
1286
1287 static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1288 {
1289         u32 res;
1290
1291         res = filter_rcv(sk, buf);
1292         if (res)
1293                 tipc_reject_msg(buf, res);
1294         return 0;
1295 }
1296
1297 /**
1298  * dispatch - handle incoming message
1299  * @tport: TIPC port that received message
1300  * @buf: message
1301  *
1302  * Called with port lock already taken.
1303  *
1304  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1305  */
1306
1307 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1308 {
1309         struct sock *sk = (struct sock *)tport->usr_handle;
1310         u32 res;
1311
1312         /*
1313          * Process message if socket is unlocked; otherwise add to backlog queue
1314          *
1315          * This code is based on sk_receive_skb(), but must be distinct from it
1316          * since a TIPC-specific filter/reject mechanism is utilized
1317          */
1318
1319         bh_lock_sock(sk);
1320         if (!sock_owned_by_user(sk)) {
1321                 res = filter_rcv(sk, buf);
1322         } else {
1323                 sk_add_backlog(sk, buf);
1324                 res = TIPC_OK;
1325         }
1326         bh_unlock_sock(sk);
1327
1328         return res;
1329 }
1330
1331 /**
1332  * wakeupdispatch - wake up port after congestion
1333  * @tport: port to wakeup
1334  *
1335  * Called with port lock already taken.
1336  */
1337
1338 static void wakeupdispatch(struct tipc_port *tport)
1339 {
1340         struct sock *sk = (struct sock *)tport->usr_handle;
1341
1342         if (waitqueue_active(sk->sk_sleep))
1343                 wake_up_interruptible(sk->sk_sleep);
1344 }
1345
1346 /**
1347  * connect - establish a connection to another TIPC port
1348  * @sock: socket structure
1349  * @dest: socket address for destination port
1350  * @destlen: size of socket address data structure
1351  * @flags: file-related flags associated with socket
1352  *
1353  * Returns 0 on success, errno otherwise
1354  */
1355
1356 static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1357                    int flags)
1358 {
1359         struct sock *sk = sock->sk;
1360         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1361         struct msghdr m = {NULL,};
1362         struct sk_buff *buf;
1363         struct tipc_msg *msg;
1364         int res;
1365
1366         lock_sock(sk);
1367
1368         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1369
1370         if (sock->state == SS_READY) {
1371                 res = -EOPNOTSUPP;
1372                 goto exit;
1373         }
1374
1375         /* For now, TIPC does not support the non-blocking form of connect() */
1376
1377         if (flags & O_NONBLOCK) {
1378                 res = -EWOULDBLOCK;
1379                 goto exit;
1380         }
1381
1382         /* Issue Posix-compliant error code if socket is in the wrong state */
1383
1384         if (sock->state == SS_LISTENING) {
1385                 res = -EOPNOTSUPP;
1386                 goto exit;
1387         }
1388         if (sock->state == SS_CONNECTING) {
1389                 res = -EALREADY;
1390                 goto exit;
1391         }
1392         if (sock->state != SS_UNCONNECTED) {
1393                 res = -EISCONN;
1394                 goto exit;
1395         }
1396
1397         /*
1398          * Reject connection attempt using multicast address
1399          *
1400          * Note: send_msg() validates the rest of the address fields,
1401          *       so there's no need to do it here
1402          */
1403
1404         if (dst->addrtype == TIPC_ADDR_MCAST) {
1405                 res = -EINVAL;
1406                 goto exit;
1407         }
1408
1409         /* Reject any messages already in receive queue (very unlikely) */
1410
1411         reject_rx_queue(sk);
1412
1413         /* Send a 'SYN-' to destination */
1414
1415         m.msg_name = dest;
1416         m.msg_namelen = destlen;
1417         res = send_msg(NULL, sock, &m, 0);
1418         if (res < 0) {
1419                 goto exit;
1420         }
1421
1422         /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1423
1424         release_sock(sk);
1425         res = wait_event_interruptible_timeout(*sk->sk_sleep,
1426                         (!skb_queue_empty(&sk->sk_receive_queue) ||
1427                         (sock->state != SS_CONNECTING)),
1428                         sk->sk_rcvtimeo);
1429         lock_sock(sk);
1430
1431         if (res > 0) {
1432                 buf = skb_peek(&sk->sk_receive_queue);
1433                 if (buf != NULL) {
1434                         msg = buf_msg(buf);
1435                         res = auto_connect(sock, msg);
1436                         if (!res) {
1437                                 if (!msg_data_sz(msg))
1438                                         advance_rx_queue(sk);
1439                         }
1440                 } else {
1441                         if (sock->state == SS_CONNECTED) {
1442                                 res = -EISCONN;
1443                         } else {
1444                                 res = -ECONNREFUSED;
1445                         }
1446                 }
1447         } else {
1448                 if (res == 0)
1449                         res = -ETIMEDOUT;
1450                 else
1451                         ; /* leave "res" unchanged */
1452                 sock->state = SS_DISCONNECTING;
1453         }
1454
1455 exit:
1456         release_sock(sk);
1457         return res;
1458 }
1459
1460 /**
1461  * listen - allow socket to listen for incoming connections
1462  * @sock: socket structure
1463  * @len: (unused)
1464  *
1465  * Returns 0 on success, errno otherwise
1466  */
1467
1468 static int listen(struct socket *sock, int len)
1469 {
1470         struct sock *sk = sock->sk;
1471         int res;
1472
1473         lock_sock(sk);
1474
1475         if (sock->state == SS_READY)
1476                 res = -EOPNOTSUPP;
1477         else if (sock->state != SS_UNCONNECTED)
1478                 res = -EINVAL;
1479         else {
1480                 sock->state = SS_LISTENING;
1481                 res = 0;
1482         }
1483
1484         release_sock(sk);
1485         return res;
1486 }
1487
1488 /**
1489  * accept - wait for connection request
1490  * @sock: listening socket
1491  * @newsock: new socket that is to be connected
1492  * @flags: file-related flags associated with socket
1493  *
1494  * Returns 0 on success, errno otherwise
1495  */
1496
1497 static int accept(struct socket *sock, struct socket *new_sock, int flags)
1498 {
1499         struct sock *sk = sock->sk;
1500         struct sk_buff *buf;
1501         int res;
1502
1503         lock_sock(sk);
1504
1505         if (sock->state == SS_READY) {
1506                 res = -EOPNOTSUPP;
1507                 goto exit;
1508         }
1509         if (sock->state != SS_LISTENING) {
1510                 res = -EINVAL;
1511                 goto exit;
1512         }
1513
1514         while (skb_queue_empty(&sk->sk_receive_queue)) {
1515                 if (flags & O_NONBLOCK) {
1516                         res = -EWOULDBLOCK;
1517                         goto exit;
1518                 }
1519                 release_sock(sk);
1520                 res = wait_event_interruptible(*sk->sk_sleep,
1521                                 (!skb_queue_empty(&sk->sk_receive_queue)));
1522                 lock_sock(sk);
1523                 if (res)
1524                         goto exit;
1525         }
1526
1527         buf = skb_peek(&sk->sk_receive_queue);
1528
1529         res = tipc_create(sock_net(sock->sk), new_sock, 0);
1530         if (!res) {
1531                 struct sock *new_sk = new_sock->sk;
1532                 struct tipc_port *new_tport = tipc_sk_port(new_sk);
1533                 u32 new_ref = new_tport->ref;
1534                 struct tipc_portid id;
1535                 struct tipc_msg *msg = buf_msg(buf);
1536
1537                 lock_sock(new_sk);
1538
1539                 /*
1540                  * Reject any stray messages received by new socket
1541                  * before the socket lock was taken (very, very unlikely)
1542                  */
1543
1544                 reject_rx_queue(new_sk);
1545
1546                 /* Connect new socket to it's peer */
1547
1548                 id.ref = msg_origport(msg);
1549                 id.node = msg_orignode(msg);
1550                 tipc_connect2port(new_ref, &id);
1551                 new_sock->state = SS_CONNECTED;
1552
1553                 tipc_set_portimportance(new_ref, msg_importance(msg));
1554                 if (msg_named(msg)) {
1555                         new_tport->conn_type = msg_nametype(msg);
1556                         new_tport->conn_instance = msg_nameinst(msg);
1557                 }
1558
1559                 /*
1560                  * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1561                  * Respond to 'SYN+' by queuing it on new socket.
1562                  */
1563
1564                 msg_dbg(msg,"<ACC<: ");
1565                 if (!msg_data_sz(msg)) {
1566                         struct msghdr m = {NULL,};
1567
1568                         advance_rx_queue(sk);
1569                         send_packet(NULL, new_sock, &m, 0);
1570                 } else {
1571                         __skb_dequeue(&sk->sk_receive_queue);
1572                         __skb_queue_head(&new_sk->sk_receive_queue, buf);
1573                 }
1574                 release_sock(new_sk);
1575         }
1576 exit:
1577         release_sock(sk);
1578         return res;
1579 }
1580
1581 /**
1582  * shutdown - shutdown socket connection
1583  * @sock: socket structure
1584  * @how: direction to close (must be SHUT_RDWR)
1585  *
1586  * Terminates connection (if necessary), then purges socket's receive queue.
1587  *
1588  * Returns 0 on success, errno otherwise
1589  */
1590
1591 static int shutdown(struct socket *sock, int how)
1592 {
1593         struct sock *sk = sock->sk;
1594         struct tipc_port *tport = tipc_sk_port(sk);
1595         struct sk_buff *buf;
1596         int res;
1597
1598         if (how != SHUT_RDWR)
1599                 return -EINVAL;
1600
1601         lock_sock(sk);
1602
1603         switch (sock->state) {
1604         case SS_CONNECTING:
1605         case SS_CONNECTED:
1606
1607                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1608 restart:
1609                 buf = __skb_dequeue(&sk->sk_receive_queue);
1610                 if (buf) {
1611                         atomic_dec(&tipc_queue_size);
1612                         if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf))) {
1613                                 buf_discard(buf);
1614                                 goto restart;
1615                         }
1616                         tipc_disconnect(tport->ref);
1617                         tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1618                 } else {
1619                         tipc_shutdown(tport->ref);
1620                 }
1621
1622                 sock->state = SS_DISCONNECTING;
1623
1624                 /* fall through */
1625
1626         case SS_DISCONNECTING:
1627
1628                 /* Discard any unreceived messages; wake up sleeping tasks */
1629
1630                 discard_rx_queue(sk);
1631                 if (waitqueue_active(sk->sk_sleep))
1632                         wake_up_interruptible(sk->sk_sleep);
1633                 res = 0;
1634                 break;
1635
1636         default:
1637                 res = -ENOTCONN;
1638         }
1639
1640         release_sock(sk);
1641         return res;
1642 }
1643
1644 /**
1645  * setsockopt - set socket option
1646  * @sock: socket structure
1647  * @lvl: option level
1648  * @opt: option identifier
1649  * @ov: pointer to new option value
1650  * @ol: length of option value
1651  *
1652  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1653  * (to ease compatibility).
1654  *
1655  * Returns 0 on success, errno otherwise
1656  */
1657
1658 static int setsockopt(struct socket *sock,
1659                       int lvl, int opt, char __user *ov, int ol)
1660 {
1661         struct sock *sk = sock->sk;
1662         struct tipc_port *tport = tipc_sk_port(sk);
1663         u32 value;
1664         int res;
1665
1666         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1667                 return 0;
1668         if (lvl != SOL_TIPC)
1669                 return -ENOPROTOOPT;
1670         if (ol < sizeof(value))
1671                 return -EINVAL;
1672         if ((res = get_user(value, (u32 __user *)ov)))
1673                 return res;
1674
1675         lock_sock(sk);
1676
1677         switch (opt) {
1678         case TIPC_IMPORTANCE:
1679                 res = tipc_set_portimportance(tport->ref, value);
1680                 break;
1681         case TIPC_SRC_DROPPABLE:
1682                 if (sock->type != SOCK_STREAM)
1683                         res = tipc_set_portunreliable(tport->ref, value);
1684                 else
1685                         res = -ENOPROTOOPT;
1686                 break;
1687         case TIPC_DEST_DROPPABLE:
1688                 res = tipc_set_portunreturnable(tport->ref, value);
1689                 break;
1690         case TIPC_CONN_TIMEOUT:
1691                 sk->sk_rcvtimeo = msecs_to_jiffies(value);
1692                 /* no need to set "res", since already 0 at this point */
1693                 break;
1694         default:
1695                 res = -EINVAL;
1696         }
1697
1698         release_sock(sk);
1699
1700         return res;
1701 }
1702
1703 /**
1704  * getsockopt - get socket option
1705  * @sock: socket structure
1706  * @lvl: option level
1707  * @opt: option identifier
1708  * @ov: receptacle for option value
1709  * @ol: receptacle for length of option value
1710  *
1711  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1712  * (to ease compatibility).
1713  *
1714  * Returns 0 on success, errno otherwise
1715  */
1716
1717 static int getsockopt(struct socket *sock,
1718                       int lvl, int opt, char __user *ov, int __user *ol)
1719 {
1720         struct sock *sk = sock->sk;
1721         struct tipc_port *tport = tipc_sk_port(sk);
1722         int len;
1723         u32 value;
1724         int res;
1725
1726         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1727                 return put_user(0, ol);
1728         if (lvl != SOL_TIPC)
1729                 return -ENOPROTOOPT;
1730         if ((res = get_user(len, ol)))
1731                 return res;
1732
1733         lock_sock(sk);
1734
1735         switch (opt) {
1736         case TIPC_IMPORTANCE:
1737                 res = tipc_portimportance(tport->ref, &value);
1738                 break;
1739         case TIPC_SRC_DROPPABLE:
1740                 res = tipc_portunreliable(tport->ref, &value);
1741                 break;
1742         case TIPC_DEST_DROPPABLE:
1743                 res = tipc_portunreturnable(tport->ref, &value);
1744                 break;
1745         case TIPC_CONN_TIMEOUT:
1746                 value = jiffies_to_msecs(sk->sk_rcvtimeo);
1747                 /* no need to set "res", since already 0 at this point */
1748                 break;
1749         default:
1750                 res = -EINVAL;
1751         }
1752
1753         release_sock(sk);
1754
1755         if (res) {
1756                 /* "get" failed */
1757         }
1758         else if (len < sizeof(value)) {
1759                 res = -EINVAL;
1760         }
1761         else if (copy_to_user(ov, &value, sizeof(value))) {
1762                 res = -EFAULT;
1763         }
1764         else {
1765                 res = put_user(sizeof(value), ol);
1766         }
1767
1768         return res;
1769 }
1770
1771 /**
1772  * Protocol switches for the various types of TIPC sockets
1773  */
1774
1775 static const struct proto_ops msg_ops = {
1776         .owner          = THIS_MODULE,
1777         .family         = AF_TIPC,
1778         .release        = release,
1779         .bind           = bind,
1780         .connect        = connect,
1781         .socketpair     = sock_no_socketpair,
1782         .accept         = accept,
1783         .getname        = get_name,
1784         .poll           = poll,
1785         .ioctl          = sock_no_ioctl,
1786         .listen         = listen,
1787         .shutdown       = shutdown,
1788         .setsockopt     = setsockopt,
1789         .getsockopt     = getsockopt,
1790         .sendmsg        = send_msg,
1791         .recvmsg        = recv_msg,
1792         .mmap           = sock_no_mmap,
1793         .sendpage       = sock_no_sendpage
1794 };
1795
1796 static const struct proto_ops packet_ops = {
1797         .owner          = THIS_MODULE,
1798         .family         = AF_TIPC,
1799         .release        = release,
1800         .bind           = bind,
1801         .connect        = connect,
1802         .socketpair     = sock_no_socketpair,
1803         .accept         = accept,
1804         .getname        = get_name,
1805         .poll           = poll,
1806         .ioctl          = sock_no_ioctl,
1807         .listen         = listen,
1808         .shutdown       = shutdown,
1809         .setsockopt     = setsockopt,
1810         .getsockopt     = getsockopt,
1811         .sendmsg        = send_packet,
1812         .recvmsg        = recv_msg,
1813         .mmap           = sock_no_mmap,
1814         .sendpage       = sock_no_sendpage
1815 };
1816
1817 static const struct proto_ops stream_ops = {
1818         .owner          = THIS_MODULE,
1819         .family         = AF_TIPC,
1820         .release        = release,
1821         .bind           = bind,
1822         .connect        = connect,
1823         .socketpair     = sock_no_socketpair,
1824         .accept         = accept,
1825         .getname        = get_name,
1826         .poll           = poll,
1827         .ioctl          = sock_no_ioctl,
1828         .listen         = listen,
1829         .shutdown       = shutdown,
1830         .setsockopt     = setsockopt,
1831         .getsockopt     = getsockopt,
1832         .sendmsg        = send_stream,
1833         .recvmsg        = recv_stream,
1834         .mmap           = sock_no_mmap,
1835         .sendpage       = sock_no_sendpage
1836 };
1837
1838 static const struct net_proto_family tipc_family_ops = {
1839         .owner          = THIS_MODULE,
1840         .family         = AF_TIPC,
1841         .create         = tipc_create
1842 };
1843
1844 static struct proto tipc_proto = {
1845         .name           = "TIPC",
1846         .owner          = THIS_MODULE,
1847         .obj_size       = sizeof(struct tipc_sock)
1848 };
1849
1850 /**
1851  * tipc_socket_init - initialize TIPC socket interface
1852  *
1853  * Returns 0 on success, errno otherwise
1854  */
1855 int tipc_socket_init(void)
1856 {
1857         int res;
1858
1859         res = proto_register(&tipc_proto, 1);
1860         if (res) {
1861                 err("Failed to register TIPC protocol type\n");
1862                 goto out;
1863         }
1864
1865         res = sock_register(&tipc_family_ops);
1866         if (res) {
1867                 err("Failed to register TIPC socket type\n");
1868                 proto_unregister(&tipc_proto);
1869                 goto out;
1870         }
1871
1872         sockets_enabled = 1;
1873  out:
1874         return res;
1875 }
1876
1877 /**
1878  * tipc_socket_stop - stop TIPC socket interface
1879  */
1880
1881 void tipc_socket_stop(void)
1882 {
1883         if (!sockets_enabled)
1884                 return;
1885
1886         sockets_enabled = 0;
1887         sock_unregister(tipc_family_ops.family);
1888         proto_unregister(&tipc_proto);
1889 }
1890