/*
 * drivers/block/rbd.c
 * (snapshot taken at commit "rbd: record overall image request result")
 */
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING    (1<<0)
77 #define RBD_FEATURE_STRIPINGV2  (1<<1)
78 #define RBD_FEATURES_ALL \
79             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
80
81 /* Features supported by this (client software) implementation. */
82
83 #define RBD_FEATURES_SUPPORTED  (0)
84
85 /*
86  * An RBD device name will be "rbd#", where the "rbd" comes from
87  * RBD_DRV_NAME above, and # is a unique integer identifier.
88  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89  * enough to hold all possible device names.
90  */
91 #define DEV_NAME_LEN            32
92 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
93
94 /*
95  * block device image metadata (in-memory version)
96  */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;    /* prefix for names of data objects */
        u64 features;           /* RBD_FEATURE_* bits for this image */
        __u8 obj_order;         /* object size is (1 << obj_order) bytes */
        __u8 crypt_type;        /* on-disk crypt type (from v1 header) */
        __u8 comp_type;         /* on-disk compression type (from v1 header) */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;         /* image size, in bytes */
        struct ceph_snap_context *snapc;        /* snapshot id array */
        char *snap_names;       /* snapshot name data (see rbd_header_from_disk()) */
        u64 *snap_sizes;        /* one size per snapshot, in snapc order */

        u64 obj_version;        /* version of the header object */
};
113
114 /*
115  * An rbd image specification.
116  *
117  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118  * identify an image.  Each rbd_dev structure includes a pointer to
119  * an rbd_spec structure that encapsulates this identity.
120  *
121  * Each of the id's in an rbd_spec has an associated name.  For a
122  * user-mapped image, the names are supplied and the id's associated
123  * with them are looked up.  For a layered image, a parent image is
124  * defined by the tuple, and the names are looked up.
125  *
126  * An rbd_dev structure contains a parent_spec pointer which is
127  * non-null if the image it represents is a child in a layered
128  * image.  This pointer will refer to the rbd_spec structure used
129  * by the parent rbd_dev for its own identity (i.e., the structure
130  * is shared between the parent and child).
131  *
132  * Since these structures are populated once, during the discovery
133  * phase of image construction, they are effectively immutable so
134  * we make no effort to synchronize access to them.
135  *
136  * Note that code herein does not assume the image name is known (it
137  * could be a null pointer).
138  */
struct rbd_spec {
        u64             pool_id;        /* id of pool holding the image */
        char            *pool_name;

        char            *image_id;
        char            *image_name;    /* may be NULL (see comment above) */

        u64             snap_id;        /* CEPH_NOSNAP when mapping the head */
        char            *snap_name;

        struct kref     kref;           /* may be shared by parent and child */
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;        /* underlying libceph client */
        struct kref             kref;           /* reference count */
        struct list_head        node;           /* entry in rbd_client_list */
};

struct rbd_img_request;
/* Completion callback for a whole image request */
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
/* Completion callback for a single object request */
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* How the data for an object request is supplied (selects the union below) */
enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
172
struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */

        struct rbd_img_request  *img_request;   /* enclosing image request */
        struct list_head        links;          /* img_request->obj_requests */
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        /* Exactly one of these describes the request's data, per type */
        union {
                struct bio      *bio_list;      /* OBJ_REQUEST_BIO */
                struct {                        /* OBJ_REQUEST_PAGES */
                        struct page     **pages;
                        u32             page_count;
                };
        };

        struct ceph_osd_request *osd_req;       /* underlying OSD request */

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;         /* completion status */
        atomic_t                done;           /* completion flag */

        rbd_obj_callback_t      callback;       /* invoked on completion */
        struct completion       completion;     /* for synchronous waiters */

        struct kref             kref;           /* reference count */
};
203
/* An image request covers a contiguous byte range of an rbd image */
struct rbd_img_request {
        struct request          *rq;            /* originating block request */
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        bool                    write_request;  /* false for read */
        union {
                struct ceph_snap_context *snapc;        /* for writes */
                u64             snap_id;                /* for reads */
        };
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;       /* completion callback */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;           /* reference count */
};
224
/* Iterate over an image request's object requests, in list order */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
/* As above, but continue iterating starting from (oreq) */
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
/* Reverse iteration, safe against removal of the current entry */
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

/* One snapshot of an rbd image */
struct rbd_snap {
        struct  device          dev;            /* embedded sysfs device */
        const char              *name;
        u64                     size;           /* image size at this snap */
        struct list_head        node;           /* entry in rbd_dev->snaps */
        u64                     id;             /* snapshot id */
        u64                     features;
};

/* What is currently mapped: size/features of the head or of a snapshot */
struct rbd_mapping {
        u64                     size;           /* mapped size, in bytes */
        u64                     features;
        bool                    read_only;      /* set for snapshot mappings */
};
246
247 /*
248  * a single device
249  */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;    /* possibly shared client */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;         /* in-memory image metadata */
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;          /* identity of mapped image */

        char                    *header_name;   /* header object name; set
                                                   during probe (not shown) */

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;   /* non-NULL for a layered
                                                   (child) image */
        u64                     parent_overlap;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;        /* head or snapshot mapped */

        struct list_head        node;           /* entry in rbd_dev_list */

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};
291
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations; the definitions appear later in the file */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

/* Devices are mapped/unmapped via the bus "add" and "remove" attributes */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

/* Nothing to release; rbd_root_dev is statically allocated */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
342
/*
 * Emit a warning, identifying the device as precisely as possible:
 * prefer the block device name, then the image name, then the image
 * id, and finally fall back to the rbd_dev pointer itself (or just
 * the driver name if rbd_dev is NULL).
 */
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
369
#ifdef RBD_DEBUG
/*
 * rbd_assert(expr) - log and BUG() if expr is false.
 * Compiles to nothing unless RBD_DEBUG is defined (see above).
 */
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

/* Forward declarations; definitions appear later in the file */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
385
/*
 * Open a mapped rbd device.  Returns -EROFS for a write open of a
 * read-only mapping and -ENOENT if the mapping is being removed.
 * On success a reference on the device is held for the duration of
 * the open (dropped in rbd_release()).
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        /*
         * The open count and the REMOVING flag are checked/updated
         * together under the lock so opening can't race with removal.
         */
        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}
410
411 static int rbd_release(struct gendisk *disk, fmode_t mode)
412 {
413         struct rbd_device *rbd_dev = disk->private_data;
414         unsigned long open_count_before;
415
416         spin_lock_irq(&rbd_dev->lock);
417         open_count_before = rbd_dev->open_count--;
418         spin_unlock_irq(&rbd_dev->lock);
419         rbd_assert(open_count_before > 0);
420
421         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
422         put_device(&rbd_dev->dev);
423         mutex_unlock(&ctl_mutex);
424
425         return 0;
426 }
427
/* Block device operations: rbd supports only open and release */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
433
434 /*
435  * Initialize an rbd client instance.
436  * We own *ceph_opts.
437  */
438 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
439 {
440         struct rbd_client *rbdc;
441         int ret = -ENOMEM;
442
443         dout("%s:\n", __func__);
444         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
445         if (!rbdc)
446                 goto out_opt;
447
448         kref_init(&rbdc->kref);
449         INIT_LIST_HEAD(&rbdc->node);
450
451         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
452
453         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
454         if (IS_ERR(rbdc->client))
455                 goto out_mutex;
456         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
457
458         ret = ceph_open_session(rbdc->client);
459         if (ret < 0)
460                 goto out_err;
461
462         spin_lock(&rbd_client_list_lock);
463         list_add_tail(&rbdc->node, &rbd_client_list);
464         spin_unlock(&rbd_client_list_lock);
465
466         mutex_unlock(&ctl_mutex);
467         dout("%s: rbdc %p\n", __func__, rbdc);
468
469         return rbdc;
470
471 out_err:
472         ceph_destroy_client(rbdc->client);
473 out_mutex:
474         mutex_unlock(&ctl_mutex);
475         kfree(rbdc);
476 out_opt:
477         if (ceph_opts)
478                 ceph_destroy_options(ceph_opts);
479         dout("%s: error %d\n", __func__, ret);
480
481         return ERR_PTR(ret);
482 }
483
484 /*
485  * Find a ceph client with specific addr and configuration.  If
486  * found, bump its reference count.
487  */
488 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
489 {
490         struct rbd_client *client_node;
491         bool found = false;
492
493         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
494                 return NULL;
495
496         spin_lock(&rbd_client_list_lock);
497         list_for_each_entry(client_node, &rbd_client_list, node) {
498                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
499                         kref_get(&client_node->kref);
500                         found = true;
501                         break;
502                 }
503         }
504         spin_unlock(&rbd_client_list_lock);
505
506         return found ? client_node : NULL;
507 }
508
/*
 * mount options
 *
 * Token values are ordered so a token's argument class (int, string,
 * or Boolean) can be determined by comparing against the Opt_last_*
 * markers; see parse_rbd_opts_token().
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

/* Options that apply to a single mapping */
struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false
539
540 static int parse_rbd_opts_token(char *c, void *private)
541 {
542         struct rbd_options *rbd_opts = private;
543         substring_t argstr[MAX_OPT_ARGS];
544         int token, intval, ret;
545
546         token = match_token(c, rbd_opts_tokens, argstr);
547         if (token < 0)
548                 return -EINVAL;
549
550         if (token < Opt_last_int) {
551                 ret = match_int(&argstr[0], &intval);
552                 if (ret < 0) {
553                         pr_err("bad mount option arg (not int) "
554                                "at '%s'\n", c);
555                         return ret;
556                 }
557                 dout("got int token %d val %d\n", token, intval);
558         } else if (token > Opt_last_int && token < Opt_last_string) {
559                 dout("got string token %d val %s\n", token,
560                      argstr[0].from);
561         } else if (token > Opt_last_string && token < Opt_last_bool) {
562                 dout("got Boolean token %d\n", token);
563         } else {
564                 dout("got token %d\n", token);
565         }
566
567         switch (token) {
568         case Opt_read_only:
569                 rbd_opts->read_only = true;
570                 break;
571         case Opt_read_write:
572                 rbd_opts->read_only = false;
573                 break;
574         default:
575                 rbd_assert(false);
576                 break;
577         }
578         return 0;
579 }
580
/*
 * Get a ceph client with specific addr and configuration; if one does
 * not already exist, create it.  Consumes ceph_opts in either case.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* Reusing an existing client; the options are no longer needed */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
