]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
libceph: combine initializing and setting osd data
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* Short and long forms of the driver name */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/*
 * A snapshot name is limited so that it still fits in NAME_MAX
 * characters once RBD_SNAP_DEV_NAME_PREFIX is prepended.  The "- 1"
 * removes the prefix's terminating NUL from the sizeof.
 */
#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

/*
 * Snapshot name standing for "no snapshot": rbd_snap_name() returns
 * it for CEPH_NOSNAP and rbd_dev_set_mapping() maps it back to
 * CEPH_NOSNAP.
 */
#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/*
 * Features supported by this (client software) implementation.
 * The mask is currently empty, so it does not include
 * RBD_FEATURE_LAYERING.
 */

#define RBD_FEATURES_ALL          (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 *
 * MAX_INT_FORMAT_WIDTH budgets 2.5 characters per byte of int plus
 * one more; for a 4-byte int that is 11, the length of "-2147483648".
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
90
/*
 * block device image metadata (in-memory version)
 *
 * Filled in from the on-disk header by rbd_header_from_disk() and
 * released by rbd_header_free().
 */
struct rbd_image_header {
        /* These five fields never change for a given rbd image */
        char *object_prefix;    /* NUL-terminated prefix of object names */
        u64 features;           /* feature bits; 0 for v1 images */
        __u8 obj_order;         /* log2 of the object (segment) size */
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;        /* snapshot ids and seq */
        char *snap_names;       /* copy of the on-disk snapshot name data */
        u64 *snap_sizes;        /* image size of each snapshot */

        u64 obj_version;
};
110
111 /*
112  * An rbd image specification.
113  *
114  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
115  * identify an image.  Each rbd_dev structure includes a pointer to
116  * an rbd_spec structure that encapsulates this identity.
117  *
118  * Each of the id's in an rbd_spec has an associated name.  For a
119  * user-mapped image, the names are supplied and the id's associated
120  * with them are looked up.  For a layered image, a parent image is
121  * defined by the tuple, and the names are looked up.
122  *
123  * An rbd_dev structure contains a parent_spec pointer which is
124  * non-null if the image it represents is a child in a layered
125  * image.  This pointer will refer to the rbd_spec structure used
126  * by the parent rbd_dev for its own identity (i.e., the structure
127  * is shared between the parent and child).
128  *
129  * Since these structures are populated once, during the discovery
130  * phase of image construction, they are effectively immutable so
131  * we make no effort to synchronize access to them.
132  *
133  * Note that code herein does not assume the image name is known (it
134  * could be a null pointer).
135  */
/* Identity of an image: three ids, each paired with its name. */
struct rbd_spec {
        u64             pool_id;
        char            *pool_name;

        char            *image_id;
        char            *image_name;    /* may be a null pointer */

        u64             snap_id;        /* CEPH_NOSNAP when mapping the head */
        char            *snap_name;

        struct kref     kref;           /* reference count */
};
148
/*
 * an instance of the client.  multiple devices may share an rbd client.
 *
 * Instances are linked on rbd_client_list (guarded by
 * rbd_client_list_lock) and are reference counted through kref;
 * rbd_client_release() runs when the last reference is dropped.
 */
struct rbd_client {
        struct ceph_client      *client;        /* underlying ceph client */
        struct kref             kref;
        struct list_head        node;           /* entry in rbd_client_list */
};
157
/* Callback type stored in the "callback" field of an image request */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

/* Callback type stored in the "callback" field of an object request */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/*
 * Kind of data attached to an object request: none, a bio list, or
 * an array of pages (see the union in struct rbd_obj_request).
 */
enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
169
/*
 * A request against a single object, identified by object_name and
 * covering "length" bytes starting at "offset" within that object.
 * An object request may belong to an image request (img_request),
 * in which case it is linked on that request's obj_requests list.
 */
struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */

        struct rbd_img_request  *img_request;
        struct list_head        links;          /* img_request->obj_requests */
        u32                     which;          /* posn image request list */

        /* presumably "type" selects the valid union member -- confirm */
        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;
        atomic_t                done;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;           /* reference count */
};
200
/*
 * A request against an image, covering "length" bytes starting at
 * image byte "offset".  It carries a list of object requests
 * (obj_requests, obj_request_count entries).
 */
struct rbd_img_request {
        struct request          *rq;
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        bool                    write_request;  /* false for read */
        /* which member applies depends on write_request */
        union {
                struct ceph_snap_context *snapc;        /* for writes */
                u64             snap_id;                /* for reads */
        };
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;           /* reference count */
};
220
/*
 * Iterators over the object requests of image request "ireq".
 *
 *   for_each_obj_request       visits every entry from the list head.
 *   for_each_obj_request_from  continues from the current value of oreq.
 *   for_each_obj_request_safe  uses "n" as lookahead so oreq may be
 *                              unlinked inside the loop; note that it
 *                              walks the list in REVERSE order.
 */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
227
/* One snapshot of an image; entries are linked on rbd_device->snaps. */
struct rbd_snap {
        struct  device          dev;
        const char              *name;
        u64                     size;           /* copied to mapping.size */
        struct list_head        node;           /* entry in rbd_dev->snaps */
        u64                     id;             /* snapshot id */
        u64                     features;       /* copied to mapping.features */
};
236
/*
 * What is actually mapped.  rbd_dev_set_mapping() takes size and
 * features from the image header (head) or from the selected
 * snapshot, and forces read_only for a snapshot.
 */
struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};
242
/*
 * a single device
 *
 * Per-mapping state: block device identity, the ceph client in use,
 * the in-memory image header, the image spec (and the parent's spec
 * for a layered image), the current mapping, and the snapshot list.
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;   /* non-null for a layered child */
        u64                     parent_overlap;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};
287
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 *
 * The enumerators are bit numbers, used with set_bit()/test_bit()
 * on rbd_dev->flags.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};
299
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* Guarded by rbd_client_list_lock; entries are struct rbd_client */
static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for functions defined later in the file */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);

/*
 * Bus attributes "add" and "remove".  Both are write-only for the
 * owner (S_IWUSR, no show method); writes are handled by rbd_add()
 * and rbd_remove() respectively.
 */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

/* The "rbd" bus, carrying the attributes above */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
329
/*
 * Release callback for rbd_root_dev.  Intentionally empty:
 * rbd_root_dev is a static object, so there is nothing to free.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Statically allocated root device named "rbd" */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
