]> Pileus Git - ~andy/linux/blob - net/tipc/port.c
net: sctp: minor: remove variable in sctp_init_sock
[~andy/linux] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  *
4  * Copyright (c) 1992-2007, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "config.h"
39 #include "port.h"
40 #include "name_table.h"
41
/* Connection management: */
#define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
/* Values stored in a port's probing_state field */
#define CONFIRMED 0
#define PROBING 1

/* Upper bound on the data bytes copied into a returned (rejected) message */
#define MAX_REJECT_SIZE 1024

/*
 * Queue (chained through sk_buff->next) of messages waiting to be handled
 * by port_dispatcher_sigh(); accessed while holding queue_lock
 */
static struct sk_buff *msg_queue_head;
static struct sk_buff *msg_queue_tail;

/* Held while walking or changing the "ports" list */
DEFINE_SPINLOCK(tipc_port_list_lock);
/* Held while accessing msg_queue_head / msg_queue_tail */
static DEFINE_SPINLOCK(queue_lock);

/* List of the ports added by tipc_createport_raw() */
static LIST_HEAD(ports);
/* Forward declarations of helpers that are defined later in this file */
static void port_handle_node_down(unsigned long ref);
static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
static void port_timeout(unsigned long ref);
60
61
62 static u32 port_peernode(struct tipc_port *p_ptr)
63 {
64         return msg_destnode(&p_ptr->phdr);
65 }
66
67 static u32 port_peerport(struct tipc_port *p_ptr)
68 {
69         return msg_destport(&p_ptr->phdr);
70 }
71
72 /**
73  * tipc_port_peer_msg - verify message was sent by connected port's peer
74  *
75  * Handles cases where the node's network address has changed from
76  * the default of <0.0.0> to its configured setting.
77  */
78 int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
79 {
80         u32 peernode;
81         u32 orignode;
82
83         if (msg_origport(msg) != port_peerport(p_ptr))
84                 return 0;
85
86         orignode = msg_orignode(msg);
87         peernode = port_peernode(p_ptr);
88         return (orignode == peernode) ||
89                 (!orignode && (peernode == tipc_own_addr)) ||
90                 (!peernode && (orignode == tipc_own_addr));
91 }
92
93 /**
94  * tipc_multicast - send a multicast message to local and remote destinations
95  */
96 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
97                    u32 num_sect, struct iovec const *msg_sect,
98                    unsigned int total_len)
99 {
100         struct tipc_msg *hdr;
101         struct sk_buff *buf;
102         struct sk_buff *ibuf = NULL;
103         struct tipc_port_list dports = {0, NULL, };
104         struct tipc_port *oport = tipc_port_deref(ref);
105         int ext_targets;
106         int res;
107
108         if (unlikely(!oport))
109                 return -EINVAL;
110
111         /* Create multicast message */
112         hdr = &oport->phdr;
113         msg_set_type(hdr, TIPC_MCAST_MSG);
114         msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
115         msg_set_destport(hdr, 0);
116         msg_set_destnode(hdr, 0);
117         msg_set_nametype(hdr, seq->type);
118         msg_set_namelower(hdr, seq->lower);
119         msg_set_nameupper(hdr, seq->upper);
120         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
121         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
122                         !oport->user_port, &buf);
123         if (unlikely(!buf))
124                 return res;
125
126         /* Figure out where to send multicast message */
127         ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
128                                                 TIPC_NODE_SCOPE, &dports);
129
130         /* Send message to destinations (duplicate it only if necessary) */
131         if (ext_targets) {
132                 if (dports.count != 0) {
133                         ibuf = skb_copy(buf, GFP_ATOMIC);
134                         if (ibuf == NULL) {
135                                 tipc_port_list_free(&dports);
136                                 kfree_skb(buf);
137                                 return -ENOMEM;
138                         }
139                 }
140                 res = tipc_bclink_send_msg(buf);
141                 if ((res < 0) && (dports.count != 0))
142                         kfree_skb(ibuf);
143         } else {
144                 ibuf = buf;
145         }
146
147         if (res >= 0) {
148                 if (ibuf)
149                         tipc_port_recv_mcast(ibuf, &dports);
150         } else {
151                 tipc_port_list_free(&dports);
152         }
153         return res;
154 }
155
156 /**
157  * tipc_port_recv_mcast - deliver multicast message to all destination ports
158  *
159  * If there is no port list, perform a lookup to create one
160  */
161 void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
162 {
163         struct tipc_msg *msg;
164         struct tipc_port_list dports = {0, NULL, };
165         struct tipc_port_list *item = dp;
166         int cnt = 0;
167
168         msg = buf_msg(buf);
169
170         /* Create destination port list, if one wasn't supplied */
171         if (dp == NULL) {
172                 tipc_nametbl_mc_translate(msg_nametype(msg),
173                                      msg_namelower(msg),
174                                      msg_nameupper(msg),
175                                      TIPC_CLUSTER_SCOPE,
176                                      &dports);
177                 item = dp = &dports;
178         }
179
180         /* Deliver a copy of message to each destination port */
181         if (dp->count != 0) {
182                 msg_set_destnode(msg, tipc_own_addr);
183                 if (dp->count == 1) {
184                         msg_set_destport(msg, dp->ports[0]);
185                         tipc_port_recv_msg(buf);
186                         tipc_port_list_free(dp);
187                         return;
188                 }
189                 for (; cnt < dp->count; cnt++) {
190                         int index = cnt % PLSIZE;
191                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
192
193                         if (b == NULL) {
194                                 pr_warn("Unable to deliver multicast message(s)\n");
195                                 goto exit;
196                         }
197                         if ((index == 0) && (cnt != 0))
198                                 item = item->next;
199                         msg_set_destport(buf_msg(b), item->ports[index]);
200                         tipc_port_recv_msg(b);
201                 }
202         }
203 exit:
204         kfree_skb(buf);
205         tipc_port_list_free(dp);
206 }
207
208 /**
209  * tipc_createport_raw - create a generic TIPC port
210  *
211  * Returns pointer to (locked) TIPC port, or NULL if unable to create it
212  */
213 struct tipc_port *tipc_createport_raw(void *usr_handle,
214                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
215                         void (*wakeup)(struct tipc_port *),
216                         const u32 importance)
217 {
218         struct tipc_port *p_ptr;
219         struct tipc_msg *msg;
220         u32 ref;
221
222         p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
223         if (!p_ptr) {
224                 pr_warn("Port creation failed, no memory\n");
225                 return NULL;
226         }
227         ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
228         if (!ref) {
229                 pr_warn("Port creation failed, ref. table exhausted\n");
230                 kfree(p_ptr);
231                 return NULL;
232         }
233
234         p_ptr->usr_handle = usr_handle;
235         p_ptr->max_pkt = MAX_PKT_DEFAULT;
236         p_ptr->ref = ref;
237         INIT_LIST_HEAD(&p_ptr->wait_list);
238         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
239         p_ptr->dispatcher = dispatcher;
240         p_ptr->wakeup = wakeup;
241         p_ptr->user_port = NULL;
242         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
243         INIT_LIST_HEAD(&p_ptr->publications);
244         INIT_LIST_HEAD(&p_ptr->port_list);
245
246         /*
247          * Must hold port list lock while initializing message header template
248          * to ensure a change to node's own network address doesn't result
249          * in template containing out-dated network address information
250          */
251         spin_lock_bh(&tipc_port_list_lock);
252         msg = &p_ptr->phdr;
253         tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
254         msg_set_origport(msg, ref);
255         list_add_tail(&p_ptr->port_list, &ports);
256         spin_unlock_bh(&tipc_port_list_lock);
257         return p_ptr;
258 }
259
260 int tipc_deleteport(u32 ref)
261 {
262         struct tipc_port *p_ptr;
263         struct sk_buff *buf = NULL;
264
265         tipc_withdraw(ref, 0, NULL);
266         p_ptr = tipc_port_lock(ref);
267         if (!p_ptr)
268                 return -EINVAL;
269
270         tipc_ref_discard(ref);
271         tipc_port_unlock(p_ptr);
272
273         k_cancel_timer(&p_ptr->timer);
274         if (p_ptr->connected) {
275                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
276                 tipc_nodesub_unsubscribe(&p_ptr->subscription);
277         }
278         kfree(p_ptr->user_port);
279
280         spin_lock_bh(&tipc_port_list_lock);
281         list_del(&p_ptr->port_list);
282         list_del(&p_ptr->wait_list);
283         spin_unlock_bh(&tipc_port_list_lock);
284         k_term_timer(&p_ptr->timer);
285         kfree(p_ptr);
286         tipc_net_route_msg(buf);
287         return 0;
288 }
289
290 static int port_unreliable(struct tipc_port *p_ptr)
291 {
292         return msg_src_droppable(&p_ptr->phdr);
293 }
294
295 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
296 {
297         struct tipc_port *p_ptr;
298
299         p_ptr = tipc_port_lock(ref);
300         if (!p_ptr)
301                 return -EINVAL;
302         *isunreliable = port_unreliable(p_ptr);
303         tipc_port_unlock(p_ptr);
304         return 0;
305 }
306
307 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
308 {
309         struct tipc_port *p_ptr;
310
311         p_ptr = tipc_port_lock(ref);
312         if (!p_ptr)
313                 return -EINVAL;
314         msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
315         tipc_port_unlock(p_ptr);
316         return 0;
317 }
318
319 static int port_unreturnable(struct tipc_port *p_ptr)
320 {
321         return msg_dest_droppable(&p_ptr->phdr);
322 }
323
324 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
325 {
326         struct tipc_port *p_ptr;
327
328         p_ptr = tipc_port_lock(ref);
329         if (!p_ptr)
330                 return -EINVAL;
331         *isunrejectable = port_unreturnable(p_ptr);
332         tipc_port_unlock(p_ptr);
333         return 0;
334 }
335
336 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
337 {
338         struct tipc_port *p_ptr;
339
340         p_ptr = tipc_port_lock(ref);
341         if (!p_ptr)
342                 return -EINVAL;
343         msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
344         tipc_port_unlock(p_ptr);
345         return 0;
346 }
347
348 /*
349  * port_build_proto_msg(): create connection protocol message for port
350  *
351  * On entry the port must be locked and connected.
352  */
353 static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
354                                             u32 type, u32 ack)
355 {
356         struct sk_buff *buf;
357         struct tipc_msg *msg;
358
359         buf = tipc_buf_acquire(INT_H_SIZE);
360         if (buf) {
361                 msg = buf_msg(buf);
362                 tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE,
363                               port_peernode(p_ptr));
364                 msg_set_destport(msg, port_peerport(p_ptr));
365                 msg_set_origport(msg, p_ptr->ref);
366                 msg_set_msgcnt(msg, ack);
367         }
368         return buf;
369 }
370
371 int tipc_reject_msg(struct sk_buff *buf, u32 err)
372 {
373         struct tipc_msg *msg = buf_msg(buf);
374         struct sk_buff *rbuf;
375         struct tipc_msg *rmsg;
376         int hdr_sz;
377         u32 imp;
378         u32 data_sz = msg_data_sz(msg);
379         u32 src_node;
380         u32 rmsg_sz;
381
382         /* discard rejected message if it shouldn't be returned to sender */
383         if (WARN(!msg_isdata(msg),
384                  "attempt to reject message with user=%u", msg_user(msg))) {
385                 dump_stack();
386                 goto exit;
387         }
388         if (msg_errcode(msg) || msg_dest_droppable(msg))
389                 goto exit;
390
391         /*
392          * construct returned message by copying rejected message header and
393          * data (or subset), then updating header fields that need adjusting
394          */
395         hdr_sz = msg_hdr_sz(msg);
396         rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
397
398         rbuf = tipc_buf_acquire(rmsg_sz);
399         if (rbuf == NULL)
400                 goto exit;
401
402         rmsg = buf_msg(rbuf);
403         skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
404
405         if (msg_connected(rmsg)) {
406                 imp = msg_importance(rmsg);
407                 if (imp < TIPC_CRITICAL_IMPORTANCE)
408                         msg_set_importance(rmsg, ++imp);
409         }
410         msg_set_non_seq(rmsg, 0);
411         msg_set_size(rmsg, rmsg_sz);
412         msg_set_errcode(rmsg, err);
413         msg_set_prevnode(rmsg, tipc_own_addr);
414         msg_swap_words(rmsg, 4, 5);
415         if (!msg_short(rmsg))
416                 msg_swap_words(rmsg, 6, 7);
417
418         /* send self-abort message when rejecting on a connected port */
419         if (msg_connected(msg)) {
420                 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
421
422                 if (p_ptr) {
423                         struct sk_buff *abuf = NULL;
424
425                         if (p_ptr->connected)
426                                 abuf = port_build_self_abort_msg(p_ptr, err);
427                         tipc_port_unlock(p_ptr);
428                         tipc_net_route_msg(abuf);
429                 }
430         }
431
432         /* send returned message & dispose of rejected message */
433         src_node = msg_prevnode(msg);
434         if (in_own_node(src_node))
435                 tipc_port_recv_msg(rbuf);
436         else
437                 tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
438 exit:
439         kfree_skb(buf);
440         return data_sz;
441 }
442
443 int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
444                               struct iovec const *msg_sect, u32 num_sect,
445                               unsigned int total_len, int err)
446 {
447         struct sk_buff *buf;
448         int res;
449
450         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
451                         !p_ptr->user_port, &buf);
452         if (!buf)
453                 return res;
454
455         return tipc_reject_msg(buf, err);
456 }
457
458 static void port_timeout(unsigned long ref)
459 {
460         struct tipc_port *p_ptr = tipc_port_lock(ref);
461         struct sk_buff *buf = NULL;
462
463         if (!p_ptr)
464                 return;
465
466         if (!p_ptr->connected) {
467                 tipc_port_unlock(p_ptr);
468                 return;
469         }
470
471         /* Last probe answered ? */
472         if (p_ptr->probing_state == PROBING) {
473                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
474         } else {
475                 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
476                 p_ptr->probing_state = PROBING;
477                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
478         }
479         tipc_port_unlock(p_ptr);
480         tipc_net_route_msg(buf);
481 }
482
483
484 static void port_handle_node_down(unsigned long ref)
485 {
486         struct tipc_port *p_ptr = tipc_port_lock(ref);
487         struct sk_buff *buf = NULL;
488
489         if (!p_ptr)
490                 return;
491         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
492         tipc_port_unlock(p_ptr);
493         tipc_net_route_msg(buf);
494 }
495
496
497 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
498 {
499         struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
500
501         if (buf) {
502                 struct tipc_msg *msg = buf_msg(buf);
503                 msg_swap_words(msg, 4, 5);
504                 msg_swap_words(msg, 6, 7);
505         }
506         return buf;
507 }
508
509
510 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
511 {
512         struct sk_buff *buf;
513         struct tipc_msg *msg;
514         u32 imp;
515
516         if (!p_ptr->connected)
517                 return NULL;
518
519         buf = tipc_buf_acquire(BASIC_H_SIZE);
520         if (buf) {
521                 msg = buf_msg(buf);
522                 memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
523                 msg_set_hdr_sz(msg, BASIC_H_SIZE);
524                 msg_set_size(msg, BASIC_H_SIZE);
525                 imp = msg_importance(msg);
526                 if (imp < TIPC_CRITICAL_IMPORTANCE)
527                         msg_set_importance(msg, ++imp);
528                 msg_set_errcode(msg, err);
529         }
530         return buf;
531 }
532
533 void tipc_port_recv_proto_msg(struct sk_buff *buf)
534 {
535         struct tipc_msg *msg = buf_msg(buf);
536         struct tipc_port *p_ptr;
537         struct sk_buff *r_buf = NULL;
538         u32 destport = msg_destport(msg);
539         int wakeable;
540
541         /* Validate connection */
542         p_ptr = tipc_port_lock(destport);
543         if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
544                 r_buf = tipc_buf_acquire(BASIC_H_SIZE);
545                 if (r_buf) {
546                         msg = buf_msg(r_buf);
547                         tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
548                                       BASIC_H_SIZE, msg_orignode(msg));
549                         msg_set_errcode(msg, TIPC_ERR_NO_PORT);
550                         msg_set_origport(msg, destport);
551                         msg_set_destport(msg, msg_origport(msg));
552                 }
553                 if (p_ptr)
554                         tipc_port_unlock(p_ptr);
555                 goto exit;
556         }
557
558         /* Process protocol message sent by peer */
559         switch (msg_type(msg)) {
560         case CONN_ACK:
561                 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
562                         p_ptr->wakeup;
563                 p_ptr->acked += msg_msgcnt(msg);
564                 if (!tipc_port_congested(p_ptr)) {
565                         p_ptr->congested = 0;
566                         if (wakeable)
567                                 p_ptr->wakeup(p_ptr);
568                 }
569                 break;
570         case CONN_PROBE:
571                 r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
572                 break;
573         default:
574                 /* CONN_PROBE_REPLY or unrecognized - no action required */
575                 break;
576         }
577         p_ptr->probing_state = CONFIRMED;
578         tipc_port_unlock(p_ptr);
579 exit:
580         tipc_net_route_msg(r_buf);
581         kfree_skb(buf);
582 }
583
584 static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
585 {
586         struct publication *publ;
587         int ret;
588
589         if (full_id)
590                 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
591                                     tipc_zone(tipc_own_addr),
592                                     tipc_cluster(tipc_own_addr),
593                                     tipc_node(tipc_own_addr), p_ptr->ref);
594         else
595                 ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref);
596
597         if (p_ptr->connected) {
598                 u32 dport = port_peerport(p_ptr);
599                 u32 destnode = port_peernode(p_ptr);
600
601                 ret += tipc_snprintf(buf + ret, len - ret,
602                                      " connected to <%u.%u.%u:%u>",
603                                      tipc_zone(destnode),
604                                      tipc_cluster(destnode),
605                                      tipc_node(destnode), dport);
606                 if (p_ptr->conn_type != 0)
607                         ret += tipc_snprintf(buf + ret, len - ret,
608                                              " via {%u,%u}", p_ptr->conn_type,
609                                              p_ptr->conn_instance);
610         } else if (p_ptr->published) {
611                 ret += tipc_snprintf(buf + ret, len - ret, " bound to");
612                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
613                         if (publ->lower == publ->upper)
614                                 ret += tipc_snprintf(buf + ret, len - ret,
615                                                      " {%u,%u}", publ->type,
616                                                      publ->lower);
617                         else
618                                 ret += tipc_snprintf(buf + ret, len - ret,
619                                                      " {%u,%u,%u}", publ->type,
620                                                      publ->lower, publ->upper);
621                 }
622         }
623         ret += tipc_snprintf(buf + ret, len - ret, "\n");
624         return ret;
625 }
626
627 struct sk_buff *tipc_port_get_ports(void)
628 {
629         struct sk_buff *buf;
630         struct tlv_desc *rep_tlv;
631         char *pb;
632         int pb_len;
633         struct tipc_port *p_ptr;
634         int str_len = 0;
635
636         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
637         if (!buf)
638                 return NULL;
639         rep_tlv = (struct tlv_desc *)buf->data;
640         pb = TLV_DATA(rep_tlv);
641         pb_len = ULTRA_STRING_MAX_LEN;
642
643         spin_lock_bh(&tipc_port_list_lock);
644         list_for_each_entry(p_ptr, &ports, port_list) {
645                 spin_lock_bh(p_ptr->lock);
646                 str_len += port_print(p_ptr, pb, pb_len, 0);
647                 spin_unlock_bh(p_ptr->lock);
648         }
649         spin_unlock_bh(&tipc_port_list_lock);
650         str_len += 1;   /* for "\0" */
651         skb_put(buf, TLV_SPACE(str_len));
652         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
653
654         return buf;
655 }
656
657 void tipc_port_reinit(void)
658 {
659         struct tipc_port *p_ptr;
660         struct tipc_msg *msg;
661
662         spin_lock_bh(&tipc_port_list_lock);
663         list_for_each_entry(p_ptr, &ports, port_list) {
664                 msg = &p_ptr->phdr;
665                 msg_set_prevnode(msg, tipc_own_addr);
666                 msg_set_orignode(msg, tipc_own_addr);
667         }
668         spin_unlock_bh(&tipc_port_list_lock);
669 }
670
671
672 /*
673  *  port_dispatcher_sigh(): Signal handler for messages destinated
674  *                          to the tipc_port interface.
675  */
676 static void port_dispatcher_sigh(void *dummy)
677 {
678         struct sk_buff *buf;
679
680         spin_lock_bh(&queue_lock);
681         buf = msg_queue_head;
682         msg_queue_head = NULL;
683         spin_unlock_bh(&queue_lock);
684
685         while (buf) {
686                 struct tipc_port *p_ptr;
687                 struct user_port *up_ptr;
688                 struct tipc_portid orig;
689                 struct tipc_name_seq dseq;
690                 void *usr_handle;
691                 int connected;
692                 int peer_invalid;
693                 int published;
694                 u32 message_type;
695
696                 struct sk_buff *next = buf->next;
697                 struct tipc_msg *msg = buf_msg(buf);
698                 u32 dref = msg_destport(msg);
699
700                 message_type = msg_type(msg);
701                 if (message_type > TIPC_DIRECT_MSG)
702                         goto reject;    /* Unsupported message type */
703
704                 p_ptr = tipc_port_lock(dref);
705                 if (!p_ptr)
706                         goto reject;    /* Port deleted while msg in queue */
707
708                 orig.ref = msg_origport(msg);
709                 orig.node = msg_orignode(msg);
710                 up_ptr = p_ptr->user_port;
711                 usr_handle = up_ptr->usr_handle;
712                 connected = p_ptr->connected;
713                 peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg);
714                 published = p_ptr->published;
715
716                 if (unlikely(msg_errcode(msg)))
717                         goto err;
718
719                 switch (message_type) {
720
721                 case TIPC_CONN_MSG:{
722                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
723                                 u32 dsz;
724
725                                 tipc_port_unlock(p_ptr);
726                                 if (unlikely(!cb))
727                                         goto reject;
728                                 if (unlikely(!connected)) {
729                                         if (tipc_connect(dref, &orig))
730                                                 goto reject;
731                                 } else if (peer_invalid)
732                                         goto reject;
733                                 dsz = msg_data_sz(msg);
734                                 if (unlikely(dsz &&
735                                              (++p_ptr->conn_unacked >=
736                                               TIPC_FLOW_CONTROL_WIN)))
737                                         tipc_acknowledge(dref,
738                                                          p_ptr->conn_unacked);
739                                 skb_pull(buf, msg_hdr_sz(msg));
740                                 cb(usr_handle, dref, &buf, msg_data(msg), dsz);
741                                 break;
742                         }
743                 case TIPC_DIRECT_MSG:{
744                                 tipc_msg_event cb = up_ptr->msg_cb;
745
746                                 tipc_port_unlock(p_ptr);
747                                 if (unlikely(!cb || connected))
748                                         goto reject;
749                                 skb_pull(buf, msg_hdr_sz(msg));
750                                 cb(usr_handle, dref, &buf, msg_data(msg),
751                                    msg_data_sz(msg), msg_importance(msg),
752                                    &orig);
753                                 break;
754                         }
755                 case TIPC_MCAST_MSG:
756                 case TIPC_NAMED_MSG:{
757                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
758
759                                 tipc_port_unlock(p_ptr);
760                                 if (unlikely(!cb || connected || !published))
761                                         goto reject;
762                                 dseq.type =  msg_nametype(msg);
763                                 dseq.lower = msg_nameinst(msg);
764                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
765                                         ? dseq.lower : msg_nameupper(msg);
766                                 skb_pull(buf, msg_hdr_sz(msg));
767                                 cb(usr_handle, dref, &buf, msg_data(msg),
768                                    msg_data_sz(msg), msg_importance(msg),
769                                    &orig, &dseq);
770                                 break;
771                         }
772                 }
773                 if (buf)
774                         kfree_skb(buf);
775                 buf = next;
776                 continue;
777 err:
778                 switch (message_type) {
779
780                 case TIPC_CONN_MSG:{
781                                 tipc_conn_shutdown_event cb =
782                                         up_ptr->conn_err_cb;
783
784                                 tipc_port_unlock(p_ptr);
785                                 if (!cb || !connected || peer_invalid)
786                                         break;
787                                 tipc_disconnect(dref);
788                                 skb_pull(buf, msg_hdr_sz(msg));
789                                 cb(usr_handle, dref, &buf, msg_data(msg),
790                                    msg_data_sz(msg), msg_errcode(msg));
791                                 break;
792                         }
793                 case TIPC_DIRECT_MSG:{
794                                 tipc_msg_err_event cb = up_ptr->err_cb;
795
796                                 tipc_port_unlock(p_ptr);
797                                 if (!cb || connected)
798                                         break;
799                                 skb_pull(buf, msg_hdr_sz(msg));
800                                 cb(usr_handle, dref, &buf, msg_data(msg),
801                                    msg_data_sz(msg), msg_errcode(msg), &orig);
802                                 break;
803                         }
804                 case TIPC_MCAST_MSG:
805                 case TIPC_NAMED_MSG:{
806                                 tipc_named_msg_err_event cb =
807                                         up_ptr->named_err_cb;
808
809                                 tipc_port_unlock(p_ptr);
810                                 if (!cb || connected)
811                                         break;
812                                 dseq.type =  msg_nametype(msg);
813                                 dseq.lower = msg_nameinst(msg);
814                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
815                                         ? dseq.lower : msg_nameupper(msg);
816                                 skb_pull(buf, msg_hdr_sz(msg));
817                                 cb(usr_handle, dref, &buf, msg_data(msg),
818                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
819                                 break;
820                         }
821                 }
822                 if (buf)
823                         kfree_skb(buf);
824                 buf = next;
825                 continue;
826 reject:
827                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
828                 buf = next;
829         }
830 }
831
832 /*
833  *  port_dispatcher(): Dispatcher for messages destinated
834  *  to the tipc_port interface. Called with port locked.
835  */
836 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
837 {
838         buf->next = NULL;
839         spin_lock_bh(&queue_lock);
840         if (msg_queue_head) {
841                 msg_queue_tail->next = buf;
842                 msg_queue_tail = buf;
843         } else {
844                 msg_queue_tail = msg_queue_head = buf;
845                 tipc_k_signal((Handler)port_dispatcher_sigh, 0);
846         }
847         spin_unlock_bh(&queue_lock);
848         return 0;
849 }
850
851 /*
852  * Wake up port after congestion: Called with port locked
853  */
854 static void port_wakeup_sh(unsigned long ref)
855 {
856         struct tipc_port *p_ptr;
857         struct user_port *up_ptr;
858         tipc_continue_event cb = NULL;
859         void *uh = NULL;
860
861         p_ptr = tipc_port_lock(ref);
862         if (p_ptr) {
863                 up_ptr = p_ptr->user_port;
864                 if (up_ptr) {
865                         cb = up_ptr->continue_event_cb;
866                         uh = up_ptr->usr_handle;
867                 }
868                 tipc_port_unlock(p_ptr);
869         }
870         if (cb)
871                 cb(uh, ref);
872 }
873
874
875 static void port_wakeup(struct tipc_port *p_ptr)
876 {
877         tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
878 }
879
880 void tipc_acknowledge(u32 ref, u32 ack)
881 {
882         struct tipc_port *p_ptr;
883         struct sk_buff *buf = NULL;
884
885         p_ptr = tipc_port_lock(ref);
886         if (!p_ptr)
887                 return;
888         if (p_ptr->connected) {
889                 p_ptr->conn_unacked -= ack;
890                 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
891         }
892         tipc_port_unlock(p_ptr);
893         tipc_net_route_msg(buf);
894 }
895
896 /*
897  * tipc_createport(): user level call.
898  */
899 int tipc_createport(void *usr_handle,
900                     unsigned int importance,
901                     tipc_msg_err_event error_cb,
902                     tipc_named_msg_err_event named_error_cb,
903                     tipc_conn_shutdown_event conn_error_cb,
904                     tipc_msg_event msg_cb,
905                     tipc_named_msg_event named_msg_cb,
906                     tipc_conn_msg_event conn_msg_cb,
907                     tipc_continue_event continue_event_cb, /* May be zero */
908                     u32 *portref)
909 {
910         struct user_port *up_ptr;
911         struct tipc_port *p_ptr;
912
913         up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
914         if (!up_ptr) {
915                 pr_warn("Port creation failed, no memory\n");
916                 return -ENOMEM;
917         }
918         p_ptr = tipc_createport_raw(NULL, port_dispatcher, port_wakeup,
919                                     importance);
920         if (!p_ptr) {
921                 kfree(up_ptr);
922                 return -ENOMEM;
923         }
924
925         p_ptr->user_port = up_ptr;
926         up_ptr->usr_handle = usr_handle;
927         up_ptr->ref = p_ptr->ref;
928         up_ptr->err_cb = error_cb;
929         up_ptr->named_err_cb = named_error_cb;
930         up_ptr->conn_err_cb = conn_error_cb;
931         up_ptr->msg_cb = msg_cb;
932         up_ptr->named_msg_cb = named_msg_cb;
933         up_ptr->conn_msg_cb = conn_msg_cb;
934         up_ptr->continue_event_cb = continue_event_cb;
935         *portref = p_ptr->ref;
936         tipc_port_unlock(p_ptr);
937         return 0;
938 }
939
940 int tipc_portimportance(u32 ref, unsigned int *importance)
941 {
942         struct tipc_port *p_ptr;
943
944         p_ptr = tipc_port_lock(ref);
945         if (!p_ptr)
946                 return -EINVAL;
947         *importance = (unsigned int)msg_importance(&p_ptr->phdr);
948         tipc_port_unlock(p_ptr);
949         return 0;
950 }
951
952 int tipc_set_portimportance(u32 ref, unsigned int imp)
953 {
954         struct tipc_port *p_ptr;
955
956         if (imp > TIPC_CRITICAL_IMPORTANCE)
957                 return -EINVAL;
958
959         p_ptr = tipc_port_lock(ref);
960         if (!p_ptr)
961                 return -EINVAL;
962         msg_set_importance(&p_ptr->phdr, (u32)imp);
963         tipc_port_unlock(p_ptr);
964         return 0;
965 }
966
967
968 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
969 {
970         struct tipc_port *p_ptr;
971         struct publication *publ;
972         u32 key;
973         int res = -EINVAL;
974
975         p_ptr = tipc_port_lock(ref);
976         if (!p_ptr)
977                 return -EINVAL;
978
979         if (p_ptr->connected)
980                 goto exit;
981         key = ref + p_ptr->pub_count + 1;
982         if (key == ref) {
983                 res = -EADDRINUSE;
984                 goto exit;
985         }
986         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
987                                     scope, p_ptr->ref, key);
988         if (publ) {
989                 list_add(&publ->pport_list, &p_ptr->publications);
990                 p_ptr->pub_count++;
991                 p_ptr->published = 1;
992                 res = 0;
993         }
994 exit:
995         tipc_port_unlock(p_ptr);
996         return res;
997 }
998
999 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1000 {
1001         struct tipc_port *p_ptr;
1002         struct publication *publ;
1003         struct publication *tpubl;
1004         int res = -EINVAL;
1005
1006         p_ptr = tipc_port_lock(ref);
1007         if (!p_ptr)
1008                 return -EINVAL;
1009         if (!seq) {
1010                 list_for_each_entry_safe(publ, tpubl,
1011                                          &p_ptr->publications, pport_list) {
1012                         tipc_nametbl_withdraw(publ->type, publ->lower,
1013                                               publ->ref, publ->key);
1014                 }
1015                 res = 0;
1016         } else {
1017                 list_for_each_entry_safe(publ, tpubl,
1018                                          &p_ptr->publications, pport_list) {
1019                         if (publ->scope != scope)
1020                                 continue;
1021                         if (publ->type != seq->type)
1022                                 continue;
1023                         if (publ->lower != seq->lower)
1024                                 continue;
1025                         if (publ->upper != seq->upper)
1026                                 break;
1027                         tipc_nametbl_withdraw(publ->type, publ->lower,
1028                                               publ->ref, publ->key);
1029                         res = 0;
1030                         break;
1031                 }
1032         }
1033         if (list_empty(&p_ptr->publications))
1034                 p_ptr->published = 0;
1035         tipc_port_unlock(p_ptr);
1036         return res;
1037 }
1038
1039 int tipc_connect(u32 ref, struct tipc_portid const *peer)
1040 {
1041         struct tipc_port *p_ptr;
1042         int res;
1043
1044         p_ptr = tipc_port_lock(ref);
1045         if (!p_ptr)
1046                 return -EINVAL;
1047         res = __tipc_connect(ref, p_ptr, peer);
1048         tipc_port_unlock(p_ptr);
1049         return res;
1050 }
1051
1052 /*
1053  * __tipc_connect - connect to a remote peer
1054  *
1055  * Port must be locked.
1056  */
1057 int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
1058                         struct tipc_portid const *peer)
1059 {
1060         struct tipc_msg *msg;
1061         int res = -EINVAL;
1062
1063         if (p_ptr->published || p_ptr->connected)
1064                 goto exit;
1065         if (!peer->ref)
1066                 goto exit;
1067
1068         msg = &p_ptr->phdr;
1069         msg_set_destnode(msg, peer->node);
1070         msg_set_destport(msg, peer->ref);
1071         msg_set_type(msg, TIPC_CONN_MSG);
1072         msg_set_lookup_scope(msg, 0);
1073         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1074
1075         p_ptr->probing_interval = PROBING_INTERVAL;
1076         p_ptr->probing_state = CONFIRMED;
1077         p_ptr->connected = 1;
1078         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1079
1080         tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
1081                           (void *)(unsigned long)ref,
1082                           (net_ev_handler)port_handle_node_down);
1083         res = 0;
1084 exit:
1085         p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1086         return res;
1087 }
1088
1089 /*
1090  * __tipc_disconnect - disconnect port from peer
1091  *
1092  * Port must be locked.
1093  */
1094 int __tipc_disconnect(struct tipc_port *tp_ptr)
1095 {
1096         int res;
1097
1098         if (tp_ptr->connected) {
1099                 tp_ptr->connected = 0;
1100                 /* let timer expire on it's own to avoid deadlock! */
1101                 tipc_nodesub_unsubscribe(&tp_ptr->subscription);
1102                 res = 0;
1103         } else {
1104                 res = -ENOTCONN;
1105         }
1106         return res;
1107 }
1108
1109 /*
1110  * tipc_disconnect(): Disconnect port form peer.
1111  *                    This is a node local operation.
1112  */
1113 int tipc_disconnect(u32 ref)
1114 {
1115         struct tipc_port *p_ptr;
1116         int res;
1117
1118         p_ptr = tipc_port_lock(ref);
1119         if (!p_ptr)
1120                 return -EINVAL;
1121         res = __tipc_disconnect(p_ptr);
1122         tipc_port_unlock(p_ptr);
1123         return res;
1124 }
1125
1126 /*
1127  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1128  */
1129 int tipc_shutdown(u32 ref)
1130 {
1131         struct tipc_port *p_ptr;
1132         struct sk_buff *buf = NULL;
1133
1134         p_ptr = tipc_port_lock(ref);
1135         if (!p_ptr)
1136                 return -EINVAL;
1137
1138         buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
1139         tipc_port_unlock(p_ptr);
1140         tipc_net_route_msg(buf);
1141         return tipc_disconnect(ref);
1142 }
1143
1144 /**
1145  * tipc_port_recv_msg - receive message from lower layer and deliver to port user
1146  */
1147 int tipc_port_recv_msg(struct sk_buff *buf)
1148 {
1149         struct tipc_port *p_ptr;
1150         struct tipc_msg *msg = buf_msg(buf);
1151         u32 destport = msg_destport(msg);
1152         u32 dsz = msg_data_sz(msg);
1153         u32 err;
1154
1155         /* forward unresolved named message */
1156         if (unlikely(!destport)) {
1157                 tipc_net_route_msg(buf);
1158                 return dsz;
1159         }
1160
1161         /* validate destination & pass to port, otherwise reject message */
1162         p_ptr = tipc_port_lock(destport);
1163         if (likely(p_ptr)) {
1164                 err = p_ptr->dispatcher(p_ptr, buf);
1165                 tipc_port_unlock(p_ptr);
1166                 if (likely(!err))
1167                         return dsz;
1168         } else {
1169                 err = TIPC_ERR_NO_PORT;
1170         }
1171
1172         return tipc_reject_msg(buf, err);
1173 }
1174
1175 /*
1176  *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1177  *                        message for this node.
1178  */
1179 static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
1180                                    struct iovec const *msg_sect,
1181                                    unsigned int total_len)
1182 {
1183         struct sk_buff *buf;
1184         int res;
1185
1186         res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
1187                         MAX_MSG_SIZE, !sender->user_port, &buf);
1188         if (likely(buf))
1189                 tipc_port_recv_msg(buf);
1190         return res;
1191 }
1192
1193 /**
1194  * tipc_send - send message sections on connection
1195  */
1196 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
1197               unsigned int total_len)
1198 {
1199         struct tipc_port *p_ptr;
1200         u32 destnode;
1201         int res;
1202
1203         p_ptr = tipc_port_deref(ref);
1204         if (!p_ptr || !p_ptr->connected)
1205                 return -EINVAL;
1206
1207         p_ptr->congested = 1;
1208         if (!tipc_port_congested(p_ptr)) {
1209                 destnode = port_peernode(p_ptr);
1210                 if (likely(!in_own_node(destnode)))
1211                         res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1212                                                            total_len, destnode);
1213                 else
1214                         res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1215                                                       total_len);
1216
1217                 if (likely(res != -ELINKCONG)) {
1218                         p_ptr->congested = 0;
1219                         if (res > 0)
1220                                 p_ptr->sent++;
1221                         return res;
1222                 }
1223         }
1224         if (port_unreliable(p_ptr)) {
1225                 p_ptr->congested = 0;
1226                 return total_len;
1227         }
1228         return -ELINKCONG;
1229 }
1230
1231 /**
1232  * tipc_send2name - send message sections to port name
1233  */
1234 int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1235                    unsigned int num_sect, struct iovec const *msg_sect,
1236                    unsigned int total_len)
1237 {
1238         struct tipc_port *p_ptr;
1239         struct tipc_msg *msg;
1240         u32 destnode = domain;
1241         u32 destport;
1242         int res;
1243
1244         p_ptr = tipc_port_deref(ref);
1245         if (!p_ptr || p_ptr->connected)
1246                 return -EINVAL;
1247
1248         msg = &p_ptr->phdr;
1249         msg_set_type(msg, TIPC_NAMED_MSG);
1250         msg_set_hdr_sz(msg, NAMED_H_SIZE);
1251         msg_set_nametype(msg, name->type);
1252         msg_set_nameinst(msg, name->instance);
1253         msg_set_lookup_scope(msg, tipc_addr_scope(domain));
1254         destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1255         msg_set_destnode(msg, destnode);
1256         msg_set_destport(msg, destport);
1257
1258         if (likely(destport || destnode)) {
1259                 if (likely(in_own_node(destnode)))
1260                         res = tipc_port_recv_sections(p_ptr, num_sect,
1261                                                       msg_sect, total_len);
1262                 else if (tipc_own_addr)
1263                         res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1264                                                            num_sect, total_len,
1265                                                            destnode);
1266                 else
1267                         res = tipc_port_reject_sections(p_ptr, msg, msg_sect,
1268                                                         num_sect, total_len,
1269                                                         TIPC_ERR_NO_NODE);
1270                 if (likely(res != -ELINKCONG)) {
1271                         if (res > 0)
1272                                 p_ptr->sent++;
1273                         return res;
1274                 }
1275                 if (port_unreliable(p_ptr)) {
1276                         return total_len;
1277                 }
1278                 return -ELINKCONG;
1279         }
1280         return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1281                                          total_len, TIPC_ERR_NO_NAME);
1282 }
1283
1284 /**
1285  * tipc_send2port - send message sections to port identity
1286  */
1287 int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1288                    unsigned int num_sect, struct iovec const *msg_sect,
1289                    unsigned int total_len)
1290 {
1291         struct tipc_port *p_ptr;
1292         struct tipc_msg *msg;
1293         int res;
1294
1295         p_ptr = tipc_port_deref(ref);
1296         if (!p_ptr || p_ptr->connected)
1297                 return -EINVAL;
1298
1299         msg = &p_ptr->phdr;
1300         msg_set_type(msg, TIPC_DIRECT_MSG);
1301         msg_set_lookup_scope(msg, 0);
1302         msg_set_destnode(msg, dest->node);
1303         msg_set_destport(msg, dest->ref);
1304         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1305
1306         if (in_own_node(dest->node))
1307                 res =  tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1308                                                total_len);
1309         else if (tipc_own_addr)
1310                 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1311                                                    total_len, dest->node);
1312         else
1313                 res = tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1314                                                 total_len, TIPC_ERR_NO_NODE);
1315         if (likely(res != -ELINKCONG)) {
1316                 if (res > 0)
1317                         p_ptr->sent++;
1318                 return res;
1319         }
1320         if (port_unreliable(p_ptr)) {
1321                 return total_len;
1322         }
1323         return -ELINKCONG;
1324 }
1325
1326 /**
1327  * tipc_send_buf2port - send message buffer to port identity
1328  */
1329 int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1330                struct sk_buff *buf, unsigned int dsz)
1331 {
1332         struct tipc_port *p_ptr;
1333         struct tipc_msg *msg;
1334         int res;
1335
1336         p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
1337         if (!p_ptr || p_ptr->connected)
1338                 return -EINVAL;
1339
1340         msg = &p_ptr->phdr;
1341         msg_set_type(msg, TIPC_DIRECT_MSG);
1342         msg_set_destnode(msg, dest->node);
1343         msg_set_destport(msg, dest->ref);
1344         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1345         msg_set_size(msg, BASIC_H_SIZE + dsz);
1346         if (skb_cow(buf, BASIC_H_SIZE))
1347                 return -ENOMEM;
1348
1349         skb_push(buf, BASIC_H_SIZE);
1350         skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE);
1351
1352         if (in_own_node(dest->node))
1353                 res = tipc_port_recv_msg(buf);
1354         else
1355                 res = tipc_send_buf_fast(buf, dest->node);
1356         if (likely(res != -ELINKCONG)) {
1357                 if (res > 0)
1358                         p_ptr->sent++;
1359                 return res;
1360         }
1361         if (port_unreliable(p_ptr))
1362                 return dsz;
1363         return -ELINKCONG;
1364 }