597
598 /*
599  * Destroy ceph client
600  *
601  * Caller must hold rbd_client_list_lock.
602  */
603 static void rbd_client_release(struct kref *kref)
604 {
605         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
606
607         dout("%s: rbdc %p\n", __func__, rbdc);
608         spin_lock(&rbd_client_list_lock);
609         list_del(&rbdc->node);
610         spin_unlock(&rbd_client_list_lock);
611
612         ceph_destroy_client(rbdc->client);
613         kfree(rbdc);
614 }
615
616 /*
617  * Drop reference to ceph client node. If it's not referenced anymore, release
618  * it.
619  */
620 static void rbd_put_client(struct rbd_client *rbdc)
621 {
622         if (rbdc)
623                 kref_put(&rbdc->kref, rbd_client_release);
624 }
625
626 static bool rbd_image_format_valid(u32 image_format)
627 {
628         return image_format == 1 || image_format == 2;
629 }
630
/*
 * Sanity-check an on-disk (format 1) image header: verify the magic
 * header text, that the object order is usable, and that the snapshot
 * count and snapshot-name data sizes cannot overflow a size_t when the
 * in-memory header is later built from them.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire the snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
669
670 /*
671  * Create a new header structure, translate header format from the on-disk
672  * header.
673  */
674 static int rbd_header_from_disk(struct rbd_image_header *header,
675                                  struct rbd_image_header_ondisk *ondisk)
676 {
677         u32 snap_count;
678         size_t len;
679         size_t size;
680         u32 i;
681
682         memset(header, 0, sizeof (*header));
683
684         snap_count = le32_to_cpu(ondisk->snap_count);
685
686         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
687         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
688         if (!header->object_prefix)
689                 return -ENOMEM;
690         memcpy(header->object_prefix, ondisk->object_prefix, len);
691         header->object_prefix[len] = '\0';
692
693         if (snap_count) {
694                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
695
696                 /* Save a copy of the snapshot names */
697
698                 if (snap_names_len > (u64) SIZE_MAX)
699                         return -EIO;
700                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
701                 if (!header->snap_names)
702                         goto out_err;
703                 /*
704                  * Note that rbd_dev_v1_header_read() guarantees
705                  * the ondisk buffer we're working with has
706                  * snap_names_len bytes beyond the end of the
707                  * snapshot id array, this memcpy() is safe.
708                  */
709                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
710                         snap_names_len);
711
712                 /* Record each snapshot's size */
713
714                 size = snap_count * sizeof (*header->snap_sizes);
715                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
716                 if (!header->snap_sizes)
717                         goto out_err;
718                 for (i = 0; i < snap_count; i++)
719                         header->snap_sizes[i] =
720                                 le64_to_cpu(ondisk->snaps[i].image_size);
721         } else {
722                 WARN_ON(ondisk->snap_names_len);
723                 header->snap_names = NULL;
724                 header->snap_sizes = NULL;
725         }
726
727         header->features = 0;   /* No features support in v1 images */
728         header->obj_order = ondisk->options.order;
729         header->crypt_type = ondisk->options.crypt_type;
730         header->comp_type = ondisk->options.comp_type;
731
732         /* Allocate and fill in the snapshot context */
733
734         header->image_size = le64_to_cpu(ondisk->image_size);
735         size = sizeof (struct ceph_snap_context);
736         size += snap_count * sizeof (header->snapc->snaps[0]);
737         header->snapc = kzalloc(size, GFP_KERNEL);
738         if (!header->snapc)
739                 goto out_err;
740
741         atomic_set(&header->snapc->nref, 1);
742         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
743         header->snapc->num_snaps = snap_count;
744         for (i = 0; i < snap_count; i++)
745                 header->snapc->snaps[i] =
746                         le64_to_cpu(ondisk->snaps[i].id);
747
748         return 0;
749
750 out_err:
751         kfree(header->snap_sizes);
752         header->snap_sizes = NULL;
753         kfree(header->snap_names);
754         header->snap_names = NULL;
755         kfree(header->object_prefix);
756         header->object_prefix = NULL;
757
758         return -ENOMEM;
759 }
760
761 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
762 {
763         struct rbd_snap *snap;
764
765         if (snap_id == CEPH_NOSNAP)
766                 return RBD_SNAP_HEAD_NAME;
767
768         list_for_each_entry(snap, &rbd_dev->snaps, node)
769                 if (snap_id == snap->id)
770                         return snap->name;
771
772         return NULL;
773 }
774
775 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
776 {
777
778         struct rbd_snap *snap;
779
780         list_for_each_entry(snap, &rbd_dev->snaps, node) {
781                 if (!strcmp(snap_name, snap->name)) {
782                         rbd_dev->spec->snap_id = snap->id;
783                         rbd_dev->mapping.size = snap->size;
784                         rbd_dev->mapping.features = snap->features;
785
786                         return 0;
787                 }
788         }
789
790         return -ENOENT;
791 }
792
793 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
794 {
795         int ret;
796
797         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
798                     sizeof (RBD_SNAP_HEAD_NAME))) {
799                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
800                 rbd_dev->mapping.size = rbd_dev->header.image_size;
801                 rbd_dev->mapping.features = rbd_dev->header.features;
802                 ret = 0;
803         } else {
804                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
805                 if (ret < 0)
806                         goto done;
807                 rbd_dev->mapping.read_only = true;
808         }
809         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
810
811 done:
812         return ret;
813 }
814
815 static void rbd_header_free(struct rbd_image_header *header)
816 {
817         kfree(header->object_prefix);
818         header->object_prefix = NULL;
819         kfree(header->snap_sizes);
820         header->snap_sizes = NULL;
821         kfree(header->snap_names);
822         header->snap_names = NULL;
823         ceph_put_snap_context(header->snapc);
824         header->snapc = NULL;
825 }
826
827 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
828 {
829         char *name;
830         u64 segment;
831         int ret;
832
833         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
834         if (!name)
835                 return NULL;
836         segment = offset >> rbd_dev->header.obj_order;
837         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
838                         rbd_dev->header.object_prefix, segment);
839         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
840                 pr_err("error formatting segment name for #%llu (%d)\n",
841                         segment, ret);
842                 kfree(name);
843                 name = NULL;
844         }
845
846         return name;
847 }
848
849 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
850 {
851         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
852
853         return offset & (segment_size - 1);
854 }
855
856 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
857                                 u64 offset, u64 length)
858 {
859         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
860
861         offset &= segment_size - 1;
862
863         rbd_assert(length <= U64_MAX - offset);
864         if (offset + length > segment_size)
865                 length = segment_size - offset;
866
867         return length;
868 }
869
870 /*
871  * returns the size of an object in the image
872  */
873 static u64 rbd_obj_bytes(struct rbd_image_header *header)
874 {
875         return 1 << header->obj_order;
876 }
877
878 /*
879  * bio helpers
880  */
881
882 static void bio_chain_put(struct bio *chain)
883 {
884         struct bio *tmp;
885
886         while (chain) {
887                 tmp = chain;
888                 chain = chain->bi_next;
889                 bio_put(tmp);
890         }
891 }
892
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;    /* byte position within the whole chain */

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                /*
                                 * Zero from start_ofs (or from this
                                 * segment's start, if start_ofs falls
                                 * before it) to the segment's end.
                                 */
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
919
920 /*
921  * Clone a portion of a bio, starting at the given byte offset
922  * and continuing for the number of bytes indicated.
923  */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;      /* offset of clone start within first bvec */
        unsigned short end_idx;
        unsigned short vcnt;    /* number of bvecs covered by the clone */
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        /* Reject zero length or ranges extending past the source bio */
        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        /* After the loop, resid is the byte count used in the last bvec */
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        /* NOTE(review): assumes offset is a multiple of SECTOR_SIZE —
         * the shift below discards sub-sector bits; verify callers. */
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                /* Single bvec: it holds the whole cloned range */
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
1000
1001 /*
1002  * Clone a portion of a bio chain, starting at the given byte offset
1003  * into the first bio in the source chain and continuing for the
1004  * number of bytes indicated.  The result is another bio chain of
1005  * exactly the given length, or a null pointer on error.
1006  *
1007  * The bio_src and offset parameters are both in-out.  On entry they
1008  * refer to the first source bio and the offset into that bio where
1009  * the start of data to be cloned is located.
1010  *
1011  * On return, bio_src is updated to refer to the bio in the source
1012  * chain that contains first un-cloned byte, and *offset will
1013  * contain the offset of that byte within that bio.
1014  */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;       /* where to link the next clone */

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        /* Source chain ended before we cloned len bytes */
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                /* Clone as much of this bio as remains, up to len */
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        /* Consumed this source bio; advance to the next */
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        /* Report back where the next un-cloned byte lives */
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
1063
/* Take a reference on an object request (logs the prior refcount). */
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}
1070
static void rbd_obj_request_destroy(struct kref *kref);
/* Drop a reference on an object request; frees it on the last put. */
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
1079
/* Take a reference on an image request (logs the prior refcount). */
static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}
1086
static void rbd_img_request_destroy(struct kref *kref);
/* Drop a reference on an image request; frees it on the last put. */
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}
1095
/*
 * Append an object request to an image request's list.  Takes a
 * reference on the object request on the image request's behalf and
 * records its position ("which") within the image request.
 */
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        rbd_obj_request_get(obj_request);
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}
1110
/*
 * Unlink an object request from its image request and drop the
 * reference rbd_img_obj_request_add() took.  Asserts that only the
 * most-recently-added request is removed (LIFO order).
 */
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}
1128
1129 static bool obj_request_type_valid(enum obj_request_type type)
1130 {
1131         switch (type) {
1132         case OBJ_REQUEST_NODATA:
1133         case OBJ_REQUEST_BIO:
1134         case OBJ_REQUEST_PAGES:
1135                 return true;
1136         default:
1137                 return false;
1138         }
1139 }
1140
/* Hand an object request's osd request to the osd client; returns
 * ceph_osdc_start_request()'s result (0 or negative errno). */
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
1148
/*
 * Finish an image request: invoke its callback if one was set,
 * otherwise just drop the reference the completion path holds.
 */
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);
        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}
