]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: prevent open for image being removed
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /*
    * Largest value representable in each fixed-width unsigned type.
    * It might be useful to have these defined elsewhere.
    */
56
57 #define U8_MAX  ((u8)   (~0U))
58 #define U16_MAX ((u16)  (~0U))
59 #define U32_MAX ((u32)  (~0U))
60 #define U64_MAX ((u64)  (~0ULL))
61
62 #define RBD_DRV_NAME "rbd"      /* also the prefix of rbd_warn() messages */
63 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
64
65 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
66
67 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
    /* Longest snapshot name that still fits in NAME_MAX after the prefix */
68 #define RBD_MAX_SNAP_NAME_LEN   \
69                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
70
71 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
72
73 #define RBD_SNAP_HEAD_NAME      "-"     /* name used for snap id CEPH_NOSNAP */
74
75 /* This allows a single page to hold an image name sent by OSD */
76 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
77 #define RBD_IMAGE_ID_LEN_MAX    64
78
79 #define RBD_OBJ_PREFIX_LEN_MAX  64
80
81 /* Feature bits */
82
83 #define RBD_FEATURE_LAYERING      1
84
85 /*
    * Features supported by this (client software) implementation.
    * The mask is currently empty, i.e. no feature bit is listed.
    */
86
87 #define RBD_FEATURES_ALL          (0)
88
89 /*
90  * An RBD device name will be "rbd#", where the "rbd" comes from
91  * RBD_DRV_NAME above, and # is a unique integer identifier.
92  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
93  * enough to hold all possible device names.
94  */
95 #define DEV_NAME_LEN            32
96 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
97
98 /*
99  * block device image metadata (in-memory version)
    *
    * For format 1 images this is filled in from the on-disk header by
    * rbd_header_from_disk() and torn down by rbd_header_free().
100  */
101 struct rbd_image_header {
102         /* These five fields never change for a given rbd image */
103         char *object_prefix;    /* kmalloc'd; prefix of every segment object name */
104         u64 features;
105         __u8 obj_order;         /* log2 of the segment (object) size in bytes */
106         __u8 crypt_type;
107         __u8 comp_type;
108
109         /* The remaining fields need to be updated occasionally */
110         u64 image_size;
111         struct ceph_snap_context *snapc;
112         char *snap_names;       /* snapshot name bytes copied from the on-disk header */
113         u64 *snap_sizes;        /* image size per snapshot, same index as snapc->snaps[] */
114
115         u64 obj_version;
116 };
117
118 /*
119  * An rbd image specification.
120  *
121  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122  * identify an image.  Each rbd_dev structure includes a pointer to
123  * an rbd_spec structure that encapsulates this identity.
124  *
125  * Each of the id's in an rbd_spec has an associated name.  For a
126  * user-mapped image, the names are supplied and the id's associated
127  * with them are looked up.  For a layered image, a parent image is
128  * defined by the tuple, and the names are looked up.
129  *
130  * An rbd_dev structure contains a parent_spec pointer which is
131  * non-null if the image it represents is a child in a layered
132  * image.  This pointer will refer to the rbd_spec structure used
133  * by the parent rbd_dev for its own identity (i.e., the structure
134  * is shared between the parent and child).
135  *
136  * Since these structures are populated once, during the discovery
137  * phase of image construction, they are effectively immutable so
138  * we make no effort to synchronize access to them.
139  *
140  * Note that code herein does not assume the image name is known (it
141  * could be a null pointer).
142  */
143 struct rbd_spec {
144         u64             pool_id;
145         char            *pool_name;
146
147         char            *image_id;
148         char            *image_name;    /* may be a null pointer (see above) */
149
150         u64             snap_id;        /* CEPH_NOSNAP when snap_name is RBD_SNAP_HEAD_NAME */
151         char            *snap_name;
152
153         struct kref     kref;           /* a spec can be shared by parent and child */
154 };
155
156 /*
157  * an instance of the client.  multiple devices may share an rbd client.
    *
    * Instances are created by rbd_client_create(), looked up for sharing
    * by rbd_client_find(), and destroyed by rbd_client_release() when the
    * last reference is dropped.
158  */
159 struct rbd_client {
160         struct ceph_client      *client;        /* from ceph_create_client() */
161         struct kref             kref;           /* dropped via rbd_put_client() */
162         struct list_head        node;           /* entry in rbd_client_list */
163 };
164
165 struct rbd_img_request;
    /* Callback type stored in rbd_img_request->callback */
166 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
167
168 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
169
170 struct rbd_obj_request;
    /* Callback type stored in rbd_obj_request->callback */
171 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
172
    /*
     * Kind of data attached to an object request.
     * NOTE(review): presumably selects which member of the union in
     * struct rbd_obj_request is valid (bio_list vs. pages) -- confirm.
     */
173 enum obj_request_type {
174         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
175 };
176
    /*
     * A request aimed at a single named object.  It can be linked into
     * an image request through img_request/links/which.
     */
177 struct rbd_obj_request {
178         const char              *object_name;
179         u64                     offset;         /* object start byte */
180         u64                     length;         /* bytes from offset */
181
182         struct rbd_img_request  *img_request;
183         struct list_head        links;          /* img_request->obj_requests */
184         u32                     which;          /* posn image request list */
185
186         enum obj_request_type   type;
            /* Data carried by the request: a bio chain or a page vector */
187         union {
188                 struct bio      *bio_list;
189                 struct {
190                         struct page     **pages;
191                         u32             page_count;
192                 };
193         };
194
195         struct ceph_osd_request *osd_req;
196
197         u64                     xferred;        /* bytes transferred */
198         u64                     version;
199         s32                     result;
200         atomic_t                done;
201
202         rbd_obj_callback_t      callback;
203         struct completion       completion;
204
205         struct kref             kref;
206 };
207
    /*
     * A request covering a byte range of an image.  It owns a list of
     * object requests (obj_requests), counted by obj_request_count.
     */
208 struct rbd_img_request {
209         struct request          *rq;
210         struct rbd_device       *rbd_dev;
211         u64                     offset; /* starting image byte offset */
212         u64                     length; /* byte count from offset */
213         bool                    write_request;  /* false for read */
            /* Which member is meaningful depends on write_request */
214         union {
215                 struct ceph_snap_context *snapc;        /* for writes */
216                 u64             snap_id;                /* for reads */
217         };
218         spinlock_t              completion_lock;/* protects next_completion */
219         u32                     next_completion;
220         rbd_img_callback_t      callback;
221
222         u32                     obj_request_count;
223         struct list_head        obj_requests;   /* rbd_obj_request structs */
224
225         struct kref             kref;
226 };
227
/*
 * Iterators over the object requests linked into an image request
 * (ireq->obj_requests, chained through rbd_obj_request->links).
 *
 * The ireq argument is parenthesized so that any pointer-valued
 * expression can be passed, not just a plain identifier.
 *
 * Note that for_each_obj_request_safe() walks the list in REVERSE
 * order (last entry first); "n" is scratch storage that allows the
 * current entry to be removed while iterating.
 */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
234
    /* One snapshot of an image; kept on the owning rbd_device's snaps list */
235 struct rbd_snap {
236         struct  device          dev;
237         const char              *name;
238         u64                     size;
239         struct list_head        node;           /* entry in rbd_dev->snaps */
240         u64                     id;             /* matched against the snap id */
241         u64                     features;
242 };
243
    /*
     * Properties of what is actually mapped: either the image head or
     * one snapshot, as chosen by rbd_dev_set_mapping().
     */
244 struct rbd_mapping {
245         u64                     size;
246         u64                     features;
247         bool                    read_only;      /* forced true when a snapshot is mapped */
248 };
249
250 /*
251  * a single device
252  */
253 struct rbd_device {
254         int                     dev_id;         /* blkdev unique id */
255
256         int                     major;          /* blkdev assigned major */
257         struct gendisk          *disk;          /* blkdev's gendisk and rq */
258
259         u32                     image_format;   /* Either 1 or 2 */
260         struct rbd_client       *rbd_client;
261
262         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
263
264         spinlock_t              lock;           /* queue, flags, open_count */
265
266         struct rbd_image_header header;
267         unsigned long           flags;          /* possibly lock protected */
268         struct rbd_spec         *spec;
269
270         char                    *header_name;
271
272         struct ceph_file_layout layout;
273
274         struct ceph_osd_event   *watch_event;
275         struct rbd_obj_request  *watch_request;
276
277         struct rbd_spec         *parent_spec;
278         u64                     parent_overlap;
279
280         /* protects updating the header */
281         struct rw_semaphore     header_rwsem;
282
283         struct rbd_mapping      mapping;        /* set up by rbd_dev_set_mapping() */
284
285         struct list_head        node;           /* presumably entry in rbd_dev_list -- confirm */
286
287         /* list of snapshots (struct rbd_snap, linked through ->node) */
288         struct list_head        snaps;
289
290         /* sysfs related */
291         struct device           dev;            /* referenced while the blkdev is open */
292         unsigned long           open_count;     /* protected by lock */
293 };
294
295 /*
296  * Flag bits for rbd_dev->flags.  If atomicity is required,
297  * rbd_dev->lock is used to protect access.
298  *
299  * Currently, only the "removing" flag (which is coupled with the
300  * "open_count" field) requires atomic access.
301  */
    /* Values are bit numbers used with set_bit()/test_bit() on rbd_dev->flags */
302 enum rbd_dev_flags {
303         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
304         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
305 };
306
307 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
308
309 static LIST_HEAD(rbd_dev_list);    /* devices */
310 static DEFINE_SPINLOCK(rbd_dev_list_lock);
311
312 static LIST_HEAD(rbd_client_list);              /* clients */
313 static DEFINE_SPINLOCK(rbd_client_list_lock);   /* guards rbd_client_list */
314
315 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
316 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
317
318 static void rbd_dev_release(struct device *dev);
319 static void rbd_remove_snap_dev(struct rbd_snap *snap);
320
321 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
322                        size_t count);
323 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
324                           size_t count);
325
    /*
     * Bus attributes "add" and "remove".  Both are write-only for the
     * owner (S_IWUSR, no show method) and are handled by rbd_add() and
     * rbd_remove() respectively.
     */
326 static struct bus_attribute rbd_bus_attrs[] = {
327         __ATTR(add, S_IWUSR, NULL, rbd_add),
328         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
329         __ATTR_NULL
330 };
331
    /* The "rbd" bus, carrying the attributes defined above */
332 static struct bus_type rbd_bus_type = {
333         .name           = "rbd",
334         .bus_attrs      = rbd_bus_attrs,
335 };
336
    /*
     * Release method for rbd_root_dev.  Intentionally empty: the device
     * below is a static object, so there is nothing to free here.
     */
337 static void rbd_root_dev_release(struct device *dev)
338 {
339 }
340
    /* Statically allocated root device named "rbd" */
