]> Pileus Git - ~andy/linux/blob - drivers/block/drbd/drbd_receiver.c
Merge branch 'for-jens' of git://git.drbd.org/linux-drbd into for-3.6/drivers
[~andy/linux] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
/* Return type of drbd_may_finish_epoch(). */
enum finish_epoch {
	FE_STILL_LIVE = 0,
	FE_DESTROYED = 1,
	FE_RECYCLED = 2,
};
56
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
63
/* Allocation flags for the opportunistic page allocations in this file:
 * highmem pages are acceptable, and a failed attempt shall not warn. */
#define GFP_TRY (__GFP_NOWARN | __GFP_HIGHMEM)
65
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
70
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77         struct page *page;
78         struct page *tmp;
79
80         BUG_ON(!n);
81         BUG_ON(!head);
82
83         page = *head;
84
85         if (!page)
86                 return NULL;
87
88         while (page) {
89                 tmp = page_chain_next(page);
90                 if (--n == 0)
91                         break; /* found sufficient pages */
92                 if (tmp == NULL)
93                         /* insufficient pages, don't use any of them. */
94                         return NULL;
95                 page = tmp;
96         }
97
98         /* add end of list marker for the returned list */
99         set_page_private(page, 0);
100         /* actual return value, and adjustment of head */
101         page = *head;
102         *head = tmp;
103         return page;
104 }
105
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111         struct page *tmp;
112         int i = 1;
113         while ((tmp = page_chain_next(page)))
114                 ++i, page = tmp;
115         if (len)
116                 *len = i;
117         return page;
118 }
119
120 static int page_chain_free(struct page *page)
121 {
122         struct page *tmp;
123         int i = 0;
124         page_chain_for_each_safe(page, tmp) {
125                 put_page(page);
126                 ++i;
127         }
128         return i;
129 }
130
131 static void page_chain_add(struct page **head,
132                 struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135         struct page *tmp;
136         tmp = page_chain_tail(chain_first, NULL);
137         BUG_ON(tmp != chain_last);
138 #endif
139
140         /* add chain to head */
141         set_page_private(chain_last, (unsigned long)*head);
142         *head = chain_first;
143 }
144
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147         struct page *page = NULL;
148         struct page *tmp = NULL;
149         int i = 0;
150
151         /* Yes, testing drbd_pp_vacant outside the lock is racy.
152          * So what. It saves a spin_lock. */
153         if (drbd_pp_vacant >= number) {
154                 spin_lock(&drbd_pp_lock);
155                 page = page_chain_del(&drbd_pp_pool, number);
156                 if (page)
157                         drbd_pp_vacant -= number;
158                 spin_unlock(&drbd_pp_lock);
159                 if (page)
160                         return page;
161         }
162
163         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164          * "criss-cross" setup, that might cause write-out on some other DRBD,
165          * which in turn might block on the other node at this very place.  */
166         for (i = 0; i < number; i++) {
167                 tmp = alloc_page(GFP_TRY);
168                 if (!tmp)
169                         break;
170                 set_page_private(tmp, (unsigned long)page);
171                 page = tmp;
172         }
173
174         if (i == number)
175                 return page;
176
177         /* Not enough pages immediately available this time.
178          * No need to jump around here, drbd_pp_alloc will retry this
179          * function "soon". */
180         if (page) {
181                 tmp = page_chain_tail(page, NULL);
182                 spin_lock(&drbd_pp_lock);
183                 page_chain_add(&drbd_pp_pool, page, tmp);
184                 drbd_pp_vacant += i;
185                 spin_unlock(&drbd_pp_lock);
186         }
187         return NULL;
188 }
189
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192         struct drbd_epoch_entry *e;
193         struct list_head *le, *tle;
194
195         /* The EEs are always appended to the end of the list. Since
196            they are sent in order over the wire, they have to finish
197            in order. As soon as we see the first not finished we can
198            stop to examine the list... */
199
200         list_for_each_safe(le, tle, &mdev->net_ee) {
201                 e = list_entry(le, struct drbd_epoch_entry, w.list);
202                 if (drbd_ee_has_active_page(e))
203                         break;
204                 list_move(le, to_be_freed);
205         }
206 }
207
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210         LIST_HEAD(reclaimed);
211         struct drbd_epoch_entry *e, *t;
212
213         spin_lock_irq(&mdev->req_lock);
214         reclaim_net_ee(mdev, &reclaimed);
215         spin_unlock_irq(&mdev->req_lock);
216
217         list_for_each_entry_safe(e, t, &reclaimed, w.list)
218                 drbd_free_net_ee(mdev, e);
219 }
220
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:       DRBD device.
224  * @number:     number of pages requested
225  * @retry:      whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235         struct page *page = NULL;
236         DEFINE_WAIT(wait);
237
238         /* Yes, we may run up to @number over max_buffers. If we
239          * follow it strictly, the admin will get it wrong anyways. */
240         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242
243         while (page == NULL) {
244                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246                 drbd_kick_lo_and_reclaim_net(mdev);
247
248                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250                         if (page)
251                                 break;
252                 }
253
254                 if (!retry)
255                         break;
256
257                 if (signal_pending(current)) {
258                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259                         break;
260                 }
261
262                 schedule();
263         }
264         finish_wait(&drbd_pp_wait, &wait);
265
266         if (page)
267                 atomic_add(number, &mdev->pp_in_use);
268         return page;
269 }
270
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278         int i;
279
280         if (page == NULL)
281                 return;
282
283         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
284                 i = page_chain_free(page);
285         else {
286                 struct page *tmp;
287                 tmp = page_chain_tail(page, &i);
288                 spin_lock(&drbd_pp_lock);
289                 page_chain_add(&drbd_pp_pool, page, tmp);
290                 drbd_pp_vacant += i;
291                 spin_unlock(&drbd_pp_lock);
292         }
293         i = atomic_sub_return(i, a);
294         if (i < 0)
295                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
296                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
297         wake_up(&drbd_pp_wait);
298 }
299
300 /*
301 You need to hold the req_lock:
302  _drbd_wait_ee_list_empty()
303
304 You must not have the req_lock:
305  drbd_free_ee()
306  drbd_alloc_ee()
307  drbd_init_ee()
308  drbd_release_ee()
309  drbd_ee_fix_bhs()
310  drbd_process_done_ee()
311  drbd_clear_done_ee()
312  drbd_wait_ee_list_empty()
313 */
314
315 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
316                                      u64 id,
317                                      sector_t sector,
318                                      unsigned int data_size,
319                                      gfp_t gfp_mask) __must_hold(local)
320 {
321         struct drbd_epoch_entry *e;
322         struct page *page = NULL;
323         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
324
325         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
326                 return NULL;
327
328         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
329         if (!e) {
330                 if (!(gfp_mask & __GFP_NOWARN))
331                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
332                 return NULL;
333         }
334
335         if (data_size) {
336                 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
337                 if (!page)
338                         goto fail;
339         }
340
341         INIT_HLIST_NODE(&e->collision);
342         e->epoch = NULL;
343         e->mdev = mdev;
344         e->pages = page;
345         atomic_set(&e->pending_bios, 0);
346         e->size = data_size;
347         e->flags = 0;
348         e->sector = sector;
349         e->block_id = id;
350
351         return e;
352
353  fail:
354         mempool_free(e, drbd_ee_mempool);
355         return NULL;
356 }
357
358 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
359 {
360         if (e->flags & EE_HAS_DIGEST)
361                 kfree(e->digest);
362         drbd_pp_free(mdev, e->pages, is_net);
363         D_ASSERT(atomic_read(&e->pending_bios) == 0);
364         D_ASSERT(hlist_unhashed(&e->collision));
365         mempool_free(e, drbd_ee_mempool);
366 }
367
368 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
369 {
370         LIST_HEAD(work_list);
371         struct drbd_epoch_entry *e, *t;
372         int count = 0;
373         int is_net = list == &mdev->net_ee;
374
375         spin_lock_irq(&mdev->req_lock);
376         list_splice_init(list, &work_list);
377         spin_unlock_irq(&mdev->req_lock);
378
379         list_for_each_entry_safe(e, t, &work_list, w.list) {
380                 drbd_free_some_ee(mdev, e, is_net);
381                 count++;
382         }
383         return count;
384 }
385
386
387 /*
388  * This function is called from _asender only_
389  * but see also comments in _req_mod(,barrier_acked)
390  * and receive_Barrier.
391  *
392  * Move entries from net_ee to done_ee, if ready.
393  * Grab done_ee, call all callbacks, free the entries.
394  * The callbacks typically send out ACKs.
395  */
396 static int drbd_process_done_ee(struct drbd_conf *mdev)
397 {
398         LIST_HEAD(work_list);
399         LIST_HEAD(reclaimed);
400         struct drbd_epoch_entry *e, *t;
401         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
402
403         spin_lock_irq(&mdev->req_lock);
404         reclaim_net_ee(mdev, &reclaimed);
405         list_splice_init(&mdev->done_ee, &work_list);
406         spin_unlock_irq(&mdev->req_lock);
407
408         list_for_each_entry_safe(e, t, &reclaimed, w.list)
409                 drbd_free_net_ee(mdev, e);
410
411         /* possible callbacks here:
412          * e_end_block, and e_end_resync_block, e_send_discard_ack.
413          * all ignore the last argument.
414          */
415         list_for_each_entry_safe(e, t, &work_list, w.list) {
416                 /* list_del not necessary, next/prev members not touched */
417                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
418                 drbd_free_ee(mdev, e);
419         }
420         wake_up(&mdev->ee_wait);
421
422         return ok;
423 }
424
425 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
426 {
427         DEFINE_WAIT(wait);
428
429         /* avoids spin_lock/unlock
430          * and calling prepare_to_wait in the fast path */
431         while (!list_empty(head)) {
432                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
433                 spin_unlock_irq(&mdev->req_lock);
434                 io_schedule();
435                 finish_wait(&mdev->ee_wait, &wait);
436                 spin_lock_irq(&mdev->req_lock);
437         }
438 }
439
440 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441 {
442         spin_lock_irq(&mdev->req_lock);
443         _drbd_wait_ee_list_empty(mdev, head);
444         spin_unlock_irq(&mdev->req_lock);
445 }
446
447 /* see also kernel_accept; which is only present since 2.6.18.
448  * also we want to log which part of it failed, exactly */
449 static int drbd_accept(struct drbd_conf *mdev, const char **what,
450                 struct socket *sock, struct socket **newsock)
451 {
452         struct sock *sk = sock->sk;
453         int err = 0;
454
455         *what = "listen";
456         err = sock->ops->listen(sock, 5);
457         if (err < 0)
458                 goto out;
459
460         *what = "sock_create_lite";
461         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
462                                newsock);
463         if (err < 0)
464                 goto out;
465
466         *what = "accept";
467         err = sock->ops->accept(sock, *newsock, 0);
468         if (err < 0) {
469                 sock_release(*newsock);
470                 *newsock = NULL;
471                 goto out;
472         }
473         (*newsock)->ops  = sock->ops;
474         __module_get((*newsock)->ops->owner);
475
476 out:
477         return err;
478 }
479
480 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
481                     void *buf, size_t size, int flags)
482 {
483         mm_segment_t oldfs;
484         struct kvec iov = {
485                 .iov_base = buf,
486                 .iov_len = size,
487         };
488         struct msghdr msg = {
489                 .msg_iovlen = 1,
490                 .msg_iov = (struct iovec *)&iov,
491                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
492         };
493         int rv;
494
495         oldfs = get_fs();
496         set_fs(KERNEL_DS);
497         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
498         set_fs(oldfs);
499
500         return rv;
501 }
502
503 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
504 {
505         mm_segment_t oldfs;
506         struct kvec iov = {
507                 .iov_base = buf,
508                 .iov_len = size,
509         };
510         struct msghdr msg = {
511                 .msg_iovlen = 1,
512                 .msg_iov = (struct iovec *)&iov,
513                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
514         };
515         int rv;
516
517         oldfs = get_fs();
518         set_fs(KERNEL_DS);
519
520         for (;;) {
521                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
522                 if (rv == size)
523                         break;
524
525                 /* Note:
526                  * ECONNRESET   other side closed the connection
527                  * ERESTARTSYS  (on  sock) we got a signal
528                  */
529
530                 if (rv < 0) {
531                         if (rv == -ECONNRESET)
532                                 dev_info(DEV, "sock was reset by peer\n");
533                         else if (rv != -ERESTARTSYS)
534                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
535                         break;
536                 } else if (rv == 0) {
537                         dev_info(DEV, "sock was shut down by peer\n");
538                         break;
539                 } else  {
540                         /* signal came in, or peer/link went down,
541                          * after we read a partial message
542                          */
543                         /* D_ASSERT(signal_pending(current)); */
544                         break;
545                 }
546         };
547
548         set_fs(oldfs);
549
550         if (rv != size)
551                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
552
553         return rv;
554 }
555
556 /* quoting tcp(7):
557  *   On individual connections, the socket buffer size must be set prior to the
558  *   listen(2) or connect(2) calls in order to have it take effect.
559  * This is our wrapper to do so.
560  */
561 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
562                 unsigned int rcv)
563 {
564         /* open coded SO_SNDBUF, SO_RCVBUF */
565         if (snd) {
566                 sock->sk->sk_sndbuf = snd;
567                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
568         }
569         if (rcv) {
570                 sock->sk->sk_rcvbuf = rcv;
571                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
572         }
573 }
574
/*
 * Make one attempt to actively connect to the configured peer address.
 *
 * Creates a TCP socket, binds it to the configured local address (with
 * port 0) and connects to the peer, using try_connect_int seconds as
 * send/receive timeout.
 *
 * Returns the connected socket, or NULL.  Failures of socket creation or
 * bind force the connection state to C_DISCONNECTING, unless the error is
 * one of the "temporary" ones listed below; a failing connect never does.
 */