1157
1158 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1159
/* Sleep (interruptibly) until the object request completes; returns 0
 * or -ERESTARTSYS if interrupted by a signal. */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}
1166
/* Mark an object request not-yet-done; the barrier orders this store
 * before any subsequent done_set()/done_test() on other CPUs. */
static void obj_request_done_init(struct rbd_obj_request *obj_request)
{
        atomic_set(&obj_request->done, 0);
        smp_wmb();
}
1172
1173 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1174 {
1175         int done;
1176
1177         done = atomic_inc_return(&obj_request->done);
1178         if (done > 1) {
1179                 struct rbd_img_request *img_request = obj_request->img_request;
1180                 struct rbd_device *rbd_dev;
1181
1182                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1183                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1184                         obj_request);
1185         }
1186 }
1187
/* Return true if the object request has been marked done; the barrier
 * pairs with the one in obj_request_done_init(). */
static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return atomic_read(&obj_request->done) != 0;
}
1193
/*
 * Fix up a read object request that is part of an image request.
 * Holes and short reads are zero-filled and reported as full-length
 * successful transfers, then the request is marked done.
 */
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
        if (obj_request->result == -ENOENT) {
                zero_bio_chain(obj_request->bio_list, 0);
                obj_request->result = 0;
                obj_request->xferred = obj_request->length;
        } else if (obj_request->xferred < obj_request->length &&
                        !obj_request->result) {
                /* Successful but short: zero from xferred to the end */
                zero_bio_chain(obj_request->bio_list, obj_request->xferred);
                obj_request->xferred = obj_request->length;
        }
        obj_request_done_set(obj_request);
}
1219
/*
 * Finish an object request: invoke its callback if one was set,
 * otherwise wake anyone blocked in rbd_obj_request_wait().
 */
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}
1229
/* Ops that need no per-op fixup (call/notify-ack/watch) just get
 * marked done. */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}
1235
/*
 * Completion fixup for a read op.  Reads that belong to an image
 * request get hole/short-read handling; standalone reads are simply
 * marked done.
 */
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
                obj_request->result, obj_request->xferred, obj_request->length);
        if (obj_request->img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}
1245
/* Completion fixup for a write op; see comment below re xferred. */
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.
         * Our xferred value is the number of bytes transferred
         * back.  Set it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}
1258
1259 /*
1260  * For a simple stat call there's nothing to do.  We'll do more if
1261  * this is part of a write sequence for a layered image.
1262  */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        /* Stat has no data to fix up; just mark the request done. */
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}
1268
/*
 * OSD client completion callback for all rbd osd requests.  Records
 * the result and transfer count on the rbd object request, dispatches
 * per-opcode fixups, and completes the object request once it has
 * been marked done.
 */
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        /* Either part of an image request (which != BAD_WHICH) or not */
        rbd_assert(!!obj_request->img_request ^
                                (obj_request->which == BAD_WHICH));

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

        WARN_ON(osd_req->r_num_ops != 1);       /* For now */

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64) UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        /* The per-op callback may defer completion (e.g. layered I/O) */
        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}
1317
/*
 * Finalize an osd request's message: writes get the current time as
 * mtime and (for image requests) the write snapshot context; reads
 * that are part of an image request target the image's snapshot id.
 */
static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
                                        bool write_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc = NULL;
        u64 snap_id = CEPH_NOSNAP;
        struct timespec *mtime = NULL;
        struct timespec now;

        rbd_assert(osd_req != NULL);

        if (write_request) {
                now = CURRENT_TIME;
                mtime = &now;
                if (img_request)
                        snapc = img_request->snapc;
        } else if (img_request) {
                snap_id = img_request->snap_id;
        }
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, snap_id, mtime);
}
1341
/*
 * Allocate and initialize a single-op osd request for the given rbd
 * object request.  Returns NULL on allocation failure.  The caller
 * owns the returned request (freed via rbd_osd_req_destroy()).
 */
static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
                                        struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        if (img_request) {
                /* Object and image request must agree on direction */
                rbd_assert(img_request->write_request == write_request);
                if (img_request->write_request)
                        snapc = img_request->snapc;
        }

        /* Allocate and initialize the request, for the single op */

        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        if (write_request)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;

        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        /* Target object name (must fit the fixed-size r_oid buffer) */
        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}
1381
/* Release our reference on an osd request created above. */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);
}
1386
1387 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1388
1389 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1390                                                 u64 offset, u64 length,
1391                                                 enum obj_request_type type)
1392 {
1393         struct rbd_obj_request *obj_request;
1394         size_t size;
1395         char *name;
1396
1397         rbd_assert(obj_request_type_valid(type));
1398
1399         size = strlen(object_name) + 1;
1400         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1401         if (!obj_request)
1402                 return NULL;
1403
1404         name = (char *)(obj_request + 1);
1405         obj_request->object_name = memcpy(name, object_name, size);
1406         obj_request->offset = offset;
1407         obj_request->length = length;
1408         obj_request->which = BAD_WHICH;
1409         obj_request->type = type;
1410         INIT_LIST_HEAD(&obj_request->links);
1411         obj_request_done_init(obj_request);
1412         init_completion(&obj_request->completion);
1413         kref_init(&obj_request->kref);
1414
1415         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1416                 offset, length, (int)type, obj_request);
1417
1418         return obj_request;
1419 }
1420
/*
 * kref release function for object requests.  Frees the osd request
 * and any attached data (bio chain or page vector), then the request
 * itself.  The request must already be detached from any image
 * request.
 */
static void rbd_obj_request_destroy(struct kref *kref)
{
        struct rbd_obj_request *obj_request;

        obj_request = container_of(kref, struct rbd_obj_request, kref);

        dout("%s: obj %p\n", __func__, obj_request);

        rbd_assert(obj_request->img_request == NULL);
        rbd_assert(obj_request->which == BAD_WHICH);

        if (obj_request->osd_req)
                rbd_osd_req_destroy(obj_request->osd_req);

        rbd_assert(obj_request_type_valid(obj_request->type));
        switch (obj_request->type) {
        case OBJ_REQUEST_NODATA:
                break;          /* Nothing to do */
        case OBJ_REQUEST_BIO:
                if (obj_request->bio_list)
                        bio_chain_put(obj_request->bio_list);
                break;
        case OBJ_REQUEST_PAGES:
                if (obj_request->pages)
                        ceph_release_page_vector(obj_request->pages,
                                                obj_request->page_count);
                break;
        }

        /* The object name was co-allocated with the struct */
        kfree(obj_request);
}
1452
1453 /*
1454  * Caller is responsible for filling in the list of object requests
1455  * that comprises the image request, and the Linux request pointer
1456  * (if there is one).
1457  */
static struct rbd_img_request *rbd_img_request_create(
                                        struct rbd_device *rbd_dev,
                                        u64 offset, u64 length,
                                        bool write_request)
{
        struct rbd_img_request *img_request;
        struct ceph_snap_context *snapc = NULL;

        img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
        if (!img_request)
                return NULL;

        if (write_request) {
                /*
                 * Writes need the current snapshot context; take a
                 * reference under the header lock so it can't change
                 * out from under us.
                 */
                down_read(&rbd_dev->header_rwsem);
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);
                up_read(&rbd_dev->header_rwsem);
                if (WARN_ON(!snapc)) {
                        kfree(img_request);
                        return NULL;    /* Shouldn't happen */
                }
        }

        /* kmalloc (not kzalloc): every field is set explicitly below */
        img_request->rq = NULL;
        img_request->rbd_dev = rbd_dev;
        img_request->offset = offset;
        img_request->length = length;
        img_request->write_request = write_request;
        if (write_request)
                img_request->snapc = snapc;
        else
                img_request->snap_id = rbd_dev->spec->snap_id;
        spin_lock_init(&img_request->completion_lock);
        img_request->next_completion = 0;
        img_request->callback = NULL;
        img_request->result = 0;
        img_request->obj_request_count = 0;
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);

        rbd_img_request_get(img_request);       /* Avoid a warning */
        rbd_img_request_put(img_request);       /* TEMPORARY */

        dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
                write_request ? "write" : "read", offset, length,
                img_request);

        return img_request;
}
1506
/*
 * kref release function for image requests.  Detaches and drops every
 * object request still on the list, releases the write snapshot
 * context, and frees the request.
 */
static void rbd_img_request_destroy(struct kref *kref)
{
        struct rbd_img_request *img_request;
        struct rbd_obj_request *obj_request;
        struct rbd_obj_request *next_obj_request;

        img_request = container_of(kref, struct rbd_img_request, kref);

        dout("%s: img %p\n", __func__, img_request);

        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_img_obj_request_del(img_request, obj_request);
        rbd_assert(img_request->obj_request_count == 0);

        if (img_request->write_request)
                ceph_put_snap_context(img_request->snapc);

        kfree(img_request);
}
1526
/*
 * Per-object completion callback for objects belonging to an image
 * request.  Object requests may complete out of order, but the block
 * layer must see completions in order, so under completion_lock we
 * only advance past consecutively-done requests starting at
 * next_completion, ending each one's portion of the block request.
 * Records the first failing object's result as the image result.
 * When the final portion completes, the image request is completed.
 */
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        u32 which = obj_request->which;
        bool more = true;

        img_request = obj_request->img_request;

        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
        rbd_assert(img_request != NULL);
        rbd_assert(img_request->rq != NULL);
        rbd_assert(img_request->obj_request_count > 0);
        rbd_assert(which != BAD_WHICH);
        rbd_assert(which < img_request->obj_request_count);
        rbd_assert(which >= img_request->next_completion);

        spin_lock_irq(&img_request->completion_lock);
        /* An earlier request hasn't finished yet; it will resume us */
        if (which != img_request->next_completion)
                goto out;

        for_each_obj_request_from(img_request, obj_request) {
                unsigned int xferred;
                int result;

                rbd_assert(more);
                rbd_assert(which < img_request->obj_request_count);

                if (!obj_request_done_test(obj_request))
                        break;

                rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
                xferred = (unsigned int)obj_request->xferred;
                result = obj_request->result;
                if (result) {
                        rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
                                img_request->write_request ? "write" : "read",
                                result, xferred);
                        /* First failure becomes the overall image result */
                        if (!img_request->result)
                                img_request->result = result;
                }

                /* more is false once the whole block request has ended */
                more = blk_end_request(img_request->rq, result, xferred);
                which++;
        }

        rbd_assert(more ^ (which == img_request->obj_request_count));
        img_request->next_completion = which;
