]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/dtor/input
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * Block I/O is expressed in sectors.  Several Linux layers (blk, bio,
 * genhd) each have their own notion of a sector, but all of them
 * default to 512 bytes.  Naming the shift and the size reads better
 * than sprinkling bare 9s and 512s through the code.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* Largest value of each fixed-width unsigned type (candidates for a shared header) */

#define U8_MAX  ((u8)   (~0U))
#define U16_MAX ((u16)  (~0U))
#define U32_MAX ((u32)  (~0U))
#define U64_MAX ((u64)  (~0ULL))

/* Short and long driver names */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* Snapshot device names are this prefix plus the snapshot name */
#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

/* Snapshot name that stands for the image head (see rbd_snap_name()) */
#define RBD_SNAP_HEAD_NAME      "-"

/* Sized so that an image name sent by an OSD fits in a single page */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/* Feature bits this client implementation supports (none at present) */

#define RBD_FEATURES_ALL          (0)

/*
 * Device names have the form "rbd#": RBD_DRV_NAME followed by a
 * unique integer id.  MAX_INT_FORMAT_WIDTH bounds the number of
 * characters needed to print an int and is used to make sure
 * DEV_NAME_LEN can hold every possible device name.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
97
98 /*
99  * block device image metadata (in-memory version)
100  */
101 struct rbd_image_header {
102         /* These four fields never change for a given rbd image */
103         char *object_prefix;
104         u64 features;
105         __u8 obj_order;
106         __u8 crypt_type;
107         __u8 comp_type;
108
109         /* The remaining fields need to be updated occasionally */
110         u64 image_size;
111         struct ceph_snap_context *snapc;
112         char *snap_names;
113         u64 *snap_sizes;
114
115         u64 obj_version;
116 };
117
118 /*
119  * An rbd image specification.
120  *
121  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
122  * identify an image.  Each rbd_dev structure includes a pointer to
123  * an rbd_spec structure that encapsulates this identity.
124  *
125  * Each of the id's in an rbd_spec has an associated name.  For a
126  * user-mapped image, the names are supplied and the id's associated
127  * with them are looked up.  For a layered image, a parent image is
128  * defined by the tuple, and the names are looked up.
129  *
130  * An rbd_dev structure contains a parent_spec pointer which is
131  * non-null if the image it represents is a child in a layered
132  * image.  This pointer will refer to the rbd_spec structure used
133  * by the parent rbd_dev for its own identity (i.e., the structure
134  * is shared between the parent and child).
135  *
136  * Since these structures are populated once, during the discovery
137  * phase of image construction, they are effectively immutable so
138  * we make no effort to synchronize access to them.
139  *
140  * Note that code herein does not assume the image name is known (it
141  * could be a null pointer).
142  */
143 struct rbd_spec {
144         u64             pool_id;
145         char            *pool_name;
146
147         char            *image_id;
148         char            *image_name;
149
150         u64             snap_id;
151         char            *snap_name;
152
153         struct kref     kref;
154 };
155
156 /*
157  * an instance of the client.  multiple devices may share an rbd client.
158  */
159 struct rbd_client {
160         struct ceph_client      *client;
161         struct kref             kref;
162         struct list_head        node;
163 };
164
/* Callback type stored in an image request's "callback" field */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

/* Callback type stored in an object request's "callback" field */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* Kind of data buffer an object request carries (none, bio list, page array) */
enum obj_request_type {
        OBJ_REQUEST_NODATA,
        OBJ_REQUEST_BIO,
        OBJ_REQUEST_PAGES
};
176
177 struct rbd_obj_request {
178         const char              *object_name;
179         u64                     offset;         /* object start byte */
180         u64                     length;         /* bytes from offset */
181
182         struct rbd_img_request  *img_request;
183         struct list_head        links;          /* img_request->obj_requests */
184         u32                     which;          /* posn image request list */
185
186         enum obj_request_type   type;
187         union {
188                 struct bio      *bio_list;
189                 struct {
190                         struct page     **pages;
191                         u32             page_count;
192                 };
193         };
194
195         struct ceph_osd_request *osd_req;
196
197         u64                     xferred;        /* bytes transferred */
198         u64                     version;
199         int                     result;
200         atomic_t                done;
201
202         rbd_obj_callback_t      callback;
203         struct completion       completion;
204
205         struct kref             kref;
206 };
207
208 struct rbd_img_request {
209         struct request          *rq;
210         struct rbd_device       *rbd_dev;
211         u64                     offset; /* starting image byte offset */
212         u64                     length; /* byte count from offset */
213         bool                    write_request;  /* false for read */
214         union {
215                 struct ceph_snap_context *snapc;        /* for writes */
216                 u64             snap_id;                /* for reads */
217         };
218         spinlock_t              completion_lock;/* protects next_completion */
219         u32                     next_completion;
220         rbd_img_callback_t      callback;
221
222         u32                     obj_request_count;
223         struct list_head        obj_requests;   /* rbd_obj_request structs */
224
225         struct kref             kref;
226 };
227
/*
 * Iterators over the object requests linked into an image request.
 * Note that the _safe variant walks the list in REVERSE order.
 */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
234
235 struct rbd_snap {
236         struct  device          dev;
237         const char              *name;
238         u64                     size;
239         struct list_head        node;
240         u64                     id;
241         u64                     features;
242 };
243
244 struct rbd_mapping {
245         u64                     size;
246         u64                     features;
247         bool                    read_only;
248 };
249
250 /*
251  * a single device
252  */
253 struct rbd_device {
254         int                     dev_id;         /* blkdev unique id */
255
256         int                     major;          /* blkdev assigned major */
257         struct gendisk          *disk;          /* blkdev's gendisk and rq */
258
259         u32                     image_format;   /* Either 1 or 2 */
260         struct rbd_client       *rbd_client;
261
262         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
263
264         spinlock_t              lock;           /* queue, flags, open_count */
265
266         struct rbd_image_header header;
267         unsigned long           flags;          /* possibly lock protected */
268         struct rbd_spec         *spec;
269
270         char                    *header_name;
271
272         struct ceph_file_layout layout;
273
274         struct ceph_osd_event   *watch_event;
275         struct rbd_obj_request  *watch_request;
276
277         struct rbd_spec         *parent_spec;
278         u64                     parent_overlap;
279
280         /* protects updating the header */
281         struct rw_semaphore     header_rwsem;
282
283         struct rbd_mapping      mapping;
284
285         struct list_head        node;
286
287         /* list of snapshots */
288         struct list_head        snaps;
289
290         /* sysfs related */
291         struct device           dev;
292         unsigned long           open_count;     /* protected by lock */
293 };
294
/*
 * Bit numbers used in rbd_dev->flags.  Where atomic access is needed,
 * rbd_dev->lock provides it.
 *
 * At present only the "removing" flag needs that, because it is tied
 * to the "open_count" field.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};
306
/* Serializes open/close/setup/teardown */
static DEFINE_MUTEX(ctl_mutex);

