]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
libceph: make method call data be a separate data item
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING      1
77
78 /* Features supported by this (client software) implementation. */
79
80 #define RBD_FEATURES_ALL          (0)
81
82 /*
83  * An RBD device name will be "rbd#", where the "rbd" comes from
84  * RBD_DRV_NAME above, and # is a unique integer identifier.
85  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
86  * enough to hold all possible device names.
87  */
88 #define DEV_NAME_LEN            32
89 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
90
91 /*
92  * block device image metadata (in-memory version)
93  */
94 struct rbd_image_header {
95         /* These four fields never change for a given rbd image */
96         char *object_prefix;
97         u64 features;
98         __u8 obj_order;
99         __u8 crypt_type;
100         __u8 comp_type;
101
102         /* The remaining fields need to be updated occasionally */
103         u64 image_size;
104         struct ceph_snap_context *snapc;
105         char *snap_names;
106         u64 *snap_sizes;
107
108         u64 obj_version;
109 };
110
111 /*
112  * An rbd image specification.
113  *
114  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
115  * identify an image.  Each rbd_dev structure includes a pointer to
116  * an rbd_spec structure that encapsulates this identity.
117  *
118  * Each of the id's in an rbd_spec has an associated name.  For a
119  * user-mapped image, the names are supplied and the id's associated
120  * with them are looked up.  For a layered image, a parent image is
121  * defined by the tuple, and the names are looked up.
122  *
123  * An rbd_dev structure contains a parent_spec pointer which is
124  * non-null if the image it represents is a child in a layered
125  * image.  This pointer will refer to the rbd_spec structure used
126  * by the parent rbd_dev for its own identity (i.e., the structure
127  * is shared between the parent and child).
128  *
129  * Since these structures are populated once, during the discovery
130  * phase of image construction, they are effectively immutable so
131  * we make no effort to synchronize access to them.
132  *
133  * Note that code herein does not assume the image name is known (it
134  * could be a null pointer).
135  */
136 struct rbd_spec {
137         u64             pool_id;
138         char            *pool_name;
139
140         char            *image_id;
141         char            *image_name;
142
143         u64             snap_id;
144         char            *snap_name;
145
146         struct kref     kref;
147 };
148
149 /*
150  * an instance of the client.  multiple devices may share an rbd client.
151  */
152 struct rbd_client {
153         struct ceph_client      *client;
154         struct kref             kref;
155         struct list_head        node;
156 };
157
158 struct rbd_img_request;
159 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
160
161 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
162
163 struct rbd_obj_request;
164 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
165
166 enum obj_request_type {
167         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
168 };
169
170 struct rbd_obj_request {
171         const char              *object_name;
172         u64                     offset;         /* object start byte */
173         u64                     length;         /* bytes from offset */
174
175         struct rbd_img_request  *img_request;
176         struct list_head        links;          /* img_request->obj_requests */
177         u32                     which;          /* posn image request list */
178
179         enum obj_request_type   type;
180         union {
181                 struct bio      *bio_list;
182                 struct {
183                         struct page     **pages;
184                         u32             page_count;
185                 };
186         };
187
188         struct ceph_osd_request *osd_req;
189
190         u64                     xferred;        /* bytes transferred */
191         u64                     version;
192         int                     result;
193         atomic_t                done;
194
195         rbd_obj_callback_t      callback;
196         struct completion       completion;
197
198         struct kref             kref;
199 };
200
201 struct rbd_img_request {
202         struct request          *rq;
203         struct rbd_device       *rbd_dev;
204         u64                     offset; /* starting image byte offset */
205         u64                     length; /* byte count from offset */
206         bool                    write_request;  /* false for read */
207         union {
208                 struct ceph_snap_context *snapc;        /* for writes */
209                 u64             snap_id;                /* for reads */
210         };
211         spinlock_t              completion_lock;/* protects next_completion */
212         u32                     next_completion;
213         rbd_img_callback_t      callback;
214
215         u32                     obj_request_count;
216         struct list_head        obj_requests;   /* rbd_obj_request structs */
217
218         struct kref             kref;
219 };
220
221 #define for_each_obj_request(ireq, oreq) \
222         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
223 #define for_each_obj_request_from(ireq, oreq) \
224         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
225 #define for_each_obj_request_safe(ireq, oreq, n) \
226         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
227
228 struct rbd_snap {
229         struct  device          dev;
230         const char              *name;
231         u64                     size;
232         struct list_head        node;
233         u64                     id;
234         u64                     features;
235 };
236
237 struct rbd_mapping {
238         u64                     size;
239         u64                     features;
240         bool                    read_only;
241 };
242
243 /*
244  * a single device
245  */
246 struct rbd_device {
247         int                     dev_id;         /* blkdev unique id */
248
249         int                     major;          /* blkdev assigned major */
250         struct gendisk          *disk;          /* blkdev's gendisk and rq */
251
252         u32                     image_format;   /* Either 1 or 2 */
253         struct rbd_client       *rbd_client;
254
255         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
256
257         spinlock_t              lock;           /* queue, flags, open_count */
258
259         struct rbd_image_header header;
260         unsigned long           flags;          /* possibly lock protected */
261         struct rbd_spec         *spec;
262
263         char                    *header_name;
264
265         struct ceph_file_layout layout;
266
267         struct ceph_osd_event   *watch_event;
268         struct rbd_obj_request  *watch_request;
269
270         struct rbd_spec         *parent_spec;
271         u64                     parent_overlap;
272
273         /* protects updating the header */
274         struct rw_semaphore     header_rwsem;
275
276         struct rbd_mapping      mapping;
277
278         struct list_head        node;
279
280         /* list of snapshots */
281         struct list_head        snaps;
282
283         /* sysfs related */
284         struct device           dev;
285         unsigned long           open_count;     /* protected by lock */
286 };
287
288 /*
289  * Flag bits for rbd_dev->flags.  If atomicity is required,
290  * rbd_dev->lock is used to protect access.
291  *
292  * Currently, only the "removing" flag (which is coupled with the
293  * "open_count" field) requires atomic access.
294  */
295 enum rbd_dev_flags {
296         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
297         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
298 };
299
300 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
301
302 static LIST_HEAD(rbd_dev_list);    /* devices */
303 static DEFINE_SPINLOCK(rbd_dev_list_lock);
304
305 static LIST_HEAD(rbd_client_list);              /* clients */
306 static DEFINE_SPINLOCK(rbd_client_list_lock);
307
308 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
309 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
310
311 static void rbd_dev_release(struct device *dev);
312 static void rbd_remove_snap_dev(struct rbd_snap *snap);
313
314 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
315                        size_t count);
316 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
317                           size_t count);
318
319 static struct bus_attribute rbd_bus_attrs[] = {
320         __ATTR(add, S_IWUSR, NULL, rbd_add),
321         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
322         __ATTR_NULL
323 };
324
325 static struct bus_type rbd_bus_type = {
326         .name           = "rbd",
327         .bus_attrs      = rbd_bus_attrs,
328 };
329
330 static void rbd_root_dev_release(struct device *dev)
331 {
332 }
333
334 static struct device rbd_root_dev = {
335         .init_name =    "rbd",
336         .release =      rbd_root_dev_release,
337 };
338
339 static __printf(2, 3)
340 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
341 {
342         struct va_format vaf;
343         va_list args;
344
345         va_start(args, fmt);
346         vaf.fmt = fmt;
347         vaf.va = &args;
348
349         if (!rbd_dev)
350                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
351         else if (rbd_dev->disk)
352                 printk(KERN_WARNING "%s: %s: %pV\n",
353                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
354         else if (rbd_dev->spec && rbd_dev->spec->image_name)
355                 printk(KERN_WARNING "%s: image %s: %pV\n",
356                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
357         else if (rbd_dev->spec && rbd_dev->spec->image_id)
358                 printk(KERN_WARNING "%s: id %s: %pV\n",
359                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
360         else    /* punt */
361                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
362                         RBD_DRV_NAME, rbd_dev, &vaf);
363         va_end(args);
364 }
365
366 #ifdef RBD_DEBUG
367 #define rbd_assert(expr)                                                \
368                 if (unlikely(!(expr))) {                                \
369                         printk(KERN_ERR "\nAssertion failure in %s() "  \
370                                                 "at line %d:\n\n"       \
371                                         "\trbd_assert(%s);\n\n",        \
372                                         __func__, __LINE__, #expr);     \
373                         BUG();                                          \
374                 }
375 #else /* !RBD_DEBUG */
376 #  define rbd_assert(expr)      ((void) 0)
377 #endif /* !RBD_DEBUG */
378
379 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
380 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
381
382 static int rbd_open(struct block_device *bdev, fmode_t mode)
383 {
384         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
385         bool removing = false;
386
387         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
388                 return -EROFS;
389
390         spin_lock_irq(&rbd_dev->lock);
391         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
392                 removing = true;
393         else
394                 rbd_dev->open_count++;
395         spin_unlock_irq(&rbd_dev->lock);
396         if (removing)
397                 return -ENOENT;
398
399         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
400         (void) get_device(&rbd_dev->dev);
401         set_device_ro(bdev, rbd_dev->mapping.read_only);
402         mutex_unlock(&ctl_mutex);
403
404         return 0;
405 }
406
407 static int rbd_release(struct gendisk *disk, fmode_t mode)
408 {
409         struct rbd_device *rbd_dev = disk->private_data;
410         unsigned long open_count_before;
411
412         spin_lock_irq(&rbd_dev->lock);
413         open_count_before = rbd_dev->open_count--;
414         spin_unlock_irq(&rbd_dev->lock);
415         rbd_assert(open_count_before > 0);
416
417         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
418         put_device(&rbd_dev->dev);
419         mutex_unlock(&ctl_mutex);
420
421         return 0;
422 }
423
424 static const struct block_device_operations rbd_bd_ops = {
425         .owner                  = THIS_MODULE,
426         .open                   = rbd_open,
427         .release                = rbd_release,
428 };
429
430 /*
431  * Initialize an rbd client instance.
432  * We own *ceph_opts.
433  */
434 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
435 {
436         struct rbd_client *rbdc;
437         int ret = -ENOMEM;
438
439         dout("%s:\n", __func__);
440         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
441         if (!rbdc)
442                 goto out_opt;
443
444         kref_init(&rbdc->kref);
445         INIT_LIST_HEAD(&rbdc->node);
446
447         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
448
449         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
450         if (IS_ERR(rbdc->client))
451                 goto out_mutex;
452         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
453
454         ret = ceph_open_session(rbdc->client);
455         if (ret < 0)
456                 goto out_err;
457
458         spin_lock(&rbd_client_list_lock);
459         list_add_tail(&rbdc->node, &rbd_client_list);
460         spin_unlock(&rbd_client_list_lock);
461
462         mutex_unlock(&ctl_mutex);
463         dout("%s: rbdc %p\n", __func__, rbdc);
464
465         return rbdc;
466
467 out_err:
468         ceph_destroy_client(rbdc->client);
469 out_mutex:
470         mutex_unlock(&ctl_mutex);
471         kfree(rbdc);
472 out_opt:
473         if (ceph_opts)
474                 ceph_destroy_options(ceph_opts);
475         dout("%s: error %d\n", __func__, ret);
476
477         return ERR_PTR(ret);
478 }
479
480 /*
481  * Find a ceph client with specific addr and configuration.  If
482  * found, bump its reference count.
483  */
484 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
485 {
486         struct rbd_client *client_node;
487         bool found = false;
488
489         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
490                 return NULL;
491
492         spin_lock(&rbd_client_list_lock);
493         list_for_each_entry(client_node, &rbd_client_list, node) {
494                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
495                         kref_get(&client_node->kref);
496                         found = true;
497                         break;
498                 }
499         }
500         spin_unlock(&rbd_client_list_lock);
501
502         return found ? client_node : NULL;
503 }
504
505 /*
506  * mount options
507  */
508 enum {
509         Opt_last_int,
510         /* int args above */
511         Opt_last_string,
512         /* string args above */
513         Opt_read_only,
514         Opt_read_write,
515         /* Boolean args above */
516         Opt_last_bool,
517 };
518
519 static match_table_t rbd_opts_tokens = {
520         /* int args above */
521         /* string args above */
522         {Opt_read_only, "read_only"},
523         {Opt_read_only, "ro"},          /* Alternate spelling */
524         {Opt_read_write, "read_write"},
525         {Opt_read_write, "rw"},         /* Alternate spelling */
526         /* Boolean args above */
527         {-1, NULL}
528 };
529
530 struct rbd_options {
531         bool    read_only;
532 };
533
534 #define RBD_READ_ONLY_DEFAULT   false
535
536 static int parse_rbd_opts_token(char *c, void *private)
537 {
538         struct rbd_options *rbd_opts = private;
539         substring_t argstr[MAX_OPT_ARGS];
540         int token, intval, ret;
541
542         token = match_token(c, rbd_opts_tokens, argstr);
543         if (token < 0)
544                 return -EINVAL;
545
546         if (token < Opt_last_int) {
547                 ret = match_int(&argstr[0], &intval);
548                 if (ret < 0) {
549                         pr_err("bad mount option arg (not int) "
550                                "at '%s'\n", c);
551                         return ret;
552                 }
553                 dout("got int token %d val %d\n", token, intval);
554         } else if (token > Opt_last_int && token < Opt_last_string) {
555                 dout("got string token %d val %s\n", token,
556                      argstr[0].from);
557         } else if (token > Opt_last_string && token < Opt_last_bool) {
558                 dout("got Boolean token %d\n", token);
559         } else {
560                 dout("got token %d\n", token);
561         }
562
563         switch (token) {
564         case Opt_read_only:
565                 rbd_opts->read_only = true;
566                 break;
567         case Opt_read_write:
568                 rbd_opts->read_only = false;
569                 break;
570         default:
571                 rbd_assert(false);
572                 break;
573         }
574         return 0;
575 }
576
577 /*
578  * Get a ceph client with specific addr and configuration, if one does
579  * not exist create it.
580  */
581 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
582 {
583         struct rbd_client *rbdc;
584
585         rbdc = rbd_client_find(ceph_opts);
586         if (rbdc)       /* using an existing client */
587                 ceph_destroy_options(ceph_opts);
588         else
589                 rbdc = rbd_client_create(ceph_opts);
590
591         return rbdc;
592 }
593
594 /*
595  * Destroy ceph client
596  *
597  * Caller must hold rbd_client_list_lock.
598  */
599 static void rbd_client_release(struct kref *kref)
600 {
601         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
602
603         dout("%s: rbdc %p\n", __func__, rbdc);
604         spin_lock(&rbd_client_list_lock);
605         list_del(&rbdc->node);
606         spin_unlock(&rbd_client_list_lock);
607
608         ceph_destroy_client(rbdc->client);
609         kfree(rbdc);
610 }
611
612 /*
613  * Drop reference to ceph client node. If it's not referenced anymore, release
614  * it.
615  */
616 static void rbd_put_client(struct rbd_client *rbdc)
617 {
618         if (rbdc)
619                 kref_put(&rbdc->kref, rbd_client_release);
620 }
621
622 static bool rbd_image_format_valid(u32 image_format)
623 {
624         return image_format == 1 || image_format == 2;
625 }
626
627 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
628 {
629         size_t size;
630         u32 snap_count;
631
632         /* The header has to start with the magic rbd header text */
633         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
634                 return false;
635
636         /* The bio layer requires at least sector-sized I/O */
637
638         if (ondisk->options.order < SECTOR_SHIFT)
639                 return false;
640
641         /* If we use u64 in a few spots we may be able to loosen this */
642
643         if (ondisk->options.order > 8 * sizeof (int) - 1)
644                 return false;
645
646         /*
647          * The size of a snapshot header has to fit in a size_t, and
648          * that limits the number of snapshots.
649          */
650         snap_count = le32_to_cpu(ondisk->snap_count);
651         size = SIZE_MAX - sizeof (struct ceph_snap_context);
652         if (snap_count > size / sizeof (__le64))
653                 return false;
654
655         /*
656          * Not only that, but the size of the entire the snapshot
657          * header must also be representable in a size_t.
658          */
659         size -= snap_count * sizeof (__le64);
660         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
661                 return false;
662
663         return true;
664 }
665
666 /*
667  * Create a new header structure, translate header format from the on-disk
668  * header.
669  */
670 static int rbd_header_from_disk(struct rbd_image_header *header,
671                                  struct rbd_image_header_ondisk *ondisk)
672 {
673         u32 snap_count;
674         size_t len;
675         size_t size;
676         u32 i;
677
678         memset(header, 0, sizeof (*header));
679
680         snap_count = le32_to_cpu(ondisk->snap_count);
681
682         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
683         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
684         if (!header->object_prefix)
685                 return -ENOMEM;
686         memcpy(header->object_prefix, ondisk->object_prefix, len);
687         header->object_prefix[len] = '\0';
688
689         if (snap_count) {
690                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
691
692                 /* Save a copy of the snapshot names */
693
694                 if (snap_names_len > (u64) SIZE_MAX)
695                         return -EIO;
696                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
697                 if (!header->snap_names)
698                         goto out_err;
699                 /*
700                  * Note that rbd_dev_v1_header_read() guarantees
701                  * the ondisk buffer we're working with has
702                  * snap_names_len bytes beyond the end of the
703                  * snapshot id array, this memcpy() is safe.
704                  */
705                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
706                         snap_names_len);
707
708                 /* Record each snapshot's size */
709
710                 size = snap_count * sizeof (*header->snap_sizes);
711                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
712                 if (!header->snap_sizes)
713                         goto out_err;
714                 for (i = 0; i < snap_count; i++)
715                         header->snap_sizes[i] =
716                                 le64_to_cpu(ondisk->snaps[i].image_size);
717         } else {
718                 WARN_ON(ondisk->snap_names_len);
719                 header->snap_names = NULL;
720                 header->snap_sizes = NULL;
721         }
722
723         header->features = 0;   /* No features support in v1 images */
724         header->obj_order = ondisk->options.order;
725         header->crypt_type = ondisk->options.crypt_type;
726         header->comp_type = ondisk->options.comp_type;
727
728         /* Allocate and fill in the snapshot context */
729
730         header->image_size = le64_to_cpu(ondisk->image_size);
731         size = sizeof (struct ceph_snap_context);
732         size += snap_count * sizeof (header->snapc->snaps[0]);
733         header->snapc = kzalloc(size, GFP_KERNEL);
734         if (!header->snapc)
735                 goto out_err;
736
737         atomic_set(&header->snapc->nref, 1);
738         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
739         header->snapc->num_snaps = snap_count;
740         for (i = 0; i < snap_count; i++)
741                 header->snapc->snaps[i] =
742                         le64_to_cpu(ondisk->snaps[i].id);
743
744         return 0;
745
746 out_err:
747         kfree(header->snap_sizes);
748         header->snap_sizes = NULL;
749         kfree(header->snap_names);
750         header->snap_names = NULL;
751         kfree(header->object_prefix);
752         header->object_prefix = NULL;
753
754         return -ENOMEM;
755 }
756
757 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
758 {
759         struct rbd_snap *snap;
760
761         if (snap_id == CEPH_NOSNAP)
762                 return RBD_SNAP_HEAD_NAME;
763
764         list_for_each_entry(snap, &rbd_dev->snaps, node)
765                 if (snap_id == snap->id)
766                         return snap->name;
767
768         return NULL;
769 }
770
771 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
772 {
773
774         struct rbd_snap *snap;
775
776         list_for_each_entry(snap, &rbd_dev->snaps, node) {
777                 if (!strcmp(snap_name, snap->name)) {
778                         rbd_dev->spec->snap_id = snap->id;
779                         rbd_dev->mapping.size = snap->size;
780                         rbd_dev->mapping.features = snap->features;
781
782                         return 0;
783                 }
784         }
785
786         return -ENOENT;
787 }
788
789 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
790 {
791         int ret;
792
793         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
794                     sizeof (RBD_SNAP_HEAD_NAME))) {
795                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
796                 rbd_dev->mapping.size = rbd_dev->header.image_size;
797                 rbd_dev->mapping.features = rbd_dev->header.features;
798                 ret = 0;
799         } else {
800                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
801                 if (ret < 0)
802                         goto done;
803                 rbd_dev->mapping.read_only = true;
804         }
805         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
806
807 done:
808         return ret;
809 }
810
811 static void rbd_header_free(struct rbd_image_header *header)
812 {
813         kfree(header->object_prefix);
814         header->object_prefix = NULL;
815         kfree(header->snap_sizes);
816         header->snap_sizes = NULL;
817         kfree(header->snap_names);
818         header->snap_names = NULL;
819         ceph_put_snap_context(header->snapc);
820         header->snapc = NULL;
821 }
822
823 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
824 {
825         char *name;
826         u64 segment;
827         int ret;
828
829         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
830         if (!name)
831                 return NULL;
832         segment = offset >> rbd_dev->header.obj_order;
833         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
834                         rbd_dev->header.object_prefix, segment);
835         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
836                 pr_err("error formatting segment name for #%llu (%d)\n",
837                         segment, ret);
838                 kfree(name);
839                 name = NULL;
840         }
841
842         return name;
843 }
844
845 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
846 {
847         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
848
849         return offset & (segment_size - 1);
850 }
851
852 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
853                                 u64 offset, u64 length)
854 {
855         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
856
857         offset &= segment_size - 1;
858
859         rbd_assert(length <= U64_MAX - offset);
860         if (offset + length > segment_size)
861                 length = segment_size - offset;
862
863         return length;
864 }
865
866 /*
867  * returns the size of an object in the image
868  */
869 static u64 rbd_obj_bytes(struct rbd_image_header *header)
870 {
871         return 1 << header->obj_order;
872 }
873
874 /*
875  * bio helpers
876  */
877
878 static void bio_chain_put(struct bio *chain)
879 {
880         struct bio *tmp;
881
882         while (chain) {
883                 tmp = chain;
884                 chain = chain->bi_next;
885                 bio_put(tmp);
886         }
887 }
888
889 /*
890  * zeros a bio chain, starting at specific offset
891  */
892 static void zero_bio_chain(struct bio *chain, int start_ofs)
893 {
894         struct bio_vec *bv;
895         unsigned long flags;
896         void *buf;
897         int i;
898         int pos = 0;
899
900         while (chain) {
901                 bio_for_each_segment(bv, chain, i) {
902                         if (pos + bv->bv_len > start_ofs) {
903                                 int remainder = max(start_ofs - pos, 0);
904                                 buf = bvec_kmap_irq(bv, &flags);
905                                 memset(buf + remainder, 0,
906                                        bv->bv_len - remainder);
907                                 bvec_kunmap_irq(buf, &flags);
908                         }
909                         pos += bv->bv_len;
910                 }
911
912                 chain = chain->bi_next;
913         }
914 }
915
916 /*
917  * Clone a portion of a bio, starting at the given byte offset
918  * and continuing for the number of bytes indicated.
919  */
920 static struct bio *bio_clone_range(struct bio *bio_src,
921                                         unsigned int offset,
922                                         unsigned int len,
923                                         gfp_t gfpmask)
924 {
925         struct bio_vec *bv;
926         unsigned int resid;
927         unsigned short idx;
928         unsigned int voff;
929         unsigned short end_idx;
930         unsigned short vcnt;
931         struct bio *bio;
932
933         /* Handle the easy case for the caller */
934
935         if (!offset && len == bio_src->bi_size)
936                 return bio_clone(bio_src, gfpmask);
937
938         if (WARN_ON_ONCE(!len))
939                 return NULL;
940         if (WARN_ON_ONCE(len > bio_src->bi_size))
941                 return NULL;
942         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
943                 return NULL;
944
945         /* Find first affected segment... */
946
947         resid = offset;
948         __bio_for_each_segment(bv, bio_src, idx, 0) {
949                 if (resid < bv->bv_len)
950                         break;
951                 resid -= bv->bv_len;
952         }
953         voff = resid;
954
955         /* ...and the last affected segment */
956
957         resid += len;
958         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
959                 if (resid <= bv->bv_len)
960                         break;
961                 resid -= bv->bv_len;
962         }
963         vcnt = end_idx - idx + 1;
964
965         /* Build the clone */
966
967         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
968         if (!bio)
969                 return NULL;    /* ENOMEM */
970
971         bio->bi_bdev = bio_src->bi_bdev;
972         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
973         bio->bi_rw = bio_src->bi_rw;
974         bio->bi_flags |= 1 << BIO_CLONED;
975
976         /*
977          * Copy over our part of the bio_vec, then update the first
978          * and last (or only) entries.
979          */
980         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
981                         vcnt * sizeof (struct bio_vec));
982         bio->bi_io_vec[0].bv_offset += voff;
983         if (vcnt > 1) {
984                 bio->bi_io_vec[0].bv_len -= voff;
985                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
986         } else {
987                 bio->bi_io_vec[0].bv_len = len;
988         }
989
990         bio->bi_vcnt = vcnt;
991         bio->bi_size = len;
992         bio->bi_idx = 0;
993
994         return bio;
995 }
996
997 /*
998  * Clone a portion of a bio chain, starting at the given byte offset
999  * into the first bio in the source chain and continuing for the
1000  * number of bytes indicated.  The result is another bio chain of
1001  * exactly the given length, or a null pointer on error.
1002  *
1003  * The bio_src and offset parameters are both in-out.  On entry they
1004  * refer to the first source bio and the offset into that bio where
1005  * the start of data to be cloned is located.
1006  *
1007  * On return, bio_src is updated to refer to the bio in the source
1008  * chain that contains first un-cloned byte, and *offset will
1009  * contain the offset of that byte within that bio.
1010  */
1011 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1012                                         unsigned int *offset,
1013                                         unsigned int len,
1014                                         gfp_t gfpmask)
1015 {
1016         struct bio *bi = *bio_src;
1017         unsigned int off = *offset;
1018         struct bio *chain = NULL;
1019         struct bio **end;
1020
1021         /* Build up a chain of clone bios up to the limit */
1022
1023         if (!bi || off >= bi->bi_size || !len)
1024                 return NULL;            /* Nothing to clone */
1025
1026         end = &chain;
1027         while (len) {
1028                 unsigned int bi_size;
1029                 struct bio *bio;
1030
1031                 if (!bi) {
1032                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1033                         goto out_err;   /* EINVAL; ran out of bio's */
1034                 }
1035                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1036                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1037                 if (!bio)
1038                         goto out_err;   /* ENOMEM */
1039
1040                 *end = bio;
1041                 end = &bio->bi_next;
1042
1043                 off += bi_size;
1044                 if (off == bi->bi_size) {
1045                         bi = bi->bi_next;
1046                         off = 0;
1047                 }
1048                 len -= bi_size;
1049         }
1050         *bio_src = bi;
1051         *offset = off;
1052
1053         return chain;
1054 out_err:
1055         bio_chain_put(chain);
1056
1057         return NULL;
1058 }
1059
1060 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1061 {
1062         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1063                 atomic_read(&obj_request->kref.refcount));
1064         kref_get(&obj_request->kref);
1065 }
1066
1067 static void rbd_obj_request_destroy(struct kref *kref);
1068 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1069 {
1070         rbd_assert(obj_request != NULL);
1071         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1072                 atomic_read(&obj_request->kref.refcount));
1073         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1074 }
1075
1076 static void rbd_img_request_get(struct rbd_img_request *img_request)
1077 {
1078         dout("%s: img %p (was %d)\n", __func__, img_request,
1079                 atomic_read(&img_request->kref.refcount));
1080         kref_get(&img_request->kref);
1081 }
1082
1083 static void rbd_img_request_destroy(struct kref *kref);
1084 static void rbd_img_request_put(struct rbd_img_request *img_request)
1085 {
1086         rbd_assert(img_request != NULL);
1087         dout("%s: img %p (was %d)\n", __func__, img_request,
1088                 atomic_read(&img_request->kref.refcount));
1089         kref_put(&img_request->kref, rbd_img_request_destroy);
1090 }
1091
1092 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1093                                         struct rbd_obj_request *obj_request)
1094 {
1095         rbd_assert(obj_request->img_request == NULL);
1096
1097         rbd_obj_request_get(obj_request);
1098         obj_request->img_request = img_request;
1099         obj_request->which = img_request->obj_request_count;
1100         rbd_assert(obj_request->which != BAD_WHICH);
1101         img_request->obj_request_count++;
1102         list_add_tail(&obj_request->links, &img_request->obj_requests);
1103         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1104                 obj_request->which);
1105 }
1106
1107 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1108                                         struct rbd_obj_request *obj_request)
1109 {
1110         rbd_assert(obj_request->which != BAD_WHICH);
1111
1112         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1113                 obj_request->which);
1114         list_del(&obj_request->links);
1115         rbd_assert(img_request->obj_request_count > 0);
1116         img_request->obj_request_count--;
1117         rbd_assert(obj_request->which == img_request->obj_request_count);
1118         obj_request->which = BAD_WHICH;
1119         rbd_assert(obj_request->img_request == img_request);
1120         obj_request->img_request = NULL;
1121         obj_request->callback = NULL;
1122         rbd_obj_request_put(obj_request);
1123 }
1124
1125 static bool obj_request_type_valid(enum obj_request_type type)
1126 {
1127         switch (type) {
1128         case OBJ_REQUEST_NODATA:
1129         case OBJ_REQUEST_BIO:
1130         case OBJ_REQUEST_PAGES:
1131                 return true;
1132         default:
1133                 return false;
1134         }
1135 }
1136
1137 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1138                                 struct rbd_obj_request *obj_request)
1139 {
1140         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1141
1142         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1143 }
1144
1145 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1146 {
1147         dout("%s: img %p\n", __func__, img_request);
1148         if (img_request->callback)
1149                 img_request->callback(img_request);
1150         else
1151                 rbd_img_request_put(img_request);
1152 }
1153
1154 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1155
1156 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1157 {
1158         dout("%s: obj %p\n", __func__, obj_request);
1159
1160         return wait_for_completion_interruptible(&obj_request->completion);
1161 }
1162
1163 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1164 {
1165         atomic_set(&obj_request->done, 0);
1166         smp_wmb();
1167 }
1168
1169 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1170 {
1171         int done;
1172
1173         done = atomic_inc_return(&obj_request->done);
1174         if (done > 1) {
1175                 struct rbd_img_request *img_request = obj_request->img_request;
1176                 struct rbd_device *rbd_dev;
1177
1178                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1179                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1180                         obj_request);
1181         }
1182 }
1183
1184 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1185 {
1186         smp_mb();
1187         return atomic_read(&obj_request->done) != 0;
1188 }
1189
1190 static void
1191 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1192 {
1193         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1194                 obj_request, obj_request->img_request, obj_request->result,
1195                 obj_request->xferred, obj_request->length);
1196         /*
1197          * ENOENT means a hole in the image.  We zero-fill the
1198          * entire length of the request.  A short read also implies
1199          * zero-fill to the end of the request.  Either way we
1200          * update the xferred count to indicate the whole request
1201          * was satisfied.
1202          */
1203         BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1204         if (obj_request->result == -ENOENT) {
1205                 zero_bio_chain(obj_request->bio_list, 0);
1206                 obj_request->result = 0;
1207                 obj_request->xferred = obj_request->length;
1208         } else if (obj_request->xferred < obj_request->length &&
1209                         !obj_request->result) {
1210                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1211                 obj_request->xferred = obj_request->length;
1212         }
1213         obj_request_done_set(obj_request);
1214 }
1215
1216 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1217 {
1218         dout("%s: obj %p cb %p\n", __func__, obj_request,
1219                 obj_request->callback);
1220         if (obj_request->callback)
1221                 obj_request->callback(obj_request);
1222         else
1223                 complete_all(&obj_request->completion);
1224 }
1225
1226 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1227 {
1228         dout("%s: obj %p\n", __func__, obj_request);
1229         obj_request_done_set(obj_request);
1230 }
1231
1232 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1233 {
1234         dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1235                 obj_request->result, obj_request->xferred, obj_request->length);
1236         if (obj_request->img_request)
1237                 rbd_img_obj_request_read_callback(obj_request);
1238         else
1239                 obj_request_done_set(obj_request);
1240 }
1241
1242 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1243 {
1244         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1245                 obj_request->result, obj_request->length);
1246         /*
1247          * There is no such thing as a successful short write.
1248          * Our xferred value is the number of bytes transferred
1249          * back.  Set it to our originally-requested length.
1250          */
1251         obj_request->xferred = obj_request->length;
1252         obj_request_done_set(obj_request);
1253 }
1254
1255 /*
1256  * For a simple stat call there's nothing to do.  We'll do more if
1257  * this is part of a write sequence for a layered image.
1258  */
1259 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1260 {
1261         dout("%s: obj %p\n", __func__, obj_request);
1262         obj_request_done_set(obj_request);
1263 }
1264
1265 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1266                                 struct ceph_msg *msg)
1267 {
1268         struct rbd_obj_request *obj_request = osd_req->r_priv;
1269         u16 opcode;
1270
1271         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1272         rbd_assert(osd_req == obj_request->osd_req);
1273         rbd_assert(!!obj_request->img_request ^
1274                                 (obj_request->which == BAD_WHICH));
1275
1276         if (osd_req->r_result < 0)
1277                 obj_request->result = osd_req->r_result;
1278         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1279
1280         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1281
1282         /*
1283          * We support a 64-bit length, but ultimately it has to be
1284          * passed to blk_end_request(), which takes an unsigned int.
1285          */
1286         obj_request->xferred = osd_req->r_reply_op_len[0];
1287         rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1288         opcode = osd_req->r_ops[0].op;
1289         switch (opcode) {
1290         case CEPH_OSD_OP_READ:
1291                 rbd_osd_read_callback(obj_request);
1292                 break;
1293         case CEPH_OSD_OP_WRITE:
1294                 rbd_osd_write_callback(obj_request);
1295                 break;
1296         case CEPH_OSD_OP_STAT:
1297                 rbd_osd_stat_callback(obj_request);
1298                 break;
1299         case CEPH_OSD_OP_CALL:
1300         case CEPH_OSD_OP_NOTIFY_ACK:
1301         case CEPH_OSD_OP_WATCH:
1302                 rbd_osd_trivial_callback(obj_request);
1303                 break;
1304         default:
1305                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1306                         obj_request->object_name, (unsigned short) opcode);
1307                 break;
1308         }
1309
1310         if (obj_request_done_test(obj_request))
1311                 rbd_obj_request_complete(obj_request);
1312 }
1313
1314 static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
1315                                         bool write_request)
1316 {
1317         struct rbd_img_request *img_request = obj_request->img_request;
1318         struct ceph_osd_request *osd_req = obj_request->osd_req;
1319         struct ceph_snap_context *snapc = NULL;
1320         u64 snap_id = CEPH_NOSNAP;
1321         struct timespec *mtime = NULL;
1322         struct timespec now;
1323
1324         rbd_assert(osd_req != NULL);
1325
1326         if (write_request) {
1327                 now = CURRENT_TIME;
1328                 mtime = &now;
1329                 if (img_request)
1330                         snapc = img_request->snapc;
1331         } else if (img_request) {
1332                 snap_id = img_request->snap_id;
1333         }
1334         ceph_osdc_build_request(osd_req, obj_request->offset,
1335                         snapc, snap_id, mtime);
1336 }
1337
1338 static struct ceph_osd_request *rbd_osd_req_create(
1339                                         struct rbd_device *rbd_dev,
1340                                         bool write_request,
1341                                         struct rbd_obj_request *obj_request)
1342 {
1343         struct rbd_img_request *img_request = obj_request->img_request;
1344         struct ceph_snap_context *snapc = NULL;
1345         struct ceph_osd_client *osdc;
1346         struct ceph_osd_request *osd_req;
1347
1348         if (img_request) {
1349                 rbd_assert(img_request->write_request == write_request);
1350                 if (img_request->write_request)
1351                         snapc = img_request->snapc;
1352         }
1353
1354         /* Allocate and initialize the request, for the single op */
1355
1356         osdc = &rbd_dev->rbd_client->client->osdc;
1357         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1358         if (!osd_req)
1359                 return NULL;    /* ENOMEM */
1360
1361         if (write_request)
1362                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1363         else
1364                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1365
1366         osd_req->r_callback = rbd_osd_req_callback;
1367         osd_req->r_priv = obj_request;
1368
1369         osd_req->r_oid_len = strlen(obj_request->object_name);
1370         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1371         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1372
1373         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1374
1375         return osd_req;
1376 }
1377
1378 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1379 {
1380         ceph_osdc_put_request(osd_req);
1381 }
1382
1383 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1384
1385 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1386                                                 u64 offset, u64 length,
1387                                                 enum obj_request_type type)
1388 {
1389         struct rbd_obj_request *obj_request;
1390         size_t size;
1391         char *name;
1392
1393         rbd_assert(obj_request_type_valid(type));
1394
1395         size = strlen(object_name) + 1;
1396         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1397         if (!obj_request)
1398                 return NULL;
1399
1400         name = (char *)(obj_request + 1);
1401         obj_request->object_name = memcpy(name, object_name, size);
1402         obj_request->offset = offset;
1403         obj_request->length = length;
1404         obj_request->which = BAD_WHICH;
1405         obj_request->type = type;
1406         INIT_LIST_HEAD(&obj_request->links);
1407         obj_request_done_init(obj_request);
1408         init_completion(&obj_request->completion);
1409         kref_init(&obj_request->kref);
1410
1411         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1412                 offset, length, (int)type, obj_request);
1413
1414         return obj_request;
1415 }
1416
1417 static void rbd_obj_request_destroy(struct kref *kref)
1418 {
1419         struct rbd_obj_request *obj_request;
1420
1421         obj_request = container_of(kref, struct rbd_obj_request, kref);
1422
1423         dout("%s: obj %p\n", __func__, obj_request);
1424
1425         rbd_assert(obj_request->img_request == NULL);
1426         rbd_assert(obj_request->which == BAD_WHICH);
1427
1428         if (obj_request->osd_req)
1429                 rbd_osd_req_destroy(obj_request->osd_req);
1430
1431         rbd_assert(obj_request_type_valid(obj_request->type));
1432         switch (obj_request->type) {
1433         case OBJ_REQUEST_NODATA:
1434                 break;          /* Nothing to do */
1435         case OBJ_REQUEST_BIO:
1436                 if (obj_request->bio_list)
1437                         bio_chain_put(obj_request->bio_list);
1438                 break;
1439         case OBJ_REQUEST_PAGES:
1440                 if (obj_request->pages)
1441                         ceph_release_page_vector(obj_request->pages,
1442                                                 obj_request->page_count);
1443                 break;
1444         }
1445
1446         kfree(obj_request);
1447 }
1448
1449 /*
1450  * Caller is responsible for filling in the list of object requests
1451  * that comprises the image request, and the Linux request pointer
1452  * (if there is one).
1453  */
1454 static struct rbd_img_request *rbd_img_request_create(
1455                                         struct rbd_device *rbd_dev,
1456                                         u64 offset, u64 length,
1457                                         bool write_request)
1458 {
1459         struct rbd_img_request *img_request;
1460         struct ceph_snap_context *snapc = NULL;
1461
1462         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1463         if (!img_request)
1464                 return NULL;
1465
1466         if (write_request) {
1467                 down_read(&rbd_dev->header_rwsem);
1468                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1469                 up_read(&rbd_dev->header_rwsem);
1470                 if (WARN_ON(!snapc)) {
1471                         kfree(img_request);
1472                         return NULL;    /* Shouldn't happen */
1473                 }
1474         }
1475
1476         img_request->rq = NULL;
1477         img_request->rbd_dev = rbd_dev;
1478         img_request->offset = offset;
1479         img_request->length = length;
1480         img_request->write_request = write_request;
1481         if (write_request)
1482                 img_request->snapc = snapc;
1483         else
1484                 img_request->snap_id = rbd_dev->spec->snap_id;
1485         spin_lock_init(&img_request->completion_lock);
1486         img_request->next_completion = 0;
1487         img_request->callback = NULL;
1488         img_request->obj_request_count = 0;
1489         INIT_LIST_HEAD(&img_request->obj_requests);
1490         kref_init(&img_request->kref);
1491
1492         rbd_img_request_get(img_request);       /* Avoid a warning */
1493         rbd_img_request_put(img_request);       /* TEMPORARY */
1494
1495         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1496                 write_request ? "write" : "read", offset, length,
1497                 img_request);
1498
1499         return img_request;
1500 }
1501
1502 static void rbd_img_request_destroy(struct kref *kref)
1503 {
1504         struct rbd_img_request *img_request;
1505         struct rbd_obj_request *obj_request;
1506         struct rbd_obj_request *next_obj_request;
1507
1508         img_request = container_of(kref, struct rbd_img_request, kref);
1509
1510         dout("%s: img %p\n", __func__, img_request);
1511
1512         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1513                 rbd_img_obj_request_del(img_request, obj_request);
1514         rbd_assert(img_request->obj_request_count == 0);
1515
1516         if (img_request->write_request)
1517                 ceph_put_snap_context(img_request->snapc);
1518
1519         kfree(img_request);
1520 }
1521
1522 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1523 {
1524         struct rbd_img_request *img_request;
1525         u32 which = obj_request->which;
1526         bool more = true;
1527
1528         img_request = obj_request->img_request;
1529
1530         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1531         rbd_assert(img_request != NULL);
1532         rbd_assert(img_request->rq != NULL);
1533         rbd_assert(img_request->obj_request_count > 0);
1534         rbd_assert(which != BAD_WHICH);
1535         rbd_assert(which < img_request->obj_request_count);
1536         rbd_assert(which >= img_request->next_completion);
1537
1538         spin_lock_irq(&img_request->completion_lock);
1539         if (which != img_request->next_completion)
1540                 goto out;
1541
1542         for_each_obj_request_from(img_request, obj_request) {
1543                 unsigned int xferred;
1544                 int result;
1545
1546                 rbd_assert(more);
1547                 rbd_assert(which < img_request->obj_request_count);
1548
1549                 if (!obj_request_done_test(obj_request))
1550                         break;
1551
1552                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1553                 xferred = (unsigned int) obj_request->xferred;
1554                 result = (int) obj_request->result;
1555                 if (result)
1556                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1557                                 img_request->write_request ? "write" : "read",
1558                                 result, xferred);
1559
1560                 more = blk_end_request(img_request->rq, result, xferred);
1561                 which++;
1562         }
1563
1564         rbd_assert(more ^ (which == img_request->obj_request_count));
1565         img_request->next_completion = which;
1566 out:
1567         spin_unlock_irq(&img_request->completion_lock);
1568
1569         if (!more)
1570                 rbd_img_request_complete(img_request);
1571 }
1572
1573 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1574                                         struct bio *bio_list)
1575 {
1576         struct rbd_device *rbd_dev = img_request->rbd_dev;
1577         struct rbd_obj_request *obj_request = NULL;
1578         struct rbd_obj_request *next_obj_request;
1579         bool write_request = img_request->write_request;
1580         unsigned int bio_offset;
1581         u64 image_offset;
1582         u64 resid;
1583         u16 opcode;
1584
1585         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1586
1587         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1588         bio_offset = 0;
1589         image_offset = img_request->offset;
1590         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1591         resid = img_request->length;
1592         rbd_assert(resid > 0);
1593         while (resid) {
1594                 struct ceph_osd_request *osd_req;
1595                 const char *object_name;
1596                 unsigned int clone_size;
1597                 u64 offset;
1598                 u64 length;
1599
1600                 object_name = rbd_segment_name(rbd_dev, image_offset);
1601                 if (!object_name)
1602                         goto out_unwind;
1603                 offset = rbd_segment_offset(rbd_dev, image_offset);
1604                 length = rbd_segment_length(rbd_dev, image_offset, resid);
1605                 obj_request = rbd_obj_request_create(object_name,
1606                                                 offset, length,
1607                                                 OBJ_REQUEST_BIO);
1608                 kfree(object_name);     /* object request has its own copy */
1609                 if (!obj_request)
1610                         goto out_unwind;
1611
1612                 rbd_assert(length <= (u64) UINT_MAX);
1613                 clone_size = (unsigned int) length;
1614                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1615                                                 &bio_offset, clone_size,
1616                                                 GFP_ATOMIC);
1617                 if (!obj_request->bio_list)
1618                         goto out_partial;
1619
1620                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1621                                                 obj_request);
1622                 if (!osd_req)
1623                         goto out_partial;
1624                 obj_request->osd_req = osd_req;
1625                 obj_request->callback = rbd_img_obj_callback;
1626
1627                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1628                                                 0, 0);
1629                 osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
1630                                 obj_request->bio_list, obj_request->length);
1631                 rbd_osd_req_format(obj_request, write_request);
1632
1633                 rbd_img_obj_request_add(img_request, obj_request);
1634
1635                 image_offset += length;
1636                 resid -= length;
1637         }
1638
1639         return 0;
1640
1641 out_partial:
1642         rbd_obj_request_put(obj_request);
1643 out_unwind:
1644         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1645                 rbd_obj_request_put(obj_request);
1646
1647         return -ENOMEM;
1648 }
1649
1650 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1651 {
1652         struct rbd_device *rbd_dev = img_request->rbd_dev;
1653         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1654         struct rbd_obj_request *obj_request;
1655         struct rbd_obj_request *next_obj_request;
1656
1657         dout("%s: img %p\n", __func__, img_request);
1658         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1659                 int ret;
1660
1661                 ret = rbd_obj_request_submit(osdc, obj_request);
1662                 if (ret)
1663                         return ret;
1664                 /*
1665                  * The image request has its own reference to each
1666                  * of its object requests, so we can safely drop the
1667                  * initial one here.
1668                  */
1669                 rbd_obj_request_put(obj_request);
1670         }
1671
1672         return 0;
1673 }
1674
1675 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1676                                    u64 ver, u64 notify_id)
1677 {
1678         struct rbd_obj_request *obj_request;
1679         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1680         int ret;
1681
1682         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1683                                                         OBJ_REQUEST_NODATA);
1684         if (!obj_request)
1685                 return -ENOMEM;
1686
1687         ret = -ENOMEM;
1688         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1689         if (!obj_request->osd_req)
1690                 goto out;
1691         obj_request->callback = rbd_obj_request_put;
1692
1693         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1694                                         notify_id, ver, 0);
1695         rbd_osd_req_format(obj_request, false);
1696
1697         ret = rbd_obj_request_submit(osdc, obj_request);
1698 out:
1699         if (ret)
1700                 rbd_obj_request_put(obj_request);
1701
1702         return ret;
1703 }
1704
1705 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1706 {
1707         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1708         u64 hver;
1709         int rc;
1710
1711         if (!rbd_dev)
1712                 return;
1713
1714         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1715                 rbd_dev->header_name, (unsigned long long) notify_id,
1716                 (unsigned int) opcode);
1717         rc = rbd_dev_refresh(rbd_dev, &hver);
1718         if (rc)
1719                 rbd_warn(rbd_dev, "got notification but failed to "
1720                            " update snaps: %d\n", rc);
1721
1722         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1723 }
1724
1725 /*
1726  * Request sync osd watch/unwatch.  The value of "start" determines
1727  * whether a watch request is being initiated or torn down.
1728  */
1729 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1730 {
1731         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1732         struct rbd_obj_request *obj_request;
1733         int ret;
1734
1735         rbd_assert(start ^ !!rbd_dev->watch_event);
1736         rbd_assert(start ^ !!rbd_dev->watch_request);
1737
1738         if (start) {
1739                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1740                                                 &rbd_dev->watch_event);
1741                 if (ret < 0)
1742                         return ret;
1743                 rbd_assert(rbd_dev->watch_event != NULL);
1744         }
1745
1746         ret = -ENOMEM;
1747         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1748                                                         OBJ_REQUEST_NODATA);
1749         if (!obj_request)
1750                 goto out_cancel;
1751
1752         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1753         if (!obj_request->osd_req)
1754                 goto out_cancel;
1755
1756         if (start)
1757                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1758         else
1759                 ceph_osdc_unregister_linger_request(osdc,
1760                                         rbd_dev->watch_request->osd_req);
1761
1762         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1763                                 rbd_dev->watch_event->cookie,
1764                                 rbd_dev->header.obj_version, start);
1765         rbd_osd_req_format(obj_request, true);
1766
1767         ret = rbd_obj_request_submit(osdc, obj_request);
1768         if (ret)
1769                 goto out_cancel;
1770         ret = rbd_obj_request_wait(obj_request);
1771         if (ret)
1772                 goto out_cancel;
1773         ret = obj_request->result;
1774         if (ret)
1775                 goto out_cancel;
1776
1777         /*
1778          * A watch request is set to linger, so the underlying osd
1779          * request won't go away until we unregister it.  We retain
1780          * a pointer to the object request during that time (in
1781          * rbd_dev->watch_request), so we'll keep a reference to
1782          * it.  We'll drop that reference (below) after we've
1783          * unregistered it.
1784          */
1785         if (start) {
1786                 rbd_dev->watch_request = obj_request;
1787
1788                 return 0;
1789         }
1790
1791         /* We have successfully torn down the watch request */
1792
1793         rbd_obj_request_put(rbd_dev->watch_request);
1794         rbd_dev->watch_request = NULL;
1795 out_cancel:
1796         /* Cancel the event if we're tearing down, or on error */
1797         ceph_osdc_cancel_event(rbd_dev->watch_event);
1798         rbd_dev->watch_event = NULL;
1799         if (obj_request)
1800                 rbd_obj_request_put(obj_request);
1801
1802         return ret;
1803 }
1804
1805 /*
1806  * Synchronous osd object method call
1807  */
1808 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1809                              const char *object_name,
1810                              const char *class_name,
1811                              const char *method_name,
1812                              const char *outbound,
1813                              size_t outbound_size,
1814                              char *inbound,
1815                              size_t inbound_size,
1816                              u64 *version)
1817 {
1818         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1819         struct rbd_obj_request *obj_request;
1820         struct page **pages;
1821         u32 page_count;
1822         int ret;
1823
1824         /*
1825          * Method calls are ultimately read operations.  The result
1826          * should placed into the inbound buffer provided.  They
1827          * also supply outbound data--parameters for the object
1828          * method.  Currently if this is present it will be a
1829          * snapshot id.
1830          */
1831         page_count = (u32) calc_pages_for(0, inbound_size);
1832         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1833         if (IS_ERR(pages))
1834                 return PTR_ERR(pages);
1835
1836         ret = -ENOMEM;
1837         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1838                                                         OBJ_REQUEST_PAGES);
1839         if (!obj_request)
1840                 goto out;
1841
1842         obj_request->pages = pages;
1843         obj_request->page_count = page_count;
1844
1845         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1846         if (!obj_request->osd_req)
1847                 goto out;
1848
1849         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
1850                                         class_name, method_name);
1851         if (outbound_size) {
1852                 struct ceph_pagelist *pagelist;
1853
1854                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
1855                 if (!pagelist)
1856                         goto out;
1857
1858                 ceph_pagelist_init(pagelist);
1859                 ceph_pagelist_append(pagelist, outbound, outbound_size);
1860                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
1861                                                 pagelist);
1862         }
1863         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
1864                                         obj_request->pages, inbound_size,
1865                                         0, false, false);
1866         rbd_osd_req_format(obj_request, false);
1867
1868         ret = rbd_obj_request_submit(osdc, obj_request);
1869         if (ret)
1870                 goto out;
1871         ret = rbd_obj_request_wait(obj_request);
1872         if (ret)
1873                 goto out;
1874
1875         ret = obj_request->result;
1876         if (ret < 0)
1877                 goto out;
1878         ret = 0;
1879         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1880         if (version)
1881                 *version = obj_request->version;
1882 out:
1883         if (obj_request)
1884                 rbd_obj_request_put(obj_request);
1885         else
1886                 ceph_release_page_vector(pages, page_count);
1887
1888         return ret;
1889 }
1890
1891 static void rbd_request_fn(struct request_queue *q)
1892                 __releases(q->queue_lock) __acquires(q->queue_lock)
1893 {
1894         struct rbd_device *rbd_dev = q->queuedata;
1895         bool read_only = rbd_dev->mapping.read_only;
1896         struct request *rq;
1897         int result;
1898
1899         while ((rq = blk_fetch_request(q))) {
1900                 bool write_request = rq_data_dir(rq) == WRITE;
1901                 struct rbd_img_request *img_request;
1902                 u64 offset;
1903                 u64 length;
1904
1905                 /* Ignore any non-FS requests that filter through. */
1906
1907                 if (rq->cmd_type != REQ_TYPE_FS) {
1908                         dout("%s: non-fs request type %d\n", __func__,
1909                                 (int) rq->cmd_type);
1910                         __blk_end_request_all(rq, 0);
1911                         continue;
1912                 }
1913
1914                 /* Ignore/skip any zero-length requests */
1915
1916                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1917                 length = (u64) blk_rq_bytes(rq);
1918
1919                 if (!length) {
1920                         dout("%s: zero-length request\n", __func__);
1921                         __blk_end_request_all(rq, 0);
1922                         continue;
1923                 }
1924
1925                 spin_unlock_irq(q->queue_lock);
1926
1927                 /* Disallow writes to a read-only device */
1928
1929                 if (write_request) {
1930                         result = -EROFS;
1931                         if (read_only)
1932                                 goto end_request;
1933                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1934                 }
1935
1936                 /*
1937                  * Quit early if the mapped snapshot no longer
1938                  * exists.  It's still possible the snapshot will
1939                  * have disappeared by the time our request arrives
1940                  * at the osd, but there's no sense in sending it if
1941                  * we already know.
1942                  */
1943                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
1944                         dout("request for non-existent snapshot");
1945                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1946                         result = -ENXIO;
1947                         goto end_request;
1948                 }
1949
1950                 result = -EINVAL;
1951                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1952                         goto end_request;       /* Shouldn't happen */
1953
1954                 result = -ENOMEM;
1955                 img_request = rbd_img_request_create(rbd_dev, offset, length,
1956                                                         write_request);
1957                 if (!img_request)
1958                         goto end_request;
1959
1960                 img_request->rq = rq;
1961
1962                 result = rbd_img_request_fill_bio(img_request, rq->bio);
1963                 if (!result)
1964                         result = rbd_img_request_submit(img_request);
1965                 if (result)
1966                         rbd_img_request_put(img_request);
1967 end_request:
1968                 spin_lock_irq(q->queue_lock);
1969                 if (result < 0) {
1970                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
1971                                 write_request ? "write" : "read", result);
1972                         __blk_end_request_all(rq, result);
1973                 }
1974         }
1975 }
1976
1977 /*
1978  * a queue callback. Makes sure that we don't create a bio that spans across
1979  * multiple osd objects. One exception would be with a single page bios,
1980  * which we handle later at bio_chain_clone_range()
1981  */
1982 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1983                           struct bio_vec *bvec)
1984 {
1985         struct rbd_device *rbd_dev = q->queuedata;
1986         sector_t sector_offset;
1987         sector_t sectors_per_obj;
1988         sector_t obj_sector_offset;
1989         int ret;
1990
1991         /*
1992          * Find how far into its rbd object the partition-relative
1993          * bio start sector is to offset relative to the enclosing
1994          * device.
1995          */
1996         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1997         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1998         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1999
2000         /*
2001          * Compute the number of bytes from that offset to the end
2002          * of the object.  Account for what's already used by the bio.
2003          */
2004         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2005         if (ret > bmd->bi_size)
2006                 ret -= bmd->bi_size;
2007         else
2008                 ret = 0;
2009
2010         /*
2011          * Don't send back more than was asked for.  And if the bio
2012          * was empty, let the whole thing through because:  "Note
2013          * that a block device *must* allow a single page to be
2014          * added to an empty bio."
2015          */
2016         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2017         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2018                 ret = (int) bvec->bv_len;
2019
2020         return ret;
2021 }
2022
2023 static void rbd_free_disk(struct rbd_device *rbd_dev)
2024 {
2025         struct gendisk *disk = rbd_dev->disk;
2026
2027         if (!disk)
2028                 return;
2029
2030         if (disk->flags & GENHD_FL_UP)
2031                 del_gendisk(disk);
2032         if (disk->queue)
2033                 blk_cleanup_queue(disk->queue);
2034         put_disk(disk);
2035 }
2036
2037 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2038                                 const char *object_name,
2039                                 u64 offset, u64 length,
2040                                 char *buf, u64 *version)
2041
2042 {
2043         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2044         struct rbd_obj_request *obj_request;
2045         struct page **pages = NULL;
2046         u32 page_count;
2047         size_t size;
2048         int ret;
2049
2050         page_count = (u32) calc_pages_for(offset, length);
2051         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2052         if (IS_ERR(pages))
2053                 ret = PTR_ERR(pages);
2054
2055         ret = -ENOMEM;
2056         obj_request = rbd_obj_request_create(object_name, offset, length,
2057                                                         OBJ_REQUEST_PAGES);
2058         if (!obj_request)
2059                 goto out;
2060
2061         obj_request->pages = pages;
2062         obj_request->page_count = page_count;
2063
2064         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2065         if (!obj_request->osd_req)
2066                 goto out;
2067
2068         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2069                                         offset, length, 0, 0);
2070         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
2071                                         obj_request->pages,
2072                                         obj_request->length,
2073                                         obj_request->offset & ~PAGE_MASK,
2074                                         false, false);
2075         rbd_osd_req_format(obj_request, false);
2076
2077         ret = rbd_obj_request_submit(osdc, obj_request);
2078         if (ret)
2079                 goto out;
2080         ret = rbd_obj_request_wait(obj_request);
2081         if (ret)
2082                 goto out;
2083
2084         ret = obj_request->result;
2085         if (ret < 0)
2086                 goto out;
2087
2088         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2089         size = (size_t) obj_request->xferred;
2090         ceph_copy_from_page_vector(pages, buf, 0, size);
2091         rbd_assert(size <= (size_t) INT_MAX);
2092         ret = (int) size;
2093         if (version)
2094                 *version = obj_request->version;
2095 out:
2096         if (obj_request)
2097                 rbd_obj_request_put(obj_request);
2098         else
2099                 ceph_release_page_vector(pages, page_count);
2100
2101         return ret;
2102 }
2103
2104 /*
2105  * Read the complete header for the given rbd device.
2106  *
2107  * Returns a pointer to a dynamically-allocated buffer containing
2108  * the complete and validated header.  Caller can pass the address
2109  * of a variable that will be filled in with the version of the
2110  * header object at the time it was read.
2111  *
2112  * Returns a pointer-coded errno if a failure occurs.
2113  */
2114 static struct rbd_image_header_ondisk *
2115 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2116 {
2117         struct rbd_image_header_ondisk *ondisk = NULL;
2118         u32 snap_count = 0;
2119         u64 names_size = 0;
2120         u32 want_count;
2121         int ret;
2122
2123         /*
2124          * The complete header will include an array of its 64-bit
2125          * snapshot ids, followed by the names of those snapshots as
2126          * a contiguous block of NUL-terminated strings.  Note that
2127          * the number of snapshots could change by the time we read
2128          * it in, in which case we re-read it.
2129          */
2130         do {
2131                 size_t size;
2132
2133                 kfree(ondisk);
2134
2135                 size = sizeof (*ondisk);
2136                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2137                 size += names_size;
2138                 ondisk = kmalloc(size, GFP_KERNEL);
2139                 if (!ondisk)
2140                         return ERR_PTR(-ENOMEM);
2141
2142                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2143                                        0, size,
2144                                        (char *) ondisk, version);
2145                 if (ret < 0)
2146                         goto out_err;
2147                 if (WARN_ON((size_t) ret < size)) {
2148                         ret = -ENXIO;
2149                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2150                                 size, ret);
2151                         goto out_err;
2152                 }
2153                 if (!rbd_dev_ondisk_valid(ondisk)) {
2154                         ret = -ENXIO;
2155                         rbd_warn(rbd_dev, "invalid header");
2156                         goto out_err;
2157                 }
2158
2159                 names_size = le64_to_cpu(ondisk->snap_names_len);
2160                 want_count = snap_count;
2161                 snap_count = le32_to_cpu(ondisk->snap_count);
2162         } while (snap_count != want_count);
2163
2164         return ondisk;
2165
2166 out_err:
2167         kfree(ondisk);
2168
2169         return ERR_PTR(ret);
2170 }
2171
2172 /*
2173  * reload the ondisk the header
2174  */
2175 static int rbd_read_header(struct rbd_device *rbd_dev,
2176                            struct rbd_image_header *header)
2177 {
2178         struct rbd_image_header_ondisk *ondisk;
2179         u64 ver = 0;
2180         int ret;
2181
2182         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2183         if (IS_ERR(ondisk))
2184                 return PTR_ERR(ondisk);
2185         ret = rbd_header_from_disk(header, ondisk);
2186         if (ret >= 0)
2187                 header->obj_version = ver;
2188         kfree(ondisk);
2189
2190         return ret;
2191 }
2192
2193 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2194 {
2195         struct rbd_snap *snap;
2196         struct rbd_snap *next;
2197
2198         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2199                 rbd_remove_snap_dev(snap);
2200 }
2201
2202 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2203 {
2204         sector_t size;
2205
2206         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2207                 return;
2208
2209         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2210         dout("setting size to %llu sectors", (unsigned long long) size);
2211         rbd_dev->mapping.size = (u64) size;
2212         set_capacity(rbd_dev->disk, size);
2213 }
2214
2215 /*
2216  * only read the first part of the ondisk header, without the snaps info
2217  */
2218 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2219 {
2220         int ret;
2221         struct rbd_image_header h;
2222
2223         ret = rbd_read_header(rbd_dev, &h);
2224         if (ret < 0)
2225                 return ret;
2226
2227         down_write(&rbd_dev->header_rwsem);
2228
2229         /* Update image size, and check for resize of mapped image */
2230         rbd_dev->header.image_size = h.image_size;
2231         rbd_update_mapping_size(rbd_dev);
2232
2233         /* rbd_dev->header.object_prefix shouldn't change */
2234         kfree(rbd_dev->header.snap_sizes);
2235         kfree(rbd_dev->header.snap_names);
2236         /* osd requests may still refer to snapc */
2237         ceph_put_snap_context(rbd_dev->header.snapc);
2238
2239         if (hver)
2240                 *hver = h.obj_version;
2241         rbd_dev->header.obj_version = h.obj_version;
2242         rbd_dev->header.image_size = h.image_size;
2243         rbd_dev->header.snapc = h.snapc;
2244         rbd_dev->header.snap_names = h.snap_names;
2245         rbd_dev->header.snap_sizes = h.snap_sizes;
2246         /* Free the extra copy of the object prefix */
2247         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2248         kfree(h.object_prefix);
2249
2250         ret = rbd_dev_snaps_update(rbd_dev);
2251         if (!ret)
2252                 ret = rbd_dev_snaps_register(rbd_dev);
2253
2254         up_write(&rbd_dev->header_rwsem);
2255
2256         return ret;
2257 }
2258
2259 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2260 {
2261         int ret;
2262
2263         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2264         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2265         if (rbd_dev->image_format == 1)
2266                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2267         else
2268                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2269         mutex_unlock(&ctl_mutex);
2270
2271         return ret;
2272 }
2273
2274 static int rbd_init_disk(struct rbd_device *rbd_dev)
2275 {
2276         struct gendisk *disk;
2277         struct request_queue *q;
2278         u64 segment_size;
2279
2280         /* create gendisk info */
2281         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2282         if (!disk)
2283                 return -ENOMEM;
2284
2285         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2286                  rbd_dev->dev_id);
2287         disk->major = rbd_dev->major;
2288         disk->first_minor = 0;
2289         disk->fops = &rbd_bd_ops;
2290         disk->private_data = rbd_dev;
2291
2292         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2293         if (!q)
2294                 goto out_disk;
2295
2296         /* We use the default size, but let's be explicit about it. */
2297         blk_queue_physical_block_size(q, SECTOR_SIZE);
2298
2299         /* set io sizes to object size */
2300         segment_size = rbd_obj_bytes(&rbd_dev->header);
2301         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2302         blk_queue_max_segment_size(q, segment_size);
2303         blk_queue_io_min(q, segment_size);
2304         blk_queue_io_opt(q, segment_size);
2305
2306         blk_queue_merge_bvec(q, rbd_merge_bvec);
2307         disk->queue = q;
2308
2309         q->queuedata = rbd_dev;
2310
2311         rbd_dev->disk = disk;
2312
2313         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2314
2315         return 0;
2316 out_disk:
2317         put_disk(disk);
2318
2319         return -ENOMEM;
2320 }
2321
2322 /*
2323   sysfs
2324 */
2325
2326 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2327 {
2328         return container_of(dev, struct rbd_device, dev);
2329 }
2330
2331 static ssize_t rbd_size_show(struct device *dev,
2332                              struct device_attribute *attr, char *buf)
2333 {
2334         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2335         sector_t size;
2336
2337         down_read(&rbd_dev->header_rwsem);
2338         size = get_capacity(rbd_dev->disk);
2339         up_read(&rbd_dev->header_rwsem);
2340
2341         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2342 }
2343
2344 /*
2345  * Note this shows the features for whatever's mapped, which is not
2346  * necessarily the base image.
2347  */
2348 static ssize_t rbd_features_show(struct device *dev,
2349                              struct device_attribute *attr, char *buf)
2350 {
2351         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2352
2353         return sprintf(buf, "0x%016llx\n",
2354                         (unsigned long long) rbd_dev->mapping.features);
2355 }
2356
2357 static ssize_t rbd_major_show(struct device *dev,
2358                               struct device_attribute *attr, char *buf)
2359 {
2360         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2361
2362         return sprintf(buf, "%d\n", rbd_dev->major);
2363 }
2364
2365 static ssize_t rbd_client_id_show(struct device *dev,
2366                                   struct device_attribute *attr, char *buf)
2367 {
2368         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2369
2370         return sprintf(buf, "client%lld\n",
2371                         ceph_client_id(rbd_dev->rbd_client->client));
2372 }
2373
2374 static ssize_t rbd_pool_show(struct device *dev,
2375                              struct device_attribute *attr, char *buf)
2376 {
2377         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2378
2379         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2380 }
2381
2382 static ssize_t rbd_pool_id_show(struct device *dev,
2383                              struct device_attribute *attr, char *buf)
2384 {
2385         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2386
2387         return sprintf(buf, "%llu\n",
2388                 (unsigned long long) rbd_dev->spec->pool_id);
2389 }
2390
2391 static ssize_t rbd_name_show(struct device *dev,
2392                              struct device_attribute *attr, char *buf)
2393 {
2394         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2395
2396         if (rbd_dev->spec->image_name)
2397                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2398
2399         return sprintf(buf, "(unknown)\n");
2400 }
2401
2402 static ssize_t rbd_image_id_show(struct device *dev,
2403                              struct device_attribute *attr, char *buf)
2404 {
2405         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2406
2407         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2408 }
2409
2410 /*
2411  * Shows the name of the currently-mapped snapshot (or
2412  * RBD_SNAP_HEAD_NAME for the base image).
2413  */
2414 static ssize_t rbd_snap_show(struct device *dev,
2415                              struct device_attribute *attr,
2416                              char *buf)
2417 {
2418         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2419
2420         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2421 }
2422
2423 /*
2424  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2425  * for the parent image.  If there is no parent, simply shows
2426  * "(no parent image)".
2427  */
2428 static ssize_t rbd_parent_show(struct device *dev,
2429                              struct device_attribute *attr,
2430                              char *buf)
2431 {
2432         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2433         struct rbd_spec *spec = rbd_dev->parent_spec;
2434         int count;
2435         char *bufp = buf;
2436
2437         if (!spec)
2438                 return sprintf(buf, "(no parent image)\n");
2439
2440         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2441                         (unsigned long long) spec->pool_id, spec->pool_name);
2442         if (count < 0)
2443                 return count;
2444         bufp += count;
2445
2446         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2447                         spec->image_name ? spec->image_name : "(unknown)");
2448         if (count < 0)
2449                 return count;
2450         bufp += count;
2451
2452         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2453                         (unsigned long long) spec->snap_id, spec->snap_name);
2454         if (count < 0)
2455                 return count;
2456         bufp += count;
2457
2458         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2459         if (count < 0)
2460                 return count;
2461         bufp += count;
2462
2463         return (ssize_t) (bufp - buf);
2464 }
2465
2466 static ssize_t rbd_image_refresh(struct device *dev,
2467                                  struct device_attribute *attr,
2468                                  const char *buf,
2469                                  size_t size)
2470 {
2471         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2472         int ret;
2473
2474         ret = rbd_dev_refresh(rbd_dev, NULL);
2475
2476         return ret < 0 ? ret : size;
2477 }
2478
2479 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2480 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2481 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2482 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2483 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2484 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2485 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2486 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2487 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2488 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2489 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2490
2491 static struct attribute *rbd_attrs[] = {
2492         &dev_attr_size.attr,
2493         &dev_attr_features.attr,
2494         &dev_attr_major.attr,
2495         &dev_attr_client_id.attr,
2496         &dev_attr_pool.attr,
2497         &dev_attr_pool_id.attr,
2498         &dev_attr_name.attr,
2499         &dev_attr_image_id.attr,
2500         &dev_attr_current_snap.attr,
2501         &dev_attr_parent.attr,
2502         &dev_attr_refresh.attr,
2503         NULL
2504 };
2505
2506 static struct attribute_group rbd_attr_group = {
2507         .attrs = rbd_attrs,
2508 };
2509
2510 static const struct attribute_group *rbd_attr_groups[] = {
2511         &rbd_attr_group,
2512         NULL
2513 };
2514
2515 static void rbd_sysfs_dev_release(struct device *dev)
2516 {
2517 }
2518
2519 static struct device_type rbd_device_type = {
2520         .name           = "rbd",
2521         .groups         = rbd_attr_groups,
2522         .release        = rbd_sysfs_dev_release,
2523 };
2524
2525
2526 /*
2527   sysfs - snapshots
2528 */
2529
2530 static ssize_t rbd_snap_size_show(struct device *dev,
2531                                   struct device_attribute *attr,
2532                                   char *buf)
2533 {
2534         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2535
2536         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2537 }
2538
2539 static ssize_t rbd_snap_id_show(struct device *dev,
2540                                 struct device_attribute *attr,
2541                                 char *buf)
2542 {
2543         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2544
2545         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2546 }
2547
2548 static ssize_t rbd_snap_features_show(struct device *dev,
2549                                 struct device_attribute *attr,
2550                                 char *buf)
2551 {
2552         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2553
2554         return sprintf(buf, "0x%016llx\n",
2555                         (unsigned long long) snap->features);
2556 }
2557
2558 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2559 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2560 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2561
2562 static struct attribute *rbd_snap_attrs[] = {
2563         &dev_attr_snap_size.attr,
2564         &dev_attr_snap_id.attr,
2565         &dev_attr_snap_features.attr,
2566         NULL,
2567 };
2568
2569 static struct attribute_group rbd_snap_attr_group = {
2570         .attrs = rbd_snap_attrs,
2571 };
2572
2573 static void rbd_snap_dev_release(struct device *dev)
2574 {
2575         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2576         kfree(snap->name);
2577         kfree(snap);
2578 }
2579
2580 static const struct attribute_group *rbd_snap_attr_groups[] = {
2581         &rbd_snap_attr_group,
2582         NULL
2583 };
2584
2585 static struct device_type rbd_snap_device_type = {
2586         .groups         = rbd_snap_attr_groups,
2587         .release        = rbd_snap_dev_release,
2588 };
2589
2590 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2591 {
2592         kref_get(&spec->kref);
2593
2594         return spec;
2595 }
2596
2597 static void rbd_spec_free(struct kref *kref);
2598 static void rbd_spec_put(struct rbd_spec *spec)
2599 {
2600         if (spec)
2601                 kref_put(&spec->kref, rbd_spec_free);
2602 }
2603
2604 static struct rbd_spec *rbd_spec_alloc(void)
2605 {
2606         struct rbd_spec *spec;
2607
2608         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2609         if (!spec)
2610                 return NULL;
2611         kref_init(&spec->kref);
2612
2613         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2614
2615         return spec;
2616 }
2617
2618 static void rbd_spec_free(struct kref *kref)
2619 {
2620         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2621
2622         kfree(spec->pool_name);
2623         kfree(spec->image_id);
2624         kfree(spec->image_name);
2625         kfree(spec->snap_name);
2626         kfree(spec);
2627 }
2628
2629 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2630                                 struct rbd_spec *spec)
2631 {
2632         struct rbd_device *rbd_dev;
2633
2634         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2635         if (!rbd_dev)
2636                 return NULL;
2637
2638         spin_lock_init(&rbd_dev->lock);
2639         rbd_dev->flags = 0;
2640         INIT_LIST_HEAD(&rbd_dev->node);
2641         INIT_LIST_HEAD(&rbd_dev->snaps);
2642         init_rwsem(&rbd_dev->header_rwsem);
2643
2644         rbd_dev->spec = spec;
2645         rbd_dev->rbd_client = rbdc;
2646
2647         /* Initialize the layout used for all rbd requests */
2648
2649         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2650         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2651         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2652         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2653
2654         return rbd_dev;
2655 }
2656
2657 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2658 {
2659         rbd_spec_put(rbd_dev->parent_spec);
2660         kfree(rbd_dev->header_name);
2661         rbd_put_client(rbd_dev->rbd_client);
2662         rbd_spec_put(rbd_dev->spec);
2663         kfree(rbd_dev);
2664 }
2665
2666 static bool rbd_snap_registered(struct rbd_snap *snap)
2667 {
2668         bool ret = snap->dev.type == &rbd_snap_device_type;
2669         bool reg = device_is_registered(&snap->dev);
2670
2671         rbd_assert(!ret ^ reg);
2672
2673         return ret;
2674 }
2675
2676 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2677 {
2678         list_del(&snap->node);
2679         if (device_is_registered(&snap->dev))
2680                 device_unregister(&snap->dev);
2681 }
2682
2683 static int rbd_register_snap_dev(struct rbd_snap *snap,
2684                                   struct device *parent)
2685 {
2686         struct device *dev = &snap->dev;
2687         int ret;
2688
2689         dev->type = &rbd_snap_device_type;
2690         dev->parent = parent;
2691         dev->release = rbd_snap_dev_release;
2692         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2693         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2694
2695         ret = device_register(dev);
2696
2697         return ret;
2698 }
2699
2700 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2701                                                 const char *snap_name,
2702                                                 u64 snap_id, u64 snap_size,
2703                                                 u64 snap_features)
2704 {
2705         struct rbd_snap *snap;
2706         int ret;
2707
2708         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2709         if (!snap)
2710                 return ERR_PTR(-ENOMEM);
2711
2712         ret = -ENOMEM;
2713         snap->name = kstrdup(snap_name, GFP_KERNEL);
2714         if (!snap->name)
2715                 goto err;
2716
2717         snap->id = snap_id;
2718         snap->size = snap_size;
2719         snap->features = snap_features;
2720
2721         return snap;
2722
2723 err:
2724         kfree(snap->name);
2725         kfree(snap);
2726
2727         return ERR_PTR(ret);
2728 }
2729
2730 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2731                 u64 *snap_size, u64 *snap_features)
2732 {
2733         char *snap_name;
2734
2735         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2736
2737         *snap_size = rbd_dev->header.snap_sizes[which];
2738         *snap_features = 0;     /* No features for v1 */
2739
2740         /* Skip over names until we find the one we are looking for */
2741
2742         snap_name = rbd_dev->header.snap_names;
2743         while (which--)
2744                 snap_name += strlen(snap_name) + 1;
2745
2746         return snap_name;
2747 }
2748
2749 /*
2750  * Get the size and object order for an image snapshot, or if
2751  * snap_id is CEPH_NOSNAP, gets this information for the base
2752  * image.
2753  */
2754 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2755                                 u8 *order, u64 *snap_size)
2756 {
2757         __le64 snapid = cpu_to_le64(snap_id);
2758         int ret;
2759         struct {
2760                 u8 order;
2761                 __le64 size;
2762         } __attribute__ ((packed)) size_buf = { 0 };
2763
2764         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2765                                 "rbd", "get_size",
2766                                 (char *) &snapid, sizeof (snapid),
2767                                 (char *) &size_buf, sizeof (size_buf), NULL);
2768         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2769         if (ret < 0)
2770                 return ret;
2771
2772         *order = size_buf.order;
2773         *snap_size = le64_to_cpu(size_buf.size);
2774
2775         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2776                 (unsigned long long) snap_id, (unsigned int) *order,
2777                 (unsigned long long) *snap_size);
2778
2779         return 0;
2780 }
2781
2782 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2783 {
2784         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2785                                         &rbd_dev->header.obj_order,
2786                                         &rbd_dev->header.image_size);
2787 }
2788
2789 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2790 {
2791         void *reply_buf;
2792         int ret;
2793         void *p;
2794
2795         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2796         if (!reply_buf)
2797                 return -ENOMEM;
2798
2799         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2800                                 "rbd", "get_object_prefix",
2801                                 NULL, 0,
2802                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2803         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2804         if (ret < 0)
2805                 goto out;
2806
2807         p = reply_buf;
2808         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2809                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2810                                                 NULL, GFP_NOIO);
2811
2812         if (IS_ERR(rbd_dev->header.object_prefix)) {
2813                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2814                 rbd_dev->header.object_prefix = NULL;
2815         } else {
2816                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2817         }
2818
2819 out:
2820         kfree(reply_buf);
2821
2822         return ret;
2823 }
2824
2825 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2826                 u64 *snap_features)
2827 {
2828         __le64 snapid = cpu_to_le64(snap_id);
2829         struct {
2830                 __le64 features;
2831                 __le64 incompat;
2832         } features_buf = { 0 };
2833         u64 incompat;
2834         int ret;
2835
2836         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2837                                 "rbd", "get_features",
2838                                 (char *) &snapid, sizeof (snapid),
2839                                 (char *) &features_buf, sizeof (features_buf),
2840                                 NULL);
2841         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2842         if (ret < 0)
2843                 return ret;
2844
2845         incompat = le64_to_cpu(features_buf.incompat);
2846         if (incompat & ~RBD_FEATURES_ALL)
2847                 return -ENXIO;
2848
2849         *snap_features = le64_to_cpu(features_buf.features);
2850
2851         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2852                 (unsigned long long) snap_id,
2853                 (unsigned long long) *snap_features,
2854                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2855
2856         return 0;
2857 }
2858
2859 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2860 {
2861         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2862                                                 &rbd_dev->header.features);
2863 }
2864
2865 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2866 {
2867         struct rbd_spec *parent_spec;
2868         size_t size;
2869         void *reply_buf = NULL;
2870         __le64 snapid;
2871         void *p;
2872         void *end;
2873         char *image_id;
2874         u64 overlap;
2875         int ret;
2876
2877         parent_spec = rbd_spec_alloc();
2878         if (!parent_spec)
2879                 return -ENOMEM;
2880
2881         size = sizeof (__le64) +                                /* pool_id */
2882                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2883                 sizeof (__le64) +                               /* snap_id */
2884                 sizeof (__le64);                                /* overlap */
2885         reply_buf = kmalloc(size, GFP_KERNEL);
2886         if (!reply_buf) {
2887                 ret = -ENOMEM;
2888                 goto out_err;
2889         }
2890
2891         snapid = cpu_to_le64(CEPH_NOSNAP);
2892         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2893                                 "rbd", "get_parent",
2894                                 (char *) &snapid, sizeof (snapid),
2895                                 (char *) reply_buf, size, NULL);
2896         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2897         if (ret < 0)
2898                 goto out_err;
2899
2900         ret = -ERANGE;
2901         p = reply_buf;
2902         end = (char *) reply_buf + size;
2903         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2904         if (parent_spec->pool_id == CEPH_NOPOOL)
2905                 goto out;       /* No parent?  No problem. */
2906
2907         /* The ceph file layout needs to fit pool id in 32 bits */
2908
2909         ret = -EIO;
2910         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2911                 goto out;
2912
2913         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2914         if (IS_ERR(image_id)) {
2915                 ret = PTR_ERR(image_id);
2916                 goto out_err;
2917         }
2918         parent_spec->image_id = image_id;
2919         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2920         ceph_decode_64_safe(&p, end, overlap, out_err);
2921
2922         rbd_dev->parent_overlap = overlap;
2923         rbd_dev->parent_spec = parent_spec;
2924         parent_spec = NULL;     /* rbd_dev now owns this */
2925 out:
2926         ret = 0;
2927 out_err:
2928         kfree(reply_buf);
2929         rbd_spec_put(parent_spec);
2930
2931         return ret;
2932 }
2933
2934 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2935 {
2936         size_t image_id_size;
2937         char *image_id;
2938         void *p;
2939         void *end;
2940         size_t size;
2941         void *reply_buf = NULL;
2942         size_t len = 0;
2943         char *image_name = NULL;
2944         int ret;
2945
2946         rbd_assert(!rbd_dev->spec->image_name);
2947
2948         len = strlen(rbd_dev->spec->image_id);
2949         image_id_size = sizeof (__le32) + len;
2950         image_id = kmalloc(image_id_size, GFP_KERNEL);
2951         if (!image_id)
2952                 return NULL;
2953
2954         p = image_id;
2955         end = (char *) image_id + image_id_size;
2956         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2957
2958         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2959         reply_buf = kmalloc(size, GFP_KERNEL);
2960         if (!reply_buf)
2961                 goto out;
2962
2963         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2964                                 "rbd", "dir_get_name",
2965                                 image_id, image_id_size,
2966                                 (char *) reply_buf, size, NULL);
2967         if (ret < 0)
2968                 goto out;
2969         p = reply_buf;
2970         end = (char *) reply_buf + size;
2971         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2972         if (IS_ERR(image_name))
2973                 image_name = NULL;
2974         else
2975                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2976 out:
2977         kfree(reply_buf);
2978         kfree(image_id);
2979
2980         return image_name;
2981 }
2982
2983 /*
2984  * When a parent image gets probed, we only have the pool, image,
2985  * and snapshot ids but not the names of any of them.  This call
2986  * is made later to fill in those names.  It has to be done after
2987  * rbd_dev_snaps_update() has completed because some of the
2988  * information (in particular, snapshot name) is not available
2989  * until then.
2990  */
2991 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2992 {
2993         struct ceph_osd_client *osdc;
2994         const char *name;
2995         void *reply_buf = NULL;
2996         int ret;
2997
2998         if (rbd_dev->spec->pool_name)
2999                 return 0;       /* Already have the names */
3000
3001         /* Look up the pool name */
3002
3003         osdc = &rbd_dev->rbd_client->client->osdc;
3004         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3005         if (!name) {
3006                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3007                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3008                 return -EIO;
3009         }
3010
3011         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3012         if (!rbd_dev->spec->pool_name)
3013                 return -ENOMEM;
3014
3015         /* Fetch the image name; tolerate failure here */
3016
3017         name = rbd_dev_image_name(rbd_dev);
3018         if (name)
3019                 rbd_dev->spec->image_name = (char *) name;
3020         else
3021                 rbd_warn(rbd_dev, "unable to get image name");
3022
3023         /* Look up the snapshot name. */
3024
3025         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3026         if (!name) {
3027                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3028                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3029                 ret = -EIO;
3030                 goto out_err;
3031         }
3032         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3033         if(!rbd_dev->spec->snap_name)
3034                 goto out_err;
3035
3036         return 0;
3037 out_err:
3038         kfree(reply_buf);
3039         kfree(rbd_dev->spec->pool_name);
3040         rbd_dev->spec->pool_name = NULL;
3041
3042         return ret;
3043 }
3044
3045 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3046 {
3047         size_t size;
3048         int ret;
3049         void *reply_buf;
3050         void *p;
3051         void *end;
3052         u64 seq;
3053         u32 snap_count;
3054         struct ceph_snap_context *snapc;
3055         u32 i;
3056
3057         /*
3058          * We'll need room for the seq value (maximum snapshot id),
3059          * snapshot count, and array of that many snapshot ids.
3060          * For now we have a fixed upper limit on the number we're
3061          * prepared to receive.
3062          */
3063         size = sizeof (__le64) + sizeof (__le32) +
3064                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3065         reply_buf = kzalloc(size, GFP_KERNEL);
3066         if (!reply_buf)
3067                 return -ENOMEM;
3068
3069         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3070                                 "rbd", "get_snapcontext",
3071                                 NULL, 0,
3072                                 reply_buf, size, ver);
3073         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3074         if (ret < 0)
3075                 goto out;
3076
3077         ret = -ERANGE;
3078         p = reply_buf;
3079         end = (char *) reply_buf + size;
3080         ceph_decode_64_safe(&p, end, seq, out);
3081         ceph_decode_32_safe(&p, end, snap_count, out);
3082
3083         /*
3084          * Make sure the reported number of snapshot ids wouldn't go
3085          * beyond the end of our buffer.  But before checking that,
3086          * make sure the computed size of the snapshot context we
3087          * allocate is representable in a size_t.
3088          */
3089         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3090                                  / sizeof (u64)) {
3091                 ret = -EINVAL;
3092                 goto out;
3093         }
3094         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3095                 goto out;
3096
3097         size = sizeof (struct ceph_snap_context) +
3098                                 snap_count * sizeof (snapc->snaps[0]);
3099         snapc = kmalloc(size, GFP_KERNEL);
3100         if (!snapc) {
3101                 ret = -ENOMEM;
3102                 goto out;
3103         }
3104
3105         atomic_set(&snapc->nref, 1);
3106         snapc->seq = seq;
3107         snapc->num_snaps = snap_count;
3108         for (i = 0; i < snap_count; i++)
3109                 snapc->snaps[i] = ceph_decode_64(&p);
3110
3111         rbd_dev->header.snapc = snapc;
3112
3113         dout("  snap context seq = %llu, snap_count = %u\n",
3114                 (unsigned long long) seq, (unsigned int) snap_count);
3115
3116 out:
3117         kfree(reply_buf);
3118
3119         return 0;
3120 }
3121
3122 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3123 {
3124         size_t size;
3125         void *reply_buf;
3126         __le64 snap_id;
3127         int ret;
3128         void *p;
3129         void *end;
3130         char *snap_name;
3131
3132         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3133         reply_buf = kmalloc(size, GFP_KERNEL);
3134         if (!reply_buf)
3135                 return ERR_PTR(-ENOMEM);
3136
3137         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3138         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3139                                 "rbd", "get_snapshot_name",
3140                                 (char *) &snap_id, sizeof (snap_id),
3141                                 reply_buf, size, NULL);
3142         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3143         if (ret < 0)
3144                 goto out;
3145
3146         p = reply_buf;
3147         end = (char *) reply_buf + size;
3148         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3149         if (IS_ERR(snap_name)) {
3150                 ret = PTR_ERR(snap_name);
3151                 goto out;
3152         } else {
3153                 dout("  snap_id 0x%016llx snap_name = %s\n",
3154                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3155         }
3156         kfree(reply_buf);
3157
3158         return snap_name;
3159 out:
3160         kfree(reply_buf);
3161
3162         return ERR_PTR(ret);
3163 }
3164
3165 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3166                 u64 *snap_size, u64 *snap_features)
3167 {
3168         u64 snap_id;
3169         u8 order;
3170         int ret;
3171
3172         snap_id = rbd_dev->header.snapc->snaps[which];
3173         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3174         if (ret)
3175                 return ERR_PTR(ret);
3176         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3177         if (ret)
3178                 return ERR_PTR(ret);
3179
3180         return rbd_dev_v2_snap_name(rbd_dev, which);
3181 }
3182
3183 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3184                 u64 *snap_size, u64 *snap_features)
3185 {
3186         if (rbd_dev->image_format == 1)
3187                 return rbd_dev_v1_snap_info(rbd_dev, which,
3188                                         snap_size, snap_features);
3189         if (rbd_dev->image_format == 2)
3190                 return rbd_dev_v2_snap_info(rbd_dev, which,
3191                                         snap_size, snap_features);
3192         return ERR_PTR(-EINVAL);
3193 }
3194
3195 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3196 {
3197         int ret;
3198         __u8 obj_order;
3199
3200         down_write(&rbd_dev->header_rwsem);
3201
3202         /* Grab old order first, to see if it changes */
3203
3204         obj_order = rbd_dev->header.obj_order,
3205         ret = rbd_dev_v2_image_size(rbd_dev);
3206         if (ret)
3207                 goto out;
3208         if (rbd_dev->header.obj_order != obj_order) {
3209                 ret = -EIO;
3210                 goto out;
3211         }
3212         rbd_update_mapping_size(rbd_dev);
3213
3214         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3215         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3216         if (ret)
3217                 goto out;
3218         ret = rbd_dev_snaps_update(rbd_dev);
3219         dout("rbd_dev_snaps_update returned %d\n", ret);
3220         if (ret)
3221                 goto out;
3222         ret = rbd_dev_snaps_register(rbd_dev);
3223         dout("rbd_dev_snaps_register returned %d\n", ret);
3224 out:
3225         up_write(&rbd_dev->header_rwsem);
3226
3227         return ret;
3228 }
3229
3230 /*
3231  * Scan the rbd device's current snapshot list and compare it to the
3232  * newly-received snapshot context.  Remove any existing snapshots
3233  * not present in the new snapshot context.  Add a new snapshot for
3234  * any snaphots in the snapshot context not in the current list.
3235  * And verify there are no changes to snapshots we already know
3236  * about.
3237  *
3238  * Assumes the snapshots in the snapshot context are sorted by
3239  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3240  * are also maintained in that order.)
3241  */
3242 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3243 {
3244         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3245         const u32 snap_count = snapc->num_snaps;
3246         struct list_head *head = &rbd_dev->snaps;
3247         struct list_head *links = head->next;
3248         u32 index = 0;
3249
3250         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3251         while (index < snap_count || links != head) {
3252                 u64 snap_id;
3253                 struct rbd_snap *snap;
3254                 char *snap_name;
3255                 u64 snap_size = 0;
3256                 u64 snap_features = 0;
3257
3258                 snap_id = index < snap_count ? snapc->snaps[index]
3259                                              : CEPH_NOSNAP;
3260                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3261                                      : NULL;
3262                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3263
3264                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3265                         struct list_head *next = links->next;
3266
3267                         /*
3268                          * A previously-existing snapshot is not in
3269                          * the new snap context.
3270                          *
3271                          * If the now missing snapshot is the one the
3272                          * image is mapped to, clear its exists flag
3273                          * so we can avoid sending any more requests
3274                          * to it.
3275                          */
3276                         if (rbd_dev->spec->snap_id == snap->id)
3277                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3278                         rbd_remove_snap_dev(snap);
3279                         dout("%ssnap id %llu has been removed\n",
3280                                 rbd_dev->spec->snap_id == snap->id ?
3281                                                         "mapped " : "",
3282                                 (unsigned long long) snap->id);
3283
3284                         /* Done with this list entry; advance */
3285
3286                         links = next;
3287                         continue;
3288                 }
3289
3290                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3291                                         &snap_size, &snap_features);
3292                 if (IS_ERR(snap_name))
3293                         return PTR_ERR(snap_name);
3294
3295                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3296                         (unsigned long long) snap_id);
3297                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3298                         struct rbd_snap *new_snap;
3299
3300                         /* We haven't seen this snapshot before */
3301
3302                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3303                                         snap_id, snap_size, snap_features);
3304                         if (IS_ERR(new_snap)) {
3305                                 int err = PTR_ERR(new_snap);
3306
3307                                 dout("  failed to add dev, error %d\n", err);
3308
3309                                 return err;
3310                         }
3311
3312                         /* New goes before existing, or at end of list */
3313
3314                         dout("  added dev%s\n", snap ? "" : " at end\n");
3315                         if (snap)
3316                                 list_add_tail(&new_snap->node, &snap->node);
3317                         else
3318                                 list_add_tail(&new_snap->node, head);
3319                 } else {
3320                         /* Already have this one */
3321
3322                         dout("  already present\n");
3323
3324                         rbd_assert(snap->size == snap_size);
3325                         rbd_assert(!strcmp(snap->name, snap_name));
3326                         rbd_assert(snap->features == snap_features);
3327
3328                         /* Done with this list entry; advance */
3329
3330                         links = links->next;
3331                 }
3332
3333                 /* Advance to the next entry in the snapshot context */
3334
3335                 index++;
3336         }
3337         dout("%s: done\n", __func__);
3338
3339         return 0;
3340 }
3341
3342 /*
3343  * Scan the list of snapshots and register the devices for any that
3344  * have not already been registered.
3345  */
3346 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3347 {
3348         struct rbd_snap *snap;
3349         int ret = 0;
3350
3351         dout("%s:\n", __func__);
3352         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3353                 return -EIO;
3354
3355         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3356                 if (!rbd_snap_registered(snap)) {
3357                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3358                         if (ret < 0)
3359                                 break;
3360                 }
3361         }
3362         dout("%s: returning %d\n", __func__, ret);
3363
3364         return ret;
3365 }
3366
3367 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3368 {
3369         struct device *dev;
3370         int ret;
3371
3372         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3373
3374         dev = &rbd_dev->dev;
3375         dev->bus = &rbd_bus_type;
3376         dev->type = &rbd_device_type;
3377         dev->parent = &rbd_root_dev;
3378         dev->release = rbd_dev_release;
3379         dev_set_name(dev, "%d", rbd_dev->dev_id);
3380         ret = device_register(dev);
3381
3382         mutex_unlock(&ctl_mutex);
3383
3384         return ret;
3385 }
3386
3387 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3388 {
3389         device_unregister(&rbd_dev->dev);
3390 }
3391
3392 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3393
3394 /*
3395  * Get a unique rbd identifier for the given new rbd_dev, and add
3396  * the rbd_dev to the global list.  The minimum rbd id is 1.
3397  */
3398 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3399 {
3400         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3401
3402         spin_lock(&rbd_dev_list_lock);
3403         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3404         spin_unlock(&rbd_dev_list_lock);
3405         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3406                 (unsigned long long) rbd_dev->dev_id);
3407 }
3408
3409 /*
3410  * Remove an rbd_dev from the global list, and record that its
3411  * identifier is no longer in use.
3412  */
3413 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3414 {
3415         struct list_head *tmp;
3416         int rbd_id = rbd_dev->dev_id;
3417         int max_id;
3418
3419         rbd_assert(rbd_id > 0);
3420
3421         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3422                 (unsigned long long) rbd_dev->dev_id);
3423         spin_lock(&rbd_dev_list_lock);
3424         list_del_init(&rbd_dev->node);
3425
3426         /*
3427          * If the id being "put" is not the current maximum, there
3428          * is nothing special we need to do.
3429          */
3430         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3431                 spin_unlock(&rbd_dev_list_lock);
3432                 return;
3433         }
3434
3435         /*
3436          * We need to update the current maximum id.  Search the
3437          * list to find out what it is.  We're more likely to find
3438          * the maximum at the end, so search the list backward.
3439          */
3440         max_id = 0;
3441         list_for_each_prev(tmp, &rbd_dev_list) {
3442                 struct rbd_device *rbd_dev;
3443
3444                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3445                 if (rbd_dev->dev_id > max_id)
3446                         max_id = rbd_dev->dev_id;
3447         }
3448         spin_unlock(&rbd_dev_list_lock);
3449
3450         /*
3451          * The max id could have been updated by rbd_dev_id_get(), in
3452          * which case it now accurately reflects the new maximum.
3453          * Be careful not to overwrite the maximum value in that
3454          * case.
3455          */
3456         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3457         dout("  max dev id has been reset\n");
3458 }
3459
3460 /*
3461  * Skips over white space at *buf, and updates *buf to point to the
3462  * first found non-space character (if any). Returns the length of
3463  * the token (string of non-white space characters) found.  Note
3464  * that *buf must be terminated with '\0'.
3465  */
3466 static inline size_t next_token(const char **buf)
3467 {
3468         /*
3469         * These are the characters that produce nonzero for
3470         * isspace() in the "C" and "POSIX" locales.
3471         */
3472         const char *spaces = " \f\n\r\t\v";
3473
3474         *buf += strspn(*buf, spaces);   /* Find start of token */
3475
3476         return strcspn(*buf, spaces);   /* Return token length */
3477 }
3478
3479 /*
3480  * Finds the next token in *buf, and if the provided token buffer is
3481  * big enough, copies the found token into it.  The result, if
3482  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3483  * must be terminated with '\0' on entry.
3484  *
3485  * Returns the length of the token found (not including the '\0').
3486  * Return value will be 0 if no token is found, and it will be >=
3487  * token_size if the token would not fit.
3488  *
3489  * The *buf pointer will be updated to point beyond the end of the
3490  * found token.  Note that this occurs even if the token buffer is
3491  * too small to hold it.
3492  */
3493 static inline size_t copy_token(const char **buf,
3494                                 char *token,
3495                                 size_t token_size)
3496 {
3497         size_t len;
3498
3499         len = next_token(buf);
3500         if (len < token_size) {
3501                 memcpy(token, *buf, len);
3502                 *(token + len) = '\0';
3503         }
3504         *buf += len;
3505
3506         return len;
3507 }
3508
3509 /*
3510  * Finds the next token in *buf, dynamically allocates a buffer big
3511  * enough to hold a copy of it, and copies the token into the new
3512  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3513  * that a duplicate buffer is created even for a zero-length token.
3514  *
3515  * Returns a pointer to the newly-allocated duplicate, or a null
3516  * pointer if memory for the duplicate was not available.  If
3517  * the lenp argument is a non-null pointer, the length of the token
3518  * (not including the '\0') is returned in *lenp.
3519  *
3520  * If successful, the *buf pointer will be updated to point beyond
3521  * the end of the found token.
3522  *
3523  * Note: uses GFP_KERNEL for allocation.
3524  */
3525 static inline char *dup_token(const char **buf, size_t *lenp)
3526 {
3527         char *dup;
3528         size_t len;
3529
3530         len = next_token(buf);
3531         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3532         if (!dup)
3533                 return NULL;
3534         *(dup + len) = '\0';
3535         *buf += len;
3536
3537         if (lenp)
3538                 *lenp = len;
3539
3540         return dup;
3541 }
3542
3543 /*
3544  * Parse the options provided for an "rbd add" (i.e., rbd image
3545  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3546  * and the data written is passed here via a NUL-terminated buffer.
3547  * Returns 0 if successful or an error code otherwise.
3548  *
3549  * The information extracted from these options is recorded in
3550  * the other parameters which return dynamically-allocated
3551  * structures:
3552  *  ceph_opts
3553  *      The address of a pointer that will refer to a ceph options
3554  *      structure.  Caller must release the returned pointer using
3555  *      ceph_destroy_options() when it is no longer needed.
3556  *  rbd_opts
3557  *      Address of an rbd options pointer.  Fully initialized by
3558  *      this function; caller must release with kfree().
3559  *  spec
3560  *      Address of an rbd image specification pointer.  Fully
3561  *      initialized by this function based on parsed options.
3562  *      Caller must release with rbd_spec_put().
3563  *
3564  * The options passed take this form:
3565  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3566  * where:
3567  *  <mon_addrs>
3568  *      A comma-separated list of one or more monitor addresses.
3569  *      A monitor address is an ip address, optionally followed
3570  *      by a port number (separated by a colon).
3571  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3572  *  <options>
3573  *      A comma-separated list of ceph and/or rbd options.
3574  *  <pool_name>
3575  *      The name of the rados pool containing the rbd image.
3576  *  <image_name>
3577  *      The name of the image in that pool to map.
3578  *  <snap_id>
3579  *      An optional snapshot id.  If provided, the mapping will
3580  *      present data from the image at the time that snapshot was
3581  *      created.  The image head is used if no snapshot id is
3582  *      provided.  Snapshot mappings are always read-only.
3583  */
3584 static int rbd_add_parse_args(const char *buf,
3585                                 struct ceph_options **ceph_opts,
3586                                 struct rbd_options **opts,
3587                                 struct rbd_spec **rbd_spec)
3588 {
3589         size_t len;
3590         char *options;
3591         const char *mon_addrs;
3592         size_t mon_addrs_size;
3593         struct rbd_spec *spec = NULL;
3594         struct rbd_options *rbd_opts = NULL;
3595         struct ceph_options *copts;
3596         int ret;
3597
3598         /* The first four tokens are required */
3599
3600         len = next_token(&buf);
3601         if (!len) {
3602                 rbd_warn(NULL, "no monitor address(es) provided");
3603                 return -EINVAL;
3604         }
3605         mon_addrs = buf;
3606         mon_addrs_size = len + 1;
3607         buf += len;
3608
3609         ret = -EINVAL;
3610         options = dup_token(&buf, NULL);
3611         if (!options)
3612                 return -ENOMEM;
3613         if (!*options) {
3614                 rbd_warn(NULL, "no options provided");
3615                 goto out_err;
3616         }
3617
3618         spec = rbd_spec_alloc();
3619         if (!spec)
3620                 goto out_mem;
3621
3622         spec->pool_name = dup_token(&buf, NULL);
3623         if (!spec->pool_name)
3624                 goto out_mem;
3625         if (!*spec->pool_name) {
3626                 rbd_warn(NULL, "no pool name provided");
3627                 goto out_err;
3628         }
3629
3630         spec->image_name = dup_token(&buf, NULL);
3631         if (!spec->image_name)
3632                 goto out_mem;
3633         if (!*spec->image_name) {
3634                 rbd_warn(NULL, "no image name provided");
3635                 goto out_err;
3636         }
3637
3638         /*
3639          * Snapshot name is optional; default is to use "-"
3640          * (indicating the head/no snapshot).
3641          */
3642         len = next_token(&buf);
3643         if (!len) {
3644                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3645                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3646         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3647                 ret = -ENAMETOOLONG;
3648                 goto out_err;
3649         }
3650         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3651         if (!spec->snap_name)
3652                 goto out_mem;
3653         *(spec->snap_name + len) = '\0';
3654
3655         /* Initialize all rbd options to the defaults */
3656
3657         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3658         if (!rbd_opts)
3659                 goto out_mem;
3660
3661         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3662
3663         copts = ceph_parse_options(options, mon_addrs,
3664                                         mon_addrs + mon_addrs_size - 1,
3665                                         parse_rbd_opts_token, rbd_opts);
3666         if (IS_ERR(copts)) {
3667                 ret = PTR_ERR(copts);
3668                 goto out_err;
3669         }
3670         kfree(options);
3671
3672         *ceph_opts = copts;
3673         *opts = rbd_opts;
3674         *rbd_spec = spec;
3675
3676         return 0;
3677 out_mem:
3678         ret = -ENOMEM;
3679 out_err:
3680         kfree(rbd_opts);
3681         rbd_spec_put(spec);
3682         kfree(options);
3683
3684         return ret;
3685 }
3686
3687 /*
3688  * An rbd format 2 image has a unique identifier, distinct from the
3689  * name given to it by the user.  Internally, that identifier is
3690  * what's used to specify the names of objects related to the image.
3691  *
3692  * A special "rbd id" object is used to map an rbd image name to its
3693  * id.  If that object doesn't exist, then there is no v2 rbd image
3694  * with the supplied name.
3695  *
3696  * This function will record the given rbd_dev's image_id field if
3697  * it can be determined, and in that case will return 0.  If any
3698  * errors occur a negative errno will be returned and the rbd_dev's
3699  * image_id field will be unchanged (and should be NULL).
3700  */
3701 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3702 {
3703         int ret;
3704         size_t size;
3705         char *object_name;
3706         void *response;
3707         void *p;
3708
3709         /*
3710          * When probing a parent image, the image id is already
3711          * known (and the image name likely is not).  There's no
3712          * need to fetch the image id again in this case.
3713          */
3714         if (rbd_dev->spec->image_id)
3715                 return 0;
3716
3717         /*
3718          * First, see if the format 2 image id file exists, and if
3719          * so, get the image's persistent id from it.
3720          */
3721         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3722         object_name = kmalloc(size, GFP_NOIO);
3723         if (!object_name)
3724                 return -ENOMEM;
3725         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3726         dout("rbd id object name is %s\n", object_name);
3727
3728         /* Response will be an encoded string, which includes a length */
3729
3730         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3731         response = kzalloc(size, GFP_NOIO);
3732         if (!response) {
3733                 ret = -ENOMEM;
3734                 goto out;
3735         }
3736
3737         ret = rbd_obj_method_sync(rbd_dev, object_name,
3738                                 "rbd", "get_id",
3739                                 NULL, 0,
3740                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3741         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3742         if (ret < 0)
3743                 goto out;
3744
3745         p = response;
3746         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3747                                                 p + RBD_IMAGE_ID_LEN_MAX,
3748                                                 NULL, GFP_NOIO);
3749         if (IS_ERR(rbd_dev->spec->image_id)) {
3750                 ret = PTR_ERR(rbd_dev->spec->image_id);
3751                 rbd_dev->spec->image_id = NULL;
3752         } else {
3753                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3754         }
3755 out:
3756         kfree(response);
3757         kfree(object_name);
3758
3759         return ret;
3760 }
3761
3762 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3763 {
3764         int ret;
3765         size_t size;
3766
3767         /* Version 1 images have no id; empty string is used */
3768
3769         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3770         if (!rbd_dev->spec->image_id)
3771                 return -ENOMEM;
3772
3773         /* Record the header object name for this rbd image. */
3774
3775         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3776         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3777         if (!rbd_dev->header_name) {
3778                 ret = -ENOMEM;
3779                 goto out_err;
3780         }
3781         sprintf(rbd_dev->header_name, "%s%s",
3782                 rbd_dev->spec->image_name, RBD_SUFFIX);
3783
3784         /* Populate rbd image metadata */
3785
3786         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3787         if (ret < 0)
3788                 goto out_err;
3789
3790         /* Version 1 images have no parent (no layering) */
3791
3792         rbd_dev->parent_spec = NULL;
3793         rbd_dev->parent_overlap = 0;
3794
3795         rbd_dev->image_format = 1;
3796
3797         dout("discovered version 1 image, header name is %s\n",
3798                 rbd_dev->header_name);
3799
3800         return 0;
3801
3802 out_err:
3803         kfree(rbd_dev->header_name);
3804         rbd_dev->header_name = NULL;
3805         kfree(rbd_dev->spec->image_id);
3806         rbd_dev->spec->image_id = NULL;
3807
3808         return ret;
3809 }
3810
3811 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3812 {
3813         size_t size;
3814         int ret;
3815         u64 ver = 0;
3816
3817         /*
3818          * Image id was filled in by the caller.  Record the header
3819          * object name for this rbd image.
3820          */
3821         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3822         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3823         if (!rbd_dev->header_name)
3824                 return -ENOMEM;
3825         sprintf(rbd_dev->header_name, "%s%s",
3826                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3827
3828         /* Get the size and object order for the image */
3829
3830         ret = rbd_dev_v2_image_size(rbd_dev);
3831         if (ret < 0)
3832                 goto out_err;
3833
3834         /* Get the object prefix (a.k.a. block_name) for the image */
3835
3836         ret = rbd_dev_v2_object_prefix(rbd_dev);
3837         if (ret < 0)
3838                 goto out_err;
3839
3840         /* Get the and check features for the image */
3841
3842         ret = rbd_dev_v2_features(rbd_dev);
3843         if (ret < 0)
3844                 goto out_err;
3845
3846         /* If the image supports layering, get the parent info */
3847
3848         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3849                 ret = rbd_dev_v2_parent_info(rbd_dev);
3850                 if (ret < 0)
3851                         goto out_err;
3852         }
3853
3854         /* crypto and compression type aren't (yet) supported for v2 images */
3855
3856         rbd_dev->header.crypt_type = 0;
3857         rbd_dev->header.comp_type = 0;
3858
3859         /* Get the snapshot context, plus the header version */
3860
3861         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3862         if (ret)
3863                 goto out_err;
3864         rbd_dev->header.obj_version = ver;
3865
3866         rbd_dev->image_format = 2;
3867
3868         dout("discovered version 2 image, header name is %s\n",
3869                 rbd_dev->header_name);
3870
3871         return 0;
3872 out_err:
3873         rbd_dev->parent_overlap = 0;
3874         rbd_spec_put(rbd_dev->parent_spec);
3875         rbd_dev->parent_spec = NULL;
3876         kfree(rbd_dev->header_name);
3877         rbd_dev->header_name = NULL;
3878         kfree(rbd_dev->header.object_prefix);
3879         rbd_dev->header.object_prefix = NULL;
3880
3881         return ret;
3882 }
3883
3884 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3885 {
3886         int ret;
3887
3888         /* no need to lock here, as rbd_dev is not registered yet */
3889         ret = rbd_dev_snaps_update(rbd_dev);
3890         if (ret)
3891                 return ret;
3892
3893         ret = rbd_dev_probe_update_spec(rbd_dev);
3894         if (ret)
3895                 goto err_out_snaps;
3896
3897         ret = rbd_dev_set_mapping(rbd_dev);
3898         if (ret)
3899                 goto err_out_snaps;
3900
3901         /* generate unique id: find highest unique id, add one */
3902         rbd_dev_id_get(rbd_dev);
3903
3904         /* Fill in the device name, now that we have its id. */
3905         BUILD_BUG_ON(DEV_NAME_LEN
3906                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3907         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3908
3909         /* Get our block major device number. */
3910
3911         ret = register_blkdev(0, rbd_dev->name);
3912         if (ret < 0)
3913                 goto err_out_id;
3914         rbd_dev->major = ret;
3915
3916         /* Set up the blkdev mapping. */
3917
3918         ret = rbd_init_disk(rbd_dev);
3919         if (ret)
3920                 goto err_out_blkdev;
3921
3922         ret = rbd_bus_add_dev(rbd_dev);
3923         if (ret)
3924                 goto err_out_disk;
3925
3926         /*
3927          * At this point cleanup in the event of an error is the job
3928          * of the sysfs code (initiated by rbd_bus_del_dev()).
3929          */
3930         down_write(&rbd_dev->header_rwsem);
3931         ret = rbd_dev_snaps_register(rbd_dev);
3932         up_write(&rbd_dev->header_rwsem);
3933         if (ret)
3934                 goto err_out_bus;
3935
3936         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3937         if (ret)
3938                 goto err_out_bus;
3939
3940         /* Everything's ready.  Announce the disk to the world. */
3941
3942         add_disk(rbd_dev->disk);
3943
3944         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3945                 (unsigned long long) rbd_dev->mapping.size);
3946
3947         return ret;
3948 err_out_bus:
3949         /* this will also clean up rest of rbd_dev stuff */
3950
3951         rbd_bus_del_dev(rbd_dev);
3952
3953         return ret;
3954 err_out_disk:
3955         rbd_free_disk(rbd_dev);
3956 err_out_blkdev:
3957         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3958 err_out_id:
3959         rbd_dev_id_put(rbd_dev);
3960 err_out_snaps:
3961         rbd_remove_all_snaps(rbd_dev);
3962
3963         return ret;
3964 }
3965
3966 /*
3967  * Probe for the existence of the header object for the given rbd
3968  * device.  For format 2 images this includes determining the image
3969  * id.
3970  */
3971 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3972 {
3973         int ret;
3974
3975         /*
3976          * Get the id from the image id object.  If it's not a
3977          * format 2 image, we'll get ENOENT back, and we'll assume
3978          * it's a format 1 image.
3979          */
3980         ret = rbd_dev_image_id(rbd_dev);
3981         if (ret)
3982                 ret = rbd_dev_v1_probe(rbd_dev);
3983         else
3984                 ret = rbd_dev_v2_probe(rbd_dev);
3985         if (ret) {
3986                 dout("probe failed, returning %d\n", ret);
3987
3988                 return ret;
3989         }
3990
3991         ret = rbd_dev_probe_finish(rbd_dev);
3992         if (ret)
3993                 rbd_header_free(&rbd_dev->header);
3994
3995         return ret;
3996 }
3997
3998 static ssize_t rbd_add(struct bus_type *bus,
3999                        const char *buf,
4000                        size_t count)
4001 {
4002         struct rbd_device *rbd_dev = NULL;
4003         struct ceph_options *ceph_opts = NULL;
4004         struct rbd_options *rbd_opts = NULL;
4005         struct rbd_spec *spec = NULL;
4006         struct rbd_client *rbdc;
4007         struct ceph_osd_client *osdc;
4008         int rc = -ENOMEM;
4009
4010         if (!try_module_get(THIS_MODULE))
4011                 return -ENODEV;
4012
4013         /* parse add command */
4014         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4015         if (rc < 0)
4016                 goto err_out_module;
4017
4018         rbdc = rbd_get_client(ceph_opts);
4019         if (IS_ERR(rbdc)) {
4020                 rc = PTR_ERR(rbdc);
4021                 goto err_out_args;
4022         }
4023         ceph_opts = NULL;       /* rbd_dev client now owns this */
4024
4025         /* pick the pool */
4026         osdc = &rbdc->client->osdc;
4027         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4028         if (rc < 0)
4029                 goto err_out_client;
4030         spec->pool_id = (u64) rc;
4031
4032         /* The ceph file layout needs to fit pool id in 32 bits */
4033
4034         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4035                 rc = -EIO;
4036                 goto err_out_client;
4037         }
4038
4039         rbd_dev = rbd_dev_create(rbdc, spec);
4040         if (!rbd_dev)
4041                 goto err_out_client;
4042         rbdc = NULL;            /* rbd_dev now owns this */
4043         spec = NULL;            /* rbd_dev now owns this */
4044
4045         rbd_dev->mapping.read_only = rbd_opts->read_only;
4046         kfree(rbd_opts);
4047         rbd_opts = NULL;        /* done with this */
4048
4049         rc = rbd_dev_probe(rbd_dev);
4050         if (rc < 0)
4051                 goto err_out_rbd_dev;
4052
4053         return count;
4054 err_out_rbd_dev:
4055         rbd_dev_destroy(rbd_dev);
4056 err_out_client:
4057         rbd_put_client(rbdc);
4058 err_out_args:
4059         if (ceph_opts)
4060                 ceph_destroy_options(ceph_opts);
4061         kfree(rbd_opts);
4062         rbd_spec_put(spec);
4063 err_out_module:
4064         module_put(THIS_MODULE);
4065
4066         dout("Error adding device %s\n", buf);
4067
4068         return (ssize_t) rc;
4069 }
4070
4071 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4072 {
4073         struct list_head *tmp;
4074         struct rbd_device *rbd_dev;
4075
4076         spin_lock(&rbd_dev_list_lock);
4077         list_for_each(tmp, &rbd_dev_list) {
4078                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4079                 if (rbd_dev->dev_id == dev_id) {
4080                         spin_unlock(&rbd_dev_list_lock);
4081                         return rbd_dev;
4082                 }
4083         }
4084         spin_unlock(&rbd_dev_list_lock);
4085         return NULL;
4086 }
4087
4088 static void rbd_dev_release(struct device *dev)
4089 {
4090         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4091
4092         if (rbd_dev->watch_event)
4093                 rbd_dev_header_watch_sync(rbd_dev, 0);
4094
4095         /* clean up and free blkdev */
4096         rbd_free_disk(rbd_dev);
4097         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4098
4099         /* release allocated disk header fields */
4100         rbd_header_free(&rbd_dev->header);
4101
4102         /* done with the id, and with the rbd_dev */
4103         rbd_dev_id_put(rbd_dev);
4104         rbd_assert(rbd_dev->rbd_client != NULL);
4105         rbd_dev_destroy(rbd_dev);
4106
4107         /* release module ref */
4108         module_put(THIS_MODULE);
4109 }
4110
4111 static ssize_t rbd_remove(struct bus_type *bus,
4112                           const char *buf,
4113                           size_t count)
4114 {
4115         struct rbd_device *rbd_dev = NULL;
4116         int target_id, rc;
4117         unsigned long ul;
4118         int ret = count;
4119
4120         rc = strict_strtoul(buf, 10, &ul);
4121         if (rc)
4122                 return rc;
4123
4124         /* convert to int; abort if we lost anything in the conversion */
4125         target_id = (int) ul;
4126         if (target_id != ul)
4127                 return -EINVAL;
4128
4129         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4130
4131         rbd_dev = __rbd_get_dev(target_id);
4132         if (!rbd_dev) {
4133                 ret = -ENOENT;
4134                 goto done;
4135         }
4136
4137         spin_lock_irq(&rbd_dev->lock);
4138         if (rbd_dev->open_count)
4139                 ret = -EBUSY;
4140         else
4141                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4142         spin_unlock_irq(&rbd_dev->lock);
4143         if (ret < 0)
4144                 goto done;
4145
4146         rbd_remove_all_snaps(rbd_dev);
4147         rbd_bus_del_dev(rbd_dev);
4148
4149 done:
4150         mutex_unlock(&ctl_mutex);
4151
4152         return ret;
4153 }
4154
4155 /*
4156  * create control files in sysfs
4157  * /sys/bus/rbd/...
4158  */
4159 static int rbd_sysfs_init(void)
4160 {
4161         int ret;
4162
4163         ret = device_register(&rbd_root_dev);
4164         if (ret < 0)
4165                 return ret;
4166
4167         ret = bus_register(&rbd_bus_type);
4168         if (ret < 0)
4169                 device_unregister(&rbd_root_dev);
4170
4171         return ret;
4172 }
4173
4174 static void rbd_sysfs_cleanup(void)
4175 {
4176         bus_unregister(&rbd_bus_type);
4177         device_unregister(&rbd_root_dev);
4178 }
4179
4180 static int __init rbd_init(void)
4181 {
4182         int rc;
4183
4184         if (!libceph_compatible(NULL)) {
4185                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4186
4187                 return -EINVAL;
4188         }
4189         rc = rbd_sysfs_init();
4190         if (rc)
4191                 return rc;
4192         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4193         return 0;
4194 }
4195
4196 static void __exit rbd_exit(void)
4197 {
4198         rbd_sysfs_cleanup();
4199 }
4200
4201 module_init(rbd_init);
4202 module_exit(rbd_exit);
4203
4204 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4205 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4206 MODULE_DESCRIPTION("rados block device");
4207
4208 /* following authorship retained from original osdblk.c */
4209 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4210
4211 MODULE_LICENSE("GPL");