static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int my_addr_len;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(mdev))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

       /* explicitly bind to the configured IP as source IP
	*  for the outgoing connections.
	*  This is needed for multihomed hosts and to be
	*  able to use lo: interfaces for drbd.
	* Make sure to use 0 as port number, so linux selects
	*  a free one dynamically.
	*/
	/* Clamp the length once, and use the clamped value for the copy
	 * AND for bind(): src_in6 lives on our stack, so bind() must never
	 * be told it is longer than it really is. */
	my_addr_len = min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, mdev->net_conf->my_addr, my_addr_len);
	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)mdev->net_conf->peer_addr,
				 mdev->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			dev_err(DEV, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	}
	put_net_conf(mdev);
	return sock;
}
652
653 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
654 {
655         int timeo, err;
656         struct socket *s_estab = NULL, *s_listen;
657         const char *what;
658
659         if (!get_net_conf(mdev))
660                 return NULL;
661
662         what = "sock_create_kern";
663         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
664                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
665         if (err) {
666                 s_listen = NULL;
667                 goto out;
668         }
669
670         timeo = mdev->net_conf->try_connect_int * HZ;
671         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
672
673         s_listen->sk->sk_reuse    = SK_CAN_REUSE; /* SO_REUSEADDR */
674         s_listen->sk->sk_rcvtimeo = timeo;
675         s_listen->sk->sk_sndtimeo = timeo;
676         drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
677                         mdev->net_conf->rcvbuf_size);
678
679         what = "bind before listen";
680         err = s_listen->ops->bind(s_listen,
681                               (struct sockaddr *) mdev->net_conf->my_addr,
682                               mdev->net_conf->my_addr_len);
683         if (err < 0)
684                 goto out;
685
686         err = drbd_accept(mdev, &what, s_listen, &s_estab);
687
688 out:
689         if (s_listen)
690                 sock_release(s_listen);
691         if (err < 0) {
692                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
693                         dev_err(DEV, "%s failed, err = %d\n", what, err);
694                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
695                 }
696         }
697         put_net_conf(mdev);
698
699         return s_estab;
700 }
701
702 static int drbd_send_fp(struct drbd_conf *mdev,
703         struct socket *sock, enum drbd_packets cmd)
704 {
705         struct p_header80 *h = &mdev->data.sbuf.header.h80;
706
707         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
708 }
709
710 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
711 {
712         struct p_header80 *h = &mdev->data.rbuf.header.h80;
713         int rr;
714
715         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
716
717         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
718                 return be16_to_cpu(h->command);
719
720         return 0xffff;
721 }
722
723 /**
724  * drbd_socket_okay() - Free the socket if its connection is not okay
725  * @mdev:       DRBD device.
726  * @sock:       pointer to the pointer to the socket.
727  */
728 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
729 {
730         int rr;
731         char tb[4];
732
733         if (!*sock)
734                 return false;
735
736         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
737
738         if (rr > 0 || rr == -EAGAIN) {
739                 return true;
740         } else {
741                 sock_release(*sock);
742                 *sock = NULL;
743                 return false;
744         }
745 }
746
747 /*
748  * return values:
749  *   1 yes, we have a valid connection
750  *   0 oops, did not work out, please try again
751  *  -1 peer talks different language,
752  *     no point in trying again, please go standalone.
753  *  -2 We do not have a network config...
754  */
755 static int drbd_connect(struct drbd_conf *mdev)
756 {
757         struct socket *s, *sock, *msock;
758         int try, h, ok;
759         enum drbd_state_rv rv;
760
761         D_ASSERT(!mdev->data.socket);
762
763         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
764                 return -2;
765
766         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
767
768         sock  = NULL;
769         msock = NULL;
770
771         do {
772                 for (try = 0;;) {
773                         /* 3 tries, this should take less than a second! */
774                         s = drbd_try_connect(mdev);
775                         if (s || ++try >= 3)
776                                 break;
777                         /* give the other side time to call bind() & listen() */
778                         schedule_timeout_interruptible(HZ / 10);
779                 }
780
781                 if (s) {
782                         if (!sock) {
783                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
784                                 sock = s;
785                                 s = NULL;
786                         } else if (!msock) {
787                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
788                                 msock = s;
789                                 s = NULL;
790                         } else {
791                                 dev_err(DEV, "Logic error in drbd_connect()\n");
792                                 goto out_release_sockets;
793                         }
794                 }
795
796                 if (sock && msock) {
797                         schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
798                         ok = drbd_socket_okay(mdev, &sock);
799                         ok = drbd_socket_okay(mdev, &msock) && ok;
800                         if (ok)
801                                 break;
802                 }
803
804 retry:
805                 s = drbd_wait_for_connect(mdev);
806                 if (s) {
807                         try = drbd_recv_fp(mdev, s);
808                         drbd_socket_okay(mdev, &sock);
809                         drbd_socket_okay(mdev, &msock);
810                         switch (try) {
811                         case P_HAND_SHAKE_S:
812                                 if (sock) {
813                                         dev_warn(DEV, "initial packet S crossed\n");
814                                         sock_release(sock);
815                                 }
816                                 sock = s;
817                                 break;
818                         case P_HAND_SHAKE_M:
819                                 if (msock) {
820                                         dev_warn(DEV, "initial packet M crossed\n");
821                                         sock_release(msock);
822                                 }
823                                 msock = s;
824                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
825                                 break;
826                         default:
827                                 dev_warn(DEV, "Error receiving initial packet\n");
828                                 sock_release(s);
829                                 if (random32() & 1)
830                                         goto retry;
831                         }
832                 }
833
834                 if (mdev->state.conn <= C_DISCONNECTING)
835                         goto out_release_sockets;
836                 if (signal_pending(current)) {
837                         flush_signals(current);
838                         smp_rmb();
839                         if (get_t_state(&mdev->receiver) == Exiting)
840                                 goto out_release_sockets;
841                 }
842
843                 if (sock && msock) {
844                         ok = drbd_socket_okay(mdev, &sock);
845                         ok = drbd_socket_okay(mdev, &msock) && ok;
846                         if (ok)
847                                 break;
848                 }
849         } while (1);
850
851         msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
852         sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
853
854         sock->sk->sk_allocation = GFP_NOIO;
855         msock->sk->sk_allocation = GFP_NOIO;
856
857         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
858         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
859
860         /* NOT YET ...
861          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
863          * first set it to the P_HAND_SHAKE timeout,
864          * which we set to 4x the configured ping_timeout. */
865         sock->sk->sk_sndtimeo =
866         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
867
868         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
869         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
870
871         /* we don't want delays.
872          * we use TCP_CORK where appropriate, though */
873         drbd_tcp_nodelay(sock);
874         drbd_tcp_nodelay(msock);
875
876         mdev->data.socket = sock;
877         mdev->meta.socket = msock;
878         mdev->last_received = jiffies;
879
880         D_ASSERT(mdev->asender.task == NULL);
881
882         h = drbd_do_handshake(mdev);
883         if (h <= 0)
884                 return h;
885
886         if (mdev->cram_hmac_tfm) {
887                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
888                 switch (drbd_do_auth(mdev)) {
889                 case -1:
890                         dev_err(DEV, "Authentication of peer failed\n");
891                         return -1;
892                 case 0:
893                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
894                         return 0;
895                 }
896         }
897
898         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
899         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
900
901         atomic_set(&mdev->packet_seq, 0);
902         mdev->peer_seq = 0;
903
904         if (drbd_send_protocol(mdev) == -1)
905                 return -1;
906         set_bit(STATE_SENT, &mdev->flags);
907         drbd_send_sync_param(mdev, &mdev->sync_conf);
908         drbd_send_sizes(mdev, 0, 0);
909         drbd_send_uuids(mdev);
910         drbd_send_current_state(mdev);
911         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
912         clear_bit(RESIZE_PENDING, &mdev->flags);
913
914         spin_lock_irq(&mdev->req_lock);
915         rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
916         if (mdev->state.conn != C_WF_REPORT_PARAMS)
917                 clear_bit(STATE_SENT, &mdev->flags);
918         spin_unlock_irq(&mdev->req_lock);
919
920         if (rv < SS_SUCCESS)
921                 return 0;
922
923         drbd_thread_start(&mdev->asender);
924         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
925
926         return 1;
927
928 out_release_sockets:
929         if (sock)
930                 sock_release(sock);
931         if (msock)
932                 sock_release(msock);
933         return -1;
934 }
935
936 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
937 {
938         union p_header *h = &mdev->data.rbuf.header;
939         int r;
940
941         r = drbd_recv(mdev, h, sizeof(*h));
942         if (unlikely(r != sizeof(*h))) {
943                 if (!signal_pending(current))
944                         dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
945                 return false;
946         }
947
948         if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
949                 *cmd = be16_to_cpu(h->h80.command);
950                 *packet_size = be16_to_cpu(h->h80.length);
951         } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
952                 *cmd = be16_to_cpu(h->h95.command);
953                 *packet_size = be32_to_cpu(h->h95.length);
954         } else {
955                 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
956                     be32_to_cpu(h->h80.magic),
957                     be16_to_cpu(h->h80.command),
958                     be16_to_cpu(h->h80.length));
959                 return false;
960         }
961         mdev->last_received = jiffies;
962
963         return true;
964 }
965
966 static void drbd_flush(struct drbd_conf *mdev)
967 {
968         int rv;
969
970         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
971                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
972                                         NULL);
973                 if (rv) {
974                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
975                         /* would rather check on EOPNOTSUPP, but that is not reliable.
976                          * don't try again for ANY return value != 0
977                          * if (rv == -EOPNOTSUPP) */
978                         drbd_bump_write_ordering(mdev, WO_drain_io);
979                 }
980                 put_ldev(mdev);
981         }
982 }
983
984 /**
985  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
986  * @mdev:       DRBD device.
987  * @epoch:      Epoch object.
988  * @ev:         Epoch event.
989  */
990 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
991                                                struct drbd_epoch *epoch,
992                                                enum epoch_event ev)
993 {
994         int epoch_size;
995         struct drbd_epoch *next_epoch;
996         enum finish_epoch rv = FE_STILL_LIVE;
997
998         spin_lock(&mdev->epoch_lock);
999         do {
1000                 next_epoch = NULL;
1001
1002                 epoch_size = atomic_read(&epoch->epoch_size);
1003
1004                 switch (ev & ~EV_CLEANUP) {
1005                 case EV_PUT:
1006                         atomic_dec(&epoch->active);
1007                         break;
1008                 case EV_GOT_BARRIER_NR:
1009                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1010                         break;
1011                 case EV_BECAME_LAST:
1012                         /* nothing to do*/
1013                         break;
1014                 }
1015
1016                 if (epoch_size != 0 &&
1017                     atomic_read(&epoch->active) == 0 &&
1018                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1019                         if (!(ev & EV_CLEANUP)) {
1020                                 spin_unlock(&mdev->epoch_lock);
1021                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1022                                 spin_lock(&mdev->epoch_lock);
1023                         }
1024                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1025                                 dec_unacked(mdev);
1026
1027                         if (mdev->current_epoch != epoch) {
1028                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1029                                 list_del(&epoch->list);
1030                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1031                                 mdev->epochs--;
1032                                 kfree(epoch);
1033
1034                                 if (rv == FE_STILL_LIVE)
1035                                         rv = FE_DESTROYED;
1036                         } else {
1037                                 epoch->flags = 0;
1038                                 atomic_set(&epoch->epoch_size, 0);
1039                                 /* atomic_set(&epoch->active, 0); is already zero */
1040                                 if (rv == FE_STILL_LIVE)
1041                                         rv = FE_RECYCLED;
1042                                 wake_up(&mdev->ee_wait);
1043                         }
1044                 }
1045
1046                 if (!next_epoch)
1047                         break;
1048
1049                 epoch = next_epoch;
1050         } while (1);
1051
1052         spin_unlock(&mdev->epoch_lock);
1053
1054         return rv;
1055 }
1056
1057 /**
1058  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1059  * @mdev:       DRBD device.
1060  * @wo:         Write ordering method to try.
1061  */
1062 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1063 {
1064         enum write_ordering_e pwo;
1065         static char *write_ordering_str[] = {
1066                 [WO_none] = "none",
1067                 [WO_drain_io] = "drain",
1068                 [WO_bdev_flush] = "flush",
1069         };
1070
1071         pwo = mdev->write_ordering;
1072         wo = min(pwo, wo);
1073         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1074                 wo = WO_drain_io;
1075         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1076                 wo = WO_none;
1077         mdev->write_ordering = wo;
1078         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1079                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1080 }
1081
1082 /**
1083  * drbd_submit_ee()
1084  * @mdev:       DRBD device.
1085  * @e:          epoch entry
1086  * @rw:         flag field, see bio->bi_rw
1087  *
1088  * May spread the pages to multiple bios,
1089  * depending on bio_add_page restrictions.
1090  *
1091  * Returns 0 if all bios have been submitted,
1092  * -ENOMEM if we could not allocate enough bios,
1093  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1094  *  single page to an empty bio (which should never happen and likely indicates
1095  *  that the lower level IO stack is in some way broken). This has been observed
1096  *  on certain Xen deployments.
1097  */
1098 /* TODO allocate from our own bio_set. */
1099 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1100                 const unsigned rw, const int fault_type)
1101 {
1102         struct bio *bios = NULL;
1103         struct bio *bio;
1104         struct page *page = e->pages;
1105         sector_t sector = e->sector;
1106         unsigned ds = e->size;
1107         unsigned n_bios = 0;
1108         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1109         int err = -ENOMEM;
1110
1111         /* In most cases, we will only need one bio.  But in case the lower
1112          * level restrictions happen to be different at this offset on this
1113          * side than those of the sending peer, we may need to submit the
1114          * request in more than one bio.
1115          *
1116          * Plain bio_alloc is good enough here, this is no DRBD internally
1117          * generated bio, but a bio allocated on behalf of the peer.
1118          */
1119 next_bio:
1120         bio = bio_alloc(GFP_NOIO, nr_pages);
1121         if (!bio) {
1122                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1123                 goto fail;
1124         }
1125         /* > e->sector, unless this is the first bio */
1126         bio->bi_sector = sector;
1127         bio->bi_bdev = mdev->ldev->backing_bdev;
1128         bio->bi_rw = rw;
1129         bio->bi_private = e;
1130         bio->bi_end_io = drbd_endio_sec;
1131
1132         bio->bi_next = bios;
1133         bios = bio;
1134         ++n_bios;
1135
1136         page_chain_for_each(page) {
1137                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1138                 if (!bio_add_page(bio, page, len, 0)) {
1139                         /* A single page must always be possible!
1140                          * But in case it fails anyways,
1141                          * we deal with it, and complain (below). */
1142                         if (bio->bi_vcnt == 0) {
1143                                 dev_err(DEV,
1144                                         "bio_add_page failed for len=%u, "
1145                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1146                                         len, (unsigned long long)bio->bi_sector);
1147                                 err = -ENOSPC;
1148                                 goto fail;
1149                         }
1150                         goto next_bio;
1151                 }
1152                 ds -= len;
1153                 sector += len >> 9;
1154                 --nr_pages;
1155         }
1156         D_ASSERT(page == NULL);
1157         D_ASSERT(ds == 0);
1158
1159         atomic_set(&e->pending_bios, n_bios);
1160         do {
1161                 bio = bios;
1162                 bios = bios->bi_next;
1163                 bio->bi_next = NULL;
1164
1165                 drbd_generic_make_request(mdev, fault_type, bio);
1166         } while (bios);
1167         return 0;
1168
1169 fail:
1170         while (bios) {
1171                 bio = bios;
1172                 bios = bios->bi_next;
1173                 bio_put(bio);
1174         }
1175         return err;
1176 }
1177
1178 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1179 {
1180         int rv;
1181         struct p_barrier *p = &mdev->data.rbuf.barrier;
1182         struct drbd_epoch *epoch;
1183
1184         inc_unacked(mdev);
1185
1186         mdev->current_epoch->barrier_nr = p->barrier;
1187         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1188
1189         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1190          * the activity log, which means it would not be resynced in case the
1191          * R_PRIMARY crashes now.
1192          * Therefore we must send the barrier_ack after the barrier request was
1193          * completed. */
1194         switch (mdev->write_ordering) {
1195         case WO_none:
1196                 if (rv == FE_RECYCLED)
1197                         return true;
1198
1199                 /* receiver context, in the writeout path of the other node.
1200                  * avoid potential distributed deadlock */
1201                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1202                 if (epoch)
1203                         break;
1204                 else
1205                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1206                         /* Fall through */
1207
1208         case WO_bdev_flush:
1209         case WO_drain_io:
1210                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1211                 drbd_flush(mdev);
1212
1213                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1214                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1215                         if (epoch)
1216                                 break;
1217                 }
1218
1219                 epoch = mdev->current_epoch;
1220                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1221
1222                 D_ASSERT(atomic_read(&epoch->active) == 0);
1223                 D_ASSERT(epoch->flags == 0);
1224
1225                 return true;
1226         default:
1227                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1228                 return false;
1229         }
1230
1231         epoch->flags = 0;
1232         atomic_set(&epoch->epoch_size, 0);
1233         atomic_set(&epoch->active, 0);
1234
1235         spin_lock(&mdev->epoch_lock);
1236         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1237                 list_add(&epoch->list, &mdev->current_epoch->list);
1238                 mdev->current_epoch = epoch;
1239                 mdev->epochs++;
1240         } else {
1241                 /* The current_epoch got recycled while we allocated this one... */
1242                 kfree(epoch);
1243         }
1244         spin_unlock(&mdev->epoch_lock);
1245
1246         return true;
1247 }
1248
1249 /* used from receive_RSDataReply (recv_resync_read)
1250  * and from receive_Data */
1251 static struct drbd_epoch_entry *
1252 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1253 {
1254         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1255         struct drbd_epoch_entry *e;
1256         struct page *page;
1257         int dgs, ds, rr;
1258         void *dig_in = mdev->int_dig_in;
1259         void *dig_vv = mdev->int_dig_vv;
1260         unsigned long *data;
1261
1262         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1263                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1264
1265         if (dgs) {
1266                 rr = drbd_recv(mdev, dig_in, dgs);
1267                 if (rr != dgs) {
1268                         if (!signal_pending(current))
1269                                 dev_warn(DEV,
1270                                         "short read receiving data digest: read %d expected %d\n",
1271                                         rr, dgs);
1272                         return NULL;
1273                 }
1274         }
1275
1276         data_size -= dgs;
1277
1278         ERR_IF(data_size &  0x1ff) return NULL;
1279         ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1280
1281         /* even though we trust out peer,
1282          * we sometimes have to double check. */
1283         if (sector + (data_size>>9) > capacity) {
1284                 dev_err(DEV, "request from peer beyond end of local disk: "
1285                         "capacity: %llus < sector: %llus + size: %u\n",
1286                         (unsigned long long)capacity,
1287                         (unsigned long long)sector, data_size);
1288                 return NULL;
1289         }
1290
1291         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1292          * "criss-cross" setup, that might cause write-out on some other DRBD,
1293          * which in turn might block on the other node at this very place.  */
1294         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1295         if (!e)
1296                 return NULL;
1297
1298         if (!data_size)
1299                 return e;
1300
1301         ds = data_size;
1302         page = e->pages;
1303         page_chain_for_each(page) {
1304                 unsigned len = min_t(int, ds, PAGE_SIZE);
1305                 data = kmap(page);
1306                 rr = drbd_recv(mdev, data, len);
1307                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1308                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1309                         data[0] = data[0] ^ (unsigned long)-1;
1310                 }
1311                 kunmap(page);
1312                 if (rr != len) {
1313                         drbd_free_ee(mdev, e);
1314                         if (!signal_pending(current))
1315                                 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1316                                 rr, len);
1317                         return NULL;
1318                 }
1319                 ds -= rr;
1320         }
1321
1322         if (dgs) {
1323                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1324                 if (memcmp(dig_in, dig_vv, dgs)) {
1325                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1326                                 (unsigned long long)sector, data_size);
1327                         drbd_bcast_ee(mdev, "digest failed",
1328                                         dgs, dig_in, dig_vv, e);
1329                         drbd_free_ee(mdev, e);
1330                         return NULL;
1331                 }
1332         }
1333         mdev->recv_cnt += data_size>>9;
1334         return e;
1335 }
1336
1337 /* drbd_drain_block() just takes a data block
1338  * out of the socket input buffer, and discards it.
1339  */
1340 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1341 {
1342         struct page *page;
1343         int rr, rv = 1;
1344         void *data;
1345
1346         if (!data_size)
1347                 return true;
1348
1349         page = drbd_pp_alloc(mdev, 1, 1);
1350
1351         data = kmap(page);
1352         while (data_size) {
1353                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1354                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1355                         rv = 0;
1356                         if (!signal_pending(current))
1357                                 dev_warn(DEV,
1358                                         "short read receiving data: read %d expected %d\n",
1359                                         rr, min_t(int, data_size, PAGE_SIZE));
1360                         break;
1361                 }
1362                 data_size -= rr;
1363         }
1364         kunmap(page);
1365         drbd_pp_free(mdev, page, 0);
1366         return rv;
1367 }
1368
1369 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1370                            sector_t sector, int data_size)
1371 {
1372         struct bio_vec *bvec;
1373         struct bio *bio;
1374         int dgs, rr, i, expect;
1375         void *dig_in = mdev->int_dig_in;
1376         void *dig_vv = mdev->int_dig_vv;
1377
1378         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1379                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1380
1381         if (dgs) {
1382                 rr = drbd_recv(mdev, dig_in, dgs);
1383                 if (rr != dgs) {
1384                         if (!signal_pending(current))
1385                                 dev_warn(DEV,
1386                                         "short read receiving data reply digest: read %d expected %d\n",
1387                                         rr, dgs);
1388                         return 0;
1389                 }
1390         }
1391
1392         data_size -= dgs;
1393
1394         /* optimistically update recv_cnt.  if receiving fails below,
1395          * we disconnect anyways, and counters will be reset. */
1396         mdev->recv_cnt += data_size>>9;
1397
1398         bio = req->master_bio;
1399         D_ASSERT(sector == bio->bi_sector);
1400
1401         bio_for_each_segment(bvec, bio, i) {
1402                 expect = min_t(int, data_size, bvec->bv_len);
1403                 rr = drbd_recv(mdev,
1404                              kmap(bvec->bv_page)+bvec->bv_offset,
1405                              expect);
1406                 kunmap(bvec->bv_page);
1407                 if (rr != expect) {
1408                         if (!signal_pending(current))
1409                                 dev_warn(DEV, "short read receiving data reply: "
1410                                         "read %d expected %d\n",
1411                                         rr, expect);
1412                         return 0;
1413                 }
1414                 data_size -= rr;
1415         }
1416
1417         if (dgs) {
1418                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1419                 if (memcmp(dig_in, dig_vv, dgs)) {
1420                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1421                         return 0;
1422                 }
1423         }
1424
1425         D_ASSERT(data_size == 0);
1426         return 1;
1427 }
1428
1429 /* e_end_resync_block() is called via
1430  * drbd_process_done_ee() by asender only */
1431 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1432 {
1433         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1434         sector_t sector = e->sector;
1435         int ok;
1436
1437         D_ASSERT(hlist_unhashed(&e->collision));
1438
1439         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1440                 drbd_set_in_sync(mdev, sector, e->size);
1441                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1442         } else {
1443                 /* Record failure to sync */
1444                 drbd_rs_failed_io(mdev, sector, e->size);
1445
1446                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1447         }
1448         dec_unacked(mdev);
1449
1450         return ok;
1451 }
1452
1453 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1454 {
1455         struct drbd_epoch_entry *e;
1456
1457         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1458         if (!e)
1459                 goto fail;
1460
1461         dec_rs_pending(mdev);
1462
1463         inc_unacked(mdev);
1464         /* corresponding dec_unacked() in e_end_resync_block()
1465          * respective _drbd_clear_done_ee */
1466
1467         e->w.cb = e_end_resync_block;
1468
1469         spin_lock_irq(&mdev->req_lock);
1470         list_add(&e->w.list, &mdev->sync_ee);
1471         spin_unlock_irq(&mdev->req_lock);
1472
1473         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1474         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1475                 return true;
1476
1477         /* don't care for the reason here */
1478         dev_err(DEV, "submit failed, triggering re-connect\n");
1479         spin_lock_irq(&mdev->req_lock);
1480         list_del(&e->w.list);
1481         spin_unlock_irq(&mdev->req_lock);
1482
1483         drbd_free_ee(mdev, e);
1484 fail:
1485         put_ldev(mdev);
1486         return false;
1487 }
1488
1489 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1490 {
1491         struct drbd_request *req;
1492         sector_t sector;
1493         int ok;
1494         struct p_data *p = &mdev->data.rbuf.data;
1495
1496         sector = be64_to_cpu(p->sector);
1497
1498         spin_lock_irq(&mdev->req_lock);
1499         req = _ar_id_to_req(mdev, p->block_id, sector);
1500         spin_unlock_irq(&mdev->req_lock);
1501         if (unlikely(!req)) {
1502                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1503                 return false;
1504         }
1505
1506         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1507          * special casing it there for the various failure cases.
1508          * still no race with drbd_fail_pending_reads */
1509         ok = recv_dless_read(mdev, req, sector, data_size);
1510
1511         if (ok)
1512                 req_mod(req, data_received);
1513         /* else: nothing. handled from drbd_disconnect...
1514          * I don't think we may complete this just yet
1515          * in case we are "on-disconnect: freeze" */
1516
1517         return ok;
1518 }
1519
1520 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1521 {
1522         sector_t sector;
1523         int ok;
1524         struct p_data *p = &mdev->data.rbuf.data;
1525
1526         sector = be64_to_cpu(p->sector);
1527         D_ASSERT(p->block_id == ID_SYNCER);
1528
1529         if (get_ldev(mdev)) {
1530                 /* data is submitted to disk within recv_resync_read.
1531                  * corresponding put_ldev done below on error,
1532                  * or in drbd_endio_write_sec. */
1533                 ok = recv_resync_read(mdev, sector, data_size);
1534         } else {
1535                 if (__ratelimit(&drbd_ratelimit_state))
1536                         dev_err(DEV, "Can not write resync data to local disk.\n");
1537
1538                 ok = drbd_drain_block(mdev, data_size);
1539
1540                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1541         }
1542
1543         atomic_add(data_size >> 9, &mdev->rs_sect_in);
1544
1545         return ok;
1546 }
1547
1548 /* e_end_block() is called via drbd_process_done_ee().
1549  * this means this function only runs in the asender thread
1550  */
1551 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1552 {
1553         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1554         sector_t sector = e->sector;
1555         int ok = 1, pcmd;
1556
1557         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1558                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1559                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1560                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1561                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1562                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1563                         ok &= drbd_send_ack(mdev, pcmd, e);
1564                         if (pcmd == P_RS_WRITE_ACK)
1565                                 drbd_set_in_sync(mdev, sector, e->size);
1566                 } else {
1567                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1568                         /* we expect it to be marked out of sync anyways...
1569                          * maybe assert this?  */
1570                 }
1571                 dec_unacked(mdev);
1572         }
1573         /* we delete from the conflict detection hash _after_ we sent out the
1574          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1575         if (mdev->net_conf->two_primaries) {
1576                 spin_lock_irq(&mdev->req_lock);
1577                 D_ASSERT(!hlist_unhashed(&e->collision));
1578                 hlist_del_init(&e->collision);
1579                 spin_unlock_irq(&mdev->req_lock);
1580         } else {
1581                 D_ASSERT(hlist_unhashed(&e->collision));
1582         }
1583
1584         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1585
1586         return ok;
1587 }
1588
1589 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1590 {
1591         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1592         int ok = 1;
1593
1594         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1595         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1596
1597         spin_lock_irq(&mdev->req_lock);
1598         D_ASSERT(!hlist_unhashed(&e->collision));
1599         hlist_del_init(&e->collision);
1600         spin_unlock_irq(&mdev->req_lock);
1601
1602         dec_unacked(mdev);
1603
1604         return ok;
1605 }
1606
1607 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)
1608 {
1609
1610         struct drbd_epoch_entry *rs_e;
1611         bool rv = 0;
1612
1613         spin_lock_irq(&mdev->req_lock);
1614         list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
1615                 if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {
1616                         rv = 1;
1617                         break;
1618                 }
1619         }
1620         spin_unlock_irq(&mdev->req_lock);
1621
1622         return rv;
1623 }
1624
1625 /* Called from receive_Data.
1626  * Synchronize packets on sock with packets on msock.
1627  *
1628  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1629  * packet traveling on msock, they are still processed in the order they have
1630  * been sent.
1631  *
1632  * Note: we don't care for Ack packets overtaking P_DATA packets.
1633  *
1634  * In case packet_seq is larger than mdev->peer_seq number, there are
1635  * outstanding packets on the msock. We wait for them to arrive.
1636  * In case we are the logically next packet, we update mdev->peer_seq
1637  * ourselves. Correctly handles 32bit wrap around.
1638  *
1639  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1640  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1641  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1642  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1643  *
1644  * returns 0 if we may process the packet,
1645  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1646 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1647 {
1648         DEFINE_WAIT(wait);
1649         unsigned int p_seq;
1650         long timeout;
1651         int ret = 0;
1652         spin_lock(&mdev->peer_seq_lock);
1653         for (;;) {
1654                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1655                 if (seq_le(packet_seq, mdev->peer_seq+1))
1656                         break;
1657                 if (signal_pending(current)) {
1658                         ret = -ERESTARTSYS;
1659                         break;
1660                 }
1661                 p_seq = mdev->peer_seq;
1662                 spin_unlock(&mdev->peer_seq_lock);
1663                 timeout = schedule_timeout(30*HZ);
1664                 spin_lock(&mdev->peer_seq_lock);
1665                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1666                         ret = -ETIMEDOUT;
1667                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1668                         break;
1669                 }
1670         }
1671         finish_wait(&mdev->seq_wait, &wait);
1672         if (mdev->peer_seq+1 == packet_seq)
1673                 mdev->peer_seq++;
1674         spin_unlock(&mdev->peer_seq_lock);
1675         return ret;
1676 }
1677
1678 /* see also bio_flags_to_wire()
1679  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1680  * flags and back. We may replicate to other kernel versions. */
1681 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1682 {
1683         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1684                 (dpf & DP_FUA ? REQ_FUA : 0) |
1685                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1686                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1687 }
1688
/* mirrored write
 *
 * Handles a data packet whose header is already in mdev->data.rbuf.data and
 * whose payload (data_size bytes) is still to be read.
 *
 * Outline:
 *  - without a local disk: account the sequence number, send P_NEG_ACK,
 *    and drain the payload;
 *  - otherwise read the payload into an epoch entry, map the wire flags,
 *    and account the entry in the current epoch;
 *  - with two_primaries configured: wait for the peer sequence number and
 *    run the conflict detection described in the big comment below;
 *  - put the entry on active_ee, do the per-protocol ack accounting, and
 *    submit the write.
 *
 * Returns true if the packet was handled (submitted, or answered with a
 * queued discard ack), false on failure; in the no-local-disk case the
 * result of drbd_drain_block() is returned. */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
{
        sector_t sector;
        struct drbd_epoch_entry *e;
        struct p_data *p = &mdev->data.rbuf.data;
        int rw = WRITE;
        u32 dp_flags;

        if (!get_ldev(mdev)) {
                /* No local disk.  Still consume our sequence number if we
                 * are the logically next packet. */
                spin_lock(&mdev->peer_seq_lock);
                if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
                        mdev->peer_seq++;
                spin_unlock(&mdev->peer_seq_lock);

                drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
                atomic_inc(&mdev->current_epoch->epoch_size);
                return drbd_drain_block(mdev, data_size);
        }

        /* get_ldev(mdev) successful.
         * Corresponding put_ldev done either below (on various errors),
         * or in drbd_endio_write_sec, if we successfully submit the data at
         * the end of this function. */

        sector = be64_to_cpu(p->sector);
        e = read_in_block(mdev, p->block_id, sector, data_size);
        if (!e) {
                put_ldev(mdev);
                return false;
        }

        e->w.cb = e_end_block;

        dp_flags = be32_to_cpu(p->dp_flags);
        rw |= wire_flags_to_bio(mdev, dp_flags);
        if (e->pages == NULL) {
                /* an entry without pages is expected to be an empty flush */
                D_ASSERT(e->size == 0);
                D_ASSERT(dp_flags & DP_FLUSH);
        }

        if (dp_flags & DP_MAY_SET_IN_SYNC)
                e->flags |= EE_MAY_SET_IN_SYNC;

        /* attach the entry to the current epoch and account for it */
        spin_lock(&mdev->epoch_lock);
        e->epoch = mdev->current_epoch;
        atomic_inc(&e->epoch->epoch_size);
        atomic_inc(&e->epoch->active);
        spin_unlock(&mdev->epoch_lock);

        /* I'm the receiver, I do hold a net_cnt reference. */
        if (!mdev->net_conf->two_primaries) {
                /* req_lock is released after the list_add() below */
                spin_lock_irq(&mdev->req_lock);
        } else {
                /* don't get the req_lock yet,
                 * we may sleep in drbd_wait_peer_seq */
                const int size = e->size;
                const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
                DEFINE_WAIT(wait);
                struct drbd_request *i;
                struct hlist_node *n;
                struct hlist_head *slot;
                int first;

                D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
                BUG_ON(mdev->ee_hash == NULL);
                BUG_ON(mdev->tl_hash == NULL);

                /* conflict detection and handling:
                 * 1. wait on the sequence number,
                 *    in case this data packet overtook ACK packets.
                 * 2. check our hash tables for conflicting requests.
                 *    we only need to walk the tl_hash, since an ee can not
                 *    have a conflict with another ee: on the submitting
                 *    node, the corresponding req had already been conflicting,
                 *    and a conflicting req is never sent.
                 *
                 * Note: for two_primaries, we are protocol C,
                 * so there cannot be any request that is DONE
                 * but still on the transfer log.
                 *
                 * unconditionally add to the ee_hash.
                 *
                 * if no conflicting request is found:
                 *    submit.
                 *
                 * if any conflicting request is found
                 * that has not yet been acked,
                 * AND I have the "discard concurrent writes" flag:
                 *       queue (via done_ee) the P_DISCARD_ACK; OUT.
                 *
                 * if any conflicting request is found:
                 *       block the receiver, waiting on misc_wait
                 *       until no more conflicting requests are there,
                 *       or we get interrupted (disconnect).
                 *
                 *       we do not just write after local io completion of those
                 *       requests, but only after req is done completely, i.e.
                 *       we wait for the P_DISCARD_ACK to arrive!
                 *
                 *       then proceed normally, i.e. submit.
                 */
                if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
                        goto out_interrupted;

                spin_lock_irq(&mdev->req_lock);

                hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));

