]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: barriers are hard
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have these defined elsewhere */
56
57 #define U8_MAX  ((u8)   (~0U))
58 #define U16_MAX ((u16)  (~0U))
59 #define U32_MAX ((u32)  (~0U))
60 #define U64_MAX ((u64)  (~0ULL))
61
62 #define RBD_DRV_NAME "rbd"
63 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
64
65 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
66
67 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
68 #define RBD_MAX_SNAP_NAME_LEN   \
69                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
70
71 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
72
73 #define RBD_SNAP_HEAD_NAME      "-"
74
75 /* This allows a single page to hold an image name sent by OSD */
76 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
77 #define RBD_IMAGE_ID_LEN_MAX    64
78
79 #define RBD_OBJ_PREFIX_LEN_MAX  64
80
81 /* Feature bits */
82
83 #define RBD_FEATURE_LAYERING      1
84
85 /* Features supported by this (client software) implementation. */
86
87 #define RBD_FEATURES_ALL          (0)
88
89 /*
90  * An RBD device name will be "rbd#", where the "rbd" comes from
91  * RBD_DRV_NAME above, and # is a unique integer identifier.
92  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
93  * enough to hold all possible device names.
94  */
95 #define DEV_NAME_LEN            32
96 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
97
98 /*
99  * block device image metadata (in-memory version)
100  */
101 struct rbd_image_header {
102         /* These four fields never change for a given rbd image */
103         char *object_prefix;
104         u64 features;
105         __u8 obj_order;
106         __u8 crypt_type;
107         __u8 comp_type;
108
109         /* The remaining fields need to be updated occasionally */
110         u64 image_size;
111         struct ceph_snap_context *snapc;
112         char *snap_names;
113         u64 *snap_sizes;
114
115         u64 obj_version;
116 };
117
118 /*
119  * An rbd image specification.
120  *
121  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122  * identify an image.  Each rbd_dev structure includes a pointer to
123  * an rbd_spec structure that encapsulates this identity.
124  *
125  * Each of the id's in an rbd_spec has an associated name.  For a
126  * user-mapped image, the names are supplied and the id's associated
127  * with them are looked up.  For a layered image, a parent image is
128  * defined by the tuple, and the names are looked up.
129  *
130  * An rbd_dev structure contains a parent_spec pointer which is
131  * non-null if the image it represents is a child in a layered
132  * image.  This pointer will refer to the rbd_spec structure used
133  * by the parent rbd_dev for its own identity (i.e., the structure
134  * is shared between the parent and child).
135  *
136  * Since these structures are populated once, during the discovery
137  * phase of image construction, they are effectively immutable so
138  * we make no effort to synchronize access to them.
139  *
140  * Note that code herein does not assume the image name is known (it
141  * could be a null pointer).
142  */
143 struct rbd_spec {
144         u64             pool_id;
145         char            *pool_name;
146
147         char            *image_id;
148         char            *image_name;
149
150         u64             snap_id;
151         char            *snap_name;
152
153         struct kref     kref;
154 };
155
156 /*
157  * an instance of the client.  multiple devices may share an rbd client.
158  */
159 struct rbd_client {
160         struct ceph_client      *client;
161         struct kref             kref;
162         struct list_head        node;
163 };
164
165 struct rbd_img_request;
166 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
167
168 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
169
170 struct rbd_obj_request;
171 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
172
173 enum obj_request_type {
174         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
175 };
176
177 struct rbd_obj_request {
178         const char              *object_name;
179         u64                     offset;         /* object start byte */
180         u64                     length;         /* bytes from offset */
181
182         struct rbd_img_request  *img_request;
183         struct list_head        links;          /* img_request->obj_requests */
184         u32                     which;          /* posn image request list */
185
186         enum obj_request_type   type;
187         union {
188                 struct bio      *bio_list;
189                 struct {
190                         struct page     **pages;
191                         u32             page_count;
192                 };
193         };
194
195         struct ceph_osd_request *osd_req;
196
197         u64                     xferred;        /* bytes transferred */
198         u64                     version;
199         s32                     result;
200         atomic_t                done;
201
202         rbd_obj_callback_t      callback;
203         struct completion       completion;
204
205         struct kref             kref;
206 };
207
208 struct rbd_img_request {
209         struct request          *rq;
210         struct rbd_device       *rbd_dev;
211         u64                     offset; /* starting image byte offset */
212         u64                     length; /* byte count from offset */
213         bool                    write_request;  /* false for read */
214         union {
215                 struct ceph_snap_context *snapc;        /* for writes */
216                 u64             snap_id;                /* for reads */
217         };
218         spinlock_t              completion_lock;/* protects next_completion */
219         u32                     next_completion;
220         rbd_img_callback_t      callback;
221
222         u32                     obj_request_count;
223         struct list_head        obj_requests;   /* rbd_obj_request structs */
224
225         struct kref             kref;
226 };
227
228 #define for_each_obj_request(ireq, oreq) \
229         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
230 #define for_each_obj_request_from(ireq, oreq) \
231         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
232 #define for_each_obj_request_safe(ireq, oreq, n) \
233         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
234
235 struct rbd_snap {
236         struct  device          dev;
237         const char              *name;
238         u64                     size;
239         struct list_head        node;
240         u64                     id;
241         u64                     features;
242 };
243
244 struct rbd_mapping {
245         u64                     size;
246         u64                     features;
247         bool                    read_only;
248 };
249
250 /*
251  * a single device
252  */
253 struct rbd_device {
254         int                     dev_id;         /* blkdev unique id */
255
256         int                     major;          /* blkdev assigned major */
257         struct gendisk          *disk;          /* blkdev's gendisk and rq */
258
259         u32                     image_format;   /* Either 1 or 2 */
260         struct rbd_client       *rbd_client;
261
262         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
263
264         spinlock_t              lock;           /* queue, flags, open_count */
265
266         struct rbd_image_header header;
267         unsigned long           flags;          /* possibly lock protected */
268         struct rbd_spec         *spec;
269
270         char                    *header_name;
271
272         struct ceph_file_layout layout;
273
274         struct ceph_osd_event   *watch_event;
275         struct rbd_obj_request  *watch_request;
276
277         struct rbd_spec         *parent_spec;
278         u64                     parent_overlap;
279
280         /* protects updating the header */
281         struct rw_semaphore     header_rwsem;
282
283         struct rbd_mapping      mapping;
284
285         struct list_head        node;
286
287         /* list of snapshots */
288         struct list_head        snaps;
289
290         /* sysfs related */
291         struct device           dev;
292         unsigned long           open_count;     /* protected by lock */
293 };
294
295 /*
296  * Flag bits for rbd_dev->flags.  If atomicity is required,
297  * rbd_dev->lock is used to protect access.
298  *
299  * Currently, only the "removing" flag (which is coupled with the
300  * "open_count" field) requires atomic access.
301  */
302 enum rbd_dev_flags {
303         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
304         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
305 };
306
307 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
308
309 static LIST_HEAD(rbd_dev_list);    /* devices */
310 static DEFINE_SPINLOCK(rbd_dev_list_lock);
311
312 static LIST_HEAD(rbd_client_list);              /* clients */
313 static DEFINE_SPINLOCK(rbd_client_list_lock);
314
315 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
316 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
317
318 static void rbd_dev_release(struct device *dev);
319 static void rbd_remove_snap_dev(struct rbd_snap *snap);
320
321 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
322                        size_t count);
323 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
324                           size_t count);
325
326 static struct bus_attribute rbd_bus_attrs[] = {
327         __ATTR(add, S_IWUSR, NULL, rbd_add),
328         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
329         __ATTR_NULL
330 };
331
332 static struct bus_type rbd_bus_type = {
333         .name           = "rbd",
334         .bus_attrs      = rbd_bus_attrs,
335 };
336
337 static void rbd_root_dev_release(struct device *dev)
338 {
339 }
340
341 static struct device rbd_root_dev = {
342         .init_name =    "rbd",
343         .release =      rbd_root_dev_release,
344 };
345
346 static __printf(2, 3)
347 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
348 {
349         struct va_format vaf;
350         va_list args;
351
352         va_start(args, fmt);
353         vaf.fmt = fmt;
354         vaf.va = &args;
355
356         if (!rbd_dev)
357                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
358         else if (rbd_dev->disk)
359                 printk(KERN_WARNING "%s: %s: %pV\n",
360                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
361         else if (rbd_dev->spec && rbd_dev->spec->image_name)
362                 printk(KERN_WARNING "%s: image %s: %pV\n",
363                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
364         else if (rbd_dev->spec && rbd_dev->spec->image_id)
365                 printk(KERN_WARNING "%s: id %s: %pV\n",
366                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
367         else    /* punt */
368                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
369                         RBD_DRV_NAME, rbd_dev, &vaf);
370         va_end(args);
371 }
372
373 #ifdef RBD_DEBUG
374 #define rbd_assert(expr)                                                \
375                 if (unlikely(!(expr))) {                                \
376                         printk(KERN_ERR "\nAssertion failure in %s() "  \
377                                                 "at line %d:\n\n"       \
378                                         "\trbd_assert(%s);\n\n",        \
379                                         __func__, __LINE__, #expr);     \
380                         BUG();                                          \
381                 }
382 #else /* !RBD_DEBUG */
383 #  define rbd_assert(expr)      ((void) 0)
384 #endif /* !RBD_DEBUG */
385
386 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
387 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
388
389 static int rbd_open(struct block_device *bdev, fmode_t mode)
390 {
391         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
392         bool removing = false;
393
394         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
395                 return -EROFS;
396
397         spin_lock_irq(&rbd_dev->lock);
398         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
399                 removing = true;
400         else
401                 rbd_dev->open_count++;
402         spin_unlock_irq(&rbd_dev->lock);
403         if (removing)
404                 return -ENOENT;
405
406         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
407         (void) get_device(&rbd_dev->dev);
408         set_device_ro(bdev, rbd_dev->mapping.read_only);
409         mutex_unlock(&ctl_mutex);
410
411         return 0;
412 }
413
414 static int rbd_release(struct gendisk *disk, fmode_t mode)
415 {
416         struct rbd_device *rbd_dev = disk->private_data;
417         unsigned long open_count_before;
418
419         spin_lock_irq(&rbd_dev->lock);
420         open_count_before = rbd_dev->open_count--;
421         spin_unlock_irq(&rbd_dev->lock);
422         rbd_assert(open_count_before > 0);
423
424         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
425         put_device(&rbd_dev->dev);
426         mutex_unlock(&ctl_mutex);
427
428         return 0;
429 }
430
431 static const struct block_device_operations rbd_bd_ops = {
432         .owner                  = THIS_MODULE,
433         .open                   = rbd_open,
434         .release                = rbd_release,
435 };
436
437 /*
438  * Initialize an rbd client instance.
439  * We own *ceph_opts.
440  */
441 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
442 {
443         struct rbd_client *rbdc;
444         int ret = -ENOMEM;
445
446         dout("rbd_client_create\n");
447         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
448         if (!rbdc)
449                 goto out_opt;
450
451         kref_init(&rbdc->kref);
452         INIT_LIST_HEAD(&rbdc->node);
453
454         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
455
456         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
457         if (IS_ERR(rbdc->client))
458                 goto out_mutex;
459         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
460
461         ret = ceph_open_session(rbdc->client);
462         if (ret < 0)
463                 goto out_err;
464
465         spin_lock(&rbd_client_list_lock);
466         list_add_tail(&rbdc->node, &rbd_client_list);
467         spin_unlock(&rbd_client_list_lock);
468
469         mutex_unlock(&ctl_mutex);
470
471         dout("rbd_client_create created %p\n", rbdc);
472         return rbdc;
473
474 out_err:
475         ceph_destroy_client(rbdc->client);
476 out_mutex:
477         mutex_unlock(&ctl_mutex);
478         kfree(rbdc);
479 out_opt:
480         if (ceph_opts)
481                 ceph_destroy_options(ceph_opts);
482         return ERR_PTR(ret);
483 }
484
485 /*
486  * Find a ceph client with specific addr and configuration.  If
487  * found, bump its reference count.
488  */
489 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
490 {
491         struct rbd_client *client_node;
492         bool found = false;
493
494         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
495                 return NULL;
496
497         spin_lock(&rbd_client_list_lock);
498         list_for_each_entry(client_node, &rbd_client_list, node) {
499                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
500                         kref_get(&client_node->kref);
501                         found = true;
502                         break;
503                 }
504         }
505         spin_unlock(&rbd_client_list_lock);
506
507         return found ? client_node : NULL;
508 }
509
510 /*
511  * mount options
512  */
513 enum {
514         Opt_last_int,
515         /* int args above */
516         Opt_last_string,
517         /* string args above */
518         Opt_read_only,
519         Opt_read_write,
520         /* Boolean args above */
521         Opt_last_bool,
522 };
523
524 static match_table_t rbd_opts_tokens = {
525         /* int args above */
526         /* string args above */
527         {Opt_read_only, "read_only"},
528         {Opt_read_only, "ro"},          /* Alternate spelling */
529         {Opt_read_write, "read_write"},
530         {Opt_read_write, "rw"},         /* Alternate spelling */
531         /* Boolean args above */
532         {-1, NULL}
533 };
534
535 struct rbd_options {
536         bool    read_only;
537 };
538
539 #define RBD_READ_ONLY_DEFAULT   false
540
541 static int parse_rbd_opts_token(char *c, void *private)
542 {
543         struct rbd_options *rbd_opts = private;
544         substring_t argstr[MAX_OPT_ARGS];
545         int token, intval, ret;
546
547         token = match_token(c, rbd_opts_tokens, argstr);
548         if (token < 0)
549                 return -EINVAL;
550
551         if (token < Opt_last_int) {
552                 ret = match_int(&argstr[0], &intval);
553                 if (ret < 0) {
554                         pr_err("bad mount option arg (not int) "
555                                "at '%s'\n", c);
556                         return ret;
557                 }
558                 dout("got int token %d val %d\n", token, intval);
559         } else if (token > Opt_last_int && token < Opt_last_string) {
560                 dout("got string token %d val %s\n", token,
561                      argstr[0].from);
562         } else if (token > Opt_last_string && token < Opt_last_bool) {
563                 dout("got Boolean token %d\n", token);
564         } else {
565                 dout("got token %d\n", token);
566         }
567
568         switch (token) {
569         case Opt_read_only:
570                 rbd_opts->read_only = true;
571                 break;
572         case Opt_read_write:
573                 rbd_opts->read_only = false;
574                 break;
575         default:
576                 rbd_assert(false);
577                 break;
578         }
579         return 0;
580 }
581
582 /*
583  * Get a ceph client with specific addr and configuration, if one does
584  * not exist create it.
585  */
586 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
587 {
588         struct rbd_client *rbdc;
589
590         rbdc = rbd_client_find(ceph_opts);
591         if (rbdc)       /* using an existing client */
592                 ceph_destroy_options(ceph_opts);
593         else
594                 rbdc = rbd_client_create(ceph_opts);
595
596         return rbdc;
597 }
598
599 /*
600  * Destroy ceph client
601  *
602  * Caller must hold rbd_client_list_lock.
603  */
604 static void rbd_client_release(struct kref *kref)
605 {
606         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
607
608         dout("rbd_release_client %p\n", rbdc);
609         spin_lock(&rbd_client_list_lock);
610         list_del(&rbdc->node);
611         spin_unlock(&rbd_client_list_lock);
612
613         ceph_destroy_client(rbdc->client);
614         kfree(rbdc);
615 }
616
617 /*
618  * Drop reference to ceph client node. If it's not referenced anymore, release
619  * it.
620  */
621 static void rbd_put_client(struct rbd_client *rbdc)
622 {
623         if (rbdc)
624                 kref_put(&rbdc->kref, rbd_client_release);
625 }
626
627 static bool rbd_image_format_valid(u32 image_format)
628 {
629         return image_format == 1 || image_format == 2;
630 }
631
632 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
633 {
634         size_t size;
635         u32 snap_count;
636
637         /* The header has to start with the magic rbd header text */
638         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
639                 return false;
640
641         /* The bio layer requires at least sector-sized I/O */
642
643         if (ondisk->options.order < SECTOR_SHIFT)
644                 return false;
645
646         /* If we use u64 in a few spots we may be able to loosen this */
647
648         if (ondisk->options.order > 8 * sizeof (int) - 1)
649                 return false;
650
651         /*
652          * The size of a snapshot header has to fit in a size_t, and
653          * that limits the number of snapshots.
654          */
655         snap_count = le32_to_cpu(ondisk->snap_count);
656         size = SIZE_MAX - sizeof (struct ceph_snap_context);
657         if (snap_count > size / sizeof (__le64))
658                 return false;
659
660         /*
661          * Not only that, but the size of the entire the snapshot
662          * header must also be representable in a size_t.
663          */
664         size -= snap_count * sizeof (__le64);
665         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
666                 return false;
667
668         return true;
669 }
670
671 /*
672  * Create a new header structure, translate header format from the on-disk
673  * header.
674  */
675 static int rbd_header_from_disk(struct rbd_image_header *header,
676                                  struct rbd_image_header_ondisk *ondisk)
677 {
678         u32 snap_count;
679         size_t len;
680         size_t size;
681         u32 i;
682
683         memset(header, 0, sizeof (*header));
684
685         snap_count = le32_to_cpu(ondisk->snap_count);
686
687         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
688         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
689         if (!header->object_prefix)
690                 return -ENOMEM;
691         memcpy(header->object_prefix, ondisk->object_prefix, len);
692         header->object_prefix[len] = '\0';
693
694         if (snap_count) {
695                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
696
697                 /* Save a copy of the snapshot names */
698
699                 if (snap_names_len > (u64) SIZE_MAX)
700                         return -EIO;
701                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
702                 if (!header->snap_names)
703                         goto out_err;
704                 /*
705                  * Note that rbd_dev_v1_header_read() guarantees
706                  * the ondisk buffer we're working with has
707                  * snap_names_len bytes beyond the end of the
708                  * snapshot id array, this memcpy() is safe.
709                  */
710                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
711                         snap_names_len);
712
713                 /* Record each snapshot's size */
714
715                 size = snap_count * sizeof (*header->snap_sizes);
716                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
717                 if (!header->snap_sizes)
718                         goto out_err;
719                 for (i = 0; i < snap_count; i++)
720                         header->snap_sizes[i] =
721                                 le64_to_cpu(ondisk->snaps[i].image_size);
722         } else {
723                 WARN_ON(ondisk->snap_names_len);
724                 header->snap_names = NULL;
725                 header->snap_sizes = NULL;
726         }
727
728         header->features = 0;   /* No features support in v1 images */
729         header->obj_order = ondisk->options.order;
730         header->crypt_type = ondisk->options.crypt_type;
731         header->comp_type = ondisk->options.comp_type;
732
733         /* Allocate and fill in the snapshot context */
734
735         header->image_size = le64_to_cpu(ondisk->image_size);
736         size = sizeof (struct ceph_snap_context);
737         size += snap_count * sizeof (header->snapc->snaps[0]);
738         header->snapc = kzalloc(size, GFP_KERNEL);
739         if (!header->snapc)
740                 goto out_err;
741
742         atomic_set(&header->snapc->nref, 1);
743         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
744         header->snapc->num_snaps = snap_count;
745         for (i = 0; i < snap_count; i++)
746                 header->snapc->snaps[i] =
747                         le64_to_cpu(ondisk->snaps[i].id);
748
749         return 0;
750
751 out_err:
752         kfree(header->snap_sizes);
753         header->snap_sizes = NULL;
754         kfree(header->snap_names);
755         header->snap_names = NULL;
756         kfree(header->object_prefix);
757         header->object_prefix = NULL;
758
759         return -ENOMEM;
760 }
761
762 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
763 {
764         struct rbd_snap *snap;
765
766         if (snap_id == CEPH_NOSNAP)
767                 return RBD_SNAP_HEAD_NAME;
768
769         list_for_each_entry(snap, &rbd_dev->snaps, node)
770                 if (snap_id == snap->id)
771                         return snap->name;
772
773         return NULL;
774 }
775
776 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
777 {
778
779         struct rbd_snap *snap;
780
781         list_for_each_entry(snap, &rbd_dev->snaps, node) {
782                 if (!strcmp(snap_name, snap->name)) {
783                         rbd_dev->spec->snap_id = snap->id;
784                         rbd_dev->mapping.size = snap->size;
785                         rbd_dev->mapping.features = snap->features;
786
787                         return 0;
788                 }
789         }
790
791         return -ENOENT;
792 }
793
794 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
795 {
796         int ret;
797
798         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
799                     sizeof (RBD_SNAP_HEAD_NAME))) {
800                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
801                 rbd_dev->mapping.size = rbd_dev->header.image_size;
802                 rbd_dev->mapping.features = rbd_dev->header.features;
803                 ret = 0;
804         } else {
805                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
806                 if (ret < 0)
807                         goto done;
808                 rbd_dev->mapping.read_only = true;
809         }
810         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811
812 done:
813         return ret;
814 }
815
816 static void rbd_header_free(struct rbd_image_header *header)
817 {
818         kfree(header->object_prefix);
819         header->object_prefix = NULL;
820         kfree(header->snap_sizes);
821         header->snap_sizes = NULL;
822         kfree(header->snap_names);
823         header->snap_names = NULL;
824         ceph_put_snap_context(header->snapc);
825         header->snapc = NULL;
826 }
827
828 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
829 {
830         char *name;
831         u64 segment;
832         int ret;
833
834         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
835         if (!name)
836                 return NULL;
837         segment = offset >> rbd_dev->header.obj_order;
838         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
839                         rbd_dev->header.object_prefix, segment);
840         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
841                 pr_err("error formatting segment name for #%llu (%d)\n",
842                         segment, ret);
843                 kfree(name);
844                 name = NULL;
845         }
846
847         return name;
848 }
849
850 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
851 {
852         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
853
854         return offset & (segment_size - 1);
855 }
856
857 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
858                                 u64 offset, u64 length)
859 {
860         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
861
862         offset &= segment_size - 1;
863
864         rbd_assert(length <= U64_MAX - offset);
865         if (offset + length > segment_size)
866                 length = segment_size - offset;
867
868         return length;
869 }
870
871 /*
872  * returns the size of an object in the image
873  */
874 static u64 rbd_obj_bytes(struct rbd_image_header *header)
875 {
876         return 1 << header->obj_order;
877 }
878
879 /*
880  * bio helpers
881  */
882
883 static void bio_chain_put(struct bio *chain)
884 {
885         struct bio *tmp;
886
887         while (chain) {
888                 tmp = chain;
889                 chain = chain->bi_next;
890                 bio_put(tmp);
891         }
892 }
893
894 /*
895  * zeros a bio chain, starting at specific offset
896  */
897 static void zero_bio_chain(struct bio *chain, int start_ofs)
898 {
899         struct bio_vec *bv;
900         unsigned long flags;
901         void *buf;
902         int i;
903         int pos = 0;
904
905         while (chain) {
906                 bio_for_each_segment(bv, chain, i) {
907                         if (pos + bv->bv_len > start_ofs) {
908                                 int remainder = max(start_ofs - pos, 0);
909                                 buf = bvec_kmap_irq(bv, &flags);
910                                 memset(buf + remainder, 0,
911                                        bv->bv_len - remainder);
912                                 bvec_kunmap_irq(buf, &flags);
913                         }
914                         pos += bv->bv_len;
915                 }
916
917                 chain = chain->bi_next;
918         }
919 }
920
921 /*
922  * Clone a portion of a bio, starting at the given byte offset
923  * and continuing for the number of bytes indicated.
924  */
925 static struct bio *bio_clone_range(struct bio *bio_src,
926                                         unsigned int offset,
927                                         unsigned int len,
928                                         gfp_t gfpmask)
929 {
930         struct bio_vec *bv;
931         unsigned int resid;
932         unsigned short idx;
933         unsigned int voff;
934         unsigned short end_idx;
935         unsigned short vcnt;
936         struct bio *bio;
937
938         /* Handle the easy case for the caller */
939
940         if (!offset && len == bio_src->bi_size)
941                 return bio_clone(bio_src, gfpmask);
942
943         if (WARN_ON_ONCE(!len))
944                 return NULL;
945         if (WARN_ON_ONCE(len > bio_src->bi_size))
946                 return NULL;
947         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
948                 return NULL;
949
950         /* Find first affected segment... */
951
952         resid = offset;
953         __bio_for_each_segment(bv, bio_src, idx, 0) {
954                 if (resid < bv->bv_len)
955                         break;
956                 resid -= bv->bv_len;
957         }
958         voff = resid;
959
960         /* ...and the last affected segment */
961
962         resid += len;
963         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
964                 if (resid <= bv->bv_len)
965                         break;
966                 resid -= bv->bv_len;
967         }
968         vcnt = end_idx - idx + 1;
969
970         /* Build the clone */
971
972         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
973         if (!bio)
974                 return NULL;    /* ENOMEM */
975
976         bio->bi_bdev = bio_src->bi_bdev;
977         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
978         bio->bi_rw = bio_src->bi_rw;
979         bio->bi_flags |= 1 << BIO_CLONED;
980
981         /*
982          * Copy over our part of the bio_vec, then update the first
983          * and last (or only) entries.
984          */
985         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
986                         vcnt * sizeof (struct bio_vec));
987         bio->bi_io_vec[0].bv_offset += voff;
988         if (vcnt > 1) {
989                 bio->bi_io_vec[0].bv_len -= voff;
990                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
991         } else {
992                 bio->bi_io_vec[0].bv_len = len;
993         }
994
995         bio->bi_vcnt = vcnt;
996         bio->bi_size = len;
997         bio->bi_idx = 0;
998
999         return bio;
1000 }
1001
1002 /*
1003  * Clone a portion of a bio chain, starting at the given byte offset
1004  * into the first bio in the source chain and continuing for the
1005  * number of bytes indicated.  The result is another bio chain of
1006  * exactly the given length, or a null pointer on error.
1007  *
1008  * The bio_src and offset parameters are both in-out.  On entry they
1009  * refer to the first source bio and the offset into that bio where
1010  * the start of data to be cloned is located.
1011  *
1012  * On return, bio_src is updated to refer to the bio in the source
1013  * chain that contains first un-cloned byte, and *offset will
1014  * contain the offset of that byte within that bio.
1015  */
1016 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1017                                         unsigned int *offset,
1018                                         unsigned int len,
1019                                         gfp_t gfpmask)
1020 {
1021         struct bio *bi = *bio_src;
1022         unsigned int off = *offset;
1023         struct bio *chain = NULL;
1024         struct bio **end;
1025
1026         /* Build up a chain of clone bios up to the limit */
1027
1028         if (!bi || off >= bi->bi_size || !len)
1029                 return NULL;            /* Nothing to clone */
1030
1031         end = &chain;
1032         while (len) {
1033                 unsigned int bi_size;
1034                 struct bio *bio;
1035
1036                 if (!bi) {
1037                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1038                         goto out_err;   /* EINVAL; ran out of bio's */
1039                 }
1040                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1041                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1042                 if (!bio)
1043                         goto out_err;   /* ENOMEM */
1044
1045                 *end = bio;
1046                 end = &bio->bi_next;
1047
1048                 off += bi_size;
1049                 if (off == bi->bi_size) {
1050                         bi = bi->bi_next;
1051                         off = 0;
1052                 }
1053                 len -= bi_size;
1054         }
1055         *bio_src = bi;
1056         *offset = off;
1057
1058         return chain;
1059 out_err:
1060         bio_chain_put(chain);
1061
1062         return NULL;
1063 }
1064
1065 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1066 {
1067         kref_get(&obj_request->kref);
1068 }
1069
1070 static void rbd_obj_request_destroy(struct kref *kref);
1071 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1072 {
1073         rbd_assert(obj_request != NULL);
1074         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1075 }
1076
1077 static void rbd_img_request_get(struct rbd_img_request *img_request)
1078 {
1079         kref_get(&img_request->kref);
1080 }
1081
1082 static void rbd_img_request_destroy(struct kref *kref);
1083 static void rbd_img_request_put(struct rbd_img_request *img_request)
1084 {
1085         rbd_assert(img_request != NULL);
1086         kref_put(&img_request->kref, rbd_img_request_destroy);
1087 }
1088
1089 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1090                                         struct rbd_obj_request *obj_request)
1091 {
1092         rbd_assert(obj_request->img_request == NULL);
1093
1094         rbd_obj_request_get(obj_request);
1095         obj_request->img_request = img_request;
1096         obj_request->which = img_request->obj_request_count;
1097         rbd_assert(obj_request->which != BAD_WHICH);
1098         img_request->obj_request_count++;
1099         list_add_tail(&obj_request->links, &img_request->obj_requests);
1100 }
1101
1102 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1103                                         struct rbd_obj_request *obj_request)
1104 {
1105         rbd_assert(obj_request->which != BAD_WHICH);
1106
1107         list_del(&obj_request->links);
1108         rbd_assert(img_request->obj_request_count > 0);
1109         img_request->obj_request_count--;
1110         rbd_assert(obj_request->which == img_request->obj_request_count);
1111         obj_request->which = BAD_WHICH;
1112         rbd_assert(obj_request->img_request == img_request);
1113         obj_request->img_request = NULL;
1114         obj_request->callback = NULL;
1115         rbd_obj_request_put(obj_request);
1116 }
1117
1118 static bool obj_request_type_valid(enum obj_request_type type)
1119 {
1120         switch (type) {
1121         case OBJ_REQUEST_NODATA:
1122         case OBJ_REQUEST_BIO:
1123         case OBJ_REQUEST_PAGES:
1124                 return true;
1125         default:
1126                 return false;
1127         }
1128 }
1129
1130 struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1131 {
1132         struct ceph_osd_req_op *op;
1133         va_list args;
1134         size_t size;
1135
1136         op = kzalloc(sizeof (*op), GFP_NOIO);
1137         if (!op)
1138                 return NULL;
1139         op->op = opcode;
1140         va_start(args, opcode);
1141         switch (opcode) {
1142         case CEPH_OSD_OP_READ:
1143         case CEPH_OSD_OP_WRITE:
1144                 /* rbd_osd_req_op_create(READ, offset, length) */
1145                 /* rbd_osd_req_op_create(WRITE, offset, length) */
1146                 op->extent.offset = va_arg(args, u64);
1147                 op->extent.length = va_arg(args, u64);
1148                 if (opcode == CEPH_OSD_OP_WRITE)
1149                         op->payload_len = op->extent.length;
1150                 break;
1151         case CEPH_OSD_OP_STAT:
1152                 break;
1153         case CEPH_OSD_OP_CALL:
1154                 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1155                 op->cls.class_name = va_arg(args, char *);
1156                 size = strlen(op->cls.class_name);
1157                 rbd_assert(size <= (size_t) U8_MAX);
1158                 op->cls.class_len = size;
1159                 op->payload_len = size;
1160
1161                 op->cls.method_name = va_arg(args, char *);
1162                 size = strlen(op->cls.method_name);
1163                 rbd_assert(size <= (size_t) U8_MAX);
1164                 op->cls.method_len = size;
1165                 op->payload_len += size;
1166
1167                 op->cls.argc = 0;
1168                 op->cls.indata = va_arg(args, void *);
1169                 size = va_arg(args, size_t);
1170                 rbd_assert(size <= (size_t) U32_MAX);
1171                 op->cls.indata_len = (u32) size;
1172                 op->payload_len += size;
1173                 break;
1174         case CEPH_OSD_OP_NOTIFY_ACK:
1175         case CEPH_OSD_OP_WATCH:
1176                 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1177                 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1178                 op->watch.cookie = va_arg(args, u64);
1179                 op->watch.ver = va_arg(args, u64);
1180                 op->watch.ver = cpu_to_le64(op->watch.ver);
1181                 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1182                         op->watch.flag = (u8) 1;
1183                 break;
1184         default:
1185                 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1186                 kfree(op);
1187                 op = NULL;
1188                 break;
1189         }
1190         va_end(args);
1191
1192         return op;
1193 }
1194
1195 static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
1196 {
1197         kfree(op);
1198 }
1199
1200 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1201                                 struct rbd_obj_request *obj_request)
1202 {
1203         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1204 }
1205
1206 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1207 {
1208         if (img_request->callback)
1209                 img_request->callback(img_request);
1210         else
1211                 rbd_img_request_put(img_request);
1212 }
1213
1214 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1215
1216 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1217 {
1218         return wait_for_completion_interruptible(&obj_request->completion);
1219 }
1220
1221 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1222 {
1223         atomic_set(&obj_request->done, 0);
1224         smp_wmb();
1225 }
1226
1227 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1228 {
1229         int done;
1230
1231         done = atomic_inc_return(&obj_request->done);
1232         if (done > 1) {
1233                 struct rbd_img_request *img_request = obj_request->img_request;
1234                 struct rbd_device *rbd_dev;
1235
1236                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1237                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1238                         obj_request);
1239         }
1240 }
1241
1242 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1243 {
1244         smp_mb();
1245         return atomic_read(&obj_request->done) != 0;
1246 }
1247
1248 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request,
1249                                 struct ceph_osd_op *op)
1250 {
1251         obj_request_done_set(obj_request);
1252 }
1253
1254 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1255 {
1256         if (obj_request->callback)
1257                 obj_request->callback(obj_request);
1258         else
1259                 complete_all(&obj_request->completion);
1260 }
1261
1262 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
1263                                 struct ceph_osd_op *op)
1264 {
1265         u64 xferred;
1266
1267         /*
1268          * We support a 64-bit length, but ultimately it has to be
1269          * passed to blk_end_request(), which takes an unsigned int.
1270          */
1271         xferred = le64_to_cpu(op->extent.length);
1272         rbd_assert(xferred < (u64) UINT_MAX);
1273         if (obj_request->result == (s32) -ENOENT) {
1274                 zero_bio_chain(obj_request->bio_list, 0);
1275                 obj_request->result = 0;
1276         } else if (xferred < obj_request->length && !obj_request->result) {
1277                 zero_bio_chain(obj_request->bio_list, xferred);
1278                 xferred = obj_request->length;
1279         }
1280         obj_request->xferred = xferred;
1281         obj_request_done_set(obj_request);
1282 }
1283
1284 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
1285                                 struct ceph_osd_op *op)
1286 {
1287         obj_request->xferred = le64_to_cpu(op->extent.length);
1288         obj_request_done_set(obj_request);
1289 }
1290
1291 /*
1292  * For a simple stat call there's nothing to do.  We'll do more if
1293  * this is part of a write sequence for a layered image.
1294  */
1295 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request,
1296                                 struct ceph_osd_op *op)
1297 {
1298         obj_request_done_set(obj_request);
1299 }
1300
1301 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1302                                 struct ceph_msg *msg)
1303 {
1304         struct rbd_obj_request *obj_request = osd_req->r_priv;
1305         struct ceph_osd_reply_head *reply_head;
1306         struct ceph_osd_op *op;
1307         u32 num_ops;
1308         u16 opcode;
1309
1310         rbd_assert(osd_req == obj_request->osd_req);
1311         rbd_assert(!!obj_request->img_request ^
1312                                 (obj_request->which == BAD_WHICH));
1313
1314         obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
1315         reply_head = msg->front.iov_base;
1316         obj_request->result = (s32) le32_to_cpu(reply_head->result);
1317         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1318
1319         num_ops = le32_to_cpu(reply_head->num_ops);
1320         WARN_ON(num_ops != 1);  /* For now */
1321
1322         op = &reply_head->ops[0];
1323         opcode = le16_to_cpu(op->op);
1324         switch (opcode) {
1325         case CEPH_OSD_OP_READ:
1326                 rbd_osd_read_callback(obj_request, op);
1327                 break;
1328         case CEPH_OSD_OP_WRITE:
1329                 rbd_osd_write_callback(obj_request, op);
1330                 break;
1331         case CEPH_OSD_OP_STAT:
1332                 rbd_osd_stat_callback(obj_request, op);
1333                 break;
1334         case CEPH_OSD_OP_CALL:
1335         case CEPH_OSD_OP_NOTIFY_ACK:
1336         case CEPH_OSD_OP_WATCH:
1337                 rbd_osd_trivial_callback(obj_request, op);
1338                 break;
1339         default:
1340                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1341                         obj_request->object_name, (unsigned short) opcode);
1342                 break;
1343         }
1344
1345         if (obj_request_done_test(obj_request))
1346                 rbd_obj_request_complete(obj_request);
1347 }
1348
1349 static struct ceph_osd_request *rbd_osd_req_create(
1350                                         struct rbd_device *rbd_dev,
1351                                         bool write_request,
1352                                         struct rbd_obj_request *obj_request,
1353                                         struct ceph_osd_req_op *op)
1354 {
1355         struct rbd_img_request *img_request = obj_request->img_request;
1356         struct ceph_snap_context *snapc = NULL;
1357         struct ceph_osd_client *osdc;
1358         struct ceph_osd_request *osd_req;
1359         struct timespec now;
1360         struct timespec *mtime;
1361         u64 snap_id = CEPH_NOSNAP;
1362         u64 offset = obj_request->offset;
1363         u64 length = obj_request->length;
1364
1365         if (img_request) {
1366                 rbd_assert(img_request->write_request == write_request);
1367                 if (img_request->write_request)
1368                         snapc = img_request->snapc;
1369                 else
1370                         snap_id = img_request->snap_id;
1371         }
1372
1373         /* Allocate and initialize the request, for the single op */
1374
1375         osdc = &rbd_dev->rbd_client->client->osdc;
1376         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1377         if (!osd_req)
1378                 return NULL;    /* ENOMEM */
1379
1380         rbd_assert(obj_request_type_valid(obj_request->type));
1381         switch (obj_request->type) {
1382         case OBJ_REQUEST_NODATA:
1383                 break;          /* Nothing to do */
1384         case OBJ_REQUEST_BIO:
1385                 rbd_assert(obj_request->bio_list != NULL);
1386                 osd_req->r_bio = obj_request->bio_list;
1387                 break;
1388         case OBJ_REQUEST_PAGES:
1389                 osd_req->r_pages = obj_request->pages;
1390                 osd_req->r_num_pages = obj_request->page_count;
1391                 osd_req->r_page_alignment = offset & ~PAGE_MASK;
1392                 break;
1393         }
1394
1395         if (write_request) {
1396                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1397                 now = CURRENT_TIME;
1398                 mtime = &now;
1399         } else {
1400                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1401                 mtime = NULL;   /* not needed for reads */
1402                 offset = 0;     /* These are not used... */
1403                 length = 0;     /* ...for osd read requests */
1404         }
1405
1406         osd_req->r_callback = rbd_osd_req_callback;
1407         osd_req->r_priv = obj_request;
1408
1409         osd_req->r_oid_len = strlen(obj_request->object_name);
1410         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1411         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1412
1413         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1414
1415         /* osd_req will get its own reference to snapc (if non-null) */
1416
1417         ceph_osdc_build_request(osd_req, offset, length, 1, op,
1418                                 snapc, snap_id, mtime);
1419
1420         return osd_req;
1421 }
1422
1423 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1424 {
1425         ceph_osdc_put_request(osd_req);
1426 }
1427
1428 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1429
1430 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1431                                                 u64 offset, u64 length,
1432                                                 enum obj_request_type type)
1433 {
1434         struct rbd_obj_request *obj_request;
1435         size_t size;
1436         char *name;
1437
1438         rbd_assert(obj_request_type_valid(type));
1439
1440         size = strlen(object_name) + 1;
1441         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1442         if (!obj_request)
1443                 return NULL;
1444
1445         name = (char *)(obj_request + 1);
1446         obj_request->object_name = memcpy(name, object_name, size);
1447         obj_request->offset = offset;
1448         obj_request->length = length;
1449         obj_request->which = BAD_WHICH;
1450         obj_request->type = type;
1451         INIT_LIST_HEAD(&obj_request->links);
1452         obj_request_done_init(obj_request);
1453         init_completion(&obj_request->completion);
1454         kref_init(&obj_request->kref);
1455
1456         return obj_request;
1457 }
1458
1459 static void rbd_obj_request_destroy(struct kref *kref)
1460 {
1461         struct rbd_obj_request *obj_request;
1462
1463         obj_request = container_of(kref, struct rbd_obj_request, kref);
1464
1465         rbd_assert(obj_request->img_request == NULL);
1466         rbd_assert(obj_request->which == BAD_WHICH);
1467
1468         if (obj_request->osd_req)
1469                 rbd_osd_req_destroy(obj_request->osd_req);
1470
1471         rbd_assert(obj_request_type_valid(obj_request->type));
1472         switch (obj_request->type) {
1473         case OBJ_REQUEST_NODATA:
1474                 break;          /* Nothing to do */
1475         case OBJ_REQUEST_BIO:
1476                 if (obj_request->bio_list)
1477                         bio_chain_put(obj_request->bio_list);
1478                 break;
1479         case OBJ_REQUEST_PAGES:
1480                 if (obj_request->pages)
1481                         ceph_release_page_vector(obj_request->pages,
1482                                                 obj_request->page_count);
1483                 break;
1484         }
1485
1486         kfree(obj_request);
1487 }
1488
1489 /*
1490  * Caller is responsible for filling in the list of object requests
1491  * that comprises the image request, and the Linux request pointer
1492  * (if there is one).
1493  */
1494 struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
1495                                         u64 offset, u64 length,
1496                                         bool write_request)
1497 {
1498         struct rbd_img_request *img_request;
1499         struct ceph_snap_context *snapc = NULL;
1500
1501         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1502         if (!img_request)
1503                 return NULL;
1504
1505         if (write_request) {
1506                 down_read(&rbd_dev->header_rwsem);
1507                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1508                 up_read(&rbd_dev->header_rwsem);
1509                 if (WARN_ON(!snapc)) {
1510                         kfree(img_request);
1511                         return NULL;    /* Shouldn't happen */
1512                 }
1513         }
1514
1515         img_request->rq = NULL;
1516         img_request->rbd_dev = rbd_dev;
1517         img_request->offset = offset;
1518         img_request->length = length;
1519         img_request->write_request = write_request;
1520         if (write_request)
1521                 img_request->snapc = snapc;
1522         else
1523                 img_request->snap_id = rbd_dev->spec->snap_id;
1524         spin_lock_init(&img_request->completion_lock);
1525         img_request->next_completion = 0;
1526         img_request->callback = NULL;
1527         img_request->obj_request_count = 0;
1528         INIT_LIST_HEAD(&img_request->obj_requests);
1529         kref_init(&img_request->kref);
1530
1531         rbd_img_request_get(img_request);       /* Avoid a warning */
1532         rbd_img_request_put(img_request);       /* TEMPORARY */
1533
1534         return img_request;
1535 }
1536
1537 static void rbd_img_request_destroy(struct kref *kref)
1538 {
1539         struct rbd_img_request *img_request;
1540         struct rbd_obj_request *obj_request;
1541         struct rbd_obj_request *next_obj_request;
1542
1543         img_request = container_of(kref, struct rbd_img_request, kref);
1544
1545         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1546                 rbd_img_obj_request_del(img_request, obj_request);
1547         rbd_assert(img_request->obj_request_count == 0);
1548
1549         if (img_request->write_request)
1550                 ceph_put_snap_context(img_request->snapc);
1551
1552         kfree(img_request);
1553 }
1554
1555 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1556                                         struct bio *bio_list)
1557 {
1558         struct rbd_device *rbd_dev = img_request->rbd_dev;
1559         struct rbd_obj_request *obj_request = NULL;
1560         struct rbd_obj_request *next_obj_request;
1561         unsigned int bio_offset;
1562         u64 image_offset;
1563         u64 resid;
1564         u16 opcode;
1565
1566         opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
1567                                               : CEPH_OSD_OP_READ;
1568         bio_offset = 0;
1569         image_offset = img_request->offset;
1570         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1571         resid = img_request->length;
1572         rbd_assert(resid > 0);
1573         while (resid) {
1574                 const char *object_name;
1575                 unsigned int clone_size;
1576                 struct ceph_osd_req_op *op;
1577                 u64 offset;
1578                 u64 length;
1579
1580                 object_name = rbd_segment_name(rbd_dev, image_offset);
1581                 if (!object_name)
1582                         goto out_unwind;
1583                 offset = rbd_segment_offset(rbd_dev, image_offset);
1584                 length = rbd_segment_length(rbd_dev, image_offset, resid);
1585                 obj_request = rbd_obj_request_create(object_name,
1586                                                 offset, length,
1587                                                 OBJ_REQUEST_BIO);
1588                 kfree(object_name);     /* object request has its own copy */
1589                 if (!obj_request)
1590                         goto out_unwind;
1591
1592                 rbd_assert(length <= (u64) UINT_MAX);
1593                 clone_size = (unsigned int) length;
1594                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1595                                                 &bio_offset, clone_size,
1596                                                 GFP_ATOMIC);
1597                 if (!obj_request->bio_list)
1598                         goto out_partial;
1599
1600                 /*
1601                  * Build up the op to use in building the osd
1602                  * request.  Note that the contents of the op are
1603                  * copied by rbd_osd_req_create().
1604                  */
1605                 op = rbd_osd_req_op_create(opcode, offset, length);
1606                 if (!op)
1607                         goto out_partial;
1608                 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1609                                                 img_request->write_request,
1610                                                 obj_request, op);
1611                 rbd_osd_req_op_destroy(op);
1612                 if (!obj_request->osd_req)
1613                         goto out_partial;
1614                 /* status and version are initially zero-filled */
1615
1616                 rbd_img_obj_request_add(img_request, obj_request);
1617
1618                 image_offset += length;
1619                 resid -= length;
1620         }
1621
1622         return 0;
1623
1624 out_partial:
1625         rbd_obj_request_put(obj_request);
1626 out_unwind:
1627         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1628                 rbd_obj_request_put(obj_request);
1629
1630         return -ENOMEM;
1631 }
1632
1633 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1634 {
1635         struct rbd_img_request *img_request;
1636         u32 which = obj_request->which;
1637         bool more = true;
1638
1639         img_request = obj_request->img_request;
1640
1641         rbd_assert(img_request != NULL);
1642         rbd_assert(img_request->rq != NULL);
1643         rbd_assert(img_request->obj_request_count > 0);
1644         rbd_assert(which != BAD_WHICH);
1645         rbd_assert(which < img_request->obj_request_count);
1646         rbd_assert(which >= img_request->next_completion);
1647
1648         spin_lock_irq(&img_request->completion_lock);
1649         if (which != img_request->next_completion)
1650                 goto out;
1651
1652         for_each_obj_request_from(img_request, obj_request) {
1653                 unsigned int xferred;
1654                 int result;
1655
1656                 rbd_assert(more);
1657                 rbd_assert(which < img_request->obj_request_count);
1658
1659                 if (!obj_request_done_test(obj_request))
1660                         break;
1661
1662                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1663                 xferred = (unsigned int) obj_request->xferred;
1664                 result = (int) obj_request->result;
1665                 if (result)
1666                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1667                                 img_request->write_request ? "write" : "read",
1668                                 result, xferred);
1669
1670                 more = blk_end_request(img_request->rq, result, xferred);
1671                 which++;
1672         }
1673         rbd_assert(more ^ (which == img_request->obj_request_count));
1674         img_request->next_completion = which;
1675 out:
1676         spin_unlock_irq(&img_request->completion_lock);
1677
1678         if (!more)
1679                 rbd_img_request_complete(img_request);
1680 }
1681
1682 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1683 {
1684         struct rbd_device *rbd_dev = img_request->rbd_dev;
1685         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1686         struct rbd_obj_request *obj_request;
1687
1688         for_each_obj_request(img_request, obj_request) {
1689                 int ret;
1690
1691                 obj_request->callback = rbd_img_obj_callback;
1692                 ret = rbd_obj_request_submit(osdc, obj_request);
1693                 if (ret)
1694                         return ret;
1695                 /*
1696                  * The image request has its own reference to each
1697                  * of its object requests, so we can safely drop the
1698                  * initial one here.
1699                  */
1700                 rbd_obj_request_put(obj_request);
1701         }
1702
1703         return 0;
1704 }
1705
1706 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1707                                    u64 ver, u64 notify_id)
1708 {
1709         struct rbd_obj_request *obj_request;
1710         struct ceph_osd_req_op *op;
1711         struct ceph_osd_client *osdc;
1712         int ret;
1713
1714         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1715                                                         OBJ_REQUEST_NODATA);
1716         if (!obj_request)
1717                 return -ENOMEM;
1718
1719         ret = -ENOMEM;
1720         op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1721         if (!op)
1722                 goto out;
1723         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1724                                                 obj_request, op);
1725         rbd_osd_req_op_destroy(op);
1726         if (!obj_request->osd_req)
1727                 goto out;
1728
1729         osdc = &rbd_dev->rbd_client->client->osdc;
1730         obj_request->callback = rbd_obj_request_put;
1731         ret = rbd_obj_request_submit(osdc, obj_request);
1732 out:
1733         if (ret)
1734                 rbd_obj_request_put(obj_request);
1735
1736         return ret;
1737 }
1738
1739 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1740 {
1741         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1742         u64 hver;
1743         int rc;
1744
1745         if (!rbd_dev)
1746                 return;
1747
1748         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1749                 rbd_dev->header_name, (unsigned long long) notify_id,
1750                 (unsigned int) opcode);
1751         rc = rbd_dev_refresh(rbd_dev, &hver);
1752         if (rc)
1753                 rbd_warn(rbd_dev, "got notification but failed to "
1754                            " update snaps: %d\n", rc);
1755
1756         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1757 }
1758
1759 /*
1760  * Request sync osd watch/unwatch.  The value of "start" determines
1761  * whether a watch request is being initiated or torn down.
1762  */
1763 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1764 {
1765         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1766         struct rbd_obj_request *obj_request;
1767         struct ceph_osd_req_op *op;
1768         int ret;
1769
1770         rbd_assert(start ^ !!rbd_dev->watch_event);
1771         rbd_assert(start ^ !!rbd_dev->watch_request);
1772
1773         if (start) {
1774                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1775                                                 &rbd_dev->watch_event);
1776                 if (ret < 0)
1777                         return ret;
1778                 rbd_assert(rbd_dev->watch_event != NULL);
1779         }
1780
1781         ret = -ENOMEM;
1782         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1783                                                         OBJ_REQUEST_NODATA);
1784         if (!obj_request)
1785                 goto out_cancel;
1786
1787         op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1788                                 rbd_dev->watch_event->cookie,
1789                                 rbd_dev->header.obj_version, start);
1790         if (!op)
1791                 goto out_cancel;
1792         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
1793                                                         obj_request, op);
1794         rbd_osd_req_op_destroy(op);
1795         if (!obj_request->osd_req)
1796                 goto out_cancel;
1797
1798         if (start)
1799                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1800         else
1801                 ceph_osdc_unregister_linger_request(osdc,
1802                                         rbd_dev->watch_request->osd_req);
1803         ret = rbd_obj_request_submit(osdc, obj_request);
1804         if (ret)
1805                 goto out_cancel;
1806         ret = rbd_obj_request_wait(obj_request);
1807         if (ret)
1808                 goto out_cancel;
1809         ret = obj_request->result;
1810         if (ret)
1811                 goto out_cancel;
1812
1813         /*
1814          * A watch request is set to linger, so the underlying osd
1815          * request won't go away until we unregister it.  We retain
1816          * a pointer to the object request during that time (in
1817          * rbd_dev->watch_request), so we'll keep a reference to
1818          * it.  We'll drop that reference (below) after we've
1819          * unregistered it.
1820          */
1821         if (start) {
1822                 rbd_dev->watch_request = obj_request;
1823
1824                 return 0;
1825         }
1826
1827         /* We have successfully torn down the watch request */
1828
1829         rbd_obj_request_put(rbd_dev->watch_request);
1830         rbd_dev->watch_request = NULL;
1831 out_cancel:
1832         /* Cancel the event if we're tearing down, or on error */
1833         ceph_osdc_cancel_event(rbd_dev->watch_event);
1834         rbd_dev->watch_event = NULL;
1835         if (obj_request)
1836                 rbd_obj_request_put(obj_request);
1837
1838         return ret;
1839 }
1840
1841 /*
1842  * Synchronous osd object method call
1843  */
1844 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1845                              const char *object_name,
1846                              const char *class_name,
1847                              const char *method_name,
1848                              const char *outbound,
1849                              size_t outbound_size,
1850                              char *inbound,
1851                              size_t inbound_size,
1852                              u64 *version)
1853 {
1854         struct rbd_obj_request *obj_request;
1855         struct ceph_osd_client *osdc;
1856         struct ceph_osd_req_op *op;
1857         struct page **pages;
1858         u32 page_count;
1859         int ret;
1860
1861         /*
1862          * Method calls are ultimately read operations but they
1863          * don't involve object data (so no offset or length).
1864          * The result should placed into the inbound buffer
1865          * provided.  They also supply outbound data--parameters for
1866          * the object method.  Currently if this is present it will
1867          * be a snapshot id.
1868          */
1869         page_count = (u32) calc_pages_for(0, inbound_size);
1870         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1871         if (IS_ERR(pages))
1872                 return PTR_ERR(pages);
1873
1874         ret = -ENOMEM;
1875         obj_request = rbd_obj_request_create(object_name, 0, 0,
1876                                                         OBJ_REQUEST_PAGES);
1877         if (!obj_request)
1878                 goto out;
1879
1880         obj_request->pages = pages;
1881         obj_request->page_count = page_count;
1882
1883         op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1884                                         method_name, outbound, outbound_size);
1885         if (!op)
1886                 goto out;
1887         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1888                                                 obj_request, op);
1889         rbd_osd_req_op_destroy(op);
1890         if (!obj_request->osd_req)
1891                 goto out;
1892
1893         osdc = &rbd_dev->rbd_client->client->osdc;
1894         ret = rbd_obj_request_submit(osdc, obj_request);
1895         if (ret)
1896                 goto out;
1897         ret = rbd_obj_request_wait(obj_request);
1898         if (ret)
1899                 goto out;
1900
1901         ret = obj_request->result;
1902         if (ret < 0)
1903                 goto out;
1904         ret = 0;
1905         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1906         if (version)
1907                 *version = obj_request->version;
1908 out:
1909         if (obj_request)
1910                 rbd_obj_request_put(obj_request);
1911         else
1912                 ceph_release_page_vector(pages, page_count);
1913
1914         return ret;
1915 }
1916
1917 static void rbd_request_fn(struct request_queue *q)
1918 {
1919         struct rbd_device *rbd_dev = q->queuedata;
1920         bool read_only = rbd_dev->mapping.read_only;
1921         struct request *rq;
1922         int result;
1923
1924         while ((rq = blk_fetch_request(q))) {
1925                 bool write_request = rq_data_dir(rq) == WRITE;
1926                 struct rbd_img_request *img_request;
1927                 u64 offset;
1928                 u64 length;
1929
1930                 /* Ignore any non-FS requests that filter through. */
1931
1932                 if (rq->cmd_type != REQ_TYPE_FS) {
1933                         dout("%s: non-fs request type %d\n", __func__,
1934                                 (int) rq->cmd_type);
1935                         __blk_end_request_all(rq, 0);
1936                         continue;
1937                 }
1938
1939                 /* Ignore/skip any zero-length requests */
1940
1941                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1942                 length = (u64) blk_rq_bytes(rq);
1943
1944                 if (!length) {
1945                         dout("%s: zero-length request\n", __func__);
1946                         __blk_end_request_all(rq, 0);
1947                         continue;
1948                 }
1949
1950                 spin_unlock_irq(q->queue_lock);
1951
1952                 /* Disallow writes to a read-only device */
1953
1954                 if (write_request) {
1955                         result = -EROFS;
1956                         if (read_only)
1957                                 goto end_request;
1958                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1959                 }
1960
1961                 /*
1962                  * Quit early if the mapped snapshot no longer
1963                  * exists.  It's still possible the snapshot will
1964                  * have disappeared by the time our request arrives
1965                  * at the osd, but there's no sense in sending it if
1966                  * we already know.
1967                  */
1968                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
1969                         dout("request for non-existent snapshot");
1970                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1971                         result = -ENXIO;
1972                         goto end_request;
1973                 }
1974
1975                 result = -EINVAL;
1976                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1977                         goto end_request;       /* Shouldn't happen */
1978
1979                 result = -ENOMEM;
1980                 img_request = rbd_img_request_create(rbd_dev, offset, length,
1981                                                         write_request);
1982                 if (!img_request)
1983                         goto end_request;
1984
1985                 img_request->rq = rq;
1986
1987                 result = rbd_img_request_fill_bio(img_request, rq->bio);
1988                 if (!result)
1989                         result = rbd_img_request_submit(img_request);
1990                 if (result)
1991                         rbd_img_request_put(img_request);
1992 end_request:
1993                 spin_lock_irq(q->queue_lock);
1994                 if (result < 0) {
1995                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
1996                                 write_request ? "write" : "read", result);
1997                         __blk_end_request_all(rq, result);
1998                 }
1999         }
2000 }
2001
2002 /*
2003  * a queue callback. Makes sure that we don't create a bio that spans across
2004  * multiple osd objects. One exception would be with a single page bios,
2005  * which we handle later at bio_chain_clone_range()
2006  */
2007 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2008                           struct bio_vec *bvec)
2009 {
2010         struct rbd_device *rbd_dev = q->queuedata;
2011         sector_t sector_offset;
2012         sector_t sectors_per_obj;
2013         sector_t obj_sector_offset;
2014         int ret;
2015
2016         /*
2017          * Find how far into its rbd object the partition-relative
2018          * bio start sector is to offset relative to the enclosing
2019          * device.
2020          */
2021         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2022         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2023         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2024
2025         /*
2026          * Compute the number of bytes from that offset to the end
2027          * of the object.  Account for what's already used by the bio.
2028          */
2029         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2030         if (ret > bmd->bi_size)
2031                 ret -= bmd->bi_size;
2032         else
2033                 ret = 0;
2034
2035         /*
2036          * Don't send back more than was asked for.  And if the bio
2037          * was empty, let the whole thing through because:  "Note
2038          * that a block device *must* allow a single page to be
2039          * added to an empty bio."
2040          */
2041         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2042         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2043                 ret = (int) bvec->bv_len;
2044
2045         return ret;
2046 }
2047
2048 static void rbd_free_disk(struct rbd_device *rbd_dev)
2049 {
2050         struct gendisk *disk = rbd_dev->disk;
2051
2052         if (!disk)
2053                 return;
2054
2055         if (disk->flags & GENHD_FL_UP)
2056                 del_gendisk(disk);
2057         if (disk->queue)
2058                 blk_cleanup_queue(disk->queue);
2059         put_disk(disk);
2060 }
2061
2062 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2063                                 const char *object_name,
2064                                 u64 offset, u64 length,
2065                                 char *buf, u64 *version)
2066
2067 {
2068         struct ceph_osd_req_op *op;
2069         struct rbd_obj_request *obj_request;
2070         struct ceph_osd_client *osdc;
2071         struct page **pages = NULL;
2072         u32 page_count;
2073         size_t size;
2074         int ret;
2075
2076         page_count = (u32) calc_pages_for(offset, length);
2077         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2078         if (IS_ERR(pages))
2079                 ret = PTR_ERR(pages);
2080
2081         ret = -ENOMEM;
2082         obj_request = rbd_obj_request_create(object_name, offset, length,
2083                                                         OBJ_REQUEST_PAGES);
2084         if (!obj_request)
2085                 goto out;
2086
2087         obj_request->pages = pages;
2088         obj_request->page_count = page_count;
2089
2090         op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2091         if (!op)
2092                 goto out;
2093         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2094                                                 obj_request, op);
2095         rbd_osd_req_op_destroy(op);
2096         if (!obj_request->osd_req)
2097                 goto out;
2098
2099         osdc = &rbd_dev->rbd_client->client->osdc;
2100         ret = rbd_obj_request_submit(osdc, obj_request);
2101         if (ret)
2102                 goto out;
2103         ret = rbd_obj_request_wait(obj_request);
2104         if (ret)
2105                 goto out;
2106
2107         ret = obj_request->result;
2108         if (ret < 0)
2109                 goto out;
2110
2111         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2112         size = (size_t) obj_request->xferred;
2113         ceph_copy_from_page_vector(pages, buf, 0, size);
2114         rbd_assert(size <= (size_t) INT_MAX);
2115         ret = (int) size;
2116         if (version)
2117                 *version = obj_request->version;
2118 out:
2119         if (obj_request)
2120                 rbd_obj_request_put(obj_request);
2121         else
2122                 ceph_release_page_vector(pages, page_count);
2123
2124         return ret;
2125 }
2126
2127 /*
2128  * Read the complete header for the given rbd device.
2129  *
2130  * Returns a pointer to a dynamically-allocated buffer containing
2131  * the complete and validated header.  Caller can pass the address
2132  * of a variable that will be filled in with the version of the
2133  * header object at the time it was read.
2134  *
2135  * Returns a pointer-coded errno if a failure occurs.
2136  */
2137 static struct rbd_image_header_ondisk *
2138 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2139 {
2140         struct rbd_image_header_ondisk *ondisk = NULL;
2141         u32 snap_count = 0;
2142         u64 names_size = 0;
2143         u32 want_count;
2144         int ret;
2145
2146         /*
2147          * The complete header will include an array of its 64-bit
2148          * snapshot ids, followed by the names of those snapshots as
2149          * a contiguous block of NUL-terminated strings.  Note that
2150          * the number of snapshots could change by the time we read
2151          * it in, in which case we re-read it.
2152          */
2153         do {
2154                 size_t size;
2155
2156                 kfree(ondisk);
2157
2158                 size = sizeof (*ondisk);
2159                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2160                 size += names_size;
2161                 ondisk = kmalloc(size, GFP_KERNEL);
2162                 if (!ondisk)
2163                         return ERR_PTR(-ENOMEM);
2164
2165                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2166                                        0, size,
2167                                        (char *) ondisk, version);
2168                 if (ret < 0)
2169                         goto out_err;
2170                 if (WARN_ON((size_t) ret < size)) {
2171                         ret = -ENXIO;
2172                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2173                                 size, ret);
2174                         goto out_err;
2175                 }
2176                 if (!rbd_dev_ondisk_valid(ondisk)) {
2177                         ret = -ENXIO;
2178                         rbd_warn(rbd_dev, "invalid header");
2179                         goto out_err;
2180                 }
2181
2182                 names_size = le64_to_cpu(ondisk->snap_names_len);
2183                 want_count = snap_count;
2184                 snap_count = le32_to_cpu(ondisk->snap_count);
2185         } while (snap_count != want_count);
2186
2187         return ondisk;
2188
2189 out_err:
2190         kfree(ondisk);
2191
2192         return ERR_PTR(ret);
2193 }
2194
2195 /*
2196  * reload the ondisk the header
2197  */
2198 static int rbd_read_header(struct rbd_device *rbd_dev,
2199                            struct rbd_image_header *header)
2200 {
2201         struct rbd_image_header_ondisk *ondisk;
2202         u64 ver = 0;
2203         int ret;
2204
2205         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2206         if (IS_ERR(ondisk))
2207                 return PTR_ERR(ondisk);
2208         ret = rbd_header_from_disk(header, ondisk);
2209         if (ret >= 0)
2210                 header->obj_version = ver;
2211         kfree(ondisk);
2212
2213         return ret;
2214 }
2215
2216 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2217 {
2218         struct rbd_snap *snap;
2219         struct rbd_snap *next;
2220
2221         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2222                 rbd_remove_snap_dev(snap);
2223 }
2224
2225 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2226 {
2227         sector_t size;
2228
2229         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2230                 return;
2231
2232         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2233         dout("setting size to %llu sectors", (unsigned long long) size);
2234         rbd_dev->mapping.size = (u64) size;
2235         set_capacity(rbd_dev->disk, size);
2236 }
2237
2238 /*
2239  * only read the first part of the ondisk header, without the snaps info
2240  */
2241 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2242 {
2243         int ret;
2244         struct rbd_image_header h;
2245
2246         ret = rbd_read_header(rbd_dev, &h);
2247         if (ret < 0)
2248                 return ret;
2249
2250         down_write(&rbd_dev->header_rwsem);
2251
2252         /* Update image size, and check for resize of mapped image */
2253         rbd_dev->header.image_size = h.image_size;
2254         rbd_update_mapping_size(rbd_dev);
2255
2256         /* rbd_dev->header.object_prefix shouldn't change */
2257         kfree(rbd_dev->header.snap_sizes);
2258         kfree(rbd_dev->header.snap_names);
2259         /* osd requests may still refer to snapc */
2260         ceph_put_snap_context(rbd_dev->header.snapc);
2261
2262         if (hver)
2263                 *hver = h.obj_version;
2264         rbd_dev->header.obj_version = h.obj_version;
2265         rbd_dev->header.image_size = h.image_size;
2266         rbd_dev->header.snapc = h.snapc;
2267         rbd_dev->header.snap_names = h.snap_names;
2268         rbd_dev->header.snap_sizes = h.snap_sizes;
2269         /* Free the extra copy of the object prefix */
2270         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2271         kfree(h.object_prefix);
2272
2273         ret = rbd_dev_snaps_update(rbd_dev);
2274         if (!ret)
2275                 ret = rbd_dev_snaps_register(rbd_dev);
2276
2277         up_write(&rbd_dev->header_rwsem);
2278
2279         return ret;
2280 }
2281
2282 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2283 {
2284         int ret;
2285
2286         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2287         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2288         if (rbd_dev->image_format == 1)
2289                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2290         else
2291                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2292         mutex_unlock(&ctl_mutex);
2293
2294         return ret;
2295 }
2296
2297 static int rbd_init_disk(struct rbd_device *rbd_dev)
2298 {
2299         struct gendisk *disk;
2300         struct request_queue *q;
2301         u64 segment_size;
2302
2303         /* create gendisk info */
2304         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2305         if (!disk)
2306                 return -ENOMEM;
2307
2308         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2309                  rbd_dev->dev_id);
2310         disk->major = rbd_dev->major;
2311         disk->first_minor = 0;
2312         disk->fops = &rbd_bd_ops;
2313         disk->private_data = rbd_dev;
2314
2315         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2316         if (!q)
2317                 goto out_disk;
2318
2319         /* We use the default size, but let's be explicit about it. */
2320         blk_queue_physical_block_size(q, SECTOR_SIZE);
2321
2322         /* set io sizes to object size */
2323         segment_size = rbd_obj_bytes(&rbd_dev->header);
2324         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2325         blk_queue_max_segment_size(q, segment_size);
2326         blk_queue_io_min(q, segment_size);
2327         blk_queue_io_opt(q, segment_size);
2328
2329         blk_queue_merge_bvec(q, rbd_merge_bvec);
2330         disk->queue = q;
2331
2332         q->queuedata = rbd_dev;
2333
2334         rbd_dev->disk = disk;
2335
2336         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2337
2338         return 0;
2339 out_disk:
2340         put_disk(disk);
2341
2342         return -ENOMEM;
2343 }
2344
2345 /*
2346   sysfs
2347 */
2348
2349 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2350 {
2351         return container_of(dev, struct rbd_device, dev);
2352 }
2353
2354 static ssize_t rbd_size_show(struct device *dev,
2355                              struct device_attribute *attr, char *buf)
2356 {
2357         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2358         sector_t size;
2359
2360         down_read(&rbd_dev->header_rwsem);
2361         size = get_capacity(rbd_dev->disk);
2362         up_read(&rbd_dev->header_rwsem);
2363
2364         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2365 }
2366
2367 /*
2368  * Note this shows the features for whatever's mapped, which is not
2369  * necessarily the base image.
2370  */
2371 static ssize_t rbd_features_show(struct device *dev,
2372                              struct device_attribute *attr, char *buf)
2373 {
2374         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2375
2376         return sprintf(buf, "0x%016llx\n",
2377                         (unsigned long long) rbd_dev->mapping.features);
2378 }
2379
2380 static ssize_t rbd_major_show(struct device *dev,
2381                               struct device_attribute *attr, char *buf)
2382 {
2383         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2384
2385         return sprintf(buf, "%d\n", rbd_dev->major);
2386 }
2387
2388 static ssize_t rbd_client_id_show(struct device *dev,
2389                                   struct device_attribute *attr, char *buf)
2390 {
2391         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2392
2393         return sprintf(buf, "client%lld\n",
2394                         ceph_client_id(rbd_dev->rbd_client->client));
2395 }
2396
2397 static ssize_t rbd_pool_show(struct device *dev,
2398                              struct device_attribute *attr, char *buf)
2399 {
2400         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2401
2402         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2403 }
2404
2405 static ssize_t rbd_pool_id_show(struct device *dev,
2406                              struct device_attribute *attr, char *buf)
2407 {
2408         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2409
2410         return sprintf(buf, "%llu\n",
2411                 (unsigned long long) rbd_dev->spec->pool_id);
2412 }
2413
2414 static ssize_t rbd_name_show(struct device *dev,
2415                              struct device_attribute *attr, char *buf)
2416 {
2417         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2418
2419         if (rbd_dev->spec->image_name)
2420                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2421
2422         return sprintf(buf, "(unknown)\n");
2423 }
2424
2425 static ssize_t rbd_image_id_show(struct device *dev,
2426                              struct device_attribute *attr, char *buf)
2427 {
2428         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2429
2430         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2431 }
2432
2433 /*
2434  * Shows the name of the currently-mapped snapshot (or
2435  * RBD_SNAP_HEAD_NAME for the base image).
2436  */
2437 static ssize_t rbd_snap_show(struct device *dev,
2438                              struct device_attribute *attr,
2439                              char *buf)
2440 {
2441         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2442
2443         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2444 }
2445
2446 /*
2447  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2448  * for the parent image.  If there is no parent, simply shows
2449  * "(no parent image)".
2450  */
2451 static ssize_t rbd_parent_show(struct device *dev,
2452                              struct device_attribute *attr,
2453                              char *buf)
2454 {
2455         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2456         struct rbd_spec *spec = rbd_dev->parent_spec;
2457         int count;
2458         char *bufp = buf;
2459
2460         if (!spec)
2461                 return sprintf(buf, "(no parent image)\n");
2462
2463         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2464                         (unsigned long long) spec->pool_id, spec->pool_name);
2465         if (count < 0)
2466                 return count;
2467         bufp += count;
2468
2469         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2470                         spec->image_name ? spec->image_name : "(unknown)");
2471         if (count < 0)
2472                 return count;
2473         bufp += count;
2474
2475         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2476                         (unsigned long long) spec->snap_id, spec->snap_name);
2477         if (count < 0)
2478                 return count;
2479         bufp += count;
2480
2481         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2482         if (count < 0)
2483                 return count;
2484         bufp += count;
2485
2486         return (ssize_t) (bufp - buf);
2487 }
2488
2489 static ssize_t rbd_image_refresh(struct device *dev,
2490                                  struct device_attribute *attr,
2491                                  const char *buf,
2492                                  size_t size)
2493 {
2494         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2495         int ret;
2496
2497         ret = rbd_dev_refresh(rbd_dev, NULL);
2498
2499         return ret < 0 ? ret : size;
2500 }
2501
2502 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2503 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2504 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2505 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2506 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2507 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2508 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2509 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2510 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2511 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2512 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2513
2514 static struct attribute *rbd_attrs[] = {
2515         &dev_attr_size.attr,
2516         &dev_attr_features.attr,
2517         &dev_attr_major.attr,
2518         &dev_attr_client_id.attr,
2519         &dev_attr_pool.attr,
2520         &dev_attr_pool_id.attr,
2521         &dev_attr_name.attr,
2522         &dev_attr_image_id.attr,
2523         &dev_attr_current_snap.attr,
2524         &dev_attr_parent.attr,
2525         &dev_attr_refresh.attr,
2526         NULL
2527 };
2528
2529 static struct attribute_group rbd_attr_group = {
2530         .attrs = rbd_attrs,
2531 };
2532
2533 static const struct attribute_group *rbd_attr_groups[] = {
2534         &rbd_attr_group,
2535         NULL
2536 };
2537
2538 static void rbd_sysfs_dev_release(struct device *dev)
2539 {
2540 }
2541
2542 static struct device_type rbd_device_type = {
2543         .name           = "rbd",
2544         .groups         = rbd_attr_groups,
2545         .release        = rbd_sysfs_dev_release,
2546 };
2547
2548
2549 /*
2550   sysfs - snapshots
2551 */
2552
2553 static ssize_t rbd_snap_size_show(struct device *dev,
2554                                   struct device_attribute *attr,
2555                                   char *buf)
2556 {
2557         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2558
2559         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2560 }
2561
2562 static ssize_t rbd_snap_id_show(struct device *dev,
2563                                 struct device_attribute *attr,
2564                                 char *buf)
2565 {
2566         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2567
2568         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2569 }
2570
2571 static ssize_t rbd_snap_features_show(struct device *dev,
2572                                 struct device_attribute *attr,
2573                                 char *buf)
2574 {
2575         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2576
2577         return sprintf(buf, "0x%016llx\n",
2578                         (unsigned long long) snap->features);
2579 }
2580
2581 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2582 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2583 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2584
2585 static struct attribute *rbd_snap_attrs[] = {
2586         &dev_attr_snap_size.attr,
2587         &dev_attr_snap_id.attr,
2588         &dev_attr_snap_features.attr,
2589         NULL,
2590 };
2591
2592 static struct attribute_group rbd_snap_attr_group = {
2593         .attrs = rbd_snap_attrs,
2594 };
2595
2596 static void rbd_snap_dev_release(struct device *dev)
2597 {
2598         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2599         kfree(snap->name);
2600         kfree(snap);
2601 }
2602
2603 static const struct attribute_group *rbd_snap_attr_groups[] = {
2604         &rbd_snap_attr_group,
2605         NULL
2606 };
2607
2608 static struct device_type rbd_snap_device_type = {
2609         .groups         = rbd_snap_attr_groups,
2610         .release        = rbd_snap_dev_release,
2611 };
2612
2613 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2614 {
2615         kref_get(&spec->kref);
2616
2617         return spec;
2618 }
2619
2620 static void rbd_spec_free(struct kref *kref);
2621 static void rbd_spec_put(struct rbd_spec *spec)
2622 {
2623         if (spec)
2624                 kref_put(&spec->kref, rbd_spec_free);
2625 }
2626
2627 static struct rbd_spec *rbd_spec_alloc(void)
2628 {
2629         struct rbd_spec *spec;
2630
2631         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2632         if (!spec)
2633                 return NULL;
2634         kref_init(&spec->kref);
2635
2636         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2637
2638         return spec;
2639 }
2640
2641 static void rbd_spec_free(struct kref *kref)
2642 {
2643         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2644
2645         kfree(spec->pool_name);
2646         kfree(spec->image_id);
2647         kfree(spec->image_name);
2648         kfree(spec->snap_name);
2649         kfree(spec);
2650 }
2651
2652 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2653                                 struct rbd_spec *spec)
2654 {
2655         struct rbd_device *rbd_dev;
2656
2657         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2658         if (!rbd_dev)
2659                 return NULL;
2660
2661         spin_lock_init(&rbd_dev->lock);
2662         rbd_dev->flags = 0;
2663         INIT_LIST_HEAD(&rbd_dev->node);
2664         INIT_LIST_HEAD(&rbd_dev->snaps);
2665         init_rwsem(&rbd_dev->header_rwsem);
2666
2667         rbd_dev->spec = spec;
2668         rbd_dev->rbd_client = rbdc;
2669
2670         /* Initialize the layout used for all rbd requests */
2671
2672         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2673         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2674         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2675         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2676
2677         return rbd_dev;
2678 }
2679
2680 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2681 {
2682         rbd_spec_put(rbd_dev->parent_spec);
2683         kfree(rbd_dev->header_name);
2684         rbd_put_client(rbd_dev->rbd_client);
2685         rbd_spec_put(rbd_dev->spec);
2686         kfree(rbd_dev);
2687 }
2688
2689 static bool rbd_snap_registered(struct rbd_snap *snap)
2690 {
2691         bool ret = snap->dev.type == &rbd_snap_device_type;
2692         bool reg = device_is_registered(&snap->dev);
2693
2694         rbd_assert(!ret ^ reg);
2695
2696         return ret;
2697 }
2698
2699 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2700 {
2701         list_del(&snap->node);
2702         if (device_is_registered(&snap->dev))
2703                 device_unregister(&snap->dev);
2704 }
2705
2706 static int rbd_register_snap_dev(struct rbd_snap *snap,
2707                                   struct device *parent)
2708 {
2709         struct device *dev = &snap->dev;
2710         int ret;
2711
2712         dev->type = &rbd_snap_device_type;
2713         dev->parent = parent;
2714         dev->release = rbd_snap_dev_release;
2715         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2716         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2717
2718         ret = device_register(dev);
2719
2720         return ret;
2721 }
2722
2723 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2724                                                 const char *snap_name,
2725                                                 u64 snap_id, u64 snap_size,
2726                                                 u64 snap_features)
2727 {
2728         struct rbd_snap *snap;
2729         int ret;
2730
2731         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2732         if (!snap)
2733                 return ERR_PTR(-ENOMEM);
2734
2735         ret = -ENOMEM;
2736         snap->name = kstrdup(snap_name, GFP_KERNEL);
2737         if (!snap->name)
2738                 goto err;
2739
2740         snap->id = snap_id;
2741         snap->size = snap_size;
2742         snap->features = snap_features;
2743
2744         return snap;
2745
2746 err:
2747         kfree(snap->name);
2748         kfree(snap);
2749
2750         return ERR_PTR(ret);
2751 }
2752
2753 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2754                 u64 *snap_size, u64 *snap_features)
2755 {
2756         char *snap_name;
2757
2758         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2759
2760         *snap_size = rbd_dev->header.snap_sizes[which];
2761         *snap_features = 0;     /* No features for v1 */
2762
2763         /* Skip over names until we find the one we are looking for */
2764
2765         snap_name = rbd_dev->header.snap_names;
2766         while (which--)
2767                 snap_name += strlen(snap_name) + 1;
2768
2769         return snap_name;
2770 }
2771
2772 /*
2773  * Get the size and object order for an image snapshot, or if
2774  * snap_id is CEPH_NOSNAP, gets this information for the base
2775  * image.
2776  */
2777 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2778                                 u8 *order, u64 *snap_size)
2779 {
2780         __le64 snapid = cpu_to_le64(snap_id);
2781         int ret;
2782         struct {
2783                 u8 order;
2784                 __le64 size;
2785         } __attribute__ ((packed)) size_buf = { 0 };
2786
2787         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2788                                 "rbd", "get_size",
2789                                 (char *) &snapid, sizeof (snapid),
2790                                 (char *) &size_buf, sizeof (size_buf), NULL);
2791         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2792         if (ret < 0)
2793                 return ret;
2794
2795         *order = size_buf.order;
2796         *snap_size = le64_to_cpu(size_buf.size);
2797
2798         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2799                 (unsigned long long) snap_id, (unsigned int) *order,
2800                 (unsigned long long) *snap_size);
2801
2802         return 0;
2803 }
2804
2805 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2806 {
2807         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2808                                         &rbd_dev->header.obj_order,
2809                                         &rbd_dev->header.image_size);
2810 }
2811
2812 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2813 {
2814         void *reply_buf;
2815         int ret;
2816         void *p;
2817
2818         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2819         if (!reply_buf)
2820                 return -ENOMEM;
2821
2822         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2823                                 "rbd", "get_object_prefix",
2824                                 NULL, 0,
2825                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2826         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2827         if (ret < 0)
2828                 goto out;
2829
2830         p = reply_buf;
2831         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2832                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2833                                                 NULL, GFP_NOIO);
2834
2835         if (IS_ERR(rbd_dev->header.object_prefix)) {
2836                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2837                 rbd_dev->header.object_prefix = NULL;
2838         } else {
2839                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2840         }
2841
2842 out:
2843         kfree(reply_buf);
2844
2845         return ret;
2846 }
2847
2848 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2849                 u64 *snap_features)
2850 {
2851         __le64 snapid = cpu_to_le64(snap_id);
2852         struct {
2853                 __le64 features;
2854                 __le64 incompat;
2855         } features_buf = { 0 };
2856         u64 incompat;
2857         int ret;
2858
2859         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2860                                 "rbd", "get_features",
2861                                 (char *) &snapid, sizeof (snapid),
2862                                 (char *) &features_buf, sizeof (features_buf),
2863                                 NULL);
2864         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2865         if (ret < 0)
2866                 return ret;
2867
2868         incompat = le64_to_cpu(features_buf.incompat);
2869         if (incompat & ~RBD_FEATURES_ALL)
2870                 return -ENXIO;
2871
2872         *snap_features = le64_to_cpu(features_buf.features);
2873
2874         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2875                 (unsigned long long) snap_id,
2876                 (unsigned long long) *snap_features,
2877                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2878
2879         return 0;
2880 }
2881
2882 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2883 {
2884         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2885                                                 &rbd_dev->header.features);
2886 }
2887
2888 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2889 {
2890         struct rbd_spec *parent_spec;
2891         size_t size;
2892         void *reply_buf = NULL;
2893         __le64 snapid;
2894         void *p;
2895         void *end;
2896         char *image_id;
2897         u64 overlap;
2898         int ret;
2899
2900         parent_spec = rbd_spec_alloc();
2901         if (!parent_spec)
2902                 return -ENOMEM;
2903
2904         size = sizeof (__le64) +                                /* pool_id */
2905                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2906                 sizeof (__le64) +                               /* snap_id */
2907                 sizeof (__le64);                                /* overlap */
2908         reply_buf = kmalloc(size, GFP_KERNEL);
2909         if (!reply_buf) {
2910                 ret = -ENOMEM;
2911                 goto out_err;
2912         }
2913
2914         snapid = cpu_to_le64(CEPH_NOSNAP);
2915         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2916                                 "rbd", "get_parent",
2917                                 (char *) &snapid, sizeof (snapid),
2918                                 (char *) reply_buf, size, NULL);
2919         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2920         if (ret < 0)
2921                 goto out_err;
2922
2923         ret = -ERANGE;
2924         p = reply_buf;
2925         end = (char *) reply_buf + size;
2926         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2927         if (parent_spec->pool_id == CEPH_NOPOOL)
2928                 goto out;       /* No parent?  No problem. */
2929
2930         /* The ceph file layout needs to fit pool id in 32 bits */
2931
2932         ret = -EIO;
2933         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2934                 goto out;
2935
2936         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2937         if (IS_ERR(image_id)) {
2938                 ret = PTR_ERR(image_id);
2939                 goto out_err;
2940         }
2941         parent_spec->image_id = image_id;
2942         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2943         ceph_decode_64_safe(&p, end, overlap, out_err);
2944
2945         rbd_dev->parent_overlap = overlap;
2946         rbd_dev->parent_spec = parent_spec;
2947         parent_spec = NULL;     /* rbd_dev now owns this */
2948 out:
2949         ret = 0;
2950 out_err:
2951         kfree(reply_buf);
2952         rbd_spec_put(parent_spec);
2953
2954         return ret;
2955 }
2956
2957 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2958 {
2959         size_t image_id_size;
2960         char *image_id;
2961         void *p;
2962         void *end;
2963         size_t size;
2964         void *reply_buf = NULL;
2965         size_t len = 0;
2966         char *image_name = NULL;
2967         int ret;
2968
2969         rbd_assert(!rbd_dev->spec->image_name);
2970
2971         len = strlen(rbd_dev->spec->image_id);
2972         image_id_size = sizeof (__le32) + len;
2973         image_id = kmalloc(image_id_size, GFP_KERNEL);
2974         if (!image_id)
2975                 return NULL;
2976
2977         p = image_id;
2978         end = (char *) image_id + image_id_size;
2979         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2980
2981         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2982         reply_buf = kmalloc(size, GFP_KERNEL);
2983         if (!reply_buf)
2984                 goto out;
2985
2986         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2987                                 "rbd", "dir_get_name",
2988                                 image_id, image_id_size,
2989                                 (char *) reply_buf, size, NULL);
2990         if (ret < 0)
2991                 goto out;
2992         p = reply_buf;
2993         end = (char *) reply_buf + size;
2994         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2995         if (IS_ERR(image_name))
2996                 image_name = NULL;
2997         else
2998                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2999 out:
3000         kfree(reply_buf);
3001         kfree(image_id);
3002
3003         return image_name;
3004 }
3005
3006 /*
3007  * When a parent image gets probed, we only have the pool, image,
3008  * and snapshot ids but not the names of any of them.  This call
3009  * is made later to fill in those names.  It has to be done after
3010  * rbd_dev_snaps_update() has completed because some of the
3011  * information (in particular, snapshot name) is not available
3012  * until then.
3013  */
3014 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3015 {
3016         struct ceph_osd_client *osdc;
3017         const char *name;
3018         void *reply_buf = NULL;
3019         int ret;
3020
3021         if (rbd_dev->spec->pool_name)
3022                 return 0;       /* Already have the names */
3023
3024         /* Look up the pool name */
3025
3026         osdc = &rbd_dev->rbd_client->client->osdc;
3027         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3028         if (!name) {
3029                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3030                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3031                 return -EIO;
3032         }
3033
3034         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3035         if (!rbd_dev->spec->pool_name)
3036                 return -ENOMEM;
3037
3038         /* Fetch the image name; tolerate failure here */
3039
3040         name = rbd_dev_image_name(rbd_dev);
3041         if (name)
3042                 rbd_dev->spec->image_name = (char *) name;
3043         else
3044                 rbd_warn(rbd_dev, "unable to get image name");
3045
3046         /* Look up the snapshot name. */
3047
3048         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3049         if (!name) {
3050                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3051                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3052                 ret = -EIO;
3053                 goto out_err;
3054         }
3055         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3056         if(!rbd_dev->spec->snap_name)
3057                 goto out_err;
3058
3059         return 0;
3060 out_err:
3061         kfree(reply_buf);
3062         kfree(rbd_dev->spec->pool_name);
3063         rbd_dev->spec->pool_name = NULL;
3064
3065         return ret;
3066 }
3067
3068 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3069 {
3070         size_t size;
3071         int ret;
3072         void *reply_buf;
3073         void *p;
3074         void *end;
3075         u64 seq;
3076         u32 snap_count;
3077         struct ceph_snap_context *snapc;
3078         u32 i;
3079
3080         /*
3081          * We'll need room for the seq value (maximum snapshot id),
3082          * snapshot count, and array of that many snapshot ids.
3083          * For now we have a fixed upper limit on the number we're
3084          * prepared to receive.
3085          */
3086         size = sizeof (__le64) + sizeof (__le32) +
3087                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3088         reply_buf = kzalloc(size, GFP_KERNEL);
3089         if (!reply_buf)
3090                 return -ENOMEM;
3091
3092         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3093                                 "rbd", "get_snapcontext",
3094                                 NULL, 0,
3095                                 reply_buf, size, ver);
3096         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3097         if (ret < 0)
3098                 goto out;
3099
3100         ret = -ERANGE;
3101         p = reply_buf;
3102         end = (char *) reply_buf + size;
3103         ceph_decode_64_safe(&p, end, seq, out);
3104         ceph_decode_32_safe(&p, end, snap_count, out);
3105
3106         /*
3107          * Make sure the reported number of snapshot ids wouldn't go
3108          * beyond the end of our buffer.  But before checking that,
3109          * make sure the computed size of the snapshot context we
3110          * allocate is representable in a size_t.
3111          */
3112         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3113                                  / sizeof (u64)) {
3114                 ret = -EINVAL;
3115                 goto out;
3116         }
3117         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3118                 goto out;
3119
3120         size = sizeof (struct ceph_snap_context) +
3121                                 snap_count * sizeof (snapc->snaps[0]);
3122         snapc = kmalloc(size, GFP_KERNEL);
3123         if (!snapc) {
3124                 ret = -ENOMEM;
3125                 goto out;
3126         }
3127
3128         atomic_set(&snapc->nref, 1);
3129         snapc->seq = seq;
3130         snapc->num_snaps = snap_count;
3131         for (i = 0; i < snap_count; i++)
3132                 snapc->snaps[i] = ceph_decode_64(&p);
3133
3134         rbd_dev->header.snapc = snapc;
3135
3136         dout("  snap context seq = %llu, snap_count = %u\n",
3137                 (unsigned long long) seq, (unsigned int) snap_count);
3138
3139 out:
3140         kfree(reply_buf);
3141
3142         return 0;
3143 }
3144
3145 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3146 {
3147         size_t size;
3148         void *reply_buf;
3149         __le64 snap_id;
3150         int ret;
3151         void *p;
3152         void *end;
3153         char *snap_name;
3154
3155         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3156         reply_buf = kmalloc(size, GFP_KERNEL);
3157         if (!reply_buf)
3158                 return ERR_PTR(-ENOMEM);
3159
3160         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3161         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3162                                 "rbd", "get_snapshot_name",
3163                                 (char *) &snap_id, sizeof (snap_id),
3164                                 reply_buf, size, NULL);
3165         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3166         if (ret < 0)
3167                 goto out;
3168
3169         p = reply_buf;
3170         end = (char *) reply_buf + size;
3171         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3172         if (IS_ERR(snap_name)) {
3173                 ret = PTR_ERR(snap_name);
3174                 goto out;
3175         } else {
3176                 dout("  snap_id 0x%016llx snap_name = %s\n",
3177                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3178         }
3179         kfree(reply_buf);
3180
3181         return snap_name;
3182 out:
3183         kfree(reply_buf);
3184
3185         return ERR_PTR(ret);
3186 }
3187
3188 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3189                 u64 *snap_size, u64 *snap_features)
3190 {
3191         u64 snap_id;
3192         u8 order;
3193         int ret;
3194
3195         snap_id = rbd_dev->header.snapc->snaps[which];
3196         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3197         if (ret)
3198                 return ERR_PTR(ret);
3199         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3200         if (ret)
3201                 return ERR_PTR(ret);
3202
3203         return rbd_dev_v2_snap_name(rbd_dev, which);
3204 }
3205
3206 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3207                 u64 *snap_size, u64 *snap_features)
3208 {
3209         if (rbd_dev->image_format == 1)
3210                 return rbd_dev_v1_snap_info(rbd_dev, which,
3211                                         snap_size, snap_features);
3212         if (rbd_dev->image_format == 2)
3213                 return rbd_dev_v2_snap_info(rbd_dev, which,
3214                                         snap_size, snap_features);
3215         return ERR_PTR(-EINVAL);
3216 }
3217
3218 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3219 {
3220         int ret;
3221         __u8 obj_order;
3222
3223         down_write(&rbd_dev->header_rwsem);
3224
3225         /* Grab old order first, to see if it changes */
3226
3227         obj_order = rbd_dev->header.obj_order,
3228         ret = rbd_dev_v2_image_size(rbd_dev);
3229         if (ret)
3230                 goto out;
3231         if (rbd_dev->header.obj_order != obj_order) {
3232                 ret = -EIO;
3233                 goto out;
3234         }
3235         rbd_update_mapping_size(rbd_dev);
3236
3237         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3238         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3239         if (ret)
3240                 goto out;
3241         ret = rbd_dev_snaps_update(rbd_dev);
3242         dout("rbd_dev_snaps_update returned %d\n", ret);
3243         if (ret)
3244                 goto out;
3245         ret = rbd_dev_snaps_register(rbd_dev);
3246         dout("rbd_dev_snaps_register returned %d\n", ret);
3247 out:
3248         up_write(&rbd_dev->header_rwsem);
3249
3250         return ret;
3251 }
3252
3253 /*
3254  * Scan the rbd device's current snapshot list and compare it to the
3255  * newly-received snapshot context.  Remove any existing snapshots
3256  * not present in the new snapshot context.  Add a new snapshot for
3257  * any snaphots in the snapshot context not in the current list.
3258  * And verify there are no changes to snapshots we already know
3259  * about.
3260  *
3261  * Assumes the snapshots in the snapshot context are sorted by
3262  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3263  * are also maintained in that order.)
3264  */
3265 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3266 {
3267         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3268         const u32 snap_count = snapc->num_snaps;
3269         struct list_head *head = &rbd_dev->snaps;
3270         struct list_head *links = head->next;
3271         u32 index = 0;
3272
3273         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3274         while (index < snap_count || links != head) {
3275                 u64 snap_id;
3276                 struct rbd_snap *snap;
3277                 char *snap_name;
3278                 u64 snap_size = 0;
3279                 u64 snap_features = 0;
3280
3281                 snap_id = index < snap_count ? snapc->snaps[index]
3282                                              : CEPH_NOSNAP;
3283                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3284                                      : NULL;
3285                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3286
3287                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3288                         struct list_head *next = links->next;
3289
3290                         /*
3291                          * A previously-existing snapshot is not in
3292                          * the new snap context.
3293                          *
3294                          * If the now missing snapshot is the one the
3295                          * image is mapped to, clear its exists flag
3296                          * so we can avoid sending any more requests
3297                          * to it.
3298                          */
3299                         if (rbd_dev->spec->snap_id == snap->id)
3300                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3301                         rbd_remove_snap_dev(snap);
3302                         dout("%ssnap id %llu has been removed\n",
3303                                 rbd_dev->spec->snap_id == snap->id ?
3304                                                         "mapped " : "",
3305                                 (unsigned long long) snap->id);
3306
3307                         /* Done with this list entry; advance */
3308
3309                         links = next;
3310                         continue;
3311                 }
3312
3313                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3314                                         &snap_size, &snap_features);
3315                 if (IS_ERR(snap_name))
3316                         return PTR_ERR(snap_name);
3317
3318                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3319                         (unsigned long long) snap_id);
3320                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3321                         struct rbd_snap *new_snap;
3322
3323                         /* We haven't seen this snapshot before */
3324
3325                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3326                                         snap_id, snap_size, snap_features);
3327                         if (IS_ERR(new_snap)) {
3328                                 int err = PTR_ERR(new_snap);
3329
3330                                 dout("  failed to add dev, error %d\n", err);
3331
3332                                 return err;
3333                         }
3334
3335                         /* New goes before existing, or at end of list */
3336
3337                         dout("  added dev%s\n", snap ? "" : " at end\n");
3338                         if (snap)
3339                                 list_add_tail(&new_snap->node, &snap->node);
3340                         else
3341                                 list_add_tail(&new_snap->node, head);
3342                 } else {
3343                         /* Already have this one */
3344
3345                         dout("  already present\n");
3346
3347                         rbd_assert(snap->size == snap_size);
3348                         rbd_assert(!strcmp(snap->name, snap_name));
3349                         rbd_assert(snap->features == snap_features);
3350
3351                         /* Done with this list entry; advance */
3352
3353                         links = links->next;
3354                 }
3355
3356                 /* Advance to the next entry in the snapshot context */
3357
3358                 index++;
3359         }
3360         dout("%s: done\n", __func__);
3361
3362         return 0;
3363 }
3364
3365 /*
3366  * Scan the list of snapshots and register the devices for any that
3367  * have not already been registered.
3368  */
3369 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3370 {
3371         struct rbd_snap *snap;
3372         int ret = 0;
3373
3374         dout("%s called\n", __func__);
3375         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3376                 return -EIO;
3377
3378         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3379                 if (!rbd_snap_registered(snap)) {
3380                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3381                         if (ret < 0)
3382                                 break;
3383                 }
3384         }
3385         dout("%s: returning %d\n", __func__, ret);
3386
3387         return ret;
3388 }
3389
3390 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3391 {
3392         struct device *dev;
3393         int ret;
3394
3395         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3396
3397         dev = &rbd_dev->dev;
3398         dev->bus = &rbd_bus_type;
3399         dev->type = &rbd_device_type;
3400         dev->parent = &rbd_root_dev;
3401         dev->release = rbd_dev_release;
3402         dev_set_name(dev, "%d", rbd_dev->dev_id);
3403         ret = device_register(dev);
3404
3405         mutex_unlock(&ctl_mutex);
3406
3407         return ret;
3408 }
3409
3410 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3411 {
3412         device_unregister(&rbd_dev->dev);
3413 }
3414
3415 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3416
3417 /*
3418  * Get a unique rbd identifier for the given new rbd_dev, and add
3419  * the rbd_dev to the global list.  The minimum rbd id is 1.
3420  */
3421 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3422 {
3423         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3424
3425         spin_lock(&rbd_dev_list_lock);
3426         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3427         spin_unlock(&rbd_dev_list_lock);
3428         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3429                 (unsigned long long) rbd_dev->dev_id);
3430 }
3431
3432 /*
3433  * Remove an rbd_dev from the global list, and record that its
3434  * identifier is no longer in use.
3435  */
3436 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3437 {
3438         struct list_head *tmp;
3439         int rbd_id = rbd_dev->dev_id;
3440         int max_id;
3441
3442         rbd_assert(rbd_id > 0);
3443
3444         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3445                 (unsigned long long) rbd_dev->dev_id);
3446         spin_lock(&rbd_dev_list_lock);
3447         list_del_init(&rbd_dev->node);
3448
3449         /*
3450          * If the id being "put" is not the current maximum, there
3451          * is nothing special we need to do.
3452          */
3453         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3454                 spin_unlock(&rbd_dev_list_lock);
3455                 return;
3456         }
3457
3458         /*
3459          * We need to update the current maximum id.  Search the
3460          * list to find out what it is.  We're more likely to find
3461          * the maximum at the end, so search the list backward.
3462          */
3463         max_id = 0;
3464         list_for_each_prev(tmp, &rbd_dev_list) {
3465                 struct rbd_device *rbd_dev;
3466
3467                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3468                 if (rbd_dev->dev_id > max_id)
3469                         max_id = rbd_dev->dev_id;
3470         }
3471         spin_unlock(&rbd_dev_list_lock);
3472
3473         /*
3474          * The max id could have been updated by rbd_dev_id_get(), in
3475          * which case it now accurately reflects the new maximum.
3476          * Be careful not to overwrite the maximum value in that
3477          * case.
3478          */
3479         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3480         dout("  max dev id has been reset\n");
3481 }
3482
3483 /*
3484  * Skips over white space at *buf, and updates *buf to point to the
3485  * first found non-space character (if any). Returns the length of
3486  * the token (string of non-white space characters) found.  Note
3487  * that *buf must be terminated with '\0'.
3488  */
3489 static inline size_t next_token(const char **buf)
3490 {
3491         /*
3492         * These are the characters that produce nonzero for
3493         * isspace() in the "C" and "POSIX" locales.
3494         */
3495         const char *spaces = " \f\n\r\t\v";
3496
3497         *buf += strspn(*buf, spaces);   /* Find start of token */
3498
3499         return strcspn(*buf, spaces);   /* Return token length */
3500 }
3501
3502 /*
3503  * Finds the next token in *buf, and if the provided token buffer is
3504  * big enough, copies the found token into it.  The result, if
3505  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3506  * must be terminated with '\0' on entry.
3507  *
3508  * Returns the length of the token found (not including the '\0').
3509  * Return value will be 0 if no token is found, and it will be >=
3510  * token_size if the token would not fit.
3511  *
3512  * The *buf pointer will be updated to point beyond the end of the
3513  * found token.  Note that this occurs even if the token buffer is
3514  * too small to hold it.
3515  */
3516 static inline size_t copy_token(const char **buf,
3517                                 char *token,
3518                                 size_t token_size)
3519 {
3520         size_t len;
3521
3522         len = next_token(buf);
3523         if (len < token_size) {
3524                 memcpy(token, *buf, len);
3525                 *(token + len) = '\0';
3526         }
3527         *buf += len;
3528
3529         return len;
3530 }
3531
3532 /*
3533  * Finds the next token in *buf, dynamically allocates a buffer big
3534  * enough to hold a copy of it, and copies the token into the new
3535  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3536  * that a duplicate buffer is created even for a zero-length token.
3537  *
3538  * Returns a pointer to the newly-allocated duplicate, or a null
3539  * pointer if memory for the duplicate was not available.  If
3540  * the lenp argument is a non-null pointer, the length of the token
3541  * (not including the '\0') is returned in *lenp.
3542  *
3543  * If successful, the *buf pointer will be updated to point beyond
3544  * the end of the found token.
3545  *
3546  * Note: uses GFP_KERNEL for allocation.
3547  */
3548 static inline char *dup_token(const char **buf, size_t *lenp)
3549 {
3550         char *dup;
3551         size_t len;
3552
3553         len = next_token(buf);
3554         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3555         if (!dup)
3556                 return NULL;
3557         *(dup + len) = '\0';
3558         *buf += len;
3559
3560         if (lenp)
3561                 *lenp = len;
3562
3563         return dup;
3564 }
3565
3566 /*
3567  * Parse the options provided for an "rbd add" (i.e., rbd image
3568  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3569  * and the data written is passed here via a NUL-terminated buffer.
3570  * Returns 0 if successful or an error code otherwise.
3571  *
3572  * The information extracted from these options is recorded in
3573  * the other parameters which return dynamically-allocated
3574  * structures:
3575  *  ceph_opts
3576  *      The address of a pointer that will refer to a ceph options
3577  *      structure.  Caller must release the returned pointer using
3578  *      ceph_destroy_options() when it is no longer needed.
3579  *  rbd_opts
3580  *      Address of an rbd options pointer.  Fully initialized by
3581  *      this function; caller must release with kfree().
3582  *  spec
3583  *      Address of an rbd image specification pointer.  Fully
3584  *      initialized by this function based on parsed options.
3585  *      Caller must release with rbd_spec_put().
3586  *
3587  * The options passed take this form:
3588  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3589  * where:
3590  *  <mon_addrs>
3591  *      A comma-separated list of one or more monitor addresses.
3592  *      A monitor address is an ip address, optionally followed
3593  *      by a port number (separated by a colon).
3594  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3595  *  <options>
3596  *      A comma-separated list of ceph and/or rbd options.
3597  *  <pool_name>
3598  *      The name of the rados pool containing the rbd image.
3599  *  <image_name>
3600  *      The name of the image in that pool to map.
3601  *  <snap_id>
3602  *      An optional snapshot id.  If provided, the mapping will
3603  *      present data from the image at the time that snapshot was
3604  *      created.  The image head is used if no snapshot id is
3605  *      provided.  Snapshot mappings are always read-only.
3606  */
3607 static int rbd_add_parse_args(const char *buf,
3608                                 struct ceph_options **ceph_opts,
3609                                 struct rbd_options **opts,
3610                                 struct rbd_spec **rbd_spec)
3611 {
3612         size_t len;
3613         char *options;
3614         const char *mon_addrs;
3615         size_t mon_addrs_size;
3616         struct rbd_spec *spec = NULL;
3617         struct rbd_options *rbd_opts = NULL;
3618         struct ceph_options *copts;
3619         int ret;
3620
3621         /* The first four tokens are required */
3622
3623         len = next_token(&buf);
3624         if (!len) {
3625                 rbd_warn(NULL, "no monitor address(es) provided");
3626                 return -EINVAL;
3627         }
3628         mon_addrs = buf;
3629         mon_addrs_size = len + 1;
3630         buf += len;
3631
3632         ret = -EINVAL;
3633         options = dup_token(&buf, NULL);
3634         if (!options)
3635                 return -ENOMEM;
3636         if (!*options) {
3637                 rbd_warn(NULL, "no options provided");
3638                 goto out_err;
3639         }
3640
3641         spec = rbd_spec_alloc();
3642         if (!spec)
3643                 goto out_mem;
3644
3645         spec->pool_name = dup_token(&buf, NULL);
3646         if (!spec->pool_name)
3647                 goto out_mem;
3648         if (!*spec->pool_name) {
3649                 rbd_warn(NULL, "no pool name provided");
3650                 goto out_err;
3651         }
3652
3653         spec->image_name = dup_token(&buf, NULL);
3654         if (!spec->image_name)
3655                 goto out_mem;
3656         if (!*spec->image_name) {
3657                 rbd_warn(NULL, "no image name provided");
3658                 goto out_err;
3659         }
3660
3661         /*
3662          * Snapshot name is optional; default is to use "-"
3663          * (indicating the head/no snapshot).
3664          */
3665         len = next_token(&buf);
3666         if (!len) {
3667                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3668                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3669         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3670                 ret = -ENAMETOOLONG;
3671                 goto out_err;
3672         }
3673         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3674         if (!spec->snap_name)
3675                 goto out_mem;
3676         *(spec->snap_name + len) = '\0';
3677
3678         /* Initialize all rbd options to the defaults */
3679
3680         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3681         if (!rbd_opts)
3682                 goto out_mem;
3683
3684         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3685
3686         copts = ceph_parse_options(options, mon_addrs,
3687                                         mon_addrs + mon_addrs_size - 1,
3688                                         parse_rbd_opts_token, rbd_opts);
3689         if (IS_ERR(copts)) {
3690                 ret = PTR_ERR(copts);
3691                 goto out_err;
3692         }
3693         kfree(options);
3694
3695         *ceph_opts = copts;
3696         *opts = rbd_opts;
3697         *rbd_spec = spec;
3698
3699         return 0;
3700 out_mem:
3701         ret = -ENOMEM;
3702 out_err:
3703         kfree(rbd_opts);
3704         rbd_spec_put(spec);
3705         kfree(options);
3706
3707         return ret;
3708 }
3709
3710 /*
3711  * An rbd format 2 image has a unique identifier, distinct from the
3712  * name given to it by the user.  Internally, that identifier is
3713  * what's used to specify the names of objects related to the image.
3714  *
3715  * A special "rbd id" object is used to map an rbd image name to its
3716  * id.  If that object doesn't exist, then there is no v2 rbd image
3717  * with the supplied name.
3718  *
3719  * This function will record the given rbd_dev's image_id field if
3720  * it can be determined, and in that case will return 0.  If any
3721  * errors occur a negative errno will be returned and the rbd_dev's
3722  * image_id field will be unchanged (and should be NULL).
3723  */
3724 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3725 {
3726         int ret;
3727         size_t size;
3728         char *object_name;
3729         void *response;
3730         void *p;
3731
3732         /*
3733          * When probing a parent image, the image id is already
3734          * known (and the image name likely is not).  There's no
3735          * need to fetch the image id again in this case.
3736          */
3737         if (rbd_dev->spec->image_id)
3738                 return 0;
3739
3740         /*
3741          * First, see if the format 2 image id file exists, and if
3742          * so, get the image's persistent id from it.
3743          */
3744         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3745         object_name = kmalloc(size, GFP_NOIO);
3746         if (!object_name)
3747                 return -ENOMEM;
3748         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3749         dout("rbd id object name is %s\n", object_name);
3750
3751         /* Response will be an encoded string, which includes a length */
3752
3753         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3754         response = kzalloc(size, GFP_NOIO);
3755         if (!response) {
3756                 ret = -ENOMEM;
3757                 goto out;
3758         }
3759
3760         ret = rbd_obj_method_sync(rbd_dev, object_name,
3761                                 "rbd", "get_id",
3762                                 NULL, 0,
3763                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3764         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3765         if (ret < 0)
3766                 goto out;
3767
3768         p = response;
3769         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3770                                                 p + RBD_IMAGE_ID_LEN_MAX,
3771                                                 NULL, GFP_NOIO);
3772         if (IS_ERR(rbd_dev->spec->image_id)) {
3773                 ret = PTR_ERR(rbd_dev->spec->image_id);
3774                 rbd_dev->spec->image_id = NULL;
3775         } else {
3776                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3777         }
3778 out:
3779         kfree(response);
3780         kfree(object_name);
3781
3782         return ret;
3783 }
3784
3785 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3786 {
3787         int ret;
3788         size_t size;
3789
3790         /* Version 1 images have no id; empty string is used */
3791
3792         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3793         if (!rbd_dev->spec->image_id)
3794                 return -ENOMEM;
3795
3796         /* Record the header object name for this rbd image. */
3797
3798         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3799         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3800         if (!rbd_dev->header_name) {
3801                 ret = -ENOMEM;
3802                 goto out_err;
3803         }
3804         sprintf(rbd_dev->header_name, "%s%s",
3805                 rbd_dev->spec->image_name, RBD_SUFFIX);
3806
3807         /* Populate rbd image metadata */
3808
3809         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3810         if (ret < 0)
3811                 goto out_err;
3812
3813         /* Version 1 images have no parent (no layering) */
3814
3815         rbd_dev->parent_spec = NULL;
3816         rbd_dev->parent_overlap = 0;
3817
3818         rbd_dev->image_format = 1;
3819
3820         dout("discovered version 1 image, header name is %s\n",
3821                 rbd_dev->header_name);
3822
3823         return 0;
3824
3825 out_err:
3826         kfree(rbd_dev->header_name);
3827         rbd_dev->header_name = NULL;
3828         kfree(rbd_dev->spec->image_id);
3829         rbd_dev->spec->image_id = NULL;
3830
3831         return ret;
3832 }
3833
3834 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3835 {
3836         size_t size;
3837         int ret;
3838         u64 ver = 0;
3839
3840         /*
3841          * Image id was filled in by the caller.  Record the header
3842          * object name for this rbd image.
3843          */
3844         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3845         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3846         if (!rbd_dev->header_name)
3847                 return -ENOMEM;
3848         sprintf(rbd_dev->header_name, "%s%s",
3849                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3850
3851         /* Get the size and object order for the image */
3852
3853         ret = rbd_dev_v2_image_size(rbd_dev);
3854         if (ret < 0)
3855                 goto out_err;
3856
3857         /* Get the object prefix (a.k.a. block_name) for the image */
3858
3859         ret = rbd_dev_v2_object_prefix(rbd_dev);
3860         if (ret < 0)
3861                 goto out_err;
3862
3863         /* Get the and check features for the image */
3864
3865         ret = rbd_dev_v2_features(rbd_dev);
3866         if (ret < 0)
3867                 goto out_err;
3868
3869         /* If the image supports layering, get the parent info */
3870
3871         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3872                 ret = rbd_dev_v2_parent_info(rbd_dev);
3873                 if (ret < 0)
3874                         goto out_err;
3875         }
3876
3877         /* crypto and compression type aren't (yet) supported for v2 images */
3878
3879         rbd_dev->header.crypt_type = 0;
3880         rbd_dev->header.comp_type = 0;
3881
3882         /* Get the snapshot context, plus the header version */
3883
3884         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3885         if (ret)
3886                 goto out_err;
3887         rbd_dev->header.obj_version = ver;
3888
3889         rbd_dev->image_format = 2;
3890
3891         dout("discovered version 2 image, header name is %s\n",
3892                 rbd_dev->header_name);
3893
3894         return 0;
3895 out_err:
3896         rbd_dev->parent_overlap = 0;
3897         rbd_spec_put(rbd_dev->parent_spec);
3898         rbd_dev->parent_spec = NULL;
3899         kfree(rbd_dev->header_name);
3900         rbd_dev->header_name = NULL;
3901         kfree(rbd_dev->header.object_prefix);
3902         rbd_dev->header.object_prefix = NULL;
3903
3904         return ret;
3905 }
3906
3907 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3908 {
3909         int ret;
3910
3911         /* no need to lock here, as rbd_dev is not registered yet */
3912         ret = rbd_dev_snaps_update(rbd_dev);
3913         if (ret)
3914                 return ret;
3915
3916         ret = rbd_dev_probe_update_spec(rbd_dev);
3917         if (ret)
3918                 goto err_out_snaps;
3919
3920         ret = rbd_dev_set_mapping(rbd_dev);
3921         if (ret)
3922                 goto err_out_snaps;
3923
3924         /* generate unique id: find highest unique id, add one */
3925         rbd_dev_id_get(rbd_dev);
3926
3927         /* Fill in the device name, now that we have its id. */
3928         BUILD_BUG_ON(DEV_NAME_LEN
3929                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3930         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3931
3932         /* Get our block major device number. */
3933
3934         ret = register_blkdev(0, rbd_dev->name);
3935         if (ret < 0)
3936                 goto err_out_id;
3937         rbd_dev->major = ret;
3938
3939         /* Set up the blkdev mapping. */
3940
3941         ret = rbd_init_disk(rbd_dev);
3942         if (ret)
3943                 goto err_out_blkdev;
3944
3945         ret = rbd_bus_add_dev(rbd_dev);
3946         if (ret)
3947                 goto err_out_disk;
3948
3949         /*
3950          * At this point cleanup in the event of an error is the job
3951          * of the sysfs code (initiated by rbd_bus_del_dev()).
3952          */
3953         down_write(&rbd_dev->header_rwsem);
3954         ret = rbd_dev_snaps_register(rbd_dev);
3955         up_write(&rbd_dev->header_rwsem);
3956         if (ret)
3957                 goto err_out_bus;
3958
3959         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3960         if (ret)
3961                 goto err_out_bus;
3962
3963         /* Everything's ready.  Announce the disk to the world. */
3964
3965         add_disk(rbd_dev->disk);
3966
3967         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3968                 (unsigned long long) rbd_dev->mapping.size);
3969
3970         return ret;
3971 err_out_bus:
3972         /* this will also clean up rest of rbd_dev stuff */
3973
3974         rbd_bus_del_dev(rbd_dev);
3975
3976         return ret;
3977 err_out_disk:
3978         rbd_free_disk(rbd_dev);
3979 err_out_blkdev:
3980         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3981 err_out_id:
3982         rbd_dev_id_put(rbd_dev);
3983 err_out_snaps:
3984         rbd_remove_all_snaps(rbd_dev);
3985
3986         return ret;
3987 }
3988
3989 /*
3990  * Probe for the existence of the header object for the given rbd
3991  * device.  For format 2 images this includes determining the image
3992  * id.
3993  */
3994 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3995 {
3996         int ret;
3997
3998         /*
3999          * Get the id from the image id object.  If it's not a
4000          * format 2 image, we'll get ENOENT back, and we'll assume
4001          * it's a format 1 image.
4002          */
4003         ret = rbd_dev_image_id(rbd_dev);
4004         if (ret)
4005                 ret = rbd_dev_v1_probe(rbd_dev);
4006         else
4007                 ret = rbd_dev_v2_probe(rbd_dev);
4008         if (ret) {
4009                 dout("probe failed, returning %d\n", ret);
4010
4011                 return ret;
4012         }
4013
4014         ret = rbd_dev_probe_finish(rbd_dev);
4015         if (ret)
4016                 rbd_header_free(&rbd_dev->header);
4017
4018         return ret;
4019 }
4020
4021 static ssize_t rbd_add(struct bus_type *bus,
4022                        const char *buf,
4023                        size_t count)
4024 {
4025         struct rbd_device *rbd_dev = NULL;
4026         struct ceph_options *ceph_opts = NULL;
4027         struct rbd_options *rbd_opts = NULL;
4028         struct rbd_spec *spec = NULL;
4029         struct rbd_client *rbdc;
4030         struct ceph_osd_client *osdc;
4031         int rc = -ENOMEM;
4032
4033         if (!try_module_get(THIS_MODULE))
4034                 return -ENODEV;
4035
4036         /* parse add command */
4037         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4038         if (rc < 0)
4039                 goto err_out_module;
4040
4041         rbdc = rbd_get_client(ceph_opts);
4042         if (IS_ERR(rbdc)) {
4043                 rc = PTR_ERR(rbdc);
4044                 goto err_out_args;
4045         }
4046         ceph_opts = NULL;       /* rbd_dev client now owns this */
4047
4048         /* pick the pool */
4049         osdc = &rbdc->client->osdc;
4050         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4051         if (rc < 0)
4052                 goto err_out_client;
4053         spec->pool_id = (u64) rc;
4054
4055         /* The ceph file layout needs to fit pool id in 32 bits */
4056
4057         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4058                 rc = -EIO;
4059                 goto err_out_client;
4060         }
4061
4062         rbd_dev = rbd_dev_create(rbdc, spec);
4063         if (!rbd_dev)
4064                 goto err_out_client;
4065         rbdc = NULL;            /* rbd_dev now owns this */
4066         spec = NULL;            /* rbd_dev now owns this */
4067
4068         rbd_dev->mapping.read_only = rbd_opts->read_only;
4069         kfree(rbd_opts);
4070         rbd_opts = NULL;        /* done with this */
4071
4072         rc = rbd_dev_probe(rbd_dev);
4073         if (rc < 0)
4074                 goto err_out_rbd_dev;
4075
4076         return count;
4077 err_out_rbd_dev:
4078         rbd_dev_destroy(rbd_dev);
4079 err_out_client:
4080         rbd_put_client(rbdc);
4081 err_out_args:
4082         if (ceph_opts)
4083                 ceph_destroy_options(ceph_opts);
4084         kfree(rbd_opts);
4085         rbd_spec_put(spec);
4086 err_out_module:
4087         module_put(THIS_MODULE);
4088
4089         dout("Error adding device %s\n", buf);
4090
4091         return (ssize_t) rc;
4092 }
4093
4094 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4095 {
4096         struct list_head *tmp;
4097         struct rbd_device *rbd_dev;
4098
4099         spin_lock(&rbd_dev_list_lock);
4100         list_for_each(tmp, &rbd_dev_list) {
4101                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4102                 if (rbd_dev->dev_id == dev_id) {
4103                         spin_unlock(&rbd_dev_list_lock);
4104                         return rbd_dev;
4105                 }
4106         }
4107         spin_unlock(&rbd_dev_list_lock);
4108         return NULL;
4109 }
4110
4111 static void rbd_dev_release(struct device *dev)
4112 {
4113         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4114
4115         if (rbd_dev->watch_event)
4116                 rbd_dev_header_watch_sync(rbd_dev, 0);
4117
4118         /* clean up and free blkdev */
4119         rbd_free_disk(rbd_dev);
4120         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4121
4122         /* release allocated disk header fields */
4123         rbd_header_free(&rbd_dev->header);
4124
4125         /* done with the id, and with the rbd_dev */
4126         rbd_dev_id_put(rbd_dev);
4127         rbd_assert(rbd_dev->rbd_client != NULL);
4128         rbd_dev_destroy(rbd_dev);
4129
4130         /* release module ref */
4131         module_put(THIS_MODULE);
4132 }
4133
4134 static ssize_t rbd_remove(struct bus_type *bus,
4135                           const char *buf,
4136                           size_t count)
4137 {
4138         struct rbd_device *rbd_dev = NULL;
4139         int target_id, rc;
4140         unsigned long ul;
4141         int ret = count;
4142
4143         rc = strict_strtoul(buf, 10, &ul);
4144         if (rc)
4145                 return rc;
4146
4147         /* convert to int; abort if we lost anything in the conversion */
4148         target_id = (int) ul;
4149         if (target_id != ul)
4150                 return -EINVAL;
4151
4152         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4153
4154         rbd_dev = __rbd_get_dev(target_id);
4155         if (!rbd_dev) {
4156                 ret = -ENOENT;
4157                 goto done;
4158         }
4159
4160         spin_lock_irq(&rbd_dev->lock);
4161         if (rbd_dev->open_count)
4162                 ret = -EBUSY;
4163         else
4164                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4165         spin_unlock_irq(&rbd_dev->lock);
4166         if (ret < 0)
4167                 goto done;
4168
4169         rbd_remove_all_snaps(rbd_dev);
4170         rbd_bus_del_dev(rbd_dev);
4171
4172 done:
4173         mutex_unlock(&ctl_mutex);
4174
4175         return ret;
4176 }
4177
4178 /*
4179  * create control files in sysfs
4180  * /sys/bus/rbd/...
4181  */
4182 static int rbd_sysfs_init(void)
4183 {
4184         int ret;
4185
4186         ret = device_register(&rbd_root_dev);
4187         if (ret < 0)
4188                 return ret;
4189
4190         ret = bus_register(&rbd_bus_type);
4191         if (ret < 0)
4192                 device_unregister(&rbd_root_dev);
4193
4194         return ret;
4195 }
4196
4197 static void rbd_sysfs_cleanup(void)
4198 {
4199         bus_unregister(&rbd_bus_type);
4200         device_unregister(&rbd_root_dev);
4201 }
4202
4203 int __init rbd_init(void)
4204 {
4205         int rc;
4206
4207         if (!libceph_compatible(NULL)) {
4208                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4209
4210                 return -EINVAL;
4211         }
4212         rc = rbd_sysfs_init();
4213         if (rc)
4214                 return rc;
4215         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4216         return 0;
4217 }
4218
4219 void __exit rbd_exit(void)
4220 {
4221         rbd_sysfs_cleanup();
4222 }
4223
4224 module_init(rbd_init);
4225 module_exit(rbd_exit);
4226
4227 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4228 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4229 MODULE_DESCRIPTION("rados block device");
4230
4231 /* following authorship retained from original osdblk.c */
4232 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4233
4234 MODULE_LICENSE("GPL");