out:
        spin_unlock_irq(&img_request->completion_lock);

        if (!more)
                rbd_img_request_complete(img_request);
}
1580
/*
 * Split an image request's bio chain into per-object requests, one
 * for each object (segment) the byte range touches, and attach them
 * to the image request.  Returns 0 or -ENOMEM.
 */
static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
                                        struct bio *bio_list)
{
        struct rbd_device *rbd_dev = img_request->rbd_dev;
        struct rbd_obj_request *obj_request = NULL;
        struct rbd_obj_request *next_obj_request;
        bool write_request = img_request->write_request;
        unsigned int bio_offset;        /* offset into current source bio */
        u64 image_offset;               /* current offset within the image */
        u64 resid;                      /* bytes left to map */
        u16 opcode;

        dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

        opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
        bio_offset = 0;
        image_offset = img_request->offset;
        rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
        resid = img_request->length;
        rbd_assert(resid > 0);
        while (resid) {
                struct ceph_osd_request *osd_req;
                const char *object_name;
                unsigned int clone_size;
                u64 offset;
                u64 length;

                object_name = rbd_segment_name(rbd_dev, image_offset);
                if (!object_name)
                        goto out_unwind;
                /* Byte range within this object, clipped to the object end */
                offset = rbd_segment_offset(rbd_dev, image_offset);
                length = rbd_segment_length(rbd_dev, image_offset, resid);
                obj_request = rbd_obj_request_create(object_name,
                                                offset, length,
                                                OBJ_REQUEST_BIO);
                kfree(object_name);     /* object request has its own copy */
                if (!obj_request)
                        goto out_unwind;

                rbd_assert(length <= (u64) UINT_MAX);
                clone_size = (unsigned int) length;
                /* Clone this object's slice of the bio chain; advances
                 * bio_list/bio_offset to the first un-cloned byte */
                obj_request->bio_list = bio_chain_clone_range(&bio_list,
                                                &bio_offset, clone_size,
                                                GFP_ATOMIC);
                if (!obj_request->bio_list)
                        goto out_partial;

                osd_req = rbd_osd_req_create(rbd_dev, write_request,
                                                obj_request);
                if (!osd_req)
                        goto out_partial;
                obj_request->osd_req = osd_req;
                obj_request->callback = rbd_img_obj_callback;

                osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
                                                0, 0);
                osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
                                obj_request->bio_list, obj_request->length);
                rbd_osd_req_format(obj_request, write_request);

                rbd_img_obj_request_add(img_request, obj_request);

                image_offset += length;
                resid -= length;
        }

        return 0;

out_partial:
        /* Not yet added to the image request; this put destroys it */
        rbd_obj_request_put(obj_request);
out_unwind:
        /*
         * NOTE(review): this drops one reference on each already-added
         * request but leaves it linked to img_request; the remaining
         * reference is dropped when the image request is destroyed.
         * Verify all callers put img_request on failure, or the
         * requests leak.
         */
        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_obj_request_put(obj_request);

        return -ENOMEM;
}
1657
/*
 * Submit every object request attached to an image request to the
 * osd client.  Returns 0, or the first submission error.
 *
 * NOTE(review): on a mid-loop failure, already-submitted requests
 * remain in flight and retain only the image request's reference —
 * verify callers handle partial submission via img_request teardown.
 */
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
        struct rbd_device *rbd_dev = img_request->rbd_dev;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        struct rbd_obj_request *next_obj_request;

        dout("%s: img %p\n", __func__, img_request);
        for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
                int ret;

                ret = rbd_obj_request_submit(osdc, obj_request);
                if (ret)
                        return ret;
                /*
                 * The image request has its own reference to each
                 * of its object requests, so we can safely drop the
                 * initial one here.
                 */
                rbd_obj_request_put(obj_request);
        }

        return 0;
}
1682
/*
 * Send a notify-ack for the header object in response to a watch
 * notification.  The object request is self-cleaning: its callback
 * is rbd_obj_request_put, so it drops itself on completion.
 */
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver, u64 notify_id)
{
        struct rbd_obj_request *obj_request;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
                                                        OBJ_REQUEST_NODATA);
        if (!obj_request)
                return -ENOMEM;

        ret = -ENOMEM;
        obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!obj_request->osd_req)
                goto out;
        obj_request->callback = rbd_obj_request_put;

        osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
                                        notify_id, ver, 0);
        rbd_osd_req_format(obj_request, false);

        ret = rbd_obj_request_submit(osdc, obj_request);
out:
        /* On any error the callback won't run, so drop the ref here */
        if (ret)
                rbd_obj_request_put(obj_request);

        return ret;
}
1712
1713 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1714 {
1715         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1716         u64 hver;
1717         int rc;
1718
1719         if (!rbd_dev)
1720                 return;
1721
1722         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1723                 rbd_dev->header_name, (unsigned long long) notify_id,
1724                 (unsigned int) opcode);
1725         rc = rbd_dev_refresh(rbd_dev, &hver);
1726         if (rc)
1727                 rbd_warn(rbd_dev, "got notification but failed to "
1728                            " update snaps: %d\n", rc);
1729
1730         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1731 }
1732
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 *
 * Returns 0 on success or a negative errno.  On a successful start,
 * rbd_dev->watch_event and rbd_dev->watch_request are left set; on a
 * successful teardown (or any error) both are cleared.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	/*
	 * Starting requires neither the event nor the watch request
	 * to exist yet; tearing down requires both to be present.
	 */
	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	/* Watches linger; a teardown must unregister the old request */
	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	rbd_osd_req_format(obj_request, true);

	/* Submit, wait for completion, then check the osd result */
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
1812
/*
 * Synchronous osd object method call.  Invokes class_name.method_name
 * on object_name, sending @outbound (if any) as the request payload
 * and copying the method's reply into @inbound.  Returns 0 on success
 * or a negative errno; on success *version (if non-null) receives the
 * object version observed by the call.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *version)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32) calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	/* The object request now owns the page vector */
	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;	/* ret is still -ENOMEM here */

		ceph_pagelist_init(pagelist);
		/*
		 * NOTE(review): the return value of ceph_pagelist_append()
		 * is ignored; an allocation failure inside it would go
		 * undetected -- confirm whether that is acceptable.
		 */
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format(obj_request, false);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = 0;
	/* Copy back only what was actually transferred */
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
1898
/*
 * Request-queue callback: pull fs requests off the queue and turn
 * each into an rbd image request.  Called with q->queue_lock held;
 * the lock is dropped while each image request is built and
 * submitted, then reacquired (hence the sparse annotations).
 */
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		/* Reject an extent that would wrap past 2^64 bytes */
		result = -EINVAL;
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		/* Drop our image request reference on any failure */
		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "obj_request %s result %d\n",
				write_request ? "write" : "read", result);
			__blk_end_request_all(rq, result);
		}
	}
}
1984
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 *
 * Returns the number of bytes of bvec that may be merged into the
 * bio described by bmd (possibly 0).
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	/* obj_order guarantees sectors_per_obj is a power of two */
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
2030
2031 static void rbd_free_disk(struct rbd_device *rbd_dev)
2032 {
2033         struct gendisk *disk = rbd_dev->disk;
2034
2035         if (!disk)
2036                 return;
2037
2038         if (disk->flags & GENHD_FL_UP)
2039                 del_gendisk(disk);
2040         if (disk->queue)
2041                 blk_cleanup_queue(disk->queue);
2042         put_disk(disk);
2043 }
2044
2045 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2046                                 const char *object_name,
2047                                 u64 offset, u64 length,
2048                                 char *buf, u64 *version)
2049
2050 {
2051         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2052         struct rbd_obj_request *obj_request;
2053         struct page **pages = NULL;
2054         u32 page_count;
2055         size_t size;
2056         int ret;
2057
2058         page_count = (u32) calc_pages_for(offset, length);
2059         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2060         if (IS_ERR(pages))
2061                 ret = PTR_ERR(pages);
2062
2063         ret = -ENOMEM;
2064         obj_request = rbd_obj_request_create(object_name, offset, length,
2065                                                         OBJ_REQUEST_PAGES);
2066         if (!obj_request)
2067                 goto out;
2068
2069         obj_request->pages = pages;
2070         obj_request->page_count = page_count;
2071
2072         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2073         if (!obj_request->osd_req)
2074                 goto out;
2075
2076         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2077                                         offset, length, 0, 0);
2078         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
2079                                         obj_request->pages,
2080                                         obj_request->length,
2081                                         obj_request->offset & ~PAGE_MASK,
2082                                         false, false);
2083         rbd_osd_req_format(obj_request, false);
2084
2085         ret = rbd_obj_request_submit(osdc, obj_request);
2086         if (ret)
2087                 goto out;
2088         ret = rbd_obj_request_wait(obj_request);
2089         if (ret)
2090                 goto out;
2091
2092         ret = obj_request->result;
2093         if (ret < 0)
2094                 goto out;
2095
2096         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2097         size = (size_t) obj_request->xferred;
2098         ceph_copy_from_page_vector(pages, buf, 0, size);
2099         rbd_assert(size <= (size_t) INT_MAX);
2100         ret = (int) size;
2101         if (version)
2102                 *version = obj_request->version;
2103 out:
2104         if (obj_request)
2105                 rbd_obj_request_put(obj_request);
2106         else
2107                 ceph_release_page_vector(pages, page_count);
2108
2109         return ret;
2110 }
2111
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		/* Discard any buffer sized for the previous snap count */
		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);
		if (ret < 0)
			goto out_err;
		/* A short read means the object is smaller than expected */
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out_err;
		}

		/* Re-read if the snapshot count changed under us */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
