]> Pileus Git - ~andy/linux/blobdiff - net/tipc/link.c
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net
[~andy/linux] / net / tipc / link.c
index 13b9877458201fa4d5bb14a36892825272229050..d4b5de41b682188f1cf3bcffb08a44f38a9f84ae 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/link.c: TIPC link code
  *
- * Copyright (c) 1996-2007, 2012, Ericsson AB
+ * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -78,8 +78,8 @@ static const char *link_unk_evt = "Unknown link event ";
 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
                                       struct sk_buff *buf);
 static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf);
-static int  link_recv_changeover_msg(struct tipc_link **l_ptr,
-                                    struct sk_buff **buf);
+static int  tipc_link_tunnel_rcv(struct tipc_link **l_ptr,
+                                struct sk_buff **buf);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
 static int  link_send_sections_long(struct tipc_port *sender,
                                    struct iovec const *msg_sect,
@@ -87,7 +87,6 @@ static int  link_send_sections_long(struct tipc_port *sender,
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static void link_start(struct tipc_link *l_ptr);
 static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
 static void tipc_link_send_sync(struct tipc_link *l);
 static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
@@ -278,9 +277,11 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 
        tipc_node_attach_link(n_ptr, l_ptr);
 
-       k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
+       k_init_timer(&l_ptr->timer, (Handler)link_timeout,
+                    (unsigned long)l_ptr);
        list_add_tail(&l_ptr->link_list, &b_ptr->links);
-       tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);
+
+       link_state_event(l_ptr, STARTING_EVT);
 
        return l_ptr;
 }
@@ -305,19 +306,13 @@ void tipc_link_delete(struct tipc_link *l_ptr)
        tipc_node_lock(l_ptr->owner);
        tipc_link_reset(l_ptr);
        tipc_node_detach_link(l_ptr->owner, l_ptr);
-       tipc_link_stop(l_ptr);
+       tipc_link_purge_queues(l_ptr);
        list_del_init(&l_ptr->link_list);
        tipc_node_unlock(l_ptr->owner);
        k_term_timer(&l_ptr->timer);
        kfree(l_ptr);
 }
 
-static void link_start(struct tipc_link *l_ptr)
-{
-       tipc_node_lock(l_ptr->owner);
-       link_state_event(l_ptr, STARTING_EVT);
-       tipc_node_unlock(l_ptr->owner);
-}
 
 /**
  * link_schedule_port - schedule port for deferred sending
@@ -386,14 +381,7 @@ exit:
  */
 static void link_release_outqueue(struct tipc_link *l_ptr)
 {
-       struct sk_buff *buf = l_ptr->first_out;
-       struct sk_buff *next;
-
-       while (buf) {
-               next = buf->next;
-               kfree_skb(buf);
-               buf = next;
-       }
+       kfree_skb_list(l_ptr->first_out);
        l_ptr->first_out = NULL;
        l_ptr->out_queue_size = 0;
 }
@@ -410,37 +398,20 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 }
 
 /**
- * tipc_link_stop - purge all inbound and outbound messages associated with link
+ * tipc_link_purge_queues - purge all pkt queues associated with link
  * @l_ptr: pointer to link
  */
-void tipc_link_stop(struct tipc_link *l_ptr)
+void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-       struct sk_buff *buf;
-       struct sk_buff *next;
-
-       buf = l_ptr->oldest_deferred_in;
-       while (buf) {
-               next = buf->next;
-               kfree_skb(buf);
-               buf = next;
-       }
-
-       buf = l_ptr->first_out;
-       while (buf) {
-               next = buf->next;
-               kfree_skb(buf);
-               buf = next;
-       }
-
+       kfree_skb_list(l_ptr->oldest_deferred_in);
+       kfree_skb_list(l_ptr->first_out);
        tipc_link_reset_fragments(l_ptr);
