/*
 * (Stray gitweb page-header lines removed -- scrape artifact, not part of
 *  the source file. Commit subject: "tipc: don't reroute message fragments".)
 */
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, 2012, Ericsson AB
5  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "link.h"
39 #include "port.h"
40 #include "name_distr.h"
41 #include "discover.h"
42 #include "config.h"
43
44 #include <linux/pkt_sched.h>
45
46 /*
47  * Error message prefixes
48  */
49 static const char *link_co_err = "Link changeover error, ";
50 static const char *link_rst_msg = "Resetting link ";
51 static const char *link_unk_evt = "Unknown link event ";
52
53 /*
54  * Out-of-range value for link session numbers
55  */
56 #define INVALID_SESSION 0x10000
57
58 /*
59  * Link state events:
60  */
61 #define  STARTING_EVT    856384768      /* link processing trigger */
62 #define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
63 #define  TIMEOUT_EVT     560817u        /* link timer expired */
64
65 /*
66  * The following two 'message types' is really just implementation
67  * data conveniently stored in the message header.
68  * They must not be considered part of the protocol
69  */
70 #define OPEN_MSG   0
71 #define CLOSED_MSG 1
72
73 /*
74  * State value stored in 'exp_msg_count'
75  */
76 #define START_CHANGEOVER 100000u
77
78 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
79                                        struct sk_buff *buf);
80 static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf);
81 static int  link_recv_changeover_msg(struct tipc_link **l_ptr,
82                                      struct sk_buff **buf);
83 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
84 static int  link_send_sections_long(struct tipc_port *sender,
85                                     struct iovec const *msg_sect,
86                                     unsigned int len, u32 destnode);
87 static void link_state_event(struct tipc_link *l_ptr, u32 event);
88 static void link_reset_statistics(struct tipc_link *l_ptr);
89 static void link_print(struct tipc_link *l_ptr, const char *str);
90 static void link_start(struct tipc_link *l_ptr);
91 static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
92 static void tipc_link_send_sync(struct tipc_link *l);
93 static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
94
95 /*
96  *  Simple link routines
97  */
/* align - round a length up to the next 4-byte boundary */
static unsigned int align(unsigned int i)
{
	unsigned int rem = i & 3u;

	return rem ? i + (4u - rem) : i;
}
102
103 static void link_init_max_pkt(struct tipc_link *l_ptr)
104 {
105         u32 max_pkt;
106
107         max_pkt = (l_ptr->b_ptr->mtu & ~3);
108         if (max_pkt > MAX_MSG_SIZE)
109                 max_pkt = MAX_MSG_SIZE;
110
111         l_ptr->max_pkt_target = max_pkt;
112         if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
113                 l_ptr->max_pkt = l_ptr->max_pkt_target;
114         else
115                 l_ptr->max_pkt = MAX_PKT_DEFAULT;
116
117         l_ptr->max_pkt_probes = 0;
118 }
119
120 static u32 link_next_sent(struct tipc_link *l_ptr)
121 {
122         if (l_ptr->next_out)
123                 return buf_seqno(l_ptr->next_out);
124         return mod(l_ptr->next_out_no);
125 }
126
127 static u32 link_last_sent(struct tipc_link *l_ptr)
128 {
129         return mod(link_next_sent(l_ptr) - 1);
130 }
131
132 /*
133  *  Simple non-static link routines (i.e. referenced outside this file)
134  */
/* tipc_link_is_up - return non-zero if the link is in a working state */
int tipc_link_is_up(struct tipc_link *l_ptr)
{
	return l_ptr &&
	       (link_working_working(l_ptr) || link_working_unknown(l_ptr));
}
141
142 int tipc_link_is_active(struct tipc_link *l_ptr)
143 {
144         return  (l_ptr->owner->active_links[0] == l_ptr) ||
145                 (l_ptr->owner->active_links[1] == l_ptr);
146 }
147
148 /**
149  * link_timeout - handle expiration of link timer
150  * @l_ptr: pointer to link
151  *
152  * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
153  * with tipc_link_delete().  (There is no risk that the node will be deleted by
154  * another thread because tipc_link_delete() always cancels the link timer before
155  * tipc_node_delete() is called.)
156  */
157 static void link_timeout(struct tipc_link *l_ptr)
158 {
159         tipc_node_lock(l_ptr->owner);
160
161         /* update counters used in statistical profiling of send traffic */
162         l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
163         l_ptr->stats.queue_sz_counts++;
164
165         if (l_ptr->first_out) {
166                 struct tipc_msg *msg = buf_msg(l_ptr->first_out);
167                 u32 length = msg_size(msg);
168
169                 if ((msg_user(msg) == MSG_FRAGMENTER) &&
170                     (msg_type(msg) == FIRST_FRAGMENT)) {
171                         length = msg_size(msg_get_wrapped(msg));
172                 }
173                 if (length) {
174                         l_ptr->stats.msg_lengths_total += length;
175                         l_ptr->stats.msg_length_counts++;
176                         if (length <= 64)
177                                 l_ptr->stats.msg_length_profile[0]++;
178                         else if (length <= 256)
179                                 l_ptr->stats.msg_length_profile[1]++;
180                         else if (length <= 1024)
181                                 l_ptr->stats.msg_length_profile[2]++;
182                         else if (length <= 4096)
183                                 l_ptr->stats.msg_length_profile[3]++;
184                         else if (length <= 16384)
185                                 l_ptr->stats.msg_length_profile[4]++;
186                         else if (length <= 32768)
187                                 l_ptr->stats.msg_length_profile[5]++;
188                         else
189                                 l_ptr->stats.msg_length_profile[6]++;
190                 }
191         }
192
193         /* do all other link processing performed on a periodic basis */
194
195         link_state_event(l_ptr, TIMEOUT_EVT);
196
197         if (l_ptr->next_out)
198                 tipc_link_push_queue(l_ptr);
199
200         tipc_node_unlock(l_ptr->owner);
201 }
202
/**
 * link_set_timer - (re)arm the link's supervision timer
 * @l_ptr: pointer to link
 * @time: timeout interval, in the units expected by k_start_timer()
 */
static void link_set_timer(struct tipc_link *l_ptr, u32 time)
{
	k_start_timer(&l_ptr->timer, time);
}
207
/**
 * tipc_link_create - create a new link
 * @n_ptr: pointer to associated node
 * @b_ptr: pointer to associated bearer
 * @media_addr: media address to use when sending messages over link
 *
 * Returns pointer to link, or NULL if the node already has two links,
 * a link already exists on this bearer, or allocation fails.
 */
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
			      struct tipc_bearer *b_ptr,
			      const struct tipc_media_addr *media_addr)
{
	struct tipc_link *l_ptr;
	struct tipc_msg *msg;
	char *if_name;
	char addr_string[16];
	u32 peer = n_ptr->addr;

	/* A node supports at most two links */
	if (n_ptr->link_cnt >= 2) {
		tipc_addr_string_fill(addr_string, n_ptr->addr);
		pr_err("Attempt to establish third link to %s\n", addr_string);
		return NULL;
	}

	/* Only one link per bearer to a given peer */
	if (n_ptr->links[b_ptr->identity]) {
		tipc_addr_string_fill(addr_string, n_ptr->addr);
		pr_err("Attempt to establish second link on <%s> to %s\n",
		       b_ptr->name, addr_string);
		return NULL;
	}

	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
	if (!l_ptr) {
		pr_warn("Link creation failed, no memory\n");
		return NULL;
	}

	l_ptr->addr = peer;
	/* NOTE(review): assumes bearer name always contains ':' -- confirm
	 * that bearer-name validation guarantees this before this point
	 */
	if_name = strchr(b_ptr->name, ':') + 1;
	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
		tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
		tipc_node(tipc_own_addr),
		if_name,
		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
		/* note: peer i/f name is updated by reset/activate message */
	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
	l_ptr->owner = n_ptr;
	l_ptr->checkpoint = 1;
	l_ptr->peer_session = INVALID_SESSION;	/* accept any session while down */
	l_ptr->b_ptr = b_ptr;
	link_set_supervision_props(l_ptr, b_ptr->tolerance);
	l_ptr->state = RESET_UNKNOWN;

	/* Pre-build the link protocol message header used for RESET/STATE */
	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
	msg = l_ptr->pmsg;
	tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_size(msg, sizeof(l_ptr->proto_msg));
	msg_set_session(msg, (tipc_random & 0xffff));
	msg_set_bearer_id(msg, b_ptr->identity);
	strcpy((char *)msg_data(msg), if_name);

	l_ptr->priority = b_ptr->priority;
	tipc_link_set_queue_limits(l_ptr, b_ptr->window);

	link_init_max_pkt(l_ptr);

	l_ptr->next_out_no = 1;
	INIT_LIST_HEAD(&l_ptr->waiting_ports);

	link_reset_statistics(l_ptr);

	tipc_node_attach_link(n_ptr, l_ptr);

	/* Arm supervision timer and defer FSM startup via tipc_k_signal() */
	k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
	list_add_tail(&l_ptr->link_list, &b_ptr->links);
	tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);

	return l_ptr;
}
287
/**
 * tipc_link_delete - delete a link
 * @l_ptr: pointer to link
 *
 * Note: 'tipc_net_lock' is write_locked, bearer is locked.
 * This routine must not grab the node lock until after link timer cancellation
 * to avoid a potential deadlock situation.
 */
void tipc_link_delete(struct tipc_link *l_ptr)
{
	if (!l_ptr) {
		pr_err("Attempt to delete non-existent link\n");
		return;
	}

	/* Cancel timer first, before taking the node lock (see note above) */
	k_cancel_timer(&l_ptr->timer);

	tipc_node_lock(l_ptr->owner);
	tipc_link_reset(l_ptr);		/* take link down if currently up */
	tipc_node_detach_link(l_ptr->owner, l_ptr);
	tipc_link_stop(l_ptr);		/* purge all remaining queued messages */
	list_del_init(&l_ptr->link_list);
	tipc_node_unlock(l_ptr->owner);
	k_term_timer(&l_ptr->timer);
	kfree(l_ptr);
}
314
/**
 * link_start - kick the link state machine with STARTING_EVT
 * @l_ptr: pointer to link
 *
 * Executed deferred, scheduled via tipc_k_signal() from tipc_link_create().
 */