2179
2180 /*
2181  * reload the ondisk the header
2182  */
2183 static int rbd_read_header(struct rbd_device *rbd_dev,
2184                            struct rbd_image_header *header)
2185 {
2186         struct rbd_image_header_ondisk *ondisk;
2187         u64 ver = 0;
2188         int ret;
2189
2190         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2191         if (IS_ERR(ondisk))
2192                 return PTR_ERR(ondisk);
2193         ret = rbd_header_from_disk(header, ondisk);
2194         if (ret >= 0)
2195                 header->obj_version = ver;
2196         kfree(ondisk);
2197
2198         return ret;
2199 }
2200
2201 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2202 {
2203         struct rbd_snap *snap;
2204         struct rbd_snap *next;
2205
2206         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2207                 rbd_remove_snap_dev(snap);
2208 }
2209
2210 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2211 {
2212         sector_t size;
2213
2214         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2215                 return;
2216
2217         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2218         dout("setting size to %llu sectors", (unsigned long long) size);
2219         rbd_dev->mapping.size = (u64) size;
2220         set_capacity(rbd_dev->disk, size);
2221 }
2222
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the v1 header and swaps the freshly-parsed fields into
 * rbd_dev->header under header_rwsem, then updates the snapshot
 * list/devices.  If @hver is non-null it receives the new header
 * object version.  Returns 0 or a negative errno.
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	rbd_update_mapping_size(rbd_dev);

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	/* NOTE(review): image_size was already copied above, before the
	 * mapping-size update; this second assignment is redundant but
	 * harmless. */
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
2266
2267 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2268 {
2269         int ret;
2270
2271         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2272         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2273         if (rbd_dev->image_format == 1)
2274                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2275         else
2276                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2277         mutex_unlock(&ctl_mutex);
2278
2279         return ret;
2280 }
2281
/*
 * Set up the gendisk and request queue for the mapped image and
 * record them in the rbd_device.  Returns 0 on success or -ENOMEM
 * if either allocation fails.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* Keep bios from spanning rbd objects (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
2329
2330 /*
2331   sysfs
2332 */
2333
/* Map a struct device back to its containing rbd_device */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
2338
2339 static ssize_t rbd_size_show(struct device *dev,
2340                              struct device_attribute *attr, char *buf)
2341 {
2342         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2343         sector_t size;
2344
2345         down_read(&rbd_dev->header_rwsem);
2346         size = get_capacity(rbd_dev->disk);
2347         up_read(&rbd_dev->header_rwsem);
2348
2349         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2350 }
2351
2352 /*
2353  * Note this shows the features for whatever's mapped, which is not
2354  * necessarily the base image.
2355  */
2356 static ssize_t rbd_features_show(struct device *dev,
2357                              struct device_attribute *attr, char *buf)
2358 {
2359         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2360
2361         return sprintf(buf, "0x%016llx\n",
2362                         (unsigned long long) rbd_dev->mapping.features);
2363 }
2364
2365 static ssize_t rbd_major_show(struct device *dev,
2366                               struct device_attribute *attr, char *buf)
2367 {
2368         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2369
2370         return sprintf(buf, "%d\n", rbd_dev->major);
2371 }
2372
2373 static ssize_t rbd_client_id_show(struct device *dev,
2374                                   struct device_attribute *attr, char *buf)
2375 {
2376         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2377
2378         return sprintf(buf, "client%lld\n",
2379                         ceph_client_id(rbd_dev->rbd_client->client));
2380 }
2381
2382 static ssize_t rbd_pool_show(struct device *dev,
2383                              struct device_attribute *attr, char *buf)
2384 {
2385         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2386
2387         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2388 }
2389
2390 static ssize_t rbd_pool_id_show(struct device *dev,
2391                              struct device_attribute *attr, char *buf)
2392 {
2393         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2394
2395         return sprintf(buf, "%llu\n",
2396                 (unsigned long long) rbd_dev->spec->pool_id);
2397 }
2398
2399 static ssize_t rbd_name_show(struct device *dev,
2400                              struct device_attribute *attr, char *buf)
2401 {
2402         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2403
2404         if (rbd_dev->spec->image_name)
2405                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2406
2407         return sprintf(buf, "(unknown)\n");
2408 }
2409
2410 static ssize_t rbd_image_id_show(struct device *dev,
2411                              struct device_attribute *attr, char *buf)
2412 {
2413         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2414
2415         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2416 }
2417
2418 /*
2419  * Shows the name of the currently-mapped snapshot (or
2420  * RBD_SNAP_HEAD_NAME for the base image).
2421  */
2422 static ssize_t rbd_snap_show(struct device *dev,
2423                              struct device_attribute *attr,
2424                              char *buf)
2425 {
2426         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2427
2428         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2429 }
2430
/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 *
 * Emits one "key value" line per field plus the overlap size, and
 * returns the total number of bytes written to @buf.
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}
2473
2474 static ssize_t rbd_image_refresh(struct device *dev,
2475                                  struct device_attribute *attr,
2476                                  const char *buf,
2477                                  size_t size)
2478 {
2479         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2480         int ret;
2481
2482         ret = rbd_dev_refresh(rbd_dev, NULL);
2483
2484         return ret < 0 ? ret : size;
2485 }
2486
/* Per-device sysfs attributes; all read-only except "refresh" */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* Nothing device-specific to free here; lifetime is managed elsewhere */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2532
2533
2534 /*
2535   sysfs - snapshots
2536 */
2537
2538 static ssize_t rbd_snap_size_show(struct device *dev,
2539                                   struct device_attribute *attr,
2540                                   char *buf)
2541 {
2542         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2543
2544         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2545 }
2546
2547 static ssize_t rbd_snap_id_show(struct device *dev,
2548                                 struct device_attribute *attr,
2549                                 char *buf)
2550 {
2551         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2552
2553         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2554 }
2555
2556 static ssize_t rbd_snap_features_show(struct device *dev,
2557                                 struct device_attribute *attr,
2558                                 char *buf)
2559 {
2560         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2561
2562         return sprintf(buf, "0x%016llx\n",
2563                         (unsigned long long) snap->features);
2564 }
2565
/* Read-only (S_IRUGO, no store method) attributes for snapshot devices */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

/* NULL-terminated, as the sysfs core requires */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};

/*
 * Release callback for a snapshot's embedded struct device, invoked
 * when its last reference is dropped: frees the snapshot name and
 * the rbd_snap itself.
 */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2597
/* Take a reference on a spec; returns its argument for convenience. */
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
        kref_get(&spec->kref);

        return spec;
}

