]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: define rbd_osd_req_format_op()
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING      1
77
78 /* Features supported by this (client software) implementation. */
79
80 #define RBD_FEATURES_ALL          (0)
81
82 /*
83  * An RBD device name will be "rbd#", where the "rbd" comes from
84  * RBD_DRV_NAME above, and # is a unique integer identifier.
85  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
86  * enough to hold all possible device names.
87  */
88 #define DEV_NAME_LEN            32
89 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
90
91 /*
92  * block device image metadata (in-memory version)
93  */
94 struct rbd_image_header {
95         /* These four fields never change for a given rbd image */
96         char *object_prefix;
97         u64 features;
98         __u8 obj_order;
99         __u8 crypt_type;
100         __u8 comp_type;
101
102         /* The remaining fields need to be updated occasionally */
103         u64 image_size;
104         struct ceph_snap_context *snapc;
105         char *snap_names;
106         u64 *snap_sizes;
107
108         u64 obj_version;
109 };
110
111 /*
112  * An rbd image specification.
113  *
114  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
115  * identify an image.  Each rbd_dev structure includes a pointer to
116  * an rbd_spec structure that encapsulates this identity.
117  *
118  * Each of the id's in an rbd_spec has an associated name.  For a
119  * user-mapped image, the names are supplied and the id's associated
120  * with them are looked up.  For a layered image, a parent image is
121  * defined by the tuple, and the names are looked up.
122  *
123  * An rbd_dev structure contains a parent_spec pointer which is
124  * non-null if the image it represents is a child in a layered
125  * image.  This pointer will refer to the rbd_spec structure used
126  * by the parent rbd_dev for its own identity (i.e., the structure
127  * is shared between the parent and child).
128  *
129  * Since these structures are populated once, during the discovery
130  * phase of image construction, they are effectively immutable so
131  * we make no effort to synchronize access to them.
132  *
133  * Note that code herein does not assume the image name is known (it
134  * could be a null pointer).
135  */
136 struct rbd_spec {
137         u64             pool_id;
138         char            *pool_name;
139
140         char            *image_id;
141         char            *image_name;
142
143         u64             snap_id;
144         char            *snap_name;
145
146         struct kref     kref;
147 };
148
149 /*
150  * an instance of the client.  multiple devices may share an rbd client.
151  */
152 struct rbd_client {
153         struct ceph_client      *client;
154         struct kref             kref;
155         struct list_head        node;
156 };
157
158 struct rbd_img_request;
159 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
160
161 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
162
163 struct rbd_obj_request;
164 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
165
166 enum obj_request_type {
167         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
168 };
169
170 struct rbd_obj_request {
171         const char              *object_name;
172         u64                     offset;         /* object start byte */
173         u64                     length;         /* bytes from offset */
174
175         struct rbd_img_request  *img_request;
176         struct list_head        links;          /* img_request->obj_requests */
177         u32                     which;          /* posn image request list */
178
179         enum obj_request_type   type;
180         union {
181                 struct bio      *bio_list;
182                 struct {
183                         struct page     **pages;
184                         u32             page_count;
185                 };
186         };
187
188         struct ceph_osd_request *osd_req;
189
190         u64                     xferred;        /* bytes transferred */
191         u64                     version;
192         int                     result;
193         atomic_t                done;
194
195         rbd_obj_callback_t      callback;
196         struct completion       completion;
197
198         struct kref             kref;
199 };
200
201 struct rbd_img_request {
202         struct request          *rq;
203         struct rbd_device       *rbd_dev;
204         u64                     offset; /* starting image byte offset */
205         u64                     length; /* byte count from offset */
206         bool                    write_request;  /* false for read */
207         union {
208                 struct ceph_snap_context *snapc;        /* for writes */
209                 u64             snap_id;                /* for reads */
210         };
211         spinlock_t              completion_lock;/* protects next_completion */
212         u32                     next_completion;
213         rbd_img_callback_t      callback;
214
215         u32                     obj_request_count;
216         struct list_head        obj_requests;   /* rbd_obj_request structs */
217
218         struct kref             kref;
219 };
220
221 #define for_each_obj_request(ireq, oreq) \
222         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
223 #define for_each_obj_request_from(ireq, oreq) \
224         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
225 #define for_each_obj_request_safe(ireq, oreq, n) \
226         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
227
228 struct rbd_snap {
229         struct  device          dev;
230         const char              *name;
231         u64                     size;
232         struct list_head        node;
233         u64                     id;
234         u64                     features;
235 };
236
237 struct rbd_mapping {
238         u64                     size;
239         u64                     features;
240         bool                    read_only;
241 };
242
243 /*
244  * a single device
245  */
246 struct rbd_device {
247         int                     dev_id;         /* blkdev unique id */
248
249         int                     major;          /* blkdev assigned major */
250         struct gendisk          *disk;          /* blkdev's gendisk and rq */
251
252         u32                     image_format;   /* Either 1 or 2 */
253         struct rbd_client       *rbd_client;
254
255         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
256
257         spinlock_t              lock;           /* queue, flags, open_count */
258
259         struct rbd_image_header header;
260         unsigned long           flags;          /* possibly lock protected */
261         struct rbd_spec         *spec;
262
263         char                    *header_name;
264
265         struct ceph_file_layout layout;
266
267         struct ceph_osd_event   *watch_event;
268         struct rbd_obj_request  *watch_request;
269
270         struct rbd_spec         *parent_spec;
271         u64                     parent_overlap;
272
273         /* protects updating the header */
274         struct rw_semaphore     header_rwsem;
275
276         struct rbd_mapping      mapping;
277
278         struct list_head        node;
279
280         /* list of snapshots */
281         struct list_head        snaps;
282
283         /* sysfs related */
284         struct device           dev;
285         unsigned long           open_count;     /* protected by lock */
286 };
287
288 /*
289  * Flag bits for rbd_dev->flags.  If atomicity is required,
290  * rbd_dev->lock is used to protect access.
291  *
292  * Currently, only the "removing" flag (which is coupled with the
293  * "open_count" field) requires atomic access.
294  */
295 enum rbd_dev_flags {
296         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
297         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
298 };
299
300 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
301
302 static LIST_HEAD(rbd_dev_list);    /* devices */
303 static DEFINE_SPINLOCK(rbd_dev_list_lock);
304
305 static LIST_HEAD(rbd_client_list);              /* clients */
306 static DEFINE_SPINLOCK(rbd_client_list_lock);
307
308 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
309 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
310
311 static void rbd_dev_release(struct device *dev);
312 static void rbd_remove_snap_dev(struct rbd_snap *snap);
313
314 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
315                        size_t count);
316 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
317                           size_t count);
318
319 static struct bus_attribute rbd_bus_attrs[] = {
320         __ATTR(add, S_IWUSR, NULL, rbd_add),
321         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
322         __ATTR_NULL
323 };
324
325 static struct bus_type rbd_bus_type = {
326         .name           = "rbd",
327         .bus_attrs      = rbd_bus_attrs,
328 };
329
330 static void rbd_root_dev_release(struct device *dev)
331 {
332 }
333
334 static struct device rbd_root_dev = {
335         .init_name =    "rbd",
336         .release =      rbd_root_dev_release,
337 };
338
339 static __printf(2, 3)
340 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
341 {
342         struct va_format vaf;
343         va_list args;
344
345         va_start(args, fmt);
346         vaf.fmt = fmt;
347         vaf.va = &args;
348
349         if (!rbd_dev)
350                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
351         else if (rbd_dev->disk)
352                 printk(KERN_WARNING "%s: %s: %pV\n",
353                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
354         else if (rbd_dev->spec && rbd_dev->spec->image_name)
355                 printk(KERN_WARNING "%s: image %s: %pV\n",
356                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
357         else if (rbd_dev->spec && rbd_dev->spec->image_id)
358                 printk(KERN_WARNING "%s: id %s: %pV\n",
359                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
360         else    /* punt */
361                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
362                         RBD_DRV_NAME, rbd_dev, &vaf);
363         va_end(args);
364 }
365
366 #ifdef RBD_DEBUG
367 #define rbd_assert(expr)                                                \
368                 if (unlikely(!(expr))) {                                \
369                         printk(KERN_ERR "\nAssertion failure in %s() "  \
370                                                 "at line %d:\n\n"       \
371                                         "\trbd_assert(%s);\n\n",        \
372                                         __func__, __LINE__, #expr);     \
373                         BUG();                                          \
374                 }
375 #else /* !RBD_DEBUG */
376 #  define rbd_assert(expr)      ((void) 0)
377 #endif /* !RBD_DEBUG */
378
379 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
380 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
381
382 static int rbd_open(struct block_device *bdev, fmode_t mode)
383 {
384         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
385         bool removing = false;
386
387         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
388                 return -EROFS;
389
390         spin_lock_irq(&rbd_dev->lock);
391         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
392                 removing = true;
393         else
394                 rbd_dev->open_count++;
395         spin_unlock_irq(&rbd_dev->lock);
396         if (removing)
397                 return -ENOENT;
398
399         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
400         (void) get_device(&rbd_dev->dev);
401         set_device_ro(bdev, rbd_dev->mapping.read_only);
402         mutex_unlock(&ctl_mutex);
403
404         return 0;
405 }
406
407 static int rbd_release(struct gendisk *disk, fmode_t mode)
408 {
409         struct rbd_device *rbd_dev = disk->private_data;
410         unsigned long open_count_before;
411
412         spin_lock_irq(&rbd_dev->lock);
413         open_count_before = rbd_dev->open_count--;
414         spin_unlock_irq(&rbd_dev->lock);
415         rbd_assert(open_count_before > 0);
416
417         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
418         put_device(&rbd_dev->dev);
419         mutex_unlock(&ctl_mutex);
420
421         return 0;
422 }
423
424 static const struct block_device_operations rbd_bd_ops = {
425         .owner                  = THIS_MODULE,
426         .open                   = rbd_open,
427         .release                = rbd_release,
428 };
429
430 /*
431  * Initialize an rbd client instance.
432  * We own *ceph_opts.
433  */
434 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
435 {
436         struct rbd_client *rbdc;
437         int ret = -ENOMEM;
438
439         dout("%s:\n", __func__);
440         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
441         if (!rbdc)
442                 goto out_opt;
443
444         kref_init(&rbdc->kref);
445         INIT_LIST_HEAD(&rbdc->node);
446
447         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
448
449         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
450         if (IS_ERR(rbdc->client))
451                 goto out_mutex;
452         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
453
454         ret = ceph_open_session(rbdc->client);
455         if (ret < 0)
456                 goto out_err;
457
458         spin_lock(&rbd_client_list_lock);
459         list_add_tail(&rbdc->node, &rbd_client_list);
460         spin_unlock(&rbd_client_list_lock);
461
462         mutex_unlock(&ctl_mutex);
463         dout("%s: rbdc %p\n", __func__, rbdc);
464
465         return rbdc;
466
467 out_err:
468         ceph_destroy_client(rbdc->client);
469 out_mutex:
470         mutex_unlock(&ctl_mutex);
471         kfree(rbdc);
472 out_opt:
473         if (ceph_opts)
474                 ceph_destroy_options(ceph_opts);
475         dout("%s: error %d\n", __func__, ret);
476
477         return ERR_PTR(ret);
478 }
479
480 /*
481  * Find a ceph client with specific addr and configuration.  If
482  * found, bump its reference count.
483  */
484 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
485 {
486         struct rbd_client *client_node;
487         bool found = false;
488
489         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
490                 return NULL;
491
492         spin_lock(&rbd_client_list_lock);
493         list_for_each_entry(client_node, &rbd_client_list, node) {
494                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
495                         kref_get(&client_node->kref);
496                         found = true;
497                         break;
498                 }
499         }
500         spin_unlock(&rbd_client_list_lock);
501
502         return found ? client_node : NULL;
503 }
504
505 /*
506  * mount options
507  */
508 enum {
509         Opt_last_int,
510         /* int args above */
511         Opt_last_string,
512         /* string args above */
513         Opt_read_only,
514         Opt_read_write,
515         /* Boolean args above */
516         Opt_last_bool,
517 };
518
519 static match_table_t rbd_opts_tokens = {
520         /* int args above */
521         /* string args above */
522         {Opt_read_only, "read_only"},
523         {Opt_read_only, "ro"},          /* Alternate spelling */
524         {Opt_read_write, "read_write"},
525         {Opt_read_write, "rw"},         /* Alternate spelling */
526         /* Boolean args above */
527         {-1, NULL}
528 };
529
530 struct rbd_options {
531         bool    read_only;
532 };
533
534 #define RBD_READ_ONLY_DEFAULT   false
535
536 static int parse_rbd_opts_token(char *c, void *private)
537 {
538         struct rbd_options *rbd_opts = private;
539         substring_t argstr[MAX_OPT_ARGS];
540         int token, intval, ret;
541
542         token = match_token(c, rbd_opts_tokens, argstr);
543         if (token < 0)
544                 return -EINVAL;
545
546         if (token < Opt_last_int) {
547                 ret = match_int(&argstr[0], &intval);
548                 if (ret < 0) {
549                         pr_err("bad mount option arg (not int) "
550                                "at '%s'\n", c);
551                         return ret;
552                 }
553                 dout("got int token %d val %d\n", token, intval);
554         } else if (token > Opt_last_int && token < Opt_last_string) {
555                 dout("got string token %d val %s\n", token,
556                      argstr[0].from);
557         } else if (token > Opt_last_string && token < Opt_last_bool) {
558                 dout("got Boolean token %d\n", token);
559         } else {
560                 dout("got token %d\n", token);
561         }
562
563         switch (token) {
564         case Opt_read_only:
565                 rbd_opts->read_only = true;
566                 break;
567         case Opt_read_write:
568                 rbd_opts->read_only = false;
569                 break;
570         default:
571                 rbd_assert(false);
572                 break;
573         }
574         return 0;
575 }
576
577 /*
578  * Get a ceph client with specific addr and configuration, if one does
579  * not exist create it.
580  */
581 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
582 {
583         struct rbd_client *rbdc;
584
585         rbdc = rbd_client_find(ceph_opts);
586         if (rbdc)       /* using an existing client */
587                 ceph_destroy_options(ceph_opts);
588         else
589                 rbdc = rbd_client_create(ceph_opts);
590
591         return rbdc;
592 }
593
594 /*
595  * Destroy ceph client
596  *
597  * Caller must hold rbd_client_list_lock.
598  */
599 static void rbd_client_release(struct kref *kref)
600 {
601         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
602
603         dout("%s: rbdc %p\n", __func__, rbdc);
604         spin_lock(&rbd_client_list_lock);
605         list_del(&rbdc->node);
606         spin_unlock(&rbd_client_list_lock);
607
608         ceph_destroy_client(rbdc->client);
609         kfree(rbdc);
610 }
611
612 /*
613  * Drop reference to ceph client node. If it's not referenced anymore, release
614  * it.
615  */
616 static void rbd_put_client(struct rbd_client *rbdc)
617 {
618         if (rbdc)
619                 kref_put(&rbdc->kref, rbd_client_release);
620 }
621
622 static bool rbd_image_format_valid(u32 image_format)
623 {
624         return image_format == 1 || image_format == 2;
625 }
626
627 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
628 {
629         size_t size;
630         u32 snap_count;
631
632         /* The header has to start with the magic rbd header text */
633         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
634                 return false;
635
636         /* The bio layer requires at least sector-sized I/O */
637
638         if (ondisk->options.order < SECTOR_SHIFT)
639                 return false;
640
641         /* If we use u64 in a few spots we may be able to loosen this */
642
643         if (ondisk->options.order > 8 * sizeof (int) - 1)
644                 return false;
645
646         /*
647          * The size of a snapshot header has to fit in a size_t, and
648          * that limits the number of snapshots.
649          */
650         snap_count = le32_to_cpu(ondisk->snap_count);
651         size = SIZE_MAX - sizeof (struct ceph_snap_context);
652         if (snap_count > size / sizeof (__le64))
653                 return false;
654
655         /*
656          * Not only that, but the size of the entire the snapshot
657          * header must also be representable in a size_t.
658          */
659         size -= snap_count * sizeof (__le64);
660         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
661                 return false;
662
663         return true;
664 }
665
666 /*
667  * Create a new header structure, translate header format from the on-disk
668  * header.
669  */
670 static int rbd_header_from_disk(struct rbd_image_header *header,
671                                  struct rbd_image_header_ondisk *ondisk)
672 {
673         u32 snap_count;
674         size_t len;
675         size_t size;
676         u32 i;
677
678         memset(header, 0, sizeof (*header));
679
680         snap_count = le32_to_cpu(ondisk->snap_count);
681
682         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
683         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
684         if (!header->object_prefix)
685                 return -ENOMEM;
686         memcpy(header->object_prefix, ondisk->object_prefix, len);
687         header->object_prefix[len] = '\0';
688
689         if (snap_count) {
690                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
691
692                 /* Save a copy of the snapshot names */
693
694                 if (snap_names_len > (u64) SIZE_MAX)
695                         return -EIO;
696                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
697                 if (!header->snap_names)
698                         goto out_err;
699                 /*
700                  * Note that rbd_dev_v1_header_read() guarantees
701                  * the ondisk buffer we're working with has
702                  * snap_names_len bytes beyond the end of the
703                  * snapshot id array, this memcpy() is safe.
704                  */
705                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
706                         snap_names_len);
707
708                 /* Record each snapshot's size */
709
710                 size = snap_count * sizeof (*header->snap_sizes);
711                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
712                 if (!header->snap_sizes)
713                         goto out_err;
714                 for (i = 0; i < snap_count; i++)
715                         header->snap_sizes[i] =
716                                 le64_to_cpu(ondisk->snaps[i].image_size);
717         } else {
718                 WARN_ON(ondisk->snap_names_len);
719                 header->snap_names = NULL;
720                 header->snap_sizes = NULL;
721         }
722
723         header->features = 0;   /* No features support in v1 images */
724         header->obj_order = ondisk->options.order;
725         header->crypt_type = ondisk->options.crypt_type;
726         header->comp_type = ondisk->options.comp_type;
727
728         /* Allocate and fill in the snapshot context */
729
730         header->image_size = le64_to_cpu(ondisk->image_size);
731         size = sizeof (struct ceph_snap_context);
732         size += snap_count * sizeof (header->snapc->snaps[0]);
733         header->snapc = kzalloc(size, GFP_KERNEL);
734         if (!header->snapc)
735                 goto out_err;
736
737         atomic_set(&header->snapc->nref, 1);
738         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
739         header->snapc->num_snaps = snap_count;
740         for (i = 0; i < snap_count; i++)
741                 header->snapc->snaps[i] =
742                         le64_to_cpu(ondisk->snaps[i].id);
743
744         return 0;
745
746 out_err:
747         kfree(header->snap_sizes);
748         header->snap_sizes = NULL;
749         kfree(header->snap_names);
750         header->snap_names = NULL;
751         kfree(header->object_prefix);
752         header->object_prefix = NULL;
753
754         return -ENOMEM;
755 }
756
757 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
758 {
759         struct rbd_snap *snap;
760
761         if (snap_id == CEPH_NOSNAP)
762                 return RBD_SNAP_HEAD_NAME;
763
764         list_for_each_entry(snap, &rbd_dev->snaps, node)
765                 if (snap_id == snap->id)
766                         return snap->name;
767
768         return NULL;
769 }
770
771 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
772 {
773
774         struct rbd_snap *snap;
775
776         list_for_each_entry(snap, &rbd_dev->snaps, node) {
777                 if (!strcmp(snap_name, snap->name)) {
778                         rbd_dev->spec->snap_id = snap->id;
779                         rbd_dev->mapping.size = snap->size;
780                         rbd_dev->mapping.features = snap->features;
781
782                         return 0;
783                 }
784         }
785
786         return -ENOENT;
787 }
788
789 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
790 {
791         int ret;
792
793         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
794                     sizeof (RBD_SNAP_HEAD_NAME))) {
795                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
796                 rbd_dev->mapping.size = rbd_dev->header.image_size;
797                 rbd_dev->mapping.features = rbd_dev->header.features;
798                 ret = 0;
799         } else {
800                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
801                 if (ret < 0)
802                         goto done;
803                 rbd_dev->mapping.read_only = true;
804         }
805         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
806
807 done:
808         return ret;
809 }
810
811 static void rbd_header_free(struct rbd_image_header *header)
812 {
813         kfree(header->object_prefix);
814         header->object_prefix = NULL;
815         kfree(header->snap_sizes);
816         header->snap_sizes = NULL;
817         kfree(header->snap_names);
818         header->snap_names = NULL;
819         ceph_put_snap_context(header->snapc);
820         header->snapc = NULL;
821 }
822
823 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
824 {
825         char *name;
826         u64 segment;
827         int ret;
828
829         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
830         if (!name)
831                 return NULL;
832         segment = offset >> rbd_dev->header.obj_order;
833         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
834                         rbd_dev->header.object_prefix, segment);
835         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
836                 pr_err("error formatting segment name for #%llu (%d)\n",
837                         segment, ret);
838                 kfree(name);
839                 name = NULL;
840         }
841
842         return name;
843 }
844
845 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
846 {
847         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
848
849         return offset & (segment_size - 1);
850 }
851
852 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
853                                 u64 offset, u64 length)
854 {
855         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
856
857         offset &= segment_size - 1;
858
859         rbd_assert(length <= U64_MAX - offset);
860         if (offset + length > segment_size)
861                 length = segment_size - offset;
862
863         return length;
864 }
865
866 /*
867  * returns the size of an object in the image
868  */
869 static u64 rbd_obj_bytes(struct rbd_image_header *header)
870 {
871         return 1 << header->obj_order;
872 }
873
874 /*
875  * bio helpers
876  */
877
878 static void bio_chain_put(struct bio *chain)
879 {
880         struct bio *tmp;
881
882         while (chain) {
883                 tmp = chain;
884                 chain = chain->bi_next;
885                 bio_put(tmp);
886         }
887 }
888
889 /*
890  * zeros a bio chain, starting at specific offset
891  */
892 static void zero_bio_chain(struct bio *chain, int start_ofs)
893 {
894         struct bio_vec *bv;
895         unsigned long flags;
896         void *buf;
897         int i;
898         int pos = 0;
899
900         while (chain) {
901                 bio_for_each_segment(bv, chain, i) {
902                         if (pos + bv->bv_len > start_ofs) {
903                                 int remainder = max(start_ofs - pos, 0);
904                                 buf = bvec_kmap_irq(bv, &flags);
905                                 memset(buf + remainder, 0,
906                                        bv->bv_len - remainder);
907                                 bvec_kunmap_irq(buf, &flags);
908                         }
909                         pos += bv->bv_len;
910                 }
911
912                 chain = chain->bi_next;
913         }
914 }
915
916 /*
917  * Clone a portion of a bio, starting at the given byte offset
918  * and continuing for the number of bytes indicated.
919  */
920 static struct bio *bio_clone_range(struct bio *bio_src,
921                                         unsigned int offset,
922                                         unsigned int len,
923                                         gfp_t gfpmask)
924 {
925         struct bio_vec *bv;
926         unsigned int resid;
927         unsigned short idx;
928         unsigned int voff;
929         unsigned short end_idx;
930         unsigned short vcnt;
931         struct bio *bio;
932
933         /* Handle the easy case for the caller */
934
935         if (!offset && len == bio_src->bi_size)
936                 return bio_clone(bio_src, gfpmask);
937
938         if (WARN_ON_ONCE(!len))
939                 return NULL;
940         if (WARN_ON_ONCE(len > bio_src->bi_size))
941                 return NULL;
942         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
943                 return NULL;
944
945         /* Find first affected segment... */
946
947         resid = offset;
948         __bio_for_each_segment(bv, bio_src, idx, 0) {
949                 if (resid < bv->bv_len)
950                         break;
951                 resid -= bv->bv_len;
952         }
953         voff = resid;
954
955         /* ...and the last affected segment */
956
957         resid += len;
958         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
959                 if (resid <= bv->bv_len)
960                         break;
961                 resid -= bv->bv_len;
962         }
963         vcnt = end_idx - idx + 1;
964
965         /* Build the clone */
966
967         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
968         if (!bio)
969                 return NULL;    /* ENOMEM */
970
971         bio->bi_bdev = bio_src->bi_bdev;
972         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
973         bio->bi_rw = bio_src->bi_rw;
974         bio->bi_flags |= 1 << BIO_CLONED;
975
976         /*
977          * Copy over our part of the bio_vec, then update the first
978          * and last (or only) entries.
979          */
980         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
981                         vcnt * sizeof (struct bio_vec));
982         bio->bi_io_vec[0].bv_offset += voff;
983         if (vcnt > 1) {
984                 bio->bi_io_vec[0].bv_len -= voff;
985                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
986         } else {
987                 bio->bi_io_vec[0].bv_len = len;
988         }
989
990         bio->bi_vcnt = vcnt;
991         bio->bi_size = len;
992         bio->bi_idx = 0;
993
994         return bio;
995 }
996
997 /*
998  * Clone a portion of a bio chain, starting at the given byte offset
999  * into the first bio in the source chain and continuing for the
1000  * number of bytes indicated.  The result is another bio chain of
1001  * exactly the given length, or a null pointer on error.
1002  *
1003  * The bio_src and offset parameters are both in-out.  On entry they
1004  * refer to the first source bio and the offset into that bio where
1005  * the start of data to be cloned is located.
1006  *
1007  * On return, bio_src is updated to refer to the bio in the source
1008  * chain that contains first un-cloned byte, and *offset will
1009  * contain the offset of that byte within that bio.
1010  */
1011 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1012                                         unsigned int *offset,
1013                                         unsigned int len,
1014                                         gfp_t gfpmask)
1015 {
1016         struct bio *bi = *bio_src;
1017         unsigned int off = *offset;
1018         struct bio *chain = NULL;
1019         struct bio **end;
1020
1021         /* Build up a chain of clone bios up to the limit */
1022
1023         if (!bi || off >= bi->bi_size || !len)
1024                 return NULL;            /* Nothing to clone */
1025
1026         end = &chain;
1027         while (len) {
1028                 unsigned int bi_size;
1029                 struct bio *bio;
1030
1031                 if (!bi) {
1032                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1033                         goto out_err;   /* EINVAL; ran out of bio's */
1034                 }
1035                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1036                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1037                 if (!bio)
1038                         goto out_err;   /* ENOMEM */
1039
1040                 *end = bio;
1041                 end = &bio->bi_next;
1042
1043                 off += bi_size;
1044                 if (off == bi->bi_size) {
1045                         bi = bi->bi_next;
1046                         off = 0;
1047                 }
1048                 len -= bi_size;
1049         }
1050         *bio_src = bi;
1051         *offset = off;
1052
1053         return chain;
1054 out_err:
1055         bio_chain_put(chain);
1056
1057         return NULL;
1058 }
1059
1060 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1061 {
1062         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1063                 atomic_read(&obj_request->kref.refcount));
1064         kref_get(&obj_request->kref);
1065 }
1066
1067 static void rbd_obj_request_destroy(struct kref *kref);
1068 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1069 {
1070         rbd_assert(obj_request != NULL);
1071         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1072                 atomic_read(&obj_request->kref.refcount));
1073         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1074 }
1075
1076 static void rbd_img_request_get(struct rbd_img_request *img_request)
1077 {
1078         dout("%s: img %p (was %d)\n", __func__, img_request,
1079                 atomic_read(&img_request->kref.refcount));
1080         kref_get(&img_request->kref);
1081 }
1082
1083 static void rbd_img_request_destroy(struct kref *kref);
1084 static void rbd_img_request_put(struct rbd_img_request *img_request)
1085 {
1086         rbd_assert(img_request != NULL);
1087         dout("%s: img %p (was %d)\n", __func__, img_request,
1088                 atomic_read(&img_request->kref.refcount));
1089         kref_put(&img_request->kref, rbd_img_request_destroy);
1090 }
1091
1092 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1093                                         struct rbd_obj_request *obj_request)
1094 {
1095         rbd_assert(obj_request->img_request == NULL);
1096
1097         rbd_obj_request_get(obj_request);
1098         obj_request->img_request = img_request;
1099         obj_request->which = img_request->obj_request_count;
1100         rbd_assert(obj_request->which != BAD_WHICH);
1101         img_request->obj_request_count++;
1102         list_add_tail(&obj_request->links, &img_request->obj_requests);
1103         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1104                 obj_request->which);
1105 }
1106
1107 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1108                                         struct rbd_obj_request *obj_request)
1109 {
1110         rbd_assert(obj_request->which != BAD_WHICH);
1111
1112         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1113                 obj_request->which);
1114         list_del(&obj_request->links);
1115         rbd_assert(img_request->obj_request_count > 0);
1116         img_request->obj_request_count--;
1117         rbd_assert(obj_request->which == img_request->obj_request_count);
1118         obj_request->which = BAD_WHICH;
1119         rbd_assert(obj_request->img_request == img_request);
1120         obj_request->img_request = NULL;
1121         obj_request->callback = NULL;
1122         rbd_obj_request_put(obj_request);
1123 }
1124
1125 static bool obj_request_type_valid(enum obj_request_type type)
1126 {
1127         switch (type) {
1128         case OBJ_REQUEST_NODATA:
1129         case OBJ_REQUEST_BIO:
1130         case OBJ_REQUEST_PAGES:
1131                 return true;
1132         default:
1133                 return false;
1134         }
1135 }
1136
1137 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1138                                 struct rbd_obj_request *obj_request)
1139 {
1140         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1141
1142         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1143 }
1144
1145 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1146 {
1147         dout("%s: img %p\n", __func__, img_request);
1148         if (img_request->callback)
1149                 img_request->callback(img_request);
1150         else
1151                 rbd_img_request_put(img_request);
1152 }
1153
1154 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1155
1156 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1157 {
1158         dout("%s: obj %p\n", __func__, obj_request);
1159
1160         return wait_for_completion_interruptible(&obj_request->completion);
1161 }
1162
1163 static void obj_request_done_init(struct rbd_obj_request *obj_request)
1164 {
1165         atomic_set(&obj_request->done, 0);
1166         smp_wmb();
1167 }
1168
1169 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1170 {
1171         int done;
1172
1173         done = atomic_inc_return(&obj_request->done);
1174         if (done > 1) {
1175                 struct rbd_img_request *img_request = obj_request->img_request;
1176                 struct rbd_device *rbd_dev;
1177
1178                 rbd_dev = img_request ? img_request->rbd_dev : NULL;
1179                 rbd_warn(rbd_dev, "obj_request %p was already done\n",
1180                         obj_request);
1181         }
1182 }
1183
1184 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1185 {
1186         smp_mb();
1187         return atomic_read(&obj_request->done) != 0;
1188 }
1189
1190 static void
1191 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1192 {
1193         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1194                 obj_request, obj_request->img_request, obj_request->result,
1195                 obj_request->xferred, obj_request->length);
1196         /*
1197          * ENOENT means a hole in the image.  We zero-fill the
1198          * entire length of the request.  A short read also implies
1199          * zero-fill to the end of the request.  Either way we
1200          * update the xferred count to indicate the whole request
1201          * was satisfied.
1202          */
1203         BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
1204         if (obj_request->result == -ENOENT) {
1205                 zero_bio_chain(obj_request->bio_list, 0);
1206                 obj_request->result = 0;
1207                 obj_request->xferred = obj_request->length;
1208         } else if (obj_request->xferred < obj_request->length &&
1209                         !obj_request->result) {
1210                 zero_bio_chain(obj_request->bio_list, obj_request->xferred);
1211                 obj_request->xferred = obj_request->length;
1212         }
1213         obj_request_done_set(obj_request);
1214 }
1215
1216 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1217 {
1218         dout("%s: obj %p cb %p\n", __func__, obj_request,
1219                 obj_request->callback);
1220         if (obj_request->callback)
1221                 obj_request->callback(obj_request);
1222         else
1223                 complete_all(&obj_request->completion);
1224 }
1225
1226 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1227 {
1228         dout("%s: obj %p\n", __func__, obj_request);
1229         obj_request_done_set(obj_request);
1230 }
1231
1232 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1233 {
1234         dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
1235                 obj_request->result, obj_request->xferred, obj_request->length);
1236         if (obj_request->img_request)
1237                 rbd_img_obj_request_read_callback(obj_request);
1238         else
1239                 obj_request_done_set(obj_request);
1240 }
1241
1242 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1243 {
1244         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1245                 obj_request->result, obj_request->length);
1246         /*
1247          * There is no such thing as a successful short write.
1248          * Our xferred value is the number of bytes transferred
1249          * back.  Set it to our originally-requested length.
1250          */
1251         obj_request->xferred = obj_request->length;
1252         obj_request_done_set(obj_request);
1253 }
1254
1255 /*
1256  * For a simple stat call there's nothing to do.  We'll do more if
1257  * this is part of a write sequence for a layered image.
1258  */
1259 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1260 {
1261         dout("%s: obj %p\n", __func__, obj_request);
1262         obj_request_done_set(obj_request);
1263 }
1264
1265 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1266                                 struct ceph_msg *msg)
1267 {
1268         struct rbd_obj_request *obj_request = osd_req->r_priv;
1269         u16 opcode;
1270
1271         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1272         rbd_assert(osd_req == obj_request->osd_req);
1273         rbd_assert(!!obj_request->img_request ^
1274                                 (obj_request->which == BAD_WHICH));
1275
1276         if (osd_req->r_result < 0)
1277                 obj_request->result = osd_req->r_result;
1278         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1279
1280         WARN_ON(osd_req->r_num_ops != 1);       /* For now */
1281
1282         /*
1283          * We support a 64-bit length, but ultimately it has to be
1284          * passed to blk_end_request(), which takes an unsigned int.
1285          */
1286         obj_request->xferred = osd_req->r_reply_op_len[0];
1287         rbd_assert(obj_request->xferred < (u64) UINT_MAX);
1288         opcode = osd_req->r_request_ops[0].op;
1289         switch (opcode) {
1290         case CEPH_OSD_OP_READ:
1291                 rbd_osd_read_callback(obj_request);
1292                 break;
1293         case CEPH_OSD_OP_WRITE:
1294                 rbd_osd_write_callback(obj_request);
1295                 break;
1296         case CEPH_OSD_OP_STAT:
1297                 rbd_osd_stat_callback(obj_request);
1298                 break;
1299         case CEPH_OSD_OP_CALL:
1300         case CEPH_OSD_OP_NOTIFY_ACK:
1301         case CEPH_OSD_OP_WATCH:
1302                 rbd_osd_trivial_callback(obj_request);
1303                 break;
1304         default:
1305                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1306                         obj_request->object_name, (unsigned short) opcode);
1307                 break;
1308         }
1309
1310         if (obj_request_done_test(obj_request))
1311                 rbd_obj_request_complete(obj_request);
1312 }
1313
1314 static void rbd_osd_req_format_op(struct rbd_obj_request *obj_request,
1315                                         bool write_request,
1316                                         struct ceph_osd_req_op *op)
1317 {
1318         struct rbd_img_request *img_request = obj_request->img_request;
1319         struct ceph_snap_context *snapc = NULL;
1320         u64 snap_id = CEPH_NOSNAP;
1321         struct timespec *mtime = NULL;
1322         struct timespec now;
1323
1324         rbd_assert(obj_request->osd_req != NULL);
1325
1326         if (write_request) {
1327                 now = CURRENT_TIME;
1328                 mtime = &now;
1329                 if (img_request)
1330                         snapc = img_request->snapc;
1331         } else if (img_request) {
1332                 snap_id = img_request->snap_id;
1333         }
1334
1335         ceph_osdc_build_request(obj_request->osd_req, obj_request->offset,
1336                         1, op, snapc, snap_id, mtime);
1337 }
1338
1339 static struct ceph_osd_request *rbd_osd_req_create(
1340                                         struct rbd_device *rbd_dev,
1341                                         bool write_request,
1342                                         struct rbd_obj_request *obj_request)
1343 {
1344         struct rbd_img_request *img_request = obj_request->img_request;
1345         struct ceph_snap_context *snapc = NULL;
1346         struct ceph_osd_client *osdc;
1347         struct ceph_osd_request *osd_req;
1348         struct ceph_osd_data *osd_data;
1349         u64 offset = obj_request->offset;
1350
1351         if (img_request) {
1352                 rbd_assert(img_request->write_request == write_request);
1353                 if (img_request->write_request)
1354                         snapc = img_request->snapc;
1355         }
1356
1357         /* Allocate and initialize the request, for the single op */
1358
1359         osdc = &rbd_dev->rbd_client->client->osdc;
1360         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1361         if (!osd_req)
1362                 return NULL;    /* ENOMEM */
1363         osd_data = write_request ? &osd_req->r_data_out : &osd_req->r_data_in;
1364
1365         rbd_assert(obj_request_type_valid(obj_request->type));
1366         switch (obj_request->type) {
1367         case OBJ_REQUEST_NODATA:
1368                 break;          /* Nothing to do */
1369         case OBJ_REQUEST_BIO:
1370                 rbd_assert(obj_request->bio_list != NULL);
1371                 ceph_osd_data_bio_init(osd_data, obj_request->bio_list,
1372                                         obj_request->length);
1373                 break;
1374         case OBJ_REQUEST_PAGES:
1375                 ceph_osd_data_pages_init(osd_data, obj_request->pages,
1376                                 obj_request->length, offset & ~PAGE_MASK,
1377                                 false, false);
1378                 break;
1379         }
1380
1381         if (write_request)
1382                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1383         else
1384                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1385
1386         osd_req->r_callback = rbd_osd_req_callback;
1387         osd_req->r_priv = obj_request;
1388
1389         osd_req->r_oid_len = strlen(obj_request->object_name);
1390         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1391         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1392
1393         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1394
1395         return osd_req;
1396 }
1397
1398 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1399 {
1400         ceph_osdc_put_request(osd_req);
1401 }
1402
1403 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1404
1405 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1406                                                 u64 offset, u64 length,
1407                                                 enum obj_request_type type)
1408 {
1409         struct rbd_obj_request *obj_request;
1410         size_t size;
1411         char *name;
1412
1413         rbd_assert(obj_request_type_valid(type));
1414
1415         size = strlen(object_name) + 1;
1416         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1417         if (!obj_request)
1418                 return NULL;
1419
1420         name = (char *)(obj_request + 1);
1421         obj_request->object_name = memcpy(name, object_name, size);
1422         obj_request->offset = offset;
1423         obj_request->length = length;
1424         obj_request->which = BAD_WHICH;
1425         obj_request->type = type;
1426         INIT_LIST_HEAD(&obj_request->links);
1427         obj_request_done_init(obj_request);
1428         init_completion(&obj_request->completion);
1429         kref_init(&obj_request->kref);
1430
1431         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1432                 offset, length, (int)type, obj_request);
1433
1434         return obj_request;
1435 }
1436
1437 static void rbd_obj_request_destroy(struct kref *kref)
1438 {
1439         struct rbd_obj_request *obj_request;
1440
1441         obj_request = container_of(kref, struct rbd_obj_request, kref);
1442
1443         dout("%s: obj %p\n", __func__, obj_request);
1444
1445         rbd_assert(obj_request->img_request == NULL);
1446         rbd_assert(obj_request->which == BAD_WHICH);
1447
1448         if (obj_request->osd_req)
1449                 rbd_osd_req_destroy(obj_request->osd_req);
1450
1451         rbd_assert(obj_request_type_valid(obj_request->type));
1452         switch (obj_request->type) {
1453         case OBJ_REQUEST_NODATA:
1454                 break;          /* Nothing to do */
1455         case OBJ_REQUEST_BIO:
1456                 if (obj_request->bio_list)
1457                         bio_chain_put(obj_request->bio_list);
1458                 break;
1459         case OBJ_REQUEST_PAGES:
1460                 if (obj_request->pages)
1461                         ceph_release_page_vector(obj_request->pages,
1462                                                 obj_request->page_count);
1463                 break;
1464         }
1465
1466         kfree(obj_request);
1467 }
1468
1469 /*
1470  * Caller is responsible for filling in the list of object requests
1471  * that comprises the image request, and the Linux request pointer
1472  * (if there is one).
1473  */
1474 static struct rbd_img_request *rbd_img_request_create(
1475                                         struct rbd_device *rbd_dev,
1476                                         u64 offset, u64 length,
1477                                         bool write_request)
1478 {
1479         struct rbd_img_request *img_request;
1480         struct ceph_snap_context *snapc = NULL;
1481
1482         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1483         if (!img_request)
1484                 return NULL;
1485
1486         if (write_request) {
1487                 down_read(&rbd_dev->header_rwsem);
1488                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1489                 up_read(&rbd_dev->header_rwsem);
1490                 if (WARN_ON(!snapc)) {
1491                         kfree(img_request);
1492                         return NULL;    /* Shouldn't happen */
1493                 }
1494         }
1495
1496         img_request->rq = NULL;
1497         img_request->rbd_dev = rbd_dev;
1498         img_request->offset = offset;
1499         img_request->length = length;
1500         img_request->write_request = write_request;
1501         if (write_request)
1502                 img_request->snapc = snapc;
1503         else
1504                 img_request->snap_id = rbd_dev->spec->snap_id;
1505         spin_lock_init(&img_request->completion_lock);
1506         img_request->next_completion = 0;
1507         img_request->callback = NULL;
1508         img_request->obj_request_count = 0;
1509         INIT_LIST_HEAD(&img_request->obj_requests);
1510         kref_init(&img_request->kref);
1511
1512         rbd_img_request_get(img_request);       /* Avoid a warning */
1513         rbd_img_request_put(img_request);       /* TEMPORARY */
1514
1515         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1516                 write_request ? "write" : "read", offset, length,
1517                 img_request);
1518
1519         return img_request;
1520 }
1521
1522 static void rbd_img_request_destroy(struct kref *kref)
1523 {
1524         struct rbd_img_request *img_request;
1525         struct rbd_obj_request *obj_request;
1526         struct rbd_obj_request *next_obj_request;
1527
1528         img_request = container_of(kref, struct rbd_img_request, kref);
1529
1530         dout("%s: img %p\n", __func__, img_request);
1531
1532         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1533                 rbd_img_obj_request_del(img_request, obj_request);
1534         rbd_assert(img_request->obj_request_count == 0);
1535
1536         if (img_request->write_request)
1537                 ceph_put_snap_context(img_request->snapc);
1538
1539         kfree(img_request);
1540 }
1541
1542 static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
1543                                         struct bio *bio_list)
1544 {
1545         struct rbd_device *rbd_dev = img_request->rbd_dev;
1546         struct rbd_obj_request *obj_request = NULL;
1547         struct rbd_obj_request *next_obj_request;
1548         bool write_request = img_request->write_request;
1549         unsigned int bio_offset;
1550         u64 image_offset;
1551         u64 resid;
1552         u16 opcode;
1553
1554         dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
1555
1556         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1557         bio_offset = 0;
1558         image_offset = img_request->offset;
1559         rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
1560         resid = img_request->length;
1561         rbd_assert(resid > 0);
1562         while (resid) {
1563                 const char *object_name;
1564                 unsigned int clone_size;
1565                 struct ceph_osd_req_op op;
1566                 u64 offset;
1567                 u64 length;
1568
1569                 object_name = rbd_segment_name(rbd_dev, image_offset);
1570                 if (!object_name)
1571                         goto out_unwind;
1572                 offset = rbd_segment_offset(rbd_dev, image_offset);
1573                 length = rbd_segment_length(rbd_dev, image_offset, resid);
1574                 obj_request = rbd_obj_request_create(object_name,
1575                                                 offset, length,
1576                                                 OBJ_REQUEST_BIO);
1577                 kfree(object_name);     /* object request has its own copy */
1578                 if (!obj_request)
1579                         goto out_unwind;
1580
1581                 rbd_assert(length <= (u64) UINT_MAX);
1582                 clone_size = (unsigned int) length;
1583                 obj_request->bio_list = bio_chain_clone_range(&bio_list,
1584                                                 &bio_offset, clone_size,
1585                                                 GFP_ATOMIC);
1586                 if (!obj_request->bio_list)
1587                         goto out_partial;
1588
1589                 obj_request->osd_req = rbd_osd_req_create(rbd_dev,
1590                                                 write_request, obj_request);
1591                 if (!obj_request->osd_req)
1592                         goto out_partial;
1593
1594                 osd_req_op_extent_init(&op, opcode, offset, length, 0, 0);
1595                 rbd_osd_req_format_op(obj_request, write_request, &op);
1596
1597                 /* status and version are initially zero-filled */
1598
1599                 rbd_img_obj_request_add(img_request, obj_request);
1600
1601                 image_offset += length;
1602                 resid -= length;
1603         }
1604
1605         return 0;
1606
1607 out_partial:
1608         rbd_obj_request_put(obj_request);
1609 out_unwind:
1610         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1611                 rbd_obj_request_put(obj_request);
1612
1613         return -ENOMEM;
1614 }
1615
1616 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1617 {
1618         struct rbd_img_request *img_request;
1619         u32 which = obj_request->which;
1620         bool more = true;
1621
1622         img_request = obj_request->img_request;
1623
1624         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1625         rbd_assert(img_request != NULL);
1626         rbd_assert(img_request->rq != NULL);
1627         rbd_assert(img_request->obj_request_count > 0);
1628         rbd_assert(which != BAD_WHICH);
1629         rbd_assert(which < img_request->obj_request_count);
1630         rbd_assert(which >= img_request->next_completion);
1631
1632         spin_lock_irq(&img_request->completion_lock);
1633         if (which != img_request->next_completion)
1634                 goto out;
1635
1636         for_each_obj_request_from(img_request, obj_request) {
1637                 unsigned int xferred;
1638                 int result;
1639
1640                 rbd_assert(more);
1641                 rbd_assert(which < img_request->obj_request_count);
1642
1643                 if (!obj_request_done_test(obj_request))
1644                         break;
1645
1646                 rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
1647                 xferred = (unsigned int) obj_request->xferred;
1648                 result = (int) obj_request->result;
1649                 if (result)
1650                         rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
1651                                 img_request->write_request ? "write" : "read",
1652                                 result, xferred);
1653
1654                 more = blk_end_request(img_request->rq, result, xferred);
1655                 which++;
1656         }
1657
1658         rbd_assert(more ^ (which == img_request->obj_request_count));
1659         img_request->next_completion = which;
1660 out:
1661         spin_unlock_irq(&img_request->completion_lock);
1662
1663         if (!more)
1664                 rbd_img_request_complete(img_request);
1665 }
1666
1667 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1668 {
1669         struct rbd_device *rbd_dev = img_request->rbd_dev;
1670         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1671         struct rbd_obj_request *obj_request;
1672         struct rbd_obj_request *next_obj_request;
1673
1674         dout("%s: img %p\n", __func__, img_request);
1675         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1676                 int ret;
1677
1678                 obj_request->callback = rbd_img_obj_callback;
1679                 ret = rbd_obj_request_submit(osdc, obj_request);
1680                 if (ret)
1681                         return ret;
1682                 /*
1683                  * The image request has its own reference to each
1684                  * of its object requests, so we can safely drop the
1685                  * initial one here.
1686                  */
1687                 rbd_obj_request_put(obj_request);
1688         }
1689
1690         return 0;
1691 }
1692
1693 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
1694                                    u64 ver, u64 notify_id)
1695 {
1696         struct rbd_obj_request *obj_request;
1697         struct ceph_osd_req_op op;
1698         struct ceph_osd_client *osdc;
1699         int ret;
1700
1701         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1702                                                         OBJ_REQUEST_NODATA);
1703         if (!obj_request)
1704                 return -ENOMEM;
1705
1706         ret = -ENOMEM;
1707         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1708         if (!obj_request->osd_req)
1709                 goto out;
1710
1711         osd_req_op_watch_init(&op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0);
1712         rbd_osd_req_format_op(obj_request, false, &op);
1713
1714         osdc = &rbd_dev->rbd_client->client->osdc;
1715         obj_request->callback = rbd_obj_request_put;
1716         ret = rbd_obj_request_submit(osdc, obj_request);
1717 out:
1718         if (ret)
1719                 rbd_obj_request_put(obj_request);
1720
1721         return ret;
1722 }
1723
1724 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1725 {
1726         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1727         u64 hver;
1728         int rc;
1729
1730         if (!rbd_dev)
1731                 return;
1732
1733         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1734                 rbd_dev->header_name, (unsigned long long) notify_id,
1735                 (unsigned int) opcode);
1736         rc = rbd_dev_refresh(rbd_dev, &hver);
1737         if (rc)
1738                 rbd_warn(rbd_dev, "got notification but failed to "
1739                            " update snaps: %d\n", rc);
1740
1741         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1742 }
1743
1744 /*
1745  * Request sync osd watch/unwatch.  The value of "start" determines
1746  * whether a watch request is being initiated or torn down.
1747  */
1748 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
1749 {
1750         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1751         struct rbd_obj_request *obj_request;
1752         struct ceph_osd_req_op op;
1753         int ret;
1754
1755         rbd_assert(start ^ !!rbd_dev->watch_event);
1756         rbd_assert(start ^ !!rbd_dev->watch_request);
1757
1758         if (start) {
1759                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
1760                                                 &rbd_dev->watch_event);
1761                 if (ret < 0)
1762                         return ret;
1763                 rbd_assert(rbd_dev->watch_event != NULL);
1764         }
1765
1766         ret = -ENOMEM;
1767         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
1768                                                         OBJ_REQUEST_NODATA);
1769         if (!obj_request)
1770                 goto out_cancel;
1771
1772         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
1773         if (!obj_request->osd_req)
1774                 goto out_cancel;
1775
1776         osd_req_op_watch_init(&op, CEPH_OSD_OP_WATCH,
1777                                 rbd_dev->watch_event->cookie,
1778                                 rbd_dev->header.obj_version, start);
1779         rbd_osd_req_format_op(obj_request, true, &op);
1780
1781         if (start)
1782                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
1783         else
1784                 ceph_osdc_unregister_linger_request(osdc,
1785                                         rbd_dev->watch_request->osd_req);
1786         ret = rbd_obj_request_submit(osdc, obj_request);
1787         if (ret)
1788                 goto out_cancel;
1789         ret = rbd_obj_request_wait(obj_request);
1790         if (ret)
1791                 goto out_cancel;
1792         ret = obj_request->result;
1793         if (ret)
1794                 goto out_cancel;
1795
1796         /*
1797          * A watch request is set to linger, so the underlying osd
1798          * request won't go away until we unregister it.  We retain
1799          * a pointer to the object request during that time (in
1800          * rbd_dev->watch_request), so we'll keep a reference to
1801          * it.  We'll drop that reference (below) after we've
1802          * unregistered it.
1803          */
1804         if (start) {
1805                 rbd_dev->watch_request = obj_request;
1806
1807                 return 0;
1808         }
1809
1810         /* We have successfully torn down the watch request */
1811
1812         rbd_obj_request_put(rbd_dev->watch_request);
1813         rbd_dev->watch_request = NULL;
1814 out_cancel:
1815         /* Cancel the event if we're tearing down, or on error */
1816         ceph_osdc_cancel_event(rbd_dev->watch_event);
1817         rbd_dev->watch_event = NULL;
1818         if (obj_request)
1819                 rbd_obj_request_put(obj_request);
1820
1821         return ret;
1822 }
1823
1824 /*
1825  * Synchronous osd object method call
1826  */
1827 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1828                              const char *object_name,
1829                              const char *class_name,
1830                              const char *method_name,
1831                              const char *outbound,
1832                              size_t outbound_size,
1833                              char *inbound,
1834                              size_t inbound_size,
1835                              u64 *version)
1836 {
1837         struct rbd_obj_request *obj_request;
1838         struct ceph_osd_client *osdc;
1839         struct ceph_osd_req_op op;
1840         struct page **pages;
1841         u32 page_count;
1842         int ret;
1843
1844         /*
1845          * Method calls are ultimately read operations.  The result
1846          * should placed into the inbound buffer provided.  They
1847          * also supply outbound data--parameters for the object
1848          * method.  Currently if this is present it will be a
1849          * snapshot id.
1850          */
1851         page_count = (u32) calc_pages_for(0, inbound_size);
1852         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1853         if (IS_ERR(pages))
1854                 return PTR_ERR(pages);
1855
1856         ret = -ENOMEM;
1857         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
1858                                                         OBJ_REQUEST_PAGES);
1859         if (!obj_request)
1860                 goto out;
1861
1862         obj_request->pages = pages;
1863         obj_request->page_count = page_count;
1864
1865         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
1866         if (!obj_request->osd_req)
1867                 goto out;
1868
1869         osd_req_op_cls_init(&op, CEPH_OSD_OP_CALL, class_name, method_name,
1870                                         outbound, outbound_size);
1871         rbd_osd_req_format_op(obj_request, false, &op);
1872
1873         osdc = &rbd_dev->rbd_client->client->osdc;
1874         ret = rbd_obj_request_submit(osdc, obj_request);
1875         if (ret)
1876                 goto out;
1877         ret = rbd_obj_request_wait(obj_request);
1878         if (ret)
1879                 goto out;
1880
1881         ret = obj_request->result;
1882         if (ret < 0)
1883                 goto out;
1884         ret = 0;
1885         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1886         if (version)
1887                 *version = obj_request->version;
1888 out:
1889         if (obj_request)
1890                 rbd_obj_request_put(obj_request);
1891         else
1892                 ceph_release_page_vector(pages, page_count);
1893
1894         return ret;
1895 }
1896
1897 static void rbd_request_fn(struct request_queue *q)
1898                 __releases(q->queue_lock) __acquires(q->queue_lock)
1899 {
1900         struct rbd_device *rbd_dev = q->queuedata;
1901         bool read_only = rbd_dev->mapping.read_only;
1902         struct request *rq;
1903         int result;
1904
1905         while ((rq = blk_fetch_request(q))) {
1906                 bool write_request = rq_data_dir(rq) == WRITE;
1907                 struct rbd_img_request *img_request;
1908                 u64 offset;
1909                 u64 length;
1910
1911                 /* Ignore any non-FS requests that filter through. */
1912
1913                 if (rq->cmd_type != REQ_TYPE_FS) {
1914                         dout("%s: non-fs request type %d\n", __func__,
1915                                 (int) rq->cmd_type);
1916                         __blk_end_request_all(rq, 0);
1917                         continue;
1918                 }
1919
1920                 /* Ignore/skip any zero-length requests */
1921
1922                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1923                 length = (u64) blk_rq_bytes(rq);
1924
1925                 if (!length) {
1926                         dout("%s: zero-length request\n", __func__);
1927                         __blk_end_request_all(rq, 0);
1928                         continue;
1929                 }
1930
1931                 spin_unlock_irq(q->queue_lock);
1932
1933                 /* Disallow writes to a read-only device */
1934
1935                 if (write_request) {
1936                         result = -EROFS;
1937                         if (read_only)
1938                                 goto end_request;
1939                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
1940                 }
1941
1942                 /*
1943                  * Quit early if the mapped snapshot no longer
1944                  * exists.  It's still possible the snapshot will
1945                  * have disappeared by the time our request arrives
1946                  * at the osd, but there's no sense in sending it if
1947                  * we already know.
1948                  */
1949                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
1950                         dout("request for non-existent snapshot");
1951                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
1952                         result = -ENXIO;
1953                         goto end_request;
1954                 }
1955
1956                 result = -EINVAL;
1957                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
1958                         goto end_request;       /* Shouldn't happen */
1959
1960                 result = -ENOMEM;
1961                 img_request = rbd_img_request_create(rbd_dev, offset, length,
1962                                                         write_request);
1963                 if (!img_request)
1964                         goto end_request;
1965
1966                 img_request->rq = rq;
1967
1968                 result = rbd_img_request_fill_bio(img_request, rq->bio);
1969                 if (!result)
1970                         result = rbd_img_request_submit(img_request);
1971                 if (result)
1972                         rbd_img_request_put(img_request);
1973 end_request:
1974                 spin_lock_irq(q->queue_lock);
1975                 if (result < 0) {
1976                         rbd_warn(rbd_dev, "obj_request %s result %d\n",
1977                                 write_request ? "write" : "read", result);
1978                         __blk_end_request_all(rq, result);
1979                 }
1980         }
1981 }
1982
1983 /*
1984  * a queue callback. Makes sure that we don't create a bio that spans across
1985  * multiple osd objects. One exception would be with a single page bios,
1986  * which we handle later at bio_chain_clone_range()
1987  */
1988 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1989                           struct bio_vec *bvec)
1990 {
1991         struct rbd_device *rbd_dev = q->queuedata;
1992         sector_t sector_offset;
1993         sector_t sectors_per_obj;
1994         sector_t obj_sector_offset;
1995         int ret;
1996
1997         /*
1998          * Find how far into its rbd object the partition-relative
1999          * bio start sector is to offset relative to the enclosing
2000          * device.
2001          */
2002         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2003         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2004         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2005
2006         /*
2007          * Compute the number of bytes from that offset to the end
2008          * of the object.  Account for what's already used by the bio.
2009          */
2010         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2011         if (ret > bmd->bi_size)
2012                 ret -= bmd->bi_size;
2013         else
2014                 ret = 0;
2015
2016         /*
2017          * Don't send back more than was asked for.  And if the bio
2018          * was empty, let the whole thing through because:  "Note
2019          * that a block device *must* allow a single page to be
2020          * added to an empty bio."
2021          */
2022         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2023         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2024                 ret = (int) bvec->bv_len;
2025
2026         return ret;
2027 }
2028
2029 static void rbd_free_disk(struct rbd_device *rbd_dev)
2030 {
2031         struct gendisk *disk = rbd_dev->disk;
2032
2033         if (!disk)
2034                 return;
2035
2036         if (disk->flags & GENHD_FL_UP)
2037                 del_gendisk(disk);
2038         if (disk->queue)
2039                 blk_cleanup_queue(disk->queue);
2040         put_disk(disk);
2041 }
2042
2043 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2044                                 const char *object_name,
2045                                 u64 offset, u64 length,
2046                                 char *buf, u64 *version)
2047
2048 {
2049         struct ceph_osd_req_op op;
2050         struct rbd_obj_request *obj_request;
2051         struct ceph_osd_client *osdc;
2052         struct page **pages = NULL;
2053         u32 page_count;
2054         size_t size;
2055         int ret;
2056
2057         page_count = (u32) calc_pages_for(offset, length);
2058         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2059         if (IS_ERR(pages))
2060                 ret = PTR_ERR(pages);
2061
2062         ret = -ENOMEM;
2063         obj_request = rbd_obj_request_create(object_name, offset, length,
2064                                                         OBJ_REQUEST_PAGES);
2065         if (!obj_request)
2066                 goto out;
2067
2068         obj_request->pages = pages;
2069         obj_request->page_count = page_count;
2070
2071         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2072         if (!obj_request->osd_req)
2073                 goto out;
2074
2075         osd_req_op_extent_init(&op, CEPH_OSD_OP_READ, offset, length, 0, 0);
2076         rbd_osd_req_format_op(obj_request, false, &op);
2077
2078         osdc = &rbd_dev->rbd_client->client->osdc;
2079         ret = rbd_obj_request_submit(osdc, obj_request);
2080         if (ret)
2081                 goto out;
2082         ret = rbd_obj_request_wait(obj_request);
2083         if (ret)
2084                 goto out;
2085
2086         ret = obj_request->result;
2087         if (ret < 0)
2088                 goto out;
2089
2090         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2091         size = (size_t) obj_request->xferred;
2092         ceph_copy_from_page_vector(pages, buf, 0, size);
2093         rbd_assert(size <= (size_t) INT_MAX);
2094         ret = (int) size;
2095         if (version)
2096                 *version = obj_request->version;
2097 out:
2098         if (obj_request)
2099                 rbd_obj_request_put(obj_request);
2100         else
2101                 ceph_release_page_vector(pages, page_count);
2102
2103         return ret;
2104 }
2105
2106 /*
2107  * Read the complete header for the given rbd device.
2108  *
2109  * Returns a pointer to a dynamically-allocated buffer containing
2110  * the complete and validated header.  Caller can pass the address
2111  * of a variable that will be filled in with the version of the
2112  * header object at the time it was read.
2113  *
2114  * Returns a pointer-coded errno if a failure occurs.
2115  */
2116 static struct rbd_image_header_ondisk *
2117 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2118 {
2119         struct rbd_image_header_ondisk *ondisk = NULL;
2120         u32 snap_count = 0;
2121         u64 names_size = 0;
2122         u32 want_count;
2123         int ret;
2124
2125         /*
2126          * The complete header will include an array of its 64-bit
2127          * snapshot ids, followed by the names of those snapshots as
2128          * a contiguous block of NUL-terminated strings.  Note that
2129          * the number of snapshots could change by the time we read
2130          * it in, in which case we re-read it.
2131          */
2132         do {
2133                 size_t size;
2134
2135                 kfree(ondisk);
2136
2137                 size = sizeof (*ondisk);
2138                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2139                 size += names_size;
2140                 ondisk = kmalloc(size, GFP_KERNEL);
2141                 if (!ondisk)
2142                         return ERR_PTR(-ENOMEM);
2143
2144                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2145                                        0, size,
2146                                        (char *) ondisk, version);
2147                 if (ret < 0)
2148                         goto out_err;
2149                 if (WARN_ON((size_t) ret < size)) {
2150                         ret = -ENXIO;
2151                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2152                                 size, ret);
2153                         goto out_err;
2154                 }
2155                 if (!rbd_dev_ondisk_valid(ondisk)) {
2156                         ret = -ENXIO;
2157                         rbd_warn(rbd_dev, "invalid header");
2158                         goto out_err;
2159                 }
2160
2161                 names_size = le64_to_cpu(ondisk->snap_names_len);
2162                 want_count = snap_count;
2163                 snap_count = le32_to_cpu(ondisk->snap_count);
2164         } while (snap_count != want_count);
2165
2166         return ondisk;
2167
2168 out_err:
2169         kfree(ondisk);
2170
2171         return ERR_PTR(ret);
2172 }
2173
2174 /*
2175  * reload the ondisk the header
2176  */
2177 static int rbd_read_header(struct rbd_device *rbd_dev,
2178                            struct rbd_image_header *header)
2179 {
2180         struct rbd_image_header_ondisk *ondisk;
2181         u64 ver = 0;
2182         int ret;
2183
2184         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2185         if (IS_ERR(ondisk))
2186                 return PTR_ERR(ondisk);
2187         ret = rbd_header_from_disk(header, ondisk);
2188         if (ret >= 0)
2189                 header->obj_version = ver;
2190         kfree(ondisk);
2191
2192         return ret;
2193 }
2194
2195 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2196 {
2197         struct rbd_snap *snap;
2198         struct rbd_snap *next;
2199
2200         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2201                 rbd_remove_snap_dev(snap);
2202 }
2203
2204 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2205 {
2206         sector_t size;
2207
2208         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2209                 return;
2210
2211         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2212         dout("setting size to %llu sectors", (unsigned long long) size);
2213         rbd_dev->mapping.size = (u64) size;
2214         set_capacity(rbd_dev->disk, size);
2215 }
2216
2217 /*
2218  * only read the first part of the ondisk header, without the snaps info
2219  */
2220 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2221 {
2222         int ret;
2223         struct rbd_image_header h;
2224
2225         ret = rbd_read_header(rbd_dev, &h);
2226         if (ret < 0)
2227                 return ret;
2228
2229         down_write(&rbd_dev->header_rwsem);
2230
2231         /* Update image size, and check for resize of mapped image */
2232         rbd_dev->header.image_size = h.image_size;
2233         rbd_update_mapping_size(rbd_dev);
2234
2235         /* rbd_dev->header.object_prefix shouldn't change */
2236         kfree(rbd_dev->header.snap_sizes);
2237         kfree(rbd_dev->header.snap_names);
2238         /* osd requests may still refer to snapc */
2239         ceph_put_snap_context(rbd_dev->header.snapc);
2240
2241         if (hver)
2242                 *hver = h.obj_version;
2243         rbd_dev->header.obj_version = h.obj_version;
2244         rbd_dev->header.image_size = h.image_size;
2245         rbd_dev->header.snapc = h.snapc;
2246         rbd_dev->header.snap_names = h.snap_names;
2247         rbd_dev->header.snap_sizes = h.snap_sizes;
2248         /* Free the extra copy of the object prefix */
2249         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2250         kfree(h.object_prefix);
2251
2252         ret = rbd_dev_snaps_update(rbd_dev);
2253         if (!ret)
2254                 ret = rbd_dev_snaps_register(rbd_dev);
2255
2256         up_write(&rbd_dev->header_rwsem);
2257
2258         return ret;
2259 }
2260
2261 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2262 {
2263         int ret;
2264
2265         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2266         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2267         if (rbd_dev->image_format == 1)
2268                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2269         else
2270                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2271         mutex_unlock(&ctl_mutex);
2272
2273         return ret;
2274 }
2275
2276 static int rbd_init_disk(struct rbd_device *rbd_dev)
2277 {
2278         struct gendisk *disk;
2279         struct request_queue *q;
2280         u64 segment_size;
2281
2282         /* create gendisk info */
2283         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2284         if (!disk)
2285                 return -ENOMEM;
2286
2287         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2288                  rbd_dev->dev_id);
2289         disk->major = rbd_dev->major;
2290         disk->first_minor = 0;
2291         disk->fops = &rbd_bd_ops;
2292         disk->private_data = rbd_dev;
2293
2294         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2295         if (!q)
2296                 goto out_disk;
2297
2298         /* We use the default size, but let's be explicit about it. */
2299         blk_queue_physical_block_size(q, SECTOR_SIZE);
2300
2301         /* set io sizes to object size */
2302         segment_size = rbd_obj_bytes(&rbd_dev->header);
2303         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2304         blk_queue_max_segment_size(q, segment_size);
2305         blk_queue_io_min(q, segment_size);
2306         blk_queue_io_opt(q, segment_size);
2307
2308         blk_queue_merge_bvec(q, rbd_merge_bvec);
2309         disk->queue = q;
2310
2311         q->queuedata = rbd_dev;
2312
2313         rbd_dev->disk = disk;
2314
2315         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2316
2317         return 0;
2318 out_disk:
2319         put_disk(disk);
2320
2321         return -ENOMEM;
2322 }
2323
2324 /*
2325   sysfs
2326 */
2327
2328 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2329 {
2330         return container_of(dev, struct rbd_device, dev);
2331 }
2332
2333 static ssize_t rbd_size_show(struct device *dev,
2334                              struct device_attribute *attr, char *buf)
2335 {
2336         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2337         sector_t size;
2338
2339         down_read(&rbd_dev->header_rwsem);
2340         size = get_capacity(rbd_dev->disk);
2341         up_read(&rbd_dev->header_rwsem);
2342
2343         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2344 }
2345
2346 /*
2347  * Note this shows the features for whatever's mapped, which is not
2348  * necessarily the base image.
2349  */
2350 static ssize_t rbd_features_show(struct device *dev,
2351                              struct device_attribute *attr, char *buf)
2352 {
2353         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2354
2355         return sprintf(buf, "0x%016llx\n",
2356                         (unsigned long long) rbd_dev->mapping.features);
2357 }
2358
2359 static ssize_t rbd_major_show(struct device *dev,
2360                               struct device_attribute *attr, char *buf)
2361 {
2362         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2363
2364         return sprintf(buf, "%d\n", rbd_dev->major);
2365 }
2366
2367 static ssize_t rbd_client_id_show(struct device *dev,
2368                                   struct device_attribute *attr, char *buf)
2369 {
2370         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2371
2372         return sprintf(buf, "client%lld\n",
2373                         ceph_client_id(rbd_dev->rbd_client->client));
2374 }
2375
2376 static ssize_t rbd_pool_show(struct device *dev,
2377                              struct device_attribute *attr, char *buf)
2378 {
2379         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2380
2381         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2382 }
2383
2384 static ssize_t rbd_pool_id_show(struct device *dev,
2385                              struct device_attribute *attr, char *buf)
2386 {
2387         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2388
2389         return sprintf(buf, "%llu\n",
2390                 (unsigned long long) rbd_dev->spec->pool_id);
2391 }
2392
2393 static ssize_t rbd_name_show(struct device *dev,
2394                              struct device_attribute *attr, char *buf)
2395 {
2396         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2397
2398         if (rbd_dev->spec->image_name)
2399                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2400
2401         return sprintf(buf, "(unknown)\n");
2402 }
2403
2404 static ssize_t rbd_image_id_show(struct device *dev,
2405                              struct device_attribute *attr, char *buf)
2406 {
2407         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2408
2409         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2410 }
2411
2412 /*
2413  * Shows the name of the currently-mapped snapshot (or
2414  * RBD_SNAP_HEAD_NAME for the base image).
2415  */
2416 static ssize_t rbd_snap_show(struct device *dev,
2417                              struct device_attribute *attr,
2418                              char *buf)
2419 {
2420         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2421
2422         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2423 }
2424
2425 /*
2426  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2427  * for the parent image.  If there is no parent, simply shows
2428  * "(no parent image)".
2429  */
2430 static ssize_t rbd_parent_show(struct device *dev,
2431                              struct device_attribute *attr,
2432                              char *buf)
2433 {
2434         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2435         struct rbd_spec *spec = rbd_dev->parent_spec;
2436         int count;
2437         char *bufp = buf;
2438
2439         if (!spec)
2440                 return sprintf(buf, "(no parent image)\n");
2441
2442         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2443                         (unsigned long long) spec->pool_id, spec->pool_name);
2444         if (count < 0)
2445                 return count;
2446         bufp += count;
2447
2448         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2449                         spec->image_name ? spec->image_name : "(unknown)");
2450         if (count < 0)
2451                 return count;
2452         bufp += count;
2453
2454         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2455                         (unsigned long long) spec->snap_id, spec->snap_name);
2456         if (count < 0)
2457                 return count;
2458         bufp += count;
2459
2460         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2461         if (count < 0)
2462                 return count;
2463         bufp += count;
2464
2465         return (ssize_t) (bufp - buf);
2466 }
2467
2468 static ssize_t rbd_image_refresh(struct device *dev,
2469                                  struct device_attribute *attr,
2470                                  const char *buf,
2471                                  size_t size)
2472 {
2473         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2474         int ret;
2475
2476         ret = rbd_dev_refresh(rbd_dev, NULL);
2477
2478         return ret < 0 ? ret : size;
2479 }
2480
2481 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2482 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2483 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2484 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2485 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2486 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2487 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2488 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2489 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2490 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2491 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2492
2493 static struct attribute *rbd_attrs[] = {
2494         &dev_attr_size.attr,
2495         &dev_attr_features.attr,
2496         &dev_attr_major.attr,
2497         &dev_attr_client_id.attr,
2498         &dev_attr_pool.attr,
2499         &dev_attr_pool_id.attr,
2500         &dev_attr_name.attr,
2501         &dev_attr_image_id.attr,
2502         &dev_attr_current_snap.attr,
2503         &dev_attr_parent.attr,
2504         &dev_attr_refresh.attr,
2505         NULL
2506 };
2507
2508 static struct attribute_group rbd_attr_group = {
2509         .attrs = rbd_attrs,
2510 };
2511
2512 static const struct attribute_group *rbd_attr_groups[] = {
2513         &rbd_attr_group,
2514         NULL
2515 };
2516
2517 static void rbd_sysfs_dev_release(struct device *dev)
2518 {
2519 }
2520
2521 static struct device_type rbd_device_type = {
2522         .name           = "rbd",
2523         .groups         = rbd_attr_groups,
2524         .release        = rbd_sysfs_dev_release,
2525 };
2526
2527
2528 /*
2529   sysfs - snapshots
2530 */
2531
2532 static ssize_t rbd_snap_size_show(struct device *dev,
2533                                   struct device_attribute *attr,
2534                                   char *buf)
2535 {
2536         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2537
2538         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2539 }
2540
2541 static ssize_t rbd_snap_id_show(struct device *dev,
2542                                 struct device_attribute *attr,
2543                                 char *buf)
2544 {
2545         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2546
2547         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2548 }
2549
2550 static ssize_t rbd_snap_features_show(struct device *dev,
2551                                 struct device_attribute *attr,
2552                                 char *buf)
2553 {
2554         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2555
2556         return sprintf(buf, "0x%016llx\n",
2557                         (unsigned long long) snap->features);
2558 }
2559
2560 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2561 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2562 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2563
2564 static struct attribute *rbd_snap_attrs[] = {
2565         &dev_attr_snap_size.attr,
2566         &dev_attr_snap_id.attr,
2567         &dev_attr_snap_features.attr,
2568         NULL,
2569 };
2570
2571 static struct attribute_group rbd_snap_attr_group = {
2572         .attrs = rbd_snap_attrs,
2573 };
2574
2575 static void rbd_snap_dev_release(struct device *dev)
2576 {
2577         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2578         kfree(snap->name);
2579         kfree(snap);
2580 }
2581
2582 static const struct attribute_group *rbd_snap_attr_groups[] = {
2583         &rbd_snap_attr_group,
2584         NULL
2585 };
2586
2587 static struct device_type rbd_snap_device_type = {
2588         .groups         = rbd_snap_attr_groups,
2589         .release        = rbd_snap_dev_release,
2590 };
2591
2592 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2593 {
2594         kref_get(&spec->kref);
2595
2596         return spec;
2597 }
2598
2599 static void rbd_spec_free(struct kref *kref);
2600 static void rbd_spec_put(struct rbd_spec *spec)
2601 {
2602         if (spec)
2603                 kref_put(&spec->kref, rbd_spec_free);
2604 }
2605
2606 static struct rbd_spec *rbd_spec_alloc(void)
2607 {
2608         struct rbd_spec *spec;
2609
2610         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2611         if (!spec)
2612                 return NULL;
2613         kref_init(&spec->kref);
2614
2615         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2616
2617         return spec;
2618 }
2619
2620 static void rbd_spec_free(struct kref *kref)
2621 {
2622         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2623
2624         kfree(spec->pool_name);
2625         kfree(spec->image_id);
2626         kfree(spec->image_name);
2627         kfree(spec->snap_name);
2628         kfree(spec);
2629 }
2630
2631 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2632                                 struct rbd_spec *spec)
2633 {
2634         struct rbd_device *rbd_dev;
2635
2636         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2637         if (!rbd_dev)
2638                 return NULL;
2639
2640         spin_lock_init(&rbd_dev->lock);
2641         rbd_dev->flags = 0;
2642         INIT_LIST_HEAD(&rbd_dev->node);
2643         INIT_LIST_HEAD(&rbd_dev->snaps);
2644         init_rwsem(&rbd_dev->header_rwsem);
2645
2646         rbd_dev->spec = spec;
2647         rbd_dev->rbd_client = rbdc;
2648
2649         /* Initialize the layout used for all rbd requests */
2650
2651         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2652         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2653         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2654         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2655
2656         return rbd_dev;
2657 }
2658
2659 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2660 {
2661         rbd_spec_put(rbd_dev->parent_spec);
2662         kfree(rbd_dev->header_name);
2663         rbd_put_client(rbd_dev->rbd_client);
2664         rbd_spec_put(rbd_dev->spec);
2665         kfree(rbd_dev);
2666 }
2667
2668 static bool rbd_snap_registered(struct rbd_snap *snap)
2669 {
2670         bool ret = snap->dev.type == &rbd_snap_device_type;
2671         bool reg = device_is_registered(&snap->dev);
2672
2673         rbd_assert(!ret ^ reg);
2674
2675         return ret;
2676 }
2677
2678 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2679 {
2680         list_del(&snap->node);
2681         if (device_is_registered(&snap->dev))
2682                 device_unregister(&snap->dev);
2683 }
2684
2685 static int rbd_register_snap_dev(struct rbd_snap *snap,
2686                                   struct device *parent)
2687 {
2688         struct device *dev = &snap->dev;
2689         int ret;
2690
2691         dev->type = &rbd_snap_device_type;
2692         dev->parent = parent;
2693         dev->release = rbd_snap_dev_release;
2694         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2695         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2696
2697         ret = device_register(dev);
2698
2699         return ret;
2700 }
2701
2702 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2703                                                 const char *snap_name,
2704                                                 u64 snap_id, u64 snap_size,
2705                                                 u64 snap_features)
2706 {
2707         struct rbd_snap *snap;
2708         int ret;
2709
2710         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2711         if (!snap)
2712                 return ERR_PTR(-ENOMEM);
2713
2714         ret = -ENOMEM;
2715         snap->name = kstrdup(snap_name, GFP_KERNEL);
2716         if (!snap->name)
2717                 goto err;
2718
2719         snap->id = snap_id;
2720         snap->size = snap_size;
2721         snap->features = snap_features;
2722
2723         return snap;
2724
2725 err:
2726         kfree(snap->name);
2727         kfree(snap);
2728
2729         return ERR_PTR(ret);
2730 }
2731
2732 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2733                 u64 *snap_size, u64 *snap_features)
2734 {
2735         char *snap_name;
2736
2737         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2738
2739         *snap_size = rbd_dev->header.snap_sizes[which];
2740         *snap_features = 0;     /* No features for v1 */
2741
2742         /* Skip over names until we find the one we are looking for */
2743
2744         snap_name = rbd_dev->header.snap_names;
2745         while (which--)
2746                 snap_name += strlen(snap_name) + 1;
2747
2748         return snap_name;
2749 }
2750
2751 /*
2752  * Get the size and object order for an image snapshot, or if
2753  * snap_id is CEPH_NOSNAP, gets this information for the base
2754  * image.
2755  */
2756 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2757                                 u8 *order, u64 *snap_size)
2758 {
2759         __le64 snapid = cpu_to_le64(snap_id);
2760         int ret;
2761         struct {
2762                 u8 order;
2763                 __le64 size;
2764         } __attribute__ ((packed)) size_buf = { 0 };
2765
2766         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2767                                 "rbd", "get_size",
2768                                 (char *) &snapid, sizeof (snapid),
2769                                 (char *) &size_buf, sizeof (size_buf), NULL);
2770         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2771         if (ret < 0)
2772                 return ret;
2773
2774         *order = size_buf.order;
2775         *snap_size = le64_to_cpu(size_buf.size);
2776
2777         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2778                 (unsigned long long) snap_id, (unsigned int) *order,
2779                 (unsigned long long) *snap_size);
2780
2781         return 0;
2782 }
2783
2784 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2785 {
2786         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2787                                         &rbd_dev->header.obj_order,
2788                                         &rbd_dev->header.image_size);
2789 }
2790
2791 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2792 {
2793         void *reply_buf;
2794         int ret;
2795         void *p;
2796
2797         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2798         if (!reply_buf)
2799                 return -ENOMEM;
2800
2801         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2802                                 "rbd", "get_object_prefix",
2803                                 NULL, 0,
2804                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2805         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2806         if (ret < 0)
2807                 goto out;
2808
2809         p = reply_buf;
2810         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2811                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2812                                                 NULL, GFP_NOIO);
2813
2814         if (IS_ERR(rbd_dev->header.object_prefix)) {
2815                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2816                 rbd_dev->header.object_prefix = NULL;
2817         } else {
2818                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2819         }
2820
2821 out:
2822         kfree(reply_buf);
2823
2824         return ret;
2825 }
2826
2827 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2828                 u64 *snap_features)
2829 {
2830         __le64 snapid = cpu_to_le64(snap_id);
2831         struct {
2832                 __le64 features;
2833                 __le64 incompat;
2834         } features_buf = { 0 };
2835         u64 incompat;
2836         int ret;
2837
2838         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2839                                 "rbd", "get_features",
2840                                 (char *) &snapid, sizeof (snapid),
2841                                 (char *) &features_buf, sizeof (features_buf),
2842                                 NULL);
2843         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2844         if (ret < 0)
2845                 return ret;
2846
2847         incompat = le64_to_cpu(features_buf.incompat);
2848         if (incompat & ~RBD_FEATURES_ALL)
2849                 return -ENXIO;
2850
2851         *snap_features = le64_to_cpu(features_buf.features);
2852
2853         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2854                 (unsigned long long) snap_id,
2855                 (unsigned long long) *snap_features,
2856                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2857
2858         return 0;
2859 }
2860
2861 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2862 {
2863         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2864                                                 &rbd_dev->header.features);
2865 }
2866
2867 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2868 {
2869         struct rbd_spec *parent_spec;
2870         size_t size;
2871         void *reply_buf = NULL;
2872         __le64 snapid;
2873         void *p;
2874         void *end;
2875         char *image_id;
2876         u64 overlap;
2877         int ret;
2878
2879         parent_spec = rbd_spec_alloc();
2880         if (!parent_spec)
2881                 return -ENOMEM;
2882
2883         size = sizeof (__le64) +                                /* pool_id */
2884                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
2885                 sizeof (__le64) +                               /* snap_id */
2886                 sizeof (__le64);                                /* overlap */
2887         reply_buf = kmalloc(size, GFP_KERNEL);
2888         if (!reply_buf) {
2889                 ret = -ENOMEM;
2890                 goto out_err;
2891         }
2892
2893         snapid = cpu_to_le64(CEPH_NOSNAP);
2894         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2895                                 "rbd", "get_parent",
2896                                 (char *) &snapid, sizeof (snapid),
2897                                 (char *) reply_buf, size, NULL);
2898         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2899         if (ret < 0)
2900                 goto out_err;
2901
2902         ret = -ERANGE;
2903         p = reply_buf;
2904         end = (char *) reply_buf + size;
2905         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2906         if (parent_spec->pool_id == CEPH_NOPOOL)
2907                 goto out;       /* No parent?  No problem. */
2908
2909         /* The ceph file layout needs to fit pool id in 32 bits */
2910
2911         ret = -EIO;
2912         if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2913                 goto out;
2914
2915         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2916         if (IS_ERR(image_id)) {
2917                 ret = PTR_ERR(image_id);
2918                 goto out_err;
2919         }
2920         parent_spec->image_id = image_id;
2921         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2922         ceph_decode_64_safe(&p, end, overlap, out_err);
2923
2924         rbd_dev->parent_overlap = overlap;
2925         rbd_dev->parent_spec = parent_spec;
2926         parent_spec = NULL;     /* rbd_dev now owns this */
2927 out:
2928         ret = 0;
2929 out_err:
2930         kfree(reply_buf);
2931         rbd_spec_put(parent_spec);
2932
2933         return ret;
2934 }
2935
2936 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2937 {
2938         size_t image_id_size;
2939         char *image_id;
2940         void *p;
2941         void *end;
2942         size_t size;
2943         void *reply_buf = NULL;
2944         size_t len = 0;
2945         char *image_name = NULL;
2946         int ret;
2947
2948         rbd_assert(!rbd_dev->spec->image_name);
2949
2950         len = strlen(rbd_dev->spec->image_id);
2951         image_id_size = sizeof (__le32) + len;
2952         image_id = kmalloc(image_id_size, GFP_KERNEL);
2953         if (!image_id)
2954                 return NULL;
2955
2956         p = image_id;
2957         end = (char *) image_id + image_id_size;
2958         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
2959
2960         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2961         reply_buf = kmalloc(size, GFP_KERNEL);
2962         if (!reply_buf)
2963                 goto out;
2964
2965         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
2966                                 "rbd", "dir_get_name",
2967                                 image_id, image_id_size,
2968                                 (char *) reply_buf, size, NULL);
2969         if (ret < 0)
2970                 goto out;
2971         p = reply_buf;
2972         end = (char *) reply_buf + size;
2973         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2974         if (IS_ERR(image_name))
2975                 image_name = NULL;
2976         else
2977                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2978 out:
2979         kfree(reply_buf);
2980         kfree(image_id);
2981
2982         return image_name;
2983 }
2984
2985 /*
2986  * When a parent image gets probed, we only have the pool, image,
2987  * and snapshot ids but not the names of any of them.  This call
2988  * is made later to fill in those names.  It has to be done after
2989  * rbd_dev_snaps_update() has completed because some of the
2990  * information (in particular, snapshot name) is not available
2991  * until then.
2992  */
2993 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2994 {
2995         struct ceph_osd_client *osdc;
2996         const char *name;
2997         void *reply_buf = NULL;
2998         int ret;
2999
3000         if (rbd_dev->spec->pool_name)
3001                 return 0;       /* Already have the names */
3002
3003         /* Look up the pool name */
3004
3005         osdc = &rbd_dev->rbd_client->client->osdc;
3006         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3007         if (!name) {
3008                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3009                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3010                 return -EIO;
3011         }
3012
3013         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3014         if (!rbd_dev->spec->pool_name)
3015                 return -ENOMEM;
3016
3017         /* Fetch the image name; tolerate failure here */
3018
3019         name = rbd_dev_image_name(rbd_dev);
3020         if (name)
3021                 rbd_dev->spec->image_name = (char *) name;
3022         else
3023                 rbd_warn(rbd_dev, "unable to get image name");
3024
3025         /* Look up the snapshot name. */
3026
3027         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3028         if (!name) {
3029                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3030                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3031                 ret = -EIO;
3032                 goto out_err;
3033         }
3034         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3035         if(!rbd_dev->spec->snap_name)
3036                 goto out_err;
3037
3038         return 0;
3039 out_err:
3040         kfree(reply_buf);
3041         kfree(rbd_dev->spec->pool_name);
3042         rbd_dev->spec->pool_name = NULL;
3043
3044         return ret;
3045 }
3046
3047 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3048 {
3049         size_t size;
3050         int ret;
3051         void *reply_buf;
3052         void *p;
3053         void *end;
3054         u64 seq;
3055         u32 snap_count;
3056         struct ceph_snap_context *snapc;
3057         u32 i;
3058
3059         /*
3060          * We'll need room for the seq value (maximum snapshot id),
3061          * snapshot count, and array of that many snapshot ids.
3062          * For now we have a fixed upper limit on the number we're
3063          * prepared to receive.
3064          */
3065         size = sizeof (__le64) + sizeof (__le32) +
3066                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3067         reply_buf = kzalloc(size, GFP_KERNEL);
3068         if (!reply_buf)
3069                 return -ENOMEM;
3070
3071         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3072                                 "rbd", "get_snapcontext",
3073                                 NULL, 0,
3074                                 reply_buf, size, ver);
3075         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3076         if (ret < 0)
3077                 goto out;
3078
3079         ret = -ERANGE;
3080         p = reply_buf;
3081         end = (char *) reply_buf + size;
3082         ceph_decode_64_safe(&p, end, seq, out);
3083         ceph_decode_32_safe(&p, end, snap_count, out);
3084
3085         /*
3086          * Make sure the reported number of snapshot ids wouldn't go
3087          * beyond the end of our buffer.  But before checking that,
3088          * make sure the computed size of the snapshot context we
3089          * allocate is representable in a size_t.
3090          */
3091         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3092                                  / sizeof (u64)) {
3093                 ret = -EINVAL;
3094                 goto out;
3095         }
3096         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3097                 goto out;
3098
3099         size = sizeof (struct ceph_snap_context) +
3100                                 snap_count * sizeof (snapc->snaps[0]);
3101         snapc = kmalloc(size, GFP_KERNEL);
3102         if (!snapc) {
3103                 ret = -ENOMEM;
3104                 goto out;
3105         }
3106
3107         atomic_set(&snapc->nref, 1);
3108         snapc->seq = seq;
3109         snapc->num_snaps = snap_count;
3110         for (i = 0; i < snap_count; i++)
3111                 snapc->snaps[i] = ceph_decode_64(&p);
3112
3113         rbd_dev->header.snapc = snapc;
3114
3115         dout("  snap context seq = %llu, snap_count = %u\n",
3116                 (unsigned long long) seq, (unsigned int) snap_count);
3117
3118 out:
3119         kfree(reply_buf);
3120
3121         return 0;
3122 }
3123
3124 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3125 {
3126         size_t size;
3127         void *reply_buf;
3128         __le64 snap_id;
3129         int ret;
3130         void *p;
3131         void *end;
3132         char *snap_name;
3133
3134         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3135         reply_buf = kmalloc(size, GFP_KERNEL);
3136         if (!reply_buf)
3137                 return ERR_PTR(-ENOMEM);
3138
3139         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3140         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3141                                 "rbd", "get_snapshot_name",
3142                                 (char *) &snap_id, sizeof (snap_id),
3143                                 reply_buf, size, NULL);
3144         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3145         if (ret < 0)
3146                 goto out;
3147
3148         p = reply_buf;
3149         end = (char *) reply_buf + size;
3150         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3151         if (IS_ERR(snap_name)) {
3152                 ret = PTR_ERR(snap_name);
3153                 goto out;
3154         } else {
3155                 dout("  snap_id 0x%016llx snap_name = %s\n",
3156                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
3157         }
3158         kfree(reply_buf);
3159
3160         return snap_name;
3161 out:
3162         kfree(reply_buf);
3163
3164         return ERR_PTR(ret);
3165 }
3166
3167 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3168                 u64 *snap_size, u64 *snap_features)
3169 {
3170         u64 snap_id;
3171         u8 order;
3172         int ret;
3173
3174         snap_id = rbd_dev->header.snapc->snaps[which];
3175         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3176         if (ret)
3177                 return ERR_PTR(ret);
3178         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3179         if (ret)
3180                 return ERR_PTR(ret);
3181
3182         return rbd_dev_v2_snap_name(rbd_dev, which);
3183 }
3184
3185 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3186                 u64 *snap_size, u64 *snap_features)
3187 {
3188         if (rbd_dev->image_format == 1)
3189                 return rbd_dev_v1_snap_info(rbd_dev, which,
3190                                         snap_size, snap_features);
3191         if (rbd_dev->image_format == 2)
3192                 return rbd_dev_v2_snap_info(rbd_dev, which,
3193                                         snap_size, snap_features);
3194         return ERR_PTR(-EINVAL);
3195 }
3196
3197 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3198 {
3199         int ret;
3200         __u8 obj_order;
3201
3202         down_write(&rbd_dev->header_rwsem);
3203
3204         /* Grab old order first, to see if it changes */
3205
3206         obj_order = rbd_dev->header.obj_order,
3207         ret = rbd_dev_v2_image_size(rbd_dev);
3208         if (ret)
3209                 goto out;
3210         if (rbd_dev->header.obj_order != obj_order) {
3211                 ret = -EIO;
3212                 goto out;
3213         }
3214         rbd_update_mapping_size(rbd_dev);
3215
3216         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3217         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3218         if (ret)
3219                 goto out;
3220         ret = rbd_dev_snaps_update(rbd_dev);
3221         dout("rbd_dev_snaps_update returned %d\n", ret);
3222         if (ret)
3223                 goto out;
3224         ret = rbd_dev_snaps_register(rbd_dev);
3225         dout("rbd_dev_snaps_register returned %d\n", ret);
3226 out:
3227         up_write(&rbd_dev->header_rwsem);
3228
3229         return ret;
3230 }
3231
3232 /*
3233  * Scan the rbd device's current snapshot list and compare it to the
3234  * newly-received snapshot context.  Remove any existing snapshots
3235  * not present in the new snapshot context.  Add a new snapshot for
3236  * any snaphots in the snapshot context not in the current list.
3237  * And verify there are no changes to snapshots we already know
3238  * about.
3239  *
3240  * Assumes the snapshots in the snapshot context are sorted by
3241  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3242  * are also maintained in that order.)
3243  */
3244 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3245 {
3246         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3247         const u32 snap_count = snapc->num_snaps;
3248         struct list_head *head = &rbd_dev->snaps;
3249         struct list_head *links = head->next;
3250         u32 index = 0;
3251
3252         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3253         while (index < snap_count || links != head) {
3254                 u64 snap_id;
3255                 struct rbd_snap *snap;
3256                 char *snap_name;
3257                 u64 snap_size = 0;
3258                 u64 snap_features = 0;
3259
3260                 snap_id = index < snap_count ? snapc->snaps[index]
3261                                              : CEPH_NOSNAP;
3262                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3263                                      : NULL;
3264                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3265
3266                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3267                         struct list_head *next = links->next;
3268
3269                         /*
3270                          * A previously-existing snapshot is not in
3271                          * the new snap context.
3272                          *
3273                          * If the now missing snapshot is the one the
3274                          * image is mapped to, clear its exists flag
3275                          * so we can avoid sending any more requests
3276                          * to it.
3277                          */
3278                         if (rbd_dev->spec->snap_id == snap->id)
3279                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3280                         rbd_remove_snap_dev(snap);
3281                         dout("%ssnap id %llu has been removed\n",
3282                                 rbd_dev->spec->snap_id == snap->id ?
3283                                                         "mapped " : "",
3284                                 (unsigned long long) snap->id);
3285
3286                         /* Done with this list entry; advance */
3287
3288                         links = next;
3289                         continue;
3290                 }
3291
3292                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3293                                         &snap_size, &snap_features);
3294                 if (IS_ERR(snap_name))
3295                         return PTR_ERR(snap_name);
3296
3297                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3298                         (unsigned long long) snap_id);
3299                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3300                         struct rbd_snap *new_snap;
3301
3302                         /* We haven't seen this snapshot before */
3303
3304                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3305                                         snap_id, snap_size, snap_features);
3306                         if (IS_ERR(new_snap)) {
3307                                 int err = PTR_ERR(new_snap);
3308
3309                                 dout("  failed to add dev, error %d\n", err);
3310
3311                                 return err;
3312                         }
3313
3314                         /* New goes before existing, or at end of list */
3315
3316                         dout("  added dev%s\n", snap ? "" : " at end\n");
3317                         if (snap)
3318                                 list_add_tail(&new_snap->node, &snap->node);
3319                         else
3320                                 list_add_tail(&new_snap->node, head);
3321                 } else {
3322                         /* Already have this one */
3323
3324                         dout("  already present\n");
3325
3326                         rbd_assert(snap->size == snap_size);
3327                         rbd_assert(!strcmp(snap->name, snap_name));
3328                         rbd_assert(snap->features == snap_features);
3329
3330                         /* Done with this list entry; advance */
3331
3332                         links = links->next;
3333                 }
3334
3335                 /* Advance to the next entry in the snapshot context */
3336
3337                 index++;
3338         }
3339         dout("%s: done\n", __func__);
3340
3341         return 0;
3342 }
3343
3344 /*
3345  * Scan the list of snapshots and register the devices for any that
3346  * have not already been registered.
3347  */
3348 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3349 {
3350         struct rbd_snap *snap;
3351         int ret = 0;
3352
3353         dout("%s:\n", __func__);
3354         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3355                 return -EIO;
3356
3357         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3358                 if (!rbd_snap_registered(snap)) {
3359                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3360                         if (ret < 0)
3361                                 break;
3362                 }
3363         }
3364         dout("%s: returning %d\n", __func__, ret);
3365
3366         return ret;
3367 }
3368
3369 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3370 {
3371         struct device *dev;
3372         int ret;
3373
3374         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3375
3376         dev = &rbd_dev->dev;
3377         dev->bus = &rbd_bus_type;
3378         dev->type = &rbd_device_type;
3379         dev->parent = &rbd_root_dev;
3380         dev->release = rbd_dev_release;
3381         dev_set_name(dev, "%d", rbd_dev->dev_id);
3382         ret = device_register(dev);
3383
3384         mutex_unlock(&ctl_mutex);
3385
3386         return ret;
3387 }
3388
3389 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3390 {
3391         device_unregister(&rbd_dev->dev);
3392 }
3393
3394 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3395
3396 /*
3397  * Get a unique rbd identifier for the given new rbd_dev, and add
3398  * the rbd_dev to the global list.  The minimum rbd id is 1.
3399  */
3400 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3401 {
3402         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3403
3404         spin_lock(&rbd_dev_list_lock);
3405         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3406         spin_unlock(&rbd_dev_list_lock);
3407         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3408                 (unsigned long long) rbd_dev->dev_id);
3409 }
3410
3411 /*
3412  * Remove an rbd_dev from the global list, and record that its
3413  * identifier is no longer in use.
3414  */
3415 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3416 {
3417         struct list_head *tmp;
3418         int rbd_id = rbd_dev->dev_id;
3419         int max_id;
3420
3421         rbd_assert(rbd_id > 0);
3422
3423         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3424                 (unsigned long long) rbd_dev->dev_id);
3425         spin_lock(&rbd_dev_list_lock);
3426         list_del_init(&rbd_dev->node);
3427
3428         /*
3429          * If the id being "put" is not the current maximum, there
3430          * is nothing special we need to do.
3431          */
3432         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3433                 spin_unlock(&rbd_dev_list_lock);
3434                 return;
3435         }
3436
3437         /*
3438          * We need to update the current maximum id.  Search the
3439          * list to find out what it is.  We're more likely to find
3440          * the maximum at the end, so search the list backward.
3441          */
3442         max_id = 0;
3443         list_for_each_prev(tmp, &rbd_dev_list) {
3444                 struct rbd_device *rbd_dev;
3445
3446                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3447                 if (rbd_dev->dev_id > max_id)
3448                         max_id = rbd_dev->dev_id;
3449         }
3450         spin_unlock(&rbd_dev_list_lock);
3451
3452         /*
3453          * The max id could have been updated by rbd_dev_id_get(), in
3454          * which case it now accurately reflects the new maximum.
3455          * Be careful not to overwrite the maximum value in that
3456          * case.
3457          */
3458         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3459         dout("  max dev id has been reset\n");
3460 }
3461
3462 /*
3463  * Skips over white space at *buf, and updates *buf to point to the
3464  * first found non-space character (if any). Returns the length of
3465  * the token (string of non-white space characters) found.  Note
3466  * that *buf must be terminated with '\0'.
3467  */
3468 static inline size_t next_token(const char **buf)
3469 {
3470         /*
3471         * These are the characters that produce nonzero for
3472         * isspace() in the "C" and "POSIX" locales.
3473         */
3474         const char *spaces = " \f\n\r\t\v";
3475
3476         *buf += strspn(*buf, spaces);   /* Find start of token */
3477
3478         return strcspn(*buf, spaces);   /* Return token length */
3479 }
3480
3481 /*
3482  * Finds the next token in *buf, and if the provided token buffer is
3483  * big enough, copies the found token into it.  The result, if
3484  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3485  * must be terminated with '\0' on entry.
3486  *
3487  * Returns the length of the token found (not including the '\0').
3488  * Return value will be 0 if no token is found, and it will be >=
3489  * token_size if the token would not fit.
3490  *
3491  * The *buf pointer will be updated to point beyond the end of the
3492  * found token.  Note that this occurs even if the token buffer is
3493  * too small to hold it.
3494  */
3495 static inline size_t copy_token(const char **buf,
3496                                 char *token,
3497                                 size_t token_size)
3498 {
3499         size_t len;
3500
3501         len = next_token(buf);
3502         if (len < token_size) {
3503                 memcpy(token, *buf, len);
3504                 *(token + len) = '\0';
3505         }
3506         *buf += len;
3507
3508         return len;
3509 }
3510
3511 /*
3512  * Finds the next token in *buf, dynamically allocates a buffer big
3513  * enough to hold a copy of it, and copies the token into the new
3514  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3515  * that a duplicate buffer is created even for a zero-length token.
3516  *
3517  * Returns a pointer to the newly-allocated duplicate, or a null
3518  * pointer if memory for the duplicate was not available.  If
3519  * the lenp argument is a non-null pointer, the length of the token
3520  * (not including the '\0') is returned in *lenp.
3521  *
3522  * If successful, the *buf pointer will be updated to point beyond
3523  * the end of the found token.
3524  *
3525  * Note: uses GFP_KERNEL for allocation.
3526  */
3527 static inline char *dup_token(const char **buf, size_t *lenp)
3528 {
3529         char *dup;
3530         size_t len;
3531
3532         len = next_token(buf);
3533         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3534         if (!dup)
3535                 return NULL;
3536         *(dup + len) = '\0';
3537         *buf += len;
3538
3539         if (lenp)
3540                 *lenp = len;
3541
3542         return dup;
3543 }
3544
3545 /*
3546  * Parse the options provided for an "rbd add" (i.e., rbd image
3547  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3548  * and the data written is passed here via a NUL-terminated buffer.
3549  * Returns 0 if successful or an error code otherwise.
3550  *
3551  * The information extracted from these options is recorded in
3552  * the other parameters which return dynamically-allocated
3553  * structures:
3554  *  ceph_opts
3555  *      The address of a pointer that will refer to a ceph options
3556  *      structure.  Caller must release the returned pointer using
3557  *      ceph_destroy_options() when it is no longer needed.
3558  *  rbd_opts
3559  *      Address of an rbd options pointer.  Fully initialized by
3560  *      this function; caller must release with kfree().
3561  *  spec
3562  *      Address of an rbd image specification pointer.  Fully
3563  *      initialized by this function based on parsed options.
3564  *      Caller must release with rbd_spec_put().
3565  *
3566  * The options passed take this form:
3567  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3568  * where:
3569  *  <mon_addrs>
3570  *      A comma-separated list of one or more monitor addresses.
3571  *      A monitor address is an ip address, optionally followed
3572  *      by a port number (separated by a colon).
3573  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3574  *  <options>
3575  *      A comma-separated list of ceph and/or rbd options.
3576  *  <pool_name>
3577  *      The name of the rados pool containing the rbd image.
3578  *  <image_name>
3579  *      The name of the image in that pool to map.
3580  *  <snap_id>
3581  *      An optional snapshot id.  If provided, the mapping will
3582  *      present data from the image at the time that snapshot was
3583  *      created.  The image head is used if no snapshot id is
3584  *      provided.  Snapshot mappings are always read-only.
3585  */
3586 static int rbd_add_parse_args(const char *buf,
3587                                 struct ceph_options **ceph_opts,
3588                                 struct rbd_options **opts,
3589                                 struct rbd_spec **rbd_spec)
3590 {
3591         size_t len;
3592         char *options;
3593         const char *mon_addrs;
3594         size_t mon_addrs_size;
3595         struct rbd_spec *spec = NULL;
3596         struct rbd_options *rbd_opts = NULL;
3597         struct ceph_options *copts;
3598         int ret;
3599
3600         /* The first four tokens are required */
3601
3602         len = next_token(&buf);
3603         if (!len) {
3604                 rbd_warn(NULL, "no monitor address(es) provided");
3605                 return -EINVAL;
3606         }
3607         mon_addrs = buf;
3608         mon_addrs_size = len + 1;
3609         buf += len;
3610
3611         ret = -EINVAL;
3612         options = dup_token(&buf, NULL);
3613         if (!options)
3614                 return -ENOMEM;
3615         if (!*options) {
3616                 rbd_warn(NULL, "no options provided");
3617                 goto out_err;
3618         }
3619
3620         spec = rbd_spec_alloc();
3621         if (!spec)
3622                 goto out_mem;
3623
3624         spec->pool_name = dup_token(&buf, NULL);
3625         if (!spec->pool_name)
3626                 goto out_mem;
3627         if (!*spec->pool_name) {
3628                 rbd_warn(NULL, "no pool name provided");
3629                 goto out_err;
3630         }
3631
3632         spec->image_name = dup_token(&buf, NULL);
3633         if (!spec->image_name)
3634                 goto out_mem;
3635         if (!*spec->image_name) {
3636                 rbd_warn(NULL, "no image name provided");
3637                 goto out_err;
3638         }
3639
3640         /*
3641          * Snapshot name is optional; default is to use "-"
3642          * (indicating the head/no snapshot).
3643          */
3644         len = next_token(&buf);
3645         if (!len) {
3646                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3647                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3648         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3649                 ret = -ENAMETOOLONG;
3650                 goto out_err;
3651         }
3652         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3653         if (!spec->snap_name)
3654                 goto out_mem;
3655         *(spec->snap_name + len) = '\0';
3656
3657         /* Initialize all rbd options to the defaults */
3658
3659         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3660         if (!rbd_opts)
3661                 goto out_mem;
3662
3663         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3664
3665         copts = ceph_parse_options(options, mon_addrs,
3666                                         mon_addrs + mon_addrs_size - 1,
3667                                         parse_rbd_opts_token, rbd_opts);
3668         if (IS_ERR(copts)) {
3669                 ret = PTR_ERR(copts);
3670                 goto out_err;
3671         }
3672         kfree(options);
3673
3674         *ceph_opts = copts;
3675         *opts = rbd_opts;
3676         *rbd_spec = spec;
3677
3678         return 0;
3679 out_mem:
3680         ret = -ENOMEM;
3681 out_err:
3682         kfree(rbd_opts);
3683         rbd_spec_put(spec);
3684         kfree(options);
3685
3686         return ret;
3687 }
3688
3689 /*
3690  * An rbd format 2 image has a unique identifier, distinct from the
3691  * name given to it by the user.  Internally, that identifier is
3692  * what's used to specify the names of objects related to the image.
3693  *
3694  * A special "rbd id" object is used to map an rbd image name to its
3695  * id.  If that object doesn't exist, then there is no v2 rbd image
3696  * with the supplied name.
3697  *
3698  * This function will record the given rbd_dev's image_id field if
3699  * it can be determined, and in that case will return 0.  If any
3700  * errors occur a negative errno will be returned and the rbd_dev's
3701  * image_id field will be unchanged (and should be NULL).
3702  */
3703 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3704 {
3705         int ret;
3706         size_t size;
3707         char *object_name;
3708         void *response;
3709         void *p;
3710
3711         /*
3712          * When probing a parent image, the image id is already
3713          * known (and the image name likely is not).  There's no
3714          * need to fetch the image id again in this case.
3715          */
3716         if (rbd_dev->spec->image_id)
3717                 return 0;
3718
3719         /*
3720          * First, see if the format 2 image id file exists, and if
3721          * so, get the image's persistent id from it.
3722          */
3723         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3724         object_name = kmalloc(size, GFP_NOIO);
3725         if (!object_name)
3726                 return -ENOMEM;
3727         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3728         dout("rbd id object name is %s\n", object_name);
3729
3730         /* Response will be an encoded string, which includes a length */
3731
3732         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3733         response = kzalloc(size, GFP_NOIO);
3734         if (!response) {
3735                 ret = -ENOMEM;
3736                 goto out;
3737         }
3738
3739         ret = rbd_obj_method_sync(rbd_dev, object_name,
3740                                 "rbd", "get_id",
3741                                 NULL, 0,
3742                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3743         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3744         if (ret < 0)
3745                 goto out;
3746
3747         p = response;
3748         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3749                                                 p + RBD_IMAGE_ID_LEN_MAX,
3750                                                 NULL, GFP_NOIO);
3751         if (IS_ERR(rbd_dev->spec->image_id)) {
3752                 ret = PTR_ERR(rbd_dev->spec->image_id);
3753                 rbd_dev->spec->image_id = NULL;
3754         } else {
3755                 dout("image_id is %s\n", rbd_dev->spec->image_id);
3756         }
3757 out:
3758         kfree(response);
3759         kfree(object_name);
3760
3761         return ret;
3762 }
3763
3764 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3765 {
3766         int ret;
3767         size_t size;
3768
3769         /* Version 1 images have no id; empty string is used */
3770
3771         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3772         if (!rbd_dev->spec->image_id)
3773                 return -ENOMEM;
3774
3775         /* Record the header object name for this rbd image. */
3776
3777         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3778         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3779         if (!rbd_dev->header_name) {
3780                 ret = -ENOMEM;
3781                 goto out_err;
3782         }
3783         sprintf(rbd_dev->header_name, "%s%s",
3784                 rbd_dev->spec->image_name, RBD_SUFFIX);
3785
3786         /* Populate rbd image metadata */
3787
3788         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3789         if (ret < 0)
3790                 goto out_err;
3791
3792         /* Version 1 images have no parent (no layering) */
3793
3794         rbd_dev->parent_spec = NULL;
3795         rbd_dev->parent_overlap = 0;
3796
3797         rbd_dev->image_format = 1;
3798
3799         dout("discovered version 1 image, header name is %s\n",
3800                 rbd_dev->header_name);
3801
3802         return 0;
3803
3804 out_err:
3805         kfree(rbd_dev->header_name);
3806         rbd_dev->header_name = NULL;
3807         kfree(rbd_dev->spec->image_id);
3808         rbd_dev->spec->image_id = NULL;
3809
3810         return ret;
3811 }
3812
3813 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3814 {
3815         size_t size;
3816         int ret;
3817         u64 ver = 0;
3818
3819         /*
3820          * Image id was filled in by the caller.  Record the header
3821          * object name for this rbd image.
3822          */
3823         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3824         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3825         if (!rbd_dev->header_name)
3826                 return -ENOMEM;
3827         sprintf(rbd_dev->header_name, "%s%s",
3828                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3829
3830         /* Get the size and object order for the image */
3831
3832         ret = rbd_dev_v2_image_size(rbd_dev);
3833         if (ret < 0)
3834                 goto out_err;
3835
3836         /* Get the object prefix (a.k.a. block_name) for the image */
3837
3838         ret = rbd_dev_v2_object_prefix(rbd_dev);
3839         if (ret < 0)
3840                 goto out_err;
3841
3842         /* Get the and check features for the image */
3843
3844         ret = rbd_dev_v2_features(rbd_dev);
3845         if (ret < 0)
3846                 goto out_err;
3847
3848         /* If the image supports layering, get the parent info */
3849
3850         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3851                 ret = rbd_dev_v2_parent_info(rbd_dev);
3852                 if (ret < 0)
3853                         goto out_err;
3854         }
3855
3856         /* crypto and compression type aren't (yet) supported for v2 images */
3857
3858         rbd_dev->header.crypt_type = 0;
3859         rbd_dev->header.comp_type = 0;
3860
3861         /* Get the snapshot context, plus the header version */
3862
3863         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3864         if (ret)
3865                 goto out_err;
3866         rbd_dev->header.obj_version = ver;
3867
3868         rbd_dev->image_format = 2;
3869
3870         dout("discovered version 2 image, header name is %s\n",
3871                 rbd_dev->header_name);
3872
3873         return 0;
3874 out_err:
3875         rbd_dev->parent_overlap = 0;
3876         rbd_spec_put(rbd_dev->parent_spec);
3877         rbd_dev->parent_spec = NULL;
3878         kfree(rbd_dev->header_name);
3879         rbd_dev->header_name = NULL;
3880         kfree(rbd_dev->header.object_prefix);
3881         rbd_dev->header.object_prefix = NULL;
3882
3883         return ret;
3884 }
3885
3886 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3887 {
3888         int ret;
3889
3890         /* no need to lock here, as rbd_dev is not registered yet */
3891         ret = rbd_dev_snaps_update(rbd_dev);
3892         if (ret)
3893                 return ret;
3894
3895         ret = rbd_dev_probe_update_spec(rbd_dev);
3896         if (ret)
3897                 goto err_out_snaps;
3898
3899         ret = rbd_dev_set_mapping(rbd_dev);
3900         if (ret)
3901                 goto err_out_snaps;
3902
3903         /* generate unique id: find highest unique id, add one */
3904         rbd_dev_id_get(rbd_dev);
3905
3906         /* Fill in the device name, now that we have its id. */
3907         BUILD_BUG_ON(DEV_NAME_LEN
3908                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3909         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3910
3911         /* Get our block major device number. */
3912
3913         ret = register_blkdev(0, rbd_dev->name);
3914         if (ret < 0)
3915                 goto err_out_id;
3916         rbd_dev->major = ret;
3917
3918         /* Set up the blkdev mapping. */
3919
3920         ret = rbd_init_disk(rbd_dev);
3921         if (ret)
3922                 goto err_out_blkdev;
3923
3924         ret = rbd_bus_add_dev(rbd_dev);
3925         if (ret)
3926                 goto err_out_disk;
3927
3928         /*
3929          * At this point cleanup in the event of an error is the job
3930          * of the sysfs code (initiated by rbd_bus_del_dev()).
3931          */
3932         down_write(&rbd_dev->header_rwsem);
3933         ret = rbd_dev_snaps_register(rbd_dev);
3934         up_write(&rbd_dev->header_rwsem);
3935         if (ret)
3936                 goto err_out_bus;
3937
3938         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
3939         if (ret)
3940                 goto err_out_bus;
3941
3942         /* Everything's ready.  Announce the disk to the world. */
3943
3944         add_disk(rbd_dev->disk);
3945
3946         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3947                 (unsigned long long) rbd_dev->mapping.size);
3948
3949         return ret;
3950 err_out_bus:
3951         /* this will also clean up rest of rbd_dev stuff */
3952
3953         rbd_bus_del_dev(rbd_dev);
3954
3955         return ret;
3956 err_out_disk:
3957         rbd_free_disk(rbd_dev);
3958 err_out_blkdev:
3959         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3960 err_out_id:
3961         rbd_dev_id_put(rbd_dev);
3962 err_out_snaps:
3963         rbd_remove_all_snaps(rbd_dev);
3964
3965         return ret;
3966 }
3967
3968 /*
3969  * Probe for the existence of the header object for the given rbd
3970  * device.  For format 2 images this includes determining the image
3971  * id.
3972  */
3973 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3974 {
3975         int ret;
3976
3977         /*
3978          * Get the id from the image id object.  If it's not a
3979          * format 2 image, we'll get ENOENT back, and we'll assume
3980          * it's a format 1 image.
3981          */
3982         ret = rbd_dev_image_id(rbd_dev);
3983         if (ret)
3984                 ret = rbd_dev_v1_probe(rbd_dev);
3985         else
3986                 ret = rbd_dev_v2_probe(rbd_dev);
3987         if (ret) {
3988                 dout("probe failed, returning %d\n", ret);
3989
3990                 return ret;
3991         }
3992
3993         ret = rbd_dev_probe_finish(rbd_dev);
3994         if (ret)
3995                 rbd_header_free(&rbd_dev->header);
3996
3997         return ret;
3998 }
3999
4000 static ssize_t rbd_add(struct bus_type *bus,
4001                        const char *buf,
4002                        size_t count)
4003 {
4004         struct rbd_device *rbd_dev = NULL;
4005         struct ceph_options *ceph_opts = NULL;
4006         struct rbd_options *rbd_opts = NULL;
4007         struct rbd_spec *spec = NULL;
4008         struct rbd_client *rbdc;
4009         struct ceph_osd_client *osdc;
4010         int rc = -ENOMEM;
4011
4012         if (!try_module_get(THIS_MODULE))
4013                 return -ENODEV;
4014
4015         /* parse add command */
4016         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4017         if (rc < 0)
4018                 goto err_out_module;
4019
4020         rbdc = rbd_get_client(ceph_opts);
4021         if (IS_ERR(rbdc)) {
4022                 rc = PTR_ERR(rbdc);
4023                 goto err_out_args;
4024         }
4025         ceph_opts = NULL;       /* rbd_dev client now owns this */
4026
4027         /* pick the pool */
4028         osdc = &rbdc->client->osdc;
4029         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4030         if (rc < 0)
4031                 goto err_out_client;
4032         spec->pool_id = (u64) rc;
4033
4034         /* The ceph file layout needs to fit pool id in 32 bits */
4035
4036         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4037                 rc = -EIO;
4038                 goto err_out_client;
4039         }
4040
4041         rbd_dev = rbd_dev_create(rbdc, spec);
4042         if (!rbd_dev)
4043                 goto err_out_client;
4044         rbdc = NULL;            /* rbd_dev now owns this */
4045         spec = NULL;            /* rbd_dev now owns this */
4046
4047         rbd_dev->mapping.read_only = rbd_opts->read_only;
4048         kfree(rbd_opts);
4049         rbd_opts = NULL;        /* done with this */
4050
4051         rc = rbd_dev_probe(rbd_dev);
4052         if (rc < 0)
4053                 goto err_out_rbd_dev;
4054
4055         return count;
4056 err_out_rbd_dev:
4057         rbd_dev_destroy(rbd_dev);
4058 err_out_client:
4059         rbd_put_client(rbdc);
4060 err_out_args:
4061         if (ceph_opts)
4062                 ceph_destroy_options(ceph_opts);
4063         kfree(rbd_opts);
4064         rbd_spec_put(spec);
4065 err_out_module:
4066         module_put(THIS_MODULE);
4067
4068         dout("Error adding device %s\n", buf);
4069
4070         return (ssize_t) rc;
4071 }
4072
4073 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4074 {
4075         struct list_head *tmp;
4076         struct rbd_device *rbd_dev;
4077
4078         spin_lock(&rbd_dev_list_lock);
4079         list_for_each(tmp, &rbd_dev_list) {
4080                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4081                 if (rbd_dev->dev_id == dev_id) {
4082                         spin_unlock(&rbd_dev_list_lock);
4083                         return rbd_dev;
4084                 }
4085         }
4086         spin_unlock(&rbd_dev_list_lock);
4087         return NULL;
4088 }
4089
4090 static void rbd_dev_release(struct device *dev)
4091 {
4092         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4093
4094         if (rbd_dev->watch_event)
4095                 rbd_dev_header_watch_sync(rbd_dev, 0);
4096
4097         /* clean up and free blkdev */
4098         rbd_free_disk(rbd_dev);
4099         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4100
4101         /* release allocated disk header fields */
4102         rbd_header_free(&rbd_dev->header);
4103
4104         /* done with the id, and with the rbd_dev */
4105         rbd_dev_id_put(rbd_dev);
4106         rbd_assert(rbd_dev->rbd_client != NULL);
4107         rbd_dev_destroy(rbd_dev);
4108
4109         /* release module ref */
4110         module_put(THIS_MODULE);
4111 }
4112
4113 static ssize_t rbd_remove(struct bus_type *bus,
4114                           const char *buf,
4115                           size_t count)
4116 {
4117         struct rbd_device *rbd_dev = NULL;
4118         int target_id, rc;
4119         unsigned long ul;
4120         int ret = count;
4121
4122         rc = strict_strtoul(buf, 10, &ul);
4123         if (rc)
4124                 return rc;
4125
4126         /* convert to int; abort if we lost anything in the conversion */
4127         target_id = (int) ul;
4128         if (target_id != ul)
4129                 return -EINVAL;
4130
4131         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4132
4133         rbd_dev = __rbd_get_dev(target_id);
4134         if (!rbd_dev) {
4135                 ret = -ENOENT;
4136                 goto done;
4137         }
4138
4139         spin_lock_irq(&rbd_dev->lock);
4140         if (rbd_dev->open_count)
4141                 ret = -EBUSY;
4142         else
4143                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4144         spin_unlock_irq(&rbd_dev->lock);
4145         if (ret < 0)
4146                 goto done;
4147
4148         rbd_remove_all_snaps(rbd_dev);
4149         rbd_bus_del_dev(rbd_dev);
4150
4151 done:
4152         mutex_unlock(&ctl_mutex);
4153
4154         return ret;
4155 }
4156
4157 /*
4158  * create control files in sysfs
4159  * /sys/bus/rbd/...
4160  */
4161 static int rbd_sysfs_init(void)
4162 {
4163         int ret;
4164
4165         ret = device_register(&rbd_root_dev);
4166         if (ret < 0)
4167                 return ret;
4168
4169         ret = bus_register(&rbd_bus_type);
4170         if (ret < 0)
4171                 device_unregister(&rbd_root_dev);
4172
4173         return ret;
4174 }
4175
4176 static void rbd_sysfs_cleanup(void)
4177 {
4178         bus_unregister(&rbd_bus_type);
4179         device_unregister(&rbd_root_dev);
4180 }
4181
4182 static int __init rbd_init(void)
4183 {
4184         int rc;
4185
4186         if (!libceph_compatible(NULL)) {
4187                 rbd_warn(NULL, "libceph incompatibility (quitting)");
4188
4189                 return -EINVAL;
4190         }
4191         rc = rbd_sysfs_init();
4192         if (rc)
4193                 return rc;
4194         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4195         return 0;
4196 }
4197
4198 static void __exit rbd_exit(void)
4199 {
4200         rbd_sysfs_cleanup();
4201 }
4202
4203 module_init(rbd_init);
4204 module_exit(rbd_exit);
4205
4206 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4207 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4208 MODULE_DESCRIPTION("rados block device");
4209
4210 /* following authorship retained from original osdblk.c */
4211 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4212
4213 MODULE_LICENSE("GPL");