static void link_start(struct tipc_link *l_ptr)
{
	tipc_node_lock(l_ptr->owner);
	link_state_event(l_ptr, STARTING_EVT);
	tipc_node_unlock(l_ptr->owner);
}
321
322 /**
323  * link_schedule_port - schedule port for deferred sending
324  * @l_ptr: pointer to link
325  * @origport: reference to sending port
326  * @sz: amount of data to be sent
327  *
328  * Schedules port for renewed sending of messages after link congestion
329  * has abated.
330  */
331 static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
332 {
333         struct tipc_port *p_ptr;
334
335         spin_lock_bh(&tipc_port_list_lock);
336         p_ptr = tipc_port_lock(origport);
337         if (p_ptr) {
338                 if (!p_ptr->wakeup)
339                         goto exit;
340                 if (!list_empty(&p_ptr->wait_list))
341                         goto exit;
342                 p_ptr->congested = 1;
343                 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
344                 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
345                 l_ptr->stats.link_congs++;
346 exit:
347                 tipc_port_unlock(p_ptr);
348         }
349         spin_unlock_bh(&tipc_port_list_lock);
350         return -ELINKCONG;
351 }
352
353 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
354 {
355         struct tipc_port *p_ptr;
356         struct tipc_port *temp_p_ptr;
357         int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
358
359         if (all)
360                 win = 100000;
361         if (win <= 0)
362                 return;
363         if (!spin_trylock_bh(&tipc_port_list_lock))
364                 return;
365         if (link_congested(l_ptr))
366                 goto exit;
367         list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
368                                  wait_list) {
369                 if (win <= 0)
370                         break;
371                 list_del_init(&p_ptr->wait_list);
372                 spin_lock_bh(p_ptr->lock);
373                 p_ptr->congested = 0;
374                 p_ptr->wakeup(p_ptr);
375                 win -= p_ptr->waiting_pkts;
376                 spin_unlock_bh(p_ptr->lock);
377         }
378
379 exit:
380         spin_unlock_bh(&tipc_port_list_lock);
381 }
382
383 /**
384  * link_release_outqueue - purge link's outbound message queue
385  * @l_ptr: pointer to link
386  */
387 static void link_release_outqueue(struct tipc_link *l_ptr)
388 {
389         struct sk_buff *buf = l_ptr->first_out;
390         struct sk_buff *next;
391
392         while (buf) {
393                 next = buf->next;
394                 kfree_skb(buf);
395                 buf = next;
396         }
397         l_ptr->first_out = NULL;
398         l_ptr->out_queue_size = 0;
399 }
400
401 /**
402  * tipc_link_reset_fragments - purge link's inbound message fragments queue
403  * @l_ptr: pointer to link
404  */
405 void tipc_link_reset_fragments(struct tipc_link *l_ptr)
406 {
407         struct sk_buff *buf = l_ptr->defragm_buf;
408         struct sk_buff *next;
409
410         while (buf) {
411                 next = buf->next;
412                 kfree_skb(buf);
413                 buf = next;
414         }
415         l_ptr->defragm_buf = NULL;
416 }
417
418 /**
419  * tipc_link_stop - purge all inbound and outbound messages associated with link
420  * @l_ptr: pointer to link
421  */
422 void tipc_link_stop(struct tipc_link *l_ptr)
423 {
424         struct sk_buff *buf;
425         struct sk_buff *next;
426
427         buf = l_ptr->oldest_deferred_in;
428         while (buf) {
429                 next = buf->next;
430                 kfree_skb(buf);
431                 buf = next;
432         }
433
434         buf = l_ptr->first_out;
435         while (buf) {
436                 next = buf->next;
437                 kfree_skb(buf);
438                 buf = next;
439         }
440
441         tipc_link_reset_fragments(l_ptr);
442
443         kfree_skb(l_ptr->proto_msg_queue);
444         l_ptr->proto_msg_queue = NULL;
445 }
446
/**
 * tipc_link_reset - take a link down and return it to its initial state
 * @l_ptr: pointer to link
 *
 * Bumps the protocol session number, discards queued traffic, and -- if the
 * link was previously up -- notifies the owning node and bearer.  If another
 * link to the same peer remains active and changeover is permitted, the
 * receive checkpoint is saved so in-flight messages can be tunnelled over
 * the remaining link.
 */
void tipc_link_reset(struct tipc_link *l_ptr)
{
	struct sk_buff *buf;
	u32 prev_state = l_ptr->state;
	u32 checkpoint = l_ptr->next_in_no;
	int was_active_link = tipc_link_is_active(l_ptr);

	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));

	/* Link is down, accept any session */
	l_ptr->peer_session = INVALID_SESSION;

	/* Prepare for max packet size negotiation */
	link_init_max_pkt(l_ptr);

	l_ptr->state = RESET_UNKNOWN;

	/* Nothing more to do if the link was already down */
	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
		return;

	tipc_node_link_down(l_ptr->owner, l_ptr);
	tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);

	/* Save state for changeover onto a remaining active link, if any */
	if (was_active_link && tipc_node_active_links(l_ptr->owner) &&
	    l_ptr->owner->permit_changeover) {
		l_ptr->reset_checkpoint = checkpoint;
		l_ptr->exp_msg_count = START_CHANGEOVER;
	}

	/* Clean up all queues: */
	link_release_outqueue(l_ptr);
	kfree_skb(l_ptr->proto_msg_queue);
	l_ptr->proto_msg_queue = NULL;
	buf = l_ptr->oldest_deferred_in;
	while (buf) {
		struct sk_buff *next = buf->next;
		kfree_skb(buf);
		buf = next;
	}
	if (!list_empty(&l_ptr->waiting_ports))
		tipc_link_wakeup_ports(l_ptr, 1);

	/* Reset all transmit/receive bookkeeping to initial values */
	l_ptr->retransm_queue_head = 0;
	l_ptr->retransm_queue_size = 0;
	l_ptr->last_out = NULL;
	l_ptr->first_out = NULL;
	l_ptr->next_out = NULL;
	l_ptr->unacked_window = 0;
	l_ptr->checkpoint = 1;
	l_ptr->next_out_no = 1;
	l_ptr->deferred_inqueue_sz = 0;
	l_ptr->oldest_deferred_in = NULL;
	l_ptr->newest_deferred_in = NULL;
	l_ptr->fsm_msg_cnt = 0;
	l_ptr->stale_count = 0;
	link_reset_statistics(l_ptr);
}
504
505
506 static void link_activate(struct tipc_link *l_ptr)
507 {
508         l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
509         tipc_node_link_up(l_ptr->owner, l_ptr);
510         tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
511 }
512
/**
 * link_state_event - link finite state machine
 * @l_ptr: pointer to link
 * @event: state machine event to process
 *
 * Drives the link through its states (WORKING_WORKING, WORKING_UNKNOWN,
 * RESET_UNKNOWN, RESET_RESET) in response to traffic, peer protocol
 * messages, and supervision timer expiry.  Callers in this file invoke it
 * with the owning node locked.
 */