-
        kfree_skb(l_ptr->proto_msg_queue);
        l_ptr->proto_msg_queue = NULL;
 }
 
 void tipc_link_reset(struct tipc_link *l_ptr)
 {
-       struct sk_buff *buf;
        u32 prev_state = l_ptr->state;
        u32 checkpoint = l_ptr->next_in_no;
        int was_active_link = tipc_link_is_active(l_ptr);
@@ -461,8 +432,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        tipc_node_link_down(l_ptr->owner, l_ptr);
        tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
 
-       if (was_active_link && tipc_node_active_links(l_ptr->owner) &&
-           l_ptr->owner->permit_changeover) {
+       if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
                l_ptr->reset_checkpoint = checkpoint;
                l_ptr->exp_msg_count = START_CHANGEOVER;
        }
@@ -471,12 +441,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        link_release_outqueue(l_ptr);
        kfree_skb(l_ptr->proto_msg_queue);
        l_ptr->proto_msg_queue = NULL;
-       buf = l_ptr->oldest_deferred_in;
-       while (buf) {
-               struct sk_buff *next = buf->next;
-               kfree_skb(buf);
-               buf = next;
-       }
+       kfree_skb_list(l_ptr->oldest_deferred_in);
        if (!list_empty(&l_ptr->waiting_ports))
                tipc_link_wakeup_ports(l_ptr, 1);
 
@@ -517,10 +482,11 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
        if (!l_ptr->started && (event != STARTING_EVT))
                return;         /* Not yet. */
 
-       if (link_blocked(l_ptr)) {
+       /* Check whether changeover is going on */
+       if (l_ptr->exp_msg_count) {
                if (event == TIMEOUT_EVT)
                        link_set_timer(l_ptr, cont_intv);
-               return;   /* Changeover going on */
+               return;
        }
 
        switch (l_ptr->state) {
@@ -790,8 +756,7 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
                return link_send_long_buf(l_ptr, buf);
 
        /* Packet can be queued or sent. */
-       if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
-                  !link_congested(l_ptr))) {
+       if (likely(!link_congested(l_ptr))) {
                link_add_to_outqueue(l_ptr, buf, msg);
 
                tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
@@ -957,14 +922,13 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
 
        if (likely(!link_congested(l_ptr))) {
                if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
-                       if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
-                               link_add_to_outqueue(l_ptr, buf, msg);
-                               tipc_bearer_send(l_ptr->b_ptr, buf,
-                                                &l_ptr->media_addr);
-                               l_ptr->unacked_window = 0;
-                               return res;
-                       }
-               } else
+                       link_add_to_outqueue(l_ptr, buf, msg);
+                       tipc_bearer_send(l_ptr->b_ptr, buf,
+                                        &l_ptr->media_addr);
+                       l_ptr->unacked_window = 0;
+                       return res;
+               }
+               else
                        *used_max_pkt = l_ptr->max_pkt;
        }
        return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
@@ -1013,8 +977,7 @@ exit:
                        }
 
                        /* Exit if link (or bearer) is congested */
-                       if (link_congested(l_ptr) ||
-                           tipc_bearer_blocked(l_ptr->b_ptr)) {
+                       if (link_congested(l_ptr)) {
                                res = link_schedule_port(l_ptr,
                                                         sender->ref, res);
                                goto exit;
@@ -1127,10 +1090,7 @@ again:
                if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
                        res = -EFAULT;
 error:
-                       for (; buf_chain; buf_chain = buf) {
-                               buf = buf_chain->next;
-                               kfree_skb(buf_chain);
-                       }
+                       kfree_skb_list(buf_chain);
                        return res;
                }
                sect_crs += sz;
@@ -1180,18 +1140,12 @@ error:
                if (l_ptr->max_pkt < max_pkt) {
                        sender->max_pkt = l_ptr->max_pkt;
                        tipc_node_unlock(node);
-                       for (; buf_chain; buf_chain = buf) {
-                               buf = buf_chain->next;
-                               kfree_skb(buf_chain);
-                       }
+                       kfree_skb_list(buf_chain);
                        goto again;
                }
        } else {
 reject:
-               for (; buf_chain; buf_chain = buf) {
-                       buf = buf_chain->next;
-                       kfree_skb(buf_chain);
-               }
+               kfree_skb_list(buf_chain);
                return tipc_port_reject_sections(sender, hdr, msg_sect,
                                                 len, TIPC_ERR_NO_NODE);
        }