341 static struct device rbd_root_dev = {
342         .init_name =    "rbd",
343         .release =      rbd_root_dev_release,
344 };
345
346 static __printf(2, 3)
347 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
348 {
349         struct va_format vaf;
350         va_list args;
351
352         va_start(args, fmt);
353         vaf.fmt = fmt;
354         vaf.va = &args;
355
356         if (!rbd_dev)
357                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
358         else if (rbd_dev->disk)
359                 printk(KERN_WARNING "%s: %s: %pV\n",
360                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
361         else if (rbd_dev->spec && rbd_dev->spec->image_name)
362                 printk(KERN_WARNING "%s: image %s: %pV\n",
363                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
364         else if (rbd_dev->spec && rbd_dev->spec->image_id)
365                 printk(KERN_WARNING "%s: id %s: %pV\n",
366                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
367         else    /* punt */
368                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
369                         RBD_DRV_NAME, rbd_dev, &vaf);
370         va_end(args);
371 }
372
/*
 * rbd_assert(expr) - with RBD_DEBUG defined, report the failed
 * expression (function, line, and expression text) and BUG() when
 * expr is false.  Without RBD_DEBUG it expands to a no-op and expr is
 * not evaluated.
 *
 * The debug form is wrapped in do { } while (0) so the macro behaves
 * as a single statement; a bare "if" would capture a following "else"
 * when used inside an unbraced if/else.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                do {                                                    \
                        if (unlikely(!(expr))) {                        \
                                printk(KERN_ERR                         \
                                        "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                                BUG();                                  \
                        }                                               \
                } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
385
    /* Forward declarations of the header refresh routines */
386 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
387 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
388
389 static int rbd_open(struct block_device *bdev, fmode_t mode)
390 {
391         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
392         bool removing = false;
393
394         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
395                 return -EROFS;
396
397         spin_lock(&rbd_dev->lock);
398         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
399                 removing = true;
400         else
401                 rbd_dev->open_count++;
402         spin_unlock(&rbd_dev->lock);
403         if (removing)
404                 return -ENOENT;
405
406         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
407         (void) get_device(&rbd_dev->dev);
408         set_device_ro(bdev, rbd_dev->mapping.read_only);
409         mutex_unlock(&ctl_mutex);
410
411         return 0;
412 }
413
414 static int rbd_release(struct gendisk *disk, fmode_t mode)
415 {
416         struct rbd_device *rbd_dev = disk->private_data;
417         unsigned long open_count_before;
418
419         spin_lock(&rbd_dev->lock);
420         open_count_before = rbd_dev->open_count--;
421         spin_unlock(&rbd_dev->lock);
422         rbd_assert(open_count_before > 0);
423
424         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
425         put_device(&rbd_dev->dev);
426         mutex_unlock(&ctl_mutex);
427
428         return 0;
429 }
430
    /* Block device operations; only open and release are provided */
431 static const struct block_device_operations rbd_bd_ops = {
432         .owner                  = THIS_MODULE,
433         .open                   = rbd_open,
434         .release                = rbd_release,
435 };
436
437 /*
438  * Initialize an rbd client instance.
439  * We own *ceph_opts.
440  */
441 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
442 {
443         struct rbd_client *rbdc;
444         int ret = -ENOMEM;
445
446         dout("rbd_client_create\n");
447         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
448         if (!rbdc)
449                 goto out_opt;
450
451         kref_init(&rbdc->kref);
452         INIT_LIST_HEAD(&rbdc->node);
453
454         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
455
456         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
457         if (IS_ERR(rbdc->client))
458                 goto out_mutex;
459         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
460
461         ret = ceph_open_session(rbdc->client);
462         if (ret < 0)
463                 goto out_err;
464
465         spin_lock(&rbd_client_list_lock);
466         list_add_tail(&rbdc->node, &rbd_client_list);
467         spin_unlock(&rbd_client_list_lock);
468
469         mutex_unlock(&ctl_mutex);
470
471         dout("rbd_client_create created %p\n", rbdc);
472         return rbdc;
473
474 out_err:
475         ceph_destroy_client(rbdc->client);
476 out_mutex:
477         mutex_unlock(&ctl_mutex);
478         kfree(rbdc);
479 out_opt:
480         if (ceph_opts)
481                 ceph_destroy_options(ceph_opts);
482         return ERR_PTR(ret);
483 }
484
485 /*
486  * Find a ceph client with specific addr and configuration.  If
487  * found, bump its reference count.
488  */
489 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
490 {
491         struct rbd_client *client_node;
492         bool found = false;
493
494         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
495                 return NULL;
496
497         spin_lock(&rbd_client_list_lock);
498         list_for_each_entry(client_node, &rbd_client_list, node) {
499                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
500                         kref_get(&client_node->kref);
501                         found = true;
502                         break;
503                 }
504         }
505         spin_unlock(&rbd_client_list_lock);
506
507         return found ? client_node : NULL;
508 }
509
510 /*
511  * mount options
    *
    * The Opt_last_* enumerators are separators, not options.
    * parse_rbd_opts_token() classifies a token by where it falls: below
    * Opt_last_int it takes an int argument, between Opt_last_int and
    * Opt_last_string a string argument, and between Opt_last_string and
    * Opt_last_bool it is a Boolean.  Only Boolean options exist so far.
512  */
513 enum {
514         Opt_last_int,
515         /* int args above */
516         Opt_last_string,
517         /* string args above */
518         Opt_read_only,
519         Opt_read_write,
520         /* Boolean args above */
521         Opt_last_bool,
522 };
523
    /* Option spellings accepted by match_token(), ended by {-1, NULL} */
524 static match_table_t rbd_opts_tokens = {
525         /* int args above */
526         /* string args above */
527         {Opt_read_only, "read_only"},
528         {Opt_read_only, "ro"},          /* Alternate spelling */
529         {Opt_read_write, "read_write"},
530         {Opt_read_write, "rw"},         /* Alternate spelling */
531         /* Boolean args above */
532         {-1, NULL}
533 };
534
    /* Parsed rbd-specific options; filled in by parse_rbd_opts_token() */
535 struct rbd_options {
536         bool    read_only;
537 };
538
539 #define RBD_READ_ONLY_DEFAULT   false
540
541 static int parse_rbd_opts_token(char *c, void *private)
542 {
543         struct rbd_options *rbd_opts = private;
544         substring_t argstr[MAX_OPT_ARGS];
545         int token, intval, ret;
546
547         token = match_token(c, rbd_opts_tokens, argstr);
548         if (token < 0)
549                 return -EINVAL;
550
551         if (token < Opt_last_int) {
552                 ret = match_int(&argstr[0], &intval);
553                 if (ret < 0) {
554                         pr_err("bad mount option arg (not int) "
555                                "at '%s'\n", c);
556                         return ret;
557                 }
558                 dout("got int token %d val %d\n", token, intval);
559         } else if (token > Opt_last_int && token < Opt_last_string) {
560                 dout("got string token %d val %s\n", token,
561                      argstr[0].from);
562         } else if (token > Opt_last_string && token < Opt_last_bool) {
563                 dout("got Boolean token %d\n", token);
564         } else {
565                 dout("got token %d\n", token);
566         }
567
568         switch (token) {
569         case Opt_read_only:
570                 rbd_opts->read_only = true;
571                 break;
572         case Opt_read_write:
573                 rbd_opts->read_only = false;
574                 break;
575         default:
576                 rbd_assert(false);
577                 break;
578         }
579         return 0;
580 }
581
/*
 * Return a client matching ceph_opts: an existing one (with an extra
 * reference) if rbd_client_find() locates it, otherwise a newly
 * created one.  Either way ceph_opts is consumed.  The result of
 * rbd_client_create() is passed through unchanged, so it may be an
 * ERR_PTR().
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* Using an existing client; our options are no longer needed */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
598
599 /*
600  * Destroy ceph client
601  *
602  * This is the kref release function passed to kref_put() by
    * rbd_put_client().  It takes rbd_client_list_lock itself in order to
    * unlink the client from rbd_client_list, so the caller must NOT be
    * holding that lock.
603  */
604 static void rbd_client_release(struct kref *kref)
605 {
606         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
607
608         dout("rbd_release_client %p\n", rbdc);
            /* Unlink first so rbd_client_find() can no longer see it */
609         spin_lock(&rbd_client_list_lock);
610         list_del(&rbdc->node);
611         spin_unlock(&rbd_client_list_lock);
612
613         ceph_destroy_client(rbdc->client);
614         kfree(rbdc);
615 }
616
617 /*
618  * Drop reference to ceph client node. If it's not referenced anymore, release
619  * it.
620  */
621 static void rbd_put_client(struct rbd_client *rbdc)
622 {
623         if (rbdc)
624                 kref_put(&rbdc->kref, rbd_client_release);
625 }
626
627 static bool rbd_image_format_valid(u32 image_format)
628 {
629         return image_format == 1 || image_format == 2;
630 }
631
632 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
633 {
634         size_t size;
635         u32 snap_count;
636
637         /* The header has to start with the magic rbd header text */
638         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
639                 return false;
640
641         /* The bio layer requires at least sector-sized I/O */
642
643         if (ondisk->options.order < SECTOR_SHIFT)
644                 return false;
645
646         /* If we use u64 in a few spots we may be able to loosen this */
647
648         if (ondisk->options.order > 8 * sizeof (int) - 1)
649                 return false;
650
651         /*
652          * The size of a snapshot header has to fit in a size_t, and
653          * that limits the number of snapshots.
654          */
655         snap_count = le32_to_cpu(ondisk->snap_count);
656         size = SIZE_MAX - sizeof (struct ceph_snap_context);
657         if (snap_count > size / sizeof (__le64))
658                 return false;
659
660         /*
661          * Not only that, but the size of the entire the snapshot
662          * header must also be representable in a size_t.
663          */
664         size -= snap_count * sizeof (__le64);
665         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
666                 return false;
667
668         return true;
669 }
670
671 /*
672  * Create a new header structure, translate header format from the on-disk
673  * header.
674  */
675 static int rbd_header_from_disk(struct rbd_image_header *header,
676                                  struct rbd_image_header_ondisk *ondisk)
677 {
678         u32 snap_count;
679         size_t len;
680         size_t size;
681         u32 i;
682
683         memset(header, 0, sizeof (*header));
684
685         snap_count = le32_to_cpu(ondisk->snap_count);
686
687         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
688         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
689         if (!header->object_prefix)
690                 return -ENOMEM;
691         memcpy(header->object_prefix, ondisk->object_prefix, len);
692         header->object_prefix[len] = '\0';
693
694         if (snap_count) {
695                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
696
697                 /* Save a copy of the snapshot names */
698
699                 if (snap_names_len > (u64) SIZE_MAX)
700                         return -EIO;
701                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
702                 if (!header->snap_names)
703                         goto out_err;
704                 /*
705                  * Note that rbd_dev_v1_header_read() guarantees
706                  * the ondisk buffer we're working with has
707                  * snap_names_len bytes beyond the end of the
708                  * snapshot id array, this memcpy() is safe.
709                  */
710                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
711                         snap_names_len);
712
713                 /* Record each snapshot's size */
714
715                 size = snap_count * sizeof (*header->snap_sizes);
716                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
717                 if (!header->snap_sizes)
718                         goto out_err;
719                 for (i = 0; i < snap_count; i++)
720                         header->snap_sizes[i] =
721                                 le64_to_cpu(ondisk->snaps[i].image_size);
722         } else {
723                 WARN_ON(ondisk->snap_names_len);
724                 header->snap_names = NULL;
725                 header->snap_sizes = NULL;
726         }
727
728         header->features = 0;   /* No features support in v1 images */
729         header->obj_order = ondisk->options.order;
730         header->crypt_type = ondisk->options.crypt_type;
731         header->comp_type = ondisk->options.comp_type;
732
733         /* Allocate and fill in the snapshot context */
734
735         header->image_size = le64_to_cpu(ondisk->image_size);
736         size = sizeof (struct ceph_snap_context);
737         size += snap_count * sizeof (header->snapc->snaps[0]);
738         header->snapc = kzalloc(size, GFP_KERNEL);
739         if (!header->snapc)
740                 goto out_err;
741
742         atomic_set(&header->snapc->nref, 1);
743         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
744         header->snapc->num_snaps = snap_count;
745         for (i = 0; i < snap_count; i++)
746                 header->snapc->snaps[i] =
747                         le64_to_cpu(ondisk->snaps[i].id);
748
749         return 0;
750
751 out_err:
752         kfree(header->snap_sizes);
753         header->snap_sizes = NULL;
754         kfree(header->snap_names);
755         header->snap_names = NULL;
756         kfree(header->object_prefix);
757         header->object_prefix = NULL;
758
759         return -ENOMEM;
760 }
761
762 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
763 {
764         struct rbd_snap *snap;
765
766         if (snap_id == CEPH_NOSNAP)
767                 return RBD_SNAP_HEAD_NAME;
768
769         list_for_each_entry(snap, &rbd_dev->snaps, node)
770                 if (snap_id == snap->id)
771                         return snap->name;
772
773         return NULL;
774 }
775
776 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
777 {
778
779         struct rbd_snap *snap;
780
781         list_for_each_entry(snap, &rbd_dev->snaps, node) {
782                 if (!strcmp(snap_name, snap->name)) {
783                         rbd_dev->spec->snap_id = snap->id;
784                         rbd_dev->mapping.size = snap->size;
785                         rbd_dev->mapping.features = snap->features;
786
787                         return 0;
788                 }
789         }
790
791         return -ENOENT;
792 }
793
794 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
795 {
796         int ret;
797
798         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
799                     sizeof (RBD_SNAP_HEAD_NAME))) {
800                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
801                 rbd_dev->mapping.size = rbd_dev->header.image_size;
802                 rbd_dev->mapping.features = rbd_dev->header.features;
803                 ret = 0;
804         } else {
805                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
806                 if (ret < 0)
807                         goto done;
808                 rbd_dev->mapping.read_only = true;
809         }
810         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811
812 done:
813         return ret;
814 }
815
816 static void rbd_header_free(struct rbd_image_header *header)
817 {
818         kfree(header->object_prefix);
819         header->object_prefix = NULL;
820         kfree(header->snap_sizes);
821         header->snap_sizes = NULL;
822         kfree(header->snap_names);
823         header->snap_names = NULL;
824         ceph_put_snap_context(header->snapc);
825         header->snapc = NULL;
826 }
827
828 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
829 {
830         char *name;
831         u64 segment;
832         int ret;
833
834         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
835         if (!name)
836                 return NULL;
837         segment = offset >> rbd_dev->header.obj_order;
838         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
839                         rbd_dev->header.object_prefix, segment);
840         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
841                 pr_err("error formatting segment name for #%llu (%d)\n",
842                         segment, ret);
843                 kfree(name);
844                 name = NULL;
845         }
846
847         return name;
848 }
849
850 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
851 {
852         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
853
854         return offset & (segment_size - 1);
855 }
856
857 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
858                                 u64 offset, u64 length)
859 {
860         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
861
862         offset &= segment_size - 1;
863
864         rbd_assert(length <= U64_MAX - offset);
865         if (offset + length > segment_size)
866                 length = segment_size - offset;
867
868         return length;
869 }
870
871 /*
872  * returns the size of an object in the image
873  */
874 static u64 rbd_obj_bytes(struct rbd_image_header *header)
875 {
876         return 1 << header->obj_order;
877 }
878
879 /*
880  * bio helpers
881  */
882
883 static void bio_chain_put(struct bio *chain)
884 {
885         struct bio *tmp;
886
887         while (chain) {
888                 tmp = chain;
889                 chain = chain->bi_next;
890                 bio_put(tmp);
891         }
892 }
893
894 /*
895  * zeros a bio chain, starting at specific offset
896  */
897 static void zero_bio_chain(struct bio *chain, int start_ofs)
898 {
899         struct bio_vec *bv;
900         unsigned long flags;
901         void *buf;
902         int i;
903         int pos = 0;
904
905         while (chain) {
906                 bio_for_each_segment(bv, chain, i) {
907                         if (pos + bv->bv_len > start_ofs) {
908                                 int remainder = max(start_ofs - pos, 0);
909                                 buf = bvec_kmap_irq(bv, &flags);
910                                 memset(buf + remainder, 0,
911                                        bv->bv_len - remainder);
912                                 bvec_kunmap_irq(buf, &flags);
913                         }
914                         pos += bv->bv_len;
915                 }
916
917                 chain = chain->bi_next;
918         }
919 }
920
921 /*
922  * Clone a portion of a bio, starting at the given byte offset
923  * and continuing for the number of bytes indicated.
924  */
925 static struct bio *bio_clone_range(struct bio *bio_src,
926                                         unsigned int offset,
927                                         unsigned int len,
928                                         gfp_t gfpmask)
929 {
930         struct bio_vec *bv;
931         unsigned int resid;
932         unsigned short idx;
933         unsigned int voff;
934         unsigned short end_idx;
935         unsigned short vcnt;
936         struct bio *bio;
937
938         /* Handle the easy case for the caller */
939
940         if (!offset && len == bio_src->bi_size)
941                 return bio_clone(bio_src, gfpmask);
942
943         if (WARN_ON_ONCE(!len))
944                 return NULL;
945         if (WARN_ON_ONCE(len > bio_src->bi_size))
946                 return NULL;
947         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
948                 return NULL;
949
950         /* Find first affected segment... */
951
952         resid = offset;
953         __bio_for_each_segment(bv, bio_src, idx, 0) {
954                 if (resid < bv->bv_len)
955                         break;
956                 resid -= bv->bv_len;
957         }
958         voff = resid;
959
960         /* ...and the last affected segment */
961
962         resid += len;
963         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
964                 if (resid <= bv->bv_len)
965                         break;
966                 resid -= bv->bv_len;
967         }
968         vcnt = end_idx - idx + 1;
969
970         /* Build the clone */
971
972         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
973         if (!bio)
974                 return NULL;    /* ENOMEM */
975
976         bio->bi_bdev = bio_src->bi_bdev;
977         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
978         bio->bi_rw = bio_src->bi_rw;
979         bio->bi_flags |= 1 << BIO_CLONED;
980
981         /*
982          * Copy over our part of the bio_vec, then update the first
983          * and last (or only) entries.
984          */
985         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
986                         vcnt * sizeof (struct bio_vec));
987         bio->bi_io_vec[0].bv_offset += voff;
988         if (vcnt > 1) {
989                 bio->bi_io_vec[0].bv_len -= voff;
990                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
991         } else {
992                 bio->bi_io_vec[0].bv_len = len;
993         }
994
995         bio->bi_vcnt = vcnt;
996         bio->bi_size = len;
997         bio->bi_idx = 0;
998
999         return bio;
1000 }
1001
1002 /*
1003  * Clone a portion of a bio chain, starting at the given byte offset
1004  * into the first bio in the source chain and continuing for the
1005  * number of bytes indicated.  The result is another bio chain of
1006  * exactly the given length, or a null pointer on error.
1007  *
1008  * The bio_src and offset parameters are both in-out.  On entry they
1009  * refer to the first source bio and the offset into that bio where
1010  * the start of data to be cloned is located.
1011  *
1012  * On return, bio_src is updated to refer to the bio in the source
1013  * chain that contains first un-cloned byte, and *offset will
1014  * contain the offset of that byte within that bio.
1015  */
1016 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1017                                         unsigned int *offset,
1018                                         unsigned int len,
1019                                         gfp_t gfpmask)
1020 {
1021         struct bio *bi = *bio_src;
1022         unsigned int off = *offset;
1023         struct bio *chain = NULL;
1024         struct bio **end;
1025
1026         /* Build up a chain of clone bios up to the limit */
1027
1028         if (!bi || off >= bi->bi_size || !len)
1029                 return NULL;            /* Nothing to clone */
1030
1031         end = &chain;
1032         while (len) {
1033                 unsigned int bi_size;
1034                 struct bio *bio;
1035
1036                 if (!bi) {
1037                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1038                         goto out_err;   /* EINVAL; ran out of bio's */
1039                 }
1040                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1041                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1042                 if (!bio)
1043                         goto out_err;   /* ENOMEM */
1044
1045                 *end = bio;
1046                 end = &bio->bi_next;
1047
1048                 off += bi_size;
1049                 if (off == bi->bi_size) {
1050                         bi = bi->bi_next;
1051                         off = 0;
1052                 }
1053                 len -= bi_size;
1054         }
1055         *bio_src = bi;
1056         *offset = off;
1057
1058         return chain;
1059 out_err:
1060         bio_chain_put(chain);
1061
1062         return NULL;
1063 }
1064
1065 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1066 {
1067         kref_get(&obj_request->kref);
1068 }
1069
1070 static void rbd_obj_request_destroy(struct kref *kref);
1071 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1072 {
1073         rbd_assert(obj_request != NULL);
1074         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1075 }
1076
1077 static void rbd_img_request_get(struct rbd_img_request *img_request)
1078 {
1079         kref_get(&img_request->kref);
1080 }
1081
1082 static void rbd_img_request_destroy(struct kref *kref);
1083 static void rbd_img_request_put(struct rbd_img_request *img_request)
1084 {
1085         rbd_assert(img_request != NULL);
1086         kref_put(&img_request->kref, rbd_img_request_destroy);
1087 }
1088
1089 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1090                                         struct rbd_obj_request *obj_request)
1091 {
1092         rbd_assert(obj_request->img_request == NULL);
1093
1094         rbd_obj_request_get(obj_request);
1095         obj_request->img_request = img_request;
1096         obj_request->which = img_request->obj_request_count;
1097         rbd_assert(obj_request->which != BAD_WHICH);
1098         img_request->obj_request_count++;
1099         list_add_tail(&obj_request->links, &img_request->obj_requests);
1100 }
1101
1102 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1103                                         struct rbd_obj_request *obj_request)
1104 {
1105         rbd_assert(obj_request->which != BAD_WHICH);
1106
1107         list_del(&obj_request->links);
1108         rbd_assert(img_request->obj_request_count > 0);
1109         img_request->obj_request_count--;
1110         rbd_assert(obj_request->which == img_request->obj_request_count);
1111         obj_request->which = BAD_WHICH;
1112         rbd_assert(obj_request->img_request == img_request);
1113         obj_request->img_request = NULL;
1114         obj_request->callback = NULL;
1115         rbd_obj_request_put(obj_request);
1116 }
1117
1118 static bool obj_request_type_valid(enum obj_request_type type)
1119 {
1120         switch (type) {
1121         case OBJ_REQUEST_NODATA:
1122         case OBJ_REQUEST_BIO:
1123         case OBJ_REQUEST_PAGES:
1124                 return true;
1125         default:
1126                 return false;
1127         }
1128 }
1129
1130 struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1131 {
1132         struct ceph_osd_req_op *op;
1133         va_list args;
1134         size_t size;
1135
1136         op = kzalloc(sizeof (*op), GFP_NOIO);
1137         if (!op)
1138                 return NULL;
1139         op->op = opcode;
1140         va_start(args, opcode);
1141         switch (opcode) {
1142         case CEPH_OSD_OP_READ:
1143         case CEPH_OSD_OP_WRITE:
1144                 /* rbd_osd_req_op_create(READ, offset, length) */
1145                 /* rbd_osd_req_op_create(WRITE, offset, length) */
1146                 op->extent.offset = va_arg(args, u64);
1147                 op->extent.length = va_arg(args, u64);
1148                 if (opcode == CEPH_OSD_OP_WRITE)
1149                         op->payload_len = op->extent.length;
1150                 break;
1151         case CEPH_OSD_OP_CALL:
1152                 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1153                 op->cls.class_name = va_arg(args, char *);
1154                 size = strlen(op->cls.class_name);
1155                 rbd_assert(size <= (size_t) U8_MAX);
1156                 op->cls.class_len = size;
1157                 op->payload_len = size;
1158
1159                 op->cls.method_name = va_arg(args, char *);
1160                 size = strlen(op->cls.method_name);
1161                 rbd_assert(size <= (size_t) U8_MAX);
1162                 op->cls.method_len = size;
1163                 op->payload_len += size;
1164
1165                 op->cls.argc = 0;
1166                 op->cls.indata = va_arg(args, void *);
1167                 size = va_arg(args, size_t);
1168                 rbd_assert(size <= (size_t) U32_MAX);
1169                 op->cls.indata_len = (u32) size;
1170                 op->payload_len += size;
1171                 break;
1172         case CEPH_OSD_OP_NOTIFY_ACK:
1173         case CEPH_OSD_OP_WATCH:
1174                 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1175                 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1176                 op->watch.cookie = va_arg(args, u64);
1177                 op->watch.ver = va_arg(args, u64);
1178                 op->watch.ver = cpu_to_le64(op->watch.ver);
1179                 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1180                         op->watch.flag = (u8) 1;
1181                 break;
1182         default:
1183                 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1184                 kfree(op);
1185                 op = NULL;
1186                 break;
1187         }
1188         va_end(args);
1189
1190         return op;
1191 }
1192
/* Free an op allocated by rbd_osd_req_op_create() */
static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
        void *mem = op;

        kfree(mem);
}
1197
1198 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1199                                 struct rbd_obj_request *obj_request)
1200 {
1201         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1202 }
1203
1204 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1205 {
1206         if (img_request->callback)
1207                 img_request->callback(img_request);
1208         else
1209                 rbd_img_request_put(img_request);
1210 }
1211
1212 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1213
1214 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1215 {
1216         return wait_for_completion_interruptible(&obj_request->completion);
1217 }
1218
1219 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request,
1220                                 struct ceph_osd_op *op)
1221 {
1222         atomic_set(&obj_request->done, 1);
1223 }
1224
1225 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1226 {
1227         if (obj_request->callback)
1228                 obj_request->callback(obj_request);
1229         else
1230                 complete_all(&obj_request->completion);
1231 }
1232
1233 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
1234                                 struct ceph_osd_op *op)
1235 {
1236         u64 xferred;
1237
1238         /*
1239          * We support a 64-bit length, but ultimately it has to be
1240          * passed to blk_end_request(), which takes an unsigned int.
1241          */
1242         xferred = le64_to_cpu(op->extent.length);
1243         rbd_assert(xferred < (u64) UINT_MAX);
1244         if (obj_request->result == (s32) -ENOENT) {
1245                 zero_bio_chain(obj_request->bio_list, 0);
1246                 obj_request->result = 0;
1247         } else if (xferred < obj_request->length && !obj_request->result) {
1248                 zero_bio_chain(obj_request->bio_list, xferred);
1249                 xferred = obj_request->length;
1250         }
1251         obj_request->xferred = xferred;
1252         atomic_set(&obj_request->done, 1);
1253 }
1254
1255 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
1256                                 struct ceph_osd_op *op)
1257 {
1258         obj_request->xferred = le64_to_cpu(op->extent.length);
1259         atomic_set(&obj_request->done, 1);
1260 }
1261
1262 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1263                                 struct ceph_msg *msg)
1264 {
1265         struct rbd_obj_request *obj_request = osd_req->r_priv;
1266         struct ceph_osd_reply_head *reply_head;
1267         struct ceph_osd_op *op;
1268         u32 num_ops;
1269         u16 opcode;
1270
1271         rbd_assert(osd_req == obj_request->osd_req);
1272         rbd_assert(!!obj_request->img_request ^
1273                                 (obj_request->which == BAD_WHICH));
1274
1275         obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
1276         reply_head = msg->front.iov_base;
1277         obj_request->result = (s32) le32_to_cpu(reply_head->result);
1278         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1279
1280         num_ops = le32_to_cpu(reply_head->num_ops);
1281         WARN_ON(num_ops != 1);  /* For now */
1282
1283         op = &reply_head->ops[0];
1284         opcode = le16_to_cpu(op->op);
1285         switch (opcode) {
1286         case CEPH_OSD_OP_READ:
1287                 rbd_osd_read_callback(obj_request, op);
1288                 break;
1289         case CEPH_OSD_OP_WRITE:
1290                 rbd_osd_write_callback(obj_request, op);
1291                 break;
1292         case CEPH_OSD_OP_CALL:
1293         case CEPH_OSD_OP_NOTIFY_ACK:
1294         case CEPH_OSD_OP_WATCH:
1295                 rbd_osd_trivial_callback(obj_request, op);
1296                 break;
1297         default:
1298                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1299                         obj_request->object_name, (unsigned short) opcode);
1300                 break;
1301         }
1302
1303         if (atomic_read(&obj_request->done))
1304                 rbd_obj_request_complete(obj_request);
1305 }
1306
1307 static struct ceph_osd_request *rbd_osd_req_create(
1308                                         struct rbd_device *rbd_dev,
1309                                         bool write_request,
1310                                         struct rbd_obj_request *obj_request,
1311                                         struct ceph_osd_req_op *op)
1312 {
1313         struct rbd_img_request *img_request = obj_request->img_request;
1314         struct ceph_snap_context *snapc = NULL;
1315         struct ceph_osd_client *osdc;
1316         struct ceph_osd_request *osd_req;
1317         struct timespec now;
1318         struct timespec *mtime;
1319         u64 snap_id = CEPH_NOSNAP;
1320         u64 offset = obj_request->offset;
1321         u64 length = obj_request->length;
1322
1323         if (img_request) {
1324                 rbd_assert(img_request->write_request == write_request);
1325                 if (img_request->write_request)
1326                         snapc = img_request->snapc;
1327                 else
1328                         snap_id = img_request->snap_id;
1329         }
1330
1331         /* Allocate and initialize the request, for the single op */
1332
1333         osdc = &rbd_dev->rbd_client->client->osdc;
1334         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1335         if (!osd_req)
1336                 return NULL;    /* ENOMEM */
1337
1338         rbd_assert(obj_request_type_valid(obj_request->type));
1339         switch (obj_request->type) {
1340         case OBJ_REQUEST_NODATA:
1341                 break;          /* Nothing to do */
1342         case OBJ_REQUEST_BIO:
1343                 rbd_assert(obj_request->bio_list != NULL);
1344                 osd_req->r_bio = obj_request->bio_list;
1345                 bio_get(osd_req->r_bio);
1346                 /* osd client requires "num pages" even for bio */
1347                 osd_req->r_num_pages = calc_pages_for(offset, length);
1348                 break;
1349         case OBJ_REQUEST_PAGES:
1350                 osd_req->r_pages = obj_request->pages;
1351                 osd_req->r_num_pages = obj_request->page_count;
1352                 osd_req->r_page_alignment = offset & ~PAGE_MASK;
1353                 break;
1354         }
1355
1356         if (write_request) {
1357                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1358                 now = CURRENT_TIME;
1359                 mtime = &now;
1360         } else {
1361                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1362                 mtime = NULL;   /* not needed for reads */
1363                 offset = 0;     /* These are not used... */
1364                 length = 0;     /* ...for osd read requests */
1365         }
1366
1367         osd_req->r_callback = rbd_osd_req_callback;
1368         osd_req->r_priv = obj_request;
1369
1370         osd_req->r_oid_len = strlen(obj_request->object_name);
1371         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1372         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1373
1374         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1375
1376         /* osd_req will get its own reference to snapc (if non-null) */
1377
1378         ceph_osdc_build_request(osd_req, offset, length, 1, op,
1379                                 snapc, snap_id, mtime);
1380
1381         return osd_req;
1382 }
1383
/* Drop our reference on an osd request */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        struct ceph_osd_request *req = osd_req;

        ceph_osdc_put_request(req);
}
1388
1389 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1390
1391 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1392                                                 u64 offset, u64 length,
1393                                                 enum obj_request_type type)
1394 {
1395         struct rbd_obj_request *obj_request;
1396         size_t size;
1397         char *name;
1398
1399         rbd_assert(obj_request_type_valid(type));
1400
1401         size = strlen(object_name) + 1;
1402         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1403         if (!obj_request)
1404                 return NULL;
1405
1406         name = (char *)(obj_request + 1);
1407         obj_request->object_name = memcpy(name, object_name, size);
1408         obj_request->offset = offset;
1409         obj_request->length = length;
1410         obj_request->which = BAD_WHICH;
1411         obj_request->type = type;
1412         INIT_LIST_HEAD(&obj_request->links);
1413         atomic_set(&obj_request->done, 0);
1414         init_completion(&obj_request->completion);
1415         kref_init(&obj_request->kref);
1416
1417         return obj_request;
1418 }
1419
1420 static void rbd_obj_request_destroy(struct kref *kref)
1421 {
1422         struct rbd_obj_request *obj_request;
1423
1424         obj_request = container_of(kref, struct rbd_obj_request, kref);
1425
1426         rbd_assert(obj_request->img_request == NULL);
1427         rbd_assert(obj_request->which == BAD_WHICH);
1428
1429         if (obj_request->osd_req)
1430                 rbd_osd_req_destroy(obj_request->osd_req);
1431
1432         rbd_assert(obj_request_type_valid(obj_request->type));
1433         switch (obj_request->type) {
1434         case OBJ_REQUEST_NODATA:
1435                 break;          /* Nothing to do */
1436         case OBJ_REQUEST_BIO:
1437                 if (obj_request->bio_list)
1438                         bio_chain_put(obj_request->bio_list);
1439                 break;
1440         case OBJ_REQUEST_PAGES:
1441                 if (obj_request->pages)
1442                         ceph_release_page_vector(obj_request->pages,
1443                                                 obj_request->page_count);
1444                 break;
1445         }
1446
1447         kfree(obj_request);
1448 }
1449
1450 /*
1451  * Caller is responsible for filling in the list of object requests
1452  * that comprises the image request, and the Linux request pointer
1453  * (if there is one).
1454  */
1455 struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
1456                                         u64 offset, u64 length,
1457                                         bool write_request)
1458 {
1459         struct rbd_img_request *img_request;
1460         struct ceph_snap_context *snapc = NULL;
1461
1462         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1463         if (!img_request)
1464                 return NULL;
1465
1466         if (write_request) {
1467                 down_read(&rbd_dev->header_rwsem);
1468                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1469                 up_read(&rbd_dev->header_rwsem);
1470                 if (WARN_ON(!snapc)) {
1471                         kfree(img_request);
1472                         return NULL;    /* Shouldn't happen */
1473                 }
1474         }
1475
1476         img_request->rq = NULL;
1477         img_request->rbd_dev = rbd_dev;
1478         img_request->offset = offset;
1479         img_request->length = length;
1480         img_request->write_request = write_request;
1481         if (write_request)
1482                 img_request->snapc = snapc;
1483         else
1484                 img_request->snap_id = rbd_dev->spec->snap_id;
1485         spin_lock_init(&img_request->completion_lock);
1486         img_request->next_completion = 0;
1487         img_request->callback = NULL;
1488         img_request->obj_request_count = 0;
1489         INIT_LIST_HEAD(&img_request->obj_requests);
1490         kref_init(&img_request->kref);
1491
1492         rbd_img_request_get(img_request);       /* Avoid a warning */
1493         rbd_img_request_put(img_request);       /* TEMPORARY */
1494
1495         return img_request;
1496 }
1497
1498 static void rbd_img_request_destroy(struct kref *kref)
1499 {
1500         struct rbd_img_request *img_request;
1501         struct rbd_obj_request *obj_request;
1502         struct rbd_obj_request *next_obj_request;
1503
1504         img_request = container_of(kref, struct rbd_img_request, kref);
1505
1506         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1507                 rbd_img_obj_request_del(img_request, obj_request);
1508         rbd_assert(img_request->obj_request_count == 0);
1509
1510         if (img_request->write_request)
1511                 ceph_put_snap_context(img_request->snapc);
1512
1513         kfree(img_request);
1514 }
1515
1516 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1517                                         struct bio *bio_list)
1518 {
1519         struct rbd_device *rbd_dev = img_request->rbd_dev;
1520         struct rbd_obj_request *obj_request = NULL;
1521         struct rbd_obj_request *next_obj_request;
1522         unsigned int bio_offset;
1523         u64 image_offset;
1524         u64 resid;
1525         u16 opcode;
1526
1527         opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
1528                                               : CEPH_OSD_OP_READ;
1529         bio_offset = 0;
1530         image_offset = img_request->offset;
1531         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1532         resid = img_request->length;
1533         while (resid) {
1534                 const char *object_name;
1535                 unsigned int clone_size;
1536                 struct ceph_osd_req_op *op;
1537                 u64 offset;
1538                 u64 length;
1539
1540                 object_name = rbd_segment_name(rbd_dev, image_offset);
1541                 if (!object_name)
1542                         goto out_unwind;
1543                 offset = rbd_segment_offset(rbd_dev, image_offset);
1544                 length = rbd_segment_length(rbd_dev, image_offset, resid);
1545                 obj_request = rbd_obj_request_create(object_name,
1546                                                 offset, length,
1547                                                 OBJ_REQUEST_BIO);
1548                 kfree(object_name);     /* object request has its own copy */
1549                 if (!obj_request)
1550                         goto out_unwind;
1551
1552                 rbd_assert(length <= (u64) UINT_MAX);
1553                 clone_size = (unsigned int) length;
1554                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1555                                                 &bio_offset, clone_size,
1556                                                 GFP_ATOMIC);
1557                 if (!obj_request->bio_list)
1558                         goto out_partial;
1559
1560                 /*
1561                  * Build up the op to use in building the osd
1562                  * request.  Note that the contents of the op are
1563                  * copied by rbd_osd_req_create().
1564                  */
1565                 op = rbd_osd_req_op_create(opcode, offset, length);
1566                 if (!op)
1567                         goto out_partial;
1568                 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1569                                                 img_request->write_request,
1570                                                 obj_request, op);
1571                 rbd_osd_req_op_destroy(op);
1572                 if (!obj_request->osd_req)
1573                         goto out_partial;
1574                 /* status and version are initially zero-filled */
1575
1576                 rbd_img_obj_request_add(img_request, obj_request);
1577
1578                 image_offset += length;
1579                 resid -= length;
1580         }
1581
1582         return 0;
1583
1584 out_partial:
1585         rbd_obj_request_put(obj_request);
1586 out_unwind:
1587         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1588                 rbd_obj_request_put(obj_request);
1589
1590         return -ENOMEM;
1591 }
1592
1593 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1594 {
1595         struct rbd_img_request *img_request;
1596         u32 which = obj_request->which;
1597         bool more = true;
1598
1599         img_request = obj_request->img_request;
1600         rbd_assert(img_request != NULL);
1601         rbd_assert(img_request->rq != NULL);
1602         rbd_assert(which != BAD_WHICH);
1603         rbd_assert(which < img_request->obj_request_count);
1604         rbd_assert(which >= img_request->next_completion);
1605
1606         spin_lock_irq(&img_request->completion_lock);
1607         if (which != img_request->next_completion)
1608                 goto out;
1609
1610         for_each_obj_request_from(img_request, obj_request) {
1611                 unsigned int xferred;
1612                 int result;
1613
1614                 rbd_assert(more);
1615                 rbd_assert(which < img_request->obj_request_count);
1616
1617                 if (!atomic_read(&obj_request->done))
1618                         break;
1619
1620                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1621                 xferred = (unsigned int) obj_request->xferred;
1622                 result = (int) obj_request->result;
1623                 if (result)
1624                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1625                                 img_request->write_request ? "write" : "read",
1626                                 result, xferred);
1627
1628                 more = blk_end_request(img_request->rq, result, xferred);
1629                 which++;
1630         }
1631         rbd_assert(more ^ (which == img_request->obj_request_count));
1632         img_request->next_completion = which;
1633 out:
1634         spin_unlock_irq(&img_request->completion_lock);
1635
1636         if (!more)
1637                 rbd_img_request_complete(img_request);
1638 }
1639
1640 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1641 {
1642         struct rbd_device *rbd_dev = img_request->rbd_dev;
1643         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1644         struct rbd_obj_request *obj_request;
1645
1646         for_each_obj_request(img_request, obj_request) {
1647                 int ret;
1648
1649                 obj_request->callback = rbd_img_obj_callback;
1650                 ret = rbd_obj_request_submit(osdc, obj_request);
1651                 if (ret)
1652                         return ret;
1653                 /*
1654                  * The image request has its own reference to each
1655                  * of its object requests, so we can safely drop the
1656                  * initial one here.
1657                  */
1658                 rbd_obj_request_put(obj_request);
1659         }
1660
1661         return 0;
1662 }
1663
1664 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1665                                    u64 ver, u64 notify_id)
1666 {
1667         struct rbd_obj_request *obj_request;
1668         struct ceph_osd_req_op *op;
1669         struct ceph_osd_client *osdc;
1670         int ret;
1671
1672         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1673                                                         OBJ_REQUEST_NODATA);
1674         if (!obj_request)
1675                 return -ENOMEM;
1676
1677         ret = -ENOMEM;
1678         op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1679         if (!op)
1680                 goto out;
1681         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1682                                                 obj_request, op);
1683         rbd_osd_req_op_destroy(op);
1684         if (!obj_request->osd_req)
1685                 goto out;
1686
1687         osdc = &rbd_dev->rbd_client->client->osdc;
1688         obj_request->callback = rbd_obj_request_put;
1689         ret = rbd_obj_request_submit(osdc, obj_request);
1690 out:
1691         if (ret)
1692                 rbd_obj_request_put(obj_request);
1693
1694         return ret;
1695 }
1696
1697 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1698 {
1699         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1700         u64 hver;
1701         int rc;
1702
1703         if (!rbd_dev)
1704                 return;
1705
1706         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1707                 rbd_dev->header_name, (unsigned long long) notify_id,
1708                 (unsigned int) opcode);
1709         rc = rbd_dev_refresh(rbd_dev, &hver);
1710         if (rc)
1711                 rbd_warn(rbd_dev, "got notification but failed to "
1712                            " update snaps: %d\n", rc);
1713
1714         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1715 }
1716
1717 /*
1718  * Request sync osd watch/unwatch.  The value of "start" determines
1719  * whether a watch request is being initiated or torn down.
1720  */
1721 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1722 {
1723         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1724         struct rbd_obj_request *obj_request;
1725         struct ceph_osd_req_op *op;
1726         int ret;
1727
1728         rbd_assert(start ^ !!rbd_dev->watch_event);
1729         rbd_assert(start ^ !!rbd_dev->watch_request);
1730
1731         if (start) {
1732                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
1733                                                 &rbd_dev->watch_event);
1734                 if (ret < 0)
1735                         return ret;
1736                 rbd_assert(rbd_dev->watch_event != NULL);
1737         }
1738
1739         ret = -ENOMEM;
1740         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1741                                                         OBJ_REQUEST_NODATA);
1742         if (!obj_request)
1743                 goto out_cancel;
1744
1745         op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1746                                 rbd_dev->watch_event->cookie,
1747                                 rbd_dev->header.obj_version, start);
1748         if (!op)
1749                 goto out_cancel;
1750         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
1751                                                         obj_request, op);
1752         rbd_osd_req_op_destroy(op);
1753         if (!obj_request->osd_req)
1754                 goto out_cancel;
1755
1756         if (start)
1757                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1758         else
1759                 ceph_osdc_unregister_linger_request(osdc,
1760                                         rbd_dev->watch_request->osd_req);
1761         ret = rbd_obj_request_submit(osdc, obj_request);
1762         if (ret)
1763                 goto out_cancel;
1764         ret = rbd_obj_request_wait(obj_request);
1765         if (ret)
1766                 goto out_cancel;
1767         ret = obj_request->result;
1768         if (ret)
1769                 goto out_cancel;
1770
1771         /*
1772          * A watch request is set to linger, so the underlying osd
1773          * request won't go away until we unregister it.  We retain
1774          * a pointer to the object request during that time (in
1775          * rbd_dev->watch_request), so we'll keep a reference to
1776          * it.  We'll drop that reference (below) after we've
1777          * unregistered it.
1778          */
1779         if (start) {
1780                 rbd_dev->watch_request = obj_request;
1781
1782                 return 0;
1783         }
1784
1785         /* We have successfully torn down the watch request */
1786
1787         rbd_obj_request_put(rbd_dev->watch_request);
1788         rbd_dev->watch_request = NULL;
1789 out_cancel:
1790         /* Cancel the event if we're tearing down, or on error */
1791         ceph_osdc_cancel_event(rbd_dev->watch_event);
1792         rbd_dev->watch_event = NULL;
1793         if (obj_request)
1794                 rbd_obj_request_put(obj_request);
1795
1796         return ret;
1797 }
1798
1799 /*
1800  * Synchronous osd object method call
1801  */
1802 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1803                              const char *object_name,
1804                              const char *class_name,
1805                              const char *method_name,
1806                              const char *outbound,
1807                              size_t outbound_size,
1808                              char *inbound,
1809                              size_t inbound_size,
1810                              u64 *version)
1811 {
1812         struct rbd_obj_request *obj_request;
1813         struct ceph_osd_client *osdc;
1814         struct ceph_osd_req_op *op;
1815         struct page **pages;
1816         u32 page_count;
1817         int ret;
1818
1819         /*
1820          * Method calls are ultimately read operations but they
1821          * don't involve object data (so no offset or length).
1822          * The result should placed into the inbound buffer
1823          * provided.  They also supply outbound data--parameters for
1824          * the object method.  Currently if this is present it will
1825          * be a snapshot id.
1826          */
1827         page_count = (u32) calc_pages_for(0, inbound_size);
1828         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1829         if (IS_ERR(pages))
1830                 return PTR_ERR(pages);
1831
1832         ret = -ENOMEM;
1833         obj_request = rbd_obj_request_create(object_name, 0, 0,
1834                                                         OBJ_REQUEST_PAGES);
1835         if (!obj_request)
1836                 goto out;
1837
1838         obj_request->pages = pages;
1839         obj_request->page_count = page_count;
1840
1841         op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1842                                         method_name, outbound, outbound_size);
1843         if (!op)
1844                 goto out;
1845         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1846                                                 obj_request, op);
1847         rbd_osd_req_op_destroy(op);
1848         if (!obj_request->osd_req)
1849                 goto out;
1850
1851         osdc = &rbd_dev->rbd_client->client->osdc;
1852         ret = rbd_obj_request_submit(osdc, obj_request);
1853         if (ret)
1854                 goto out;
1855         ret = rbd_obj_request_wait(obj_request);
1856         if (ret)
1857                 goto out;
1858
1859         ret = obj_request->result;
1860         if (ret < 0)
1861                 goto out;
1862         ret = ceph_copy_from_page_vector(pages, inbound, 0,
1863                                         obj_request->xferred);
1864         if (version)
1865                 *version = obj_request->version;
1866 out:
1867         if (obj_request)
1868                 rbd_obj_request_put(obj_request);
1869         else
1870                 ceph_release_page_vector(pages, page_count);
1871
1872         return ret;
1873 }
1874
1875 static void rbd_request_fn(struct request_queue *q)
1876 {
1877         struct rbd_device *rbd_dev = q->queuedata;
1878         bool read_only = rbd_dev->mapping.read_only;
1879         struct request *rq;
1880         int result;
1881
1882         while ((rq = blk_fetch_request(q))) {
1883                 bool write_request = rq_data_dir(rq) == WRITE;
1884                 struct rbd_img_request *img_request;
1885                 u64 offset;
1886                 u64 length;
1887
1888                 /* Ignore any non-FS requests that filter through. */
1889
1890                 if (rq->cmd_type != REQ_TYPE_FS) {
1891                         __blk_end_request_all(rq, 0);
1892                         continue;
1893                 }
1894
1895                 spin_unlock_irq(q->queue_lock);
1896
1897                 /* Disallow writes to a read-only device */
1898
1899                 if (write_request) {
1900                         result = -EROFS;
1901                         if (read_only)
1902                                 goto end_request;
1903                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1904                 }
1905
1906                 /*
1907                  * Quit early if the mapped snapshot no longer
1908                  * exists.  It's still possible the snapshot will
1909                  * have disappeared by the time our request arrives
1910                  * at the osd, but there's no sense in sending it if
1911                  * we already know.
1912                  */
1913                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
1914                         dout("request for non-existent snapshot");
1915                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1916                         result = -ENXIO;
1917                         goto end_request;
1918                 }
1919
1920                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1921                 length = (u64) blk_rq_bytes(rq);
1922
1923                 result = -EINVAL;
1924                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1925                         goto end_request;       /* Shouldn't happen */
1926
1927                 result = -ENOMEM;
1928                 img_request = rbd_img_request_create(rbd_dev, offset, length,
1929                                                         write_request);
1930                 if (!img_request)
1931                         goto end_request;
1932
1933                 img_request->rq = rq;
1934
1935                 result = rbd_img_request_fill_bio(img_request, rq->bio);
1936                 if (!result)
1937                         result = rbd_img_request_submit(img_request);
1938                 if (result)
1939                         rbd_img_request_put(img_request);
1940 end_request:
1941                 spin_lock_irq(q->queue_lock);
1942                 if (result < 0) {
1943                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
1944                                 write_request ? "write" : "read", result);
1945                         __blk_end_request_all(rq, result);
1946                 }
1947         }
1948 }
1949
1950 /*
1951  * a queue callback. Makes sure that we don't create a bio that spans across
1952  * multiple osd objects. One exception would be with a single page bios,
1953  * which we handle later at bio_chain_clone_range()
1954  */
1955 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1956                           struct bio_vec *bvec)
1957 {
1958         struct rbd_device *rbd_dev = q->queuedata;
1959         sector_t sector_offset;
1960         sector_t sectors_per_obj;
1961         sector_t obj_sector_offset;
1962         int ret;
1963
1964         /*
1965          * Find how far into its rbd object the partition-relative
1966          * bio start sector is to offset relative to the enclosing
1967          * device.
1968          */
1969         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1970         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1971         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1972
1973         /*
1974          * Compute the number of bytes from that offset to the end
1975          * of the object.  Account for what's already used by the bio.
1976          */
1977         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1978         if (ret > bmd->bi_size)
1979                 ret -= bmd->bi_size;
1980         else
1981                 ret = 0;
1982
1983         /*
1984          * Don't send back more than was asked for.  And if the bio
1985          * was empty, let the whole thing through because:  "Note
1986          * that a block device *must* allow a single page to be
1987          * added to an empty bio."
1988          */
1989         rbd_assert(bvec->bv_len <= PAGE_SIZE);
1990         if (ret > (int) bvec->bv_len || !bmd->bi_size)
1991                 ret = (int) bvec->bv_len;
1992
1993         return ret;
1994 }
1995
1996 static void rbd_free_disk(struct rbd_device *rbd_dev)
1997 {
1998         struct gendisk *disk = rbd_dev->disk;
1999
2000         if (!disk)
2001                 return;
2002
2003         if (disk->flags & GENHD_FL_UP)
2004                 del_gendisk(disk);
2005         if (disk->queue)
2006                 blk_cleanup_queue(disk->queue);
2007         put_disk(disk);
2008 }
2009
2010 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2011                                 const char *object_name,
2012                                 u64 offset, u64 length,
2013                                 char *buf, u64 *version)
2014
2015 {
2016         struct ceph_osd_req_op *op;
2017         struct rbd_obj_request *obj_request;
2018         struct ceph_osd_client *osdc;
2019         struct page **pages = NULL;
2020         u32 page_count;
2021         int ret;
2022
2023         page_count = (u32) calc_pages_for(offset, length);
2024         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2025         if (IS_ERR(pages))
2026                 ret = PTR_ERR(pages);
2027
2028         ret = -ENOMEM;
2029         obj_request = rbd_obj_request_create(object_name, offset, length,
2030                                                         OBJ_REQUEST_PAGES);
2031         if (!obj_request)
2032                 goto out;
2033
2034         obj_request->pages = pages;
2035         obj_request->page_count = page_count;
2036
2037         op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2038         if (!op)
2039                 goto out;
2040         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2041                                                 obj_request, op);
2042         rbd_osd_req_op_destroy(op);
2043         if (!obj_request->osd_req)
2044                 goto out;
2045
2046         osdc = &rbd_dev->rbd_client->client->osdc;
2047         ret = rbd_obj_request_submit(osdc, obj_request);
2048         if (ret)
2049                 goto out;
2050         ret = rbd_obj_request_wait(obj_request);
2051         if (ret)
2052                 goto out;
2053
2054         ret = obj_request->result;
2055         if (ret < 0)
2056                 goto out;
2057         ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
2058         if (version)
2059                 *version = obj_request->version;
2060 out:
2061         if (obj_request)
2062                 rbd_obj_request_put(obj_request);
2063         else
2064                 ceph_release_page_vector(pages, page_count);
2065
2066         return ret;
2067 }
2068
2069 /*
2070  * Read the complete header for the given rbd device.
2071  *
2072  * Returns a pointer to a dynamically-allocated buffer containing
2073  * the complete and validated header.  Caller can pass the address
2074  * of a variable that will be filled in with the version of the
2075  * header object at the time it was read.
2076  *
2077  * Returns a pointer-coded errno if a failure occurs.
2078  */
2079 static struct rbd_image_header_ondisk *
2080 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2081 {
2082         struct rbd_image_header_ondisk *ondisk = NULL;
2083         u32 snap_count = 0;
2084         u64 names_size = 0;
2085         u32 want_count;
2086         int ret;
2087
2088         /*
2089          * The complete header will include an array of its 64-bit
2090          * snapshot ids, followed by the names of those snapshots as
2091          * a contiguous block of NUL-terminated strings.  Note that
2092          * the number of snapshots could change by the time we read
2093          * it in, in which case we re-read it.
2094          */
2095         do {
2096                 size_t size;
2097
2098                 kfree(ondisk);
2099
2100                 size = sizeof (*ondisk);
2101                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2102                 size += names_size;
2103                 ondisk = kmalloc(size, GFP_KERNEL);
2104                 if (!ondisk)
2105                         return ERR_PTR(-ENOMEM);
2106
2107                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2108                                        0, size,
2109                                        (char *) ondisk, version);
2110
2111                 if (ret < 0)
2112                         goto out_err;
2113                 if (WARN_ON((size_t) ret < size)) {
2114                         ret = -ENXIO;
2115                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2116                                 size, ret);
2117                         goto out_err;
2118                 }
2119                 if (!rbd_dev_ondisk_valid(ondisk)) {
2120                         ret = -ENXIO;
2121                         rbd_warn(rbd_dev, "invalid header");
2122                         goto out_err;
2123                 }
2124
2125                 names_size = le64_to_cpu(ondisk->snap_names_len);
2126                 want_count = snap_count;
2127                 snap_count = le32_to_cpu(ondisk->snap_count);
2128         } while (snap_count != want_count);
2129
2130         return ondisk;
2131
2132 out_err:
2133         kfree(ondisk);
2134
2135         return ERR_PTR(ret);
2136 }
2137
2138 /*
2139  * reload the ondisk the header
2140  */
2141 static int rbd_read_header(struct rbd_device *rbd_dev,
2142                            struct rbd_image_header *header)
2143 {
2144         struct rbd_image_header_ondisk *ondisk;
2145         u64 ver = 0;
2146         int ret;
2147
2148         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2149         if (IS_ERR(ondisk))
2150                 return PTR_ERR(ondisk);
2151         ret = rbd_header_from_disk(header, ondisk);
2152         if (ret >= 0)
2153                 header->obj_version = ver;
2154         kfree(ondisk);
2155
2156         return ret;
2157 }
2158
2159 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2160 {
2161         struct rbd_snap *snap;
2162         struct rbd_snap *next;
2163
2164         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2165                 rbd_remove_snap_dev(snap);
2166 }
2167
2168 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2169 {
2170         sector_t size;
2171
2172         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2173                 return;
2174
2175         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2176         dout("setting size to %llu sectors", (unsigned long long) size);
2177         rbd_dev->mapping.size = (u64) size;
2178         set_capacity(rbd_dev->disk, size);
2179 }
2180
2181 /*
2182  * only read the first part of the ondisk header, without the snaps info
2183  */
2184 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2185 {
2186         int ret;
2187         struct rbd_image_header h;
2188
2189         ret = rbd_read_header(rbd_dev, &h);
2190         if (ret < 0)
2191                 return ret;
2192
2193         down_write(&rbd_dev->header_rwsem);
2194
2195         /* Update image size, and check for resize of mapped image */
2196         rbd_dev->header.image_size = h.image_size;
2197         rbd_update_mapping_size(rbd_dev);
2198
2199         /* rbd_dev->header.object_prefix shouldn't change */
2200         kfree(rbd_dev->header.snap_sizes);
2201         kfree(rbd_dev->header.snap_names);
2202         /* osd requests may still refer to snapc */
2203         ceph_put_snap_context(rbd_dev->header.snapc);
2204
2205         if (hver)
2206                 *hver = h.obj_version;
2207         rbd_dev->header.obj_version = h.obj_version;
2208         rbd_dev->header.image_size = h.image_size;
2209         rbd_dev->header.snapc = h.snapc;
2210         rbd_dev->header.snap_names = h.snap_names;
2211         rbd_dev->header.snap_sizes = h.snap_sizes;
2212         /* Free the extra copy of the object prefix */
2213         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2214         kfree(h.object_prefix);
2215
2216         ret = rbd_dev_snaps_update(rbd_dev);
2217         if (!ret)
2218                 ret = rbd_dev_snaps_register(rbd_dev);
2219
2220         up_write(&rbd_dev->header_rwsem);
2221
2222         return ret;
2223 }
2224
2225 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2226 {
2227         int ret;
2228
2229         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2230         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2231         if (rbd_dev->image_format == 1)
2232                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2233         else
2234                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2235         mutex_unlock(&ctl_mutex);
2236
2237         return ret;
2238 }
2239
2240 static int rbd_init_disk(struct rbd_device *rbd_dev)
2241 {
2242         struct gendisk *disk;
2243         struct request_queue *q;
2244         u64 segment_size;
2245
2246         /* create gendisk info */
2247         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2248         if (!disk)
2249                 return -ENOMEM;
2250
2251         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2252                  rbd_dev->dev_id);
2253         disk->major = rbd_dev->major;
2254         disk->first_minor = 0;
2255         disk->fops = &rbd_bd_ops;
2256         disk->private_data = rbd_dev;
2257
2258         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2259         if (!q)
2260                 goto out_disk;
2261
2262         /* We use the default size, but let's be explicit about it. */
2263         blk_queue_physical_block_size(q, SECTOR_SIZE);
2264
2265         /* set io sizes to object size */
2266         segment_size = rbd_obj_bytes(&rbd_dev->header);
2267         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2268         blk_queue_max_segment_size(q, segment_size);
2269         blk_queue_io_min(q, segment_size);
2270         blk_queue_io_opt(q, segment_size);
2271
2272         blk_queue_merge_bvec(q, rbd_merge_bvec);
2273         disk->queue = q;
2274
2275         q->queuedata = rbd_dev;
2276
2277         rbd_dev->disk = disk;
2278
2279         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2280
2281         return 0;
2282 out_disk:
2283         put_disk(disk);
2284
2285         return -ENOMEM;
2286 }
2287
2288 /*
2289   sysfs
2290 */
2291
2292 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2293 {
2294         return container_of(dev, struct rbd_device, dev);
2295 }
2296
2297 static ssize_t rbd_size_show(struct device *dev,
2298                              struct device_attribute *attr, char *buf)
2299 {
2300         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2301         sector_t size;
2302
2303         down_read(&rbd_dev->header_rwsem);
2304         size = get_capacity(rbd_dev->disk);
2305         up_read(&rbd_dev->header_rwsem);
2306
2307         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2308 }
2309
2310 /*
2311  * Note this shows the features for whatever's mapped, which is not
2312  * necessarily the base image.
2313  */
2314 static ssize_t rbd_features_show(struct device *dev,
2315                              struct device_attribute *attr, char *buf)
2316 {
2317         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2318
2319         return sprintf(buf, "0x%016llx\n",
2320                         (unsigned long long) rbd_dev->mapping.features);
2321 }
2322
2323 static ssize_t rbd_major_show(struct device *dev,
2324                               struct device_attribute *attr, char *buf)
2325 {
2326         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2327
2328         return sprintf(buf, "%d\n", rbd_dev->major);
2329 }
2330
2331 static ssize_t rbd_client_id_show(struct device *dev,
2332                                   struct device_attribute *attr, char *buf)
2333 {
2334         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2335
2336         return sprintf(buf, "client%lld\n",
2337                         ceph_client_id(rbd_dev->rbd_client->client));
2338 }
2339
2340 static ssize_t rbd_pool_show(struct device *dev,
2341                              struct device_attribute *attr, char *buf)
2342 {
2343         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2344
2345         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2346 }
2347
2348 static ssize_t rbd_pool_id_show(struct device *dev,
2349                              struct device_attribute *attr, char *buf)
2350 {
2351         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2352
2353         return sprintf(buf, "%llu\n",
2354                 (unsigned long long) rbd_dev->spec->pool_id);
2355 }
2356
2357 static ssize_t rbd_name_show(struct device *dev,
2358                              struct device_attribute *attr, char *buf)
2359 {
2360         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2361
2362         if (rbd_dev->spec->image_name)
2363                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2364
2365         return sprintf(buf, "(unknown)\n");
2366 }
2367
2368 static ssize_t rbd_image_id_show(struct device *dev,
2369                              struct device_attribute *attr, char *buf)
2370 {
2371         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2372
2373         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2374 }
2375
2376 /*
2377  * Shows the name of the currently-mapped snapshot (or
2378  * RBD_SNAP_HEAD_NAME for the base image).
2379  */
2380 static ssize_t rbd_snap_show(struct device *dev,
2381                              struct device_attribute *attr,
2382                              char *buf)
2383 {
2384         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2385
2386         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2387 }
2388
2389 /*
2390  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2391  * for the parent image.  If there is no parent, simply shows
2392  * "(no parent image)".
2393  */
2394 static ssize_t rbd_parent_show(struct device *dev,
2395                              struct device_attribute *attr,
2396                              char *buf)
2397 {
2398         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2399         struct rbd_spec *spec = rbd_dev->parent_spec;
2400         int count;
2401         char *bufp = buf;
2402
2403         if (!spec)
2404                 return sprintf(buf, "(no parent image)\n");
2405
2406         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2407                         (unsigned long long) spec->pool_id, spec->pool_name);
2408         if (count < 0)
2409                 return count;
2410         bufp += count;
2411
2412         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2413                         spec->image_name ? spec->image_name : "(unknown)");
2414         if (count < 0)
2415                 return count;
2416         bufp += count;
2417
2418         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2419                         (unsigned long long) spec->snap_id, spec->snap_name);
2420         if (count < 0)
2421                 return count;
2422         bufp += count;
2423
2424         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2425         if (count < 0)
2426                 return count;
2427         bufp += count;
2428
2429         return (ssize_t) (bufp - buf);
2430 }
2431
2432 static ssize_t rbd_image_refresh(struct device *dev,
2433                                  struct device_attribute *attr,
2434                                  const char *buf,
2435                                  size_t size)
2436 {
2437         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2438         int ret;
2439
2440         ret = rbd_dev_refresh(rbd_dev, NULL);
2441
2442         return ret < 0 ? ret : size;
2443 }
2444
2445 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2446 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2447 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2448 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2449 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2450 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2451 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2452 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2453 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2454 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2455 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2456
2457 static struct attribute *rbd_attrs[] = {
2458         &dev_attr_size.attr,
2459         &dev_attr_features.attr,
2460         &dev_attr_major.attr,
2461         &dev_attr_client_id.attr,
2462         &dev_attr_pool.attr,
2463         &dev_attr_pool_id.attr,
2464         &dev_attr_name.attr,
2465         &dev_attr_image_id.attr,
2466         &dev_attr_current_snap.attr,
2467         &dev_attr_parent.attr,
2468         &dev_attr_refresh.attr,
2469         NULL
2470 };
2471
2472 static struct attribute_group rbd_attr_group = {
2473         .attrs = rbd_attrs,
2474 };
2475
2476 static const struct attribute_group *rbd_attr_groups[] = {
2477         &rbd_attr_group,
2478         NULL
2479 };
2480
/* Release callback for the rbd device type; intentionally empty */
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* Nothing to do here */
}
2484
2485 static struct device_type rbd_device_type = {
2486         .name           = "rbd",
2487         .groups         = rbd_attr_groups,
2488         .release        = rbd_sysfs_dev_release,
2489 };
2490
2491
2492 /*
2493   sysfs - snapshots
2494 */
2495
2496 static ssize_t rbd_snap_size_show(struct device *dev,
2497                                   struct device_attribute *attr,
2498                                   char *buf)
2499 {
2500         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2501
2502         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2503 }
2504
2505 static ssize_t rbd_snap_id_show(struct device *dev,
2506                                 struct device_attribute *attr,
2507                                 char *buf)
2508 {
2509         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2510
2511         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2512 }
2513
2514 static ssize_t rbd_snap_features_show(struct device *dev,
2515                                 struct device_attribute *attr,
2516                                 char *buf)
2517 {
2518         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2519
2520         return sprintf(buf, "0x%016llx\n",
2521                         (unsigned long long) snap->features);
2522 }
2523
2524 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2525 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2526 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2527
2528 static struct attribute *rbd_snap_attrs[] = {
2529         &dev_attr_snap_size.attr,
2530         &dev_attr_snap_id.attr,
2531         &dev_attr_snap_features.attr,
2532         NULL,
2533 };
2534
2535 static struct attribute_group rbd_snap_attr_group = {
2536         .attrs = rbd_snap_attrs,
2537 };
2538
2539 static void rbd_snap_dev_release(struct device *dev)
2540 {
2541         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2542         kfree(snap->name);
2543         kfree(snap);
2544 }
2545
2546 static const struct attribute_group *rbd_snap_attr_groups[] = {
2547         &rbd_snap_attr_group,
2548         NULL
2549 };
2550
2551 static struct device_type rbd_snap_device_type = {
2552         .groups         = rbd_snap_attr_groups,
2553         .release        = rbd_snap_dev_release,
2554 };
2555
2556 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2557 {
2558         kref_get(&spec->kref);
2559
2560         return spec;
2561 }
2562
2563 static void rbd_spec_free(struct kref *kref);
2564 static void rbd_spec_put(struct rbd_spec *spec)
2565 {
2566         if (spec)
2567                 kref_put(&spec->kref, rbd_spec_free);
2568 }
2569
2570 static struct rbd_spec *rbd_spec_alloc(void)
2571 {
2572         struct rbd_spec *spec;
2573
2574         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2575         if (!spec)
2576                 return NULL;
2577         kref_init(&spec->kref);
2578
2579         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2580
2581         return spec;
2582 }
2583
2584 static void rbd_spec_free(struct kref *kref)
2585 {
2586         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2587
2588         kfree(spec->pool_name);
2589         kfree(spec->image_id);
2590         kfree(spec->image_name);
2591         kfree(spec->snap_name);
2592         kfree(spec);
2593 }
2594
2595 struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2596                                 struct rbd_spec *spec)
2597 {
2598         struct rbd_device *rbd_dev;
2599
2600         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2601         if (!rbd_dev)
2602                 return NULL;
2603
2604         spin_lock_init(&rbd_dev->lock);
2605         rbd_dev->flags = 0;
2606         INIT_LIST_HEAD(&rbd_dev->node);
2607         INIT_LIST_HEAD(&rbd_dev->snaps);
2608         init_rwsem(&rbd_dev->header_rwsem);
2609
2610         rbd_dev->spec = spec;
2611         rbd_dev->rbd_client = rbdc;
2612
2613         /* Initialize the layout used for all rbd requests */
2614
2615         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2616         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2617         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2618         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2619
2620         return rbd_dev;
2621 }
2622
2623 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2624 {
2625         rbd_spec_put(rbd_dev->parent_spec);
2626         kfree(rbd_dev->header_name);
2627         rbd_put_client(rbd_dev->rbd_client);
2628         rbd_spec_put(rbd_dev->spec);
2629         kfree(rbd_dev);
2630 }
2631
2632 static bool rbd_snap_registered(struct rbd_snap *snap)
2633 {
2634         bool ret = snap->dev.type == &rbd_snap_device_type;
2635         bool reg = device_is_registered(&snap->dev);
2636
2637         rbd_assert(!ret ^ reg);
2638
2639         return ret;
2640 }
2641
2642 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2643 {
2644         list_del(&snap->node);
2645         if (device_is_registered(&snap->dev))
2646                 device_unregister(&snap->dev);
2647 }
2648
2649 static int rbd_register_snap_dev(struct rbd_snap *snap,
2650                                   struct device *parent)
2651 {
2652         struct device *dev = &snap->dev;
2653         int ret;
2654
2655         dev->type = &rbd_snap_device_type;
2656         dev->parent = parent;
2657         dev->release = rbd_snap_dev_release;
2658         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2659         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2660
2661         ret = device_register(dev);
2662
2663         return ret;
2664 }
2665
2666 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2667                                                 const char *snap_name,
2668                                                 u64 snap_id, u64 snap_size,
2669                                                 u64 snap_features)
2670 {
2671         struct rbd_snap *snap;
2672         int ret;
2673
2674         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2675         if (!snap)
2676                 return ERR_PTR(-ENOMEM);
2677
2678         ret = -ENOMEM;
2679         snap->name = kstrdup(snap_name, GFP_KERNEL);
2680         if (!snap->name)
2681                 goto err;
2682
2683         snap->id = snap_id;
2684         snap->size = snap_size;
2685         snap->features = snap_features;
2686
2687         return snap;
2688
2689 err:
2690         kfree(snap->name);
2691         kfree(snap);
2692
2693         return ERR_PTR(ret);
2694 }
2695
2696 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2697                 u64 *snap_size, u64 *snap_features)
2698 {
2699         char *snap_name;
2700
2701         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2702
2703         *snap_size = rbd_dev->header.snap_sizes[which];
2704         *snap_features = 0;     /* No features for v1 */
2705
2706         /* Skip over names until we find the one we are looking for */
2707
2708         snap_name = rbd_dev->header.snap_names;
2709         while (which--)
2710                 snap_name += strlen(snap_name) + 1;
2711
2712         return snap_name;
2713 }
2714
2715 /*
2716  * Get the size and object order for an image snapshot, or if
2717  * snap_id is CEPH_NOSNAP, gets this information for the base
2718  * image.
2719  */
2720 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2721                                 u8 *order, u64 *snap_size)
2722 {
2723         __le64 snapid = cpu_to_le64(snap_id);
2724         int ret;
2725         struct {
2726                 u8 order;
2727                 __le64 size;
2728         } __attribute__ ((packed)) size_buf = { 0 };
2729
2730         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2731                                 "rbd", "get_size",
2732                                 (char *) &snapid, sizeof (snapid),
2733                                 (char *) &size_buf, sizeof (size_buf), NULL);
2734         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2735         if (ret < 0)
2736                 return ret;
2737
2738         *order = size_buf.order;
2739         *snap_size = le64_to_cpu(size_buf.size);
2740
2741         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2742                 (unsigned long long) snap_id, (unsigned int) *order,
2743                 (unsigned long long) *snap_size);
2744
2745         return 0;
2746 }
2747
2748 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2749 {
2750         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2751                                         &rbd_dev->header.obj_order,
2752                                         &rbd_dev->header.image_size);
2753 }
2754
2755 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2756 {
2757         void *reply_buf;
2758         int ret;
2759         void *p;
2760
2761         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2762         if (!reply_buf)
2763                 return -ENOMEM;
2764
2765         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2766                                 "rbd", "get_object_prefix",
2767                                 NULL, 0,
2768                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2769         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2770         if (ret < 0)
2771                 goto out;
2772         ret = 0;    /* rbd_obj_method_sync() can return positive */
2773
2774         p = reply_buf;
2775         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2776                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2777                                                 NULL, GFP_NOIO);
2778
2779         if (IS_ERR(rbd_dev->header.object_prefix)) {
2780                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2781                 rbd_dev->header.object_prefix = NULL;
2782         } else {
2783                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2784         }
2785
2786 out:
2787         kfree(reply_buf);
2788
2789         return ret;
2790 }
2791
2792 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2793                 u64 *snap_features)
2794 {
2795         __le64 snapid = cpu_to_le64(snap_id);
2796         struct {
2797                 __le64 features;
2798                 __le64 incompat;
2799         } features_buf = { 0 };
2800         u64 incompat;
2801         int ret;
2802
2803         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2804                                 "rbd", "get_features",
2805                                 (char *) &snapid, sizeof (snapid),
2806                                 (char *) &features_buf, sizeof (features_buf),
2807                                 NULL);
2808         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2809         if (ret < 0)
2810                 return ret;
2811
2812         incompat = le64_to_cpu(features_buf.incompat);
2813         if (incompat & ~RBD_FEATURES_ALL)
2814                 return -ENXIO;
2815
2816         *snap_features = le64_to_cpu(features_buf.features);
2817
2818         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2819                 (unsigned long long) snap_id,
2820                 (unsigned long long) *snap_features,
2821                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2822
2823         return 0;
2824 }
2825
2826 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2827 {
2828         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2829                                                 &rbd_dev->header.features);
2830 }
2831
2832 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2833 {
2834         struct rbd_spec *parent_spec;
2835         size_t size;
2836         void *reply_buf = NULL;
2837         __le64 snapid;
2838         void *p;
2839         void *end;
2840         char *image_id;
2841         u64 overlap;
2842         int ret;
2843
2844         parent_spec = rbd_spec_alloc();
2845         if (!parent_spec)
2846                 return -ENOMEM;
2847
2848         size = sizeof (__le64) +                                /* pool_id */
2849                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2850                 sizeof (__le64) +                               /* snap_id */
2851                 sizeof (__le64);                                /* overlap */
2852         reply_buf = kmalloc(size, GFP_KERNEL);
2853         if (!reply_buf) {
2854                 ret = -ENOMEM;
2855                 goto out_err;
2856         }
2857
2858         snapid = cpu_to_le64(CEPH_NOSNAP);
2859         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2860                                 "rbd", "get_parent",
2861                                 (char *) &snapid, sizeof (snapid),
2862                                 (char *) reply_buf, size, NULL);
2863         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2864         if (ret < 0)
2865                 goto out_err;
2866
2867         ret = -ERANGE;
2868         p = reply_buf;
2869         end = (char *) reply_buf + size;
2870         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2871         if (parent_spec->pool_id == CEPH_NOPOOL)
2872                 goto out;       /* No parent?  No problem. */
2873
2874         /* The ceph file layout needs to fit pool id in 32 bits */
2875
2876         ret = -EIO;
2877         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2878                 goto out;
2879
2880         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2881         if (IS_ERR(image_id)) {
2882                 ret = PTR_ERR(image_id);
2883                 goto out_err;
2884         }
2885         parent_spec->image_id = image_id;
2886         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2887         ceph_decode_64_safe(&p, end, overlap, out_err);
2888
2889         rbd_dev->parent_overlap = overlap;
2890         rbd_dev->parent_spec = parent_spec;
2891         parent_spec = NULL;     /* rbd_dev now owns this */
2892 out:
2893         ret = 0;
2894 out_err:
2895         kfree(reply_buf);
2896         rbd_spec_put(parent_spec);
2897
2898         return ret;
2899 }
2900
2901 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2902 {
2903         size_t image_id_size;
2904         char *image_id;
2905         void *p;
2906         void *end;
2907         size_t size;
2908         void *reply_buf = NULL;
2909         size_t len = 0;
2910         char *image_name = NULL;
2911         int ret;
2912
2913         rbd_assert(!rbd_dev->spec->image_name);
2914
2915         len = strlen(rbd_dev->spec->image_id);
2916         image_id_size = sizeof (__le32) + len;
2917         image_id = kmalloc(image_id_size, GFP_KERNEL);
2918         if (!image_id)
2919                 return NULL;
2920
2921         p = image_id;
2922         end = (char *) image_id + image_id_size;
2923         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2924
2925         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2926         reply_buf = kmalloc(size, GFP_KERNEL);
2927         if (!reply_buf)
2928                 goto out;
2929
2930         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2931                                 "rbd", "dir_get_name",
2932                                 image_id, image_id_size,
2933                                 (char *) reply_buf, size, NULL);
2934         if (ret < 0)
2935                 goto out;
2936         p = reply_buf;
2937         end = (char *) reply_buf + size;
2938         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2939         if (IS_ERR(image_name))
2940                 image_name = NULL;
2941         else
2942                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2943 out:
2944         kfree(reply_buf);
2945         kfree(image_id);
2946
2947         return image_name;
2948 }
2949
2950 /*
2951  * When a parent image gets probed, we only have the pool, image,
2952  * and snapshot ids but not the names of any of them.  This call
2953  * is made later to fill in those names.  It has to be done after
2954  * rbd_dev_snaps_update() has completed because some of the
2955  * information (in particular, snapshot name) is not available
2956  * until then.
2957  */
2958 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2959 {
2960         struct ceph_osd_client *osdc;
2961         const char *name;
2962         void *reply_buf = NULL;
2963         int ret;
2964
2965         if (rbd_dev->spec->pool_name)
2966                 return 0;       /* Already have the names */
2967
2968         /* Look up the pool name */
2969
2970         osdc = &rbd_dev->rbd_client->client->osdc;
2971         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2972         if (!name) {
2973                 rbd_warn(rbd_dev, "there is no pool with id %llu",
2974                         rbd_dev->spec->pool_id);        /* Really a BUG() */
2975                 return -EIO;
2976         }
2977
2978         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2979         if (!rbd_dev->spec->pool_name)
2980                 return -ENOMEM;
2981
2982         /* Fetch the image name; tolerate failure here */
2983
2984         name = rbd_dev_image_name(rbd_dev);
2985         if (name)
2986                 rbd_dev->spec->image_name = (char *) name;
2987         else
2988                 rbd_warn(rbd_dev, "unable to get image name");
2989
2990         /* Look up the snapshot name. */
2991
2992         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2993         if (!name) {
2994                 rbd_warn(rbd_dev, "no snapshot with id %llu",
2995                         rbd_dev->spec->snap_id);        /* Really a BUG() */
2996                 ret = -EIO;
2997                 goto out_err;
2998         }
2999         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3000         if(!rbd_dev->spec->snap_name)
3001                 goto out_err;
3002
3003         return 0;
3004 out_err:
3005         kfree(reply_buf);
3006         kfree(rbd_dev->spec->pool_name);
3007         rbd_dev->spec->pool_name = NULL;
3008
3009         return ret;
3010 }
3011
3012 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3013 {
3014         size_t size;
3015         int ret;
3016         void *reply_buf;
3017         void *p;
3018         void *end;
3019         u64 seq;
3020         u32 snap_count;
3021         struct ceph_snap_context *snapc;
3022         u32 i;
3023
3024         /*
3025          * We'll need room for the seq value (maximum snapshot id),
3026          * snapshot count, and array of that many snapshot ids.
3027          * For now we have a fixed upper limit on the number we're
3028          * prepared to receive.
3029          */
3030         size = sizeof (__le64) + sizeof (__le32) +
3031                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3032         reply_buf = kzalloc(size, GFP_KERNEL);
3033         if (!reply_buf)
3034                 return -ENOMEM;
3035
3036         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3037                                 "rbd", "get_snapcontext",
3038                                 NULL, 0,
3039                                 reply_buf, size, ver);
3040         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3041         if (ret < 0)
3042                 goto out;
3043
3044         ret = -ERANGE;
3045         p = reply_buf;
3046         end = (char *) reply_buf + size;
3047         ceph_decode_64_safe(&p, end, seq, out);
3048         ceph_decode_32_safe(&p, end, snap_count, out);
3049
3050         /*
3051          * Make sure the reported number of snapshot ids wouldn't go
3052          * beyond the end of our buffer.  But before checking that,
3053          * make sure the computed size of the snapshot context we
3054          * allocate is representable in a size_t.
3055          */
3056         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3057                                  / sizeof (u64)) {
3058                 ret = -EINVAL;
3059                 goto out;
3060         }
3061         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3062                 goto out;
3063
3064         size = sizeof (struct ceph_snap_context) +
3065                                 snap_count * sizeof (snapc->snaps[0]);
3066         snapc = kmalloc(size, GFP_KERNEL);
3067         if (!snapc) {
3068                 ret = -ENOMEM;
3069                 goto out;
3070         }
3071
3072         atomic_set(&snapc->nref, 1);
3073         snapc->seq = seq;
3074         snapc->num_snaps = snap_count;
3075         for (i = 0; i < snap_count; i++)
3076                 snapc->snaps[i] = ceph_decode_64(&p);
3077
3078         rbd_dev->header.snapc = snapc;
3079
3080         dout("  snap context seq = %llu, snap_count = %u\n",
3081                 (unsigned long long) seq, (unsigned int) snap_count);
3082
3083 out:
3084         kfree(reply_buf);
3085
3086         return 0;
3087 }
3088
3089 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3090 {
3091         size_t size;
3092         void *reply_buf;
3093         __le64 snap_id;
3094         int ret;
3095         void *p;
3096         void *end;
3097         char *snap_name;
3098
3099         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3100         reply_buf = kmalloc(size, GFP_KERNEL);
3101         if (!reply_buf)
3102                 return ERR_PTR(-ENOMEM);
3103
3104         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3105         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3106                                 "rbd", "get_snapshot_name",
3107                                 (char *) &snap_id, sizeof (snap_id),
3108                                 reply_buf, size, NULL);
3109         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3110         if (ret < 0)
3111                 goto out;
3112
3113         p = reply_buf;
3114         end = (char *) reply_buf + size;
3115         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3116         if (IS_ERR(snap_name)) {
3117                 ret = PTR_ERR(snap_name);
3118                 goto out;
3119         } else {
3120                 dout("  snap_id 0x%016llx snap_name = %s\n",
3121                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3122         }
3123         kfree(reply_buf);
3124
3125         return snap_name;
3126 out:
3127         kfree(reply_buf);
3128
3129         return ERR_PTR(ret);
3130 }
3131
3132 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3133                 u64 *snap_size, u64 *snap_features)
3134 {
3135         u64 snap_id;
3136         u8 order;
3137         int ret;
3138
3139         snap_id = rbd_dev->header.snapc->snaps[which];
3140         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3141         if (ret)
3142                 return ERR_PTR(ret);
3143         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3144         if (ret)
3145                 return ERR_PTR(ret);
3146
3147         return rbd_dev_v2_snap_name(rbd_dev, which);
3148 }
3149
3150 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3151                 u64 *snap_size, u64 *snap_features)
3152 {
3153         if (rbd_dev->image_format == 1)
3154                 return rbd_dev_v1_snap_info(rbd_dev, which,
3155                                         snap_size, snap_features);
3156         if (rbd_dev->image_format == 2)
3157                 return rbd_dev_v2_snap_info(rbd_dev, which,
3158                                         snap_size, snap_features);
3159         return ERR_PTR(-EINVAL);
3160 }
3161
3162 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3163 {
3164         int ret;
3165         __u8 obj_order;
3166
3167         down_write(&rbd_dev->header_rwsem);
3168
3169         /* Grab old order first, to see if it changes */
3170
3171         obj_order = rbd_dev->header.obj_order,
3172         ret = rbd_dev_v2_image_size(rbd_dev);
3173         if (ret)
3174                 goto out;
3175         if (rbd_dev->header.obj_order != obj_order) {
3176                 ret = -EIO;
3177                 goto out;
3178         }
3179         rbd_update_mapping_size(rbd_dev);
3180
3181         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3182         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3183         if (ret)
3184                 goto out;
3185         ret = rbd_dev_snaps_update(rbd_dev);
3186         dout("rbd_dev_snaps_update returned %d\n", ret);
3187         if (ret)
3188                 goto out;
3189         ret = rbd_dev_snaps_register(rbd_dev);
3190         dout("rbd_dev_snaps_register returned %d\n", ret);
3191 out:
3192         up_write(&rbd_dev->header_rwsem);
3193
3194         return ret;
3195 }
3196
3197 /*
3198  * Scan the rbd device's current snapshot list and compare it to the
3199  * newly-received snapshot context.  Remove any existing snapshots
3200  * not present in the new snapshot context.  Add a new snapshot for
3201  * any snaphots in the snapshot context not in the current list.
3202  * And verify there are no changes to snapshots we already know
3203  * about.
3204  *
3205  * Assumes the snapshots in the snapshot context are sorted by
3206  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3207  * are also maintained in that order.)
3208  */
3209 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3210 {
3211         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3212         const u32 snap_count = snapc->num_snaps;
3213         struct list_head *head = &rbd_dev->snaps;
3214         struct list_head *links = head->next;
3215         u32 index = 0;
3216
3217         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3218         while (index < snap_count || links != head) {
3219                 u64 snap_id;
3220                 struct rbd_snap *snap;
3221                 char *snap_name;
3222                 u64 snap_size = 0;
3223                 u64 snap_features = 0;
3224
3225                 snap_id = index < snap_count ? snapc->snaps[index]
3226                                              : CEPH_NOSNAP;
3227                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3228                                      : NULL;
3229                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3230
3231                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3232                         struct list_head *next = links->next;
3233
3234                         /*
3235                          * A previously-existing snapshot is not in
3236                          * the new snap context.
3237                          *
3238                          * If the now missing snapshot is the one the
3239                          * image is mapped to, clear its exists flag
3240                          * so we can avoid sending any more requests
3241                          * to it.
3242                          */
3243                         if (rbd_dev->spec->snap_id == snap->id)
3244                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3245                         rbd_remove_snap_dev(snap);
3246                         dout("%ssnap id %llu has been removed\n",
3247                                 rbd_dev->spec->snap_id == snap->id ?
3248                                                         "mapped " : "",
3249                                 (unsigned long long) snap->id);
3250
3251                         /* Done with this list entry; advance */
3252
3253                         links = next;
3254                         continue;
3255                 }
3256
3257                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3258                                         &snap_size, &snap_features);
3259                 if (IS_ERR(snap_name))
3260                         return PTR_ERR(snap_name);
3261
3262                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3263                         (unsigned long long) snap_id);
3264                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3265                         struct rbd_snap *new_snap;
3266
3267                         /* We haven't seen this snapshot before */
3268
3269                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3270                                         snap_id, snap_size, snap_features);
3271                         if (IS_ERR(new_snap)) {
3272                                 int err = PTR_ERR(new_snap);
3273
3274                                 dout("  failed to add dev, error %d\n", err);
3275
3276                                 return err;
3277                         }
3278
3279                         /* New goes before existing, or at end of list */
3280
3281                         dout("  added dev%s\n", snap ? "" : " at end\n");
3282                         if (snap)
3283                                 list_add_tail(&new_snap->node, &snap->node);
3284                         else
3285                                 list_add_tail(&new_snap->node, head);
3286                 } else {
3287                         /* Already have this one */
3288
3289                         dout("  already present\n");
3290
3291                         rbd_assert(snap->size == snap_size);
3292                         rbd_assert(!strcmp(snap->name, snap_name));
3293                         rbd_assert(snap->features == snap_features);
3294
3295                         /* Done with this list entry; advance */
3296
3297                         links = links->next;
3298                 }
3299
3300                 /* Advance to the next entry in the snapshot context */
3301
3302                 index++;
3303         }
3304         dout("%s: done\n", __func__);
3305
3306         return 0;
3307 }
3308
3309 /*
3310  * Scan the list of snapshots and register the devices for any that
3311  * have not already been registered.
3312  */
3313 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3314 {
3315         struct rbd_snap *snap;
3316         int ret = 0;
3317
3318         dout("%s called\n", __func__);
3319         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3320                 return -EIO;
3321
3322         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3323                 if (!rbd_snap_registered(snap)) {
3324                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3325                         if (ret < 0)
3326                                 break;
3327                 }
3328         }
3329         dout("%s: returning %d\n", __func__, ret);
3330
3331         return ret;
3332 }
3333
3334 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3335 {
3336         struct device *dev;
3337         int ret;
3338
3339         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3340
3341         dev = &rbd_dev->dev;
3342         dev->bus = &rbd_bus_type;
3343         dev->type = &rbd_device_type;
3344         dev->parent = &rbd_root_dev;
3345         dev->release = rbd_dev_release;
3346         dev_set_name(dev, "%d", rbd_dev->dev_id);
3347         ret = device_register(dev);
3348
3349         mutex_unlock(&ctl_mutex);
3350
3351         return ret;
3352 }
3353
3354 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3355 {
3356         device_unregister(&rbd_dev->dev);
3357 }
3358
/*
 * Largest device id currently considered in use.  Incremented by
 * rbd_dev_id_get() to produce each new id, and lowered again by
 * rbd_dev_id_put() when the device holding the maximum goes away.
 */