338
339 static __printf(2, 3)
340 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
341 {
342         struct va_format vaf;
343         va_list args;
344
345         va_start(args, fmt);
346         vaf.fmt = fmt;
347         vaf.va = &args;
348
349         if (!rbd_dev)
350                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
351         else if (rbd_dev->disk)
352                 printk(KERN_WARNING "%s: %s: %pV\n",
353                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
354         else if (rbd_dev->spec && rbd_dev->spec->image_name)
355                 printk(KERN_WARNING "%s: image %s: %pV\n",
356                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
357         else if (rbd_dev->spec && rbd_dev->spec->image_id)
358                 printk(KERN_WARNING "%s: id %s: %pV\n",
359                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
360         else    /* punt */
361                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
362                         RBD_DRV_NAME, rbd_dev, &vaf);
363         va_end(args);
364 }
365
/*
 * rbd_assert(expr) -- driver-local assertion.
 *
 * With RBD_DEBUG defined, a false expr logs the function name, line
 * number and the expression text at KERN_ERR and then calls BUG().
 * Without RBD_DEBUG the macro expands to nothing and expr is NOT
 * evaluated, so expr must not have side effects.
 *
 * The debug form is wrapped in do { } while (0) so that
 * "rbd_assert(x);" is a single statement and can be used as the
 * body of an unbraced if/else.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
378
/* Forward declarations; both functions are defined later in the file */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
381
382 static int rbd_open(struct block_device *bdev, fmode_t mode)
383 {
384         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
385         bool removing = false;
386
387         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
388                 return -EROFS;
389
390         spin_lock_irq(&rbd_dev->lock);
391         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
392                 removing = true;
393         else
394                 rbd_dev->open_count++;
395         spin_unlock_irq(&rbd_dev->lock);
396         if (removing)
397                 return -ENOENT;
398
399         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
400         (void) get_device(&rbd_dev->dev);
401         set_device_ro(bdev, rbd_dev->mapping.read_only);
402         mutex_unlock(&ctl_mutex);
403
404         return 0;
405 }
406
407 static int rbd_release(struct gendisk *disk, fmode_t mode)
408 {
409         struct rbd_device *rbd_dev = disk->private_data;
410         unsigned long open_count_before;
411
412         spin_lock_irq(&rbd_dev->lock);
413         open_count_before = rbd_dev->open_count--;
414         spin_unlock_irq(&rbd_dev->lock);
415         rbd_assert(open_count_before > 0);
416
417         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
418         put_device(&rbd_dev->dev);
419         mutex_unlock(&ctl_mutex);
420
421         return 0;
422 }
423
/* Block device operations: only open and release are provided */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
429
430 /*
431  * Initialize an rbd client instance.
432  * We own *ceph_opts.
433  */
434 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
435 {
436         struct rbd_client *rbdc;
437         int ret = -ENOMEM;
438
439         dout("%s:\n", __func__);
440         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
441         if (!rbdc)
442                 goto out_opt;
443
444         kref_init(&rbdc->kref);
445         INIT_LIST_HEAD(&rbdc->node);
446
447         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
448
449         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
450         if (IS_ERR(rbdc->client))
451                 goto out_mutex;
452         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
453
454         ret = ceph_open_session(rbdc->client);
455         if (ret < 0)
456                 goto out_err;
457
458         spin_lock(&rbd_client_list_lock);
459         list_add_tail(&rbdc->node, &rbd_client_list);
460         spin_unlock(&rbd_client_list_lock);
461
462         mutex_unlock(&ctl_mutex);
463         dout("%s: rbdc %p\n", __func__, rbdc);
464
465         return rbdc;
466
467 out_err:
468         ceph_destroy_client(rbdc->client);
469 out_mutex:
470         mutex_unlock(&ctl_mutex);
471         kfree(rbdc);
472 out_opt:
473         if (ceph_opts)
474                 ceph_destroy_options(ceph_opts);
475         dout("%s: error %d\n", __func__, ret);
476
477         return ERR_PTR(ret);
478 }
479
480 /*
481  * Find a ceph client with specific addr and configuration.  If
482  * found, bump its reference count.
483  */
484 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
485 {
486         struct rbd_client *client_node;
487         bool found = false;
488
489         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
490                 return NULL;
491
492         spin_lock(&rbd_client_list_lock);
493         list_for_each_entry(client_node, &rbd_client_list, node) {
494                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
495                         kref_get(&client_node->kref);
496                         found = true;
497                         break;
498                 }
499         }
500         spin_unlock(&rbd_client_list_lock);
501
502         return found ? client_node : NULL;
503 }
504
/*
 * mount options
 *
 * Token values are grouped by argument type, with the Opt_last_*
 * enumerators acting as group boundaries; parse_rbd_opts_token()
 * classifies a token by comparing it against them.  There are
 * currently no int or string options, so only the Boolean group is
 * populated.
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

/* Patterns accepted by match_token(); terminated by a NULL pattern */
static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

/* Parsed rbd-specific options, filled in by parse_rbd_opts_token() */
struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false
535
536 static int parse_rbd_opts_token(char *c, void *private)
537 {
538         struct rbd_options *rbd_opts = private;
539         substring_t argstr[MAX_OPT_ARGS];
540         int token, intval, ret;
541
542         token = match_token(c, rbd_opts_tokens, argstr);
543         if (token < 0)
544                 return -EINVAL;
545
546         if (token < Opt_last_int) {
547                 ret = match_int(&argstr[0], &intval);
548                 if (ret < 0) {
549                         pr_err("bad mount option arg (not int) "
550                                "at '%s'\n", c);
551                         return ret;
552                 }
553                 dout("got int token %d val %d\n", token, intval);
554         } else if (token > Opt_last_int && token < Opt_last_string) {
555                 dout("got string token %d val %s\n", token,
556                      argstr[0].from);
557         } else if (token > Opt_last_string && token < Opt_last_bool) {
558                 dout("got Boolean token %d\n", token);
559         } else {
560                 dout("got token %d\n", token);
561         }
562
563         switch (token) {
564         case Opt_read_only:
565                 rbd_opts->read_only = true;
566                 break;
567         case Opt_read_write:
568                 rbd_opts->read_only = false;
569                 break;
570         default:
571                 rbd_assert(false);
572                 break;
573         }
574         return 0;
575 }
576
/*
 * Return a client matching ceph_opts, reusing an existing one when
 * rbd_client_find() locates it and creating a new one otherwise.
 *
 * ceph_opts is consumed in both cases: it is destroyed here when an
 * existing client is reused, and handed to rbd_client_create()
 * otherwise.  The result of rbd_client_create() is returned as is,
 * so it may be an ERR_PTR() value.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *existing = rbd_client_find(ceph_opts);

        if (!existing)
                return rbd_client_create(ceph_opts);

        /* Sharing an existing client; our copy of the options is unneeded */
        ceph_destroy_options(ceph_opts);

        return existing;
}
593
594 /*
595  * Destroy ceph client
596  *
597  * Caller must hold rbd_client_list_lock.
598  */
599 static void rbd_client_release(struct kref *kref)
600 {
601         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
602
603         dout("%s: rbdc %p\n", __func__, rbdc);
604         spin_lock(&rbd_client_list_lock);
605         list_del(&rbdc->node);
606         spin_unlock(&rbd_client_list_lock);
607
608         ceph_destroy_client(rbdc->client);
609         kfree(rbdc);
610 }
611
612 /*
613  * Drop reference to ceph client node. If it's not referenced anymore, release
614  * it.
615  */
616 static void rbd_put_client(struct rbd_client *rbdc)
617 {
618         if (rbdc)
619                 kref_put(&rbdc->kref, rbd_client_release);
620 }
621
622 static bool rbd_image_format_valid(u32 image_format)
623 {
624         return image_format == 1 || image_format == 2;
625 }
626
627 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
628 {
629         size_t size;
630         u32 snap_count;
631
632         /* The header has to start with the magic rbd header text */
633         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
634                 return false;
635
636         /* The bio layer requires at least sector-sized I/O */
637
638         if (ondisk->options.order < SECTOR_SHIFT)
639                 return false;
640
641         /* If we use u64 in a few spots we may be able to loosen this */
642
643         if (ondisk->options.order > 8 * sizeof (int) - 1)
644                 return false;
645
646         /*
647          * The size of a snapshot header has to fit in a size_t, and
648          * that limits the number of snapshots.
649          */
650         snap_count = le32_to_cpu(ondisk->snap_count);
651         size = SIZE_MAX - sizeof (struct ceph_snap_context);
652         if (snap_count > size / sizeof (__le64))
653                 return false;
654
655         /*
656          * Not only that, but the size of the entire the snapshot
657          * header must also be representable in a size_t.
658          */
659         size -= snap_count * sizeof (__le64);
660         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
661                 return false;
662
663         return true;
664 }
665
666 /*
667  * Create a new header structure, translate header format from the on-disk
668  * header.
669  */
670 static int rbd_header_from_disk(struct rbd_image_header *header,
671                                  struct rbd_image_header_ondisk *ondisk)
672 {
673         u32 snap_count;
674         size_t len;
675         size_t size;
676         u32 i;
677
678         memset(header, 0, sizeof (*header));
679
680         snap_count = le32_to_cpu(ondisk->snap_count);
681
682         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
683         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
684         if (!header->object_prefix)
685                 return -ENOMEM;
686         memcpy(header->object_prefix, ondisk->object_prefix, len);
687         header->object_prefix[len] = '\0';
688
689         if (snap_count) {
690                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
691
692                 /* Save a copy of the snapshot names */
693
694                 if (snap_names_len > (u64) SIZE_MAX)
695                         return -EIO;
696                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
697                 if (!header->snap_names)
698                         goto out_err;
699                 /*
700                  * Note that rbd_dev_v1_header_read() guarantees
701                  * the ondisk buffer we're working with has
702                  * snap_names_len bytes beyond the end of the
703                  * snapshot id array, this memcpy() is safe.
704                  */
705                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
706                         snap_names_len);
707
708                 /* Record each snapshot's size */
709
710                 size = snap_count * sizeof (*header->snap_sizes);
711                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
712                 if (!header->snap_sizes)
713                         goto out_err;
714                 for (i = 0; i < snap_count; i++)
715                         header->snap_sizes[i] =
716                                 le64_to_cpu(ondisk->snaps[i].image_size);
717         } else {
718                 WARN_ON(ondisk->snap_names_len);
719                 header->snap_names = NULL;
720                 header->snap_sizes = NULL;
721         }
722
723         header->features = 0;   /* No features support in v1 images */
724         header->obj_order = ondisk->options.order;
725         header->crypt_type = ondisk->options.crypt_type;
726         header->comp_type = ondisk->options.comp_type;
727
728         /* Allocate and fill in the snapshot context */
729
730         header->image_size = le64_to_cpu(ondisk->image_size);
731         size = sizeof (struct ceph_snap_context);
732         size += snap_count * sizeof (header->snapc->snaps[0]);
733         header->snapc = kzalloc(size, GFP_KERNEL);
734         if (!header->snapc)
735                 goto out_err;
736
737         atomic_set(&header->snapc->nref, 1);
738         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
739         header->snapc->num_snaps = snap_count;
740         for (i = 0; i < snap_count; i++)
741                 header->snapc->snaps[i] =
742                         le64_to_cpu(ondisk->snaps[i].id);
743
744         return 0;
745
746 out_err:
747         kfree(header->snap_sizes);
748         header->snap_sizes = NULL;
749         kfree(header->snap_names);
750         header->snap_names = NULL;
751         kfree(header->object_prefix);
752         header->object_prefix = NULL;
753
754         return -ENOMEM;
755 }
756
757 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
758 {
759         struct rbd_snap *snap;
760
761         if (snap_id == CEPH_NOSNAP)
762                 return RBD_SNAP_HEAD_NAME;
763
764         list_for_each_entry(snap, &rbd_dev->snaps, node)
765                 if (snap_id == snap->id)
766                         return snap->name;
767
768         return NULL;
769 }
770
771 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
772 {
773
774         struct rbd_snap *snap;
775
776         list_for_each_entry(snap, &rbd_dev->snaps, node) {
777                 if (!strcmp(snap_name, snap->name)) {
778                         rbd_dev->spec->snap_id = snap->id;
779                         rbd_dev->mapping.size = snap->size;
780                         rbd_dev->mapping.features = snap->features;
781
782                         return 0;
783                 }
784         }
785
786         return -ENOENT;
787 }
788
789 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
790 {
791         int ret;
792
793         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
794                     sizeof (RBD_SNAP_HEAD_NAME))) {
795                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
796                 rbd_dev->mapping.size = rbd_dev->header.image_size;
797                 rbd_dev->mapping.features = rbd_dev->header.features;
798                 ret = 0;
799         } else {
800                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
801                 if (ret < 0)
802                         goto done;
803                 rbd_dev->mapping.read_only = true;
804         }
805         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
806
807 done:
808         return ret;
809 }
810
811 static void rbd_header_free(struct rbd_image_header *header)
812 {
813         kfree(header->object_prefix);
814         header->object_prefix = NULL;
815         kfree(header->snap_sizes);
816         header->snap_sizes = NULL;
817         kfree(header->snap_names);
818         header->snap_names = NULL;
819         ceph_put_snap_context(header->snapc);
820         header->snapc = NULL;
821 }
822
823 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
824 {
825         char *name;
826         u64 segment;
827         int ret;
828
829         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
830         if (!name)
831                 return NULL;
832         segment = offset >> rbd_dev->header.obj_order;
833         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
834                         rbd_dev->header.object_prefix, segment);
835         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
836                 pr_err("error formatting segment name for #%llu (%d)\n",
837                         segment, ret);
838                 kfree(name);
839                 name = NULL;
840         }
841
842         return name;
843 }
844
845 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
846 {
847         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
848
849         return offset & (segment_size - 1);
850 }
851
852 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
853                                 u64 offset, u64 length)
854 {
855         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
856
857         offset &= segment_size - 1;
858
859         rbd_assert(length <= U64_MAX - offset);
860         if (offset + length > segment_size)
861                 length = segment_size - offset;
862
863         return length;
864 }
865
866 /*
867  * returns the size of an object in the image
868  */
869 static u64 rbd_obj_bytes(struct rbd_image_header *header)
870 {
871         return 1 << header->obj_order;
872 }
873
874 /*
875  * bio helpers
876  */
877
878 static void bio_chain_put(struct bio *chain)
879 {
880         struct bio *tmp;
881
882         while (chain) {
883                 tmp = chain;
884                 chain = chain->bi_next;
885                 bio_put(tmp);
886         }
887 }
888
889 /*
890  * zeros a bio chain, starting at specific offset
891  */
892 static void zero_bio_chain(struct bio *chain, int start_ofs)
893 {
894         struct bio_vec *bv;
895         unsigned long flags;
896         void *buf;
897         int i;
898         int pos = 0;
899
900         while (chain) {
901                 bio_for_each_segment(bv, chain, i) {
902                         if (pos + bv->bv_len > start_ofs) {
903                                 int remainder = max(start_ofs - pos, 0);
904                                 buf = bvec_kmap_irq(bv, &flags);
905                                 memset(buf + remainder, 0,
906                                        bv->bv_len - remainder);
907                                 bvec_kunmap_irq(buf, &flags);
908                         }
909                         pos += bv->bv_len;
910                 }
911
912                 chain = chain->bi_next;
913         }
914 }
915
916 /*
917  * Clone a portion of a bio, starting at the given byte offset
918  * and continuing for the number of bytes indicated.
919  */
920 static struct bio *bio_clone_range(struct bio *bio_src,
921                                         unsigned int offset,
922                                         unsigned int len,
923                                         gfp_t gfpmask)
924 {
925         struct bio_vec *bv;
926         unsigned int resid;
927         unsigned short idx;
928         unsigned int voff;
929         unsigned short end_idx;
930         unsigned short vcnt;
931         struct bio *bio;
932
933         /* Handle the easy case for the caller */
934
935         if (!offset && len == bio_src->bi_size)
936                 return bio_clone(bio_src, gfpmask);
937
938         if (WARN_ON_ONCE(!len))
939                 return NULL;
940         if (WARN_ON_ONCE(len > bio_src->bi_size))
941                 return NULL;
942         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
943                 return NULL;
944
945         /* Find first affected segment... */
946
947         resid = offset;
948         __bio_for_each_segment(bv, bio_src, idx, 0) {
949                 if (resid < bv->bv_len)
950                         break;
951                 resid -= bv->bv_len;
952         }
953         voff = resid;
954
955         /* ...and the last affected segment */
956
957         resid += len;
958         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
959                 if (resid <= bv->bv_len)
960                         break;
961                 resid -= bv->bv_len;
962         }
963         vcnt = end_idx - idx + 1;
964
965         /* Build the clone */
966
967         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
968         if (!bio)
969                 return NULL;    /* ENOMEM */
970
971         bio->bi_bdev = bio_src->bi_bdev;
972         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
973         bio->bi_rw = bio_src->bi_rw;
974         bio->bi_flags |= 1 << BIO_CLONED;
975
976         /*
977          * Copy over our part of the bio_vec, then update the first
978          * and last (or only) entries.
979          */
980         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
981                         vcnt * sizeof (struct bio_vec));
982         bio->bi_io_vec[0].bv_offset += voff;
983         if (vcnt > 1) {
984                 bio->bi_io_vec[0].bv_len -= voff;
985                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
986         } else {
987                 bio->bi_io_vec[0].bv_len = len;
988         }
989
990         bio->bi_vcnt = vcnt;
991         bio->bi_size = len;
992         bio->bi_idx = 0;
993
994         return bio;
995 }
996
997 /*
998  * Clone a portion of a bio chain, starting at the given byte offset
999  * into the first bio in the source chain and continuing for the
1000  * number of bytes indicated.  The result is another bio chain of
1001  * exactly the given length, or a null pointer on error.
1002  *
1003  * The bio_src and offset parameters are both in-out.  On entry they
1004  * refer to the first source bio and the offset into that bio where
1005  * the start of data to be cloned is located.
1006  *
1007  * On return, bio_src is updated to refer to the bio in the source
1008  * chain that contains first un-cloned byte, and *offset will
1009  * contain the offset of that byte within that bio.
1010  */
1011 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1012                                         unsigned int *offset,
1013                                         unsigned int len,
1014                                         gfp_t gfpmask)
1015 {
1016         struct bio *bi = *bio_src;
1017         unsigned int off = *offset;
1018         struct bio *chain = NULL;
1019         struct bio **end;
1020
1021         /* Build up a chain of clone bios up to the limit */
1022
1023         if (!bi || off >= bi->bi_size || !len)
1024                 return NULL;            /* Nothing to clone */
1025
1026         end = &chain;
1027         while (len) {
1028                 unsigned int bi_size;
1029                 struct bio *bio;
1030
1031                 if (!bi) {
1032                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1033                         goto out_err;   /* EINVAL; ran out of bio's */
1034                 }
1035                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1036                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1037                 if (!bio)
1038                         goto out_err;   /* ENOMEM */
1039
1040                 *end = bio;
1041                 end = &bio->bi_next;
1042
1043                 off += bi_size;
1044                 if (off == bi->bi_size) {
1045                         bi = bi->bi_next;
1046                         off = 0;
1047                 }
1048                 len -= bi_size;
1049         }
1050         *bio_src = bi;
1051         *offset = off;
1052
1053         return chain;
1054 out_err:
1055         bio_chain_put(chain);
1056
1057         return NULL;
1058 }
1059
1060 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1061 {
1062         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1063                 atomic_read(&obj_request->kref.refcount));
1064         kref_get(&obj_request->kref);
1065 }
1066
1067 static void rbd_obj_request_destroy(struct kref *kref);
1068 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1069 {
1070         rbd_assert(obj_request != NULL);
1071         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1072                 atomic_read(&obj_request->kref.refcount));
1073         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1074 }
1075
1076 static void rbd_img_request_get(struct rbd_img_request *img_request)
1077 {
1078         dout("%s: img %p (was %d)\n", __func__, img_request,
1079                 atomic_read(&img_request->kref.refcount));
1080         kref_get(&img_request->kref);
1081 }
1082
1083 static void rbd_img_request_destroy(struct kref *kref);
1084 static void rbd_img_request_put(struct rbd_img_request *img_request)
1085 {
1086         rbd_assert(img_request != NULL);
1087         dout("%s: img %p (was %d)\n", __func__, img_request,
1088                 atomic_read(&img_request->kref.refcount));
1089         kref_put(&img_request->kref, rbd_img_request_destroy);
1090 }
1091
1092 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1093                                         struct rbd_obj_request *obj_request)
1094 {
1095         rbd_assert(obj_request->img_request == NULL);
1096
1097         rbd_obj_request_get(obj_request);
1098         obj_request->img_request = img_request;
1099         obj_request->which = img_request->obj_request_count;
1100         rbd_assert(obj_request->which != BAD_WHICH);
1101         img_request->obj_request_count++;
1102         list_add_tail(&obj_request->links, &img_request->obj_requests);
1103         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1104                 obj_request->which);
1105 }
1106
1107 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1108                                         struct rbd_obj_request *obj_request)
1109 {
1110         rbd_assert(obj_request->which != BAD_WHICH);
1111
1112         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1113                 obj_request->which);
1114         list_del(&obj_request->links);
1115         rbd_assert(img_request->obj_request_count > 0);
1116         img_request->obj_request_count--;
1117         rbd_assert(obj_request->which == img_request->obj_request_count);
1118         obj_request->which = BAD_WHICH;
1119         rbd_assert(obj_request->img_request == img_request);
1120         obj_request->img_request = NULL;
1121         obj_request->callback = NULL;
1122         rbd_obj_request_put(obj_request);
1123 }
1124
1125 static bool obj_request_type_valid(enum obj_request_type type)
1126 {
1127         switch (type) {
1128         case OBJ_REQUEST_NODATA:
1129         case OBJ_REQUEST_BIO:
1130         case OBJ_REQUEST_PAGES:
1131                 return true;
1132         default:
1133                 return false;
1134         }
1135 }
1136
1137 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1138                                 struct rbd_obj_request *obj_request)
1139 {
1140         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1141
1142         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1143 }
1144
1145 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1146 {
1147         dout("%s: img %p\n", __func__, img_request);
1148         if (img_request->callback)
1149                 img_request->callback(img_request);
1150         else
1151                 rbd_img_request_put(img_request);
1152 }
1153
1154 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1155
1156 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1157 {
1158         dout("%s: obj %p\n", __func__, obj_request);
1159
1160         return wait_for_completion_interruptible(&obj_request->completion);
1161 }
1162
1163 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1164 {
1165         atomic_set(&obj_request->done, 0);
1166         smp_wmb();
1167 }
1168
1169 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1170 {
1171         int done;
1172
1173         done = atomic_inc_return(&obj_request->done);
1174         if (done > 1) {
1175                 struct rbd_img_request *img_request = obj_request->img_request;
1176                 struct rbd_device *rbd_dev;
1177
1178                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1179                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1180                         obj_request);
1181         }
1182 }
1183
1184 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1185 {
1186         smp_mb();
1187         return atomic_read(&obj_request->done) != 0;
1188 }
1189
1190 static void
1191 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1192 {
1193         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1194                 obj_request, obj_request->img_request, obj_request->result,
1195                 obj_request->xferred, obj_request->length);
1196         /*
1197          * ENOENT means a hole in the image.  We zero-fill the
1198          * entire length of the request.  A short read also implies
1199          * zero-fill to the end of the request.  Either way we
1200          * update the xferred count to indicate the whole request
1201          * was satisfied.
1202          */
1203         BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1204         if (obj_request->result == -ENOENT) {
1205                 zero_bio_chain(obj_request->bio_list, 0);
1206                 obj_request->result = 0;
1207                 obj_request->xferred = obj_request->length;
1208         } else if (obj_request->xferred < obj_request->length &&
1209                         !obj_request->result) {
1210                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1211                 obj_request->xferred = obj_request->length;
1212         }
1213         obj_request_done_set(obj_request);
1214 }
1215
1216 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1217 {
1218         dout("%s: obj %p cb %p\n", __func__, obj_request,
1219                 obj_request->callback);
1220         if (obj_request->callback)
1221                 obj_request->callback(obj_request);
1222         else
1223                 complete_all(&obj_request->completion);
1224 }
1225
/* For ops needing no special handling: just mark the request done */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1231
1232 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1233 {
1234         dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1235                 obj_request->result, obj_request->xferred, obj_request->length);
1236         if (obj_request->img_request)
1237                 rbd_img_obj_request_read_callback(obj_request);
1238         else
1239                 obj_request_done_set(obj_request);
1240 }
1241
1242 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1243 {
1244         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1245                 obj_request->result, obj_request->length);
1246         /*
1247          * There is no such thing as a successful short write.
1248          * Our xferred value is the number of bytes transferred
1249          * back.  Set it to our originally-requested length.
1250          */
1251         obj_request->xferred = obj_request->length;
1252         obj_request_done_set(obj_request);
1253 }
1254
/*
 * Stat completion.  Nothing is needed for a simple stat call beyond
 * marking the request done.  More will be done here when the stat
 * is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1264
1265 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1266                                 struct ceph_msg *msg)
1267 {
1268         struct rbd_obj_request *obj_request = osd_req->r_priv;
1269         u16 opcode;
1270
1271         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1272         rbd_assert(osd_req == obj_request->osd_req);
1273         rbd_assert(!!obj_request->img_request ^
1274                                 (obj_request->which == BAD_WHICH));
1275
1276         if (osd_req->r_result < 0)
1277                 obj_request->result = osd_req->r_result;
1278         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1279
1280         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1281
1282         /*
1283          * We support a 64-bit length, but ultimately it has to be
1284          * passed to blk_end_request(), which takes an unsigned int.
1285          */
1286         obj_request->xferred = osd_req->r_reply_op_len[0];
1287         rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1288         opcode = osd_req->r_ops[0].op;
1289         switch (opcode) {
1290         case CEPH_OSD_OP_READ:
1291                 rbd_osd_read_callback(obj_request);
1292                 break;
1293         case CEPH_OSD_OP_WRITE:
1294                 rbd_osd_write_callback(obj_request);
1295                 break;
1296         case CEPH_OSD_OP_STAT:
1297                 rbd_osd_stat_callback(obj_request);
1298                 break;
1299         case CEPH_OSD_OP_CALL:
1300         case CEPH_OSD_OP_NOTIFY_ACK:
1301         case CEPH_OSD_OP_WATCH:
1302                 rbd_osd_trivial_callback(obj_request);
1303                 break;
1304         default:
1305                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1306                         obj_request->object_name, (unsigned short) opcode);
1307                 break;
1308         }
1309
1310         if (obj_request_done_test(obj_request))
1311                 rbd_obj_request_complete(obj_request);
1312 }
1313
1314 static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
1315                                         bool write_request)
1316 {
1317         struct rbd_img_request *img_request = obj_request->img_request;
1318         struct ceph_osd_request *osd_req = obj_request->osd_req;
1319         struct ceph_snap_context *snapc = NULL;
1320         u64 snap_id = CEPH_NOSNAP;
1321         struct timespec *mtime = NULL;
1322         struct timespec now;
1323
1324         rbd_assert(osd_req != NULL);
1325
1326         if (write_request) {
1327                 now = CURRENT_TIME;
1328                 mtime = &now;
1329                 if (img_request)
1330                         snapc = img_request->snapc;
1331         } else if (img_request) {
1332                 snap_id = img_request->snap_id;
1333         }
1334         ceph_osdc_build_request(osd_req, obj_request->offset,
1335                         snapc, snap_id, mtime);
1336 }
1337
1338 static struct ceph_osd_request *rbd_osd_req_create(
1339                                         struct rbd_device *rbd_dev,
1340                                         bool write_request,
1341                                         struct rbd_obj_request *obj_request)
1342 {
1343         struct rbd_img_request *img_request = obj_request->img_request;
1344         struct ceph_snap_context *snapc = NULL;
1345         struct ceph_osd_client *osdc;
1346         struct ceph_osd_request *osd_req;
1347
1348         if (img_request) {
1349                 rbd_assert(img_request->write_request == write_request);
1350                 if (img_request->write_request)
1351                         snapc = img_request->snapc;
1352         }
1353
1354         /* Allocate and initialize the request, for the single op */
1355
1356         osdc = &rbd_dev->rbd_client->client->osdc;
1357         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1358         if (!osd_req)
1359                 return NULL;    /* ENOMEM */
1360
1361         if (write_request)
1362                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1363         else
1364                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1365
1366         osd_req->r_callback = rbd_osd_req_callback;
1367         osd_req->r_priv = obj_request;
1368
1369         osd_req->r_oid_len = strlen(obj_request->object_name);
1370         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1371         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1372
1373         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1374
1375         return osd_req;
1376 }
1377
/* Release an osd request by dropping a reference on it */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1382
1383 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1384
1385 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1386                                                 u64 offset, u64 length,
1387                                                 enum obj_request_type type)
1388 {
1389         struct rbd_obj_request *obj_request;
1390         size_t size;
1391         char *name;
1392
1393         rbd_assert(obj_request_type_valid(type));
1394
1395         size = strlen(object_name) + 1;
1396         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1397         if (!obj_request)
1398                 return NULL;
1399
1400         name = (char *)(obj_request + 1);
1401         obj_request->object_name = memcpy(name, object_name, size);
1402         obj_request->offset = offset;
1403         obj_request->length = length;
1404         obj_request->which = BAD_WHICH;
1405         obj_request->type = type;
1406         INIT_LIST_HEAD(&obj_request->links);
1407         obj_request_done_init(obj_request);
1408         init_completion(&obj_request->completion);
1409         kref_init(&obj_request->kref);
1410
1411         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1412                 offset, length, (int)type, obj_request);
1413
1414         return obj_request;
1415 }
1416
1417 static void rbd_obj_request_destroy(struct kref *kref)
1418 {
1419         struct rbd_obj_request *obj_request;
1420
1421         obj_request = container_of(kref, struct rbd_obj_request, kref);
1422
1423         dout("%s: obj %p\n", __func__, obj_request);
1424
1425         rbd_assert(obj_request->img_request == NULL);
1426         rbd_assert(obj_request->which == BAD_WHICH);
1427
1428         if (obj_request->osd_req)
1429                 rbd_osd_req_destroy(obj_request->osd_req);
1430
1431         rbd_assert(obj_request_type_valid(obj_request->type));
1432         switch (obj_request->type) {
1433         case OBJ_REQUEST_NODATA:
1434                 break;          /* Nothing to do */
1435         case OBJ_REQUEST_BIO:
1436                 if (obj_request->bio_list)
1437                         bio_chain_put(obj_request->bio_list);
1438                 break;
1439         case OBJ_REQUEST_PAGES:
1440                 if (obj_request->pages)
1441                         ceph_release_page_vector(obj_request->pages,
1442                                                 obj_request->page_count);
1443                 break;
1444         }
1445
1446         kfree(obj_request);
1447 }
1448
1449 /*
1450  * Caller is responsible for filling in the list of object requests
1451  * that comprises the image request, and the Linux request pointer
1452  * (if there is one).
1453  */
1454 static struct rbd_img_request *rbd_img_request_create(
1455                                         struct rbd_device *rbd_dev,
1456                                         u64 offset, u64 length,
1457                                         bool write_request)
1458 {
1459         struct rbd_img_request *img_request;
1460         struct ceph_snap_context *snapc = NULL;
1461
1462         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1463         if (!img_request)
1464                 return NULL;
1465
1466         if (write_request) {
1467                 down_read(&rbd_dev->header_rwsem);
1468                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1469                 up_read(&rbd_dev->header_rwsem);
1470                 if (WARN_ON(!snapc)) {
1471                         kfree(img_request);
1472                         return NULL;    /* Shouldn't happen */
1473                 }
1474         }
1475
1476         img_request->rq = NULL;
1477         img_request->rbd_dev = rbd_dev;
1478         img_request->offset = offset;
1479         img_request->length = length;
1480         img_request->write_request = write_request;
1481         if (write_request)
1482                 img_request->snapc = snapc;
1483         else
1484                 img_request->snap_id = rbd_dev->spec->snap_id;
1485         spin_lock_init(&img_request->completion_lock);
1486         img_request->next_completion = 0;
1487         img_request->callback = NULL;
1488         img_request->obj_request_count = 0;
1489         INIT_LIST_HEAD(&img_request->obj_requests);
1490         kref_init(&img_request->kref);
1491
1492         rbd_img_request_get(img_request);       /* Avoid a warning */
1493         rbd_img_request_put(img_request);       /* TEMPORARY */
1494
1495         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1496                 write_request ? "write" : "read", offset, length,
1497                 img_request);
1498
1499         return img_request;
1500 }
1501
1502 static void rbd_img_request_destroy(struct kref *kref)
1503 {
1504         struct rbd_img_request *img_request;
1505         struct rbd_obj_request *obj_request;
1506         struct rbd_obj_request *next_obj_request;
1507
1508         img_request = container_of(kref, struct rbd_img_request, kref);
1509
1510         dout("%s: img %p\n", __func__, img_request);
1511
1512         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1513                 rbd_img_obj_request_del(img_request, obj_request);
1514         rbd_assert(img_request->obj_request_count == 0);
1515
1516         if (img_request->write_request)
1517                 ceph_put_snap_context(img_request->snapc);
1518
1519         kfree(img_request);
1520 }
1521
1522 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1523 {
1524         struct rbd_img_request *img_request;
1525         u32 which = obj_request->which;
1526         bool more = true;
1527
1528         img_request = obj_request->img_request;
1529
1530         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1531         rbd_assert(img_request != NULL);
1532         rbd_assert(img_request->rq != NULL);
1533         rbd_assert(img_request->obj_request_count > 0);
1534         rbd_assert(which != BAD_WHICH);
1535         rbd_assert(which < img_request->obj_request_count);
1536         rbd_assert(which >= img_request->next_completion);
1537
1538         spin_lock_irq(&img_request->completion_lock);
1539         if (which != img_request->next_completion)
1540                 goto out;
1541
1542         for_each_obj_request_from(img_request, obj_request) {
1543                 unsigned int xferred;
1544                 int result;
1545
1546                 rbd_assert(more);
1547                 rbd_assert(which < img_request->obj_request_count);
1548
1549                 if (!obj_request_done_test(obj_request))
1550                         break;
1551
1552                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1553                 xferred = (unsigned int) obj_request->xferred;
1554                 result = (int) obj_request->result;
1555                 if (result)
1556                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1557                                 img_request->write_request ? "write" : "read",
1558                                 result, xferred);
1559
1560                 more = blk_end_request(img_request->rq, result, xferred);
1561                 which++;
1562         }
1563
1564         rbd_assert(more ^ (which == img_request->obj_request_count));
1565         img_request->next_completion = which;
1566 out:
1567         spin_unlock_irq(&img_request->completion_lock);
1568
1569         if (!more)
1570                 rbd_img_request_complete(img_request);
1571 }
1572
1573 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1574                                         struct bio *bio_list)
1575 {
1576         struct rbd_device *rbd_dev = img_request->rbd_dev;
1577         struct rbd_obj_request *obj_request = NULL;
1578         struct rbd_obj_request *next_obj_request;
1579         bool write_request = img_request->write_request;
1580         unsigned int bio_offset;
1581         u64 image_offset;
1582         u64 resid;
1583         u16 opcode;
1584
1585         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1586
1587         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1588         bio_offset = 0;
1589         image_offset = img_request->offset;
1590         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1591         resid = img_request->length;
1592         rbd_assert(resid > 0);
1593         while (resid) {
1594                 struct ceph_osd_request *osd_req;
1595                 const char *object_name;
1596                 unsigned int clone_size;
1597                 u64 offset;
1598                 u64 length;
1599
1600                 object_name = rbd_segment_name(rbd_dev, image_offset);
1601                 if (!object_name)
1602                         goto out_unwind;
1603                 offset = rbd_segment_offset(rbd_dev, image_offset);
1604                 length = rbd_segment_length(rbd_dev, image_offset, resid);
1605                 obj_request = rbd_obj_request_create(object_name,
1606                                                 offset, length,
1607                                                 OBJ_REQUEST_BIO);
1608                 kfree(object_name);     /* object request has its own copy */
1609                 if (!obj_request)
1610                         goto out_unwind;
1611
1612                 rbd_assert(length <= (u64) UINT_MAX);
1613                 clone_size = (unsigned int) length;
1614                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1615                                                 &bio_offset, clone_size,
1616                                                 GFP_ATOMIC);
1617                 if (!obj_request->bio_list)
1618                         goto out_partial;
1619
1620                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1621                                                 obj_request);
1622                 if (!osd_req)
1623                         goto out_partial;
1624                 obj_request->osd_req = osd_req;
1625                 obj_request->callback = rbd_img_obj_callback;
1626
1627                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1628                                                 0, 0);
1629                 osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
1630                                 obj_request->bio_list, obj_request->length);
1631                 rbd_osd_req_format(obj_request, write_request);
1632
1633                 rbd_img_obj_request_add(img_request, obj_request);
1634
1635                 image_offset += length;
1636                 resid -= length;
1637         }
1638
1639         return 0;
1640
1641 out_partial:
1642         rbd_obj_request_put(obj_request);
1643 out_unwind:
1644         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1645                 rbd_obj_request_put(obj_request);
1646
1647         return -ENOMEM;
1648 }
1649
1650 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1651 {
1652         struct rbd_device *rbd_dev = img_request->rbd_dev;
1653         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1654         struct rbd_obj_request *obj_request;
1655         struct rbd_obj_request *next_obj_request;
1656
1657         dout("%s: img %p\n", __func__, img_request);
1658         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1659                 int ret;
1660
1661                 ret = rbd_obj_request_submit(osdc, obj_request);
1662                 if (ret)
1663                         return ret;
1664                 /*
1665                  * The image request has its own reference to each
1666                  * of its object requests, so we can safely drop the
1667                  * initial one here.
1668                  */
1669                 rbd_obj_request_put(obj_request);
1670         }
1671
1672         return 0;
1673 }
1674
1675 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1676                                    u64 ver, u64 notify_id)
1677 {
1678         struct rbd_obj_request *obj_request;
1679         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1680         int ret;
1681
1682         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1683                                                         OBJ_REQUEST_NODATA);
1684         if (!obj_request)
1685                 return -ENOMEM;
1686
1687         ret = -ENOMEM;
1688         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1689         if (!obj_request->osd_req)
1690                 goto out;
1691         obj_request->callback = rbd_obj_request_put;
1692
1693         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
1694                                         notify_id, ver, 0);
1695         rbd_osd_req_format(obj_request, false);
1696
1697         ret = rbd_obj_request_submit(osdc, obj_request);
1698 out:
1699         if (ret)
1700                 rbd_obj_request_put(obj_request);
1701
1702         return ret;
1703 }
1704
1705 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1706 {
1707         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1708         u64 hver;
1709         int rc;
1710
1711         if (!rbd_dev)
1712                 return;
1713
1714         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1715                 rbd_dev->header_name, (unsigned long long) notify_id,
1716                 (unsigned int) opcode);
1717         rc = rbd_dev_refresh(rbd_dev, &hver);
1718         if (rc)
1719                 rbd_warn(rbd_dev, "got notification but failed to "
1720                            " update snaps: %d\n", rc);
1721
1722         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1723 }
1724
1725 /*
1726  * Request sync osd watch/unwatch.  The value of "start" determines
1727  * whether a watch request is being initiated or torn down.
1728  */
1729 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1730 {
1731         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1732         struct rbd_obj_request *obj_request;
1733         int ret;
1734
1735         rbd_assert(start ^ !!rbd_dev->watch_event);
1736         rbd_assert(start ^ !!rbd_dev->watch_request);
1737
1738         if (start) {
1739                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1740                                                 &rbd_dev->watch_event);
1741                 if (ret < 0)
1742                         return ret;
1743                 rbd_assert(rbd_dev->watch_event != NULL);
1744         }
1745
1746         ret = -ENOMEM;
1747         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1748                                                         OBJ_REQUEST_NODATA);
1749         if (!obj_request)
1750                 goto out_cancel;
1751
1752         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1753         if (!obj_request->osd_req)
1754                 goto out_cancel;
1755
1756         if (start)
1757                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1758         else
1759                 ceph_osdc_unregister_linger_request(osdc,
1760                                         rbd_dev->watch_request->osd_req);
1761
1762         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1763                                 rbd_dev->watch_event->cookie,
1764                                 rbd_dev->header.obj_version, start);
1765         rbd_osd_req_format(obj_request, true);
1766
1767         ret = rbd_obj_request_submit(osdc, obj_request);
1768         if (ret)
1769                 goto out_cancel;
1770         ret = rbd_obj_request_wait(obj_request);
1771         if (ret)
1772                 goto out_cancel;
1773         ret = obj_request->result;
1774         if (ret)
1775                 goto out_cancel;
1776
1777         /*
1778          * A watch request is set to linger, so the underlying osd
1779          * request won't go away until we unregister it.  We retain
1780          * a pointer to the object request during that time (in
1781          * rbd_dev->watch_request), so we'll keep a reference to
1782          * it.  We'll drop that reference (below) after we've
1783          * unregistered it.
1784          */
1785         if (start) {
1786                 rbd_dev->watch_request = obj_request;
1787
1788                 return 0;
1789         }
1790
1791         /* We have successfully torn down the watch request */
1792
1793         rbd_obj_request_put(rbd_dev->watch_request);
1794         rbd_dev->watch_request = NULL;
1795 out_cancel:
1796         /* Cancel the event if we're tearing down, or on error */
1797         ceph_osdc_cancel_event(rbd_dev->watch_event);
1798         rbd_dev->watch_event = NULL;
1799         if (obj_request)
1800                 rbd_obj_request_put(obj_request);
1801
1802         return ret;
1803 }
1804
1805 /*
1806  * Synchronous osd object method call
1807  */
1808 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1809                              const char *object_name,
1810                              const char *class_name,
1811                              const char *method_name,
1812                              const char *outbound,
1813                              size_t outbound_size,
1814                              char *inbound,
1815                              size_t inbound_size,
1816                              u64 *version)
1817 {
1818         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1819         struct rbd_obj_request *obj_request;
1820         struct page **pages;
1821         u32 page_count;
1822         int ret;
1823
1824         /*
1825          * Method calls are ultimately read operations.  The result
1826          * should placed into the inbound buffer provided.  They
1827          * also supply outbound data--parameters for the object
1828          * method.  Currently if this is present it will be a
1829          * snapshot id.
1830          */
1831         page_count = (u32) calc_pages_for(0, inbound_size);
1832         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1833         if (IS_ERR(pages))
1834                 return PTR_ERR(pages);
1835
1836         ret = -ENOMEM;
1837         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1838                                                         OBJ_REQUEST_PAGES);
1839         if (!obj_request)
1840                 goto out;
1841
1842         obj_request->pages = pages;
1843         obj_request->page_count = page_count;
1844
1845         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1846         if (!obj_request->osd_req)
1847                 goto out;
1848
1849         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
1850                                         class_name, method_name,
1851                                         outbound, outbound_size);
1852         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
1853                                         obj_request->pages, inbound_size,
1854                                         0, false, false);
1855         rbd_osd_req_format(obj_request, false);
1856
1857         ret = rbd_obj_request_submit(osdc, obj_request);
1858         if (ret)
1859                 goto out;
1860         ret = rbd_obj_request_wait(obj_request);
1861         if (ret)
1862                 goto out;
1863
1864         ret = obj_request->result;
1865         if (ret < 0)
1866                 goto out;
1867         ret = 0;
1868         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1869         if (version)
1870                 *version = obj_request->version;
1871 out:
1872         if (obj_request)
1873                 rbd_obj_request_put(obj_request);
1874         else
1875                 ceph_release_page_vector(pages, page_count);
1876
1877         return ret;
1878 }
1879
1880 static void rbd_request_fn(struct request_queue *q)
1881                 __releases(q->queue_lock) __acquires(q->queue_lock)
1882 {
1883         struct rbd_device *rbd_dev = q->queuedata;
1884         bool read_only = rbd_dev->mapping.read_only;
1885         struct request *rq;
1886         int result;
1887
1888         while ((rq = blk_fetch_request(q))) {
1889                 bool write_request = rq_data_dir(rq) == WRITE;
1890                 struct rbd_img_request *img_request;
1891                 u64 offset;
1892                 u64 length;
1893
1894                 /* Ignore any non-FS requests that filter through. */
1895
1896                 if (rq->cmd_type != REQ_TYPE_FS) {
1897                         dout("%s: non-fs request type %d\n", __func__,
1898                                 (int) rq->cmd_type);
1899                         __blk_end_request_all(rq, 0);
1900                         continue;
1901                 }
1902
1903                 /* Ignore/skip any zero-length requests */
1904
1905                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1906                 length = (u64) blk_rq_bytes(rq);
1907
1908                 if (!length) {
1909                         dout("%s: zero-length request\n", __func__);
1910                         __blk_end_request_all(rq, 0);
1911                         continue;
1912                 }
1913
1914                 spin_unlock_irq(q->queue_lock);
1915
1916                 /* Disallow writes to a read-only device */
1917
1918                 if (write_request) {
1919                         result = -EROFS;
1920                         if (read_only)
1921                                 goto end_request;
1922                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1923                 }
1924
1925                 /*
1926                  * Quit early if the mapped snapshot no longer
1927                  * exists.  It's still possible the snapshot will
1928                  * have disappeared by the time our request arrives
1929                  * at the osd, but there's no sense in sending it if
1930                  * we already know.
1931                  */
1932                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
1933                         dout("request for non-existent snapshot");
1934                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1935                         result = -ENXIO;
1936                         goto end_request;
1937                 }
1938
1939                 result = -EINVAL;
1940                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1941                         goto end_request;       /* Shouldn't happen */
1942
1943                 result = -ENOMEM;
1944                 img_request = rbd_img_request_create(rbd_dev, offset, length,
1945                                                         write_request);
1946                 if (!img_request)
1947                         goto end_request;
1948
1949                 img_request->rq = rq;
1950
1951                 result = rbd_img_request_fill_bio(img_request, rq->bio);
1952                 if (!result)
1953                         result = rbd_img_request_submit(img_request);
1954                 if (result)
1955                         rbd_img_request_put(img_request);
1956 end_request:
1957                 spin_lock_irq(q->queue_lock);
1958                 if (result < 0) {
1959                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
1960                                 write_request ? "write" : "read", result);
1961                         __blk_end_request_all(rq, result);
1962                 }
1963         }
1964 }
1965
1966 /*
1967  * a queue callback. Makes sure that we don't create a bio that spans across
1968  * multiple osd objects. One exception would be with a single page bios,
1969  * which we handle later at bio_chain_clone_range()
1970  */
1971 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1972                           struct bio_vec *bvec)
1973 {
1974         struct rbd_device *rbd_dev = q->queuedata;
1975         sector_t sector_offset;
1976         sector_t sectors_per_obj;
1977         sector_t obj_sector_offset;
1978         int ret;
1979
1980         /*
1981          * Find how far into its rbd object the partition-relative
1982          * bio start sector is to offset relative to the enclosing
1983          * device.
1984          */
1985         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1986         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1987         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1988
1989         /*
1990          * Compute the number of bytes from that offset to the end
1991          * of the object.  Account for what's already used by the bio.
1992          */
1993         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1994         if (ret > bmd->bi_size)
1995                 ret -= bmd->bi_size;
1996         else
1997                 ret = 0;
1998
1999         /*
2000          * Don't send back more than was asked for.  And if the bio
2001          * was empty, let the whole thing through because:  "Note
2002          * that a block device *must* allow a single page to be
2003          * added to an empty bio."
2004          */
2005         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2006         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2007                 ret = (int) bvec->bv_len;
2008
2009         return ret;
2010 }
2011
2012 static void rbd_free_disk(struct rbd_device *rbd_dev)
2013 {
2014         struct gendisk *disk = rbd_dev->disk;
2015
2016         if (!disk)
2017                 return;
2018
2019         if (disk->flags & GENHD_FL_UP)
2020                 del_gendisk(disk);
2021         if (disk->queue)
2022                 blk_cleanup_queue(disk->queue);
2023         put_disk(disk);
2024 }
2025
2026 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2027                                 const char *object_name,
2028                                 u64 offset, u64 length,
2029                                 char *buf, u64 *version)
2030
2031 {
2032         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2033         struct rbd_obj_request *obj_request;
2034         struct page **pages = NULL;
2035         u32 page_count;
2036         size_t size;
2037         int ret;
2038
2039         page_count = (u32) calc_pages_for(offset, length);
2040         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2041         if (IS_ERR(pages))
2042                 ret = PTR_ERR(pages);
2043
2044         ret = -ENOMEM;
2045         obj_request = rbd_obj_request_create(object_name, offset, length,
2046                                                         OBJ_REQUEST_PAGES);
2047         if (!obj_request)
2048                 goto out;
2049
2050         obj_request->pages = pages;
2051         obj_request->page_count = page_count;
2052
2053         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2054         if (!obj_request->osd_req)
2055                 goto out;
2056
2057         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2058                                         offset, length, 0, 0);
2059         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
2060                                         obj_request->pages,
2061                                         obj_request->length,
2062                                         obj_request->offset & ~PAGE_MASK,
2063                                         false, false);
2064         rbd_osd_req_format(obj_request, false);
2065
2066         ret = rbd_obj_request_submit(osdc, obj_request);
2067         if (ret)
2068                 goto out;
2069         ret = rbd_obj_request_wait(obj_request);
2070         if (ret)
2071                 goto out;
2072
2073         ret = obj_request->result;
2074         if (ret < 0)
2075                 goto out;
2076
2077         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2078         size = (size_t) obj_request->xferred;
2079         ceph_copy_from_page_vector(pages, buf, 0, size);
2080         rbd_assert(size <= (size_t) INT_MAX);
2081         ret = (int) size;
2082         if (version)
2083                 *version = obj_request->version;
2084 out:
2085         if (obj_request)
2086                 rbd_obj_request_put(obj_request);
2087         else
2088                 ceph_release_page_vector(pages, page_count);
2089
2090         return ret;
2091 }
2092
2093 /*
2094  * Read the complete header for the given rbd device.
2095  *
2096  * Returns a pointer to a dynamically-allocated buffer containing
2097  * the complete and validated header.  Caller can pass the address
2098  * of a variable that will be filled in with the version of the
2099  * header object at the time it was read.
2100  *
2101  * Returns a pointer-coded errno if a failure occurs.
2102  */
2103 static struct rbd_image_header_ondisk *
2104 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2105 {
2106         struct rbd_image_header_ondisk *ondisk = NULL;
2107         u32 snap_count = 0;
2108         u64 names_size = 0;
2109         u32 want_count;
2110         int ret;
2111
2112         /*
2113          * The complete header will include an array of its 64-bit
2114          * snapshot ids, followed by the names of those snapshots as
2115          * a contiguous block of NUL-terminated strings.  Note that
2116          * the number of snapshots could change by the time we read
2117          * it in, in which case we re-read it.
2118          */
2119         do {
2120                 size_t size;
2121
2122                 kfree(ondisk);
2123
2124                 size = sizeof (*ondisk);
2125                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2126                 size += names_size;
2127                 ondisk = kmalloc(size, GFP_KERNEL);
2128                 if (!ondisk)
2129                         return ERR_PTR(-ENOMEM);
2130
2131                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2132                                        0, size,
2133                                        (char *) ondisk, version);
2134                 if (ret < 0)
2135                         goto out_err;
2136                 if (WARN_ON((size_t) ret < size)) {
2137                         ret = -ENXIO;
2138                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2139                                 size, ret);
2140                         goto out_err;
2141                 }
2142                 if (!rbd_dev_ondisk_valid(ondisk)) {
2143                         ret = -ENXIO;
2144                         rbd_warn(rbd_dev, "invalid header");
2145                         goto out_err;
2146                 }
2147
2148                 names_size = le64_to_cpu(ondisk->snap_names_len);
2149                 want_count = snap_count;
2150                 snap_count = le32_to_cpu(ondisk->snap_count);
2151         } while (snap_count != want_count);
2152
2153         return ondisk;
2154
2155 out_err:
2156         kfree(ondisk);
2157
2158         return ERR_PTR(ret);
2159 }
2160
2161 /*
2162  * reload the ondisk the header
2163  */
2164 static int rbd_read_header(struct rbd_device *rbd_dev,
2165                            struct rbd_image_header *header)
2166 {
2167         struct rbd_image_header_ondisk *ondisk;
2168         u64 ver = 0;
2169         int ret;
2170
2171         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2172         if (IS_ERR(ondisk))
2173                 return PTR_ERR(ondisk);
2174         ret = rbd_header_from_disk(header, ondisk);
2175         if (ret >= 0)
2176                 header->obj_version = ver;
2177         kfree(ondisk);
2178
2179         return ret;
2180 }
2181
2182 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2183 {
2184         struct rbd_snap *snap;
2185         struct rbd_snap *next;
2186
2187         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2188                 rbd_remove_snap_dev(snap);
2189 }
2190
2191 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2192 {
2193         sector_t size;
2194
2195         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2196                 return;
2197
2198         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2199         dout("setting size to %llu sectors", (unsigned long long) size);
2200         rbd_dev->mapping.size = (u64) size;
2201         set_capacity(rbd_dev->disk, size);
2202 }
2203
2204 /*
2205  * only read the first part of the ondisk header, without the snaps info
2206  */
2207 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2208 {
2209         int ret;
2210         struct rbd_image_header h;
2211
2212         ret = rbd_read_header(rbd_dev, &h);
2213         if (ret < 0)
2214                 return ret;
2215
2216         down_write(&rbd_dev->header_rwsem);
2217
2218         /* Update image size, and check for resize of mapped image */
2219         rbd_dev->header.image_size = h.image_size;
2220         rbd_update_mapping_size(rbd_dev);
2221
2222         /* rbd_dev->header.object_prefix shouldn't change */
2223         kfree(rbd_dev->header.snap_sizes);
2224         kfree(rbd_dev->header.snap_names);
2225         /* osd requests may still refer to snapc */
2226         ceph_put_snap_context(rbd_dev->header.snapc);
2227
2228         if (hver)
2229                 *hver = h.obj_version;
2230         rbd_dev->header.obj_version = h.obj_version;
2231         rbd_dev->header.image_size = h.image_size;
2232         rbd_dev->header.snapc = h.snapc;
2233         rbd_dev->header.snap_names = h.snap_names;
2234         rbd_dev->header.snap_sizes = h.snap_sizes;
2235         /* Free the extra copy of the object prefix */
2236         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2237         kfree(h.object_prefix);
2238
2239         ret = rbd_dev_snaps_update(rbd_dev);
2240         if (!ret)
2241                 ret = rbd_dev_snaps_register(rbd_dev);
2242
2243         up_write(&rbd_dev->header_rwsem);
2244
2245         return ret;
2246 }
2247
2248 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2249 {
2250         int ret;
2251
2252         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2253         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2254         if (rbd_dev->image_format == 1)
2255                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2256         else
2257                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2258         mutex_unlock(&ctl_mutex);
2259
2260         return ret;
2261 }
2262
2263 static int rbd_init_disk(struct rbd_device *rbd_dev)
2264 {
2265         struct gendisk *disk;
2266         struct request_queue *q;
2267         u64 segment_size;
2268
2269         /* create gendisk info */
2270         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2271         if (!disk)
2272                 return -ENOMEM;
2273
2274         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2275                  rbd_dev->dev_id);
2276         disk->major = rbd_dev->major;
2277         disk->first_minor = 0;
2278         disk->fops = &rbd_bd_ops;
2279         disk->private_data = rbd_dev;
2280
2281         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2282         if (!q)
2283                 goto out_disk;
2284
2285         /* We use the default size, but let's be explicit about it. */
2286         blk_queue_physical_block_size(q, SECTOR_SIZE);
2287
2288         /* set io sizes to object size */
2289         segment_size = rbd_obj_bytes(&rbd_dev->header);
2290         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2291         blk_queue_max_segment_size(q, segment_size);
2292         blk_queue_io_min(q, segment_size);
2293         blk_queue_io_opt(q, segment_size);
2294
2295         blk_queue_merge_bvec(q, rbd_merge_bvec);
2296         disk->queue = q;
2297
2298         q->queuedata = rbd_dev;
2299
2300         rbd_dev->disk = disk;
2301
2302         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2303
2304         return 0;
2305 out_disk:
2306         put_disk(disk);
2307
2308         return -ENOMEM;
2309 }
2310
2311 /*
2312   sysfs
2313 */
2314
2315 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2316 {
2317         return container_of(dev, struct rbd_device, dev);
2318 }
2319
2320 static ssize_t rbd_size_show(struct device *dev,
2321                              struct device_attribute *attr, char *buf)
2322 {
2323         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2324         sector_t size;
2325
2326         down_read(&rbd_dev->header_rwsem);
2327         size = get_capacity(rbd_dev->disk);
2328         up_read(&rbd_dev->header_rwsem);
2329
2330         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2331 }
2332
2333 /*
2334  * Note this shows the features for whatever's mapped, which is not
2335  * necessarily the base image.
2336  */
2337 static ssize_t rbd_features_show(struct device *dev,
2338                              struct device_attribute *attr, char *buf)
2339 {
2340         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2341
2342         return sprintf(buf, "0x%016llx\n",
2343                         (unsigned long long) rbd_dev->mapping.features);
2344 }
2345
2346 static ssize_t rbd_major_show(struct device *dev,
2347                               struct device_attribute *attr, char *buf)
2348 {
2349         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2350
2351         return sprintf(buf, "%d\n", rbd_dev->major);
2352 }
2353
2354 static ssize_t rbd_client_id_show(struct device *dev,
2355                                   struct device_attribute *attr, char *buf)
2356 {
2357         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2358
2359         return sprintf(buf, "client%lld\n",
2360                         ceph_client_id(rbd_dev->rbd_client->client));
2361 }
2362
2363 static ssize_t rbd_pool_show(struct device *dev,
2364                              struct device_attribute *attr, char *buf)
2365 {
2366         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2367
2368         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2369 }
2370
2371 static ssize_t rbd_pool_id_show(struct device *dev,
2372                              struct device_attribute *attr, char *buf)
2373 {
2374         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2375
2376         return sprintf(buf, "%llu\n",
2377                 (unsigned long long) rbd_dev->spec->pool_id);
2378 }
2379
2380 static ssize_t rbd_name_show(struct device *dev,
2381                              struct device_attribute *attr, char *buf)
2382 {
2383         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2384
2385         if (rbd_dev->spec->image_name)
2386                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2387
2388         return sprintf(buf, "(unknown)\n");
2389 }
2390
2391 static ssize_t rbd_image_id_show(struct device *dev,
2392                              struct device_attribute *attr, char *buf)
2393 {
2394         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2395
2396         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2397 }
2398
2399 /*
2400  * Shows the name of the currently-mapped snapshot (or
2401  * RBD_SNAP_HEAD_NAME for the base image).
2402  */
2403 static ssize_t rbd_snap_show(struct device *dev,
2404                              struct device_attribute *attr,
2405                              char *buf)
2406 {
2407         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2408
2409         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2410 }
2411
2412 /*
2413  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2414  * for the parent image.  If there is no parent, simply shows
2415  * "(no parent image)".
2416  */
2417 static ssize_t rbd_parent_show(struct device *dev,
2418                              struct device_attribute *attr,
2419                              char *buf)
2420 {
2421         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2422         struct rbd_spec *spec = rbd_dev->parent_spec;
2423         int count;
2424         char *bufp = buf;
2425
2426         if (!spec)
2427                 return sprintf(buf, "(no parent image)\n");
2428
2429         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2430                         (unsigned long long) spec->pool_id, spec->pool_name);
2431         if (count < 0)
2432                 return count;
2433         bufp += count;
2434
2435         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2436                         spec->image_name ? spec->image_name : "(unknown)");
2437         if (count < 0)
2438                 return count;
2439         bufp += count;
2440
2441         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2442                         (unsigned long long) spec->snap_id, spec->snap_name);
2443         if (count < 0)
2444                 return count;
2445         bufp += count;
2446
2447         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2448         if (count < 0)
2449                 return count;
2450         bufp += count;
2451
2452         return (ssize_t) (bufp - buf);
2453 }
2454
2455 static ssize_t rbd_image_refresh(struct device *dev,
2456                                  struct device_attribute *attr,
2457                                  const char *buf,
2458                                  size_t size)
2459 {
2460         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2461         int ret;
2462
2463         ret = rbd_dev_refresh(rbd_dev, NULL);
2464
2465         return ret < 0 ? ret : size;
2466 }
2467
/*
 * Per-device sysfs attributes.  Those declared with S_IRUGO have only
 * a show handler; "refresh" is declared with S_IWUSR and has only a
 * store handler.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2479
/* NULL-terminated list of the attributes exposed for each rbd device. */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};
2494
/* The single (unnamed) attribute group holding rbd_attrs. */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type. */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
2503
/*
 * Release callback for the rbd device type.  The body is empty, so
 * nothing is freed from here.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
2507
/*
 * Device type for rbd devices: supplies the attribute groups and the
 * release callback.
 */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2513