#define OVERLAPS overlaps(i->sector, i->size, sector, size)
                slot = tl_hash_slot(mdev, sector);
                first = 1;
                for (;;) {
                        /* counters are recomputed on every pass over the slot */
                        int have_unacked = 0;
                        int have_conflict = 0;
                        prepare_to_wait(&mdev->misc_wait, &wait,
                                TASK_INTERRUPTIBLE);
                        hlist_for_each_entry(i, n, slot, collision) {
                                if (OVERLAPS) {
                                        /* only ALERT on first iteration,
                                         * we may be woken up early... */
                                        if (first)
                                                dev_alert(DEV, "%s[%u] Concurrent local write detected!"
                                                      " new: %llus +%u; pending: %llus +%u\n",
                                                      current->comm, current->pid,
                                                      (unsigned long long)sector, size,
                                                      (unsigned long long)i->sector, i->size);
                                        if (i->rq_state & RQ_NET_PENDING)
                                                ++have_unacked;
                                        ++have_conflict;
                                }
                        }
#undef OVERLAPS
                        if (!have_conflict)
                                break;

                        /* Discard Ack only for the _first_ iteration */
                        if (first && discard && have_unacked) {
                                dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
                                     (unsigned long long)sector);
                                inc_unacked(mdev);
                                e->w.cb = e_send_discard_ack;
                                list_add_tail(&e->w.list, &mdev->done_ee);

                                spin_unlock_irq(&mdev->req_lock);

                                /* we could probably send that P_DISCARD_ACK ourselves,
                                 * but I don't like the receiver using the msock */

                                put_ldev(mdev);
                                wake_asender(mdev);
                                finish_wait(&mdev->misc_wait, &wait);
                                return true;
                        }

                        if (signal_pending(current)) {
                                /* interrupted: take the entry off the ee_hash
                                 * again before it gets freed below */
                                hlist_del_init(&e->collision);

                                spin_unlock_irq(&mdev->req_lock);

                                finish_wait(&mdev->misc_wait, &wait);
                                goto out_interrupted;
                        }

                        /* drop the lock before sleeping */
                        spin_unlock_irq(&mdev->req_lock);
                        if (first) {
                                first = 0;
                                dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
                                     "sec=%llus\n", (unsigned long long)sector);
                        } else if (discard) {
                                /* we had none on the first iteration.
                                 * there must be none now. */
                                D_ASSERT(have_unacked == 0);
                        }
                        schedule();
                        spin_lock_irq(&mdev->req_lock);
                }
                finish_wait(&mdev->misc_wait, &wait);
        }

        /* both branches above arrive here with req_lock held */
        list_add(&e->w.list, &mdev->active_ee);
        spin_unlock_irq(&mdev->req_lock);

        /* while we are sync target, wait until no resync write overlaps */
        if (mdev->state.conn == C_SYNC_TARGET)
                wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));

        switch (mdev->net_conf->wire_protocol) {
        case DRBD_PROT_C:
                inc_unacked(mdev);
                /* corresponding dec_unacked() in e_end_block()
                 * respective _drbd_clear_done_ee */
                break;
        case DRBD_PROT_B:
                /* I really don't like it that the receiver thread
                 * sends on the msock, but anyways */
                drbd_send_ack(mdev, P_RECV_ACK, e);
                break;
        case DRBD_PROT_A:
                /* nothing to do */
                break;
        }

        if (mdev->state.pdsk < D_INCONSISTENT) {
                /* In case we have the only disk of the cluster, */
                drbd_set_out_of_sync(mdev, e->sector, e->size);
                e->flags |= EE_CALL_AL_COMPLETE_IO;
                e->flags &= ~EE_MAY_SET_IN_SYNC;
                drbd_al_begin_io(mdev, e->sector);
        }

        if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
                return true;

        /* don't care for the reason here */
        dev_err(DEV, "submit failed, triggering re-connect\n");
        /* undo the list/hash insertions and the activity log reference */
        spin_lock_irq(&mdev->req_lock);
        list_del(&e->w.list);
        hlist_del_init(&e->collision);
        spin_unlock_irq(&mdev->req_lock);
        if (e->flags & EE_CALL_AL_COMPLETE_IO)
                drbd_al_complete_io(mdev, e->sector);