static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
{
	struct tipc_link *other;
	u32 cont_intv = l_ptr->continuity_interval;

	/* Ignore everything until the FSM has been started */
	if (!l_ptr->started && (event != STARTING_EVT))
		return;		/* Not yet. */

	/* While blocked, only keep the supervision timer running */
	if (link_blocked(l_ptr)) {
		if (event == TIMEOUT_EVT)
			link_set_timer(l_ptr, cont_intv);
		return;   /* Changeover going on */
	}

	switch (l_ptr->state) {
	case WORKING_WORKING:
		switch (event) {
		case TRAFFIC_MSG_EVT:
		case ACTIVATE_MSG:
			break;
		case TIMEOUT_EVT:
			/* Traffic arrived since last check: stay in WW and
			 * only send a state/probe message when needed
			 */
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 1, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
				break;
			}
			/* No traffic since last check: start probing the peer
			 * at a faster rate (cont_intv / 4)
			 */
			l_ptr->state = WORKING_UNKNOWN;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv / 4);
			break;
		case RESET_MSG:
			pr_info("%s<%s>, requested by peer\n", link_rst_msg,
				l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			pr_err("%s%u in WW state\n", link_unk_evt, event);
		}
		break;
	case WORKING_UNKNOWN:
		switch (event) {
		case TRAFFIC_MSG_EVT:
		case ACTIVATE_MSG:
			/* Peer is alive after all: back to WORKING_WORKING */
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			pr_info("%s<%s>, requested by peer while probing\n",
				link_rst_msg, l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case TIMEOUT_EVT:
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				/* Traffic received: recover to WW */
				l_ptr->state = WORKING_WORKING;
				l_ptr->fsm_msg_cnt = 0;
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
				/* Still silent: keep probing until abort limit */
				tipc_link_send_proto_msg(l_ptr, STATE_MSG,
							 1, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv / 4);
			} else {	/* Link has failed */
				pr_warn("%s<%s>, peer not responding\n",
					link_rst_msg, l_ptr->name);
				tipc_link_reset(l_ptr);
				l_ptr->state = RESET_UNKNOWN;
				l_ptr->fsm_msg_cnt = 0;
				tipc_link_send_proto_msg(l_ptr, RESET_MSG,
							 0, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv);
			}
			break;
		default:
			pr_err("%s%u in WU state\n", link_unk_evt, event);
		}
		break;
	case RESET_UNKNOWN:
		switch (event) {
		case TRAFFIC_MSG_EVT:
			break;
		case ACTIVATE_MSG:
			/* Defer activation while the first active link is
			 * still being probed
			 */
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other))
				break;
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			if (l_ptr->owner->working_links == 1)
				tipc_link_send_sync(l_ptr);
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case STARTING_EVT:
			l_ptr->started = 1;
			/* fall through */
		case TIMEOUT_EVT:
			tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			pr_err("%s%u in RU state\n", link_unk_evt, event);
		}
		break;
	case RESET_RESET:
		switch (event) {
		case TRAFFIC_MSG_EVT:
		case ACTIVATE_MSG:
			/* Defer activation while the first active link is
			 * still being probed
			 */
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other))
				break;
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			if (l_ptr->owner->working_links == 1)
				tipc_link_send_sync(l_ptr);
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			break;
		case TIMEOUT_EVT:
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			pr_err("%s%u in RR state\n", link_unk_evt, event);
		}
		break;
	default:
		pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
	}
}
690
691 /*
692  * link_bundle_buf(): Append contents of a buffer to
693  * the tail of an existing one.
694  */
695 static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
696                            struct sk_buff *buf)
697 {
698         struct tipc_msg *bundler_msg = buf_msg(bundler);
699         struct tipc_msg *msg = buf_msg(buf);
700         u32 size = msg_size(msg);
701         u32 bundle_size = msg_size(bundler_msg);
702         u32 to_pos = align(bundle_size);
703         u32 pad = to_pos - bundle_size;
704
705         if (msg_user(bundler_msg) != MSG_BUNDLER)
706                 return 0;
707         if (msg_type(bundler_msg) != OPEN_MSG)
708                 return 0;
709         if (skb_tailroom(bundler) < (pad + size))
710                 return 0;
711         if (l_ptr->max_pkt < (to_pos + size))
712                 return 0;
713
714         skb_put(bundler, pad + size);
715         skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
716         msg_set_size(bundler_msg, to_pos + size);
717         msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
718         kfree_skb(buf);
719         l_ptr->stats.sent_bundled++;
720         return 1;
721 }
722
723 static void link_add_to_outqueue(struct tipc_link *l_ptr,
724                                  struct sk_buff *buf,
725                                  struct tipc_msg *msg)
726 {
727         u32 ack = mod(l_ptr->next_in_no - 1);
728         u32 seqno = mod(l_ptr->next_out_no++);
729
730         msg_set_word(msg, 2, ((ack << 16) | seqno));
731         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
732         buf->next = NULL;
733         if (l_ptr->first_out) {
734                 l_ptr->last_out->next = buf;
735                 l_ptr->last_out = buf;
736         } else
737                 l_ptr->first_out = l_ptr->last_out = buf;
738
739         l_ptr->out_queue_size++;
740         if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
741                 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
742 }
743
744 static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
745                                        struct sk_buff *buf_chain,
746                                        u32 long_msgno)
747 {
748         struct sk_buff *buf;
749         struct tipc_msg *msg;
750
751         if (!l_ptr->next_out)
752                 l_ptr->next_out = buf_chain;
753         while (buf_chain) {
754                 buf = buf_chain;
755                 buf_chain = buf_chain->next;
756
757                 msg = buf_msg(buf);
758                 msg_set_long_msgno(msg, long_msgno);
759                 link_add_to_outqueue(l_ptr, buf, msg);
760         }
761 }
762
763 /*
764  * tipc_link_send_buf() is the 'full path' for messages, called from
765  * inside TIPC when the 'fast path' in tipc_send_buf
766  * has failed, and from link_send()
767  */
int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
{
        struct tipc_msg *msg = buf_msg(buf);
        u32 size = msg_size(msg);
        u32 dsz = msg_data_sz(msg);
        u32 queue_size = l_ptr->out_queue_size;
        u32 imp = tipc_msg_tot_importance(msg);
        u32 queue_limit = l_ptr->queue_limit[imp];
        u32 max_packet = l_ptr->max_pkt;

        /* Match msg importance against queue limits: */
        if (unlikely(queue_size >= queue_limit)) {
                if (imp <= TIPC_CRITICAL_IMPORTANCE) {
                        /* User traffic: park the sending port until the
                         * congestion abates, and tell the caller so.
                         */
                        link_schedule_port(l_ptr, msg_origport(msg), size);
                        kfree_skb(buf);
                        return -ELINKCONG;
                }
                kfree_skb(buf);
                if (imp > CONN_MANAGER) {
                        /* System-internal traffic should never overflow the
                         * queue; if it does the link is in a bad state.
                         */
                        pr_warn("%s<%s>, send queue full", link_rst_msg,
                                l_ptr->name);
                        tipc_link_reset(l_ptr);
                }
                /* Message dropped, but caller is told "sent" (dsz) */
                return dsz;
        }

        /* Fragmentation needed ? */
        if (size > max_packet)
                return link_send_long_buf(l_ptr, buf);

        /* Packet can be queued or sent. */
        if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
                   !link_congested(l_ptr))) {
                link_add_to_outqueue(l_ptr, buf, msg);

                tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
                l_ptr->unacked_window = 0;
                return dsz;
        }
        /* Congestion: can message be bundled ? */
        if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
            (msg_user(msg) != MSG_FRAGMENTER)) {

                /* Try adding message to an existing bundle */
                if (l_ptr->next_out &&
                    link_bundle_buf(l_ptr, l_ptr->last_out, buf))
                        return dsz;

                /* Try creating a new bundle; only worthwhile if the message
                 * leaves room for at least one more message in the bundle.
                 */
                if (size <= max_packet * 2 / 3) {
                        struct sk_buff *bundler = tipc_buf_acquire(max_packet);
                        struct tipc_msg bundler_hdr;

                        if (bundler) {
                                tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
                                         INT_H_SIZE, l_ptr->addr);
                                skb_copy_to_linear_data(bundler, &bundler_hdr,
                                                        INT_H_SIZE);
                                skb_trim(bundler, INT_H_SIZE);
                                link_bundle_buf(l_ptr, bundler, buf);
                                /* From here on, queue the bundle, not buf */
                                buf = bundler;
                                msg = buf_msg(buf);
                                l_ptr->stats.sent_bundles++;
                        }
                }
        }
        /* Queue (possibly bundled) message for deferred transmission */
        if (!l_ptr->next_out)
                l_ptr->next_out = buf;
        link_add_to_outqueue(l_ptr, buf, msg);
        return dsz;
}
839
840 /*
841  * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
 * not been selected yet, and the owner node is not locked
843  * Called by TIPC internal users, e.g. the name distributor
844  */
845 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
846 {
847         struct tipc_link *l_ptr;
848         struct tipc_node *n_ptr;
849         int res = -ELINKCONG;
850
851         read_lock_bh(&tipc_net_lock);
852         n_ptr = tipc_node_find(dest);
853         if (n_ptr) {
854                 tipc_node_lock(n_ptr);
855                 l_ptr = n_ptr->active_links[selector & 1];
856                 if (l_ptr)
857                         res = tipc_link_send_buf(l_ptr, buf);
858                 else
859                         kfree_skb(buf);
860                 tipc_node_unlock(n_ptr);
861         } else {
862                 kfree_skb(buf);
863         }
864         read_unlock_bh(&tipc_net_lock);
865         return res;
866 }
867
868 /*
869  * tipc_link_send_sync - synchronize broadcast link endpoints.
870  *
871  * Give a newly added peer node the sequence number where it should
872  * start receiving and acking broadcast packets.
873  *
874  * Called with node locked
875  */
876 static void tipc_link_send_sync(struct tipc_link *l)
877 {
878         struct sk_buff *buf;
879         struct tipc_msg *msg;
880
881         buf = tipc_buf_acquire(INT_H_SIZE);
882         if (!buf)
883                 return;
884
885         msg = buf_msg(buf);
886         tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
887         msg_set_last_bcast(msg, l->owner->bclink.acked);
888         link_add_chain_to_outqueue(l, buf, 0);
889         tipc_link_push_queue(l);
890 }
891
892 /*
893  * tipc_link_recv_sync - synchronize broadcast link endpoints.
894  * Receive the sequence number where we should start receiving and
895  * acking broadcast packets from a newly added peer node, and open
896  * up for reception of such packets.
897  *
898  * Called with node locked
899  */
900 static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
901 {
902         struct tipc_msg *msg = buf_msg(buf);
903
904         n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
905         n->bclink.recv_permitted = true;
906         kfree_skb(buf);
907 }
908
909 /*
910  * tipc_link_send_names - send name table entries to new neighbor
911  *
912  * Send routine for bulk delivery of name table messages when contact
913  * with a new neighbor occurs. No link congestion checking is performed
914  * because name table messages *must* be delivered. The messages must be
915  * small enough not to require fragmentation.
916  * Called without any locks held.
917  */
void tipc_link_send_names(struct list_head *message_list, u32 dest)
{
        struct tipc_node *n_ptr;
        struct tipc_link *l_ptr;
        struct sk_buff *buf;
        struct sk_buff *temp_buf;

        if (list_empty(message_list))
                return;

        read_lock_bh(&tipc_net_lock);
        n_ptr = tipc_node_find(dest);
        if (n_ptr) {
                tipc_node_lock(n_ptr);
                l_ptr = n_ptr->active_links[0];
                if (l_ptr) {
                        /* convert circular list to linear list
                         * NOTE(review): relies on sk_buff's 'next' pointer
                         * overlaying list_head's 'next' (both first member),
                         * so the list_head chain can be reused as an skb chain.
                         */
                        ((struct sk_buff *)message_list->prev)->next = NULL;
                        link_add_chain_to_outqueue(l_ptr,
                                (struct sk_buff *)message_list->next, 0);
                        tipc_link_push_queue(l_ptr);
                        /* Chain handed over to the link; empty the list so
                         * the cleanup loop below finds nothing to free.
                         */
                        INIT_LIST_HEAD(message_list);
                }
                tipc_node_unlock(n_ptr);
        }
        read_unlock_bh(&tipc_net_lock);

        /* discard the messages if they couldn't be sent */
        list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
                list_del((struct list_head *)buf);
                kfree_skb(buf);
        }
}
951
952 /*
953  * link_send_buf_fast: Entry for data messages where the
954  * destination link is known and the header is complete,
955  * inclusive total message length. Very time critical.
956  * Link is locked. Returns user data length.
957  */
958 static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
959                               u32 *used_max_pkt)
960 {
961         struct tipc_msg *msg = buf_msg(buf);
962         int res = msg_data_sz(msg);
963
964         if (likely(!link_congested(l_ptr))) {
965                 if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
966                         if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
967                                 link_add_to_outqueue(l_ptr, buf, msg);
968                                 tipc_bearer_send(l_ptr->b_ptr, buf,
969                                                  &l_ptr->media_addr);
970                                 l_ptr->unacked_window = 0;
971                                 return res;
972                         }
973                 } else
974                         *used_max_pkt = l_ptr->max_pkt;
975         }
976         return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
977 }
978
979 /*
980  * tipc_link_send_sections_fast: Entry for messages where the
981  * destination processor is known and the header is complete,
982  * except for total message length.
983  * Returns user data length or errno.
984  */
int tipc_link_send_sections_fast(struct tipc_port *sender,
                                 struct iovec const *msg_sect,
                                 unsigned int len, u32 destaddr)
{
        struct tipc_msg *hdr = &sender->phdr;
        struct tipc_link *l_ptr;
        struct sk_buff *buf;
        struct tipc_node *node;
        int res;
        u32 selector = msg_origport(hdr) & 1;

again:
        /*
         * Try building message using port's max_pkt hint.
         * (Must not hold any locks while building message.)
         */
        res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf);
        /* Exit if build request was invalid */
        if (unlikely(res < 0))
                return res;

        read_lock_bh(&tipc_net_lock);
        node = tipc_node_find(destaddr);
        if (likely(node)) {
                tipc_node_lock(node);
                l_ptr = node->active_links[selector];
                if (likely(l_ptr)) {
                        /* buf is NULL when the message didn't fit in one
                         * packet of max_pkt size and must be fragmented.
                         */
                        if (likely(buf)) {
                                res = link_send_buf_fast(l_ptr, buf,
                                                         &sender->max_pkt);
exit:
                                /* Common unlock path for the sent/congested
                                 * cases; both locks are held at this point.
                                 */
                                tipc_node_unlock(node);
                                read_unlock_bh(&tipc_net_lock);
                                return res;
                        }

                        /* Exit if link (or bearer) is congested */
                        if (link_congested(l_ptr) ||
                            tipc_bearer_blocked(l_ptr->b_ptr)) {
                                res = link_schedule_port(l_ptr,
                                                         sender->ref, res);
                                goto exit;
                        }

                        /*
                         * Message size exceeds max_pkt hint; update hint,
                         * then re-try fast path or fragment the message
                         */
                        sender->max_pkt = l_ptr->max_pkt;
                        tipc_node_unlock(node);
                        read_unlock_bh(&tipc_net_lock);


                        if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
                                goto again;

                        return link_send_sections_long(sender, msg_sect, len,
                                                       destaddr);
                }
                tipc_node_unlock(node);
        }
        read_unlock_bh(&tipc_net_lock);

        /* Couldn't find a link to the destination node */
        if (buf)
                return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
        if (res >= 0)
                return tipc_port_reject_sections(sender, hdr, msg_sect,
                                                 len, TIPC_ERR_NO_NODE);
        return res;
}
1056
1057 /*
1058  * link_send_sections_long(): Entry for long messages where the
1059  * destination node is known and the header is complete,
1060  * inclusive total message length.
1061  * Link and bearer congestion status have been checked to be ok,
1062  * and are ignored if they change.
1063  *
1064  * Note that fragments do not use the full link MTU so that they won't have
1065  * to undergo refragmentation if link changeover causes them to be sent
1066  * over another link with an additional tunnel header added as prefix.
1067  * (Refragmentation will still occur if the other link has a smaller MTU.)
1068  *
1069  * Returns user data length or errno.
1070  */
static int link_send_sections_long(struct tipc_port *sender,
                                   struct iovec const *msg_sect,
                                   unsigned int len, u32 destaddr)
{
        struct tipc_link *l_ptr;
        struct tipc_node *node;
        struct tipc_msg *hdr = &sender->phdr;
        u32 dsz = len;
        u32 max_pkt, fragm_sz, rest;
        struct tipc_msg fragm_hdr;
        struct sk_buff *buf, *buf_chain, *prev;
        u32 fragm_crs, fragm_rest, hsz, sect_rest;
        const unchar __user *sect_crs;
        int curr_sect;
        u32 fragm_no;
        int res = 0;

again:
        fragm_no = 1;
        max_pkt = sender->max_pkt - INT_H_SIZE;
                /* leave room for tunnel header in case of link changeover */
        fragm_sz = max_pkt - INT_H_SIZE;
                /* leave room for fragmentation header in each fragment */
        rest = dsz;
        fragm_crs = 0;
        fragm_rest = 0;
        sect_rest = 0;
        sect_crs = NULL;
        curr_sect = -1;

        /* Prepare reusable fragment header */
        tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
                 INT_H_SIZE, msg_destnode(hdr));
        msg_set_size(&fragm_hdr, max_pkt);
        msg_set_fragm_no(&fragm_hdr, 1);

        /* Prepare header of first fragment: the original user header is
         * carried as payload right after the fragmentation header.
         */
        buf_chain = buf = tipc_buf_acquire(max_pkt);
        if (!buf)
                return -ENOMEM;
        buf->next = NULL;
        skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
        hsz = msg_hdr_sz(hdr);
        skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);

        /* Chop up message */
        fragm_crs = INT_H_SIZE + hsz;   /* write cursor within fragment */
        fragm_rest = fragm_sz - hsz;    /* space left in current fragment */

        do {            /* For all sections */
                u32 sz;

                /* Advance to next iovec section when current one is drained */
                if (!sect_rest) {
                        sect_rest = msg_sect[++curr_sect].iov_len;
                        sect_crs = msg_sect[curr_sect].iov_base;
                }

                /* Copy as much as fits in both section and fragment */
                if (sect_rest < fragm_rest)
                        sz = sect_rest;
                else
                        sz = fragm_rest;

                if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
                        res = -EFAULT;
error:
                        /* Free the whole partially built chain */
                        for (; buf_chain; buf_chain = buf) {
                                buf = buf_chain->next;
                                kfree_skb(buf_chain);
                        }
                        return res;
                }
                sect_crs += sz;
                sect_rest -= sz;
                fragm_crs += sz;
                fragm_rest -= sz;
                rest -= sz;

                if (!fragm_rest && rest) {

                        /* Initiate new fragment: */
                        if (rest <= fragm_sz) {
                                fragm_sz = rest;
                                msg_set_type(&fragm_hdr, LAST_FRAGMENT);
                        } else {
                                msg_set_type(&fragm_hdr, FRAGMENT);
                        }
                        msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
                        msg_set_fragm_no(&fragm_hdr, ++fragm_no);
                        prev = buf;
                        buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
                        if (!buf) {
                                res = -ENOMEM;
                                goto error;
                        }

                        buf->next = NULL;
                        prev->next = buf;
                        skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
                        fragm_crs = INT_H_SIZE;
                        fragm_rest = fragm_sz;
                }
        } while (rest > 0);

        /*
         * Now we have a buffer chain. Select a link and check
         * that packet size is still OK
         */
        node = tipc_node_find(destaddr);
        if (likely(node)) {
                tipc_node_lock(node);
                l_ptr = node->active_links[sender->ref & 1];
                if (!l_ptr) {
                        tipc_node_unlock(node);
                        goto reject;
                }
                if (l_ptr->max_pkt < max_pkt) {
                        /* MTU shrank while we were building: discard the
                         * chain and rebuild with the new, smaller hint.
                         */
                        sender->max_pkt = l_ptr->max_pkt;
                        tipc_node_unlock(node);
                        for (; buf_chain; buf_chain = buf) {
                                buf = buf_chain->next;
                                kfree_skb(buf_chain);
                        }
                        goto again;
                }
        } else {
reject:
                for (; buf_chain; buf_chain = buf) {
                        buf = buf_chain->next;
                        kfree_skb(buf_chain);
                }
                return tipc_port_reject_sections(sender, hdr, msg_sect,
                                                 len, TIPC_ERR_NO_NODE);
        }

        /* Append chain of fragments to send queue & send them */
        l_ptr->long_msg_seq_no++;
        link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
        l_ptr->stats.sent_fragments += fragm_no;
        l_ptr->stats.sent_fragmented++;
        tipc_link_push_queue(l_ptr);
        tipc_node_unlock(node);
        return dsz;
}
1214
1215 /*
1216  * tipc_link_push_packet: Push one unsent packet to the media
1217  */
/*
 * tipc_link_push_packet: Push one unsent packet to the media
 * Returns 0 if a packet was sent, 1 if there was nothing to send.
 */