/* All rbd devices, and the lock for that list */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* All rbd clients, and the lock guarding that list */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);
314
315 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
316 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
317
318 static void rbd_dev_release(struct device *dev);
319 static void rbd_remove_snap_dev(struct rbd_snap *snap);
320
321 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
322                        size_t count);
323 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
324                           size_t count);
325
326 static struct bus_attribute rbd_bus_attrs[] = {
327         __ATTR(add, S_IWUSR, NULL, rbd_add),
328         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
329         __ATTR_NULL
330 };
331
332 static struct bus_type rbd_bus_type = {
333         .name           = "rbd",
334         .bus_attrs      = rbd_bus_attrs,
335 };
336
/* Release callback for rbd_root_dev; intentionally does nothing. */
static void rbd_root_dev_release(struct device *dev)
{
}
340
341 static struct device rbd_root_dev = {
342         .init_name =    "rbd",
343         .release =      rbd_root_dev_release,
344 };
345
346 static __printf(2, 3)
347 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
348 {
349         struct va_format vaf;
350         va_list args;
351
352         va_start(args, fmt);
353         vaf.fmt = fmt;
354         vaf.va = &args;
355
356         if (!rbd_dev)
357                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
358         else if (rbd_dev->disk)
359                 printk(KERN_WARNING "%s: %s: %pV\n",
360                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
361         else if (rbd_dev->spec && rbd_dev->spec->image_name)
362                 printk(KERN_WARNING "%s: image %s: %pV\n",
363                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
364         else if (rbd_dev->spec && rbd_dev->spec->image_id)
365                 printk(KERN_WARNING "%s: id %s: %pV\n",
366                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
367         else    /* punt */
368                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
369                         RBD_DRV_NAME, rbd_dev, &vaf);
370         va_end(args);
371 }
372
/*
 * rbd_assert(expr): when RBD_DEBUG is defined, log the failing
 * expression with its function and line and then BUG() if expr is
 * false; otherwise expand to a no-op.
 *
 * The debug form is wrapped in do { } while (0) so that it behaves as
 * a single statement.  A bare "if (...) { }" expansion would capture
 * a following "else" when the macro is used as the body of an
 * unbraced if.  Callers must terminate the invocation with ';'.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
385
386 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
387 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
388
389 static int rbd_open(struct block_device *bdev, fmode_t mode)
390 {
391         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
392         bool removing = false;
393
394         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
395                 return -EROFS;
396
397         spin_lock_irq(&rbd_dev->lock);
398         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
399                 removing = true;
400         else
401                 rbd_dev->open_count++;
402         spin_unlock_irq(&rbd_dev->lock);
403         if (removing)
404                 return -ENOENT;
405
406         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
407         (void) get_device(&rbd_dev->dev);
408         set_device_ro(bdev, rbd_dev->mapping.read_only);
409         mutex_unlock(&ctl_mutex);
410
411         return 0;
412 }
413
414 static int rbd_release(struct gendisk *disk, fmode_t mode)
415 {
416         struct rbd_device *rbd_dev = disk->private_data;
417         unsigned long open_count_before;
418
419         spin_lock_irq(&rbd_dev->lock);
420         open_count_before = rbd_dev->open_count--;
421         spin_unlock_irq(&rbd_dev->lock);
422         rbd_assert(open_count_before > 0);
423
424         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
425         put_device(&rbd_dev->dev);
426         mutex_unlock(&ctl_mutex);
427
428         return 0;
429 }
430
431 static const struct block_device_operations rbd_bd_ops = {
432         .owner                  = THIS_MODULE,
433         .open                   = rbd_open,
434         .release                = rbd_release,
435 };
436
437 /*
438  * Initialize an rbd client instance.
439  * We own *ceph_opts.
440  */
441 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
442 {
443         struct rbd_client *rbdc;
444         int ret = -ENOMEM;
445
446         dout("%s:\n", __func__);
447         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
448         if (!rbdc)
449                 goto out_opt;
450
451         kref_init(&rbdc->kref);
452         INIT_LIST_HEAD(&rbdc->node);
453
454         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
455
456         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
457         if (IS_ERR(rbdc->client))
458                 goto out_mutex;
459         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
460
461         ret = ceph_open_session(rbdc->client);
462         if (ret < 0)
463                 goto out_err;
464
465         spin_lock(&rbd_client_list_lock);
466         list_add_tail(&rbdc->node, &rbd_client_list);
467         spin_unlock(&rbd_client_list_lock);
468
469         mutex_unlock(&ctl_mutex);
470         dout("%s: rbdc %p\n", __func__, rbdc);
471
472         return rbdc;
473
474 out_err:
475         ceph_destroy_client(rbdc->client);
476 out_mutex:
477         mutex_unlock(&ctl_mutex);
478         kfree(rbdc);
479 out_opt:
480         if (ceph_opts)
481                 ceph_destroy_options(ceph_opts);
482         dout("%s: error %d\n", __func__, ret);
483
484         return ERR_PTR(ret);
485 }
486
487 /*
488  * Find a ceph client with specific addr and configuration.  If
489  * found, bump its reference count.
490  */
491 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
492 {
493         struct rbd_client *client_node;
494         bool found = false;
495
496         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
497                 return NULL;
498
499         spin_lock(&rbd_client_list_lock);
500         list_for_each_entry(client_node, &rbd_client_list, node) {
501                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
502                         kref_get(&client_node->kref);
503                         found = true;
504                         break;
505                 }
506         }
507         spin_unlock(&rbd_client_list_lock);
508
509         return found ? client_node : NULL;
510 }
511
512 /*
513  * mount options
514  */
515 enum {
516         Opt_last_int,
517         /* int args above */
518         Opt_last_string,
519         /* string args above */
520         Opt_read_only,
521         Opt_read_write,
522         /* Boolean args above */
523         Opt_last_bool,
524 };
525
526 static match_table_t rbd_opts_tokens = {
527         /* int args above */
528         /* string args above */
529         {Opt_read_only, "read_only"},
530         {Opt_read_only, "ro"},          /* Alternate spelling */
531         {Opt_read_write, "read_write"},
532         {Opt_read_write, "rw"},         /* Alternate spelling */
533         /* Boolean args above */
534         {-1, NULL}
535 };
536
537 struct rbd_options {
538         bool    read_only;
539 };
540
541 #define RBD_READ_ONLY_DEFAULT   false
542
543 static int parse_rbd_opts_token(char *c, void *private)
544 {
545         struct rbd_options *rbd_opts = private;
546         substring_t argstr[MAX_OPT_ARGS];
547         int token, intval, ret;
548
549         token = match_token(c, rbd_opts_tokens, argstr);
550         if (token < 0)
551                 return -EINVAL;
552
553         if (token < Opt_last_int) {
554                 ret = match_int(&argstr[0], &intval);
555                 if (ret < 0) {
556                         pr_err("bad mount option arg (not int) "
557                                "at '%s'\n", c);
558                         return ret;
559                 }
560                 dout("got int token %d val %d\n", token, intval);
561         } else if (token > Opt_last_int && token < Opt_last_string) {
562                 dout("got string token %d val %s\n", token,
563                      argstr[0].from);
564         } else if (token > Opt_last_string && token < Opt_last_bool) {
565                 dout("got Boolean token %d\n", token);
566         } else {
567                 dout("got token %d\n", token);
568         }
569
570         switch (token) {
571         case Opt_read_only:
572                 rbd_opts->read_only = true;
573                 break;
574         case Opt_read_write:
575                 rbd_opts->read_only = false;
576                 break;
577         default:
578                 rbd_assert(false);
579                 break;
580         }
581         return 0;
582 }
583
/*
 * Return a client matching the given options: an existing one if
 * rbd_client_find() locates it, otherwise a newly created one.
 * Either way ceph_opts is consumed: destroyed here when an existing
 * client is reused, or handed to rbd_client_create().
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* Reusing an existing client; these options are not needed */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
600
601 /*
602  * Destroy ceph client
603  *
604  * Caller must hold rbd_client_list_lock.
605  */
606 static void rbd_client_release(struct kref *kref)
607 {
608         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
609
610         dout("%s: rbdc %p\n", __func__, rbdc);
611         spin_lock(&rbd_client_list_lock);
612         list_del(&rbdc->node);
613         spin_unlock(&rbd_client_list_lock);
614
615         ceph_destroy_client(rbdc->client);
616         kfree(rbdc);
617 }
618
619 /*
620  * Drop reference to ceph client node. If it's not referenced anymore, release
621  * it.
622  */
623 static void rbd_put_client(struct rbd_client *rbdc)
624 {
625         if (rbdc)
626                 kref_put(&rbdc->kref, rbd_client_release);
627 }
628
629 static bool rbd_image_format_valid(u32 image_format)
630 {
631         return image_format == 1 || image_format == 2;
632 }
633
634 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
635 {
636         size_t size;
637         u32 snap_count;
638
639         /* The header has to start with the magic rbd header text */
640         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
641                 return false;
642
643         /* The bio layer requires at least sector-sized I/O */
644
645         if (ondisk->options.order < SECTOR_SHIFT)
646                 return false;
647
648         /* If we use u64 in a few spots we may be able to loosen this */
649
650         if (ondisk->options.order > 8 * sizeof (int) - 1)
651                 return false;
652
653         /*
654          * The size of a snapshot header has to fit in a size_t, and
655          * that limits the number of snapshots.
656          */
657         snap_count = le32_to_cpu(ondisk->snap_count);
658         size = SIZE_MAX - sizeof (struct ceph_snap_context);
659         if (snap_count > size / sizeof (__le64))
660                 return false;
661
662         /*
663          * Not only that, but the size of the entire the snapshot
664          * header must also be representable in a size_t.
665          */
666         size -= snap_count * sizeof (__le64);
667         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
668                 return false;
669
670         return true;
671 }
672
673 /*
674  * Create a new header structure, translate header format from the on-disk
675  * header.
676  */
677 static int rbd_header_from_disk(struct rbd_image_header *header,
678                                  struct rbd_image_header_ondisk *ondisk)
679 {
680         u32 snap_count;
681         size_t len;
682         size_t size;
683         u32 i;
684
685         memset(header, 0, sizeof (*header));
686
687         snap_count = le32_to_cpu(ondisk->snap_count);
688
689         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
690         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
691         if (!header->object_prefix)
692                 return -ENOMEM;
693         memcpy(header->object_prefix, ondisk->object_prefix, len);
694         header->object_prefix[len] = '\0';
695
696         if (snap_count) {
697                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
698
699                 /* Save a copy of the snapshot names */
700
701                 if (snap_names_len > (u64) SIZE_MAX)
702                         return -EIO;
703                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
704                 if (!header->snap_names)
705                         goto out_err;
706                 /*
707                  * Note that rbd_dev_v1_header_read() guarantees
708                  * the ondisk buffer we're working with has
709                  * snap_names_len bytes beyond the end of the
710                  * snapshot id array, this memcpy() is safe.
711                  */
712                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
713                         snap_names_len);
714
715                 /* Record each snapshot's size */
716
717                 size = snap_count * sizeof (*header->snap_sizes);
718                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
719                 if (!header->snap_sizes)
720                         goto out_err;
721                 for (i = 0; i < snap_count; i++)
722                         header->snap_sizes[i] =
723                                 le64_to_cpu(ondisk->snaps[i].image_size);
724         } else {
725                 WARN_ON(ondisk->snap_names_len);
726                 header->snap_names = NULL;
727                 header->snap_sizes = NULL;
728         }
729
730         header->features = 0;   /* No features support in v1 images */
731         header->obj_order = ondisk->options.order;
732         header->crypt_type = ondisk->options.crypt_type;
733         header->comp_type = ondisk->options.comp_type;
734
735         /* Allocate and fill in the snapshot context */
736
737         header->image_size = le64_to_cpu(ondisk->image_size);
738         size = sizeof (struct ceph_snap_context);
739         size += snap_count * sizeof (header->snapc->snaps[0]);
740         header->snapc = kzalloc(size, GFP_KERNEL);
741         if (!header->snapc)
742                 goto out_err;
743
744         atomic_set(&header->snapc->nref, 1);
745         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
746         header->snapc->num_snaps = snap_count;
747         for (i = 0; i < snap_count; i++)
748                 header->snapc->snaps[i] =
749                         le64_to_cpu(ondisk->snaps[i].id);
750
751         return 0;
752
753 out_err:
754         kfree(header->snap_sizes);
755         header->snap_sizes = NULL;
756         kfree(header->snap_names);
757         header->snap_names = NULL;
758         kfree(header->object_prefix);
759         header->object_prefix = NULL;
760
761         return -ENOMEM;
762 }
763
764 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
765 {
766         struct rbd_snap *snap;
767
768         if (snap_id == CEPH_NOSNAP)
769                 return RBD_SNAP_HEAD_NAME;
770
771         list_for_each_entry(snap, &rbd_dev->snaps, node)
772                 if (snap_id == snap->id)
773                         return snap->name;
774
775         return NULL;
776 }
777
778 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
779 {
780
781         struct rbd_snap *snap;
782
783         list_for_each_entry(snap, &rbd_dev->snaps, node) {
784                 if (!strcmp(snap_name, snap->name)) {
785                         rbd_dev->spec->snap_id = snap->id;
786                         rbd_dev->mapping.size = snap->size;
787                         rbd_dev->mapping.features = snap->features;
788
789                         return 0;
790                 }
791         }
792
793         return -ENOENT;
794 }
795
796 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
797 {
798         int ret;
799
800         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
801                     sizeof (RBD_SNAP_HEAD_NAME))) {
802                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
803                 rbd_dev->mapping.size = rbd_dev->header.image_size;
804                 rbd_dev->mapping.features = rbd_dev->header.features;
805                 ret = 0;
806         } else {
807                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
808                 if (ret < 0)
809                         goto done;
810                 rbd_dev->mapping.read_only = true;
811         }
812         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
813
814 done:
815         return ret;
816 }
817
818 static void rbd_header_free(struct rbd_image_header *header)
819 {
820         kfree(header->object_prefix);
821         header->object_prefix = NULL;
822         kfree(header->snap_sizes);
823         header->snap_sizes = NULL;
824         kfree(header->snap_names);
825         header->snap_names = NULL;
826         ceph_put_snap_context(header->snapc);
827         header->snapc = NULL;
828 }
829
830 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
831 {
832         char *name;
833         u64 segment;
834         int ret;
835
836         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
837         if (!name)
838                 return NULL;
839         segment = offset >> rbd_dev->header.obj_order;
840         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
841                         rbd_dev->header.object_prefix, segment);
842         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
843                 pr_err("error formatting segment name for #%llu (%d)\n",
844                         segment, ret);
845                 kfree(name);
846                 name = NULL;
847         }
848
849         return name;
850 }
851
852 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
853 {
854         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
855
856         return offset & (segment_size - 1);
857 }
858
859 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
860                                 u64 offset, u64 length)
861 {
862         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
863
864         offset &= segment_size - 1;
865
866         rbd_assert(length <= U64_MAX - offset);
867         if (offset + length > segment_size)
868                 length = segment_size - offset;
869
870         return length;
871 }
872
873 /*
874  * returns the size of an object in the image
875  */
876 static u64 rbd_obj_bytes(struct rbd_image_header *header)
877 {
878         return 1 << header->obj_order;
879 }
880
881 /*
882  * bio helpers
883  */
884
885 static void bio_chain_put(struct bio *chain)
886 {
887         struct bio *tmp;
888
889         while (chain) {
890                 tmp = chain;
891                 chain = chain->bi_next;
892                 bio_put(tmp);
893         }
894 }
895
896 /*
897  * zeros a bio chain, starting at specific offset
898  */
899 static void zero_bio_chain(struct bio *chain, int start_ofs)
900 {
901         struct bio_vec *bv;
902         unsigned long flags;
903         void *buf;
904         int i;
905         int pos = 0;
906
907         while (chain) {
908                 bio_for_each_segment(bv, chain, i) {
909                         if (pos + bv->bv_len > start_ofs) {
910                                 int remainder = max(start_ofs - pos, 0);
911                                 buf = bvec_kmap_irq(bv, &flags);
912                                 memset(buf + remainder, 0,
913                                        bv->bv_len - remainder);
914                                 bvec_kunmap_irq(buf, &flags);
915                         }
916                         pos += bv->bv_len;
917                 }
918
919                 chain = chain->bi_next;
920         }
921 }
922
923 /*
924  * Clone a portion of a bio, starting at the given byte offset
925  * and continuing for the number of bytes indicated.
926  */
927 static struct bio *bio_clone_range(struct bio *bio_src,
928                                         unsigned int offset,
929                                         unsigned int len,
930                                         gfp_t gfpmask)
931 {
932         struct bio_vec *bv;
933         unsigned int resid;
934         unsigned short idx;
935         unsigned int voff;
936         unsigned short end_idx;
937         unsigned short vcnt;
938         struct bio *bio;
939
940         /* Handle the easy case for the caller */
941
942         if (!offset && len == bio_src->bi_size)
943                 return bio_clone(bio_src, gfpmask);
944
945         if (WARN_ON_ONCE(!len))
946                 return NULL;
947         if (WARN_ON_ONCE(len > bio_src->bi_size))
948                 return NULL;
949         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
950                 return NULL;
951
952         /* Find first affected segment... */
953
954         resid = offset;
955         __bio_for_each_segment(bv, bio_src, idx, 0) {
956                 if (resid < bv->bv_len)
957                         break;
958                 resid -= bv->bv_len;
959         }
960         voff = resid;
961
962         /* ...and the last affected segment */
963
964         resid += len;
965         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
966                 if (resid <= bv->bv_len)
967                         break;
968                 resid -= bv->bv_len;
969         }
970         vcnt = end_idx - idx + 1;
971
972         /* Build the clone */
973
974         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
975         if (!bio)
976                 return NULL;    /* ENOMEM */
977
978         bio->bi_bdev = bio_src->bi_bdev;
979         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
980         bio->bi_rw = bio_src->bi_rw;
981         bio->bi_flags |= 1 << BIO_CLONED;
982
983         /*
984          * Copy over our part of the bio_vec, then update the first
985          * and last (or only) entries.
986          */
987         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
988                         vcnt * sizeof (struct bio_vec));
989         bio->bi_io_vec[0].bv_offset += voff;
990         if (vcnt > 1) {
991                 bio->bi_io_vec[0].bv_len -= voff;
992                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
993         } else {
994                 bio->bi_io_vec[0].bv_len = len;
995         }
996
997         bio->bi_vcnt = vcnt;
998         bio->bi_size = len;
999         bio->bi_idx = 0;
1000
1001         return bio;
1002 }
1003
1004 /*
1005  * Clone a portion of a bio chain, starting at the given byte offset
1006  * into the first bio in the source chain and continuing for the
1007  * number of bytes indicated.  The result is another bio chain of
1008  * exactly the given length, or a null pointer on error.
1009  *
1010  * The bio_src and offset parameters are both in-out.  On entry they
1011  * refer to the first source bio and the offset into that bio where
1012  * the start of data to be cloned is located.
1013  *
1014  * On return, bio_src is updated to refer to the bio in the source
1015  * chain that contains first un-cloned byte, and *offset will
1016  * contain the offset of that byte within that bio.
1017  */
1018 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1019                                         unsigned int *offset,
1020                                         unsigned int len,
1021                                         gfp_t gfpmask)
1022 {
1023         struct bio *bi = *bio_src;
1024         unsigned int off = *offset;
1025         struct bio *chain = NULL;
1026         struct bio **end;
1027
1028         /* Build up a chain of clone bios up to the limit */
1029
1030         if (!bi || off >= bi->bi_size || !len)
1031                 return NULL;            /* Nothing to clone */
1032
1033         end = &chain;
1034         while (len) {
1035                 unsigned int bi_size;
1036                 struct bio *bio;
1037
1038                 if (!bi) {
1039                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1040                         goto out_err;   /* EINVAL; ran out of bio's */
1041                 }
1042                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1043                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1044                 if (!bio)
1045                         goto out_err;   /* ENOMEM */
1046
1047                 *end = bio;
1048                 end = &bio->bi_next;
1049
1050                 off += bi_size;
1051                 if (off == bi->bi_size) {
1052                         bi = bi->bi_next;
1053                         off = 0;
1054                 }
1055                 len -= bi_size;
1056         }
1057         *bio_src = bi;
1058         *offset = off;
1059
1060         return chain;
1061 out_err:
1062         bio_chain_put(chain);
1063
1064         return NULL;
1065 }
1066
1067 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1068 {
1069         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1070                 atomic_read(&obj_request->kref.refcount));
1071         kref_get(&obj_request->kref);
1072 }
1073
1074 static void rbd_obj_request_destroy(struct kref *kref);
1075 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1076 {
1077         rbd_assert(obj_request != NULL);
1078         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1079                 atomic_read(&obj_request->kref.refcount));
1080         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1081 }
1082
1083 static void rbd_img_request_get(struct rbd_img_request *img_request)
1084 {
1085         dout("%s: img %p (was %d)\n", __func__, img_request,
1086                 atomic_read(&img_request->kref.refcount));
1087         kref_get(&img_request->kref);
1088 }
1089
1090 static void rbd_img_request_destroy(struct kref *kref);
1091 static void rbd_img_request_put(struct rbd_img_request *img_request)
1092 {
1093         rbd_assert(img_request != NULL);
1094         dout("%s: img %p (was %d)\n", __func__, img_request,
1095                 atomic_read(&img_request->kref.refcount));
1096         kref_put(&img_request->kref, rbd_img_request_destroy);
1097 }
1098
1099 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1100                                         struct rbd_obj_request *obj_request)
1101 {
1102         rbd_assert(obj_request->img_request == NULL);
1103
1104         rbd_obj_request_get(obj_request);
1105         obj_request->img_request = img_request;
1106         obj_request->which = img_request->obj_request_count;
1107         rbd_assert(obj_request->which != BAD_WHICH);
1108         img_request->obj_request_count++;
1109         list_add_tail(&obj_request->links, &img_request->obj_requests);
1110         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1111                 obj_request->which);
1112 }
1113
1114 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1115                                         struct rbd_obj_request *obj_request)
1116 {
1117         rbd_assert(obj_request->which != BAD_WHICH);
1118
1119         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1120                 obj_request->which);
1121         list_del(&obj_request->links);
1122         rbd_assert(img_request->obj_request_count > 0);
1123         img_request->obj_request_count--;
1124         rbd_assert(obj_request->which == img_request->obj_request_count);
1125         obj_request->which = BAD_WHICH;
1126         rbd_assert(obj_request->img_request == img_request);
1127         obj_request->img_request = NULL;
1128         obj_request->callback = NULL;
1129         rbd_obj_request_put(obj_request);
1130 }
1131
1132 static bool obj_request_type_valid(enum obj_request_type type)
1133 {
1134         switch (type) {
1135         case OBJ_REQUEST_NODATA:
1136         case OBJ_REQUEST_BIO:
1137         case OBJ_REQUEST_PAGES:
1138                 return true;
1139         default:
1140                 return false;
1141         }
1142 }
1143
1144 static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
1145 {
1146         struct ceph_osd_req_op *op;
1147         va_list args;
1148         size_t size;
1149
1150         op = kzalloc(sizeof (*op), GFP_NOIO);
1151         if (!op)
1152                 return NULL;
1153         op->op = opcode;
1154         va_start(args, opcode);
1155         switch (opcode) {
1156         case CEPH_OSD_OP_READ:
1157         case CEPH_OSD_OP_WRITE:
1158                 /* rbd_osd_req_op_create(READ, offset, length) */
1159                 /* rbd_osd_req_op_create(WRITE, offset, length) */
1160                 op->extent.offset = va_arg(args, u64);
1161                 op->extent.length = va_arg(args, u64);
1162                 if (opcode == CEPH_OSD_OP_WRITE)
1163                         op->payload_len = op->extent.length;
1164                 break;
1165         case CEPH_OSD_OP_STAT:
1166                 break;
1167         case CEPH_OSD_OP_CALL:
1168                 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1169                 op->cls.class_name = va_arg(args, char *);
1170                 size = strlen(op->cls.class_name);
1171                 rbd_assert(size <= (size_t) U8_MAX);
1172                 op->cls.class_len = size;
1173                 op->payload_len = size;
1174
1175                 op->cls.method_name = va_arg(args, char *);
1176                 size = strlen(op->cls.method_name);
1177                 rbd_assert(size <= (size_t) U8_MAX);
1178                 op->cls.method_len = size;
1179                 op->payload_len += size;
1180
1181                 op->cls.argc = 0;
1182                 op->cls.indata = va_arg(args, void *);
1183                 size = va_arg(args, size_t);
1184                 rbd_assert(size <= (size_t) U32_MAX);
1185                 op->cls.indata_len = (u32) size;
1186                 op->payload_len += size;
1187                 break;
1188         case CEPH_OSD_OP_NOTIFY_ACK:
1189         case CEPH_OSD_OP_WATCH:
1190                 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1191                 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1192                 op->watch.cookie = va_arg(args, u64);
1193                 op->watch.ver = va_arg(args, u64);
1194                 op->watch.ver = cpu_to_le64(op->watch.ver);
1195                 if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
1196                         op->watch.flag = (u8) 1;
1197                 break;
1198         default:
1199                 rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
1200                 kfree(op);
1201                 op = NULL;
1202                 break;
1203         }
1204         va_end(args);
1205
1206         return op;
1207 }
1208
/* Free an op allocated by rbd_osd_req_op_create() */
static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
        kfree(op);
}
1213
1214 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1215                                 struct rbd_obj_request *obj_request)
1216 {
1217         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1218
1219         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1220 }
1221
1222 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1223 {
1224         dout("%s: img %p\n", __func__, img_request);
1225         if (img_request->callback)
1226                 img_request->callback(img_request);
1227         else
1228                 rbd_img_request_put(img_request);
1229 }
1230
1231 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1232
1233 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1234 {
1235         dout("%s: obj %p\n", __func__, obj_request);
1236
1237         return wait_for_completion_interruptible(&obj_request->completion);
1238 }
1239
1240 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1241 {
1242         atomic_set(&obj_request->done, 0);
1243         smp_wmb();
1244 }
1245
1246 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1247 {
1248         int done;
1249
1250         done = atomic_inc_return(&obj_request->done);
1251         if (done > 1) {
1252                 struct rbd_img_request *img_request = obj_request->img_request;
1253                 struct rbd_device *rbd_dev;
1254
1255                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1256                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1257                         obj_request);
1258         }
1259 }
1260
1261 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1262 {
1263         smp_mb();
1264         return atomic_read(&obj_request->done) != 0;
1265 }
1266
1267 static void
1268 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1269 {
1270         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1271                 obj_request, obj_request->img_request, obj_request->result,
1272                 obj_request->xferred, obj_request->length);
1273         /*
1274          * ENOENT means a hole in the image.  We zero-fill the
1275          * entire length of the request.  A short read also implies
1276          * zero-fill to the end of the request.  Either way we
1277          * update the xferred count to indicate the whole request
1278          * was satisfied.
1279          */
1280         BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1281         if (obj_request->result == -ENOENT) {
1282                 zero_bio_chain(obj_request->bio_list, 0);
1283                 obj_request->result = 0;
1284                 obj_request->xferred = obj_request->length;
1285         } else if (obj_request->xferred < obj_request->length &&
1286                         !obj_request->result) {
1287                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1288                 obj_request->xferred = obj_request->length;
1289         }
1290         obj_request_done_set(obj_request);
1291 }
1292
1293 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1294 {
1295         dout("%s: obj %p cb %p\n", __func__, obj_request,
1296                 obj_request->callback);
1297         if (obj_request->callback)
1298                 obj_request->callback(obj_request);
1299         else
1300                 complete_all(&obj_request->completion);
1301 }
1302
/* Nothing to do for these ops beyond marking the request done */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1308
1309 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1310 {
1311         dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1312                 obj_request->result, obj_request->xferred, obj_request->length);
1313         if (obj_request->img_request)
1314                 rbd_img_obj_request_read_callback(obj_request);
1315         else
1316                 obj_request_done_set(obj_request);
1317 }
1318
1319 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1320 {
1321         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1322                 obj_request->result, obj_request->length);
1323         /*
1324          * There is no such thing as a successful short write.
1325          * Our xferred value is the number of bytes transferred
1326          * back.  Set it to our originally-requested length.
1327          */
1328         obj_request->xferred = obj_request->length;
1329         obj_request_done_set(obj_request);
1330 }
1331
/*
 * A simple stat call needs nothing beyond being marked done.  More
 * will be done here when this is part of a write sequence for a
 * layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        obj_request_done_set(obj_request);
}
1341
1342 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1343                                 struct ceph_msg *msg)
1344 {
1345         struct rbd_obj_request *obj_request = osd_req->r_priv;
1346         u16 opcode;
1347
1348         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1349         rbd_assert(osd_req == obj_request->osd_req);
1350         rbd_assert(!!obj_request->img_request ^
1351                                 (obj_request->which == BAD_WHICH));
1352
1353         if (osd_req->r_result < 0)
1354                 obj_request->result = osd_req->r_result;
1355         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1356
1357         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1358
1359         /*
1360          * We support a 64-bit length, but ultimately it has to be
1361          * passed to blk_end_request(), which takes an unsigned int.
1362          */
1363         obj_request->xferred = osd_req->r_reply_op_len[0];
1364         rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1365         opcode = osd_req->r_request_ops[0].op;
1366         switch (opcode) {
1367         case CEPH_OSD_OP_READ:
1368                 rbd_osd_read_callback(obj_request);
1369                 break;
1370         case CEPH_OSD_OP_WRITE:
1371                 rbd_osd_write_callback(obj_request);
1372                 break;
1373         case CEPH_OSD_OP_STAT:
1374                 rbd_osd_stat_callback(obj_request);
1375                 break;
1376         case CEPH_OSD_OP_CALL:
1377         case CEPH_OSD_OP_NOTIFY_ACK:
1378         case CEPH_OSD_OP_WATCH:
1379                 rbd_osd_trivial_callback(obj_request);
1380                 break;
1381         default:
1382                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1383                         obj_request->object_name, (unsigned short) opcode);
1384                 break;
1385         }
1386
1387         if (obj_request_done_test(obj_request))
1388                 rbd_obj_request_complete(obj_request);
1389 }
1390
1391 static struct ceph_osd_request *rbd_osd_req_create(
1392                                         struct rbd_device *rbd_dev,
1393                                         bool write_request,
1394                                         struct rbd_obj_request *obj_request,
1395                                         struct ceph_osd_req_op *op)
1396 {
1397         struct rbd_img_request *img_request = obj_request->img_request;
1398         struct ceph_snap_context *snapc = NULL;
1399         struct ceph_osd_client *osdc;
1400         struct ceph_osd_request *osd_req;
1401         struct timespec now;
1402         struct timespec *mtime;
1403         u64 snap_id = CEPH_NOSNAP;
1404         u64 offset = obj_request->offset;
1405         u64 length = obj_request->length;
1406
1407         if (img_request) {
1408                 rbd_assert(img_request->write_request == write_request);
1409                 if (img_request->write_request)
1410                         snapc = img_request->snapc;
1411                 else
1412                         snap_id = img_request->snap_id;
1413         }
1414
1415         /* Allocate and initialize the request, for the single op */
1416
1417         osdc = &rbd_dev->rbd_client->client->osdc;
1418         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1419         if (!osd_req)
1420                 return NULL;    /* ENOMEM */
1421
1422         rbd_assert(obj_request_type_valid(obj_request->type));
1423         switch (obj_request->type) {
1424         case OBJ_REQUEST_NODATA:
1425                 break;          /* Nothing to do */
1426         case OBJ_REQUEST_BIO:
1427                 rbd_assert(obj_request->bio_list != NULL);
1428                 osd_req->r_bio = obj_request->bio_list;
1429                 break;
1430         case OBJ_REQUEST_PAGES:
1431                 osd_req->r_pages = obj_request->pages;
1432                 osd_req->r_num_pages = obj_request->page_count;
1433                 osd_req->r_page_alignment = offset & ~PAGE_MASK;
1434                 break;
1435         }
1436
1437         if (write_request) {
1438                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1439                 now = CURRENT_TIME;
1440                 mtime = &now;
1441         } else {
1442                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1443                 mtime = NULL;   /* not needed for reads */
1444                 offset = 0;     /* These are not used... */
1445                 length = 0;     /* ...for osd read requests */
1446         }
1447
1448         osd_req->r_callback = rbd_osd_req_callback;
1449         osd_req->r_priv = obj_request;
1450
1451         osd_req->r_oid_len = strlen(obj_request->object_name);
1452         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1453         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1454
1455         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1456
1457         /* osd_req will get its own reference to snapc (if non-null) */
1458
1459         ceph_osdc_build_request(osd_req, offset, length, 1, op,
1460                                 snapc, snap_id, mtime);
1461
1462         return osd_req;
1463 }
1464
/* Drop our reference to an osd request */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);
}
1469
1470 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1471
1472 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1473                                                 u64 offset, u64 length,
1474                                                 enum obj_request_type type)
1475 {
1476         struct rbd_obj_request *obj_request;
1477         size_t size;
1478         char *name;
1479
1480         rbd_assert(obj_request_type_valid(type));
1481
1482         size = strlen(object_name) + 1;
1483         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1484         if (!obj_request)
1485                 return NULL;
1486
1487         name = (char *)(obj_request + 1);
1488         obj_request->object_name = memcpy(name, object_name, size);
1489         obj_request->offset = offset;
1490         obj_request->length = length;
1491         obj_request->which = BAD_WHICH;
1492         obj_request->type = type;
1493         INIT_LIST_HEAD(&obj_request->links);
1494         obj_request_done_init(obj_request);
1495         init_completion(&obj_request->completion);
1496         kref_init(&obj_request->kref);
1497
1498         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1499                 offset, length, (int)type, obj_request);
1500
1501         return obj_request;
1502 }
1503
1504 static void rbd_obj_request_destroy(struct kref *kref)
1505 {
1506         struct rbd_obj_request *obj_request;
1507
1508         obj_request = container_of(kref, struct rbd_obj_request, kref);
1509
1510         dout("%s: obj %p\n", __func__, obj_request);
1511
1512         rbd_assert(obj_request->img_request == NULL);
1513         rbd_assert(obj_request->which == BAD_WHICH);
1514
1515         if (obj_request->osd_req)
1516                 rbd_osd_req_destroy(obj_request->osd_req);
1517
1518         rbd_assert(obj_request_type_valid(obj_request->type));
1519         switch (obj_request->type) {
1520         case OBJ_REQUEST_NODATA:
1521                 break;          /* Nothing to do */
1522         case OBJ_REQUEST_BIO:
1523                 if (obj_request->bio_list)
1524                         bio_chain_put(obj_request->bio_list);
1525                 break;
1526         case OBJ_REQUEST_PAGES:
1527                 if (obj_request->pages)
1528                         ceph_release_page_vector(obj_request->pages,
1529                                                 obj_request->page_count);
1530                 break;
1531         }
1532
1533         kfree(obj_request);
1534 }
1535
1536 /*
1537  * Caller is responsible for filling in the list of object requests
1538  * that comprises the image request, and the Linux request pointer
1539  * (if there is one).
1540  */
1541 static struct rbd_img_request *rbd_img_request_create(
1542                                         struct rbd_device *rbd_dev,
1543                                         u64 offset, u64 length,
1544                                         bool write_request)
1545 {
1546         struct rbd_img_request *img_request;
1547         struct ceph_snap_context *snapc = NULL;
1548
1549         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1550         if (!img_request)
1551                 return NULL;
1552
1553         if (write_request) {
1554                 down_read(&rbd_dev->header_rwsem);
1555                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1556                 up_read(&rbd_dev->header_rwsem);
1557                 if (WARN_ON(!snapc)) {
1558                         kfree(img_request);
1559                         return NULL;    /* Shouldn't happen */
1560                 }
1561         }
1562
1563         img_request->rq = NULL;
1564         img_request->rbd_dev = rbd_dev;
1565         img_request->offset = offset;
1566         img_request->length = length;
1567         img_request->write_request = write_request;
1568         if (write_request)
1569                 img_request->snapc = snapc;
1570         else
1571                 img_request->snap_id = rbd_dev->spec->snap_id;
1572         spin_lock_init(&img_request->completion_lock);
1573         img_request->next_completion = 0;
1574         img_request->callback = NULL;
1575         img_request->obj_request_count = 0;
1576         INIT_LIST_HEAD(&img_request->obj_requests);
1577         kref_init(&img_request->kref);
1578
1579         rbd_img_request_get(img_request);       /* Avoid a warning */
1580         rbd_img_request_put(img_request);       /* TEMPORARY */
1581
1582         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1583                 write_request ? "write" : "read", offset, length,
1584                 img_request);
1585
1586         return img_request;
1587 }
1588
1589 static void rbd_img_request_destroy(struct kref *kref)
1590 {
1591         struct rbd_img_request *img_request;
1592         struct rbd_obj_request *obj_request;
1593         struct rbd_obj_request *next_obj_request;
1594
1595         img_request = container_of(kref, struct rbd_img_request, kref);
1596
1597         dout("%s: img %p\n", __func__, img_request);
1598
1599         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1600                 rbd_img_obj_request_del(img_request, obj_request);
1601         rbd_assert(img_request->obj_request_count == 0);
1602
1603         if (img_request->write_request)
1604                 ceph_put_snap_context(img_request->snapc);
1605
1606         kfree(img_request);
1607 }
1608
1609 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1610                                         struct bio *bio_list)
1611 {
1612         struct rbd_device *rbd_dev = img_request->rbd_dev;
1613         struct rbd_obj_request *obj_request = NULL;
1614         struct rbd_obj_request *next_obj_request;
1615         unsigned int bio_offset;
1616         u64 image_offset;
1617         u64 resid;
1618         u16 opcode;
1619
1620         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1621
1622         opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
1623                                               : CEPH_OSD_OP_READ;
1624         bio_offset = 0;
1625         image_offset = img_request->offset;
1626         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1627         resid = img_request->length;
1628         rbd_assert(resid > 0);
1629         while (resid) {
1630                 const char *object_name;
1631                 unsigned int clone_size;
1632                 struct ceph_osd_req_op *op;
1633                 u64 offset;
1634                 u64 length;
1635
1636                 object_name = rbd_segment_name(rbd_dev, image_offset);
1637                 if (!object_name)
1638                         goto out_unwind;
1639                 offset = rbd_segment_offset(rbd_dev, image_offset);
1640                 length = rbd_segment_length(rbd_dev, image_offset, resid);
1641                 obj_request = rbd_obj_request_create(object_name,
1642                                                 offset, length,
1643                                                 OBJ_REQUEST_BIO);
1644                 kfree(object_name);     /* object request has its own copy */
1645                 if (!obj_request)
1646                         goto out_unwind;
1647
1648                 rbd_assert(length <= (u64) UINT_MAX);
1649                 clone_size = (unsigned int) length;
1650                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1651                                                 &bio_offset, clone_size,
1652                                                 GFP_ATOMIC);
1653                 if (!obj_request->bio_list)
1654                         goto out_partial;
1655
1656                 /*
1657                  * Build up the op to use in building the osd
1658                  * request.  Note that the contents of the op are
1659                  * copied by rbd_osd_req_create().
1660                  */
1661                 op = rbd_osd_req_op_create(opcode, offset, length);
1662                 if (!op)
1663                         goto out_partial;
1664                 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1665                                                 img_request->write_request,
1666                                                 obj_request, op);
1667                 rbd_osd_req_op_destroy(op);
1668                 if (!obj_request->osd_req)
1669                         goto out_partial;
1670                 /* status and version are initially zero-filled */
1671
1672                 rbd_img_obj_request_add(img_request, obj_request);
1673
1674                 image_offset += length;
1675                 resid -= length;
1676         }
1677
1678         return 0;
1679
1680 out_partial:
1681         rbd_obj_request_put(obj_request);
1682 out_unwind:
1683         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1684                 rbd_obj_request_put(obj_request);
1685
1686         return -ENOMEM;
1687 }
1688
1689 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1690 {
1691         struct rbd_img_request *img_request;
1692         u32 which = obj_request->which;
1693         bool more = true;
1694
1695         img_request = obj_request->img_request;
1696
1697         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1698         rbd_assert(img_request != NULL);
1699         rbd_assert(img_request->rq != NULL);
1700         rbd_assert(img_request->obj_request_count > 0);
1701         rbd_assert(which != BAD_WHICH);
1702         rbd_assert(which < img_request->obj_request_count);
1703         rbd_assert(which >= img_request->next_completion);
1704
1705         spin_lock_irq(&img_request->completion_lock);
1706         if (which != img_request->next_completion)
1707                 goto out;
1708
1709         for_each_obj_request_from(img_request, obj_request) {
1710                 unsigned int xferred;
1711                 int result;
1712
1713                 rbd_assert(more);
1714                 rbd_assert(which < img_request->obj_request_count);
1715
1716                 if (!obj_request_done_test(obj_request))
1717                         break;
1718
1719                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1720                 xferred = (unsigned int) obj_request->xferred;
1721                 result = (int) obj_request->result;
1722                 if (result)
1723                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1724                                 img_request->write_request ? "write" : "read",
1725                                 result, xferred);
1726
1727                 more = blk_end_request(img_request->rq, result, xferred);
1728                 which++;
1729         }
1730
1731         rbd_assert(more ^ (which == img_request->obj_request_count));
1732         img_request->next_completion = which;
1733 out:
1734         spin_unlock_irq(&img_request->completion_lock);
1735
1736         if (!more)
1737                 rbd_img_request_complete(img_request);
1738 }
1739
1740 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1741 {
1742         struct rbd_device *rbd_dev = img_request->rbd_dev;
1743         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1744         struct rbd_obj_request *obj_request;
1745
1746         dout("%s: img %p\n", __func__, img_request);
1747         for_each_obj_request(img_request, obj_request) {
1748                 int ret;
1749
1750                 obj_request->callback = rbd_img_obj_callback;
1751                 ret = rbd_obj_request_submit(osdc, obj_request);
1752                 if (ret)
1753                         return ret;
1754                 /*
1755                  * The image request has its own reference to each
1756                  * of its object requests, so we can safely drop the
1757                  * initial one here.
1758                  */
1759                 rbd_obj_request_put(obj_request);
1760         }
1761
1762         return 0;
1763 }
1764
1765 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1766                                    u64 ver, u64 notify_id)
1767 {
1768         struct rbd_obj_request *obj_request;
1769         struct ceph_osd_req_op *op;
1770         struct ceph_osd_client *osdc;
1771         int ret;
1772
1773         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1774                                                         OBJ_REQUEST_NODATA);
1775         if (!obj_request)
1776                 return -ENOMEM;
1777
1778         ret = -ENOMEM;
1779         op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
1780         if (!op)
1781                 goto out;
1782         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1783                                                 obj_request, op);
1784         rbd_osd_req_op_destroy(op);
1785         if (!obj_request->osd_req)
1786                 goto out;
1787
1788         osdc = &rbd_dev->rbd_client->client->osdc;
1789         obj_request->callback = rbd_obj_request_put;
1790         ret = rbd_obj_request_submit(osdc, obj_request);
1791 out:
1792         if (ret)
1793                 rbd_obj_request_put(obj_request);
1794
1795         return ret;
1796 }
1797
1798 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1799 {
1800         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1801         u64 hver;
1802         int rc;
1803
1804         if (!rbd_dev)
1805                 return;
1806
1807         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1808                 rbd_dev->header_name, (unsigned long long) notify_id,
1809                 (unsigned int) opcode);
1810         rc = rbd_dev_refresh(rbd_dev, &hver);
1811         if (rc)
1812                 rbd_warn(rbd_dev, "got notification but failed to "
1813                            " update snaps: %d\n", rc);
1814
1815         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1816 }
1817
1818 /*
1819  * Request sync osd watch/unwatch.  The value of "start" determines
1820  * whether a watch request is being initiated or torn down.
1821  */
1822 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1823 {
1824         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1825         struct rbd_obj_request *obj_request;
1826         struct ceph_osd_req_op *op;
1827         int ret;
1828
1829         rbd_assert(start ^ !!rbd_dev->watch_event);
1830         rbd_assert(start ^ !!rbd_dev->watch_request);
1831
1832         if (start) {
1833                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1834                                                 &rbd_dev->watch_event);
1835                 if (ret < 0)
1836                         return ret;
1837                 rbd_assert(rbd_dev->watch_event != NULL);
1838         }
1839
1840         ret = -ENOMEM;
1841         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1842                                                         OBJ_REQUEST_NODATA);
1843         if (!obj_request)
1844                 goto out_cancel;
1845
1846         op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
1847                                 rbd_dev->watch_event->cookie,
1848                                 rbd_dev->header.obj_version, start);
1849         if (!op)
1850                 goto out_cancel;
1851         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
1852                                                         obj_request, op);
1853         rbd_osd_req_op_destroy(op);
1854         if (!obj_request->osd_req)
1855                 goto out_cancel;
1856
1857         if (start)
1858                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1859         else
1860                 ceph_osdc_unregister_linger_request(osdc,
1861                                         rbd_dev->watch_request->osd_req);
1862         ret = rbd_obj_request_submit(osdc, obj_request);
1863         if (ret)
1864                 goto out_cancel;
1865         ret = rbd_obj_request_wait(obj_request);
1866         if (ret)
1867                 goto out_cancel;
1868         ret = obj_request->result;
1869         if (ret)
1870                 goto out_cancel;
1871
1872         /*
1873          * A watch request is set to linger, so the underlying osd
1874          * request won't go away until we unregister it.  We retain
1875          * a pointer to the object request during that time (in
1876          * rbd_dev->watch_request), so we'll keep a reference to
1877          * it.  We'll drop that reference (below) after we've
1878          * unregistered it.
1879          */
1880         if (start) {
1881                 rbd_dev->watch_request = obj_request;
1882
1883                 return 0;
1884         }
1885
1886         /* We have successfully torn down the watch request */
1887
1888         rbd_obj_request_put(rbd_dev->watch_request);
1889         rbd_dev->watch_request = NULL;
1890 out_cancel:
1891         /* Cancel the event if we're tearing down, or on error */
1892         ceph_osdc_cancel_event(rbd_dev->watch_event);
1893         rbd_dev->watch_event = NULL;
1894         if (obj_request)
1895                 rbd_obj_request_put(obj_request);
1896
1897         return ret;
1898 }
1899
1900 /*
1901  * Synchronous osd object method call
1902  */
1903 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1904                              const char *object_name,
1905                              const char *class_name,
1906                              const char *method_name,
1907                              const char *outbound,
1908                              size_t outbound_size,
1909                              char *inbound,
1910                              size_t inbound_size,
1911                              u64 *version)
1912 {
1913         struct rbd_obj_request *obj_request;
1914         struct ceph_osd_client *osdc;
1915         struct ceph_osd_req_op *op;
1916         struct page **pages;
1917         u32 page_count;
1918         int ret;
1919
1920         /*
1921          * Method calls are ultimately read operations but they
1922          * don't involve object data (so no offset or length).
1923          * The result should placed into the inbound buffer
1924          * provided.  They also supply outbound data--parameters for
1925          * the object method.  Currently if this is present it will
1926          * be a snapshot id.
1927          */
1928         page_count = (u32) calc_pages_for(0, inbound_size);
1929         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1930         if (IS_ERR(pages))
1931                 return PTR_ERR(pages);
1932
1933         ret = -ENOMEM;
1934         obj_request = rbd_obj_request_create(object_name, 0, 0,
1935                                                         OBJ_REQUEST_PAGES);
1936         if (!obj_request)
1937                 goto out;
1938
1939         obj_request->pages = pages;
1940         obj_request->page_count = page_count;
1941
1942         op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1943                                         method_name, outbound, outbound_size);
1944         if (!op)
1945                 goto out;
1946         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1947                                                 obj_request, op);
1948         rbd_osd_req_op_destroy(op);
1949         if (!obj_request->osd_req)
1950                 goto out;
1951
1952         osdc = &rbd_dev->rbd_client->client->osdc;
1953         ret = rbd_obj_request_submit(osdc, obj_request);
1954         if (ret)
1955                 goto out;
1956         ret = rbd_obj_request_wait(obj_request);
1957         if (ret)
1958                 goto out;
1959
1960         ret = obj_request->result;
1961         if (ret < 0)
1962                 goto out;
1963         ret = 0;
1964         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1965         if (version)
1966                 *version = obj_request->version;
1967 out:
1968         if (obj_request)
1969                 rbd_obj_request_put(obj_request);
1970         else
1971                 ceph_release_page_vector(pages, page_count);
1972
1973         return ret;
1974 }
1975
1976 static void rbd_request_fn(struct request_queue *q)
1977                 __releases(q->queue_lock) __acquires(q->queue_lock)
1978 {
1979         struct rbd_device *rbd_dev = q->queuedata;
1980         bool read_only = rbd_dev->mapping.read_only;
1981         struct request *rq;
1982         int result;
1983
1984         while ((rq = blk_fetch_request(q))) {
1985                 bool write_request = rq_data_dir(rq) == WRITE;
1986                 struct rbd_img_request *img_request;
1987                 u64 offset;
1988                 u64 length;
1989
1990                 /* Ignore any non-FS requests that filter through. */
1991
1992                 if (rq->cmd_type != REQ_TYPE_FS) {
1993                         dout("%s: non-fs request type %d\n", __func__,
1994                                 (int) rq->cmd_type);
1995                         __blk_end_request_all(rq, 0);
1996                         continue;
1997                 }
1998
1999                 /* Ignore/skip any zero-length requests */
2000
2001                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2002                 length = (u64) blk_rq_bytes(rq);
2003
2004                 if (!length) {
2005                         dout("%s: zero-length request\n", __func__);
2006                         __blk_end_request_all(rq, 0);
2007                         continue;
2008                 }
2009
2010                 spin_unlock_irq(q->queue_lock);
2011
2012                 /* Disallow writes to a read-only device */
2013
2014                 if (write_request) {
2015                         result = -EROFS;
2016                         if (read_only)
2017                                 goto end_request;
2018                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2019                 }
2020
2021                 /*
2022                  * Quit early if the mapped snapshot no longer
2023                  * exists.  It's still possible the snapshot will
2024                  * have disappeared by the time our request arrives
2025                  * at the osd, but there's no sense in sending it if
2026                  * we already know.
2027                  */
2028                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2029                         dout("request for non-existent snapshot");
2030                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2031                         result = -ENXIO;
2032                         goto end_request;
2033                 }
2034
2035                 result = -EINVAL;
2036                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2037                         goto end_request;       /* Shouldn't happen */
2038
2039                 result = -ENOMEM;
2040                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2041                                                         write_request);
2042                 if (!img_request)
2043                         goto end_request;
2044
2045                 img_request->rq = rq;
2046
2047                 result = rbd_img_request_fill_bio(img_request, rq->bio);
2048                 if (!result)
2049                         result = rbd_img_request_submit(img_request);
2050                 if (result)
2051                         rbd_img_request_put(img_request);
2052 end_request:
2053                 spin_lock_irq(q->queue_lock);
2054                 if (result < 0) {
2055                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
2056                                 write_request ? "write" : "read", result);
2057                         __blk_end_request_all(rq, result);
2058                 }
2059         }
2060 }
2061
2062 /*
2063  * a queue callback. Makes sure that we don't create a bio that spans across
2064  * multiple osd objects. One exception would be with a single page bios,
2065  * which we handle later at bio_chain_clone_range()
2066  */
2067 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2068                           struct bio_vec *bvec)
2069 {
2070         struct rbd_device *rbd_dev = q->queuedata;
2071         sector_t sector_offset;
2072         sector_t sectors_per_obj;
2073         sector_t obj_sector_offset;
2074         int ret;
2075
2076         /*
2077          * Find how far into its rbd object the partition-relative
2078          * bio start sector is to offset relative to the enclosing
2079          * device.
2080          */
2081         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2082         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2083         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2084
2085         /*
2086          * Compute the number of bytes from that offset to the end
2087          * of the object.  Account for what's already used by the bio.
2088          */
2089         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2090         if (ret > bmd->bi_size)
2091                 ret -= bmd->bi_size;
2092         else
2093                 ret = 0;
2094
2095         /*
2096          * Don't send back more than was asked for.  And if the bio
2097          * was empty, let the whole thing through because:  "Note
2098          * that a block device *must* allow a single page to be
2099          * added to an empty bio."
2100          */
2101         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2102         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2103                 ret = (int) bvec->bv_len;
2104
2105         return ret;
2106 }
2107
2108 static void rbd_free_disk(struct rbd_device *rbd_dev)
2109 {
2110         struct gendisk *disk = rbd_dev->disk;
2111
2112         if (!disk)
2113                 return;
2114
2115         if (disk->flags & GENHD_FL_UP)
2116                 del_gendisk(disk);
2117         if (disk->queue)
2118                 blk_cleanup_queue(disk->queue);
2119         put_disk(disk);
2120 }
2121
2122 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2123                                 const char *object_name,
2124                                 u64 offset, u64 length,
2125                                 char *buf, u64 *version)
2126
2127 {
2128         struct ceph_osd_req_op *op;
2129         struct rbd_obj_request *obj_request;
2130         struct ceph_osd_client *osdc;
2131         struct page **pages = NULL;
2132         u32 page_count;
2133         size_t size;
2134         int ret;
2135
2136         page_count = (u32) calc_pages_for(offset, length);
2137         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2138         if (IS_ERR(pages))
2139                 ret = PTR_ERR(pages);
2140
2141         ret = -ENOMEM;
2142         obj_request = rbd_obj_request_create(object_name, offset, length,
2143                                                         OBJ_REQUEST_PAGES);
2144         if (!obj_request)
2145                 goto out;
2146
2147         obj_request->pages = pages;
2148         obj_request->page_count = page_count;
2149
2150         op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2151         if (!op)
2152                 goto out;
2153         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2154                                                 obj_request, op);
2155         rbd_osd_req_op_destroy(op);
2156         if (!obj_request->osd_req)
2157                 goto out;
2158
2159         osdc = &rbd_dev->rbd_client->client->osdc;
2160         ret = rbd_obj_request_submit(osdc, obj_request);
2161         if (ret)
2162                 goto out;
2163         ret = rbd_obj_request_wait(obj_request);
2164         if (ret)
2165                 goto out;
2166
2167         ret = obj_request->result;
2168         if (ret < 0)
2169                 goto out;
2170
2171         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2172         size = (size_t) obj_request->xferred;
2173         ceph_copy_from_page_vector(pages, buf, 0, size);
2174         rbd_assert(size <= (size_t) INT_MAX);
2175         ret = (int) size;
2176         if (version)
2177                 *version = obj_request->version;
2178 out:
2179         if (obj_request)
2180                 rbd_obj_request_put(obj_request);
2181         else
2182                 ceph_release_page_vector(pages, page_count);
2183
2184         return ret;
2185 }
2186
2187 /*
2188  * Read the complete header for the given rbd device.
2189  *
2190  * Returns a pointer to a dynamically-allocated buffer containing
2191  * the complete and validated header.  Caller can pass the address
2192  * of a variable that will be filled in with the version of the
2193  * header object at the time it was read.
2194  *
2195  * Returns a pointer-coded errno if a failure occurs.
2196  */
2197 static struct rbd_image_header_ondisk *
2198 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2199 {
2200         struct rbd_image_header_ondisk *ondisk = NULL;
2201         u32 snap_count = 0;
2202         u64 names_size = 0;
2203         u32 want_count;
2204         int ret;
2205
2206         /*
2207          * The complete header will include an array of its 64-bit
2208          * snapshot ids, followed by the names of those snapshots as
2209          * a contiguous block of NUL-terminated strings.  Note that
2210          * the number of snapshots could change by the time we read
2211          * it in, in which case we re-read it.
2212          */
2213         do {
2214                 size_t size;
2215
2216                 kfree(ondisk);
2217
2218                 size = sizeof (*ondisk);
2219                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2220                 size += names_size;
2221                 ondisk = kmalloc(size, GFP_KERNEL);
2222                 if (!ondisk)
2223                         return ERR_PTR(-ENOMEM);
2224
2225                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2226                                        0, size,
2227                                        (char *) ondisk, version);
2228                 if (ret < 0)
2229                         goto out_err;
2230                 if (WARN_ON((size_t) ret < size)) {
2231                         ret = -ENXIO;
2232                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2233                                 size, ret);
2234                         goto out_err;
2235                 }
2236                 if (!rbd_dev_ondisk_valid(ondisk)) {
2237                         ret = -ENXIO;
2238                         rbd_warn(rbd_dev, "invalid header");
2239                         goto out_err;
2240                 }
2241
2242                 names_size = le64_to_cpu(ondisk->snap_names_len);
2243                 want_count = snap_count;
2244                 snap_count = le32_to_cpu(ondisk->snap_count);
2245         } while (snap_count != want_count);
2246
2247         return ondisk;
2248
2249 out_err:
2250         kfree(ondisk);
2251
2252         return ERR_PTR(ret);
2253 }
2254
2255 /*
2256  * reload the ondisk the header
2257  */
2258 static int rbd_read_header(struct rbd_device *rbd_dev,
2259                            struct rbd_image_header *header)
2260 {
2261         struct rbd_image_header_ondisk *ondisk;
2262         u64 ver = 0;
2263         int ret;
2264
2265         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2266         if (IS_ERR(ondisk))
2267                 return PTR_ERR(ondisk);
2268         ret = rbd_header_from_disk(header, ondisk);
2269         if (ret >= 0)
2270                 header->obj_version = ver;
2271         kfree(ondisk);
2272
2273         return ret;
2274 }
2275
2276 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2277 {
2278         struct rbd_snap *snap;
2279         struct rbd_snap *next;
2280
2281         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2282                 rbd_remove_snap_dev(snap);
2283 }
2284
2285 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2286 {
2287         sector_t size;
2288
2289         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2290                 return;
2291
2292         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2293         dout("setting size to %llu sectors", (unsigned long long) size);
2294         rbd_dev->mapping.size = (u64) size;
2295         set_capacity(rbd_dev->disk, size);
2296 }
2297
2298 /*
2299  * only read the first part of the ondisk header, without the snaps info
2300  */
2301 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2302 {
2303         int ret;
2304         struct rbd_image_header h;
2305
2306         ret = rbd_read_header(rbd_dev, &h);
2307         if (ret < 0)
2308                 return ret;
2309
2310         down_write(&rbd_dev->header_rwsem);
2311
2312         /* Update image size, and check for resize of mapped image */
2313         rbd_dev->header.image_size = h.image_size;
2314         rbd_update_mapping_size(rbd_dev);
2315
2316         /* rbd_dev->header.object_prefix shouldn't change */
2317         kfree(rbd_dev->header.snap_sizes);
2318         kfree(rbd_dev->header.snap_names);
2319         /* osd requests may still refer to snapc */
2320         ceph_put_snap_context(rbd_dev->header.snapc);
2321
2322         if (hver)
2323                 *hver = h.obj_version;
2324         rbd_dev->header.obj_version = h.obj_version;
2325         rbd_dev->header.image_size = h.image_size;
2326         rbd_dev->header.snapc = h.snapc;
2327         rbd_dev->header.snap_names = h.snap_names;
2328         rbd_dev->header.snap_sizes = h.snap_sizes;
2329         /* Free the extra copy of the object prefix */
2330         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2331         kfree(h.object_prefix);
2332
2333         ret = rbd_dev_snaps_update(rbd_dev);
2334         if (!ret)
2335                 ret = rbd_dev_snaps_register(rbd_dev);
2336
2337         up_write(&rbd_dev->header_rwsem);
2338
2339         return ret;
2340 }
2341
2342 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2343 {
2344         int ret;
2345
2346         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2347         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2348         if (rbd_dev->image_format == 1)
2349                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2350         else
2351                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2352         mutex_unlock(&ctl_mutex);
2353
2354         return ret;
2355 }
2356
2357 static int rbd_init_disk(struct rbd_device *rbd_dev)
2358 {
2359         struct gendisk *disk;
2360         struct request_queue *q;
2361         u64 segment_size;
2362
2363         /* create gendisk info */
2364         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2365         if (!disk)
2366                 return -ENOMEM;
2367
2368         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2369                  rbd_dev->dev_id);
2370         disk->major = rbd_dev->major;
2371         disk->first_minor = 0;
2372         disk->fops = &rbd_bd_ops;
2373         disk->private_data = rbd_dev;
2374
2375         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2376         if (!q)
2377                 goto out_disk;
2378
2379         /* We use the default size, but let's be explicit about it. */
2380         blk_queue_physical_block_size(q, SECTOR_SIZE);
2381
2382         /* set io sizes to object size */
2383         segment_size = rbd_obj_bytes(&rbd_dev->header);
2384         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2385         blk_queue_max_segment_size(q, segment_size);
2386         blk_queue_io_min(q, segment_size);
2387         blk_queue_io_opt(q, segment_size);
2388
2389         blk_queue_merge_bvec(q, rbd_merge_bvec);
2390         disk->queue = q;
2391
2392         q->queuedata = rbd_dev;
2393
2394         rbd_dev->disk = disk;
2395
2396         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2397
2398         return 0;
2399 out_disk:
2400         put_disk(disk);
2401
2402         return -ENOMEM;
2403 }
2404
2405 /*
2406   sysfs
2407 */
2408
2409 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2410 {
2411         return container_of(dev, struct rbd_device, dev);
2412 }
2413
2414 static ssize_t rbd_size_show(struct device *dev,
2415                              struct device_attribute *attr, char *buf)
2416 {
2417         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2418         sector_t size;
2419
2420         down_read(&rbd_dev->header_rwsem);
2421         size = get_capacity(rbd_dev->disk);
2422         up_read(&rbd_dev->header_rwsem);
2423
2424         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2425 }
2426
2427 /*
2428  * Note this shows the features for whatever's mapped, which is not
2429  * necessarily the base image.
2430  */
2431 static ssize_t rbd_features_show(struct device *dev,
2432                              struct device_attribute *attr, char *buf)
2433 {
2434         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2435
2436         return sprintf(buf, "0x%016llx\n",
2437                         (unsigned long long) rbd_dev->mapping.features);
2438 }
2439
2440 static ssize_t rbd_major_show(struct device *dev,
2441                               struct device_attribute *attr, char *buf)
2442 {
2443         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2444
2445         return sprintf(buf, "%d\n", rbd_dev->major);
2446 }
2447
2448 static ssize_t rbd_client_id_show(struct device *dev,
2449                                   struct device_attribute *attr, char *buf)
2450 {
2451         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2452
2453         return sprintf(buf, "client%lld\n",
2454                         ceph_client_id(rbd_dev->rbd_client->client));
2455 }
2456
2457 static ssize_t rbd_pool_show(struct device *dev,
2458                              struct device_attribute *attr, char *buf)
2459 {
2460         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2461
2462         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2463 }
2464
2465 static ssize_t rbd_pool_id_show(struct device *dev,
2466                              struct device_attribute *attr, char *buf)
2467 {
2468         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2469
2470         return sprintf(buf, "%llu\n",
2471                 (unsigned long long) rbd_dev->spec->pool_id);
2472 }
2473
2474 static ssize_t rbd_name_show(struct device *dev,
2475                              struct device_attribute *attr, char *buf)
2476 {
2477         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2478
2479         if (rbd_dev->spec->image_name)
2480                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2481
2482         return sprintf(buf, "(unknown)\n");
2483 }
2484
2485 static ssize_t rbd_image_id_show(struct device *dev,
2486                              struct device_attribute *attr, char *buf)
2487 {
2488         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2489
2490         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2491 }
2492
2493 /*
2494  * Shows the name of the currently-mapped snapshot (or
2495  * RBD_SNAP_HEAD_NAME for the base image).
2496  */
2497 static ssize_t rbd_snap_show(struct device *dev,
2498                              struct device_attribute *attr,
2499                              char *buf)
2500 {
2501         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2502
2503         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2504 }
2505
2506 /*
2507  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2508  * for the parent image.  If there is no parent, simply shows
2509  * "(no parent image)".
2510  */
2511 static ssize_t rbd_parent_show(struct device *dev,
2512                              struct device_attribute *attr,
2513                              char *buf)
2514 {
2515         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2516         struct rbd_spec *spec = rbd_dev->parent_spec;
2517         int count;
2518         char *bufp = buf;
2519
2520         if (!spec)
2521                 return sprintf(buf, "(no parent image)\n");
2522
2523         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2524                         (unsigned long long) spec->pool_id, spec->pool_name);
2525         if (count < 0)
2526                 return count;
2527         bufp += count;
2528
2529         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2530                         spec->image_name ? spec->image_name : "(unknown)");
2531         if (count < 0)
2532                 return count;
2533         bufp += count;
2534
2535         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2536                         (unsigned long long) spec->snap_id, spec->snap_name);
2537         if (count < 0)
2538                 return count;
2539         bufp += count;
2540
2541         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2542         if (count < 0)
2543                 return count;
2544         bufp += count;
2545
2546         return (ssize_t) (bufp - buf);
2547 }
2548
2549 static ssize_t rbd_image_refresh(struct device *dev,
2550                                  struct device_attribute *attr,
2551                                  const char *buf,
2552                                  size_t size)
2553 {
2554         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2555         int ret;
2556
2557         ret = rbd_dev_refresh(rbd_dev, NULL);
2558
2559         return ret < 0 ? ret : size;
2560 }
2561
/*
 * Device attributes for an rbd device.  All are read-only (S_IRUGO,
 * show handler only) except "refresh", which is write-only for the
 * owner (S_IWUSR, store handler only).
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

/* NULL-terminated list of the attributes declared above */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_parent.attr,
        &dev_attr_refresh.attr,
        NULL
};