3359 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3360
3361 /*
3362  * Get a unique rbd identifier for the given new rbd_dev, and add
3363  * the rbd_dev to the global list.  The minimum rbd id is 1.
3364  */
3365 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3366 {
3367         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3368
3369         spin_lock(&rbd_dev_list_lock);
3370         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3371         spin_unlock(&rbd_dev_list_lock);
3372         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3373                 (unsigned long long) rbd_dev->dev_id);
3374 }
3375
3376 /*
3377  * Remove an rbd_dev from the global list, and record that its
3378  * identifier is no longer in use.
3379  */
3380 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3381 {
3382         struct list_head *tmp;
3383         int rbd_id = rbd_dev->dev_id;
3384         int max_id;
3385
3386         rbd_assert(rbd_id > 0);
3387
3388         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3389                 (unsigned long long) rbd_dev->dev_id);
3390         spin_lock(&rbd_dev_list_lock);
3391         list_del_init(&rbd_dev->node);
3392
3393         /*
3394          * If the id being "put" is not the current maximum, there
3395          * is nothing special we need to do.
3396          */
3397         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3398                 spin_unlock(&rbd_dev_list_lock);
3399                 return;
3400         }
3401
3402         /*
3403          * We need to update the current maximum id.  Search the
3404          * list to find out what it is.  We're more likely to find
3405          * the maximum at the end, so search the list backward.
3406          */
3407         max_id = 0;
3408         list_for_each_prev(tmp, &rbd_dev_list) {
3409                 struct rbd_device *rbd_dev;
3410
3411                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3412                 if (rbd_dev->dev_id > max_id)
3413                         max_id = rbd_dev->dev_id;
3414         }
3415         spin_unlock(&rbd_dev_list_lock);
3416
3417         /*
3418          * The max id could have been updated by rbd_dev_id_get(), in
3419          * which case it now accurately reflects the new maximum.
3420          * Be careful not to overwrite the maximum value in that
3421          * case.
3422          */
3423         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3424         dout("  max dev id has been reset\n");
3425 }
3426
/*
 * Advances *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if
 * there is none), and returns the number of non-space characters
 * that follow, i.e. the length of the token found.  The string at
 * *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	const char *const whitespace = " \f\n\r\t\v";
	const char *token = *buf;

	token += strspn(token, whitespace);	/* Skip to start of token */
	*buf = token;

	return strcspn(token, whitespace);	/* Length of the token */
}
3445
/*
 * Locate the next token in *buf and, when it fits (including its
 * terminating '\0') in the caller's buffer of token_size bytes, copy
 * it there.  A copied token is always '\0'-terminated.  The string
 * at *buf must be terminated with '\0' on entry.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return of 0 means no token was found; a return >= token_size
 * means the token did not fit and nothing was copied.
 *
 * In every case *buf is left pointing just past the token, even
 * when the token was too big to be copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);

	/* Copy only when the token and its terminator both fit */
	if (token_len < token_size) {
		memcpy(token, *buf, token_len);
		token[token_len] = '\0';
	}
	*buf += token_len;		/* Consume the token regardless */

	return token_len;
}
3475
3476 /*
3477  * Finds the next token in *buf, dynamically allocates a buffer big
3478  * enough to hold a copy of it, and copies the token into the new
3479  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3480  * that a duplicate buffer is created even for a zero-length token.
3481  *
3482  * Returns a pointer to the newly-allocated duplicate, or a null
3483  * pointer if memory for the duplicate was not available.  If
3484  * the lenp argument is a non-null pointer, the length of the token
3485  * (not including the '\0') is returned in *lenp.
3486  *
3487  * If successful, the *buf pointer will be updated to point beyond
3488  * the end of the found token.
3489  *
3490  * Note: uses GFP_KERNEL for allocation.
3491  */
3492 static inline char *dup_token(const char **buf, size_t *lenp)
3493 {
3494         char *dup;
3495         size_t len;
3496
3497         len = next_token(buf);
3498         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3499         if (!dup)
3500                 return NULL;
3501         *(dup + len) = '\0';
3502         *buf += len;
3503
3504         if (lenp)
3505                 *lenp = len;
3506
3507         return dup;
3508 }
3509
3510 /*
3511  * Parse the options provided for an "rbd add" (i.e., rbd image
3512  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3513  * and the data written is passed here via a NUL-terminated buffer.
3514  * Returns 0 if successful or an error code otherwise.
3515  *
3516  * The information extracted from these options is recorded in
3517  * the other parameters which return dynamically-allocated
3518  * structures:
3519  *  ceph_opts
3520  *      The address of a pointer that will refer to a ceph options
3521  *      structure.  Caller must release the returned pointer using
3522  *      ceph_destroy_options() when it is no longer needed.
3523  *  rbd_opts
3524  *      Address of an rbd options pointer.  Fully initialized by
3525  *      this function; caller must release with kfree().
3526  *  spec
3527  *      Address of an rbd image specification pointer.  Fully
3528  *      initialized by this function based on parsed options.
3529  *      Caller must release with rbd_spec_put().
3530  *
3531  * The options passed take this form:
3532  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3533  * where:
3534  *  <mon_addrs>
3535  *      A comma-separated list of one or more monitor addresses.
3536  *      A monitor address is an ip address, optionally followed
3537  *      by a port number (separated by a colon).
3538  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3539  *  <options>
3540  *      A comma-separated list of ceph and/or rbd options.
3541  *  <pool_name>
3542  *      The name of the rados pool containing the rbd image.
3543  *  <image_name>
3544  *      The name of the image in that pool to map.
3545  *  <snap_id>
3546  *      An optional snapshot id.  If provided, the mapping will
3547  *      present data from the image at the time that snapshot was
3548  *      created.  The image head is used if no snapshot id is
3549  *      provided.  Snapshot mappings are always read-only.
3550  */
3551 static int rbd_add_parse_args(const char *buf,
3552                                 struct ceph_options **ceph_opts,
3553                                 struct rbd_options **opts,
3554                                 struct rbd_spec **rbd_spec)
3555 {
3556         size_t len;
3557         char *options;
3558         const char *mon_addrs;
3559         size_t mon_addrs_size;
3560         struct rbd_spec *spec = NULL;
3561         struct rbd_options *rbd_opts = NULL;
3562         struct ceph_options *copts;
3563         int ret;
3564
3565         /* The first four tokens are required */
3566
3567         len = next_token(&buf);
3568         if (!len) {
3569                 rbd_warn(NULL, "no monitor address(es) provided");
3570                 return -EINVAL;
3571         }
3572         mon_addrs = buf;
3573         mon_addrs_size = len + 1;
3574         buf += len;
3575
3576         ret = -EINVAL;
3577         options = dup_token(&buf, NULL);
3578         if (!options)
3579                 return -ENOMEM;
3580         if (!*options) {
3581                 rbd_warn(NULL, "no options provided");
3582                 goto out_err;
3583         }
3584
3585         spec = rbd_spec_alloc();
3586         if (!spec)
3587                 goto out_mem;
3588
3589         spec->pool_name = dup_token(&buf, NULL);
3590         if (!spec->pool_name)
3591                 goto out_mem;
3592         if (!*spec->pool_name) {
3593                 rbd_warn(NULL, "no pool name provided");
3594                 goto out_err;
3595         }
3596
3597         spec->image_name = dup_token(&buf, NULL);
3598         if (!spec->image_name)
3599                 goto out_mem;
3600         if (!*spec->image_name) {
3601                 rbd_warn(NULL, "no image name provided");
3602                 goto out_err;
3603         }
3604
3605         /*
3606          * Snapshot name is optional; default is to use "-"
3607          * (indicating the head/no snapshot).
3608          */
3609         len = next_token(&buf);
3610         if (!len) {
3611                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3612                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3613         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3614                 ret = -ENAMETOOLONG;
3615                 goto out_err;
3616         }
3617         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3618         if (!spec->snap_name)
3619                 goto out_mem;
3620         *(spec->snap_name + len) = '\0';
3621
3622         /* Initialize all rbd options to the defaults */
3623
3624         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3625         if (!rbd_opts)
3626                 goto out_mem;
3627
3628         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3629
3630         copts = ceph_parse_options(options, mon_addrs,
3631                                         mon_addrs + mon_addrs_size - 1,
3632                                         parse_rbd_opts_token, rbd_opts);
3633         if (IS_ERR(copts)) {
3634                 ret = PTR_ERR(copts);
3635                 goto out_err;
3636         }
3637         kfree(options);
3638
3639         *ceph_opts = copts;
3640         *opts = rbd_opts;
3641         *rbd_spec = spec;
3642
3643         return 0;
3644 out_mem:
3645         ret = -ENOMEM;
3646 out_err:
3647         kfree(rbd_opts);
3648         rbd_spec_put(spec);
3649         kfree(options);
3650
3651         return ret;
3652 }
3653
3654 /*
3655  * An rbd format 2 image has a unique identifier, distinct from the
3656  * name given to it by the user.  Internally, that identifier is
3657  * what's used to specify the names of objects related to the image.
3658  *
3659  * A special "rbd id" object is used to map an rbd image name to its
3660  * id.  If that object doesn't exist, then there is no v2 rbd image
3661  * with the supplied name.
3662  *
3663  * This function will record the given rbd_dev's image_id field if
3664  * it can be determined, and in that case will return 0.  If any
3665  * errors occur a negative errno will be returned and the rbd_dev's
3666  * image_id field will be unchanged (and should be NULL).
3667  */
3668 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3669 {
3670         int ret;
3671         size_t size;
3672         char *object_name;
3673         void *response;
3674         void *p;
3675
3676         /*
3677          * When probing a parent image, the image id is already
3678          * known (and the image name likely is not).  There's no
3679          * need to fetch the image id again in this case.
3680          */
3681         if (rbd_dev->spec->image_id)
3682                 return 0;
3683
3684         /*
3685          * First, see if the format 2 image id file exists, and if
3686          * so, get the image's persistent id from it.
3687          */
3688         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3689         object_name = kmalloc(size, GFP_NOIO);
3690         if (!object_name)
3691                 return -ENOMEM;
3692         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3693         dout("rbd id object name is %s\n", object_name);
3694
3695         /* Response will be an encoded string, which includes a length */
3696
3697         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3698         response = kzalloc(size, GFP_NOIO);
3699         if (!response) {
3700                 ret = -ENOMEM;
3701                 goto out;
3702         }
3703
3704         ret = rbd_obj_method_sync(rbd_dev, object_name,
3705                                 "rbd", "get_id",
3706                                 NULL, 0,
3707                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3708         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3709         if (ret < 0)
3710                 goto out;
3711         ret = 0;    /* rbd_obj_method_sync() can return positive */
3712
3713         p = response;
3714         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3715                                                 p + RBD_IMAGE_ID_LEN_MAX,
3716                                                 NULL, GFP_NOIO);
3717         if (IS_ERR(rbd_dev->spec->image_id)) {
3718                 ret = PTR_ERR(rbd_dev->spec->image_id);
3719                 rbd_dev->spec->image_id = NULL;
3720         } else {
3721                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3722         }
3723 out:
3724         kfree(response);
3725         kfree(object_name);
3726
3727         return ret;
3728 }
3729
3730 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3731 {
3732         int ret;
3733         size_t size;
3734
3735         /* Version 1 images have no id; empty string is used */
3736
3737         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3738         if (!rbd_dev->spec->image_id)
3739                 return -ENOMEM;
3740
3741         /* Record the header object name for this rbd image. */
3742
3743         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3744         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3745         if (!rbd_dev->header_name) {
3746                 ret = -ENOMEM;
3747                 goto out_err;
3748         }
3749         sprintf(rbd_dev->header_name, "%s%s",
3750                 rbd_dev->spec->image_name, RBD_SUFFIX);
3751
3752         /* Populate rbd image metadata */
3753
3754         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3755         if (ret < 0)
3756                 goto out_err;
3757
3758         /* Version 1 images have no parent (no layering) */
3759
3760         rbd_dev->parent_spec = NULL;
3761         rbd_dev->parent_overlap = 0;
3762
3763         rbd_dev->image_format = 1;
3764
3765         dout("discovered version 1 image, header name is %s\n",
3766                 rbd_dev->header_name);
3767
3768         return 0;
3769
3770 out_err:
3771         kfree(rbd_dev->header_name);
3772         rbd_dev->header_name = NULL;
3773         kfree(rbd_dev->spec->image_id);
3774         rbd_dev->spec->image_id = NULL;
3775
3776         return ret;
3777 }
3778
3779 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3780 {
3781         size_t size;
3782         int ret;
3783         u64 ver = 0;
3784
3785         /*
3786          * Image id was filled in by the caller.  Record the header
3787          * object name for this rbd image.
3788          */
3789         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3790         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3791         if (!rbd_dev->header_name)
3792                 return -ENOMEM;
3793         sprintf(rbd_dev->header_name, "%s%s",
3794                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3795
3796         /* Get the size and object order for the image */
3797
3798         ret = rbd_dev_v2_image_size(rbd_dev);
3799         if (ret < 0)
3800                 goto out_err;
3801
3802         /* Get the object prefix (a.k.a. block_name) for the image */
3803
3804         ret = rbd_dev_v2_object_prefix(rbd_dev);
3805         if (ret < 0)
3806                 goto out_err;
3807
3808         /* Get the and check features for the image */
3809
3810         ret = rbd_dev_v2_features(rbd_dev);
3811         if (ret < 0)
3812                 goto out_err;
3813
3814         /* If the image supports layering, get the parent info */
3815
3816         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3817                 ret = rbd_dev_v2_parent_info(rbd_dev);
3818                 if (ret < 0)
3819                         goto out_err;
3820         }
3821
3822         /* crypto and compression type aren't (yet) supported for v2 images */
3823
3824         rbd_dev->header.crypt_type = 0;
3825         rbd_dev->header.comp_type = 0;
3826
3827         /* Get the snapshot context, plus the header version */
3828
3829         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3830         if (ret)
3831                 goto out_err;
3832         rbd_dev->header.obj_version = ver;
3833
3834         rbd_dev->image_format = 2;
3835
3836         dout("discovered version 2 image, header name is %s\n",
3837                 rbd_dev->header_name);
3838
3839         return 0;
3840 out_err:
3841         rbd_dev->parent_overlap = 0;
3842         rbd_spec_put(rbd_dev->parent_spec);
3843         rbd_dev->parent_spec = NULL;
3844         kfree(rbd_dev->header_name);
3845         rbd_dev->header_name = NULL;
3846         kfree(rbd_dev->header.object_prefix);
3847         rbd_dev->header.object_prefix = NULL;
3848
3849         return ret;
3850 }
3851
3852 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3853 {
3854         int ret;
3855
3856         /* no need to lock here, as rbd_dev is not registered yet */
3857         ret = rbd_dev_snaps_update(rbd_dev);
3858         if (ret)
3859                 return ret;
3860
3861         ret = rbd_dev_probe_update_spec(rbd_dev);
3862         if (ret)
3863                 goto err_out_snaps;
3864
3865         ret = rbd_dev_set_mapping(rbd_dev);
3866         if (ret)
3867                 goto err_out_snaps;
3868
3869         /* generate unique id: find highest unique id, add one */
3870         rbd_dev_id_get(rbd_dev);
3871
3872         /* Fill in the device name, now that we have its id. */
3873         BUILD_BUG_ON(DEV_NAME_LEN
3874                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3875         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3876
3877         /* Get our block major device number. */
3878
3879         ret = register_blkdev(0, rbd_dev->name);
3880         if (ret < 0)
3881                 goto err_out_id;
3882         rbd_dev->major = ret;
3883
3884         /* Set up the blkdev mapping. */
3885
3886         ret = rbd_init_disk(rbd_dev);
3887         if (ret)
3888                 goto err_out_blkdev;
3889
3890         ret = rbd_bus_add_dev(rbd_dev);
3891         if (ret)
3892                 goto err_out_disk;
3893
3894         /*
3895          * At this point cleanup in the event of an error is the job
3896          * of the sysfs code (initiated by rbd_bus_del_dev()).
3897          */
3898         down_write(&rbd_dev->header_rwsem);
3899         ret = rbd_dev_snaps_register(rbd_dev);
3900         up_write(&rbd_dev->header_rwsem);
3901         if (ret)
3902                 goto err_out_bus;
3903
3904         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3905         if (ret)
3906                 goto err_out_bus;
3907
3908         /* Everything's ready.  Announce the disk to the world. */
3909
3910         add_disk(rbd_dev->disk);
3911
3912         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3913                 (unsigned long long) rbd_dev->mapping.size);
3914
3915         return ret;
3916 err_out_bus:
3917         /* this will also clean up rest of rbd_dev stuff */
3918
3919         rbd_bus_del_dev(rbd_dev);
3920
3921         return ret;
3922 err_out_disk:
3923         rbd_free_disk(rbd_dev);
3924 err_out_blkdev:
3925         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3926 err_out_id:
3927         rbd_dev_id_put(rbd_dev);
3928 err_out_snaps:
3929         rbd_remove_all_snaps(rbd_dev);
3930
3931         return ret;
3932 }
3933
3934 /*
3935  * Probe for the existence of the header object for the given rbd
3936  * device.  For format 2 images this includes determining the image
3937  * id.
3938  */
3939 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3940 {
3941         int ret;
3942
3943         /*
3944          * Get the id from the image id object.  If it's not a
3945          * format 2 image, we'll get ENOENT back, and we'll assume
3946          * it's a format 1 image.
3947          */
3948         ret = rbd_dev_image_id(rbd_dev);
3949         if (ret)
3950                 ret = rbd_dev_v1_probe(rbd_dev);
3951         else
3952                 ret = rbd_dev_v2_probe(rbd_dev);
3953         if (ret) {
3954                 dout("probe failed, returning %d\n", ret);
3955
3956                 return ret;
3957         }
3958
3959         ret = rbd_dev_probe_finish(rbd_dev);
3960         if (ret)
3961                 rbd_header_free(&rbd_dev->header);
3962
3963         return ret;
3964 }
3965
3966 static ssize_t rbd_add(struct bus_type *bus,
3967                        const char *buf,
3968                        size_t count)
3969 {
3970         struct rbd_device *rbd_dev = NULL;
3971         struct ceph_options *ceph_opts = NULL;
3972         struct rbd_options *rbd_opts = NULL;
3973         struct rbd_spec *spec = NULL;
3974         struct rbd_client *rbdc;
3975         struct ceph_osd_client *osdc;
3976         int rc = -ENOMEM;
3977
3978         if (!try_module_get(THIS_MODULE))
3979                 return -ENODEV;
3980
3981         /* parse add command */
3982         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3983         if (rc < 0)
3984                 goto err_out_module;
3985
3986         rbdc = rbd_get_client(ceph_opts);
3987         if (IS_ERR(rbdc)) {
3988                 rc = PTR_ERR(rbdc);
3989                 goto err_out_args;
3990         }
3991         ceph_opts = NULL;       /* rbd_dev client now owns this */
3992
3993         /* pick the pool */
3994         osdc = &rbdc->client->osdc;
3995         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3996         if (rc < 0)
3997                 goto err_out_client;
3998         spec->pool_id = (u64) rc;
3999
4000         /* The ceph file layout needs to fit pool id in 32 bits */
4001
4002         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4003                 rc = -EIO;
4004                 goto err_out_client;
4005         }
4006
4007         rbd_dev = rbd_dev_create(rbdc, spec);
4008         if (!rbd_dev)
4009                 goto err_out_client;
4010         rbdc = NULL;            /* rbd_dev now owns this */
4011         spec = NULL;            /* rbd_dev now owns this */
4012
4013         rbd_dev->mapping.read_only = rbd_opts->read_only;
4014         kfree(rbd_opts);
4015         rbd_opts = NULL;        /* done with this */
4016
4017         rc = rbd_dev_probe(rbd_dev);
4018         if (rc < 0)
4019                 goto err_out_rbd_dev;
4020
4021         return count;
4022 err_out_rbd_dev:
4023         rbd_dev_destroy(rbd_dev);
4024 err_out_client:
4025         rbd_put_client(rbdc);
4026 err_out_args:
4027         if (ceph_opts)
4028                 ceph_destroy_options(ceph_opts);
4029         kfree(rbd_opts);
4030         rbd_spec_put(spec);
4031 err_out_module:
4032         module_put(THIS_MODULE);
4033
4034         dout("Error adding device %s\n", buf);
4035
4036         return (ssize_t) rc;
4037 }
4038
4039 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4040 {
4041         struct list_head *tmp;
4042         struct rbd_device *rbd_dev;
4043
4044         spin_lock(&rbd_dev_list_lock);
4045         list_for_each(tmp, &rbd_dev_list) {
4046                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4047                 if (rbd_dev->dev_id == dev_id) {
4048                         spin_unlock(&rbd_dev_list_lock);
4049                         return rbd_dev;
4050                 }
4051         }
4052         spin_unlock(&rbd_dev_list_lock);
4053         return NULL;
4054 }
4055
/*
 * Tear down an rbd device given its embedded struct device:
 * stop watching the header object (if a watch is in place), free
 * the disk and block major, release the header fields and the
 * device id, destroy the rbd_dev itself and drop a module reference.
 *
 * NOTE(review): presumably installed as the struct device release
 * callback, so it runs when the last reference to the device is
 * dropped -- confirm against where the device is registered.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* a watch event is only present if the header watch was set up */
	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