u32 tipc_link_push_packet(struct tipc_link *l_ptr)
{
        struct sk_buff *buf = l_ptr->first_out;
        u32 r_q_size = l_ptr->retransm_queue_size;
        u32 r_q_head = l_ptr->retransm_queue_head;

        /* Step to position where retransmission failed, if any,    */
        /* consider that buffers may have been released in meantime */
        if (r_q_size && buf) {
                u32 last = lesser(mod(r_q_head + r_q_size),
                                  link_last_sent(l_ptr));
                u32 first = buf_seqno(buf);

                /* Skip past buffers already acked/released */
                while (buf && less(first, r_q_head)) {
                        first = mod(first + 1);
                        buf = buf->next;
                }
                l_ptr->retransm_queue_head = r_q_head = first;
                l_ptr->retransm_queue_size = r_q_size = mod(last - first);
        }

        /* Continue retransmission now, if there is anything: */
        if (r_q_size && buf) {
                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
                msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
                tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
                l_ptr->retransm_queue_head = mod(++r_q_head);
                l_ptr->retransm_queue_size = --r_q_size;
                l_ptr->stats.retransmitted++;
                return 0;
        }

        /* Send deferred protocol message, if any: */
        buf = l_ptr->proto_msg_queue;
        if (buf) {
                msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
                msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
                tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
                l_ptr->unacked_window = 0;
                kfree_skb(buf);
                l_ptr->proto_msg_queue = NULL;
                return 0;
        }

        /* Send one deferred data message, if send window not full: */
        buf = l_ptr->next_out;
        if (buf) {
                struct tipc_msg *msg = buf_msg(buf);
                u32 next = msg_seqno(msg);
                u32 first = buf_seqno(l_ptr->first_out);

                if (mod(next - first) < l_ptr->queue_limit[0]) {
                        msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
                        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
                        tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
                        /* A bundle is closed once transmitted */
                        if (msg_user(msg) == MSG_BUNDLER)
                                msg_set_type(msg, CLOSED_MSG);
                        l_ptr->next_out = buf->next;
                        return 0;
                }
        }
        return 1;
}
1281
1282 /*
1283  * push_queue(): push out the unsent messages of a link where
1284  *               congestion has abated. Node is locked
1285  */
1286 void tipc_link_push_queue(struct tipc_link *l_ptr)
1287 {
1288         u32 res;
1289
1290         if (tipc_bearer_blocked(l_ptr->b_ptr))
1291                 return;
1292
1293         do {
1294                 res = tipc_link_push_packet(l_ptr);
1295         } while (!res);
1296 }
1297
1298 static void link_reset_all(unsigned long addr)
1299 {
1300         struct tipc_node *n_ptr;
1301         char addr_string[16];
1302         u32 i;
1303
1304         read_lock_bh(&tipc_net_lock);
1305         n_ptr = tipc_node_find((u32)addr);
1306         if (!n_ptr) {
1307                 read_unlock_bh(&tipc_net_lock);
1308                 return; /* node no longer exists */
1309         }
1310
1311         tipc_node_lock(n_ptr);
1312
1313         pr_warn("Resetting all links to %s\n",
1314                 tipc_addr_string_fill(addr_string, n_ptr->addr));
1315
1316         for (i = 0; i < MAX_BEARERS; i++) {
1317                 if (n_ptr->links[i]) {
1318                         link_print(n_ptr->links[i], "Resetting link\n");
1319                         tipc_link_reset(n_ptr->links[i]);
1320                 }
1321         }
1322
1323         tipc_node_unlock(n_ptr);
1324         read_unlock_bh(&tipc_net_lock);
1325 }
1326
/*
 * link_retransmit_failure - handle repeated retransmission failure
 * For a unicast link the link itself is reset; for the broadcast link
 * (addr == 0) diagnostics are printed and a reset of all links to the
 * non-acking peer is scheduled.
 */
static void link_retransmit_failure(struct tipc_link *l_ptr,
                                    struct sk_buff *buf)
{
        struct tipc_msg *msg = buf_msg(buf);

        pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);

        if (l_ptr->addr) {
                /* Handle failure on standard link */
                link_print(l_ptr, "Resetting link\n");
                tipc_link_reset(l_ptr);

        } else {
                /* Handle failure on broadcast link */
                struct tipc_node *n_ptr;
                char addr_string[16];

                pr_info("Msg seq number: %u,  ", msg_seqno(msg));
                pr_cont("Outstanding acks: %lu\n",
                        (unsigned long) TIPC_SKB_CB(buf)->handle);

                /* Find the peer that is holding up the broadcast acks */
                n_ptr = tipc_bclink_retransmit_to();
                tipc_node_lock(n_ptr);

                tipc_addr_string_fill(addr_string, n_ptr->addr);
                pr_info("Broadcast link info for %s\n", addr_string);
                pr_info("Reception permitted: %d,  Acked: %u\n",
                        n_ptr->bclink.recv_permitted,
                        n_ptr->bclink.acked);
                pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
                        n_ptr->bclink.last_in,
                        n_ptr->bclink.oos_state,
                        n_ptr->bclink.last_sent);

                /* Reset is deferred: it must not run under the node lock */
                tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);

                tipc_node_unlock(n_ptr);

                l_ptr->stale_count = 0;
        }
}
1368
1369 void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1370                           u32 retransmits)
1371 {
1372         struct tipc_msg *msg;
1373
1374         if (!buf)
1375                 return;
1376
1377         msg = buf_msg(buf);
1378
1379         if (tipc_bearer_blocked(l_ptr->b_ptr)) {
1380                 if (l_ptr->retransm_queue_size == 0) {
1381                         l_ptr->retransm_queue_head = msg_seqno(msg);
1382                         l_ptr->retransm_queue_size = retransmits;
1383                 } else {
1384                         pr_err("Unexpected retransmit on link %s (qsize=%d)\n",
1385                                l_ptr->name, l_ptr->retransm_queue_size);
1386                 }
1387                 return;
1388         } else {
1389                 /* Detect repeated retransmit failures on unblocked bearer */
1390                 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1391                         if (++l_ptr->stale_count > 100) {
1392                                 link_retransmit_failure(l_ptr, buf);
1393                                 return;
1394                         }
1395                 } else {
1396                         l_ptr->last_retransmitted = msg_seqno(msg);
1397                         l_ptr->stale_count = 1;
1398                 }
1399         }
1400
1401         while (retransmits && (buf != l_ptr->next_out) && buf) {
1402                 msg = buf_msg(buf);
1403                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1404                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1405                 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1406                 buf = buf->next;
1407                 retransmits--;
1408                 l_ptr->stats.retransmitted++;
1409         }
1410
1411         l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1412 }
1413
1414 /**
1415  * link_insert_deferred_queue - insert deferred messages back into receive chain
1416  */
static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
                                                  struct sk_buff *buf)
{
        u32 seq_no;

        /* Nothing deferred: leave the receive chain untouched */
        if (l_ptr->oldest_deferred_in == NULL)
                return buf;