out_interrupted:
        /* common failure exit: drop the epoch reference, the ldev reference
         * and the entry itself */
        drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
        put_ldev(mdev);
        drbd_free_ee(mdev, e);
        return false;
}
1917
1918 /* We may throttle resync, if the lower device seems to be busy,
1919  * and current sync rate is above c_min_rate.
1920  *
1921  * To decide whether or not the lower device is busy, we use a scheme similar
1922  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1923  * (more than 64 sectors) of activity we cannot account for with our own resync
1924  * activity, it obviously is "busy".
1925  *
1926  * The current sync rate used here uses only the most recent two step marks,
1927  * to have a short time average so we can react faster.
1928  */
1929 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1930 {
1931         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1932         unsigned long db, dt, dbdt;
1933         struct lc_element *tmp;
1934         int curr_events;
1935         int throttle = 0;
1936
1937         /* feature disabled? */
1938         if (mdev->sync_conf.c_min_rate == 0)
1939                 return 0;
1940
1941         spin_lock_irq(&mdev->al_lock);
1942         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1943         if (tmp) {
1944                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1945                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1946                         spin_unlock_irq(&mdev->al_lock);
1947                         return 0;
1948                 }
1949                 /* Do not slow down if app IO is already waiting for this extent */
1950         }
1951         spin_unlock_irq(&mdev->al_lock);
1952
1953         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1954                       (int)part_stat_read(&disk->part0, sectors[1]) -
1955                         atomic_read(&mdev->rs_sect_ev);
1956
1957         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1958                 unsigned long rs_left;
1959                 int i;
1960
1961                 mdev->rs_last_events = curr_events;
1962
1963                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1964                  * approx. */
1965                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1966
1967                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1968                         rs_left = mdev->ov_left;
1969                 else
1970                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1971
1972                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1973                 if (!dt)
1974                         dt++;
1975                 db = mdev->rs_mark_left[i] - rs_left;
1976                 dbdt = Bit2KB(db/dt);
1977
1978                 if (dbdt > mdev->sync_conf.c_min_rate)
1979                         throttle = 1;
1980         }
1981         return throttle;
1982 }
1983
1984
1985 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1986 {
1987         sector_t sector;
1988         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1989         struct drbd_epoch_entry *e;
1990         struct digest_info *di = NULL;
1991         int size, verb;
1992         unsigned int fault_type;
1993         struct p_block_req *p = &mdev->data.rbuf.block_req;
1994
1995         sector = be64_to_cpu(p->sector);
1996         size   = be32_to_cpu(p->blksize);
1997
1998         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1999                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2000                                 (unsigned long long)sector, size);
2001                 return false;
2002         }
2003         if (sector + (size>>9) > capacity) {
2004                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2005                                 (unsigned long long)sector, size);
2006                 return false;
2007         }
2008
2009         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2010                 verb = 1;
2011                 switch (cmd) {
2012                 case P_DATA_REQUEST:
2013                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2014                         break;
2015                 case P_RS_DATA_REQUEST:
2016                 case P_CSUM_RS_REQUEST:
2017                 case P_OV_REQUEST:
2018                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2019                         break;
2020                 case P_OV_REPLY:
2021                         verb = 0;
2022                         dec_rs_pending(mdev);
2023                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2024                         break;
2025                 default:
2026                         dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2027                                 cmdname(cmd));
2028                 }
2029                 if (verb && __ratelimit(&drbd_ratelimit_state))
2030                         dev_err(DEV, "Can not satisfy peer's read request, "
2031                             "no local data.\n");
2032
2033                 /* drain possibly payload */
2034                 return drbd_drain_block(mdev, digest_size);
2035         }
2036
2037         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2038          * "criss-cross" setup, that might cause write-out on some other DRBD,
2039          * which in turn might block on the other node at this very place.  */
2040         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2041         if (!e) {
2042                 put_ldev(mdev);
2043                 return false;
2044         }
2045
2046         switch (cmd) {
2047         case P_DATA_REQUEST:
2048                 e->w.cb = w_e_end_data_req;
2049                 fault_type = DRBD_FAULT_DT_RD;
2050                 /* application IO, don't drbd_rs_begin_io */
2051                 goto submit;
2052
2053         case P_RS_DATA_REQUEST:
2054                 e->w.cb = w_e_end_rsdata_req;
2055                 fault_type = DRBD_FAULT_RS_RD;
2056                 /* used in the sector offset progress display */
2057                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2058                 break;
2059
2060         case P_OV_REPLY:
2061         case P_CSUM_RS_REQUEST:
2062                 fault_type = DRBD_FAULT_RS_RD;
2063                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2064                 if (!di)
2065                         goto out_free_e;
2066
2067                 di->digest_size = digest_size;
2068                 di->digest = (((char *)di)+sizeof(struct digest_info));
2069
2070                 e->digest = di;
2071                 e->flags |= EE_HAS_DIGEST;
2072
2073                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2074                         goto out_free_e;
2075
2076                 if (cmd == P_CSUM_RS_REQUEST) {
2077                         D_ASSERT(mdev->agreed_pro_version >= 89);
2078                         e->w.cb = w_e_end_csum_rs_req;
2079                         /* used in the sector offset progress display */
2080                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2081                 } else if (cmd == P_OV_REPLY) {
2082                         /* track progress, we may need to throttle */
2083                         atomic_add(size >> 9, &mdev->rs_sect_in);
2084                         e->w.cb = w_e_end_ov_reply;
2085                         dec_rs_pending(mdev);
2086                         /* drbd_rs_begin_io done when we sent this request,
2087                          * but accounting still needs to be done. */
2088                         goto submit_for_resync;
2089                 }
2090                 break;
2091
2092         case P_OV_REQUEST:
2093                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2094                     mdev->agreed_pro_version >= 90) {
2095                         unsigned long now = jiffies;
2096                         int i;
2097                         mdev->ov_start_sector = sector;
2098                         mdev->ov_position = sector;
2099                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2100                         mdev->rs_total = mdev->ov_left;
2101                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2102                                 mdev->rs_mark_left[i] = mdev->ov_left;
2103                                 mdev->rs_mark_time[i] = now;
2104                         }
2105                         dev_info(DEV, "Online Verify start sector: %llu\n",
2106                                         (unsigned long long)sector);
2107                 }
2108                 e->w.cb = w_e_end_ov_req;
2109                 fault_type = DRBD_FAULT_RS_RD;
2110                 break;
2111
2112         default:
2113                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2114                     cmdname(cmd));
2115                 fault_type = DRBD_FAULT_MAX;
2116                 goto out_free_e;
2117         }
2118
2119         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2120          * wrt the receiver, but it is not as straightforward as it may seem.
2121          * Various places in the resync start and stop logic assume resync
2122          * requests are processed in order, requeuing this on the worker thread
2123          * introduces a bunch of new code for synchronization between threads.
2124          *
2125          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2126          * "forever", throttling after drbd_rs_begin_io will lock that extent
2127          * for application writes for the same time.  For now, just throttle
2128          * here, where the rest of the code expects the receiver to sleep for
2129          * a while, anyways.
2130          */
2131
2132         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2133          * this defers syncer requests for some time, before letting at least
2134          * on request through.  The resync controller on the receiving side
2135          * will adapt to the incoming rate accordingly.
2136          *
2137          * We cannot throttle here if remote is Primary/SyncTarget:
2138          * we would also throttle its application reads.
2139          * In that case, throttling is done on the SyncTarget only.
2140          */
2141         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2142                 schedule_timeout_uninterruptible(HZ/10);
2143         if (drbd_rs_begin_io(mdev, sector))
2144                 goto out_free_e;
2145
2146 submit_for_resync:
2147         atomic_add(size >> 9, &mdev->rs_sect_ev);
2148
2149 submit:
2150         inc_unacked(mdev);
2151         spin_lock_irq(&mdev->req_lock);
2152         list_add_tail(&e->w.list, &mdev->read_ee);
2153         spin_unlock_irq(&mdev->req_lock);
2154
2155         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2156                 return true;
2157
2158         /* don't care for the reason here */
2159         dev_err(DEV, "submit failed, triggering re-connect\n");
2160         spin_lock_irq(&mdev->req_lock);
2161         list_del(&e->w.list);
2162         spin_unlock_irq(&mdev->req_lock);
2163         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2164
2165 out_free_e:
2166         put_ldev(mdev);
2167         drbd_free_ee(mdev, e);
2168         return false;
2169 }
2170
2171 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2172 {
2173         int self, peer, rv = -100;
2174         unsigned long ch_self, ch_peer;
2175
2176         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2177         peer = mdev->p_uuid[UI_BITMAP] & 1;
2178
2179         ch_peer = mdev->p_uuid[UI_SIZE];
2180         ch_self = mdev->comm_bm_set;
2181
2182         switch (mdev->net_conf->after_sb_0p) {
2183         case ASB_CONSENSUS:
2184         case ASB_DISCARD_SECONDARY:
2185         case ASB_CALL_HELPER:
2186                 dev_err(DEV, "Configuration error.\n");
2187                 break;
2188         case ASB_DISCONNECT:
2189                 break;
2190         case ASB_DISCARD_YOUNGER_PRI:
2191                 if (self == 0 && peer == 1) {
2192                         rv = -1;
2193                         break;
2194                 }
2195                 if (self == 1 && peer == 0) {
2196                         rv =  1;
2197                         break;
2198                 }
2199                 /* Else fall through to one of the other strategies... */
2200         case ASB_DISCARD_OLDER_PRI:
2201                 if (self == 0 && peer == 1) {
2202                         rv = 1;
2203                         break;
2204                 }
2205                 if (self == 1 && peer == 0) {
2206                         rv = -1;
2207                         break;
2208                 }
2209                 /* Else fall through to one of the other strategies... */
2210                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2211                      "Using discard-least-changes instead\n");
2212         case ASB_DISCARD_ZERO_CHG:
2213                 if (ch_peer == 0 && ch_self == 0) {
2214                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2215                                 ? -1 : 1;
2216                         break;
2217                 } else {
2218                         if (ch_peer == 0) { rv =  1; break; }
2219                         if (ch_self == 0) { rv = -1; break; }
2220                 }
2221                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2222                         break;
2223         case ASB_DISCARD_LEAST_CHG:
2224                 if      (ch_self < ch_peer)
2225                         rv = -1;
2226                 else if (ch_self > ch_peer)
2227                         rv =  1;
2228                 else /* ( ch_self == ch_peer ) */
2229                      /* Well, then use something else. */
2230                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2231                                 ? -1 : 1;
2232                 break;
2233         case ASB_DISCARD_LOCAL:
2234                 rv = -1;
2235                 break;
2236         case ASB_DISCARD_REMOTE:
2237                 rv =  1;
2238         }
2239
2240         return rv;
2241 }
2242
2243 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2244 {
2245         int hg, rv = -100;
2246
2247         switch (mdev->net_conf->after_sb_1p) {
2248         case ASB_DISCARD_YOUNGER_PRI:
2249         case ASB_DISCARD_OLDER_PRI:
2250         case ASB_DISCARD_LEAST_CHG:
2251         case ASB_DISCARD_LOCAL:
2252         case ASB_DISCARD_REMOTE:
2253                 dev_err(DEV, "Configuration error.\n");
2254                 break;
2255         case ASB_DISCONNECT:
2256                 break;
2257         case ASB_CONSENSUS:
2258                 hg = drbd_asb_recover_0p(mdev);
2259                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2260                         rv = hg;
2261                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2262                         rv = hg;
2263                 break;
2264         case ASB_VIOLENTLY:
2265                 rv = drbd_asb_recover_0p(mdev);
2266                 break;
2267         case ASB_DISCARD_SECONDARY:
2268                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2269         case ASB_CALL_HELPER:
2270                 hg = drbd_asb_recover_0p(mdev);
2271                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2272                         enum drbd_state_rv rv2;
2273
2274                         drbd_set_role(mdev, R_SECONDARY, 0);
2275                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2276                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2277                           * we do not need to wait for the after state change work either. */
2278                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2279                         if (rv2 != SS_SUCCESS) {
2280                                 drbd_khelper(mdev, "pri-lost-after-sb");
2281                         } else {
2282                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2283                                 rv = hg;
2284                         }
2285                 } else
2286                         rv = hg;
2287         }
2288
2289         return rv;
2290 }
2291
2292 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2293 {
2294         int hg, rv = -100;
2295
2296         switch (mdev->net_conf->after_sb_2p) {
2297         case ASB_DISCARD_YOUNGER_PRI:
2298         case ASB_DISCARD_OLDER_PRI:
2299         case ASB_DISCARD_LEAST_CHG:
2300         case ASB_DISCARD_LOCAL:
2301         case ASB_DISCARD_REMOTE:
2302         case ASB_CONSENSUS:
2303         case ASB_DISCARD_SECONDARY:
2304                 dev_err(DEV, "Configuration error.\n");
2305                 break;
2306         case ASB_VIOLENTLY:
2307                 rv = drbd_asb_recover_0p(mdev);
2308                 break;
2309         case ASB_DISCONNECT:
2310                 break;
2311         case ASB_CALL_HELPER:
2312                 hg = drbd_asb_recover_0p(mdev);
2313                 if (hg == -1) {
2314                         enum drbd_state_rv rv2;
2315
2316                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2317                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2318                           * we do not need to wait for the after state change work either. */
2319                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2320                         if (rv2 != SS_SUCCESS) {
2321                                 drbd_khelper(mdev, "pri-lost-after-sb");
2322                         } else {
2323                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2324                                 rv = hg;
2325                         }
2326                 } else
2327                         rv = hg;
2328         }
2329
2330         return rv;
2331 }
2332
2333 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2334                            u64 bits, u64 flags)
2335 {
2336         if (!uuid) {
2337                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2338                 return;
2339         }
2340         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2341              text,
2342              (unsigned long long)uuid[UI_CURRENT],
2343              (unsigned long long)uuid[UI_BITMAP],
2344              (unsigned long long)uuid[UI_HISTORY_START],
2345              (unsigned long long)uuid[UI_HISTORY_END],
2346              (unsigned long long)bits,
2347              (unsigned long long)flags);
2348 }
2349
2350 /*
2351   100   after split brain try auto recover
2352     2   C_SYNC_SOURCE set BitMap
2353     1   C_SYNC_SOURCE use BitMap
2354     0   no Sync
2355    -1   C_SYNC_TARGET use BitMap
2356    -2   C_SYNC_TARGET set BitMap
2357  -100   after split brain, disconnect
2358 -1000   unrelated data
2359 -1091   requires proto 91
2360 -1096   requires proto 96
2361  */
2362 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2363 {
2364         u64 self, peer;
2365         int i, j;
2366
2367         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2368         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2369
2370         *rule_nr = 10;
2371         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2372                 return 0;
2373
2374         *rule_nr = 20;
2375         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2376              peer != UUID_JUST_CREATED)
2377                 return -2;
2378
2379         *rule_nr = 30;
2380         if (self != UUID_JUST_CREATED &&
2381             (peer == UUID_JUST_CREATED || peer == (u64)0))
2382                 return 2;
2383
2384         if (self == peer) {
2385                 int rct, dc; /* roles at crash time */
2386
2387                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2388
2389                         if (mdev->agreed_pro_version < 91)
2390                                 return -1091;
2391
2392                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2393                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2394                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2395                                 drbd_uuid_set_bm(mdev, 0UL);
2396
2397                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2398                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2399                                 *rule_nr = 34;
2400                         } else {
2401                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2402                                 *rule_nr = 36;
2403                         }
2404
2405                         return 1;
2406                 }
2407
2408                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2409
2410                         if (mdev->agreed_pro_version < 91)
2411                                 return -1091;
2412
2413                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2414                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2415                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2416
2417                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2418                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2419                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2420
2421                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2422                                 *rule_nr = 35;
2423                         } else {
2424                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2425                                 *rule_nr = 37;
2426                         }
2427
2428                         return -1;
2429                 }
2430
2431                 /* Common power [off|failure] */
2432                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2433                         (mdev->p_uuid[UI_FLAGS] & 2);
2434                 /* lowest bit is set when we were primary,
2435                  * next bit (weight 2) is set when peer was primary */
2436                 *rule_nr = 40;
2437
2438                 switch (rct) {
2439                 case 0: /* !self_pri && !peer_pri */ return 0;
2440                 case 1: /*  self_pri && !peer_pri */ return 1;
2441                 case 2: /* !self_pri &&  peer_pri */ return -1;
2442                 case 3: /*  self_pri &&  peer_pri */
2443                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2444                         return dc ? -1 : 1;
2445                 }
2446         }
2447
2448         *rule_nr = 50;
2449         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2450         if (self == peer)
2451                 return -1;
2452
2453         *rule_nr = 51;
2454         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2455         if (self == peer) {
2456                 if (mdev->agreed_pro_version < 96 ?
2457                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2458                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2459                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2460                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2461                            resync as sync source modifications of the peer's UUIDs. */
2462
2463                         if (mdev->agreed_pro_version < 91)
2464                                 return -1091;
2465
2466                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2467                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2468
2469                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2470                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2471
2472                         return -1;
2473                 }
2474         }
2475
2476         *rule_nr = 60;
2477         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2478         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2479                 peer = mdev->p_uuid[i] & ~((u64)1);
2480                 if (self == peer)
2481                         return -2;
2482         }
2483
2484         *rule_nr = 70;
2485         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2486         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2487         if (self == peer)
2488                 return 1;
2489
2490         *rule_nr = 71;
2491         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2492         if (self == peer) {
2493                 if (mdev->agreed_pro_version < 96 ?
2494                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2495                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2496                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2497                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2498                            resync as sync source modifications of our UUIDs. */
2499
2500                         if (mdev->agreed_pro_version < 91)
2501                                 return -1091;
2502
2503                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2504                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2505
2506                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2507                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2508                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2509
2510                         return 1;
2511                 }
2512         }
2513
2514
2515         *rule_nr = 80;
2516         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2517         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2518                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2519                 if (self == peer)
2520                         return 2;
2521         }
2522
2523         *rule_nr = 90;
2524         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2525         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2526         if (self == peer && self != ((u64)0))
2527                 return 100;
2528
2529         *rule_nr = 100;
2530         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2531                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2532                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2533                         peer = mdev->p_uuid[j] & ~((u64)1);
2534                         if (self == peer)
2535                                 return -100;
2536                 }
2537         }
2538
2539         return -1000;
2540 }
2541
2542 /* drbd_sync_handshake() returns the new conn state on success, or
2543    CONN_MASK (-1) on failure.
2544  */
2545 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2546                                            enum drbd_disk_state peer_disk) __must_hold(local)
2547 {
2548         int hg, rule_nr;
2549         enum drbd_conns rv = C_MASK;
2550         enum drbd_disk_state mydisk;
2551
2552         mydisk = mdev->state.disk;
2553         if (mydisk == D_NEGOTIATING)
2554                 mydisk = mdev->new_state_tmp.disk;
2555
2556         dev_info(DEV, "drbd_sync_handshake:\n");
2557         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2558         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2559                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2560
2561         hg = drbd_uuid_compare(mdev, &rule_nr);
2562
2563         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2564
2565         if (hg == -1000) {
2566                 dev_alert(DEV, "Unrelated data, aborting!\n");
2567                 return C_MASK;
2568         }
2569         if (hg < -1000) {
2570                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2571                 return C_MASK;
2572         }
2573
2574         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2575             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2576                 int f = (hg == -100) || abs(hg) == 2;
2577                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2578                 if (f)
2579                         hg = hg*2;
2580                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2581                      hg > 0 ? "source" : "target");
2582         }
2583
2584         if (abs(hg) == 100)
2585                 drbd_khelper(mdev, "initial-split-brain");
2586
2587         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2588                 int pcount = (mdev->state.role == R_PRIMARY)
2589                            + (peer_role == R_PRIMARY);
2590                 int forced = (hg == -100);
2591
2592                 switch (pcount) {
2593                 case 0:
2594                         hg = drbd_asb_recover_0p(mdev);
2595                         break;
2596                 case 1:
2597                         hg = drbd_asb_recover_1p(mdev);
2598                         break;
2599                 case 2:
2600                         hg = drbd_asb_recover_2p(mdev);
2601                         break;
2602                 }
2603                 if (abs(hg) < 100) {
2604                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2605                              "automatically solved. Sync from %s node\n",
2606                              pcount, (hg < 0) ? "peer" : "this");
2607                         if (forced) {
2608                                 dev_warn(DEV, "Doing a full sync, since"
2609                                      " UUIDs where ambiguous.\n");
2610                                 hg = hg*2;
2611                         }
2612                 }
2613         }
2614
2615         if (hg == -100) {
2616                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2617                         hg = -1;
2618                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2619                         hg = 1;
2620
2621                 if (abs(hg) < 100)
2622                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2623                              "Sync from %s node\n",
2624                              (hg < 0) ? "peer" : "this");
2625         }
2626
2627         if (hg == -100) {
2628                 /* FIXME this log message is not correct if we end up here
2629                  * after an attempted attach on a diskless node.
2630                  * We just refuse to attach -- well, we drop the "connection"
2631                  * to that disk, in a way... */
2632                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2633                 drbd_khelper(mdev, "split-brain");
2634                 return C_MASK;
2635         }
2636
2637         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2638                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2639                 return C_MASK;
2640         }
2641
2642         if (hg < 0 && /* by intention we do not use mydisk here. */
2643             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2644                 switch (mdev->net_conf->rr_conflict) {
2645                 case ASB_CALL_HELPER:
2646                         drbd_khelper(mdev, "pri-lost");
2647                         /* fall through */
2648                 case ASB_DISCONNECT:
2649                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2650                         return C_MASK;
2651                 case ASB_VIOLENTLY:
2652                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2653                              "assumption\n");
2654                 }
2655         }
2656
2657         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2658                 if (hg == 0)
2659                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2660                 else
2661                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2662                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2663                                  abs(hg) >= 2 ? "full" : "bit-map based");
2664                 return C_MASK;
2665         }
2666
2667         if (abs(hg) >= 2) {
2668                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2669                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2670                                         BM_LOCKED_SET_ALLOWED))
2671                         return C_MASK;
2672         }
2673
2674         if (hg > 0) { /* become sync source. */
2675                 rv = C_WF_BITMAP_S;
2676         } else if (hg < 0) { /* become sync target */
2677                 rv = C_WF_BITMAP_T;
2678         } else {
2679                 rv = C_CONNECTED;
2680                 if (drbd_bm_total_weight(mdev)) {
2681                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2682                              drbd_bm_total_weight(mdev));
2683                 }
2684         }
2685
2686         return rv;
2687 }
2688
2689 /* returns 1 if invalid */
2690 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2691 {
2692         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2693         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2694             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2695                 return 0;
2696
2697         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2698         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2699             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2700                 return 1;
2701
2702         /* everything else is valid if they are equal on both sides. */
2703         if (peer == self)
2704                 return 0;
2705
2706         /* everything es is invalid. */
2707         return 1;
2708 }
2709
2710 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2711 {
2712         struct p_protocol *p = &mdev->data.rbuf.protocol;
2713         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2714         int p_want_lose, p_two_primaries, cf;
2715         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2716
2717         p_proto         = be32_to_cpu(p->protocol);
2718         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2719         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2720         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2721         p_two_primaries = be32_to_cpu(p->two_primaries);
2722         cf              = be32_to_cpu(p->conn_flags);
2723         p_want_lose = cf & CF_WANT_LOSE;
2724
2725         clear_bit(CONN_DRY_RUN, &mdev->flags);
2726
2727         if (cf & CF_DRY_RUN)
2728                 set_bit(CONN_DRY_RUN, &mdev->flags);
2729
2730         if (p_proto != mdev->net_conf->wire_protocol) {
2731                 dev_err(DEV, "incompatible communication protocols\n");
2732                 goto disconnect;
2733         }
2734
2735         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2736                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2737                 goto disconnect;
2738         }
2739
2740         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2741                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2742                 goto disconnect;
2743         }
2744
2745         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2746                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2747                 goto disconnect;
2748         }
2749
2750         if (p_want_lose && mdev->net_conf->want_lose) {
2751                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2752                 goto disconnect;
2753         }
2754
2755         if (p_two_primaries != mdev->net_conf->two_primaries) {
2756                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2757                 goto disconnect;
2758         }
2759
2760         if (mdev->agreed_pro_version >= 87) {
2761                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2762
2763                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2764                         return false;
2765
2766                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2767                 if (strcmp(p_integrity_alg, my_alg)) {
2768                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2769                         goto disconnect;
2770                 }
2771                 dev_info(DEV, "data-integrity-alg: %s\n",
2772                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2773         }
2774
2775         return true;
2776
2777 disconnect:
2778         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2779         return false;
2780 }
2781
2782 /* helper function
2783  * input: alg name, feature name
2784  * return: NULL (alg name was "")
2785  *         ERR_PTR(error) if something goes wrong
2786  *         or the crypto hash ptr, if it worked out ok. */
2787 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2788                 const char *alg, const char *name)
2789 {
2790         struct crypto_hash *tfm;
2791
2792         if (!alg[0])
2793                 return NULL;
2794
2795         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2796         if (IS_ERR(tfm)) {
2797                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2798                         alg, name, PTR_ERR(tfm));
2799                 return tfm;
2800         }
2801         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2802                 crypto_free_hash(tfm);
2803                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2804                 return ERR_PTR(-EINVAL);
2805         }
2806         return tfm;
2807 }
2808
2809 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2810 {
2811         int ok = true;
2812         struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2813         unsigned int header_size, data_size, exp_max_sz;
2814         struct crypto_hash *verify_tfm = NULL;
2815         struct crypto_hash *csums_tfm = NULL;
2816         const int apv = mdev->agreed_pro_version;
2817         int *rs_plan_s = NULL;
2818         int fifo_size = 0;
2819
2820         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2821                     : apv == 88 ? sizeof(struct p_rs_param)
2822                                         + SHARED_SECRET_MAX
2823                     : apv <= 94 ? sizeof(struct p_rs_param_89)
2824                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2825
2826         if (packet_size > exp_max_sz) {
2827                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2828                     packet_size, exp_max_sz);
2829                 return false;
2830         }
2831
2832         if (apv <= 88) {
2833                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2834                 data_size   = packet_size  - header_size;
2835         } else if (apv <= 94) {
2836                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2837                 data_size   = packet_size  - header_size;
2838                 D_ASSERT(data_size == 0);
2839         } else {
2840                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2841                 data_size   = packet_size  - header_size;
2842                 D_ASSERT(data_size == 0);
2843         }
2844
2845         /* initialize verify_alg and csums_alg */
2846         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2847
2848         if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2849                 return false;
2850
2851         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2852
2853         if (apv >= 88) {
2854                 if (apv == 88) {
2855                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
2856                                 dev_err(DEV, "verify-alg of wrong size, "
2857                                         "peer wants %u, accepting only up to %u byte\n",
2858                                         data_size, SHARED_SECRET_MAX);
2859                                 return false;
2860                         }
2861
2862                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2863                                 return false;
2864
2865                         /* we expect NUL terminated string */
2866                         /* but just in case someone tries to be evil */
2867                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2868                         p->verify_alg[data_size-1] = 0;
2869
2870                 } else /* apv >= 89 */ {
2871                         /* we still expect NUL terminated strings */
2872                         /* but just in case someone tries to be evil */
2873                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2874                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2875                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2876                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2877                 }
2878
2879                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2880                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2881                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2882                                     mdev->sync_conf.verify_alg, p->verify_alg);
2883                                 goto disconnect;
2884                         }
2885                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2886                                         p->verify_alg, "verify-alg");
2887                         if (IS_ERR(verify_tfm)) {
2888                                 verify_tfm = NULL;
2889                                 goto disconnect;
2890                         }
2891                 }
2892
2893                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2894                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2895                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2896                                     mdev->sync_conf.csums_alg, p->csums_alg);
2897                                 goto disconnect;
2898                         }
2899                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2900                                         p->csums_alg, "csums-alg");
2901                         if (IS_ERR(csums_tfm)) {
2902                                 csums_tfm = NULL;
2903                                 goto disconnect;
2904                         }
2905                 }
2906
2907                 if (apv > 94) {
2908                         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2909                         mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2910                         mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2911                         mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2912                         mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2913
2914                         fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2915                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2916                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2917                                 if (!rs_plan_s) {
2918                                         dev_err(DEV, "kmalloc of fifo_buffer failed");
2919                                         goto disconnect;
2920                                 }
2921                         }
2922                 }
2923
2924                 spin_lock(&mdev->peer_seq_lock);
2925                 /* lock against drbd_nl_syncer_conf() */
2926                 if (verify_tfm) {
2927                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2928                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2929                         crypto_free_hash(mdev->verify_tfm);
2930                         mdev->verify_tfm = verify_tfm;
2931                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2932                 }
2933                 if (csums_tfm) {
2934                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2935                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2936                         crypto_free_hash(mdev->csums_tfm);
2937                         mdev->csums_tfm = csums_tfm;
2938                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2939                 }
2940                 if (fifo_size != mdev->rs_plan_s.size) {
2941                         kfree(mdev->rs_plan_s.values);
2942                         mdev->rs_plan_s.values = rs_plan_s;
2943                         mdev->rs_plan_s.size   = fifo_size;
2944                         mdev->rs_planed = 0;
2945                 }
2946                 spin_unlock(&mdev->peer_seq_lock);
2947         }
2948
2949         return ok;
2950 disconnect:
2951         /* just for completeness: actually not needed,
2952          * as this is not reached if csums_tfm was ok. */
2953         crypto_free_hash(csums_tfm);
2954         /* but free the verify_tfm again, if csums_tfm did not work out */
2955         crypto_free_hash(verify_tfm);
2956         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2957         return false;
2958 }
2959
2960 /* warn if the arguments differ by more than 12.5% */
2961 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2962         const char *s, sector_t a, sector_t b)
2963 {
2964         sector_t d;
2965         if (a == 0 || b == 0)
2966                 return;
2967         d = (a > b) ? (a - b) : (b - a);
2968         if (d > (a>>3) || d > (b>>3))
2969                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2970                      (unsigned long long)a, (unsigned long long)b);
2971 }
2972
2973 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2974 {
2975         struct p_sizes *p = &mdev->data.rbuf.sizes;
2976         enum determine_dev_size dd = unchanged;
2977         sector_t p_size, p_usize, my_usize;
2978         int ldsc = 0; /* local disk size changed */
2979         enum dds_flags ddsf;
2980
2981         p_size = be64_to_cpu(p->d_size);
2982         p_usize = be64_to_cpu(p->u_size);
2983
2984         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2985                 dev_err(DEV, "some backing storage is needed\n");
2986                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2987                 return false;
2988         }
2989
2990         /* just store the peer's disk size for now.
2991          * we still need to figure out whether we accept that. */
2992         mdev->p_size = p_size;
2993
2994         if (get_ldev(mdev)) {
2995                 warn_if_differ_considerably(mdev, "lower level device sizes",
2996                            p_size, drbd_get_max_capacity(mdev->ldev));
2997                 warn_if_differ_considerably(mdev, "user requested size",
2998                                             p_usize, mdev->ldev->dc.disk_size);
2999
3000                 /* if this is the first connect, or an otherwise expected
3001                  * param exchange, choose the minimum */
3002                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3003                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3004                                              p_usize);
3005
3006                 my_usize = mdev->ldev->dc.disk_size;
3007
3008                 if (mdev->ldev->dc.disk_size != p_usize) {
3009                         mdev->ldev->dc.disk_size = p_usize;
3010                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3011                              (unsigned long)mdev->ldev->dc.disk_size);
3012                 }
3013
3014                 /* Never shrink a device with usable data during connect.
3015                    But allow online shrinking if we are connected. */
3016                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3017                    drbd_get_capacity(mdev->this_bdev) &&
3018                    mdev->state.disk >= D_OUTDATED &&
3019                    mdev->state.conn < C_CONNECTED) {
3020                         dev_err(DEV, "The peer's disk size is too small!\n");
3021                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3022                         mdev->ldev->dc.disk_size = my_usize;
3023                         put_ldev(mdev);
3024                         return false;
3025                 }
3026                 put_ldev(mdev);
3027         }
3028
3029         ddsf = be16_to_cpu(p->dds_flags);
3030         if (get_ldev(mdev)) {
3031                 dd = drbd_determine_dev_size(mdev, ddsf);
3032                 put_ldev(mdev);
3033                 if (dd == dev_size_error)
3034                         return false;
3035                 drbd_md_sync(mdev);
3036         } else {
3037                 /* I am diskless, need to accept the peer's size. */
3038                 drbd_set_my_capacity(mdev, p_size);
3039         }
3040
3041         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3042         drbd_reconsider_max_bio_size(mdev);
3043
3044         if (get_ldev(mdev)) {
3045                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3046                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3047                         ldsc = 1;
3048                 }
3049
3050                 put_ldev(mdev);
3051         }
3052
3053         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3054                 if (be64_to_cpu(p->c_size) !=
3055                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3056                         /* we have different sizes, probably peer
3057                          * needs to know my new size... */
3058                         drbd_send_sizes(mdev, 0, ddsf);
3059                 }
3060                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3061                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3062                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3063                             mdev->state.disk >= D_INCONSISTENT) {
3064                                 if (ddsf & DDSF_NO_RESYNC)
3065                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3066                                 else
3067                                         resync_after_online_grow(mdev);
3068                         } else
3069                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3070                 }
3071         }
3072
3073         return true;
3074 }
3075
3076 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3077 {
3078         struct p_uuids *p = &mdev->data.rbuf.uuids;
3079         u64 *p_uuid;
3080         int i, updated_uuids = 0;
3081
3082         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3083
3084         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3085                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3086
3087         kfree(mdev->p_uuid);
3088         mdev->p_uuid = p_uuid;
3089
3090         if (mdev->state.conn < C_CONNECTED &&
3091             mdev->state.disk < D_INCONSISTENT &&
3092             mdev->state.role == R_PRIMARY &&
3093             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3094                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3095                     (unsigned long long)mdev->ed_uuid);
3096                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3097                 return false;
3098         }
3099
3100         if (get_ldev(mdev)) {
3101                 int skip_initial_sync =
3102                         mdev->state.conn == C_CONNECTED &&
3103                         mdev->agreed_pro_version >= 90 &&
3104                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3105                         (p_uuid[UI_FLAGS] & 8);
3106                 if (skip_initial_sync) {
3107                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3108                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3109                                         "clear_n_write from receive_uuids",
3110                                         BM_LOCKED_TEST_ALLOWED);
3111                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3112                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3113                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3114                                         CS_VERBOSE, NULL);
3115                         drbd_md_sync(mdev);
3116                         updated_uuids = 1;
3117                 }
3118                 put_ldev(mdev);
3119         } else if (mdev->state.disk < D_INCONSISTENT &&
3120                    mdev->state.role == R_PRIMARY) {
3121                 /* I am a diskless primary, the peer just created a new current UUID
3122                    for me. */
3123                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3124         }
3125
3126         /* Before we test for the disk state, we should wait until an eventually
3127            ongoing cluster wide state change is finished. That is important if
3128            we are primary and are detaching from our disk. We need to see the
3129            new disk state... */
3130         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3131         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3132                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3133
3134         if (updated_uuids)
3135                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3136
3137         return true;
3138 }
3139
3140 /**
3141  * convert_state() - Converts the peer's view of the cluster state to our point of view
3142  * @ps:         The state as seen by the peer.
3143  */
3144 static union drbd_state convert_state(union drbd_state ps)
3145 {
3146         union drbd_state ms;
3147
3148         static enum drbd_conns c_tab[] = {
3149                 [C_CONNECTED] = C_CONNECTED,
3150
3151                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3152                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3153                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3154                 [C_VERIFY_S]       = C_VERIFY_T,
3155                 [C_MASK]   = C_MASK,
3156         };
3157
3158         ms.i = ps.i;
3159
3160         ms.conn = c_tab[ps.conn];
3161         ms.peer = ps.role;
3162         ms.role = ps.peer;
3163         ms.pdsk = ps.disk;
3164         ms.disk = ps.pdsk;
3165         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3166
3167         return ms;
3168 }
3169
3170 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3171 {
3172         struct p_req_state *p = &mdev->data.rbuf.req_state;
3173         union drbd_state mask, val;
3174         enum drbd_state_rv rv;
3175
3176         mask.i = be32_to_cpu(p->mask);
3177         val.i = be32_to_cpu(p->val);
3178
3179         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3180             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3181                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3182                 return true;
3183         }
3184
3185         mask = convert_state(mask);
3186         val = convert_state(val);
3187
3188         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3189
3190         drbd_send_sr_reply(mdev, rv);
3191         drbd_md_sync(mdev);
3192
3193         return true;
3194 }
3195
/*
 * receive_state() - Handle the state word reported by the peer
 *
 * Reads the peer's state from mdev->data.rbuf.state, derives the new local
 * view (connection state, peer role, peer disk state, peer_isp) and applies
 * it with _drbd_set_state().  The local state is sampled under req_lock and
 * the whole evaluation is repeated if mdev->state changed in the meantime.
 *
 * Returns false if the connection is already at or below C_TEAR_DOWN, if
 * drbd_sync_handshake() yields C_MASK and neither side is in D_NEGOTIATING,
 * if connecting is aborted because of an only Consistent peer while IO is
 * suspended, or if _drbd_set_state() fails.  Returns true otherwise,
 * including the early exit taken for the "end of sync" notification.
 */