4078
4079 static ssize_t rbd_remove(struct bus_type *bus,
4080                           const char *buf,
4081                           size_t count)
4082 {
4083         struct rbd_device *rbd_dev = NULL;
4084         int target_id, rc;
4085         unsigned long ul;
4086         int ret = count;
4087
4088         rc = strict_strtoul(buf, 10, &ul);
4089         if (rc)
4090                 return rc;
4091
4092         /* convert to int; abort if we lost anything in the conversion */
4093         target_id = (int) ul;
4094         if (target_id != ul)
4095                 return -EINVAL;
4096
4097         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4098
4099         rbd_dev = __rbd_get_dev(target_id);
4100         if (!rbd_dev) {
4101                 ret = -ENOENT;
4102                 goto done;
4103         }
4104
4105         spin_lock(&rbd_dev->lock);
4106         if (rbd_dev->open_count)
4107                 ret = -EBUSY;
4108         else
4109                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4110         spin_unlock(&rbd_dev->lock);
4111         if (ret < 0)
4112                 goto done;
4113
4114         rbd_remove_all_snaps(rbd_dev);
4115         rbd_bus_del_dev(rbd_dev);
4116
4117 done:
4118         mutex_unlock(&ctl_mutex);
4119
4120         return ret;
4121 }
4122
4123 /*
4124  * create control files in sysfs
4125  * /sys/bus/rbd/...
4126  */
4127 static int rbd_sysfs_init(void)
4128 {
4129         int ret;
4130
4131         ret = device_register(&rbd_root_dev);
4132         if (ret < 0)
4133                 return ret;
4134
4135         ret = bus_register(&rbd_bus_type);
4136         if (ret < 0)
4137                 device_unregister(&rbd_root_dev);
4138
4139         return ret;
4140 }
4141
/*
 * Undo rbd_sysfs_init(): unregister the rbd bus type and then the
 * rbd root device, the reverse of the order they were registered in.
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
4147
4148 int __init rbd_init(void)
4149 {
4150         int rc;
4151
4152         rc = rbd_sysfs_init();
4153         if (rc)
4154                 return rc;
4155         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4156         return 0;
4157 }
4158
/*
 * Module exit: remove the rbd sysfs entries set up by rbd_init().
 */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
4163
/* Module entry and exit points */
module_init(rbd_init);
module_exit(rbd_exit);

/* Module metadata */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");