        /* Splice the deferred queue in front of @buf only when its first
         * buffer is exactly the next expected sequence number.
         */
        seq_no = buf_seqno(l_ptr->oldest_deferred_in);
        if (seq_no == mod(l_ptr->next_in_no)) {
                l_ptr->newest_deferred_in->next = buf;
                buf = l_ptr->oldest_deferred_in;
                l_ptr->oldest_deferred_in = NULL;
                l_ptr->deferred_inqueue_sz = 0;
        }
        return buf;
}
1434
1435 /**
1436  * link_recv_buf_validate - validate basic format of received message
1437  *
1438  * This routine ensures a TIPC message has an acceptable header, and at least
1439  * as much data as the header indicates it should.  The routine also ensures
1440  * that the entire message header is stored in the main fragment of the message
1441  * buffer, to simplify future access to message header fields.
1442  *
1443  * Note: Having extra info present in the message header or data areas is OK.
1444  * TIPC will ignore the excess, under the assumption that it is optional info
1445  * introduced by a later release of the protocol.
1446  */
static int link_recv_buf_validate(struct sk_buff *buf)
{
        /* Minimum header size per data message type (indexed by msg_type) */
        static u32 min_data_hdr_size[8] = {
                SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
                MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
                };

        struct tipc_msg *msg;
        u32 tipc_hdr[2];
        u32 size;
        u32 hdr_size;
        u32 min_hdr_size;

        if (unlikely(buf->len < MIN_H_SIZE))
                return 0;

        /* Peek at the first two header words, copying if non-linear */
        msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
        if (msg == NULL)
                return 0;

        if (unlikely(msg_version(msg) != TIPC_VERSION))
                return 0;

        size = msg_size(msg);
        hdr_size = msg_hdr_sz(msg);
        min_hdr_size = msg_isdata(msg) ?
                min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;

        /* Header must meet the type's minimum, the declared size must fit
         * within the buffer, and the payload must not exceed the maximum.
         */
        if (unlikely((hdr_size < min_hdr_size) ||
                     (size < hdr_size) ||
                     (buf->len < size) ||
                     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
                return 0;

        /* Ensure the full header is in the linear part of the buffer */
        return pskb_may_pull(buf, hdr_size);
}
1483
1484 /**
1485  * tipc_recv_msg - process TIPC messages arriving from off-node
1486  * @head: pointer to message buffer chain
 * @b_ptr: pointer to bearer message arrived on
1488  *
1489  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1490  * structure (i.e. cannot be NULL), but bearer can be inactive.
1491  */
1492 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
1493 {
1494         read_lock_bh(&tipc_net_lock);
1495         while (head) {
1496                 struct tipc_node *n_ptr;
1497                 struct tipc_link *l_ptr;
1498                 struct sk_buff *crs;
1499                 struct sk_buff *buf = head;
1500                 struct tipc_msg *msg;
1501                 u32 seq_no;
1502                 u32 ackd;
1503                 u32 released = 0;
1504                 int type;
1505
1506                 head = head->next;
1507
1508                 /* Ensure bearer is still enabled */
1509                 if (unlikely(!b_ptr->active))
1510                         goto discard;
1511
1512                 /* Ensure message is well-formed */
1513                 if (unlikely(!link_recv_buf_validate(buf)))
1514                         goto discard;
1515
1516                 /* Ensure message data is a single contiguous unit */
1517                 if (unlikely(skb_linearize(buf)))
1518                         goto discard;
1519
1520                 /* Handle arrival of a non-unicast link message */
1521                 msg = buf_msg(buf);
1522
1523                 if (unlikely(msg_non_seq(msg))) {
1524                         if (msg_user(msg) ==  LINK_CONFIG)
1525                                 tipc_disc_recv_msg(buf, b_ptr);
1526                         else
1527                                 tipc_bclink_recv_pkt(buf);
1528                         continue;
1529                 }
1530
1531                 /* Discard unicast link messages destined for another node */
1532                 if (unlikely(!msg_short(msg) &&
1533                              (msg_destnode(msg) != tipc_own_addr)))
1534                         goto discard;
1535
1536                 /* Locate neighboring node that sent message */
1537                 n_ptr = tipc_node_find(msg_prevnode(msg));
1538                 if (unlikely(!n_ptr))
1539                         goto discard;
1540                 tipc_node_lock(n_ptr);
1541
1542                 /* Locate unicast link endpoint that should handle message */
1543                 l_ptr = n_ptr->links[b_ptr->identity];
1544                 if (unlikely(!l_ptr))
1545                         goto unlock_discard;
1546
1547                 /* Verify that communication with node is currently allowed */
1548                 if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
1549                         msg_user(msg) == LINK_PROTOCOL &&
1550                         (msg_type(msg) == RESET_MSG ||
1551                                         msg_type(msg) == ACTIVATE_MSG) &&
1552                         !msg_redundant_link(msg))
1553                         n_ptr->block_setup &= ~WAIT_PEER_DOWN;
1554
1555                 if (n_ptr->block_setup)
1556                         goto unlock_discard;
1557
1558                 /* Validate message sequence number info */
1559                 seq_no = msg_seqno(msg);
1560                 ackd = msg_ack(msg);
1561
1562                 /* Release acked messages */
1563                 if (n_ptr->bclink.recv_permitted)
1564                         tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1565
1566                 crs = l_ptr->first_out;
1567                 while ((crs != l_ptr->next_out) &&
1568                        less_eq(buf_seqno(crs), ackd)) {
1569                         struct sk_buff *next = crs->next;
1570
1571                         kfree_skb(crs);
1572                         crs = next;
1573                         released++;
1574                 }
1575                 if (released) {
1576                         l_ptr->first_out = crs;
1577                         l_ptr->out_queue_size -= released;
1578                 }
1579
1580                 /* Try sending any messages link endpoint has pending */
1581                 if (unlikely(l_ptr->next_out))
1582                         tipc_link_push_queue(l_ptr);
1583                 if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1584                         tipc_link_wakeup_ports(l_ptr, 0);
1585                 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1586                         l_ptr->stats.sent_acks++;
1587                         tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1588                 }
1589
1590                 /* Now (finally!) process the incoming message */
1591 protocol_check:
1592                 if (unlikely(!link_working_working(l_ptr))) {
1593                         if (msg_user(msg) == LINK_PROTOCOL) {
1594                                 link_recv_proto_msg(l_ptr, buf);
1595                                 head = link_insert_deferred_queue(l_ptr, head);
1596                                 tipc_node_unlock(n_ptr);
1597                                 continue;
1598                         }
1599
1600                         /* Traffic message. Conditionally activate link */
1601                         link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1602
1603                         if (link_working_working(l_ptr)) {
1604                                 /* Re-insert buffer in front of queue */
1605                                 buf->next = head;
1606                                 head = buf;
1607                                 tipc_node_unlock(n_ptr);
1608                                 continue;
1609                         }
1610                         goto unlock_discard;
1611                 }
1612
1613                 /* Link is now in state WORKING_WORKING */
1614                 if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1615                         link_handle_out_of_seq_msg(l_ptr, buf);
1616                         head = link_insert_deferred_queue(l_ptr, head);
1617                         tipc_node_unlock(n_ptr);
1618                         continue;
1619                 }
1620                 l_ptr->next_in_no++;
1621                 if (unlikely(l_ptr->oldest_deferred_in))
1622                         head = link_insert_deferred_queue(l_ptr, head);
1623 deliver:
1624                 if (likely(msg_isdata(msg))) {
1625                         tipc_node_unlock(n_ptr);
1626                         tipc_port_recv_msg(buf);
1627                         continue;
1628                 }
1629                 switch (msg_user(msg)) {
1630                         int ret;
1631                 case MSG_BUNDLER:
1632                         l_ptr->stats.recv_bundles++;
1633                         l_ptr->stats.recv_bundled += msg_msgcnt(msg);
1634                         tipc_node_unlock(n_ptr);
1635                         tipc_link_recv_bundle(buf);
1636                         continue;
1637                 case NAME_DISTRIBUTOR:
1638                         n_ptr->bclink.recv_permitted = true;
1639                         tipc_node_unlock(n_ptr);
1640                         tipc_named_recv(buf);
1641                         continue;
1642                 case BCAST_PROTOCOL:
1643                         tipc_link_recv_sync(n_ptr, buf);
1644                         tipc_node_unlock(n_ptr);
1645                         continue;
1646                 case CONN_MANAGER:
1647                         tipc_node_unlock(n_ptr);
1648                         tipc_port_recv_proto_msg(buf);
1649                         continue;
1650                 case MSG_FRAGMENTER:
1651                         l_ptr->stats.recv_fragments++;
1652                         ret = tipc_link_recv_fragment(&l_ptr->defragm_buf,
1653                                                       &buf, &msg);
1654                         if (ret == 1) {
1655                                 l_ptr->stats.recv_fragmented++;
1656                                 goto deliver;
1657                         }
1658                         if (ret == -1)
1659                                 l_ptr->next_in_no--;
1660                         tipc_node_unlock(n_ptr);
1661                         continue;
1662                 case CHANGEOVER_PROTOCOL:
1663                         type = msg_type(msg);
1664                         if (link_recv_changeover_msg(&l_ptr, &buf)) {
1665                                 msg = buf_msg(buf);
1666                                 seq_no = msg_seqno(msg);
1667                                 if (type == ORIGINAL_MSG)
1668                                         goto deliver;
1669                                 goto protocol_check;
1670                         }
1671                         break;
1672                 default:
1673                         kfree_skb(buf);
1674                         buf = NULL;
1675                         break;
1676                 }
1677                 tipc_node_unlock(n_ptr);
1678                 tipc_net_route_msg(buf);
1679                 continue;
1680 unlock_discard:
1681
1682                 tipc_node_unlock(n_ptr);
1683 discard:
1684                 kfree_skb(buf);
1685         }
1686         read_unlock_bh(&tipc_net_lock);
1687 }
1688
1689 /**
1690  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1691  *
1692  * Returns increase in queue length (i.e. 0 or 1)
1693  */
1694 u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
1695                         struct sk_buff *buf)
1696 {
1697         struct sk_buff *queue_buf;
1698         struct sk_buff **prev;
1699         u32 seq_no = buf_seqno(buf);
1700
1701         buf->next = NULL;
1702
1703         /* Empty queue ? */
1704         if (*head == NULL) {
1705                 *head = *tail = buf;
1706                 return 1;
1707         }
1708
1709         /* Last ? */
1710         if (less(buf_seqno(*tail), seq_no)) {
1711                 (*tail)->next = buf;
1712                 *tail = buf;
1713                 return 1;
1714         }
1715
1716         /* Locate insertion point in queue, then insert; discard if duplicate */
1717         prev = head;
1718         queue_buf = *head;
1719         for (;;) {
1720                 u32 curr_seqno = buf_seqno(queue_buf);
1721
1722                 if (seq_no == curr_seqno) {
1723                         kfree_skb(buf);
1724                         return 0;
1725                 }
1726
1727                 if (less(seq_no, curr_seqno))
1728                         break;
1729
1730                 prev = &queue_buf->next;
1731                 queue_buf = queue_buf->next;
1732         }
1733
1734         buf->next = queue_buf;
1735         *prev = buf;
1736         return 1;
1737 }
1738
/*
 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
 * @l_ptr: link endpoint the packet arrived on (owner node is locked)
 * @buf: the received packet
 *
 * Link protocol messages are unnumbered and are processed immediately.
 * Any other packet is either a stale duplicate (discarded) or is placed
 * on the deferred queue for in-order delivery later.
 */
static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
                                       struct sk_buff *buf)
{
        u32 seq_no = buf_seqno(buf);

        if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
                link_recv_proto_msg(l_ptr, buf);
                return;
        }

        /* Record OOS packet arrival (force mismatch on next timeout) */
        l_ptr->checkpoint--;

        /*
         * Discard packet if a duplicate; otherwise add it to deferred queue
         * and notify peer of gap as per protocol specification
         */
        if (less(seq_no, mod(l_ptr->next_in_no))) {
                l_ptr->stats.duplicates++;
                kfree_skb(buf);
                return;
        }

        if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
                                &l_ptr->newest_deferred_in, buf)) {
                l_ptr->deferred_inqueue_sz++;
                l_ptr->stats.deferred_recv++;
                /* Re-notify peer of the gap every 16th deferred packet */
                if ((l_ptr->deferred_inqueue_sz % 16) == 1)
                        tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
        } else
                l_ptr->stats.duplicates++;
}
1774
/*
 * tipc_link_send_proto_msg - send protocol message to the other endpoint
 * @l_ptr: link to send on (owner node is locked)
 * @msg_typ: RESET_MSG, ACTIVATE_MSG or STATE_MSG
 * @probe_msg: non-zero to mark a STATE_MSG as a probe
 * @gap: sequence gap to report (overridden if the deferred queue is non-empty)
 * @tolerance: link tolerance to advertise in a STATE_MSG
 * @priority: link priority to advertise in a STATE_MSG
 * @ack_mtu: packet size being acknowledged during MTU probing (STATE_MSG)
 */
void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
                              int probe_msg, u32 gap, u32 tolerance,
                              u32 priority, u32 ack_mtu)
{
        struct sk_buff *buf = NULL;
        struct tipc_msg *msg = l_ptr->pmsg;
        u32 msg_size = sizeof(l_ptr->proto_msg);
        int r_flag;

        /* Discard any previous message that was deferred due to congestion */
        if (l_ptr->proto_msg_queue) {
                kfree_skb(l_ptr->proto_msg_queue);
                l_ptr->proto_msg_queue = NULL;
        }

        if (link_blocked(l_ptr))
                return;

        /* Abort non-RESET send if communication with node is prohibited */
        if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
                return;

        /* Create protocol message with "out-of-sequence" sequence number */
        msg_set_type(msg, msg_typ);
        msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
        msg_set_last_bcast(msg, tipc_bclink_get_last_sent());

        if (msg_typ == STATE_MSG) {
                u32 next_sent = mod(l_ptr->next_out_no);

                if (!tipc_link_is_up(l_ptr))
                        return;
                if (l_ptr->next_out)
                        next_sent = buf_seqno(l_ptr->next_out);
                msg_set_next_sent(msg, next_sent);
                /* A non-empty deferred queue overrides the caller's gap */
                if (l_ptr->oldest_deferred_in) {
                        u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
                        gap = mod(rec - mod(l_ptr->next_in_no));
                }
                msg_set_seq_gap(msg, gap);
                if (gap)
                        l_ptr->stats.sent_nacks++;
                msg_set_link_tolerance(msg, tolerance);
                msg_set_linkprio(msg, priority);
                msg_set_max_pkt(msg, ack_mtu);
                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
                msg_set_probe(msg, probe_msg != 0);
                if (probe_msg) {
                        u32 mtu = l_ptr->max_pkt;

                        /*
                         * MTU discovery: pad the probe to a size between the
                         * current and target max packet; after 10 unanswered
                         * probes at one size, lower the target and retry.
                         * NOTE(review): the "& ~3" rounds the probe size down
                         * to a 4-byte multiple.
                         */
                        if ((mtu < l_ptr->max_pkt_target) &&
                            link_working_working(l_ptr) &&
                            l_ptr->fsm_msg_cnt) {
                                msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
                                if (l_ptr->max_pkt_probes == 10) {
                                        l_ptr->max_pkt_target = (msg_size - 4);
                                        l_ptr->max_pkt_probes = 0;
                                        msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
                                }
                                l_ptr->max_pkt_probes++;
                        }

                        l_ptr->stats.sent_probes++;
                }
                l_ptr->stats.sent_states++;
        } else {                /* RESET_MSG or ACTIVATE_MSG */
                msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
                msg_set_seq_gap(msg, 0);
                msg_set_next_sent(msg, 1);
                msg_set_probe(msg, 0);
                msg_set_link_tolerance(msg, l_ptr->tolerance);
                msg_set_linkprio(msg, l_ptr->priority);
                msg_set_max_pkt(msg, l_ptr->max_pkt_target);
        }

        r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
        msg_set_redundant_link(msg, r_flag);
        /*
         * NOTE(review): this unconditionally overwrites the linkprio value
         * set from the 'priority' argument in the STATE_MSG branch above —
         * harmless only if callers keep l_ptr->priority in sync; confirm.
         */
        msg_set_linkprio(msg, l_ptr->priority);
        msg_set_size(msg, msg_size);

        msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));

        buf = tipc_buf_acquire(msg_size);
        if (!buf)
                return;

        skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
        buf->priority = TC_PRIO_CONTROL;

        /* Defer message if bearer is already blocked */
        if (tipc_bearer_blocked(l_ptr->b_ptr)) {
                l_ptr->proto_msg_queue = buf;
                return;
        }

        tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
        l_ptr->unacked_window = 0;
        /* Bearer send did not consume the buffer on this path; release it */
        kfree_skb(buf);
}
1878
/*
 * Receive protocol message :
 * Note that network plane id propagates through the network, and may
 * change at any time. The node with lowest address rules
 *
 * @l_ptr: link the message arrived on (owner node is locked)
 * @buf: the protocol message; always consumed (freed) before returning
 */