@@ -1209,7 +1163,7 @@ reject:
 /*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
-u32 tipc_link_push_packet(struct tipc_link *l_ptr)
+static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 {
        struct sk_buff *buf = l_ptr->first_out;
        u32 r_q_size = l_ptr->retransm_queue_size;
@@ -1281,9 +1235,6 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
 {
        u32 res;
 
-       if (tipc_bearer_blocked(l_ptr->b_ptr))
-               return;
-
        do {
                res = tipc_link_push_packet(l_ptr);
        } while (!res);
@@ -1370,26 +1321,15 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
 
        msg = buf_msg(buf);
 
-       if (tipc_bearer_blocked(l_ptr->b_ptr)) {
-               if (l_ptr->retransm_queue_size == 0) {
-                       l_ptr->retransm_queue_head = msg_seqno(msg);
-                       l_ptr->retransm_queue_size = retransmits;
-               } else {
-                       pr_err("Unexpected retransmit on link %s (qsize=%d)\n",
-                              l_ptr->name, l_ptr->retransm_queue_size);
+       /* Detect repeated retransmit failures */
+       if (l_ptr->last_retransmitted == msg_seqno(msg)) {
+               if (++l_ptr->stale_count > 100) {
+                       link_retransmit_failure(l_ptr, buf);
+                       return;
                }
-               return;
        } else {
-               /* Detect repeated retransmit failures on unblocked bearer */
-               if (l_ptr->last_retransmitted == msg_seqno(msg)) {
-                       if (++l_ptr->stale_count > 100) {
-                               link_retransmit_failure(l_ptr, buf);
-                               return;
-                       }
-               } else {
-                       l_ptr->last_retransmitted = msg_seqno(msg);
-                       l_ptr->stale_count = 1;
-               }
+               l_ptr->last_retransmitted = msg_seqno(msg);
+               l_ptr->stale_count = 1;
        }
 
        while (retransmits && (buf != l_ptr->next_out) && buf) {
@@ -1476,14 +1416,14 @@ static int link_recv_buf_validate(struct sk_buff *buf)
 }
 
 /**
- * tipc_recv_msg - process TIPC messages arriving from off-node
+ * tipc_rcv - process TIPC packets/messages arriving from off-node
  * @head: pointer to message buffer chain
  * @tb_ptr: pointer to bearer message arrived on
  *
  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
  * structure (i.e. cannot be NULL), but bearer can be inactive.
  */