static void rbd_spec_free(struct kref *kref);
/* Drop a reference; the spec is freed when the last one goes away. */
static void rbd_spec_put(struct rbd_spec *spec)
{
        if (spec)
                kref_put(&spec->kref, rbd_spec_free);
}
2611
2612 static struct rbd_spec *rbd_spec_alloc(void)
2613 {
2614         struct rbd_spec *spec;
2615
2616         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2617         if (!spec)
2618                 return NULL;
2619         kref_init(&spec->kref);
2620
2621         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2622
2623         return spec;
2624 }
2625
2626 static void rbd_spec_free(struct kref *kref)
2627 {
2628         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2629
2630         kfree(spec->pool_name);
2631         kfree(spec->image_id);
2632         kfree(spec->image_name);
2633         kfree(spec->snap_name);
2634         kfree(spec);
2635 }
2636
2637 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2638                                 struct rbd_spec *spec)
2639 {
2640         struct rbd_device *rbd_dev;
2641
2642         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2643         if (!rbd_dev)
2644                 return NULL;
2645
2646         spin_lock_init(&rbd_dev->lock);
2647         rbd_dev->flags = 0;
2648         INIT_LIST_HEAD(&rbd_dev->node);
2649         INIT_LIST_HEAD(&rbd_dev->snaps);
2650         init_rwsem(&rbd_dev->header_rwsem);
2651
2652         rbd_dev->spec = spec;
2653         rbd_dev->rbd_client = rbdc;
2654
2655         /* Initialize the layout used for all rbd requests */
2656
2657         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2658         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2659         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2660         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2661
2662         return rbd_dev;
2663 }
2664
/*
 * Tear down an rbd_device created by rbd_dev_create(): drop its
 * references on the parent spec, client, and spec, free the header
 * object name, then free the device itself.
 */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
        rbd_spec_put(rbd_dev->parent_spec);
        kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev->rbd_client);
        rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev);
}
2673
/*
 * Report whether a snapshot's device has been registered, using the
 * device type as the indicator (it is only assigned by
 * rbd_register_snap_dev()).
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
        bool ret = snap->dev.type == &rbd_snap_device_type;
        bool reg = device_is_registered(&snap->dev);

        /* Our notion of "registered" must agree with the driver core's */
        rbd_assert(!ret ^ reg);

        return ret;
}
2683
2684 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2685 {
2686         list_del(&snap->node);
2687         if (device_is_registered(&snap->dev))
2688                 device_unregister(&snap->dev);
2689 }
2690
2691 static int rbd_register_snap_dev(struct rbd_snap *snap,
2692                                   struct device *parent)
2693 {
2694         struct device *dev = &snap->dev;
2695         int ret;
2696
2697         dev->type = &rbd_snap_device_type;
2698         dev->parent = parent;
2699         dev->release = rbd_snap_dev_release;
2700         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2701         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2702
2703         ret = device_register(dev);
2704
2705         return ret;
2706 }
2707
2708 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2709                                                 const char *snap_name,
2710                                                 u64 snap_id, u64 snap_size,
2711                                                 u64 snap_features)
2712 {
2713         struct rbd_snap *snap;
2714         int ret;
2715
2716         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2717         if (!snap)
2718                 return ERR_PTR(-ENOMEM);
2719
2720         ret = -ENOMEM;
2721         snap->name = kstrdup(snap_name, GFP_KERNEL);
2722         if (!snap->name)
2723                 goto err;
2724
2725         snap->id = snap_id;
2726         snap->size = snap_size;
2727         snap->features = snap_features;
2728
2729         return snap;
2730
2731 err:
2732         kfree(snap->name);
2733         kfree(snap);
2734
2735         return ERR_PTR(ret);
2736 }
2737
2738 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2739                 u64 *snap_size, u64 *snap_features)
2740 {
2741         char *snap_name;
2742
2743         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2744
2745         *snap_size = rbd_dev->header.snap_sizes[which];
2746         *snap_features = 0;     /* No features for v1 */
2747
2748         /* Skip over names until we find the one we are looking for */
2749
2750         snap_name = rbd_dev->header.snap_names;
2751         while (which--)
2752                 snap_name += strlen(snap_name) + 1;
2753
2754         return snap_name;
2755 }
2756
2757 /*
2758  * Get the size and object order for an image snapshot, or if
2759  * snap_id is CEPH_NOSNAP, gets this information for the base
2760  * image.
2761  */
2762 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2763                                 u8 *order, u64 *snap_size)
2764 {
2765         __le64 snapid = cpu_to_le64(snap_id);
2766         int ret;
2767         struct {
2768                 u8 order;
2769                 __le64 size;
2770         } __attribute__ ((packed)) size_buf = { 0 };
2771
2772         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2773                                 "rbd", "get_size",
2774                                 (char *) &snapid, sizeof (snapid),
2775                                 (char *) &size_buf, sizeof (size_buf), NULL);
2776         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2777         if (ret < 0)
2778                 return ret;
2779
2780         *order = size_buf.order;
2781         *snap_size = le64_to_cpu(size_buf.size);
2782
2783         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2784                 (unsigned long long) snap_id, (unsigned int) *order,
2785                 (unsigned long long) *snap_size);
2786
2787         return 0;
2788 }
2789
/* Fetch the base image's object order and size into the header. */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
                                        &rbd_dev->header.obj_order,
                                        &rbd_dev->header.image_size);
}
2796
2797 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2798 {
2799         void *reply_buf;
2800         int ret;
2801         void *p;
2802
2803         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2804         if (!reply_buf)
2805                 return -ENOMEM;
2806
2807         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2808                                 "rbd", "get_object_prefix",
2809                                 NULL, 0,
2810                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2811         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2812         if (ret < 0)
2813                 goto out;
2814
2815         p = reply_buf;
2816         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2817                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2818                                                 NULL, GFP_NOIO);
2819
2820         if (IS_ERR(rbd_dev->header.object_prefix)) {
2821                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2822                 rbd_dev->header.object_prefix = NULL;
2823         } else {
2824                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2825         }
2826
2827 out:
2828         kfree(reply_buf);
2829
2830         return ret;
2831 }
2832
2833 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2834                 u64 *snap_features)
2835 {
2836         __le64 snapid = cpu_to_le64(snap_id);
2837         struct {
2838                 __le64 features;
2839                 __le64 incompat;
2840         } features_buf = { 0 };
2841         u64 incompat;
2842         int ret;
2843
2844         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2845                                 "rbd", "get_features",
2846                                 (char *) &snapid, sizeof (snapid),
2847                                 (char *) &features_buf, sizeof (features_buf),
2848                                 NULL);
2849         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2850         if (ret < 0)
2851                 return ret;
2852
2853         incompat = le64_to_cpu(features_buf.incompat);
2854         if (incompat & ~RBD_FEATURES_SUPPORTED)
2855                 return -ENXIO;
2856
2857         *snap_features = le64_to_cpu(features_buf.features);
2858
2859         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2860                 (unsigned long long) snap_id,
2861                 (unsigned long long) *snap_features,
2862                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2863
2864         return 0;
2865 }
2866
/* Fetch the base image's feature bits into the header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
                                                &rbd_dev->header.features);
}
2872
2873 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2874 {
2875         struct rbd_spec *parent_spec;
2876         size_t size;
2877         void *reply_buf = NULL;
2878         __le64 snapid;
2879         void *p;
2880         void *end;
2881         char *image_id;
2882         u64 overlap;
2883         int ret;
2884
2885         parent_spec = rbd_spec_alloc();
2886         if (!parent_spec)
2887                 return -ENOMEM;
2888
2889         size = sizeof (__le64) +                                /* pool_id */
2890                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2891                 sizeof (__le64) +                               /* snap_id */
2892                 sizeof (__le64);                                /* overlap */
2893         reply_buf = kmalloc(size, GFP_KERNEL);
2894         if (!reply_buf) {
2895                 ret = -ENOMEM;
2896                 goto out_err;
2897         }
2898
2899         snapid = cpu_to_le64(CEPH_NOSNAP);
2900         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2901                                 "rbd", "get_parent",
2902                                 (char *) &snapid, sizeof (snapid),
2903                                 (char *) reply_buf, size, NULL);
2904         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2905         if (ret < 0)
2906                 goto out_err;
2907
2908         ret = -ERANGE;
2909         p = reply_buf;
2910         end = (char *) reply_buf + size;
2911         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2912         if (parent_spec->pool_id == CEPH_NOPOOL)
2913                 goto out;       /* No parent?  No problem. */
2914
2915         /* The ceph file layout needs to fit pool id in 32 bits */
2916
2917         ret = -EIO;
2918         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2919                 goto out;
2920
2921         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2922         if (IS_ERR(image_id)) {
2923                 ret = PTR_ERR(image_id);
2924                 goto out_err;
2925         }
2926         parent_spec->image_id = image_id;
2927         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2928         ceph_decode_64_safe(&p, end, overlap, out_err);
2929
2930         rbd_dev->parent_overlap = overlap;
2931         rbd_dev->parent_spec = parent_spec;
2932         parent_spec = NULL;     /* rbd_dev now owns this */
2933 out:
2934         ret = 0;
2935 out_err:
2936         kfree(reply_buf);
2937         rbd_spec_put(parent_spec);
2938
2939         return ret;
2940 }
2941
/*
 * Look up the name of the image whose id is recorded in the spec,
 * by querying the pool's RBD_DIRECTORY object.  Returns a newly-
 * allocated name string (caller frees), or NULL on any failure --
 * callers treat the image name as optional, so errors are not
 * propagated.
 */
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
        size_t image_id_size;
        char *image_id;
        void *p;
        void *end;
        size_t size;
        void *reply_buf = NULL;
        size_t len = 0;
        char *image_name = NULL;
        int ret;

        rbd_assert(!rbd_dev->spec->image_name);

        /* Encode the image id as a length-prefixed ceph string */
        len = strlen(rbd_dev->spec->image_id);
        image_id_size = sizeof (__le32) + len;
        image_id = kmalloc(image_id_size, GFP_KERNEL);
        if (!image_id)
                return NULL;

        p = image_id;
        end = (char *) image_id + image_id_size;
        ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);

        /* Room for the largest possible encoded image name */
        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                goto out;

        ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
                                "rbd", "dir_get_name",
                                image_id, image_id_size,
                                (char *) reply_buf, size, NULL);
        if (ret < 0)
                goto out;
        p = reply_buf;
        end = (char *) reply_buf + size;
        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
        if (IS_ERR(image_name))
                image_name = NULL;      /* failure tolerated; caller warns */
        else
                dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
        kfree(reply_buf);
        kfree(image_id);

        return image_name;
}
2990
2991 /*
2992  * When a parent image gets probed, we only have the pool, image,
2993  * and snapshot ids but not the names of any of them.  This call
2994  * is made later to fill in those names.  It has to be done after
2995  * rbd_dev_snaps_update() has completed because some of the
2996  * information (in particular, snapshot name) is not available
2997  * until then.
2998  */
2999 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3000 {
3001         struct ceph_osd_client *osdc;
3002         const char *name;
3003         void *reply_buf = NULL;
3004         int ret;
3005
3006         if (rbd_dev->spec->pool_name)
3007                 return 0;       /* Already have the names */
3008
3009         /* Look up the pool name */
3010
3011         osdc = &rbd_dev->rbd_client->client->osdc;
3012         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3013         if (!name) {
3014                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3015                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3016                 return -EIO;
3017         }
3018
3019         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3020         if (!rbd_dev->spec->pool_name)
3021                 return -ENOMEM;
3022
3023         /* Fetch the image name; tolerate failure here */
3024
3025         name = rbd_dev_image_name(rbd_dev);
3026         if (name)
3027                 rbd_dev->spec->image_name = (char *) name;
3028         else
3029                 rbd_warn(rbd_dev, "unable to get image name");
3030
3031         /* Look up the snapshot name. */
3032
3033         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3034         if (!name) {
3035                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3036                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3037                 ret = -EIO;
3038                 goto out_err;
3039         }
3040         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3041         if(!rbd_dev->spec->snap_name)
3042                 goto out_err;
3043
3044         return 0;
3045 out_err:
3046         kfree(reply_buf);
3047         kfree(rbd_dev->spec->pool_name);
3048         rbd_dev->spec->pool_name = NULL;
3049
3050         return ret;
3051 }
3052
3053 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3054 {
3055         size_t size;
3056         int ret;
3057         void *reply_buf;
3058         void *p;
3059         void *end;
3060         u64 seq;
3061         u32 snap_count;
3062         struct ceph_snap_context *snapc;
3063         u32 i;
3064
3065         /*
3066          * We'll need room for the seq value (maximum snapshot id),
3067          * snapshot count, and array of that many snapshot ids.
3068          * For now we have a fixed upper limit on the number we're
3069          * prepared to receive.
3070          */
3071         size = sizeof (__le64) + sizeof (__le32) +
3072                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3073         reply_buf = kzalloc(size, GFP_KERNEL);
3074         if (!reply_buf)
3075                 return -ENOMEM;
3076
3077         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3078                                 "rbd", "get_snapcontext",
3079                                 NULL, 0,
3080                                 reply_buf, size, ver);
3081         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3082         if (ret < 0)
3083                 goto out;
3084
3085         ret = -ERANGE;
3086         p = reply_buf;
3087         end = (char *) reply_buf + size;
3088         ceph_decode_64_safe(&p, end, seq, out);
3089         ceph_decode_32_safe(&p, end, snap_count, out);
3090
3091         /*
3092          * Make sure the reported number of snapshot ids wouldn't go
3093          * beyond the end of our buffer.  But before checking that,
3094          * make sure the computed size of the snapshot context we
3095          * allocate is representable in a size_t.
3096          */
3097         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3098                                  / sizeof (u64)) {
3099                 ret = -EINVAL;
3100                 goto out;
3101         }
3102         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3103                 goto out;
3104
3105         size = sizeof (struct ceph_snap_context) +
3106                                 snap_count * sizeof (snapc->snaps[0]);
3107         snapc = kmalloc(size, GFP_KERNEL);
3108         if (!snapc) {
3109                 ret = -ENOMEM;
3110                 goto out;
3111         }
3112
3113         atomic_set(&snapc->nref, 1);
3114         snapc->seq = seq;
3115         snapc->num_snaps = snap_count;
3116         for (i = 0; i < snap_count; i++)
3117                 snapc->snaps[i] = ceph_decode_64(&p);
3118
3119         rbd_dev->header.snapc = snapc;
3120
3121         dout("  snap context seq = %llu, snap_count = %u\n",
3122                 (unsigned long long) seq, (unsigned int) snap_count);
3123
3124 out:
3125         kfree(reply_buf);
3126
3127         return 0;
3128 }
3129
3130 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3131 {
3132         size_t size;
3133         void *reply_buf;
3134         __le64 snap_id;
3135         int ret;
3136         void *p;
3137         void *end;
3138         char *snap_name;
3139
3140         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3141         reply_buf = kmalloc(size, GFP_KERNEL);
3142         if (!reply_buf)
3143                 return ERR_PTR(-ENOMEM);
3144
3145         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3146         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3147                                 "rbd", "get_snapshot_name",
3148                                 (char *) &snap_id, sizeof (snap_id),
3149                                 reply_buf, size, NULL);
3150         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3151         if (ret < 0)
3152                 goto out;
3153
3154         p = reply_buf;
3155         end = (char *) reply_buf + size;
3156         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3157         if (IS_ERR(snap_name)) {
3158                 ret = PTR_ERR(snap_name);
3159                 goto out;
3160         } else {
3161                 dout("  snap_id 0x%016llx snap_name = %s\n",
3162                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3163         }
3164         kfree(reply_buf);
3165
3166         return snap_name;
3167 out:
3168         kfree(reply_buf);
3169
3170         return ERR_PTR(ret);
3171 }
3172
3173 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3174                 u64 *snap_size, u64 *snap_features)
3175 {
3176         u64 snap_id;
3177         u8 order;
3178         int ret;
3179
3180         snap_id = rbd_dev->header.snapc->snaps[which];
3181         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3182         if (ret)
3183                 return ERR_PTR(ret);
3184         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3185         if (ret)
3186                 return ERR_PTR(ret);
3187
3188         return rbd_dev_v2_snap_name(rbd_dev, which);
3189 }
3190
3191 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3192                 u64 *snap_size, u64 *snap_features)
3193 {
3194         if (rbd_dev->image_format == 1)
3195                 return rbd_dev_v1_snap_info(rbd_dev, which,
3196                                         snap_size, snap_features);
3197         if (rbd_dev->image_format == 2)
3198                 return rbd_dev_v2_snap_info(rbd_dev, which,
3199                                         snap_size, snap_features);
3200         return ERR_PTR(-EINVAL);
3201 }
3202
3203 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3204 {
3205         int ret;
3206         __u8 obj_order;
3207
3208         down_write(&rbd_dev->header_rwsem);
3209
3210         /* Grab old order first, to see if it changes */
3211
3212         obj_order = rbd_dev->header.obj_order,
3213         ret = rbd_dev_v2_image_size(rbd_dev);
3214         if (ret)
3215                 goto out;
3216         if (rbd_dev->header.obj_order != obj_order) {
3217                 ret = -EIO;
3218                 goto out;
3219         }
3220         rbd_update_mapping_size(rbd_dev);
3221
3222         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3223         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3224         if (ret)
3225                 goto out;
3226         ret = rbd_dev_snaps_update(rbd_dev);
3227         dout("rbd_dev_snaps_update returned %d\n", ret);
3228         if (ret)
3229                 goto out;
3230         ret = rbd_dev_snaps_register(rbd_dev);
3231         dout("rbd_dev_snaps_register returned %d\n", ret);
3232 out:
3233         up_write(&rbd_dev->header_rwsem);
3234
3235         return ret;
3236 }
3237
3238 /*
3239  * Scan the rbd device's current snapshot list and compare it to the
3240  * newly-received snapshot context.  Remove any existing snapshots
3241  * not present in the new snapshot context.  Add a new snapshot for
3242  * any snaphots in the snapshot context not in the current list.
3243  * And verify there are no changes to snapshots we already know
3244  * about.
3245  *
3246  * Assumes the snapshots in the snapshot context are sorted by
3247  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3248  * are also maintained in that order.)
3249  */
3250 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3251 {
3252         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3253         const u32 snap_count = snapc->num_snaps;
3254         struct list_head *head = &rbd_dev->snaps;
3255         struct list_head *links = head->next;
3256         u32 index = 0;
3257
3258         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3259         while (index < snap_count || links != head) {
3260                 u64 snap_id;
3261                 struct rbd_snap *snap;
3262                 char *snap_name;
3263                 u64 snap_size = 0;
3264                 u64 snap_features = 0;
3265
3266                 snap_id = index < snap_count ? snapc->snaps[index]
3267                                              : CEPH_NOSNAP;
3268                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3269                                      : NULL;
3270                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3271
3272                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3273                         struct list_head *next = links->next;
3274
3275                         /*
3276                          * A previously-existing snapshot is not in
3277                          * the new snap context.
3278                          *
3279                          * If the now missing snapshot is the one the
3280                          * image is mapped to, clear its exists flag
3281                          * so we can avoid sending any more requests
3282                          * to it.
3283                          */
3284                         if (rbd_dev->spec->snap_id == snap->id)
3285                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3286                         rbd_remove_snap_dev(snap);
3287                         dout("%ssnap id %llu has been removed\n",
3288                                 rbd_dev->spec->snap_id == snap->id ?
3289                                                         "mapped " : "",
3290                                 (unsigned long long) snap->id);
3291
3292                         /* Done with this list entry; advance */
3293
3294                         links = next;
3295                         continue;
3296                 }
3297
3298                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3299                                         &snap_size, &snap_features);
3300                 if (IS_ERR(snap_name))
3301                         return PTR_ERR(snap_name);
3302
3303                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3304                         (unsigned long long) snap_id);
3305                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3306                         struct rbd_snap *new_snap;
3307
3308                         /* We haven't seen this snapshot before */
3309
3310                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3311                                         snap_id, snap_size, snap_features);
3312                         if (IS_ERR(new_snap)) {
3313                                 int err = PTR_ERR(new_snap);
3314
3315                                 dout("  failed to add dev, error %d\n", err);
3316
3317                                 return err;
3318                         }
3319
3320                         /* New goes before existing, or at end of list */
3321
3322                         dout("  added dev%s\n", snap ? "" : " at end\n");
3323                         if (snap)
3324                                 list_add_tail(&new_snap->node, &snap->node);
3325                         else
3326                                 list_add_tail(&new_snap->node, head);
3327                 } else {
3328                         /* Already have this one */
3329
3330                         dout("  already present\n");
3331
3332                         rbd_assert(snap->size == snap_size);
3333                         rbd_assert(!strcmp(snap->name, snap_name));
3334                         rbd_assert(snap->features == snap_features);
3335
3336                         /* Done with this list entry; advance */
3337
3338                         links = links->next;
3339                 }
3340
3341                 /* Advance to the next entry in the snapshot context */
3342
3343                 index++;
3344         }
3345         dout("%s: done\n", __func__);
3346
3347         return 0;
3348 }
3349
3350 /*
3351  * Scan the list of snapshots and register the devices for any that
3352  * have not already been registered.
3353  */
3354 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3355 {
3356         struct rbd_snap *snap;
3357         int ret = 0;
3358
3359         dout("%s:\n", __func__);
3360         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3361                 return -EIO;
3362
3363         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3364                 if (!rbd_snap_registered(snap)) {
3365                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3366                         if (ret < 0)
3367                                 break;
3368                 }
3369         }
3370         dout("%s: returning %d\n", __func__, ret);
3371
3372         return ret;
3373 }
3374
3375 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3376 {
3377         struct device *dev;
3378         int ret;
3379
3380         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3381
3382         dev = &rbd_dev->dev;
3383         dev->bus = &rbd_bus_type;
3384         dev->type = &rbd_device_type;
3385         dev->parent = &rbd_root_dev;
3386         dev->release = rbd_dev_release;
3387         dev_set_name(dev, "%d", rbd_dev->dev_id);
3388         ret = device_register(dev);
3389
3390         mutex_unlock(&ctl_mutex);
3391
3392         return ret;
3393 }
3394
/* Undo rbd_bus_add_dev(): remove the device from the rbd bus. */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
3399
/* Highest device id handed out so far; ids start at 1 (see below) */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3401
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        /* Ids only ever grow; gaps from removed devices are not reused
         * until the maximum itself shrinks (see rbd_dev_id_put()) */
        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
}
3416
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
        struct list_head *tmp;
        int rbd_id = rbd_dev->dev_id;
        int max_id;

        rbd_assert(rbd_id > 0);

        dout("rbd_dev %p released dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);

        /*
         * If the id being "put" is not the current maximum, there
         * is nothing special we need to do.
         */
        if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
                spin_unlock(&rbd_dev_list_lock);
                return;
        }

        /*
         * We need to update the current maximum id.  Search the
         * list to find out what it is.  We're more likely to find
         * the maximum at the end, so search the list backward.
         */
        max_id = 0;
        list_for_each_prev(tmp, &rbd_dev_list) {
                struct rbd_device *rbd_dev;

                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id > max_id)
                        max_id = rbd_dev->dev_id;
        }
        spin_unlock(&rbd_dev_list_lock);

        /*
         * The max id could have been updated by rbd_dev_id_get(), in
         * which case it now accurately reflects the new maximum.
         * Be careful not to overwrite the maximum value in that
         * case.
         */
        atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
        /* NOTE(review): printed even when the cmpxchg did not swap,
         * so the message can be misleading in that (racy) case */
        dout("  max dev id has been reset\n");
}
3467
/*
 * Advance *buf past any leading whitespace, leaving it pointing at
 * the first non-space character (if any), and return the length of
 * the token (maximal run of non-whitespace characters) found there.
 * The token itself is not consumed.  *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() returns nonzero in the
	 * "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Skip leading whitespace */

	return strcspn(*buf, spaces);	/* Length of token at *buf */
}
3486
/*
 * Find the next token in *buf and, when the supplied buffer is big
 * enough to hold it plus a terminator, copy it into that buffer,
 * always '\0'-terminated.  *buf must be '\0'-terminated on entry.
 *
 * The return value is the token's length (terminator excluded): 0
 * when no token was found, and >= token_size when the token did not
 * fit (in which case nothing was copied).
 *
 * *buf is advanced past the token in every case, even when the
 * token was too large to copy.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);

	/* Copy only when the token and its terminator both fit */
	if (token_len < token_size) {
		memcpy(token, *buf, token_len);
		token[token_len] = '\0';
	}
	*buf += token_len;

	return token_len;
}
3516
3517 /*
3518  * Finds the next token in *buf, dynamically allocates a buffer big
3519  * enough to hold a copy of it, and copies the token into the new
3520  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3521  * that a duplicate buffer is created even for a zero-length token.
3522  *
3523  * Returns a pointer to the newly-allocated duplicate, or a null
3524  * pointer if memory for the duplicate was not available.  If
3525  * the lenp argument is a non-null pointer, the length of the token
3526  * (not including the '\0') is returned in *lenp.
3527  *
3528  * If successful, the *buf pointer will be updated to point beyond
3529  * the end of the found token.
3530  *
3531  * Note: uses GFP_KERNEL for allocation.
3532  */
3533 static inline char *dup_token(const char **buf, size_t *lenp)
3534 {
3535         char *dup;
3536         size_t len;
3537
3538         len = next_token(buf);
3539         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3540         if (!dup)
3541                 return NULL;
3542         *(dup + len) = '\0';
3543         *buf += len;
3544
3545         if (lenp)
3546                 *lenp = len;
3547
3548         return dup;
3549 }
3550
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_id>
 *      An optional snapshot id.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot id is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	/*
	 * The monitor address token is not duplicated; keep a pointer
	 * into the caller's buffer plus a size that includes one byte
	 * past the token (used as the "end" below).
	 */
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	/* Default error for the "token present but empty" checks below */
	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	/* len + 1 bytes are copied, then the last is forced to '\0' */
	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!spec->snap_name)
		goto out_mem;
	*(spec->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	/*
	 * NOTE(review): options is passed as a non-const char *;
	 * presumably ceph_parse_options() may modify it, which is why
	 * a writable duplicate (rather than a pointer into buf) is
	 * handed over -- confirm against libceph.
	 */
	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	/* Success: transfer ownership of all three results to caller */
	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}