/* The single (unnamed) attribute group holding rbd_attrs */
static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type */
static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
2597
/*
 * Release callback installed in rbd_device_type.  It does nothing;
 * no memory is freed here.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
2601
/*
 * Device type for rbd devices: named "rbd", carries the rbd
 * attribute groups, and uses the no-op rbd_sysfs_dev_release().
 */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
2607
2608
2609 /*
2610   sysfs - snapshots
2611 */
2612
2613 static ssize_t rbd_snap_size_show(struct device *dev,
2614                                   struct device_attribute *attr,
2615                                   char *buf)
2616 {
2617         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2618
2619         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2620 }
2621
2622 static ssize_t rbd_snap_id_show(struct device *dev,
2623                                 struct device_attribute *attr,
2624                                 char *buf)
2625 {
2626         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2627
2628         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2629 }
2630
2631 static ssize_t rbd_snap_features_show(struct device *dev,
2632                                 struct device_attribute *attr,
2633                                 char *buf)
2634 {
2635         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2636
2637         return sprintf(buf, "0x%016llx\n",
2638                         (unsigned long long) snap->features);
2639 }
2640
/* Read-only (S_IRUGO) attributes of a snapshot device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

/* NULL-terminated list of the snapshot attributes declared above */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

/* The single (unnamed) attribute group holding rbd_snap_attrs */
static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
2655
2656 static void rbd_snap_dev_release(struct device *dev)
2657 {
2658         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2659         kfree(snap->name);
2660         kfree(snap);
2661 }
2662
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/*
 * Device type for snapshot devices.  It has no name; it supplies
 * the snapshot attribute groups and the release callback that
 * frees the rbd_snap.
 */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2672