-void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
+void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 {
        read_lock_bh(&tipc_net_lock);
        while (head) {
@@ -1658,7 +1598,7 @@ deliver:
                        continue;
                case CHANGEOVER_PROTOCOL:
                        type = msg_type(msg);
-                       if (link_recv_changeover_msg(&l_ptr, &buf)) {
+                       if (tipc_link_tunnel_rcv(&l_ptr, &buf)) {
                                msg = buf_msg(buf);
                                seq_no = msg_seqno(msg);
                                if (type == ORIGINAL_MSG)
@@ -1787,7 +1727,8 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
                l_ptr->proto_msg_queue = NULL;
        }
 
-       if (link_blocked(l_ptr))
+       /* Don't send protocol message during link changeover */
+       if (l_ptr->exp_msg_count)
                return;
 
        /* Abort non-RESET send if communication with node is prohibited */
@@ -1862,12 +1803,6 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
        skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
        buf->priority = TC_PRIO_CONTROL;
 
-       /* Defer message if bearer is already blocked */
-       if (tipc_bearer_blocked(l_ptr->b_ptr)) {
-               l_ptr->proto_msg_queue = buf;
-               return;
-       }
-
        tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
        l_ptr->unacked_window = 0;
        kfree_skb(buf);
@@ -1886,7 +1821,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
        u32 msg_tol;
        struct tipc_msg *msg = buf_msg(buf);
 
-       if (link_blocked(l_ptr))
+       /* Discard protocol message during link changeover */
+       if (l_ptr->exp_msg_count)
                goto exit;
 
        /* record unnumbered packet arrival (force mismatch on next timeout) */
@@ -1896,8 +1832,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
                if (tipc_own_addr > msg_prevnode(msg))
                        l_ptr->b_ptr->net_plane = msg_net_plane(msg);
 
-       l_ptr->owner->permit_changeover = msg_redundant_link(msg);
-
        switch (msg_type(msg)) {
 
        case RESET_MSG:
@@ -2013,13 +1947,13 @@ exit:
 }
 
 
-/*
- * tipc_link_tunnel(): Send one message via a link belonging to
- * another bearer. Owner node is locked.
+/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
+ * a different bearer. Owner node is locked.
  */
-static void tipc_link_tunnel(struct tipc_link *l_ptr,
-                            struct tipc_msg *tunnel_hdr, struct tipc_msg *msg,
-                            u32 selector)
+static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
+                                 struct tipc_msg *tunnel_hdr,
+                                 struct tipc_msg *msg,
+                                 u32 selector)
 {
        struct tipc_link *tunnel;
        struct sk_buff *buf;
@@ -2042,12 +1976,13 @@ static void tipc_link_tunnel(struct tipc_link *l_ptr,
 }
 
 
-
-/*
- * changeover(): Send whole message queue via the remaining link
- *               Owner node is locked.
+/* tipc_link_failover_send_queue(): A link has gone down, but a second
+ * link is still active. We can do failover. Tunnel the failing link's
+ * whole send queue via the remaining link. This way, we don't lose
+ * any packets, and sequence order is preserved for subsequent traffic
+ * sent over the remaining link. Owner node is locked.
  */
-void tipc_link_changeover(struct tipc_link *l_ptr)
+void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
        u32 msgcount = l_ptr->out_queue_size;
        struct sk_buff *crs = l_ptr->first_out;
@@ -2058,11 +1993,6 @@ void tipc_link_changeover(struct tipc_link *l_ptr)
        if (!tunnel)
                return;
 
-       if (!l_ptr->owner->permit_changeover) {
-               pr_warn("%speer did not permit changeover\n", link_co_err);
-               return;
-       }
-
        tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
                 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
@@ -2096,20 +2026,30 @@ void tipc_link_changeover(struct tipc_link *l_ptr)
                        msgcount = msg_msgcnt(msg);
                        while (msgcount--) {
                                msg_set_seqno(m, msg_seqno(msg));
-                               tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
-                                                msg_link_selector(m));
+                               tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
+                                                     msg_link_selector(m));
                                pos += align(msg_size(m));
                                m = (struct tipc_msg *)pos;
                        }
                } else {
-                       tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
-                                        msg_link_selector(msg));
+                       tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
+                                             msg_link_selector(msg));
                }
                crs = crs->next;
        }
 }
 
-void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel)
+/* tipc_link_dup_send_queue(): A second link has become active. Tunnel a
+ * duplicate of the first link's send queue via the new link. This way, we
+ * are guaranteed that currently queued packets from a socket are delivered
+ * before future traffic from the same socket, even if this is using the
+ * new link. The last arriving copy of each duplicate packet is dropped at
+ * the receiving end by the regular protocol check, so packet cardinality
+ * and sequence order is preserved per sender/receiver socket pair.
+ * Owner node is locked.
+ */
+void tipc_link_dup_send_queue(struct tipc_link *l_ptr,
+                             struct tipc_link *tunnel)
 {
        struct sk_buff *iter;
        struct tipc_msg tunnel_hdr;
@@ -2165,12 +2105,14 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
        return eb;
 }
 
-/*
- *  link_recv_changeover_msg(): Receive tunneled packet sent
- *  via other link. Node is locked. Return extracted buffer.
+/*  tipc_link_tunnel_rcv(): Receive a tunneled packet, sent
+ *  via other link as result of a failover (ORIGINAL_MSG) or
+ *  a new active link (DUPLICATE_MSG). Failover packets are
+ *  returned to the active link for delivery upwards.
+ *  Owner node is locked.
  */
-static int link_recv_changeover_msg(struct tipc_link **l_ptr,
-                                   struct sk_buff **buf)
+static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr,
+                               struct sk_buff **buf)
 {
        struct sk_buff *tunnel_buf = *buf;
        struct tipc_link *dest_link;
@@ -2307,11 +2249,7 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
                fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
                if (fragm == NULL) {
                        kfree_skb(buf);
-                       while (buf_chain) {
-                               buf = buf_chain;
-                               buf_chain = buf_chain->next;
-                               kfree_skb(buf);
-                       }
+                       kfree_skb_list(buf_chain);
                        return -ENOMEM;
                }
                msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);