3694
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.
	 */
	if (rbd_dev->spec->image_id)
		return 0;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	/* sizeof includes the '\0', so this covers prefix + name + NUL */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	/* GFP_NOIO: presumably to avoid recursing into block I/O -- verify */
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* Ask the "rbd" class method "get_id" on the id object */
	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	/* Decode the length-prefixed string into a fresh allocation */
	p = response;
	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						NULL, GFP_NOIO);
	if (IS_ERR(rbd_dev->spec->image_id)) {
		ret = PTR_ERR(rbd_dev->spec->image_id);
		/* Leave image_id NULL on failure, per the contract above */
		rbd_dev->spec->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->spec->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
3769
/*
 * Probe the given rbd_dev as a format 1 image: record an (empty)
 * image id and the header object name, then read the on-disk header
 * to populate the in-core image metadata.  Returns 0 on success or
 * a negative errno, in which case the fields set here are undone.
 */
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;

	/* Version 1 images have no id; empty string is used */

	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
	if (!rbd_dev->spec->image_id)
		return -ENOMEM;

	/* Record the header object name for this rbd image. */

	/* sizeof (RBD_SUFFIX) includes the '\0' terminator */
	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name) {
		ret = -ENOMEM;
		goto out_err;
	}
	sprintf(rbd_dev->header_name, "%s%s",
		rbd_dev->spec->image_name, RBD_SUFFIX);

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	rbd_dev->image_format = 1;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	/* Undo everything set above; NULL the pointers after freeing */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}
3818
/*
 * Probe the given rbd_dev as a format 2 image.  The image id must
 * already be recorded in rbd_dev->spec (see rbd_dev_image_id()).
 * Fetches size, object prefix, features, optional parent info, and
 * the snapshot context from the server.  Returns 0 on success or a
 * negative errno, in which case everything set here is undone.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	/* sizeof (RBD_HEADER_PREFIX) includes the '\0' terminator */
	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	/* Undo everything set above, including any parent reference */
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
3891
/*
 * Complete the probe of an rbd device whose image metadata has been
 * read: update snapshots, finalize the spec and mapping, allocate a
 * device id and block major, create the disk, register with sysfs,
 * start the header watch, and finally announce the disk.
 *
 * Error unwinding is staged: before device_register() succeeds each
 * step is undone explicitly (err_out_* labels, in reverse order of
 * setup); afterwards, cleanup is delegated to the sysfs release path
 * via rbd_bus_del_dev().
 */
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	/* register_blkdev() with major 0 asks for a dynamic major */
	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */
	down_write(&rbd_dev->header_rwsem);
	ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (ret)
		goto err_out_bus;

	/* Start watching the header object for changes (1 == start) */
	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;
