/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
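
/*
 * Editorial usage sketch (not part of the original source): per the
 * ABI document named above, images are mapped and unmapped through
 * the "add" and "remove" bus attributes defined later in this file,
 * roughly:
 *
 *      echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage" \
 *              > /sys/bus/rbd/add
 *      echo <dev-id> > /sys/bus/rbd/remove
 *
 * The monitor address(es), options, pool, image, and optional
 * snapshot name are parsed by rbd_add(); the exact syntax is
 * specified in Documentation/ABI/testing/sysfs-bus-rbd.
 */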

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
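
/*
 * Editorial arithmetic check for the limit above (assuming the
 * snapshot context encodes as a u64 seq, a u32 count, and one u64
 * id per snapshot): 510 ids take 510 * 8 = 4080 bytes, so
 * 12 + 4080 = 4092 bytes fit in a 4KB page, while 511 snapshots
 * would need 4100 bytes and spill over.
 */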

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by the OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

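/*
 * Worked example of the expression above (editorial note): with a
 * 4-byte int, (5 * 4) / 2 + 1 = 11 characters, which covers the
 * widest 32-bit decimal rendering ("-2147483648" is 11 characters).
 * The 5/2 factor over-approximates log10(256), i.e. at most ~2.41
 * decimal digits per byte, and the +1 leaves room for a sign.
 */
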
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};
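
/*
 * Editorial example (not in the original source): these tokens come
 * from the options field of the string written to the sysfs "add"
 * attribute, e.g. an options string such as "name=admin,ro" requests
 * a read-only mapping.  libceph consumes the options it recognizes
 * itself and hands anything else to parse_rbd_opts_token() below.
 */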

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with specific addr and configuration; create
 * one if it does not exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak the object prefix copied just above */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);
        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (header->snapc->snaps[0]);
        header->snapc = kzalloc(size, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;

        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] =
                        le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

        return 0;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
        header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
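
/*
 * Worked example for the three segment helpers above (editorial
 * sketch, assuming obj_order == 22, i.e. 4MB objects, and an object
 * prefix of, say, "rb.0.1234"): image byte offset 0x12345678 falls
 * in segment 0x12345678 >> 22 == 0x48, which rbd_segment_name()
 * renders as "rb.0.1234.000000000048".  The in-segment offset is
 * 0x12345678 & 0x3fffff == 0x345678, so rbd_segment_length() clamps
 * any request starting there to at most 0x400000 - 0x345678 ==
 * 0xba988 bytes.
 */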

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return (u64) 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
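
/*
 * Worked example of the in-out semantics above (editorial sketch):
 * given a chain of two 8192-byte bios, a call with *offset == 4096
 * and len == 8192 clones the tail of the first bio plus the first
 * 4096 bytes of the second.  On return *bio_src points at the
 * second bio and *offset == 4096, so a subsequent call continues
 * from the first un-cloned byte.
 */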

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
1343  */
1344 static void img_request_write_set(struct rbd_img_request *img_request)
1345 {
1346         set_bit(IMG_REQ_WRITE, &img_request->flags);
1347         smp_mb();
1348 }
1349
1350 static bool img_request_write_test(struct rbd_img_request *img_request)
1351 {
1352         smp_mb();
1353         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1354 }
1355
1356 static void img_request_child_set(struct rbd_img_request *img_request)
1357 {
1358         set_bit(IMG_REQ_CHILD, &img_request->flags);
1359         smp_mb();
1360 }
1361
1362 static bool img_request_child_test(struct rbd_img_request *img_request)
1363 {
1364         smp_mb();
1365         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1366 }
1367
1368 static void img_request_layered_set(struct rbd_img_request *img_request)
1369 {
1370         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1371         smp_mb();
1372 }
1373
1374 static bool img_request_layered_test(struct rbd_img_request *img_request)
1375 {
1376         smp_mb();
1377         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1378 }
1379
1380 static void
1381 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1382 {
1383         u64 xferred = obj_request->xferred;
1384         u64 length = obj_request->length;
1385
1386         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1387                 obj_request, obj_request->img_request, obj_request->result,
1388                 xferred, length);
1389         /*
1390          * ENOENT means a hole in the image.  We zero-fill the
1391          * entire length of the request.  A short read also implies
1392          * zero-fill to the end of the request.  Either way we
1393          * update the xferred count to indicate the whole request
1394          * was satisfied.
1395          */
1396         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1397         if (obj_request->result == -ENOENT) {
1398                 if (obj_request->type == OBJ_REQUEST_BIO)
1399                         zero_bio_chain(obj_request->bio_list, 0);
1400                 else
1401                         zero_pages(obj_request->pages, 0, length);
1402                 obj_request->result = 0;
1403                 obj_request->xferred = length;
1404         } else if (xferred < length && !obj_request->result) {
1405                 if (obj_request->type == OBJ_REQUEST_BIO)
1406                         zero_bio_chain(obj_request->bio_list, xferred);
1407                 else
1408                         zero_pages(obj_request->pages, xferred, length);
1409                 obj_request->xferred = length;
1410         }
1411         obj_request_done_set(obj_request);
1412 }
1413
1414 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1415 {
1416         dout("%s: obj %p cb %p\n", __func__, obj_request,
1417                 obj_request->callback);
1418         if (obj_request->callback)
1419                 obj_request->callback(obj_request);
1420         else
1421                 complete_all(&obj_request->completion);
1422 }
1423
1424 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1425 {
1426         dout("%s: obj %p\n", __func__, obj_request);
1427         obj_request_done_set(obj_request);
1428 }
1429
1430 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1431 {
1432         struct rbd_img_request *img_request = NULL;
1433         struct rbd_device *rbd_dev = NULL;
1434         bool layered = false;
1435
1436         if (obj_request_img_data_test(obj_request)) {
1437                 img_request = obj_request->img_request;
1438                 layered = img_request && img_request_layered_test(img_request);
1439                 rbd_dev = img_request->rbd_dev;
1440         }
1441
1442         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1443                 obj_request, img_request, obj_request->result,
1444                 obj_request->xferred, obj_request->length);
1445         if (layered && obj_request->result == -ENOENT &&
1446                         obj_request->img_offset < rbd_dev->parent_overlap)
1447                 rbd_img_parent_read(obj_request);
1448         else if (img_request)
1449                 rbd_img_obj_request_read_callback(obj_request);
1450         else
1451                 obj_request_done_set(obj_request);
1452 }
1453
1454 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1455 {
1456         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1457                 obj_request->result, obj_request->length);
1458         /*
1459          * There is no such thing as a successful short write.  Set
1460          * it to our originally-requested length.
1461          */
1462         obj_request->xferred = obj_request->length;
1463         obj_request_done_set(obj_request);
1464 }
1465
1466 /*
1467  * For a simple stat call there's nothing to do.  We'll do more if
1468  * this is part of a write sequence for a layered image.
1469  */
1470 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1471 {
1472         dout("%s: obj %p\n", __func__, obj_request);
1473         obj_request_done_set(obj_request);
1474 }
1475
1476 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1477                                 struct ceph_msg *msg)
1478 {
1479         struct rbd_obj_request *obj_request = osd_req->r_priv;
1480         u16 opcode;
1481
1482         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1483         rbd_assert(osd_req == obj_request->osd_req);
1484         if (obj_request_img_data_test(obj_request)) {
1485                 rbd_assert(obj_request->img_request);
1486                 rbd_assert(obj_request->which != BAD_WHICH);
1487         } else {
1488                 rbd_assert(obj_request->which == BAD_WHICH);
1489         }
1490
1491         if (osd_req->r_result < 0)
1492                 obj_request->result = osd_req->r_result;
1493         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1494
1495         BUG_ON(osd_req->r_num_ops > 2);
1496
1497         /*
1498          * We support a 64-bit length, but ultimately it has to be
1499          * passed to blk_end_request(), which takes an unsigned int.
1500          */
1501         obj_request->xferred = osd_req->r_reply_op_len[0];
1502         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1503         opcode = osd_req->r_ops[0].op;
1504         switch (opcode) {
1505         case CEPH_OSD_OP_READ:
1506                 rbd_osd_read_callback(obj_request);
1507                 break;
1508         case CEPH_OSD_OP_WRITE:
1509                 rbd_osd_write_callback(obj_request);
1510                 break;
1511         case CEPH_OSD_OP_STAT:
1512                 rbd_osd_stat_callback(obj_request);
1513                 break;
1514         case CEPH_OSD_OP_CALL:
1515         case CEPH_OSD_OP_NOTIFY_ACK:
1516         case CEPH_OSD_OP_WATCH:
1517                 rbd_osd_trivial_callback(obj_request);
1518                 break;
1519         default:
1520                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1521                         obj_request->object_name, (unsigned short) opcode);
1522                 break;
1523         }
1524
1525         if (obj_request_done_test(obj_request))
1526                 rbd_obj_request_complete(obj_request);
1527 }
1528
1529 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1530 {
1531         struct rbd_img_request *img_request = obj_request->img_request;
1532         struct ceph_osd_request *osd_req = obj_request->osd_req;
1533         u64 snap_id;
1534
1535         rbd_assert(osd_req != NULL);
1536
1537         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1538         ceph_osdc_build_request(osd_req, obj_request->offset,
1539                         NULL, snap_id, NULL);
1540 }
1541
1542 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1543 {
1544         struct rbd_img_request *img_request = obj_request->img_request;
1545         struct ceph_osd_request *osd_req = obj_request->osd_req;
1546         struct ceph_snap_context *snapc;
1547         struct timespec mtime = CURRENT_TIME;
1548
1549         rbd_assert(osd_req != NULL);
1550
1551         snapc = img_request ? img_request->snapc : NULL;
1552         ceph_osdc_build_request(osd_req, obj_request->offset,
1553                         snapc, CEPH_NOSNAP, &mtime);
1554 }
1555
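/*
 * Allocate and initialize a single-op osd request for the given
 * object request, aiming it at the named object and marking it as
 * either a read or a stable write.  Returns NULL if the request
 * can't be allocated.
 */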
1556 static struct ceph_osd_request *rbd_osd_req_create(
1557                                         struct rbd_device *rbd_dev,
1558                                         bool write_request,
1559                                         struct rbd_obj_request *obj_request)
1560 {
1561         struct ceph_snap_context *snapc = NULL;
1562         struct ceph_osd_client *osdc;
1563         struct ceph_osd_request *osd_req;
1564
1565         if (obj_request_img_data_test(obj_request)) {
1566                 struct rbd_img_request *img_request = obj_request->img_request;
1567
1568                 rbd_assert(write_request ==
1569                                 img_request_write_test(img_request));
1570                 if (write_request)
1571                         snapc = img_request->snapc;
1572         }
1573
1574         /* Allocate and initialize the request, for the single op */
1575
1576         osdc = &rbd_dev->rbd_client->client->osdc;
1577         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1578         if (!osd_req)
1579                 return NULL;    /* ENOMEM */
1580
1581         if (write_request)
1582                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1583         else
1584                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1585
1586         osd_req->r_callback = rbd_osd_req_callback;
1587         osd_req->r_priv = obj_request;
1588
1589         osd_req->r_oid_len = strlen(obj_request->object_name);
1590         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1591         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1592
1593         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1594
1595         return osd_req;
1596 }
1597
1598 /*
1599  * Create a copyup osd request based on the information in the
1600  * object request supplied.  A copyup request has two osd ops:
1601  * a copyup method call, and a "normal" write request.
1602  */
1603 static struct ceph_osd_request *
1604 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1605 {
1606         struct rbd_img_request *img_request;
1607         struct ceph_snap_context *snapc;
1608         struct rbd_device *rbd_dev;
1609         struct ceph_osd_client *osdc;
1610         struct ceph_osd_request *osd_req;
1611
1612         rbd_assert(obj_request_img_data_test(obj_request));
1613         img_request = obj_request->img_request;
1614         rbd_assert(img_request);
1615         rbd_assert(img_request_write_test(img_request));
1616
1617         /* Allocate and initialize the request, for the two ops */
1618
1619         snapc = img_request->snapc;
1620         rbd_dev = img_request->rbd_dev;
1621         osdc = &rbd_dev->rbd_client->client->osdc;
1622         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1623         if (!osd_req)
1624                 return NULL;    /* ENOMEM */
1625
1626         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1627         osd_req->r_callback = rbd_osd_req_callback;
1628         osd_req->r_priv = obj_request;
1629
1630         osd_req->r_oid_len = strlen(obj_request->object_name);
1631         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1632         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1633
1634         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1635
1636         return osd_req;
1637 }
1638
1639
1640 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1641 {
1642         ceph_osdc_put_request(osd_req);
1643 }
1644
1645 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1646
1647 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1648                                                 u64 offset, u64 length,
1649                                                 enum obj_request_type type)
1650 {
1651         struct rbd_obj_request *obj_request;
1652         size_t size;
1653         char *name;
1654
1655         rbd_assert(obj_request_type_valid(type));
1656
1657         size = strlen(object_name) + 1;
1658         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1659         if (!obj_request)
1660                 return NULL;
1661
1662         name = (char *)(obj_request + 1);
1663         obj_request->object_name = memcpy(name, object_name, size);
1664         obj_request->offset = offset;
1665         obj_request->length = length;
1666         obj_request->flags = 0;
1667         obj_request->which = BAD_WHICH;
1668         obj_request->type = type;
1669         INIT_LIST_HEAD(&obj_request->links);
1670         init_completion(&obj_request->completion);
1671         kref_init(&obj_request->kref);
1672
1673         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1674                 offset, length, (int)type, obj_request);
1675
1676         return obj_request;
1677 }
1678
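/*
 * Note on the allocation above: the object name is stored in the
 * same allocation as the request structure, immediately following
 * it, so the single kfree() in rbd_obj_request_destroy() below
 * releases both:
 *
 *     +------------------------+-------------------------+
 *     | struct rbd_obj_request | object name (NUL-term.) |
 *     +------------------------+-------------------------+
 *     ^ obj_request             ^ obj_request->object_name
 */
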
1679 static void rbd_obj_request_destroy(struct kref *kref)
1680 {
1681         struct rbd_obj_request *obj_request;
1682
1683         obj_request = container_of(kref, struct rbd_obj_request, kref);
1684
1685         dout("%s: obj %p\n", __func__, obj_request);
1686
1687         rbd_assert(obj_request->img_request == NULL);
1688         rbd_assert(obj_request->which == BAD_WHICH);
1689
1690         if (obj_request->osd_req)
1691                 rbd_osd_req_destroy(obj_request->osd_req);
1692
1693         rbd_assert(obj_request_type_valid(obj_request->type));
1694         switch (obj_request->type) {
1695         case OBJ_REQUEST_NODATA:
1696                 break;          /* Nothing to do */
1697         case OBJ_REQUEST_BIO:
1698                 if (obj_request->bio_list)
1699                         bio_chain_put(obj_request->bio_list);
1700                 break;
1701         case OBJ_REQUEST_PAGES:
1702                 if (obj_request->pages)
1703                         ceph_release_page_vector(obj_request->pages,
1704                                                 obj_request->page_count);
1705                 break;
1706         }
1707
1708         kfree(obj_request);
1709 }
1710
1711 /*
1712  * Caller is responsible for filling in the list of object requests
1713  * that comprises the image request, and the Linux request pointer
1714  * (if there is one).
1715  */
1716 static struct rbd_img_request *rbd_img_request_create(
1717                                         struct rbd_device *rbd_dev,
1718                                         u64 offset, u64 length,
1719                                         bool write_request,
1720                                         bool child_request)
1721 {
1722         struct rbd_img_request *img_request;
1723         struct ceph_snap_context *snapc = NULL;
1724
1725         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1726         if (!img_request)
1727                 return NULL;
1728
1729         if (write_request) {
1730                 down_read(&rbd_dev->header_rwsem);
1731                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1732                 up_read(&rbd_dev->header_rwsem);
1733                 if (WARN_ON(!snapc)) {
1734                         kfree(img_request);
1735                         return NULL;    /* Shouldn't happen */
1736                 }
1737
1738         }
1739
1740         img_request->rq = NULL;
1741         img_request->rbd_dev = rbd_dev;
1742         img_request->offset = offset;
1743         img_request->length = length;
1744         img_request->flags = 0;
1745         if (write_request) {
1746                 img_request_write_set(img_request);
1747                 img_request->snapc = snapc;
1748         } else {
1749                 img_request->snap_id = rbd_dev->spec->snap_id;
1750         }
1751         if (child_request)
1752                 img_request_child_set(img_request);
1753         if (rbd_dev->parent_spec)
1754                 img_request_layered_set(img_request);
1755         spin_lock_init(&img_request->completion_lock);
1756         img_request->next_completion = 0;
1757         img_request->callback = NULL;
1758         img_request->result = 0;
1759         img_request->obj_request_count = 0;
1760         INIT_LIST_HEAD(&img_request->obj_requests);
1761         kref_init(&img_request->kref);
1762
1763         rbd_img_request_get(img_request);       /* Avoid a warning */
1764         rbd_img_request_put(img_request);       /* TEMPORARY */
1765
1766         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1767                 write_request ? "write" : "read", offset, length,
1768                 img_request);
1769
1770         return img_request;
1771 }
1772
1773 static void rbd_img_request_destroy(struct kref *kref)
1774 {
1775         struct rbd_img_request *img_request;
1776         struct rbd_obj_request *obj_request;
1777         struct rbd_obj_request *next_obj_request;
1778
1779         img_request = container_of(kref, struct rbd_img_request, kref);
1780
1781         dout("%s: img %p\n", __func__, img_request);
1782
1783         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1784                 rbd_img_obj_request_del(img_request, obj_request);
1785         rbd_assert(img_request->obj_request_count == 0);
1786
1787         if (img_request_write_test(img_request))
1788                 ceph_put_snap_context(img_request->snapc);
1789
1790         if (img_request_child_test(img_request))
1791                 rbd_obj_request_put(img_request->obj_request);
1792
1793         kfree(img_request);
1794 }
1795
1796 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1797 {
1798         struct rbd_img_request *img_request;
1799         unsigned int xferred;
1800         int result;
1801         bool more;
1802
1803         rbd_assert(obj_request_img_data_test(obj_request));
1804         img_request = obj_request->img_request;
1805
1806         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1807         xferred = (unsigned int)obj_request->xferred;
1808         result = obj_request->result;
1809         if (result) {
1810                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1811
1812                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1813                         img_request_write_test(img_request) ? "write" : "read",
1814                         obj_request->length, obj_request->img_offset,
1815                         obj_request->offset);
1816                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1817                         result, xferred);
1818                 if (!img_request->result)
1819                         img_request->result = result;
1820         }
1821
1822         /* Image object requests don't own their page array */
1823
1824         if (obj_request->type == OBJ_REQUEST_PAGES) {
1825                 obj_request->pages = NULL;
1826                 obj_request->page_count = 0;
1827         }
1828
1829         if (img_request_child_test(img_request)) {
1830                 rbd_assert(img_request->obj_request != NULL);
1831                 more = obj_request->which < img_request->obj_request_count - 1;
1832         } else {
1833                 rbd_assert(img_request->rq != NULL);
1834                 more = blk_end_request(img_request->rq, result, xferred);
1835         }
1836
1837         return more;
1838 }
1839
1840 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1841 {
1842         struct rbd_img_request *img_request;
1843         u32 which = obj_request->which;
1844         bool more = true;
1845
1846         rbd_assert(obj_request_img_data_test(obj_request));
1847         img_request = obj_request->img_request;
1848
1849         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1850         rbd_assert(img_request != NULL);
1851         rbd_assert(img_request->obj_request_count > 0);
1852         rbd_assert(which != BAD_WHICH);
1853         rbd_assert(which < img_request->obj_request_count);
1854         rbd_assert(which >= img_request->next_completion);
1855
1856         spin_lock_irq(&img_request->completion_lock);
1857         if (which != img_request->next_completion)
1858                 goto out;
1859
1860         for_each_obj_request_from(img_request, obj_request) {
1861                 rbd_assert(more);
1862                 rbd_assert(which < img_request->obj_request_count);
1863
1864                 if (!obj_request_done_test(obj_request))
1865                         break;
1866                 more = rbd_img_obj_end_request(obj_request);
1867                 which++;
1868         }
1869
1870         rbd_assert(more ^ (which == img_request->obj_request_count));
1871         img_request->next_completion = which;
1872 out:
1873         spin_unlock_irq(&img_request->completion_lock);
1874
1875         if (!more)
1876                 rbd_img_request_complete(img_request);
1877 }
1878
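/*
 * Note on the completion logic above: object requests are retired
 * strictly in "which" order.  If object request 2 completes before
 * object request 1, its done flag is recorded but next_completion
 * stays at 1; when request 1 finally completes, the loop above
 * retires both in a single pass.
 */
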
1879 /*
1880  * Split up an image request into one or more object requests, each
1881  * to a different object.  The "type" parameter indicates whether
1882  * "data_desc" is the pointer to the head of a list of bio
1883  * structures, or the base of a page array.  In either case this
1884  * function assumes data_desc describes memory sufficient to hold
1885  * all data described by the image request.
1886  */
1887 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1888                                         enum obj_request_type type,
1889                                         void *data_desc)
1890 {
1891         struct rbd_device *rbd_dev = img_request->rbd_dev;
1892         struct rbd_obj_request *obj_request = NULL;
1893         struct rbd_obj_request *next_obj_request;
1894         bool write_request = img_request_write_test(img_request);
1895         struct bio *bio_list;
1896         unsigned int bio_offset = 0;
1897         struct page **pages;
1898         u64 img_offset;
1899         u64 resid;
1900         u16 opcode;
1901
1902         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1903                 (int)type, data_desc);
1904
1905         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1906         img_offset = img_request->offset;
1907         resid = img_request->length;
1908         rbd_assert(resid > 0);
1909
1910         if (type == OBJ_REQUEST_BIO) {
1911                 bio_list = data_desc;
1912                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1913         } else {
1914                 rbd_assert(type == OBJ_REQUEST_PAGES);
1915                 pages = data_desc;
1916         }
1917
1918         while (resid) {
1919                 struct ceph_osd_request *osd_req;
1920                 const char *object_name;
1921                 u64 offset;
1922                 u64 length;
1923
1924                 object_name = rbd_segment_name(rbd_dev, img_offset);
1925                 if (!object_name)
1926                         goto out_unwind;
1927                 offset = rbd_segment_offset(rbd_dev, img_offset);
1928                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1929                 obj_request = rbd_obj_request_create(object_name,
1930                                                 offset, length, type);
1931                 kfree(object_name);     /* object request has its own copy */
1932                 if (!obj_request)
1933                         goto out_unwind;
1934
1935                 if (type == OBJ_REQUEST_BIO) {
1936                         unsigned int clone_size;
1937
1938                         rbd_assert(length <= (u64)UINT_MAX);
1939                         clone_size = (unsigned int)length;
1940                         obj_request->bio_list =
1941                                         bio_chain_clone_range(&bio_list,
1942                                                                 &bio_offset,
1943                                                                 clone_size,
1944                                                                 GFP_ATOMIC);
1945                         if (!obj_request->bio_list)
1946                                 goto out_partial;
1947                 } else {
1948                         unsigned int page_count;
1949
1950                         obj_request->pages = pages;
1951                         page_count = (u32)calc_pages_for(offset, length);
1952                         obj_request->page_count = page_count;
1953                         if ((offset + length) & ~PAGE_MASK)
1954                                 page_count--;   /* more on last page */
1955                         pages += page_count;
1956                 }
1957
1958                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1959                                                 obj_request);
1960                 if (!osd_req)
1961                         goto out_partial;
1962                 obj_request->osd_req = osd_req;
1963                 obj_request->callback = rbd_img_obj_callback;
1964
1965                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1966                                                 0, 0);
1967                 if (type == OBJ_REQUEST_BIO)
1968                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1969                                         obj_request->bio_list, length);
1970                 else
1971                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1972                                         obj_request->pages, length,
1973                                         offset & ~PAGE_MASK, false, false);
1974
1975                 if (write_request)
1976                         rbd_osd_req_format_write(obj_request);
1977                 else
1978                         rbd_osd_req_format_read(obj_request);
1979
1980                 obj_request->img_offset = img_offset;
1981                 rbd_img_obj_request_add(img_request, obj_request);
1982
1983                 img_offset += length;
1984                 resid -= length;
1985         }
1986
1987         return 0;
1988
1989 out_partial:
1990         rbd_obj_request_put(obj_request);
1991 out_unwind:
1992         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1993                 rbd_obj_request_put(obj_request);
1994
1995         return -ENOMEM;
1996 }
1997
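/*
 * Illustrative sketch (not used by the driver): how the loop in
 * rbd_img_request_fill() above carves an image extent into
 * per-object pieces.  With the default object order of 22 (4 MB
 * objects), an 8 MB request starting at image offset 3 MB becomes
 * pieces of 1 MB, 4 MB, and 3 MB.  The driver itself computes this
 * with rbd_segment_offset() and rbd_segment_length().
 */
static inline u64 example_segment_piece(u8 obj_order, u64 img_offset,
                                        u64 resid)
{
        u64 segment_size = (u64)1 << obj_order;
        u64 offset = img_offset & (segment_size - 1);
        u64 room = segment_size - offset;       /* bytes left in this object */

        /* A piece never crosses an object boundary */
        return resid < room ? resid : room;
}
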
1998 static void
1999 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2000 {
2001         struct rbd_img_request *img_request;
2002         struct rbd_device *rbd_dev;
2003         u64 length;
2004         u32 page_count;
2005
2006         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2007         rbd_assert(obj_request_img_data_test(obj_request));
2008         img_request = obj_request->img_request;
2009         rbd_assert(img_request);
2010
2011         rbd_dev = img_request->rbd_dev;
2012         rbd_assert(rbd_dev);
2013         length = (u64)1 << rbd_dev->header.obj_order;
2014         page_count = (u32)calc_pages_for(0, length);
2015
2016         rbd_assert(obj_request->copyup_pages);
2017         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2018         obj_request->copyup_pages = NULL;
2019
2020         /*
2021          * We want the transfer count to reflect the size of the
2022          * original write request.  There is no such thing as a
2023          * successful short write, so if the request was successful
2024          * we can just set it to the originally-requested length.
2025          */
2026         if (!obj_request->result)
2027                 obj_request->xferred = obj_request->length;
2028
2029         /* Finish up with the normal image object callback */
2030
2031         rbd_img_obj_callback(obj_request);
2032 }
2033
2034 static void
2035 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2036 {
2037         struct rbd_obj_request *orig_request;
2038         struct ceph_osd_request *osd_req;
2039         struct ceph_osd_client *osdc;
2040         struct rbd_device *rbd_dev;
2041         struct page **pages;
2042         int result;
2043         u64 obj_size;
2044         u64 xferred;
2045
2046         rbd_assert(img_request_child_test(img_request));
2047
2048         /* First get what we need from the image request */
2049
2050         pages = img_request->copyup_pages;
2051         rbd_assert(pages != NULL);
2052         img_request->copyup_pages = NULL;
2053
2054         orig_request = img_request->obj_request;
2055         rbd_assert(orig_request != NULL);
2056         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2057         result = img_request->result;
2058         obj_size = img_request->length;
2059         xferred = img_request->xferred;
2060
2061         rbd_dev = img_request->rbd_dev;
2062         rbd_assert(rbd_dev);
2063         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2064
2065         rbd_img_request_put(img_request);
2066
2067         if (result)
2068                 goto out_err;
2069
2070         /* Allocate the new copyup osd request for the original request */
2071
2072         result = -ENOMEM;
2073         rbd_assert(!orig_request->osd_req);
2074         osd_req = rbd_osd_req_create_copyup(orig_request);
2075         if (!osd_req)
2076                 goto out_err;
2077         orig_request->osd_req = osd_req;
2078         orig_request->copyup_pages = pages;
2079
2080         /* Initialize the copyup op */
2081
2082         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2083         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2084                                                 false, false);
2085
2086         /* Then the original write request op */
2087
2088         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2089                                         orig_request->offset,
2090                                         orig_request->length, 0, 0);
2091         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2092                                         orig_request->length);
2093
2094         rbd_osd_req_format_write(orig_request);
2095
2096         /* All set, send it off. */
2097
2098         orig_request->callback = rbd_img_obj_copyup_callback;
2099         osdc = &rbd_dev->rbd_client->client->osdc;
2100         result = rbd_obj_request_submit(osdc, orig_request);
2101         if (!result)
2102                 return;
2103 out_err:
2104         /* Record the error code and complete the request */
2105
2106         orig_request->result = result;
2107         orig_request->xferred = 0;
2108         obj_request_done_set(orig_request);
2109         rbd_obj_request_complete(orig_request);
2110 }
2111
2112 /*
2113  * Read from the parent image the range of data that covers the
2114  * entire target of the given object request.  This is used for
2115  * satisfying a layered image write request when the target of an
2116  * object request from the image request does not exist.
2117  *
2118  * A page array big enough to hold the returned data is allocated
2119  * and supplied to rbd_img_request_fill() as the "data descriptor."
2120  * When the read completes, this page array will be transferred to
2121  * the original object request for the copyup operation.
2122  *
2123  * If an error occurs, record it as the result of the original
2124  * object request and mark it done so it gets completed.
2125  */
2126 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2127 {
2128         struct rbd_img_request *img_request = NULL;
2129         struct rbd_img_request *parent_request = NULL;
2130         struct rbd_device *rbd_dev;
2131         u64 img_offset;
2132         u64 length;
2133         struct page **pages = NULL;
2134         u32 page_count;
2135         int result;
2136
2137         rbd_assert(obj_request_img_data_test(obj_request));
2138         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2139
2140         img_request = obj_request->img_request;
2141         rbd_assert(img_request != NULL);
2142         rbd_dev = img_request->rbd_dev;
2143         rbd_assert(rbd_dev->parent != NULL);
2144
2145         /*
2146          * First things first.  The original osd request is of no
2147          * use to us any more; we'll need a new one that can hold
2148          * the two ops in a copyup request.  We'll get that later,
2149          * but for now we can release the old one.
2150          */
2151         rbd_osd_req_destroy(obj_request->osd_req);
2152         obj_request->osd_req = NULL;
2153
2154         /*
2155          * Determine the byte range covered by the object in the
2156          * child image to which the original request was to be sent.
2157          */
2158         img_offset = obj_request->img_offset - obj_request->offset;
2159         length = (u64)1 << rbd_dev->header.obj_order;
2160
2161         /*
2162          * There is no defined parent data beyond the parent
2163          * overlap, so limit what we read at that boundary if
2164          * necessary.
2165          */
2166         if (img_offset + length > rbd_dev->parent_overlap) {
2167                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2168                 length = rbd_dev->parent_overlap - img_offset;
2169         }
2170
2171         /*
2172          * Allocate a page array big enough to receive the data read
2173          * from the parent.
2174          */
2175         page_count = (u32)calc_pages_for(0, length);
2176         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2177         if (IS_ERR(pages)) {
2178                 result = PTR_ERR(pages);
2179                 pages = NULL;
2180                 goto out_err;
2181         }
2182
2183         result = -ENOMEM;
2184         parent_request = rbd_img_request_create(rbd_dev->parent,
2185                                                 img_offset, length,
2186                                                 false, true);
2187         if (!parent_request)
2188                 goto out_err;
2189         rbd_obj_request_get(obj_request);
2190         parent_request->obj_request = obj_request;
2191
2192         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2193         if (result)
2194                 goto out_err;
2195         parent_request->copyup_pages = pages;
2196
2197         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2198         result = rbd_img_request_submit(parent_request);
2199         if (!result)
2200                 return 0;
2201
2202         parent_request->copyup_pages = NULL;
2203         parent_request->obj_request = NULL;
2204         rbd_obj_request_put(obj_request);
2205 out_err:
2206         if (pages)
2207                 ceph_release_page_vector(pages, page_count);
2208         if (parent_request)
2209                 rbd_img_request_put(parent_request);
2210         obj_request->result = result;
2211         obj_request->xferred = 0;
2212         obj_request_done_set(obj_request);
2213
2214         return result;
2215 }
2216
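/*
 * Illustrative sketch (not used by the driver): the read length
 * computed by rbd_img_obj_parent_read_full() above.  The parent
 * read covers the whole backing object but is clamped at the
 * parent overlap, beyond which no parent data is defined.  The
 * caller guarantees obj_img_offset < parent_overlap.
 */
static inline u64 example_parent_read_length(u8 obj_order,
                                        u64 obj_img_offset,
                                        u64 parent_overlap)
{
        u64 length = (u64)1 << obj_order;       /* a full object */

        if (obj_img_offset + length > parent_overlap)
                length = parent_overlap - obj_img_offset;

        return length;
}
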
2217 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2218 {
2219         struct rbd_obj_request *orig_request;
2220         int result;
2221
2222         rbd_assert(!obj_request_img_data_test(obj_request));
2223
2224         /*
2225          * All we need from the object request is the original
2226          * request and the result of the STAT op.  Grab those, then
2227          * we're done with the request.
2228          */
2229         orig_request = obj_request->obj_request;
2230         obj_request->obj_request = NULL;
2231         rbd_assert(orig_request);
2232         rbd_assert(orig_request->img_request);
2233
2234         result = obj_request->result;
2235         obj_request->result = 0;
2236
2237         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2238                 obj_request, orig_request, result,
2239                 obj_request->xferred, obj_request->length);
2240         rbd_obj_request_put(obj_request);
2241
2245         /*
2246          * Our only purpose here is to determine whether the object
2247          * exists, and we don't want to treat the non-existence as
2248          * an error.  If something else comes back, transfer the
2249          * error to the original request and complete it now.
2250          */
2251         if (!result) {
2252                 obj_request_existence_set(orig_request, true);
2253         } else if (result == -ENOENT) {
2254                 obj_request_existence_set(orig_request, false);
2255         } else {
2256                 orig_request->result = result;
2257                 goto out;
2258         }
2259
2260         /*
2261          * Resubmit the original request now that we have recorded
2262          * whether the target object exists.
2263          */
2264         orig_request->result = rbd_img_obj_request_submit(orig_request);
2265 out:
2266         if (orig_request->result)
2267                 rbd_obj_request_complete(orig_request);
2268         rbd_obj_request_put(orig_request);
2269 }
2270
2271 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2272 {
2273         struct rbd_obj_request *stat_request;
2274         struct rbd_device *rbd_dev;
2275         struct ceph_osd_client *osdc;
2276         struct page **pages = NULL;
2277         u32 page_count;
2278         size_t size;
2279         int ret;
2280
2281         /*
2282          * The response data for a STAT call consists of:
2283          *     le64 length;
2284          *     struct {
2285          *         le32 tv_sec;
2286          *         le32 tv_nsec;
2287          *     } mtime;
2288          */
2289         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2290         page_count = (u32)calc_pages_for(0, size);
2291         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2292         if (IS_ERR(pages))
2293                 return PTR_ERR(pages);
2294
2295         ret = -ENOMEM;
2296         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2297                                                         OBJ_REQUEST_PAGES);
2298         if (!stat_request)
2299                 goto out;
2300
2301         rbd_obj_request_get(obj_request);
2302         stat_request->obj_request = obj_request;
2303         stat_request->pages = pages;
2304         stat_request->page_count = page_count;
2305
2306         rbd_assert(obj_request->img_request);
2307         rbd_dev = obj_request->img_request->rbd_dev;
2308         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2309                                                 stat_request);
2310         if (!stat_request->osd_req)
2311                 goto out;
2312         stat_request->callback = rbd_img_obj_exists_callback;
2313
2314         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2315         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2316                                         false, false);
2317         rbd_osd_req_format_read(stat_request);
2318
2319         osdc = &rbd_dev->rbd_client->client->osdc;
2320         ret = rbd_obj_request_submit(osdc, stat_request);
2321 out:
2322         if (ret)
2323                 rbd_obj_request_put(obj_request);
2324
2325         return ret;
2326 }
2327
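/*
 * Illustrative sketch (not used by the driver): decoding the STAT
 * reply laid out in the comment in rbd_img_obj_exists_submit()
 * above, once it has been copied out of the page vector into a
 * contiguous buffer.  The structure name is hypothetical.
 */
struct example_stat_reply {
        __le64 length;
        __le32 tv_sec;
        __le32 tv_nsec;
} __attribute__ ((packed));

static inline u64 example_stat_length(const struct example_stat_reply *reply)
{
        /* Wire data is little-endian */
        return le64_to_cpu(reply->length);
}
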
2328 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2329 {
2330         struct rbd_img_request *img_request;
2331         struct rbd_device *rbd_dev;
2332         bool known;
2333
2334         rbd_assert(obj_request_img_data_test(obj_request));
2335
2336         img_request = obj_request->img_request;
2337         rbd_assert(img_request);
2338         rbd_dev = img_request->rbd_dev;
2339
2340         /*
2341          * Only writes to layered images need special handling.
2342          * Reads and non-layered writes are simple object requests.
2343          * Layered writes that start beyond the end of the overlap
2344          * with the parent have no parent data, so they too are
2345          * simple object requests.  Finally, if the target object is
2346          * known to already exist, its parent data has already been
2347          * copied, so a write to the object can also be handled as a
2348          * simple object request.
2349          */
2350         if (!img_request_write_test(img_request) ||
2351                 !img_request_layered_test(img_request) ||
2352                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2353                 ((known = obj_request_known_test(obj_request)) &&
2354                         obj_request_exists_test(obj_request))) {
2355
2356                 struct rbd_device *rbd_dev;
2357                 struct ceph_osd_client *osdc;
2358
2359                 rbd_dev = obj_request->img_request->rbd_dev;
2360                 osdc = &rbd_dev->rbd_client->client->osdc;
2361
2362                 return rbd_obj_request_submit(osdc, obj_request);
2363         }
2364
2365         /*
2366          * It's a layered write.  The target object might exist but
2367          * we may not know that yet.  If we know it doesn't exist,
2368          * start by reading the data for the full target object from
2369          * the parent so we can use it for a copyup to the target.
2370          */
2371         if (known)
2372                 return rbd_img_obj_parent_read_full(obj_request);
2373
2374         /* We don't know whether the target exists.  Go find out. */
2375
2376         return rbd_img_obj_exists_submit(obj_request);
2377 }
2378
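/*
 * Illustrative sketch (not used by the driver): the decision made
 * by rbd_img_obj_request_submit() above, written as a predicate.
 * Only a layered write within the parent overlap whose target
 * object is not known to exist needs the existence-check/copyup
 * path; everything else is a simple object request.
 */
static inline bool example_needs_copyup_path(bool write_request,
                                        bool layered, bool within_overlap,
                                        bool known, bool exists)
{
        return write_request && layered && within_overlap &&
                !(known && exists);
}
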
2379 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2380 {
2381         struct rbd_obj_request *obj_request;
2382         struct rbd_obj_request *next_obj_request;
2383
2384         dout("%s: img %p\n", __func__, img_request);
2385         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2386                 int ret;
2387
2388                 ret = rbd_img_obj_request_submit(obj_request);
2389                 if (ret)
2390                         return ret;
2391         }
2392
2393         return 0;
2394 }
2395
2396 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2397 {
2398         struct rbd_obj_request *obj_request;
2399         struct rbd_device *rbd_dev;
2400         u64 obj_end;
2401
2402         rbd_assert(img_request_child_test(img_request));
2403
2404         obj_request = img_request->obj_request;
2405         rbd_assert(obj_request);
2406         rbd_assert(obj_request->img_request);
2407
2408         obj_request->result = img_request->result;
2409         if (obj_request->result)
2410                 goto out;
2411
2412         /*
2413          * We need to zero anything beyond the parent overlap
2414          * boundary.  Since rbd_img_obj_request_read_callback()
2415          * will zero anything beyond the end of a short read, an
2416          * easy way to do this is to pretend the data from the
2417          * parent came up short--ending at the overlap boundary.
2418          */
2419         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2420         obj_end = obj_request->img_offset + obj_request->length;
2421         rbd_dev = obj_request->img_request->rbd_dev;
2422         if (obj_end > rbd_dev->parent_overlap) {
2423                 u64 xferred = 0;
2424
2425                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2426                         xferred = rbd_dev->parent_overlap -
2427                                         obj_request->img_offset;
2428
2429                 obj_request->xferred = min(img_request->xferred, xferred);
2430         } else {
2431                 obj_request->xferred = img_request->xferred;
2432         }
2433 out:
2434         rbd_img_obj_request_read_callback(obj_request);
2435         rbd_obj_request_complete(obj_request);
2436 }
2437
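/*
 * Illustrative sketch (not used by the driver): the transfer-count
 * clamping done by rbd_img_parent_read_callback() above.  Data
 * beyond the parent overlap does not exist in the parent, so the
 * read is treated as if it came up short at the boundary; the read
 * callback zero-fills the remainder.
 */
static inline u64 example_clamped_xferred(u64 img_offset, u64 xferred,
                                        u64 parent_overlap)
{
        if (img_offset >= parent_overlap)
                return 0;
        if (img_offset + xferred > parent_overlap)
                return parent_overlap - img_offset;

        return xferred;
}
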
2438 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2439 {
2440         struct rbd_device *rbd_dev;
2441         struct rbd_img_request *img_request;
2442         int result;
2443
2444         rbd_assert(obj_request_img_data_test(obj_request));
2445         rbd_assert(obj_request->img_request != NULL);
2446         rbd_assert(obj_request->result == (s32) -ENOENT);
2447         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2448
2449         rbd_dev = obj_request->img_request->rbd_dev;
2450         rbd_assert(rbd_dev->parent != NULL);
2451         /* rbd_read_finish(obj_request, obj_request->length); */
2452         img_request = rbd_img_request_create(rbd_dev->parent,
2453                                                 obj_request->img_offset,
2454                                                 obj_request->length,
2455                                                 false, true);
2456         result = -ENOMEM;
2457         if (!img_request)
2458                 goto out_err;
2459
2460         rbd_obj_request_get(obj_request);
2461         img_request->obj_request = obj_request;
2462
2463         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2464                                         obj_request->bio_list);
2465         if (result)
2466                 goto out_err;
2467
2468         img_request->callback = rbd_img_parent_read_callback;
2469         result = rbd_img_request_submit(img_request);
2470         if (result)
2471                 goto out_err;
2472
2473         return;
2474 out_err:
2475         if (img_request)
2476                 rbd_img_request_put(img_request);
2477         obj_request->result = result;
2478         obj_request->xferred = 0;
2479         obj_request_done_set(obj_request);
2480 }
2481
2482 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2483                                    u64 ver, u64 notify_id)
2484 {
2485         struct rbd_obj_request *obj_request;
2486         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2487         int ret;
2488
2489         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2490                                                         OBJ_REQUEST_NODATA);
2491         if (!obj_request)
2492                 return -ENOMEM;
2493
2494         ret = -ENOMEM;
2495         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2496         if (!obj_request->osd_req)
2497                 goto out;
2498         obj_request->callback = rbd_obj_request_put;
2499
2500         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2501                                         notify_id, ver, 0);
2502         rbd_osd_req_format_read(obj_request);
2503
2504         ret = rbd_obj_request_submit(osdc, obj_request);
2505 out:
2506         if (ret)
2507                 rbd_obj_request_put(obj_request);
2508
2509         return ret;
2510 }
2511
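/*
 * Callback invoked when the osd sends a notification for the header
 * object we watch.  Refresh our view of the image header, then ack
 * the notification so the osd can stop resending it.
 */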
2512 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2513 {
2514         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2515         u64 hver;
2516
2517         if (!rbd_dev)
2518                 return;
2519
2520         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2521                 rbd_dev->header_name, (unsigned long long) notify_id,
2522                 (unsigned int) opcode);
2523         (void)rbd_dev_refresh(rbd_dev, &hver);
2524
2525         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2526 }
2527
2528 /*
2529  * Request sync osd watch/unwatch.  The value of "start" determines
2530  * whether a watch request is being initiated or torn down.
2531  */
2532 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2533 {
2534         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2535         struct rbd_obj_request *obj_request;
2536         int ret;
2537
2538         rbd_assert(start ^ !!rbd_dev->watch_event);
2539         rbd_assert(start ^ !!rbd_dev->watch_request);
2540
2541         if (start) {
2542                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2543                                                 &rbd_dev->watch_event);
2544                 if (ret < 0)
2545                         return ret;
2546                 rbd_assert(rbd_dev->watch_event != NULL);
2547         }
2548
2549         ret = -ENOMEM;
2550         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2551                                                         OBJ_REQUEST_NODATA);
2552         if (!obj_request)
2553                 goto out_cancel;
2554
2555         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2556         if (!obj_request->osd_req)
2557                 goto out_cancel;
2558
2559         if (start)
2560                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2561         else
2562                 ceph_osdc_unregister_linger_request(osdc,
2563                                         rbd_dev->watch_request->osd_req);
2564
2565         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2566                                 rbd_dev->watch_event->cookie,
2567                                 rbd_dev->header.obj_version, start);
2568         rbd_osd_req_format_write(obj_request);
2569
2570         ret = rbd_obj_request_submit(osdc, obj_request);
2571         if (ret)
2572                 goto out_cancel;
2573         ret = rbd_obj_request_wait(obj_request);
2574         if (ret)
2575                 goto out_cancel;
2576         ret = obj_request->result;
2577         if (ret)
2578                 goto out_cancel;
2579
2580         /*
2581          * A watch request is set to linger, so the underlying osd
2582          * request won't go away until we unregister it.  We retain
2583          * a pointer to the object request during that time (in
2584          * rbd_dev->watch_request), so we'll keep a reference to
2585          * it.  We'll drop that reference (below) after we've
2586          * unregistered it.
2587          */
2588         if (start) {
2589                 rbd_dev->watch_request = obj_request;
2590
2591                 return 0;
2592         }
2593
2594         /* We have successfully torn down the watch request */
2595
2596         rbd_obj_request_put(rbd_dev->watch_request);
2597         rbd_dev->watch_request = NULL;
2598 out_cancel:
2599         /* Cancel the event if we're tearing down, or on error */
2600         ceph_osdc_cancel_event(rbd_dev->watch_event);
2601         rbd_dev->watch_event = NULL;
2602         if (obj_request)
2603                 rbd_obj_request_put(obj_request);
2604
2605         return ret;
2606 }
2607
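/*
 * Illustrative sketch (not used by the driver): the helper above is
 * called in matched pairs--once with start != 0 when a device is
 * being set up, and once with start == 0 when it is torn down.
 */
static inline int example_watch_lifetime(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_header_watch_sync(rbd_dev, 1);    /* register watch */
        if (ret)
                return ret;

        /* ... rbd_watch_cb() fires on header object updates ... */

        return rbd_dev_header_watch_sync(rbd_dev, 0);   /* tear it down */
}
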
2608 /*
2609  * Synchronous osd object method call.  Returns the number of bytes
2610  * returned in the outbound buffer, or a negative error code.
2611  */
2612 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2613                              const char *object_name,
2614                              const char *class_name,
2615                              const char *method_name,
2616                              const void *outbound,
2617                              size_t outbound_size,
2618                              void *inbound,
2619                              size_t inbound_size,
2620                              u64 *version)
2621 {
2622         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2623         struct rbd_obj_request *obj_request;
2624         struct page **pages;
2625         u32 page_count;
2626         int ret;
2627
2628         /*
2629          * Method calls are ultimately read operations.  The result
2630          * should be placed into the inbound buffer provided.  They
2631          * also supply outbound data--parameters for the object
2632          * method.  Currently, if present, this will be a
2633          * snapshot id.
2634          */
2635         page_count = (u32)calc_pages_for(0, inbound_size);
2636         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2637         if (IS_ERR(pages))
2638                 return PTR_ERR(pages);
2639
2640         ret = -ENOMEM;
2641         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2642                                                         OBJ_REQUEST_PAGES);
2643         if (!obj_request)
2644                 goto out;
2645
2646         obj_request->pages = pages;
2647         obj_request->page_count = page_count;
2648
2649         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2650         if (!obj_request->osd_req)
2651                 goto out;
2652
2653         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2654                                         class_name, method_name);
2655         if (outbound_size) {
2656                 struct ceph_pagelist *pagelist;
2657
2658                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2659                 if (!pagelist)
2660                         goto out;
2661
2662                 ceph_pagelist_init(pagelist);
2663                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2664                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2665                                                 pagelist);
2666         }
2667         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2668                                         obj_request->pages, inbound_size,
2669                                         0, false, false);
2670         rbd_osd_req_format_read(obj_request);
2671
2672         ret = rbd_obj_request_submit(osdc, obj_request);
2673         if (ret)
2674                 goto out;
2675         ret = rbd_obj_request_wait(obj_request);
2676         if (ret)
2677                 goto out;
2678
2679         ret = obj_request->result;
2680         if (ret < 0)
2681                 goto out;
2682
2683         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2684         ret = (int)obj_request->xferred;
2685         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2686         if (version)
2687                 *version = obj_request->version;
2688 out:
2689         if (obj_request)
2690                 rbd_obj_request_put(obj_request);
2691         else
2692                 ceph_release_page_vector(pages, page_count);
2693
2694         return ret;
2695 }
2696
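/*
 * Illustrative sketch (not part of the driver): a typical caller of
 * rbd_obj_method_sync() above.  The "rbd" class's "get_size" method
 * takes a snapshot id and returns the image order and size; the v2
 * image probing code later in this file uses this same pattern.
 */
static inline int example_get_size(struct rbd_device *rbd_dev,
                                u64 snap_id, u8 *order, u64 *size)
{
        __le64 snapid = cpu_to_le64(snap_id);
        struct {
                u8 order;
                __le64 size;
        } __attribute__ ((packed)) size_buf;
        int ret;

        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                &snapid, sizeof (snapid),
                                &size_buf, sizeof (size_buf), NULL);
        if (ret < 0)
                return ret;
        if (ret < (int) sizeof (size_buf))
                return -ERANGE;

        if (order)
                *order = size_buf.order;
        *size = le64_to_cpu(size_buf.size);

        return 0;
}
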
2697 static void rbd_request_fn(struct request_queue *q)
2698                 __releases(q->queue_lock) __acquires(q->queue_lock)
2699 {
2700         struct rbd_device *rbd_dev = q->queuedata;
2701         bool read_only = rbd_dev->mapping.read_only;
2702         struct request *rq;
2703         int result;
2704
2705         while ((rq = blk_fetch_request(q))) {
2706                 bool write_request = rq_data_dir(rq) == WRITE;
2707                 struct rbd_img_request *img_request;
2708                 u64 offset;
2709                 u64 length;
2710
2711                 /* Ignore any non-FS requests that filter through. */
2712
2713                 if (rq->cmd_type != REQ_TYPE_FS) {
2714                         dout("%s: non-fs request type %d\n", __func__,
2715                                 (int) rq->cmd_type);
2716                         __blk_end_request_all(rq, 0);
2717                         continue;
2718                 }
2719
2720                 /* Ignore/skip any zero-length requests */
2721
2722                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2723                 length = (u64) blk_rq_bytes(rq);
2724
2725                 if (!length) {
2726                         dout("%s: zero-length request\n", __func__);
2727                         __blk_end_request_all(rq, 0);
2728                         continue;
2729                 }
2730
2731                 spin_unlock_irq(q->queue_lock);
2732
2733                 /* Disallow writes to a read-only device */
2734
2735                 if (write_request) {
2736                         result = -EROFS;
2737                         if (read_only)
2738                                 goto end_request;
2739                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2740                 }
2741
2742                 /*
2743                  * Quit early if the mapped snapshot no longer
2744                  * exists.  It's still possible the snapshot will
2745                  * have disappeared by the time our request arrives
2746                  * at the osd, but there's no sense in sending it if
2747                  * we already know.
2748                  */
2749                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2750                         dout("request for non-existent snapshot");
2751                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2752                         result = -ENXIO;
2753                         goto end_request;
2754                 }
2755
2756                 result = -EINVAL;
2757                 if (offset && length > U64_MAX - offset + 1) {
2758                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2759                                 offset, length);
2760                         goto end_request;       /* Shouldn't happen */
2761                 }
2762
2763                 result = -ENOMEM;
2764                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2765                                                         write_request, false);
2766                 if (!img_request)
2767                         goto end_request;
2768
2769                 img_request->rq = rq;
2770
2771                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2772                                                 rq->bio);
2773                 if (!result)
2774                         result = rbd_img_request_submit(img_request);
2775                 if (result)
2776                         rbd_img_request_put(img_request);
2777 end_request:
2778                 spin_lock_irq(q->queue_lock);
2779                 if (result < 0) {
2780                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2781                                 write_request ? "write" : "read",
2782                                 length, offset, result);
2783
2784                         __blk_end_request_all(rq, result);
2785                 }
2786         }
2787 }
2788
2789 /*
2790  * A queue callback.  Makes sure that we don't create a bio that spans
2791  * multiple osd objects.  One exception would be single-page bios,
2792  * which we handle later in bio_chain_clone_range().
2793  */
2794 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2795                           struct bio_vec *bvec)
2796 {
2797         struct rbd_device *rbd_dev = q->queuedata;
2798         sector_t sector_offset;
2799         sector_t sectors_per_obj;
2800         sector_t obj_sector_offset;
2801         int ret;
2802
2803         /*
2804          * Find how far into its rbd object the bio's starting sector
2805          * lies.  The sector is partition-relative, so first make it
2806          * relative to the enclosing device.
2807          */
2808         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2809         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2810         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2811
2812         /*
2813          * Compute the number of bytes from that offset to the end
2814          * of the object.  Account for what's already used by the bio.
2815          */
2816         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2817         if (ret > bmd->bi_size)
2818                 ret -= bmd->bi_size;
2819         else
2820                 ret = 0;
2821
2822         /*
2823          * Don't send back more than was asked for.  And if the bio
2824          * was empty, let the whole thing through because:  "Note
2825          * that a block device *must* allow a single page to be
2826          * added to an empty bio."
2827          */
2828         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2829         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2830                 ret = (int) bvec->bv_len;
2831
2832         return ret;
2833 }
2834
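/*
 * Illustrative sketch (not used by the driver): the boundary
 * arithmetic behind rbd_merge_bvec() above.  With obj_order 22
 * (4 MB objects) there are 8192 512-byte sectors per object, so a
 * bio starting at device sector 8191 has exactly one sector of
 * room before the object boundary.
 */
static inline sector_t example_sectors_to_object_end(u8 obj_order,
                                        sector_t device_sector)
{
        sector_t sectors_per_obj = 1 << (obj_order - SECTOR_SHIFT);

        return sectors_per_obj - (device_sector & (sectors_per_obj - 1));
}
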
2835 static void rbd_free_disk(struct rbd_device *rbd_dev)
2836 {
2837         struct gendisk *disk = rbd_dev->disk;
2838
2839         if (!disk)
2840                 return;
2841
2842         rbd_dev->disk = NULL;
2843         if (disk->flags & GENHD_FL_UP) {
2844                 del_gendisk(disk);
2845                 if (disk->queue)
2846                         blk_cleanup_queue(disk->queue);
2847         }
2848         put_disk(disk);
2849 }
2850
2851 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2852                                 const char *object_name,
2853                                 u64 offset, u64 length,
2854                                 void *buf, u64 *version)
2855
2856 {
2857         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2858         struct rbd_obj_request *obj_request;
2859         struct page **pages = NULL;
2860         u32 page_count;
2861         size_t size;
2862         int ret;
2863
2864         page_count = (u32) calc_pages_for(offset, length);
2865         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2866         if (IS_ERR(pages))
2867                 return PTR_ERR(pages);
2868
2869         ret = -ENOMEM;
2870         obj_request = rbd_obj_request_create(object_name, offset, length,
2871                                                         OBJ_REQUEST_PAGES);
2872         if (!obj_request)
2873                 goto out;
2874
2875         obj_request->pages = pages;
2876         obj_request->page_count = page_count;
2877
2878         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2879         if (!obj_request->osd_req)
2880                 goto out;
2881
2882         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2883                                         offset, length, 0, 0);
2884         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2885                                         obj_request->pages,
2886                                         obj_request->length,
2887                                         obj_request->offset & ~PAGE_MASK,
2888                                         false, false);
2889         rbd_osd_req_format_read(obj_request);
2890
2891         ret = rbd_obj_request_submit(osdc, obj_request);
2892         if (ret)
2893                 goto out;
2894         ret = rbd_obj_request_wait(obj_request);
2895         if (ret)
2896                 goto out;
2897
2898         ret = obj_request->result;
2899         if (ret < 0)
2900                 goto out;
2901
2902         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2903         size = (size_t) obj_request->xferred;
2904         ceph_copy_from_page_vector(pages, buf, 0, size);
2905         rbd_assert(size <= (size_t) INT_MAX);
2906         ret = (int) size;
2907         if (version)
2908                 *version = obj_request->version;
2909 out:
2910         if (obj_request)
2911                 rbd_obj_request_put(obj_request);
2912         else
2913                 ceph_release_page_vector(pages, page_count);
2914
2915         return ret;
2916 }
2917
2918 /*
2919  * Read the complete header for the given rbd device.
2920  *
2921  * Returns a pointer to a dynamically-allocated buffer containing
2922  * the complete and validated header.  Caller can pass the address
2923  * of a variable that will be filled in with the version of the
2924  * header object at the time it was read.
2925  *
2926  * Returns a pointer-coded errno if a failure occurs.
2927  */
2928 static struct rbd_image_header_ondisk *
2929 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2930 {
2931         struct rbd_image_header_ondisk *ondisk = NULL;
2932         u32 snap_count = 0;
2933         u64 names_size = 0;
2934         u32 want_count;
2935         int ret;
2936
2937         /*
2938          * The complete header will include an array of its 64-bit
2939          * snapshot ids, followed by the names of those snapshots as
2940          * a contiguous block of NUL-terminated strings.  Note that
2941          * the number of snapshots could change by the time we read
2942          * it in, in which case we re-read it.
2943          */
2944         do {
2945                 size_t size;
2946
2947                 kfree(ondisk);
2948
2949                 size = sizeof (*ondisk);
2950                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2951                 size += names_size;
2952                 ondisk = kmalloc(size, GFP_KERNEL);
2953                 if (!ondisk)
2954                         return ERR_PTR(-ENOMEM);
2955
2956                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2957                                        0, size, ondisk, version);
2958                 if (ret < 0)
2959                         goto out_err;
2960                 if ((size_t)ret < size) {
2961                         ret = -ENXIO;
2962                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2963                                 size, ret);
2964                         goto out_err;
2965                 }
2966                 if (!rbd_dev_ondisk_valid(ondisk)) {
2967                         ret = -ENXIO;
2968                         rbd_warn(rbd_dev, "invalid header");
2969                         goto out_err;
2970                 }
2971
2972                 names_size = le64_to_cpu(ondisk->snap_names_len);
2973                 want_count = snap_count;
2974                 snap_count = le32_to_cpu(ondisk->snap_count);
2975         } while (snap_count != want_count);
2976
2977         return ondisk;
2978
2979 out_err:
2980         kfree(ondisk);
2981
2982         return ERR_PTR(ret);
2983 }
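
/*
 * Worked example for the sizing loop above: a header with two
 * snapshots named "a" and "backup" needs
 *
 *	sizeof (*ondisk)				fixed portion
 *	+ 2 * sizeof (struct rbd_image_snap_ondisk)	one entry per snapshot
 *	+ 2 + 7						"a\0" plus "backup\0"
 *
 * bytes.  If a snapshot is created between two reads, the fresh
 * snap_count no longer matches want_count and the loop re-sizes the
 * buffer and reads again.
 */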
2984
2985 /*
2986  * Reload the on-disk header and convert it to the in-memory format.
2987  */
2988 static int rbd_read_header(struct rbd_device *rbd_dev,
2989                            struct rbd_image_header *header)
2990 {
2991         struct rbd_image_header_ondisk *ondisk;
2992         u64 ver = 0;
2993         int ret;
2994
2995         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2996         if (IS_ERR(ondisk))
2997                 return PTR_ERR(ondisk);
2998         ret = rbd_header_from_disk(header, ondisk);
2999         if (ret >= 0)
3000                 header->obj_version = ver;
3001         kfree(ondisk);
3002
3003         return ret;
3004 }
3005
3006 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3007 {
3008         struct rbd_snap *snap;
3009         struct rbd_snap *next;
3010
3011         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3012                 list_del(&snap->node);
3013                 rbd_snap_destroy(snap);
3014         }
3015 }
3016
3017 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3018 {
3019         sector_t size;
3020
3021         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3022                 return;
3023
3024         size = (sector_t) (rbd_dev->header.image_size / SECTOR_SIZE);
3025         dout("setting size to %llu sectors\n", (unsigned long long) size);
3026         rbd_dev->mapping.size = (u64) size;
3027         set_capacity(rbd_dev->disk, size);
3028 }
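
/*
 * Worked example: SECTOR_SIZE is 512 bytes, so a 1 GiB image maps to
 *
 *	(1 << 30) / (1 << SECTOR_SHIFT) == 1 << 21 == 2097152 sectors
 */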
3029
3030 /*
3031  * Re-read the v1 header and update the in-memory copy accordingly.
3032  */
3033 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3034 {
3035         int ret;
3036         struct rbd_image_header h;
3037
3038         ret = rbd_read_header(rbd_dev, &h);
3039         if (ret < 0)
3040                 return ret;
3041
3042         down_write(&rbd_dev->header_rwsem);
3043
3044         /* Update image size, and check for resize of mapped image */
3045         rbd_dev->header.image_size = h.image_size;
3046         rbd_update_mapping_size(rbd_dev);
3047
3048         /* rbd_dev->header.object_prefix shouldn't change */
3049         kfree(rbd_dev->header.snap_sizes);
3050         kfree(rbd_dev->header.snap_names);
3051         /* osd requests may still refer to snapc */
3052         ceph_put_snap_context(rbd_dev->header.snapc);
3053
3054         if (hver)
3055                 *hver = h.obj_version;
3056         rbd_dev->header.obj_version = h.obj_version;
3057         rbd_dev->header.image_size = h.image_size;
3058         rbd_dev->header.snapc = h.snapc;
3059         rbd_dev->header.snap_names = h.snap_names;
3060         rbd_dev->header.snap_sizes = h.snap_sizes;
3061         /* Free the extra copy of the object prefix */
3062         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3063                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3064         kfree(h.object_prefix);
3065
3066         ret = rbd_dev_snaps_update(rbd_dev);
3067
3068         up_write(&rbd_dev->header_rwsem);
3069
3070         return ret;
3071 }
3072
3073 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3074 {
3075         int ret;
3076
3077         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3078         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3079         if (rbd_dev->image_format == 1)
3080                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3081         else
3082                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3083         mutex_unlock(&ctl_mutex);
3084         revalidate_disk(rbd_dev->disk);
3085         if (ret)
3086                 rbd_warn(rbd_dev, "got notification but failed to"
3087                            " update snaps: %d\n", ret);
3088
3089         return ret;
3090 }
3091
3092 static int rbd_init_disk(struct rbd_device *rbd_dev)
3093 {
3094         struct gendisk *disk;
3095         struct request_queue *q;
3096         u64 segment_size;
3097
3098         /* create gendisk info */
3099         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3100         if (!disk)
3101                 return -ENOMEM;
3102
3103         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3104                  rbd_dev->dev_id);
3105         disk->major = rbd_dev->major;
3106         disk->first_minor = 0;
3107         disk->fops = &rbd_bd_ops;
3108         disk->private_data = rbd_dev;
3109
3110         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3111         if (!q)
3112                 goto out_disk;
3113
3114         /* We use the default size, but let's be explicit about it. */
3115         blk_queue_physical_block_size(q, SECTOR_SIZE);
3116
3117         /* set io sizes to object size */
3118         segment_size = rbd_obj_bytes(&rbd_dev->header);
3119         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3120         blk_queue_max_segment_size(q, segment_size);
3121         blk_queue_io_min(q, segment_size);
3122         blk_queue_io_opt(q, segment_size);
3123
3124         blk_queue_merge_bvec(q, rbd_merge_bvec);
3125         disk->queue = q;
3126
3127         q->queuedata = rbd_dev;
3128
3129         rbd_dev->disk = disk;
3130
3131         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3132
3133         return 0;
3134 out_disk:
3135         put_disk(disk);
3136
3137         return -ENOMEM;
3138 }
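
/*
 * Worked example (illustrative, not additional setup): with the
 * default object order of 22 the limits above become
 *
 *	segment_size   = 1 << 22		4 MiB objects
 *	max_hw_sectors = (1 << 22) / 512	== 8192 sectors
 *	max segment / io_min / io_opt		== 4 MiB
 *
 * and rbd_merge_bvec() keeps bios from growing across an object
 * boundary, so requests line up with the backing objects.
 */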
3139
3140 /*
3141  * sysfs
3142  */
3143
3144 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3145 {
3146         return container_of(dev, struct rbd_device, dev);
3147 }
3148
3149 static ssize_t rbd_size_show(struct device *dev,
3150                              struct device_attribute *attr, char *buf)
3151 {
3152         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3153         sector_t size;
3154
3155         down_read(&rbd_dev->header_rwsem);
3156         size = get_capacity(rbd_dev->disk);
3157         up_read(&rbd_dev->header_rwsem);
3158
3159         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
3160 }
3161
3162 /*
3163  * Note this shows the features for whatever's mapped, which is not
3164  * necessarily the base image.
3165  */
3166 static ssize_t rbd_features_show(struct device *dev,
3167                              struct device_attribute *attr, char *buf)
3168 {
3169         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3170
3171         return sprintf(buf, "0x%016llx\n",
3172                         (unsigned long long) rbd_dev->mapping.features);
3173 }
3174
3175 static ssize_t rbd_major_show(struct device *dev,
3176                               struct device_attribute *attr, char *buf)
3177 {
3178         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3179
3180         return sprintf(buf, "%d\n", rbd_dev->major);
3181 }
3182
3183 static ssize_t rbd_client_id_show(struct device *dev,
3184                                   struct device_attribute *attr, char *buf)
3185 {
3186         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3187
3188         return sprintf(buf, "client%lld\n",
3189                         ceph_client_id(rbd_dev->rbd_client->client));
3190 }
3191
3192 static ssize_t rbd_pool_show(struct device *dev,
3193                              struct device_attribute *attr, char *buf)
3194 {
3195         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3196
3197         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3198 }
3199
3200 static ssize_t rbd_pool_id_show(struct device *dev,
3201                              struct device_attribute *attr, char *buf)
3202 {
3203         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3204
3205         return sprintf(buf, "%llu\n",
3206                 (unsigned long long) rbd_dev->spec->pool_id);
3207 }
3208
3209 static ssize_t rbd_name_show(struct device *dev,
3210                              struct device_attribute *attr, char *buf)
3211 {
3212         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3213
3214         if (rbd_dev->spec->image_name)
3215                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3216
3217         return sprintf(buf, "(unknown)\n");
3218 }
3219
3220 static ssize_t rbd_image_id_show(struct device *dev,
3221                              struct device_attribute *attr, char *buf)
3222 {
3223         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3224
3225         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3226 }
3227
3228 /*
3229  * Shows the name of the currently-mapped snapshot (or
3230  * RBD_SNAP_HEAD_NAME for the base image).
3231  */
3232 static ssize_t rbd_snap_show(struct device *dev,
3233                              struct device_attribute *attr,
3234                              char *buf)
3235 {
3236         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3237
3238         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3239 }
3240
3241 /*
3242  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3243  * for the parent image.  If there is no parent, simply shows
3244  * "(no parent image)".
3245  */
3246 static ssize_t rbd_parent_show(struct device *dev,
3247                              struct device_attribute *attr,
3248                              char *buf)
3249 {
3250         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3251         struct rbd_spec *spec = rbd_dev->parent_spec;
3252         int count;
3253         char *bufp = buf;
3254
3255         if (!spec)
3256                 return sprintf(buf, "(no parent image)\n");
3257
3258         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3259                         (unsigned long long) spec->pool_id, spec->pool_name);
3260         if (count < 0)
3261                 return count;
3262         bufp += count;
3263
3264         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3265                         spec->image_name ? spec->image_name : "(unknown)");
3266         if (count < 0)
3267                 return count;
3268         bufp += count;
3269
3270         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3271                         (unsigned long long) spec->snap_id, spec->snap_name);
3272         if (count < 0)
3273                 return count;
3274         bufp += count;
3275
3276         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3277         if (count < 0)
3278                 return count;
3279         bufp += count;
3280
3281         return (ssize_t) (bufp - buf);
3282 }
3283
3284 static ssize_t rbd_image_refresh(struct device *dev,
3285                                  struct device_attribute *attr,
3286                                  const char *buf,
3287                                  size_t size)
3288 {
3289         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3290         int ret;
3291
3292         ret = rbd_dev_refresh(rbd_dev, NULL);
3293
3294         return ret < 0 ? ret : size;
3295 }
3296
3297 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3298 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3299 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3300 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3301 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3302 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3303 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3304 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3305 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3306 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3307 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3308
3309 static struct attribute *rbd_attrs[] = {
3310         &dev_attr_size.attr,
3311         &dev_attr_features.attr,
3312         &dev_attr_major.attr,
3313         &dev_attr_client_id.attr,
3314         &dev_attr_pool.attr,
3315         &dev_attr_pool_id.attr,
3316         &dev_attr_name.attr,
3317         &dev_attr_image_id.attr,
3318         &dev_attr_current_snap.attr,
3319         &dev_attr_parent.attr,
3320         &dev_attr_refresh.attr,
3321         NULL
3322 };
3323
3324 static struct attribute_group rbd_attr_group = {
3325         .attrs = rbd_attrs,
3326 };
3327
3328 static const struct attribute_group *rbd_attr_groups[] = {
3329         &rbd_attr_group,
3330         NULL
3331 };
3332
3333 static void rbd_sysfs_dev_release(struct device *dev)
3334 {
3335 }
3336
3337 static struct device_type rbd_device_type = {
3338         .name           = "rbd",
3339         .groups         = rbd_attr_groups,
3340         .release        = rbd_sysfs_dev_release,
3341 };
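
/*
 * Usage sketch: with the attribute group above, each mapped image
 * appears under /sys/bus/rbd/devices/<dev_id>/ (documented in
 * Documentation/ABI/testing/sysfs-bus-rbd).  A hypothetical session:
 *
 *	$ cat /sys/bus/rbd/devices/0/size
 *	1073741824
 *	$ echo 1 > /sys/bus/rbd/devices/0/refresh
 *
 * The write triggers rbd_image_refresh(), which re-reads the image
 * header.
 */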
3342
3343 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3344 {
3345         kref_get(&spec->kref);
3346
3347         return spec;
3348 }
3349
3350 static void rbd_spec_free(struct kref *kref);
3351 static void rbd_spec_put(struct rbd_spec *spec)
3352 {
3353         if (spec)
3354                 kref_put(&spec->kref, rbd_spec_free);
3355 }
3356
3357 static struct rbd_spec *rbd_spec_alloc(void)
3358 {
3359         struct rbd_spec *spec;
3360
3361         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3362         if (!spec)
3363                 return NULL;
3364         kref_init(&spec->kref);
3365
3366         return spec;
3367 }
3368
3369 static void rbd_spec_free(struct kref *kref)
3370 {
3371         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3372
3373         kfree(spec->pool_name);
3374         kfree(spec->image_id);
3375         kfree(spec->image_name);
3376         kfree(spec->snap_name);
3377         kfree(spec);
3378 }
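
/*
 * Illustrative sketch of the rbd_spec lifetime, which follows the
 * usual kref pattern (hypothetical caller):
 *
 *	struct rbd_spec *spec = rbd_spec_alloc();	// kref == 1
 *
 *	rbd_spec_get(spec);	// share it: kref == 2
 *	rbd_spec_put(spec);	// kref == 1
 *	rbd_spec_put(spec);	// kref == 0 -> rbd_spec_free()
 *
 * rbd_spec_put(NULL) is a no-op, which is why error paths may call
 * it unconditionally.
 */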
3379
3380 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3381                                 struct rbd_spec *spec)
3382 {
3383         struct rbd_device *rbd_dev;
3384
3385         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3386         if (!rbd_dev)
3387                 return NULL;
3388
3389         spin_lock_init(&rbd_dev->lock);
3390         rbd_dev->flags = 0;
3391         INIT_LIST_HEAD(&rbd_dev->node);
3392         INIT_LIST_HEAD(&rbd_dev->snaps);
3393         init_rwsem(&rbd_dev->header_rwsem);
3394
3395         rbd_dev->spec = spec;
3396         rbd_dev->rbd_client = rbdc;
3397
3398         /* Initialize the layout used for all rbd requests */
3399
3400         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3401         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3402         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3403         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3404
3405         return rbd_dev;
3406 }
3407
3408 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3409 {
3410         rbd_spec_put(rbd_dev->parent_spec);
3411         kfree(rbd_dev->header_name);
3412         rbd_put_client(rbd_dev->rbd_client);
3413         rbd_spec_put(rbd_dev->spec);
3414         kfree(rbd_dev);
3415 }
3416
3417 static void rbd_snap_destroy(struct rbd_snap *snap)
3418 {
3419         kfree(snap->name);
3420         kfree(snap);
3421 }
3422
3423 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3424                                                 const char *snap_name,
3425                                                 u64 snap_id, u64 snap_size,
3426                                                 u64 snap_features)
3427 {
3428         struct rbd_snap *snap;
3429
3430         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3431         if (!snap)
3432                 return ERR_PTR(-ENOMEM);
3433
3434         snap->name = snap_name;
3435         snap->id = snap_id;
3436         snap->size = snap_size;
3437         snap->features = snap_features;
3438
3439         return snap;
3440 }
3441
3442 /*
3443  * Returns a dynamically-allocated snapshot name if successful, or a
3444  * pointer-coded error otherwise.
3445  */
3446 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3447                 u64 *snap_size, u64 *snap_features)
3448 {
3449         char *snap_name;
3450         int i;
3451
3452         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3453
3454         /* Skip over names until we find the one we are looking for */
3455
3456         snap_name = rbd_dev->header.snap_names;
3457         for (i = 0; i < which; i++)
3458                 snap_name += strlen(snap_name) + 1;
3459
3460         snap_name = kstrdup(snap_name, GFP_KERNEL);
3461         if (!snap_name)
3462                 return ERR_PTR(-ENOMEM);
3463
3464         *snap_size = rbd_dev->header.snap_sizes[which];
3465         *snap_features = 0;     /* No features for v1 */
3466
3467         return snap_name;
3468 }
3469
3470 /*
3471  * Get the size and object order for an image snapshot, or if
3472  * snap_id is CEPH_NOSNAP, gets this information for the base
3473  * image.
3474  */
3475 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3476                                 u8 *order, u64 *snap_size)
3477 {
3478         __le64 snapid = cpu_to_le64(snap_id);
3479         int ret;
3480         struct {
3481                 u8 order;
3482                 __le64 size;
3483         } __attribute__ ((packed)) size_buf = { 0 };
3484
3485         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3486                                 "rbd", "get_size",
3487                                 &snapid, sizeof (snapid),
3488                                 &size_buf, sizeof (size_buf), NULL);
3489         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3490         if (ret < 0)
3491                 return ret;
3492         if (ret < sizeof (size_buf))
3493                 return -ERANGE;
3494
3495         if (order)
3496                 *order = size_buf.order;
3497         *snap_size = le64_to_cpu(size_buf.size);
3498
3499         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3500                 (unsigned long long)snap_id, order ? (unsigned int)*order : 0,
3501                 (unsigned long long)*snap_size);
3502
3503         return 0;
3504 }
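
/*
 * Illustrative decode of a "get_size" reply, which is 9 bytes on the
 * wire: a one-byte object order followed by a little-endian 64-bit
 * size.  E.g. the (hypothetical) bytes
 *
 *	16  00 00 00 40 00 00 00 00
 *
 * decode as order 0x16 (22, i.e. 4 MiB objects) and size 0x40000000
 * (1 GiB).
 */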
3505
3506 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3507 {
3508         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3509                                         &rbd_dev->header.obj_order,
3510                                         &rbd_dev->header.image_size);
3511 }
3512
3513 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3514 {
3515         void *reply_buf;
3516         int ret;
3517         void *p;
3518
3519         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3520         if (!reply_buf)
3521                 return -ENOMEM;
3522
3523         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3524                                 "rbd", "get_object_prefix", NULL, 0,
3525                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3526         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3527         if (ret < 0)
3528                 goto out;
3529
3530         p = reply_buf;
3531         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3532                                                 p + ret, NULL, GFP_NOIO);
3533         ret = 0;
3534
3535         if (IS_ERR(rbd_dev->header.object_prefix)) {
3536                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3537                 rbd_dev->header.object_prefix = NULL;
3538         } else {
3539                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3540         }
3541 out:
3542         kfree(reply_buf);
3543
3544         return ret;
3545 }
3546
3547 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3548                 u64 *snap_features)
3549 {
3550         __le64 snapid = cpu_to_le64(snap_id);
3551         struct {
3552                 __le64 features;
3553                 __le64 incompat;
3554         } __attribute__ ((packed)) features_buf = { 0 };
3555         u64 incompat;
3556         int ret;
3557
3558         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3559                                 "rbd", "get_features",
3560                                 &snapid, sizeof (snapid),
3561                                 &features_buf, sizeof (features_buf), NULL);
3562         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3563         if (ret < 0)
3564                 return ret;
3565         if (ret < sizeof (features_buf))
3566                 return -ERANGE;
3567
3568         incompat = le64_to_cpu(features_buf.incompat);
3569         if (incompat & ~RBD_FEATURES_SUPPORTED)
3570                 return -ENXIO;
3571
3572         *snap_features = le64_to_cpu(features_buf.features);
3573
3574         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3575                 (unsigned long long)snap_id,
3576                 (unsigned long long)*snap_features,
3577                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3578
3579         return 0;
3580 }
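
/*
 * Illustrative note: "get_features" returns two little-endian 64-bit
 * masks.  The incompat mask is the subset a client must understand to
 * use the image at all; an image whose incompat mask sets any bit
 * outside RBD_FEATURES_SUPPORTED fails the check above and cannot be
 * mapped (-ENXIO).
 */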
3581
3582 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3583 {
3584         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3585                                                 &rbd_dev->header.features);
3586 }
3587
3588 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3589 {
3590         struct rbd_spec *parent_spec;
3591         size_t size;
3592         void *reply_buf = NULL;
3593         __le64 snapid;
3594         void *p;
3595         void *end;
3596         char *image_id;
3597         u64 overlap;
3598         int ret;
3599
3600         parent_spec = rbd_spec_alloc();
3601         if (!parent_spec)
3602                 return -ENOMEM;
3603
3604         size = sizeof (__le64) +                                /* pool_id */
3605                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3606                 sizeof (__le64) +                               /* snap_id */
3607                 sizeof (__le64);                                /* overlap */
3608         reply_buf = kmalloc(size, GFP_KERNEL);
3609         if (!reply_buf) {
3610                 ret = -ENOMEM;
3611                 goto out_err;
3612         }
3613
3614         snapid = cpu_to_le64(CEPH_NOSNAP);
3615         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3616                                 "rbd", "get_parent",
3617                                 &snapid, sizeof (snapid),
3618                                 reply_buf, size, NULL);
3619         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3620         if (ret < 0)
3621                 goto out_err;
3622
3623         p = reply_buf;
3624         end = reply_buf + ret;
3625         ret = -ERANGE;
3626         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3627         if (parent_spec->pool_id == CEPH_NOPOOL)
3628                 goto out;       /* No parent?  No problem. */
3629
3630         /* The ceph file layout needs to fit pool id in 32 bits */
3631
3632         ret = -EIO;
3633         if (parent_spec->pool_id > (u64)U32_MAX) {
3634                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3635                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3636                 goto out_err;
3637         }
3638
3639         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3640         if (IS_ERR(image_id)) {
3641                 ret = PTR_ERR(image_id);
3642                 goto out_err;
3643         }
3644         parent_spec->image_id = image_id;
3645         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3646         ceph_decode_64_safe(&p, end, overlap, out_err);
3647
3648         rbd_dev->parent_overlap = overlap;
3649         rbd_dev->parent_spec = parent_spec;
3650         parent_spec = NULL;     /* rbd_dev now owns this */
3651 out:
3652         ret = 0;
3653 out_err:
3654         kfree(reply_buf);
3655         rbd_spec_put(parent_spec);
3656
3657         return ret;
3658 }
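
/*
 * Illustrative layout of the "get_parent" reply decoded above, in
 * order: a le64 pool id, a length-prefixed image id string, a le64
 * snapshot id, and a le64 overlap (bytes of the image, starting at
 * offset 0, still backed by the parent).  A pool id of CEPH_NOPOOL
 * encodes "no parent".
 */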
3659
3660 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3661 {
3662         struct {
3663                 __le64 stripe_unit;
3664                 __le64 stripe_count;
3665         } __attribute__ ((packed)) striping_info_buf = { 0 };
3666         size_t size = sizeof (striping_info_buf);
3667         void *p;
3668         u64 obj_size;
3669         u64 stripe_unit;
3670         u64 stripe_count;
3671         int ret;
3672
3673         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3674                                 "rbd", "get_stripe_unit_count", NULL, 0,
3675                                 (char *)&striping_info_buf, size, NULL);
3676         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3677         if (ret < 0)
3678                 return ret;
3679         if (ret < size)
3680                 return -ERANGE;
3681
3682         /*
3683          * We don't actually support the "fancy striping" feature
3684          * (STRIPINGV2) yet, but if the striping sizes are the
3685          * defaults the behavior is the same as before.  So find
3686          * out, and only fail if the image has non-default values.
3687          */
3688         ret = -EINVAL;
3689         obj_size = (u64)1 << rbd_dev->header.obj_order;
3690         p = &striping_info_buf;
3691         stripe_unit = ceph_decode_64(&p);
3692         if (stripe_unit != obj_size) {
3693                 rbd_warn(rbd_dev, "unsupported stripe unit "
3694                                 "(got %llu want %llu)",
3695                                 stripe_unit, obj_size);
3696                 return -EINVAL;
3697         }
3698         stripe_count = ceph_decode_64(&p);
3699         if (stripe_count != 1) {
3700                 rbd_warn(rbd_dev, "unsupported stripe count "
3701                                 "(got %llu want 1)", stripe_count);
3702                 return -EINVAL;
3703         }
3704         rbd_dev->header.stripe_unit = stripe_unit;
3705         rbd_dev->header.stripe_count = stripe_count;
3706
3707         return 0;
3708 }
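
/*
 * Worked example: for an image with obj_order 22 the only reply the
 * code above accepts is stripe_unit == 4194304 (the 4 MiB object
 * size) and stripe_count == 1, i.e. the trivial striping that plain,
 * non-STRIPINGV2 images already have.
 */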
3709
3710 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3711 {
3712         size_t image_id_size;
3713         char *image_id;
3714         void *p;
3715         void *end;
3716         size_t size;
3717         void *reply_buf = NULL;
3718         size_t len = 0;
3719         char *image_name = NULL;
3720         int ret;
3721
3722         rbd_assert(!rbd_dev->spec->image_name);
3723
3724         len = strlen(rbd_dev->spec->image_id);
3725         image_id_size = sizeof (__le32) + len;
3726         image_id = kmalloc(image_id_size, GFP_KERNEL);
3727         if (!image_id)
3728                 return NULL;
3729
3730         p = image_id;
3731         end = image_id + image_id_size;
3732         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3733
3734         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3735         reply_buf = kmalloc(size, GFP_KERNEL);
3736         if (!reply_buf)
3737                 goto out;
3738
3739         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3740                                 "rbd", "dir_get_name",
3741                                 image_id, image_id_size,
3742                                 reply_buf, size, NULL);
3743         if (ret < 0)
3744                 goto out;
3745         p = reply_buf;
3746         end = reply_buf + ret;
3747
3748         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3749         if (IS_ERR(image_name))
3750                 image_name = NULL;
3751         else
3752                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3753 out:
3754         kfree(reply_buf);
3755         kfree(image_id);
3756
3757         return image_name;
3758 }
3759
3760 /*
3761  * When a parent image gets probed, we only have the pool, image,
3762  * and snapshot ids but not the names of any of them.  This call
3763  * is made later to fill in those names.  It has to be done after
3764  * rbd_dev_snaps_update() has completed because some of the
3765  * information (in particular, snapshot name) is not available
3766  * until then.
3767  *
3768  * When an image being mapped (not a parent) is probed, we have the
3769  * pool name and pool id, image name and image id, and the snapshot
3770  * name.  The only thing we're missing is the snapshot id.
3771  */
3772 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3773 {
3774         struct ceph_osd_client *osdc;
3775         const char *name;
3776         void *reply_buf = NULL;
3777         int ret;
3778
3779         /*
3780          * An image being mapped will have the pool name (etc.), but
3781          * we need to look up the snapshot id.
3782          */
3783         if (rbd_dev->spec->pool_name) {
3784                 if (strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3785                         struct rbd_snap *snap;
3786
3787                         snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
3788                         if (!snap)
3789                                 return -ENOENT;
3790                         rbd_dev->spec->snap_id = snap->id;
3791                 } else {
3792                         rbd_dev->spec->snap_id = CEPH_NOSNAP;
3793                 }
3794
3795                 return 0;
3796         }
3797
3798         /* Look up the pool name */
3799
3800         osdc = &rbd_dev->rbd_client->client->osdc;
3801         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3802         if (!name) {
3803                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3804                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3805                 return -EIO;
3806         }
3807
3808         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3809         if (!rbd_dev->spec->pool_name)
3810                 return -ENOMEM;
3811
3812         /* Fetch the image name; tolerate failure here */
3813
3814         name = rbd_dev_image_name(rbd_dev);
3815         if (name)
3816                 rbd_dev->spec->image_name = (char *)name;
3817         else
3818                 rbd_warn(rbd_dev, "unable to get image name");
3819
3820         /* Look up the snapshot name. */
3821
3822         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3823         if (!name) {
3824                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3825                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3826                 ret = -EIO;
3827                 goto out_err;
3828         }
3829         ret = -ENOMEM;
3830         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3831         if (!rbd_dev->spec->snap_name)
3832                 goto out_err;
3833         return 0;
3834 out_err:
3835         kfree(reply_buf);
3836         kfree(rbd_dev->spec->pool_name);
3837         rbd_dev->spec->pool_name = NULL;
3838
3839         return ret;
3840 }
3841
3842 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3843 {
3844         size_t size;
3845         int ret;
3846         void *reply_buf;
3847         void *p;
3848         void *end;
3849         u64 seq;
3850         u32 snap_count;
3851         struct ceph_snap_context *snapc;
3852         u32 i;
3853
3854         /*
3855          * We'll need room for the seq value (maximum snapshot id),
3856          * snapshot count, and array of that many snapshot ids.
3857          * For now we have a fixed upper limit on the number we're
3858          * prepared to receive.
3859          */
3860         size = sizeof (__le64) + sizeof (__le32) +
3861                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3862         reply_buf = kzalloc(size, GFP_KERNEL);
3863         if (!reply_buf)
3864                 return -ENOMEM;
3865
3866         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3867                                 "rbd", "get_snapcontext", NULL, 0,
3868                                 reply_buf, size, ver);
3869         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3870         if (ret < 0)
3871                 goto out;
3872
3873         p = reply_buf;
3874         end = reply_buf + ret;
3875         ret = -ERANGE;
3876         ceph_decode_64_safe(&p, end, seq, out);
3877         ceph_decode_32_safe(&p, end, snap_count, out);
3878
3879         /*
3880          * Make sure the reported number of snapshot ids wouldn't go
3881          * beyond the end of our buffer.  But before checking that,
3882          * make sure the computed size of the snapshot context we
3883          * allocate is representable in a size_t.
3884          */
3885         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3886                                  / sizeof (u64)) {
3887                 ret = -EINVAL;
3888                 goto out;
3889         }
3890         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3891                 goto out;
3892
3893         size = sizeof (struct ceph_snap_context) +
3894                                 snap_count * sizeof (snapc->snaps[0]);
3895         snapc = kmalloc(size, GFP_KERNEL);
3896         if (!snapc) {
3897                 ret = -ENOMEM;
3898                 goto out;
3899         }
3900         ret = 0;
3901
3902         atomic_set(&snapc->nref, 1);
3903         snapc->seq = seq;
3904         snapc->num_snaps = snap_count;
3905         for (i = 0; i < snap_count; i++)
3906                 snapc->snaps[i] = ceph_decode_64(&p);
3907
3908         rbd_dev->header.snapc = snapc;
3909
3910         dout("  snap context seq = %llu, snap_count = %u\n",
3911                 (unsigned long long)seq, (unsigned int)snap_count);
3912 out:
3913         kfree(reply_buf);
3914
3915         return ret;
3916 }
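
/*
 * Worked example: with the maximum RBD_MAX_SNAP_COUNT (510) snapshots
 * the reply buffer allocated above is
 *
 *	8 (seq) + 4 (count) + 510 * 8 (ids) == 4092 bytes
 *
 * which still fits in a single 4 KiB page.
 */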
3917
3918 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3919 {
3920         size_t size;
3921         void *reply_buf;
3922         __le64 snap_id;
3923         int ret;
3924         void *p;
3925         void *end;
3926         char *snap_name;
3927
3928         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3929         reply_buf = kmalloc(size, GFP_KERNEL);
3930         if (!reply_buf)
3931                 return ERR_PTR(-ENOMEM);
3932
3933         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3934         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3935         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3936                                 "rbd", "get_snapshot_name",
3937                                 &snap_id, sizeof (snap_id),
3938                                 reply_buf, size, NULL);
3939         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3940         if (ret < 0) {
3941                 snap_name = ERR_PTR(ret);
3942                 goto out;
3943         }
3944
3945         p = reply_buf;
3946         end = reply_buf + ret;
3947         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3948         if (IS_ERR(snap_name))
3949                 goto out;
3950
3951         dout("  snap_id 0x%016llx snap_name = %s\n",
3952                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3953 out:
3954         kfree(reply_buf);
3955
3956         return snap_name;
3957 }
3958
3959 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3960                 u64 *snap_size, u64 *snap_features)
3961 {
3962         u64 snap_id;
3963         u64 size;
3964         u64 features;
3965         char *snap_name;
3966         int ret;
3967
3968         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3969         snap_id = rbd_dev->header.snapc->snaps[which];
3970         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3971         if (ret)
3972                 goto out_err;
3973
3974         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3975         if (ret)
3976                 goto out_err;
3977
3978         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3979         if (!IS_ERR(snap_name)) {
3980                 *snap_size = size;
3981                 *snap_features = features;
3982         }
3983
3984         return snap_name;
3985 out_err:
3986         return ERR_PTR(ret);
3987 }
3988
3989 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3990                 u64 *snap_size, u64 *snap_features)
3991 {
3992         if (rbd_dev->image_format == 1)
3993                 return rbd_dev_v1_snap_info(rbd_dev, which,
3994                                         snap_size, snap_features);
3995         if (rbd_dev->image_format == 2)
3996                 return rbd_dev_v2_snap_info(rbd_dev, which,
3997                                         snap_size, snap_features);
3998         return ERR_PTR(-EINVAL);
3999 }
4000
4001 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4002 {
4003         int ret;
4004         __u8 obj_order;
4005
4006         down_write(&rbd_dev->header_rwsem);
4007
4008         /* Grab old order first, to see if it changes */
4009
4010         obj_order = rbd_dev->header.obj_order;
4011         ret = rbd_dev_v2_image_size(rbd_dev);
4012         if (ret)
4013                 goto out;
4014         if (rbd_dev->header.obj_order != obj_order) {
4015                 ret = -EIO;
4016                 goto out;
4017         }
4018         rbd_update_mapping_size(rbd_dev);
4019
4020         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4021         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4022         if (ret)
4023                 goto out;
4024         ret = rbd_dev_snaps_update(rbd_dev);
4025         dout("rbd_dev_snaps_update returned %d\n", ret);
4026         if (ret)
4027                 goto out;
4028 out:
4029         up_write(&rbd_dev->header_rwsem);
4030
4031         return ret;
4032 }
4033
4034 /*
4035  * Scan the rbd device's current snapshot list and compare it to the
4036  * newly-received snapshot context.  Remove any existing snapshots
4037  * not present in the new snapshot context.  Add a new snapshot for
4038  * any snapshots in the snapshot context not in the current list.
4039  * And verify there are no changes to snapshots we already know
4040  * about.
4041  *
4042  * Assumes the snapshots in the snapshot context are sorted by
4043  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4044  * are also maintained in that order.)
4045  *
4046  * Note that any error that occurs while updating the snapshot
4047  * list aborts the update, and the entire list is cleared.  The snapshot
4048  * list becomes inconsistent at that point anyway, so it might as
4049  * well be empty.
4050  */
4051 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4052 {
4053         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4054         const u32 snap_count = snapc->num_snaps;
4055         struct list_head *head = &rbd_dev->snaps;
4056         struct list_head *links = head->next;
4057         u32 index = 0;
4058         int ret = 0;
4059
4060         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4061         while (index < snap_count || links != head) {
4062                 u64 snap_id;
4063                 struct rbd_snap *snap;
4064                 char *snap_name;
4065                 u64 snap_size = 0;
4066                 u64 snap_features = 0;
4067
4068                 snap_id = index < snap_count ? snapc->snaps[index]
4069                                              : CEPH_NOSNAP;
4070                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4071                                      : NULL;
4072                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4073
4074                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4075                         struct list_head *next = links->next;
4076
4077                         /*
4078                          * A previously-existing snapshot is not in
4079                          * the new snap context.
4080                          *
4081                          * If the now-missing snapshot is the one
4082                          * the image represents, clear its existence
4083                          * flag so we can avoid sending any more
4084                          * requests to it.
4085                          */
4086                         if (rbd_dev->spec->snap_id == snap->id)
4087                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4088                         dout("removing %ssnap id %llu\n",
4089                                 rbd_dev->spec->snap_id == snap->id ?
4090                                                         "mapped " : "",
4091                                 (unsigned long long)snap->id);
4092
4093                         list_del(&snap->node);
4094                         rbd_snap_destroy(snap);
4095
4096                         /* Done with this list entry; advance */
4097
4098                         links = next;
4099                         continue;
4100                 }
4101
4102                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4103                                         &snap_size, &snap_features);
4104                 if (IS_ERR(snap_name)) {
4105                         ret = PTR_ERR(snap_name);
4106                         dout("failed to get snap info, error %d\n", ret);
4107                         goto out_err;
4108                 }
4109
4110                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4111                         (unsigned long long)snap_id);
4112                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4113                         struct rbd_snap *new_snap;
4114
4115                         /* We haven't seen this snapshot before */
4116
4117                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4118                                         snap_id, snap_size, snap_features);
4119                         if (IS_ERR(new_snap)) {
4120                                 ret = PTR_ERR(new_snap);
4121                                 dout("  failed to add dev, error %d\n", ret);
4122                                 kfree(snap_name);
4123                                 goto out_err;
4124                         }
4125                         /* New goes before existing, or at end of list */
4126
4127                         dout("  added dev%s\n", snap ? "" : " at end");
4128                         if (snap)
4129                                 list_add_tail(&new_snap->node, &snap->node);
4130                         else
4131                                 list_add_tail(&new_snap->node, head);
4132                 } else {
4133                         /* Already have this one */
4134
4135                         dout("  already present\n");
4136
4137                         rbd_assert(snap->size == snap_size);
4138                         rbd_assert(!strcmp(snap->name, snap_name));
4139                         rbd_assert(snap->features == snap_features);
4140                         kfree(snap_name);
4141                         /* Done with this list entry; advance */
4142
4143                         links = links->next;
4144                 }
4145
4146                 /* Advance to the next entry in the snapshot context */
4147
4148                 index++;
4149         }
4150         dout("%s: done\n", __func__);
4151
4152         return 0;
4153 out_err:
4154         rbd_remove_all_snaps(rbd_dev);
4155
4156         return ret;
4157 }
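
/*
 * Illustrative trace of the merge above, for a current list with ids
 * (5, 3, 1) and a new snapshot context (5, 2), both highest-first:
 *
 *	5 == 5			already present; advance both
 *	3  > 2			only in old list; remove snapshot 3
 *	1  < 2			only in new context; create snapshot 2
 *	1, context exhausted	remove snapshot 1
 */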
4158
4159 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4160 {
4161         struct device *dev;
4162         int ret;
4163
4164         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4165
4166         dev = &rbd_dev->dev;
4167         dev->bus = &rbd_bus_type;
4168         dev->type = &rbd_device_type;
4169         dev->parent = &rbd_root_dev;
4170         dev->release = rbd_dev_release;
4171         dev_set_name(dev, "%d", rbd_dev->dev_id);
4172         ret = device_register(dev);
4173
4174         mutex_unlock(&ctl_mutex);
4175
4176         return ret;
4177 }
4178
4179 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4180 {
4181         device_unregister(&rbd_dev->dev);
4182 }
4183
4184 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4185
4186 /*
4187  * Get a unique rbd identifier for the given new rbd_dev, and add
4188  * the rbd_dev to the global list.  The minimum rbd id is 1.
4189  */
4190 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4191 {
4192         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4193
4194         spin_lock(&rbd_dev_list_lock);
4195         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4196         spin_unlock(&rbd_dev_list_lock);
4197         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4198                 (unsigned long long) rbd_dev->dev_id);
4199 }
4200
4201 /*
4202  * Remove an rbd_dev from the global list, and record that its
4203  * identifier is no longer in use.
4204  */
4205 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4206 {
4207         struct list_head *tmp;
4208         int rbd_id = rbd_dev->dev_id;
4209         int max_id;
4210
4211         rbd_assert(rbd_id > 0);
4212
4213         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4214                 (unsigned long long) rbd_dev->dev_id);
4215         spin_lock(&rbd_dev_list_lock);
4216         list_del_init(&rbd_dev->node);
4217
4218         /*
4219          * If the id being "put" is not the current maximum, there
4220          * is nothing special we need to do.
4221          */
4222         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4223                 spin_unlock(&rbd_dev_list_lock);
4224                 return;
4225         }
4226
4227         /*
4228          * We need to update the current maximum id.  Search the
4229          * list to find out what it is.  We're more likely to find
4230          * the maximum at the end, so search the list backward.
4231          */
4232         max_id = 0;
4233         list_for_each_prev(tmp, &rbd_dev_list) {
4234                 struct rbd_device *rbd_dev;
4235
4236                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4237                 if (rbd_dev->dev_id > max_id)
4238                         max_id = rbd_dev->dev_id;
4239         }
4240         spin_unlock(&rbd_dev_list_lock);
4241
4242         /*
4243          * The max id could have been updated by rbd_dev_id_get(), in
4244          * which case it now accurately reflects the new maximum.
4245          * Be careful not to overwrite the maximum value in that
4246          * case.
4247          */
4248         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4249         dout("  max dev id has been reset\n");
4250 }
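
/*
 * Worked example of the cmpxchg above: with devices 1 and 2 mapped,
 * removing 2 rescans the list and computes max_id == 1.  If a racing
 * rbd_dev_id_get() has already bumped rbd_dev_id_max to 3, the
 * compare against 2 fails and the new maximum 3 is left in place.
 */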
4251
4252 /*
4253  * Skips over white space at *buf, and updates *buf to point to the
4254  * first found non-space character (if any). Returns the length of
4255  * the token (string of non-white space characters) found.  Note
4256  * that *buf must be terminated with '\0'.
4257  */
4258 static inline size_t next_token(const char **buf)
4259 {
4260         /*
4261          * These are the characters that produce nonzero for
4262          * isspace() in the "C" and "POSIX" locales.
4263          */
4264         const char *spaces = " \f\n\r\t\v";
4265
4266         *buf += strspn(*buf, spaces);   /* Find start of token */
4267
4268         return strcspn(*buf, spaces);   /* Return token length */
4269 }
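
/*
 * Illustrative sketch of a hypothetical caller, showing how
 * next_token() advances *buf but leaves the token itself unconsumed:
 *
 *	const char *p = "  pool  image";
 *	size_t len;
 *
 *	len = next_token(&p);	// p -> "pool  image", len == 4
 *	p += len;		// caller steps past the token
 *	len = next_token(&p);	// p -> "image", len == 5
 */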
4270
4271 /*
4272  * Finds the next token in *buf, and if the provided token buffer is
4273  * big enough, copies the found token into it.  The result, if
4274  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4275  * must be terminated with '\0' on entry.
4276  *
4277  * Returns the length of the token found (not including the '\0').
4278  * Return value will be 0 if no token is found, and it will be >=
4279  * token_size if the token would not fit.
4280  *
4281  * The *buf pointer will be updated to point beyond the end of the
4282  * found token.  Note that this occurs even if the token buffer is
4283  * too small to hold it.
4284  */
4285 static inline size_t copy_token(const char **buf,
4286                                 char *token,
4287                                 size_t token_size)
4288 {
4289         size_t len;
4290
4291         len = next_token(buf);
4292         if (len < token_size) {
4293                 memcpy(token, *buf, len);
4294                 *(token + len) = '\0';
4295         }
4296         *buf += len;
4297
4298         return len;
4299 }
4300
4301 /*
4302  * Finds the next token in *buf, dynamically allocates a buffer big
4303  * enough to hold a copy of it, and copies the token into the new
4304  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4305  * that a duplicate buffer is created even for a zero-length token.
4306  *
4307  * Returns a pointer to the newly-allocated duplicate, or a null
4308  * pointer if memory for the duplicate was not available.  If
4309  * the lenp argument is a non-null pointer, the length of the token
4310  * (not including the '\0') is returned in *lenp.
4311  *
4312  * If successful, the *buf pointer will be updated to point beyond
4313  * the end of the found token.
4314  *
4315  * Note: uses GFP_KERNEL for allocation.
4316  */
4317 static inline char *dup_token(const char **buf, size_t *lenp)
4318 {
4319         char *dup;
4320         size_t len;
4321
4322         len = next_token(buf);
4323         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4324         if (!dup)
4325                 return NULL;
4326         *(dup + len) = '\0';
4327         *buf += len;
4328
4329         if (lenp)
4330                 *lenp = len;
4331
4332         return dup;
4333 }
4334
4335 /*
4336  * Parse the options provided for an "rbd add" (i.e., rbd image
4337  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4338  * and the data written is passed here via a NUL-terminated buffer.
4339  * Returns 0 if successful or an error code otherwise.
4340  *
4341  * The information extracted from these options is recorded in
4342  * the other parameters which return dynamically-allocated
4343  * structures:
4344  *  ceph_opts
4345  *      The address of a pointer that will refer to a ceph options
4346  *      structure.  Caller must release the returned pointer using
4347  *      ceph_destroy_options() when it is no longer needed.
4348  *  rbd_opts
4349  *      Address of an rbd options pointer.  Fully initialized by
4350  *      this function; caller must release with kfree().
4351  *  spec
4352  *      Address of an rbd image specification pointer.  Fully
4353  *      initialized by this function based on parsed options.
4354  *      Caller must release with rbd_spec_put().
4355  *
4356  * The options passed take this form:
4357  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4358  * where:
4359  *  <mon_addrs>
4360  *      A comma-separated list of one or more monitor addresses.
4361  *      A monitor address is an ip address, optionally followed
4362  *      by a port number (separated by a colon).
4363  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4364  *  <options>
4365  *      A comma-separated list of ceph and/or rbd options.
4366  *  <pool_name>
4367  *      The name of the rados pool containing the rbd image.
4368  *  <image_name>
4369  *      The name of the image in that pool to map.
4370  *  <snap_name>
4371  *      An optional snapshot name.  If provided, the mapping will
4372  *      present data from the image at the time that snapshot was
4373  *      created.  The image head is used if no snapshot name is
4374  *      provided.  Snapshot mappings are always read-only.
4375  */
4376 static int rbd_add_parse_args(const char *buf,
4377                                 struct ceph_options **ceph_opts,
4378                                 struct rbd_options **opts,
4379                                 struct rbd_spec **rbd_spec)
4380 {
4381         size_t len;
4382         char *options;
4383         const char *mon_addrs;
4384         char *snap_name;
4385         size_t mon_addrs_size;
4386         struct rbd_spec *spec = NULL;
4387         struct rbd_options *rbd_opts = NULL;
4388         struct ceph_options *copts;
4389         int ret;
4390
4391         /* The first four tokens are required */
4392
4393         len = next_token(&buf);
4394         if (!len) {
4395                 rbd_warn(NULL, "no monitor address(es) provided");
4396                 return -EINVAL;
4397         }
4398         mon_addrs = buf;
4399         mon_addrs_size = len + 1;
4400         buf += len;
4401
4402         ret = -EINVAL;
4403         options = dup_token(&buf, NULL);
4404         if (!options)
4405                 return -ENOMEM;
4406         if (!*options) {
4407                 rbd_warn(NULL, "no options provided");
4408                 goto out_err;
4409         }
4410
4411         spec = rbd_spec_alloc();
4412         if (!spec)
4413                 goto out_mem;
4414
4415         spec->pool_name = dup_token(&buf, NULL);
4416         if (!spec->pool_name)
4417                 goto out_mem;
4418         if (!*spec->pool_name) {
4419                 rbd_warn(NULL, "no pool name provided");
4420                 goto out_err;
4421         }
4422
4423         spec->image_name = dup_token(&buf, NULL);
4424         if (!spec->image_name)
4425                 goto out_mem;
4426         if (!*spec->image_name) {
4427                 rbd_warn(NULL, "no image name provided");
4428                 goto out_err;
4429         }
4430
4431         /*
4432          * Snapshot name is optional; default is to use "-"
4433          * (indicating the head/no snapshot).
4434          */
4435         len = next_token(&buf);
4436         if (!len) {
4437                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4438                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4439         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4440                 ret = -ENAMETOOLONG;
4441                 goto out_err;
4442         }
4443         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4444         if (!snap_name)
4445                 goto out_mem;
4446         *(snap_name + len) = '\0';
4447         spec->snap_name = snap_name;
4448
4449         /* Initialize all rbd options to the defaults */
4450
4451         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4452         if (!rbd_opts)
4453                 goto out_mem;
4454
4455         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4456
4457         copts = ceph_parse_options(options, mon_addrs,
4458                                         mon_addrs + mon_addrs_size - 1,
4459                                         parse_rbd_opts_token, rbd_opts);
4460         if (IS_ERR(copts)) {
4461                 ret = PTR_ERR(copts);
4462                 goto out_err;
4463         }
4464         kfree(options);
4465
4466         *ceph_opts = copts;
4467         *opts = rbd_opts;
4468         *rbd_spec = spec;
4469
4470         return 0;
4471 out_mem:
4472         ret = -ENOMEM;
4473 out_err:
4474         kfree(rbd_opts);
4475         rbd_spec_put(spec);
4476         kfree(options);
4477
4478         return ret;
4479 }
4480
4481 /*
4482  * An rbd format 2 image has a unique identifier, distinct from the
4483  * name given to it by the user.  Internally, that identifier is
4484  * what's used to specify the names of objects related to the image.
4485  *
4486  * A special "rbd id" object is used to map an rbd image name to its
4487  * id.  If that object doesn't exist, then there is no v2 rbd image
4488  * with the supplied name.
4489  *
4490  * This function will record the given rbd_dev's image_id field if
4491  * it can be determined, and in that case will return 0.  If any
4492  * errors occur a negative errno will be returned and the rbd_dev's
4493  * image_id field will be unchanged (and should be NULL).
4494  */
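     /*
      * For example, assuming RBD_ID_PREFIX is "rbd_id." (see
      * rbd_types.h), the id of an image named "foo" would be stored
      * in an object named "rbd_id.foo".
      */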
4495 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4496 {
4497         int ret;
4498         size_t size;
4499         char *object_name;
4500         void *response;
4501         char *image_id;
4502
4503         /*
4504          * When probing a parent image, the image id is already
4505          * known (and the image name likely is not).  There's no
4506          * need to fetch the image id again in this case.  We
4507          * do still need to set the image format though.
4508          */
4509         if (rbd_dev->spec->image_id) {
4510                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4511
4512                 return 0;
4513         }
4514
4515         /*
4516          * First, see if the format 2 image id file exists, and if
4517          * so, get the image's persistent id from it.
4518          */
4519         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4520         object_name = kmalloc(size, GFP_NOIO);
4521         if (!object_name)
4522                 return -ENOMEM;
4523         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4524         dout("rbd id object name is %s\n", object_name);
4525
4526         /* Response will be an encoded string, which includes a length */
4527
4528         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4529         response = kzalloc(size, GFP_NOIO);
4530         if (!response) {
4531                 ret = -ENOMEM;
4532                 goto out;
4533         }
4534
4535         /* If it doesn't exist we'll assume it's a format 1 image */
4536
4537         ret = rbd_obj_method_sync(rbd_dev, object_name,
4538                                 "rbd", "get_id", NULL, 0,
4539                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4540         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4541         if (ret == -ENOENT) {
4542                 image_id = kstrdup("", GFP_KERNEL);
4543                 ret = image_id ? 0 : -ENOMEM;
4544                 if (!ret)
4545                         rbd_dev->image_format = 1;
4546         } else if (ret > (int) sizeof (__le32)) {
4547                 void *p = response;
4548
4549                 image_id = ceph_extract_encoded_string(&p, p + ret,
4550                                                 NULL, GFP_NOIO);
4551                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4552                 if (!ret)
4553                         rbd_dev->image_format = 2;
4554         } else if (ret >= 0) {  /* too short; negative errors pass through */
4555                 ret = -EINVAL;
4556         }
4557
4558         if (!ret) {
4559                 rbd_dev->spec->image_id = image_id;
4560                 dout("image_id is %s\n", image_id);
4561         }
4562 out:
4563         kfree(response);
4564         kfree(object_name);
4565
4566         return ret;
4567 }
4568
4569 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4570 {
4571         int ret;
4572         size_t size;
4573
4574         /* Record the header object name for this rbd image. */
4575
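             /*
              * Format 1 header objects are named by appending
              * RBD_SUFFIX to the image name; assuming RBD_SUFFIX is
              * ".rbd" (see rbd_types.h), image "foo" gets header
              * object "foo.rbd".
              */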
4576         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4577         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4578         if (!rbd_dev->header_name) {
4579                 ret = -ENOMEM;
4580                 goto out_err;
4581         }
4582         sprintf(rbd_dev->header_name, "%s%s",
4583                 rbd_dev->spec->image_name, RBD_SUFFIX);
4584
4585         /* Populate rbd image metadata */
4586
4587         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4588         if (ret < 0)
4589                 goto out_err;
4590
4591         /* Version 1 images have no parent (no layering) */
4592
4593         rbd_dev->parent_spec = NULL;
4594         rbd_dev->parent_overlap = 0;
4595
4596         dout("discovered version 1 image, header name is %s\n",
4597                 rbd_dev->header_name);
4598
4599         return 0;
4600
4601 out_err:
4602         kfree(rbd_dev->header_name);
4603         rbd_dev->header_name = NULL;
4604         kfree(rbd_dev->spec->image_id);
4605         rbd_dev->spec->image_id = NULL;
4606
4607         return ret;
4608 }
4609
4610 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4611 {
4612         size_t size;
4613         int ret;
4614         u64 ver = 0;
4615
4616         /*
4617          * Image id was filled in by the caller.  Record the header
4618          * object name for this rbd image.
4619          */
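             /*
              * Assuming RBD_HEADER_PREFIX is "rbd_header." (see
              * rbd_types.h), an image with id "abc123" gets header
              * object "rbd_header.abc123".
              */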
4620         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4621         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4622         if (!rbd_dev->header_name)
4623                 return -ENOMEM;
4624         sprintf(rbd_dev->header_name, "%s%s",
4625                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4626
4627         /* Get the size and object order for the image */
4628         ret = rbd_dev_v2_image_size(rbd_dev);
4629         if (ret)
4630                 goto out_err;
4631
4632         /* Get the object prefix (a.k.a. block_name) for the image */
4633
4634         ret = rbd_dev_v2_object_prefix(rbd_dev);
4635         if (ret)
4636                 goto out_err;
4637
4638         /* Get and check the features for the image */
4639
4640         ret = rbd_dev_v2_features(rbd_dev);
4641         if (ret)
4642                 goto out_err;
4643
4644         /* If the image supports layering, get the parent info */
4645
4646         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4647                 ret = rbd_dev_v2_parent_info(rbd_dev);
4648                 if (ret)
4649                         goto out_err;
4650                 rbd_warn(rbd_dev, "WARNING: kernel support for layered rbd images is EXPERIMENTAL!");
4652         }
4653
4654         /* If the image supports fancy striping, get its parameters */
4655
4656         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4657                 ret = rbd_dev_v2_striping_info(rbd_dev);
4658                 if (ret < 0)
4659                         goto out_err;
4660         }
4661
4662         /* crypto and compression type aren't (yet) supported for v2 images */
4663
4664         rbd_dev->header.crypt_type = 0;
4665         rbd_dev->header.comp_type = 0;
4666
4667         /* Get the snapshot context, plus the header version */
4668
4669         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4670         if (ret)
4671                 goto out_err;
4672         rbd_dev->header.obj_version = ver;
4673
4674         dout("discovered version 2 image, header name is %s\n",
4675                 rbd_dev->header_name);
4676
4677         return 0;
4678 out_err:
4679         rbd_dev->parent_overlap = 0;
4680         rbd_spec_put(rbd_dev->parent_spec);
4681         rbd_dev->parent_spec = NULL;
4682         kfree(rbd_dev->header_name);
4683         rbd_dev->header_name = NULL;
4684         kfree(rbd_dev->header.object_prefix);
4685         rbd_dev->header.object_prefix = NULL;
4686
4687         return ret;
4688 }
4689
4690 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4691 {
4692         struct rbd_device *parent = NULL;
4693         struct rbd_spec *parent_spec = NULL;
4694         struct rbd_client *rbdc = NULL;
4695         int ret;
4696
4697         /* no need to lock here, as rbd_dev is not registered yet */
4698         ret = rbd_dev_snaps_update(rbd_dev);
4699         if (ret)
4700                 return ret;
4701
4702         ret = rbd_dev_probe_update_spec(rbd_dev);
4703         if (ret)
4704                 goto err_out_snaps;
4705
4706         ret = rbd_dev_set_mapping(rbd_dev);
4707         if (ret)
4708                 goto err_out_snaps;
4709
4710         /* generate unique id: find highest unique id, add one */
4711         rbd_dev_id_get(rbd_dev);
4712
4713         /* Fill in the device name, now that we have its id. */
4714         BUILD_BUG_ON(DEV_NAME_LEN
4715                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4716         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4717
4718         /* Get our block major device number. */
4719
4720         ret = register_blkdev(0, rbd_dev->name);
4721         if (ret < 0)
4722                 goto err_out_id;
4723         rbd_dev->major = ret;
4724
4725         /* Set up the blkdev mapping. */
4726
4727         ret = rbd_init_disk(rbd_dev);
4728         if (ret)
4729                 goto err_out_blkdev;
4730
4731         ret = rbd_bus_add_dev(rbd_dev);
4732         if (ret)
4733                 goto err_out_disk;
4734
4735         /*
4736          * At this point cleanup in the event of an error is the job
4737          * of the sysfs code (initiated by rbd_bus_del_dev()).
4738          */
4739         /* Probe the parent if there is one */
4740
4741         if (rbd_dev->parent_spec) {
4742                 /*
4743                  * We need to pass a reference to the client and the
4744                  * parent spec when creating the parent rbd_dev.
4745                  * Images related by parent/child relationships
4746                  * always share both.
4747                  */
4748                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4749                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4750
4751                 parent = rbd_dev_create(rbdc, parent_spec);
4752                 if (!parent) {
4753                         ret = -ENOMEM;
4754                         goto err_out_spec;
4755                 }
4756                 rbdc = NULL;            /* parent now owns reference */
4757                 parent_spec = NULL;     /* parent now owns reference */
4758                 ret = rbd_dev_probe(parent);
4759                 if (ret < 0)
4760                         goto err_out_parent;
4761                 rbd_dev->parent = parent;
4762         }
4763
4764         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4765         if (ret)
4766                 goto err_out_bus;
4767
4768         /* Everything's ready.  Announce the disk to the world. */
4769
4770         add_disk(rbd_dev->disk);
4771
4772         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4773                 (unsigned long long) rbd_dev->mapping.size);
4774
4775         return ret;
4776
4777 err_out_parent:
4778         rbd_dev_destroy(parent);
4779 err_out_spec:
4780         rbd_spec_put(parent_spec);
4781         rbd_put_client(rbdc);
4782 err_out_bus:
4783         /* rbd_bus_del_dev() also cleans up the rest of the rbd_dev state */
4784
4785         rbd_bus_del_dev(rbd_dev);
4786
4787         return ret;
4788 err_out_disk:
4789         rbd_free_disk(rbd_dev);
4790 err_out_blkdev:
4791         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4792 err_out_id:
4793         rbd_dev_id_put(rbd_dev);
4794 err_out_snaps:
4795         rbd_remove_all_snaps(rbd_dev);
4796
4797         return ret;
4798 }
4799
4800 /*
4801  * Probe for the existence of the header object for the given rbd
4802  * device.  For format 2 images this includes determining the image
4803  * id.
4804  */
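     /*
      * Note that for a layered format 2 image this function is
      * entered recursively: rbd_dev_probe_finish() below creates and
      * probes the parent device, which may itself have a parent.
      */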
4805 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4806 {
4807         int ret;
4808
4809         /*
4810          * Get the id from the image id object.  If it's not a
4811          * format 2 image, we'll get ENOENT back, and we'll assume
4812          * it's a format 1 image.
4813          */
4814         ret = rbd_dev_image_id(rbd_dev);
4815         if (ret)
4816                 return ret;
4817         rbd_assert(rbd_dev->spec->image_id);
4818         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4819
4820         if (rbd_dev->image_format == 1)
4821                 ret = rbd_dev_v1_probe(rbd_dev);
4822         else
4823                 ret = rbd_dev_v2_probe(rbd_dev);
4824         if (ret)
4825                 goto out_err;
4826
4827         ret = rbd_dev_probe_finish(rbd_dev);
4828         if (ret)
4829                 rbd_header_free(&rbd_dev->header);
4830
4831         return ret;
4832 out_err:
4833         kfree(rbd_dev->spec->image_id);
4834         rbd_dev->spec->image_id = NULL;
4835
4836         dout("probe failed, returning %d\n", ret);
4837
4838         return ret;
4839 }
4840
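     /*
      * Handle a write to /sys/bus/rbd/add.  A hypothetical
      * invocation from user space (values made up) might be:
      *
      *      $ echo "1.2.3.4:6789 name=admin rbd foo" > /sys/bus/rbd/add
      *
      * On success the number of bytes consumed (count) is returned,
      * as sysfs store methods require.
      */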
4841 static ssize_t rbd_add(struct bus_type *bus,
4842                        const char *buf,
4843                        size_t count)
4844 {
4845         struct rbd_device *rbd_dev = NULL;
4846         struct ceph_options *ceph_opts = NULL;
4847         struct rbd_options *rbd_opts = NULL;
4848         struct rbd_spec *spec = NULL;
4849         struct rbd_client *rbdc;
4850         struct ceph_osd_client *osdc;
4851         int rc = -ENOMEM;
4852
4853         if (!try_module_get(THIS_MODULE))
4854                 return -ENODEV;
4855
4856         /* parse add command */
4857         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4858         if (rc < 0)
4859                 goto err_out_module;
4860
4861         rbdc = rbd_get_client(ceph_opts);
4862         if (IS_ERR(rbdc)) {
4863                 rc = PTR_ERR(rbdc);
4864                 goto err_out_args;
4865         }
4866         ceph_opts = NULL;       /* rbd_dev client now owns this */
4867
4868         /* pick the pool */
4869         osdc = &rbdc->client->osdc;
4870         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4871         if (rc < 0)
4872                 goto err_out_client;
4873         spec->pool_id = (u64)rc;
4874
4875         /* The ceph file layout needs to fit pool id in 32 bits */
4876
4877         if (spec->pool_id > (u64)U32_MAX) {
4878                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4879                                 (unsigned long long)spec->pool_id, U32_MAX);
4880                 rc = -EIO;
4881                 goto err_out_client;
4882         }
4883
4884         rbd_dev = rbd_dev_create(rbdc, spec);
4885         if (!rbd_dev)
4886                 goto err_out_client;
4887         rbdc = NULL;            /* rbd_dev now owns this */
4888         spec = NULL;            /* rbd_dev now owns this */
4889
4890         rbd_dev->mapping.read_only = rbd_opts->read_only;
4891         kfree(rbd_opts);
4892         rbd_opts = NULL;        /* done with this */
4893
4894         rc = rbd_dev_probe(rbd_dev);
4895         if (rc < 0)
4896                 goto err_out_rbd_dev;
4897
4898         return count;
4899 err_out_rbd_dev:
4900         rbd_dev_destroy(rbd_dev);
4901 err_out_client:
4902         rbd_put_client(rbdc);
4903 err_out_args:
4904         if (ceph_opts)
4905                 ceph_destroy_options(ceph_opts);
4906         kfree(rbd_opts);
4907         rbd_spec_put(spec);
4908 err_out_module:
4909         module_put(THIS_MODULE);
4910
4911         dout("Error adding device %s\n", buf);
4912
4913         return (ssize_t)rc;
4914 }
4915
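     /*
      * Look up an rbd device by its id.  The list lock protects only
      * the walk itself; callers must ensure the device can't be
      * released while they use the result (rbd_remove() does so by
      * holding ctl_mutex across the lookup and removal).
      */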
4916 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4917 {
4918         struct list_head *tmp;
4919         struct rbd_device *rbd_dev;
4920
4921         spin_lock(&rbd_dev_list_lock);
4922         list_for_each(tmp, &rbd_dev_list) {
4923                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4924                 if (rbd_dev->dev_id == dev_id) {
4925                         spin_unlock(&rbd_dev_list_lock);
4926                         return rbd_dev;
4927                 }
4928         }
4929         spin_unlock(&rbd_dev_list_lock);
4930         return NULL;
4931 }
4932
4933 static void rbd_dev_release(struct device *dev)
4934 {
4935         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4936
4937         if (rbd_dev->watch_event)
4938                 rbd_dev_header_watch_sync(rbd_dev, 0);
4939
4940         /* clean up and free blkdev */
4941         rbd_free_disk(rbd_dev);
4942         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4943
4944         /* release allocated disk header fields */
4945         rbd_header_free(&rbd_dev->header);
4946
4947         /* done with the id, and with the rbd_dev */
4948         rbd_dev_id_put(rbd_dev);
4949         rbd_assert(rbd_dev->rbd_client != NULL);
4950         rbd_dev_destroy(rbd_dev);
4951
4952         /* release module ref */
4953         module_put(THIS_MODULE);
4954 }
4955
4956 static void __rbd_remove(struct rbd_device *rbd_dev)
4957 {
4958         rbd_remove_all_snaps(rbd_dev);
4959         rbd_bus_del_dev(rbd_dev);
4960 }
4961
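     /*
      * Handle a write to /sys/bus/rbd/remove.  The buffer holds the
      * id of the device to remove, so removing /dev/rbd2 would
      * (hypothetically) be:
      *
      *      $ echo 2 > /sys/bus/rbd/remove
      */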
4962 static ssize_t rbd_remove(struct bus_type *bus,
4963                           const char *buf,
4964                           size_t count)
4965 {
4966         struct rbd_device *rbd_dev = NULL;
4967         int target_id, rc;
4968         unsigned long ul;
4969         int ret = count;
4970
4971         rc = strict_strtoul(buf, 10, &ul);
4972         if (rc)
4973                 return rc;
4974
4975         /* convert to int; abort if we lost anything in the conversion */
4976         target_id = (int) ul;
4977         if (target_id != ul)
4978                 return -EINVAL;
4979
4980         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4981
4982         rbd_dev = __rbd_get_dev(target_id);
4983         if (!rbd_dev) {
4984                 ret = -ENOENT;
4985                 goto done;
4986         }
4987
4988         spin_lock_irq(&rbd_dev->lock);
4989         if (rbd_dev->open_count)
4990                 ret = -EBUSY;
4991         else
4992                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4993         spin_unlock_irq(&rbd_dev->lock);
4994         if (ret < 0)
4995                 goto done;
4996
4997         while (rbd_dev->parent_spec) {
4998                 struct rbd_device *first = rbd_dev;
4999                 struct rbd_device *second = first->parent;
5000                 struct rbd_device *third;
5001
5002                 /*
5003                  * Follow to the parent with no grandparent and
5004                  * remove it.
5005                  */
5006                 while (second && (third = second->parent)) {
5007                         first = second;
5008                         second = third;
5009                 }
5010                 __rbd_remove(second);
5011                 rbd_spec_put(first->parent_spec);
5012                 first->parent_spec = NULL;
5013                 first->parent_overlap = 0;
5014                 first->parent = NULL;
5015         }
5016         __rbd_remove(rbd_dev);
5017
5018 done:
5019         mutex_unlock(&ctl_mutex);
5020
5021         return ret;
5022 }
5023
5024 /*
5025  * create control files in sysfs
5026  * /sys/bus/rbd/...
5027  */
5028 static int rbd_sysfs_init(void)
5029 {
5030         int ret;
5031
5032         ret = device_register(&rbd_root_dev);
5033         if (ret < 0)
5034                 return ret;
5035
5036         ret = bus_register(&rbd_bus_type);
5037         if (ret < 0)
5038                 device_unregister(&rbd_root_dev);
5039
5040         return ret;
5041 }
5042
5043 static void rbd_sysfs_cleanup(void)
5044 {
5045         bus_unregister(&rbd_bus_type);
5046         device_unregister(&rbd_root_dev);
5047 }
5048
5049 static int __init rbd_init(void)
5050 {
5051         int rc;
5052
5053         if (!libceph_compatible(NULL)) {
5054                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5055
5056                 return -EINVAL;
5057         }
5058         rc = rbd_sysfs_init();
5059         if (rc)
5060                 return rc;
5061         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5062         return 0;
5063 }
5064
5065 static void __exit rbd_exit(void)
5066 {
5067         rbd_sysfs_cleanup();
5068 }
5069
5070 module_init(rbd_init);
5071 module_exit(rbd_exit);
5072
5073 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5074 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5075 MODULE_DESCRIPTION("rados block device");
5076
5077 /* following authorship retained from original osdblk.c */
5078 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5079
5080 MODULE_LICENSE("GPL");