2514
2515 /*
2516   sysfs - snapshots
2517 */
2518
2519 static ssize_t rbd_snap_size_show(struct device *dev,
2520                                   struct device_attribute *attr,
2521                                   char *buf)
2522 {
2523         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2524
2525         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2526 }
2527
2528 static ssize_t rbd_snap_id_show(struct device *dev,
2529                                 struct device_attribute *attr,
2530                                 char *buf)
2531 {
2532         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2533
2534         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2535 }
2536
2537 static ssize_t rbd_snap_features_show(struct device *dev,
2538                                 struct device_attribute *attr,
2539                                 char *buf)
2540 {
2541         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2542
2543         return sprintf(buf, "0x%016llx\n",
2544                         (unsigned long long) snap->features);
2545 }
2546
/* Per-snapshot sysfs attributes; each has only a show handler. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

/* NULL-terminated list of the attributes exposed for each snapshot. */
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

/* The single (unnamed) attribute group holding rbd_snap_attrs. */
static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2561
2562 static void rbd_snap_dev_release(struct device *dev)
2563 {
2564         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2565         kfree(snap->name);
2566         kfree(snap);
2567 }
2568
/* NULL-terminated group list referenced by rbd_snap_device_type. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/*
 * Device type for snapshot devices: supplies the attribute groups and
 * the release callback that frees the snapshot.
 */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2578
