]> Pileus Git - ~andy/linux/blob - net/tipc/port.c
0f40b1055306d1a47dcdda6b0ae0fb1cb730b6d0
[~andy/linux] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  *
4  * Copyright (c) 1992-2007, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "config.h"
39 #include "port.h"
40 #include "name_table.h"
41
42 /* Connection management: */
43 #define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
44 #define CONFIRMED 0
45 #define PROBING 1
46
47 #define MAX_REJECT_SIZE 1024
48
49 static struct sk_buff *msg_queue_head;
50 static struct sk_buff *msg_queue_tail;
51
52 DEFINE_SPINLOCK(tipc_port_list_lock);
53 static DEFINE_SPINLOCK(queue_lock);
54
55 static LIST_HEAD(ports);
56 static void port_handle_node_down(unsigned long ref);
57 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
58 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
59 static void port_timeout(unsigned long ref);
60
61
62 static inline u32 port_peernode(struct tipc_port *p_ptr)
63 {
64         return msg_destnode(&p_ptr->phdr);
65 }
66
67 static inline u32 port_peerport(struct tipc_port *p_ptr)
68 {
69         return msg_destport(&p_ptr->phdr);
70 }
71
72 /*
73  * tipc_port_peer_msg - verify message was sent by connected port's peer
74  *
75  * Handles cases where the node's network address has changed from
76  * the default of <0.0.0> to its configured setting.
77  */
78
79 int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
80 {
81         u32 peernode;
82         u32 orignode;
83
84         if (msg_origport(msg) != port_peerport(p_ptr))
85                 return 0;
86
87         orignode = msg_orignode(msg);
88         peernode = port_peernode(p_ptr);
89         return (orignode == peernode) ||
90                 (!orignode && (peernode == tipc_own_addr)) ||
91                 (!peernode && (orignode == tipc_own_addr));
92 }
93
94 /**
95  * tipc_multicast - send a multicast message to local and remote destinations
96  */
97
98 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
99                    u32 num_sect, struct iovec const *msg_sect,
100                    unsigned int total_len)
101 {
102         struct tipc_msg *hdr;
103         struct sk_buff *buf;
104         struct sk_buff *ibuf = NULL;
105         struct tipc_port_list dports = {0, NULL, };
106         struct tipc_port *oport = tipc_port_deref(ref);
107         int ext_targets;
108         int res;
109
110         if (unlikely(!oport))
111                 return -EINVAL;
112
113         /* Create multicast message */
114
115         hdr = &oport->phdr;
116         msg_set_type(hdr, TIPC_MCAST_MSG);
117         msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
118         msg_set_destport(hdr, 0);
119         msg_set_destnode(hdr, 0);
120         msg_set_nametype(hdr, seq->type);
121         msg_set_namelower(hdr, seq->lower);
122         msg_set_nameupper(hdr, seq->upper);
123         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
124         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
125                         !oport->user_port, &buf);
126         if (unlikely(!buf))
127                 return res;
128
129         /* Figure out where to send multicast message */
130
131         ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
132                                                 TIPC_NODE_SCOPE, &dports);
133
134         /* Send message to destinations (duplicate it only if necessary) */
135
136         if (ext_targets) {
137                 if (dports.count != 0) {
138                         ibuf = skb_copy(buf, GFP_ATOMIC);
139                         if (ibuf == NULL) {
140                                 tipc_port_list_free(&dports);
141                                 kfree_skb(buf);
142                                 return -ENOMEM;
143                         }
144                 }
145                 res = tipc_bclink_send_msg(buf);
146                 if ((res < 0) && (dports.count != 0))
147                         kfree_skb(ibuf);
148         } else {
149                 ibuf = buf;
150         }
151
152         if (res >= 0) {
153                 if (ibuf)
154                         tipc_port_recv_mcast(ibuf, &dports);
155         } else {
156                 tipc_port_list_free(&dports);
157         }
158         return res;
159 }
160
161 /**
162  * tipc_port_recv_mcast - deliver multicast message to all destination ports
163  *
164  * If there is no port list, perform a lookup to create one
165  */
166
167 void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
168 {
169         struct tipc_msg *msg;
170         struct tipc_port_list dports = {0, NULL, };
171         struct tipc_port_list *item = dp;
172         int cnt = 0;
173
174         msg = buf_msg(buf);
175
176         /* Create destination port list, if one wasn't supplied */
177
178         if (dp == NULL) {
179                 tipc_nametbl_mc_translate(msg_nametype(msg),
180                                      msg_namelower(msg),
181                                      msg_nameupper(msg),
182                                      TIPC_CLUSTER_SCOPE,
183                                      &dports);
184                 item = dp = &dports;
185         }
186
187         /* Deliver a copy of message to each destination port */
188
189         if (dp->count != 0) {
190                 msg_set_destnode(msg, tipc_own_addr);
191                 if (dp->count == 1) {
192                         msg_set_destport(msg, dp->ports[0]);
193                         tipc_port_recv_msg(buf);
194                         tipc_port_list_free(dp);
195                         return;
196                 }
197                 for (; cnt < dp->count; cnt++) {
198                         int index = cnt % PLSIZE;
199                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
200
201                         if (b == NULL) {
202                                 warn("Unable to deliver multicast message(s)\n");
203                                 goto exit;
204                         }
205                         if ((index == 0) && (cnt != 0))
206                                 item = item->next;
207                         msg_set_destport(buf_msg(b), item->ports[index]);
208                         tipc_port_recv_msg(b);
209                 }
210         }
211 exit:
212         kfree_skb(buf);
213         tipc_port_list_free(dp);
214 }
215
216 /**
217  * tipc_createport_raw - create a generic TIPC port
218  *
219  * Returns pointer to (locked) TIPC port, or NULL if unable to create it
220  */
221
222 struct tipc_port *tipc_createport_raw(void *usr_handle,
223                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
224                         void (*wakeup)(struct tipc_port *),
225                         const u32 importance)
226 {
227         struct tipc_port *p_ptr;
228         struct tipc_msg *msg;
229         u32 ref;
230
231         p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
232         if (!p_ptr) {
233                 warn("Port creation failed, no memory\n");
234                 return NULL;
235         }
236         ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
237         if (!ref) {
238                 warn("Port creation failed, reference table exhausted\n");
239                 kfree(p_ptr);
240                 return NULL;
241         }
242
243         p_ptr->usr_handle = usr_handle;
244         p_ptr->max_pkt = MAX_PKT_DEFAULT;
245         p_ptr->ref = ref;
246         INIT_LIST_HEAD(&p_ptr->wait_list);
247         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
248         p_ptr->dispatcher = dispatcher;
249         p_ptr->wakeup = wakeup;
250         p_ptr->user_port = NULL;
251         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
252         INIT_LIST_HEAD(&p_ptr->publications);
253         INIT_LIST_HEAD(&p_ptr->port_list);
254
255         /*
256          * Must hold port list lock while initializing message header template
257          * to ensure a change to node's own network address doesn't result
258          * in template containing out-dated network address information
259          */
260
261         spin_lock_bh(&tipc_port_list_lock);
262         msg = &p_ptr->phdr;
263         tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
264         msg_set_origport(msg, ref);
265         list_add_tail(&p_ptr->port_list, &ports);
266         spin_unlock_bh(&tipc_port_list_lock);
267         return p_ptr;
268 }
269
270 int tipc_deleteport(u32 ref)
271 {
272         struct tipc_port *p_ptr;
273         struct sk_buff *buf = NULL;
274
275         tipc_withdraw(ref, 0, NULL);
276         p_ptr = tipc_port_lock(ref);
277         if (!p_ptr)
278                 return -EINVAL;
279
280         tipc_ref_discard(ref);
281         tipc_port_unlock(p_ptr);
282
283         k_cancel_timer(&p_ptr->timer);
284         if (p_ptr->connected) {
285                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
286                 tipc_nodesub_unsubscribe(&p_ptr->subscription);
287         }
288         kfree(p_ptr->user_port);
289
290         spin_lock_bh(&tipc_port_list_lock);
291         list_del(&p_ptr->port_list);
292         list_del(&p_ptr->wait_list);
293         spin_unlock_bh(&tipc_port_list_lock);
294         k_term_timer(&p_ptr->timer);
295         kfree(p_ptr);
296         tipc_net_route_msg(buf);
297         return 0;
298 }
299
300 static int port_unreliable(struct tipc_port *p_ptr)
301 {
302         return msg_src_droppable(&p_ptr->phdr);
303 }
304
305 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
306 {
307         struct tipc_port *p_ptr;
308
309         p_ptr = tipc_port_lock(ref);
310         if (!p_ptr)
311                 return -EINVAL;
312         *isunreliable = port_unreliable(p_ptr);
313         tipc_port_unlock(p_ptr);
314         return 0;
315 }
316
317 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
318 {
319         struct tipc_port *p_ptr;
320
321         p_ptr = tipc_port_lock(ref);
322         if (!p_ptr)
323                 return -EINVAL;
324         msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
325         tipc_port_unlock(p_ptr);
326         return 0;
327 }
328
329 static int port_unreturnable(struct tipc_port *p_ptr)
330 {
331         return msg_dest_droppable(&p_ptr->phdr);
332 }
333
334 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
335 {
336         struct tipc_port *p_ptr;
337
338         p_ptr = tipc_port_lock(ref);
339         if (!p_ptr)
340                 return -EINVAL;
341         *isunrejectable = port_unreturnable(p_ptr);
342         tipc_port_unlock(p_ptr);
343         return 0;
344 }
345
346 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
347 {
348         struct tipc_port *p_ptr;
349
350         p_ptr = tipc_port_lock(ref);
351         if (!p_ptr)
352                 return -EINVAL;
353         msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
354         tipc_port_unlock(p_ptr);
355         return 0;
356 }
357
358 /*
359  * port_build_proto_msg(): create connection protocol message for port
360  *
361  * On entry the port must be locked and connected.
362  */
363 static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
364                                             u32 type, u32 ack)
365 {
366         struct sk_buff *buf;
367         struct tipc_msg *msg;
368
369         buf = tipc_buf_acquire(INT_H_SIZE);
370         if (buf) {
371                 msg = buf_msg(buf);
372                 tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE,
373                               port_peernode(p_ptr));
374                 msg_set_destport(msg, port_peerport(p_ptr));
375                 msg_set_origport(msg, p_ptr->ref);
376                 msg_set_msgcnt(msg, ack);
377         }
378         return buf;
379 }
380
381 int tipc_reject_msg(struct sk_buff *buf, u32 err)
382 {
383         struct tipc_msg *msg = buf_msg(buf);
384         struct sk_buff *rbuf;
385         struct tipc_msg *rmsg;
386         int hdr_sz;
387         u32 imp;
388         u32 data_sz = msg_data_sz(msg);
389         u32 src_node;
390         u32 rmsg_sz;
391
392         /* discard rejected message if it shouldn't be returned to sender */
393
394         if (WARN(!msg_isdata(msg),
395                  "attempt to reject message with user=%u", msg_user(msg))) {
396                 dump_stack();
397                 goto exit;
398         }
399         if (msg_errcode(msg) || msg_dest_droppable(msg))
400                 goto exit;
401
402         /*
403          * construct returned message by copying rejected message header and
404          * data (or subset), then updating header fields that need adjusting
405          */
406
407         hdr_sz = msg_hdr_sz(msg);
408         rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
409
410         rbuf = tipc_buf_acquire(rmsg_sz);
411         if (rbuf == NULL)
412                 goto exit;
413
414         rmsg = buf_msg(rbuf);
415         skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
416
417         if (msg_connected(rmsg)) {
418                 imp = msg_importance(rmsg);
419                 if (imp < TIPC_CRITICAL_IMPORTANCE)
420                         msg_set_importance(rmsg, ++imp);
421         }
422         msg_set_non_seq(rmsg, 0);
423         msg_set_size(rmsg, rmsg_sz);
424         msg_set_errcode(rmsg, err);
425         msg_set_prevnode(rmsg, tipc_own_addr);
426         msg_swap_words(rmsg, 4, 5);
427         if (!msg_short(rmsg))
428                 msg_swap_words(rmsg, 6, 7);
429
430         /* send self-abort message when rejecting on a connected port */
431         if (msg_connected(msg)) {
432                 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
433
434                 if (p_ptr) {
435                         struct sk_buff *abuf = NULL;
436
437                         if (p_ptr->connected)
438                                 abuf = port_build_self_abort_msg(p_ptr, err);
439                         tipc_port_unlock(p_ptr);
440                         tipc_net_route_msg(abuf);
441                 }
442         }
443
444         /* send returned message & dispose of rejected message */
445
446         src_node = msg_prevnode(msg);
447         if (in_own_node(src_node))
448                 tipc_port_recv_msg(rbuf);
449         else
450                 tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
451 exit:
452         kfree_skb(buf);
453         return data_sz;
454 }
455
456 int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
457                               struct iovec const *msg_sect, u32 num_sect,
458                               unsigned int total_len, int err)
459 {
460         struct sk_buff *buf;
461         int res;
462
463         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
464                         !p_ptr->user_port, &buf);
465         if (!buf)
466                 return res;
467
468         return tipc_reject_msg(buf, err);
469 }
470
471 static void port_timeout(unsigned long ref)
472 {
473         struct tipc_port *p_ptr = tipc_port_lock(ref);
474         struct sk_buff *buf = NULL;
475
476         if (!p_ptr)
477                 return;
478
479         if (!p_ptr->connected) {
480                 tipc_port_unlock(p_ptr);
481                 return;
482         }
483
484         /* Last probe answered ? */
485         if (p_ptr->probing_state == PROBING) {
486                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
487         } else {
488                 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
489                 p_ptr->probing_state = PROBING;
490                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
491         }
492         tipc_port_unlock(p_ptr);
493         tipc_net_route_msg(buf);
494 }
495
496
497 static void port_handle_node_down(unsigned long ref)
498 {
499         struct tipc_port *p_ptr = tipc_port_lock(ref);
500         struct sk_buff *buf = NULL;
501
502         if (!p_ptr)
503                 return;
504         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
505         tipc_port_unlock(p_ptr);
506         tipc_net_route_msg(buf);
507 }
508
509
510 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
511 {
512         struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
513
514         if (buf) {
515                 struct tipc_msg *msg = buf_msg(buf);
516                 msg_swap_words(msg, 4, 5);
517                 msg_swap_words(msg, 6, 7);
518         }
519         return buf;
520 }
521
522
523 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
524 {
525         struct sk_buff *buf;
526         struct tipc_msg *msg;
527         u32 imp;
528
529         if (!p_ptr->connected)
530                 return NULL;
531
532         buf = tipc_buf_acquire(BASIC_H_SIZE);
533         if (buf) {
534                 msg = buf_msg(buf);
535                 memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
536                 msg_set_hdr_sz(msg, BASIC_H_SIZE);
537                 msg_set_size(msg, BASIC_H_SIZE);
538                 imp = msg_importance(msg);
539                 if (imp < TIPC_CRITICAL_IMPORTANCE)
540                         msg_set_importance(msg, ++imp);
541                 msg_set_errcode(msg, err);
542         }
543         return buf;
544 }
545
546 void tipc_port_recv_proto_msg(struct sk_buff *buf)
547 {
548         struct tipc_msg *msg = buf_msg(buf);
549         struct tipc_port *p_ptr;
550         struct sk_buff *r_buf = NULL;
551         u32 destport = msg_destport(msg);
552         int wakeable;
553
554         /* Validate connection */
555
556         p_ptr = tipc_port_lock(destport);
557         if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
558                 r_buf = tipc_buf_acquire(BASIC_H_SIZE);
559                 if (r_buf) {
560                         msg = buf_msg(r_buf);
561                         tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
562                                       BASIC_H_SIZE, msg_orignode(msg));
563                         msg_set_errcode(msg, TIPC_ERR_NO_PORT);
564                         msg_set_origport(msg, destport);
565                         msg_set_destport(msg, msg_origport(msg));
566                 }
567                 if (p_ptr)
568                         tipc_port_unlock(p_ptr);
569                 goto exit;
570         }
571
572         /* Process protocol message sent by peer */
573
574         switch (msg_type(msg)) {
575         case CONN_ACK:
576                 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
577                         p_ptr->wakeup;
578                 p_ptr->acked += msg_msgcnt(msg);
579                 if (!tipc_port_congested(p_ptr)) {
580                         p_ptr->congested = 0;
581                         if (wakeable)
582                                 p_ptr->wakeup(p_ptr);
583                 }
584                 break;
585         case CONN_PROBE:
586                 r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
587                 break;
588         default:
589                 /* CONN_PROBE_REPLY or unrecognized - no action required */
590                 break;
591         }
592         p_ptr->probing_state = CONFIRMED;
593         tipc_port_unlock(p_ptr);
594 exit:
595         tipc_net_route_msg(r_buf);
596         kfree_skb(buf);
597 }
598
599 static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id)
600 {
601         struct publication *publ;
602
603         if (full_id)
604                 tipc_printf(buf, "<%u.%u.%u:%u>:",
605                             tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
606                             tipc_node(tipc_own_addr), p_ptr->ref);
607         else
608                 tipc_printf(buf, "%-10u:", p_ptr->ref);
609
610         if (p_ptr->connected) {
611                 u32 dport = port_peerport(p_ptr);
612                 u32 destnode = port_peernode(p_ptr);
613
614                 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
615                             tipc_zone(destnode), tipc_cluster(destnode),
616                             tipc_node(destnode), dport);
617                 if (p_ptr->conn_type != 0)
618                         tipc_printf(buf, " via {%u,%u}",
619                                     p_ptr->conn_type,
620                                     p_ptr->conn_instance);
621         } else if (p_ptr->published) {
622                 tipc_printf(buf, " bound to");
623                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
624                         if (publ->lower == publ->upper)
625                                 tipc_printf(buf, " {%u,%u}", publ->type,
626                                             publ->lower);
627                         else
628                                 tipc_printf(buf, " {%u,%u,%u}", publ->type,
629                                             publ->lower, publ->upper);
630                 }
631         }
632         tipc_printf(buf, "\n");
633 }
634
635 #define MAX_PORT_QUERY 32768
636
637 struct sk_buff *tipc_port_get_ports(void)
638 {
639         struct sk_buff *buf;
640         struct tlv_desc *rep_tlv;
641         struct print_buf pb;
642         struct tipc_port *p_ptr;
643         int str_len;
644
645         buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
646         if (!buf)
647                 return NULL;
648         rep_tlv = (struct tlv_desc *)buf->data;
649
650         tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
651         spin_lock_bh(&tipc_port_list_lock);
652         list_for_each_entry(p_ptr, &ports, port_list) {
653                 spin_lock_bh(p_ptr->lock);
654                 port_print(p_ptr, &pb, 0);
655                 spin_unlock_bh(p_ptr->lock);
656         }
657         spin_unlock_bh(&tipc_port_list_lock);
658         str_len = tipc_printbuf_validate(&pb);
659
660         skb_put(buf, TLV_SPACE(str_len));
661         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
662
663         return buf;
664 }
665
666 void tipc_port_reinit(void)
667 {
668         struct tipc_port *p_ptr;
669         struct tipc_msg *msg;
670
671         spin_lock_bh(&tipc_port_list_lock);
672         list_for_each_entry(p_ptr, &ports, port_list) {
673                 msg = &p_ptr->phdr;
674                 msg_set_prevnode(msg, tipc_own_addr);
675                 msg_set_orignode(msg, tipc_own_addr);
676         }
677         spin_unlock_bh(&tipc_port_list_lock);
678 }
679
680
681 /*
682  *  port_dispatcher_sigh(): Signal handler for messages destinated
683  *                          to the tipc_port interface.
684  */
685
686 static void port_dispatcher_sigh(void *dummy)
687 {
688         struct sk_buff *buf;
689
690         spin_lock_bh(&queue_lock);
691         buf = msg_queue_head;
692         msg_queue_head = NULL;
693         spin_unlock_bh(&queue_lock);
694
695         while (buf) {
696                 struct tipc_port *p_ptr;
697                 struct user_port *up_ptr;
698                 struct tipc_portid orig;
699                 struct tipc_name_seq dseq;
700                 void *usr_handle;
701                 int connected;
702                 int peer_invalid;
703                 int published;
704                 u32 message_type;
705
706                 struct sk_buff *next = buf->next;
707                 struct tipc_msg *msg = buf_msg(buf);
708                 u32 dref = msg_destport(msg);
709
710                 message_type = msg_type(msg);
711                 if (message_type > TIPC_DIRECT_MSG)
712                         goto reject;    /* Unsupported message type */
713
714                 p_ptr = tipc_port_lock(dref);
715                 if (!p_ptr)
716                         goto reject;    /* Port deleted while msg in queue */
717
718                 orig.ref = msg_origport(msg);
719                 orig.node = msg_orignode(msg);
720                 up_ptr = p_ptr->user_port;
721                 usr_handle = up_ptr->usr_handle;
722                 connected = p_ptr->connected;
723                 peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg);
724                 published = p_ptr->published;
725
726                 if (unlikely(msg_errcode(msg)))
727                         goto err;
728
729                 switch (message_type) {
730
731                 case TIPC_CONN_MSG:{
732                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
733                                 u32 dsz;
734
735                                 tipc_port_unlock(p_ptr);
736                                 if (unlikely(!cb))
737                                         goto reject;
738                                 if (unlikely(!connected)) {
739                                         if (tipc_connect2port(dref, &orig))
740                                                 goto reject;
741                                 } else if (peer_invalid)
742                                         goto reject;
743                                 dsz = msg_data_sz(msg);
744                                 if (unlikely(dsz &&
745                                              (++p_ptr->conn_unacked >=
746                                               TIPC_FLOW_CONTROL_WIN)))
747                                         tipc_acknowledge(dref,
748                                                          p_ptr->conn_unacked);
749                                 skb_pull(buf, msg_hdr_sz(msg));
750                                 cb(usr_handle, dref, &buf, msg_data(msg), dsz);
751                                 break;
752                         }
753                 case TIPC_DIRECT_MSG:{
754                                 tipc_msg_event cb = up_ptr->msg_cb;
755
756                                 tipc_port_unlock(p_ptr);
757                                 if (unlikely(!cb || connected))
758                                         goto reject;
759                                 skb_pull(buf, msg_hdr_sz(msg));
760                                 cb(usr_handle, dref, &buf, msg_data(msg),
761                                    msg_data_sz(msg), msg_importance(msg),
762                                    &orig);
763                                 break;
764                         }
765                 case TIPC_MCAST_MSG:
766                 case TIPC_NAMED_MSG:{
767                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
768
769                                 tipc_port_unlock(p_ptr);
770                                 if (unlikely(!cb || connected || !published))
771                                         goto reject;
772                                 dseq.type =  msg_nametype(msg);
773                                 dseq.lower = msg_nameinst(msg);
774                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
775                                         ? dseq.lower : msg_nameupper(msg);
776                                 skb_pull(buf, msg_hdr_sz(msg));
777                                 cb(usr_handle, dref, &buf, msg_data(msg),
778                                    msg_data_sz(msg), msg_importance(msg),
779                                    &orig, &dseq);
780                                 break;
781                         }
782                 }
783                 if (buf)
784                         kfree_skb(buf);
785                 buf = next;
786                 continue;
787 err:
788                 switch (message_type) {
789
790                 case TIPC_CONN_MSG:{
791                                 tipc_conn_shutdown_event cb =
792                                         up_ptr->conn_err_cb;
793
794                                 tipc_port_unlock(p_ptr);
795                                 if (!cb || !connected || peer_invalid)
796                                         break;
797                                 tipc_disconnect(dref);
798                                 skb_pull(buf, msg_hdr_sz(msg));
799                                 cb(usr_handle, dref, &buf, msg_data(msg),
800                                    msg_data_sz(msg), msg_errcode(msg));
801                                 break;
802                         }
803                 case TIPC_DIRECT_MSG:{
804                                 tipc_msg_err_event cb = up_ptr->err_cb;
805
806                                 tipc_port_unlock(p_ptr);
807                                 if (!cb || connected)
808                                         break;
809                                 skb_pull(buf, msg_hdr_sz(msg));
810                                 cb(usr_handle, dref, &buf, msg_data(msg),
811                                    msg_data_sz(msg), msg_errcode(msg), &orig);
812                                 break;
813                         }
814                 case TIPC_MCAST_MSG:
815                 case TIPC_NAMED_MSG:{
816                                 tipc_named_msg_err_event cb =
817                                         up_ptr->named_err_cb;
818
819                                 tipc_port_unlock(p_ptr);
820                                 if (!cb || connected)
821                                         break;
822                                 dseq.type =  msg_nametype(msg);
823                                 dseq.lower = msg_nameinst(msg);
824                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
825                                         ? dseq.lower : msg_nameupper(msg);
826                                 skb_pull(buf, msg_hdr_sz(msg));
827                                 cb(usr_handle, dref, &buf, msg_data(msg),
828                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
829                                 break;
830                         }
831                 }
832                 if (buf)
833                         kfree_skb(buf);
834                 buf = next;
835                 continue;
836 reject:
837                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
838                 buf = next;
839         }
840 }
841
842 /*
843  *  port_dispatcher(): Dispatcher for messages destinated
844  *  to the tipc_port interface. Called with port locked.
845  */
846
847 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
848 {
849         buf->next = NULL;
850         spin_lock_bh(&queue_lock);
851         if (msg_queue_head) {
852                 msg_queue_tail->next = buf;
853                 msg_queue_tail = buf;
854         } else {
855                 msg_queue_tail = msg_queue_head = buf;
856                 tipc_k_signal((Handler)port_dispatcher_sigh, 0);
857         }
858         spin_unlock_bh(&queue_lock);
859         return 0;
860 }
861
862 /*
863  * Wake up port after congestion: Called with port locked,
864  *
865  */
866
867 static void port_wakeup_sh(unsigned long ref)
868 {
869         struct tipc_port *p_ptr;
870         struct user_port *up_ptr;
871         tipc_continue_event cb = NULL;
872         void *uh = NULL;
873
874         p_ptr = tipc_port_lock(ref);
875         if (p_ptr) {
876                 up_ptr = p_ptr->user_port;
877                 if (up_ptr) {
878                         cb = up_ptr->continue_event_cb;
879                         uh = up_ptr->usr_handle;
880                 }
881                 tipc_port_unlock(p_ptr);
882         }
883         if (cb)
884                 cb(uh, ref);
885 }
886
887
888 static void port_wakeup(struct tipc_port *p_ptr)
889 {
890         tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
891 }
892
893 void tipc_acknowledge(u32 ref, u32 ack)
894 {
895         struct tipc_port *p_ptr;
896         struct sk_buff *buf = NULL;
897
898         p_ptr = tipc_port_lock(ref);
899         if (!p_ptr)
900                 return;
901         if (p_ptr->connected) {
902                 p_ptr->conn_unacked -= ack;
903                 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
904         }
905         tipc_port_unlock(p_ptr);
906         tipc_net_route_msg(buf);
907 }
908
909 /*
910  * tipc_createport(): user level call.
911  */
912
913 int tipc_createport(void *usr_handle,
914                     unsigned int importance,
915                     tipc_msg_err_event error_cb,
916                     tipc_named_msg_err_event named_error_cb,
917                     tipc_conn_shutdown_event conn_error_cb,
918                     tipc_msg_event msg_cb,
919                     tipc_named_msg_event named_msg_cb,
920                     tipc_conn_msg_event conn_msg_cb,
921                     tipc_continue_event continue_event_cb,/* May be zero */
922                     u32 *portref)
923 {
924         struct user_port *up_ptr;
925         struct tipc_port *p_ptr;
926
927         up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
928         if (!up_ptr) {
929                 warn("Port creation failed, no memory\n");
930                 return -ENOMEM;
931         }
932         p_ptr = (struct tipc_port *)tipc_createport_raw(NULL, port_dispatcher,
933                                                    port_wakeup, importance);
934         if (!p_ptr) {
935                 kfree(up_ptr);
936                 return -ENOMEM;
937         }
938
939         p_ptr->user_port = up_ptr;
940         up_ptr->usr_handle = usr_handle;
941         up_ptr->ref = p_ptr->ref;
942         up_ptr->err_cb = error_cb;
943         up_ptr->named_err_cb = named_error_cb;
944         up_ptr->conn_err_cb = conn_error_cb;
945         up_ptr->msg_cb = msg_cb;
946         up_ptr->named_msg_cb = named_msg_cb;
947         up_ptr->conn_msg_cb = conn_msg_cb;
948         up_ptr->continue_event_cb = continue_event_cb;
949         *portref = p_ptr->ref;
950         tipc_port_unlock(p_ptr);
951         return 0;
952 }
953
954 int tipc_portimportance(u32 ref, unsigned int *importance)
955 {
956         struct tipc_port *p_ptr;
957
958         p_ptr = tipc_port_lock(ref);
959         if (!p_ptr)
960                 return -EINVAL;
961         *importance = (unsigned int)msg_importance(&p_ptr->phdr);
962         tipc_port_unlock(p_ptr);
963         return 0;
964 }
965
966 int tipc_set_portimportance(u32 ref, unsigned int imp)
967 {
968         struct tipc_port *p_ptr;
969
970         if (imp > TIPC_CRITICAL_IMPORTANCE)
971                 return -EINVAL;
972
973         p_ptr = tipc_port_lock(ref);
974         if (!p_ptr)
975                 return -EINVAL;
976         msg_set_importance(&p_ptr->phdr, (u32)imp);
977         tipc_port_unlock(p_ptr);
978         return 0;
979 }
980
981
982 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
983 {
984         struct tipc_port *p_ptr;
985         struct publication *publ;
986         u32 key;
987         int res = -EINVAL;
988
989         p_ptr = tipc_port_lock(ref);
990         if (!p_ptr)
991                 return -EINVAL;
992
993         if (p_ptr->connected)
994                 goto exit;
995         if (seq->lower > seq->upper)
996                 goto exit;
997         if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
998                 goto exit;
999         key = ref + p_ptr->pub_count + 1;
1000         if (key == ref) {
1001                 res = -EADDRINUSE;
1002                 goto exit;
1003         }
1004         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
1005                                     scope, p_ptr->ref, key);
1006         if (publ) {
1007                 list_add(&publ->pport_list, &p_ptr->publications);
1008                 p_ptr->pub_count++;
1009                 p_ptr->published = 1;
1010                 res = 0;
1011         }
1012 exit:
1013         tipc_port_unlock(p_ptr);
1014         return res;
1015 }
1016
1017 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1018 {
1019         struct tipc_port *p_ptr;
1020         struct publication *publ;
1021         struct publication *tpubl;
1022         int res = -EINVAL;
1023
1024         p_ptr = tipc_port_lock(ref);
1025         if (!p_ptr)
1026                 return -EINVAL;
1027         if (!seq) {
1028                 list_for_each_entry_safe(publ, tpubl,
1029                                          &p_ptr->publications, pport_list) {
1030                         tipc_nametbl_withdraw(publ->type, publ->lower,
1031                                               publ->ref, publ->key);
1032                 }
1033                 res = 0;
1034         } else {
1035                 list_for_each_entry_safe(publ, tpubl,
1036                                          &p_ptr->publications, pport_list) {
1037                         if (publ->scope != scope)
1038                                 continue;
1039                         if (publ->type != seq->type)
1040                                 continue;
1041                         if (publ->lower != seq->lower)
1042                                 continue;
1043                         if (publ->upper != seq->upper)
1044                                 break;
1045                         tipc_nametbl_withdraw(publ->type, publ->lower,
1046                                               publ->ref, publ->key);
1047                         res = 0;
1048                         break;
1049                 }
1050         }
1051         if (list_empty(&p_ptr->publications))
1052                 p_ptr->published = 0;
1053         tipc_port_unlock(p_ptr);
1054         return res;
1055 }
1056
1057 int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1058 {
1059         struct tipc_port *p_ptr;
1060         struct tipc_msg *msg;
1061         int res = -EINVAL;
1062
1063         p_ptr = tipc_port_lock(ref);
1064         if (!p_ptr)
1065                 return -EINVAL;
1066         if (p_ptr->published || p_ptr->connected)
1067                 goto exit;
1068         if (!peer->ref)
1069                 goto exit;
1070
1071         msg = &p_ptr->phdr;
1072         msg_set_destnode(msg, peer->node);
1073         msg_set_destport(msg, peer->ref);
1074         msg_set_type(msg, TIPC_CONN_MSG);
1075         msg_set_lookup_scope(msg, 0);
1076         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1077
1078         p_ptr->probing_interval = PROBING_INTERVAL;
1079         p_ptr->probing_state = CONFIRMED;
1080         p_ptr->connected = 1;
1081         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1082
1083         tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
1084                           (void *)(unsigned long)ref,
1085                           (net_ev_handler)port_handle_node_down);
1086         res = 0;
1087 exit:
1088         tipc_port_unlock(p_ptr);
1089         p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1090         return res;
1091 }
1092
1093 /**
1094  * tipc_disconnect_port - disconnect port from peer
1095  *
1096  * Port must be locked.
1097  */
1098
1099 int tipc_disconnect_port(struct tipc_port *tp_ptr)
1100 {
1101         int res;
1102
1103         if (tp_ptr->connected) {
1104                 tp_ptr->connected = 0;
1105                 /* let timer expire on it's own to avoid deadlock! */
1106                 tipc_nodesub_unsubscribe(
1107                         &((struct tipc_port *)tp_ptr)->subscription);
1108                 res = 0;
1109         } else {
1110                 res = -ENOTCONN;
1111         }
1112         return res;
1113 }
1114
1115 /*
1116  * tipc_disconnect(): Disconnect port form peer.
1117  *                    This is a node local operation.
1118  */
1119
1120 int tipc_disconnect(u32 ref)
1121 {
1122         struct tipc_port *p_ptr;
1123         int res;
1124
1125         p_ptr = tipc_port_lock(ref);
1126         if (!p_ptr)
1127                 return -EINVAL;
1128         res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1129         tipc_port_unlock(p_ptr);
1130         return res;
1131 }
1132
1133 /*
1134  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1135  */
1136 int tipc_shutdown(u32 ref)
1137 {
1138         struct tipc_port *p_ptr;
1139         struct sk_buff *buf = NULL;
1140
1141         p_ptr = tipc_port_lock(ref);
1142         if (!p_ptr)
1143                 return -EINVAL;
1144
1145         buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
1146         tipc_port_unlock(p_ptr);
1147         tipc_net_route_msg(buf);
1148         return tipc_disconnect(ref);
1149 }
1150
1151 /**
1152  * tipc_port_recv_msg - receive message from lower layer and deliver to port user
1153  */
1154
1155 int tipc_port_recv_msg(struct sk_buff *buf)
1156 {
1157         struct tipc_port *p_ptr;
1158         struct tipc_msg *msg = buf_msg(buf);
1159         u32 destport = msg_destport(msg);
1160         u32 dsz = msg_data_sz(msg);
1161         u32 err;
1162
1163         /* forward unresolved named message */
1164         if (unlikely(!destport)) {
1165                 tipc_net_route_msg(buf);
1166                 return dsz;
1167         }
1168
1169         /* validate destination & pass to port, otherwise reject message */
1170         p_ptr = tipc_port_lock(destport);
1171         if (likely(p_ptr)) {
1172                 err = p_ptr->dispatcher(p_ptr, buf);
1173                 tipc_port_unlock(p_ptr);
1174                 if (likely(!err))
1175                         return dsz;
1176         } else {
1177                 err = TIPC_ERR_NO_PORT;
1178         }
1179
1180         return tipc_reject_msg(buf, err);
1181 }
1182
1183 /*
1184  *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1185  *                        message for this node.
1186  */
1187
1188 static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
1189                                    struct iovec const *msg_sect,
1190                                    unsigned int total_len)
1191 {
1192         struct sk_buff *buf;
1193         int res;
1194
1195         res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
1196                         MAX_MSG_SIZE, !sender->user_port, &buf);
1197         if (likely(buf))
1198                 tipc_port_recv_msg(buf);
1199         return res;
1200 }
1201
1202 /**
1203  * tipc_send - send message sections on connection
1204  */
1205
1206 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
1207               unsigned int total_len)
1208 {
1209         struct tipc_port *p_ptr;
1210         u32 destnode;
1211         int res;
1212
1213         p_ptr = tipc_port_deref(ref);
1214         if (!p_ptr || !p_ptr->connected)
1215                 return -EINVAL;
1216
1217         p_ptr->congested = 1;
1218         if (!tipc_port_congested(p_ptr)) {
1219                 destnode = port_peernode(p_ptr);
1220                 if (likely(!in_own_node(destnode)))
1221                         res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1222                                                            total_len, destnode);
1223                 else
1224                         res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1225                                                       total_len);
1226
1227                 if (likely(res != -ELINKCONG)) {
1228                         p_ptr->congested = 0;
1229                         if (res > 0)
1230                                 p_ptr->sent++;
1231                         return res;
1232                 }
1233         }
1234         if (port_unreliable(p_ptr)) {
1235                 p_ptr->congested = 0;
1236                 return total_len;
1237         }
1238         return -ELINKCONG;
1239 }
1240
1241 /**
1242  * tipc_send2name - send message sections to port name
1243  */
1244
1245 int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1246                    unsigned int num_sect, struct iovec const *msg_sect,
1247                    unsigned int total_len)
1248 {
1249         struct tipc_port *p_ptr;
1250         struct tipc_msg *msg;
1251         u32 destnode = domain;
1252         u32 destport;
1253         int res;
1254
1255         p_ptr = tipc_port_deref(ref);
1256         if (!p_ptr || p_ptr->connected)
1257                 return -EINVAL;
1258
1259         msg = &p_ptr->phdr;
1260         msg_set_type(msg, TIPC_NAMED_MSG);
1261         msg_set_hdr_sz(msg, NAMED_H_SIZE);
1262         msg_set_nametype(msg, name->type);
1263         msg_set_nameinst(msg, name->instance);
1264         msg_set_lookup_scope(msg, tipc_addr_scope(domain));
1265         destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1266         msg_set_destnode(msg, destnode);
1267         msg_set_destport(msg, destport);
1268
1269         if (likely(destport || destnode)) {
1270                 if (likely(in_own_node(destnode)))
1271                         res = tipc_port_recv_sections(p_ptr, num_sect,
1272                                                       msg_sect, total_len);
1273                 else if (tipc_own_addr)
1274                         res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1275                                                            num_sect, total_len,
1276                                                            destnode);
1277                 else
1278                         res = tipc_port_reject_sections(p_ptr, msg, msg_sect,
1279                                                         num_sect, total_len,
1280                                                         TIPC_ERR_NO_NODE);
1281                 if (likely(res != -ELINKCONG)) {
1282                         if (res > 0)
1283                                 p_ptr->sent++;
1284                         return res;
1285                 }
1286                 if (port_unreliable(p_ptr)) {
1287                         return total_len;
1288                 }
1289                 return -ELINKCONG;
1290         }
1291         return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1292                                          total_len, TIPC_ERR_NO_NAME);
1293 }
1294
1295 /**
1296  * tipc_send2port - send message sections to port identity
1297  */
1298
1299 int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1300                    unsigned int num_sect, struct iovec const *msg_sect,
1301                    unsigned int total_len)
1302 {
1303         struct tipc_port *p_ptr;
1304         struct tipc_msg *msg;
1305         int res;
1306
1307         p_ptr = tipc_port_deref(ref);
1308         if (!p_ptr || p_ptr->connected)
1309                 return -EINVAL;
1310
1311         msg = &p_ptr->phdr;
1312         msg_set_type(msg, TIPC_DIRECT_MSG);
1313         msg_set_lookup_scope(msg, 0);
1314         msg_set_destnode(msg, dest->node);
1315         msg_set_destport(msg, dest->ref);
1316         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1317
1318         if (in_own_node(dest->node))
1319                 res =  tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1320                                                total_len);
1321         else if (tipc_own_addr)
1322                 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1323                                                    total_len, dest->node);
1324         else
1325                 res = tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1326                                                 total_len, TIPC_ERR_NO_NODE);
1327         if (likely(res != -ELINKCONG)) {
1328                 if (res > 0)
1329                         p_ptr->sent++;
1330                 return res;
1331         }
1332         if (port_unreliable(p_ptr)) {
1333                 return total_len;
1334         }
1335         return -ELINKCONG;
1336 }
1337
1338 /**
1339  * tipc_send_buf2port - send message buffer to port identity
1340  */
1341
1342 int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1343                struct sk_buff *buf, unsigned int dsz)
1344 {
1345         struct tipc_port *p_ptr;
1346         struct tipc_msg *msg;
1347         int res;
1348
1349         p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
1350         if (!p_ptr || p_ptr->connected)
1351                 return -EINVAL;
1352
1353         msg = &p_ptr->phdr;
1354         msg_set_type(msg, TIPC_DIRECT_MSG);
1355         msg_set_destnode(msg, dest->node);
1356         msg_set_destport(msg, dest->ref);
1357         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1358         msg_set_size(msg, BASIC_H_SIZE + dsz);
1359         if (skb_cow(buf, BASIC_H_SIZE))
1360                 return -ENOMEM;
1361
1362         skb_push(buf, BASIC_H_SIZE);
1363         skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE);
1364
1365         if (in_own_node(dest->node))
1366                 res = tipc_port_recv_msg(buf);
1367         else
1368                 res = tipc_send_buf_fast(buf, dest->node);
1369         if (likely(res != -ELINKCONG)) {
1370                 if (res > 0)
1371                         p_ptr->sent++;
1372                 return res;
1373         }
1374         if (port_unreliable(p_ptr))
1375                 return dsz;
1376         return -ELINKCONG;
1377 }
1378