3196 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3197 {
3198         struct p_state *p = &mdev->data.rbuf.state;
3199         union drbd_state os, ns, peer_state;
3200         enum drbd_disk_state real_peer_disk;
3201         enum chg_state_flags cs_flags;
3202         int rv;
3203
3204         peer_state.i = be32_to_cpu(p->state);
3205
3206         real_peer_disk = peer_state.disk;
3207         if (peer_state.disk == D_NEGOTIATING) {
             /* NOTE(review): mdev->p_uuid is dereferenced here without a
              * NULL check, while the resync consideration further down
              * tests mdev->p_uuid first - confirm a UUID packet always
              * precedes this one. */
3208                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3209                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3210         }
3211
3212         spin_lock_irq(&mdev->req_lock);
             /* The retry label is always reached with req_lock held: either
              * from the lock above, or from the "goto retry" below which is
              * taken right after re-acquiring the lock. */
3213  retry:
3214         os = ns = mdev->state;
3215         spin_unlock_irq(&mdev->req_lock);
3216
3217         /* If some other part of the code (asender thread, timeout)
3218          * already decided to close the connection again,
3219          * we must not "re-establish" it here. */
3220         if (os.conn <= C_TEAR_DOWN)
3221                 return false;
3222
3223         /* If this is the "end of sync" confirmation, usually the peer disk
3224          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3225          * set) resync started in PausedSyncT, or if the timing of pause-/
3226          * unpause-sync events has been "just right", the peer disk may
3227          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3228          */
3229         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3230             real_peer_disk == D_UP_TO_DATE &&
3231             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3232                 /* If we are (becoming) SyncSource, but peer is still in sync
3233                  * preparation, ignore its uptodate-ness to avoid flapping, it
3234                  * will change to inconsistent once the peer reaches active
3235                  * syncing states.
3236                  * It may have changed syncer-paused flags, however, so we
3237                  * cannot ignore this completely. */
3238                 if (peer_state.conn > C_CONNECTED &&
3239                     peer_state.conn < C_SYNC_SOURCE)
3240                         real_peer_disk = D_INCONSISTENT;
3241
3242                 /* if peer_state changes to connected at the same time,
3243                  * it explicitly notifies us that it finished resync.
3244                  * Maybe we should finish it up, too? */
3245                 else if (os.conn >= C_SYNC_SOURCE &&
3246                          peer_state.conn == C_CONNECTED) {
3247                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3248                                 drbd_resync_finished(mdev);
3249                         return true;
3250                 }
3251         }
3252
3253         /* peer says his disk is inconsistent, while we think it is uptodate,
3254          * and this happens while the peer still thinks we have a sync going on,
3255          * but we think we are already done with the sync.
3256          * We ignore this to avoid flapping pdsk.
3257          * This should not happen, if the peer is a recent version of drbd. */
3258         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3259             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3260                 real_peer_disk = D_UP_TO_DATE;
3261
3262         if (ns.conn == C_WF_REPORT_PARAMS)
3263                 ns.conn = C_CONNECTED;
3264
3265         if (peer_state.conn == C_AHEAD)
3266                 ns.conn = C_BEHIND;
3267
3268         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3269             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3270                 int cr; /* consider resync */
3271
3272                 /* if we established a new connection */
3273                 cr  = (os.conn < C_CONNECTED);
3274                 /* if we had an established connection
3275                  * and one of the nodes newly attaches a disk */
3276                 cr |= (os.conn == C_CONNECTED &&
3277                        (peer_state.disk == D_NEGOTIATING ||
3278                         os.disk == D_NEGOTIATING));
3279                 /* if we have both been inconsistent, and the peer has been
3280                  * forced to be UpToDate with --overwrite-data */
3281                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3282                 /* if we had been plain connected, and the admin requested to
3283                  * start a sync by "invalidate" or "invalidate-remote" */
3284                 cr |= (os.conn == C_CONNECTED &&
3285                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3286                                  peer_state.conn <= C_WF_BITMAP_T));
3287
3288                 if (cr)
3289                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3290
3291                 put_ldev(mdev);
                     /* C_MASK is handled as "no usable sync decision" here:
                      * fail the negotiating side, or give up the connection. */
