1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. In case we have
60    to evaluate the resync-after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70         struct drbd_conf *mdev;
71
72         md_io = (struct drbd_md_io *)bio->bi_private;
73         mdev = container_of(md_io, struct drbd_conf, md_io);
74
75         md_io->error = error;
76
77         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
78          * to timeout on the lower level device, and eventually detach from it.
79          * If this io completion runs after that timeout expired, this
80          * drbd_md_put_buffer() may allow us to finally try and re-attach.
81          * During normal operation, this only puts that extra reference
82          * down to 1 again.
83          * Make sure we first drop the reference, and only then signal
84          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
85          * next drbd_md_sync_page_io(), that we trigger the
86          * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
87          */
88         drbd_md_put_buffer(mdev);
89         md_io->done = 1;
90         wake_up(&mdev->misc_wait);
91         bio_put(bio);
92         put_ldev(mdev);
93 }
94
95 /* reads on behalf of the partner,
96  * "submitted" by the receiver
97  */
98 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
99 {
100         unsigned long flags = 0;
101         struct drbd_conf *mdev = peer_req->w.mdev;
102
103         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
104         mdev->read_cnt += peer_req->i.size >> 9;
105         list_del(&peer_req->w.list);
106         if (list_empty(&mdev->read_ee))
107                 wake_up(&mdev->ee_wait);
108         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109                 __drbd_chk_io_error(mdev, DRBD_IO_ERROR);
110         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
111
112         drbd_queue_work(&mdev->tconn->sender_work, &peer_req->w);
113         put_ldev(mdev);
114 }
115
116 /* writes on behalf of the partner, or resync writes,
117  * "submitted" by the receiver, final stage.  */
118 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119 {
120         unsigned long flags = 0;
121         struct drbd_conf *mdev = peer_req->w.mdev;
122         struct drbd_interval i;
123         int do_wake;
124         u64 block_id;
125         int do_al_complete_io;
126
127         /* after we moved peer_req to done_ee,
128          * we may no longer access it,
129          * it may be freed/reused already!
130          * (as soon as we release the req_lock) */
131         i = peer_req->i;
132         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
133         block_id = peer_req->block_id;
134
135         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
136         mdev->writ_cnt += peer_req->i.size >> 9;
137         list_move_tail(&peer_req->w.list, &mdev->done_ee);
138
139         /*
140          * Do not remove from the write_requests tree here: we did not send the
141          * Ack yet and did not wake possibly waiting conflicting requests.
142          * Removal from the tree happens in "drbd_process_done_ee", via the
143          * appropriate w.cb (e_end_block/e_end_resync_block), or in
144          * _drbd_clear_done_ee.
145          */
146
147         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
148
149         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
150                 __drbd_chk_io_error(mdev, DRBD_IO_ERROR);
151         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
152
153         if (block_id == ID_SYNCER)
154                 drbd_rs_complete_io(mdev, i.sector);
155
156         if (do_wake)
157                 wake_up(&mdev->ee_wait);
158
159         if (do_al_complete_io)
160                 drbd_al_complete_io(mdev, &i);
161
162         wake_asender(mdev->tconn);
163         put_ldev(mdev);
164 }
165
166 /* writes on behalf of the partner, or resync writes,
167  * "submitted" by the receiver.
168  */
169 void drbd_peer_request_endio(struct bio *bio, int error)
170 {
171         struct drbd_peer_request *peer_req = bio->bi_private;
172         struct drbd_conf *mdev = peer_req->w.mdev;
173         int uptodate = bio_flagged(bio, BIO_UPTODATE);
174         int is_write = bio_data_dir(bio) == WRITE;
175
176         if (error && __ratelimit(&drbd_ratelimit_state))
177                 dev_warn(DEV, "%s: error=%d s=%llus\n",
178                                 is_write ? "write" : "read", error,
179                                 (unsigned long long)peer_req->i.sector);
180         if (!error && !uptodate) {
181                 if (__ratelimit(&drbd_ratelimit_state))
182                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
183                                         is_write ? "write" : "read",
184                                         (unsigned long long)peer_req->i.sector);
185                 /* strange behavior of some lower level drivers...
186                  * fail the request by clearing the uptodate flag,
187                  * but do not return any error?! */
188                 error = -EIO;
189         }
190
191         if (error)
192                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
193
194         bio_put(bio); /* no need for the bio anymore */
195         if (atomic_dec_and_test(&peer_req->pending_bios)) {
196                 if (is_write)
197                         drbd_endio_write_sec_final(peer_req);
198                 else
199                         drbd_endio_read_sec_final(peer_req);
200         }
201 }
202
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
204  */
205 void drbd_request_endio(struct bio *bio, int error)
206 {
207         unsigned long flags;
208         struct drbd_request *req = bio->bi_private;
209         struct drbd_conf *mdev = req->w.mdev;
210         struct bio_and_error m;
211         enum drbd_req_event what;
212         int uptodate = bio_flagged(bio, BIO_UPTODATE);
213
214         if (!error && !uptodate) {
215                 dev_warn(DEV, "p %s: setting error to -EIO\n",
216                          bio_data_dir(bio) == WRITE ? "write" : "read");
217                 /* strange behavior of some lower level drivers...
218                  * fail the request by clearing the uptodate flag,
219                  * but do not return any error?! */
220                 error = -EIO;
221         }
222
223
224         /* If this request was aborted locally before,
225          * but now was completed "successfully",
226          * chances are that this caused arbitrary data corruption.
227          *
228          * "aborting" requests, or force-detaching the disk, is intended for
229          * completely blocked/hung local backing devices which no longer
230          * complete requests at all, not even error completions.  In this
231          * situation, usually a hard-reset and failover is the only way out.
232          *
233          * By "aborting", basically faking a local error-completion,
234          * we allow for a more graceful switchover by cleanly migrating services.
235          * Still the affected node has to be rebooted "soon".
236          *
237          * By completing these requests, we allow the upper layers to re-use
238          * the associated data pages.
239          *
240          * If later the local backing device "recovers", and now DMAs some data
241          * from disk into the original request pages, in the best case it will
242          * just put random data into unused pages; but typically it will corrupt
243          * meanwhile completely unrelated data, causing all sorts of damage.
244          *
245          * Which means delayed successful completion,
246          * especially for READ requests,
247          * is a reason to panic().
248          *
249          * We assume that a delayed *error* completion is OK,
250          * though we still will complain noisily about it.
251          */
252         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
253                 if (__ratelimit(&drbd_ratelimit_state))
254                         dev_emerg(DEV, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
255
256                 if (!error)
257                         panic("possible random memory corruption caused by delayed completion of aborted local request\n");
258         }
259
260         /* to avoid recursion in __req_mod */
261         if (unlikely(error)) {
262                 what = (bio_data_dir(bio) == WRITE)
263                         ? WRITE_COMPLETED_WITH_ERROR
264                         : (bio_rw(bio) == READ)
265                           ? READ_COMPLETED_WITH_ERROR
266                           : READ_AHEAD_COMPLETED_WITH_ERROR;
267         } else
268                 what = COMPLETED_OK;
269
270         bio_put(req->private_bio);
271         req->private_bio = ERR_PTR(error);
272
273         /* not req_mod(), we need irqsave here! */
274         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
275         __req_mod(req, what, &m);
276         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
277         put_ldev(mdev);
278
279         if (m.bio)
280                 complete_master_bio(mdev, &m);
281 }
282
283 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
284                   struct drbd_peer_request *peer_req, void *digest)
285 {
286         struct hash_desc desc;
287         struct scatterlist sg;
288         struct page *page = peer_req->pages;
289         struct page *tmp;
290         unsigned len;
291
292         desc.tfm = tfm;
293         desc.flags = 0;
294
295         sg_init_table(&sg, 1);
296         crypto_hash_init(&desc);
297
298         while ((tmp = page_chain_next(page))) {
299                 /* all but the last page will be fully used */
300                 sg_set_page(&sg, page, PAGE_SIZE, 0);
301                 crypto_hash_update(&desc, &sg, sg.length);
302                 page = tmp;
303         }
304         /* and now the last, possibly only partially used page */
305         len = peer_req->i.size & (PAGE_SIZE - 1);
306         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
307         crypto_hash_update(&desc, &sg, sg.length);
308         crypto_hash_final(&desc, digest);
309 }
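/* Worked example for the tail handling above (illustrative only, assuming
 * PAGE_SIZE is 4096): a 5120 byte peer request spans two pages.  The loop
 * hashes the first page in full; then len = 5120 & 4095 = 1024, so only the
 * first 1024 bytes of the last page go into the digest.  When i.size is an
 * exact multiple of PAGE_SIZE, len is 0 and the "len ?: PAGE_SIZE" fallback
 * hashes the whole last page instead. */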
310
311 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
312 {
313         struct hash_desc desc;
314         struct scatterlist sg;
315         struct bio_vec *bvec;
316         int i;
317
318         desc.tfm = tfm;
319         desc.flags = 0;
320
321         sg_init_table(&sg, 1);
322         crypto_hash_init(&desc);
323
324         bio_for_each_segment(bvec, bio, i) {
325                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
326                 crypto_hash_update(&desc, &sg, sg.length);
327         }
328         crypto_hash_final(&desc, digest);
329 }
330
331 /* MAYBE merge common code with w_e_end_ov_req */
332 static int w_e_send_csum(struct drbd_work *w, int cancel)
333 {
334         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
335         struct drbd_conf *mdev = w->mdev;
336         int digest_size;
337         void *digest;
338         int err = 0;
339
340         if (unlikely(cancel))
341                 goto out;
342
343         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
344                 goto out;
345
346         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
347         digest = kmalloc(digest_size, GFP_NOIO);
348         if (digest) {
349                 sector_t sector = peer_req->i.sector;
350                 unsigned int size = peer_req->i.size;
351                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
352                 /* Free peer_req and pages before send.
353                  * In case we block on congestion, we could otherwise run into
354                  * some distributed deadlock, if the other side blocks on
355                  * congestion as well, because our receiver blocks in
356                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
357                 drbd_free_peer_req(mdev, peer_req);
358                 peer_req = NULL;
359                 inc_rs_pending(mdev);
360                 err = drbd_send_drequest_csum(mdev, sector, size,
361                                               digest, digest_size,
362                                               P_CSUM_RS_REQUEST);
363                 kfree(digest);
364         } else {
365                 dev_err(DEV, "kmalloc() of digest failed.\n");
366                 err = -ENOMEM;
367         }
368
369 out:
370         if (peer_req)
371                 drbd_free_peer_req(mdev, peer_req);
372
373         if (unlikely(err))
374                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
375         return err;
376 }
377
378 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
379
380 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
381 {
382         struct drbd_peer_request *peer_req;
383
384         if (!get_ldev(mdev))
385                 return -EIO;
386
387         if (drbd_rs_should_slow_down(mdev, sector))
388                 goto defer;
389
390         /* GFP_TRY, because if there is no memory available right now, this may
391          * be rescheduled for later. It is "only" background resync, after all. */
392         peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
393                                        size, GFP_TRY);
394         if (!peer_req)
395                 goto defer;
396
397         peer_req->w.cb = w_e_send_csum;
398         spin_lock_irq(&mdev->tconn->req_lock);
399         list_add(&peer_req->w.list, &mdev->read_ee);
400         spin_unlock_irq(&mdev->tconn->req_lock);
401
402         atomic_add(size >> 9, &mdev->rs_sect_ev);
403         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
404                 return 0;
405
406         /* If it failed because of ENOMEM, retry should help.  If it failed
407          * because bio_add_page failed (probably broken lower level driver),
408          * retry may or may not help.
409          * If it does not, you may need to force disconnect. */
410         spin_lock_irq(&mdev->tconn->req_lock);
411         list_del(&peer_req->w.list);
412         spin_unlock_irq(&mdev->tconn->req_lock);
413
414         drbd_free_peer_req(mdev, peer_req);
415 defer:
416         put_ldev(mdev);
417         return -EAGAIN;
418 }
419
420 int w_resync_timer(struct drbd_work *w, int cancel)
421 {
422         struct drbd_conf *mdev = w->mdev;
423         switch (mdev->state.conn) {
424         case C_VERIFY_S:
425                 w_make_ov_request(w, cancel);
426                 break;
427         case C_SYNC_TARGET:
428                 w_make_resync_request(w, cancel);
429                 break;
430         }
431
432         return 0;
433 }
434
435 void resync_timer_fn(unsigned long data)
436 {
437         struct drbd_conf *mdev = (struct drbd_conf *) data;
438
439         if (list_empty(&mdev->resync_work.list))
440                 drbd_queue_work(&mdev->tconn->sender_work, &mdev->resync_work);
441 }
442
443 static void fifo_set(struct fifo_buffer *fb, int value)
444 {
445         int i;
446
447         for (i = 0; i < fb->size; i++)
448                 fb->values[i] = value;
449 }
450
451 static int fifo_push(struct fifo_buffer *fb, int value)
452 {
453         int ov;
454
455         ov = fb->values[fb->head_index];
456         fb->values[fb->head_index++] = value;
457
458         if (fb->head_index >= fb->size)
459                 fb->head_index = 0;
460
461         return ov;
462 }
463
464 static void fifo_add_val(struct fifo_buffer *fb, int value)
465 {
466         int i;
467
468         for (i = 0; i < fb->size; i++)
469                 fb->values[i] += value;
470 }
471
472 struct fifo_buffer *fifo_alloc(int fifo_size)
473 {
474         struct fifo_buffer *fb;
475
476         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_KERNEL);
477         if (!fb)
478                 return NULL;
479
480         fb->head_index = 0;
481         fb->size = fifo_size;
482         fb->total = 0;
483
484         return fb;
485 }
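/* Illustrative trace of the plan-ahead ring buffer above (not part of the
 * driver): after fifo_alloc(3) the buffer is { 0, 0, 0 } with head_index 0.
 * fifo_add_val(fb, 4) yields { 4, 4, 4 }; the caller accounts for that by
 * adding 4 * 3 to fb->total.  A subsequent fifo_push(fb, 0) returns the
 * value at the head (4), overwrites it with 0 and advances head_index to 1,
 * leaving { 0, 4, 4 }.  drbd_rs_controller() below uses exactly this pattern
 * to spread a correction over "steps" future intervals and to pop the share
 * that applies to the current interval. */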
486
487 static int drbd_rs_controller(struct drbd_conf *mdev)
488 {
489         struct disk_conf *dc;
490         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
491         unsigned int want;     /* The number of sectors we want in the proxy */
492         int req_sect; /* Number of sectors to request in this turn */
493         int correction; /* Number of sectors more we need in the proxy*/
494         int cps; /* correction per invocation of drbd_rs_controller() */
495         int steps; /* Number of time steps to plan ahead */
496         int curr_corr;
497         int max_sect;
498         struct fifo_buffer *plan;
499
500         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
501         mdev->rs_in_flight -= sect_in;
502
503         dc = rcu_dereference(mdev->ldev->disk_conf);
504         plan = rcu_dereference(mdev->rs_plan_s);
505
506         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
507
508         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
509                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
510         } else { /* normal path */
511                 want = dc->c_fill_target ? dc->c_fill_target :
512                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
513         }
514
515         correction = want - mdev->rs_in_flight - plan->total;
516
517         /* Plan ahead */
518         cps = correction / steps;
519         fifo_add_val(plan, cps);
520         plan->total += cps * steps;
521
522         /* What we do in this step */
523         curr_corr = fifo_push(plan, 0);
524         plan->total -= curr_corr;
525
526         req_sect = sect_in + curr_corr;
527         if (req_sect < 0)
528                 req_sect = 0;
529
530         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
531         if (req_sect > max_sect)
532                 req_sect = max_sect;
533
534         /*
535         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
536                  sect_in, mdev->rs_in_flight, want, correction,
537                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
538         */
539
540         return req_sect;
541 }
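/* Worked example of the controller arithmetic above (illustrative, assuming
 * SLEEP_TIME is HZ/10, i.e. one controller step per 100ms, and that
 * resync_rate/c_max_rate are configured in KiB/s, so "* 2" converts to
 * 512 byte sectors): at resync start with resync_rate = 10240 KiB/s and
 * steps = 20,
 *    want       = (10240 * 2 * SLEEP_TIME / HZ) * 20 = 2048 * 20 = 40960
 *    correction = 40960 - 0 - 0                      = 40960
 *    cps        = 40960 / 20                         = 2048
 * fifo_add_val() spreads those 2048 sectors into each of the 20 slots,
 * fifo_push() pops 2048 for the current step, and with sect_in still 0 we
 * request req_sect = 2048 sectors (1 MiB) for the next 100ms, i.e. the
 * configured 10 MiB/s. */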
542
543 static int drbd_rs_number_requests(struct drbd_conf *mdev)
544 {
545         int number;
546
547         rcu_read_lock();
548         if (rcu_dereference(mdev->rs_plan_s)->size) {
549                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
550                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
551         } else {
552                 mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
553                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
554         }
555         rcu_read_unlock();
556
557         /* ignore the amount of pending requests, the resync controller should
558          * throttle down to incoming reply rate soon enough anyways. */
559         return number;
560 }
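/* Unit sketch for the conversions above (illustrative, assuming BM_BLOCK_SIZE
 * is 4096, i.e. BM_BLOCK_SHIFT is 12, and SLEEP_TIME is HZ/10):
 * ">> (BM_BLOCK_SHIFT - 9)" turns sectors into 4 KiB bitmap blocks, 8 sectors
 * each.  Continuing the example above, 2048 sectors become number = 256
 * requests per step, and c_sync_rate = 256 * HZ * 4 / SLEEP_TIME
 * = 10240 KiB/s, i.e. back to the configured rate. */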
561
562 int w_make_resync_request(struct drbd_work *w, int cancel)
563 {
564         struct drbd_conf *mdev = w->mdev;
565         unsigned long bit;
566         sector_t sector;
567         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
568         int max_bio_size;
569         int number, rollback_i, size;
570         int align, queued, sndbuf;
571         int i = 0;
572
573         if (unlikely(cancel))
574                 return 0;
575
576         if (mdev->rs_total == 0) {
577                 /* empty resync? */
578                 drbd_resync_finished(mdev);
579                 return 0;
580         }
581
582         if (!get_ldev(mdev)) {
583                 /* Since we only need to access mdev->rsync, a
584                    get_ldev_if_state(mdev, D_FAILED) would be sufficient; but
585                    continuing the resync with a broken disk makes no sense at
586                    all */
587                 dev_err(DEV, "Disk broke down during resync!\n");
588                 return 0;
589         }
590
591         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
592         number = drbd_rs_number_requests(mdev);
593         if (number == 0)
594                 goto requeue;
595
596         for (i = 0; i < number; i++) {
597                 /* Stop generating RS requests when half of the send buffer is filled */
598                 mutex_lock(&mdev->tconn->data.mutex);
599                 if (mdev->tconn->data.socket) {
600                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
601                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
602                 } else {
603                         queued = 1;
604                         sndbuf = 0;
605                 }
606                 mutex_unlock(&mdev->tconn->data.mutex);
607                 if (queued > sndbuf / 2)
608                         goto requeue;
609
610 next_sector:
611                 size = BM_BLOCK_SIZE;
612                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
613
614                 if (bit == DRBD_END_OF_BITMAP) {
615                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
616                         put_ldev(mdev);
617                         return 0;
618                 }
619
620                 sector = BM_BIT_TO_SECT(bit);
621
622                 if (drbd_rs_should_slow_down(mdev, sector) ||
623                     drbd_try_rs_begin_io(mdev, sector)) {
624                         mdev->bm_resync_fo = bit;
625                         goto requeue;
626                 }
627                 mdev->bm_resync_fo = bit + 1;
628
629                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
630                         drbd_rs_complete_io(mdev, sector);
631                         goto next_sector;
632                 }
633
634 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
635                 /* try to find some adjacent bits.
636                  * we stop once we have reached the maximum req size.
637                  *
638                  * Additionally always align bigger requests, in order to
639                  * be prepared for all stripe sizes of software RAIDs.
640                  */
641                 align = 1;
642                 rollback_i = i;
643                 for (;;) {
644                         if (size + BM_BLOCK_SIZE > max_bio_size)
645                                 break;
646
647                         /* Be always aligned */
648                         if (sector & ((1<<(align+3))-1))
649                                 break;
650
651                         /* do not cross extent boundaries */
652                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
653                                 break;
654                         /* now, is it actually dirty, after all?
655                          * caution, drbd_bm_test_bit is tri-state for some
656                          * obscure reason; checking for ( b == 0 ) would handle the
657                          * out-of-band value correctly only by accident, because of
658                          * the "oddly sized" adjustment below */
659                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
660                                 break;
661                         bit++;
662                         size += BM_BLOCK_SIZE;
663                         if ((BM_BLOCK_SIZE << align) <= size)
664                                 align++;
665                         i++;
666                 }
667                 /* if we merged some,
668                  * reset the offset to start the next drbd_bm_find_next from */
669                 if (size > BM_BLOCK_SIZE)
670                         mdev->bm_resync_fo = bit + 1;
671 #endif
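                /* Example of the alignment rule in the loop above
                 * (illustrative): with align == 1 the request may only grow
                 * past 4 KiB if the start sector is a multiple of 16 sectors
                 * (8 KiB); once it reaches 8 KiB, align becomes 2 and further
                 * growth requires 16 KiB alignment, and so on.  Requests that
                 * grow large therefore always start on correspondingly large
                 * boundaries, which keeps them from straddling typical
                 * software RAID stripes. */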
672
673                 /* adjust very last sectors, in case we are oddly sized */
674                 if (sector + (size>>9) > capacity)
675                         size = (capacity-sector)<<9;
676                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
677                         switch (read_for_csum(mdev, sector, size)) {
678                         case -EIO: /* Disk failure */
679                                 put_ldev(mdev);
680                                 return -EIO;
681                         case -EAGAIN: /* allocation failed, or ldev busy */
682                                 drbd_rs_complete_io(mdev, sector);
683                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
684                                 i = rollback_i;
685                                 goto requeue;
686                         case 0:
687                                 /* everything ok */
688                                 break;
689                         default:
690                                 BUG();
691                         }
692                 } else {
693                         int err;
694
695                         inc_rs_pending(mdev);
696                         err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
697                                                  sector, size, ID_SYNCER);
698                         if (err) {
699                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
700                                 dec_rs_pending(mdev);
701                                 put_ldev(mdev);
702                                 return err;
703                         }
704                 }
705         }
706
707         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
708                 /* last syncer _request_ was sent,
709                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
710                  * next sync group will resume), as soon as we receive the last
711                  * resync data block, and the last bit is cleared.
712                  * until then resync "work" is "inactive" ...
713                  */
714                 put_ldev(mdev);
715                 return 0;
716         }
717
718  requeue:
719         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
720         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
721         put_ldev(mdev);
722         return 0;
723 }
724
725 static int w_make_ov_request(struct drbd_work *w, int cancel)
726 {
727         struct drbd_conf *mdev = w->mdev;
728         int number, i, size;
729         sector_t sector;
730         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
731         bool stop_sector_reached = false;
732
733         if (unlikely(cancel))
734                 return 1;
735
736         number = drbd_rs_number_requests(mdev);
737
738         sector = mdev->ov_position;
739         for (i = 0; i < number; i++) {
740                 if (sector >= capacity)
741                         return 1;
742
743                 /* We check for "finished" only in the reply path:
744                  * w_e_end_ov_reply().
745                  * We need to send at least one request out. */
746                 stop_sector_reached = i > 0
747                         && verify_can_do_stop_sector(mdev)
748                         && sector >= mdev->ov_stop_sector;
749                 if (stop_sector_reached)
750                         break;
751
752                 size = BM_BLOCK_SIZE;
753
754                 if (drbd_rs_should_slow_down(mdev, sector) ||
755                     drbd_try_rs_begin_io(mdev, sector)) {
756                         mdev->ov_position = sector;
757                         goto requeue;
758                 }
759
760                 if (sector + (size>>9) > capacity)
761                         size = (capacity-sector)<<9;
762
763                 inc_rs_pending(mdev);
764                 if (drbd_send_ov_request(mdev, sector, size)) {
765                         dec_rs_pending(mdev);
766                         return 0;
767                 }
768                 sector += BM_SECT_PER_BIT;
769         }
770         mdev->ov_position = sector;
771
772  requeue:
773         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
774         if (i == 0 || !stop_sector_reached)
775                 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
776         return 1;
777 }
778
779 int w_ov_finished(struct drbd_work *w, int cancel)
780 {
781         struct drbd_conf *mdev = w->mdev;
782         kfree(w);
783         ov_out_of_sync_print(mdev);
784         drbd_resync_finished(mdev);
785
786         return 0;
787 }
788
789 static int w_resync_finished(struct drbd_work *w, int cancel)
790 {
791         struct drbd_conf *mdev = w->mdev;
792         kfree(w);
793
794         drbd_resync_finished(mdev);
795
796         return 0;
797 }
798
799 static void ping_peer(struct drbd_conf *mdev)
800 {
801         struct drbd_tconn *tconn = mdev->tconn;
802
803         clear_bit(GOT_PING_ACK, &tconn->flags);
804         request_ping(tconn);
805         wait_event(tconn->ping_wait,
806                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
807 }
808
809 int drbd_resync_finished(struct drbd_conf *mdev)
810 {
811         unsigned long db, dt, dbdt;
812         unsigned long n_oos;
813         union drbd_state os, ns;
814         struct drbd_work *w;
815         char *khelper_cmd = NULL;
816         int verify_done = 0;
817
818         /* Remove all elements from the resync LRU. Since future actions
819          * might set bits in the (main) bitmap, then the entries in the
820          * resync LRU would be wrong. */
821         if (drbd_rs_del_all(mdev)) {
822                 /* In case this is not possible now, most probably because
823                  * there are P_RS_DATA_REPLY packets lingering on the worker's
824                  * queue (or even the read operations for those packets
825                  * are not finished by now).  Retry in 100ms. */
826
827                 schedule_timeout_interruptible(HZ / 10);
828                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
829                 if (w) {
830                         w->cb = w_resync_finished;
831                         w->mdev = mdev;
832                         drbd_queue_work(&mdev->tconn->sender_work, w);
833                         return 1;
834                 }
835                 dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
836         }
837
838         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
839         if (dt <= 0)
840                 dt = 1;
841
842         db = mdev->rs_total;
843         /* adjust for verify start and stop sectors, and the position actually reached */
844         if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
845                 db -= mdev->ov_left;
846
847         dbdt = Bit2KB(db/dt);
848         mdev->rs_paused /= HZ;
849
850         if (!get_ldev(mdev))
851                 goto out;
852
853         ping_peer(mdev);
854
855         spin_lock_irq(&mdev->tconn->req_lock);
856         os = drbd_read_state(mdev);
857
858         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
859
860         /* This protects us against multiple calls (that can happen in the presence
861            of application IO), and against connectivity loss just before we arrive here. */
862         if (os.conn <= C_CONNECTED)
863                 goto out_unlock;
864
865         ns = os;
866         ns.conn = C_CONNECTED;
867
868         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
869              verify_done ? "Online verify" : "Resync",
870              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
871
872         n_oos = drbd_bm_total_weight(mdev);
873
874         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
875                 if (n_oos) {
876                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
877                               n_oos, Bit2KB(1));
878                         khelper_cmd = "out-of-sync";
879                 }
880         } else {
881                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
882
883                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
884                         khelper_cmd = "after-resync-target";
885
886                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
887                         const unsigned long s = mdev->rs_same_csum;
888                         const unsigned long t = mdev->rs_total;
889                         const int ratio =
890                                 (t == 0)     ? 0 :
891                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
892                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
893                              "transferred %luK total %luK\n",
894                              ratio,
895                              Bit2KB(mdev->rs_same_csum),
896                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
897                              Bit2KB(mdev->rs_total));
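                        /* e.g. s = 7500 equal-checksum blocks out of
                         * t = 10000 gives ratio = 7500 * 100 / 10000 = 75,
                         * i.e. only a quarter of the marked blocks actually
                         * had to be transferred (illustrative numbers). */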
898                 }
899         }
900
901         if (mdev->rs_failed) {
902                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
903
904                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
905                         ns.disk = D_INCONSISTENT;
906                         ns.pdsk = D_UP_TO_DATE;
907                 } else {
908                         ns.disk = D_UP_TO_DATE;
909                         ns.pdsk = D_INCONSISTENT;
910                 }
911         } else {
912                 ns.disk = D_UP_TO_DATE;
913                 ns.pdsk = D_UP_TO_DATE;
914
915                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
916                         if (mdev->p_uuid) {
917                                 int i;
918                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
919                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
920                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
921                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
922                         } else {
923                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
924                         }
925                 }
926
927                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
928                         /* for verify runs, we don't update uuids here,
929                          * so there would be nothing to report. */
930                         drbd_uuid_set_bm(mdev, 0UL);
931                         drbd_print_uuids(mdev, "updated UUIDs");
932                         if (mdev->p_uuid) {
933                                 /* Now the two UUID sets are equal, update what we
934                                  * know of the peer. */
935                                 int i;
936                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
937                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
938                         }
939                 }
940         }
941
942         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
943 out_unlock:
944         spin_unlock_irq(&mdev->tconn->req_lock);
945         put_ldev(mdev);
946 out:
947         mdev->rs_total  = 0;
948         mdev->rs_failed = 0;
949         mdev->rs_paused = 0;
950
951         /* reset start sector, if we reached end of device */
952         if (verify_done && mdev->ov_left == 0)
953                 mdev->ov_start_sector = 0;
954
955         drbd_md_sync(mdev);
956
957         if (khelper_cmd)
958                 drbd_khelper(mdev, khelper_cmd);
959
960         return 1;
961 }
962
963 /* helper */
964 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
965 {
966         if (drbd_peer_req_has_active_page(peer_req)) {
967                 /* This might happen if sendpage() has not finished */
968                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
969                 atomic_add(i, &mdev->pp_in_use_by_net);
970                 atomic_sub(i, &mdev->pp_in_use);
971                 spin_lock_irq(&mdev->tconn->req_lock);
972                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
973                 spin_unlock_irq(&mdev->tconn->req_lock);
974                 wake_up(&drbd_pp_wait);
975         } else
976                 drbd_free_peer_req(mdev, peer_req);
977 }
978
979 /**
980  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
981  * @mdev:       DRBD device.
982  * @w:          work object.
983  * @cancel:     The connection will be closed anyways
984  */
985 int w_e_end_data_req(struct drbd_work *w, int cancel)
986 {
987         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
988         struct drbd_conf *mdev = w->mdev;
989         int err;
990
991         if (unlikely(cancel)) {
992                 drbd_free_peer_req(mdev, peer_req);
993                 dec_unacked(mdev);
994                 return 0;
995         }
996
997         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
998                 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
999         } else {
1000                 if (__ratelimit(&drbd_ratelimit_state))
1001                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
1002                             (unsigned long long)peer_req->i.sector);
1003
1004                 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
1005         }
1006
1007         dec_unacked(mdev);
1008
1009         move_to_net_ee_or_free(mdev, peer_req);
1010
1011         if (unlikely(err))
1012                 dev_err(DEV, "drbd_send_block() failed\n");
1013         return err;
1014 }
1015
1016 /**
1017  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1018  * @mdev:       DRBD device.
1019  * @w:          work object.
1020  * @cancel:     The connection will be closed anyways
1021  */
1022 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1023 {
1024         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1025         struct drbd_conf *mdev = w->mdev;
1026         int err;
1027
1028         if (unlikely(cancel)) {
1029                 drbd_free_peer_req(mdev, peer_req);
1030                 dec_unacked(mdev);
1031                 return 0;
1032         }
1033
1034         if (get_ldev_if_state(mdev, D_FAILED)) {
1035                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1036                 put_ldev(mdev);
1037         }
1038
1039         if (mdev->state.conn == C_AHEAD) {
1040                 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
1041         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1042                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1043                         inc_rs_pending(mdev);
1044                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1045                 } else {
1046                         if (__ratelimit(&drbd_ratelimit_state))
1047                                 dev_err(DEV, "Not sending RSDataReply, "
1048                                     "partner DISKLESS!\n");
1049                         err = 0;
1050                 }
1051         } else {
1052                 if (__ratelimit(&drbd_ratelimit_state))
1053                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1054                             (unsigned long long)peer_req->i.sector);
1055
1056                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1057
1058                 /* update resync data with failure */
1059                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1060         }
1061
1062         dec_unacked(mdev);
1063
1064         move_to_net_ee_or_free(mdev, peer_req);
1065
1066         if (unlikely(err))
1067                 dev_err(DEV, "drbd_send_block() failed\n");
1068         return err;
1069 }
1070
1071 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1072 {
1073         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1074         struct drbd_conf *mdev = w->mdev;
1075         struct digest_info *di;
1076         int digest_size;
1077         void *digest = NULL;
1078         int err, eq = 0;
1079
1080         if (unlikely(cancel)) {
1081                 drbd_free_peer_req(mdev, peer_req);
1082                 dec_unacked(mdev);
1083                 return 0;
1084         }
1085
1086         if (get_ldev(mdev)) {
1087                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1088                 put_ldev(mdev);
1089         }
1090
1091         di = peer_req->digest;
1092
1093         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1094                 /* quick hack to try to avoid a race against reconfiguration.
1095                  * a real fix would be much more involved,
1096                  * introducing more locking mechanisms */
1097                 if (mdev->tconn->csums_tfm) {
1098                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1099                         D_ASSERT(digest_size == di->digest_size);
1100                         digest = kmalloc(digest_size, GFP_NOIO);
1101                 }
1102                 if (digest) {
1103                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1104                         eq = !memcmp(digest, di->digest, digest_size);
1105                         kfree(digest);
1106                 }
1107
1108                 if (eq) {
1109                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1110                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1111                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1112                         err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1113                 } else {
1114                         inc_rs_pending(mdev);
1115                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1116                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1117                         kfree(di);
1118                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1119                 }
1120         } else {
1121                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1122                 if (__ratelimit(&drbd_ratelimit_state))
1123                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1124         }
1125
1126         dec_unacked(mdev);
1127         move_to_net_ee_or_free(mdev, peer_req);
1128
1129         if (unlikely(err))
1130                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1131         return err;
1132 }
1133
1134 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1135 {
1136         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1137         struct drbd_conf *mdev = w->mdev;
1138         sector_t sector = peer_req->i.sector;
1139         unsigned int size = peer_req->i.size;
1140         int digest_size;
1141         void *digest;
1142         int err = 0;
1143
1144         if (unlikely(cancel))
1145                 goto out;
1146
1147         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1148         digest = kmalloc(digest_size, GFP_NOIO);
1149         if (!digest) {
1150                 err = 1;        /* terminate the connection in case the allocation failed */
1151                 goto out;
1152         }
1153
1154         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1155                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1156         else
1157                 memset(digest, 0, digest_size);
1158
1159         /* Free e and pages before send.
1160          * In case we block on congestion, we could otherwise run into
1161          * some distributed deadlock, if the other side blocks on
1162          * congestion as well, because our receiver blocks in
1163          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1164         drbd_free_peer_req(mdev, peer_req);
1165         peer_req = NULL;
1166         inc_rs_pending(mdev);
1167         err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1168         if (err)
1169                 dec_rs_pending(mdev);
1170         kfree(digest);
1171
1172 out:
1173         if (peer_req)
1174                 drbd_free_peer_req(mdev, peer_req);
1175         dec_unacked(mdev);
1176         return err;
1177 }
1178
1179 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1180 {
1181         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1182                 mdev->ov_last_oos_size += size>>9;
1183         } else {
1184                 mdev->ov_last_oos_start = sector;
1185                 mdev->ov_last_oos_size = size>>9;
1186         }
1187         drbd_set_out_of_sync(mdev, sector, size);
1188 }
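/* Illustrative example: a first out-of-sync block at sector 2048 with
 * size 4096 records ov_last_oos_start = 2048, ov_last_oos_size = 8;
 * a directly following block at sector 2056 merely extends
 * ov_last_oos_size to 16, so runs of consecutive failures are later
 * reported as a single range by ov_out_of_sync_print(). */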
1189
1190 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1191 {
1192         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1193         struct drbd_conf *mdev = w->mdev;
1194         struct digest_info *di;
1195         void *digest;
1196         sector_t sector = peer_req->i.sector;
1197         unsigned int size = peer_req->i.size;
1198         int digest_size;
1199         int err, eq = 0;
1200         bool stop_sector_reached = false;
1201
1202         if (unlikely(cancel)) {
1203                 drbd_free_peer_req(mdev, peer_req);
1204                 dec_unacked(mdev);
1205                 return 0;
1206         }
1207
1208         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1209          * the resync lru has been cleaned up already */
1210         if (get_ldev(mdev)) {
1211                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1212                 put_ldev(mdev);
1213         }
1214
1215         di = peer_req->digest;
1216
1217         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1218                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1219                 digest = kmalloc(digest_size, GFP_NOIO);
1220                 if (digest) {
1221                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1222
1223                         D_ASSERT(digest_size == di->digest_size);
1224                         eq = !memcmp(digest, di->digest, digest_size);
1225                         kfree(digest);
1226                 }
1227         }
1228
1229         /* Free peer_req and pages before send.
1230          * In case we block on congestion, we could otherwise run into
1231          * some distributed deadlock, if the other side blocks on
1232          * congestion as well, because our receiver blocks in
1233          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1234         drbd_free_peer_req(mdev, peer_req);
1235         if (!eq)
1236                 drbd_ov_out_of_sync_found(mdev, sector, size);
1237         else
1238                 ov_out_of_sync_print(mdev);
1239
1240         err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1241                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1242
1243         dec_unacked(mdev);
1244
1245         --mdev->ov_left;
1246
1247         /* let's advance progress step marks only for every other megabyte */
1248         if ((mdev->ov_left & 0x200) == 0x200)
1249                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1250
1251         stop_sector_reached = verify_can_do_stop_sector(mdev) &&
1252                 (sector + (size>>9)) >= mdev->ov_stop_sector;
1253
1254         if (mdev->ov_left == 0 || stop_sector_reached) {
1255                 ov_out_of_sync_print(mdev);
1256                 drbd_resync_finished(mdev);
1257         }
1258
1259         return err;
1260 }
1261
1262 int w_prev_work_done(struct drbd_work *w, int cancel)
1263 {
1264         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1265
1266         complete(&b->done);
1267         return 0;
1268 }
1269
1270 /* FIXME
1271  * We need to track the number of pending barrier acks,
1272  * and to be able to wait for them.
1273  * See also comment in drbd_adm_attach before drbd_suspend_io.
1274  */
1275 int drbd_send_barrier(struct drbd_tconn *tconn)
1276 {
1277         struct p_barrier *p;
1278         struct drbd_socket *sock;
1279
1280         sock = &tconn->data;
1281         p = conn_prepare_command(tconn, sock);
1282         if (!p)
1283                 return -EIO;
1284         p->barrier = tconn->send.current_epoch_nr;
1285         p->pad = 0;
1286         tconn->send.current_epoch_writes = 0;
1287
1288         return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
1289 }
1290
1291 int w_send_write_hint(struct drbd_work *w, int cancel)
1292 {
1293         struct drbd_conf *mdev = w->mdev;
1294         struct drbd_socket *sock;
1295
1296         if (cancel)
1297                 return 0;
1298         sock = &mdev->tconn->data;
1299         if (!drbd_prepare_command(mdev, sock))
1300                 return -EIO;
1301         return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1302 }
1303
1304 static void re_init_if_first_write(struct drbd_tconn *tconn, unsigned int epoch)
1305 {
1306         if (!tconn->send.seen_any_write_yet) {
1307                 tconn->send.seen_any_write_yet = true;
1308                 tconn->send.current_epoch_nr = epoch;
1309                 tconn->send.current_epoch_writes = 0;
1310         }
1311 }
1312
1313 static void maybe_send_barrier(struct drbd_tconn *tconn, unsigned int epoch)
1314 {
1315         /* re-init if first write on this connection */
1316         if (!tconn->send.seen_any_write_yet)
1317                 return;
1318         if (tconn->send.current_epoch_nr != epoch) {
1319                 if (tconn->send.current_epoch_writes)
1320                         drbd_send_barrier(tconn);
1321                 tconn->send.current_epoch_nr = epoch;
1322         }
1323 }
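/* Illustrative sequence (not taken from a trace): the very first write on a
 * connection, say in epoch 7, initializes current_epoch_nr through
 * re_init_if_first_write(); further writes in epoch 7 only bump
 * current_epoch_writes.  When the first request of epoch 8 is about to be
 * sent and epoch 7 saw at least one write, maybe_send_barrier() emits a
 * P_BARRIER closing epoch 7 before switching current_epoch_nr to 8.  Epochs
 * that saw no writes (e.g. only read requests) are closed silently. */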
1324
1325 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1326 {
1327         struct drbd_request *req = container_of(w, struct drbd_request, w);
1328         struct drbd_conf *mdev = w->mdev;
1329         struct drbd_tconn *tconn = mdev->tconn;
1330         int err;
1331
1332         if (unlikely(cancel)) {
1333                 req_mod(req, SEND_CANCELED);
1334                 return 0;
1335         }
1336
1337         /* this time, no tconn->send.current_epoch_writes++;
1338          * If it was sent, it was the closing barrier for the last
1339          * replicated epoch, before we went into AHEAD mode.
1340          * No more barriers will be sent, until we leave AHEAD mode again. */
1341         maybe_send_barrier(tconn, req->epoch);
1342
1343         err = drbd_send_out_of_sync(mdev, req);
1344         req_mod(req, OOS_HANDED_TO_NETWORK);
1345
1346         return err;
1347 }
1348
1349 /**
1350  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1351  * @mdev:       DRBD device.
1352  * @w:          work object.
1353  * @cancel:     The connection will be closed anyways
1354  */
1355 int w_send_dblock(struct drbd_work *w, int cancel)
1356 {
1357         struct drbd_request *req = container_of(w, struct drbd_request, w);
1358         struct drbd_conf *mdev = w->mdev;
1359         struct drbd_tconn *tconn = mdev->tconn;
1360         int err;
1361
1362         if (unlikely(cancel)) {
1363                 req_mod(req, SEND_CANCELED);
1364                 return 0;
1365         }
1366
1367         re_init_if_first_write(tconn, req->epoch);
1368         maybe_send_barrier(tconn, req->epoch);
1369         tconn->send.current_epoch_writes++;
1370
1371         err = drbd_send_dblock(mdev, req);
1372         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1373
1374         return err;
1375 }
1376
1377 /**
1378  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1379  * @mdev:       DRBD device.
1380  * @w:          work object.
1381  * @cancel:     The connection will be closed anyways
1382  */
1383 int w_send_read_req(struct drbd_work *w, int cancel)
1384 {
1385         struct drbd_request *req = container_of(w, struct drbd_request, w);
1386         struct drbd_conf *mdev = w->mdev;
1387         struct drbd_tconn *tconn = mdev->tconn;
1388         int err;
1389
1390         if (unlikely(cancel)) {
1391                 req_mod(req, SEND_CANCELED);
1392                 return 0;
1393         }
1394
1395         /* Even read requests may close a write epoch,
1396          * if one has been opened already. */
1397         maybe_send_barrier(tconn, req->epoch);
1398
1399         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1400                                  (unsigned long)req);
1401
1402         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1403
1404         return err;
1405 }
1406
1407 int w_restart_disk_io(struct drbd_work *w, int cancel)
1408 {
1409         struct drbd_request *req = container_of(w, struct drbd_request, w);
1410         struct drbd_conf *mdev = w->mdev;
1411
1412         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1413                 drbd_al_begin_io(mdev, &req->i);
1414
1415         drbd_req_make_private_bio(req, req->master_bio);
1416         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1417         generic_make_request(req->private_bio);
1418
1419         return 0;
1420 }
1421
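/* Walk the resync-after dependency chain of @mdev.  Returns 1 if this
 * device may resync now, 0 if a device it depends on (directly or
 * transitively) is currently resyncing or has its resync paused;
 * e.g. if minor 2 is configured to resync after minor 1, minor 2 has to
 * wait as long as minor 1 is syncing or paused. */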
1422 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1423 {
1424         struct drbd_conf *odev = mdev;
1425         int resync_after;
1426
1427         while (1) {
1428                 if (!odev->ldev)
1429                         return 1;
1430                 rcu_read_lock();
1431                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1432                 rcu_read_unlock();
1433                 if (resync_after == -1)
1434                         return 1;
1435                 odev = minor_to_mdev(resync_after);
1436                 if (!expect(odev))
1437                         return 1;
1438                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1439                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1440                     odev->state.aftr_isp || odev->state.peer_isp ||
1441                     odev->state.user_isp)
1442                         return 0;
1443         }
1444 }
1445
1446 /**
1447  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1448  * @mdev:       DRBD device.
1449  *
1450  * Called from process context only (admin command and after_state_ch).
1451  */
1452 static int _drbd_pause_after(struct drbd_conf *mdev)
1453 {
1454         struct drbd_conf *odev;
1455         int i, rv = 0;
1456
1457         rcu_read_lock();
1458         idr_for_each_entry(&minors, odev, i) {
1459                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1460                         continue;
1461                 if (!_drbd_may_sync_now(odev))
1462                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1463                                != SS_NOTHING_TO_DO);
1464         }
1465         rcu_read_unlock();
1466
1467         return rv;
1468 }
1469
1470 /**
1471  * _drbd_resume_next() - Resume resync on all devices that may resync now
1472  * @mdev:       DRBD device.
1473  *
1474  * Called from process context only (admin command and worker).
1475  */
1476 static int _drbd_resume_next(struct drbd_conf *mdev)
1477 {
1478         struct drbd_conf *odev;
1479         int i, rv = 0;
1480
1481         rcu_read_lock();
1482         idr_for_each_entry(&minors, odev, i) {
1483                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1484                         continue;
1485                 if (odev->state.aftr_isp) {
1486                         if (_drbd_may_sync_now(odev))
1487                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1488                                                         CS_HARD, NULL)
1489                                        != SS_NOTHING_TO_DO) ;
1490                 }
1491         }
1492         rcu_read_unlock();
1493         return rv;
1494 }
1495
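/* Wrappers around _drbd_resume_next() and _drbd_pause_after() that take the
 * global_state_lock write lock around the call. */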
1496 void resume_next_sg(struct drbd_conf *mdev)
1497 {
1498         write_lock_irq(&global_state_lock);
1499         _drbd_resume_next(mdev);
1500         write_unlock_irq(&global_state_lock);
1501 }
1502
1503 void suspend_other_sg(struct drbd_conf *mdev)
1504 {
1505         write_lock_irq(&global_state_lock);
1506         _drbd_pause_after(mdev);
1507         write_unlock_irq(&global_state_lock);
1508 }
1509
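/* Validate a new resync-after dependency for @mdev: o_minor == -1 clears
 * the dependency; otherwise the referenced minor must exist, and following
 * its resync-after chain must not lead back to @mdev (no cycles). */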
1510 /* caller must hold global_state_lock */
1511 enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1512 {
1513         struct drbd_conf *odev;
1514         int resync_after;
1515
1516         if (o_minor == -1)
1517                 return NO_ERROR;
1518         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1519                 return ERR_RESYNC_AFTER;
1520
1521         /* check for loops */
1522         odev = minor_to_mdev(o_minor);
1523         while (1) {
1524                 if (odev == mdev)
1525                         return ERR_RESYNC_AFTER_CYCLE;
1526
1527                 rcu_read_lock();
1528                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1529                 rcu_read_unlock();
1530                 /* dependency chain ends here, no cycles. */
1531                 if (resync_after == -1)
1532                         return NO_ERROR;
1533
1534                 /* follow the dependency chain */
1535                 odev = minor_to_mdev(resync_after);
1536         }
1537 }
1538
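/* A resync-after dependency changed: re-evaluate which devices must pause
 * and which may resume their resync, and repeat until nothing changes. */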
1539 /* caller must hold global_state_lock */
1540 void drbd_resync_after_changed(struct drbd_conf *mdev)
1541 {
1542         int changes;
1543
1544         do {
1545                 changes  = _drbd_pause_after(mdev);
1546                 changes |= _drbd_resume_next(mdev);
1547         } while (changes);
1548 }
1549
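/* Reset the resync rate controller: zero the sector counters it uses and
 * empty the fifo of planned resync requests (rs_plan_s). */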
1550 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1551 {
1552         struct fifo_buffer *plan;
1553
1554         atomic_set(&mdev->rs_sect_in, 0);
1555         atomic_set(&mdev->rs_sect_ev, 0);
1556         mdev->rs_in_flight = 0;
1557
1558         /* Updating the RCU protected object in place is necessary since
1559            this function gets called from atomic context.
1560            It is valid since all other updates also lead to a completely
1561            empty fifo. */
1562         rcu_read_lock();
1563         plan = rcu_dereference(mdev->rs_plan_s);
1564         plan->total = 0;
1565         fifo_set(plan, 0);
1566         rcu_read_unlock();
1567 }
1568
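/* Timer callback: hand the deferred "start resync" work over to the worker
 * by queueing start_resync_work on the connection's sender_work queue. */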
1569 void start_resync_timer_fn(unsigned long data)
1570 {
1571         struct drbd_conf *mdev = (struct drbd_conf *) data;
1572
1573         drbd_queue_work(&mdev->tconn->sender_work, &mdev->start_resync_work);
1574 }
1575
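/* Worker callback: start the resync as sync source, unless there are still
 * unacknowledged or pending resync requests; in that case retry via the
 * start_resync_timer a little (HZ/10) later. */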
1576 int w_start_resync(struct drbd_work *w, int cancel)
1577 {
1578         struct drbd_conf *mdev = w->mdev;
1579
1580         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1581                 dev_warn(DEV, "w_start_resync later...\n");
1582                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1583                 add_timer(&mdev->start_resync_timer);
1584                 return 0;
1585         }
1586
1587         drbd_start_resync(mdev, C_SYNC_SOURCE);
1588         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags);
1589         return 0;
1590 }
1591
1592 /**
1593  * drbd_start_resync() - Start the resync process
1594  * @mdev:       DRBD device.
1595  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1596  *
1597  * This function might bring you directly into one of the
1598  * C_PAUSED_SYNC_* states.
1599  */
1600 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1601 {
1602         union drbd_state ns;
1603         int r;
1604
1605         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1606                 dev_err(DEV, "Resync already running!\n");
1607                 return;
1608         }
1609
1610         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1611                 if (side == C_SYNC_TARGET) {
1612                         /* Since application IO was locked out during C_WF_BITMAP_T and
1613                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1614                            we ask the handler whether we may make the data inconsistent. */
1615                         r = drbd_khelper(mdev, "before-resync-target");
1616                         r = (r >> 8) & 0xff;
1617                         if (r > 0) {
1618                                 dev_info(DEV, "before-resync-target handler returned %d, "
1619                                          "dropping connection.\n", r);
1620                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1621                                 return;
1622                         }
1623                 } else /* C_SYNC_SOURCE */ {
1624                         r = drbd_khelper(mdev, "before-resync-source");
1625                         r = (r >> 8) & 0xff;
1626                         if (r > 0) {
1627                                 if (r == 3) {
1628                                         dev_info(DEV, "before-resync-source handler returned %d, "
1629                                                  "ignoring. Old userland tools?\n", r);
1630                                 } else {
1631                                         dev_info(DEV, "before-resync-source handler returned %d, "
1632                                                  "dropping connection.\n", r);
1633                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1634                                         return;
1635                                 }
1636                         }
1637                 }
1638         }
1639
1640         if (current == mdev->tconn->worker.task) {
1641                 /* The worker should not sleep waiting for state_mutex,
1642                    because that can take long. */
1643                 if (!mutex_trylock(mdev->state_mutex)) {
1644                         set_bit(B_RS_H_DONE, &mdev->flags);
1645                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1646                         add_timer(&mdev->start_resync_timer);
1647                         return;
1648                 }
1649         } else {
1650                 mutex_lock(mdev->state_mutex);
1651         }
1652         clear_bit(B_RS_H_DONE, &mdev->flags);
1653
1654         write_lock_irq(&global_state_lock);
1655         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1656                 write_unlock_irq(&global_state_lock);
1657                 mutex_unlock(mdev->state_mutex);
1658                 return;
1659         }
1660
1661         ns = drbd_read_state(mdev);
1662
1663         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1664
1665         ns.conn = side;
1666
1667         if (side == C_SYNC_TARGET)
1668                 ns.disk = D_INCONSISTENT;
1669         else /* side == C_SYNC_SOURCE */
1670                 ns.pdsk = D_INCONSISTENT;
1671
1672         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1673         ns = drbd_read_state(mdev);
1674
1675         if (ns.conn < C_CONNECTED)
1676                 r = SS_UNKNOWN_ERROR;
1677
1678         if (r == SS_SUCCESS) {
1679                 unsigned long tw = drbd_bm_total_weight(mdev);
1680                 unsigned long now = jiffies;
1681                 int i;
1682
1683                 mdev->rs_failed    = 0;
1684                 mdev->rs_paused    = 0;
1685                 mdev->rs_same_csum = 0;
1686                 mdev->rs_last_events = 0;
1687                 mdev->rs_last_sect_ev = 0;
1688                 mdev->rs_total     = tw;
1689                 mdev->rs_start     = now;
1690                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1691                         mdev->rs_mark_left[i] = tw;
1692                         mdev->rs_mark_time[i] = now;
1693                 }
1694                 _drbd_pause_after(mdev);
1695         }
1696         write_unlock_irq(&global_state_lock);
1697
1698         if (r == SS_SUCCESS) {
1699                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1700                      drbd_conn_str(ns.conn),
1701                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1702                      (unsigned long) mdev->rs_total);
1703                 if (side == C_SYNC_TARGET)
1704                         mdev->bm_resync_fo = 0;
1705
1706                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1707                  * with w_send_oos, or the sync target will get confused as to
1708                  * how many bits to resync.  We cannot always do that, because for an
1709                  * empty resync and protocol < 95, we need to do it here, as we call
1710                  * drbd_resync_finished from here in that case.
1711                  * We call drbd_gen_and_send_sync_uuid here for protocol < 96,
1712                  * and from after_state_ch otherwise. */
1713                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1714                         drbd_gen_and_send_sync_uuid(mdev);
1715
1716                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1717                         /* This still has a race (about when exactly the peers
1718                          * detect connection loss) that can lead to a full sync
1719                          * on the next handshake. In 8.3.9 we fixed this with explicit
1720                          * resync-finished notifications, but the fix
1721                          * introduces a protocol change.  Sleeping for some
1722                          * time longer than the ping interval + timeout on the
1723                          * SyncSource, to give the SyncTarget the chance to
1724                          * detect connection loss, then waiting for a ping
1725                          * response (implicit in drbd_resync_finished) reduces
1726                          * the race considerably, but does not solve it. */
1727                         if (side == C_SYNC_SOURCE) {
1728                                 struct net_conf *nc;
1729                                 int timeo;
1730
1731                                 rcu_read_lock();
1732                                 nc = rcu_dereference(mdev->tconn->net_conf);
1733                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1734                                 rcu_read_unlock();
1735                                 schedule_timeout_interruptible(timeo);
1736                         }
1737                         drbd_resync_finished(mdev);
1738                 }
1739
1740                 drbd_rs_controller_reset(mdev);
1741                 /* ns.conn may already be != mdev->state.conn,
1742                  * we may have been paused in between, or become paused until
1743                  * the timer triggers.
1744                  * Either way, that is handled in resync_timer_fn(). */
1745                 if (ns.conn == C_SYNC_TARGET)
1746                         mod_timer(&mdev->resync_timer, jiffies);
1747
1748                 drbd_md_sync(mdev);
1749         }
1750         put_ldev(mdev);
1751         mutex_unlock(mdev->state_mutex);
1752 }
1753
1754 /* If the resource already closed the current epoch, but we did not
1755  * (because we have not yet seen new requests), we should send the
1756  * corresponding barrier now.  Must be checked within the same spinlock
1757  * that is used to check for new requests. */
1758 bool need_to_send_barrier(struct drbd_tconn *connection)
1759 {
1760         if (!connection->send.seen_any_write_yet)
1761                 return false;
1762
1763         /* Skip barriers that do not contain any writes.
1764          * This may happen during AHEAD mode. */
1765         if (!connection->send.current_epoch_writes)
1766                 return false;
1767
1768         /* ->req_lock is held when requests are queued on
1769          * connection->sender_work, and put into ->transfer_log.
1770          * It is also held when ->current_tle_nr is increased.
1771          * So either new requests are already queued, and the corresponding
1772          * barriers will be sent along with them; or nothing new is queued
1773          * yet, and the difference will be 1.
1774          */
1775         if (atomic_read(&connection->current_tle_nr) !=
1776             connection->send.current_epoch_nr + 1)
1777                 return false;
1778
1779         return true;
1780 }
1781
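/* Move all currently queued work items onto @work_list, under the queue
 * lock.  Returns true if anything was dequeued. */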
1782 bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1783 {
1784         spin_lock_irq(&queue->q_lock);
1785         list_splice_init(&queue->q, work_list);
1786         spin_unlock_irq(&queue->q_lock);
1787         return !list_empty(work_list);
1788 }
1789
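/* Move at most one queued work item onto @work_list.  Returns true if
 * @work_list is non-empty afterwards. */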
1790 bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1791 {
1792         spin_lock_irq(&queue->q_lock);
1793         if (!list_empty(&queue->q))
1794                 list_move(queue->q.next, work_list);
1795         spin_unlock_irq(&queue->q_lock);
1796         return !list_empty(work_list);
1797 }
1798
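/* Fetch the next work item for the sender: dequeue one item if available;
 * otherwise uncork the data socket (if tcp_cork is configured), wait until
 * new work is queued or a signal is pending, sending a barrier in between
 * if the current epoch got closed while waiting, and finally re-apply the
 * configured cork setting. */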
1799 void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
1800 {
1801         DEFINE_WAIT(wait);
1802         struct net_conf *nc;
1803         int uncork, cork;
1804
1805         dequeue_work_item(&connection->sender_work, work_list);
1806         if (!list_empty(work_list))
1807                 return;
1808
1809         /* Still nothing to do?
1810          * Maybe we still need to close the current epoch,
1811          * even if no new requests are queued yet.
1812          *
1813          * Also, poke TCP, just in case.
1814          * Then wait for new work (or signal). */
1815         rcu_read_lock();
1816         nc = rcu_dereference(connection->net_conf);
1817         uncork = nc ? nc->tcp_cork : 0;
1818         rcu_read_unlock();
1819         if (uncork) {
1820                 mutex_lock(&connection->data.mutex);
1821                 if (connection->data.socket)
1822                         drbd_tcp_uncork(connection->data.socket);
1823                 mutex_unlock(&connection->data.mutex);
1824         }
1825
1826         for (;;) {
1827                 int send_barrier;
1828                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1829                 spin_lock_irq(&connection->req_lock);
1830                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
1831                 /* dequeue a single item only;
1832                  * we still use drbd_queue_work_front() in some places */
1833                 if (!list_empty(&connection->sender_work.q))
1834                         list_move(connection->sender_work.q.next, work_list);
1835                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
1836                 if (!list_empty(work_list) || signal_pending(current)) {
1837                         spin_unlock_irq(&connection->req_lock);
1838                         break;
1839                 }
1840                 send_barrier = need_to_send_barrier(connection);
1841                 spin_unlock_irq(&connection->req_lock);
1842                 if (send_barrier) {
1843                         drbd_send_barrier(connection);
1844                         connection->send.current_epoch_nr++;
1845                 }
1846                 schedule();
1847                 /* We may be woken up for things other than new work, too,
1848                  * e.g. if the current epoch got closed;
1849                  * in that case we send the barrier above. */
1850         }
1851         finish_wait(&connection->sender_work.q_wait, &wait);
1852
1853         /* someone may have changed the config while we were waiting above. */
1854         rcu_read_lock();
1855         nc = rcu_dereference(connection->net_conf);
1856         cork = nc ? nc->tcp_cork : 0;
1857         rcu_read_unlock();
1858         mutex_lock(&connection->data.mutex);
1859         if (connection->data.socket) {
1860                 if (cork)
1861                         drbd_tcp_cork(connection->data.socket);
1862                 else if (!uncork)
1863                         drbd_tcp_uncork(connection->data.socket);
1864         }
1865         mutex_unlock(&connection->data.mutex);
1866 }
1867
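/* Main loop of the per-connection worker thread: dequeue work items one at
 * a time (drbd_queue_work_front() is still in use, so no batching) and run
 * their callbacks; a failing callback while connected escalates to
 * C_NETWORK_FAILURE.  On shutdown, drain the remaining work with cancel = 1
 * and clean up all volumes of this connection. */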
1868 int drbd_worker(struct drbd_thread *thi)
1869 {
1870         struct drbd_tconn *tconn = thi->tconn;
1871         struct drbd_work *w = NULL;
1872         struct drbd_conf *mdev;
1873         LIST_HEAD(work_list);
1874         int vnr;
1875
1876         while (get_t_state(thi) == RUNNING) {
1877                 drbd_thread_current_set_cpu(thi);
1878
1879                 /* as long as we use drbd_queue_work_front(),
1880                  * we may only dequeue single work items here, not batches. */
1881                 if (list_empty(&work_list))
1882                         wait_for_work(tconn, &work_list);
1883
1884                 if (signal_pending(current)) {
1885                         flush_signals(current);
1886                         if (get_t_state(thi) == RUNNING) {
1887                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1888                                 continue;
1889                         }
1890                         break;
1891                 }
1892
1893                 if (get_t_state(thi) != RUNNING)
1894                         break;
1895
1896                 while (!list_empty(&work_list)) {
1897                         w = list_first_entry(&work_list, struct drbd_work, list);
1898                         list_del_init(&w->list);
1899                         if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
1900                                 continue;
1901                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1902                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1903                 }
1904         }
1905
1906         do {
1907                 while (!list_empty(&work_list)) {
1908                         w = list_first_entry(&work_list, struct drbd_work, list);
1909                         list_del_init(&w->list);
1910                         w->cb(w, 1);
1911                 }
1912                 dequeue_work_batch(&tconn->sender_work, &work_list);
1913         } while (!list_empty(&work_list));
1914
1915         rcu_read_lock();
1916         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1917                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1918                 kref_get(&mdev->kref);
1919                 rcu_read_unlock();
1920                 drbd_mdev_cleanup(mdev);
1921                 kref_put(&mdev->kref, &drbd_minor_destroy);
1922                 rcu_read_lock();
1923         }
1924         rcu_read_unlock();
1925
1926         return 0;
1927 }