]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
libceph: add data pointers in osd op structures
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING      1
77
78 /* Features supported by this (client software) implementation. */
79
80 #define RBD_FEATURES_ALL          (0)
81
82 /*
83  * An RBD device name will be "rbd#", where the "rbd" comes from
84  * RBD_DRV_NAME above, and # is a unique integer identifier.
85  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
86  * enough to hold all possible device names.
87  */
88 #define DEV_NAME_LEN            32
89 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
90
91 /*
92  * block device image metadata (in-memory version)
93  */
94 struct rbd_image_header {
95         /* These four fields never change for a given rbd image */
96         char *object_prefix;
97         u64 features;
98         __u8 obj_order;
99         __u8 crypt_type;
100         __u8 comp_type;
101
102         /* The remaining fields need to be updated occasionally */
103         u64 image_size;
104         struct ceph_snap_context *snapc;
105         char *snap_names;
106         u64 *snap_sizes;
107
108         u64 obj_version;
109 };
110
111 /*
112  * An rbd image specification.
113  *
114  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
115  * identify an image.  Each rbd_dev structure includes a pointer to
116  * an rbd_spec structure that encapsulates this identity.
117  *
118  * Each of the id's in an rbd_spec has an associated name.  For a
119  * user-mapped image, the names are supplied and the id's associated
120  * with them are looked up.  For a layered image, a parent image is
121  * defined by the tuple, and the names are looked up.
122  *
123  * An rbd_dev structure contains a parent_spec pointer which is
124  * non-null if the image it represents is a child in a layered
125  * image.  This pointer will refer to the rbd_spec structure used
126  * by the parent rbd_dev for its own identity (i.e., the structure
127  * is shared between the parent and child).
128  *
129  * Since these structures are populated once, during the discovery
130  * phase of image construction, they are effectively immutable so
131  * we make no effort to synchronize access to them.
132  *
133  * Note that code herein does not assume the image name is known (it
134  * could be a null pointer).
135  */
136 struct rbd_spec {
137         u64             pool_id;
138         char            *pool_name;
139
140         char            *image_id;
141         char            *image_name;
142
143         u64             snap_id;
144         char            *snap_name;
145
146         struct kref     kref;
147 };
148
149 /*
150  * an instance of the client.  multiple devices may share an rbd client.
151  */
152 struct rbd_client {
153         struct ceph_client      *client;
154         struct kref             kref;
155         struct list_head        node;
156 };
157
158 struct rbd_img_request;
159 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
160
161 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
162
163 struct rbd_obj_request;
164 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
165
166 enum obj_request_type {
167         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
168 };
169
170 struct rbd_obj_request {
171         const char              *object_name;
172         u64                     offset;         /* object start byte */
173         u64                     length;         /* bytes from offset */
174
175         struct rbd_img_request  *img_request;
176         struct list_head        links;          /* img_request->obj_requests */
177         u32                     which;          /* posn image request list */
178
179         enum obj_request_type   type;
180         union {
181                 struct bio      *bio_list;
182                 struct {
183                         struct page     **pages;
184                         u32             page_count;
185                 };
186         };
187
188         struct ceph_osd_request *osd_req;
189
190         u64                     xferred;        /* bytes transferred */
191         u64                     version;
192         int                     result;
193         atomic_t                done;
194
195         rbd_obj_callback_t      callback;
196         struct completion       completion;
197
198         struct kref             kref;
199 };
200
201 struct rbd_img_request {
202         struct request          *rq;
203         struct rbd_device       *rbd_dev;
204         u64                     offset; /* starting image byte offset */
205         u64                     length; /* byte count from offset */
206         bool                    write_request;  /* false for read */
207         union {
208                 struct ceph_snap_context *snapc;        /* for writes */
209                 u64             snap_id;                /* for reads */
210         };
211         spinlock_t              completion_lock;/* protects next_completion */
212         u32                     next_completion;
213         rbd_img_callback_t      callback;
214
215         u32                     obj_request_count;
216         struct list_head        obj_requests;   /* rbd_obj_request structs */
217
218         struct kref             kref;
219 };
220
221 #define for_each_obj_request(ireq, oreq) \
222         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
223 #define for_each_obj_request_from(ireq, oreq) \
224         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
225 #define for_each_obj_request_safe(ireq, oreq, n) \
226         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
227
228 struct rbd_snap {
229         struct  device          dev;
230         const char              *name;
231         u64                     size;
232         struct list_head        node;
233         u64                     id;
234         u64                     features;
235 };
236
237 struct rbd_mapping {
238         u64                     size;
239         u64                     features;
240         bool                    read_only;
241 };
242
243 /*
244  * a single device
245  */
246 struct rbd_device {
247         int                     dev_id;         /* blkdev unique id */
248
249         int                     major;          /* blkdev assigned major */
250         struct gendisk          *disk;          /* blkdev's gendisk and rq */
251
252         u32                     image_format;   /* Either 1 or 2 */
253         struct rbd_client       *rbd_client;
254
255         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
256
257         spinlock_t              lock;           /* queue, flags, open_count */
258
259         struct rbd_image_header header;
260         unsigned long           flags;          /* possibly lock protected */
261         struct rbd_spec         *spec;
262
263         char                    *header_name;
264
265         struct ceph_file_layout layout;
266
267         struct ceph_osd_event   *watch_event;
268         struct rbd_obj_request  *watch_request;
269
270         struct rbd_spec         *parent_spec;
271         u64                     parent_overlap;
272
273         /* protects updating the header */
274         struct rw_semaphore     header_rwsem;
275
276         struct rbd_mapping      mapping;
277
278         struct list_head        node;
279
280         /* list of snapshots */
281         struct list_head        snaps;
282
283         /* sysfs related */
284         struct device           dev;
285         unsigned long           open_count;     /* protected by lock */
286 };
287
288 /*
289  * Flag bits for rbd_dev->flags.  If atomicity is required,
290  * rbd_dev->lock is used to protect access.
291  *
292  * Currently, only the "removing" flag (which is coupled with the
293  * "open_count" field) requires atomic access.
294  */
295 enum rbd_dev_flags {
296         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
297         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
298 };
299
300 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
301
302 static LIST_HEAD(rbd_dev_list);    /* devices */
303 static DEFINE_SPINLOCK(rbd_dev_list_lock);
304
305 static LIST_HEAD(rbd_client_list);              /* clients */
306 static DEFINE_SPINLOCK(rbd_client_list_lock);
307
308 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
309 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
310
311 static void rbd_dev_release(struct device *dev);
312 static void rbd_remove_snap_dev(struct rbd_snap *snap);
313
314 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
315                        size_t count);
316 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
317                           size_t count);
318
319 static struct bus_attribute rbd_bus_attrs[] = {
320         __ATTR(add, S_IWUSR, NULL, rbd_add),
321         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
322         __ATTR_NULL
323 };
324
325 static struct bus_type rbd_bus_type = {
326         .name           = "rbd",
327         .bus_attrs      = rbd_bus_attrs,
328 };
329
330 static void rbd_root_dev_release(struct device *dev)
331 {
332 }
333
334 static struct device rbd_root_dev = {
335         .init_name =    "rbd",
336         .release =      rbd_root_dev_release,
337 };
338
339 static __printf(2, 3)
340 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
341 {
342         struct va_format vaf;
343         va_list args;
344
345         va_start(args, fmt);
346         vaf.fmt = fmt;
347         vaf.va = &args;
348
349         if (!rbd_dev)
350                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
351         else if (rbd_dev->disk)
352                 printk(KERN_WARNING "%s: %s: %pV\n",
353                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
354         else if (rbd_dev->spec && rbd_dev->spec->image_name)
355                 printk(KERN_WARNING "%s: image %s: %pV\n",
356                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
357         else if (rbd_dev->spec && rbd_dev->spec->image_id)
358                 printk(KERN_WARNING "%s: id %s: %pV\n",
359                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
360         else    /* punt */
361                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
362                         RBD_DRV_NAME, rbd_dev, &vaf);
363         va_end(args);
364 }
365
366 #ifdef RBD_DEBUG
367 #define rbd_assert(expr)                                                \
368                 if (unlikely(!(expr))) {                                \
369                         printk(KERN_ERR "\nAssertion failure in %s() "  \
370                                                 "at line %d:\n\n"       \
371                                         "\trbd_assert(%s);\n\n",        \
372                                         __func__, __LINE__, #expr);     \
373                         BUG();                                          \
374                 }
375 #else /* !RBD_DEBUG */
376 #  define rbd_assert(expr)      ((void) 0)
377 #endif /* !RBD_DEBUG */
378
379 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
380 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
381
382 static int rbd_open(struct block_device *bdev, fmode_t mode)
383 {
384         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
385         bool removing = false;
386
387         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
388                 return -EROFS;
389
390         spin_lock_irq(&rbd_dev->lock);
391         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
392                 removing = true;
393         else
394                 rbd_dev->open_count++;
395         spin_unlock_irq(&rbd_dev->lock);
396         if (removing)
397                 return -ENOENT;
398
399         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
400         (void) get_device(&rbd_dev->dev);
401         set_device_ro(bdev, rbd_dev->mapping.read_only);
402         mutex_unlock(&ctl_mutex);
403
404         return 0;
405 }
406
407 static int rbd_release(struct gendisk *disk, fmode_t mode)
408 {
409         struct rbd_device *rbd_dev = disk->private_data;
410         unsigned long open_count_before;
411
412         spin_lock_irq(&rbd_dev->lock);
413         open_count_before = rbd_dev->open_count--;
414         spin_unlock_irq(&rbd_dev->lock);
415         rbd_assert(open_count_before > 0);
416
417         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
418         put_device(&rbd_dev->dev);
419         mutex_unlock(&ctl_mutex);
420
421         return 0;
422 }
423
424 static const struct block_device_operations rbd_bd_ops = {
425         .owner                  = THIS_MODULE,
426         .open                   = rbd_open,
427         .release                = rbd_release,
428 };
429
430 /*
431  * Initialize an rbd client instance.
432  * We own *ceph_opts.
433  */
434 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
435 {
436         struct rbd_client *rbdc;
437         int ret = -ENOMEM;
438
439         dout("%s:\n", __func__);
440         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
441         if (!rbdc)
442                 goto out_opt;
443
444         kref_init(&rbdc->kref);
445         INIT_LIST_HEAD(&rbdc->node);
446
447         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
448
449         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
450         if (IS_ERR(rbdc->client))
451                 goto out_mutex;
452         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
453
454         ret = ceph_open_session(rbdc->client);
455         if (ret < 0)
456                 goto out_err;
457
458         spin_lock(&rbd_client_list_lock);
459         list_add_tail(&rbdc->node, &rbd_client_list);
460         spin_unlock(&rbd_client_list_lock);
461
462         mutex_unlock(&ctl_mutex);
463         dout("%s: rbdc %p\n", __func__, rbdc);
464
465         return rbdc;
466
467 out_err:
468         ceph_destroy_client(rbdc->client);
469 out_mutex:
470         mutex_unlock(&ctl_mutex);
471         kfree(rbdc);
472 out_opt:
473         if (ceph_opts)
474                 ceph_destroy_options(ceph_opts);
475         dout("%s: error %d\n", __func__, ret);
476
477         return ERR_PTR(ret);
478 }
479
480 /*
481  * Find a ceph client with specific addr and configuration.  If
482  * found, bump its reference count.
483  */
484 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
485 {
486         struct rbd_client *client_node;
487         bool found = false;
488
489         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
490                 return NULL;
491
492         spin_lock(&rbd_client_list_lock);
493         list_for_each_entry(client_node, &rbd_client_list, node) {
494                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
495                         kref_get(&client_node->kref);
496                         found = true;
497                         break;
498                 }
499         }
500         spin_unlock(&rbd_client_list_lock);
501
502         return found ? client_node : NULL;
503 }
504
505 /*
506  * mount options
507  */
508 enum {
509         Opt_last_int,
510         /* int args above */
511         Opt_last_string,
512         /* string args above */
513         Opt_read_only,
514         Opt_read_write,
515         /* Boolean args above */
516         Opt_last_bool,
517 };
518
519 static match_table_t rbd_opts_tokens = {
520         /* int args above */
521         /* string args above */
522         {Opt_read_only, "read_only"},
523         {Opt_read_only, "ro"},          /* Alternate spelling */
524         {Opt_read_write, "read_write"},
525         {Opt_read_write, "rw"},         /* Alternate spelling */
526         /* Boolean args above */
527         {-1, NULL}
528 };
529
530 struct rbd_options {
531         bool    read_only;
532 };
533
534 #define RBD_READ_ONLY_DEFAULT   false
535
536 static int parse_rbd_opts_token(char *c, void *private)
537 {
538         struct rbd_options *rbd_opts = private;
539         substring_t argstr[MAX_OPT_ARGS];
540         int token, intval, ret;
541
542         token = match_token(c, rbd_opts_tokens, argstr);
543         if (token < 0)
544                 return -EINVAL;
545
546         if (token < Opt_last_int) {
547                 ret = match_int(&argstr[0], &intval);
548                 if (ret < 0) {
549                         pr_err("bad mount option arg (not int) "
550                                "at '%s'\n", c);
551                         return ret;
552                 }
553                 dout("got int token %d val %d\n", token, intval);
554         } else if (token > Opt_last_int && token < Opt_last_string) {
555                 dout("got string token %d val %s\n", token,
556                      argstr[0].from);
557         } else if (token > Opt_last_string && token < Opt_last_bool) {
558                 dout("got Boolean token %d\n", token);
559         } else {
560                 dout("got token %d\n", token);
561         }
562
563         switch (token) {
564         case Opt_read_only:
565                 rbd_opts->read_only = true;
566                 break;
567         case Opt_read_write:
568                 rbd_opts->read_only = false;
569                 break;
570         default:
571                 rbd_assert(false);
572                 break;
573         }
574         return 0;
575 }
576
577 /*
578  * Get a ceph client with specific addr and configuration, if one does
579  * not exist create it.
580  */
581 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
582 {
583         struct rbd_client *rbdc;
584
585         rbdc = rbd_client_find(ceph_opts);
586         if (rbdc)       /* using an existing client */
587                 ceph_destroy_options(ceph_opts);
588         else
589                 rbdc = rbd_client_create(ceph_opts);
590
591         return rbdc;
592 }
593
594 /*
595  * Destroy ceph client
596  *
597  * Caller must hold rbd_client_list_lock.
598  */
599 static void rbd_client_release(struct kref *kref)
600 {
601         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
602
603         dout("%s: rbdc %p\n", __func__, rbdc);
604         spin_lock(&rbd_client_list_lock);
605         list_del(&rbdc->node);
606         spin_unlock(&rbd_client_list_lock);
607
608         ceph_destroy_client(rbdc->client);
609         kfree(rbdc);
610 }
611
612 /*
613  * Drop reference to ceph client node. If it's not referenced anymore, release
614  * it.
615  */
616 static void rbd_put_client(struct rbd_client *rbdc)
617 {
618         if (rbdc)
619                 kref_put(&rbdc->kref, rbd_client_release);
620 }
621
622 static bool rbd_image_format_valid(u32 image_format)
623 {
624         return image_format == 1 || image_format == 2;
625 }
626
627 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
628 {
629         size_t size;
630         u32 snap_count;
631
632         /* The header has to start with the magic rbd header text */
633         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
634                 return false;
635
636         /* The bio layer requires at least sector-sized I/O */
637
638         if (ondisk->options.order < SECTOR_SHIFT)
639                 return false;
640
641         /* If we use u64 in a few spots we may be able to loosen this */
642
643         if (ondisk->options.order > 8 * sizeof (int) - 1)
644                 return false;
645
646         /*
647          * The size of a snapshot header has to fit in a size_t, and
648          * that limits the number of snapshots.
649          */
650         snap_count = le32_to_cpu(ondisk->snap_count);
651         size = SIZE_MAX - sizeof (struct ceph_snap_context);
652         if (snap_count > size / sizeof (__le64))
653                 return false;
654
655         /*
656          * Not only that, but the size of the entire the snapshot
657          * header must also be representable in a size_t.
658          */
659         size -= snap_count * sizeof (__le64);
660         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
661                 return false;
662
663         return true;
664 }
665
666 /*
667  * Create a new header structure, translate header format from the on-disk
668  * header.
669  */
670 static int rbd_header_from_disk(struct rbd_image_header *header,
671                                  struct rbd_image_header_ondisk *ondisk)
672 {
673         u32 snap_count;
674         size_t len;
675         size_t size;
676         u32 i;
677
678         memset(header, 0, sizeof (*header));
679
680         snap_count = le32_to_cpu(ondisk->snap_count);
681
682         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
683         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
684         if (!header->object_prefix)
685                 return -ENOMEM;
686         memcpy(header->object_prefix, ondisk->object_prefix, len);
687         header->object_prefix[len] = '\0';
688
689         if (snap_count) {
690                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
691
692                 /* Save a copy of the snapshot names */
693
694                 if (snap_names_len > (u64) SIZE_MAX)
695                         return -EIO;
696                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
697                 if (!header->snap_names)
698                         goto out_err;
699                 /*
700                  * Note that rbd_dev_v1_header_read() guarantees
701                  * the ondisk buffer we're working with has
702                  * snap_names_len bytes beyond the end of the
703                  * snapshot id array, this memcpy() is safe.
704                  */
705                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
706                         snap_names_len);
707
708                 /* Record each snapshot's size */
709
710                 size = snap_count * sizeof (*header->snap_sizes);
711                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
712                 if (!header->snap_sizes)
713                         goto out_err;
714                 for (i = 0; i < snap_count; i++)
715                         header->snap_sizes[i] =
716                                 le64_to_cpu(ondisk->snaps[i].image_size);
717         } else {
718                 WARN_ON(ondisk->snap_names_len);
719                 header->snap_names = NULL;
720                 header->snap_sizes = NULL;
721         }
722
723         header->features = 0;   /* No features support in v1 images */
724         header->obj_order = ondisk->options.order;
725         header->crypt_type = ondisk->options.crypt_type;
726         header->comp_type = ondisk->options.comp_type;
727
728         /* Allocate and fill in the snapshot context */
729
730         header->image_size = le64_to_cpu(ondisk->image_size);
731         size = sizeof (struct ceph_snap_context);
732         size += snap_count * sizeof (header->snapc->snaps[0]);
733         header->snapc = kzalloc(size, GFP_KERNEL);
734         if (!header->snapc)
735                 goto out_err;
736
737         atomic_set(&header->snapc->nref, 1);
738         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
739         header->snapc->num_snaps = snap_count;
740         for (i = 0; i < snap_count; i++)
741                 header->snapc->snaps[i] =
742                         le64_to_cpu(ondisk->snaps[i].id);
743
744         return 0;
745
746 out_err:
747         kfree(header->snap_sizes);
748         header->snap_sizes = NULL;
749         kfree(header->snap_names);
750         header->snap_names = NULL;
751         kfree(header->object_prefix);
752         header->object_prefix = NULL;
753
754         return -ENOMEM;
755 }
756
757 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
758 {
759         struct rbd_snap *snap;
760
761         if (snap_id == CEPH_NOSNAP)
762                 return RBD_SNAP_HEAD_NAME;
763
764         list_for_each_entry(snap, &rbd_dev->snaps, node)
765                 if (snap_id == snap->id)
766                         return snap->name;
767
768         return NULL;
769 }
770
771 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
772 {
773
774         struct rbd_snap *snap;
775
776         list_for_each_entry(snap, &rbd_dev->snaps, node) {
777                 if (!strcmp(snap_name, snap->name)) {
778                         rbd_dev->spec->snap_id = snap->id;
779                         rbd_dev->mapping.size = snap->size;
780                         rbd_dev->mapping.features = snap->features;
781
782                         return 0;
783                 }
784         }
785
786         return -ENOENT;
787 }
788
789 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
790 {
791         int ret;
792
793         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
794                     sizeof (RBD_SNAP_HEAD_NAME))) {
795                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
796                 rbd_dev->mapping.size = rbd_dev->header.image_size;
797                 rbd_dev->mapping.features = rbd_dev->header.features;
798                 ret = 0;
799         } else {
800                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
801                 if (ret < 0)
802                         goto done;
803                 rbd_dev->mapping.read_only = true;
804         }
805         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
806
807 done:
808         return ret;
809 }
810
811 static void rbd_header_free(struct rbd_image_header *header)
812 {
813         kfree(header->object_prefix);
814         header->object_prefix = NULL;
815         kfree(header->snap_sizes);
816         header->snap_sizes = NULL;
817         kfree(header->snap_names);
818         header->snap_names = NULL;
819         ceph_put_snap_context(header->snapc);
820         header->snapc = NULL;
821 }
822
823 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
824 {
825         char *name;
826         u64 segment;
827         int ret;
828
829         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
830         if (!name)
831                 return NULL;
832         segment = offset >> rbd_dev->header.obj_order;
833         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
834                         rbd_dev->header.object_prefix, segment);
835         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
836                 pr_err("error formatting segment name for #%llu (%d)\n",
837                         segment, ret);
838                 kfree(name);
839                 name = NULL;
840         }
841
842         return name;
843 }
844
845 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
846 {
847         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
848
849         return offset & (segment_size - 1);
850 }
851
852 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
853                                 u64 offset, u64 length)
854 {
855         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
856
857         offset &= segment_size - 1;
858
859         rbd_assert(length <= U64_MAX - offset);
860         if (offset + length > segment_size)
861                 length = segment_size - offset;
862
863         return length;
864 }
865
866 /*
867  * returns the size of an object in the image
868  */
869 static u64 rbd_obj_bytes(struct rbd_image_header *header)
870 {
871         return 1 << header->obj_order;
872 }
873
874 /*
875  * bio helpers
876  */
877
878 static void bio_chain_put(struct bio *chain)
879 {
880         struct bio *tmp;
881
882         while (chain) {
883                 tmp = chain;
884                 chain = chain->bi_next;
885                 bio_put(tmp);
886         }
887 }
888
889 /*
890  * zeros a bio chain, starting at specific offset
891  */
892 static void zero_bio_chain(struct bio *chain, int start_ofs)
893 {
894         struct bio_vec *bv;
895         unsigned long flags;
896         void *buf;
897         int i;
898         int pos = 0;
899
900         while (chain) {
901                 bio_for_each_segment(bv, chain, i) {
902                         if (pos + bv->bv_len > start_ofs) {
903                                 int remainder = max(start_ofs - pos, 0);
904                                 buf = bvec_kmap_irq(bv, &flags);
905                                 memset(buf + remainder, 0,
906                                        bv->bv_len - remainder);
907                                 bvec_kunmap_irq(buf, &flags);
908                         }
909                         pos += bv->bv_len;
910                 }
911
912                 chain = chain->bi_next;
913         }
914 }
915
916 /*
917  * Clone a portion of a bio, starting at the given byte offset
918  * and continuing for the number of bytes indicated.
919  */
920 static struct bio *bio_clone_range(struct bio *bio_src,
921                                         unsigned int offset,
922                                         unsigned int len,
923                                         gfp_t gfpmask)
924 {
925         struct bio_vec *bv;
926         unsigned int resid;
927         unsigned short idx;
928         unsigned int voff;
929         unsigned short end_idx;
930         unsigned short vcnt;
931         struct bio *bio;
932
933         /* Handle the easy case for the caller */
934
935         if (!offset && len == bio_src->bi_size)
936                 return bio_clone(bio_src, gfpmask);
937
938         if (WARN_ON_ONCE(!len))
939                 return NULL;
940         if (WARN_ON_ONCE(len > bio_src->bi_size))
941                 return NULL;
942         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
943                 return NULL;
944
945         /* Find first affected segment... */
946
947         resid = offset;
948         __bio_for_each_segment(bv, bio_src, idx, 0) {
949                 if (resid < bv->bv_len)
950                         break;
951                 resid -= bv->bv_len;
952         }
953         voff = resid;
954
955         /* ...and the last affected segment */
956
957         resid += len;
958         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
959                 if (resid <= bv->bv_len)
960                         break;
961                 resid -= bv->bv_len;
962         }
963         vcnt = end_idx - idx + 1;
964
965         /* Build the clone */
966
967         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
968         if (!bio)
969                 return NULL;    /* ENOMEM */
970
971         bio->bi_bdev = bio_src->bi_bdev;
972         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
973         bio->bi_rw = bio_src->bi_rw;
974         bio->bi_flags |= 1 << BIO_CLONED;
975
976         /*
977          * Copy over our part of the bio_vec, then update the first
978          * and last (or only) entries.
979          */
980         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
981                         vcnt * sizeof (struct bio_vec));
982         bio->bi_io_vec[0].bv_offset += voff;
983         if (vcnt > 1) {
984                 bio->bi_io_vec[0].bv_len -= voff;
985                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
986         } else {
987                 bio->bi_io_vec[0].bv_len = len;
988         }
989
990         bio->bi_vcnt = vcnt;
991         bio->bi_size = len;
992         bio->bi_idx = 0;
993
994         return bio;
995 }
996
997 /*
998  * Clone a portion of a bio chain, starting at the given byte offset
999  * into the first bio in the source chain and continuing for the
1000  * number of bytes indicated.  The result is another bio chain of
1001  * exactly the given length, or a null pointer on error.
1002  *
1003  * The bio_src and offset parameters are both in-out.  On entry they
1004  * refer to the first source bio and the offset into that bio where
1005  * the start of data to be cloned is located.
1006  *
1007  * On return, bio_src is updated to refer to the bio in the source
1008  * chain that contains first un-cloned byte, and *offset will
1009  * contain the offset of that byte within that bio.
1010  */
1011 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1012                                         unsigned int *offset,
1013                                         unsigned int len,
1014                                         gfp_t gfpmask)
1015 {
1016         struct bio *bi = *bio_src;
1017         unsigned int off = *offset;
1018         struct bio *chain = NULL;
1019         struct bio **end;
1020
1021         /* Build up a chain of clone bios up to the limit */
1022
1023         if (!bi || off >= bi->bi_size || !len)
1024                 return NULL;            /* Nothing to clone */
1025
1026         end = &chain;
1027         while (len) {
1028                 unsigned int bi_size;
1029                 struct bio *bio;
1030
1031                 if (!bi) {
1032                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1033                         goto out_err;   /* EINVAL; ran out of bio's */
1034                 }
1035                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1036                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1037                 if (!bio)
1038                         goto out_err;   /* ENOMEM */
1039
1040                 *end = bio;
1041                 end = &bio->bi_next;
1042
1043                 off += bi_size;
1044                 if (off == bi->bi_size) {
1045                         bi = bi->bi_next;
1046                         off = 0;
1047                 }
1048                 len -= bi_size;
1049         }
1050         *bio_src = bi;
1051         *offset = off;
1052
1053         return chain;
1054 out_err:
1055         bio_chain_put(chain);
1056
1057         return NULL;
1058 }
1059
1060 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1061 {
1062         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1063                 atomic_read(&obj_request->kref.refcount));
1064         kref_get(&obj_request->kref);
1065 }
1066
1067 static void rbd_obj_request_destroy(struct kref *kref);
1068 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1069 {
1070         rbd_assert(obj_request != NULL);
1071         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1072                 atomic_read(&obj_request->kref.refcount));
1073         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1074 }
1075
1076 static void rbd_img_request_get(struct rbd_img_request *img_request)
1077 {
1078         dout("%s: img %p (was %d)\n", __func__, img_request,
1079                 atomic_read(&img_request->kref.refcount));
1080         kref_get(&img_request->kref);
1081 }
1082
1083 static void rbd_img_request_destroy(struct kref *kref);
1084 static void rbd_img_request_put(struct rbd_img_request *img_request)
1085 {
1086         rbd_assert(img_request != NULL);
1087         dout("%s: img %p (was %d)\n", __func__, img_request,
1088                 atomic_read(&img_request->kref.refcount));
1089         kref_put(&img_request->kref, rbd_img_request_destroy);
1090 }
1091
1092 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1093                                         struct rbd_obj_request *obj_request)
1094 {
1095         rbd_assert(obj_request->img_request == NULL);
1096
1097         rbd_obj_request_get(obj_request);
1098         obj_request->img_request = img_request;
1099         obj_request->which = img_request->obj_request_count;
1100         rbd_assert(obj_request->which != BAD_WHICH);
1101         img_request->obj_request_count++;
1102         list_add_tail(&obj_request->links, &img_request->obj_requests);
1103         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1104                 obj_request->which);
1105 }
1106
1107 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1108                                         struct rbd_obj_request *obj_request)
1109 {
1110         rbd_assert(obj_request->which != BAD_WHICH);
1111
1112         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1113                 obj_request->which);
1114         list_del(&obj_request->links);
1115         rbd_assert(img_request->obj_request_count > 0);
1116         img_request->obj_request_count--;
1117         rbd_assert(obj_request->which == img_request->obj_request_count);
1118         obj_request->which = BAD_WHICH;
1119         rbd_assert(obj_request->img_request == img_request);
1120         obj_request->img_request = NULL;
1121         obj_request->callback = NULL;
1122         rbd_obj_request_put(obj_request);
1123 }
1124
1125 static bool obj_request_type_valid(enum obj_request_type type)
1126 {
1127         switch (type) {
1128         case OBJ_REQUEST_NODATA:
1129         case OBJ_REQUEST_BIO:
1130         case OBJ_REQUEST_PAGES:
1131                 return true;
1132         default:
1133                 return false;
1134         }
1135 }
1136
1137 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1138                                 struct rbd_obj_request *obj_request)
1139 {
1140         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1141
1142         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1143 }
1144
1145 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1146 {
1147         dout("%s: img %p\n", __func__, img_request);
1148         if (img_request->callback)
1149                 img_request->callback(img_request);
1150         else
1151                 rbd_img_request_put(img_request);
1152 }
1153
1154 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1155
1156 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1157 {
1158         dout("%s: obj %p\n", __func__, obj_request);
1159
1160         return wait_for_completion_interruptible(&obj_request->completion);
1161 }
1162
1163 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1164 {
1165         atomic_set(&obj_request->done, 0);
1166         smp_wmb();
1167 }
1168
1169 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1170 {
1171         int done;
1172
1173         done = atomic_inc_return(&obj_request->done);
1174         if (done > 1) {
1175                 struct rbd_img_request *img_request = obj_request->img_request;
1176                 struct rbd_device *rbd_dev;
1177
1178                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1179                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1180                         obj_request);
1181         }
1182 }
1183
1184 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1185 {
1186         smp_mb();
1187         return atomic_read(&obj_request->done) != 0;
1188 }
1189
1190 static void
1191 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1192 {
1193         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1194                 obj_request, obj_request->img_request, obj_request->result,
1195                 obj_request->xferred, obj_request->length);
1196         /*
1197          * ENOENT means a hole in the image.  We zero-fill the
1198          * entire length of the request.  A short read also implies
1199          * zero-fill to the end of the request.  Either way we
1200          * update the xferred count to indicate the whole request
1201          * was satisfied.
1202          */
1203         BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1204         if (obj_request->result == -ENOENT) {
1205                 zero_bio_chain(obj_request->bio_list, 0);
1206                 obj_request->result = 0;
1207                 obj_request->xferred = obj_request->length;
1208         } else if (obj_request->xferred < obj_request->length &&
1209                         !obj_request->result) {
1210                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1211                 obj_request->xferred = obj_request->length;
1212         }
1213         obj_request_done_set(obj_request);
1214 }
1215
1216 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1217 {
1218         dout("%s: obj %p cb %p\n", __func__, obj_request,
1219                 obj_request->callback);
1220         if (obj_request->callback)
1221                 obj_request->callback(obj_request);
1222         else
1223                 complete_all(&obj_request->completion);
1224 }
1225
1226 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1227 {
1228         dout("%s: obj %p\n", __func__, obj_request);
1229         obj_request_done_set(obj_request);
1230 }
1231
1232 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1233 {
1234         dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1235                 obj_request->result, obj_request->xferred, obj_request->length);
1236         if (obj_request->img_request)
1237                 rbd_img_obj_request_read_callback(obj_request);
1238         else
1239                 obj_request_done_set(obj_request);
1240 }
1241
1242 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1243 {
1244         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1245                 obj_request->result, obj_request->length);
1246         /*
1247          * There is no such thing as a successful short write.
1248          * Our xferred value is the number of bytes transferred
1249          * back.  Set it to our originally-requested length.
1250          */
1251         obj_request->xferred = obj_request->length;
1252         obj_request_done_set(obj_request);
1253 }
1254
1255 /*
1256  * For a simple stat call there's nothing to do.  We'll do more if
1257  * this is part of a write sequence for a layered image.
1258  */
1259 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1260 {
1261         dout("%s: obj %p\n", __func__, obj_request);
1262         obj_request_done_set(obj_request);
1263 }
1264
1265 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1266                                 struct ceph_msg *msg)
1267 {
1268         struct rbd_obj_request *obj_request = osd_req->r_priv;
1269         u16 opcode;
1270
1271         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1272         rbd_assert(osd_req == obj_request->osd_req);
1273         rbd_assert(!!obj_request->img_request ^
1274                                 (obj_request->which == BAD_WHICH));
1275
1276         if (osd_req->r_result < 0)
1277                 obj_request->result = osd_req->r_result;
1278         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1279
1280         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1281
1282         /*
1283          * We support a 64-bit length, but ultimately it has to be
1284          * passed to blk_end_request(), which takes an unsigned int.
1285          */
1286         obj_request->xferred = osd_req->r_reply_op_len[0];
1287         rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1288         opcode = osd_req->r_ops[0].op;
1289         switch (opcode) {
1290         case CEPH_OSD_OP_READ:
1291                 rbd_osd_read_callback(obj_request);
1292                 break;
1293         case CEPH_OSD_OP_WRITE:
1294                 rbd_osd_write_callback(obj_request);
1295                 break;
1296         case CEPH_OSD_OP_STAT:
1297                 rbd_osd_stat_callback(obj_request);
1298                 break;
1299         case CEPH_OSD_OP_CALL:
1300         case CEPH_OSD_OP_NOTIFY_ACK:
1301         case CEPH_OSD_OP_WATCH:
1302                 rbd_osd_trivial_callback(obj_request);
1303                 break;
1304         default:
1305                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1306                         obj_request->object_name, (unsigned short) opcode);
1307                 break;
1308         }
1309
1310         if (obj_request_done_test(obj_request))
1311                 rbd_obj_request_complete(obj_request);
1312 }
1313
1314 static void rbd_osd_req_format_op(struct rbd_obj_request *obj_request,
1315                                         bool write_request)
1316 {
1317         struct rbd_img_request *img_request = obj_request->img_request;
1318         struct ceph_osd_request *osd_req = obj_request->osd_req;
1319         struct ceph_osd_data *osd_data = NULL;
1320         struct ceph_snap_context *snapc = NULL;
1321         u64 snap_id = CEPH_NOSNAP;
1322         struct timespec *mtime = NULL;
1323         struct timespec now;
1324
1325         rbd_assert(osd_req != NULL);
1326
1327         if (write_request) {
1328                 osd_data = &osd_req->r_data_out;
1329                 now = CURRENT_TIME;
1330                 mtime = &now;
1331                 if (img_request)
1332                         snapc = img_request->snapc;
1333         } else {
1334                 osd_data = &osd_req->r_data_in;
1335                 if (img_request)
1336                         snap_id = img_request->snap_id;
1337         }
1338         if (obj_request->type != OBJ_REQUEST_NODATA) {
1339                 struct ceph_osd_req_op *op = &obj_request->osd_req->r_ops[0];
1340
1341                 /*
1342                  * If it has data, it's either a object class method
1343                  * call (cls) or it's an extent operation.
1344                  */
1345                 if (op->op == CEPH_OSD_OP_CALL)
1346                         osd_req_op_cls_response_data(op, osd_data);
1347                 else
1348                         osd_req_op_extent_osd_data(op, osd_data);
1349         }
1350         ceph_osdc_build_request(osd_req, obj_request->offset,
1351                         snapc, snap_id, mtime);
1352 }
1353
1354 static struct ceph_osd_request *rbd_osd_req_create(
1355                                         struct rbd_device *rbd_dev,
1356                                         bool write_request,
1357                                         struct rbd_obj_request *obj_request)
1358 {
1359         struct rbd_img_request *img_request = obj_request->img_request;
1360         struct ceph_snap_context *snapc = NULL;
1361         struct ceph_osd_client *osdc;
1362         struct ceph_osd_request *osd_req;
1363         struct ceph_osd_data *osd_data;
1364         u64 offset = obj_request->offset;
1365
1366         if (img_request) {
1367                 rbd_assert(img_request->write_request == write_request);
1368                 if (img_request->write_request)
1369                         snapc = img_request->snapc;
1370         }
1371
1372         /* Allocate and initialize the request, for the single op */
1373
1374         osdc = &rbd_dev->rbd_client->client->osdc;
1375         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1376         if (!osd_req)
1377                 return NULL;    /* ENOMEM */
1378         osd_data = write_request ? &osd_req->r_data_out : &osd_req->r_data_in;
1379
1380         rbd_assert(obj_request_type_valid(obj_request->type));
1381         switch (obj_request->type) {
1382         case OBJ_REQUEST_NODATA:
1383                 break;          /* Nothing to do */
1384         case OBJ_REQUEST_BIO:
1385                 rbd_assert(obj_request->bio_list != NULL);
1386                 ceph_osd_data_bio_init(osd_data, obj_request->bio_list,
1387                                         obj_request->length);
1388                 break;
1389         case OBJ_REQUEST_PAGES:
1390                 ceph_osd_data_pages_init(osd_data, obj_request->pages,
1391                                 obj_request->length, offset & ~PAGE_MASK,
1392                                 false, false);
1393                 break;
1394         }
1395
1396         if (write_request)
1397                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1398         else
1399                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1400
1401         osd_req->r_callback = rbd_osd_req_callback;
1402         osd_req->r_priv = obj_request;
1403
1404         osd_req->r_oid_len = strlen(obj_request->object_name);
1405         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1406         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1407
1408         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1409
1410         return osd_req;
1411 }
1412
1413 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1414 {
1415         ceph_osdc_put_request(osd_req);
1416 }
1417
1418 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1419
1420 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1421                                                 u64 offset, u64 length,
1422                                                 enum obj_request_type type)
1423 {
1424         struct rbd_obj_request *obj_request;
1425         size_t size;
1426         char *name;
1427
1428         rbd_assert(obj_request_type_valid(type));
1429
1430         size = strlen(object_name) + 1;
1431         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1432         if (!obj_request)
1433                 return NULL;
1434
1435         name = (char *)(obj_request + 1);
1436         obj_request->object_name = memcpy(name, object_name, size);
1437         obj_request->offset = offset;
1438         obj_request->length = length;
1439         obj_request->which = BAD_WHICH;
1440         obj_request->type = type;
1441         INIT_LIST_HEAD(&obj_request->links);
1442         obj_request_done_init(obj_request);
1443         init_completion(&obj_request->completion);
1444         kref_init(&obj_request->kref);
1445
1446         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1447                 offset, length, (int)type, obj_request);
1448
1449         return obj_request;
1450 }
1451
1452 static void rbd_obj_request_destroy(struct kref *kref)
1453 {
1454         struct rbd_obj_request *obj_request;
1455
1456         obj_request = container_of(kref, struct rbd_obj_request, kref);
1457
1458         dout("%s: obj %p\n", __func__, obj_request);
1459
1460         rbd_assert(obj_request->img_request == NULL);
1461         rbd_assert(obj_request->which == BAD_WHICH);
1462
1463         if (obj_request->osd_req)
1464                 rbd_osd_req_destroy(obj_request->osd_req);
1465
1466         rbd_assert(obj_request_type_valid(obj_request->type));
1467         switch (obj_request->type) {
1468         case OBJ_REQUEST_NODATA:
1469                 break;          /* Nothing to do */
1470         case OBJ_REQUEST_BIO:
1471                 if (obj_request->bio_list)
1472                         bio_chain_put(obj_request->bio_list);
1473                 break;
1474         case OBJ_REQUEST_PAGES:
1475                 if (obj_request->pages)
1476                         ceph_release_page_vector(obj_request->pages,
1477                                                 obj_request->page_count);
1478                 break;
1479         }
1480
1481         kfree(obj_request);
1482 }
1483
1484 /*
1485  * Caller is responsible for filling in the list of object requests
1486  * that comprises the image request, and the Linux request pointer
1487  * (if there is one).
1488  */
1489 static struct rbd_img_request *rbd_img_request_create(
1490                                         struct rbd_device *rbd_dev,
1491                                         u64 offset, u64 length,
1492                                         bool write_request)
1493 {
1494         struct rbd_img_request *img_request;
1495         struct ceph_snap_context *snapc = NULL;
1496
1497         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1498         if (!img_request)
1499                 return NULL;
1500
1501         if (write_request) {
1502                 down_read(&rbd_dev->header_rwsem);
1503                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1504                 up_read(&rbd_dev->header_rwsem);
1505                 if (WARN_ON(!snapc)) {
1506                         kfree(img_request);
1507                         return NULL;    /* Shouldn't happen */
1508                 }
1509         }
1510
1511         img_request->rq = NULL;
1512         img_request->rbd_dev = rbd_dev;
1513         img_request->offset = offset;
1514         img_request->length = length;
1515         img_request->write_request = write_request;
1516         if (write_request)
1517                 img_request->snapc = snapc;
1518         else
1519                 img_request->snap_id = rbd_dev->spec->snap_id;
1520         spin_lock_init(&img_request->completion_lock);
1521         img_request->next_completion = 0;
1522         img_request->callback = NULL;
1523         img_request->obj_request_count = 0;
1524         INIT_LIST_HEAD(&img_request->obj_requests);
1525         kref_init(&img_request->kref);
1526
1527         rbd_img_request_get(img_request);       /* Avoid a warning */
1528         rbd_img_request_put(img_request);       /* TEMPORARY */
1529
1530         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1531                 write_request ? "write" : "read", offset, length,
1532                 img_request);
1533
1534         return img_request;
1535 }
1536
1537 static void rbd_img_request_destroy(struct kref *kref)
1538 {
1539         struct rbd_img_request *img_request;
1540         struct rbd_obj_request *obj_request;
1541         struct rbd_obj_request *next_obj_request;
1542
1543         img_request = container_of(kref, struct rbd_img_request, kref);
1544
1545         dout("%s: img %p\n", __func__, img_request);
1546
1547         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1548                 rbd_img_obj_request_del(img_request, obj_request);
1549         rbd_assert(img_request->obj_request_count == 0);
1550
1551         if (img_request->write_request)
1552                 ceph_put_snap_context(img_request->snapc);
1553
1554         kfree(img_request);
1555 }
1556
1557 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1558                                         struct bio *bio_list)
1559 {
1560         struct rbd_device *rbd_dev = img_request->rbd_dev;
1561         struct rbd_obj_request *obj_request = NULL;
1562         struct rbd_obj_request *next_obj_request;
1563         bool write_request = img_request->write_request;
1564         unsigned int bio_offset;
1565         u64 image_offset;
1566         u64 resid;
1567         u16 opcode;
1568
1569         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1570
1571         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1572         bio_offset = 0;
1573         image_offset = img_request->offset;
1574         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1575         resid = img_request->length;
1576         rbd_assert(resid > 0);
1577         while (resid) {
1578                 const char *object_name;
1579                 unsigned int clone_size;
1580                 struct ceph_osd_req_op *op;
1581                 u64 offset;
1582                 u64 length;
1583
1584                 object_name = rbd_segment_name(rbd_dev, image_offset);
1585                 if (!object_name)
1586                         goto out_unwind;
1587                 offset = rbd_segment_offset(rbd_dev, image_offset);
1588                 length = rbd_segment_length(rbd_dev, image_offset, resid);
1589                 obj_request = rbd_obj_request_create(object_name,
1590                                                 offset, length,
1591                                                 OBJ_REQUEST_BIO);
1592                 kfree(object_name);     /* object request has its own copy */
1593                 if (!obj_request)
1594                         goto out_unwind;
1595
1596                 rbd_assert(length <= (u64) UINT_MAX);
1597                 clone_size = (unsigned int) length;
1598                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1599                                                 &bio_offset, clone_size,
1600                                                 GFP_ATOMIC);
1601                 if (!obj_request->bio_list)
1602                         goto out_partial;
1603
1604                 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1605                                                 write_request, obj_request);
1606                 if (!obj_request->osd_req)
1607                         goto out_partial;
1608
1609                 op = &obj_request->osd_req->r_ops[0];
1610                 osd_req_op_extent_init(op, opcode, offset, length, 0, 0);
1611                 rbd_osd_req_format_op(obj_request, write_request);
1612
1613                 /* status and version are initially zero-filled */
1614
1615                 rbd_img_obj_request_add(img_request, obj_request);
1616
1617                 image_offset += length;
1618                 resid -= length;
1619         }
1620
1621         return 0;
1622
1623 out_partial:
1624         rbd_obj_request_put(obj_request);
1625 out_unwind:
1626         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1627                 rbd_obj_request_put(obj_request);
1628
1629         return -ENOMEM;
1630 }
1631
1632 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1633 {
1634         struct rbd_img_request *img_request;
1635         u32 which = obj_request->which;
1636         bool more = true;
1637
1638         img_request = obj_request->img_request;
1639
1640         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1641         rbd_assert(img_request != NULL);
1642         rbd_assert(img_request->rq != NULL);
1643         rbd_assert(img_request->obj_request_count > 0);
1644         rbd_assert(which != BAD_WHICH);
1645         rbd_assert(which < img_request->obj_request_count);
1646         rbd_assert(which >= img_request->next_completion);
1647
1648         spin_lock_irq(&img_request->completion_lock);
1649         if (which != img_request->next_completion)
1650                 goto out;
1651
1652         for_each_obj_request_from(img_request, obj_request) {
1653                 unsigned int xferred;
1654                 int result;
1655
1656                 rbd_assert(more);
1657                 rbd_assert(which < img_request->obj_request_count);
1658
1659                 if (!obj_request_done_test(obj_request))
1660                         break;
1661
1662                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1663                 xferred = (unsigned int) obj_request->xferred;
1664                 result = (int) obj_request->result;
1665                 if (result)
1666                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1667                                 img_request->write_request ? "write" : "read",
1668                                 result, xferred);
1669
1670                 more = blk_end_request(img_request->rq, result, xferred);
1671                 which++;
1672         }
1673
1674         rbd_assert(more ^ (which == img_request->obj_request_count));
1675         img_request->next_completion = which;
1676 out:
1677         spin_unlock_irq(&img_request->completion_lock);
1678
1679         if (!more)
1680                 rbd_img_request_complete(img_request);
1681 }
1682
1683 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1684 {
1685         struct rbd_device *rbd_dev = img_request->rbd_dev;
1686         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1687         struct rbd_obj_request *obj_request;
1688         struct rbd_obj_request *next_obj_request;
1689
1690         dout("%s: img %p\n", __func__, img_request);
1691         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1692                 int ret;
1693
1694                 obj_request->callback = rbd_img_obj_callback;
1695                 ret = rbd_obj_request_submit(osdc, obj_request);
1696                 if (ret)
1697                         return ret;
1698                 /*
1699                  * The image request has its own reference to each
1700                  * of its object requests, so we can safely drop the
1701                  * initial one here.
1702                  */
1703                 rbd_obj_request_put(obj_request);
1704         }
1705
1706         return 0;
1707 }
1708
1709 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1710                                    u64 ver, u64 notify_id)
1711 {
1712         struct rbd_obj_request *obj_request;
1713         struct ceph_osd_req_op *op;
1714         struct ceph_osd_client *osdc;
1715         int ret;
1716
1717         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1718                                                         OBJ_REQUEST_NODATA);
1719         if (!obj_request)
1720                 return -ENOMEM;
1721
1722         ret = -ENOMEM;
1723         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1724         if (!obj_request->osd_req)
1725                 goto out;
1726
1727         op = &obj_request->osd_req->r_ops[0];
1728         osd_req_op_watch_init(op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0);
1729         rbd_osd_req_format_op(obj_request, false);
1730
1731         osdc = &rbd_dev->rbd_client->client->osdc;
1732         obj_request->callback = rbd_obj_request_put;
1733         ret = rbd_obj_request_submit(osdc, obj_request);
1734 out:
1735         if (ret)
1736                 rbd_obj_request_put(obj_request);
1737
1738         return ret;
1739 }
1740
1741 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1742 {
1743         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1744         u64 hver;
1745         int rc;
1746
1747         if (!rbd_dev)
1748                 return;
1749
1750         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1751                 rbd_dev->header_name, (unsigned long long) notify_id,
1752                 (unsigned int) opcode);
1753         rc = rbd_dev_refresh(rbd_dev, &hver);
1754         if (rc)
1755                 rbd_warn(rbd_dev, "got notification but failed to "
1756                            " update snaps: %d\n", rc);
1757
1758         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1759 }
1760
1761 /*
1762  * Request sync osd watch/unwatch.  The value of "start" determines
1763  * whether a watch request is being initiated or torn down.
1764  */
1765 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1766 {
1767         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1768         struct rbd_obj_request *obj_request;
1769         struct ceph_osd_req_op *op;
1770         int ret;
1771
1772         rbd_assert(start ^ !!rbd_dev->watch_event);
1773         rbd_assert(start ^ !!rbd_dev->watch_request);
1774
1775         if (start) {
1776                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1777                                                 &rbd_dev->watch_event);
1778                 if (ret < 0)
1779                         return ret;
1780                 rbd_assert(rbd_dev->watch_event != NULL);
1781         }
1782
1783         ret = -ENOMEM;
1784         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1785                                                         OBJ_REQUEST_NODATA);
1786         if (!obj_request)
1787                 goto out_cancel;
1788
1789         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1790         if (!obj_request->osd_req)
1791                 goto out_cancel;
1792
1793         op = &obj_request->osd_req->r_ops[0];
1794         osd_req_op_watch_init(op, CEPH_OSD_OP_WATCH,
1795                                 rbd_dev->watch_event->cookie,
1796                                 rbd_dev->header.obj_version, start);
1797         rbd_osd_req_format_op(obj_request, true);
1798
1799         if (start)
1800                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1801         else
1802                 ceph_osdc_unregister_linger_request(osdc,
1803                                         rbd_dev->watch_request->osd_req);
1804         ret = rbd_obj_request_submit(osdc, obj_request);
1805         if (ret)
1806                 goto out_cancel;
1807         ret = rbd_obj_request_wait(obj_request);
1808         if (ret)
1809                 goto out_cancel;
1810         ret = obj_request->result;
1811         if (ret)
1812                 goto out_cancel;
1813
1814         /*
1815          * A watch request is set to linger, so the underlying osd
1816          * request won't go away until we unregister it.  We retain
1817          * a pointer to the object request during that time (in
1818          * rbd_dev->watch_request), so we'll keep a reference to
1819          * it.  We'll drop that reference (below) after we've
1820          * unregistered it.
1821          */
1822         if (start) {
1823                 rbd_dev->watch_request = obj_request;
1824
1825                 return 0;
1826         }
1827
1828         /* We have successfully torn down the watch request */
1829
1830         rbd_obj_request_put(rbd_dev->watch_request);
1831         rbd_dev->watch_request = NULL;
1832 out_cancel:
1833         /* Cancel the event if we're tearing down, or on error */
1834         ceph_osdc_cancel_event(rbd_dev->watch_event);
1835         rbd_dev->watch_event = NULL;
1836         if (obj_request)
1837                 rbd_obj_request_put(obj_request);
1838
1839         return ret;
1840 }
1841
1842 /*
1843  * Synchronous osd object method call
1844  */
1845 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1846                              const char *object_name,
1847                              const char *class_name,
1848                              const char *method_name,
1849                              const char *outbound,
1850                              size_t outbound_size,
1851                              char *inbound,
1852                              size_t inbound_size,
1853                              u64 *version)
1854 {
1855         struct rbd_obj_request *obj_request;
1856         struct ceph_osd_client *osdc;
1857         struct ceph_osd_req_op *op;
1858         struct page **pages;
1859         u32 page_count;
1860         int ret;
1861
1862         /*
1863          * Method calls are ultimately read operations.  The result
1864          * should placed into the inbound buffer provided.  They
1865          * also supply outbound data--parameters for the object
1866          * method.  Currently if this is present it will be a
1867          * snapshot id.
1868          */
1869         page_count = (u32) calc_pages_for(0, inbound_size);
1870         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1871         if (IS_ERR(pages))
1872                 return PTR_ERR(pages);
1873
1874         ret = -ENOMEM;
1875         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1876                                                         OBJ_REQUEST_PAGES);
1877         if (!obj_request)
1878                 goto out;
1879
1880         obj_request->pages = pages;
1881         obj_request->page_count = page_count;
1882
1883         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1884         if (!obj_request->osd_req)
1885                 goto out;
1886
1887         op = &obj_request->osd_req->r_ops[0];
1888         osd_req_op_cls_init(op, CEPH_OSD_OP_CALL, class_name, method_name,
1889                                         outbound, outbound_size);
1890         rbd_osd_req_format_op(obj_request, false);
1891
1892         osdc = &rbd_dev->rbd_client->client->osdc;
1893         ret = rbd_obj_request_submit(osdc, obj_request);
1894         if (ret)
1895                 goto out;
1896         ret = rbd_obj_request_wait(obj_request);
1897         if (ret)
1898                 goto out;
1899
1900         ret = obj_request->result;
1901         if (ret < 0)
1902                 goto out;
1903         ret = 0;
1904         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1905         if (version)
1906                 *version = obj_request->version;
1907 out:
1908         if (obj_request)
1909                 rbd_obj_request_put(obj_request);
1910         else
1911                 ceph_release_page_vector(pages, page_count);
1912
1913         return ret;
1914 }
1915
1916 static void rbd_request_fn(struct request_queue *q)
1917                 __releases(q->queue_lock) __acquires(q->queue_lock)
1918 {
1919         struct rbd_device *rbd_dev = q->queuedata;
1920         bool read_only = rbd_dev->mapping.read_only;
1921         struct request *rq;
1922         int result;
1923
1924         while ((rq = blk_fetch_request(q))) {
1925                 bool write_request = rq_data_dir(rq) == WRITE;
1926                 struct rbd_img_request *img_request;
1927                 u64 offset;
1928                 u64 length;
1929
1930                 /* Ignore any non-FS requests that filter through. */
1931
1932                 if (rq->cmd_type != REQ_TYPE_FS) {
1933                         dout("%s: non-fs request type %d\n", __func__,
1934                                 (int) rq->cmd_type);
1935                         __blk_end_request_all(rq, 0);
1936                         continue;
1937                 }
1938
1939                 /* Ignore/skip any zero-length requests */
1940
1941                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1942                 length = (u64) blk_rq_bytes(rq);
1943
1944                 if (!length) {
1945                         dout("%s: zero-length request\n", __func__);
1946                         __blk_end_request_all(rq, 0);
1947                         continue;
1948                 }
1949
1950                 spin_unlock_irq(q->queue_lock);
1951
1952                 /* Disallow writes to a read-only device */
1953
1954                 if (write_request) {
1955                         result = -EROFS;
1956                         if (read_only)
1957                                 goto end_request;
1958                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1959                 }
1960
1961                 /*
1962                  * Quit early if the mapped snapshot no longer
1963                  * exists.  It's still possible the snapshot will
1964                  * have disappeared by the time our request arrives
1965                  * at the osd, but there's no sense in sending it if
1966                  * we already know.
1967                  */
1968                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
1969                         dout("request for non-existent snapshot");
1970                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1971                         result = -ENXIO;
1972                         goto end_request;
1973                 }
1974
1975                 result = -EINVAL;
1976                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1977                         goto end_request;       /* Shouldn't happen */
1978
1979                 result = -ENOMEM;
1980                 img_request = rbd_img_request_create(rbd_dev, offset, length,
1981                                                         write_request);
1982                 if (!img_request)
1983                         goto end_request;
1984
1985                 img_request->rq = rq;
1986
1987                 result = rbd_img_request_fill_bio(img_request, rq->bio);
1988                 if (!result)
1989                         result = rbd_img_request_submit(img_request);
1990                 if (result)
1991                         rbd_img_request_put(img_request);
1992 end_request:
1993                 spin_lock_irq(q->queue_lock);
1994                 if (result < 0) {
1995                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
1996                                 write_request ? "write" : "read", result);
1997                         __blk_end_request_all(rq, result);
1998                 }
1999         }
2000 }
2001
2002 /*
2003  * a queue callback. Makes sure that we don't create a bio that spans across
2004  * multiple osd objects. One exception would be with a single page bios,
2005  * which we handle later at bio_chain_clone_range()
2006  */
2007 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2008                           struct bio_vec *bvec)
2009 {
2010         struct rbd_device *rbd_dev = q->queuedata;
2011         sector_t sector_offset;
2012         sector_t sectors_per_obj;
2013         sector_t obj_sector_offset;
2014         int ret;
2015
2016         /*
2017          * Find how far into its rbd object the partition-relative
2018          * bio start sector is to offset relative to the enclosing
2019          * device.
2020          */
2021         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2022         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2023         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2024
2025         /*
2026          * Compute the number of bytes from that offset to the end
2027          * of the object.  Account for what's already used by the bio.
2028          */
2029         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2030         if (ret > bmd->bi_size)
2031                 ret -= bmd->bi_size;
2032         else
2033                 ret = 0;
2034
2035         /*
2036          * Don't send back more than was asked for.  And if the bio
2037          * was empty, let the whole thing through because:  "Note
2038          * that a block device *must* allow a single page to be
2039          * added to an empty bio."
2040          */
2041         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2042         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2043                 ret = (int) bvec->bv_len;
2044
2045         return ret;
2046 }
2047
2048 static void rbd_free_disk(struct rbd_device *rbd_dev)
2049 {
2050         struct gendisk *disk = rbd_dev->disk;
2051
2052         if (!disk)
2053                 return;
2054
2055         if (disk->flags & GENHD_FL_UP)
2056                 del_gendisk(disk);
2057         if (disk->queue)
2058                 blk_cleanup_queue(disk->queue);
2059         put_disk(disk);
2060 }
2061
2062 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2063                                 const char *object_name,
2064                                 u64 offset, u64 length,
2065                                 char *buf, u64 *version)
2066
2067 {
2068         struct rbd_obj_request *obj_request;
2069         struct ceph_osd_req_op *op;
2070         struct ceph_osd_client *osdc;
2071         struct page **pages = NULL;
2072         u32 page_count;
2073         size_t size;
2074         int ret;
2075
2076         page_count = (u32) calc_pages_for(offset, length);
2077         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2078         if (IS_ERR(pages))
2079                 ret = PTR_ERR(pages);
2080
2081         ret = -ENOMEM;
2082         obj_request = rbd_obj_request_create(object_name, offset, length,
2083                                                         OBJ_REQUEST_PAGES);
2084         if (!obj_request)
2085                 goto out;
2086
2087         obj_request->pages = pages;
2088         obj_request->page_count = page_count;
2089
2090         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2091         if (!obj_request->osd_req)
2092                 goto out;
2093
2094         op = &obj_request->osd_req->r_ops[0];
2095         osd_req_op_extent_init(op, CEPH_OSD_OP_READ, offset, length, 0, 0);
2096         rbd_osd_req_format_op(obj_request, false);
2097
2098         osdc = &rbd_dev->rbd_client->client->osdc;
2099         ret = rbd_obj_request_submit(osdc, obj_request);
2100         if (ret)
2101                 goto out;
2102         ret = rbd_obj_request_wait(obj_request);
2103         if (ret)
2104                 goto out;
2105
2106         ret = obj_request->result;
2107         if (ret < 0)
2108                 goto out;
2109
2110         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2111         size = (size_t) obj_request->xferred;
2112         ceph_copy_from_page_vector(pages, buf, 0, size);
2113         rbd_assert(size <= (size_t) INT_MAX);
2114         ret = (int) size;
2115         if (version)
2116                 *version = obj_request->version;
2117 out:
2118         if (obj_request)
2119                 rbd_obj_request_put(obj_request);
2120         else
2121                 ceph_release_page_vector(pages, page_count);
2122
2123         return ret;
2124 }
2125
2126 /*
2127  * Read the complete header for the given rbd device.
2128  *
2129  * Returns a pointer to a dynamically-allocated buffer containing
2130  * the complete and validated header.  Caller can pass the address
2131  * of a variable that will be filled in with the version of the
2132  * header object at the time it was read.
2133  *
2134  * Returns a pointer-coded errno if a failure occurs.
2135  */
2136 static struct rbd_image_header_ondisk *
2137 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2138 {
2139         struct rbd_image_header_ondisk *ondisk = NULL;
2140         u32 snap_count = 0;
2141         u64 names_size = 0;
2142         u32 want_count;
2143         int ret;
2144
2145         /*
2146          * The complete header will include an array of its 64-bit
2147          * snapshot ids, followed by the names of those snapshots as
2148          * a contiguous block of NUL-terminated strings.  Note that
2149          * the number of snapshots could change by the time we read
2150          * it in, in which case we re-read it.
2151          */
2152         do {
2153                 size_t size;
2154
2155                 kfree(ondisk);
2156
2157                 size = sizeof (*ondisk);
2158                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2159                 size += names_size;
2160                 ondisk = kmalloc(size, GFP_KERNEL);
2161                 if (!ondisk)
2162                         return ERR_PTR(-ENOMEM);
2163
2164                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2165                                        0, size,
2166                                        (char *) ondisk, version);
2167                 if (ret < 0)
2168                         goto out_err;
2169                 if (WARN_ON((size_t) ret < size)) {
2170                         ret = -ENXIO;
2171                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2172                                 size, ret);
2173                         goto out_err;
2174                 }
2175                 if (!rbd_dev_ondisk_valid(ondisk)) {
2176                         ret = -ENXIO;
2177                         rbd_warn(rbd_dev, "invalid header");
2178                         goto out_err;
2179                 }
2180
2181                 names_size = le64_to_cpu(ondisk->snap_names_len);
2182                 want_count = snap_count;
2183                 snap_count = le32_to_cpu(ondisk->snap_count);
2184         } while (snap_count != want_count);
2185
2186         return ondisk;
2187
2188 out_err:
2189         kfree(ondisk);
2190
2191         return ERR_PTR(ret);
2192 }
2193
2194 /*
2195  * reload the ondisk the header
2196  */
2197 static int rbd_read_header(struct rbd_device *rbd_dev,
2198                            struct rbd_image_header *header)
2199 {
2200         struct rbd_image_header_ondisk *ondisk;
2201         u64 ver = 0;
2202         int ret;
2203
2204         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2205         if (IS_ERR(ondisk))
2206                 return PTR_ERR(ondisk);
2207         ret = rbd_header_from_disk(header, ondisk);
2208         if (ret >= 0)
2209                 header->obj_version = ver;
2210         kfree(ondisk);
2211
2212         return ret;
2213 }
2214
2215 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2216 {
2217         struct rbd_snap *snap;
2218         struct rbd_snap *next;
2219
2220         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2221                 rbd_remove_snap_dev(snap);
2222 }
2223
2224 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2225 {
2226         sector_t size;
2227
2228         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2229                 return;
2230
2231         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2232         dout("setting size to %llu sectors", (unsigned long long) size);
2233         rbd_dev->mapping.size = (u64) size;
2234         set_capacity(rbd_dev->disk, size);
2235 }
2236
2237 /*
2238  * only read the first part of the ondisk header, without the snaps info
2239  */
2240 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2241 {
2242         int ret;
2243         struct rbd_image_header h;
2244
2245         ret = rbd_read_header(rbd_dev, &h);
2246         if (ret < 0)
2247                 return ret;
2248
2249         down_write(&rbd_dev->header_rwsem);
2250
2251         /* Update image size, and check for resize of mapped image */
2252         rbd_dev->header.image_size = h.image_size;
2253         rbd_update_mapping_size(rbd_dev);
2254
2255         /* rbd_dev->header.object_prefix shouldn't change */
2256         kfree(rbd_dev->header.snap_sizes);
2257         kfree(rbd_dev->header.snap_names);
2258         /* osd requests may still refer to snapc */
2259         ceph_put_snap_context(rbd_dev->header.snapc);
2260
2261         if (hver)
2262                 *hver = h.obj_version;
2263         rbd_dev->header.obj_version = h.obj_version;
2264         rbd_dev->header.image_size = h.image_size;
2265         rbd_dev->header.snapc = h.snapc;
2266         rbd_dev->header.snap_names = h.snap_names;
2267         rbd_dev->header.snap_sizes = h.snap_sizes;
2268         /* Free the extra copy of the object prefix */
2269         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2270         kfree(h.object_prefix);
2271
2272         ret = rbd_dev_snaps_update(rbd_dev);
2273         if (!ret)
2274                 ret = rbd_dev_snaps_register(rbd_dev);
2275
2276         up_write(&rbd_dev->header_rwsem);
2277
2278         return ret;
2279 }
2280
2281 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2282 {
2283         int ret;
2284
2285         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2286         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2287         if (rbd_dev->image_format == 1)
2288                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2289         else
2290                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2291         mutex_unlock(&ctl_mutex);
2292
2293         return ret;
2294 }
2295
2296 static int rbd_init_disk(struct rbd_device *rbd_dev)
2297 {
2298         struct gendisk *disk;
2299         struct request_queue *q;
2300         u64 segment_size;
2301
2302         /* create gendisk info */
2303         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2304         if (!disk)
2305                 return -ENOMEM;
2306
2307         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2308                  rbd_dev->dev_id);
2309         disk->major = rbd_dev->major;
2310         disk->first_minor = 0;
2311         disk->fops = &rbd_bd_ops;
2312         disk->private_data = rbd_dev;
2313
2314         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2315         if (!q)
2316                 goto out_disk;
2317
2318         /* We use the default size, but let's be explicit about it. */
2319         blk_queue_physical_block_size(q, SECTOR_SIZE);
2320
2321         /* set io sizes to object size */
2322         segment_size = rbd_obj_bytes(&rbd_dev->header);
2323         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2324         blk_queue_max_segment_size(q, segment_size);
2325         blk_queue_io_min(q, segment_size);
2326         blk_queue_io_opt(q, segment_size);
2327
2328         blk_queue_merge_bvec(q, rbd_merge_bvec);
2329         disk->queue = q;
2330
2331         q->queuedata = rbd_dev;
2332
2333         rbd_dev->disk = disk;
2334
2335         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2336
2337         return 0;
2338 out_disk:
2339         put_disk(disk);
2340
2341         return -ENOMEM;
2342 }
2343
2344 /*
2345   sysfs
2346 */
2347
2348 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2349 {
2350         return container_of(dev, struct rbd_device, dev);
2351 }
2352
2353 static ssize_t rbd_size_show(struct device *dev,
2354                              struct device_attribute *attr, char *buf)
2355 {
2356         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2357         sector_t size;
2358
2359         down_read(&rbd_dev->header_rwsem);
2360         size = get_capacity(rbd_dev->disk);
2361         up_read(&rbd_dev->header_rwsem);
2362
2363         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2364 }
2365
2366 /*
2367  * Note this shows the features for whatever's mapped, which is not
2368  * necessarily the base image.
2369  */
2370 static ssize_t rbd_features_show(struct device *dev,
2371                              struct device_attribute *attr, char *buf)
2372 {
2373         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2374
2375         return sprintf(buf, "0x%016llx\n",
2376                         (unsigned long long) rbd_dev->mapping.features);
2377 }
2378
2379 static ssize_t rbd_major_show(struct device *dev,
2380                               struct device_attribute *attr, char *buf)
2381 {
2382         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2383
2384         return sprintf(buf, "%d\n", rbd_dev->major);
2385 }
2386
2387 static ssize_t rbd_client_id_show(struct device *dev,
2388                                   struct device_attribute *attr, char *buf)
2389 {
2390         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2391
2392         return sprintf(buf, "client%lld\n",
2393                         ceph_client_id(rbd_dev->rbd_client->client));
2394 }
2395
2396 static ssize_t rbd_pool_show(struct device *dev,
2397                              struct device_attribute *attr, char *buf)
2398 {
2399         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2400
2401         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2402 }
2403
2404 static ssize_t rbd_pool_id_show(struct device *dev,
2405                              struct device_attribute *attr, char *buf)
2406 {
2407         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2408
2409         return sprintf(buf, "%llu\n",
2410                 (unsigned long long) rbd_dev->spec->pool_id);
2411 }
2412
2413 static ssize_t rbd_name_show(struct device *dev,
2414                              struct device_attribute *attr, char *buf)
2415 {
2416         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2417
2418         if (rbd_dev->spec->image_name)
2419                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2420
2421         return sprintf(buf, "(unknown)\n");
2422 }
2423
2424 static ssize_t rbd_image_id_show(struct device *dev,
2425                              struct device_attribute *attr, char *buf)
2426 {
2427         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2428
2429         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2430 }
2431
2432 /*
2433  * Shows the name of the currently-mapped snapshot (or
2434  * RBD_SNAP_HEAD_NAME for the base image).
2435  */
2436 static ssize_t rbd_snap_show(struct device *dev,
2437                              struct device_attribute *attr,
2438                              char *buf)
2439 {
2440         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2441
2442         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2443 }
2444
2445 /*
2446  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2447  * for the parent image.  If there is no parent, simply shows
2448  * "(no parent image)".
2449  */
2450 static ssize_t rbd_parent_show(struct device *dev,
2451                              struct device_attribute *attr,
2452                              char *buf)
2453 {
2454         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2455         struct rbd_spec *spec = rbd_dev->parent_spec;
2456         int count;
2457         char *bufp = buf;
2458
2459         if (!spec)
2460                 return sprintf(buf, "(no parent image)\n");
2461
2462         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2463                         (unsigned long long) spec->pool_id, spec->pool_name);
2464         if (count < 0)
2465                 return count;
2466         bufp += count;
2467
2468         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2469                         spec->image_name ? spec->image_name : "(unknown)");
2470         if (count < 0)
2471                 return count;
2472         bufp += count;
2473
2474         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2475                         (unsigned long long) spec->snap_id, spec->snap_name);
2476         if (count < 0)
2477                 return count;
2478         bufp += count;
2479
2480         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2481         if (count < 0)
2482                 return count;
2483         bufp += count;
2484
2485         return (ssize_t) (bufp - buf);
2486 }
2487
2488 static ssize_t rbd_image_refresh(struct device *dev,
2489                                  struct device_attribute *attr,
2490                                  const char *buf,
2491                                  size_t size)
2492 {
2493         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2494         int ret;
2495
2496         ret = rbd_dev_refresh(rbd_dev, NULL);
2497
2498         return ret < 0 ? ret : size;
2499 }
2500
2501 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2502 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2503 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2504 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2505 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2506 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2507 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2508 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2509 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2510 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2511 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2512
2513 static struct attribute *rbd_attrs[] = {
2514         &dev_attr_size.attr,
2515         &dev_attr_features.attr,
2516         &dev_attr_major.attr,
2517         &dev_attr_client_id.attr,
2518         &dev_attr_pool.attr,
2519         &dev_attr_pool_id.attr,
2520         &dev_attr_name.attr,
2521         &dev_attr_image_id.attr,
2522         &dev_attr_current_snap.attr,
2523         &dev_attr_parent.attr,
2524         &dev_attr_refresh.attr,
2525         NULL
2526 };
2527
2528 static struct attribute_group rbd_attr_group = {
2529         .attrs = rbd_attrs,
2530 };
2531
2532 static const struct attribute_group *rbd_attr_groups[] = {
2533         &rbd_attr_group,
2534         NULL
2535 };
2536
2537 static void rbd_sysfs_dev_release(struct device *dev)
2538 {
2539 }
2540
2541 static struct device_type rbd_device_type = {
2542         .name           = "rbd",
2543         .groups         = rbd_attr_groups,
2544         .release        = rbd_sysfs_dev_release,
2545 };
2546
2547
2548 /*
2549   sysfs - snapshots
2550 */
2551
2552 static ssize_t rbd_snap_size_show(struct device *dev,
2553                                   struct device_attribute *attr,
2554                                   char *buf)
2555 {
2556         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2557
2558         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2559 }
2560
2561 static ssize_t rbd_snap_id_show(struct device *dev,
2562                                 struct device_attribute *attr,
2563                                 char *buf)
2564 {
2565         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2566
2567         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2568 }
2569
2570 static ssize_t rbd_snap_features_show(struct device *dev,
2571                                 struct device_attribute *attr,
2572                                 char *buf)
2573 {
2574         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2575
2576         return sprintf(buf, "0x%016llx\n",
2577                         (unsigned long long) snap->features);
2578 }
2579
2580 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2581 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2582 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2583
2584 static struct attribute *rbd_snap_attrs[] = {
2585         &dev_attr_snap_size.attr,
2586         &dev_attr_snap_id.attr,
2587         &dev_attr_snap_features.attr,
2588         NULL,
2589 };
2590
2591 static struct attribute_group rbd_snap_attr_group = {
2592         .attrs = rbd_snap_attrs,
2593 };
2594
2595 static void rbd_snap_dev_release(struct device *dev)
2596 {
2597         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2598         kfree(snap->name);
2599         kfree(snap);
2600 }
2601
2602 static const struct attribute_group *rbd_snap_attr_groups[] = {
2603         &rbd_snap_attr_group,
2604         NULL
2605 };
2606
2607 static struct device_type rbd_snap_device_type = {
2608         .groups         = rbd_snap_attr_groups,
2609         .release        = rbd_snap_dev_release,
2610 };
2611
2612 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2613 {
2614         kref_get(&spec->kref);
2615
2616         return spec;
2617 }
2618
2619 static void rbd_spec_free(struct kref *kref);
2620 static void rbd_spec_put(struct rbd_spec *spec)
2621 {
2622         if (spec)
2623                 kref_put(&spec->kref, rbd_spec_free);
2624 }
2625
2626 static struct rbd_spec *rbd_spec_alloc(void)
2627 {
2628         struct rbd_spec *spec;
2629
2630         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2631         if (!spec)
2632                 return NULL;
2633         kref_init(&spec->kref);
2634
2635         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2636
2637         return spec;
2638 }
2639
2640 static void rbd_spec_free(struct kref *kref)
2641 {
2642         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2643
2644         kfree(spec->pool_name);
2645         kfree(spec->image_id);
2646         kfree(spec->image_name);
2647         kfree(spec->snap_name);
2648         kfree(spec);
2649 }
2650
2651 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2652                                 struct rbd_spec *spec)
2653 {
2654         struct rbd_device *rbd_dev;
2655
2656         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2657         if (!rbd_dev)
2658                 return NULL;
2659
2660         spin_lock_init(&rbd_dev->lock);
2661         rbd_dev->flags = 0;
2662         INIT_LIST_HEAD(&rbd_dev->node);
2663         INIT_LIST_HEAD(&rbd_dev->snaps);
2664         init_rwsem(&rbd_dev->header_rwsem);
2665
2666         rbd_dev->spec = spec;
2667         rbd_dev->rbd_client = rbdc;
2668
2669         /* Initialize the layout used for all rbd requests */
2670
2671         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2672         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2673         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2674         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2675
2676         return rbd_dev;
2677 }
2678
2679 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2680 {
2681         rbd_spec_put(rbd_dev->parent_spec);
2682         kfree(rbd_dev->header_name);
2683         rbd_put_client(rbd_dev->rbd_client);
2684         rbd_spec_put(rbd_dev->spec);
2685         kfree(rbd_dev);
2686 }
2687
2688 static bool rbd_snap_registered(struct rbd_snap *snap)
2689 {
2690         bool ret = snap->dev.type == &rbd_snap_device_type;
2691         bool reg = device_is_registered(&snap->dev);
2692
2693         rbd_assert(!ret ^ reg);
2694
2695         return ret;
2696 }
2697
2698 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2699 {
2700         list_del(&snap->node);
2701         if (device_is_registered(&snap->dev))
2702                 device_unregister(&snap->dev);
2703 }
2704
2705 static int rbd_register_snap_dev(struct rbd_snap *snap,
2706                                   struct device *parent)
2707 {
2708         struct device *dev = &snap->dev;
2709         int ret;
2710
2711         dev->type = &rbd_snap_device_type;
2712         dev->parent = parent;
2713         dev->release = rbd_snap_dev_release;
2714         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2715         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2716
2717         ret = device_register(dev);
2718
2719         return ret;
2720 }
2721
2722 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2723                                                 const char *snap_name,
2724                                                 u64 snap_id, u64 snap_size,
2725                                                 u64 snap_features)
2726 {
2727         struct rbd_snap *snap;
2728         int ret;
2729
2730         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2731         if (!snap)
2732                 return ERR_PTR(-ENOMEM);
2733
2734         ret = -ENOMEM;
2735         snap->name = kstrdup(snap_name, GFP_KERNEL);
2736         if (!snap->name)
2737                 goto err;
2738
2739         snap->id = snap_id;
2740         snap->size = snap_size;
2741         snap->features = snap_features;
2742
2743         return snap;
2744
2745 err:
2746         kfree(snap->name);
2747         kfree(snap);
2748
2749         return ERR_PTR(ret);
2750 }
2751
2752 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2753                 u64 *snap_size, u64 *snap_features)
2754 {
2755         char *snap_name;
2756
2757         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2758
2759         *snap_size = rbd_dev->header.snap_sizes[which];
2760         *snap_features = 0;     /* No features for v1 */
2761
2762         /* Skip over names until we find the one we are looking for */
2763
2764         snap_name = rbd_dev->header.snap_names;
2765         while (which--)
2766                 snap_name += strlen(snap_name) + 1;
2767
2768         return snap_name;
2769 }
2770
2771 /*
2772  * Get the size and object order for an image snapshot, or if
2773  * snap_id is CEPH_NOSNAP, gets this information for the base
2774  * image.
2775  */
2776 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2777                                 u8 *order, u64 *snap_size)
2778 {
2779         __le64 snapid = cpu_to_le64(snap_id);
2780         int ret;
2781         struct {
2782                 u8 order;
2783                 __le64 size;
2784         } __attribute__ ((packed)) size_buf = { 0 };
2785
2786         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2787                                 "rbd", "get_size",
2788                                 (char *) &snapid, sizeof (snapid),
2789                                 (char *) &size_buf, sizeof (size_buf), NULL);
2790         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2791         if (ret < 0)
2792                 return ret;
2793
2794         *order = size_buf.order;
2795         *snap_size = le64_to_cpu(size_buf.size);
2796
2797         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2798                 (unsigned long long) snap_id, (unsigned int) *order,
2799                 (unsigned long long) *snap_size);
2800
2801         return 0;
2802 }
2803
2804 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2805 {
2806         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2807                                         &rbd_dev->header.obj_order,
2808                                         &rbd_dev->header.image_size);
2809 }
2810
2811 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2812 {
2813         void *reply_buf;
2814         int ret;
2815         void *p;
2816
2817         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2818         if (!reply_buf)
2819                 return -ENOMEM;
2820
2821         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2822                                 "rbd", "get_object_prefix",
2823                                 NULL, 0,
2824                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2825         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2826         if (ret < 0)
2827                 goto out;
2828
2829         p = reply_buf;
2830         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2831                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2832                                                 NULL, GFP_NOIO);
2833
2834         if (IS_ERR(rbd_dev->header.object_prefix)) {
2835                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2836                 rbd_dev->header.object_prefix = NULL;
2837         } else {
2838                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2839         }
2840
2841 out:
2842         kfree(reply_buf);
2843
2844         return ret;
2845 }
2846
2847 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2848                 u64 *snap_features)
2849 {
2850         __le64 snapid = cpu_to_le64(snap_id);
2851         struct {
2852                 __le64 features;
2853                 __le64 incompat;
2854         } features_buf = { 0 };
2855         u64 incompat;
2856         int ret;
2857
2858         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2859                                 "rbd", "get_features",
2860                                 (char *) &snapid, sizeof (snapid),
2861                                 (char *) &features_buf, sizeof (features_buf),
2862                                 NULL);
2863         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2864         if (ret < 0)
2865                 return ret;
2866
2867         incompat = le64_to_cpu(features_buf.incompat);
2868         if (incompat & ~RBD_FEATURES_ALL)
2869                 return -ENXIO;
2870
2871         *snap_features = le64_to_cpu(features_buf.features);
2872
2873         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2874                 (unsigned long long) snap_id,
2875                 (unsigned long long) *snap_features,
2876                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2877
2878         return 0;
2879 }
2880
2881 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2882 {
2883         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2884                                                 &rbd_dev->header.features);
2885 }
2886
2887 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2888 {
2889         struct rbd_spec *parent_spec;
2890         size_t size;
2891         void *reply_buf = NULL;
2892         __le64 snapid;
2893         void *p;
2894         void *end;
2895         char *image_id;
2896         u64 overlap;
2897         int ret;
2898
2899         parent_spec = rbd_spec_alloc();
2900         if (!parent_spec)
2901                 return -ENOMEM;
2902
2903         size = sizeof (__le64) +                                /* pool_id */
2904                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2905                 sizeof (__le64) +                               /* snap_id */
2906                 sizeof (__le64);                                /* overlap */
2907         reply_buf = kmalloc(size, GFP_KERNEL);
2908         if (!reply_buf) {
2909                 ret = -ENOMEM;
2910                 goto out_err;
2911         }
2912
2913         snapid = cpu_to_le64(CEPH_NOSNAP);
2914         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2915                                 "rbd", "get_parent",
2916                                 (char *) &snapid, sizeof (snapid),
2917                                 (char *) reply_buf, size, NULL);
2918         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2919         if (ret < 0)
2920                 goto out_err;
2921
2922         ret = -ERANGE;
2923         p = reply_buf;
2924         end = (char *) reply_buf + size;
2925         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2926         if (parent_spec->pool_id == CEPH_NOPOOL)
2927                 goto out;       /* No parent?  No problem. */
2928
2929         /* The ceph file layout needs to fit pool id in 32 bits */
2930
2931         ret = -EIO;
2932         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2933                 goto out;
2934
2935         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2936         if (IS_ERR(image_id)) {
2937                 ret = PTR_ERR(image_id);
2938                 goto out_err;
2939         }
2940         parent_spec->image_id = image_id;
2941         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2942         ceph_decode_64_safe(&p, end, overlap, out_err);
2943
2944         rbd_dev->parent_overlap = overlap;
2945         rbd_dev->parent_spec = parent_spec;
2946         parent_spec = NULL;     /* rbd_dev now owns this */
2947 out:
2948         ret = 0;
2949 out_err:
2950         kfree(reply_buf);
2951         rbd_spec_put(parent_spec);
2952
2953         return ret;
2954 }
2955
2956 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2957 {
2958         size_t image_id_size;
2959         char *image_id;
2960         void *p;
2961         void *end;
2962         size_t size;
2963         void *reply_buf = NULL;
2964         size_t len = 0;
2965         char *image_name = NULL;
2966         int ret;
2967
2968         rbd_assert(!rbd_dev->spec->image_name);
2969
2970         len = strlen(rbd_dev->spec->image_id);
2971         image_id_size = sizeof (__le32) + len;
2972         image_id = kmalloc(image_id_size, GFP_KERNEL);
2973         if (!image_id)
2974                 return NULL;
2975
2976         p = image_id;
2977         end = (char *) image_id + image_id_size;
2978         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2979
2980         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2981         reply_buf = kmalloc(size, GFP_KERNEL);
2982         if (!reply_buf)
2983                 goto out;
2984
2985         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2986                                 "rbd", "dir_get_name",
2987                                 image_id, image_id_size,
2988                                 (char *) reply_buf, size, NULL);
2989         if (ret < 0)
2990                 goto out;
2991         p = reply_buf;
2992         end = (char *) reply_buf + size;
2993         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2994         if (IS_ERR(image_name))
2995                 image_name = NULL;
2996         else
2997                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2998 out:
2999         kfree(reply_buf);
3000         kfree(image_id);
3001
3002         return image_name;
3003 }
3004
3005 /*
3006  * When a parent image gets probed, we only have the pool, image,
3007  * and snapshot ids but not the names of any of them.  This call
3008  * is made later to fill in those names.  It has to be done after
3009  * rbd_dev_snaps_update() has completed because some of the
3010  * information (in particular, snapshot name) is not available
3011  * until then.
3012  */
3013 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3014 {
3015         struct ceph_osd_client *osdc;
3016         const char *name;
3017         void *reply_buf = NULL;
3018         int ret;
3019
3020         if (rbd_dev->spec->pool_name)
3021                 return 0;       /* Already have the names */
3022
3023         /* Look up the pool name */
3024
3025         osdc = &rbd_dev->rbd_client->client->osdc;
3026         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3027         if (!name) {
3028                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3029                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3030                 return -EIO;
3031         }
3032
3033         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3034         if (!rbd_dev->spec->pool_name)
3035                 return -ENOMEM;
3036
3037         /* Fetch the image name; tolerate failure here */
3038
3039         name = rbd_dev_image_name(rbd_dev);
3040         if (name)
3041                 rbd_dev->spec->image_name = (char *) name;
3042         else
3043                 rbd_warn(rbd_dev, "unable to get image name");
3044
3045         /* Look up the snapshot name. */
3046
3047         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3048         if (!name) {
3049                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3050                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3051                 ret = -EIO;
3052                 goto out_err;
3053         }
3054         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3055         if(!rbd_dev->spec->snap_name)
3056                 goto out_err;
3057
3058         return 0;
3059 out_err:
3060         kfree(reply_buf);
3061         kfree(rbd_dev->spec->pool_name);
3062         rbd_dev->spec->pool_name = NULL;
3063
3064         return ret;
3065 }
3066
3067 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3068 {
3069         size_t size;
3070         int ret;
3071         void *reply_buf;
3072         void *p;
3073         void *end;
3074         u64 seq;
3075         u32 snap_count;
3076         struct ceph_snap_context *snapc;
3077         u32 i;
3078
3079         /*
3080          * We'll need room for the seq value (maximum snapshot id),
3081          * snapshot count, and array of that many snapshot ids.
3082          * For now we have a fixed upper limit on the number we're
3083          * prepared to receive.
3084          */
3085         size = sizeof (__le64) + sizeof (__le32) +
3086                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3087         reply_buf = kzalloc(size, GFP_KERNEL);
3088         if (!reply_buf)
3089                 return -ENOMEM;
3090
3091         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3092                                 "rbd", "get_snapcontext",
3093                                 NULL, 0,
3094                                 reply_buf, size, ver);
3095         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3096         if (ret < 0)
3097                 goto out;
3098
3099         ret = -ERANGE;
3100         p = reply_buf;
3101         end = (char *) reply_buf + size;
3102         ceph_decode_64_safe(&p, end, seq, out);
3103         ceph_decode_32_safe(&p, end, snap_count, out);
3104
3105         /*
3106          * Make sure the reported number of snapshot ids wouldn't go
3107          * beyond the end of our buffer.  But before checking that,
3108          * make sure the computed size of the snapshot context we
3109          * allocate is representable in a size_t.
3110          */
3111         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3112                                  / sizeof (u64)) {
3113                 ret = -EINVAL;
3114                 goto out;
3115         }
3116         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3117                 goto out;
3118
3119         size = sizeof (struct ceph_snap_context) +
3120                                 snap_count * sizeof (snapc->snaps[0]);
3121         snapc = kmalloc(size, GFP_KERNEL);
3122         if (!snapc) {
3123                 ret = -ENOMEM;
3124                 goto out;
3125         }
3126
3127         atomic_set(&snapc->nref, 1);
3128         snapc->seq = seq;
3129         snapc->num_snaps = snap_count;
3130         for (i = 0; i < snap_count; i++)
3131                 snapc->snaps[i] = ceph_decode_64(&p);
3132
3133         rbd_dev->header.snapc = snapc;
3134
3135         dout("  snap context seq = %llu, snap_count = %u\n",
3136                 (unsigned long long) seq, (unsigned int) snap_count);
3137
3138 out:
3139         kfree(reply_buf);
3140
3141         return 0;
3142 }
3143
3144 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3145 {
3146         size_t size;
3147         void *reply_buf;
3148         __le64 snap_id;
3149         int ret;
3150         void *p;
3151         void *end;
3152         char *snap_name;
3153
3154         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3155         reply_buf = kmalloc(size, GFP_KERNEL);
3156         if (!reply_buf)
3157                 return ERR_PTR(-ENOMEM);
3158
3159         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3160         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3161                                 "rbd", "get_snapshot_name",
3162                                 (char *) &snap_id, sizeof (snap_id),
3163                                 reply_buf, size, NULL);
3164         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3165         if (ret < 0)
3166                 goto out;
3167
3168         p = reply_buf;
3169         end = (char *) reply_buf + size;
3170         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3171         if (IS_ERR(snap_name)) {
3172                 ret = PTR_ERR(snap_name);
3173                 goto out;
3174         } else {
3175                 dout("  snap_id 0x%016llx snap_name = %s\n",
3176                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3177         }
3178         kfree(reply_buf);
3179
3180         return snap_name;
3181 out:
3182         kfree(reply_buf);
3183
3184         return ERR_PTR(ret);
3185 }
3186
3187 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3188                 u64 *snap_size, u64 *snap_features)
3189 {
3190         u64 snap_id;
3191         u8 order;
3192         int ret;
3193
3194         snap_id = rbd_dev->header.snapc->snaps[which];
3195         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3196         if (ret)
3197                 return ERR_PTR(ret);
3198         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3199         if (ret)
3200                 return ERR_PTR(ret);
3201
3202         return rbd_dev_v2_snap_name(rbd_dev, which);
3203 }
3204
3205 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3206                 u64 *snap_size, u64 *snap_features)
3207 {
3208         if (rbd_dev->image_format == 1)
3209                 return rbd_dev_v1_snap_info(rbd_dev, which,
3210                                         snap_size, snap_features);
3211         if (rbd_dev->image_format == 2)
3212                 return rbd_dev_v2_snap_info(rbd_dev, which,
3213                                         snap_size, snap_features);
3214         return ERR_PTR(-EINVAL);
3215 }
3216
3217 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3218 {
3219         int ret;
3220         __u8 obj_order;
3221
3222         down_write(&rbd_dev->header_rwsem);
3223
3224         /* Grab old order first, to see if it changes */
3225
3226         obj_order = rbd_dev->header.obj_order,
3227         ret = rbd_dev_v2_image_size(rbd_dev);
3228         if (ret)
3229                 goto out;
3230         if (rbd_dev->header.obj_order != obj_order) {
3231                 ret = -EIO;
3232                 goto out;
3233         }
3234         rbd_update_mapping_size(rbd_dev);
3235
3236         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3237         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3238         if (ret)
3239                 goto out;
3240         ret = rbd_dev_snaps_update(rbd_dev);
3241         dout("rbd_dev_snaps_update returned %d\n", ret);
3242         if (ret)
3243                 goto out;
3244         ret = rbd_dev_snaps_register(rbd_dev);
3245         dout("rbd_dev_snaps_register returned %d\n", ret);
3246 out:
3247         up_write(&rbd_dev->header_rwsem);
3248
3249         return ret;
3250 }
3251
3252 /*
3253  * Scan the rbd device's current snapshot list and compare it to the
3254  * newly-received snapshot context.  Remove any existing snapshots
3255  * not present in the new snapshot context.  Add a new snapshot for
3256  * any snaphots in the snapshot context not in the current list.
3257  * And verify there are no changes to snapshots we already know
3258  * about.
3259  *
3260  * Assumes the snapshots in the snapshot context are sorted by
3261  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3262  * are also maintained in that order.)
3263  */
3264 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3265 {
3266         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3267         const u32 snap_count = snapc->num_snaps;
3268         struct list_head *head = &rbd_dev->snaps;
3269         struct list_head *links = head->next;
3270         u32 index = 0;
3271
3272         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3273         while (index < snap_count || links != head) {
3274                 u64 snap_id;
3275                 struct rbd_snap *snap;
3276                 char *snap_name;
3277                 u64 snap_size = 0;
3278                 u64 snap_features = 0;
3279
3280                 snap_id = index < snap_count ? snapc->snaps[index]
3281                                              : CEPH_NOSNAP;
3282                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3283                                      : NULL;
3284                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3285
3286                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3287                         struct list_head *next = links->next;
3288
3289                         /*
3290                          * A previously-existing snapshot is not in
3291                          * the new snap context.
3292                          *
3293                          * If the now missing snapshot is the one the
3294                          * image is mapped to, clear its exists flag
3295                          * so we can avoid sending any more requests
3296                          * to it.
3297                          */
3298                         if (rbd_dev->spec->snap_id == snap->id)
3299                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3300                         rbd_remove_snap_dev(snap);
3301                         dout("%ssnap id %llu has been removed\n",
3302                                 rbd_dev->spec->snap_id == snap->id ?
3303                                                         "mapped " : "",
3304                                 (unsigned long long) snap->id);
3305
3306                         /* Done with this list entry; advance */
3307
3308                         links = next;
3309                         continue;
3310                 }
3311
3312                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3313                                         &snap_size, &snap_features);
3314                 if (IS_ERR(snap_name))
3315                         return PTR_ERR(snap_name);
3316
3317                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3318                         (unsigned long long) snap_id);
3319                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3320                         struct rbd_snap *new_snap;
3321
3322                         /* We haven't seen this snapshot before */
3323
3324                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3325                                         snap_id, snap_size, snap_features);
3326                         if (IS_ERR(new_snap)) {
3327                                 int err = PTR_ERR(new_snap);
3328
3329                                 dout("  failed to add dev, error %d\n", err);
3330
3331                                 return err;
3332                         }
3333
3334                         /* New goes before existing, or at end of list */
3335
3336                         dout("  added dev%s\n", snap ? "" : " at end\n");
3337                         if (snap)
3338                                 list_add_tail(&new_snap->node, &snap->node);
3339                         else
3340                                 list_add_tail(&new_snap->node, head);
3341                 } else {
3342                         /* Already have this one */
3343
3344                         dout("  already present\n");
3345
3346                         rbd_assert(snap->size == snap_size);
3347                         rbd_assert(!strcmp(snap->name, snap_name));
3348                         rbd_assert(snap->features == snap_features);
3349
3350                         /* Done with this list entry; advance */
3351
3352                         links = links->next;
3353                 }
3354
3355                 /* Advance to the next entry in the snapshot context */
3356
3357                 index++;
3358         }
3359         dout("%s: done\n", __func__);
3360
3361         return 0;
3362 }
3363
3364 /*
3365  * Scan the list of snapshots and register the devices for any that
3366  * have not already been registered.
3367  */
3368 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3369 {
3370         struct rbd_snap *snap;
3371         int ret = 0;
3372
3373         dout("%s:\n", __func__);
3374         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3375                 return -EIO;
3376
3377         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3378                 if (!rbd_snap_registered(snap)) {
3379                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3380                         if (ret < 0)
3381                                 break;
3382                 }
3383         }
3384         dout("%s: returning %d\n", __func__, ret);
3385
3386         return ret;
3387 }
3388
3389 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3390 {
3391         struct device *dev;
3392         int ret;
3393
3394         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3395
3396         dev = &rbd_dev->dev;
3397         dev->bus = &rbd_bus_type;
3398         dev->type = &rbd_device_type;
3399         dev->parent = &rbd_root_dev;
3400         dev->release = rbd_dev_release;
3401         dev_set_name(dev, "%d", rbd_dev->dev_id);
3402         ret = device_register(dev);
3403
3404         mutex_unlock(&ctl_mutex);
3405
3406         return ret;
3407 }
3408
3409 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3410 {
3411         device_unregister(&rbd_dev->dev);
3412 }
3413
3414 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3415
3416 /*
3417  * Get a unique rbd identifier for the given new rbd_dev, and add
3418  * the rbd_dev to the global list.  The minimum rbd id is 1.
3419  */
3420 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3421 {
3422         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3423
3424         spin_lock(&rbd_dev_list_lock);
3425         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3426         spin_unlock(&rbd_dev_list_lock);
3427         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3428                 (unsigned long long) rbd_dev->dev_id);
3429 }
3430
3431 /*
3432  * Remove an rbd_dev from the global list, and record that its
3433  * identifier is no longer in use.
3434  */
3435 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3436 {
3437         struct list_head *tmp;
3438         int rbd_id = rbd_dev->dev_id;
3439         int max_id;
3440
3441         rbd_assert(rbd_id > 0);
3442
3443         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3444                 (unsigned long long) rbd_dev->dev_id);
3445         spin_lock(&rbd_dev_list_lock);
3446         list_del_init(&rbd_dev->node);
3447
3448         /*
3449          * If the id being "put" is not the current maximum, there
3450          * is nothing special we need to do.
3451          */
3452         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3453                 spin_unlock(&rbd_dev_list_lock);
3454                 return;
3455         }
3456
3457         /*
3458          * We need to update the current maximum id.  Search the
3459          * list to find out what it is.  We're more likely to find
3460          * the maximum at the end, so search the list backward.
3461          */
3462         max_id = 0;
3463         list_for_each_prev(tmp, &rbd_dev_list) {
3464                 struct rbd_device *rbd_dev;
3465
3466                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3467                 if (rbd_dev->dev_id > max_id)
3468                         max_id = rbd_dev->dev_id;
3469         }
3470         spin_unlock(&rbd_dev_list_lock);
3471
3472         /*
3473          * The max id could have been updated by rbd_dev_id_get(), in
3474          * which case it now accurately reflects the new maximum.
3475          * Be careful not to overwrite the maximum value in that
3476          * case.
3477          */
3478         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3479         dout("  max dev id has been reset\n");
3480 }
3481
3482 /*
3483  * Skips over white space at *buf, and updates *buf to point to the
3484  * first found non-space character (if any). Returns the length of
3485  * the token (string of non-white space characters) found.  Note
3486  * that *buf must be terminated with '\0'.
3487  */
3488 static inline size_t next_token(const char **buf)
3489 {
3490         /*
3491         * These are the characters that produce nonzero for
3492         * isspace() in the "C" and "POSIX" locales.
3493         */
3494         const char *spaces = " \f\n\r\t\v";
3495
3496         *buf += strspn(*buf, spaces);   /* Find start of token */
3497
3498         return strcspn(*buf, spaces);   /* Return token length */
3499 }
3500
3501 /*
3502  * Finds the next token in *buf, and if the provided token buffer is
3503  * big enough, copies the found token into it.  The result, if
3504  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3505  * must be terminated with '\0' on entry.
3506  *
3507  * Returns the length of the token found (not including the '\0').
3508  * Return value will be 0 if no token is found, and it will be >=
3509  * token_size if the token would not fit.
3510  *
3511  * The *buf pointer will be updated to point beyond the end of the
3512  * found token.  Note that this occurs even if the token buffer is
3513  * too small to hold it.
3514  */
3515 static inline size_t copy_token(const char **buf,
3516                                 char *token,
3517                                 size_t token_size)
3518 {
3519         size_t len;
3520
3521         len = next_token(buf);
3522         if (len < token_size) {
3523                 memcpy(token, *buf, len);
3524                 *(token + len) = '\0';
3525         }
3526         *buf += len;
3527
3528         return len;
3529 }
3530
3531 /*
3532  * Finds the next token in *buf, dynamically allocates a buffer big
3533  * enough to hold a copy of it, and copies the token into the new
3534  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3535  * that a duplicate buffer is created even for a zero-length token.
3536  *
3537  * Returns a pointer to the newly-allocated duplicate, or a null
3538  * pointer if memory for the duplicate was not available.  If
3539  * the lenp argument is a non-null pointer, the length of the token
3540  * (not including the '\0') is returned in *lenp.
3541  *
3542  * If successful, the *buf pointer will be updated to point beyond
3543  * the end of the found token.
3544  *
3545  * Note: uses GFP_KERNEL for allocation.
3546  */
3547 static inline char *dup_token(const char **buf, size_t *lenp)
3548 {
3549         char *dup;
3550         size_t len;
3551
3552         len = next_token(buf);
3553         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3554         if (!dup)
3555                 return NULL;
3556         *(dup + len) = '\0';
3557         *buf += len;
3558
3559         if (lenp)
3560                 *lenp = len;
3561
3562         return dup;
3563 }
3564
3565 /*
3566  * Parse the options provided for an "rbd add" (i.e., rbd image
3567  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3568  * and the data written is passed here via a NUL-terminated buffer.
3569  * Returns 0 if successful or an error code otherwise.
3570  *
3571  * The information extracted from these options is recorded in
3572  * the other parameters which return dynamically-allocated
3573  * structures:
3574  *  ceph_opts
3575  *      The address of a pointer that will refer to a ceph options
3576  *      structure.  Caller must release the returned pointer using
3577  *      ceph_destroy_options() when it is no longer needed.
3578  *  rbd_opts
3579  *      Address of an rbd options pointer.  Fully initialized by
3580  *      this function; caller must release with kfree().
3581  *  spec
3582  *      Address of an rbd image specification pointer.  Fully
3583  *      initialized by this function based on parsed options.
3584  *      Caller must release with rbd_spec_put().
3585  *
3586  * The options passed take this form:
3587  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3588  * where:
3589  *  <mon_addrs>
3590  *      A comma-separated list of one or more monitor addresses.
3591  *      A monitor address is an ip address, optionally followed
3592  *      by a port number (separated by a colon).
3593  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3594  *  <options>
3595  *      A comma-separated list of ceph and/or rbd options.
3596  *  <pool_name>
3597  *      The name of the rados pool containing the rbd image.
3598  *  <image_name>
3599  *      The name of the image in that pool to map.
3600  *  <snap_id>
3601  *      An optional snapshot id.  If provided, the mapping will
3602  *      present data from the image at the time that snapshot was
3603  *      created.  The image head is used if no snapshot id is
3604  *      provided.  Snapshot mappings are always read-only.
3605  */
3606 static int rbd_add_parse_args(const char *buf,
3607                                 struct ceph_options **ceph_opts,
3608                                 struct rbd_options **opts,
3609                                 struct rbd_spec **rbd_spec)
3610 {
3611         size_t len;
3612         char *options;
3613         const char *mon_addrs;
3614         size_t mon_addrs_size;
3615         struct rbd_spec *spec = NULL;
3616         struct rbd_options *rbd_opts = NULL;
3617         struct ceph_options *copts;
3618         int ret;
3619
3620         /* The first four tokens are required */
3621
3622         len = next_token(&buf);
3623         if (!len) {
3624                 rbd_warn(NULL, "no monitor address(es) provided");
3625                 return -EINVAL;
3626         }
3627         mon_addrs = buf;
3628         mon_addrs_size = len + 1;
3629         buf += len;
3630
3631         ret = -EINVAL;
3632         options = dup_token(&buf, NULL);
3633         if (!options)
3634                 return -ENOMEM;
3635         if (!*options) {
3636                 rbd_warn(NULL, "no options provided");
3637                 goto out_err;
3638         }
3639
3640         spec = rbd_spec_alloc();
3641         if (!spec)
3642                 goto out_mem;
3643
3644         spec->pool_name = dup_token(&buf, NULL);
3645         if (!spec->pool_name)
3646                 goto out_mem;
3647         if (!*spec->pool_name) {
3648                 rbd_warn(NULL, "no pool name provided");
3649                 goto out_err;
3650         }
3651
3652         spec->image_name = dup_token(&buf, NULL);
3653         if (!spec->image_name)
3654                 goto out_mem;
3655         if (!*spec->image_name) {
3656                 rbd_warn(NULL, "no image name provided");
3657                 goto out_err;
3658         }
3659
3660         /*
3661          * Snapshot name is optional; default is to use "-"
3662          * (indicating the head/no snapshot).
3663          */
3664         len = next_token(&buf);
3665         if (!len) {
3666                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3667                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3668         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3669                 ret = -ENAMETOOLONG;
3670                 goto out_err;
3671         }
3672         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3673         if (!spec->snap_name)
3674                 goto out_mem;
3675         *(spec->snap_name + len) = '\0';
3676
3677         /* Initialize all rbd options to the defaults */
3678
3679         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3680         if (!rbd_opts)
3681                 goto out_mem;
3682
3683         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3684
3685         copts = ceph_parse_options(options, mon_addrs,
3686                                         mon_addrs + mon_addrs_size - 1,
3687                                         parse_rbd_opts_token, rbd_opts);
3688         if (IS_ERR(copts)) {
3689                 ret = PTR_ERR(copts);
3690                 goto out_err;
3691         }
3692         kfree(options);
3693
3694         *ceph_opts = copts;
3695         *opts = rbd_opts;
3696         *rbd_spec = spec;
3697
3698         return 0;
3699 out_mem:
3700         ret = -ENOMEM;
3701 out_err:
3702         kfree(rbd_opts);
3703         rbd_spec_put(spec);
3704         kfree(options);
3705
3706         return ret;
3707 }
3708
3709 /*
3710  * An rbd format 2 image has a unique identifier, distinct from the
3711  * name given to it by the user.  Internally, that identifier is
3712  * what's used to specify the names of objects related to the image.
3713  *
3714  * A special "rbd id" object is used to map an rbd image name to its
3715  * id.  If that object doesn't exist, then there is no v2 rbd image
3716  * with the supplied name.
3717  *
3718  * This function will record the given rbd_dev's image_id field if
3719  * it can be determined, and in that case will return 0.  If any
3720  * errors occur a negative errno will be returned and the rbd_dev's
3721  * image_id field will be unchanged (and should be NULL).
3722  */
3723 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3724 {
3725         int ret;
3726         size_t size;
3727         char *object_name;
3728         void *response;
3729         void *p;
3730
3731         /*
3732          * When probing a parent image, the image id is already
3733          * known (and the image name likely is not).  There's no
3734          * need to fetch the image id again in this case.
3735          */
3736         if (rbd_dev->spec->image_id)
3737                 return 0;
3738
3739         /*
3740          * First, see if the format 2 image id file exists, and if
3741          * so, get the image's persistent id from it.
3742          */
3743         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3744         object_name = kmalloc(size, GFP_NOIO);
3745         if (!object_name)
3746                 return -ENOMEM;
3747         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3748         dout("rbd id object name is %s\n", object_name);
3749
3750         /* Response will be an encoded string, which includes a length */
3751
3752         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3753         response = kzalloc(size, GFP_NOIO);
3754         if (!response) {
3755                 ret = -ENOMEM;
3756                 goto out;
3757         }
3758
3759         ret = rbd_obj_method_sync(rbd_dev, object_name,
3760                                 "rbd", "get_id",
3761                                 NULL, 0,
3762                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3763         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3764         if (ret < 0)
3765                 goto out;
3766
3767         p = response;
3768         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3769                                                 p + RBD_IMAGE_ID_LEN_MAX,
3770                                                 NULL, GFP_NOIO);
3771         if (IS_ERR(rbd_dev->spec->image_id)) {
3772                 ret = PTR_ERR(rbd_dev->spec->image_id);
3773                 rbd_dev->spec->image_id = NULL;
3774         } else {
3775                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3776         }
3777 out:
3778         kfree(response);
3779         kfree(object_name);
3780
3781         return ret;
3782 }
3783
3784 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3785 {
3786         int ret;
3787         size_t size;
3788
3789         /* Version 1 images have no id; empty string is used */
3790
3791         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3792         if (!rbd_dev->spec->image_id)
3793                 return -ENOMEM;
3794
3795         /* Record the header object name for this rbd image. */
3796
3797         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3798         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3799         if (!rbd_dev->header_name) {
3800                 ret = -ENOMEM;
3801                 goto out_err;
3802         }
3803         sprintf(rbd_dev->header_name, "%s%s",
3804                 rbd_dev->spec->image_name, RBD_SUFFIX);
3805
3806         /* Populate rbd image metadata */
3807
3808         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3809         if (ret < 0)
3810                 goto out_err;
3811
3812         /* Version 1 images have no parent (no layering) */
3813
3814         rbd_dev->parent_spec = NULL;
3815         rbd_dev->parent_overlap = 0;
3816
3817         rbd_dev->image_format = 1;
3818
3819         dout("discovered version 1 image, header name is %s\n",
3820                 rbd_dev->header_name);
3821
3822         return 0;
3823
3824 out_err:
3825         kfree(rbd_dev->header_name);
3826         rbd_dev->header_name = NULL;
3827         kfree(rbd_dev->spec->image_id);
3828         rbd_dev->spec->image_id = NULL;
3829
3830         return ret;
3831 }
3832
3833 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3834 {
3835         size_t size;
3836         int ret;
3837         u64 ver = 0;
3838
3839         /*
3840          * Image id was filled in by the caller.  Record the header
3841          * object name for this rbd image.
3842          */
3843         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3844         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3845         if (!rbd_dev->header_name)
3846                 return -ENOMEM;
3847         sprintf(rbd_dev->header_name, "%s%s",
3848                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3849
3850         /* Get the size and object order for the image */
3851
3852         ret = rbd_dev_v2_image_size(rbd_dev);
3853         if (ret < 0)
3854                 goto out_err;
3855
3856         /* Get the object prefix (a.k.a. block_name) for the image */
3857
3858         ret = rbd_dev_v2_object_prefix(rbd_dev);
3859         if (ret < 0)
3860                 goto out_err;
3861
3862         /* Get the and check features for the image */
3863
3864         ret = rbd_dev_v2_features(rbd_dev);
3865         if (ret < 0)
3866                 goto out_err;
3867
3868         /* If the image supports layering, get the parent info */
3869
3870         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3871                 ret = rbd_dev_v2_parent_info(rbd_dev);
3872                 if (ret < 0)
3873                         goto out_err;
3874         }
3875
3876         /* crypto and compression type aren't (yet) supported for v2 images */
3877
3878         rbd_dev->header.crypt_type = 0;
3879         rbd_dev->header.comp_type = 0;
3880
3881         /* Get the snapshot context, plus the header version */
3882
3883         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3884         if (ret)
3885                 goto out_err;
3886         rbd_dev->header.obj_version = ver;
3887
3888         rbd_dev->image_format = 2;
3889
3890         dout("discovered version 2 image, header name is %s\n",
3891                 rbd_dev->header_name);
3892
3893         return 0;
3894 out_err:
3895         rbd_dev->parent_overlap = 0;
3896         rbd_spec_put(rbd_dev->parent_spec);
3897         rbd_dev->parent_spec = NULL;
3898         kfree(rbd_dev->header_name);
3899         rbd_dev->header_name = NULL;
3900         kfree(rbd_dev->header.object_prefix);
3901         rbd_dev->header.object_prefix = NULL;
3902
3903         return ret;
3904 }
3905
3906 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3907 {
3908         int ret;
3909
3910         /* no need to lock here, as rbd_dev is not registered yet */
3911         ret = rbd_dev_snaps_update(rbd_dev);
3912         if (ret)
3913                 return ret;
3914
3915         ret = rbd_dev_probe_update_spec(rbd_dev);
3916         if (ret)
3917                 goto err_out_snaps;
3918
3919         ret = rbd_dev_set_mapping(rbd_dev);
3920         if (ret)
3921                 goto err_out_snaps;
3922
3923         /* generate unique id: find highest unique id, add one */
3924         rbd_dev_id_get(rbd_dev);
3925
3926         /* Fill in the device name, now that we have its id. */
3927         BUILD_BUG_ON(DEV_NAME_LEN
3928                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3929         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3930
3931         /* Get our block major device number. */
3932
3933         ret = register_blkdev(0, rbd_dev->name);
3934         if (ret < 0)
3935                 goto err_out_id;
3936         rbd_dev->major = ret;
3937
3938         /* Set up the blkdev mapping. */
3939
3940         ret = rbd_init_disk(rbd_dev);
3941         if (ret)
3942                 goto err_out_blkdev;
3943
3944         ret = rbd_bus_add_dev(rbd_dev);
3945         if (ret)
3946                 goto err_out_disk;
3947
3948         /*
3949          * At this point cleanup in the event of an error is the job
3950          * of the sysfs code (initiated by rbd_bus_del_dev()).
3951          */
3952         down_write(&rbd_dev->header_rwsem);
3953         ret = rbd_dev_snaps_register(rbd_dev);
3954         up_write(&rbd_dev->header_rwsem);
3955         if (ret)
3956                 goto err_out_bus;
3957
3958         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3959         if (ret)
3960                 goto err_out_bus;
3961
3962         /* Everything's ready.  Announce the disk to the world. */
3963
3964         add_disk(rbd_dev->disk);
3965
3966         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3967                 (unsigned long long) rbd_dev->mapping.size);
3968
3969         return ret;
3970 err_out_bus:
3971         /* this will also clean up rest of rbd_dev stuff */
3972
3973         rbd_bus_del_dev(rbd_dev);
3974
3975         return ret;
3976 err_out_disk:
3977         rbd_free_disk(rbd_dev);
3978 err_out_blkdev:
3979         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3980 err_out_id:
3981         rbd_dev_id_put(rbd_dev);
3982 err_out_snaps:
3983         rbd_remove_all_snaps(rbd_dev);
3984
3985         return ret;
3986 }
3987
3988 /*
3989  * Probe for the existence of the header object for the given rbd
3990  * device.  For format 2 images this includes determining the image
3991  * id.
3992  */
3993 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3994 {
3995         int ret;
3996
3997         /*
3998          * Get the id from the image id object.  If it's not a
3999          * format 2 image, we'll get ENOENT back, and we'll assume
4000          * it's a format 1 image.
4001          */
4002         ret = rbd_dev_image_id(rbd_dev);
4003         if (ret)
4004                 ret = rbd_dev_v1_probe(rbd_dev);
4005         else
4006                 ret = rbd_dev_v2_probe(rbd_dev);
4007         if (ret) {
4008                 dout("probe failed, returning %d\n", ret);
4009
4010                 return ret;
4011         }
4012
4013         ret = rbd_dev_probe_finish(rbd_dev);
4014         if (ret)
4015                 rbd_header_free(&rbd_dev->header);
4016
4017         return ret;
4018 }
4019
4020 static ssize_t rbd_add(struct bus_type *bus,
4021                        const char *buf,
4022                        size_t count)
4023 {
4024         struct rbd_device *rbd_dev = NULL;
4025         struct ceph_options *ceph_opts = NULL;
4026         struct rbd_options *rbd_opts = NULL;
4027         struct rbd_spec *spec = NULL;
4028         struct rbd_client *rbdc;
4029         struct ceph_osd_client *osdc;
4030         int rc = -ENOMEM;
4031
4032         if (!try_module_get(THIS_MODULE))
4033                 return -ENODEV;
4034
4035         /* parse add command */
4036         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4037         if (rc < 0)
4038                 goto err_out_module;
4039
4040         rbdc = rbd_get_client(ceph_opts);
4041         if (IS_ERR(rbdc)) {
4042                 rc = PTR_ERR(rbdc);
4043                 goto err_out_args;
4044         }
4045         ceph_opts = NULL;       /* rbd_dev client now owns this */
4046
4047         /* pick the pool */
4048         osdc = &rbdc->client->osdc;
4049         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4050         if (rc < 0)
4051                 goto err_out_client;
4052         spec->pool_id = (u64) rc;
4053
4054         /* The ceph file layout needs to fit pool id in 32 bits */
4055
4056         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4057                 rc = -EIO;
4058                 goto err_out_client;
4059         }
4060
4061         rbd_dev = rbd_dev_create(rbdc, spec);
4062         if (!rbd_dev)
4063                 goto err_out_client;
4064         rbdc = NULL;            /* rbd_dev now owns this */
4065         spec = NULL;            /* rbd_dev now owns this */
4066
4067         rbd_dev->mapping.read_only = rbd_opts->read_only;
4068         kfree(rbd_opts);
4069         rbd_opts = NULL;        /* done with this */
4070
4071         rc = rbd_dev_probe(rbd_dev);
4072         if (rc < 0)
4073                 goto err_out_rbd_dev;
4074
4075         return count;
4076 err_out_rbd_dev:
4077         rbd_dev_destroy(rbd_dev);
4078 err_out_client:
4079         rbd_put_client(rbdc);
4080 err_out_args:
4081         if (ceph_opts)
4082                 ceph_destroy_options(ceph_opts);
4083         kfree(rbd_opts);
4084         rbd_spec_put(spec);
4085 err_out_module:
4086         module_put(THIS_MODULE);
4087
4088         dout("Error adding device %s\n", buf);
4089
4090         return (ssize_t) rc;
4091 }
4092
4093 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4094 {
4095         struct list_head *tmp;
4096         struct rbd_device *rbd_dev;
4097
4098         spin_lock(&rbd_dev_list_lock);
4099         list_for_each(tmp, &rbd_dev_list) {
4100                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4101                 if (rbd_dev->dev_id == dev_id) {
4102                         spin_unlock(&rbd_dev_list_lock);
4103                         return rbd_dev;
4104                 }
4105         }
4106         spin_unlock(&rbd_dev_list_lock);
4107         return NULL;
4108 }
4109
4110 static void rbd_dev_release(struct device *dev)
4111 {
4112         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4113
4114         if (rbd_dev->watch_event)
4115                 rbd_dev_header_watch_sync(rbd_dev, 0);
4116
4117         /* clean up and free blkdev */
4118         rbd_free_disk(rbd_dev);
4119         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4120
4121         /* release allocated disk header fields */
4122         rbd_header_free(&rbd_dev->header);
4123
4124         /* done with the id, and with the rbd_dev */
4125         rbd_dev_id_put(rbd_dev);
4126         rbd_assert(rbd_dev->rbd_client != NULL);
4127         rbd_dev_destroy(rbd_dev);
4128
4129         /* release module ref */
4130         module_put(THIS_MODULE);
4131 }
4132
4133 static ssize_t rbd_remove(struct bus_type *bus,
4134                           const char *buf,
4135                           size_t count)
4136 {
4137         struct rbd_device *rbd_dev = NULL;
4138         int target_id, rc;
4139         unsigned long ul;
4140         int ret = count;
4141
4142         rc = strict_strtoul(buf, 10, &ul);
4143         if (rc)
4144                 return rc;
4145
4146         /* convert to int; abort if we lost anything in the conversion */
4147         target_id = (int) ul;
4148         if (target_id != ul)
4149                 return -EINVAL;
4150
4151         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4152
4153         rbd_dev = __rbd_get_dev(target_id);
4154         if (!rbd_dev) {
4155                 ret = -ENOENT;
4156                 goto done;
4157         }
4158
4159         spin_lock_irq(&rbd_dev->lock);
4160         if (rbd_dev->open_count)
4161                 ret = -EBUSY;
4162         else
4163                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4164         spin_unlock_irq(&rbd_dev->lock);
4165         if (ret < 0)
4166                 goto done;
4167
4168         rbd_remove_all_snaps(rbd_dev);
4169         rbd_bus_del_dev(rbd_dev);
4170
4171 done:
4172         mutex_unlock(&ctl_mutex);
4173
4174         return ret;
4175 }
4176
4177 /*
4178  * create control files in sysfs
4179  * /sys/bus/rbd/...
4180  */
4181 static int rbd_sysfs_init(void)
4182 {
4183         int ret;
4184
4185         ret = device_register(&rbd_root_dev);
4186         if (ret < 0)
4187                 return ret;
4188
4189         ret = bus_register(&rbd_bus_type);
4190         if (ret < 0)
4191                 device_unregister(&rbd_root_dev);
4192
4193         return ret;
4194 }
4195
4196 static void rbd_sysfs_cleanup(void)
4197 {
4198         bus_unregister(&rbd_bus_type);
4199         device_unregister(&rbd_root_dev);
4200 }
4201
4202 static int __init rbd_init(void)
4203 {
4204         int rc;
4205
4206         if (!libceph_compatible(NULL)) {
4207                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4208
4209                 return -EINVAL;
4210         }
4211         rc = rbd_sysfs_init();
4212         if (rc)
4213                 return rc;
4214         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4215         return 0;
4216 }
4217
4218 static void __exit rbd_exit(void)
4219 {
4220         rbd_sysfs_cleanup();
4221 }
4222
4223 module_init(rbd_init);
4224 module_exit(rbd_exit);
4225
4226 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4227 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4228 MODULE_DESCRIPTION("rados block device");
4229
4230 /* following authorship retained from original osdblk.c */
4231 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4232
4233 MODULE_LICENSE("GPL");