]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
211baa7f4f0b3fe5403cb1ab3bb41d94a69c4351
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING    (1<<0)
77 #define RBD_FEATURE_STRIPINGV2  (1<<1)
78 #define RBD_FEATURES_ALL \
79             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
80
81 /* Features supported by this (client software) implementation. */
82
83 #define RBD_FEATURES_SUPPORTED  (0)
84
85 /*
86  * An RBD device name will be "rbd#", where the "rbd" comes from
87  * RBD_DRV_NAME above, and # is a unique integer identifier.
88  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89  * enough to hold all possible device names.
90  */
91 #define DEV_NAME_LEN            32
92 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These four fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 obj_version;
112 };
113
114 /*
115  * An rbd image specification.
116  *
117  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118  * identify an image.  Each rbd_dev structure includes a pointer to
119  * an rbd_spec structure that encapsulates this identity.
120  *
121  * Each of the id's in an rbd_spec has an associated name.  For a
122  * user-mapped image, the names are supplied and the id's associated
123  * with them are looked up.  For a layered image, a parent image is
124  * defined by the tuple, and the names are looked up.
125  *
126  * An rbd_dev structure contains a parent_spec pointer which is
127  * non-null if the image it represents is a child in a layered
128  * image.  This pointer will refer to the rbd_spec structure used
129  * by the parent rbd_dev for its own identity (i.e., the structure
130  * is shared between the parent and child).
131  *
132  * Since these structures are populated once, during the discovery
133  * phase of image construction, they are effectively immutable so
134  * we make no effort to synchronize access to them.
135  *
136  * Note that code herein does not assume the image name is known (it
137  * could be a null pointer).
138  */
139 struct rbd_spec {
140         u64             pool_id;
141         char            *pool_name;
142
143         char            *image_id;
144         char            *image_name;
145
146         u64             snap_id;
147         char            *snap_name;
148
149         struct kref     kref;
150 };
151
152 /*
153  * an instance of the client.  multiple devices may share an rbd client.
154  */
155 struct rbd_client {
156         struct ceph_client      *client;
157         struct kref             kref;
158         struct list_head        node;
159 };
160
161 struct rbd_img_request;
162 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
163
164 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
165
166 struct rbd_obj_request;
167 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
168
169 enum obj_request_type {
170         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
171 };
172
173 enum obj_req_flags {
174         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
175         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
176 };
177
178 struct rbd_obj_request {
179         const char              *object_name;
180         u64                     offset;         /* object start byte */
181         u64                     length;         /* bytes from offset */
182         unsigned long           flags;
183
184         struct rbd_img_request  *img_request;
185         u64                     img_offset;     /* image relative offset */
186         struct list_head        links;          /* img_request->obj_requests */
187         u32                     which;          /* posn image request list */
188
189         enum obj_request_type   type;
190         union {
191                 struct bio      *bio_list;
192                 struct {
193                         struct page     **pages;
194                         u32             page_count;
195                 };
196         };
197
198         struct ceph_osd_request *osd_req;
199
200         u64                     xferred;        /* bytes transferred */
201         u64                     version;
202         int                     result;
203
204         rbd_obj_callback_t      callback;
205         struct completion       completion;
206
207         struct kref             kref;
208 };
209
210 enum img_req_flags {
211         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
212         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
213         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
214 };
215
216 struct rbd_img_request {
217         struct rbd_device       *rbd_dev;
218         u64                     offset; /* starting image byte offset */
219         u64                     length; /* byte count from offset */
220         unsigned long           flags;
221         union {
222                 u64                     snap_id;        /* for reads */
223                 struct ceph_snap_context *snapc;        /* for writes */
224         };
225         union {
226                 struct request          *rq;            /* block request */
227                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
228         };
229         spinlock_t              completion_lock;/* protects next_completion */
230         u32                     next_completion;
231         rbd_img_callback_t      callback;
232         u64                     xferred;/* aggregate bytes transferred */
233         int                     result; /* first nonzero obj_request result */
234
235         u32                     obj_request_count;
236         struct list_head        obj_requests;   /* rbd_obj_request structs */
237
238         struct kref             kref;
239 };
240
241 #define for_each_obj_request(ireq, oreq) \
242         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
243 #define for_each_obj_request_from(ireq, oreq) \
244         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
245 #define for_each_obj_request_safe(ireq, oreq, n) \
246         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
247
248 struct rbd_snap {
249         struct  device          dev;
250         const char              *name;
251         u64                     size;
252         struct list_head        node;
253         u64                     id;
254         u64                     features;
255 };
256
257 struct rbd_mapping {
258         u64                     size;
259         u64                     features;
260         bool                    read_only;
261 };
262
263 /*
264  * a single device
265  */
266 struct rbd_device {
267         int                     dev_id;         /* blkdev unique id */
268
269         int                     major;          /* blkdev assigned major */
270         struct gendisk          *disk;          /* blkdev's gendisk and rq */
271
272         u32                     image_format;   /* Either 1 or 2 */
273         struct rbd_client       *rbd_client;
274
275         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
276
277         spinlock_t              lock;           /* queue, flags, open_count */
278
279         struct rbd_image_header header;
280         unsigned long           flags;          /* possibly lock protected */
281         struct rbd_spec         *spec;
282
283         char                    *header_name;
284
285         struct ceph_file_layout layout;
286
287         struct ceph_osd_event   *watch_event;
288         struct rbd_obj_request  *watch_request;
289
290         struct rbd_spec         *parent_spec;
291         u64                     parent_overlap;
292         struct rbd_device       *parent;
293
294         /* protects updating the header */
295         struct rw_semaphore     header_rwsem;
296
297         struct rbd_mapping      mapping;
298
299         struct list_head        node;
300
301         /* list of snapshots */
302         struct list_head        snaps;
303
304         /* sysfs related */
305         struct device           dev;
306         unsigned long           open_count;     /* protected by lock */
307 };
308
309 /*
310  * Flag bits for rbd_dev->flags.  If atomicity is required,
311  * rbd_dev->lock is used to protect access.
312  *
313  * Currently, only the "removing" flag (which is coupled with the
314  * "open_count" field) requires atomic access.
315  */
316 enum rbd_dev_flags {
317         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
318         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
319 };
320
321 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
322
323 static LIST_HEAD(rbd_dev_list);    /* devices */
324 static DEFINE_SPINLOCK(rbd_dev_list_lock);
325
326 static LIST_HEAD(rbd_client_list);              /* clients */
327 static DEFINE_SPINLOCK(rbd_client_list_lock);
328
329 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
330 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
331
332 static void rbd_dev_release(struct device *dev);
333 static void rbd_remove_snap_dev(struct rbd_snap *snap);
334
335 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
336                        size_t count);
337 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
338                           size_t count);
339 static int rbd_dev_probe(struct rbd_device *rbd_dev);
340
341 static struct bus_attribute rbd_bus_attrs[] = {
342         __ATTR(add, S_IWUSR, NULL, rbd_add),
343         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
344         __ATTR_NULL
345 };
346
347 static struct bus_type rbd_bus_type = {
348         .name           = "rbd",
349         .bus_attrs      = rbd_bus_attrs,
350 };
351
352 static void rbd_root_dev_release(struct device *dev)
353 {
354 }
355
356 static struct device rbd_root_dev = {
357         .init_name =    "rbd",
358         .release =      rbd_root_dev_release,
359 };
360
361 static __printf(2, 3)
362 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
363 {
364         struct va_format vaf;
365         va_list args;
366
367         va_start(args, fmt);
368         vaf.fmt = fmt;
369         vaf.va = &args;
370
371         if (!rbd_dev)
372                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
373         else if (rbd_dev->disk)
374                 printk(KERN_WARNING "%s: %s: %pV\n",
375                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
376         else if (rbd_dev->spec && rbd_dev->spec->image_name)
377                 printk(KERN_WARNING "%s: image %s: %pV\n",
378                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
379         else if (rbd_dev->spec && rbd_dev->spec->image_id)
380                 printk(KERN_WARNING "%s: id %s: %pV\n",
381                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
382         else    /* punt */
383                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
384                         RBD_DRV_NAME, rbd_dev, &vaf);
385         va_end(args);
386 }
387
388 #ifdef RBD_DEBUG
389 #define rbd_assert(expr)                                                \
390                 if (unlikely(!(expr))) {                                \
391                         printk(KERN_ERR "\nAssertion failure in %s() "  \
392                                                 "at line %d:\n\n"       \
393                                         "\trbd_assert(%s);\n\n",        \
394                                         __func__, __LINE__, #expr);     \
395                         BUG();                                          \
396                 }
397 #else /* !RBD_DEBUG */
398 #  define rbd_assert(expr)      ((void) 0)
399 #endif /* !RBD_DEBUG */
400
401 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
402
403 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
404 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
405
406 static int rbd_open(struct block_device *bdev, fmode_t mode)
407 {
408         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
409         bool removing = false;
410
411         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
412                 return -EROFS;
413
414         spin_lock_irq(&rbd_dev->lock);
415         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
416                 removing = true;
417         else
418                 rbd_dev->open_count++;
419         spin_unlock_irq(&rbd_dev->lock);
420         if (removing)
421                 return -ENOENT;
422
423         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
424         (void) get_device(&rbd_dev->dev);
425         set_device_ro(bdev, rbd_dev->mapping.read_only);
426         mutex_unlock(&ctl_mutex);
427
428         return 0;
429 }
430
431 static int rbd_release(struct gendisk *disk, fmode_t mode)
432 {
433         struct rbd_device *rbd_dev = disk->private_data;
434         unsigned long open_count_before;
435
436         spin_lock_irq(&rbd_dev->lock);
437         open_count_before = rbd_dev->open_count--;
438         spin_unlock_irq(&rbd_dev->lock);
439         rbd_assert(open_count_before > 0);
440
441         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
442         put_device(&rbd_dev->dev);
443         mutex_unlock(&ctl_mutex);
444
445         return 0;
446 }
447
448 static const struct block_device_operations rbd_bd_ops = {
449         .owner                  = THIS_MODULE,
450         .open                   = rbd_open,
451         .release                = rbd_release,
452 };
453
454 /*
455  * Initialize an rbd client instance.
456  * We own *ceph_opts.
457  */
458 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
459 {
460         struct rbd_client *rbdc;
461         int ret = -ENOMEM;
462
463         dout("%s:\n", __func__);
464         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
465         if (!rbdc)
466                 goto out_opt;
467
468         kref_init(&rbdc->kref);
469         INIT_LIST_HEAD(&rbdc->node);
470
471         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
472
473         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
474         if (IS_ERR(rbdc->client))
475                 goto out_mutex;
476         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
477
478         ret = ceph_open_session(rbdc->client);
479         if (ret < 0)
480                 goto out_err;
481
482         spin_lock(&rbd_client_list_lock);
483         list_add_tail(&rbdc->node, &rbd_client_list);
484         spin_unlock(&rbd_client_list_lock);
485
486         mutex_unlock(&ctl_mutex);
487         dout("%s: rbdc %p\n", __func__, rbdc);
488
489         return rbdc;
490
491 out_err:
492         ceph_destroy_client(rbdc->client);
493 out_mutex:
494         mutex_unlock(&ctl_mutex);
495         kfree(rbdc);
496 out_opt:
497         if (ceph_opts)
498                 ceph_destroy_options(ceph_opts);
499         dout("%s: error %d\n", __func__, ret);
500
501         return ERR_PTR(ret);
502 }
503
504 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
505 {
506         kref_get(&rbdc->kref);
507
508         return rbdc;
509 }
510
511 /*
512  * Find a ceph client with specific addr and configuration.  If
513  * found, bump its reference count.
514  */
515 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
516 {
517         struct rbd_client *client_node;
518         bool found = false;
519
520         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
521                 return NULL;
522
523         spin_lock(&rbd_client_list_lock);
524         list_for_each_entry(client_node, &rbd_client_list, node) {
525                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
526                         __rbd_get_client(client_node);
527
528                         found = true;
529                         break;
530                 }
531         }
532         spin_unlock(&rbd_client_list_lock);
533
534         return found ? client_node : NULL;
535 }
536
537 /*
538  * mount options
539  */
540 enum {
541         Opt_last_int,
542         /* int args above */
543         Opt_last_string,
544         /* string args above */
545         Opt_read_only,
546         Opt_read_write,
547         /* Boolean args above */
548         Opt_last_bool,
549 };
550
551 static match_table_t rbd_opts_tokens = {
552         /* int args above */
553         /* string args above */
554         {Opt_read_only, "read_only"},
555         {Opt_read_only, "ro"},          /* Alternate spelling */
556         {Opt_read_write, "read_write"},
557         {Opt_read_write, "rw"},         /* Alternate spelling */
558         /* Boolean args above */
559         {-1, NULL}
560 };
561
562 struct rbd_options {
563         bool    read_only;
564 };
565
566 #define RBD_READ_ONLY_DEFAULT   false
567
568 static int parse_rbd_opts_token(char *c, void *private)
569 {
570         struct rbd_options *rbd_opts = private;
571         substring_t argstr[MAX_OPT_ARGS];
572         int token, intval, ret;
573
574         token = match_token(c, rbd_opts_tokens, argstr);
575         if (token < 0)
576                 return -EINVAL;
577
578         if (token < Opt_last_int) {
579                 ret = match_int(&argstr[0], &intval);
580                 if (ret < 0) {
581                         pr_err("bad mount option arg (not int) "
582                                "at '%s'\n", c);
583                         return ret;
584                 }
585                 dout("got int token %d val %d\n", token, intval);
586         } else if (token > Opt_last_int && token < Opt_last_string) {
587                 dout("got string token %d val %s\n", token,
588                      argstr[0].from);
589         } else if (token > Opt_last_string && token < Opt_last_bool) {
590                 dout("got Boolean token %d\n", token);
591         } else {
592                 dout("got token %d\n", token);
593         }
594
595         switch (token) {
596         case Opt_read_only:
597                 rbd_opts->read_only = true;
598                 break;
599         case Opt_read_write:
600                 rbd_opts->read_only = false;
601                 break;
602         default:
603                 rbd_assert(false);
604                 break;
605         }
606         return 0;
607 }
608
609 /*
610  * Get a ceph client with specific addr and configuration, if one does
611  * not exist create it.
612  */
613 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
614 {
615         struct rbd_client *rbdc;
616
617         rbdc = rbd_client_find(ceph_opts);
618         if (rbdc)       /* using an existing client */
619                 ceph_destroy_options(ceph_opts);
620         else
621                 rbdc = rbd_client_create(ceph_opts);
622
623         return rbdc;
624 }
625
626 /*
627  * Destroy ceph client
628  *
629  * Caller must hold rbd_client_list_lock.
630  */
631 static void rbd_client_release(struct kref *kref)
632 {
633         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
634
635         dout("%s: rbdc %p\n", __func__, rbdc);
636         spin_lock(&rbd_client_list_lock);
637         list_del(&rbdc->node);
638         spin_unlock(&rbd_client_list_lock);
639
640         ceph_destroy_client(rbdc->client);
641         kfree(rbdc);
642 }
643
644 /*
645  * Drop reference to ceph client node. If it's not referenced anymore, release
646  * it.
647  */
648 static void rbd_put_client(struct rbd_client *rbdc)
649 {
650         if (rbdc)
651                 kref_put(&rbdc->kref, rbd_client_release);
652 }
653
654 static bool rbd_image_format_valid(u32 image_format)
655 {
656         return image_format == 1 || image_format == 2;
657 }
658
659 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
660 {
661         size_t size;
662         u32 snap_count;
663
664         /* The header has to start with the magic rbd header text */
665         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
666                 return false;
667
668         /* The bio layer requires at least sector-sized I/O */
669
670         if (ondisk->options.order < SECTOR_SHIFT)
671                 return false;
672
673         /* If we use u64 in a few spots we may be able to loosen this */
674
675         if (ondisk->options.order > 8 * sizeof (int) - 1)
676                 return false;
677
678         /*
679          * The size of a snapshot header has to fit in a size_t, and
680          * that limits the number of snapshots.
681          */
682         snap_count = le32_to_cpu(ondisk->snap_count);
683         size = SIZE_MAX - sizeof (struct ceph_snap_context);
684         if (snap_count > size / sizeof (__le64))
685                 return false;
686
687         /*
688          * Not only that, but the size of the entire the snapshot
689          * header must also be representable in a size_t.
690          */
691         size -= snap_count * sizeof (__le64);
692         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
693                 return false;
694
695         return true;
696 }
697
698 /*
699  * Create a new header structure, translate header format from the on-disk
700  * header.
701  */
702 static int rbd_header_from_disk(struct rbd_image_header *header,
703                                  struct rbd_image_header_ondisk *ondisk)
704 {
705         u32 snap_count;
706         size_t len;
707         size_t size;
708         u32 i;
709
710         memset(header, 0, sizeof (*header));
711
712         snap_count = le32_to_cpu(ondisk->snap_count);
713
714         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
715         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
716         if (!header->object_prefix)
717                 return -ENOMEM;
718         memcpy(header->object_prefix, ondisk->object_prefix, len);
719         header->object_prefix[len] = '\0';
720
721         if (snap_count) {
722                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
723
724                 /* Save a copy of the snapshot names */
725
726                 if (snap_names_len > (u64) SIZE_MAX)
727                         return -EIO;
728                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
729                 if (!header->snap_names)
730                         goto out_err;
731                 /*
732                  * Note that rbd_dev_v1_header_read() guarantees
733                  * the ondisk buffer we're working with has
734                  * snap_names_len bytes beyond the end of the
735                  * snapshot id array, this memcpy() is safe.
736                  */
737                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
738                         snap_names_len);
739
740                 /* Record each snapshot's size */
741
742                 size = snap_count * sizeof (*header->snap_sizes);
743                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
744                 if (!header->snap_sizes)
745                         goto out_err;
746                 for (i = 0; i < snap_count; i++)
747                         header->snap_sizes[i] =
748                                 le64_to_cpu(ondisk->snaps[i].image_size);
749         } else {
750                 WARN_ON(ondisk->snap_names_len);
751                 header->snap_names = NULL;
752                 header->snap_sizes = NULL;
753         }
754
755         header->features = 0;   /* No features support in v1 images */
756         header->obj_order = ondisk->options.order;
757         header->crypt_type = ondisk->options.crypt_type;
758         header->comp_type = ondisk->options.comp_type;
759
760         /* Allocate and fill in the snapshot context */
761
762         header->image_size = le64_to_cpu(ondisk->image_size);
763         size = sizeof (struct ceph_snap_context);
764         size += snap_count * sizeof (header->snapc->snaps[0]);
765         header->snapc = kzalloc(size, GFP_KERNEL);
766         if (!header->snapc)
767                 goto out_err;
768
769         atomic_set(&header->snapc->nref, 1);
770         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
771         header->snapc->num_snaps = snap_count;
772         for (i = 0; i < snap_count; i++)
773                 header->snapc->snaps[i] =
774                         le64_to_cpu(ondisk->snaps[i].id);
775
776         return 0;
777
778 out_err:
779         kfree(header->snap_sizes);
780         header->snap_sizes = NULL;
781         kfree(header->snap_names);
782         header->snap_names = NULL;
783         kfree(header->object_prefix);
784         header->object_prefix = NULL;
785
786         return -ENOMEM;
787 }
788
789 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
790 {
791         struct rbd_snap *snap;
792
793         if (snap_id == CEPH_NOSNAP)
794                 return RBD_SNAP_HEAD_NAME;
795
796         list_for_each_entry(snap, &rbd_dev->snaps, node)
797                 if (snap_id == snap->id)
798                         return snap->name;
799
800         return NULL;
801 }
802
803 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
804 {
805
806         struct rbd_snap *snap;
807
808         list_for_each_entry(snap, &rbd_dev->snaps, node) {
809                 if (!strcmp(snap_name, snap->name)) {
810                         rbd_dev->spec->snap_id = snap->id;
811                         rbd_dev->mapping.size = snap->size;
812                         rbd_dev->mapping.features = snap->features;
813
814                         return 0;
815                 }
816         }
817
818         return -ENOENT;
819 }
820
821 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
822 {
823         int ret;
824
825         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
826                     sizeof (RBD_SNAP_HEAD_NAME))) {
827                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
828                 rbd_dev->mapping.size = rbd_dev->header.image_size;
829                 rbd_dev->mapping.features = rbd_dev->header.features;
830                 ret = 0;
831         } else {
832                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
833                 if (ret < 0)
834                         goto done;
835                 rbd_dev->mapping.read_only = true;
836         }
837         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
838
839 done:
840         return ret;
841 }
842
843 static void rbd_header_free(struct rbd_image_header *header)
844 {
845         kfree(header->object_prefix);
846         header->object_prefix = NULL;
847         kfree(header->snap_sizes);
848         header->snap_sizes = NULL;
849         kfree(header->snap_names);
850         header->snap_names = NULL;
851         ceph_put_snap_context(header->snapc);
852         header->snapc = NULL;
853 }
854
855 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
856 {
857         char *name;
858         u64 segment;
859         int ret;
860
861         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
862         if (!name)
863                 return NULL;
864         segment = offset >> rbd_dev->header.obj_order;
865         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
866                         rbd_dev->header.object_prefix, segment);
867         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
868                 pr_err("error formatting segment name for #%llu (%d)\n",
869                         segment, ret);
870                 kfree(name);
871                 name = NULL;
872         }
873
874         return name;
875 }
876
877 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
878 {
879         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
880
881         return offset & (segment_size - 1);
882 }
883
884 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
885                                 u64 offset, u64 length)
886 {
887         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
888
889         offset &= segment_size - 1;
890
891         rbd_assert(length <= U64_MAX - offset);
892         if (offset + length > segment_size)
893                 length = segment_size - offset;
894
895         return length;
896 }
897
898 /*
899  * returns the size of an object in the image
900  */
901 static u64 rbd_obj_bytes(struct rbd_image_header *header)
902 {
903         return 1 << header->obj_order;
904 }
905
906 /*
907  * bio helpers
908  */
909
910 static void bio_chain_put(struct bio *chain)
911 {
912         struct bio *tmp;
913
914         while (chain) {
915                 tmp = chain;
916                 chain = chain->bi_next;
917                 bio_put(tmp);
918         }
919 }
920
921 /*
922  * zeros a bio chain, starting at specific offset
923  */
924 static void zero_bio_chain(struct bio *chain, int start_ofs)
925 {
926         struct bio_vec *bv;
927         unsigned long flags;
928         void *buf;
929         int i;
930         int pos = 0;
931
932         while (chain) {
933                 bio_for_each_segment(bv, chain, i) {
934                         if (pos + bv->bv_len > start_ofs) {
935                                 int remainder = max(start_ofs - pos, 0);
936                                 buf = bvec_kmap_irq(bv, &flags);
937                                 memset(buf + remainder, 0,
938                                        bv->bv_len - remainder);
939                                 bvec_kunmap_irq(buf, &flags);
940                         }
941                         pos += bv->bv_len;
942                 }
943
944                 chain = chain->bi_next;
945         }
946 }
947
948 /*
949  * Clone a portion of a bio, starting at the given byte offset
950  * and continuing for the number of bytes indicated.
951  */
952 static struct bio *bio_clone_range(struct bio *bio_src,
953                                         unsigned int offset,
954                                         unsigned int len,
955                                         gfp_t gfpmask)
956 {
957         struct bio_vec *bv;
958         unsigned int resid;
959         unsigned short idx;
960         unsigned int voff;
961         unsigned short end_idx;
962         unsigned short vcnt;
963         struct bio *bio;
964
965         /* Handle the easy case for the caller */
966
967         if (!offset && len == bio_src->bi_size)
968                 return bio_clone(bio_src, gfpmask);
969
970         if (WARN_ON_ONCE(!len))
971                 return NULL;
972         if (WARN_ON_ONCE(len > bio_src->bi_size))
973                 return NULL;
974         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
975                 return NULL;
976
977         /* Find first affected segment... */
978
979         resid = offset;
980         __bio_for_each_segment(bv, bio_src, idx, 0) {
981                 if (resid < bv->bv_len)
982                         break;
983                 resid -= bv->bv_len;
984         }
985         voff = resid;
986
987         /* ...and the last affected segment */
988
989         resid += len;
990         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
991                 if (resid <= bv->bv_len)
992                         break;
993                 resid -= bv->bv_len;
994         }
995         vcnt = end_idx - idx + 1;
996
997         /* Build the clone */
998
999         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1000         if (!bio)
1001                 return NULL;    /* ENOMEM */
1002
1003         bio->bi_bdev = bio_src->bi_bdev;
1004         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1005         bio->bi_rw = bio_src->bi_rw;
1006         bio->bi_flags |= 1 << BIO_CLONED;
1007
1008         /*
1009          * Copy over our part of the bio_vec, then update the first
1010          * and last (or only) entries.
1011          */
1012         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1013                         vcnt * sizeof (struct bio_vec));
1014         bio->bi_io_vec[0].bv_offset += voff;
1015         if (vcnt > 1) {
1016                 bio->bi_io_vec[0].bv_len -= voff;
1017                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1018         } else {
1019                 bio->bi_io_vec[0].bv_len = len;
1020         }
1021
1022         bio->bi_vcnt = vcnt;
1023         bio->bi_size = len;
1024         bio->bi_idx = 0;
1025
1026         return bio;
1027 }
1028
1029 /*
1030  * Clone a portion of a bio chain, starting at the given byte offset
1031  * into the first bio in the source chain and continuing for the
1032  * number of bytes indicated.  The result is another bio chain of
1033  * exactly the given length, or a null pointer on error.
1034  *
1035  * The bio_src and offset parameters are both in-out.  On entry they
1036  * refer to the first source bio and the offset into that bio where
1037  * the start of data to be cloned is located.
1038  *
1039  * On return, bio_src is updated to refer to the bio in the source
1040  * chain that contains first un-cloned byte, and *offset will
1041  * contain the offset of that byte within that bio.
1042  */
1043 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1044                                         unsigned int *offset,
1045                                         unsigned int len,
1046                                         gfp_t gfpmask)
1047 {
1048         struct bio *bi = *bio_src;
1049         unsigned int off = *offset;
1050         struct bio *chain = NULL;
1051         struct bio **end;
1052
1053         /* Build up a chain of clone bios up to the limit */
1054
1055         if (!bi || off >= bi->bi_size || !len)
1056                 return NULL;            /* Nothing to clone */
1057
1058         end = &chain;
1059         while (len) {
1060                 unsigned int bi_size;
1061                 struct bio *bio;
1062
1063                 if (!bi) {
1064                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1065                         goto out_err;   /* EINVAL; ran out of bio's */
1066                 }
1067                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1068                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1069                 if (!bio)
1070                         goto out_err;   /* ENOMEM */
1071
1072                 *end = bio;
1073                 end = &bio->bi_next;
1074
1075                 off += bi_size;
1076                 if (off == bi->bi_size) {
1077                         bi = bi->bi_next;
1078                         off = 0;
1079                 }
1080                 len -= bi_size;
1081         }
1082         *bio_src = bi;
1083         *offset = off;
1084
1085         return chain;
1086 out_err:
1087         bio_chain_put(chain);
1088
1089         return NULL;
1090 }
1091
1092 /*
1093  * The default/initial value for all object request flags is 0.  For
1094  * each flag, once its value is set to 1 it is never reset to 0
1095  * again.
1096  */
1097 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1098 {
1099         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1100                 struct rbd_device *rbd_dev;
1101
1102                 rbd_dev = obj_request->img_request->rbd_dev;
1103                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1104                         obj_request);
1105         }
1106 }
1107
1108 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1109 {
1110         smp_mb();
1111         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1112 }
1113
1114 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1115 {
1116         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1117                 struct rbd_device *rbd_dev = NULL;
1118
1119                 if (obj_request_img_data_test(obj_request))
1120                         rbd_dev = obj_request->img_request->rbd_dev;
1121                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1122                         obj_request);
1123         }
1124 }
1125
1126 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1127 {
1128         smp_mb();
1129         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1130 }
1131
1132 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1133 {
1134         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1135                 atomic_read(&obj_request->kref.refcount));
1136         kref_get(&obj_request->kref);
1137 }
1138
1139 static void rbd_obj_request_destroy(struct kref *kref);
1140 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1141 {
1142         rbd_assert(obj_request != NULL);
1143         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1144                 atomic_read(&obj_request->kref.refcount));
1145         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1146 }
1147
1148 static void rbd_img_request_get(struct rbd_img_request *img_request)
1149 {
1150         dout("%s: img %p (was %d)\n", __func__, img_request,
1151                 atomic_read(&img_request->kref.refcount));
1152         kref_get(&img_request->kref);
1153 }
1154
1155 static void rbd_img_request_destroy(struct kref *kref);
1156 static void rbd_img_request_put(struct rbd_img_request *img_request)
1157 {
1158         rbd_assert(img_request != NULL);
1159         dout("%s: img %p (was %d)\n", __func__, img_request,
1160                 atomic_read(&img_request->kref.refcount));
1161         kref_put(&img_request->kref, rbd_img_request_destroy);
1162 }
1163
1164 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1165                                         struct rbd_obj_request *obj_request)
1166 {
1167         rbd_assert(obj_request->img_request == NULL);
1168
1169         /* Image request now owns object's original reference */
1170         obj_request->img_request = img_request;
1171         obj_request->which = img_request->obj_request_count;
1172         rbd_assert(!obj_request_img_data_test(obj_request));
1173         obj_request_img_data_set(obj_request);
1174         rbd_assert(obj_request->which != BAD_WHICH);
1175         img_request->obj_request_count++;
1176         list_add_tail(&obj_request->links, &img_request->obj_requests);
1177         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1178                 obj_request->which);
1179 }
1180
1181 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1182                                         struct rbd_obj_request *obj_request)
1183 {
1184         rbd_assert(obj_request->which != BAD_WHICH);
1185
1186         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1187                 obj_request->which);
1188         list_del(&obj_request->links);
1189         rbd_assert(img_request->obj_request_count > 0);
1190         img_request->obj_request_count--;
1191         rbd_assert(obj_request->which == img_request->obj_request_count);
1192         obj_request->which = BAD_WHICH;
1193         rbd_assert(obj_request_img_data_test(obj_request));
1194         rbd_assert(obj_request->img_request == img_request);
1195         obj_request->img_request = NULL;
1196         obj_request->callback = NULL;
1197         rbd_obj_request_put(obj_request);
1198 }
1199
1200 static bool obj_request_type_valid(enum obj_request_type type)
1201 {
1202         switch (type) {
1203         case OBJ_REQUEST_NODATA:
1204         case OBJ_REQUEST_BIO:
1205         case OBJ_REQUEST_PAGES:
1206                 return true;
1207         default:
1208                 return false;
1209         }
1210 }
1211
1212 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1213                                 struct rbd_obj_request *obj_request)
1214 {
1215         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1216
1217         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1218 }
1219
1220 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1221 {
1222
1223         dout("%s: img %p\n", __func__, img_request);
1224
1225         /*
1226          * If no error occurred, compute the aggregate transfer
1227          * count for the image request.  We could instead use
1228          * atomic64_cmpxchg() to update it as each object request
1229          * completes; not clear which way is better off hand.
1230          */
1231         if (!img_request->result) {
1232                 struct rbd_obj_request *obj_request;
1233                 u64 xferred = 0;
1234
1235                 for_each_obj_request(img_request, obj_request)
1236                         xferred += obj_request->xferred;
1237                 img_request->xferred = xferred;
1238         }
1239
1240         if (img_request->callback)
1241                 img_request->callback(img_request);
1242         else
1243                 rbd_img_request_put(img_request);
1244 }
1245
1246 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1247
1248 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1249 {
1250         dout("%s: obj %p\n", __func__, obj_request);
1251
1252         return wait_for_completion_interruptible(&obj_request->completion);
1253 }
1254
1255 /*
1256  * The default/initial value for all image request flags is 0.  Each
1257  * is conditionally set to 1 at image request initialization time
1258  * and currently never change thereafter.
1259  */
1260 static void img_request_write_set(struct rbd_img_request *img_request)
1261 {
1262         set_bit(IMG_REQ_WRITE, &img_request->flags);
1263         smp_mb();
1264 }
1265
1266 static bool img_request_write_test(struct rbd_img_request *img_request)
1267 {
1268         smp_mb();
1269         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1270 }
1271
1272 static void img_request_child_set(struct rbd_img_request *img_request)
1273 {
1274         set_bit(IMG_REQ_CHILD, &img_request->flags);
1275         smp_mb();
1276 }
1277
1278 static bool img_request_child_test(struct rbd_img_request *img_request)
1279 {
1280         smp_mb();
1281         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1282 }
1283
1284 static void img_request_layered_set(struct rbd_img_request *img_request)
1285 {
1286         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1287         smp_mb();
1288 }
1289
1290 static bool img_request_layered_test(struct rbd_img_request *img_request)
1291 {
1292         smp_mb();
1293         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1294 }
1295
1296 static void
1297 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1298 {
1299         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1300                 obj_request, obj_request->img_request, obj_request->result,
1301                 obj_request->xferred, obj_request->length);
1302         /*
1303          * ENOENT means a hole in the image.  We zero-fill the
1304          * entire length of the request.  A short read also implies
1305          * zero-fill to the end of the request.  Either way we
1306          * update the xferred count to indicate the whole request
1307          * was satisfied.
1308          */
1309         BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1310         if (obj_request->result == -ENOENT) {
1311                 zero_bio_chain(obj_request->bio_list, 0);
1312                 obj_request->result = 0;
1313                 obj_request->xferred = obj_request->length;
1314         } else if (obj_request->xferred < obj_request->length &&
1315                         !obj_request->result) {
1316                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1317                 obj_request->xferred = obj_request->length;
1318         }
1319         obj_request_done_set(obj_request);
1320 }
1321
1322 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1323 {
1324         dout("%s: obj %p cb %p\n", __func__, obj_request,
1325                 obj_request->callback);
1326         if (obj_request->callback)
1327                 obj_request->callback(obj_request);
1328         else
1329                 complete_all(&obj_request->completion);
1330 }
1331
1332 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1333 {
1334         dout("%s: obj %p\n", __func__, obj_request);
1335         obj_request_done_set(obj_request);
1336 }
1337
1338 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1339 {
1340         struct rbd_img_request *img_request = NULL;
1341         bool layered = false;
1342
1343         if (obj_request_img_data_test(obj_request)) {
1344                 img_request = obj_request->img_request;
1345                 layered = img_request && img_request_layered_test(img_request);
1346         } else {
1347                 img_request = NULL;
1348                 layered = false;
1349         }
1350
1351         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1352                 obj_request, img_request, obj_request->result,
1353                 obj_request->xferred, obj_request->length);
1354         if (layered && obj_request->result == -ENOENT)
1355                 rbd_img_parent_read(obj_request);
1356         else if (img_request)
1357                 rbd_img_obj_request_read_callback(obj_request);
1358         else
1359                 obj_request_done_set(obj_request);
1360 }
1361
1362 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1363 {
1364         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1365                 obj_request->result, obj_request->length);
1366         /*
1367          * There is no such thing as a successful short write.  Set
1368          * it to our originally-requested length.
1369          */
1370         obj_request->xferred = obj_request->length;
1371         obj_request_done_set(obj_request);
1372 }
1373
1374 /*
1375  * For a simple stat call there's nothing to do.  We'll do more if
1376  * this is part of a write sequence for a layered image.
1377  */
1378 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1379 {
1380         dout("%s: obj %p\n", __func__, obj_request);
1381         obj_request_done_set(obj_request);
1382 }
1383
1384 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1385                                 struct ceph_msg *msg)
1386 {
1387         struct rbd_obj_request *obj_request = osd_req->r_priv;
1388         u16 opcode;
1389
1390         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1391         rbd_assert(osd_req == obj_request->osd_req);
1392         if (obj_request_img_data_test(obj_request)) {
1393                 rbd_assert(obj_request->img_request);
1394                 rbd_assert(obj_request->which != BAD_WHICH);
1395         } else {
1396                 rbd_assert(obj_request->which == BAD_WHICH);
1397         }
1398
1399         if (osd_req->r_result < 0)
1400                 obj_request->result = osd_req->r_result;
1401         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1402
1403         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1404
1405         /*
1406          * We support a 64-bit length, but ultimately it has to be
1407          * passed to blk_end_request(), which takes an unsigned int.
1408          */
1409         obj_request->xferred = osd_req->r_reply_op_len[0];
1410         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1411         opcode = osd_req->r_ops[0].op;
1412         switch (opcode) {
1413         case CEPH_OSD_OP_READ:
1414                 rbd_osd_read_callback(obj_request);
1415                 break;
1416         case CEPH_OSD_OP_WRITE:
1417                 rbd_osd_write_callback(obj_request);
1418                 break;
1419         case CEPH_OSD_OP_STAT:
1420                 rbd_osd_stat_callback(obj_request);
1421                 break;
1422         case CEPH_OSD_OP_CALL:
1423         case CEPH_OSD_OP_NOTIFY_ACK:
1424         case CEPH_OSD_OP_WATCH:
1425                 rbd_osd_trivial_callback(obj_request);
1426                 break;
1427         default:
1428                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1429                         obj_request->object_name, (unsigned short) opcode);
1430                 break;
1431         }
1432
1433         if (obj_request_done_test(obj_request))
1434                 rbd_obj_request_complete(obj_request);
1435 }
1436
1437 static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
1438                                         bool write_request)
1439 {
1440         struct rbd_img_request *img_request = obj_request->img_request;
1441         struct ceph_osd_request *osd_req = obj_request->osd_req;
1442         struct ceph_snap_context *snapc = NULL;
1443         u64 snap_id = CEPH_NOSNAP;
1444         struct timespec *mtime = NULL;
1445         struct timespec now;
1446
1447         rbd_assert(osd_req != NULL);
1448
1449         if (write_request) {
1450                 now = CURRENT_TIME;
1451                 mtime = &now;
1452                 if (img_request)
1453                         snapc = img_request->snapc;
1454         } else if (img_request) {
1455                 snap_id = img_request->snap_id;
1456         }
1457         ceph_osdc_build_request(osd_req, obj_request->offset,
1458                         snapc, snap_id, mtime);
1459 }
1460
1461 static struct ceph_osd_request *rbd_osd_req_create(
1462                                         struct rbd_device *rbd_dev,
1463                                         bool write_request,
1464                                         struct rbd_obj_request *obj_request)
1465 {
1466         struct ceph_snap_context *snapc = NULL;
1467         struct ceph_osd_client *osdc;
1468         struct ceph_osd_request *osd_req;
1469
1470         if (obj_request_img_data_test(obj_request)) {
1471                 struct rbd_img_request *img_request = obj_request->img_request;
1472
1473                 rbd_assert(write_request ==
1474                                 img_request_write_test(img_request));
1475                 if (write_request)
1476                         snapc = img_request->snapc;
1477         }
1478
1479         /* Allocate and initialize the request, for the single op */
1480
1481         osdc = &rbd_dev->rbd_client->client->osdc;
1482         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1483         if (!osd_req)
1484                 return NULL;    /* ENOMEM */
1485
1486         if (write_request)
1487                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1488         else
1489                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1490
1491         osd_req->r_callback = rbd_osd_req_callback;
1492         osd_req->r_priv = obj_request;
1493
1494         osd_req->r_oid_len = strlen(obj_request->object_name);
1495         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1496         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1497
1498         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1499
1500         return osd_req;
1501 }
1502
1503 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1504 {
1505         ceph_osdc_put_request(osd_req);
1506 }
1507
1508 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1509
1510 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1511                                                 u64 offset, u64 length,
1512                                                 enum obj_request_type type)
1513 {
1514         struct rbd_obj_request *obj_request;
1515         size_t size;
1516         char *name;
1517
1518         rbd_assert(obj_request_type_valid(type));
1519
1520         size = strlen(object_name) + 1;
1521         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1522         if (!obj_request)
1523                 return NULL;
1524
1525         name = (char *)(obj_request + 1);
1526         obj_request->object_name = memcpy(name, object_name, size);
1527         obj_request->offset = offset;
1528         obj_request->length = length;
1529         obj_request->flags = 0;
1530         obj_request->which = BAD_WHICH;
1531         obj_request->type = type;
1532         INIT_LIST_HEAD(&obj_request->links);
1533         init_completion(&obj_request->completion);
1534         kref_init(&obj_request->kref);
1535
1536         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1537                 offset, length, (int)type, obj_request);
1538
1539         return obj_request;
1540 }
1541
1542 static void rbd_obj_request_destroy(struct kref *kref)
1543 {
1544         struct rbd_obj_request *obj_request;
1545
1546         obj_request = container_of(kref, struct rbd_obj_request, kref);
1547
1548         dout("%s: obj %p\n", __func__, obj_request);
1549
1550         rbd_assert(obj_request->img_request == NULL);
1551         rbd_assert(obj_request->which == BAD_WHICH);
1552
1553         if (obj_request->osd_req)
1554                 rbd_osd_req_destroy(obj_request->osd_req);
1555
1556         rbd_assert(obj_request_type_valid(obj_request->type));
1557         switch (obj_request->type) {
1558         case OBJ_REQUEST_NODATA:
1559                 break;          /* Nothing to do */
1560         case OBJ_REQUEST_BIO:
1561                 if (obj_request->bio_list)
1562                         bio_chain_put(obj_request->bio_list);
1563                 break;
1564         case OBJ_REQUEST_PAGES:
1565                 if (obj_request->pages)
1566                         ceph_release_page_vector(obj_request->pages,
1567                                                 obj_request->page_count);
1568                 break;
1569         }
1570
1571         kfree(obj_request);
1572 }
1573
1574 /*
1575  * Caller is responsible for filling in the list of object requests
1576  * that comprises the image request, and the Linux request pointer
1577  * (if there is one).
1578  */
1579 static struct rbd_img_request *rbd_img_request_create(
1580                                         struct rbd_device *rbd_dev,
1581                                         u64 offset, u64 length,
1582                                         bool write_request,
1583                                         bool child_request)
1584 {
1585         struct rbd_img_request *img_request;
1586         struct ceph_snap_context *snapc = NULL;
1587
1588         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1589         if (!img_request)
1590                 return NULL;
1591
1592         if (write_request) {
1593                 down_read(&rbd_dev->header_rwsem);
1594                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1595                 up_read(&rbd_dev->header_rwsem);
1596                 if (WARN_ON(!snapc)) {
1597                         kfree(img_request);
1598                         return NULL;    /* Shouldn't happen */
1599                 }
1600
1601         }
1602
1603         img_request->rq = NULL;
1604         img_request->rbd_dev = rbd_dev;
1605         img_request->offset = offset;
1606         img_request->length = length;
1607         img_request->flags = 0;
1608         if (write_request) {
1609                 img_request_write_set(img_request);
1610                 img_request->snapc = snapc;
1611         } else {
1612                 img_request->snap_id = rbd_dev->spec->snap_id;
1613         }
1614         if (child_request)
1615                 img_request_child_set(img_request);
1616         if (rbd_dev->parent_spec)
1617                 img_request_layered_set(img_request);
1618         spin_lock_init(&img_request->completion_lock);
1619         img_request->next_completion = 0;
1620         img_request->callback = NULL;
1621         img_request->result = 0;
1622         img_request->obj_request_count = 0;
1623         INIT_LIST_HEAD(&img_request->obj_requests);
1624         kref_init(&img_request->kref);
1625
1626         rbd_img_request_get(img_request);       /* Avoid a warning */
1627         rbd_img_request_put(img_request);       /* TEMPORARY */
1628
1629         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1630                 write_request ? "write" : "read", offset, length,
1631                 img_request);
1632
1633         return img_request;
1634 }
1635
1636 static void rbd_img_request_destroy(struct kref *kref)
1637 {
1638         struct rbd_img_request *img_request;
1639         struct rbd_obj_request *obj_request;
1640         struct rbd_obj_request *next_obj_request;
1641
1642         img_request = container_of(kref, struct rbd_img_request, kref);
1643
1644         dout("%s: img %p\n", __func__, img_request);
1645
1646         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1647                 rbd_img_obj_request_del(img_request, obj_request);
1648         rbd_assert(img_request->obj_request_count == 0);
1649
1650         if (img_request_write_test(img_request))
1651                 ceph_put_snap_context(img_request->snapc);
1652
1653         if (img_request_child_test(img_request))
1654                 rbd_obj_request_put(img_request->obj_request);
1655
1656         kfree(img_request);
1657 }
1658
1659 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1660 {
1661         struct rbd_img_request *img_request;
1662         unsigned int xferred;
1663         int result;
1664         bool more;
1665
1666         rbd_assert(obj_request_img_data_test(obj_request));
1667         img_request = obj_request->img_request;
1668
1669         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1670         xferred = (unsigned int)obj_request->xferred;
1671         result = obj_request->result;
1672         if (result) {
1673                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1674
1675                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1676                         img_request_write_test(img_request) ? "write" : "read",
1677                         obj_request->length, obj_request->img_offset,
1678                         obj_request->offset);
1679                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1680                         result, xferred);
1681                 if (!img_request->result)
1682                         img_request->result = result;
1683         }
1684
1685         if (img_request_child_test(img_request)) {
1686                 rbd_assert(img_request->obj_request != NULL);
1687                 more = obj_request->which < img_request->obj_request_count - 1;
1688         } else {
1689                 rbd_assert(img_request->rq != NULL);
1690                 more = blk_end_request(img_request->rq, result, xferred);
1691         }
1692
1693         return more;
1694 }
1695
1696 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1697 {
1698         struct rbd_img_request *img_request;
1699         u32 which = obj_request->which;
1700         bool more = true;
1701
1702         rbd_assert(obj_request_img_data_test(obj_request));
1703         img_request = obj_request->img_request;
1704
1705         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1706         rbd_assert(img_request != NULL);
1707         rbd_assert(img_request->obj_request_count > 0);
1708         rbd_assert(which != BAD_WHICH);
1709         rbd_assert(which < img_request->obj_request_count);
1710         rbd_assert(which >= img_request->next_completion);
1711
1712         spin_lock_irq(&img_request->completion_lock);
1713         if (which != img_request->next_completion)
1714                 goto out;
1715
1716         for_each_obj_request_from(img_request, obj_request) {
1717                 rbd_assert(more);
1718                 rbd_assert(which < img_request->obj_request_count);
1719
1720                 if (!obj_request_done_test(obj_request))
1721                         break;
1722                 more = rbd_img_obj_end_request(obj_request);
1723                 which++;
1724         }
1725
1726         rbd_assert(more ^ (which == img_request->obj_request_count));
1727         img_request->next_completion = which;
1728 out:
1729         spin_unlock_irq(&img_request->completion_lock);
1730
1731         if (!more)
1732                 rbd_img_request_complete(img_request);
1733 }
1734
1735 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1736                                         struct bio *bio_list)
1737 {
1738         struct rbd_device *rbd_dev = img_request->rbd_dev;
1739         struct rbd_obj_request *obj_request = NULL;
1740         struct rbd_obj_request *next_obj_request;
1741         bool write_request = img_request_write_test(img_request);
1742         unsigned int bio_offset;
1743         u64 img_offset;
1744         u64 resid;
1745         u16 opcode;
1746
1747         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1748
1749         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1750         bio_offset = 0;
1751         img_offset = img_request->offset;
1752         rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1753         resid = img_request->length;
1754         rbd_assert(resid > 0);
1755         while (resid) {
1756                 struct ceph_osd_request *osd_req;
1757                 const char *object_name;
1758                 unsigned int clone_size;
1759                 u64 offset;
1760                 u64 length;
1761
1762                 object_name = rbd_segment_name(rbd_dev, img_offset);
1763                 if (!object_name)
1764                         goto out_unwind;
1765                 offset = rbd_segment_offset(rbd_dev, img_offset);
1766                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1767                 obj_request = rbd_obj_request_create(object_name,
1768                                                 offset, length,
1769                                                 OBJ_REQUEST_BIO);
1770                 kfree(object_name);     /* object request has its own copy */
1771                 if (!obj_request)
1772                         goto out_unwind;
1773
1774                 rbd_assert(length <= (u64) UINT_MAX);
1775                 clone_size = (unsigned int) length;
1776                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1777                                                 &bio_offset, clone_size,
1778                                                 GFP_ATOMIC);
1779                 if (!obj_request->bio_list)
1780                         goto out_partial;
1781
1782                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1783                                                 obj_request);
1784                 if (!osd_req)
1785                         goto out_partial;
1786                 obj_request->osd_req = osd_req;
1787                 obj_request->callback = rbd_img_obj_callback;
1788
1789                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1790                                                 0, 0);
1791                 osd_req_op_extent_osd_data_bio(osd_req, 0,
1792                                 obj_request->bio_list, obj_request->length);
1793                 rbd_osd_req_format(obj_request, write_request);
1794
1795                 obj_request->img_offset = img_offset;
1796                 rbd_img_obj_request_add(img_request, obj_request);
1797
1798                 img_offset += length;
1799                 resid -= length;
1800         }
1801
1802         return 0;
1803
1804 out_partial:
1805         rbd_obj_request_put(obj_request);
1806 out_unwind:
1807         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1808                 rbd_obj_request_put(obj_request);
1809
1810         return -ENOMEM;
1811 }
1812
1813 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1814 {
1815         struct rbd_device *rbd_dev = img_request->rbd_dev;
1816         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1817         struct rbd_obj_request *obj_request;
1818         struct rbd_obj_request *next_obj_request;
1819
1820         dout("%s: img %p\n", __func__, img_request);
1821         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1822                 int ret;
1823
1824                 ret = rbd_obj_request_submit(osdc, obj_request);
1825                 if (ret)
1826                         return ret;
1827         }
1828
1829         return 0;
1830 }
1831
1832 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
1833 {
1834         struct rbd_obj_request *obj_request;
1835
1836         rbd_assert(img_request_child_test(img_request));
1837
1838         obj_request = img_request->obj_request;
1839         rbd_assert(obj_request != NULL);
1840         obj_request->result = img_request->result;
1841         obj_request->xferred = img_request->xferred;
1842
1843         rbd_img_obj_request_read_callback(obj_request);
1844         rbd_obj_request_complete(obj_request);
1845 }
1846
1847 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
1848 {
1849         struct rbd_device *rbd_dev;
1850         struct rbd_img_request *img_request;
1851         int result;
1852
1853         rbd_assert(obj_request_img_data_test(obj_request));
1854         rbd_assert(obj_request->img_request != NULL);
1855         rbd_assert(obj_request->result == (s32) -ENOENT);
1856         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1857
1858         rbd_dev = obj_request->img_request->rbd_dev;
1859         rbd_assert(rbd_dev->parent != NULL);
1860         /* rbd_read_finish(obj_request, obj_request->length); */
1861         img_request = rbd_img_request_create(rbd_dev->parent,
1862                                                 obj_request->img_offset,
1863                                                 obj_request->length,
1864                                                 false, true);
1865         result = -ENOMEM;
1866         if (!img_request)
1867                 goto out_err;
1868
1869         rbd_obj_request_get(obj_request);
1870         img_request->obj_request = obj_request;
1871
1872         result = rbd_img_request_fill_bio(img_request, obj_request->bio_list);
1873         if (result)
1874                 goto out_err;
1875
1876         img_request->callback = rbd_img_parent_read_callback;
1877         result = rbd_img_request_submit(img_request);
1878         if (result)
1879                 goto out_err;
1880
1881         return;
1882 out_err:
1883         if (img_request)
1884                 rbd_img_request_put(img_request);
1885         obj_request->result = result;
1886         obj_request->xferred = 0;
1887         obj_request_done_set(obj_request);
1888 }
1889
1890 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1891                                    u64 ver, u64 notify_id)
1892 {
1893         struct rbd_obj_request *obj_request;
1894         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1895         int ret;
1896
1897         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1898                                                         OBJ_REQUEST_NODATA);
1899         if (!obj_request)
1900                 return -ENOMEM;
1901
1902         ret = -ENOMEM;
1903         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1904         if (!obj_request->osd_req)
1905                 goto out;
1906         obj_request->callback = rbd_obj_request_put;
1907
1908         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1909                                         notify_id, ver, 0);
1910         rbd_osd_req_format(obj_request, false);
1911
1912         ret = rbd_obj_request_submit(osdc, obj_request);
1913 out:
1914         if (ret)
1915                 rbd_obj_request_put(obj_request);
1916
1917         return ret;
1918 }
1919
1920 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1921 {
1922         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1923         u64 hver;
1924         int rc;
1925
1926         if (!rbd_dev)
1927                 return;
1928
1929         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1930                 rbd_dev->header_name, (unsigned long long) notify_id,
1931                 (unsigned int) opcode);
1932         rc = rbd_dev_refresh(rbd_dev, &hver);
1933         if (rc)
1934                 rbd_warn(rbd_dev, "got notification but failed to "
1935                            " update snaps: %d\n", rc);
1936
1937         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1938 }
1939
1940 /*
1941  * Request sync osd watch/unwatch.  The value of "start" determines
1942  * whether a watch request is being initiated or torn down.
1943  */
1944 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1945 {
1946         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1947         struct rbd_obj_request *obj_request;
1948         int ret;
1949
1950         rbd_assert(start ^ !!rbd_dev->watch_event);
1951         rbd_assert(start ^ !!rbd_dev->watch_request);
1952
1953         if (start) {
1954                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1955                                                 &rbd_dev->watch_event);
1956                 if (ret < 0)
1957                         return ret;
1958                 rbd_assert(rbd_dev->watch_event != NULL);
1959         }
1960
1961         ret = -ENOMEM;
1962         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1963                                                         OBJ_REQUEST_NODATA);
1964         if (!obj_request)
1965                 goto out_cancel;
1966
1967         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1968         if (!obj_request->osd_req)
1969                 goto out_cancel;
1970
1971         if (start)
1972                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1973         else
1974                 ceph_osdc_unregister_linger_request(osdc,
1975                                         rbd_dev->watch_request->osd_req);
1976
1977         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1978                                 rbd_dev->watch_event->cookie,
1979                                 rbd_dev->header.obj_version, start);
1980         rbd_osd_req_format(obj_request, true);
1981
1982         ret = rbd_obj_request_submit(osdc, obj_request);
1983         if (ret)
1984                 goto out_cancel;
1985         ret = rbd_obj_request_wait(obj_request);
1986         if (ret)
1987                 goto out_cancel;
1988         ret = obj_request->result;
1989         if (ret)
1990                 goto out_cancel;
1991
1992         /*
1993          * A watch request is set to linger, so the underlying osd
1994          * request won't go away until we unregister it.  We retain
1995          * a pointer to the object request during that time (in
1996          * rbd_dev->watch_request), so we'll keep a reference to
1997          * it.  We'll drop that reference (below) after we've
1998          * unregistered it.
1999          */
2000         if (start) {
2001                 rbd_dev->watch_request = obj_request;
2002
2003                 return 0;
2004         }
2005
2006         /* We have successfully torn down the watch request */
2007
2008         rbd_obj_request_put(rbd_dev->watch_request);
2009         rbd_dev->watch_request = NULL;
2010 out_cancel:
2011         /* Cancel the event if we're tearing down, or on error */
2012         ceph_osdc_cancel_event(rbd_dev->watch_event);
2013         rbd_dev->watch_event = NULL;
2014         if (obj_request)
2015                 rbd_obj_request_put(obj_request);
2016
2017         return ret;
2018 }
2019
2020 /*
2021  * Synchronous osd object method call
2022  */
2023 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2024                              const char *object_name,
2025                              const char *class_name,
2026                              const char *method_name,
2027                              const char *outbound,
2028                              size_t outbound_size,
2029                              char *inbound,
2030                              size_t inbound_size,
2031                              u64 *version)
2032 {
2033         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2034         struct rbd_obj_request *obj_request;
2035         struct page **pages;
2036         u32 page_count;
2037         int ret;
2038
2039         /*
2040          * Method calls are ultimately read operations.  The result
2041          * should placed into the inbound buffer provided.  They
2042          * also supply outbound data--parameters for the object
2043          * method.  Currently if this is present it will be a
2044          * snapshot id.
2045          */
2046         page_count = (u32) calc_pages_for(0, inbound_size);
2047         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2048         if (IS_ERR(pages))
2049                 return PTR_ERR(pages);
2050
2051         ret = -ENOMEM;
2052         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2053                                                         OBJ_REQUEST_PAGES);
2054         if (!obj_request)
2055                 goto out;
2056
2057         obj_request->pages = pages;
2058         obj_request->page_count = page_count;
2059
2060         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2061         if (!obj_request->osd_req)
2062                 goto out;
2063
2064         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2065                                         class_name, method_name);
2066         if (outbound_size) {
2067                 struct ceph_pagelist *pagelist;
2068
2069                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2070                 if (!pagelist)
2071                         goto out;
2072
2073                 ceph_pagelist_init(pagelist);
2074                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2075                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2076                                                 pagelist);
2077         }
2078         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2079                                         obj_request->pages, inbound_size,
2080                                         0, false, false);
2081         rbd_osd_req_format(obj_request, false);
2082
2083         ret = rbd_obj_request_submit(osdc, obj_request);
2084         if (ret)
2085                 goto out;
2086         ret = rbd_obj_request_wait(obj_request);
2087         if (ret)
2088                 goto out;
2089
2090         ret = obj_request->result;
2091         if (ret < 0)
2092                 goto out;
2093         ret = 0;
2094         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2095         if (version)
2096                 *version = obj_request->version;
2097 out:
2098         if (obj_request)
2099                 rbd_obj_request_put(obj_request);
2100         else
2101                 ceph_release_page_vector(pages, page_count);
2102
2103         return ret;
2104 }
2105
2106 static void rbd_request_fn(struct request_queue *q)
2107                 __releases(q->queue_lock) __acquires(q->queue_lock)
2108 {
2109         struct rbd_device *rbd_dev = q->queuedata;
2110         bool read_only = rbd_dev->mapping.read_only;
2111         struct request *rq;
2112         int result;
2113
2114         while ((rq = blk_fetch_request(q))) {
2115                 bool write_request = rq_data_dir(rq) == WRITE;
2116                 struct rbd_img_request *img_request;
2117                 u64 offset;
2118                 u64 length;
2119
2120                 /* Ignore any non-FS requests that filter through. */
2121
2122                 if (rq->cmd_type != REQ_TYPE_FS) {
2123                         dout("%s: non-fs request type %d\n", __func__,
2124                                 (int) rq->cmd_type);
2125                         __blk_end_request_all(rq, 0);
2126                         continue;
2127                 }
2128
2129                 /* Ignore/skip any zero-length requests */
2130
2131                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2132                 length = (u64) blk_rq_bytes(rq);
2133
2134                 if (!length) {
2135                         dout("%s: zero-length request\n", __func__);
2136                         __blk_end_request_all(rq, 0);
2137                         continue;
2138                 }
2139
2140                 spin_unlock_irq(q->queue_lock);
2141
2142                 /* Disallow writes to a read-only device */
2143
2144                 if (write_request) {
2145                         result = -EROFS;
2146                         if (read_only)
2147                                 goto end_request;
2148                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2149                 }
2150
2151                 /*
2152                  * Quit early if the mapped snapshot no longer
2153                  * exists.  It's still possible the snapshot will
2154                  * have disappeared by the time our request arrives
2155                  * at the osd, but there's no sense in sending it if
2156                  * we already know.
2157                  */
2158                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2159                         dout("request for non-existent snapshot");
2160                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2161                         result = -ENXIO;
2162                         goto end_request;
2163                 }
2164
2165                 result = -EINVAL;
2166                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2167                         goto end_request;       /* Shouldn't happen */
2168
2169                 result = -ENOMEM;
2170                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2171                                                         write_request, false);
2172                 if (!img_request)
2173                         goto end_request;
2174
2175                 img_request->rq = rq;
2176
2177                 result = rbd_img_request_fill_bio(img_request, rq->bio);
2178                 if (!result)
2179                         result = rbd_img_request_submit(img_request);
2180                 if (result)
2181                         rbd_img_request_put(img_request);
2182 end_request:
2183                 spin_lock_irq(q->queue_lock);
2184                 if (result < 0) {
2185                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2186                                 write_request ? "write" : "read",
2187                                 length, offset, result);
2188
2189                         __blk_end_request_all(rq, result);
2190                 }
2191         }
2192 }
2193
2194 /*
2195  * a queue callback. Makes sure that we don't create a bio that spans across
2196  * multiple osd objects. One exception would be with a single page bios,
2197  * which we handle later at bio_chain_clone_range()
2198  */
2199 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2200                           struct bio_vec *bvec)
2201 {
2202         struct rbd_device *rbd_dev = q->queuedata;
2203         sector_t sector_offset;
2204         sector_t sectors_per_obj;
2205         sector_t obj_sector_offset;
2206         int ret;
2207
2208         /*
2209          * Find how far into its rbd object the partition-relative
2210          * bio start sector is to offset relative to the enclosing
2211          * device.
2212          */
2213         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2214         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2215         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2216
2217         /*
2218          * Compute the number of bytes from that offset to the end
2219          * of the object.  Account for what's already used by the bio.
2220          */
2221         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2222         if (ret > bmd->bi_size)
2223                 ret -= bmd->bi_size;
2224         else
2225                 ret = 0;
2226
2227         /*
2228          * Don't send back more than was asked for.  And if the bio
2229          * was empty, let the whole thing through because:  "Note
2230          * that a block device *must* allow a single page to be
2231          * added to an empty bio."
2232          */
2233         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2234         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2235                 ret = (int) bvec->bv_len;
2236
2237         return ret;
2238 }
2239
2240 static void rbd_free_disk(struct rbd_device *rbd_dev)
2241 {
2242         struct gendisk *disk = rbd_dev->disk;
2243
2244         if (!disk)
2245                 return;
2246
2247         if (disk->flags & GENHD_FL_UP)
2248                 del_gendisk(disk);
2249         if (disk->queue)
2250                 blk_cleanup_queue(disk->queue);
2251         put_disk(disk);
2252 }
2253
2254 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2255                                 const char *object_name,
2256                                 u64 offset, u64 length,
2257                                 char *buf, u64 *version)
2258
2259 {
2260         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2261         struct rbd_obj_request *obj_request;
2262         struct page **pages = NULL;
2263         u32 page_count;
2264         size_t size;
2265         int ret;
2266
2267         page_count = (u32) calc_pages_for(offset, length);
2268         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2269         if (IS_ERR(pages))
2270                 ret = PTR_ERR(pages);
2271
2272         ret = -ENOMEM;
2273         obj_request = rbd_obj_request_create(object_name, offset, length,
2274                                                         OBJ_REQUEST_PAGES);
2275         if (!obj_request)
2276                 goto out;
2277
2278         obj_request->pages = pages;
2279         obj_request->page_count = page_count;
2280
2281         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2282         if (!obj_request->osd_req)
2283                 goto out;
2284
2285         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2286                                         offset, length, 0, 0);
2287         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2288                                         obj_request->pages,
2289                                         obj_request->length,
2290                                         obj_request->offset & ~PAGE_MASK,
2291                                         false, false);
2292         rbd_osd_req_format(obj_request, false);
2293
2294         ret = rbd_obj_request_submit(osdc, obj_request);
2295         if (ret)
2296                 goto out;
2297         ret = rbd_obj_request_wait(obj_request);
2298         if (ret)
2299                 goto out;
2300
2301         ret = obj_request->result;
2302         if (ret < 0)
2303                 goto out;
2304
2305         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2306         size = (size_t) obj_request->xferred;
2307         ceph_copy_from_page_vector(pages, buf, 0, size);
2308         rbd_assert(size <= (size_t) INT_MAX);
2309         ret = (int) size;
2310         if (version)
2311                 *version = obj_request->version;
2312 out:
2313         if (obj_request)
2314                 rbd_obj_request_put(obj_request);
2315         else
2316                 ceph_release_page_vector(pages, page_count);
2317
2318         return ret;
2319 }
2320
2321 /*
2322  * Read the complete header for the given rbd device.
2323  *
2324  * Returns a pointer to a dynamically-allocated buffer containing
2325  * the complete and validated header.  Caller can pass the address
2326  * of a variable that will be filled in with the version of the
2327  * header object at the time it was read.
2328  *
2329  * Returns a pointer-coded errno if a failure occurs.
2330  */
2331 static struct rbd_image_header_ondisk *
2332 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2333 {
2334         struct rbd_image_header_ondisk *ondisk = NULL;
2335         u32 snap_count = 0;
2336         u64 names_size = 0;
2337         u32 want_count;
2338         int ret;
2339
2340         /*
2341          * The complete header will include an array of its 64-bit
2342          * snapshot ids, followed by the names of those snapshots as
2343          * a contiguous block of NUL-terminated strings.  Note that
2344          * the number of snapshots could change by the time we read
2345          * it in, in which case we re-read it.
2346          */
2347         do {
2348                 size_t size;
2349
2350                 kfree(ondisk);
2351
2352                 size = sizeof (*ondisk);
2353                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2354                 size += names_size;
2355                 ondisk = kmalloc(size, GFP_KERNEL);
2356                 if (!ondisk)
2357                         return ERR_PTR(-ENOMEM);
2358
2359                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2360                                        0, size,
2361                                        (char *) ondisk, version);
2362                 if (ret < 0)
2363                         goto out_err;
2364                 if (WARN_ON((size_t) ret < size)) {
2365                         ret = -ENXIO;
2366                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2367                                 size, ret);
2368                         goto out_err;
2369                 }
2370                 if (!rbd_dev_ondisk_valid(ondisk)) {
2371                         ret = -ENXIO;
2372                         rbd_warn(rbd_dev, "invalid header");
2373                         goto out_err;
2374                 }
2375
2376                 names_size = le64_to_cpu(ondisk->snap_names_len);
2377                 want_count = snap_count;
2378                 snap_count = le32_to_cpu(ondisk->snap_count);
2379         } while (snap_count != want_count);
2380
2381         return ondisk;
2382
2383 out_err:
2384         kfree(ondisk);
2385
2386         return ERR_PTR(ret);
2387 }
2388
2389 /*
2390  * reload the ondisk the header
2391  */
2392 static int rbd_read_header(struct rbd_device *rbd_dev,
2393                            struct rbd_image_header *header)
2394 {
2395         struct rbd_image_header_ondisk *ondisk;
2396         u64 ver = 0;
2397         int ret;
2398
2399         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2400         if (IS_ERR(ondisk))
2401                 return PTR_ERR(ondisk);
2402         ret = rbd_header_from_disk(header, ondisk);
2403         if (ret >= 0)
2404                 header->obj_version = ver;
2405         kfree(ondisk);
2406
2407         return ret;
2408 }
2409
2410 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2411 {
2412         struct rbd_snap *snap;
2413         struct rbd_snap *next;
2414
2415         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2416                 rbd_remove_snap_dev(snap);
2417 }
2418
2419 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2420 {
2421         sector_t size;
2422
2423         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2424                 return;
2425
2426         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2427         dout("setting size to %llu sectors", (unsigned long long) size);
2428         rbd_dev->mapping.size = (u64) size;
2429         set_capacity(rbd_dev->disk, size);
2430 }
2431
2432 /*
2433  * only read the first part of the ondisk header, without the snaps info
2434  */
2435 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2436 {
2437         int ret;
2438         struct rbd_image_header h;
2439
2440         ret = rbd_read_header(rbd_dev, &h);
2441         if (ret < 0)
2442                 return ret;
2443
2444         down_write(&rbd_dev->header_rwsem);
2445
2446         /* Update image size, and check for resize of mapped image */
2447         rbd_dev->header.image_size = h.image_size;
2448         rbd_update_mapping_size(rbd_dev);
2449
2450         /* rbd_dev->header.object_prefix shouldn't change */
2451         kfree(rbd_dev->header.snap_sizes);
2452         kfree(rbd_dev->header.snap_names);
2453         /* osd requests may still refer to snapc */
2454         ceph_put_snap_context(rbd_dev->header.snapc);
2455
2456         if (hver)
2457                 *hver = h.obj_version;
2458         rbd_dev->header.obj_version = h.obj_version;
2459         rbd_dev->header.image_size = h.image_size;
2460         rbd_dev->header.snapc = h.snapc;
2461         rbd_dev->header.snap_names = h.snap_names;
2462         rbd_dev->header.snap_sizes = h.snap_sizes;
2463         /* Free the extra copy of the object prefix */
2464         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2465         kfree(h.object_prefix);
2466
2467         ret = rbd_dev_snaps_update(rbd_dev);
2468         if (!ret)
2469                 ret = rbd_dev_snaps_register(rbd_dev);
2470
2471         up_write(&rbd_dev->header_rwsem);
2472
2473         return ret;
2474 }
2475
2476 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2477 {
2478         int ret;
2479
2480         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2481         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2482         if (rbd_dev->image_format == 1)
2483                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2484         else
2485                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2486         mutex_unlock(&ctl_mutex);
2487
2488         return ret;
2489 }
2490
2491 static int rbd_init_disk(struct rbd_device *rbd_dev)
2492 {
2493         struct gendisk *disk;
2494         struct request_queue *q;
2495         u64 segment_size;
2496
2497         /* create gendisk info */
2498         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2499         if (!disk)
2500                 return -ENOMEM;
2501
2502         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2503                  rbd_dev->dev_id);
2504         disk->major = rbd_dev->major;
2505         disk->first_minor = 0;
2506         disk->fops = &rbd_bd_ops;
2507         disk->private_data = rbd_dev;
2508
2509         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2510         if (!q)
2511                 goto out_disk;
2512
2513         /* We use the default size, but let's be explicit about it. */
2514         blk_queue_physical_block_size(q, SECTOR_SIZE);
2515
2516         /* set io sizes to object size */
2517         segment_size = rbd_obj_bytes(&rbd_dev->header);
2518         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2519         blk_queue_max_segment_size(q, segment_size);
2520         blk_queue_io_min(q, segment_size);
2521         blk_queue_io_opt(q, segment_size);
2522
2523         blk_queue_merge_bvec(q, rbd_merge_bvec);
2524         disk->queue = q;
2525
2526         q->queuedata = rbd_dev;
2527
2528         rbd_dev->disk = disk;
2529
2530         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2531
2532         return 0;
2533 out_disk:
2534         put_disk(disk);
2535
2536         return -ENOMEM;
2537 }
2538
2539 /*
2540   sysfs
2541 */
2542
2543 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2544 {
2545         return container_of(dev, struct rbd_device, dev);
2546 }
2547
2548 static ssize_t rbd_size_show(struct device *dev,
2549                              struct device_attribute *attr, char *buf)
2550 {
2551         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2552         sector_t size;
2553
2554         down_read(&rbd_dev->header_rwsem);
2555         size = get_capacity(rbd_dev->disk);
2556         up_read(&rbd_dev->header_rwsem);
2557
2558         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2559 }
2560
2561 /*
2562  * Note this shows the features for whatever's mapped, which is not
2563  * necessarily the base image.
2564  */
2565 static ssize_t rbd_features_show(struct device *dev,
2566                              struct device_attribute *attr, char *buf)
2567 {
2568         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2569
2570         return sprintf(buf, "0x%016llx\n",
2571                         (unsigned long long) rbd_dev->mapping.features);
2572 }
2573
2574 static ssize_t rbd_major_show(struct device *dev,
2575                               struct device_attribute *attr, char *buf)
2576 {
2577         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2578
2579         return sprintf(buf, "%d\n", rbd_dev->major);
2580 }
2581
2582 static ssize_t rbd_client_id_show(struct device *dev,
2583                                   struct device_attribute *attr, char *buf)
2584 {
2585         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2586
2587         return sprintf(buf, "client%lld\n",
2588                         ceph_client_id(rbd_dev->rbd_client->client));
2589 }
2590
2591 static ssize_t rbd_pool_show(struct device *dev,
2592                              struct device_attribute *attr, char *buf)
2593 {
2594         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2595
2596         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2597 }
2598
2599 static ssize_t rbd_pool_id_show(struct device *dev,
2600                              struct device_attribute *attr, char *buf)
2601 {
2602         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2603
2604         return sprintf(buf, "%llu\n",
2605                 (unsigned long long) rbd_dev->spec->pool_id);
2606 }
2607
2608 static ssize_t rbd_name_show(struct device *dev,
2609                              struct device_attribute *attr, char *buf)
2610 {
2611         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2612
2613         if (rbd_dev->spec->image_name)
2614                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2615
2616         return sprintf(buf, "(unknown)\n");
2617 }
2618
2619 static ssize_t rbd_image_id_show(struct device *dev,
2620                              struct device_attribute *attr, char *buf)
2621 {
2622         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2623
2624         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2625 }
2626
2627 /*
2628  * Shows the name of the currently-mapped snapshot (or
2629  * RBD_SNAP_HEAD_NAME for the base image).
2630  */
2631 static ssize_t rbd_snap_show(struct device *dev,
2632                              struct device_attribute *attr,
2633                              char *buf)
2634 {
2635         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2636
2637         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2638 }
2639
2640 /*
2641  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2642  * for the parent image.  If there is no parent, simply shows
2643  * "(no parent image)".
2644  */
2645 static ssize_t rbd_parent_show(struct device *dev,
2646                              struct device_attribute *attr,
2647                              char *buf)
2648 {
2649         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2650         struct rbd_spec *spec = rbd_dev->parent_spec;
2651         int count;
2652         char *bufp = buf;
2653
2654         if (!spec)
2655                 return sprintf(buf, "(no parent image)\n");
2656
2657         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2658                         (unsigned long long) spec->pool_id, spec->pool_name);
2659         if (count < 0)
2660                 return count;
2661         bufp += count;
2662
2663         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2664                         spec->image_name ? spec->image_name : "(unknown)");
2665         if (count < 0)
2666                 return count;
2667         bufp += count;
2668
2669         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2670                         (unsigned long long) spec->snap_id, spec->snap_name);
2671         if (count < 0)
2672                 return count;
2673         bufp += count;
2674
2675         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2676         if (count < 0)
2677                 return count;
2678         bufp += count;
2679
2680         return (ssize_t) (bufp - buf);
2681 }
2682
2683 static ssize_t rbd_image_refresh(struct device *dev,
2684                                  struct device_attribute *attr,
2685                                  const char *buf,
2686                                  size_t size)
2687 {
2688         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2689         int ret;
2690
2691         ret = rbd_dev_refresh(rbd_dev, NULL);
2692
2693         return ret < 0 ? ret : size;
2694 }
2695
2696 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2697 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2698 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2699 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2700 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2701 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2702 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2703 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2704 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2705 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2706 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2707
2708 static struct attribute *rbd_attrs[] = {
2709         &dev_attr_size.attr,
2710         &dev_attr_features.attr,
2711         &dev_attr_major.attr,
2712         &dev_attr_client_id.attr,
2713         &dev_attr_pool.attr,
2714         &dev_attr_pool_id.attr,
2715         &dev_attr_name.attr,
2716         &dev_attr_image_id.attr,
2717         &dev_attr_current_snap.attr,
2718         &dev_attr_parent.attr,
2719         &dev_attr_refresh.attr,
2720         NULL
2721 };
2722
2723 static struct attribute_group rbd_attr_group = {
2724         .attrs = rbd_attrs,
2725 };
2726
2727 static const struct attribute_group *rbd_attr_groups[] = {
2728         &rbd_attr_group,
2729         NULL
2730 };
2731
2732 static void rbd_sysfs_dev_release(struct device *dev)
2733 {
2734 }
2735
2736 static struct device_type rbd_device_type = {
2737         .name           = "rbd",
2738         .groups         = rbd_attr_groups,
2739         .release        = rbd_sysfs_dev_release,
2740 };
2741
2742
2743 /*
2744   sysfs - snapshots
2745 */
2746
2747 static ssize_t rbd_snap_size_show(struct device *dev,
2748                                   struct device_attribute *attr,
2749                                   char *buf)
2750 {
2751         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2752
2753         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2754 }
2755
2756 static ssize_t rbd_snap_id_show(struct device *dev,
2757                                 struct device_attribute *attr,
2758                                 char *buf)
2759 {
2760         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2761
2762         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2763 }
2764
2765 static ssize_t rbd_snap_features_show(struct device *dev,
2766                                 struct device_attribute *attr,
2767                                 char *buf)
2768 {
2769         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2770
2771         return sprintf(buf, "0x%016llx\n",
2772                         (unsigned long long) snap->features);
2773 }
2774
2775 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2776 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2777 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2778
2779 static struct attribute *rbd_snap_attrs[] = {
2780         &dev_attr_snap_size.attr,
2781         &dev_attr_snap_id.attr,
2782         &dev_attr_snap_features.attr,
2783         NULL,
2784 };
2785
2786 static struct attribute_group rbd_snap_attr_group = {
2787         .attrs = rbd_snap_attrs,
2788 };
2789
2790 static void rbd_snap_dev_release(struct device *dev)
2791 {
2792         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2793         kfree(snap->name);
2794         kfree(snap);
2795 }
2796
2797 static const struct attribute_group *rbd_snap_attr_groups[] = {
2798         &rbd_snap_attr_group,
2799         NULL
2800 };
2801
2802 static struct device_type rbd_snap_device_type = {
2803         .groups         = rbd_snap_attr_groups,
2804         .release        = rbd_snap_dev_release,
2805 };
2806
2807 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2808 {
2809         kref_get(&spec->kref);
2810
2811         return spec;
2812 }
2813
2814 static void rbd_spec_free(struct kref *kref);
2815 static void rbd_spec_put(struct rbd_spec *spec)
2816 {
2817         if (spec)
2818                 kref_put(&spec->kref, rbd_spec_free);
2819 }
2820
2821 static struct rbd_spec *rbd_spec_alloc(void)
2822 {
2823         struct rbd_spec *spec;
2824
2825         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2826         if (!spec)
2827                 return NULL;
2828         kref_init(&spec->kref);
2829
2830         return spec;
2831 }
2832
2833 static void rbd_spec_free(struct kref *kref)
2834 {
2835         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2836
2837         kfree(spec->pool_name);
2838         kfree(spec->image_id);
2839         kfree(spec->image_name);
2840         kfree(spec->snap_name);
2841         kfree(spec);
2842 }
2843
2844 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2845                                 struct rbd_spec *spec)
2846 {
2847         struct rbd_device *rbd_dev;
2848
2849         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2850         if (!rbd_dev)
2851                 return NULL;
2852
2853         spin_lock_init(&rbd_dev->lock);
2854         rbd_dev->flags = 0;
2855         INIT_LIST_HEAD(&rbd_dev->node);
2856         INIT_LIST_HEAD(&rbd_dev->snaps);
2857         init_rwsem(&rbd_dev->header_rwsem);
2858
2859         rbd_dev->spec = spec;
2860         rbd_dev->rbd_client = rbdc;
2861
2862         /* Initialize the layout used for all rbd requests */
2863
2864         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2865         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2866         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2867         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2868
2869         return rbd_dev;
2870 }
2871
2872 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2873 {
2874         rbd_spec_put(rbd_dev->parent_spec);
2875         kfree(rbd_dev->header_name);
2876         rbd_put_client(rbd_dev->rbd_client);
2877         rbd_spec_put(rbd_dev->spec);
2878         kfree(rbd_dev);
2879 }
2880
2881 static bool rbd_snap_registered(struct rbd_snap *snap)
2882 {
2883         bool ret = snap->dev.type == &rbd_snap_device_type;
2884         bool reg = device_is_registered(&snap->dev);
2885
2886         rbd_assert(!ret ^ reg);
2887
2888         return ret;
2889 }
2890
2891 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2892 {
2893         list_del(&snap->node);
2894         if (device_is_registered(&snap->dev))
2895                 device_unregister(&snap->dev);
2896 }
2897
2898 static int rbd_register_snap_dev(struct rbd_snap *snap,
2899                                   struct device *parent)
2900 {
2901         struct device *dev = &snap->dev;
2902         int ret;
2903
2904         dev->type = &rbd_snap_device_type;
2905         dev->parent = parent;
2906         dev->release = rbd_snap_dev_release;
2907         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2908         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2909
2910         ret = device_register(dev);
2911
2912         return ret;
2913 }
2914
2915 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2916                                                 const char *snap_name,
2917                                                 u64 snap_id, u64 snap_size,
2918                                                 u64 snap_features)
2919 {
2920         struct rbd_snap *snap;
2921         int ret;
2922
2923         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2924         if (!snap)
2925                 return ERR_PTR(-ENOMEM);
2926
2927         ret = -ENOMEM;
2928         snap->name = kstrdup(snap_name, GFP_KERNEL);
2929         if (!snap->name)
2930                 goto err;
2931
2932         snap->id = snap_id;
2933         snap->size = snap_size;
2934         snap->features = snap_features;
2935
2936         return snap;
2937
2938 err:
2939         kfree(snap->name);
2940         kfree(snap);
2941
2942         return ERR_PTR(ret);
2943 }
2944
2945 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2946                 u64 *snap_size, u64 *snap_features)
2947 {
2948         char *snap_name;
2949
2950         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2951
2952         *snap_size = rbd_dev->header.snap_sizes[which];
2953         *snap_features = 0;     /* No features for v1 */
2954
2955         /* Skip over names until we find the one we are looking for */
2956
2957         snap_name = rbd_dev->header.snap_names;
2958         while (which--)
2959                 snap_name += strlen(snap_name) + 1;
2960
2961         return snap_name;
2962 }
2963
2964 /*
2965  * Get the size and object order for an image snapshot, or if
2966  * snap_id is CEPH_NOSNAP, gets this information for the base
2967  * image.
2968  */
2969 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2970                                 u8 *order, u64 *snap_size)
2971 {
2972         __le64 snapid = cpu_to_le64(snap_id);
2973         int ret;
2974         struct {
2975                 u8 order;
2976                 __le64 size;
2977         } __attribute__ ((packed)) size_buf = { 0 };
2978
2979         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2980                                 "rbd", "get_size",
2981                                 (char *) &snapid, sizeof (snapid),
2982                                 (char *) &size_buf, sizeof (size_buf), NULL);
2983         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2984         if (ret < 0)
2985                 return ret;
2986
2987         *order = size_buf.order;
2988         *snap_size = le64_to_cpu(size_buf.size);
2989
2990         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2991                 (unsigned long long) snap_id, (unsigned int) *order,
2992                 (unsigned long long) *snap_size);
2993
2994         return 0;
2995 }
2996
2997 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2998 {
2999         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3000                                         &rbd_dev->header.obj_order,
3001                                         &rbd_dev->header.image_size);
3002 }
3003
3004 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3005 {
3006         void *reply_buf;
3007         int ret;
3008         void *p;
3009
3010         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3011         if (!reply_buf)
3012                 return -ENOMEM;
3013
3014         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3015                                 "rbd", "get_object_prefix",
3016                                 NULL, 0,
3017                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3018         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3019         if (ret < 0)
3020                 goto out;
3021
3022         p = reply_buf;
3023         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3024                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
3025                                                 NULL, GFP_NOIO);
3026
3027         if (IS_ERR(rbd_dev->header.object_prefix)) {
3028                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3029                 rbd_dev->header.object_prefix = NULL;
3030         } else {
3031                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3032         }
3033
3034 out:
3035         kfree(reply_buf);
3036
3037         return ret;
3038 }
3039
3040 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3041                 u64 *snap_features)
3042 {
3043         __le64 snapid = cpu_to_le64(snap_id);
3044         struct {
3045                 __le64 features;
3046                 __le64 incompat;
3047         } features_buf = { 0 };
3048         u64 incompat;
3049         int ret;
3050
3051         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3052                                 "rbd", "get_features",
3053                                 (char *) &snapid, sizeof (snapid),
3054                                 (char *) &features_buf, sizeof (features_buf),
3055                                 NULL);
3056         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3057         if (ret < 0)
3058                 return ret;
3059
3060         incompat = le64_to_cpu(features_buf.incompat);
3061         if (incompat & ~RBD_FEATURES_SUPPORTED)
3062                 return -ENXIO;
3063
3064         *snap_features = le64_to_cpu(features_buf.features);
3065
3066         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3067                 (unsigned long long) snap_id,
3068                 (unsigned long long) *snap_features,
3069                 (unsigned long long) le64_to_cpu(features_buf.incompat));
3070
3071         return 0;
3072 }
3073
3074 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3075 {
3076         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3077                                                 &rbd_dev->header.features);
3078 }
3079
3080 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3081 {
3082         struct rbd_spec *parent_spec;
3083         size_t size;
3084         void *reply_buf = NULL;
3085         __le64 snapid;
3086         void *p;
3087         void *end;
3088         char *image_id;
3089         u64 overlap;
3090         int ret;
3091
3092         parent_spec = rbd_spec_alloc();
3093         if (!parent_spec)
3094                 return -ENOMEM;
3095
3096         size = sizeof (__le64) +                                /* pool_id */
3097                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3098                 sizeof (__le64) +                               /* snap_id */
3099                 sizeof (__le64);                                /* overlap */
3100         reply_buf = kmalloc(size, GFP_KERNEL);
3101         if (!reply_buf) {
3102                 ret = -ENOMEM;
3103                 goto out_err;
3104         }
3105
3106         snapid = cpu_to_le64(CEPH_NOSNAP);
3107         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3108                                 "rbd", "get_parent",
3109                                 (char *) &snapid, sizeof (snapid),
3110                                 (char *) reply_buf, size, NULL);
3111         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3112         if (ret < 0)
3113                 goto out_err;
3114
3115         ret = -ERANGE;
3116         p = reply_buf;
3117         end = (char *) reply_buf + size;
3118         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3119         if (parent_spec->pool_id == CEPH_NOPOOL)
3120                 goto out;       /* No parent?  No problem. */
3121
3122         /* The ceph file layout needs to fit pool id in 32 bits */
3123
3124         ret = -EIO;
3125         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3126                 goto out;
3127
3128         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3129         if (IS_ERR(image_id)) {
3130                 ret = PTR_ERR(image_id);
3131                 goto out_err;
3132         }
3133         parent_spec->image_id = image_id;
3134         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3135         ceph_decode_64_safe(&p, end, overlap, out_err);
3136
3137         rbd_dev->parent_overlap = overlap;
3138         rbd_dev->parent_spec = parent_spec;
3139         parent_spec = NULL;     /* rbd_dev now owns this */
3140 out:
3141         ret = 0;
3142 out_err:
3143         kfree(reply_buf);
3144         rbd_spec_put(parent_spec);
3145
3146         return ret;
3147 }
3148
3149 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3150 {
3151         size_t image_id_size;
3152         char *image_id;
3153         void *p;
3154         void *end;
3155         size_t size;
3156         void *reply_buf = NULL;
3157         size_t len = 0;
3158         char *image_name = NULL;
3159         int ret;
3160
3161         rbd_assert(!rbd_dev->spec->image_name);
3162
3163         len = strlen(rbd_dev->spec->image_id);
3164         image_id_size = sizeof (__le32) + len;
3165         image_id = kmalloc(image_id_size, GFP_KERNEL);
3166         if (!image_id)
3167                 return NULL;
3168
3169         p = image_id;
3170         end = (char *) image_id + image_id_size;
3171         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3172
3173         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3174         reply_buf = kmalloc(size, GFP_KERNEL);
3175         if (!reply_buf)
3176                 goto out;
3177
3178         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3179                                 "rbd", "dir_get_name",
3180                                 image_id, image_id_size,
3181                                 (char *) reply_buf, size, NULL);
3182         if (ret < 0)
3183                 goto out;
3184         p = reply_buf;
3185         end = (char *) reply_buf + size;
3186         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3187         if (IS_ERR(image_name))
3188                 image_name = NULL;
3189         else
3190                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3191 out:
3192         kfree(reply_buf);
3193         kfree(image_id);
3194
3195         return image_name;
3196 }
3197
3198 /*
3199  * When a parent image gets probed, we only have the pool, image,
3200  * and snapshot ids but not the names of any of them.  This call
3201  * is made later to fill in those names.  It has to be done after
3202  * rbd_dev_snaps_update() has completed because some of the
3203  * information (in particular, snapshot name) is not available
3204  * until then.
3205  */
3206 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3207 {
3208         struct ceph_osd_client *osdc;
3209         const char *name;
3210         void *reply_buf = NULL;
3211         int ret;
3212
3213         if (rbd_dev->spec->pool_name)
3214                 return 0;       /* Already have the names */
3215
3216         /* Look up the pool name */
3217
3218         osdc = &rbd_dev->rbd_client->client->osdc;
3219         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3220         if (!name) {
3221                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3222                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3223                 return -EIO;
3224         }
3225
3226         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3227         if (!rbd_dev->spec->pool_name)
3228                 return -ENOMEM;
3229
3230         /* Fetch the image name; tolerate failure here */
3231
3232         name = rbd_dev_image_name(rbd_dev);
3233         if (name)
3234                 rbd_dev->spec->image_name = (char *) name;
3235         else
3236                 rbd_warn(rbd_dev, "unable to get image name");
3237
3238         /* Look up the snapshot name. */
3239
3240         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3241         if (!name) {
3242                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3243                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3244                 ret = -EIO;
3245                 goto out_err;
3246         }
3247         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3248         if(!rbd_dev->spec->snap_name)
3249                 goto out_err;
3250
3251         return 0;
3252 out_err:
3253         kfree(reply_buf);
3254         kfree(rbd_dev->spec->pool_name);
3255         rbd_dev->spec->pool_name = NULL;
3256
3257         return ret;
3258 }
3259
3260 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3261 {
3262         size_t size;
3263         int ret;
3264         void *reply_buf;
3265         void *p;
3266         void *end;
3267         u64 seq;
3268         u32 snap_count;
3269         struct ceph_snap_context *snapc;
3270         u32 i;
3271
3272         /*
3273          * We'll need room for the seq value (maximum snapshot id),
3274          * snapshot count, and array of that many snapshot ids.
3275          * For now we have a fixed upper limit on the number we're
3276          * prepared to receive.
3277          */
3278         size = sizeof (__le64) + sizeof (__le32) +
3279                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3280         reply_buf = kzalloc(size, GFP_KERNEL);
3281         if (!reply_buf)
3282                 return -ENOMEM;
3283
3284         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3285                                 "rbd", "get_snapcontext",
3286                                 NULL, 0,
3287                                 reply_buf, size, ver);
3288         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3289         if (ret < 0)
3290                 goto out;
3291
3292         ret = -ERANGE;
3293         p = reply_buf;
3294         end = (char *) reply_buf + size;
3295         ceph_decode_64_safe(&p, end, seq, out);
3296         ceph_decode_32_safe(&p, end, snap_count, out);
3297
3298         /*
3299          * Make sure the reported number of snapshot ids wouldn't go
3300          * beyond the end of our buffer.  But before checking that,
3301          * make sure the computed size of the snapshot context we
3302          * allocate is representable in a size_t.
3303          */
3304         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3305                                  / sizeof (u64)) {
3306                 ret = -EINVAL;
3307                 goto out;
3308         }
3309         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3310                 goto out;
3311
3312         size = sizeof (struct ceph_snap_context) +
3313                                 snap_count * sizeof (snapc->snaps[0]);
3314         snapc = kmalloc(size, GFP_KERNEL);
3315         if (!snapc) {
3316                 ret = -ENOMEM;
3317                 goto out;
3318         }
3319
3320         atomic_set(&snapc->nref, 1);
3321         snapc->seq = seq;
3322         snapc->num_snaps = snap_count;
3323         for (i = 0; i < snap_count; i++)
3324                 snapc->snaps[i] = ceph_decode_64(&p);
3325
3326         rbd_dev->header.snapc = snapc;
3327
3328         dout("  snap context seq = %llu, snap_count = %u\n",
3329                 (unsigned long long) seq, (unsigned int) snap_count);
3330
3331 out:
3332         kfree(reply_buf);
3333
3334         return 0;
3335 }
3336
3337 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3338 {
3339         size_t size;
3340         void *reply_buf;
3341         __le64 snap_id;
3342         int ret;
3343         void *p;
3344         void *end;
3345         char *snap_name;
3346
3347         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3348         reply_buf = kmalloc(size, GFP_KERNEL);
3349         if (!reply_buf)
3350                 return ERR_PTR(-ENOMEM);
3351
3352         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3353         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3354                                 "rbd", "get_snapshot_name",
3355                                 (char *) &snap_id, sizeof (snap_id),
3356                                 reply_buf, size, NULL);
3357         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3358         if (ret < 0)
3359                 goto out;
3360
3361         p = reply_buf;
3362         end = (char *) reply_buf + size;
3363         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3364         if (IS_ERR(snap_name)) {
3365                 ret = PTR_ERR(snap_name);
3366                 goto out;
3367         } else {
3368                 dout("  snap_id 0x%016llx snap_name = %s\n",
3369                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3370         }
3371         kfree(reply_buf);
3372
3373         return snap_name;
3374 out:
3375         kfree(reply_buf);
3376
3377         return ERR_PTR(ret);
3378 }
3379
3380 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3381                 u64 *snap_size, u64 *snap_features)
3382 {
3383         u64 snap_id;
3384         u8 order;
3385         int ret;
3386
3387         snap_id = rbd_dev->header.snapc->snaps[which];
3388         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3389         if (ret)
3390                 return ERR_PTR(ret);
3391         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3392         if (ret)
3393                 return ERR_PTR(ret);
3394
3395         return rbd_dev_v2_snap_name(rbd_dev, which);
3396 }
3397
3398 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3399                 u64 *snap_size, u64 *snap_features)
3400 {
3401         if (rbd_dev->image_format == 1)
3402                 return rbd_dev_v1_snap_info(rbd_dev, which,
3403                                         snap_size, snap_features);
3404         if (rbd_dev->image_format == 2)
3405                 return rbd_dev_v2_snap_info(rbd_dev, which,
3406                                         snap_size, snap_features);
3407         return ERR_PTR(-EINVAL);
3408 }
3409
3410 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3411 {
3412         int ret;
3413         __u8 obj_order;
3414
3415         down_write(&rbd_dev->header_rwsem);
3416
3417         /* Grab old order first, to see if it changes */
3418
3419         obj_order = rbd_dev->header.obj_order,
3420         ret = rbd_dev_v2_image_size(rbd_dev);
3421         if (ret)
3422                 goto out;
3423         if (rbd_dev->header.obj_order != obj_order) {
3424                 ret = -EIO;
3425                 goto out;
3426         }
3427         rbd_update_mapping_size(rbd_dev);
3428
3429         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3430         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3431         if (ret)
3432                 goto out;
3433         ret = rbd_dev_snaps_update(rbd_dev);
3434         dout("rbd_dev_snaps_update returned %d\n", ret);
3435         if (ret)
3436                 goto out;
3437         ret = rbd_dev_snaps_register(rbd_dev);
3438         dout("rbd_dev_snaps_register returned %d\n", ret);
3439 out:
3440         up_write(&rbd_dev->header_rwsem);
3441
3442         return ret;
3443 }
3444
3445 /*
3446  * Scan the rbd device's current snapshot list and compare it to the
3447  * newly-received snapshot context.  Remove any existing snapshots
3448  * not present in the new snapshot context.  Add a new snapshot for
3449  * any snaphots in the snapshot context not in the current list.
3450  * And verify there are no changes to snapshots we already know
3451  * about.
3452  *
3453  * Assumes the snapshots in the snapshot context are sorted by
3454  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3455  * are also maintained in that order.)
3456  */
3457 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3458 {
3459         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3460         const u32 snap_count = snapc->num_snaps;
3461         struct list_head *head = &rbd_dev->snaps;
3462         struct list_head *links = head->next;
3463         u32 index = 0;
3464
3465         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3466         while (index < snap_count || links != head) {
3467                 u64 snap_id;
3468                 struct rbd_snap *snap;
3469                 char *snap_name;
3470                 u64 snap_size = 0;
3471                 u64 snap_features = 0;
3472
3473                 snap_id = index < snap_count ? snapc->snaps[index]
3474                                              : CEPH_NOSNAP;
3475                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3476                                      : NULL;
3477                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3478
3479                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3480                         struct list_head *next = links->next;
3481
3482                         /*
3483                          * A previously-existing snapshot is not in
3484                          * the new snap context.
3485                          *
3486                          * If the now missing snapshot is the one the
3487                          * image is mapped to, clear its exists flag
3488                          * so we can avoid sending any more requests
3489                          * to it.
3490                          */
3491                         if (rbd_dev->spec->snap_id == snap->id)
3492                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3493                         rbd_remove_snap_dev(snap);
3494                         dout("%ssnap id %llu has been removed\n",
3495                                 rbd_dev->spec->snap_id == snap->id ?
3496                                                         "mapped " : "",
3497                                 (unsigned long long) snap->id);
3498
3499                         /* Done with this list entry; advance */
3500
3501                         links = next;
3502                         continue;
3503                 }
3504
3505                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3506                                         &snap_size, &snap_features);
3507                 if (IS_ERR(snap_name))
3508                         return PTR_ERR(snap_name);
3509
3510                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3511                         (unsigned long long) snap_id);
3512                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3513                         struct rbd_snap *new_snap;
3514
3515                         /* We haven't seen this snapshot before */
3516
3517                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3518                                         snap_id, snap_size, snap_features);
3519                         if (IS_ERR(new_snap)) {
3520                                 int err = PTR_ERR(new_snap);
3521
3522                                 dout("  failed to add dev, error %d\n", err);
3523
3524                                 return err;
3525                         }
3526
3527                         /* New goes before existing, or at end of list */
3528
3529                         dout("  added dev%s\n", snap ? "" : " at end\n");
3530                         if (snap)
3531                                 list_add_tail(&new_snap->node, &snap->node);
3532                         else
3533                                 list_add_tail(&new_snap->node, head);
3534                 } else {
3535                         /* Already have this one */
3536
3537                         dout("  already present\n");
3538
3539                         rbd_assert(snap->size == snap_size);
3540                         rbd_assert(!strcmp(snap->name, snap_name));
3541                         rbd_assert(snap->features == snap_features);
3542
3543                         /* Done with this list entry; advance */
3544
3545                         links = links->next;
3546                 }
3547
3548                 /* Advance to the next entry in the snapshot context */
3549
3550                 index++;
3551         }
3552         dout("%s: done\n", __func__);
3553
3554         return 0;
3555 }
3556
3557 /*
3558  * Scan the list of snapshots and register the devices for any that
3559  * have not already been registered.
3560  */
3561 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3562 {
3563         struct rbd_snap *snap;
3564         int ret = 0;
3565
3566         dout("%s:\n", __func__);
3567         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3568                 return -EIO;
3569
3570         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3571                 if (!rbd_snap_registered(snap)) {
3572                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3573                         if (ret < 0)
3574                                 break;
3575                 }
3576         }
3577         dout("%s: returning %d\n", __func__, ret);
3578
3579         return ret;
3580 }
3581
3582 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3583 {
3584         struct device *dev;
3585         int ret;
3586
3587         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3588
3589         dev = &rbd_dev->dev;
3590         dev->bus = &rbd_bus_type;
3591         dev->type = &rbd_device_type;
3592         dev->parent = &rbd_root_dev;
3593         dev->release = rbd_dev_release;
3594         dev_set_name(dev, "%d", rbd_dev->dev_id);
3595         ret = device_register(dev);
3596
3597         mutex_unlock(&ctl_mutex);
3598
3599         return ret;
3600 }
3601
3602 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3603 {
3604         device_unregister(&rbd_dev->dev);
3605 }
3606
3607 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3608
3609 /*
3610  * Get a unique rbd identifier for the given new rbd_dev, and add
3611  * the rbd_dev to the global list.  The minimum rbd id is 1.
3612  */
3613 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3614 {
3615         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3616
3617         spin_lock(&rbd_dev_list_lock);
3618         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3619         spin_unlock(&rbd_dev_list_lock);
3620         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3621                 (unsigned long long) rbd_dev->dev_id);
3622 }
3623
3624 /*
3625  * Remove an rbd_dev from the global list, and record that its
3626  * identifier is no longer in use.
3627  */
3628 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3629 {
3630         struct list_head *tmp;
3631         int rbd_id = rbd_dev->dev_id;
3632         int max_id;
3633
3634         rbd_assert(rbd_id > 0);
3635
3636         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3637                 (unsigned long long) rbd_dev->dev_id);
3638         spin_lock(&rbd_dev_list_lock);
3639         list_del_init(&rbd_dev->node);
3640
3641         /*
3642          * If the id being "put" is not the current maximum, there
3643          * is nothing special we need to do.
3644          */
3645         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3646                 spin_unlock(&rbd_dev_list_lock);
3647                 return;
3648         }
3649
3650         /*
3651          * We need to update the current maximum id.  Search the
3652          * list to find out what it is.  We're more likely to find
3653          * the maximum at the end, so search the list backward.
3654          */
3655         max_id = 0;
3656         list_for_each_prev(tmp, &rbd_dev_list) {
3657                 struct rbd_device *rbd_dev;
3658
3659                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3660                 if (rbd_dev->dev_id > max_id)
3661                         max_id = rbd_dev->dev_id;
3662         }
3663         spin_unlock(&rbd_dev_list_lock);
3664
3665         /*
3666          * The max id could have been updated by rbd_dev_id_get(), in
3667          * which case it now accurately reflects the new maximum.
3668          * Be careful not to overwrite the maximum value in that
3669          * case.
3670          */
3671         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3672         dout("  max dev id has been reset\n");
3673 }
3674
3675 /*
3676  * Skips over white space at *buf, and updates *buf to point to the
3677  * first found non-space character (if any). Returns the length of
3678  * the token (string of non-white space characters) found.  Note
3679  * that *buf must be terminated with '\0'.
3680  */
3681 static inline size_t next_token(const char **buf)
3682 {
3683         /*
3684         * These are the characters that produce nonzero for
3685         * isspace() in the "C" and "POSIX" locales.
3686         */
3687         const char *spaces = " \f\n\r\t\v";
3688
3689         *buf += strspn(*buf, spaces);   /* Find start of token */
3690
3691         return strcspn(*buf, spaces);   /* Return token length */
3692 }
3693
3694 /*
3695  * Finds the next token in *buf, and if the provided token buffer is
3696  * big enough, copies the found token into it.  The result, if
3697  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3698  * must be terminated with '\0' on entry.
3699  *
3700  * Returns the length of the token found (not including the '\0').
3701  * Return value will be 0 if no token is found, and it will be >=
3702  * token_size if the token would not fit.
3703  *
3704  * The *buf pointer will be updated to point beyond the end of the
3705  * found token.  Note that this occurs even if the token buffer is
3706  * too small to hold it.
3707  */
3708 static inline size_t copy_token(const char **buf,
3709                                 char *token,
3710                                 size_t token_size)
3711 {
3712         size_t len;
3713
3714         len = next_token(buf);
3715         if (len < token_size) {
3716                 memcpy(token, *buf, len);
3717                 *(token + len) = '\0';
3718         }
3719         *buf += len;
3720
3721         return len;
3722 }
3723
3724 /*
3725  * Finds the next token in *buf, dynamically allocates a buffer big
3726  * enough to hold a copy of it, and copies the token into the new
3727  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3728  * that a duplicate buffer is created even for a zero-length token.
3729  *
3730  * Returns a pointer to the newly-allocated duplicate, or a null
3731  * pointer if memory for the duplicate was not available.  If
3732  * the lenp argument is a non-null pointer, the length of the token
3733  * (not including the '\0') is returned in *lenp.
3734  *
3735  * If successful, the *buf pointer will be updated to point beyond
3736  * the end of the found token.
3737  *
3738  * Note: uses GFP_KERNEL for allocation.
3739  */
3740 static inline char *dup_token(const char **buf, size_t *lenp)
3741 {
3742         char *dup;
3743         size_t len;
3744
3745         len = next_token(buf);
3746         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3747         if (!dup)
3748                 return NULL;
3749         *(dup + len) = '\0';
3750         *buf += len;
3751
3752         if (lenp)
3753                 *lenp = len;
3754
3755         return dup;
3756 }
3757
3758 /*
3759  * Parse the options provided for an "rbd add" (i.e., rbd image
3760  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3761  * and the data written is passed here via a NUL-terminated buffer.
3762  * Returns 0 if successful or an error code otherwise.
3763  *
3764  * The information extracted from these options is recorded in
3765  * the other parameters which return dynamically-allocated
3766  * structures:
3767  *  ceph_opts
3768  *      The address of a pointer that will refer to a ceph options
3769  *      structure.  Caller must release the returned pointer using
3770  *      ceph_destroy_options() when it is no longer needed.
3771  *  rbd_opts
3772  *      Address of an rbd options pointer.  Fully initialized by
3773  *      this function; caller must release with kfree().
3774  *  spec
3775  *      Address of an rbd image specification pointer.  Fully
3776  *      initialized by this function based on parsed options.
3777  *      Caller must release with rbd_spec_put().
3778  *
3779  * The options passed take this form:
3780  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3781  * where:
3782  *  <mon_addrs>
3783  *      A comma-separated list of one or more monitor addresses.
3784  *      A monitor address is an ip address, optionally followed
3785  *      by a port number (separated by a colon).
3786  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3787  *  <options>
3788  *      A comma-separated list of ceph and/or rbd options.
3789  *  <pool_name>
3790  *      The name of the rados pool containing the rbd image.
3791  *  <image_name>
3792  *      The name of the image in that pool to map.
3793  *  <snap_id>
3794  *      An optional snapshot id.  If provided, the mapping will
3795  *      present data from the image at the time that snapshot was
3796  *      created.  The image head is used if no snapshot id is
3797  *      provided.  Snapshot mappings are always read-only.
3798  */
3799 static int rbd_add_parse_args(const char *buf,
3800                                 struct ceph_options **ceph_opts,
3801                                 struct rbd_options **opts,
3802                                 struct rbd_spec **rbd_spec)
3803 {
3804         size_t len;
3805         char *options;
3806         const char *mon_addrs;
3807         size_t mon_addrs_size;
3808         struct rbd_spec *spec = NULL;
3809         struct rbd_options *rbd_opts = NULL;
3810         struct ceph_options *copts;
3811         int ret;
3812
3813         /* The first four tokens are required */
3814
3815         len = next_token(&buf);
3816         if (!len) {
3817                 rbd_warn(NULL, "no monitor address(es) provided");
3818                 return -EINVAL;
3819         }
3820         mon_addrs = buf;
3821         mon_addrs_size = len + 1;
3822         buf += len;
3823
3824         ret = -EINVAL;
3825         options = dup_token(&buf, NULL);
3826         if (!options)
3827                 return -ENOMEM;
3828         if (!*options) {
3829                 rbd_warn(NULL, "no options provided");
3830                 goto out_err;
3831         }
3832
3833         spec = rbd_spec_alloc();
3834         if (!spec)
3835                 goto out_mem;
3836
3837         spec->pool_name = dup_token(&buf, NULL);
3838         if (!spec->pool_name)
3839                 goto out_mem;
3840         if (!*spec->pool_name) {
3841                 rbd_warn(NULL, "no pool name provided");
3842                 goto out_err;
3843         }
3844
3845         spec->image_name = dup_token(&buf, NULL);
3846         if (!spec->image_name)
3847                 goto out_mem;
3848         if (!*spec->image_name) {
3849                 rbd_warn(NULL, "no image name provided");
3850                 goto out_err;
3851         }
3852
3853         /*
3854          * Snapshot name is optional; default is to use "-"
3855          * (indicating the head/no snapshot).
3856          */
3857         len = next_token(&buf);
3858         if (!len) {
3859                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3860                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3861         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3862                 ret = -ENAMETOOLONG;
3863                 goto out_err;
3864         }
3865         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3866         if (!spec->snap_name)
3867                 goto out_mem;
3868         *(spec->snap_name + len) = '\0';
3869
3870         /* Initialize all rbd options to the defaults */
3871
3872         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3873         if (!rbd_opts)
3874                 goto out_mem;
3875
3876         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3877
3878         copts = ceph_parse_options(options, mon_addrs,
3879                                         mon_addrs + mon_addrs_size - 1,
3880                                         parse_rbd_opts_token, rbd_opts);
3881         if (IS_ERR(copts)) {
3882                 ret = PTR_ERR(copts);
3883                 goto out_err;
3884         }
3885         kfree(options);
3886
3887         *ceph_opts = copts;
3888         *opts = rbd_opts;
3889         *rbd_spec = spec;
3890
3891         return 0;
3892 out_mem:
3893         ret = -ENOMEM;
3894 out_err:
3895         kfree(rbd_opts);
3896         rbd_spec_put(spec);
3897         kfree(options);
3898
3899         return ret;
3900 }
3901
3902 /*
3903  * An rbd format 2 image has a unique identifier, distinct from the
3904  * name given to it by the user.  Internally, that identifier is
3905  * what's used to specify the names of objects related to the image.
3906  *
3907  * A special "rbd id" object is used to map an rbd image name to its
3908  * id.  If that object doesn't exist, then there is no v2 rbd image
3909  * with the supplied name.
3910  *
3911  * This function will record the given rbd_dev's image_id field if
3912  * it can be determined, and in that case will return 0.  If any
3913  * errors occur a negative errno will be returned and the rbd_dev's
3914  * image_id field will be unchanged (and should be NULL).
3915  */
3916 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3917 {
3918         int ret;
3919         size_t size;
3920         char *object_name;
3921         void *response;
3922         void *p;
3923
3924         /* If we already have it we don't need to look it up */
3925
3926         if (rbd_dev->spec->image_id)
3927                 return 0;
3928
3929         /*
3930          * When probing a parent image, the image id is already
3931          * known (and the image name likely is not).  There's no
3932          * need to fetch the image id again in this case.
3933          */
3934         if (rbd_dev->spec->image_id)
3935                 return 0;
3936
3937         /*
3938          * First, see if the format 2 image id file exists, and if
3939          * so, get the image's persistent id from it.
3940          */
3941         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3942         object_name = kmalloc(size, GFP_NOIO);
3943         if (!object_name)
3944                 return -ENOMEM;
3945         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3946         dout("rbd id object name is %s\n", object_name);
3947
3948         /* Response will be an encoded string, which includes a length */
3949
3950         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3951         response = kzalloc(size, GFP_NOIO);
3952         if (!response) {
3953                 ret = -ENOMEM;
3954                 goto out;
3955         }
3956
3957         ret = rbd_obj_method_sync(rbd_dev, object_name,
3958                                 "rbd", "get_id",
3959                                 NULL, 0,
3960                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3961         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3962         if (ret < 0)
3963                 goto out;
3964
3965         p = response;
3966         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3967                                                 p + RBD_IMAGE_ID_LEN_MAX,
3968                                                 NULL, GFP_NOIO);
3969         if (IS_ERR(rbd_dev->spec->image_id)) {
3970                 ret = PTR_ERR(rbd_dev->spec->image_id);
3971                 rbd_dev->spec->image_id = NULL;
3972         } else {
3973                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3974         }
3975 out:
3976         kfree(response);
3977         kfree(object_name);
3978
3979         return ret;
3980 }
3981
3982 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3983 {
3984         int ret;
3985         size_t size;
3986
3987         /* Version 1 images have no id; empty string is used */
3988
3989         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3990         if (!rbd_dev->spec->image_id)
3991                 return -ENOMEM;
3992
3993         /* Record the header object name for this rbd image. */
3994
3995         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3996         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3997         if (!rbd_dev->header_name) {
3998                 ret = -ENOMEM;
3999                 goto out_err;
4000         }
4001         sprintf(rbd_dev->header_name, "%s%s",
4002                 rbd_dev->spec->image_name, RBD_SUFFIX);
4003
4004         /* Populate rbd image metadata */
4005
4006         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4007         if (ret < 0)
4008                 goto out_err;
4009
4010         /* Version 1 images have no parent (no layering) */
4011
4012         rbd_dev->parent_spec = NULL;
4013         rbd_dev->parent_overlap = 0;
4014
4015         rbd_dev->image_format = 1;
4016
4017         dout("discovered version 1 image, header name is %s\n",
4018                 rbd_dev->header_name);
4019
4020         return 0;
4021
4022 out_err:
4023         kfree(rbd_dev->header_name);
4024         rbd_dev->header_name = NULL;
4025         kfree(rbd_dev->spec->image_id);
4026         rbd_dev->spec->image_id = NULL;
4027
4028         return ret;
4029 }
4030
4031 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4032 {
4033         size_t size;
4034         int ret;
4035         u64 ver = 0;
4036
4037         /*
4038          * Image id was filled in by the caller.  Record the header
4039          * object name for this rbd image.
4040          */
4041         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4042         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4043         if (!rbd_dev->header_name)
4044                 return -ENOMEM;
4045         sprintf(rbd_dev->header_name, "%s%s",
4046                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4047
4048         /* Get the size and object order for the image */
4049
4050         ret = rbd_dev_v2_image_size(rbd_dev);
4051         if (ret < 0)
4052                 goto out_err;
4053
4054         /* Get the object prefix (a.k.a. block_name) for the image */
4055
4056         ret = rbd_dev_v2_object_prefix(rbd_dev);
4057         if (ret < 0)
4058                 goto out_err;
4059
4060         /* Get the and check features for the image */
4061
4062         ret = rbd_dev_v2_features(rbd_dev);
4063         if (ret < 0)
4064                 goto out_err;
4065
4066         /* If the image supports layering, get the parent info */
4067
4068         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4069                 ret = rbd_dev_v2_parent_info(rbd_dev);
4070                 if (ret < 0)
4071                         goto out_err;
4072         }
4073
4074         /* crypto and compression type aren't (yet) supported for v2 images */
4075
4076         rbd_dev->header.crypt_type = 0;
4077         rbd_dev->header.comp_type = 0;
4078
4079         /* Get the snapshot context, plus the header version */
4080
4081         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4082         if (ret)
4083                 goto out_err;
4084         rbd_dev->header.obj_version = ver;
4085
4086         rbd_dev->image_format = 2;
4087
4088         dout("discovered version 2 image, header name is %s\n",
4089                 rbd_dev->header_name);
4090
4091         return 0;
4092 out_err:
4093         rbd_dev->parent_overlap = 0;
4094         rbd_spec_put(rbd_dev->parent_spec);
4095         rbd_dev->parent_spec = NULL;
4096         kfree(rbd_dev->header_name);
4097         rbd_dev->header_name = NULL;
4098         kfree(rbd_dev->header.object_prefix);
4099         rbd_dev->header.object_prefix = NULL;
4100
4101         return ret;
4102 }
4103
4104 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4105 {
4106         struct rbd_device *parent = NULL;
4107         struct rbd_spec *parent_spec = NULL;
4108         struct rbd_client *rbdc = NULL;
4109         int ret;
4110
4111         /* no need to lock here, as rbd_dev is not registered yet */
4112         ret = rbd_dev_snaps_update(rbd_dev);
4113         if (ret)
4114                 return ret;
4115
4116         ret = rbd_dev_probe_update_spec(rbd_dev);
4117         if (ret)
4118                 goto err_out_snaps;
4119
4120         ret = rbd_dev_set_mapping(rbd_dev);
4121         if (ret)
4122                 goto err_out_snaps;
4123
4124         /* generate unique id: find highest unique id, add one */
4125         rbd_dev_id_get(rbd_dev);
4126
4127         /* Fill in the device name, now that we have its id. */
4128         BUILD_BUG_ON(DEV_NAME_LEN
4129                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4130         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4131
4132         /* Get our block major device number. */
4133
4134         ret = register_blkdev(0, rbd_dev->name);
4135         if (ret < 0)
4136                 goto err_out_id;
4137         rbd_dev->major = ret;
4138
4139         /* Set up the blkdev mapping. */
4140
4141         ret = rbd_init_disk(rbd_dev);
4142         if (ret)
4143                 goto err_out_blkdev;
4144
4145         ret = rbd_bus_add_dev(rbd_dev);
4146         if (ret)
4147                 goto err_out_disk;
4148
4149         /*
4150          * At this point cleanup in the event of an error is the job
4151          * of the sysfs code (initiated by rbd_bus_del_dev()).
4152          */
4153         /* Probe the parent if there is one */
4154
4155         if (rbd_dev->parent_spec) {
4156                 /*
4157                  * We need to pass a reference to the client and the
4158                  * parent spec when creating the parent rbd_dev.
4159                  * Images related by parent/child relationships
4160                  * always share both.
4161                  */
4162                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4163                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4164
4165                 parent = rbd_dev_create(rbdc, parent_spec);
4166                 if (!parent) {
4167                         ret = -ENOMEM;
4168                         goto err_out_spec;
4169                 }
4170                 rbdc = NULL;            /* parent now owns reference */
4171                 parent_spec = NULL;     /* parent now owns reference */
4172                 ret = rbd_dev_probe(parent);
4173                 if (ret < 0)
4174                         goto err_out_parent;
4175                 rbd_dev->parent = parent;
4176         }
4177
4178         down_write(&rbd_dev->header_rwsem);
4179         ret = rbd_dev_snaps_register(rbd_dev);
4180         up_write(&rbd_dev->header_rwsem);
4181         if (ret)
4182                 goto err_out_bus;
4183
4184         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4185         if (ret)
4186                 goto err_out_bus;
4187
4188         /* Everything's ready.  Announce the disk to the world. */
4189
4190         add_disk(rbd_dev->disk);
4191
4192         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4193                 (unsigned long long) rbd_dev->mapping.size);
4194
4195         return ret;
4196
4197 err_out_parent:
4198         rbd_dev_destroy(parent);
4199 err_out_spec:
4200         rbd_spec_put(parent_spec);
4201         rbd_put_client(rbdc);
4202 err_out_bus:
4203         /* this will also clean up rest of rbd_dev stuff */
4204
4205         rbd_bus_del_dev(rbd_dev);
4206
4207         return ret;
4208 err_out_disk:
4209         rbd_free_disk(rbd_dev);
4210 err_out_blkdev:
4211         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4212 err_out_id:
4213         rbd_dev_id_put(rbd_dev);
4214 err_out_snaps:
4215         rbd_remove_all_snaps(rbd_dev);
4216
4217         return ret;
4218 }
4219
4220 /*
4221  * Probe for the existence of the header object for the given rbd
4222  * device.  For format 2 images this includes determining the image
4223  * id.
4224  */
4225 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4226 {
4227         int ret;
4228
4229         /*
4230          * Get the id from the image id object.  If it's not a
4231          * format 2 image, we'll get ENOENT back, and we'll assume
4232          * it's a format 1 image.
4233          */
4234         ret = rbd_dev_image_id(rbd_dev);
4235         if (ret)
4236                 ret = rbd_dev_v1_probe(rbd_dev);
4237         else
4238                 ret = rbd_dev_v2_probe(rbd_dev);
4239         if (ret) {
4240                 dout("probe failed, returning %d\n", ret);
4241
4242                 return ret;
4243         }
4244
4245         ret = rbd_dev_probe_finish(rbd_dev);
4246         if (ret)
4247                 rbd_header_free(&rbd_dev->header);
4248
4249         return ret;
4250 }
4251
4252 static ssize_t rbd_add(struct bus_type *bus,
4253                        const char *buf,
4254                        size_t count)
4255 {
4256         struct rbd_device *rbd_dev = NULL;
4257         struct ceph_options *ceph_opts = NULL;
4258         struct rbd_options *rbd_opts = NULL;
4259         struct rbd_spec *spec = NULL;
4260         struct rbd_client *rbdc;
4261         struct ceph_osd_client *osdc;
4262         int rc = -ENOMEM;
4263
4264         if (!try_module_get(THIS_MODULE))
4265                 return -ENODEV;
4266
4267         /* parse add command */
4268         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4269         if (rc < 0)
4270                 goto err_out_module;
4271
4272         rbdc = rbd_get_client(ceph_opts);
4273         if (IS_ERR(rbdc)) {
4274                 rc = PTR_ERR(rbdc);
4275                 goto err_out_args;
4276         }
4277         ceph_opts = NULL;       /* rbd_dev client now owns this */
4278
4279         /* pick the pool */
4280         osdc = &rbdc->client->osdc;
4281         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4282         if (rc < 0)
4283                 goto err_out_client;
4284         spec->pool_id = (u64) rc;
4285
4286         /* The ceph file layout needs to fit pool id in 32 bits */
4287
4288         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4289                 rc = -EIO;
4290                 goto err_out_client;
4291         }
4292
4293         rbd_dev = rbd_dev_create(rbdc, spec);
4294         if (!rbd_dev)
4295                 goto err_out_client;
4296         rbdc = NULL;            /* rbd_dev now owns this */
4297         spec = NULL;            /* rbd_dev now owns this */
4298
4299         rbd_dev->mapping.read_only = rbd_opts->read_only;
4300         kfree(rbd_opts);
4301         rbd_opts = NULL;        /* done with this */
4302
4303         rc = rbd_dev_probe(rbd_dev);
4304         if (rc < 0)
4305                 goto err_out_rbd_dev;
4306
4307         return count;
4308 err_out_rbd_dev:
4309         rbd_dev_destroy(rbd_dev);
4310 err_out_client:
4311         rbd_put_client(rbdc);
4312 err_out_args:
4313         if (ceph_opts)
4314                 ceph_destroy_options(ceph_opts);
4315         kfree(rbd_opts);
4316         rbd_spec_put(spec);
4317 err_out_module:
4318         module_put(THIS_MODULE);
4319
4320         dout("Error adding device %s\n", buf);
4321
4322         return (ssize_t) rc;
4323 }
4324
4325 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4326 {
4327         struct list_head *tmp;
4328         struct rbd_device *rbd_dev;
4329
4330         spin_lock(&rbd_dev_list_lock);
4331         list_for_each(tmp, &rbd_dev_list) {
4332                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4333                 if (rbd_dev->dev_id == dev_id) {
4334                         spin_unlock(&rbd_dev_list_lock);
4335                         return rbd_dev;
4336                 }
4337         }
4338         spin_unlock(&rbd_dev_list_lock);
4339         return NULL;
4340 }
4341
4342 static void rbd_dev_release(struct device *dev)
4343 {
4344         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4345
4346         if (rbd_dev->watch_event)
4347                 rbd_dev_header_watch_sync(rbd_dev, 0);
4348
4349         /* clean up and free blkdev */
4350         rbd_free_disk(rbd_dev);
4351         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4352
4353         /* release allocated disk header fields */
4354         rbd_header_free(&rbd_dev->header);
4355
4356         /* done with the id, and with the rbd_dev */
4357         rbd_dev_id_put(rbd_dev);
4358         rbd_assert(rbd_dev->rbd_client != NULL);
4359         rbd_dev_destroy(rbd_dev);
4360
4361         /* release module ref */
4362         module_put(THIS_MODULE);
4363 }
4364
4365 static void __rbd_remove(struct rbd_device *rbd_dev)
4366 {
4367         rbd_remove_all_snaps(rbd_dev);
4368         rbd_bus_del_dev(rbd_dev);
4369 }
4370
4371 static ssize_t rbd_remove(struct bus_type *bus,
4372                           const char *buf,
4373                           size_t count)
4374 {
4375         struct rbd_device *rbd_dev = NULL;
4376         int target_id, rc;
4377         unsigned long ul;
4378         int ret = count;
4379
4380         rc = strict_strtoul(buf, 10, &ul);
4381         if (rc)
4382                 return rc;
4383
4384         /* convert to int; abort if we lost anything in the conversion */
4385         target_id = (int) ul;
4386         if (target_id != ul)
4387                 return -EINVAL;
4388
4389         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4390
4391         rbd_dev = __rbd_get_dev(target_id);
4392         if (!rbd_dev) {
4393                 ret = -ENOENT;
4394                 goto done;
4395         }
4396
4397         spin_lock_irq(&rbd_dev->lock);
4398         if (rbd_dev->open_count)
4399                 ret = -EBUSY;
4400         else
4401                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4402         spin_unlock_irq(&rbd_dev->lock);
4403         if (ret < 0)
4404                 goto done;
4405
4406         while (rbd_dev->parent_spec) {
4407                 struct rbd_device *first = rbd_dev;
4408                 struct rbd_device *second = first->parent;
4409                 struct rbd_device *third;
4410
4411                 /*
4412                  * Follow to the parent with no grandparent and
4413                  * remove it.
4414                  */
4415                 while (second && (third = second->parent)) {
4416                         first = second;
4417                         second = third;
4418                 }
4419                 __rbd_remove(second);
4420                 rbd_spec_put(first->parent_spec);
4421                 first->parent_spec = NULL;
4422                 first->parent_overlap = 0;
4423                 first->parent = NULL;
4424         }
4425         __rbd_remove(rbd_dev);
4426
4427 done:
4428         mutex_unlock(&ctl_mutex);
4429
4430         return ret;
4431 }
4432
4433 /*
4434  * create control files in sysfs
4435  * /sys/bus/rbd/...
4436  */
4437 static int rbd_sysfs_init(void)
4438 {
4439         int ret;
4440
4441         ret = device_register(&rbd_root_dev);
4442         if (ret < 0)
4443                 return ret;
4444
4445         ret = bus_register(&rbd_bus_type);
4446         if (ret < 0)
4447                 device_unregister(&rbd_root_dev);
4448
4449         return ret;
4450 }
4451
4452 static void rbd_sysfs_cleanup(void)
4453 {
4454         bus_unregister(&rbd_bus_type);
4455         device_unregister(&rbd_root_dev);
4456 }
4457
4458 static int __init rbd_init(void)
4459 {
4460         int rc;
4461
4462         if (!libceph_compatible(NULL)) {
4463                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4464
4465                 return -EINVAL;
4466         }
4467         rc = rbd_sysfs_init();
4468         if (rc)
4469                 return rc;
4470         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4471         return 0;
4472 }
4473
4474 static void __exit rbd_exit(void)
4475 {
4476         rbd_sysfs_cleanup();
4477 }
4478
4479 module_init(rbd_init);
4480 module_exit(rbd_exit);
4481
4482 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4483 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4484 MODULE_DESCRIPTION("rados block device");
4485
4486 /* following authorship retained from original osdblk.c */
4487 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4488
4489 MODULE_LICENSE("GPL");