static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
{
        u32 rec_gap = 0;
        u32 max_pkt_info;
        u32 max_pkt_ack;
        u32 msg_tol;
        struct tipc_msg *msg = buf_msg(buf);

        if (link_blocked(l_ptr))
                goto exit;

        /* record unnumbered packet arrival (force mismatch on next timeout) */
        l_ptr->checkpoint--;

        /* Lowest-address node decides the network plane on disagreement */
        if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
                if (tipc_own_addr > msg_prevnode(msg))
                        l_ptr->b_ptr->net_plane = msg_net_plane(msg);

        l_ptr->owner->permit_changeover = msg_redundant_link(msg);

        switch (msg_type(msg)) {

        case RESET_MSG:
                if (!link_working_unknown(l_ptr) &&
                    (l_ptr->peer_session != INVALID_SESSION)) {
                        if (less_eq(msg_session(msg), l_ptr->peer_session))
                                break; /* duplicate or old reset: ignore */
                }

                if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
                                link_working_unknown(l_ptr))) {
                        /*
                         * peer has lost contact -- don't allow peer's links
                         * to reactivate before we recognize loss & clean up
                         */
                        l_ptr->owner->block_setup = WAIT_NODE_DOWN;
                }

                link_state_event(l_ptr, RESET_MSG);

                /* fall thru' */
        case ACTIVATE_MSG:
                /* Update link settings according other endpoint's values */
                strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));

                msg_tol = msg_link_tolerance(msg);
                if (msg_tol > l_ptr->tolerance)
                        link_set_supervision_props(l_ptr, msg_tol);

                if (msg_linkprio(msg) > l_ptr->priority)
                        l_ptr->priority = msg_linkprio(msg);

                /* Adopt the smaller of the two endpoints' max packet targets */
                max_pkt_info = msg_max_pkt(msg);
                if (max_pkt_info) {
                        if (max_pkt_info < l_ptr->max_pkt_target)
                                l_ptr->max_pkt_target = max_pkt_info;
                        if (l_ptr->max_pkt > l_ptr->max_pkt_target)
                                l_ptr->max_pkt = l_ptr->max_pkt_target;
                } else {
                        l_ptr->max_pkt = l_ptr->max_pkt_target;
                }

                /* Synchronize broadcast link info, if not done previously */
                if (!tipc_node_is_up(l_ptr->owner)) {
                        l_ptr->owner->bclink.last_sent =
                                l_ptr->owner->bclink.last_in =
                                msg_last_bcast(msg);
                        l_ptr->owner->bclink.oos_state = 0;
                }

                l_ptr->peer_session = msg_session(msg);
                l_ptr->peer_bearer_id = msg_bearer_id(msg);

                if (msg_type(msg) == ACTIVATE_MSG)
                        link_state_event(l_ptr, ACTIVATE_MSG);
                break;
        case STATE_MSG:

                msg_tol = msg_link_tolerance(msg);
                if (msg_tol)
                        link_set_supervision_props(l_ptr, msg_tol);

                /* A priority change forces a link reset to take effect */
                if (msg_linkprio(msg) &&
                    (msg_linkprio(msg) != l_ptr->priority)) {
                        pr_warn("%s<%s>, priority change %u->%u\n",
                                link_rst_msg, l_ptr->name, l_ptr->priority,
                                msg_linkprio(msg));
                        l_ptr->priority = msg_linkprio(msg);
                        tipc_link_reset(l_ptr); /* Enforce change to take effect */
                        break;
                }
                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
                l_ptr->stats.recv_states++;
                if (link_reset_unknown(l_ptr))
                        break;

                /* Compute gap between what peer sent and what we received */
                if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
                        rec_gap = mod(msg_next_sent(msg) -
                                      mod(l_ptr->next_in_no));
                }

                max_pkt_ack = msg_max_pkt(msg);
                if (max_pkt_ack > l_ptr->max_pkt) {
                        l_ptr->max_pkt = max_pkt_ack;
                        l_ptr->max_pkt_probes = 0;
                }

                /* A probe's own size acknowledges that MTU works */
                max_pkt_ack = 0;
                if (msg_probe(msg)) {
                        l_ptr->stats.recv_probes++;
                        if (msg_size(msg) > sizeof(l_ptr->proto_msg))
                                max_pkt_ack = msg_size(msg);
                }

                /* Protocol message before retransmits, reduce loss risk */
                if (l_ptr->owner->bclink.recv_permitted)
                        tipc_bclink_update_link_state(l_ptr->owner,
                                                      msg_last_bcast(msg));

                if (rec_gap || (msg_probe(msg))) {
                        tipc_link_send_proto_msg(l_ptr, STATE_MSG,
                                                 0, rec_gap, 0, 0, max_pkt_ack);
                }
                if (msg_seq_gap(msg)) {
                        l_ptr->stats.recv_nacks++;
                        tipc_link_retransmit(l_ptr, l_ptr->first_out,
                                             msg_seq_gap(msg));
                }
                break;
        }
exit:
        kfree_skb(buf);
}
2017
2018
2019 /*
2020  * tipc_link_tunnel(): Send one message via a link belonging to
2021  * another bearer. Owner node is locked.
2022  */
2023 static void tipc_link_tunnel(struct tipc_link *l_ptr,
2024                              struct tipc_msg *tunnel_hdr, struct tipc_msg *msg,
2025                              u32 selector)
2026 {
2027         struct tipc_link *tunnel;
2028         struct sk_buff *buf;
2029         u32 length = msg_size(msg);
2030
2031         tunnel = l_ptr->owner->active_links[selector & 1];
2032         if (!tipc_link_is_up(tunnel)) {
2033                 pr_warn("%stunnel link no longer available\n", link_co_err);
2034                 return;
2035         }
2036         msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2037         buf = tipc_buf_acquire(length + INT_H_SIZE);
2038         if (!buf) {
2039                 pr_warn("%sunable to send tunnel msg\n", link_co_err);
2040                 return;
2041         }
2042         skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2043         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2044         tipc_link_send_buf(tunnel, buf);
2045 }
2046
2047
2048
/*
 * changeover(): Send whole message queue via the remaining link
 *               Owner node is locked.
 *
 * Each queued message is tunnelled as an ORIGINAL_MSG changeover packet;
 * bundles are unpacked first when the tunnel link differs from the
 * second active link, so each wrapped message travels on its own.
 */
void tipc_link_changeover(struct tipc_link *l_ptr)
{
        u32 msgcount = l_ptr->out_queue_size;
        struct sk_buff *crs = l_ptr->first_out;
        struct tipc_link *tunnel = l_ptr->owner->active_links[0];
        struct tipc_msg tunnel_hdr;
        int split_bundles;

        if (!tunnel)
                return;

        if (!l_ptr->owner->permit_changeover) {
                pr_warn("%speer did not permit changeover\n", link_co_err);
                return;
        }

        tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
                 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
        msg_set_msgcnt(&tunnel_hdr, msgcount);

        /* Empty queue: send a single header-only changeover message */
        if (!l_ptr->first_out) {
                struct sk_buff *buf;

                buf = tipc_buf_acquire(INT_H_SIZE);
                if (buf) {
                        skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
                        msg_set_size(&tunnel_hdr, INT_H_SIZE);
                        tipc_link_send_buf(tunnel, buf);
                } else {
                        pr_warn("%sunable to send changeover msg\n",
                                link_co_err);
                }
                return;
        }

        split_bundles = (l_ptr->owner->active_links[0] !=
                         l_ptr->owner->active_links[1]);

        while (crs) {
                struct tipc_msg *msg = buf_msg(crs);

                if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
                        /* Tunnel each message wrapped inside the bundle */
                        struct tipc_msg *m = msg_get_wrapped(msg);
                        unchar *pos = (unchar *)m;

                        msgcount = msg_msgcnt(msg);
                        while (msgcount--) {
                                /* Wrapped msgs inherit the bundle's seqno */
                                msg_set_seqno(m, msg_seqno(msg));
                                tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
                                                 msg_link_selector(m));
                                pos += align(msg_size(m));
                                m = (struct tipc_msg *)pos;
                        }
                } else {
                        tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
                                         msg_link_selector(msg));
                }
                crs = crs->next;
        }
}
2114
/*
 * tipc_link_send_duplicate(): Tunnel a copy of every message in this
 * link's send queue via the given tunnel link, as DUPLICATE_MSG
 * changeover packets. Owner node is locked.
 */
void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel)
{
        struct sk_buff *iter;
        struct tipc_msg tunnel_hdr;

        tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
                 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
        msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
        iter = l_ptr->first_out;
        while (iter) {
                struct sk_buff *outbuf;
                struct tipc_msg *msg = buf_msg(iter);
                u32 length = msg_size(msg);

                /* Close open bundles so no more messages get appended */
                if (msg_user(msg) == MSG_BUNDLER)
                        msg_set_type(msg, CLOSED_MSG);
                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
                msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
                outbuf = tipc_buf_acquire(length + INT_H_SIZE);
                if (outbuf == NULL) {
                        pr_warn("%sunable to send duplicate msg\n",
                                link_co_err);
                        return;
                }
                skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
                skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
                                               length);
                tipc_link_send_buf(tunnel, outbuf);
                /* Original link may have been reset by the send; stop then */
                if (!tipc_link_is_up(l_ptr))
                        return;
                iter = iter->next;
        }
}
2150
2151 /**
2152  * buf_extract - extracts embedded TIPC message from another message
2153  * @skb: encapsulating message buffer
2154  * @from_pos: offset to extract from
2155  *
2156  * Returns a new message buffer containing an embedded message.  The
2157  * encapsulating message itself is left unchanged.
2158  */
2159 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2160 {
2161         struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2162         u32 size = msg_size(msg);
2163         struct sk_buff *eb;
2164
2165         eb = tipc_buf_acquire(size);
2166         if (eb)
2167                 skb_copy_to_linear_data(eb, msg, size);
2168         return eb;
2169 }
2170
/*
 *  link_recv_changeover_msg(): Receive tunneled packet sent
 *  via other link. Node is locked. Return extracted buffer.
 *
 *  On success, *l_ptr is retargeted to the failing link the packet was
 *  originally meant for, *buf is replaced by the extracted inner message,
 *  and 1 is returned. On any failure or discard, the tunnel buffer is
 *  freed, *buf is set to NULL, and 0 is returned.
 */