2579 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2580 {
2581         kref_get(&spec->kref);
2582
2583         return spec;
2584 }
2585
2586 static void rbd_spec_free(struct kref *kref);
2587 static void rbd_spec_put(struct rbd_spec *spec)
2588 {
2589         if (spec)
2590                 kref_put(&spec->kref, rbd_spec_free);
2591 }
2592
2593 static struct rbd_spec *rbd_spec_alloc(void)
2594 {
2595         struct rbd_spec *spec;
2596
2597         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2598         if (!spec)
2599                 return NULL;
2600         kref_init(&spec->kref);
2601
2602         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2603
2604         return spec;
2605 }
2606
2607 static void rbd_spec_free(struct kref *kref)
2608 {
2609         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2610
2611         kfree(spec->pool_name);
2612         kfree(spec->image_id);
2613         kfree(spec->image_name);
2614         kfree(spec->snap_name);
2615         kfree(spec);
2616 }
2617
2618 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2619                                 struct rbd_spec *spec)
2620 {
2621         struct rbd_device *rbd_dev;
2622
2623         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2624         if (!rbd_dev)
2625                 return NULL;
2626
2627         spin_lock_init(&rbd_dev->lock);
2628         rbd_dev->flags = 0;
2629         INIT_LIST_HEAD(&rbd_dev->node);
2630         INIT_LIST_HEAD(&rbd_dev->snaps);
2631         init_rwsem(&rbd_dev->header_rwsem);
2632
2633         rbd_dev->spec = spec;
2634         rbd_dev->rbd_client = rbdc;
2635
2636         /* Initialize the layout used for all rbd requests */
2637
2638         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2639         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2640         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2641         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2642
2643         return rbd_dev;
2644 }
2645
2646 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2647 {
2648         rbd_spec_put(rbd_dev->parent_spec);
2649         kfree(rbd_dev->header_name);
2650         rbd_put_client(rbd_dev->rbd_client);
2651         rbd_spec_put(rbd_dev->spec);
2652         kfree(rbd_dev);
2653 }
2654
2655 static bool rbd_snap_registered(struct rbd_snap *snap)
2656 {
2657         bool ret = snap->dev.type == &rbd_snap_device_type;
2658         bool reg = device_is_registered(&snap->dev);
2659
2660         rbd_assert(!ret ^ reg);
2661
2662         return ret;
2663 }
2664
2665 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2666 {
2667         list_del(&snap->node);
2668         if (device_is_registered(&snap->dev))
2669                 device_unregister(&snap->dev);
2670 }
2671
2672 static int rbd_register_snap_dev(struct rbd_snap *snap,
2673                                   struct device *parent)
2674 {
2675         struct device *dev = &snap->dev;
2676         int ret;
2677
2678         dev->type = &rbd_snap_device_type;
2679         dev->parent = parent;
2680         dev->release = rbd_snap_dev_release;
2681         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2682         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2683
2684         ret = device_register(dev);
2685
2686         return ret;
2687 }
2688
2689 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2690                                                 const char *snap_name,
2691                                                 u64 snap_id, u64 snap_size,
2692                                                 u64 snap_features)
2693 {
2694         struct rbd_snap *snap;
2695         int ret;
2696
2697         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2698         if (!snap)
2699                 return ERR_PTR(-ENOMEM);
2700
2701         ret = -ENOMEM;
2702         snap->name = kstrdup(snap_name, GFP_KERNEL);
2703         if (!snap->name)
2704                 goto err;
2705
2706         snap->id = snap_id;
2707         snap->size = snap_size;
2708         snap->features = snap_features;
2709
2710         return snap;
2711
2712 err:
2713         kfree(snap->name);
2714         kfree(snap);
2715
2716         return ERR_PTR(ret);
2717 }
2718
2719 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2720                 u64 *snap_size, u64 *snap_features)
2721 {
2722         char *snap_name;
2723
2724         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2725
2726         *snap_size = rbd_dev->header.snap_sizes[which];
2727         *snap_features = 0;     /* No features for v1 */
2728
2729         /* Skip over names until we find the one we are looking for */
2730
2731         snap_name = rbd_dev->header.snap_names;
2732         while (which--)
2733                 snap_name += strlen(snap_name) + 1;
2734
2735         return snap_name;
2736 }
2737
2738 /*
2739  * Get the size and object order for an image snapshot, or if
2740  * snap_id is CEPH_NOSNAP, gets this information for the base
2741  * image.
2742  */
2743 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2744                                 u8 *order, u64 *snap_size)
2745 {
2746         __le64 snapid = cpu_to_le64(snap_id);
2747         int ret;
2748         struct {
2749                 u8 order;
2750                 __le64 size;
2751         } __attribute__ ((packed)) size_buf = { 0 };
2752
2753         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2754                                 "rbd", "get_size",
2755                                 (char *) &snapid, sizeof (snapid),
2756                                 (char *) &size_buf, sizeof (size_buf), NULL);
2757         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2758         if (ret < 0)
2759                 return ret;
2760
2761         *order = size_buf.order;
2762         *snap_size = le64_to_cpu(size_buf.size);
2763
2764         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2765                 (unsigned long long) snap_id, (unsigned int) *order,
2766                 (unsigned long long) *snap_size);
2767
2768         return 0;
2769 }
2770
2771 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2772 {
2773         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2774                                         &rbd_dev->header.obj_order,
2775                                         &rbd_dev->header.image_size);
2776 }
2777
2778 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2779 {
2780         void *reply_buf;
2781         int ret;
2782         void *p;
2783
2784         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2785         if (!reply_buf)
2786                 return -ENOMEM;
2787
2788         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2789                                 "rbd", "get_object_prefix",
2790                                 NULL, 0,
2791                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2792         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2793         if (ret < 0)
2794                 goto out;
2795
2796         p = reply_buf;
2797         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2798                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2799                                                 NULL, GFP_NOIO);
2800
2801         if (IS_ERR(rbd_dev->header.object_prefix)) {
2802                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2803                 rbd_dev->header.object_prefix = NULL;
2804         } else {
2805                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2806         }
2807
2808 out:
2809         kfree(reply_buf);
2810
2811         return ret;
2812 }
2813
2814 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2815                 u64 *snap_features)
2816 {
2817         __le64 snapid = cpu_to_le64(snap_id);
2818         struct {
2819                 __le64 features;
2820                 __le64 incompat;
2821         } features_buf = { 0 };
2822         u64 incompat;
2823         int ret;
2824
2825         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2826                                 "rbd", "get_features",
2827                                 (char *) &snapid, sizeof (snapid),
2828                                 (char *) &features_buf, sizeof (features_buf),
2829                                 NULL);
2830         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2831         if (ret < 0)
2832                 return ret;
2833
2834         incompat = le64_to_cpu(features_buf.incompat);
2835         if (incompat & ~RBD_FEATURES_ALL)
2836                 return -ENXIO;
2837
2838         *snap_features = le64_to_cpu(features_buf.features);
2839
2840         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2841                 (unsigned long long) snap_id,
2842                 (unsigned long long) *snap_features,
2843                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2844
2845         return 0;
2846 }
2847
2848 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2849 {
2850         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2851                                                 &rbd_dev->header.features);
2852 }
2853
2854 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2855 {
2856         struct rbd_spec *parent_spec;
2857         size_t size;
2858         void *reply_buf = NULL;
2859         __le64 snapid;
2860         void *p;
2861         void *end;
2862         char *image_id;
2863         u64 overlap;
2864         int ret;
2865
2866         parent_spec = rbd_spec_alloc();
2867         if (!parent_spec)
2868                 return -ENOMEM;
2869
2870         size = sizeof (__le64) +                                /* pool_id */
2871                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2872                 sizeof (__le64) +                               /* snap_id */
2873                 sizeof (__le64);                                /* overlap */
2874         reply_buf = kmalloc(size, GFP_KERNEL);
2875         if (!reply_buf) {
2876                 ret = -ENOMEM;
2877                 goto out_err;
2878         }
2879
2880         snapid = cpu_to_le64(CEPH_NOSNAP);
2881         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2882                                 "rbd", "get_parent",
2883                                 (char *) &snapid, sizeof (snapid),
2884                                 (char *) reply_buf, size, NULL);
2885         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2886         if (ret < 0)
2887                 goto out_err;
2888
2889         ret = -ERANGE;
2890         p = reply_buf;
2891         end = (char *) reply_buf + size;
2892         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2893         if (parent_spec->pool_id == CEPH_NOPOOL)
2894                 goto out;       /* No parent?  No problem. */
2895
2896         /* The ceph file layout needs to fit pool id in 32 bits */
2897
2898         ret = -EIO;
2899         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2900                 goto out;
2901
2902         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2903         if (IS_ERR(image_id)) {
2904                 ret = PTR_ERR(image_id);
2905                 goto out_err;
2906         }
2907         parent_spec->image_id = image_id;
2908         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2909         ceph_decode_64_safe(&p, end, overlap, out_err);
2910
2911         rbd_dev->parent_overlap = overlap;
2912         rbd_dev->parent_spec = parent_spec;
2913         parent_spec = NULL;     /* rbd_dev now owns this */
2914 out:
2915         ret = 0;
2916 out_err:
2917         kfree(reply_buf);
2918         rbd_spec_put(parent_spec);
2919
2920         return ret;
2921 }
2922
2923 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2924 {
2925         size_t image_id_size;
2926         char *image_id;
2927         void *p;
2928         void *end;
2929         size_t size;
2930         void *reply_buf = NULL;
2931         size_t len = 0;
2932         char *image_name = NULL;
2933         int ret;
2934
2935         rbd_assert(!rbd_dev->spec->image_name);
2936
2937         len = strlen(rbd_dev->spec->image_id);
2938         image_id_size = sizeof (__le32) + len;
2939         image_id = kmalloc(image_id_size, GFP_KERNEL);
2940         if (!image_id)
2941                 return NULL;
2942
2943         p = image_id;
2944         end = (char *) image_id + image_id_size;
2945         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2946
2947         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2948         reply_buf = kmalloc(size, GFP_KERNEL);
2949         if (!reply_buf)
2950                 goto out;
2951
2952         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2953                                 "rbd", "dir_get_name",
2954                                 image_id, image_id_size,
2955                                 (char *) reply_buf, size, NULL);
2956         if (ret < 0)
2957                 goto out;
2958         p = reply_buf;
2959         end = (char *) reply_buf + size;
2960         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2961         if (IS_ERR(image_name))
2962                 image_name = NULL;
2963         else
2964                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2965 out:
2966         kfree(reply_buf);
2967         kfree(image_id);
2968
2969         return image_name;
2970 }
2971
2972 /*
2973  * When a parent image gets probed, we only have the pool, image,
2974  * and snapshot ids but not the names of any of them.  This call
2975  * is made later to fill in those names.  It has to be done after
2976  * rbd_dev_snaps_update() has completed because some of the
2977  * information (in particular, snapshot name) is not available
2978  * until then.
2979  */
2980 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2981 {
2982         struct ceph_osd_client *osdc;
2983         const char *name;
2984         void *reply_buf = NULL;
2985         int ret;
2986
2987         if (rbd_dev->spec->pool_name)
2988                 return 0;       /* Already have the names */
2989
2990         /* Look up the pool name */
2991
2992         osdc = &rbd_dev->rbd_client->client->osdc;
2993         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2994         if (!name) {
2995                 rbd_warn(rbd_dev, "there is no pool with id %llu",
2996                         rbd_dev->spec->pool_id);        /* Really a BUG() */
2997                 return -EIO;
2998         }
2999
3000         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3001         if (!rbd_dev->spec->pool_name)
3002                 return -ENOMEM;
3003
3004         /* Fetch the image name; tolerate failure here */
3005
3006         name = rbd_dev_image_name(rbd_dev);
3007         if (name)
3008                 rbd_dev->spec->image_name = (char *) name;
3009         else
3010                 rbd_warn(rbd_dev, "unable to get image name");
3011
3012         /* Look up the snapshot name. */
3013
3014         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3015         if (!name) {
3016                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3017                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3018                 ret = -EIO;
3019                 goto out_err;
3020         }
3021         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3022         if(!rbd_dev->spec->snap_name)
3023                 goto out_err;
3024
3025         return 0;
3026 out_err:
3027         kfree(reply_buf);
3028         kfree(rbd_dev->spec->pool_name);
3029         rbd_dev->spec->pool_name = NULL;
3030
3031         return ret;
3032 }
3033
3034 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3035 {
3036         size_t size;
3037         int ret;
3038         void *reply_buf;
3039         void *p;
3040         void *end;
3041         u64 seq;
3042         u32 snap_count;
3043         struct ceph_snap_context *snapc;
3044         u32 i;
3045
3046         /*
3047          * We'll need room for the seq value (maximum snapshot id),
3048          * snapshot count, and array of that many snapshot ids.
3049          * For now we have a fixed upper limit on the number we're
3050          * prepared to receive.
3051          */
3052         size = sizeof (__le64) + sizeof (__le32) +
3053                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3054         reply_buf = kzalloc(size, GFP_KERNEL);
3055         if (!reply_buf)
3056                 return -ENOMEM;
3057
3058         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3059                                 "rbd", "get_snapcontext",
3060                                 NULL, 0,
3061                                 reply_buf, size, ver);
3062         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3063         if (ret < 0)
3064                 goto out;
3065
3066         ret = -ERANGE;
3067         p = reply_buf;
3068         end = (char *) reply_buf + size;
3069         ceph_decode_64_safe(&p, end, seq, out);
3070         ceph_decode_32_safe(&p, end, snap_count, out);
3071
3072         /*
3073          * Make sure the reported number of snapshot ids wouldn't go
3074          * beyond the end of our buffer.  But before checking that,
3075          * make sure the computed size of the snapshot context we
3076          * allocate is representable in a size_t.
3077          */
3078         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3079                                  / sizeof (u64)) {
3080                 ret = -EINVAL;
3081                 goto out;
3082         }
3083         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3084                 goto out;
3085
3086         size = sizeof (struct ceph_snap_context) +
3087                                 snap_count * sizeof (snapc->snaps[0]);
3088         snapc = kmalloc(size, GFP_KERNEL);
3089         if (!snapc) {
3090                 ret = -ENOMEM;
3091                 goto out;
3092         }
3093
3094         atomic_set(&snapc->nref, 1);
3095         snapc->seq = seq;
3096         snapc->num_snaps = snap_count;
3097         for (i = 0; i < snap_count; i++)
3098                 snapc->snaps[i] = ceph_decode_64(&p);
3099
3100         rbd_dev->header.snapc = snapc;
3101
3102         dout("  snap context seq = %llu, snap_count = %u\n",
3103                 (unsigned long long) seq, (unsigned int) snap_count);
3104
3105 out:
3106         kfree(reply_buf);
3107
3108         return 0;
3109 }
3110
3111 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3112 {
3113         size_t size;
3114         void *reply_buf;
3115         __le64 snap_id;
3116         int ret;
3117         void *p;
3118         void *end;
3119         char *snap_name;
3120
3121         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3122         reply_buf = kmalloc(size, GFP_KERNEL);
3123         if (!reply_buf)
3124                 return ERR_PTR(-ENOMEM);
3125
3126         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3127         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3128                                 "rbd", "get_snapshot_name",
3129                                 (char *) &snap_id, sizeof (snap_id),
3130                                 reply_buf, size, NULL);
3131         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3132         if (ret < 0)
3133                 goto out;
3134
3135         p = reply_buf;
3136         end = (char *) reply_buf + size;
3137         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3138         if (IS_ERR(snap_name)) {
3139                 ret = PTR_ERR(snap_name);
3140                 goto out;
3141         } else {
3142                 dout("  snap_id 0x%016llx snap_name = %s\n",
3143                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3144         }
3145         kfree(reply_buf);
3146
3147         return snap_name;
3148 out:
3149         kfree(reply_buf);
3150
3151         return ERR_PTR(ret);
3152 }
3153
3154 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3155                 u64 *snap_size, u64 *snap_features)
3156 {
3157         u64 snap_id;
3158         u8 order;
3159         int ret;
3160
3161         snap_id = rbd_dev->header.snapc->snaps[which];
3162         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3163         if (ret)
3164                 return ERR_PTR(ret);
3165         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3166         if (ret)
3167                 return ERR_PTR(ret);
3168
3169         return rbd_dev_v2_snap_name(rbd_dev, which);
3170 }
3171
3172 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3173                 u64 *snap_size, u64 *snap_features)
3174 {
3175         if (rbd_dev->image_format == 1)
3176                 return rbd_dev_v1_snap_info(rbd_dev, which,
3177                                         snap_size, snap_features);
3178         if (rbd_dev->image_format == 2)
3179                 return rbd_dev_v2_snap_info(rbd_dev, which,
3180                                         snap_size, snap_features);
3181         return ERR_PTR(-EINVAL);
3182 }
3183
3184 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3185 {
3186         int ret;
3187         __u8 obj_order;
3188
3189         down_write(&rbd_dev->header_rwsem);
3190
3191         /* Grab old order first, to see if it changes */
3192
3193         obj_order = rbd_dev->header.obj_order,
3194         ret = rbd_dev_v2_image_size(rbd_dev);
3195         if (ret)
3196                 goto out;
3197         if (rbd_dev->header.obj_order != obj_order) {
3198                 ret = -EIO;
3199                 goto out;
3200         }
3201         rbd_update_mapping_size(rbd_dev);
3202
3203         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3204         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3205         if (ret)
3206                 goto out;
3207         ret = rbd_dev_snaps_update(rbd_dev);
3208         dout("rbd_dev_snaps_update returned %d\n", ret);
3209         if (ret)
3210                 goto out;
3211         ret = rbd_dev_snaps_register(rbd_dev);
3212         dout("rbd_dev_snaps_register returned %d\n", ret);
3213 out:
3214         up_write(&rbd_dev->header_rwsem);
3215
3216         return ret;
3217 }
3218
3219 /*
3220  * Scan the rbd device's current snapshot list and compare it to the
3221  * newly-received snapshot context.  Remove any existing snapshots
3222  * not present in the new snapshot context.  Add a new snapshot for
3223  * any snaphots in the snapshot context not in the current list.
3224  * And verify there are no changes to snapshots we already know
3225  * about.
3226  *
3227  * Assumes the snapshots in the snapshot context are sorted by
3228  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3229  * are also maintained in that order.)
3230  */
3231 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3232 {
3233         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3234         const u32 snap_count = snapc->num_snaps;
3235         struct list_head *head = &rbd_dev->snaps;
3236         struct list_head *links = head->next;
3237         u32 index = 0;
3238
3239         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3240         while (index < snap_count || links != head) {
3241                 u64 snap_id;
3242                 struct rbd_snap *snap;
3243                 char *snap_name;
3244                 u64 snap_size = 0;
3245                 u64 snap_features = 0;
3246
3247                 snap_id = index < snap_count ? snapc->snaps[index]
3248                                              : CEPH_NOSNAP;
3249                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3250                                      : NULL;
3251                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3252
3253                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3254                         struct list_head *next = links->next;
3255
3256                         /*
3257                          * A previously-existing snapshot is not in
3258                          * the new snap context.
3259                          *
3260                          * If the now missing snapshot is the one the
3261                          * image is mapped to, clear its exists flag
3262                          * so we can avoid sending any more requests
3263                          * to it.
3264                          */
3265                         if (rbd_dev->spec->snap_id == snap->id)
3266                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3267                         rbd_remove_snap_dev(snap);
3268                         dout("%ssnap id %llu has been removed\n",
3269                                 rbd_dev->spec->snap_id == snap->id ?
3270                                                         "mapped " : "",
3271                                 (unsigned long long) snap->id);
3272
3273                         /* Done with this list entry; advance */
3274
3275                         links = next;
3276                         continue;
3277                 }
3278
3279                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3280                                         &snap_size, &snap_features);
3281                 if (IS_ERR(snap_name))
3282                         return PTR_ERR(snap_name);
3283
3284                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3285                         (unsigned long long) snap_id);
3286                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3287                         struct rbd_snap *new_snap;
3288
3289                         /* We haven't seen this snapshot before */
3290
3291                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3292                                         snap_id, snap_size, snap_features);
3293                         if (IS_ERR(new_snap)) {
3294                                 int err = PTR_ERR(new_snap);
3295
3296                                 dout("  failed to add dev, error %d\n", err);
3297
3298                                 return err;
3299                         }
3300
3301                         /* New goes before existing, or at end of list */
3302
3303                         dout("  added dev%s\n", snap ? "" : " at end\n");
3304                         if (snap)
3305                                 list_add_tail(&new_snap->node, &snap->node);
3306                         else
3307                                 list_add_tail(&new_snap->node, head);
3308                 } else {
3309                         /* Already have this one */
3310
3311                         dout("  already present\n");
3312
3313                         rbd_assert(snap->size == snap_size);
3314                         rbd_assert(!strcmp(snap->name, snap_name));
3315                         rbd_assert(snap->features == snap_features);
3316
3317                         /* Done with this list entry; advance */
3318
3319                         links = links->next;
3320                 }
3321
3322                 /* Advance to the next entry in the snapshot context */
3323
3324                 index++;
3325         }
3326         dout("%s: done\n", __func__);
3327
3328         return 0;
3329 }
3330
3331 /*
3332  * Scan the list of snapshots and register the devices for any that
3333  * have not already been registered.
3334  */
3335 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3336 {
3337         struct rbd_snap *snap;
3338         int ret = 0;
3339
3340         dout("%s:\n", __func__);
3341         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3342                 return -EIO;
3343
3344         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3345                 if (!rbd_snap_registered(snap)) {
3346                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3347                         if (ret < 0)
3348                                 break;
3349                 }
3350         }
3351         dout("%s: returning %d\n", __func__, ret);
3352
3353         return ret;
3354 }
3355
3356 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3357 {
3358         struct device *dev;
3359         int ret;
3360
3361         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3362
3363         dev = &rbd_dev->dev;
3364         dev->bus = &rbd_bus_type;
3365         dev->type = &rbd_device_type;
3366         dev->parent = &rbd_root_dev;
3367         dev->release = rbd_dev_release;
3368         dev_set_name(dev, "%d", rbd_dev->dev_id);
3369         ret = device_register(dev);
3370
3371         mutex_unlock(&ctl_mutex);
3372
3373         return ret;
3374 }
3375
3376 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3377 {
3378         device_unregister(&rbd_dev->dev);
3379 }
3380
3381 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3382
3383 /*
3384  * Get a unique rbd identifier for the given new rbd_dev, and add
3385  * the rbd_dev to the global list.  The minimum rbd id is 1.
3386  */
3387 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3388 {
3389         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3390
3391         spin_lock(&rbd_dev_list_lock);
3392         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3393         spin_unlock(&rbd_dev_list_lock);
3394         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3395                 (unsigned long long) rbd_dev->dev_id);
3396 }
3397
3398 /*
3399  * Remove an rbd_dev from the global list, and record that its
3400  * identifier is no longer in use.
3401  */
3402 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3403 {
3404         struct list_head *tmp;
3405         int rbd_id = rbd_dev->dev_id;
3406         int max_id;
3407
3408         rbd_assert(rbd_id > 0);
3409
3410         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3411                 (unsigned long long) rbd_dev->dev_id);
3412         spin_lock(&rbd_dev_list_lock);
3413         list_del_init(&rbd_dev->node);
3414
3415         /*
3416          * If the id being "put" is not the current maximum, there
3417          * is nothing special we need to do.
3418          */
3419         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3420                 spin_unlock(&rbd_dev_list_lock);
3421                 return;
3422         }
3423
3424         /*
3425          * We need to update the current maximum id.  Search the
3426          * list to find out what it is.  We're more likely to find
3427          * the maximum at the end, so search the list backward.
3428          */
3429         max_id = 0;
3430         list_for_each_prev(tmp, &rbd_dev_list) {
3431                 struct rbd_device *rbd_dev;
3432
3433                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3434                 if (rbd_dev->dev_id > max_id)
3435                         max_id = rbd_dev->dev_id;
3436         }
3437         spin_unlock(&rbd_dev_list_lock);
3438
3439         /*
3440          * The max id could have been updated by rbd_dev_id_get(), in
3441          * which case it now accurately reflects the new maximum.
3442          * Be careful not to overwrite the maximum value in that
3443          * case.
3444          */
3445         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3446         dout("  max dev id has been reset\n");
3447 }
3448
/*
 * Advance *buf past any leading white space so that it points at the
 * start of the next token (or at the terminating '\0' if there is
 * none), and return the length of that token, i.e. the number of
 * consecutive non-white-space characters found there.  *buf is NOT
 * advanced past the token itself.
 *
 * The string at *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, whitespace);

	*buf = start;			/* Start of token */

	return strcspn(start, whitespace);	/* Token length */
}
3467
/*
 * Locate the next token in *buf and, provided it fits (including its
 * terminating '\0') in the token_size bytes at token, copy it there
 * and terminate it with '\0'.  If it does not fit, token is left
 * untouched.  The string at *buf must be terminated with '\0'.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return of 0 means no token was found; a return >= token_size
 * means the token was too long and was not copied.
 *
 * In every case *buf is advanced to just past the token found,
 * including when the token was too long to copy.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *start = *buf;

	*buf = start + token_len;
	if (token_len >= token_size)
		return token_len;	/* Does not fit; nothing copied */

	memcpy(token, start, token_len);
	token[token_len] = '\0';

	return token_len;
}
3497
3498 /*
3499  * Finds the next token in *buf, dynamically allocates a buffer big
3500  * enough to hold a copy of it, and copies the token into the new
3501  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3502  * that a duplicate buffer is created even for a zero-length token.
3503  *
3504  * Returns a pointer to the newly-allocated duplicate, or a null
3505  * pointer if memory for the duplicate was not available.  If
3506  * the lenp argument is a non-null pointer, the length of the token
3507  * (not including the '\0') is returned in *lenp.
3508  *
3509  * If successful, the *buf pointer will be updated to point beyond
3510  * the end of the found token.
3511  *
3512  * Note: uses GFP_KERNEL for allocation.
3513  */
3514 static inline char *dup_token(const char **buf, size_t *lenp)
3515 {
3516         char *dup;
3517         size_t len;
3518
3519         len = next_token(buf);
3520         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3521         if (!dup)
3522                 return NULL;
3523         *(dup + len) = '\0';
3524         *buf += len;
3525
3526         if (lenp)
3527                 *lenp = len;
3528
3529         return dup;
3530 }
3531
3532 /*
3533  * Parse the options provided for an "rbd add" (i.e., rbd image
3534  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3535  * and the data written is passed here via a NUL-terminated buffer.
3536  * Returns 0 if successful or an error code otherwise.
3537  *
3538  * The information extracted from these options is recorded in
3539  * the other parameters which return dynamically-allocated
3540  * structures:
3541  *  ceph_opts
3542  *      The address of a pointer that will refer to a ceph options
3543  *      structure.  Caller must release the returned pointer using
3544  *      ceph_destroy_options() when it is no longer needed.
3545  *  rbd_opts
3546  *      Address of an rbd options pointer.  Fully initialized by
3547  *      this function; caller must release with kfree().
3548  *  spec
3549  *      Address of an rbd image specification pointer.  Fully
3550  *      initialized by this function based on parsed options.
3551  *      Caller must release with rbd_spec_put().
3552  *
3553  * The options passed take this form:
3554  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3555  * where:
3556  *  <mon_addrs>
3557  *      A comma-separated list of one or more monitor addresses.
3558  *      A monitor address is an ip address, optionally followed
3559  *      by a port number (separated by a colon).
3560  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3561  *  <options>
3562  *      A comma-separated list of ceph and/or rbd options.
3563  *  <pool_name>
3564  *      The name of the rados pool containing the rbd image.
3565  *  <image_name>
3566  *      The name of the image in that pool to map.
3567  *  <snap_id>
3568  *      An optional snapshot id.  If provided, the mapping will
3569  *      present data from the image at the time that snapshot was
3570  *      created.  The image head is used if no snapshot id is
3571  *      provided.  Snapshot mappings are always read-only.
3572  */
3573 static int rbd_add_parse_args(const char *buf,
3574                                 struct ceph_options **ceph_opts,
3575                                 struct rbd_options **opts,
3576                                 struct rbd_spec **rbd_spec)
3577 {
3578         size_t len;
3579         char *options;
3580         const char *mon_addrs;
3581         size_t mon_addrs_size;
3582         struct rbd_spec *spec = NULL;
3583         struct rbd_options *rbd_opts = NULL;
3584         struct ceph_options *copts;
3585         int ret;
3586
3587         /* The first four tokens are required */
3588
3589         len = next_token(&buf);
3590         if (!len) {
3591                 rbd_warn(NULL, "no monitor address(es) provided");
3592                 return -EINVAL;
3593         }
3594         mon_addrs = buf;
3595         mon_addrs_size = len + 1;
3596         buf += len;
3597
3598         ret = -EINVAL;
3599         options = dup_token(&buf, NULL);
3600         if (!options)
3601                 return -ENOMEM;
3602         if (!*options) {
3603                 rbd_warn(NULL, "no options provided");
3604                 goto out_err;
3605         }
3606
3607         spec = rbd_spec_alloc();
3608         if (!spec)
3609                 goto out_mem;
3610
3611         spec->pool_name = dup_token(&buf, NULL);
3612         if (!spec->pool_name)
3613                 goto out_mem;
3614         if (!*spec->pool_name) {
3615                 rbd_warn(NULL, "no pool name provided");
3616                 goto out_err;
3617         }
3618
3619         spec->image_name = dup_token(&buf, NULL);
3620         if (!spec->image_name)
3621                 goto out_mem;
3622         if (!*spec->image_name) {
3623                 rbd_warn(NULL, "no image name provided");
3624                 goto out_err;
3625         }
3626
3627         /*
3628          * Snapshot name is optional; default is to use "-"
3629          * (indicating the head/no snapshot).
3630          */
3631         len = next_token(&buf);
3632         if (!len) {
3633                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3634                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3635         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3636                 ret = -ENAMETOOLONG;
3637                 goto out_err;
3638         }
3639         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3640         if (!spec->snap_name)
3641                 goto out_mem;
3642         *(spec->snap_name + len) = '\0';
3643
3644         /* Initialize all rbd options to the defaults */
3645
3646         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3647         if (!rbd_opts)
3648                 goto out_mem;
3649
3650         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3651
3652         copts = ceph_parse_options(options, mon_addrs,
3653                                         mon_addrs + mon_addrs_size - 1,
3654                                         parse_rbd_opts_token, rbd_opts);
3655         if (IS_ERR(copts)) {
3656                 ret = PTR_ERR(copts);
3657                 goto out_err;
3658         }
3659         kfree(options);
3660
3661         *ceph_opts = copts;
3662         *opts = rbd_opts;
3663         *rbd_spec = spec;
3664
3665         return 0;
3666 out_mem:
3667         ret = -ENOMEM;
3668 out_err:
3669         kfree(rbd_opts);
3670         rbd_spec_put(spec);
3671         kfree(options);
3672
3673         return ret;
3674 }
3675
3676 /*
3677  * An rbd format 2 image has a unique identifier, distinct from the
3678  * name given to it by the user.  Internally, that identifier is
3679  * what's used to specify the names of objects related to the image.
3680  *
3681  * A special "rbd id" object is used to map an rbd image name to its
3682  * id.  If that object doesn't exist, then there is no v2 rbd image
3683  * with the supplied name.
3684  *
3685  * This function will record the given rbd_dev's image_id field if
3686  * it can be determined, and in that case will return 0.  If any
3687  * errors occur a negative errno will be returned and the rbd_dev's
3688  * image_id field will be unchanged (and should be NULL).
3689  */
3690 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3691 {
3692         int ret;
3693         size_t size;
3694         char *object_name;
3695         void *response;
3696         void *p;
3697
3698         /*
3699          * When probing a parent image, the image id is already
3700          * known (and the image name likely is not).  There's no
3701          * need to fetch the image id again in this case.
3702          */
3703         if (rbd_dev->spec->image_id)
3704                 return 0;
3705
3706         /*
3707          * First, see if the format 2 image id file exists, and if
3708          * so, get the image's persistent id from it.
3709          */
3710         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3711         object_name = kmalloc(size, GFP_NOIO);
3712         if (!object_name)
3713                 return -ENOMEM;
3714         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3715         dout("rbd id object name is %s\n", object_name);
3716
3717         /* Response will be an encoded string, which includes a length */
3718
3719         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3720         response = kzalloc(size, GFP_NOIO);
3721         if (!response) {
3722                 ret = -ENOMEM;
3723                 goto out;
3724         }
3725
3726         ret = rbd_obj_method_sync(rbd_dev, object_name,
3727                                 "rbd", "get_id",
3728                                 NULL, 0,
3729                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3730         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3731         if (ret < 0)
3732                 goto out;
3733
3734         p = response;
3735         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3736                                                 p + RBD_IMAGE_ID_LEN_MAX,
3737                                                 NULL, GFP_NOIO);
3738         if (IS_ERR(rbd_dev->spec->image_id)) {
3739                 ret = PTR_ERR(rbd_dev->spec->image_id);
3740                 rbd_dev->spec->image_id = NULL;
3741         } else {
3742                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3743         }
3744 out:
3745         kfree(response);
3746         kfree(object_name);
3747
3748         return ret;
3749 }
3750
3751 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3752 {
3753         int ret;
3754         size_t size;
3755
3756         /* Version 1 images have no id; empty string is used */
3757
3758         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3759         if (!rbd_dev->spec->image_id)
3760                 return -ENOMEM;
3761
3762         /* Record the header object name for this rbd image. */
3763
3764         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3765         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3766         if (!rbd_dev->header_name) {
3767                 ret = -ENOMEM;
3768                 goto out_err;
3769         }
3770         sprintf(rbd_dev->header_name, "%s%s",
3771                 rbd_dev->spec->image_name, RBD_SUFFIX);
3772
3773         /* Populate rbd image metadata */
3774
3775         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3776         if (ret < 0)
3777                 goto out_err;
3778
3779         /* Version 1 images have no parent (no layering) */
3780
3781         rbd_dev->parent_spec = NULL;
3782         rbd_dev->parent_overlap = 0;
3783
3784         rbd_dev->image_format = 1;
3785
3786         dout("discovered version 1 image, header name is %s\n",
3787                 rbd_dev->header_name);
3788
3789         return 0;
3790
3791 out_err:
3792         kfree(rbd_dev->header_name);
3793         rbd_dev->header_name = NULL;
3794         kfree(rbd_dev->spec->image_id);
3795         rbd_dev->spec->image_id = NULL;
3796
3797         return ret;
3798 }
3799
3800 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3801 {
3802         size_t size;
3803         int ret;
3804         u64 ver = 0;
3805
3806         /*
3807          * Image id was filled in by the caller.  Record the header
3808          * object name for this rbd image.
3809          */
3810         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3811         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3812         if (!rbd_dev->header_name)
3813                 return -ENOMEM;
3814         sprintf(rbd_dev->header_name, "%s%s",
3815                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3816
3817         /* Get the size and object order for the image */
3818
3819         ret = rbd_dev_v2_image_size(rbd_dev);
3820         if (ret < 0)
3821                 goto out_err;
3822
3823         /* Get the object prefix (a.k.a. block_name) for the image */
3824
3825         ret = rbd_dev_v2_object_prefix(rbd_dev);
3826         if (ret < 0)
3827                 goto out_err;
3828
3829         /* Get the and check features for the image */
3830
3831         ret = rbd_dev_v2_features(rbd_dev);
3832         if (ret < 0)
3833                 goto out_err;
3834
3835         /* If the image supports layering, get the parent info */
3836
3837         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3838                 ret = rbd_dev_v2_parent_info(rbd_dev);
3839                 if (ret < 0)
3840                         goto out_err;
3841         }
3842
3843         /* crypto and compression type aren't (yet) supported for v2 images */
3844
3845         rbd_dev->header.crypt_type = 0;
3846         rbd_dev->header.comp_type = 0;
3847
3848         /* Get the snapshot context, plus the header version */
3849
3850         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3851         if (ret)
3852                 goto out_err;
3853         rbd_dev->header.obj_version = ver;
3854
3855         rbd_dev->image_format = 2;
3856
3857         dout("discovered version 2 image, header name is %s\n",
3858                 rbd_dev->header_name);
3859
3860         return 0;
3861 out_err:
3862         rbd_dev->parent_overlap = 0;
3863         rbd_spec_put(rbd_dev->parent_spec);
3864         rbd_dev->parent_spec = NULL;
3865         kfree(rbd_dev->header_name);
3866         rbd_dev->header_name = NULL;
3867         kfree(rbd_dev->header.object_prefix);
3868         rbd_dev->header.object_prefix = NULL;
3869
3870         return ret;
3871 }
3872
3873 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3874 {
3875         int ret;
3876
3877         /* no need to lock here, as rbd_dev is not registered yet */
3878         ret = rbd_dev_snaps_update(rbd_dev);
3879         if (ret)
3880                 return ret;
3881
3882         ret = rbd_dev_probe_update_spec(rbd_dev);
3883         if (ret)
3884                 goto err_out_snaps;
3885
3886         ret = rbd_dev_set_mapping(rbd_dev);
3887         if (ret)
3888                 goto err_out_snaps;
3889
3890         /* generate unique id: find highest unique id, add one */
3891         rbd_dev_id_get(rbd_dev);
3892
3893         /* Fill in the device name, now that we have its id. */
3894         BUILD_BUG_ON(DEV_NAME_LEN
3895                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3896         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3897
3898         /* Get our block major device number. */
3899
3900         ret = register_blkdev(0, rbd_dev->name);
3901         if (ret < 0)
3902                 goto err_out_id;
3903         rbd_dev->major = ret;
3904
3905         /* Set up the blkdev mapping. */
3906
3907         ret = rbd_init_disk(rbd_dev);
3908         if (ret)
3909                 goto err_out_blkdev;
3910
3911         ret = rbd_bus_add_dev(rbd_dev);
3912         if (ret)
3913                 goto err_out_disk;
3914
3915         /*
3916          * At this point cleanup in the event of an error is the job
3917          * of the sysfs code (initiated by rbd_bus_del_dev()).
3918          */
3919         down_write(&rbd_dev->header_rwsem);
3920         ret = rbd_dev_snaps_register(rbd_dev);
3921         up_write(&rbd_dev->header_rwsem);
3922         if (ret)
3923                 goto err_out_bus;
3924
3925         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3926         if (ret)
3927                 goto err_out_bus;
3928
3929         /* Everything's ready.  Announce the disk to the world. */
3930
3931         add_disk(rbd_dev->disk);
3932
3933         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3934                 (unsigned long long) rbd_dev->mapping.size);
3935
3936         return ret;
3937 err_out_bus:
3938         /* this will also clean up rest of rbd_dev stuff */
3939
3940         rbd_bus_del_dev(rbd_dev);
3941
3942         return ret;
3943 err_out_disk:
3944         rbd_free_disk(rbd_dev);
3945 err_out_blkdev:
3946         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3947 err_out_id:
3948         rbd_dev_id_put(rbd_dev);
3949 err_out_snaps:
3950         rbd_remove_all_snaps(rbd_dev);
3951
3952         return ret;
3953 }
3954
3955 /*
3956  * Probe for the existence of the header object for the given rbd
3957  * device.  For format 2 images this includes determining the image
3958  * id.
3959  */
3960 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3961 {
3962         int ret;
3963
3964         /*
3965          * Get the id from the image id object.  If it's not a
3966          * format 2 image, we'll get ENOENT back, and we'll assume
3967          * it's a format 1 image.
3968          */
3969         ret = rbd_dev_image_id(rbd_dev);
3970         if (ret)
3971                 ret = rbd_dev_v1_probe(rbd_dev);
3972         else
3973                 ret = rbd_dev_v2_probe(rbd_dev);
3974         if (ret) {
3975                 dout("probe failed, returning %d\n", ret);
3976
3977                 return ret;
3978         }
3979
3980         ret = rbd_dev_probe_finish(rbd_dev);
3981         if (ret)
3982                 rbd_header_free(&rbd_dev->header);
3983
3984         return ret;
3985 }
3986
3987 static ssize_t rbd_add(struct bus_type *bus,
3988                        const char *buf,
3989                        size_t count)
3990 {
3991         struct rbd_device *rbd_dev = NULL;
3992         struct ceph_options *ceph_opts = NULL;
3993         struct rbd_options *rbd_opts = NULL;
3994         struct rbd_spec *spec = NULL;
3995         struct rbd_client *rbdc;
3996         struct ceph_osd_client *osdc;
3997         int rc = -ENOMEM;
3998
3999         if (!try_module_get(THIS_MODULE))
4000                 return -ENODEV;
4001
4002         /* parse add command */
4003         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4004         if (rc < 0)
4005                 goto err_out_module;
4006
4007         rbdc = rbd_get_client(ceph_opts);
4008         if (IS_ERR(rbdc)) {
4009                 rc = PTR_ERR(rbdc);
4010                 goto err_out_args;
4011         }
4012         ceph_opts = NULL;       /* rbd_dev client now owns this */
4013
4014         /* pick the pool */
4015         osdc = &rbdc->client->osdc;
4016         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4017         if (rc < 0)
4018                 goto err_out_client;
4019         spec->pool_id = (u64) rc;
4020
4021         /* The ceph file layout needs to fit pool id in 32 bits */
4022
4023         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4024                 rc = -EIO;
4025                 goto err_out_client;
4026         }
4027
4028         rbd_dev = rbd_dev_create(rbdc, spec);
4029         if (!rbd_dev)
4030                 goto err_out_client;
4031         rbdc = NULL;            /* rbd_dev now owns this */
4032         spec = NULL;            /* rbd_dev now owns this */
4033
4034         rbd_dev->mapping.read_only = rbd_opts->read_only;
4035         kfree(rbd_opts);
4036         rbd_opts = NULL;        /* done with this */
4037
4038         rc = rbd_dev_probe(rbd_dev);
4039         if (rc < 0)
4040                 goto err_out_rbd_dev;
4041
4042         return count;
4043 err_out_rbd_dev:
4044         rbd_dev_destroy(rbd_dev);
4045 err_out_client:
4046         rbd_put_client(rbdc);
4047 err_out_args:
4048         if (ceph_opts)
4049                 ceph_destroy_options(ceph_opts);
4050         kfree(rbd_opts);
4051         rbd_spec_put(spec);
4052 err_out_module:
4053         module_put(THIS_MODULE);
4054
4055         dout("Error adding device %s\n", buf);
4056
4057         return (ssize_t) rc;
4058 }
4059
4060 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4061 {
4062         struct list_head *tmp;
4063         struct rbd_device *rbd_dev;
4064
4065         spin_lock(&rbd_dev_list_lock);
4066         list_for_each(tmp, &rbd_dev_list) {
4067                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4068                 if (rbd_dev->dev_id == dev_id) {
4069                         spin_unlock(&rbd_dev_list_lock);
4070                         return rbd_dev;
4071                 }
4072         }
4073         spin_unlock(&rbd_dev_list_lock);
4074         return NULL;
4075 }
4076
4077 static void rbd_dev_release(struct device *dev)
4078 {
4079         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4080
4081         if (rbd_dev->watch_event)
4082                 rbd_dev_header_watch_sync(rbd_dev, 0);
4083
4084         /* clean up and free blkdev */
4085         rbd_free_disk(rbd_dev);
4086         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4087
4088         /* release allocated disk header fields */
4089         rbd_header_free(&rbd_dev->header);
4090
4091         /* done with the id, and with the rbd_dev */
4092         rbd_dev_id_put(rbd_dev);
4093         rbd_assert(rbd_dev->rbd_client != NULL);
4094         rbd_dev_destroy(rbd_dev);
4095
4096         /* release module ref */
4097         module_put(THIS_MODULE);
4098 }
4099
4100 static ssize_t rbd_remove(struct bus_type *bus,
4101                           const char *buf,
4102                           size_t count)
4103 {
4104         struct rbd_device *rbd_dev = NULL;
4105         int target_id, rc;
4106         unsigned long ul;
4107         int ret = count;
4108
4109         rc = strict_strtoul(buf, 10, &ul);
4110         if (rc)
4111                 return rc;
4112
4113         /* convert to int; abort if we lost anything in the conversion */
4114         target_id = (int) ul;
4115         if (target_id != ul)
4116                 return -EINVAL;
4117
4118         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4119
4120         rbd_dev = __rbd_get_dev(target_id);
4121         if (!rbd_dev) {
4122                 ret = -ENOENT;
4123                 goto done;
4124         }
4125
4126         spin_lock_irq(&rbd_dev->lock);
4127         if (rbd_dev->open_count)
4128                 ret = -EBUSY;
4129         else
4130                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4131         spin_unlock_irq(&rbd_dev->lock);
4132         if (ret < 0)
4133                 goto done;
4134
4135         rbd_remove_all_snaps(rbd_dev);
4136         rbd_bus_del_dev(rbd_dev);
4137
4138 done:
4139         mutex_unlock(&ctl_mutex);
4140
4141         return ret;
4142 }
4143
4144 /*
4145  * create control files in sysfs
4146  * /sys/bus/rbd/...
4147  */
4148 static int rbd_sysfs_init(void)
4149 {
4150         int ret;
4151
4152         ret = device_register(&rbd_root_dev);
4153         if (ret < 0)
4154                 return ret;
4155
4156         ret = bus_register(&rbd_bus_type);
4157         if (ret < 0)
4158                 device_unregister(&rbd_root_dev);
4159
4160         return ret;
4161 }
4162
4163 static void rbd_sysfs_cleanup(void)
4164 {
4165         bus_unregister(&rbd_bus_type);
4166         device_unregister(&rbd_root_dev);
4167 }
4168
4169 static int __init rbd_init(void)
4170 {
4171         int rc;
4172
4173         if (!libceph_compatible(NULL)) {
4174                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4175
4176                 return -EINVAL;
4177         }
4178         rc = rbd_sysfs_init();
4179         if (rc)
4180                 return rc;
4181         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4182         return 0;
4183 }
4184
4185 static void __exit rbd_exit(void)
4186 {
4187         rbd_sysfs_cleanup();
4188 }
4189
4190 module_init(rbd_init);
4191 module_exit(rbd_exit);
4192
4193 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4194 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4195 MODULE_DESCRIPTION("rados block device");
4196
4197 /* following authorship retained from original osdblk.c */
4198 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4199
4200 MODULE_LICENSE("GPL");