3292                 if (ns.conn == C_MASK) {
3293                         ns.conn = C_CONNECTED;
3294                         if (mdev->state.disk == D_NEGOTIATING) {
3295                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3296                         } else if (peer_state.disk == D_NEGOTIATING) {
3297                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3298                                 peer_state.disk = D_DISKLESS;
3299                                 real_peer_disk = D_DISKLESS;
3300                         } else {
3301                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3302                                         return false;
3303                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3304                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3305                                 return false;
3306                         }
3307                 }
3308         }
3309
3310         spin_lock_irq(&mdev->req_lock);
             /* The state changed while we evaluated it without the lock;
              * start over with a fresh snapshot (lock stays held). */
3311         if (mdev->state.i != os.i)
3312                 goto retry;
3313         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3314         ns.peer = peer_state.role;
3315         ns.pdsk = real_peer_disk;
3316         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3317         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3318                 ns.disk = mdev->new_state_tmp.disk;
             /* CS_HARD is added unless this change takes us from below
              * C_CONNECTED to C_CONNECTED or above. */
3319         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3320         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3321             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3322                 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3323                    for temporal network outages! */
3324                 spin_unlock_irq(&mdev->req_lock);
3325                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3326                 tl_clear(mdev);
3327                 drbd_uuid_new_current(mdev);
3328                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3329                 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3330                 return false;
3331         }
3332         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
             /* re-read: from here on ns holds the state that is actually in effect */
3333         ns = mdev->state;
3334         spin_unlock_irq(&mdev->req_lock);
3335
3336         if (rv < SS_SUCCESS) {
3337                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3338                 return false;
3339         }
3340
3341         if (os.conn > C_WF_REPORT_PARAMS) {
3342                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3343                     peer_state.disk != D_NEGOTIATING ) {
3344                         /* we want resync, peer has not yet decided to sync... */
3345                         /* Nowadays only used when forcing a node into primary role and
3346                            setting its disk to UpToDate with that */
3347                         drbd_send_uuids(mdev);
3348                         drbd_send_current_state(mdev);
3349                 }
3350         }
3351
3352         mdev->net_conf->want_lose = 0;
3353
3354         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3355
3356         return true;
3357 }
3358
3359 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3360 {
3361         struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3362
3363         wait_event(mdev->misc_wait,
3364                    mdev->state.conn == C_WF_SYNC_UUID ||
3365                    mdev->state.conn == C_BEHIND ||
3366                    mdev->state.conn < C_CONNECTED ||
3367                    mdev->state.disk < D_NEGOTIATING);
3368
3369         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3370
3371         /* Here the _drbd_uuid_ functions are right, current should
3372            _not_ be rotated into the history */
3373         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3374                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3375                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3376
3377                 drbd_print_uuids(mdev, "updated sync uuid");
3378                 drbd_start_resync(mdev, C_SYNC_TARGET);
3379
3380                 put_ldev(mdev);
3381         } else
3382                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3383
3384         return true;
3385 }
3386
3387 /**
3388  * receive_bitmap_plain
3389  *
3390  * Return 0 when done, 1 when another iteration is needed, and a negative error
3391  * code upon failure.
3392  */
3393 static int
3394 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3395                      unsigned long *buffer, struct bm_xfer_ctx *c)
3396 {
3397         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3398         unsigned want = num_words * sizeof(long);
3399         int err;
3400
3401         if (want != data_size) {
3402                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3403                 return -EIO;
3404         }
3405         if (want == 0)
3406                 return 0;
3407         err = drbd_recv(mdev, buffer, want);
3408         if (err != want) {
3409                 if (err >= 0)
3410                         err = -EIO;
3411                 return err;
3412         }
3413
3414         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3415
3416         c->word_offset += num_words;
3417         c->bit_offset = c->word_offset * BITS_PER_LONG;
3418         if (c->bit_offset > c->bm_bits)
3419                 c->bit_offset = c->bm_bits;
3420
3421         return 1;
3422 }
3423
3424 /**
3425  * recv_bm_rle_bits
3426  *
3427  * Return 0 when done, 1 when another iteration is needed, and a negative error
3428  * code upon failure.
3429  */
3430 static int
3431 recv_bm_rle_bits(struct drbd_conf *mdev,
3432                 struct p_compressed_bm *p,
3433                 struct bm_xfer_ctx *c)
3434 {
3435         struct bitstream bs;
3436         u64 look_ahead;
3437         u64 rl;
3438         u64 tmp;
3439         unsigned long s = c->bit_offset;
3440         unsigned long e;
3441         int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3442         int toggle = DCBP_get_start(p);
3443         int have;
3444         int bits;
3445
3446         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3447
3448         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3449         if (bits < 0)
3450                 return -EIO;
3451
3452         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3453                 bits = vli_decode_bits(&rl, look_ahead);
3454                 if (bits <= 0)
3455                         return -EIO;
3456
3457                 if (toggle) {
3458                         e = s + rl -1;
3459                         if (e >= c->bm_bits) {
3460                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3461                                 return -EIO;
3462                         }
3463                         _drbd_bm_set_bits(mdev, s, e);
3464                 }
3465
3466                 if (have < bits) {
3467                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3468                                 have, bits, look_ahead,
3469                                 (unsigned int)(bs.cur.b - p->code),
3470                                 (unsigned int)bs.buf_len);
3471                         return -EIO;
3472                 }
3473                 look_ahead >>= bits;
3474                 have -= bits;
3475
3476                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3477                 if (bits < 0)
3478                         return -EIO;
3479                 look_ahead |= tmp << have;
3480                 have += bits;
3481         }
3482
3483         c->bit_offset = s;
3484         bm_xfer_ctx_bit_to_word_offset(c);
3485
3486         return (s != c->bm_bits);
3487 }
3488
3489 /**
3490  * decode_bitmap_c
3491  *
3492  * Return 0 when done, 1 when another iteration is needed, and a negative error
3493  * code upon failure.
3494  */
3495 static int
3496 decode_bitmap_c(struct drbd_conf *mdev,
3497                 struct p_compressed_bm *p,
3498                 struct bm_xfer_ctx *c)
3499 {
3500         if (DCBP_get_code(p) == RLE_VLI_Bits)
3501                 return recv_bm_rle_bits(mdev, p, c);
3502
3503         /* other variants had been implemented for evaluation,
3504          * but have been dropped as this one turned out to be "best"
3505          * during all our tests. */
3506
3507         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3508         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3509         return -EIO;
3510 }
3511
3512 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3513                 const char *direction, struct bm_xfer_ctx *c)
3514 {
3515         /* what would it take to transfer it "plaintext" */
3516         unsigned plain = sizeof(struct p_header80) *
3517                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3518                 + c->bm_words * sizeof(long);
3519         unsigned total = c->bytes[0] + c->bytes[1];
3520         unsigned r;
3521
3522         /* total can not be zero. but just in case: */
3523         if (total == 0)
3524                 return;
3525
3526         /* don't report if not compressed */
3527         if (total >= plain)
3528                 return;
3529
3530         /* total < plain. check for overflow, still */
3531         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3532                                     : (1000 * total / plain);
3533
3534         if (r > 1000)
3535                 r = 1000;
3536
3537         r = 1000 - r;
3538         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3539              "total %u; compression: %u.%u%%\n",
3540                         direction,
3541                         c->bytes[1], c->packets[1],
3542                         c->bytes[0], c->packets[0],
3543                         total, r/10, r % 10);
3544 }
3545
3546 /* Since we are processing the bitfield from lower addresses to higher,
3547    it does not matter if the process it in 32 bit chunks or 64 bit
3548    chunks as long as it is little endian. (Understand it as byte stream,
3549    beginning with the lowest byte...) If we would use big endian
3550    we would need to process it from the highest address to the lowest,
3551    in order to be agnostic to the 32 vs 64 bits issue.
3552
3553    returns 0 on failure, 1 if we successfully received it. */
3554 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3555 {
3556         struct bm_xfer_ctx c;
3557         void *buffer;
3558         int err;
3559         int ok = false;
3560         struct p_header80 *h = &mdev->data.rbuf.header.h80;
3561
3562         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3563         /* you are supposed to send additional out-of-sync information
3564          * if you actually set bits during this phase */
3565
3566         /* maybe we should use some per thread scratch page,
3567          * and allocate that during initial device creation? */
3568         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3569         if (!buffer) {
3570                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3571                 goto out;
3572         }
3573
3574         c = (struct bm_xfer_ctx) {
3575                 .bm_bits = drbd_bm_bits(mdev),
3576                 .bm_words = drbd_bm_words(mdev),
3577         };
3578
3579         for(;;) {
3580                 if (cmd == P_BITMAP) {
3581                         err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3582                 } else if (cmd == P_COMPRESSED_BITMAP) {
3583                         /* MAYBE: sanity check that we speak proto >= 90,
3584                          * and the feature is enabled! */
3585                         struct p_compressed_bm *p;
3586
3587                         if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3588                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3589                                 goto out;
3590                         }
3591                         /* use the page buff */
3592                         p = buffer;
3593                         memcpy(p, h, sizeof(*h));
3594                         if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3595                                 goto out;
3596                         if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3597                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3598                                 goto out;
3599                         }
3600                         err = decode_bitmap_c(mdev, p, &c);
3601                 } else {
3602                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3603                         goto out;
3604                 }
3605
3606                 c.packets[cmd == P_BITMAP]++;
3607                 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3608
3609                 if (err <= 0) {
3610                         if (err < 0)
3611                                 goto out;
3612                         break;
3613                 }
3614                 if (!drbd_recv_header(mdev, &cmd, &data_size))
3615                         goto out;
3616         }
3617
3618         INFO_bm_xfer_stats(mdev, "receive", &c);
3619
3620         if (mdev->state.conn == C_WF_BITMAP_T) {
3621                 enum drbd_state_rv rv;
3622
3623                 ok = !drbd_send_bitmap(mdev);
3624                 if (!ok)
3625                         goto out;
3626                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3627                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3628                 D_ASSERT(rv == SS_SUCCESS);
3629         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3630                 /* admin may have requested C_DISCONNECTING,
3631                  * other threads may have noticed network errors */
3632                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3633                     drbd_conn_str(mdev->state.conn));
3634         }
3635
3636         ok = true;
3637  out:
3638         drbd_bm_unlock(mdev);
3639         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3640                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3641         free_page((unsigned long) buffer);
3642         return ok;
3643 }
3644
3645 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3646 {
3647         /* TODO zero copy sink :) */
3648         static char sink[128];
3649         int size, want, r;
3650
3651         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3652                  cmd, data_size);
3653
3654         size = data_size;
3655         while (size > 0) {
3656                 want = min_t(int, size, sizeof(sink));
3657                 r = drbd_recv(mdev, sink, want);
3658                 ERR_IF(r <= 0) break;
3659                 size -= r;
3660         }
3661         return size == 0;
3662 }
3663
3664 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3665 {
3666         /* Make sure we've acked all the TCP data associated
3667          * with the data requests being unplugged */
3668         drbd_tcp_quickack(mdev->data.socket);
3669
3670         return true;
3671 }
3672
3673 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3674 {
3675         struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3676
3677         switch (mdev->state.conn) {
3678         case C_WF_SYNC_UUID:
3679         case C_WF_BITMAP_T:
3680         case C_BEHIND:
3681                         break;
3682         default:
3683                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3684                                 drbd_conn_str(mdev->state.conn));
3685         }
3686
3687         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3688
3689         return true;
3690 }
3691
/* Signature of a receive handler for the data socket.  drbdd() calls it with
 * the device, the packet type, and the number of bytes that remain to be
 * received after the sub header.  A zero return value makes drbdd() treat
 * the packet as failed. */