static int link_recv_changeover_msg(struct tipc_link **l_ptr,
                                    struct sk_buff **buf)
{
        struct sk_buff *tunnel_buf = *buf;
        struct tipc_link *dest_link;
        struct tipc_msg *msg;
        struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
        u32 msg_typ = msg_type(tunnel_msg);
        u32 msg_count = msg_msgcnt(tunnel_msg);
        u32 bearer_id = msg_bearer_id(tunnel_msg);

        /* Validate the bearer id before using it as an array index */
        if (bearer_id >= MAX_BEARERS)
                goto exit;
        dest_link = (*l_ptr)->owner->links[bearer_id];
        if (!dest_link)
                goto exit;
        if (dest_link == *l_ptr) {
                pr_err("Unexpected changeover message on link <%s>\n",
                       (*l_ptr)->name);
                goto exit;
        }
        *l_ptr = dest_link;
        msg = msg_get_wrapped(tunnel_msg);

        if (msg_typ == DUPLICATE_MSG) {
                /* Drop duplicates already received on the original link */
                if (less(msg_seqno(msg), mod(dest_link->next_in_no)))
                        goto exit;
                *buf = buf_extract(tunnel_buf, INT_H_SIZE);
                if (*buf == NULL) {
                        pr_warn("%sduplicate msg dropped\n", link_co_err);
                        goto exit;
                }
                kfree_skb(tunnel_buf);
                return 1;
        }

        /* First original message ?: */
        if (tipc_link_is_up(dest_link)) {
                pr_info("%s<%s>, changeover initiated by peer\n", link_rst_msg,
                        dest_link->name);
                tipc_link_reset(dest_link);
                dest_link->exp_msg_count = msg_count;
                if (!msg_count)
                        goto exit;
        } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
                dest_link->exp_msg_count = msg_count;
                if (!msg_count)
                        goto exit;
        }

        /* Receive original message */
        if (dest_link->exp_msg_count == 0) {
                pr_warn("%sgot too many tunnelled messages\n", link_co_err);
                goto exit;
        }
        dest_link->exp_msg_count--;
        /* Ignore messages that predate the link's reset checkpoint */
        if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
                goto exit;
        } else {
                *buf = buf_extract(tunnel_buf, INT_H_SIZE);
                if (*buf != NULL) {
                        kfree_skb(tunnel_buf);
                        return 1;
                } else {
                        pr_warn("%soriginal msg dropped\n", link_co_err);
                }
        }
exit:
        *buf = NULL;
        kfree_skb(tunnel_buf);
        return 0;
}
2247
2248 /*
2249  *  Bundler functionality:
2250  */
2251 void tipc_link_recv_bundle(struct sk_buff *buf)
2252 {
2253         u32 msgcount = msg_msgcnt(buf_msg(buf));
2254         u32 pos = INT_H_SIZE;
2255         struct sk_buff *obuf;
2256
2257         while (msgcount--) {
2258                 obuf = buf_extract(buf, pos);
2259                 if (obuf == NULL) {
2260                         pr_warn("Link unable to unbundle message(s)\n");
2261                         break;
2262                 }
2263                 pos += align(msg_size(buf_msg(obuf)));
2264                 tipc_net_route_msg(obuf);
2265         }
2266         kfree_skb(buf);
2267 }
2268
2269 /*
2270  *  Fragmentation/defragmentation:
2271  */
2272
/*
 * link_send_long_buf: Entry for buffers needing fragmentation.
 * The buffer is complete, inclusive total message length.
 * Returns user data length, or -ENOMEM if a fragment buffer
 * cannot be allocated (in which case everything is freed).
 */
static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
{
        struct sk_buff *buf_chain = NULL;
        /* Treat &buf_chain as a dummy head so the first append is uniform */
        struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
        struct tipc_msg *inmsg = buf_msg(buf);
        struct tipc_msg fragm_hdr;
        u32 insize = msg_size(inmsg);
        u32 dsz = msg_data_sz(inmsg);
        unchar *crs = buf->data;
        u32 rest = insize;
        u32 pack_sz = l_ptr->max_pkt;
        u32 fragm_sz = pack_sz - INT_H_SIZE;
        u32 fragm_no = 0;
        u32 destaddr;

        if (msg_short(inmsg))
                destaddr = l_ptr->addr;
        else
                destaddr = msg_destnode(inmsg);

        /* Prepare reusable fragment header: */
        tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
                 INT_H_SIZE, destaddr);

        /* Chop up message: */
        while (rest > 0) {
                struct sk_buff *fragm;

                if (rest <= fragm_sz) {
                        fragm_sz = rest;
                        msg_set_type(&fragm_hdr, LAST_FRAGMENT);
                }
                fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
                if (fragm == NULL) {
                        /* Allocation failed: free input and partial chain */
                        kfree_skb(buf);
                        while (buf_chain) {
                                buf = buf_chain;
                                buf_chain = buf_chain->next;
                                kfree_skb(buf);
                        }
                        return -ENOMEM;
                }
                msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
                fragm_no++;
                msg_set_fragm_no(&fragm_hdr, fragm_no);
                skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
                skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
                                               fragm_sz);
                buf_chain_tail->next = fragm;
                buf_chain_tail = fragm;

                rest -= fragm_sz;
                crs += fragm_sz;
                /* Any fragment after the first is FRAGMENT (or LAST_FRAGMENT) */
                msg_set_type(&fragm_hdr, FRAGMENT);
        }
        kfree_skb(buf);

        /* Append chain of fragments to send queue & send them */
        l_ptr->long_msg_seq_no++;
        link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
        l_ptr->stats.sent_fragments += fragm_no;
        l_ptr->stats.sent_fragmented++;
        tipc_link_push_queue(l_ptr);

        return dsz;
}
2344
2345 /*
2346  * A pending message being re-assembled must store certain values
2347  * to handle subsequent fragments correctly. The following functions
2348  * help storing these values in unused, available fields in the
2349  * pending message. This makes dynamic memory allocation unnecessary.
2350  */
2351 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2352 {
2353         msg_set_seqno(buf_msg(buf), seqno);
2354 }
2355
2356 static u32 get_fragm_size(struct sk_buff *buf)
2357 {
2358         return msg_ack(buf_msg(buf));
2359 }
2360
2361 static void set_fragm_size(struct sk_buff *buf, u32 sz)
2362 {
2363         msg_set_ack(buf_msg(buf), sz);
2364 }
2365
2366 static u32 get_expected_frags(struct sk_buff *buf)
2367 {
2368         return msg_bcast_ack(buf_msg(buf));
2369 }
2370
2371 static void set_expected_frags(struct sk_buff *buf, u32 exp)
2372 {
2373         msg_set_bcast_ack(buf_msg(buf), exp);
2374 }
2375
/*
 * tipc_link_recv_fragment(): Called with node lock on. Returns
 * the reassembled buffer if message is complete.
 *
 * @pending: in/out head of the list of partially reassembled messages,
 *           chained via sk_buff.next
 * @fb:      in: the arriving fragment (always consumed or handed back);
 *           out: the complete reassembled message, when ready
 * @m:       out: header of the reassembled message, when ready
 *
 * Returns 1 when this fragment completed a message (*fb and *m are set),
 * 0 when the fragment was absorbed or discarded, -1 on allocation failure.
 */
int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
			    struct tipc_msg **m)
{
	struct sk_buff *prev = NULL;
	struct sk_buff *fbuf = *fb;
	struct tipc_msg *fragm = buf_msg(fbuf);
	struct sk_buff *pbuf = *pending;
	u32 long_msg_seq_no = msg_long_msgno(fragm);

	*fb = NULL;

	/* Is there an incomplete message waiting for this fragment? */
	/* Match on both long-message sequence number and originating node */
	while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) ||
			(msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
		prev = pbuf;
		pbuf = pbuf->next;
	}

	if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
		/* First fragment carries the original message header, from
		 * which total size and expected fragment count are derived
		 */
		struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
		u32 msg_sz = msg_size(imsg);
		u32 fragm_sz = msg_data_sz(fragm);
		u32 exp_fragm_cnt;
		u32 max =  TIPC_MAX_USER_MSG_SIZE + NAMED_H_SIZE;

		if (msg_type(imsg) == TIPC_MCAST_MSG)
			max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
		/* Reject zero-length fragments and oversized messages */
		if (fragm_sz == 0 || msg_size(imsg) > max) {
			kfree_skb(fbuf);
			return 0;
		}
		exp_fragm_cnt = msg_sz / fragm_sz + !!(msg_sz % fragm_sz);
		pbuf = tipc_buf_acquire(msg_size(imsg));
		if (pbuf != NULL) {
			/* Insert reassembly buffer at head of pending list */
			pbuf->next = *pending;
			*pending = pbuf;
			skb_copy_to_linear_data(pbuf, imsg,
						msg_data_sz(fragm));
			/*  Prepare buffer for subsequent fragments. */
			set_long_msg_seqno(pbuf, long_msg_seq_no);
			set_fragm_size(pbuf, fragm_sz);
			set_expected_frags(pbuf, exp_fragm_cnt - 1);
		} else {
			pr_debug("Link unable to reassemble fragmented message\n");
			kfree_skb(fbuf);
			return -1;
		}
		kfree_skb(fbuf);
		return 0;
	} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
		/* Follow-up fragment: copy its payload to the position in
		 * the reassembly buffer given by its fragment number
		 */
		u32 dsz = msg_data_sz(fragm);
		u32 fsz = get_fragm_size(pbuf);
		u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
		u32 exp_frags = get_expected_frags(pbuf) - 1;
		skb_copy_to_linear_data_offset(pbuf, crs,
					       msg_data(fragm), dsz);
		kfree_skb(fbuf);

		/* Is message complete? */
		if (exp_frags == 0) {
			/* Unlink from pending list, hand result to caller */
			if (prev)
				prev->next = pbuf->next;
			else
				*pending = pbuf->next;
			msg_reset_reroute_cnt(buf_msg(pbuf));
			*fb = pbuf;
			*m = buf_msg(pbuf);
			return 1;
		}
		set_expected_frags(pbuf, exp_frags);
		return 0;
	}
	/* Unmatched fragment (e.g. duplicate or lost first fragment) */
	kfree_skb(fbuf);
	return 0;
}
2455
2456 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
2457 {
2458         if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
2459                 return;
2460
2461         l_ptr->tolerance = tolerance;
2462         l_ptr->continuity_interval =
2463                 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2464         l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2465 }
2466
2467 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
2468 {
2469         /* Data messages from this node, inclusive FIRST_FRAGM */
2470         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2471         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2472         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2473         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2474         /* Transiting data messages,inclusive FIRST_FRAGM */
2475         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2476         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2477         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2478         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2479         l_ptr->queue_limit[CONN_MANAGER] = 1200;
2480         l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2481         l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2482         /* FRAGMENT and LAST_FRAGMENT packets */
2483         l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2484 }
2485
2486 /**
2487  * link_find_link - locate link by name
2488  * @name: ptr to link name string
2489  * @node: ptr to area to be filled with ptr to associated node
2490  *
2491  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2492  * this also prevents link deletion.
2493  *
2494  * Returns pointer to link (or 0 if invalid link name).
2495  */
2496 static struct tipc_link *link_find_link(const char *name,
2497                                         struct tipc_node **node)
2498 {
2499         struct tipc_link *l_ptr;
2500         struct tipc_node *n_ptr;
2501         int i;
2502
2503         list_for_each_entry(n_ptr, &tipc_node_list, list) {
2504                 for (i = 0; i < MAX_BEARERS; i++) {
2505                         l_ptr = n_ptr->links[i];
2506                         if (l_ptr && !strcmp(l_ptr->name, name))
2507                                 goto found;
2508                 }
2509         }
2510         l_ptr = NULL;
2511         n_ptr = NULL;
2512 found:
2513         *node = n_ptr;
2514         return l_ptr;
2515 }
2516
2517 /**
2518  * link_value_is_valid -- validate proposed link tolerance/priority/window
2519  *
2520  * @cmd: value type (TIPC_CMD_SET_LINK_*)
2521  * @new_value: the new value
2522  *
2523  * Returns 1 if value is within range, 0 if not.
2524  */
2525 static int link_value_is_valid(u16 cmd, u32 new_value)
2526 {
2527         switch (cmd) {
2528         case TIPC_CMD_SET_LINK_TOL:
2529                 return (new_value >= TIPC_MIN_LINK_TOL) &&
2530                         (new_value <= TIPC_MAX_LINK_TOL);
2531         case TIPC_CMD_SET_LINK_PRI:
2532                 return (new_value <= TIPC_MAX_LINK_PRI);
2533         case TIPC_CMD_SET_LINK_WINDOW:
2534                 return (new_value >= TIPC_MIN_LINK_WIN) &&
2535                         (new_value <= TIPC_MAX_LINK_WIN);
2536         }
2537         return 0;
2538 }
2539
/**
 * link_cmd_set_value - change priority/tolerance/window for link/bearer/media
 * @name: ptr to link, bearer, or media name
 * @new_value: new value of link, bearer, or media setting
 * @cmd: which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*)
 *
 * Caller must hold 'tipc_net_lock' to ensure link/bearer/media is not deleted.
 *
 * Tries to interpret @name first as a link name, then as a bearer name,
 * then as a media name, and applies the setting to the first match.
 *
 * Returns 0 if value updated and negative value on error.
 */