2673 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2674 {
2675         kref_get(&spec->kref);
2676
2677         return spec;
2678 }
2679
2680 static void rbd_spec_free(struct kref *kref);
2681 static void rbd_spec_put(struct rbd_spec *spec)
2682 {
2683         if (spec)
2684                 kref_put(&spec->kref, rbd_spec_free);
2685 }
2686
2687 static struct rbd_spec *rbd_spec_alloc(void)
2688 {
2689         struct rbd_spec *spec;
2690
2691         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2692         if (!spec)
2693                 return NULL;
2694         kref_init(&spec->kref);
2695
2696         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2697
2698         return spec;
2699 }
2700
2701 static void rbd_spec_free(struct kref *kref)
2702 {
2703         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2704
2705         kfree(spec->pool_name);
2706         kfree(spec->image_id);
2707         kfree(spec->image_name);
2708         kfree(spec->snap_name);
2709         kfree(spec);
2710 }
2711
2712 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2713                                 struct rbd_spec *spec)
2714 {
2715         struct rbd_device *rbd_dev;
2716
2717         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2718         if (!rbd_dev)
2719                 return NULL;
2720
2721         spin_lock_init(&rbd_dev->lock);
2722         rbd_dev->flags = 0;
2723         INIT_LIST_HEAD(&rbd_dev->node);
2724         INIT_LIST_HEAD(&rbd_dev->snaps);
2725         init_rwsem(&rbd_dev->header_rwsem);
2726
2727         rbd_dev->spec = spec;
2728         rbd_dev->rbd_client = rbdc;
2729
2730         /* Initialize the layout used for all rbd requests */
2731
2732         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2733         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2734         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2735         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2736
2737         return rbd_dev;
2738 }
2739
2740 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2741 {
2742         rbd_spec_put(rbd_dev->parent_spec);
2743         kfree(rbd_dev->header_name);
2744         rbd_put_client(rbd_dev->rbd_client);
2745         rbd_spec_put(rbd_dev->spec);
2746         kfree(rbd_dev);
2747 }
2748
2749 static bool rbd_snap_registered(struct rbd_snap *snap)
2750 {
2751         bool ret = snap->dev.type == &rbd_snap_device_type;
2752         bool reg = device_is_registered(&snap->dev);
2753
2754         rbd_assert(!ret ^ reg);
2755
2756         return ret;
2757 }
2758
2759 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2760 {
2761         list_del(&snap->node);
2762         if (device_is_registered(&snap->dev))
2763                 device_unregister(&snap->dev);
2764 }
2765
2766 static int rbd_register_snap_dev(struct rbd_snap *snap,
2767                                   struct device *parent)
2768 {
2769         struct device *dev = &snap->dev;
2770         int ret;
2771
2772         dev->type = &rbd_snap_device_type;
2773         dev->parent = parent;
2774         dev->release = rbd_snap_dev_release;
2775         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2776         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2777
2778         ret = device_register(dev);
2779
2780         return ret;
2781 }
2782
2783 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2784                                                 const char *snap_name,
2785                                                 u64 snap_id, u64 snap_size,
2786                                                 u64 snap_features)
2787 {
2788         struct rbd_snap *snap;
2789         int ret;
2790
2791         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2792         if (!snap)
2793                 return ERR_PTR(-ENOMEM);
2794
2795         ret = -ENOMEM;
2796         snap->name = kstrdup(snap_name, GFP_KERNEL);
2797         if (!snap->name)
2798                 goto err;
2799
2800         snap->id = snap_id;
2801         snap->size = snap_size;
2802         snap->features = snap_features;
2803
2804         return snap;
2805
2806 err:
2807         kfree(snap->name);
2808         kfree(snap);
2809
2810         return ERR_PTR(ret);
2811 }
2812
2813 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2814                 u64 *snap_size, u64 *snap_features)
2815 {
2816         char *snap_name;
2817
2818         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2819
2820         *snap_size = rbd_dev->header.snap_sizes[which];
2821         *snap_features = 0;     /* No features for v1 */
2822
2823         /* Skip over names until we find the one we are looking for */
2824
2825         snap_name = rbd_dev->header.snap_names;
2826         while (which--)
2827                 snap_name += strlen(snap_name) + 1;
2828
2829         return snap_name;
2830 }
2831
2832 /*
2833  * Get the size and object order for an image snapshot, or if
2834  * snap_id is CEPH_NOSNAP, gets this information for the base
2835  * image.
2836  */
2837 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2838                                 u8 *order, u64 *snap_size)
2839 {
2840         __le64 snapid = cpu_to_le64(snap_id);
2841         int ret;
2842         struct {
2843                 u8 order;
2844                 __le64 size;
2845         } __attribute__ ((packed)) size_buf = { 0 };
2846
2847         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2848                                 "rbd", "get_size",
2849                                 (char *) &snapid, sizeof (snapid),
2850                                 (char *) &size_buf, sizeof (size_buf), NULL);
2851         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2852         if (ret < 0)
2853                 return ret;
2854
2855         *order = size_buf.order;
2856         *snap_size = le64_to_cpu(size_buf.size);
2857
2858         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2859                 (unsigned long long) snap_id, (unsigned int) *order,
2860                 (unsigned long long) *snap_size);
2861
2862         return 0;
2863 }
2864
2865 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2866 {
2867         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2868                                         &rbd_dev->header.obj_order,
2869                                         &rbd_dev->header.image_size);
2870 }
2871
2872 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2873 {
2874         void *reply_buf;
2875         int ret;
2876         void *p;
2877
2878         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2879         if (!reply_buf)
2880                 return -ENOMEM;
2881
2882         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2883                                 "rbd", "get_object_prefix",
2884                                 NULL, 0,
2885                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2886         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2887         if (ret < 0)
2888                 goto out;
2889
2890         p = reply_buf;
2891         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2892                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2893                                                 NULL, GFP_NOIO);
2894
2895         if (IS_ERR(rbd_dev->header.object_prefix)) {
2896                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2897                 rbd_dev->header.object_prefix = NULL;
2898         } else {
2899                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2900         }
2901
2902 out:
2903         kfree(reply_buf);
2904
2905         return ret;
2906 }
2907
2908 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2909                 u64 *snap_features)
2910 {
2911         __le64 snapid = cpu_to_le64(snap_id);
2912         struct {
2913                 __le64 features;
2914                 __le64 incompat;
2915         } features_buf = { 0 };
2916         u64 incompat;
2917         int ret;
2918
2919         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2920                                 "rbd", "get_features",
2921                                 (char *) &snapid, sizeof (snapid),
2922                                 (char *) &features_buf, sizeof (features_buf),
2923                                 NULL);
2924         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2925         if (ret < 0)
2926                 return ret;
2927
2928         incompat = le64_to_cpu(features_buf.incompat);
2929         if (incompat & ~RBD_FEATURES_ALL)
2930                 return -ENXIO;
2931
2932         *snap_features = le64_to_cpu(features_buf.features);
2933
2934         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2935                 (unsigned long long) snap_id,
2936                 (unsigned long long) *snap_features,
2937                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2938
2939         return 0;
2940 }
2941
2942 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2943 {
2944         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2945                                                 &rbd_dev->header.features);
2946 }
2947
2948 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2949 {
2950         struct rbd_spec *parent_spec;
2951         size_t size;
2952         void *reply_buf = NULL;
2953         __le64 snapid;
2954         void *p;
2955         void *end;
2956         char *image_id;
2957         u64 overlap;
2958         int ret;
2959
2960         parent_spec = rbd_spec_alloc();
2961         if (!parent_spec)
2962                 return -ENOMEM;
2963
2964         size = sizeof (__le64) +                                /* pool_id */
2965                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2966                 sizeof (__le64) +                               /* snap_id */
2967                 sizeof (__le64);                                /* overlap */
2968         reply_buf = kmalloc(size, GFP_KERNEL);
2969         if (!reply_buf) {
2970                 ret = -ENOMEM;
2971                 goto out_err;
2972         }
2973
2974         snapid = cpu_to_le64(CEPH_NOSNAP);
2975         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2976                                 "rbd", "get_parent",
2977                                 (char *) &snapid, sizeof (snapid),
2978                                 (char *) reply_buf, size, NULL);
2979         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2980         if (ret < 0)
2981                 goto out_err;
2982
2983         ret = -ERANGE;
2984         p = reply_buf;
2985         end = (char *) reply_buf + size;
2986         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2987         if (parent_spec->pool_id == CEPH_NOPOOL)
2988                 goto out;       /* No parent?  No problem. */
2989
2990         /* The ceph file layout needs to fit pool id in 32 bits */
2991
2992         ret = -EIO;
2993         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2994                 goto out;
2995
2996         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2997         if (IS_ERR(image_id)) {
2998                 ret = PTR_ERR(image_id);
2999                 goto out_err;
3000         }
3001         parent_spec->image_id = image_id;
3002         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3003         ceph_decode_64_safe(&p, end, overlap, out_err);
3004
3005         rbd_dev->parent_overlap = overlap;
3006         rbd_dev->parent_spec = parent_spec;
3007         parent_spec = NULL;     /* rbd_dev now owns this */
3008 out:
3009         ret = 0;
3010 out_err:
3011         kfree(reply_buf);
3012         rbd_spec_put(parent_spec);
3013
3014         return ret;
3015 }
3016
3017 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3018 {
3019         size_t image_id_size;
3020         char *image_id;
3021         void *p;
3022         void *end;
3023         size_t size;
3024         void *reply_buf = NULL;
3025         size_t len = 0;
3026         char *image_name = NULL;
3027         int ret;
3028
3029         rbd_assert(!rbd_dev->spec->image_name);
3030
3031         len = strlen(rbd_dev->spec->image_id);
3032         image_id_size = sizeof (__le32) + len;
3033         image_id = kmalloc(image_id_size, GFP_KERNEL);
3034         if (!image_id)
3035                 return NULL;
3036
3037         p = image_id;
3038         end = (char *) image_id + image_id_size;
3039         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3040
3041         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3042         reply_buf = kmalloc(size, GFP_KERNEL);
3043         if (!reply_buf)
3044                 goto out;
3045
3046         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3047                                 "rbd", "dir_get_name",
3048                                 image_id, image_id_size,
3049                                 (char *) reply_buf, size, NULL);
3050         if (ret < 0)
3051                 goto out;
3052         p = reply_buf;
3053         end = (char *) reply_buf + size;
3054         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3055         if (IS_ERR(image_name))
3056                 image_name = NULL;
3057         else
3058                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3059 out:
3060         kfree(reply_buf);
3061         kfree(image_id);
3062
3063         return image_name;
3064 }
3065
3066 /*
3067  * When a parent image gets probed, we only have the pool, image,
3068  * and snapshot ids but not the names of any of them.  This call
3069  * is made later to fill in those names.  It has to be done after
3070  * rbd_dev_snaps_update() has completed because some of the
3071  * information (in particular, snapshot name) is not available
3072  * until then.
3073  */
3074 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3075 {
3076         struct ceph_osd_client *osdc;
3077         const char *name;
3078         void *reply_buf = NULL;
3079         int ret;
3080
3081         if (rbd_dev->spec->pool_name)
3082                 return 0;       /* Already have the names */
3083
3084         /* Look up the pool name */
3085
3086         osdc = &rbd_dev->rbd_client->client->osdc;
3087         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3088         if (!name) {
3089                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3090                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3091                 return -EIO;
3092         }
3093
3094         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3095         if (!rbd_dev->spec->pool_name)
3096                 return -ENOMEM;
3097
3098         /* Fetch the image name; tolerate failure here */
3099
3100         name = rbd_dev_image_name(rbd_dev);
3101         if (name)
3102                 rbd_dev->spec->image_name = (char *) name;
3103         else
3104                 rbd_warn(rbd_dev, "unable to get image name");
3105
3106         /* Look up the snapshot name. */
3107
3108         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3109         if (!name) {
3110                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3111                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3112                 ret = -EIO;
3113                 goto out_err;
3114         }
3115         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3116         if(!rbd_dev->spec->snap_name)
3117                 goto out_err;
3118
3119         return 0;
3120 out_err:
3121         kfree(reply_buf);
3122         kfree(rbd_dev->spec->pool_name);
3123         rbd_dev->spec->pool_name = NULL;
3124
3125         return ret;
3126 }
3127
3128 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3129 {
3130         size_t size;
3131         int ret;
3132         void *reply_buf;
3133         void *p;
3134         void *end;
3135         u64 seq;
3136         u32 snap_count;
3137         struct ceph_snap_context *snapc;
3138         u32 i;
3139
3140         /*
3141          * We'll need room for the seq value (maximum snapshot id),
3142          * snapshot count, and array of that many snapshot ids.
3143          * For now we have a fixed upper limit on the number we're
3144          * prepared to receive.
3145          */
3146         size = sizeof (__le64) + sizeof (__le32) +
3147                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3148         reply_buf = kzalloc(size, GFP_KERNEL);
3149         if (!reply_buf)
3150                 return -ENOMEM;
3151
3152         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3153                                 "rbd", "get_snapcontext",
3154                                 NULL, 0,
3155                                 reply_buf, size, ver);
3156         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3157         if (ret < 0)
3158                 goto out;
3159
3160         ret = -ERANGE;
3161         p = reply_buf;
3162         end = (char *) reply_buf + size;
3163         ceph_decode_64_safe(&p, end, seq, out);
3164         ceph_decode_32_safe(&p, end, snap_count, out);
3165
3166         /*
3167          * Make sure the reported number of snapshot ids wouldn't go
3168          * beyond the end of our buffer.  But before checking that,
3169          * make sure the computed size of the snapshot context we
3170          * allocate is representable in a size_t.
3171          */
3172         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3173                                  / sizeof (u64)) {
3174                 ret = -EINVAL;
3175                 goto out;
3176         }
3177         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3178                 goto out;
3179
3180         size = sizeof (struct ceph_snap_context) +
3181                                 snap_count * sizeof (snapc->snaps[0]);
3182         snapc = kmalloc(size, GFP_KERNEL);
3183         if (!snapc) {
3184                 ret = -ENOMEM;
3185                 goto out;
3186         }
3187
3188         atomic_set(&snapc->nref, 1);
3189         snapc->seq = seq;
3190         snapc->num_snaps = snap_count;
3191         for (i = 0; i < snap_count; i++)
3192                 snapc->snaps[i] = ceph_decode_64(&p);
3193
3194         rbd_dev->header.snapc = snapc;
3195
3196         dout("  snap context seq = %llu, snap_count = %u\n",
3197                 (unsigned long long) seq, (unsigned int) snap_count);
3198
3199 out:
3200         kfree(reply_buf);
3201
3202         return 0;
3203 }
3204
3205 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3206 {
3207         size_t size;
3208         void *reply_buf;
3209         __le64 snap_id;
3210         int ret;
3211         void *p;
3212         void *end;
3213         char *snap_name;
3214
3215         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3216         reply_buf = kmalloc(size, GFP_KERNEL);
3217         if (!reply_buf)
3218                 return ERR_PTR(-ENOMEM);
3219
3220         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3221         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3222                                 "rbd", "get_snapshot_name",
3223                                 (char *) &snap_id, sizeof (snap_id),
3224                                 reply_buf, size, NULL);
3225         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3226         if (ret < 0)
3227                 goto out;
3228
3229         p = reply_buf;
3230         end = (char *) reply_buf + size;
3231         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3232         if (IS_ERR(snap_name)) {
3233                 ret = PTR_ERR(snap_name);
3234                 goto out;
3235         } else {
3236                 dout("  snap_id 0x%016llx snap_name = %s\n",
3237                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3238         }
3239         kfree(reply_buf);
3240
3241         return snap_name;
3242 out:
3243         kfree(reply_buf);
3244
3245         return ERR_PTR(ret);
3246 }
3247
3248 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3249                 u64 *snap_size, u64 *snap_features)
3250 {
3251         u64 snap_id;
3252         u8 order;
3253         int ret;
3254
3255         snap_id = rbd_dev->header.snapc->snaps[which];
3256         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3257         if (ret)
3258                 return ERR_PTR(ret);
3259         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3260         if (ret)
3261                 return ERR_PTR(ret);
3262
3263         return rbd_dev_v2_snap_name(rbd_dev, which);
3264 }
3265
3266 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3267                 u64 *snap_size, u64 *snap_features)
3268 {
3269         if (rbd_dev->image_format == 1)
3270                 return rbd_dev_v1_snap_info(rbd_dev, which,
3271                                         snap_size, snap_features);
3272         if (rbd_dev->image_format == 2)
3273                 return rbd_dev_v2_snap_info(rbd_dev, which,
3274                                         snap_size, snap_features);
3275         return ERR_PTR(-EINVAL);
3276 }
3277
3278 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3279 {
3280         int ret;
3281         __u8 obj_order;
3282
3283         down_write(&rbd_dev->header_rwsem);
3284
3285         /* Grab old order first, to see if it changes */
3286
3287         obj_order = rbd_dev->header.obj_order,
3288         ret = rbd_dev_v2_image_size(rbd_dev);
3289         if (ret)
3290                 goto out;
3291         if (rbd_dev->header.obj_order != obj_order) {
3292                 ret = -EIO;
3293                 goto out;
3294         }
3295         rbd_update_mapping_size(rbd_dev);
3296
3297         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3298         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3299         if (ret)
3300                 goto out;
3301         ret = rbd_dev_snaps_update(rbd_dev);
3302         dout("rbd_dev_snaps_update returned %d\n", ret);
3303         if (ret)
3304                 goto out;
3305         ret = rbd_dev_snaps_register(rbd_dev);
3306         dout("rbd_dev_snaps_register returned %d\n", ret);
3307 out:
3308         up_write(&rbd_dev->header_rwsem);
3309
3310         return ret;
3311 }
3312
3313 /*
3314  * Scan the rbd device's current snapshot list and compare it to the
3315  * newly-received snapshot context.  Remove any existing snapshots
3316  * not present in the new snapshot context.  Add a new snapshot for
3317  * any snaphots in the snapshot context not in the current list.
3318  * And verify there are no changes to snapshots we already know
3319  * about.
3320  *
3321  * Assumes the snapshots in the snapshot context are sorted by
3322  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3323  * are also maintained in that order.)
3324  */
3325 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3326 {
3327         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3328         const u32 snap_count = snapc->num_snaps;
3329         struct list_head *head = &rbd_dev->snaps;
3330         struct list_head *links = head->next;
3331         u32 index = 0;
3332
3333         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3334         while (index < snap_count || links != head) {
3335                 u64 snap_id;
3336                 struct rbd_snap *snap;
3337                 char *snap_name;
3338                 u64 snap_size = 0;
3339                 u64 snap_features = 0;
3340
3341                 snap_id = index < snap_count ? snapc->snaps[index]
3342                                              : CEPH_NOSNAP;
3343                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3344                                      : NULL;
3345                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3346
3347                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3348                         struct list_head *next = links->next;
3349
3350                         /*
3351                          * A previously-existing snapshot is not in
3352                          * the new snap context.
3353                          *
3354                          * If the now missing snapshot is the one the
3355                          * image is mapped to, clear its exists flag
3356                          * so we can avoid sending any more requests
3357                          * to it.
3358                          */
3359                         if (rbd_dev->spec->snap_id == snap->id)
3360                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3361                         rbd_remove_snap_dev(snap);
3362                         dout("%ssnap id %llu has been removed\n",
3363                                 rbd_dev->spec->snap_id == snap->id ?
3364                                                         "mapped " : "",
3365                                 (unsigned long long) snap->id);
3366
3367                         /* Done with this list entry; advance */
3368
3369                         links = next;
3370                         continue;
3371                 }
3372
3373                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3374                                         &snap_size, &snap_features);
3375                 if (IS_ERR(snap_name))
3376                         return PTR_ERR(snap_name);
3377
3378                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3379                         (unsigned long long) snap_id);
3380                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3381                         struct rbd_snap *new_snap;
3382
3383                         /* We haven't seen this snapshot before */
3384
3385                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3386                                         snap_id, snap_size, snap_features);
3387                         if (IS_ERR(new_snap)) {
3388                                 int err = PTR_ERR(new_snap);
3389
3390                                 dout("  failed to add dev, error %d\n", err);
3391
3392                                 return err;
3393                         }
3394
3395                         /* New goes before existing, or at end of list */
3396
3397                         dout("  added dev%s\n", snap ? "" : " at end\n");
3398                         if (snap)
3399                                 list_add_tail(&new_snap->node, &snap->node);
3400                         else
3401                                 list_add_tail(&new_snap->node, head);
3402                 } else {
3403                         /* Already have this one */
3404
3405                         dout("  already present\n");
3406
3407                         rbd_assert(snap->size == snap_size);
3408                         rbd_assert(!strcmp(snap->name, snap_name));
3409                         rbd_assert(snap->features == snap_features);
3410
3411                         /* Done with this list entry; advance */
3412
3413                         links = links->next;
3414                 }
3415
3416                 /* Advance to the next entry in the snapshot context */
3417
3418                 index++;
3419         }
3420         dout("%s: done\n", __func__);
3421
3422         return 0;
3423 }
3424
3425 /*
3426  * Scan the list of snapshots and register the devices for any that
3427  * have not already been registered.
3428  */
3429 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3430 {
3431         struct rbd_snap *snap;
3432         int ret = 0;
3433
3434         dout("%s:\n", __func__);
3435         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3436                 return -EIO;
3437
3438         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3439                 if (!rbd_snap_registered(snap)) {
3440                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3441                         if (ret < 0)
3442                                 break;
3443                 }
3444         }
3445         dout("%s: returning %d\n", __func__, ret);
3446
3447         return ret;
3448 }
3449
3450 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3451 {
3452         struct device *dev;
3453         int ret;
3454
3455         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3456
3457         dev = &rbd_dev->dev;
3458         dev->bus = &rbd_bus_type;
3459         dev->type = &rbd_device_type;
3460         dev->parent = &rbd_root_dev;
3461         dev->release = rbd_dev_release;
3462         dev_set_name(dev, "%d", rbd_dev->dev_id);
3463         ret = device_register(dev);
3464
3465         mutex_unlock(&ctl_mutex);
3466
3467         return ret;
3468 }
3469
3470 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3471 {
3472         device_unregister(&rbd_dev->dev);
3473 }
3474
3475 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3476
3477 /*
3478  * Get a unique rbd identifier for the given new rbd_dev, and add
3479  * the rbd_dev to the global list.  The minimum rbd id is 1.
3480  */
3481 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3482 {
3483         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3484
3485         spin_lock(&rbd_dev_list_lock);
3486         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3487         spin_unlock(&rbd_dev_list_lock);
3488         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3489                 (unsigned long long) rbd_dev->dev_id);
3490 }
3491
3492 /*
3493  * Remove an rbd_dev from the global list, and record that its
3494  * identifier is no longer in use.
3495  */
3496 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3497 {
3498         struct list_head *tmp;
3499         int rbd_id = rbd_dev->dev_id;
3500         int max_id;
3501
3502         rbd_assert(rbd_id > 0);
3503
3504         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3505                 (unsigned long long) rbd_dev->dev_id);
3506         spin_lock(&rbd_dev_list_lock);
3507         list_del_init(&rbd_dev->node);
3508
3509         /*
3510          * If the id being "put" is not the current maximum, there
3511          * is nothing special we need to do.
3512          */
3513         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3514                 spin_unlock(&rbd_dev_list_lock);
3515                 return;
3516         }
3517
3518         /*
3519          * We need to update the current maximum id.  Search the
3520          * list to find out what it is.  We're more likely to find
3521          * the maximum at the end, so search the list backward.
3522          */
3523         max_id = 0;
3524         list_for_each_prev(tmp, &rbd_dev_list) {
3525                 struct rbd_device *rbd_dev;
3526
3527                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3528                 if (rbd_dev->dev_id > max_id)
3529                         max_id = rbd_dev->dev_id;
3530         }
3531         spin_unlock(&rbd_dev_list_lock);
3532
3533         /*
3534          * The max id could have been updated by rbd_dev_id_get(), in
3535          * which case it now accurately reflects the new maximum.
3536          * Be careful not to overwrite the maximum value in that
3537          * case.
3538          */
3539         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3540         dout("  max dev id has been reset\n");
3541 }
3542
/*
 * Advances *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if
 * there is none), and returns the number of non-space characters
 * that follow, i.e. the length of the token found there.  The
 * string at *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * Exactly the characters for which isspace() is nonzero in
         * the "C" and "POSIX" locales.
         */
        static const char whitespace[] = " \f\n\r\t\v";
        const char *start = *buf;

        start += strspn(start, whitespace);     /* Skip to the token */
        *buf = start;

        return strcspn(start, whitespace);      /* Measure the token */
}
3561
/*
 * Locates the next token in *buf and, provided it fits (including
 * a terminating '\0') in the caller's token buffer of token_size
 * bytes, copies it there and terminates it with '\0'.  The string
 * at *buf must be '\0'-terminated on entry.
 *
 * Returns the token's length, not counting the '\0'.  A return of 0
 * means no token was found; a return >= token_size means the token
 * did not fit and nothing was copied.
 *
 * In every case *buf is left pointing just past the token that was
 * found, even when it was too long to copy.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        const char *start;
        size_t len;

        len = next_token(buf);
        start = *buf;
        if (len < token_size) {
                memcpy(token, start, len);
                token[len] = '\0';
        }
        *buf = start + len;

        return len;
}
3591
3592 /*
3593  * Finds the next token in *buf, dynamically allocates a buffer big
3594  * enough to hold a copy of it, and copies the token into the new
3595  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3596  * that a duplicate buffer is created even for a zero-length token.
3597  *
3598  * Returns a pointer to the newly-allocated duplicate, or a null
3599  * pointer if memory for the duplicate was not available.  If
3600  * the lenp argument is a non-null pointer, the length of the token
3601  * (not including the '\0') is returned in *lenp.
3602  *
3603  * If successful, the *buf pointer will be updated to point beyond
3604  * the end of the found token.
3605  *
3606  * Note: uses GFP_KERNEL for allocation.
3607  */
3608 static inline char *dup_token(const char **buf, size_t *lenp)
3609 {
3610         char *dup;
3611         size_t len;
3612
3613         len = next_token(buf);
3614         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3615         if (!dup)
3616                 return NULL;
3617         *(dup + len) = '\0';
3618         *buf += len;
3619
3620         if (lenp)
3621                 *lenp = len;
3622
3623         return dup;
3624 }
3625
3626 /*
3627  * Parse the options provided for an "rbd add" (i.e., rbd image
3628  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3629  * and the data written is passed here via a NUL-terminated buffer.
3630  * Returns 0 if successful or an error code otherwise.
3631  *
3632  * The information extracted from these options is recorded in
3633  * the other parameters which return dynamically-allocated
3634  * structures:
3635  *  ceph_opts
3636  *      The address of a pointer that will refer to a ceph options
3637  *      structure.  Caller must release the returned pointer using
3638  *      ceph_destroy_options() when it is no longer needed.
3639  *  rbd_opts
3640  *      Address of an rbd options pointer.  Fully initialized by
3641  *      this function; caller must release with kfree().
3642  *  spec
3643  *      Address of an rbd image specification pointer.  Fully
3644  *      initialized by this function based on parsed options.
3645  *      Caller must release with rbd_spec_put().
3646  *
3647  * The options passed take this form:
3648  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3649  * where:
3650  *  <mon_addrs>
3651  *      A comma-separated list of one or more monitor addresses.
3652  *      A monitor address is an ip address, optionally followed
3653  *      by a port number (separated by a colon).
3654  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3655  *  <options>
3656  *      A comma-separated list of ceph and/or rbd options.
3657  *  <pool_name>
3658  *      The name of the rados pool containing the rbd image.
3659  *  <image_name>
3660  *      The name of the image in that pool to map.
3661  *  <snap_id>
3662  *      An optional snapshot id.  If provided, the mapping will
3663  *      present data from the image at the time that snapshot was
3664  *      created.  The image head is used if no snapshot id is
3665  *      provided.  Snapshot mappings are always read-only.
3666  */
3667 static int rbd_add_parse_args(const char *buf,
3668                                 struct ceph_options **ceph_opts,
3669                                 struct rbd_options **opts,
3670                                 struct rbd_spec **rbd_spec)
3671 {
3672         size_t len;
3673         char *options;
3674         const char *mon_addrs;
3675         size_t mon_addrs_size;
3676         struct rbd_spec *spec = NULL;
3677         struct rbd_options *rbd_opts = NULL;
3678         struct ceph_options *copts;
3679         int ret;
3680
3681         /* The first four tokens are required */
3682
3683         len = next_token(&buf);
3684         if (!len) {
3685                 rbd_warn(NULL, "no monitor address(es) provided");
3686                 return -EINVAL;
3687         }
3688         mon_addrs = buf;
3689         mon_addrs_size = len + 1;
3690         buf += len;
3691
3692         ret = -EINVAL;
3693         options = dup_token(&buf, NULL);
3694         if (!options)
3695                 return -ENOMEM;
3696         if (!*options) {
3697                 rbd_warn(NULL, "no options provided");
3698                 goto out_err;
3699         }
3700
3701         spec = rbd_spec_alloc();
3702         if (!spec)
3703                 goto out_mem;
3704
3705         spec->pool_name = dup_token(&buf, NULL);
3706         if (!spec->pool_name)
3707                 goto out_mem;
3708         if (!*spec->pool_name) {
3709                 rbd_warn(NULL, "no pool name provided");
3710                 goto out_err;
3711         }
3712
3713         spec->image_name = dup_token(&buf, NULL);
3714         if (!spec->image_name)
3715                 goto out_mem;
3716         if (!*spec->image_name) {
3717                 rbd_warn(NULL, "no image name provided");
3718                 goto out_err;
3719         }
3720
3721         /*
3722          * Snapshot name is optional; default is to use "-"
3723          * (indicating the head/no snapshot).
3724          */
3725         len = next_token(&buf);
3726         if (!len) {
3727                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3728                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3729         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3730                 ret = -ENAMETOOLONG;
3731                 goto out_err;
3732         }
3733         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3734         if (!spec->snap_name)
3735                 goto out_mem;
3736         *(spec->snap_name + len) = '\0';
3737
3738         /* Initialize all rbd options to the defaults */
3739
3740         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3741         if (!rbd_opts)
3742                 goto out_mem;
3743
3744         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3745
3746         copts = ceph_parse_options(options, mon_addrs,
3747                                         mon_addrs + mon_addrs_size - 1,
3748                                         parse_rbd_opts_token, rbd_opts);
3749         if (IS_ERR(copts)) {
3750                 ret = PTR_ERR(copts);
3751                 goto out_err;
3752         }
3753         kfree(options);
3754
3755         *ceph_opts = copts;
3756         *opts = rbd_opts;
3757         *rbd_spec = spec;
3758
3759         return 0;
3760 out_mem:
3761         ret = -ENOMEM;
3762 out_err:
3763         kfree(rbd_opts);
3764         rbd_spec_put(spec);
3765         kfree(options);
3766
3767         return ret;
3768 }
3769
3770 /*
3771  * An rbd format 2 image has a unique identifier, distinct from the
3772  * name given to it by the user.  Internally, that identifier is
3773  * what's used to specify the names of objects related to the image.
3774  *
3775  * A special "rbd id" object is used to map an rbd image name to its
3776  * id.  If that object doesn't exist, then there is no v2 rbd image
3777  * with the supplied name.
3778  *
3779  * This function will record the given rbd_dev's image_id field if
3780  * it can be determined, and in that case will return 0.  If any
3781  * errors occur a negative errno will be returned and the rbd_dev's
3782  * image_id field will be unchanged (and should be NULL).
3783  */
3784 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3785 {
3786         int ret;
3787         size_t size;
3788         char *object_name;
3789         void *response;
3790         void *p;
3791
3792         /*
3793          * When probing a parent image, the image id is already
3794          * known (and the image name likely is not).  There's no
3795          * need to fetch the image id again in this case.
3796          */
3797         if (rbd_dev->spec->image_id)
3798                 return 0;
3799
3800         /*
3801          * First, see if the format 2 image id file exists, and if
3802          * so, get the image's persistent id from it.
3803          */
3804         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3805         object_name = kmalloc(size, GFP_NOIO);
3806         if (!object_name)
3807                 return -ENOMEM;
3808         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3809         dout("rbd id object name is %s\n", object_name);
3810
3811         /* Response will be an encoded string, which includes a length */
3812
3813         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3814         response = kzalloc(size, GFP_NOIO);
3815         if (!response) {
3816                 ret = -ENOMEM;
3817                 goto out;
3818         }
3819
3820         ret = rbd_obj_method_sync(rbd_dev, object_name,
3821                                 "rbd", "get_id",
3822                                 NULL, 0,
3823                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3824         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3825         if (ret < 0)
3826                 goto out;
3827
3828         p = response;
3829         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3830                                                 p + RBD_IMAGE_ID_LEN_MAX,
3831                                                 NULL, GFP_NOIO);
3832         if (IS_ERR(rbd_dev->spec->image_id)) {
3833                 ret = PTR_ERR(rbd_dev->spec->image_id);
3834                 rbd_dev->spec->image_id = NULL;
3835         } else {
3836                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3837         }
3838 out:
3839         kfree(response);
3840         kfree(object_name);
3841
3842         return ret;
3843 }
3844
3845 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3846 {
3847         int ret;
3848         size_t size;
3849
3850         /* Version 1 images have no id; empty string is used */
3851
3852         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3853         if (!rbd_dev->spec->image_id)
3854                 return -ENOMEM;
3855
3856         /* Record the header object name for this rbd image. */
3857
3858         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3859         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3860         if (!rbd_dev->header_name) {
3861                 ret = -ENOMEM;
3862                 goto out_err;
3863         }
3864         sprintf(rbd_dev->header_name, "%s%s",
3865                 rbd_dev->spec->image_name, RBD_SUFFIX);
3866
3867         /* Populate rbd image metadata */
3868
3869         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3870         if (ret < 0)
3871                 goto out_err;
3872
3873         /* Version 1 images have no parent (no layering) */
3874
3875         rbd_dev->parent_spec = NULL;
3876         rbd_dev->parent_overlap = 0;
3877
3878         rbd_dev->image_format = 1;
3879
3880         dout("discovered version 1 image, header name is %s\n",
3881                 rbd_dev->header_name);
3882
3883         return 0;
3884
3885 out_err:
3886         kfree(rbd_dev->header_name);
3887         rbd_dev->header_name = NULL;
3888         kfree(rbd_dev->spec->image_id);
3889         rbd_dev->spec->image_id = NULL;
3890
3891         return ret;
3892 }
3893
3894 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3895 {
3896         size_t size;
3897         int ret;
3898         u64 ver = 0;
3899
3900         /*
3901          * Image id was filled in by the caller.  Record the header
3902          * object name for this rbd image.
3903          */
3904         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3905         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3906         if (!rbd_dev->header_name)
3907                 return -ENOMEM;
3908         sprintf(rbd_dev->header_name, "%s%s",
3909                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3910
3911         /* Get the size and object order for the image */
3912
3913         ret = rbd_dev_v2_image_size(rbd_dev);
3914         if (ret < 0)
3915                 goto out_err;
3916
3917         /* Get the object prefix (a.k.a. block_name) for the image */
3918
3919         ret = rbd_dev_v2_object_prefix(rbd_dev);
3920         if (ret < 0)
3921                 goto out_err;
3922
3923         /* Get the and check features for the image */
3924
3925         ret = rbd_dev_v2_features(rbd_dev);
3926         if (ret < 0)
3927                 goto out_err;
3928
3929         /* If the image supports layering, get the parent info */
3930
3931         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3932                 ret = rbd_dev_v2_parent_info(rbd_dev);
3933                 if (ret < 0)
3934                         goto out_err;
3935         }
3936
3937         /* crypto and compression type aren't (yet) supported for v2 images */
3938
3939         rbd_dev->header.crypt_type = 0;
3940         rbd_dev->header.comp_type = 0;
3941
3942         /* Get the snapshot context, plus the header version */
3943
3944         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3945         if (ret)
3946                 goto out_err;
3947         rbd_dev->header.obj_version = ver;
3948
3949         rbd_dev->image_format = 2;
3950
3951         dout("discovered version 2 image, header name is %s\n",
3952                 rbd_dev->header_name);
3953
3954         return 0;
3955 out_err:
3956         rbd_dev->parent_overlap = 0;
3957         rbd_spec_put(rbd_dev->parent_spec);
3958         rbd_dev->parent_spec = NULL;
3959         kfree(rbd_dev->header_name);
3960         rbd_dev->header_name = NULL;
3961         kfree(rbd_dev->header.object_prefix);
3962         rbd_dev->header.object_prefix = NULL;
3963
3964         return ret;
3965 }
3966
3967 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3968 {
3969         int ret;
3970
3971         /* no need to lock here, as rbd_dev is not registered yet */
3972         ret = rbd_dev_snaps_update(rbd_dev);
3973         if (ret)
3974                 return ret;
3975
3976         ret = rbd_dev_probe_update_spec(rbd_dev);
3977         if (ret)
3978                 goto err_out_snaps;
3979
3980         ret = rbd_dev_set_mapping(rbd_dev);
3981         if (ret)
3982                 goto err_out_snaps;
3983
3984         /* generate unique id: find highest unique id, add one */
3985         rbd_dev_id_get(rbd_dev);
3986
3987         /* Fill in the device name, now that we have its id. */
3988         BUILD_BUG_ON(DEV_NAME_LEN
3989                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3990         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3991
3992         /* Get our block major device number. */
3993
3994         ret = register_blkdev(0, rbd_dev->name);
3995         if (ret < 0)
3996                 goto err_out_id;
3997         rbd_dev->major = ret;
3998
3999         /* Set up the blkdev mapping. */
4000
4001         ret = rbd_init_disk(rbd_dev);
4002         if (ret)
4003                 goto err_out_blkdev;
4004
4005         ret = rbd_bus_add_dev(rbd_dev);
4006         if (ret)
4007                 goto err_out_disk;
4008
4009         /*
4010          * At this point cleanup in the event of an error is the job
4011          * of the sysfs code (initiated by rbd_bus_del_dev()).
4012          */
4013         down_write(&rbd_dev->header_rwsem);
4014         ret = rbd_dev_snaps_register(rbd_dev);
4015         up_write(&rbd_dev->header_rwsem);
4016         if (ret)
4017                 goto err_out_bus;
4018
4019         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4020         if (ret)
4021                 goto err_out_bus;
4022
4023         /* Everything's ready.  Announce the disk to the world. */
4024
4025         add_disk(rbd_dev->disk);
4026
4027         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4028                 (unsigned long long) rbd_dev->mapping.size);
4029
4030         return ret;
4031 err_out_bus:
4032         /* this will also clean up rest of rbd_dev stuff */
4033
4034         rbd_bus_del_dev(rbd_dev);
4035
4036         return ret;
4037 err_out_disk:
4038         rbd_free_disk(rbd_dev);
4039 err_out_blkdev:
4040         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4041 err_out_id:
4042         rbd_dev_id_put(rbd_dev);
4043 err_out_snaps:
4044         rbd_remove_all_snaps(rbd_dev);
4045
4046         return ret;
4047 }
4048
4049 /*
4050  * Probe for the existence of the header object for the given rbd
4051  * device.  For format 2 images this includes determining the image
4052  * id.
4053  */
4054 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4055 {
4056         int ret;
4057
4058         /*
4059          * Get the id from the image id object.  If it's not a
4060          * format 2 image, we'll get ENOENT back, and we'll assume
4061          * it's a format 1 image.
4062          */
4063         ret = rbd_dev_image_id(rbd_dev);
4064         if (ret)
4065                 ret = rbd_dev_v1_probe(rbd_dev);
4066         else
4067                 ret = rbd_dev_v2_probe(rbd_dev);
4068         if (ret) {
4069                 dout("probe failed, returning %d\n", ret);
4070
4071                 return ret;
4072         }
4073
4074         ret = rbd_dev_probe_finish(rbd_dev);
4075         if (ret)
4076                 rbd_header_free(&rbd_dev->header);
4077
4078         return ret;
4079 }
4080
4081 static ssize_t rbd_add(struct bus_type *bus,
4082                        const char *buf,
4083                        size_t count)
4084 {
4085         struct rbd_device *rbd_dev = NULL;
4086         struct ceph_options *ceph_opts = NULL;
4087         struct rbd_options *rbd_opts = NULL;
4088         struct rbd_spec *spec = NULL;
4089         struct rbd_client *rbdc;
4090         struct ceph_osd_client *osdc;
4091         int rc = -ENOMEM;
4092
4093         if (!try_module_get(THIS_MODULE))
4094                 return -ENODEV;
4095
4096         /* parse add command */
4097         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4098         if (rc < 0)
4099                 goto err_out_module;
4100
4101         rbdc = rbd_get_client(ceph_opts);
4102         if (IS_ERR(rbdc)) {
4103                 rc = PTR_ERR(rbdc);
4104                 goto err_out_args;
4105         }
4106         ceph_opts = NULL;       /* rbd_dev client now owns this */
4107
4108         /* pick the pool */
4109         osdc = &rbdc->client->osdc;
4110         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4111         if (rc < 0)
4112                 goto err_out_client;
4113         spec->pool_id = (u64) rc;
4114
4115         /* The ceph file layout needs to fit pool id in 32 bits */
4116
4117         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4118                 rc = -EIO;
4119                 goto err_out_client;
4120         }
4121
4122         rbd_dev = rbd_dev_create(rbdc, spec);
4123         if (!rbd_dev)
4124                 goto err_out_client;
4125         rbdc = NULL;            /* rbd_dev now owns this */
4126         spec = NULL;            /* rbd_dev now owns this */
4127
4128         rbd_dev->mapping.read_only = rbd_opts->read_only;
4129         kfree(rbd_opts);
4130         rbd_opts = NULL;        /* done with this */
4131
4132         rc = rbd_dev_probe(rbd_dev);
4133         if (rc < 0)
4134                 goto err_out_rbd_dev;
4135
4136         return count;
4137 err_out_rbd_dev:
4138         rbd_dev_destroy(rbd_dev);
4139 err_out_client:
4140         rbd_put_client(rbdc);
4141 err_out_args:
4142         if (ceph_opts)
4143                 ceph_destroy_options(ceph_opts);
4144         kfree(rbd_opts);
4145         rbd_spec_put(spec);
4146 err_out_module:
4147         module_put(THIS_MODULE);
4148
4149         dout("Error adding device %s\n", buf);
4150
4151         return (ssize_t) rc;
4152 }
4153
4154 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4155 {
4156         struct list_head *tmp;
4157         struct rbd_device *rbd_dev;
4158
4159         spin_lock(&rbd_dev_list_lock);
4160         list_for_each(tmp, &rbd_dev_list) {
4161                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4162                 if (rbd_dev->dev_id == dev_id) {
4163                         spin_unlock(&rbd_dev_list_lock);
4164                         return rbd_dev;
4165                 }
4166         }
4167         spin_unlock(&rbd_dev_list_lock);
4168         return NULL;
4169 }
4170
4171 static void rbd_dev_release(struct device *dev)
4172 {
4173         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4174
4175         if (rbd_dev->watch_event)
4176                 rbd_dev_header_watch_sync(rbd_dev, 0);
4177
4178         /* clean up and free blkdev */
4179         rbd_free_disk(rbd_dev);
4180         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4181
4182         /* release allocated disk header fields */
4183         rbd_header_free(&rbd_dev->header);
4184
4185         /* done with the id, and with the rbd_dev */
4186         rbd_dev_id_put(rbd_dev);
4187         rbd_assert(rbd_dev->rbd_client != NULL);
4188         rbd_dev_destroy(rbd_dev);
4189
4190         /* release module ref */
4191         module_put(THIS_MODULE);
4192 }
4193
4194 static ssize_t rbd_remove(struct bus_type *bus,
4195                           const char *buf,
4196                           size_t count)
4197 {
4198         struct rbd_device *rbd_dev = NULL;
4199         int target_id, rc;
4200         unsigned long ul;
4201         int ret = count;
4202
4203         rc = strict_strtoul(buf, 10, &ul);
4204         if (rc)
4205                 return rc;
4206
4207         /* convert to int; abort if we lost anything in the conversion */
4208         target_id = (int) ul;
4209         if (target_id != ul)
4210                 return -EINVAL;
4211
4212         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4213
4214         rbd_dev = __rbd_get_dev(target_id);
4215         if (!rbd_dev) {
4216                 ret = -ENOENT;
4217                 goto done;
4218         }
4219
4220         spin_lock_irq(&rbd_dev->lock);
4221         if (rbd_dev->open_count)
4222                 ret = -EBUSY;
4223         else
4224                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4225         spin_unlock_irq(&rbd_dev->lock);
4226         if (ret < 0)
4227                 goto done;
4228
4229         rbd_remove_all_snaps(rbd_dev);
4230         rbd_bus_del_dev(rbd_dev);
4231
4232 done:
4233         mutex_unlock(&ctl_mutex);
4234
4235         return ret;
4236 }
4237
4238 /*
4239  * create control files in sysfs
4240  * /sys/bus/rbd/...
4241  */
4242 static int rbd_sysfs_init(void)
4243 {
4244         int ret;
4245
4246         ret = device_register(&rbd_root_dev);
4247         if (ret < 0)
4248                 return ret;
4249
4250         ret = bus_register(&rbd_bus_type);
4251         if (ret < 0)
4252                 device_unregister(&rbd_root_dev);
4253
4254         return ret;
4255 }
4256
4257 static void rbd_sysfs_cleanup(void)
4258 {
4259         bus_unregister(&rbd_bus_type);
4260         device_unregister(&rbd_root_dev);
4261 }
4262
4263 static int __init rbd_init(void)
4264 {
4265         int rc;
4266
4267         if (!libceph_compatible(NULL)) {
4268                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4269
4270                 return -EINVAL;
4271         }
4272         rc = rbd_sysfs_init();
4273         if (rc)
4274                 return rc;
4275         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4276         return 0;
4277 }
4278
4279 static void __exit rbd_exit(void)
4280 {
4281         rbd_sysfs_cleanup();
4282 }
4283
4284 module_init(rbd_init);
4285 module_exit(rbd_exit);
4286
4287 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4288 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4289 MODULE_DESCRIPTION("rados block device");
4290
4291 /* following authorship retained from original osdblk.c */
4292 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4293
4294 MODULE_LICENSE("GPL");