3692 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3693
/* One entry of the packet dispatch table used by drbdd(). */
3694 struct data_cmd {
3695         int expect_payload; /* if 0, drbdd() rejects packets carrying data beyond the sub header */
3696         size_t pkt_size; /* size including the header; drbdd() subtracts sizeof(union p_header) to get the sub header size */
3697         drbd_cmd_handler_f function; /* handler; NULL makes drbdd() reject the packet type as unknown */
3698 };
3699
3700 static struct data_cmd drbd_cmd_handler[] = {
3701         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3702         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3703         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3704         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3705         [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
3706         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3707         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3708         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3709         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3710         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
3711         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3712         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3713         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3714         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3715         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3716         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3717         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3718         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3719         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3720         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3721         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3722         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3723         /* anything missing from this table is in
3724          * the asender_tbl, see get_asender_cmd */
3725         [P_MAX_CMD]         = { 0, 0, NULL },
3726 };
3727
3728 /* All handler functions that expect a sub-header get that sub-heder in
3729    mdev->data.rbuf.header.head.payload.
3730
3731    Usually in mdev->data.rbuf.header.head the callback can find the usual
3732    p_header, but they may not rely on that. Since there is also p_header95 !
3733  */
3734
3735 static void drbdd(struct drbd_conf *mdev)
3736 {
3737         union p_header *header = &mdev->data.rbuf.header;
3738         unsigned int packet_size;
3739         enum drbd_packets cmd;
3740         size_t shs; /* sub header size */
3741         int rv;
3742
3743         while (get_t_state(&mdev->receiver) == Running) {
3744                 drbd_thread_current_set_cpu(mdev);
3745                 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3746                         goto err_out;
3747
3748                 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3749                         dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3750                         goto err_out;
3751                 }
3752
3753                 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3754                 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3755                         dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3756                         goto err_out;
3757                 }
3758
3759                 if (shs) {
3760                         rv = drbd_recv(mdev, &header->h80.payload, shs);
3761                         if (unlikely(rv != shs)) {
3762                                 if (!signal_pending(current))
3763                                         dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3764                                 goto err_out;
3765                         }
3766                 }
3767
3768                 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3769
3770                 if (unlikely(!rv)) {
3771                         dev_err(DEV, "error receiving %s, l: %d!\n",
3772                             cmdname(cmd), packet_size);
3773                         goto err_out;
3774                 }
3775         }
3776
3777         if (0) {
3778         err_out:
3779                 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3780         }
3781         /* If we leave here, we probably want to update at least the
3782          * "Connected" indicator on stable storage. Do so explicitly here. */
3783         drbd_md_sync(mdev);
3784 }
3785
3786 void drbd_flush_workqueue(struct drbd_conf *mdev)
3787 {
3788         struct drbd_wq_barrier barr;
3789
3790         barr.w.cb = w_prev_work_done;
3791         init_completion(&barr.done);
3792         drbd_queue_work(&mdev->data.work, &barr.w);
3793         wait_for_completion(&barr.done);
3794 }
3795
3796 void drbd_free_tl_hash(struct drbd_conf *mdev)
3797 {
3798         struct hlist_head *h;
3799
3800         spin_lock_irq(&mdev->req_lock);
3801
3802         if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3803                 spin_unlock_irq(&mdev->req_lock);
3804                 return;
3805         }
3806         /* paranoia code */
3807         for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3808                 if (h->first)
3809                         dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3810                                 (int)(h - mdev->ee_hash), h->first);
3811         kfree(mdev->ee_hash);
3812         mdev->ee_hash = NULL;
3813         mdev->ee_hash_s = 0;
3814
3815         /* We may not have had the chance to wait for all locally pending
3816          * application requests. The hlist_add_fake() prevents access after
3817          * free on master bio completion. */
3818         for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) {
3819                 struct drbd_request *req;
3820                 struct hlist_node *pos, *n;
3821                 hlist_for_each_entry_safe(req, pos, n, h, collision) {
3822                         hlist_del_init(&req->collision);
3823                         hlist_add_fake(&req->collision);
3824                 }
3825         }
3826
3827         kfree(mdev->tl_hash);
3828         mdev->tl_hash = NULL;
3829         mdev->tl_hash_s = 0;
3830         spin_unlock_irq(&mdev->req_lock);
3831 }
3832
/*
 * drbd_disconnect() - clean up after the connection to the peer is gone.
 *
 * Returns immediately if the connection state is already C_STANDALONE.
 * Otherwise, in this order: forces C_NETWORK_FAILURE, stops the asender
 * thread and frees the sockets, waits for the active/sync/read ee lists
 * to drain, resets the resync bookkeeping, flushes the worker queue,
 * processes done_ee, drops the peer UUIDs, clears the transfer log
 * (unless I/O is suspended), syncs the meta data, possibly triggers
 * outdating of the peer, and finally moves the connection state to
 * C_UNCONNECTED -- or, when we were C_DISCONNECTING, frees the network
 * configuration and requests C_STANDALONE.
 *
 * The order of these steps matters; see the comments on the individual
 * steps below.
 */
3833 static void drbd_disconnect(struct drbd_conf *mdev)
3834 {
3835         enum drbd_fencing_p fp;
3836         union drbd_state os, ns;
             /* NOTE(review): rv is assigned below but never read in this function */
3837         int rv = SS_UNKNOWN_ERROR;
3838         unsigned int i;
3839
3840         if (mdev->state.conn == C_STANDALONE)
3841                 return;
3842
3843         /* We are about to start the cleanup after connection loss.
3844          * Make sure drbd_make_request knows about that.
3845          * Usually we should be in some network failure state already,
3846          * but just in case we are not, we fix it up here.
3847          */
3848         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
3849
3850         /* asender does not clean up anything. it must not interfere, either */
3851         drbd_thread_stop(&mdev->asender);
3852         drbd_free_sock(mdev);
3853
3854         /* wait for current activity to cease. */
3855         spin_lock_irq(&mdev->req_lock);
3856         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3857         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3858         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3859         spin_unlock_irq(&mdev->req_lock);
3860
3861         /* We do not have data structures that would allow us to
3862          * get the rs_pending_cnt down to 0 again.
3863          *  * On C_SYNC_TARGET we do not have any data structures describing
3864          *    the pending RSDataRequest's we have sent.
3865          *  * On C_SYNC_SOURCE there is no data structure that tracks
3866          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3867          *  And no, it is not the sum of the reference counts in the
3868          *  resync_LRU. The resync_LRU tracks the whole operation including
3869          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3870          *  on the fly. */
3871         drbd_rs_cancel_all(mdev);
3872         mdev->rs_total = 0;
3873         mdev->rs_failed = 0;
3874         atomic_set(&mdev->rs_pending_cnt, 0);
3875         wake_up(&mdev->misc_wait);
3876
3877         /* make sure syncer is stopped and w_resume_next_sg queued */
3878         del_timer_sync(&mdev->resync_timer);
3879         resync_timer_fn((unsigned long)mdev);
3880
3881         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3882          * w_make_resync_request etc. which may still be on the worker queue
3883          * to be "canceled" */
3884         drbd_flush_workqueue(mdev);
3885
3886         /* This also does reclaim_net_ee().  If we do this too early, we might
3887          * miss some resync ee and pages.*/
3888         drbd_process_done_ee(mdev);
3889
3890         kfree(mdev->p_uuid);
3891         mdev->p_uuid = NULL;
3892
             /* the transfer log is only cleared when I/O is not suspended */
3893         if (!is_susp(mdev->state))
3894                 tl_clear(mdev);
3895
3896         dev_info(DEV, "Connection closed\n");
3897
3898         drbd_md_sync(mdev);
3899
             /* fencing policy comes from the local disk config, if one is attached */
3900         fp = FP_DONT_CARE;
3901         if (get_ldev(mdev)) {
3902                 fp = mdev->ldev->dc.fencing;
3903                 put_ldev(mdev);
3904         }
3905
3906         if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3907                 drbd_try_outdate_peer_async(mdev);
3908
3909         spin_lock_irq(&mdev->req_lock);
3910         os = mdev->state;
3911         if (os.conn >= C_UNCONNECTED) {
3912                 /* Do not restart in case we are C_DISCONNECTING */
3913                 ns = os;
3914                 ns.conn = C_UNCONNECTED;
3915                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3916         }
3917         spin_unlock_irq(&mdev->req_lock);
3918
3919         if (os.conn == C_DISCONNECTING) {
                     /* wait until nobody holds a net_conf reference any more
                      * before the crypto transform and net_conf are freed */
3920                 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3921
3922                 crypto_free_hash(mdev->cram_hmac_tfm);
3923                 mdev->cram_hmac_tfm = NULL;
3924
3925                 kfree(mdev->net_conf);
3926                 mdev->net_conf = NULL;
3927                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3928         }
3929
3930         /* serialize with bitmap writeout triggered by the state change,
3931          * if any. */
3932         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3933
3934         /* tcp_close and release of sendpage pages can be deferred.  I don't
3935          * want to use SO_LINGER, because apparently it can be deferred for
3936          * more than 20 seconds (longest time I checked).
3937          *
3938          * Actually we don't care for exactly when the network stack does its
3939          * put_page(), but release our reference on these pages right here.
3940          */
3941         i = drbd_release_ee(mdev, &mdev->net_ee);
3942         if (i)
3943                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
             /* NOTE(review): i is unsigned but printed with %d in the two messages below */
3944         i = atomic_read(&mdev->pp_in_use_by_net);
3945         if (i)
3946                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3947         i = atomic_read(&mdev->pp_in_use);
3948         if (i)
3949                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3950
3951         D_ASSERT(list_empty(&mdev->read_ee));
3952         D_ASSERT(list_empty(&mdev->active_ee));
3953         D_ASSERT(list_empty(&mdev->sync_ee));
3954         D_ASSERT(list_empty(&mdev->done_ee));
3955
3956         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3957         atomic_set(&mdev->current_epoch->epoch_size, 0);
3958         D_ASSERT(list_empty(&mdev->current_epoch->list));
3959 }
3960
3961 /*
3962  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3963  * we can agree on is stored in agreed_pro_version.
3964  *
3965  * feature flags and the reserved array should be enough room for future
3966  * enhancements of the handshake protocol, and possible plugins...
3967  *
3968  * for now, they are expected to be zero, but ignored.
3969  */
3970 static int drbd_send_handshake(struct drbd_conf *mdev)
3971 {
3972         /* ASSERT current == mdev->receiver ... */
3973         struct p_handshake *p = &mdev->data.sbuf.handshake;
3974         int ok;
3975
3976         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3977                 dev_err(DEV, "interrupted during initial handshake\n");
3978                 return 0; /* interrupted. not ok. */
3979         }
3980
3981         if (mdev->data.socket == NULL) {
3982                 mutex_unlock(&mdev->data.mutex);
3983                 return 0;
3984         }
3985
3986         memset(p, 0, sizeof(*p));
3987         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3988         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3989         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3990                              (struct p_header80 *)p, sizeof(*p), 0 );
3991         mutex_unlock(&mdev->data.mutex);
3992         return ok;
3993 }
3994
3995 /*
3996  * return values:
3997  *   1 yes, we have a valid connection
3998  *   0 oops, did not work out, please try again
3999  *  -1 peer talks different language,
4000  *     no point in trying again, please go standalone.
4001  */
4002 static int drbd_do_handshake(struct drbd_conf *mdev)
4003 {
4004         /* ASSERT current == mdev->receiver ... */
4005         struct p_handshake *p = &mdev->data.rbuf.handshake;
4006         const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4007         unsigned int length;
4008         enum drbd_packets cmd;
4009         int rv;
4010
4011         rv = drbd_send_handshake(mdev);
4012         if (!rv)
4013                 return 0;
4014
4015         rv = drbd_recv_header(mdev, &cmd, &length);
4016         if (!rv)
4017                 return 0;
4018
4019         if (cmd != P_HAND_SHAKE) {
4020                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4021                      cmdname(cmd), cmd);
4022                 return -1;
4023         }
4024
4025         if (length != expect) {
4026                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4027                      expect, length);
4028                 return -1;
4029         }
4030
4031         rv = drbd_recv(mdev, &p->head.payload, expect);
4032
4033         if (rv != expect) {
4034                 if (!signal_pending(current))
4035                         dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4036                 return 0;
4037         }
4038
4039         p->protocol_min = be32_to_cpu(p->protocol_min);
4040         p->protocol_max = be32_to_cpu(p->protocol_max);
4041         if (p->protocol_max == 0)
4042                 p->protocol_max = p->protocol_min;
4043
4044         if (PRO_VERSION_MAX < p->protocol_min ||
4045             PRO_VERSION_MIN > p->protocol_max)
4046                 goto incompat;
4047
4048         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4049
4050         dev_info(DEV, "Handshake successful: "
4051              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4052
4053         return 1;
4054
4055  incompat:
4056         dev_err(DEV, "incompatible DRBD dialects: "
4057             "I support %d-%d, peer supports %d-%d\n",
4058             PRO_VERSION_MIN, PRO_VERSION_MAX,
4059             p->protocol_min, p->protocol_max);
4060         return -1;
4061 }
4062
4063 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4064 static int drbd_do_auth(struct drbd_conf *mdev)
4065 {
4066         dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4067         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4068         return -1;
4069 }
4070 #else
4071 #define CHALLENGE_LEN 64
4072
4073 /* Return value:
4074         1 - auth succeeded,
4075         0 - failed, try again (network error),
4076         -1 - auth failed, don't try again.
4077 */
4078
4079 static int drbd_do_auth(struct drbd_conf *mdev)
4080 {
4081         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4082         struct scatterlist sg;
4083         char *response = NULL;
4084         char *right_response = NULL;
4085         char *peers_ch = NULL;
4086         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4087         unsigned int resp_size;
4088         struct hash_desc desc;
4089         enum drbd_packets cmd;
4090         unsigned int length;
4091         int rv;
4092
4093         desc.tfm = mdev->cram_hmac_tfm;
4094         desc.flags = 0;
4095
4096         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4097                                 (u8 *)mdev->net_conf->shared_secret, key_len);
4098         if (rv) {
4099                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4100                 rv = -1;
4101                 goto fail;
4102         }
4103
4104         get_random_bytes(my_challenge, CHALLENGE_LEN);
4105
4106         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4107         if (!rv)
4108                 goto fail;
4109
4110         rv = drbd_recv_header(mdev, &cmd, &length);
4111         if (!rv)
4112                 goto fail;
4113
4114         if (cmd != P_AUTH_CHALLENGE) {
4115                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4116                     cmdname(cmd), cmd);
4117                 rv = 0;
4118                 goto fail;
4119         }
4120
4121         if (length > CHALLENGE_LEN * 2) {
4122                 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4123                 rv = -1;
4124                 goto fail;
4125         }
4126
4127         peers_ch = kmalloc(length, GFP_NOIO);
4128         if (peers_ch == NULL) {
4129                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4130                 rv = -1;
4131                 goto fail;
4132         }
4133
4134         rv = drbd_recv(mdev, peers_ch, length);
4135
4136         if (rv != length) {
4137                 if (!signal_pending(current))
4138                         dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4139                 rv = 0;
4140                 goto fail;
4141         }
4142
4143         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4144         response = kmalloc(resp_size, GFP_NOIO);
4145         if (response == NULL) {
4146                 dev_err(DEV, "kmalloc of response failed\n");
4147                 rv = -1;
4148                 goto fail;
4149         }
4150
4151         sg_init_table(&sg, 1);
4152         sg_set_buf(&sg, peers_ch, length);
4153
4154         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4155         if (rv) {
4156                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4157                 rv = -1;
4158                 goto fail;
4159         }
4160
4161         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4162         if (!rv)
4163                 goto fail;
4164
4165         rv = drbd_recv_header(mdev, &cmd, &length);
4166         if (!rv)
4167                 goto fail;
4168
4169         if (cmd != P_AUTH_RESPONSE) {
4170                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4171                         cmdname(cmd), cmd);
4172                 rv = 0;
4173                 goto fail;
4174         }
4175
4176         if (length != resp_size) {
4177                 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4178                 rv = 0;
4179                 goto fail;
4180         }
4181
4182         rv = drbd_recv(mdev, response , resp_size);
4183
4184         if (rv != resp_size) {
4185                 if (!signal_pending(current))
4186                         dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4187                 rv = 0;
4188                 goto fail;
4189         }
4190
4191         right_response = kmalloc(resp_size, GFP_NOIO);
4192         if (right_response == NULL) {
4193                 dev_err(DEV, "kmalloc of right_response failed\n");
4194                 rv = -1;
4195                 goto fail;
4196         }
4197
4198         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4199
4200         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4201         if (rv) {
4202                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4203                 rv = -1;
4204                 goto fail;
4205         }
4206
4207         rv = !memcmp(response, right_response, resp_size);
4208
4209         if (rv)
4210                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4211                      resp_size, mdev->net_conf->cram_hmac_alg);
4212         else
4213                 rv = -1;
4214
4215  fail:
4216         kfree(peers_ch);
4217         kfree(response);
4218         kfree(right_response);
4219
4220         return rv;
4221 }
4222 #endif
4223
4224 int drbdd_init(struct drbd_thread *thi)
4225 {
4226         struct drbd_conf *mdev = thi->mdev;
4227         unsigned int minor = mdev_to_minor(mdev);
4228         int h;
4229
4230         sprintf(current->comm, "drbd%d_receiver", minor);
4231
4232         dev_info(DEV, "receiver (re)started\n");
4233
4234         do {
4235                 h = drbd_connect(mdev);
4236                 if (h == 0) {
4237                         drbd_disconnect(mdev);
4238                         schedule_timeout_interruptible(HZ);
4239                 }
4240                 if (h == -1) {
4241                         dev_warn(DEV, "Discarding network configuration.\n");
4242                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4243                 }
4244         } while (h == 0);
4245
4246         if (h > 0) {
4247                 if (get_net_conf(mdev)) {
4248                         drbdd(mdev);
4249                         put_net_conf(mdev);
4250                 }
4251         }
4252
4253         drbd_disconnect(mdev);
4254
4255         dev_info(DEV, "receiver terminated\n");
4256         return 0;
4257 }
4258
4259 /* ********* acknowledge sender ******** */
4260
4261 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4262 {
4263         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4264
4265         int retcode = be32_to_cpu(p->retcode);
4266
4267         if (retcode >= SS_SUCCESS) {
4268                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4269         } else {
4270                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4271                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4272                     drbd_set_st_err_str(retcode), retcode);
4273         }
4274         wake_up(&mdev->state_wait);
4275
4276         return true;
4277 }
4278
/*
 * got_Ping() - answer a ping from the peer.
 *
 * Returns whatever drbd_send_ping_ack() returns; the header is not used.
 */