static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
{
	struct tipc_node *node;
	struct tipc_link *l_ptr;
	struct tipc_bearer *b_ptr;
	struct tipc_media *m_ptr;
	int res = 0;

	l_ptr = link_find_link(name, &node);
	if (l_ptr) {
		/*
		 * acquire node lock for tipc_link_send_proto_msg().
		 * see "TIPC locking policy" in net.c.
		 */
		tipc_node_lock(node);
		switch (cmd) {
		case TIPC_CMD_SET_LINK_TOL:
			link_set_supervision_props(l_ptr, new_value);
			/* Announce the new tolerance to the peer */
			tipc_link_send_proto_msg(l_ptr,
				STATE_MSG, 0, 0, new_value, 0, 0);
			break;
		case TIPC_CMD_SET_LINK_PRI:
			l_ptr->priority = new_value;
			/* Announce the new priority to the peer */
			tipc_link_send_proto_msg(l_ptr,
				STATE_MSG, 0, 0, 0, new_value, 0);
			break;
		case TIPC_CMD_SET_LINK_WINDOW:
			tipc_link_set_queue_limits(l_ptr, new_value);
			break;
		default:
			res = -EINVAL;
			break;
		}
		tipc_node_unlock(node);
		return res;
	}

	/* Not a link name: try bearer next */
	b_ptr = tipc_bearer_find(name);
	if (b_ptr) {
		switch (cmd) {
		case TIPC_CMD_SET_LINK_TOL:
			b_ptr->tolerance = new_value;
			break;
		case TIPC_CMD_SET_LINK_PRI:
			b_ptr->priority = new_value;
			break;
		case TIPC_CMD_SET_LINK_WINDOW:
			b_ptr->window = new_value;
			break;
		default:
			res = -EINVAL;
			break;
		}
		return res;
	}

	/* Finally try media */
	m_ptr = tipc_media_find(name);
	if (!m_ptr)
		return -ENODEV;
	switch (cmd) {
	case TIPC_CMD_SET_LINK_TOL:
		m_ptr->tolerance = new_value;
		break;
	case TIPC_CMD_SET_LINK_PRI:
		m_ptr->priority = new_value;
		break;
	case TIPC_CMD_SET_LINK_WINDOW:
		m_ptr->window = new_value;
		break;
	default:
		res = -EINVAL;
		break;
	}
	return res;
}
2625
/**
 * tipc_link_cmd_config - handle a "set link value" configuration command
 * @req_tlv_area: request message TLV area
 * @req_tlv_space: size of request message TLV area
 * @cmd: which attribute to set (TIPC_CMD_SET_LINK_*)
 *
 * Validates the request TLV and value range, handles the broadcast link
 * as a special case (only window changes are supported there), otherwise
 * delegates to link_cmd_set_value() under 'tipc_net_lock'.
 *
 * Returns a reply buffer: "none" on success, an error string otherwise.
 */
struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
				     u16 cmd)
{
	struct tipc_link_config *args;
	u32 new_value;
	int res;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
	new_value = ntohl(args->value);

	if (!link_value_is_valid(cmd, new_value))
		return tipc_cfg_reply_error_string(
			"cannot change, value invalid");

	/* Broadcast link: only the window setting may be changed */
	if (!strcmp(args->name, tipc_bclink_name)) {
		if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
		    (tipc_bclink_set_queue_limits(new_value) == 0))
			return tipc_cfg_reply_none();
		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
						   " (cannot change setting on broadcast link)");
	}

	read_lock_bh(&tipc_net_lock);
	res = link_cmd_set_value(args->name, new_value, cmd);
	read_unlock_bh(&tipc_net_lock);
	if (res)
		return tipc_cfg_reply_error_string("cannot change link setting");

	return tipc_cfg_reply_none();
}
2659
2660 /**
2661  * link_reset_statistics - reset link statistics
2662  * @l_ptr: pointer to link
2663  */
2664 static void link_reset_statistics(struct tipc_link *l_ptr)
2665 {
2666         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2667         l_ptr->stats.sent_info = l_ptr->next_out_no;
2668         l_ptr->stats.recv_info = l_ptr->next_in_no;
2669 }
2670
2671 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2672 {
2673         char *link_name;
2674         struct tipc_link *l_ptr;
2675         struct tipc_node *node;
2676
2677         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2678                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2679
2680         link_name = (char *)TLV_DATA(req_tlv_area);
2681         if (!strcmp(link_name, tipc_bclink_name)) {
2682                 if (tipc_bclink_reset_stats())
2683                         return tipc_cfg_reply_error_string("link not found");
2684                 return tipc_cfg_reply_none();
2685         }
2686
2687         read_lock_bh(&tipc_net_lock);
2688         l_ptr = link_find_link(link_name, &node);
2689         if (!l_ptr) {
2690                 read_unlock_bh(&tipc_net_lock);
2691                 return tipc_cfg_reply_error_string("link not found");
2692         }
2693
2694         tipc_node_lock(node);
2695         link_reset_statistics(l_ptr);
2696         tipc_node_unlock(node);
2697         read_unlock_bh(&tipc_net_lock);
2698         return tipc_cfg_reply_none();
2699 }
2700
2701 /**
2702  * percent - convert count to a percentage of total (rounding up or down)
2703  */
2704 static u32 percent(u32 count, u32 total)
2705 {
2706         return (count * 100 + (total / 2)) / total;
2707 }
2708
/**
 * tipc_link_stats - print link statistics
 * @name: link name
 * @buf: print buffer area
 * @buf_size: size of print buffer area
 *
 * Formats the named link's statistics into @buf. The broadcast link is
 * delegated to tipc_bclink_stats(). Holds 'tipc_net_lock' and the owning
 * node's lock while reading the counters.
 *
 * Returns length of print buffer data string (or 0 if error)
 */
static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
{
	struct tipc_link *l;
	struct tipc_stats *s;
	struct tipc_node *node;
	char *status;
	u32 profile_total = 0;
	int ret;

	if (!strcmp(name, tipc_bclink_name))
		return tipc_bclink_stats(buf, buf_size);

	read_lock_bh(&tipc_net_lock);
	l = link_find_link(name, &node);
	if (!l) {
		read_unlock_bh(&tipc_net_lock);
		return 0;
	}
	tipc_node_lock(node);
	s = &l->stats;

	if (tipc_link_is_active(l))
		status = "ACTIVE";
	else if (tipc_link_is_up(l))
		status = "STANDBY";
	else
		status = "DEFUNCT";

	ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
			    "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
			    "  Window:%u packets\n",
			    l->name, status, l->max_pkt, l->priority,
			    l->tolerance, l->queue_limit[0]);

	/* Packet deltas are relative to the last statistics reset */
	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     l->next_in_no - s->recv_info, s->recv_fragments,
			     s->recv_fragmented, s->recv_bundles,
			     s->recv_bundled);

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     l->next_out_no - s->sent_info, s->sent_fragments,
			     s->sent_fragmented, s->sent_bundles,
			     s->sent_bundled);

	/* Guard against division by zero in percent() / average below */
	profile_total = s->msg_length_counts;
	if (!profile_total)
		profile_total = 1;

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX profile sample:%u packets  average:%u octets\n"
			     "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
			     "-16384:%u%% -32768:%u%% -66000:%u%%\n",
			     s->msg_length_counts,
			     s->msg_lengths_total / profile_total,
			     percent(s->msg_length_profile[0], profile_total),
			     percent(s->msg_length_profile[1], profile_total),
			     percent(s->msg_length_profile[2], profile_total),
			     percent(s->msg_length_profile[3], profile_total),
			     percent(s->msg_length_profile[4], profile_total),
			     percent(s->msg_length_profile[5], profile_total),
			     percent(s->msg_length_profile[6], profile_total));

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  RX states:%u probes:%u naks:%u defs:%u"
			     " dups:%u\n", s->recv_states, s->recv_probes,
			     s->recv_nacks, s->deferred_recv, s->duplicates);

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX states:%u probes:%u naks:%u acks:%u"
			     " dups:%u\n", s->sent_states, s->sent_probes,
			     s->sent_nacks, s->sent_acks, s->retransmitted);

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  Congestion link:%u  Send queue"
			     " max:%u avg:%u\n", s->link_congs,
			     s->max_queue_sz, s->queue_sz_counts ?
			     (s->accu_queue_sz / s->queue_sz_counts) : 0);

	tipc_node_unlock(node);
	read_unlock_bh(&tipc_net_lock);
	return ret;
}
2801
2802 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
2803 {
2804         struct sk_buff *buf;
2805         struct tlv_desc *rep_tlv;
2806         int str_len;
2807         int pb_len;
2808         char *pb;
2809
2810         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2811                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2812
2813         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2814         if (!buf)
2815                 return NULL;
2816
2817         rep_tlv = (struct tlv_desc *)buf->data;
2818         pb = TLV_DATA(rep_tlv);
2819         pb_len = ULTRA_STRING_MAX_LEN;
2820         str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
2821                                   pb, pb_len);
2822         if (!str_len) {
2823                 kfree_skb(buf);
2824                 return tipc_cfg_reply_error_string("link not found");
2825         }
2826         str_len += 1;   /* for "\0" */
2827         skb_put(buf, TLV_SPACE(str_len));
2828         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2829
2830         return buf;
2831 }
2832
2833 /**
2834  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
2835  * @dest: network address of destination node
2836  * @selector: used to select from set of active links
2837  *
2838  * If no active link can be found, uses default maximum packet size.
2839  */
2840 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
2841 {
2842         struct tipc_node *n_ptr;
2843         struct tipc_link *l_ptr;
2844         u32 res = MAX_PKT_DEFAULT;
2845
2846         if (dest == tipc_own_addr)
2847                 return MAX_MSG_SIZE;
2848
2849         read_lock_bh(&tipc_net_lock);
2850         n_ptr = tipc_node_find(dest);
2851         if (n_ptr) {
2852                 tipc_node_lock(n_ptr);
2853                 l_ptr = n_ptr->active_links[selector & 1];
2854                 if (l_ptr)
2855                         res = l_ptr->max_pkt;
2856                 tipc_node_unlock(n_ptr);
2857         }
2858         read_unlock_bh(&tipc_net_lock);
2859         return res;
2860 }
2861
2862 static void link_print(struct tipc_link *l_ptr, const char *str)
2863 {
2864         pr_info("%s Link %x<%s>:", str, l_ptr->addr, l_ptr->b_ptr->name);
2865
2866         if (link_working_unknown(l_ptr))
2867                 pr_cont(":WU\n");
2868         else if (link_reset_reset(l_ptr))
2869                 pr_cont(":RR\n");
2870         else if (link_reset_unknown(l_ptr))
2871                 pr_cont(":RU\n");
2872         else if (link_working_working(l_ptr))
2873                 pr_cont(":WW\n");
2874         else
2875                 pr_cont("\n");
2876 }