err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}
3973
3974 /*
3975  * Probe for the existence of the header object for the given rbd
3976  * device.  For format 2 images this includes determining the image
3977  * id.
3978  */
3979 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3980 {
3981         int ret;
3982
3983         /*
3984          * Get the id from the image id object.  If it's not a
3985          * format 2 image, we'll get ENOENT back, and we'll assume
3986          * it's a format 1 image.
3987          */
3988         ret = rbd_dev_image_id(rbd_dev);
3989         if (ret)
3990                 ret = rbd_dev_v1_probe(rbd_dev);
3991         else
3992                 ret = rbd_dev_v2_probe(rbd_dev);
3993         if (ret) {
3994                 dout("probe failed, returning %d\n", ret);
3995
3996                 return ret;
3997         }
3998
3999         ret = rbd_dev_probe_finish(rbd_dev);
4000         if (ret)
4001                 rbd_header_free(&rbd_dev->header);
4002
4003         return ret;
4004 }
4005
/*
 * sysfs "add" handler (write to /sys/bus/rbd/add): parse the mapping
 * request, connect to the cluster, and probe/register the image as a
 * block device.  Returns count on success or a negative errno.
 *
 * Ownership note: ceph_opts, rbdc, spec and rbd_opts are each handed
 * off (or freed) as setup proceeds; the local pointer is NULLed at
 * each hand-off so the shared error path frees only what this
 * function still owns.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	/* Pin the module for the lifetime of the mapping */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;	/* rc is still -ENOMEM here */
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	/* Success: module ref is kept until rbd_dev_release() */
	return count;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}
4078
4079 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4080 {
4081         struct list_head *tmp;
4082         struct rbd_device *rbd_dev;
4083
4084         spin_lock(&rbd_dev_list_lock);
4085         list_for_each(tmp, &rbd_dev_list) {
4086                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4087                 if (rbd_dev->dev_id == dev_id) {
4088                         spin_unlock(&rbd_dev_list_lock);
4089                         return rbd_dev;
4090                 }
4091         }
4092         spin_unlock(&rbd_dev_list_lock);
4093         return NULL;
4094 }
4095
/*
 * Driver-core release callback for an rbd device (installed by
 * rbd_bus_add_dev()).  Runs when the device's last reference is
 * dropped after device_unregister(); tears down everything set up
 * during rbd_add()/probe, then drops the module reference taken in
 * rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching the header object (0 == stop), if we started */
	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
4118
/*
 * sysfs "remove" handler (write to /sys/bus/rbd/remove): the buffer
 * holds the decimal device id to unmap.  Fails with -ENOENT if no
 * such device exists and -EBUSY if the device is still open.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	/* Default return is count (success) unless an error occurs */
	int ret = count;

	/* NOTE(review): strict_strtoul() is the deprecated spelling of
	 * kstrtoul() -- candidate for a follow-up cleanup */
	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/*
	 * Refuse removal while the device is open; otherwise mark it
	 * REMOVING under the lock so new opens can observe the flag.
	 */
	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;

	rbd_remove_all_snaps(rbd_dev);
	/* Unregistering triggers rbd_dev_release() for final teardown */
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
4162
4163 /*
4164  * create control files in sysfs
4165  * /sys/bus/rbd/...
4166  */
4167 static int rbd_sysfs_init(void)
4168 {
4169         int ret;
4170
4171         ret = device_register(&rbd_root_dev);
4172         if (ret < 0)
4173                 return ret;
4174
4175         ret = bus_register(&rbd_bus_type);
4176         if (ret < 0)
4177                 device_unregister(&rbd_root_dev);
4178
4179         return ret;
4180 }
4181
/* Tear down the sysfs presence, in reverse order of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
4187
4188 static int __init rbd_init(void)
4189 {
4190         int rc;
4191
4192         if (!libceph_compatible(NULL)) {
4193                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4194
4195                 return -EINVAL;
4196         }
4197         rc = rbd_sysfs_init();
4198         if (rc)
4199                 return rc;
4200         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4201         return 0;
4202 }
4203
/* Module exit point: remove the sysfs control files */
static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
4208
4209 module_init(rbd_init);
4210 module_exit(rbd_exit);
4211
4212 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4213 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4214 MODULE_DESCRIPTION("rados block device");
4215
4216 /* following authorship retained from original osdblk.c */
4217 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4218
4219 MODULE_LICENSE("GPL");