static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
{
	int sent = drbd_send_ping_ack(mdev);

	return sent;
}
4284
4285 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4286 {
4287         /* restore idle timeout */
4288         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4289         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4290                 wake_up(&mdev->misc_wait);
4291
4292         return true;
4293 }
4294
4295 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4296 {
4297         struct p_block_ack *p = (struct p_block_ack *)h;
4298         sector_t sector = be64_to_cpu(p->sector);
4299         int blksize = be32_to_cpu(p->blksize);
4300
4301         D_ASSERT(mdev->agreed_pro_version >= 89);
4302
4303         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4304
4305         if (get_ldev(mdev)) {
4306                 drbd_rs_complete_io(mdev, sector);
4307                 drbd_set_in_sync(mdev, sector, blksize);
4308                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4309                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4310                 put_ldev(mdev);
4311         }
4312         dec_rs_pending(mdev);
4313         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4314
4315         return true;
4316 }
4317
4318 /* when we receive the ACK for a write request,
4319  * verify that we actually know about it */
4320 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4321         u64 id, sector_t sector)
4322 {
4323         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4324         struct hlist_node *n;
4325         struct drbd_request *req;
4326
4327         hlist_for_each_entry(req, n, slot, collision) {
4328                 if ((unsigned long)req == (unsigned long)id) {
4329                         if (req->sector != sector) {
4330                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4331                                     "wrong sector (%llus versus %llus)\n", req,
4332                                     (unsigned long long)req->sector,
4333                                     (unsigned long long)sector);
4334                                 break;
4335                         }
4336                         return req;
4337                 }
4338         }
4339         return NULL;
4340 }
4341
4342 typedef struct drbd_request *(req_validator_fn)
4343         (struct drbd_conf *mdev, u64 id, sector_t sector);
4344
4345 static int validate_req_change_req_state(struct drbd_conf *mdev,
4346         u64 id, sector_t sector, req_validator_fn validator,
4347         const char *func, enum drbd_req_event what)
4348 {
4349         struct drbd_request *req;
4350         struct bio_and_error m;
4351
4352         spin_lock_irq(&mdev->req_lock);
4353         req = validator(mdev, id, sector);
4354         if (unlikely(!req)) {
4355                 spin_unlock_irq(&mdev->req_lock);
4356
4357                 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4358                         (void *)(unsigned long)id, (unsigned long long)sector);
4359                 return false;
4360         }
4361         __req_mod(req, what, &m);
4362         spin_unlock_irq(&mdev->req_lock);
4363
4364         if (m.bio)
4365                 complete_master_bio(mdev, &m);
4366         return true;
4367 }
4368
4369 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4370 {
4371         struct p_block_ack *p = (struct p_block_ack *)h;
4372         sector_t sector = be64_to_cpu(p->sector);
4373         int blksize = be32_to_cpu(p->blksize);
4374         enum drbd_req_event what;
4375
4376         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377
4378         if (is_syncer_block_id(p->block_id)) {
4379                 drbd_set_in_sync(mdev, sector, blksize);
4380                 dec_rs_pending(mdev);
4381                 return true;
4382         }
4383         switch (be16_to_cpu(h->command)) {
4384         case P_RS_WRITE_ACK:
4385                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4386                 what = write_acked_by_peer_and_sis;
4387                 break;
4388         case P_WRITE_ACK:
4389                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4390                 what = write_acked_by_peer;
4391                 break;
4392         case P_RECV_ACK:
4393                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4394                 what = recv_acked_by_peer;
4395                 break;
4396         case P_DISCARD_ACK:
4397                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4398                 what = conflict_discarded_by_peer;
4399                 break;
4400         default:
4401                 D_ASSERT(0);
4402                 return false;
4403         }
4404
4405         return validate_req_change_req_state(mdev, p->block_id, sector,
4406                 _ack_id_to_req, __func__ , what);
4407 }
4408
4409 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4410 {
4411         struct p_block_ack *p = (struct p_block_ack *)h;
4412         sector_t sector = be64_to_cpu(p->sector);
4413         int size = be32_to_cpu(p->blksize);
4414         struct drbd_request *req;
4415         struct bio_and_error m;
4416
4417         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4418
4419         if (is_syncer_block_id(p->block_id)) {
4420                 dec_rs_pending(mdev);
4421                 drbd_rs_failed_io(mdev, sector, size);
4422                 return true;
4423         }
4424
4425         spin_lock_irq(&mdev->req_lock);
4426         req = _ack_id_to_req(mdev, p->block_id, sector);
4427         if (!req) {
4428                 spin_unlock_irq(&mdev->req_lock);
4429                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4430                     mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4431                         /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4432                            The master bio might already be completed, therefore the
4433                            request is no longer in the collision hash.
4434                            => Do not try to validate block_id as request. */
4435                         /* In Protocol B we might already have got a P_RECV_ACK
4436                            but then get a P_NEG_ACK after wards. */
4437                         drbd_set_out_of_sync(mdev, sector, size);
4438                         return true;
4439                 } else {
4440                         dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4441                                 (void *)(unsigned long)p->block_id, (unsigned long long)sector);
4442                         return false;
4443                 }
4444         }
4445         __req_mod(req, neg_acked, &m);
4446         spin_unlock_irq(&mdev->req_lock);
4447
4448         if (m.bio)
4449                 complete_master_bio(mdev, &m);
4450         return true;
4451 }
4452
4453 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4454 {
4455         struct p_block_ack *p = (struct p_block_ack *)h;
4456         sector_t sector = be64_to_cpu(p->sector);
4457
4458         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4459         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4460             (unsigned long long)sector, be32_to_cpu(p->blksize));
4461
4462         return validate_req_change_req_state(mdev, p->block_id, sector,
4463                 _ar_id_to_req, __func__ , neg_acked);
4464 }
4465
4466 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4467 {
4468         sector_t sector;
4469         int size;
4470         struct p_block_ack *p = (struct p_block_ack *)h;
4471
4472         sector = be64_to_cpu(p->sector);
4473         size = be32_to_cpu(p->blksize);
4474
4475         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4476
4477         dec_rs_pending(mdev);
4478
4479         if (get_ldev_if_state(mdev, D_FAILED)) {
4480                 drbd_rs_complete_io(mdev, sector);
4481                 switch (be16_to_cpu(h->command)) {
4482                 case P_NEG_RS_DREPLY:
4483                         drbd_rs_failed_io(mdev, sector, size);
4484                 case P_RS_CANCEL:
4485                         break;
4486                 default:
4487                         D_ASSERT(0);
4488                         put_ldev(mdev);
4489                         return false;
4490                 }
4491                 put_ldev(mdev);
4492         }
4493
4494         return true;
4495 }
4496
4497 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4498 {
4499         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4500
4501         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4502
4503         if (mdev->state.conn == C_AHEAD &&
4504             atomic_read(&mdev->ap_in_flight) == 0 &&
4505             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
4506                 mdev->start_resync_timer.expires = jiffies + HZ;
4507                 add_timer(&mdev->start_resync_timer);
4508         }
4509
4510         return true;
4511 }
4512
4513 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4514 {
4515         struct p_block_ack *p = (struct p_block_ack *)h;
4516         struct drbd_work *w;
4517         sector_t sector;
4518         int size;
4519
4520         sector = be64_to_cpu(p->sector);
4521         size = be32_to_cpu(p->blksize);
4522
4523         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4524
4525         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4526                 drbd_ov_oos_found(mdev, sector, size);
4527         else
4528                 ov_oos_print(mdev);
4529
4530         if (!get_ldev(mdev))
4531                 return true;
4532
4533         drbd_rs_complete_io(mdev, sector);
4534         dec_rs_pending(mdev);
4535
4536         --mdev->ov_left;
4537
4538         /* let's advance progress step marks only for every other megabyte */
4539         if ((mdev->ov_left & 0x200) == 0x200)
4540                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4541
4542         if (mdev->ov_left == 0) {
4543                 w = kmalloc(sizeof(*w), GFP_NOIO);
4544                 if (w) {
4545                         w->cb = w_ov_finished;
4546                         drbd_queue_work_front(&mdev->data.work, w);
4547                 } else {
4548                         dev_err(DEV, "kmalloc(w) failed.");
4549                         ov_oos_print(mdev);
4550                         drbd_resync_finished(mdev);
4551                 }
4552         }
4553         put_ldev(mdev);
4554         return true;
4555 }
4556
4557 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4558 {
4559         return true;
4560 }
4561
/* One entry of the dispatch table in get_asender_cmd(): how to handle
 * one packet type. */
4562 struct asender_cmd {
             /* the table entries set this to the sizeof() of the packet's struct;
              * presumably the number of bytes to receive for this packet --
              * confirm against the user of this table */
4563         size_t pkt_size;
             /* handler for this packet type; NULL marks an unhandled command */
4564         int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4565 };
4566
4567 static struct asender_cmd *get_asender_cmd(int cmd)
4568 {
4569         static struct asender_cmd asender_tbl[] = {
4570                 /* anything missing from this table is in
4571                  * the drbd_cmd_handler (drbd_default_handler) table,
4572                  * see the beginning of drbdd() */
4573         [P_PING]            = { sizeof(struct p_header80), got_Ping },
4574         [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
4575         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4576         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4577         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4578         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4579         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4580         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4581         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4582         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4583         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4584         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4585         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4586         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4587         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4588         [P_MAX_CMD]         = { 0, NULL },
4589         };
4590         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4591                 return NULL;
4592         return &asender_tbl[cmd];
4593 }
4594
4595 int drbd_asender(struct drbd_thread *thi)
4596 {
4597         struct drbd_conf *mdev = thi->mdev;
4598         struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4599         struct asender_cmd *cmd = NULL;
4600
4601         int rv, len;
4602         void *buf    = h;
4603         int received = 0;
4604         int expect   = sizeof(struct p_header80);
4605         int empty;
4606         int ping_timeout_active = 0;
4607
4608         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4609
4610         current->policy = SCHED_RR;  /* Make this a realtime task! */
4611         current->rt_priority = 2;    /* more important than all other tasks */
4612
4613         while (get_t_state(thi) == Running) {
4614                 drbd_thread_current_set_cpu(mdev);
4615                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4616                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4617                         mdev->meta.socket->sk->sk_rcvtimeo =
4618                                 mdev->net_conf->ping_timeo*HZ/10;
4619                         ping_timeout_active = 1;
4620                 }
4621
4622                 /* conditionally cork;
4623                  * it may hurt latency if we cork without much to send */
4624                 if (!mdev->net_conf->no_cork &&
4625                         3 < atomic_read(&mdev->unacked_cnt))
4626                         drbd_tcp_cork(mdev->meta.socket);
4627                 while (1) {
4628                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4629                         flush_signals(current);
4630                         if (!drbd_process_done_ee(mdev))
4631                                 goto reconnect;
4632                         /* to avoid race with newly queued ACKs */
4633                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4634                         spin_lock_irq(&mdev->req_lock);
4635                         empty = list_empty(&mdev->done_ee);
4636                         spin_unlock_irq(&mdev->req_lock);
4637                         /* new ack may have been queued right here,
4638                          * but then there is also a signal pending,
4639                          * and we start over... */
4640                         if (empty)
4641                                 break;
4642                 }
4643                 /* but unconditionally uncork unless disabled */
4644                 if (!mdev->net_conf->no_cork)
4645                         drbd_tcp_uncork(mdev->meta.socket);
4646
4647                 /* short circuit, recv_msg would return EINTR anyways. */
4648                 if (signal_pending(current))
4649                         continue;
4650
4651                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4652                                      buf, expect-received, 0);
4653                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4654
4655                 flush_signals(current);
4656
4657                 /* Note:
4658                  * -EINTR        (on meta) we got a signal
4659                  * -EAGAIN       (on meta) rcvtimeo expired
4660                  * -ECONNRESET   other side closed the connection
4661                  * -ERESTARTSYS  (on data) we got a signal
4662                  * rv <  0       other than above: unexpected error!
4663                  * rv == expected: full header or command
4664                  * rv <  expected: "woken" by signal during receive
4665                  * rv == 0       : "connection shut down by peer"
4666                  */
4667                 if (likely(rv > 0)) {
4668                         received += rv;
4669                         buf      += rv;
4670                 } else if (rv == 0) {
4671                         dev_err(DEV, "meta connection shut down by peer.\n");
4672                         goto reconnect;
4673                 } else if (rv == -EAGAIN) {
4674                         /* If the data socket received something meanwhile,
4675                          * that is good enough: peer is still alive. */
4676                         if (time_after(mdev->last_received,
4677                                 jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4678                                 continue;
4679                         if (ping_timeout_active) {
4680                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4681                                 goto reconnect;
4682                         }
4683                         set_bit(SEND_PING, &mdev->flags);
4684                         continue;
4685                 } else if (rv == -EINTR) {
4686                         continue;
4687                 } else {
4688                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4689                         goto reconnect;
4690                 }
4691
4692                 if (received == expect && cmd == NULL) {
4693                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4694                                 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4695                                     be32_to_cpu(h->magic),
4696                                     be16_to_cpu(h->command),
4697                                     be16_to_cpu(h->length));
4698                                 goto reconnect;
4699                         }
4700                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4701                         len = be16_to_cpu(h->length);
4702                         if (unlikely(cmd == NULL)) {
4703                                 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4704                                     be32_to_cpu(h->magic),
4705                                     be16_to_cpu(h->command),
4706                                     be16_to_cpu(h->length));
4707                                 goto disconnect;
4708                         }
4709                         expect = cmd->pkt_size;
4710                         ERR_IF(len != expect-sizeof(struct p_header80))
4711                                 goto reconnect;
4712                 }
4713                 if (received == expect) {
4714                         mdev->last_received = jiffies;
4715                         D_ASSERT(cmd != NULL);
4716                         if (!cmd->process(mdev, h))
4717                                 goto reconnect;
4718
4719                         /* the idle_timeout (ping-int)
4720                          * has been restored in got_PingAck() */
4721                         if (cmd == get_asender_cmd(P_PING_ACK))
4722                                 ping_timeout_active = 0;
4723
4724                         buf      = h;
4725                         received = 0;
4726                         expect   = sizeof(struct p_header80);
4727                         cmd      = NULL;
4728                 }
4729         }
4730
4731         if (0) {
4732 reconnect:
4733                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4734                 drbd_md_sync(mdev);
4735         }
4736         if (0) {
4737 disconnect:
4738                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4739                 drbd_md_sync(mdev);
4740         }
4741         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4742
4743         D_ASSERT(mdev->state.conn < C_CONNECTED);
4744         dev_info(DEV, "asender terminated\n");
4745
4746         return 0;
4747 }