rbd: implement full object parent reads
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (0)
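
/*
 * Sketch of the intended check (illustration only; the real test
 * lives in the image probe path): an image advertising a feature
 * bit this client does not implement cannot be mapped, roughly:
 *
 *      if (features & ~RBD_FEATURES_SUPPORTED)
 *              return -ENXIO;
 */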

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
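
/*
 * Worked example of the formula above (illustration): with 4-byte
 * ints, MAX_INT_FORMAT_WIDTH is (5 * 4) / 2 + 1 = 11 characters,
 * enough for "-2147483648".  The 5/2 factor over-approximates
 * log10(256), about 2.41 decimal digits per byte.
 */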

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        char            *pool_name;

        char            *image_id;
        char            *image_name;

        u64             snap_id;
        char            *snap_name;

        struct kref     kref;
};
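
/*
 * Example with hypothetical values: a mapping of the head of image
 * "foo" in pool "rbd" might have pool_id 2 and pool_name "rbd",
 * image_id "1f8e2c5a" and image_name "foo", and snap_id CEPH_NOSNAP
 * with snap_name "-" (RBD_SNAP_HEAD_NAME).  A clone's rbd_dev would
 * point at its parent's rbd_spec (via parent_spec) rather than
 * duplicate these fields.
 */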

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
        struct  device          dev;
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
/* Wrapped in do/while (0) so rbd_assert() is safe inside if/else */
#define rbd_assert(expr)                                                \
                do {                                                    \
                        if (unlikely(!(expr))) {                        \
                                printk(KERN_ERR "\nAssertion failure in %s() " \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                                BUG();                                  \
                        }                                               \
                } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client)) {
                /* Propagate the real error rather than -ENOMEM */
                ret = PTR_ERR(rbdc->client);
                goto out_mutex;
        }
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
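
/*
 * Illustration (values are hypothetical): the options handled above
 * arrive as the second, comma-separated field of the string written
 * to the add file, e.g.:
 *
 *      echo "1.2.3.4:6789 name=admin,read_only rbd foo -" \
 *              > /sys/bus/rbd/add
 *
 * libceph consumes the tokens it knows (here "name=admin") and hands
 * the rest (here "read_only") to parse_rbd_opts_token() one by one.
 */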

/*
 * Get a ceph client with specific addr and configuration; create
 * one if it does not already exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must not hold rbd_client_list_lock; it is taken here.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak the prefix copied above */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                WARN_ON(ondisk->snap_names_len);
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);
        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (header->snapc->snaps[0]);
        header->snapc = kzalloc(size, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;

        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] =
                        le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!strcmp(snap_name, snap->name)) {
                        rbd_dev->spec->snap_id = snap->id;
                        rbd_dev->mapping.size = snap->size;
                        rbd_dev->mapping.features = snap->features;

                        return 0;
                }
        }

        return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
        int ret;

        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->spec->snap_id = CEPH_NOSNAP;
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
                ret = 0;
        } else {
                ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (ret < 0)
                        goto done;
                rbd_dev->mapping.read_only = true;
        }
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
        return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
        header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
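
/*
 * Worked example of the segment arithmetic (hypothetical values):
 * with obj_order 22 (4 MiB objects), image offset 0x12345678 lands
 * in segment 0x12345678 >> 22 = 0x48, i.e. object
 * "<object_prefix>.000000000048", at in-object offset 0x345678.
 * A 0x100000-byte request starting there would be clamped by
 * rbd_segment_length() to 0x400000 - 0x345678 = 0xba988 bytes,
 * the remainder of that object.
 */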

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}
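
/*
 * For example, the common default obj_order of 22 gives
 * 1 << 22 = 4 MiB objects.
 */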

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
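
/*
 * Usage sketch (hypothetical caller, for illustration): because both
 * bio_src and offset are in-out, carving an image request into
 * per-object pieces is just a loop:
 *
 *      struct bio *bio = rq->bio;
 *      unsigned int bio_offset = 0;
 *
 *      while (resid) {
 *              u64 len = rbd_segment_length(rbd_dev, img_offset, resid);
 *              struct bio *clone;
 *
 *              clone = bio_chain_clone_range(&bio, &bio_offset,
 *                                      (unsigned int) len, GFP_NOIO);
 *              if (!clone)
 *                      break;          error: ENOMEM or chain ran out
 *              img_offset += len;
 *              resid -= len;
 *      }
 */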

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
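
/*
 * Sketch of how the two flags combine (illustration only): "exists"
 * is meaningful only once "known" is set:
 *
 *      if (!obj_request_known_test(obj_request))
 *              existence undetermined -- issue a STAT first
 *      else if (!obj_request_exists_test(obj_request))
 *              known absent -- e.g. satisfy a read from the parent
 *      else
 *              known present -- operate on the object directly
 */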

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
        } else {
                img_request = NULL;
                layered = false;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * the transfer count to our originally-requested length.
1467          */
1468         obj_request->xferred = obj_request->length;
1469         obj_request_done_set(obj_request);
1470 }
1471
1472 /*
1473  * For a simple stat call there's nothing to do.  We'll do more if
1474  * this is part of a write sequence for a layered image.
1475  */
1476 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1477 {
1478         dout("%s: obj %p\n", __func__, obj_request);
1479         obj_request_done_set(obj_request);
1480 }
1481
1482 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1483                                 struct ceph_msg *msg)
1484 {
1485         struct rbd_obj_request *obj_request = osd_req->r_priv;
1486         u16 opcode;
1487
1488         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1489         rbd_assert(osd_req == obj_request->osd_req);
1490         if (obj_request_img_data_test(obj_request)) {
1491                 rbd_assert(obj_request->img_request);
1492                 rbd_assert(obj_request->which != BAD_WHICH);
1493         } else {
1494                 rbd_assert(obj_request->which == BAD_WHICH);
1495         }
1496
1497         if (osd_req->r_result < 0)
1498                 obj_request->result = osd_req->r_result;
1499         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1500
1501         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1502
1503         /*
1504          * We support a 64-bit length, but ultimately it has to be
1505          * passed to blk_end_request(), which takes an unsigned int.
1506          */
1507         obj_request->xferred = osd_req->r_reply_op_len[0];
1508         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1509         opcode = osd_req->r_ops[0].op;
1510         switch (opcode) {
1511         case CEPH_OSD_OP_READ:
1512                 rbd_osd_read_callback(obj_request);
1513                 break;
1514         case CEPH_OSD_OP_WRITE:
1515                 rbd_osd_write_callback(obj_request);
1516                 break;
1517         case CEPH_OSD_OP_STAT:
1518                 rbd_osd_stat_callback(obj_request);
1519                 break;
1520         case CEPH_OSD_OP_CALL:
1521         case CEPH_OSD_OP_NOTIFY_ACK:
1522         case CEPH_OSD_OP_WATCH:
1523                 rbd_osd_trivial_callback(obj_request);
1524                 break;
1525         default:
1526                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1527                         obj_request->object_name, (unsigned short) opcode);
1528                 break;
1529         }
1530
1531         if (obj_request_done_test(obj_request))
1532                 rbd_obj_request_complete(obj_request);
1533 }
1534
1535 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1536 {
1537         struct rbd_img_request *img_request = obj_request->img_request;
1538         struct ceph_osd_request *osd_req = obj_request->osd_req;
1539         u64 snap_id;
1540
1541         rbd_assert(osd_req != NULL);
1542
1543         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1544         ceph_osdc_build_request(osd_req, obj_request->offset,
1545                         NULL, snap_id, NULL);
1546 }
1547
1548 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1549 {
1550         struct rbd_img_request *img_request = obj_request->img_request;
1551         struct ceph_osd_request *osd_req = obj_request->osd_req;
1552         struct ceph_snap_context *snapc;
1553         struct timespec mtime = CURRENT_TIME;
1554
1555         rbd_assert(osd_req != NULL);
1556
1557         snapc = img_request ? img_request->snapc : NULL;
1558         ceph_osdc_build_request(osd_req, obj_request->offset,
1559                         snapc, CEPH_NOSNAP, &mtime);
1560 }
1561
1562 static struct ceph_osd_request *rbd_osd_req_create(
1563                                         struct rbd_device *rbd_dev,
1564                                         bool write_request,
1565                                         struct rbd_obj_request *obj_request)
1566 {
1567         struct ceph_snap_context *snapc = NULL;
1568         struct ceph_osd_client *osdc;
1569         struct ceph_osd_request *osd_req;
1570
1571         if (obj_request_img_data_test(obj_request)) {
1572                 struct rbd_img_request *img_request = obj_request->img_request;
1573
1574                 rbd_assert(write_request ==
1575                                 img_request_write_test(img_request));
1576                 if (write_request)
1577                         snapc = img_request->snapc;
1578         }
1579
1580         /* Allocate and initialize the request, for the single op */
1581
1582         osdc = &rbd_dev->rbd_client->client->osdc;
1583         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1584         if (!osd_req)
1585                 return NULL;    /* ENOMEM */
1586
1587         if (write_request)
1588                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1589         else
1590                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1591
1592         osd_req->r_callback = rbd_osd_req_callback;
1593         osd_req->r_priv = obj_request;
1594
1595         osd_req->r_oid_len = strlen(obj_request->object_name);
1596         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1597         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1598
1599         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1600
1601         return osd_req;
1602 }
1603
1604 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1605 {
1606         ceph_osdc_put_request(osd_req);
1607 }
1608
1609 /* object_name is assumed to be a non-null pointer and NUL-terminated */
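/*
 * The name is copied into storage that trails the request structure
 * itself (note the kzalloc() size below), so the whole thing is
 * released by the single kfree() in rbd_obj_request_destroy().
 */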
1610
1611 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1612                                                 u64 offset, u64 length,
1613                                                 enum obj_request_type type)
1614 {
1615         struct rbd_obj_request *obj_request;
1616         size_t size;
1617         char *name;
1618
1619         rbd_assert(obj_request_type_valid(type));
1620
1621         size = strlen(object_name) + 1;
1622         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1623         if (!obj_request)
1624                 return NULL;
1625
1626         name = (char *)(obj_request + 1);
1627         obj_request->object_name = memcpy(name, object_name, size);
1628         obj_request->offset = offset;
1629         obj_request->length = length;
1630         obj_request->flags = 0;
1631         obj_request->which = BAD_WHICH;
1632         obj_request->type = type;
1633         INIT_LIST_HEAD(&obj_request->links);
1634         init_completion(&obj_request->completion);
1635         kref_init(&obj_request->kref);
1636
1637         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1638                 offset, length, (int)type, obj_request);
1639
1640         return obj_request;
1641 }
1642
1643 static void rbd_obj_request_destroy(struct kref *kref)
1644 {
1645         struct rbd_obj_request *obj_request;
1646
1647         obj_request = container_of(kref, struct rbd_obj_request, kref);
1648
1649         dout("%s: obj %p\n", __func__, obj_request);
1650
1651         rbd_assert(obj_request->img_request == NULL);
1652         rbd_assert(obj_request->which == BAD_WHICH);
1653
1654         if (obj_request->osd_req)
1655                 rbd_osd_req_destroy(obj_request->osd_req);
1656
1657         rbd_assert(obj_request_type_valid(obj_request->type));
1658         switch (obj_request->type) {
1659         case OBJ_REQUEST_NODATA:
1660                 break;          /* Nothing to do */
1661         case OBJ_REQUEST_BIO:
1662                 if (obj_request->bio_list)
1663                         bio_chain_put(obj_request->bio_list);
1664                 break;
1665         case OBJ_REQUEST_PAGES:
1666                 if (obj_request->pages)
1667                         ceph_release_page_vector(obj_request->pages,
1668                                                 obj_request->page_count);
1669                 break;
1670         }
1671
1672         kfree(obj_request);
1673 }
1674
1675 /*
1676  * Caller is responsible for filling in the list of object requests
1677  * that comprises the image request, and the Linux request pointer
1678  * (if there is one).
1679  */
1680 static struct rbd_img_request *rbd_img_request_create(
1681                                         struct rbd_device *rbd_dev,
1682                                         u64 offset, u64 length,
1683                                         bool write_request,
1684                                         bool child_request)
1685 {
1686         struct rbd_img_request *img_request;
1687         struct ceph_snap_context *snapc = NULL;
1688
1689         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1690         if (!img_request)
1691                 return NULL;
1692
1693         if (write_request) {
1694                 down_read(&rbd_dev->header_rwsem);
1695                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1696                 up_read(&rbd_dev->header_rwsem);
1697                 if (WARN_ON(!snapc)) {
1698                         kfree(img_request);
1699                         return NULL;    /* Shouldn't happen */
1700                 }
1701
1702         }
1703
1704         img_request->rq = NULL;
1705         img_request->rbd_dev = rbd_dev;
1706         img_request->offset = offset;
1707         img_request->length = length;
1708         img_request->flags = 0;
1709         if (write_request) {
1710                 img_request_write_set(img_request);
1711                 img_request->snapc = snapc;
1712         } else {
1713                 img_request->snap_id = rbd_dev->spec->snap_id;
1714         }
1715         if (child_request)
1716                 img_request_child_set(img_request);
1717         if (rbd_dev->parent_spec)
1718                 img_request_layered_set(img_request);
1719         spin_lock_init(&img_request->completion_lock);
1720         img_request->next_completion = 0;
1721         img_request->callback = NULL;
1722         img_request->result = 0;
1723         img_request->obj_request_count = 0;
1724         INIT_LIST_HEAD(&img_request->obj_requests);
1725         kref_init(&img_request->kref);
1726
1727         rbd_img_request_get(img_request);       /* Avoid a warning */
1728         rbd_img_request_put(img_request);       /* TEMPORARY */
1729
1730         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1731                 write_request ? "write" : "read", offset, length,
1732                 img_request);
1733
1734         return img_request;
1735 }
1736
1737 static void rbd_img_request_destroy(struct kref *kref)
1738 {
1739         struct rbd_img_request *img_request;
1740         struct rbd_obj_request *obj_request;
1741         struct rbd_obj_request *next_obj_request;
1742
1743         img_request = container_of(kref, struct rbd_img_request, kref);
1744
1745         dout("%s: img %p\n", __func__, img_request);
1746
1747         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1748                 rbd_img_obj_request_del(img_request, obj_request);
1749         rbd_assert(img_request->obj_request_count == 0);
1750
1751         if (img_request_write_test(img_request))
1752                 ceph_put_snap_context(img_request->snapc);
1753
1754         if (img_request_child_test(img_request))
1755                 rbd_obj_request_put(img_request->obj_request);
1756
1757         kfree(img_request);
1758 }
1759
1760 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1761 {
1762         struct rbd_img_request *img_request;
1763         unsigned int xferred;
1764         int result;
1765         bool more;
1766
1767         rbd_assert(obj_request_img_data_test(obj_request));
1768         img_request = obj_request->img_request;
1769
1770         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1771         xferred = (unsigned int)obj_request->xferred;
1772         result = obj_request->result;
1773         if (result) {
1774                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1775
1776                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1777                         img_request_write_test(img_request) ? "write" : "read",
1778                         obj_request->length, obj_request->img_offset,
1779                         obj_request->offset);
1780                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1781                         result, xferred);
1782                 if (!img_request->result)
1783                         img_request->result = result;
1784         }
1785
1786         /* Image object requests don't own their page array */
1787
1788         if (obj_request->type == OBJ_REQUEST_PAGES) {
1789                 obj_request->pages = NULL;
1790                 obj_request->page_count = 0;
1791         }
1792
1793         if (img_request_child_test(img_request)) {
1794                 rbd_assert(img_request->obj_request != NULL);
1795                 more = obj_request->which < img_request->obj_request_count - 1;
1796         } else {
1797                 rbd_assert(img_request->rq != NULL);
1798                 more = blk_end_request(img_request->rq, result, xferred);
1799         }
1800
1801         return more;
1802 }
1803
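/*
 * Object requests may complete in any order, but the block layer must
 * see the image request's bytes completed in order.  Completion of
 * request "which" therefore only reports the contiguous run of done
 * requests beginning at next_completion, under the completion lock.
 */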
1804 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1805 {
1806         struct rbd_img_request *img_request;
1807         u32 which = obj_request->which;
1808         bool more = true;
1809
1810         rbd_assert(obj_request_img_data_test(obj_request));
1811         img_request = obj_request->img_request;
1812
1813         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1814         rbd_assert(img_request != NULL);
1815         rbd_assert(img_request->obj_request_count > 0);
1816         rbd_assert(which != BAD_WHICH);
1817         rbd_assert(which < img_request->obj_request_count);
1818         rbd_assert(which >= img_request->next_completion);
1819
1820         spin_lock_irq(&img_request->completion_lock);
1821         if (which != img_request->next_completion)
1822                 goto out;
1823
1824         for_each_obj_request_from(img_request, obj_request) {
1825                 rbd_assert(more);
1826                 rbd_assert(which < img_request->obj_request_count);
1827
1828                 if (!obj_request_done_test(obj_request))
1829                         break;
1830                 more = rbd_img_obj_end_request(obj_request);
1831                 which++;
1832         }
1833
1834         rbd_assert(more ^ (which == img_request->obj_request_count));
1835         img_request->next_completion = which;
1836 out:
1837         spin_unlock_irq(&img_request->completion_lock);
1838
1839         if (!more)
1840                 rbd_img_request_complete(img_request);
1841 }
1842
1843 /*
1844  * Split up an image request into one or more object requests, each
1845  * to a different object.  The "type" parameter indicates whether
1846  * "data_desc" is the pointer to the head of a list of bio
1847  * structures, or the base of a page array.  In either case this
1848  * function assumes data_desc describes memory sufficient to hold
1849  * all data described by the image request.
1850  */
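/*
 * For illustration, assuming the default object size (obj_order 22,
 * i.e. 4 MiB objects): a 6 MiB request at image offset 3 MiB becomes
 * three object requests -- 1 MiB at offset 3 MiB within the first
 * object, all 4 MiB of the second, and 1 MiB at offset 0 of the
 * third.
 */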
1851 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1852                                         enum obj_request_type type,
1853                                         void *data_desc)
1854 {
1855         struct rbd_device *rbd_dev = img_request->rbd_dev;
1856         struct rbd_obj_request *obj_request = NULL;
1857         struct rbd_obj_request *next_obj_request;
1858         bool write_request = img_request_write_test(img_request);
1859         struct bio *bio_list;
1860         unsigned int bio_offset = 0;
1861         struct page **pages;
1862         u64 img_offset;
1863         u64 resid;
1864         u16 opcode;
1865
1866         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1867                 (int)type, data_desc);
1868
1869         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1870         img_offset = img_request->offset;
1871         resid = img_request->length;
1872         rbd_assert(resid > 0);
1873
1874         if (type == OBJ_REQUEST_BIO) {
1875                 bio_list = data_desc;
1876                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1877         } else {
1878                 rbd_assert(type == OBJ_REQUEST_PAGES);
1879                 pages = data_desc;
1880         }
1881
1882         while (resid) {
1883                 struct ceph_osd_request *osd_req;
1884                 const char *object_name;
1885                 u64 offset;
1886                 u64 length;
1887
1888                 object_name = rbd_segment_name(rbd_dev, img_offset);
1889                 if (!object_name)
1890                         goto out_unwind;
1891                 offset = rbd_segment_offset(rbd_dev, img_offset);
1892                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1893                 obj_request = rbd_obj_request_create(object_name,
1894                                                 offset, length, type);
1895                 kfree(object_name);     /* object request has its own copy */
1896                 if (!obj_request)
1897                         goto out_unwind;
1898
1899                 if (type == OBJ_REQUEST_BIO) {
1900                         unsigned int clone_size;
1901
1902                         rbd_assert(length <= (u64)UINT_MAX);
1903                         clone_size = (unsigned int)length;
1904                         obj_request->bio_list =
1905                                         bio_chain_clone_range(&bio_list,
1906                                                                 &bio_offset,
1907                                                                 clone_size,
1908                                                                 GFP_ATOMIC);
1909                         if (!obj_request->bio_list)
1910                                 goto out_partial;
1911                 } else {
1912                         unsigned int page_count;
1913
1914                         obj_request->pages = pages;
1915                         page_count = (u32)calc_pages_for(offset, length);
1916                         obj_request->page_count = page_count;
1917                         if ((offset + length) & ~PAGE_MASK)
1918                                 page_count--;   /* last page shared with next */
1919                         pages += page_count;
1920                 }
1921
1922                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1923                                                 obj_request);
1924                 if (!osd_req)
1925                         goto out_partial;
1926                 obj_request->osd_req = osd_req;
1927                 obj_request->callback = rbd_img_obj_callback;
1928
1929                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1930                                                 0, 0);
1931                 if (type == OBJ_REQUEST_BIO)
1932                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1933                                         obj_request->bio_list, length);
1934                 else
1935                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1936                                         obj_request->pages, length,
1937                                         offset & ~PAGE_MASK, false, false);
1938
1939                 if (write_request)
1940                         rbd_osd_req_format_write(obj_request);
1941                 else
1942                         rbd_osd_req_format_read(obj_request);
1943
1944                 obj_request->img_offset = img_offset;
1945                 rbd_img_obj_request_add(img_request, obj_request);
1946
1947                 img_offset += length;
1948                 resid -= length;
1949         }
1950
1951         return 0;
1952
1953 out_partial:
1954         rbd_obj_request_put(obj_request);
1955 out_unwind:
1956         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1957                 rbd_obj_request_put(obj_request);
1958
1959         return -ENOMEM;
1960 }
1961
1962 static void
1963 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
1964 {
1965         struct rbd_obj_request *orig_request;
1966         struct page **pages;
1967         u32 page_count;
1968         int result;
1969         u64 obj_size;
1970         u64 xferred;
1971
1972         rbd_assert(img_request_child_test(img_request));
1973
1974         /* First get what we need from the image request */
1975
1976         pages = img_request->copyup_pages;
1977         rbd_assert(pages != NULL);
1978         img_request->copyup_pages = NULL;
1979
1980         orig_request = img_request->obj_request;
1981         rbd_assert(orig_request != NULL);
1982
1983         result = img_request->result;
1984         obj_size = img_request->length;
1985         xferred = img_request->xferred;
1986
1987         rbd_img_request_put(img_request);
1988
1989         obj_request_existence_set(orig_request, true);
1990
1991         page_count = (u32)calc_pages_for(0, obj_size);
1992         ceph_release_page_vector(pages, page_count);
1993
1994         /* Resubmit the original request (for now). */
1995
1996         orig_request->result = rbd_img_obj_request_submit(orig_request);
1997         if (orig_request->result) {
1998                 obj_request_done_set(orig_request);
1999                 rbd_obj_request_complete(orig_request);
2000         }
2001 }
2002
2003 /*
2004  * Read from the parent image the range of data that covers the
2005  * entire target of the given object request.  This is used for
2006  * satisfying a layered image write request when the target of an
2007  * object request from the image request does not exist.
2008  *
2009  * A page array big enough to hold the returned data is allocated
2010  * and supplied to rbd_img_request_fill() as the "data descriptor."
2011  * When the read completes, this page array will be transferred to
2012  * the original object request for the copyup operation.
2013  *
2014  * If an error occurs, record it as the result of the original
2015  * object request and mark it done so it gets completed.
2016  */
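/*
 * Rough sketch of the path through this code for a layered write
 * whose target object is missing:
 *
 *     write -> STAT target -> -ENOENT
 *           -> read the covering range from the parent (here)
 *           -> resubmit the original write (the pages read are meant
 *              for a copyup; for now the callback above simply
 *              releases them before resubmitting)
 */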
2017 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2018 {
2019         struct rbd_img_request *img_request = NULL;
2020         struct rbd_img_request *parent_request = NULL;
2021         struct rbd_device *rbd_dev;
2022         u64 img_offset;
2023         u64 length;
2024         struct page **pages = NULL;
2025         u32 page_count;
2026         int result;
2027
2028         rbd_assert(obj_request_img_data_test(obj_request));
2029         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2030
2031         img_request = obj_request->img_request;
2032         rbd_assert(img_request != NULL);
2033         rbd_dev = img_request->rbd_dev;
2034         rbd_assert(rbd_dev->parent != NULL);
2035
2036         /*
2037          * Determine the byte range covered by the object in the
2038          * child image to which the original request was to be sent.
2039          */
2040         img_offset = obj_request->img_offset - obj_request->offset;
2041         length = (u64)1 << rbd_dev->header.obj_order;
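        /*
         * (Subtracting the in-object offset rounds img_offset down
         * to the object boundary; together with a one-object length
         * this covers the target object exactly.)
         */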
2042
2043         /*
2044          * Allocate a page array big enough to receive the data read
2045          * from the parent.
2046          */
2047         page_count = (u32)calc_pages_for(0, length);
2048         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2049         if (IS_ERR(pages)) {
2050                 result = PTR_ERR(pages);
2051                 pages = NULL;
2052                 goto out_err;
2053         }
2054
2055         result = -ENOMEM;
2056         parent_request = rbd_img_request_create(rbd_dev->parent,
2057                                                 img_offset, length,
2058                                                 false, true);
2059         if (!parent_request)
2060                 goto out_err;
2061         rbd_obj_request_get(obj_request);
2062         parent_request->obj_request = obj_request;
2063
2064         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2065         if (result)
2066                 goto out_err;
2067         parent_request->copyup_pages = pages;
2068
2069         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2070         result = rbd_img_request_submit(parent_request);
2071         if (!result)
2072                 return 0;
2073
2074         parent_request->copyup_pages = NULL;
2075         parent_request->obj_request = NULL;
2076         rbd_obj_request_put(obj_request);
2077 out_err:
2078         if (pages)
2079                 ceph_release_page_vector(pages, page_count);
2080         if (parent_request)
2081                 rbd_img_request_put(parent_request);
2082         obj_request->result = result;
2083         obj_request->xferred = 0;
2084         obj_request_done_set(obj_request);
2085
2086         return result;
2087 }
2088
2089 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2090 {
2091         struct rbd_obj_request *orig_request;
2092         int result;
2093
2094         rbd_assert(!obj_request_img_data_test(obj_request));
2095
2096         /*
2097          * All we need from the object request is the original
2098          * request and the result of the STAT op.  Grab those, then
2099          * we're done with the request.
2100          */
2101         orig_request = obj_request->obj_request;
2102         obj_request->obj_request = NULL;
2103         rbd_assert(orig_request);
2104         rbd_assert(orig_request->img_request);
2105
2106         result = obj_request->result;
2107         obj_request->result = 0;
2108
2109         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2110                 obj_request, orig_request, result,
2111                 obj_request->xferred, obj_request->length);
2112         rbd_obj_request_put(obj_request);
2113
2114         rbd_assert(orig_request);
2115         rbd_assert(orig_request->img_request);
2116
2117         /*
2118          * Our only purpose here is to determine whether the object
2119          * exists, and we don't want to treat the non-existence as
2120          * an error.  If something else comes back, transfer the
2121          * error to the original request and complete it now.
2122          */
2123         if (!result) {
2124                 obj_request_existence_set(orig_request, true);
2125         } else if (result == -ENOENT) {
2126                 obj_request_existence_set(orig_request, false);
2127         } else {
2128                 orig_request->result = result;
2129                 goto out;
2130         }
2131
2132         /*
2133          * Resubmit the original request now that we have recorded
2134          * whether the target object exists.
2135          */
2136         orig_request->result = rbd_img_obj_request_submit(orig_request);
2137 out:
2138         if (orig_request->result)
2139                 rbd_obj_request_complete(orig_request);
2140         rbd_obj_request_put(orig_request);
2141 }
2142
2143 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2144 {
2145         struct rbd_obj_request *stat_request;
2146         struct rbd_device *rbd_dev;
2147         struct ceph_osd_client *osdc;
2148         struct page **pages = NULL;
2149         u32 page_count;
2150         size_t size;
2151         int ret;
2152
2153         /*
2154          * The response data for a STAT call consists of:
2155          *     le64 length;
2156          *     struct {
2157          *         le32 tv_sec;
2158          *         le32 tv_nsec;
2159          *     } mtime;
2160          */
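        /* That's 16 bytes in all, so a single page is plenty. */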
2161         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2162         page_count = (u32)calc_pages_for(0, size);
2163         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2164         if (IS_ERR(pages))
2165                 return PTR_ERR(pages);
2166
2167         ret = -ENOMEM;
2168         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2169                                                         OBJ_REQUEST_PAGES);
2170         if (!stat_request)
2171                 goto out;
2172
2173         rbd_obj_request_get(obj_request);
2174         stat_request->obj_request = obj_request;
2175         stat_request->pages = pages;
2176         stat_request->page_count = page_count;
2177
2178         rbd_assert(obj_request->img_request);
2179         rbd_dev = obj_request->img_request->rbd_dev;
2180         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2181                                                 stat_request);
2182         if (!stat_request->osd_req)
2183                 goto out;
2184         stat_request->callback = rbd_img_obj_exists_callback;
2185
2186         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2187         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2188                                         false, false);
2189         rbd_osd_req_format_read(stat_request);
2190
2191         osdc = &rbd_dev->rbd_client->client->osdc;
2192         ret = rbd_obj_request_submit(osdc, stat_request);
2193 out:
2194         if (ret)
2195                 rbd_obj_request_put(obj_request);
2196
2197         return ret;
2198 }
2199
2200 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2201 {
2202         struct rbd_img_request *img_request;
2203         bool known;
2204
2205         rbd_assert(obj_request_img_data_test(obj_request));
2206
2207         img_request = obj_request->img_request;
2208         rbd_assert(img_request);
2209
2210         /*
2211          * Only layered writes need special handling.  If it's not a
2212          * layered write, or it is a layered write but we know the
2213          * target object exists, it's no different from any other
2214          * object request.
2215          */
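        /*
         * In table form:
         *
         *      layered write?  existence known?  exists?  action
         *      --------------  ----------------  -------  ----------------
         *      no              --                --       submit as usual
         *      yes             yes               yes      submit as usual
         *      yes             yes               no       read from parent
         *      yes             no                --       STAT first
         */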
2216         if (!img_request_write_test(img_request) ||
2217                 !img_request_layered_test(img_request) ||
2218                 ((known = obj_request_known_test(obj_request)) &&
2219                         obj_request_exists_test(obj_request))) {
2220
2221                 struct rbd_device *rbd_dev;
2222                 struct ceph_osd_client *osdc;
2223
2224                 rbd_dev = obj_request->img_request->rbd_dev;
2225                 osdc = &rbd_dev->rbd_client->client->osdc;
2226
2227                 return rbd_obj_request_submit(osdc, obj_request);
2228         }
2229
2230         /*
2231          * It's a layered write.  The target object might exist but
2232          * we may not know that yet.  If we know it doesn't exist,
2233          * start by reading the data for the full target object from
2234          * the parent so we can use it for a copyup to the target.
2235          */
2236         if (known)
2237                 return rbd_img_obj_parent_read_full(obj_request);
2238
2239         /* We don't know whether the target exists.  Go find out. */
2240
2241         return rbd_img_obj_exists_submit(obj_request);
2242 }
2243
2244 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2245 {
2246         struct rbd_obj_request *obj_request;
2247         struct rbd_obj_request *next_obj_request;
2248
2249         dout("%s: img %p\n", __func__, img_request);
2250         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2251                 int ret;
2252
2253                 ret = rbd_img_obj_request_submit(obj_request);
2254                 if (ret)
2255                         return ret;
2256         }
2257
2258         return 0;
2259 }
2260
2261 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2262 {
2263         struct rbd_obj_request *obj_request;
2264
2265         rbd_assert(img_request_child_test(img_request));
2266
2267         obj_request = img_request->obj_request;
2268         rbd_assert(obj_request != NULL);
2269         obj_request->result = img_request->result;
2270         obj_request->xferred = img_request->xferred;
2271
2272         rbd_img_obj_request_read_callback(obj_request);
2273         rbd_obj_request_complete(obj_request);
2274 }
2275
2276 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2277 {
2278         struct rbd_device *rbd_dev;
2279         struct rbd_img_request *img_request;
2280         int result;
2281
2282         rbd_assert(obj_request_img_data_test(obj_request));
2283         rbd_assert(obj_request->img_request != NULL);
2284         rbd_assert(obj_request->result == (s32) -ENOENT);
2285         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2286
2287         rbd_dev = obj_request->img_request->rbd_dev;
2288         rbd_assert(rbd_dev->parent != NULL);
2289         /* rbd_read_finish(obj_request, obj_request->length); */
2290         img_request = rbd_img_request_create(rbd_dev->parent,
2291                                                 obj_request->img_offset,
2292                                                 obj_request->length,
2293                                                 false, true);
2294         result = -ENOMEM;
2295         if (!img_request)
2296                 goto out_err;
2297
2298         rbd_obj_request_get(obj_request);
2299         img_request->obj_request = obj_request;
2300
2301         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2302                                         obj_request->bio_list);
2303         if (result)
2304                 goto out_err;
2305
2306         img_request->callback = rbd_img_parent_read_callback;
2307         result = rbd_img_request_submit(img_request);
2308         if (result)
2309                 goto out_err;
2310
2311         return;
2312 out_err:
2313         if (img_request)
2314                 rbd_img_request_put(img_request);
2315         obj_request->result = result;
2316         obj_request->xferred = 0;
2317         obj_request_done_set(obj_request);
2318 }
2319
2320 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2321                                    u64 ver, u64 notify_id)
2322 {
2323         struct rbd_obj_request *obj_request;
2324         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2325         int ret;
2326
2327         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2328                                                         OBJ_REQUEST_NODATA);
2329         if (!obj_request)
2330                 return -ENOMEM;
2331
2332         ret = -ENOMEM;
2333         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2334         if (!obj_request->osd_req)
2335                 goto out;
2336         obj_request->callback = rbd_obj_request_put;
2337
2338         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2339                                         notify_id, ver, 0);
2340         rbd_osd_req_format_read(obj_request);
2341
2342         ret = rbd_obj_request_submit(osdc, obj_request);
2343 out:
2344         if (ret)
2345                 rbd_obj_request_put(obj_request);
2346
2347         return ret;
2348 }
2349
2350 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2351 {
2352         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2353         u64 hver;
2354         int rc;
2355
2356         if (!rbd_dev)
2357                 return;
2358
2359         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2360                 rbd_dev->header_name, (unsigned long long) notify_id,
2361                 (unsigned int) opcode);
2362         rc = rbd_dev_refresh(rbd_dev, &hver);
2363         if (rc)
2364                 rbd_warn(rbd_dev, "got notification but failed to"
2365                            " update snaps: %d\n", rc);
2366
2367         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2368 }
2369
2370 /*
2371  * Request sync osd watch/unwatch.  The value of "start" determines
2372  * whether a watch request is being initiated or torn down.
2373  */
2374 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2375 {
2376         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2377         struct rbd_obj_request *obj_request;
2378         int ret;
2379
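        /*
         * Starting a watch requires that neither the event nor the
         * lingering request exist yet; tearing one down requires
         * both.  The XOR with "start" asserts exactly that.
         */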
2380         rbd_assert(start ^ !!rbd_dev->watch_event);
2381         rbd_assert(start ^ !!rbd_dev->watch_request);
2382
2383         if (start) {
2384                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2385                                                 &rbd_dev->watch_event);
2386                 if (ret < 0)
2387                         return ret;
2388                 rbd_assert(rbd_dev->watch_event != NULL);
2389         }
2390
2391         ret = -ENOMEM;
2392         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2393                                                         OBJ_REQUEST_NODATA);
2394         if (!obj_request)
2395                 goto out_cancel;
2396
2397         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2398         if (!obj_request->osd_req)
2399                 goto out_cancel;
2400
2401         if (start)
2402                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2403         else
2404                 ceph_osdc_unregister_linger_request(osdc,
2405                                         rbd_dev->watch_request->osd_req);
2406
2407         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2408                                 rbd_dev->watch_event->cookie,
2409                                 rbd_dev->header.obj_version, start);
2410         rbd_osd_req_format_write(obj_request);
2411
2412         ret = rbd_obj_request_submit(osdc, obj_request);
2413         if (ret)
2414                 goto out_cancel;
2415         ret = rbd_obj_request_wait(obj_request);
2416         if (ret)
2417                 goto out_cancel;
2418         ret = obj_request->result;
2419         if (ret)
2420                 goto out_cancel;
2421
2422         /*
2423          * A watch request is set to linger, so the underlying osd
2424          * request won't go away until we unregister it.  We retain
2425          * a pointer to the object request during that time (in
2426          * rbd_dev->watch_request), so we'll keep a reference to
2427          * it.  We'll drop that reference (below) after we've
2428          * unregistered it.
2429          */
2430         if (start) {
2431                 rbd_dev->watch_request = obj_request;
2432
2433                 return 0;
2434         }
2435
2436         /* We have successfully torn down the watch request */
2437
2438         rbd_obj_request_put(rbd_dev->watch_request);
2439         rbd_dev->watch_request = NULL;
2440 out_cancel:
2441         /* Cancel the event if we're tearing down, or on error */
2442         ceph_osdc_cancel_event(rbd_dev->watch_event);
2443         rbd_dev->watch_event = NULL;
2444         if (obj_request)
2445                 rbd_obj_request_put(obj_request);
2446
2447         return ret;
2448 }
2449
2450 /*
2451  * Synchronous osd object method call
2452  */
2453 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2454                              const char *object_name,
2455                              const char *class_name,
2456                              const char *method_name,
2457                              const char *outbound,
2458                              size_t outbound_size,
2459                              char *inbound,
2460                              size_t inbound_size,
2461                              u64 *version)
2462 {
2463         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2464         struct rbd_obj_request *obj_request;
2465         struct page **pages;
2466         u32 page_count;
2467         int ret;
2468
2469         /*
2470          * Method calls are ultimately read operations.  The result
2471          * should be placed into the inbound buffer provided.  Callers
2472          * may also supply outbound data--parameters for the object
2473          * method.  Currently if this is present it will be a
2474          * snapshot id.
2475          */
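        /*
         * An illustrative call, of the sort made elsewhere in this
         * driver for format 2 images (reading the size of a given
         * snapshot via the "rbd" object class):
         *
         *      ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
         *                              "rbd", "get_size",
         *                              (char *) &snapid, sizeof (snapid),
         *                              (char *) &size_buf, sizeof (size_buf),
         *                              NULL);
         */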
2476         page_count = (u32) calc_pages_for(0, inbound_size);
2477         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2478         if (IS_ERR(pages))
2479                 return PTR_ERR(pages);
2480
2481         ret = -ENOMEM;
2482         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2483                                                         OBJ_REQUEST_PAGES);
2484         if (!obj_request)
2485                 goto out;
2486
2487         obj_request->pages = pages;
2488         obj_request->page_count = page_count;
2489
2490         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2491         if (!obj_request->osd_req)
2492                 goto out;
2493
2494         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2495                                         class_name, method_name);
2496         if (outbound_size) {
2497                 struct ceph_pagelist *pagelist;
2498
2499                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2500                 if (!pagelist)
2501                         goto out;
2502
2503                 ceph_pagelist_init(pagelist);
2504                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2505                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2506                                                 pagelist);
2507         }
2508         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2509                                         obj_request->pages, inbound_size,
2510                                         0, false, false);
2511         rbd_osd_req_format_read(obj_request);
2512
2513         ret = rbd_obj_request_submit(osdc, obj_request);
2514         if (ret)
2515                 goto out;
2516         ret = rbd_obj_request_wait(obj_request);
2517         if (ret)
2518                 goto out;
2519
2520         ret = obj_request->result;
2521         if (ret < 0)
2522                 goto out;
2523         ret = 0;
2524         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2525         if (version)
2526                 *version = obj_request->version;
2527 out:
2528         if (obj_request)
2529                 rbd_obj_request_put(obj_request);
2530         else
2531                 ceph_release_page_vector(pages, page_count);
2532
2533         return ret;
2534 }
2535
2536 static void rbd_request_fn(struct request_queue *q)
2537                 __releases(q->queue_lock) __acquires(q->queue_lock)
2538 {
2539         struct rbd_device *rbd_dev = q->queuedata;
2540         bool read_only = rbd_dev->mapping.read_only;
2541         struct request *rq;
2542         int result;
2543
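        /*
         * The block layer calls this with the queue lock held (hence
         * the annotations above); we drop it while an image request
         * is built and submitted, and retake it before completing the
         * request or fetching the next one.
         */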
2544         while ((rq = blk_fetch_request(q))) {
2545                 bool write_request = rq_data_dir(rq) == WRITE;
2546                 struct rbd_img_request *img_request;
2547                 u64 offset;
2548                 u64 length;
2549
2550                 /* Ignore any non-FS requests that filter through. */
2551
2552                 if (rq->cmd_type != REQ_TYPE_FS) {
2553                         dout("%s: non-fs request type %d\n", __func__,
2554                                 (int) rq->cmd_type);
2555                         __blk_end_request_all(rq, 0);
2556                         continue;
2557                 }
2558
2559                 /* Ignore/skip any zero-length requests */
2560
2561                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2562                 length = (u64) blk_rq_bytes(rq);
2563
2564                 if (!length) {
2565                         dout("%s: zero-length request\n", __func__);
2566                         __blk_end_request_all(rq, 0);
2567                         continue;
2568                 }
2569
2570                 spin_unlock_irq(q->queue_lock);
2571
2572                 /* Disallow writes to a read-only device */
2573
2574                 if (write_request) {
2575                         result = -EROFS;
2576                         if (read_only)
2577                                 goto end_request;
2578                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2579                 }
2580
2581                 /*
2582                  * Quit early if the mapped snapshot no longer
2583                  * exists.  It's still possible the snapshot will
2584                  * have disappeared by the time our request arrives
2585                  * at the osd, but there's no sense in sending it if
2586                  * we already know.
2587                  */
2588                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2589                         dout("request for non-existent snapshot\n");
2590                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2591                         result = -ENXIO;
2592                         goto end_request;
2593                 }
2594
2595                 result = -EINVAL;
2596                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2597                         goto end_request;       /* Shouldn't happen */
2598
2599                 result = -ENOMEM;
2600                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2601                                                         write_request, false);
2602                 if (!img_request)
2603                         goto end_request;
2604
2605                 img_request->rq = rq;
2606
2607                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2608                                                 rq->bio);
2609                 if (!result)
2610                         result = rbd_img_request_submit(img_request);
2611                 if (result)
2612                         rbd_img_request_put(img_request);
2613 end_request:
2614                 spin_lock_irq(q->queue_lock);
2615                 if (result < 0) {
2616                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2617                                 write_request ? "write" : "read",
2618                                 length, offset, result);
2619
2620                         __blk_end_request_all(rq, result);
2621                 }
2622         }
2623 }
2624
2625 /*
2626  * a queue callback. Makes sure that we don't create a bio that spans across
2627  * multiple osd objects. One exception would be with a single page bios,
2628  * which we handle later at bio_chain_clone_range()
2629  */
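/*
 * Worked example, assuming obj_order 22 (4 MiB objects): there are
 * 1 << (22 - 9) = 8192 sectors per object.  A bio starting at device
 * sector 12288 sits 4096 sectors into its object, so at most
 * (8192 - 4096) << 9 = 2 MiB (less whatever the bio already holds)
 * may be added to it.
 */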
2630 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2631                           struct bio_vec *bvec)
2632 {
2633         struct rbd_device *rbd_dev = q->queuedata;
2634         sector_t sector_offset;
2635         sector_t sectors_per_obj;
2636         sector_t obj_sector_offset;
2637         int ret;
2638
2639         /*
2640          * Find how far into its rbd object the bio's start sector
2641          * falls.  bi_sector is partition-relative, so convert it
2642          * first to a sector offset within the enclosing device.
2643          */
2644         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2645         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2646         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2647
2648         /*
2649          * Compute the number of bytes from that offset to the end
2650          * of the object.  Account for what's already used by the bio.
2651          */
2652         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2653         if (ret > bmd->bi_size)
2654                 ret -= bmd->bi_size;
2655         else
2656                 ret = 0;
2657
2658         /*
2659          * Don't send back more than was asked for.  And if the bio
2660          * was empty, let the whole thing through because:  "Note
2661          * that a block device *must* allow a single page to be
2662          * added to an empty bio."
2663          */
2664         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2665         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2666                 ret = (int) bvec->bv_len;
2667
2668         return ret;
2669 }
2670
2671 static void rbd_free_disk(struct rbd_device *rbd_dev)
2672 {
2673         struct gendisk *disk = rbd_dev->disk;
2674
2675         if (!disk)
2676                 return;
2677
2678         if (disk->flags & GENHD_FL_UP)
2679                 del_gendisk(disk);
2680         if (disk->queue)
2681                 blk_cleanup_queue(disk->queue);
2682         put_disk(disk);
2683 }
2684
2685 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2686                                 const char *object_name,
2687                                 u64 offset, u64 length,
2688                                 char *buf, u64 *version)
2689
2690 {
2691         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2692         struct rbd_obj_request *obj_request;
2693         struct page **pages = NULL;
2694         u32 page_count;
2695         size_t size;
2696         int ret;
2697
2698         page_count = (u32) calc_pages_for(offset, length);
2699         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2700         if (IS_ERR(pages))
2701                 return PTR_ERR(pages);
2702
2703         ret = -ENOMEM;
2704         obj_request = rbd_obj_request_create(object_name, offset, length,
2705                                                         OBJ_REQUEST_PAGES);
2706         if (!obj_request)
2707                 goto out;
2708
2709         obj_request->pages = pages;
2710         obj_request->page_count = page_count;
2711
2712         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2713         if (!obj_request->osd_req)
2714                 goto out;
2715
2716         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2717                                         offset, length, 0, 0);
2718         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2719                                         obj_request->pages,
2720                                         obj_request->length,
2721                                         obj_request->offset & ~PAGE_MASK,
2722                                         false, false);
2723         rbd_osd_req_format_read(obj_request);
2724
2725         ret = rbd_obj_request_submit(osdc, obj_request);
2726         if (ret)
2727                 goto out;
2728         ret = rbd_obj_request_wait(obj_request);
2729         if (ret)
2730                 goto out;
2731
2732         ret = obj_request->result;
2733         if (ret < 0)
2734                 goto out;
2735
2736         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2737         size = (size_t) obj_request->xferred;
2738         ceph_copy_from_page_vector(pages, buf, 0, size);
2739         rbd_assert(size <= (size_t) INT_MAX);
2740         ret = (int) size;
2741         if (version)
2742                 *version = obj_request->version;
2743 out:
2744         if (obj_request)
2745                 rbd_obj_request_put(obj_request);
2746         else
2747                 ceph_release_page_vector(pages, page_count);
2748
2749         return ret;
2750 }
2751
2752 /*
2753  * Read the complete header for the given rbd device.
2754  *
2755  * Returns a pointer to a dynamically-allocated buffer containing
2756  * the complete and validated header.  Caller can pass the address
2757  * of a variable that will be filled in with the version of the
2758  * header object at the time it was read.
2759  *
2760  * Returns a pointer-coded errno if a failure occurs.
2761  */
2762 static struct rbd_image_header_ondisk *
2763 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2764 {
2765         struct rbd_image_header_ondisk *ondisk = NULL;
2766         u32 snap_count = 0;
2767         u64 names_size = 0;
2768         u32 want_count;
2769         int ret;
2770
2771         /*
2772          * The complete header will include an array of its 64-bit
2773          * snapshot ids, followed by the names of those snapshots as
2774          * a contiguous block of NUL-terminated strings.  Note that
2775          * the number of snapshots could change by the time we read
2776          * it in, in which case we re-read it.
2777          */
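        /*
         * Each pass sizes the buffer using the snap_count and
         * names_size from the previous read, so the loop ends once a
         * read returns the same snapshot count it was sized for.
         */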
2778         do {
2779                 size_t size;
2780
2781                 kfree(ondisk);
2782
2783                 size = sizeof (*ondisk);
2784                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2785                 size += names_size;
2786                 ondisk = kmalloc(size, GFP_KERNEL);
2787                 if (!ondisk)
2788                         return ERR_PTR(-ENOMEM);
2789
2790                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2791                                        0, size,
2792                                        (char *) ondisk, version);
2793                 if (ret < 0)
2794                         goto out_err;
2795                 if (WARN_ON((size_t) ret < size)) {
2796                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2797                                 size, ret);
2798                         ret = -ENXIO;
2799                         goto out_err;
2800                 }
2801                 if (!rbd_dev_ondisk_valid(ondisk)) {
2802                         ret = -ENXIO;
2803                         rbd_warn(rbd_dev, "invalid header");
2804                         goto out_err;
2805                 }
2806
2807                 names_size = le64_to_cpu(ondisk->snap_names_len);
2808                 want_count = snap_count;
2809                 snap_count = le32_to_cpu(ondisk->snap_count);
2810         } while (snap_count != want_count);
2811
2812         return ondisk;
2813
2814 out_err:
2815         kfree(ondisk);
2816
2817         return ERR_PTR(ret);
2818 }
2819
2820 /*
2821  * reload the on-disk header
2822  */
2823 static int rbd_read_header(struct rbd_device *rbd_dev,
2824                            struct rbd_image_header *header)
2825 {
2826         struct rbd_image_header_ondisk *ondisk;
2827         u64 ver = 0;
2828         int ret;
2829
2830         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2831         if (IS_ERR(ondisk))
2832                 return PTR_ERR(ondisk);
2833         ret = rbd_header_from_disk(header, ondisk);
2834         if (ret >= 0)
2835                 header->obj_version = ver;
2836         kfree(ondisk);
2837
2838         return ret;
2839 }
2840
2841 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2842 {
2843         struct rbd_snap *snap;
2844         struct rbd_snap *next;
2845
2846         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2847                 rbd_remove_snap_dev(snap);
2848 }
2849
2850 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2851 {
2852         sector_t size;
2853
2854         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2855                 return;
2856
2857         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2858         dout("setting size to %llu sectors", (unsigned long long) size);
2859         rbd_dev->mapping.size = (u64) size;
2860         set_capacity(rbd_dev->disk, size);
2861 }
2862
2863 /*
2864  * only read the first part of the ondisk header, without the snaps info
2865  */
2866 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2867 {
2868         int ret;
2869         struct rbd_image_header h;
2870
2871         ret = rbd_read_header(rbd_dev, &h);
2872         if (ret < 0)
2873                 return ret;
2874
2875         down_write(&rbd_dev->header_rwsem);
2876
2877         /* Update image size, and check for resize of mapped image */
2878         rbd_dev->header.image_size = h.image_size;
2879         rbd_update_mapping_size(rbd_dev);
2880
2881         /* rbd_dev->header.object_prefix shouldn't change */
2882         kfree(rbd_dev->header.snap_sizes);
2883         kfree(rbd_dev->header.snap_names);
2884         /* osd requests may still refer to snapc */
2885         ceph_put_snap_context(rbd_dev->header.snapc);
2886
2887         if (hver)
2888                 *hver = h.obj_version;
2889         rbd_dev->header.obj_version = h.obj_version;
2890         rbd_dev->header.image_size = h.image_size;
2891         rbd_dev->header.snapc = h.snapc;
2892         rbd_dev->header.snap_names = h.snap_names;
2893         rbd_dev->header.snap_sizes = h.snap_sizes;
2894         /* Free the extra copy of the object prefix */
2895         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2896         kfree(h.object_prefix);
2897
2898         ret = rbd_dev_snaps_update(rbd_dev);
2899         if (!ret)
2900                 ret = rbd_dev_snaps_register(rbd_dev);
2901
2902         up_write(&rbd_dev->header_rwsem);
2903
2904         return ret;
2905 }
2906
2907 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2908 {
2909         int ret;
2910
2911         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2912         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2913         if (rbd_dev->image_format == 1)
2914                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2915         else
2916                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2917         mutex_unlock(&ctl_mutex);
2918         revalidate_disk(rbd_dev->disk);
2919
2920         return ret;
2921 }
2922
2923 static int rbd_init_disk(struct rbd_device *rbd_dev)
2924 {
2925         struct gendisk *disk;
2926         struct request_queue *q;
2927         u64 segment_size;
2928
2929         /* create gendisk info */
2930         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2931         if (!disk)
2932                 return -ENOMEM;
2933
2934         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2935                  rbd_dev->dev_id);
2936         disk->major = rbd_dev->major;
2937         disk->first_minor = 0;
2938         disk->fops = &rbd_bd_ops;
2939         disk->private_data = rbd_dev;
2940
2941         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2942         if (!q)
2943                 goto out_disk;
2944
2945         /* We use the default size, but let's be explicit about it. */
2946         blk_queue_physical_block_size(q, SECTOR_SIZE);
2947
2948         /* set io sizes to object size */
2949         segment_size = rbd_obj_bytes(&rbd_dev->header);
2950         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2951         blk_queue_max_segment_size(q, segment_size);
2952         blk_queue_io_min(q, segment_size);
2953         blk_queue_io_opt(q, segment_size);
2954
2955         blk_queue_merge_bvec(q, rbd_merge_bvec);
2956         disk->queue = q;
2957
2958         q->queuedata = rbd_dev;
2959
2960         rbd_dev->disk = disk;
2961
2962         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2963
2964         return 0;
2965 out_disk:
2966         put_disk(disk);
2967
2968         return -ENOMEM;
2969 }
2970
2971 /*
2972   sysfs
2973 */
2974
2975 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2976 {
2977         return container_of(dev, struct rbd_device, dev);
2978 }
2979
2980 static ssize_t rbd_size_show(struct device *dev,
2981                              struct device_attribute *attr, char *buf)
2982 {
2983         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2984         sector_t size;
2985
2986         down_read(&rbd_dev->header_rwsem);
2987         size = get_capacity(rbd_dev->disk);
2988         up_read(&rbd_dev->header_rwsem);
2989
2990         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2991 }
2992
2993 /*
2994  * Note this shows the features for whatever's mapped, which is not
2995  * necessarily the base image.
2996  */
2997 static ssize_t rbd_features_show(struct device *dev,
2998                              struct device_attribute *attr, char *buf)
2999 {
3000         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3001
3002         return sprintf(buf, "0x%016llx\n",
3003                         (unsigned long long) rbd_dev->mapping.features);
3004 }
3005
3006 static ssize_t rbd_major_show(struct device *dev,
3007                               struct device_attribute *attr, char *buf)
3008 {
3009         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3010
3011         return sprintf(buf, "%d\n", rbd_dev->major);
3012 }
3013
3014 static ssize_t rbd_client_id_show(struct device *dev,
3015                                   struct device_attribute *attr, char *buf)
3016 {
3017         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3018
3019         return sprintf(buf, "client%lld\n",
3020                         ceph_client_id(rbd_dev->rbd_client->client));
3021 }
3022
3023 static ssize_t rbd_pool_show(struct device *dev,
3024                              struct device_attribute *attr, char *buf)
3025 {
3026         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3027
3028         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3029 }
3030
3031 static ssize_t rbd_pool_id_show(struct device *dev,
3032                              struct device_attribute *attr, char *buf)
3033 {
3034         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3035
3036         return sprintf(buf, "%llu\n",
3037                 (unsigned long long) rbd_dev->spec->pool_id);
3038 }
3039
3040 static ssize_t rbd_name_show(struct device *dev,
3041                              struct device_attribute *attr, char *buf)
3042 {
3043         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3044
3045         if (rbd_dev->spec->image_name)
3046                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3047
3048         return sprintf(buf, "(unknown)\n");
3049 }
3050
3051 static ssize_t rbd_image_id_show(struct device *dev,
3052                              struct device_attribute *attr, char *buf)
3053 {
3054         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3055
3056         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3057 }
3058
3059 /*
3060  * Shows the name of the currently-mapped snapshot (or
3061  * RBD_SNAP_HEAD_NAME for the base image).
3062  */
3063 static ssize_t rbd_snap_show(struct device *dev,
3064                              struct device_attribute *attr,
3065                              char *buf)
3066 {
3067         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3068
3069         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3070 }
3071
3072 /*
3073  * For an rbd v2 image, shows the ids and names of the pool, image,
3074  * and snapshot for the parent image, plus the overlap with the
3075  * parent.  If there is no parent, simply shows "(no parent image)".
3076  */
3077 static ssize_t rbd_parent_show(struct device *dev,
3078                              struct device_attribute *attr,
3079                              char *buf)
3080 {
3081         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3082         struct rbd_spec *spec = rbd_dev->parent_spec;
3083         int count;
3084         char *bufp = buf;
3085
3086         if (!spec)
3087                 return sprintf(buf, "(no parent image)\n");
3088
3089         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3090                         (unsigned long long) spec->pool_id, spec->pool_name);
3091         if (count < 0)
3092                 return count;
3093         bufp += count;
3094
3095         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3096                         spec->image_name ? spec->image_name : "(unknown)");
3097         if (count < 0)
3098                 return count;
3099         bufp += count;
3100
3101         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3102                         (unsigned long long) spec->snap_id, spec->snap_name);
3103         if (count < 0)
3104                 return count;
3105         bufp += count;
3106
3107         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3108         if (count < 0)
3109                 return count;
3110         bufp += count;
3111
3112         return (ssize_t) (bufp - buf);
3113 }
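
     /*
      * Illustrative only (all values below are made up): reading the
      * parent attribute of a mapped clone produces one line per
      * sprintf() above, e.g.:
      *
      *   $ cat /sys/bus/rbd/devices/0/parent
      *   pool_id 2
      *   pool_name rbd
      *   image_id 1028b4567
      *   image_name parentimg
      *   snap_id 4
      *   snap_name snap1
      *   overlap 10737418240
      */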
3114
3115 static ssize_t rbd_image_refresh(struct device *dev,
3116                                  struct device_attribute *attr,
3117                                  const char *buf,
3118                                  size_t size)
3119 {
3120         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3121         int ret;
3122
3123         ret = rbd_dev_refresh(rbd_dev, NULL);
3124
3125         return ret < 0 ? ret : size;
3126 }
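
     /*
      * Sketch of the write-only refresh attribute defined below
      * (device id 0 is assumed): writing any value forces the image
      * header to be re-read.
      *
      *   # echo 1 > /sys/bus/rbd/devices/0/refresh
      */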
3127
3128 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3129 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3130 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3131 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3132 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3133 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3134 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3135 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3136 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3137 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3138 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3139
3140 static struct attribute *rbd_attrs[] = {
3141         &dev_attr_size.attr,
3142         &dev_attr_features.attr,
3143         &dev_attr_major.attr,
3144         &dev_attr_client_id.attr,
3145         &dev_attr_pool.attr,
3146         &dev_attr_pool_id.attr,
3147         &dev_attr_name.attr,
3148         &dev_attr_image_id.attr,
3149         &dev_attr_current_snap.attr,
3150         &dev_attr_parent.attr,
3151         &dev_attr_refresh.attr,
3152         NULL
3153 };
3154
3155 static struct attribute_group rbd_attr_group = {
3156         .attrs = rbd_attrs,
3157 };
3158
3159 static const struct attribute_group *rbd_attr_groups[] = {
3160         &rbd_attr_group,
3161         NULL
3162 };
3163
3164 static void rbd_sysfs_dev_release(struct device *dev)
3165 {
3166 }
3167
3168 static struct device_type rbd_device_type = {
3169         .name           = "rbd",
3170         .groups         = rbd_attr_groups,
3171         .release        = rbd_sysfs_dev_release,
3172 };
3173
3174
3175 /*
3176   sysfs - snapshots
3177 */
3178
3179 static ssize_t rbd_snap_size_show(struct device *dev,
3180                                   struct device_attribute *attr,
3181                                   char *buf)
3182 {
3183         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3184
3185         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
3186 }
3187
3188 static ssize_t rbd_snap_id_show(struct device *dev,
3189                                 struct device_attribute *attr,
3190                                 char *buf)
3191 {
3192         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3193
3194         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
3195 }
3196
3197 static ssize_t rbd_snap_features_show(struct device *dev,
3198                                 struct device_attribute *attr,
3199                                 char *buf)
3200 {
3201         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3202
3203         return sprintf(buf, "0x%016llx\n",
3204                         (unsigned long long) snap->features);
3205 }
3206
3207 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
3208 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
3209 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
3210
3211 static struct attribute *rbd_snap_attrs[] = {
3212         &dev_attr_snap_size.attr,
3213         &dev_attr_snap_id.attr,
3214         &dev_attr_snap_features.attr,
3215         NULL,
3216 };
3217
3218 static struct attribute_group rbd_snap_attr_group = {
3219         .attrs = rbd_snap_attrs,
3220 };
3221
3222 static void rbd_snap_dev_release(struct device *dev)
3223 {
3224         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3225         kfree(snap->name);
3226         kfree(snap);
3227 }
3228
3229 static const struct attribute_group *rbd_snap_attr_groups[] = {
3230         &rbd_snap_attr_group,
3231         NULL
3232 };
3233
3234 static struct device_type rbd_snap_device_type = {
3235         .groups         = rbd_snap_attr_groups,
3236         .release        = rbd_snap_dev_release,
3237 };
3238
3239 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3240 {
3241         kref_get(&spec->kref);
3242
3243         return spec;
3244 }
3245
3246 static void rbd_spec_free(struct kref *kref);
3247 static void rbd_spec_put(struct rbd_spec *spec)
3248 {
3249         if (spec)
3250                 kref_put(&spec->kref, rbd_spec_free);
3251 }
3252
3253 static struct rbd_spec *rbd_spec_alloc(void)
3254 {
3255         struct rbd_spec *spec;
3256
3257         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3258         if (!spec)
3259                 return NULL;
3260         kref_init(&spec->kref);
3261
3262         return spec;
3263 }
3264
3265 static void rbd_spec_free(struct kref *kref)
3266 {
3267         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3268
3269         kfree(spec->pool_name);
3270         kfree(spec->image_id);
3271         kfree(spec->image_name);
3272         kfree(spec->snap_name);
3273         kfree(spec);
3274 }
3275
3276 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3277                                 struct rbd_spec *spec)
3278 {
3279         struct rbd_device *rbd_dev;
3280
3281         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3282         if (!rbd_dev)
3283                 return NULL;
3284
3285         spin_lock_init(&rbd_dev->lock);
3286         rbd_dev->flags = 0;
3287         INIT_LIST_HEAD(&rbd_dev->node);
3288         INIT_LIST_HEAD(&rbd_dev->snaps);
3289         init_rwsem(&rbd_dev->header_rwsem);
3290
3291         rbd_dev->spec = spec;
3292         rbd_dev->rbd_client = rbdc;
3293
3294         /* Initialize the layout used for all rbd requests */
3295
3296         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3297         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3298         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3299         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3300
3301         return rbd_dev;
3302 }
3303
3304 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3305 {
3306         rbd_spec_put(rbd_dev->parent_spec);
3307         kfree(rbd_dev->header_name);
3308         rbd_put_client(rbd_dev->rbd_client);
3309         rbd_spec_put(rbd_dev->spec);
3310         kfree(rbd_dev);
3311 }
3312
3313 static bool rbd_snap_registered(struct rbd_snap *snap)
3314 {
3315         bool ret = snap->dev.type == &rbd_snap_device_type;
3316         bool reg = device_is_registered(&snap->dev);
3317
3318         rbd_assert(!ret ^ reg);
3319
3320         return ret;
3321 }
3322
3323 static void rbd_remove_snap_dev(struct rbd_snap *snap)
3324 {
3325         list_del(&snap->node);
3326         if (device_is_registered(&snap->dev))
3327                 device_unregister(&snap->dev);
3328 }
3329
3330 static int rbd_register_snap_dev(struct rbd_snap *snap,
3331                                   struct device *parent)
3332 {
3333         struct device *dev = &snap->dev;
3334         int ret;
3335
3336         dev->type = &rbd_snap_device_type;
3337         dev->parent = parent;
3338         dev->release = rbd_snap_dev_release;
3339         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
3340         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
3341
3342         ret = device_register(dev);
3343
3344         return ret;
3345 }
3346
3347 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
3348                                                 const char *snap_name,
3349                                                 u64 snap_id, u64 snap_size,
3350                                                 u64 snap_features)
3351 {
3352         struct rbd_snap *snap;
3353         int ret;
3354
3355         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3356         if (!snap)
3357                 return ERR_PTR(-ENOMEM);
3358
3359         ret = -ENOMEM;
3360         snap->name = kstrdup(snap_name, GFP_KERNEL);
3361         if (!snap->name)
3362                 goto err;
3363
3364         snap->id = snap_id;
3365         snap->size = snap_size;
3366         snap->features = snap_features;
3367
3368         return snap;
3369
3370 err:
3371         kfree(snap->name);
3372         kfree(snap);
3373
3374         return ERR_PTR(ret);
3375 }
3376
3377 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3378                 u64 *snap_size, u64 *snap_features)
3379 {
3380         char *snap_name;
3381
3382         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3383
3384         *snap_size = rbd_dev->header.snap_sizes[which];
3385         *snap_features = 0;     /* No features for v1 */
3386
3387         /* Skip over names until we find the one we are looking for */
3388
3389         snap_name = rbd_dev->header.snap_names;
3390         while (which--)
3391                 snap_name += strlen(snap_name) + 1;
3392
3393         return snap_name;
3394 }
3395
3396 /*
3397  * Get the size and object order for an image snapshot, or, if
3398  * snap_id is CEPH_NOSNAP, get this information for the base
3399  * image.
3400  */
3401 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3402                                 u8 *order, u64 *snap_size)
3403 {
3404         __le64 snapid = cpu_to_le64(snap_id);
3405         int ret;
3406         struct {
3407                 u8 order;
3408                 __le64 size;
3409         } __attribute__ ((packed)) size_buf = { 0 };
3410
3411         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3412                                 "rbd", "get_size",
3413                                 (char *) &snapid, sizeof (snapid),
3414                                 (char *) &size_buf, sizeof (size_buf), NULL);
3415         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3416         if (ret < 0)
3417                 return ret;
3418
3419         *order = size_buf.order;
3420         *snap_size = le64_to_cpu(size_buf.size);
3421
3422         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3423                 (unsigned long long) snap_id, (unsigned int) *order,
3424                 (unsigned long long) *snap_size);
3425
3426         return 0;
3427 }
3428
3429 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3430 {
3431         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3432                                         &rbd_dev->header.obj_order,
3433                                         &rbd_dev->header.image_size);
3434 }
3435
3436 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3437 {
3438         void *reply_buf;
3439         int ret;
3440         void *p;
3441
3442         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3443         if (!reply_buf)
3444                 return -ENOMEM;
3445
3446         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3447                                 "rbd", "get_object_prefix",
3448                                 NULL, 0,
3449                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3450         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3451         if (ret < 0)
3452                 goto out;
3453
3454         p = reply_buf;
3455         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3456                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
3457                                                 NULL, GFP_NOIO);
3458
3459         if (IS_ERR(rbd_dev->header.object_prefix)) {
3460                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3461                 rbd_dev->header.object_prefix = NULL;
3462         } else {
3463                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3464         }
3465
3466 out:
3467         kfree(reply_buf);
3468
3469         return ret;
3470 }
3471
3472 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3473                 u64 *snap_features)
3474 {
3475         __le64 snapid = cpu_to_le64(snap_id);
3476         struct {
3477                 __le64 features;
3478                 __le64 incompat;
3479         } features_buf = { 0 };
3480         u64 incompat;
3481         int ret;
3482
3483         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3484                                 "rbd", "get_features",
3485                                 (char *) &snapid, sizeof (snapid),
3486                                 (char *) &features_buf, sizeof (features_buf),
3487                                 NULL);
3488         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3489         if (ret < 0)
3490                 return ret;
3491
3492         incompat = le64_to_cpu(features_buf.incompat);
3493         if (incompat & ~RBD_FEATURES_SUPPORTED)
3494                 return -ENXIO;
3495
3496         *snap_features = le64_to_cpu(features_buf.features);
3497
3498         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3499                 (unsigned long long) snap_id,
3500                 (unsigned long long) *snap_features,
3501                 (unsigned long long) le64_to_cpu(features_buf.incompat));
3502
3503         return 0;
3504 }
3505
3506 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3507 {
3508         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3509                                                 &rbd_dev->header.features);
3510 }
3511
3512 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3513 {
3514         struct rbd_spec *parent_spec;
3515         size_t size;
3516         void *reply_buf = NULL;
3517         __le64 snapid;
3518         void *p;
3519         void *end;
3520         char *image_id;
3521         u64 overlap;
3522         int ret;
3523
3524         parent_spec = rbd_spec_alloc();
3525         if (!parent_spec)
3526                 return -ENOMEM;
3527
3528         size = sizeof (__le64) +                                /* pool_id */
3529                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3530                 sizeof (__le64) +                               /* snap_id */
3531                 sizeof (__le64);                                /* overlap */
3532         reply_buf = kmalloc(size, GFP_KERNEL);
3533         if (!reply_buf) {
3534                 ret = -ENOMEM;
3535                 goto out_err;
3536         }
3537
3538         snapid = cpu_to_le64(CEPH_NOSNAP);
3539         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3540                                 "rbd", "get_parent",
3541                                 (char *) &snapid, sizeof (snapid),
3542                                 (char *) reply_buf, size, NULL);
3543         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3544         if (ret < 0)
3545                 goto out_err;
3546
3547         ret = -ERANGE;
3548         p = reply_buf;
3549         end = (char *) reply_buf + size;
3550         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3551         if (parent_spec->pool_id == CEPH_NOPOOL)
3552                 goto out;       /* No parent?  No problem. */
3553
3554         /* The ceph file layout needs to fit pool id in 32 bits */
3555
3556         ret = -EIO;
3557         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3558                 goto out_err;
3559
3560         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3561         if (IS_ERR(image_id)) {
3562                 ret = PTR_ERR(image_id);
3563                 goto out_err;
3564         }
3565         parent_spec->image_id = image_id;
3566         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3567         ceph_decode_64_safe(&p, end, overlap, out_err);
3568
3569         rbd_dev->parent_overlap = overlap;
3570         rbd_dev->parent_spec = parent_spec;
3571         parent_spec = NULL;     /* rbd_dev now owns this */
3572 out:
3573         ret = 0;
3574 out_err:
3575         kfree(reply_buf);
3576         rbd_spec_put(parent_spec);
3577
3578         return ret;
3579 }
3580
3581 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3582 {
3583         size_t image_id_size;
3584         char *image_id;
3585         void *p;
3586         void *end;
3587         size_t size;
3588         void *reply_buf = NULL;
3589         size_t len = 0;
3590         char *image_name = NULL;
3591         int ret;
3592
3593         rbd_assert(!rbd_dev->spec->image_name);
3594
3595         len = strlen(rbd_dev->spec->image_id);
3596         image_id_size = sizeof (__le32) + len;
3597         image_id = kmalloc(image_id_size, GFP_KERNEL);
3598         if (!image_id)
3599                 return NULL;
3600
3601         p = image_id;
3602         end = (char *) image_id + image_id_size;
3603         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3604
3605         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3606         reply_buf = kmalloc(size, GFP_KERNEL);
3607         if (!reply_buf)
3608                 goto out;
3609
3610         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3611                                 "rbd", "dir_get_name",
3612                                 image_id, image_id_size,
3613                                 (char *) reply_buf, size, NULL);
3614         if (ret < 0)
3615                 goto out;
3616         p = reply_buf;
3617         end = (char *) reply_buf + size;
3618         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3619         if (IS_ERR(image_name))
3620                 image_name = NULL;
3621         else
3622                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3623 out:
3624         kfree(reply_buf);
3625         kfree(image_id);
3626
3627         return image_name;
3628 }
3629
3630 /*
3631  * When a parent image gets probed, we only have the pool, image,
3632  * and snapshot ids but not the names of any of them.  This call
3633  * is made later to fill in those names.  It has to be done after
3634  * rbd_dev_snaps_update() has completed because some of the
3635  * information (in particular, snapshot name) is not available
3636  * until then.
3637  */
3638 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3639 {
3640         struct ceph_osd_client *osdc;
3641         const char *name;
3643         int ret;
3644
3645         if (rbd_dev->spec->pool_name)
3646                 return 0;       /* Already have the names */
3647
3648         /* Look up the pool name */
3649
3650         osdc = &rbd_dev->rbd_client->client->osdc;
3651         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3652         if (!name) {
3653                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3654                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3655                 return -EIO;
3656         }
3657
3658         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3659         if (!rbd_dev->spec->pool_name)
3660                 return -ENOMEM;
3661
3662         /* Fetch the image name; tolerate failure here */
3663
3664         name = rbd_dev_image_name(rbd_dev);
3665         if (name)
3666                 rbd_dev->spec->image_name = (char *) name;
3667         else
3668                 rbd_warn(rbd_dev, "unable to get image name");
3669
3670         /* Look up the snapshot name. */
3671
3672         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3673         if (!name) {
3674                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3675                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3676                 ret = -EIO;
3677                 goto out_err;
3678         }
3679         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3680         if (!rbd_dev->spec->snap_name) {
3681                 ret = -ENOMEM;
                     goto out_err;
             }
3682
3683         return 0;
3684 out_err:
3686         kfree(rbd_dev->spec->pool_name);
3687         rbd_dev->spec->pool_name = NULL;
3688
3689         return ret;
3690 }
3691
3692 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3693 {
3694         size_t size;
3695         int ret;
3696         void *reply_buf;
3697         void *p;
3698         void *end;
3699         u64 seq;
3700         u32 snap_count;
3701         struct ceph_snap_context *snapc;
3702         u32 i;
3703
3704         /*
3705          * We'll need room for the seq value (maximum snapshot id),
3706          * snapshot count, and array of that many snapshot ids.
3707          * For now we have a fixed upper limit on the number we're
3708          * prepared to receive.
3709          */
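     /*
      * Concretely: 8 (seq) + 4 (snap count) + 510 * 8 (ids) = 4092
      * bytes, which is how RBD_MAX_SNAP_COUNT keeps the largest
      * snapshot context within a single 4KB page.
      */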
3710         size = sizeof (__le64) + sizeof (__le32) +
3711                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3712         reply_buf = kzalloc(size, GFP_KERNEL);
3713         if (!reply_buf)
3714                 return -ENOMEM;
3715
3716         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3717                                 "rbd", "get_snapcontext",
3718                                 NULL, 0,
3719                                 reply_buf, size, ver);
3720         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3721         if (ret < 0)
3722                 goto out;
3723
3724         ret = -ERANGE;
3725         p = reply_buf;
3726         end = (char *) reply_buf + size;
3727         ceph_decode_64_safe(&p, end, seq, out);
3728         ceph_decode_32_safe(&p, end, snap_count, out);
3729
3730         /*
3731          * Make sure the reported number of snapshot ids wouldn't go
3732          * beyond the end of our buffer.  But before checking that,
3733          * make sure the computed size of the snapshot context we
3734          * allocate is representable in a size_t.
3735          */
3736         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3737                                  / sizeof (u64)) {
3738                 ret = -EINVAL;
3739                 goto out;
3740         }
3741         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3742                 goto out;
3743
3744         size = sizeof (struct ceph_snap_context) +
3745                                 snap_count * sizeof (snapc->snaps[0]);
3746         snapc = kmalloc(size, GFP_KERNEL);
3747         if (!snapc) {
3748                 ret = -ENOMEM;
3749                 goto out;
3750         }
3751
3752         atomic_set(&snapc->nref, 1);
3753         snapc->seq = seq;
3754         snapc->num_snaps = snap_count;
3755         for (i = 0; i < snap_count; i++)
3756                 snapc->snaps[i] = ceph_decode_64(&p);
3757
3758         rbd_dev->header.snapc = snapc;
3759
3760         dout("  snap context seq = %llu, snap_count = %u\n",
3761                 (unsigned long long) seq, (unsigned int) snap_count);
3762         ret = 0;
3763 out:
3764         kfree(reply_buf);
3765
3766         return ret;
3767 }
3768
3769 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3770 {
3771         size_t size;
3772         void *reply_buf;
3773         __le64 snap_id;
3774         int ret;
3775         void *p;
3776         void *end;
3777         char *snap_name;
3778
3779         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3780         reply_buf = kmalloc(size, GFP_KERNEL);
3781         if (!reply_buf)
3782                 return ERR_PTR(-ENOMEM);
3783
3784         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3785         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3786                                 "rbd", "get_snapshot_name",
3787                                 (char *) &snap_id, sizeof (snap_id),
3788                                 reply_buf, size, NULL);
3789         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3790         if (ret < 0)
3791                 goto out;
3792
3793         p = reply_buf;
3794         end = (char *) reply_buf + size;
3795         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3796         if (IS_ERR(snap_name)) {
3797                 ret = PTR_ERR(snap_name);
3798                 goto out;
3799         } else {
3800                 dout("  snap_id 0x%016llx snap_name = %s\n",
3801                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3802         }
3803         kfree(reply_buf);
3804
3805         return snap_name;
3806 out:
3807         kfree(reply_buf);
3808
3809         return ERR_PTR(ret);
3810 }
3811
3812 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3813                 u64 *snap_size, u64 *snap_features)
3814 {
3815         u64 snap_id;
3816         u8 order;
3817         int ret;
3818
3819         snap_id = rbd_dev->header.snapc->snaps[which];
3820         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3821         if (ret)
3822                 return ERR_PTR(ret);
3823         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3824         if (ret)
3825                 return ERR_PTR(ret);
3826
3827         return rbd_dev_v2_snap_name(rbd_dev, which);
3828 }
3829
3830 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3831                 u64 *snap_size, u64 *snap_features)
3832 {
3833         if (rbd_dev->image_format == 1)
3834                 return rbd_dev_v1_snap_info(rbd_dev, which,
3835                                         snap_size, snap_features);
3836         if (rbd_dev->image_format == 2)
3837                 return rbd_dev_v2_snap_info(rbd_dev, which,
3838                                         snap_size, snap_features);
3839         return ERR_PTR(-EINVAL);
3840 }
3841
3842 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3843 {
3844         int ret;
3845         __u8 obj_order;
3846
3847         down_write(&rbd_dev->header_rwsem);
3848
3849         /* Grab old order first, to see if it changes */
3850
3851         obj_order = rbd_dev->header.obj_order;
3852         ret = rbd_dev_v2_image_size(rbd_dev);
3853         if (ret)
3854                 goto out;
3855         if (rbd_dev->header.obj_order != obj_order) {
3856                 ret = -EIO;
3857                 goto out;
3858         }
3859         rbd_update_mapping_size(rbd_dev);
3860
3861         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3862         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3863         if (ret)
3864                 goto out;
3865         ret = rbd_dev_snaps_update(rbd_dev);
3866         dout("rbd_dev_snaps_update returned %d\n", ret);
3867         if (ret)
3868                 goto out;
3869         ret = rbd_dev_snaps_register(rbd_dev);
3870         dout("rbd_dev_snaps_register returned %d\n", ret);
3871 out:
3872         up_write(&rbd_dev->header_rwsem);
3873
3874         return ret;
3875 }
3876
3877 /*
3878  * Scan the rbd device's current snapshot list and compare it to the
3879  * newly-received snapshot context.  Remove any existing snapshots
3880  * not present in the new snapshot context.  Add a new snapshot for
3881  * any snapshots in the snapshot context not in the current list.
3882  * And verify there are no changes to snapshots we already know
3883  * about.
3884  *
3885  * Assumes the snapshots in the snapshot context are sorted by
3886  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3887  * are also maintained in that order.)
3888  */
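     /*
      * A worked example with made-up ids (highest first): if the
      * current list is {12, 7, 3} and the new context reports
      * {12, 8, 3}, the single pass below keeps 12, inserts 8,
      * removes the now-missing 7, and re-verifies 3.
      */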
3889 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3890 {
3891         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3892         const u32 snap_count = snapc->num_snaps;
3893         struct list_head *head = &rbd_dev->snaps;
3894         struct list_head *links = head->next;
3895         u32 index = 0;
3896
3897         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3898         while (index < snap_count || links != head) {
3899                 u64 snap_id;
3900                 struct rbd_snap *snap;
3901                 char *snap_name;
3902                 u64 snap_size = 0;
3903                 u64 snap_features = 0;
3904
3905                 snap_id = index < snap_count ? snapc->snaps[index]
3906                                              : CEPH_NOSNAP;
3907                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3908                                      : NULL;
3909                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3910
3911                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3912                         struct list_head *next = links->next;
3913
3914                         /*
3915                          * A previously-existing snapshot is not in
3916                          * the new snap context.
3917                          *
3918                          * If the now missing snapshot is the one the
3919                          * image is mapped to, clear its exists flag
3920                          * so we can avoid sending any more requests
3921                          * to it.
3922                          */
3923                         if (rbd_dev->spec->snap_id == snap->id)
3924                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3925                         rbd_remove_snap_dev(snap);
3926                         dout("%ssnap id %llu has been removed\n",
3927                                 rbd_dev->spec->snap_id == snap->id ?
3928                                                         "mapped " : "",
3929                                 (unsigned long long) snap->id);
3930
3931                         /* Done with this list entry; advance */
3932
3933                         links = next;
3934                         continue;
3935                 }
3936
3937                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3938                                         &snap_size, &snap_features);
3939                 if (IS_ERR(snap_name))
3940                         return PTR_ERR(snap_name);
3941
3942                 dout("entry %u: snap_id = %llu\n", (unsigned int) index,
3943                         (unsigned long long) snap_id);
3944                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3945                         struct rbd_snap *new_snap;
3946
3947                         /* We haven't seen this snapshot before */
3948
3949                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3950                                         snap_id, snap_size, snap_features);
3951                         if (IS_ERR(new_snap)) {
3952                                 int err = PTR_ERR(new_snap);
3953
3954                                 dout("  failed to add dev, error %d\n", err);
3955
3956                                 return err;
3957                         }
3958
3959                         /* New goes before existing, or at end of list */
3960
3961                         dout("  added dev%s\n", snap ? "" : " at end");
3962                         if (snap)
3963                                 list_add_tail(&new_snap->node, &snap->node);
3964                         else
3965                                 list_add_tail(&new_snap->node, head);
3966                 } else {
3967                         /* Already have this one */
3968
3969                         dout("  already present\n");
3970
3971                         rbd_assert(snap->size == snap_size);
3972                         rbd_assert(!strcmp(snap->name, snap_name));
3973                         rbd_assert(snap->features == snap_features);
3974
3975                         /* Done with this list entry; advance */
3976
3977                         links = links->next;
3978                 }
3979
3980                 /* Advance to the next entry in the snapshot context */
3981
3982                 index++;
3983         }
3984         dout("%s: done\n", __func__);
3985
3986         return 0;
3987 }
3988
3989 /*
3990  * Scan the list of snapshots and register the devices for any that
3991  * have not already been registered.
3992  */
3993 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3994 {
3995         struct rbd_snap *snap;
3996         int ret = 0;
3997
3998         dout("%s:\n", __func__);
3999         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
4000                 return -EIO;
4001
4002         list_for_each_entry(snap, &rbd_dev->snaps, node) {
4003                 if (!rbd_snap_registered(snap)) {
4004                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
4005                         if (ret < 0)
4006                                 break;
4007                 }
4008         }
4009         dout("%s: returning %d\n", __func__, ret);
4010
4011         return ret;
4012 }
4013
4014 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4015 {
4016         struct device *dev;
4017         int ret;
4018
4019         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4020
4021         dev = &rbd_dev->dev;
4022         dev->bus = &rbd_bus_type;
4023         dev->type = &rbd_device_type;
4024         dev->parent = &rbd_root_dev;
4025         dev->release = rbd_dev_release;
4026         dev_set_name(dev, "%d", rbd_dev->dev_id);
4027         ret = device_register(dev);
4028
4029         mutex_unlock(&ctl_mutex);
4030
4031         return ret;
4032 }
4033
4034 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4035 {
4036         device_unregister(&rbd_dev->dev);
4037 }
4038
4039 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4040
4041 /*
4042  * Get a unique rbd identifier for the given new rbd_dev, and add
4043  * the rbd_dev to the global list.  The minimum rbd id is 1.
4044  */
4045 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4046 {
4047         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4048
4049         spin_lock(&rbd_dev_list_lock);
4050         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4051         spin_unlock(&rbd_dev_list_lock);
4052         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4053                 (unsigned long long) rbd_dev->dev_id);
4054 }
4055
4056 /*
4057  * Remove an rbd_dev from the global list, and record that its
4058  * identifier is no longer in use.
4059  */
4060 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4061 {
4062         struct list_head *tmp;
4063         int rbd_id = rbd_dev->dev_id;
4064         int max_id;
4065
4066         rbd_assert(rbd_id > 0);
4067
4068         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4069                 (unsigned long long) rbd_dev->dev_id);
4070         spin_lock(&rbd_dev_list_lock);
4071         list_del_init(&rbd_dev->node);
4072
4073         /*
4074          * If the id being "put" is not the current maximum, there
4075          * is nothing special we need to do.
4076          */
4077         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4078                 spin_unlock(&rbd_dev_list_lock);
4079                 return;
4080         }
4081
4082         /*
4083          * We need to update the current maximum id.  Search the
4084          * list to find out what it is.  We're more likely to find
4085          * the maximum at the end, so search the list backward.
4086          */
4087         max_id = 0;
4088         list_for_each_prev(tmp, &rbd_dev_list) {
4089                 struct rbd_device *rbd_dev;
4090
4091                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4092                 if (rbd_dev->dev_id > max_id)
4093                         max_id = rbd_dev->dev_id;
4094         }
4095         spin_unlock(&rbd_dev_list_lock);
4096
4097         /*
4098          * The max id could have been updated by rbd_dev_id_get(), in
4099          * which case it now accurately reflects the new maximum.
4100          * Be careful not to overwrite the maximum value in that
4101          * case.
4102          */
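     /*
      * (Worked case with hypothetical ids: if id 3 is being put but
      * a concurrent rbd_dev_id_get() already bumped the maximum to
      * 4, the cmpxchg below finds rbd_dev_id_max != 3 and leaves it
      * untouched.)
      */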
4103         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4104         dout("  max dev id has been reset\n");
4105 }
4106
4107 /*
4108  * Skips over white space at *buf, and updates *buf to point to the
4109  * first found non-space character (if any). Returns the length of
4110  * the token (string of non-white space characters) found.  Note
4111  * that *buf must be terminated with '\0'.
4112  */
4113 static inline size_t next_token(const char **buf)
4114 {
4115         /*
4116          * These are the characters that produce nonzero for
4117          * isspace() in the "C" and "POSIX" locales.
4118          */
4119         const char *spaces = " \f\n\r\t\v";
4120
4121         *buf += strspn(*buf, spaces);   /* Find start of token */
4122
4123         return strcspn(*buf, spaces);   /* Return token length */
4124 }
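
     /*
      * For example (hypothetical input): with *buf pointing at
      * "  rbd myimage", next_token() advances *buf past the two
      * spaces to "rbd myimage" and returns 3, the length of "rbd".
      */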
4125
4126 /*
4127  * Finds the next token in *buf, and if the provided token buffer is
4128  * big enough, copies the found token into it.  The result, if
4129  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4130  * must be terminated with '\0' on entry.
4131  *
4132  * Returns the length of the token found (not including the '\0').
4133  * Return value will be 0 if no token is found, and it will be >=
4134  * token_size if the token would not fit.
4135  *
4136  * The *buf pointer will be updated to point beyond the end of the
4137  * found token.  Note that this occurs even if the token buffer is
4138  * too small to hold it.
4139  */
4140 static inline size_t copy_token(const char **buf,
4141                                 char *token,
4142                                 size_t token_size)
4143 {
4144         size_t len;
4145
4146         len = next_token(buf);
4147         if (len < token_size) {
4148                 memcpy(token, *buf, len);
4149                 *(token + len) = '\0';
4150         }
4151         *buf += len;
4152
4153         return len;
4154 }
4155
4156 /*
4157  * Finds the next token in *buf, dynamically allocates a buffer big
4158  * enough to hold a copy of it, and copies the token into the new
4159  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4160  * that a duplicate buffer is created even for a zero-length token.
4161  *
4162  * Returns a pointer to the newly-allocated duplicate, or a null
4163  * pointer if memory for the duplicate was not available.  If
4164  * the lenp argument is a non-null pointer, the length of the token
4165  * (not including the '\0') is returned in *lenp.
4166  *
4167  * If successful, the *buf pointer will be updated to point beyond
4168  * the end of the found token.
4169  *
4170  * Note: uses GFP_KERNEL for allocation.
4171  */
4172 static inline char *dup_token(const char **buf, size_t *lenp)
4173 {
4174         char *dup;
4175         size_t len;
4176
4177         len = next_token(buf);
4178         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4179         if (!dup)
4180                 return NULL;
4181         *(dup + len) = '\0';
4182         *buf += len;
4183
4184         if (lenp)
4185                 *lenp = len;
4186
4187         return dup;
4188 }
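
     /*
      * Continuing the hypothetical input above: with *buf at
      * "rbd myimage", dup_token() returns a freshly allocated "rbd",
      * stores 3 in *lenp when lenp is non-NULL, and leaves *buf
      * pointing at " myimage".
      */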
4189
4190 /*
4191  * Parse the options provided for an "rbd add" (i.e., rbd image
4192  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4193  * and the data written is passed here via a NUL-terminated buffer.
4194  * Returns 0 if successful or an error code otherwise.
4195  *
4196  * The information extracted from these options is recorded in
4197  * the other parameters which return dynamically-allocated
4198  * structures:
4199  *  ceph_opts
4200  *      The address of a pointer that will refer to a ceph options
4201  *      structure.  Caller must release the returned pointer using
4202  *      ceph_destroy_options() when it is no longer needed.
4203  *  rbd_opts
4204  *      Address of an rbd options pointer.  Fully initialized by
4205  *      this function; caller must release with kfree().
4206  *  spec
4207  *      Address of an rbd image specification pointer.  Fully
4208  *      initialized by this function based on parsed options.
4209  *      Caller must release with rbd_spec_put().
4210  *
4211  * The options passed take this form:
4212  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4213  * where:
4214  *  <mon_addrs>
4215  *      A comma-separated list of one or more monitor addresses.
4216  *      A monitor address is an ip address, optionally followed
4217  *      by a port number (separated by a colon).
4218  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4219  *  <options>
4220  *      A comma-separated list of ceph and/or rbd options.
4221  *  <pool_name>
4222  *      The name of the rados pool containing the rbd image.
4223  *  <image_name>
4224  *      The name of the image in that pool to map.
4225  *  <snap_name>
4226  *      An optional snapshot name.  If provided, the mapping will
4227  *      present data from the image at the time that snapshot was
4228  *      created.  The image head is used if no snapshot name is
4229  *      provided.  Snapshot mappings are always read-only.
4230  */
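     /*
      * Purely as an illustration (the monitor address, key, and
      * names below are made up), a request in that form might be:
      *
      *   # echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
      *           > /sys/bus/rbd/add
      *
      * which maps the head of image "myimage" from pool "rbd".
      */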
4231 static int rbd_add_parse_args(const char *buf,
4232                                 struct ceph_options **ceph_opts,
4233                                 struct rbd_options **opts,
4234                                 struct rbd_spec **rbd_spec)
4235 {
4236         size_t len;
4237         char *options;
4238         const char *mon_addrs;
4239         size_t mon_addrs_size;
4240         struct rbd_spec *spec = NULL;
4241         struct rbd_options *rbd_opts = NULL;
4242         struct ceph_options *copts;
4243         int ret;
4244
4245         /* The first four tokens are required */
4246
4247         len = next_token(&buf);
4248         if (!len) {
4249                 rbd_warn(NULL, "no monitor address(es) provided");
4250                 return -EINVAL;
4251         }
4252         mon_addrs = buf;
4253         mon_addrs_size = len + 1;
4254         buf += len;
4255
4256         ret = -EINVAL;
4257         options = dup_token(&buf, NULL);
4258         if (!options)
4259                 return -ENOMEM;
4260         if (!*options) {
4261                 rbd_warn(NULL, "no options provided");
4262                 goto out_err;
4263         }
4264
4265         spec = rbd_spec_alloc();
4266         if (!spec)
4267                 goto out_mem;
4268
4269         spec->pool_name = dup_token(&buf, NULL);
4270         if (!spec->pool_name)
4271                 goto out_mem;
4272         if (!*spec->pool_name) {
4273                 rbd_warn(NULL, "no pool name provided");
4274                 goto out_err;
4275         }
4276
4277         spec->image_name = dup_token(&buf, NULL);
4278         if (!spec->image_name)
4279                 goto out_mem;
4280         if (!*spec->image_name) {
4281                 rbd_warn(NULL, "no image name provided");
4282                 goto out_err;
4283         }
4284
4285         /*
4286          * Snapshot name is optional; default is to use "-"
4287          * (indicating the head/no snapshot).
4288          */
4289         len = next_token(&buf);
4290         if (!len) {
4291                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4292                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4293         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4294                 ret = -ENAMETOOLONG;
4295                 goto out_err;
4296         }
4297         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4298         if (!spec->snap_name)
4299                 goto out_mem;
4300         *(spec->snap_name + len) = '\0';
4301
4302         /* Initialize all rbd options to the defaults */
4303
4304         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4305         if (!rbd_opts)
4306                 goto out_mem;
4307
4308         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4309
4310         copts = ceph_parse_options(options, mon_addrs,
4311                                         mon_addrs + mon_addrs_size - 1,
4312                                         parse_rbd_opts_token, rbd_opts);
4313         if (IS_ERR(copts)) {
4314                 ret = PTR_ERR(copts);
4315                 goto out_err;
4316         }
4317         kfree(options);
4318
4319         *ceph_opts = copts;
4320         *opts = rbd_opts;
4321         *rbd_spec = spec;
4322
4323         return 0;
4324 out_mem:
4325         ret = -ENOMEM;
4326 out_err:
4327         kfree(rbd_opts);
4328         rbd_spec_put(spec);
4329         kfree(options);
4330
4331         return ret;
4332 }
4333
4334 /*
4335  * An rbd format 2 image has a unique identifier, distinct from the
4336  * name given to it by the user.  Internally, that identifier is
4337  * what's used to specify the names of objects related to the image.
4338  *
4339  * A special "rbd id" object is used to map an rbd image name to its
4340  * id.  If that object doesn't exist, then there is no v2 rbd image
4341  * with the supplied name.
4342  *
4343  * This function will record the given rbd_dev's image_id field if
4344  * it can be determined, and in that case will return 0.  If any
4345  * errors occur a negative errno will be returned and the rbd_dev's
4346  * image_id field will be unchanged (and should be NULL).
4347  */
4348 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4349 {
4350         int ret;
4351         size_t size;
4352         char *object_name;
4353         void *response;
4354         void *p;
4355
4356         /*
4357          * If we already have it we don't need to look it up.  When
4358          * probing a parent image, the image id is already known (and
4359          * the image name likely is not), so there's no need to fetch
4360          * it again in that case.
4361          */
4362         if (rbd_dev->spec->image_id)
4363                 return 0;
4368
4369         /*
4370          * First, see if the format 2 image id file exists, and if
4371          * so, get the image's persistent id from it.
4372          */
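     /*
      * Assuming RBD_ID_PREFIX is "rbd_id." (see rbd_types.h), an
      * image named "myimage" would be looked up through an id
      * object named "rbd_id.myimage".
      */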
4373         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4374         object_name = kmalloc(size, GFP_NOIO);
4375         if (!object_name)
4376                 return -ENOMEM;
4377         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4378         dout("rbd id object name is %s\n", object_name);
4379
4380         /* Response will be an encoded string, which includes a length */
4381
4382         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4383         response = kzalloc(size, GFP_NOIO);
4384         if (!response) {
4385                 ret = -ENOMEM;
4386                 goto out;
4387         }
4388
4389         ret = rbd_obj_method_sync(rbd_dev, object_name,
4390                                 "rbd", "get_id",
4391                                 NULL, 0,
4392                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4393         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4394         if (ret < 0)
4395                 goto out;
4396
4397         p = response;
4398         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
4399                                                 p + RBD_IMAGE_ID_LEN_MAX,
4400                                                 NULL, GFP_NOIO);
4401         if (IS_ERR(rbd_dev->spec->image_id)) {
4402                 ret = PTR_ERR(rbd_dev->spec->image_id);
4403                 rbd_dev->spec->image_id = NULL;
4404         } else {
4405                 dout("image_id is %s\n", rbd_dev->spec->image_id);
4406         }
4407 out:
4408         kfree(response);
4409         kfree(object_name);
4410
4411         return ret;
4412 }
4413
4414 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4415 {
4416         int ret;
4417         size_t size;
4418
4419         /* Version 1 images have no id; empty string is used */
4420
4421         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
4422         if (!rbd_dev->spec->image_id)
4423                 return -ENOMEM;
4424
4425         /* Record the header object name for this rbd image. */
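     /*
      * (With RBD_SUFFIX of ".rbd" from rbd_types.h, a hypothetical
      * image "myimage" gets header object "myimage.rbd".)
      */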
4426
4427         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4428         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4429         if (!rbd_dev->header_name) {
4430                 ret = -ENOMEM;
4431                 goto out_err;
4432         }
4433         sprintf(rbd_dev->header_name, "%s%s",
4434                 rbd_dev->spec->image_name, RBD_SUFFIX);
4435
4436         /* Populate rbd image metadata */
4437
4438         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4439         if (ret < 0)
4440                 goto out_err;
4441
4442         /* Version 1 images have no parent (no layering) */
4443
4444         rbd_dev->parent_spec = NULL;
4445         rbd_dev->parent_overlap = 0;
4446
4447         rbd_dev->image_format = 1;
4448
4449         dout("discovered version 1 image, header name is %s\n",
4450                 rbd_dev->header_name);
4451
4452         return 0;
4453
4454 out_err:
4455         kfree(rbd_dev->header_name);
4456         rbd_dev->header_name = NULL;
4457         kfree(rbd_dev->spec->image_id);
4458         rbd_dev->spec->image_id = NULL;
4459
4460         return ret;
4461 }
4462
4463 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4464 {
4465         size_t size;
4466         int ret;
4467         u64 ver = 0;
4468
4469         /*
4470          * Image id was filled in by the caller.  Record the header
4471          * object name for this rbd image.
4472          */
4473         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4474         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4475         if (!rbd_dev->header_name)
4476                 return -ENOMEM;
4477         sprintf(rbd_dev->header_name, "%s%s",
4478                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
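     /*
      * (Assuming RBD_HEADER_PREFIX is "rbd_header." per rbd_types.h,
      * a hypothetical image id of "1028b4567" would yield header
      * object "rbd_header.1028b4567".)
      */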
4479
4480         /* Get the size and object order for the image */
4481
4482         ret = rbd_dev_v2_image_size(rbd_dev);
4483         if (ret < 0)
4484                 goto out_err;
4485
4486         /* Get the object prefix (a.k.a. block_name) for the image */
4487
4488         ret = rbd_dev_v2_object_prefix(rbd_dev);
4489         if (ret < 0)
4490                 goto out_err;
4491
4492         /* Get and check the features for the image */

        ret = rbd_dev_v2_features(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* If the image supports layering, get the parent info */

        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }

        /* crypto and compression type aren't (yet) supported for v2 images */

        rbd_dev->header.crypt_type = 0;
        rbd_dev->header.comp_type = 0;

        /* Get the snapshot context, plus the header version */

        ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
        if (ret)
                goto out_err;
        rbd_dev->header.obj_version = ver;

        rbd_dev->image_format = 2;

        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;
out_err:
        rbd_dev->parent_overlap = 0;
        rbd_spec_put(rbd_dev->parent_spec);
        rbd_dev->parent_spec = NULL;
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}

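/*
 * Finish setting up a device, common to both image formats: update
 * the snapshot list and mapping, allocate a device id and block
 * major, create the gendisk and sysfs entries, probe any parent
 * image, then register snapshots, set up the header watch, and
 * announce the disk.
 */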
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
        struct rbd_device *parent = NULL;
        struct rbd_spec *parent_spec = NULL;
        struct rbd_client *rbdc = NULL;
        int ret;

        /* no need to lock here, as rbd_dev is not registered yet */
        ret = rbd_dev_snaps_update(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_dev_probe_update_spec(rbd_dev);
        if (ret)
                goto err_out_snaps;

        ret = rbd_dev_set_mapping(rbd_dev);
        if (ret)
                goto err_out_snaps;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_disk;

        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         */
        /* Probe the parent if there is one */

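        /*
         * Note that rbd_dev_probe() below ends up back here for the
         * parent, so if that image in turn has a parent, the entire
         * ancestry chain gets probed and linked via rbd_dev->parent.
         */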
        if (rbd_dev->parent_spec) {
                /*
                 * We need to pass a reference to the client and the
                 * parent spec when creating the parent rbd_dev.
                 * Images related by parent/child relationships
                 * always share both.
                 */
                parent_spec = rbd_spec_get(rbd_dev->parent_spec);
                rbdc = __rbd_get_client(rbd_dev->rbd_client);

                parent = rbd_dev_create(rbdc, parent_spec);
                if (!parent) {
                        ret = -ENOMEM;
                        goto err_out_spec;
                }
                rbdc = NULL;            /* parent now owns reference */
                parent_spec = NULL;     /* parent now owns reference */
                ret = rbd_dev_probe(parent);
                if (ret < 0)
                        goto err_out_parent;
                rbd_dev->parent = parent;
        }

        down_write(&rbd_dev->header_rwsem);
        ret = rbd_dev_snaps_register(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
        if (ret)
                goto err_out_bus;

        ret = rbd_dev_header_watch_sync(rbd_dev, 1);
        if (ret)
                goto err_out_bus;

        /* Everything's ready.  Announce the disk to the world. */

        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;

err_out_parent:
        rbd_dev_destroy(parent);
err_out_spec:
        rbd_spec_put(parent_spec);
        rbd_put_client(rbdc);
err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */

        rbd_bus_del_dev(rbd_dev);

        return ret;
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
err_out_snaps:
        rbd_remove_all_snaps(rbd_dev);

        return ret;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
        int ret;

        /*
         * Get the id from the image id object.  If it's not a
         * format 2 image, we'll get ENOENT back, and we'll assume
         * it's a format 1 image.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                ret = rbd_dev_v1_probe(rbd_dev);
        else
                ret = rbd_dev_v2_probe(rbd_dev);
        if (ret) {
                dout("probe failed, returning %d\n", ret);

                return ret;
        }

        ret = rbd_dev_probe_finish(rbd_dev);
        if (ret)
                rbd_header_free(&rbd_dev->header);

        return ret;
}

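/*
 * Handle a "device add" request written to /sys/bus/rbd/add.  Per
 * Documentation/ABI/testing/sysfs-bus-rbd, the buffer is expected to
 * look roughly like
 *
 *   <mon addrs> <options> <pool name> <image name> [<snap name>]
 *
 * so that, for example (illustrative values only):
 *
 *   $ echo "1.2.3.4:6789 name=admin rbd foo" > /sys/bus/rbd/add
 *
 * would map image "foo" from pool "rbd" as the next available
 * /dev/rbd<N>.
 */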
static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto err_out_module;

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }
        ceph_opts = NULL;       /* rbd_dev client now owns this */

        /* pick the pool */
        osdc = &rbdc->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
        spec->pool_id = (u64) rc;

        /* The ceph file layout needs to fit pool id in 32 bits */

        if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
                rc = -EIO;
                goto err_out_client;
        }

        rbd_dev = rbd_dev_create(rbdc, spec);
        if (!rbd_dev)
                goto err_out_client;
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */

        rbd_dev->mapping.read_only = rbd_opts->read_only;
        kfree(rbd_opts);
        rbd_opts = NULL;        /* done with this */

        rc = rbd_dev_probe(rbd_dev);
        if (rc < 0)
                goto err_out_rbd_dev;

        return count;
err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);
err_out_module:
        module_put(THIS_MODULE);

        dout("Error adding device %s\n", buf);

        return (ssize_t) rc;
}

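/*
 * Look up an rbd device by its id.  The device list lock is dropped
 * before returning, so the caller must otherwise keep the device from
 * disappearing; rbd_remove() below does so by holding ctl_mutex
 * across the lookup and removal.
 */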
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
        struct rbd_device *rbd_dev;

        spin_lock(&rbd_dev_list_lock);
        list_for_each_entry(rbd_dev, &rbd_dev_list, node) {
                if (rbd_dev->dev_id == dev_id) {
                        spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
                }
        }
        spin_unlock(&rbd_dev_list_lock);
        return NULL;
}

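/*
 * Release callback for the rbd device's embedded struct device,
 * invoked by the driver core once the last reference is dropped
 * (teardown is initiated by rbd_bus_del_dev()).  Undoes what
 * rbd_dev_probe_finish() set up.
 */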
static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        if (rbd_dev->watch_event)
                rbd_dev_header_watch_sync(rbd_dev, 0);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
        rbd_dev_destroy(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}

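/* Remove a device's snapshots, then unregister it from sysfs. */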
static void __rbd_remove(struct rbd_device *rbd_dev)
{
        rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
}

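/*
 * Handle a "device remove" request written to /sys/bus/rbd/remove.
 * The buffer holds the decimal id of the device to remove, e.g.
 *
 *   $ echo 1 > /sys/bus/rbd/remove
 *
 * unmaps /dev/rbd1, along with any devices created for its parent
 * images.
 */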
static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id, rc;
        unsigned long ul;
        int ret = count;

        rc = strict_strtoul(buf, 10, &ul);
        if (rc)
                return rc;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;

        while (rbd_dev->parent_spec) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                __rbd_remove(second);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
                first->parent_overlap = 0;
                first->parent = NULL;
        }
        __rbd_remove(rbd_dev);

done:
        mutex_unlock(&ctl_mutex);

        return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");

                return -EINVAL;
        }
        rc = rbd_sysfs_init();
        if (rc)
                return rc;
        pr_info("loaded " RBD_DRV_NAME_LONG "\n");
        return 0;
}

static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");