]> Pileus Git - ~andy/linux/blob - drivers/block/rbd.c
rbd: define image request originator flag
[~andy/linux] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Snapshot sysfs device names carry this prefix; the name must still
 * fit in NAME_MAX once the prefix is prepended. */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

/* Snapshot name used when the base image (not a snapshot) is mapped */
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
93
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;	/* prefix of data object names (kmalloc'd) */
	u64 features;		/* RBD_FEATURE_* bits for the image */
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;	/* on-disk crypt type (format 1 only) */
	__u8 comp_type;		/* on-disk compression type (format 1 only) */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;		/* image size in bytes */
	struct ceph_snap_context *snapc;	/* snapshot ids + sequence */
	char *snap_names;	/* '\0'-separated snapshot name buffer */
	u64 *snap_sizes;	/* image size at each snapshot */

	u64 obj_version;	/* version of header object last read */
};
113
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;	/* id of the pool holding the image */
	char		*pool_name;

	char		*image_id;
	char		*image_name;	/* may be NULL (see above) */

	u64		snap_id;	/* CEPH_NOSNAP for the base image */
	char		*snap_name;

	struct kref	kref;		/* shared between parent and child */
};
151
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;	/* the ceph client proper */
	struct kref		kref;		/* released via rbd_client_release() */
	struct list_head	node;		/* entry on rbd_client_list */
};
160
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* How the data for an object request is supplied (selects union arm) */
enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
172
/*
 * A request against a single RADOS object.  Carries the object name
 * and byte range, the data (per "type": none, a bio chain, or a page
 * array), and the osd request used to carry it out.  Normally linked
 * into an image request's obj_requests list; "which" is the position
 * on that list (BAD_WHICH presumably marks a request that is on no
 * list — confirm against the request-lifecycle code).
 */
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	u64			img_offset;	/* image relative offset */
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;	/* OBJ_REQUEST_BIO */
		struct {			/* OBJ_REQUEST_PAGES */
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request *osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;		/* 0 or first error */
	atomic_t		done;

	rbd_obj_callback_t	callback;	/* invoked on completion */
	struct completion	completion;	/* for synchronous waiters */

	struct kref		kref;
};
204
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
};

/*
 * A request against an image's byte range, carried out by the list
 * of object requests it owns.  The unions are discriminated by the
 * IMG_REQ_WRITE and IMG_REQ_CHILD flag bits respectively.
 */
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;	/* IMG_REQ_* bits */
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
234
/* Iterate over an image request's object requests: in order, in order
 * starting from a given request, or safely in reverse (allows removal
 * of the current entry while iterating). */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
241
/* An image snapshot, as tracked on rbd_dev->snaps and shown in sysfs */
struct rbd_snap {
	struct	device		dev;	/* sysfs device for this snapshot */
	const char		*name;
	u64			size;	/* image size at snapshot time */
	struct list_head	node;	/* entry on rbd_dev->snaps */
	u64			id;
	u64			features;
};

/* What this mapping of the image exposes to the block layer */
struct rbd_mapping {
	u64			size;		/* mapped size in bytes */
	u64			features;
	bool			read_only;
};
256
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;	/* possibly shared client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;		/* in-memory image metadata */
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;		/* identity of mapped image */

	char			*header_name;	/* name of the header object */

	struct ceph_file_layout	layout;

	/* watch on the header object, for change notification */
	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	/* non-NULL if this image is a child in a layered image (see
	 * the rbd_spec comment above) */
	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;	/* what blkdev sees */

	struct list_head	node;		/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
313
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

/* Images are mapped and unmapped by writing to the sysfs bus files
 * /sys/bus/rbd/add and /sys/bus/rbd/remove (see rbd_bus_attrs). */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
343
/*
 * Release function for the root device.  The structure is statically
 * allocated, so there is nothing to free; an (empty) release function
 * is still provided to satisfy the device core.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
352
/*
 * Log a warning, prefixed with the most specific identification of
 * the device that is available: the disk name, else the image name,
 * else the image id, else the raw rbd_device pointer.  rbd_dev may
 * be NULL, in which case only the driver name is used as a prefix.
 */
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
379
#ifdef RBD_DEBUG
/*
 * Verify an invariant, logging the failing expression and calling
 * BUG() if it does not hold.  Wrapped in do { } while (0) so the
 * macro expands to a single statement and is safe in unbraced
 * if/else bodies (the bare "if" form had a dangling-else hazard).
 */
#define rbd_assert(expr)						\
		do {							\
			if (unlikely(!(expr))) {			\
				printk(KERN_ERR "\nAssertion failure in %s() " \
							"at line %d:\n\n" \
						"\trbd_assert(%s);\n\n", \
						__func__, __LINE__, #expr); \
				BUG();					\
			}						\
		} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
392
393 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
394 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
395
396 static int rbd_open(struct block_device *bdev, fmode_t mode)
397 {
398         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
399         bool removing = false;
400
401         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
402                 return -EROFS;
403
404         spin_lock_irq(&rbd_dev->lock);
405         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
406                 removing = true;
407         else
408                 rbd_dev->open_count++;
409         spin_unlock_irq(&rbd_dev->lock);
410         if (removing)
411                 return -ENOENT;
412
413         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
414         (void) get_device(&rbd_dev->dev);
415         set_device_ro(bdev, rbd_dev->mapping.read_only);
416         mutex_unlock(&ctl_mutex);
417
418         return 0;
419 }
420
421 static int rbd_release(struct gendisk *disk, fmode_t mode)
422 {
423         struct rbd_device *rbd_dev = disk->private_data;
424         unsigned long open_count_before;
425
426         spin_lock_irq(&rbd_dev->lock);
427         open_count_before = rbd_dev->open_count--;
428         spin_unlock_irq(&rbd_dev->lock);
429         rbd_assert(open_count_before > 0);
430
431         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
432         put_device(&rbd_dev->dev);
433         mutex_unlock(&ctl_mutex);
434
435         return 0;
436 }
437
/* Block-layer entry points for an rbd device */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
443
444 /*
445  * Initialize an rbd client instance.
446  * We own *ceph_opts.
447  */
448 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
449 {
450         struct rbd_client *rbdc;
451         int ret = -ENOMEM;
452
453         dout("%s:\n", __func__);
454         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
455         if (!rbdc)
456                 goto out_opt;
457
458         kref_init(&rbdc->kref);
459         INIT_LIST_HEAD(&rbdc->node);
460
461         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
462
463         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
464         if (IS_ERR(rbdc->client))
465                 goto out_mutex;
466         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
467
468         ret = ceph_open_session(rbdc->client);
469         if (ret < 0)
470                 goto out_err;
471
472         spin_lock(&rbd_client_list_lock);
473         list_add_tail(&rbdc->node, &rbd_client_list);
474         spin_unlock(&rbd_client_list_lock);
475
476         mutex_unlock(&ctl_mutex);
477         dout("%s: rbdc %p\n", __func__, rbdc);
478
479         return rbdc;
480
481 out_err:
482         ceph_destroy_client(rbdc->client);
483 out_mutex:
484         mutex_unlock(&ctl_mutex);
485         kfree(rbdc);
486 out_opt:
487         if (ceph_opts)
488                 ceph_destroy_options(ceph_opts);
489         dout("%s: error %d\n", __func__, ret);
490
491         return ERR_PTR(ret);
492 }
493
494 /*
495  * Find a ceph client with specific addr and configuration.  If
496  * found, bump its reference count.
497  */
498 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
499 {
500         struct rbd_client *client_node;
501         bool found = false;
502
503         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
504                 return NULL;
505
506         spin_lock(&rbd_client_list_lock);
507         list_for_each_entry(client_node, &rbd_client_list, node) {
508                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
509                         kref_get(&client_node->kref);
510                         found = true;
511                         break;
512                 }
513         }
514         spin_unlock(&rbd_client_list_lock);
515
516         return found ? client_node : NULL;
517 }
518
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument, values
 * between Opt_last_int and Opt_last_string take a string, and values
 * between Opt_last_string and Opt_last_bool are Boolean flags (see
 * parse_rbd_opts_token()).  There are currently no int or string
 * options defined.
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

/* Parsed rbd-specific mount options */
struct rbd_options {
	bool	read_only;	/* map the image read-only */
};

#define RBD_READ_ONLY_DEFAULT	false
549
/*
 * Parse a single rbd mount option.  "c" is one option string (e.g.
 * "ro"); "private" is the struct rbd_options being filled in.
 * Returns 0 on success, -EINVAL for an unrecognized option, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* Decode the argument according to the token's class (see the
	 * Opt_last_* sentinels in the token enum) */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* match_token() should only return tokens from our table */
		rbd_assert(false);
		break;
	}
	return 0;
}
590
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Ownership of *ceph_opts is consumed either way.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (!rbdc)
		return rbd_client_create(ceph_opts);

	/* Reusing an existing client; the options are no longer needed */
	ceph_destroy_options(ceph_opts);

	return rbdc;
}
607
/*
 * Destroy ceph client.  Called only as the kref release function
 * once the last reference has been dropped.
 *
 * NOTE(review): takes rbd_client_list_lock itself to unlink the
 * client, so the caller must NOT already hold it (the previous
 * comment here claimed the opposite).
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
625
626 /*
627  * Drop reference to ceph client node. If it's not referenced anymore, release
628  * it.
629  */
630 static void rbd_put_client(struct rbd_client *rbdc)
631 {
632         if (rbdc)
633                 kref_put(&rbdc->kref, rbd_client_release);
634 }
635
636 static bool rbd_image_format_valid(u32 image_format)
637 {
638         return image_format == 1 || image_format == 2;
639 }
640
/*
 * Sanity-check a format 1 on-disk image header: the magic text must
 * be present, the object order must be one the rest of the driver
 * can handle, and the snapshot metadata must be small enough that
 * the in-memory snapshot header fits in a size_t.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
679
680 /*
681  * Create a new header structure, translate header format from the on-disk
682  * header.
683  */
684 static int rbd_header_from_disk(struct rbd_image_header *header,
685                                  struct rbd_image_header_ondisk *ondisk)
686 {
687         u32 snap_count;
688         size_t len;
689         size_t size;
690         u32 i;
691
692         memset(header, 0, sizeof (*header));
693
694         snap_count = le32_to_cpu(ondisk->snap_count);
695
696         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
697         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
698         if (!header->object_prefix)
699                 return -ENOMEM;
700         memcpy(header->object_prefix, ondisk->object_prefix, len);
701         header->object_prefix[len] = '\0';
702
703         if (snap_count) {
704                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
705
706                 /* Save a copy of the snapshot names */
707
708                 if (snap_names_len > (u64) SIZE_MAX)
709                         return -EIO;
710                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
711                 if (!header->snap_names)
712                         goto out_err;
713                 /*
714                  * Note that rbd_dev_v1_header_read() guarantees
715                  * the ondisk buffer we're working with has
716                  * snap_names_len bytes beyond the end of the
717                  * snapshot id array, this memcpy() is safe.
718                  */
719                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
720                         snap_names_len);
721
722                 /* Record each snapshot's size */
723
724                 size = snap_count * sizeof (*header->snap_sizes);
725                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
726                 if (!header->snap_sizes)
727                         goto out_err;
728                 for (i = 0; i < snap_count; i++)
729                         header->snap_sizes[i] =
730                                 le64_to_cpu(ondisk->snaps[i].image_size);
731         } else {
732                 WARN_ON(ondisk->snap_names_len);
733                 header->snap_names = NULL;
734                 header->snap_sizes = NULL;
735         }
736
737         header->features = 0;   /* No features support in v1 images */
738         header->obj_order = ondisk->options.order;
739         header->crypt_type = ondisk->options.crypt_type;
740         header->comp_type = ondisk->options.comp_type;
741
742         /* Allocate and fill in the snapshot context */
743
744         header->image_size = le64_to_cpu(ondisk->image_size);
745         size = sizeof (struct ceph_snap_context);
746         size += snap_count * sizeof (header->snapc->snaps[0]);
747         header->snapc = kzalloc(size, GFP_KERNEL);
748         if (!header->snapc)
749                 goto out_err;
750
751         atomic_set(&header->snapc->nref, 1);
752         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
753         header->snapc->num_snaps = snap_count;
754         for (i = 0; i < snap_count; i++)
755                 header->snapc->snaps[i] =
756                         le64_to_cpu(ondisk->snaps[i].id);
757
758         return 0;
759
760 out_err:
761         kfree(header->snap_sizes);
762         header->snap_sizes = NULL;
763         kfree(header->snap_names);
764         header->snap_names = NULL;
765         kfree(header->object_prefix);
766         header->object_prefix = NULL;
767
768         return -ENOMEM;
769 }
770
771 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
772 {
773         struct rbd_snap *snap;
774
775         if (snap_id == CEPH_NOSNAP)
776                 return RBD_SNAP_HEAD_NAME;
777
778         list_for_each_entry(snap, &rbd_dev->snaps, node)
779                 if (snap_id == snap->id)
780                         return snap->name;
781
782         return NULL;
783 }
784
785 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
786 {
787
788         struct rbd_snap *snap;
789
790         list_for_each_entry(snap, &rbd_dev->snaps, node) {
791                 if (!strcmp(snap_name, snap->name)) {
792                         rbd_dev->spec->snap_id = snap->id;
793                         rbd_dev->mapping.size = snap->size;
794                         rbd_dev->mapping.features = snap->features;
795
796                         return 0;
797                 }
798         }
799
800         return -ENOENT;
801 }
802
803 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
804 {
805         int ret;
806
807         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
808                     sizeof (RBD_SNAP_HEAD_NAME))) {
809                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
810                 rbd_dev->mapping.size = rbd_dev->header.image_size;
811                 rbd_dev->mapping.features = rbd_dev->header.features;
812                 ret = 0;
813         } else {
814                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
815                 if (ret < 0)
816                         goto done;
817                 rbd_dev->mapping.read_only = true;
818         }
819         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
820
821 done:
822         return ret;
823 }
824
825 static void rbd_header_free(struct rbd_image_header *header)
826 {
827         kfree(header->object_prefix);
828         header->object_prefix = NULL;
829         kfree(header->snap_sizes);
830         header->snap_sizes = NULL;
831         kfree(header->snap_names);
832         header->snap_names = NULL;
833         ceph_put_snap_context(header->snapc);
834         header->snapc = NULL;
835 }
836
837 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
838 {
839         char *name;
840         u64 segment;
841         int ret;
842
843         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
844         if (!name)
845                 return NULL;
846         segment = offset >> rbd_dev->header.obj_order;
847         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
848                         rbd_dev->header.object_prefix, segment);
849         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
850                 pr_err("error formatting segment name for #%llu (%d)\n",
851                         segment, ret);
852                 kfree(name);
853                 name = NULL;
854         }
855
856         return name;
857 }
858
859 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
860 {
861         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
862
863         return offset & (segment_size - 1);
864 }
865
866 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
867                                 u64 offset, u64 length)
868 {
869         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
870
871         offset &= segment_size - 1;
872
873         rbd_assert(length <= U64_MAX - offset);
874         if (offset + length > segment_size)
875                 length = segment_size - offset;
876
877         return length;
878 }
879
880 /*
881  * returns the size of an object in the image
882  */
883 static u64 rbd_obj_bytes(struct rbd_image_header *header)
884 {
885         return 1 << header->obj_order;
886 }
887
888 /*
889  * bio helpers
890  */
891
892 static void bio_chain_put(struct bio *chain)
893 {
894         struct bio *tmp;
895
896         while (chain) {
897                 tmp = chain;
898                 chain = chain->bi_next;
899                 bio_put(tmp);
900         }
901 }
902
/*
 * zeros a bio chain, starting at specific offset
 *
 * start_ofs is a byte offset relative to the start of the chain;
 * every byte from there to the end of the chain is cleared, data
 * before it is left untouched.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* Zero this segment from start_ofs (or
				 * its beginning, if already past it) */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
929
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * The clone shares the source's pages (BIO_CLONED); only the
 * relevant bio_vec entries are copied, with the first and last
 * trimmed to the requested range.  Returns NULL on bad arguments
 * or allocation failure.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;		/* byte offset into first copied segment */
	unsigned short end_idx;
	unsigned short vcnt;		/* number of bio_vec entries to copy */
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		/* resid now holds the in-use length of the last segment */
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
1010
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;	/* where the next clone gets linked in */

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		/* Clone no more than remains in this bio (or than we need) */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			/* Consumed this bio entirely; move to the next one */
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	/* Release any clones built so far */
	bio_chain_put(chain);

	return NULL;
}
1073
/* Take a reference on an object request. */
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}
1080
static void rbd_obj_request_destroy(struct kref *kref);
/* Drop a reference on an object request; destroys it on the last put. */
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
1089
/* Take a reference on an image request. */
static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}
1096
static void rbd_img_request_destroy(struct kref *kref);
/* Drop a reference on an image request; destroys it on the last put. */
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
1105
/*
 * Link an object request into its image request's list.  The object
 * request's "which" records its position; the image request holds a
 * reference for as long as the object request is on the list.
 */
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}
1120
/*
 * Unlink an object request from its image request and drop the
 * reference taken in rbd_img_obj_request_add().  Asserts that only
 * the most recently added request is removed (LIFO order).
 */
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
1138
1139 static bool obj_request_type_valid(enum obj_request_type type)
1140 {
1141         switch (type) {
1142         case OBJ_REQUEST_NODATA:
1143         case OBJ_REQUEST_BIO:
1144         case OBJ_REQUEST_PAGES:
1145                 return true;
1146         default:
1147                 return false;
1148         }
1149 }
1150
/* Hand an object request's OSD request to the OSD client for sending. */
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
1158
/*
 * Finish an image request:  total up the bytes transferred by its
 * object requests (on success), then invoke its callback or, if it
 * has none, drop the caller's reference.
 */
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
1184
/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

/*
 * Block (interruptibly) until the object request completes.
 * Returns 0 on completion or -ERESTARTSYS if interrupted.
 */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
1193
/* Initialize the "done" marker; the barrier publishes the cleared state. */
static void obj_request_done_init(struct rbd_obj_request *obj_request)
{
	atomic_set(&obj_request->done, 0);
	smp_wmb();
}
1199
/*
 * Mark an object request done.  A counter is used rather than a
 * simple flag so that completing the same request twice can be
 * detected and reported.
 */
static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	int done;

	done = atomic_inc_return(&obj_request->done);
	if (done > 1) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "obj_request %p was already done\n",
			obj_request);
	}
}
1214
/* Return true if the object request has been marked done. */
static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();	/* pair with the barriers in init/set */
	return atomic_read(&obj_request->done) != 0;
}
1220
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}
1231
/* Return true if this image request is a write. */
static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}
1237
/* Mark this image request as originated by a parent (child request). */
static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}
1243
/* Return true if this image request is a child request. */
static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}
1249
/*
 * Read completion handling for an object request that is part of an
 * image request:  zero-fill holes and short reads, then mark done.
 */
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
	if (obj_request->result == -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
		obj_request->xferred = obj_request->length;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		/* Successful short read: zero the unread tail */
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}
1275
/*
 * Complete an object request:  invoke its callback if it has one,
 * otherwise wake anyone waiting on its completion.
 */
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}
1285
/* OSD op completion that requires no processing; just mark done. */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
1291
/*
 * Read completion:  image-request reads get hole/short-read
 * handling; standalone reads are simply marked done.
 */
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);
	if (obj_request->img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
1301
/* Write completion:  normalize xferred and mark the request done. */
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.
	 * Our xferred value is the number of bytes transferred
	 * back.  Set it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}
1314
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
1324
/*
 * Completion callback for an OSD request.  Records the result and
 * transfer count on the object request, dispatches to the handler
 * for the op that was performed, and completes the object request
 * if the handler marked it done.
 */
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	/* Part of an image request (which != BAD_WHICH) exactly when
	 * img_request is non-null */
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	WARN_ON(osd_req->r_num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64) UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
1373
/*
 * Finalize an object request's OSD request message.  Writes get a
 * modification time and (for image requests) the snapshot context;
 * reads of an image request target its snapshot id.
 */
static void rbd_osd_req_format(struct rbd_obj_request *obj_request,
					bool write_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc = NULL;
	u64 snap_id = CEPH_NOSNAP;
	struct timespec *mtime = NULL;
	struct timespec now;

	rbd_assert(osd_req != NULL);

	if (write_request) {
		now = CURRENT_TIME;
		mtime = &now;
		if (img_request)
			snapc = img_request->snapc;
	} else if (img_request) {
		snap_id = img_request->snap_id;
	}
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, snap_id, mtime);
}
1397
/*
 * Allocate and initialize a single-op OSD request for the given
 * object request.  Returns NULL on allocation failure.  The caller
 * is responsible for setting up the op and formatting the request.
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (img_request) {
		/* Direction must agree with the image request's */
		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
1438
/* Release a reference on an OSD request created by rbd_osd_req_create(). */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1443
/* object_name is assumed to be a non-null pointer and NUL-terminated */

/*
 * Allocate and initialize an object request.  The object name is
 * copied into the same allocation, just past the structure itself.
 * Returns NULL on allocation failure; the caller releases the
 * request with rbd_obj_request_put().
 */
static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	/* Name storage lives immediately after the struct */
	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	obj_request_done_init(obj_request);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
1477
/*
 * kref release function for object requests.  Frees the OSD request
 * and any attached data (bio chain or page vector), then the request
 * itself.  Must not be called while the request is still linked to
 * an image request.
 */
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	/* Name storage was allocated along with the request */
	kfree(obj_request);
}
1509
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 *
 * For a write request, a reference to the device's current snapshot
 * context is taken (under header_rwsem) and held by the image
 * request; it is dropped in rbd_img_request_destroy().
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	/* kmalloc (not kzalloc):  every field is set explicitly below */
	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}

	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}
1569
/*
 * kref release function for image requests.  Detaches (and drops
 * the references on) all remaining object requests, releases the
 * snapshot context held by a write request, and frees the request.
 */
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}
1589
1590 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1591 {
1592         struct rbd_img_request *img_request;
1593         u32 which = obj_request->which;
1594         bool more = true;
1595
1596         img_request = obj_request->img_request;
1597
1598         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1599         rbd_assert(img_request != NULL);
1600         rbd_assert(!img_request_child_test(img_request))
1601         rbd_assert(img_request->rq != NULL);
1602
1603         rbd_assert(img_request->obj_request_count > 0);
1604         rbd_assert(which != BAD_WHICH);
1605         rbd_assert(which < img_request->obj_request_count);
1606         rbd_assert(which >= img_request->next_completion);
1607
1608         spin_lock_irq(&img_request->completion_lock);
1609         if (which != img_request->next_completion)
1610                 goto out;
1611
1612         for_each_obj_request_from(img_request, obj_request) {
1613                 unsigned int xferred;
1614                 int result;
1615
1616                 rbd_assert(more);
1617                 rbd_assert(which < img_request->obj_request_count);
1618
1619                 if (!obj_request_done_test(obj_request))
1620                         break;
1621
1622                 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1623                 xferred = (unsigned int)obj_request->xferred;
1624                 result = obj_request->result;
1625                 if (result) {
1626                         struct rbd_device *rbd_dev = img_request->rbd_dev;
1627
1628                         rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1629                                 img_request_write_test(img_request) ? "write"
1630                                                                     : "read",
1631                                 obj_request->length, obj_request->img_offset,
1632                                 obj_request->offset);
1633                         rbd_warn(rbd_dev, "  result %d xferred %x\n",
1634                                 result, xferred);
1635                         if (!img_request->result)
1636                                 img_request->result = result;
1637                 }
1638
1639                 more = blk_end_request(img_request->rq, result, xferred);
1640                 which++;
1641         }
1642
1643         rbd_assert(more ^ (which == img_request->obj_request_count));
1644         img_request->next_completion = which;
1645 out:
1646         spin_unlock_irq(&img_request->completion_lock);
1647
1648         if (!more)
1649                 rbd_img_request_complete(img_request);
1650 }
1651
/*
 * Populate an image request with object requests covering its byte
 * range.  The range is split on segment (object) boundaries; each
 * object request gets a clone of the relevant portion of the
 * supplied bio chain and an initialized single-op OSD request.
 * Returns 0 on success or -ENOMEM, in which case any object
 * requests already created are released.
 */
static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	unsigned int bio_offset;
	u64 img_offset;		/* current byte position within the image */
	u64 resid;		/* bytes not yet covered by an object request */
	u16 opcode;

	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	bio_offset = 0;
	img_offset = img_request->offset;
	rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	rbd_assert(resid > 0);
	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		unsigned int clone_size;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		/* Advances bio_list/bio_offset past the cloned range */
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		osd_req_op_extent_osd_data_bio(osd_req, 0, write_request,
				obj_request->bio_list, obj_request->length);
		rbd_osd_req_format(obj_request, write_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	/* The current request was never added to the image request */
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
1729
1730 static int rbd_img_request_submit(struct rbd_img_request *img_request)
1731 {
1732         struct rbd_device *rbd_dev = img_request->rbd_dev;
1733         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1734         struct rbd_obj_request *obj_request;
1735         struct rbd_obj_request *next_obj_request;
1736
1737         dout("%s: img %p\n", __func__, img_request);
1738         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
1739                 int ret;
1740
1741                 ret = rbd_obj_request_submit(osdc, obj_request);
1742                 if (ret)
1743                         return ret;
1744                 /*
1745                  * The image request has its own reference to each
1746                  * of its object requests, so we can safely drop the
1747                  * initial one here.
1748                  */
1749                 rbd_obj_request_put(obj_request);
1750         }
1751
1752         return 0;
1753 }
1754
/*
 * Send a NOTIFY_ACK for the given notification id on the rbd header
 * object, telling the osd this client has handled the notification.
 * The request completes asynchronously; its callback drops the final
 * reference to the object request.  Returns 0 on successful submit,
 * negative errno otherwise.
 */
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	/* A notify ack carries no data payload */
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;
	/* On completion, just drop the reference taken at creation */
	obj_request->callback = rbd_obj_request_put;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, ver, 0);
	rbd_osd_req_format(obj_request, false);

	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	/* If we failed before/at submit, the callback won't run */
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}
1784
1785 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1786 {
1787         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1788         u64 hver;
1789         int rc;
1790
1791         if (!rbd_dev)
1792                 return;
1793
1794         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
1795                 rbd_dev->header_name, (unsigned long long) notify_id,
1796                 (unsigned int) opcode);
1797         rc = rbd_dev_refresh(rbd_dev, &hver);
1798         if (rc)
1799                 rbd_warn(rbd_dev, "got notification but failed to "
1800                            " update snaps: %d\n", rc);
1801
1802         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
1803 }
1804
/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 *
 * Synchronous: submits the (un)watch request and waits for the osd
 * to complete it.  On a successful start, the lingering object
 * request is stashed in rbd_dev->watch_request so it can be torn
 * down later.  Returns 0 on success, negative errno otherwise.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	/* Starting requires no existing event/request; stopping requires both */
	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	/* A watch counts as a write operation against the header object */
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	rbd_osd_req_format(obj_request, true);

	/* Submit, then wait for the osd to complete the (un)watch */
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
1884
/*
 * Synchronous osd object method call
 *
 * Invokes class method class_name.method_name on the named object.
 * "outbound"/"outbound_size" optionally supply request parameters;
 * the reply is copied into "inbound" (up to inbound_size bytes) and
 * the object version is stored through "version" if non-NULL.
 * Returns 0 on success, negative errno on failure.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *version)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32) calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	/* The object request now owns the page vector (freed on put) */
	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		/* Outbound method parameters travel in a pagelist */
		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format(obj_request, false);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = 0;
	/* Copy back only what the osd actually transferred */
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	/* Putting the object request releases its page vector too */
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
1970
/*
 * Block-layer request function.  Pulls each queued request and
 * dispatches it as an rbd image request.  The queue lock is held on
 * entry, dropped while an image request is built and submitted, and
 * reacquired before fetching the next request (hence the sparse
 * annotations below).
 */
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Dropped so request building/submission can sleep */
		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		/* Guard against offset+length wrapping past U64_MAX */
		result = -EINVAL;
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request, false);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}
2058
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 *
 * Returns the number of bytes of "bvec" that may be merged into the
 * bio described by "bmd" without crossing an rbd object boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
2104
2105 static void rbd_free_disk(struct rbd_device *rbd_dev)
2106 {
2107         struct gendisk *disk = rbd_dev->disk;
2108
2109         if (!disk)
2110                 return;
2111
2112         if (disk->flags & GENHD_FL_UP)
2113                 del_gendisk(disk);
2114         if (disk->queue)
2115                 blk_cleanup_queue(disk->queue);
2116         put_disk(disk);
2117 }
2118
2119 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2120                                 const char *object_name,
2121                                 u64 offset, u64 length,
2122                                 char *buf, u64 *version)
2123
2124 {
2125         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2126         struct rbd_obj_request *obj_request;
2127         struct page **pages = NULL;
2128         u32 page_count;
2129         size_t size;
2130         int ret;
2131
2132         page_count = (u32) calc_pages_for(offset, length);
2133         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2134         if (IS_ERR(pages))
2135                 ret = PTR_ERR(pages);
2136
2137         ret = -ENOMEM;
2138         obj_request = rbd_obj_request_create(object_name, offset, length,
2139                                                         OBJ_REQUEST_PAGES);
2140         if (!obj_request)
2141                 goto out;
2142
2143         obj_request->pages = pages;
2144         obj_request->page_count = page_count;
2145
2146         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2147         if (!obj_request->osd_req)
2148                 goto out;
2149
2150         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2151                                         offset, length, 0, 0);
2152         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false,
2153                                         obj_request->pages,
2154                                         obj_request->length,
2155                                         obj_request->offset & ~PAGE_MASK,
2156                                         false, false);
2157         rbd_osd_req_format(obj_request, false);
2158
2159         ret = rbd_obj_request_submit(osdc, obj_request);
2160         if (ret)
2161                 goto out;
2162         ret = rbd_obj_request_wait(obj_request);
2163         if (ret)
2164                 goto out;
2165
2166         ret = obj_request->result;
2167         if (ret < 0)
2168                 goto out;
2169
2170         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2171         size = (size_t) obj_request->xferred;
2172         ceph_copy_from_page_vector(pages, buf, 0, size);
2173         rbd_assert(size <= (size_t) INT_MAX);
2174         ret = (int) size;
2175         if (version)
2176                 *version = obj_request->version;
2177 out:
2178         if (obj_request)
2179                 rbd_obj_request_put(obj_request);
2180         else
2181                 ceph_release_page_vector(pages, page_count);
2182
2183         return ret;
2184 }
2185
2186 /*
2187  * Read the complete header for the given rbd device.
2188  *
2189  * Returns a pointer to a dynamically-allocated buffer containing
2190  * the complete and validated header.  Caller can pass the address
2191  * of a variable that will be filled in with the version of the
2192  * header object at the time it was read.
2193  *
2194  * Returns a pointer-coded errno if a failure occurs.
2195  */
2196 static struct rbd_image_header_ondisk *
2197 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2198 {
2199         struct rbd_image_header_ondisk *ondisk = NULL;
2200         u32 snap_count = 0;
2201         u64 names_size = 0;
2202         u32 want_count;
2203         int ret;
2204
2205         /*
2206          * The complete header will include an array of its 64-bit
2207          * snapshot ids, followed by the names of those snapshots as
2208          * a contiguous block of NUL-terminated strings.  Note that
2209          * the number of snapshots could change by the time we read
2210          * it in, in which case we re-read it.
2211          */
2212         do {
2213                 size_t size;
2214
2215                 kfree(ondisk);
2216
2217                 size = sizeof (*ondisk);
2218                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2219                 size += names_size;
2220                 ondisk = kmalloc(size, GFP_KERNEL);
2221                 if (!ondisk)
2222                         return ERR_PTR(-ENOMEM);
2223
2224                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2225                                        0, size,
2226                                        (char *) ondisk, version);
2227                 if (ret < 0)
2228                         goto out_err;
2229                 if (WARN_ON((size_t) ret < size)) {
2230                         ret = -ENXIO;
2231                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2232                                 size, ret);
2233                         goto out_err;
2234                 }
2235                 if (!rbd_dev_ondisk_valid(ondisk)) {
2236                         ret = -ENXIO;
2237                         rbd_warn(rbd_dev, "invalid header");
2238                         goto out_err;
2239                 }
2240
2241                 names_size = le64_to_cpu(ondisk->snap_names_len);
2242                 want_count = snap_count;
2243                 snap_count = le32_to_cpu(ondisk->snap_count);
2244         } while (snap_count != want_count);
2245
2246         return ondisk;
2247
2248 out_err:
2249         kfree(ondisk);
2250
2251         return ERR_PTR(ret);
2252 }
2253
2254 /*
2255  * reload the ondisk the header
2256  */
2257 static int rbd_read_header(struct rbd_device *rbd_dev,
2258                            struct rbd_image_header *header)
2259 {
2260         struct rbd_image_header_ondisk *ondisk;
2261         u64 ver = 0;
2262         int ret;
2263
2264         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2265         if (IS_ERR(ondisk))
2266                 return PTR_ERR(ondisk);
2267         ret = rbd_header_from_disk(header, ondisk);
2268         if (ret >= 0)
2269                 header->obj_version = ver;
2270         kfree(ondisk);
2271
2272         return ret;
2273 }
2274
2275 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2276 {
2277         struct rbd_snap *snap;
2278         struct rbd_snap *next;
2279
2280         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2281                 rbd_remove_snap_dev(snap);
2282 }
2283
2284 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2285 {
2286         sector_t size;
2287
2288         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2289                 return;
2290
2291         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2292         dout("setting size to %llu sectors", (unsigned long long) size);
2293         rbd_dev->mapping.size = (u64) size;
2294         set_capacity(rbd_dev->disk, size);
2295 }
2296
2297 /*
2298  * only read the first part of the ondisk header, without the snaps info
2299  */
2300 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2301 {
2302         int ret;
2303         struct rbd_image_header h;
2304
2305         ret = rbd_read_header(rbd_dev, &h);
2306         if (ret < 0)
2307                 return ret;
2308
2309         down_write(&rbd_dev->header_rwsem);
2310
2311         /* Update image size, and check for resize of mapped image */
2312         rbd_dev->header.image_size = h.image_size;
2313         rbd_update_mapping_size(rbd_dev);
2314
2315         /* rbd_dev->header.object_prefix shouldn't change */
2316         kfree(rbd_dev->header.snap_sizes);
2317         kfree(rbd_dev->header.snap_names);
2318         /* osd requests may still refer to snapc */
2319         ceph_put_snap_context(rbd_dev->header.snapc);
2320
2321         if (hver)
2322                 *hver = h.obj_version;
2323         rbd_dev->header.obj_version = h.obj_version;
2324         rbd_dev->header.image_size = h.image_size;
2325         rbd_dev->header.snapc = h.snapc;
2326         rbd_dev->header.snap_names = h.snap_names;
2327         rbd_dev->header.snap_sizes = h.snap_sizes;
2328         /* Free the extra copy of the object prefix */
2329         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2330         kfree(h.object_prefix);
2331
2332         ret = rbd_dev_snaps_update(rbd_dev);
2333         if (!ret)
2334                 ret = rbd_dev_snaps_register(rbd_dev);
2335
2336         up_write(&rbd_dev->header_rwsem);
2337
2338         return ret;
2339 }
2340
2341 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2342 {
2343         int ret;
2344
2345         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2346         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2347         if (rbd_dev->image_format == 1)
2348                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2349         else
2350                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2351         mutex_unlock(&ctl_mutex);
2352
2353         return ret;
2354 }
2355
2356 static int rbd_init_disk(struct rbd_device *rbd_dev)
2357 {
2358         struct gendisk *disk;
2359         struct request_queue *q;
2360         u64 segment_size;
2361
2362         /* create gendisk info */
2363         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2364         if (!disk)
2365                 return -ENOMEM;
2366
2367         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2368                  rbd_dev->dev_id);
2369         disk->major = rbd_dev->major;
2370         disk->first_minor = 0;
2371         disk->fops = &rbd_bd_ops;
2372         disk->private_data = rbd_dev;
2373
2374         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2375         if (!q)
2376                 goto out_disk;
2377
2378         /* We use the default size, but let's be explicit about it. */
2379         blk_queue_physical_block_size(q, SECTOR_SIZE);
2380
2381         /* set io sizes to object size */
2382         segment_size = rbd_obj_bytes(&rbd_dev->header);
2383         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2384         blk_queue_max_segment_size(q, segment_size);
2385         blk_queue_io_min(q, segment_size);
2386         blk_queue_io_opt(q, segment_size);
2387
2388         blk_queue_merge_bvec(q, rbd_merge_bvec);
2389         disk->queue = q;
2390
2391         q->queuedata = rbd_dev;
2392
2393         rbd_dev->disk = disk;
2394
2395         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2396
2397         return 0;
2398 out_disk:
2399         put_disk(disk);
2400
2401         return -ENOMEM;
2402 }
2403
2404 /*
2405   sysfs
2406 */
2407
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
2412
2413 static ssize_t rbd_size_show(struct device *dev,
2414                              struct device_attribute *attr, char *buf)
2415 {
2416         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2417         sector_t size;
2418
2419         down_read(&rbd_dev->header_rwsem);
2420         size = get_capacity(rbd_dev->disk);
2421         up_read(&rbd_dev->header_rwsem);
2422
2423         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2424 }
2425
2426 /*
2427  * Note this shows the features for whatever's mapped, which is not
2428  * necessarily the base image.
2429  */
2430 static ssize_t rbd_features_show(struct device *dev,
2431                              struct device_attribute *attr, char *buf)
2432 {
2433         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2434
2435         return sprintf(buf, "0x%016llx\n",
2436                         (unsigned long long) rbd_dev->mapping.features);
2437 }
2438
2439 static ssize_t rbd_major_show(struct device *dev,
2440                               struct device_attribute *attr, char *buf)
2441 {
2442         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2443
2444         return sprintf(buf, "%d\n", rbd_dev->major);
2445 }
2446
2447 static ssize_t rbd_client_id_show(struct device *dev,
2448                                   struct device_attribute *attr, char *buf)
2449 {
2450         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2451
2452         return sprintf(buf, "client%lld\n",
2453                         ceph_client_id(rbd_dev->rbd_client->client));
2454 }
2455
2456 static ssize_t rbd_pool_show(struct device *dev,
2457                              struct device_attribute *attr, char *buf)
2458 {
2459         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2460
2461         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2462 }
2463
2464 static ssize_t rbd_pool_id_show(struct device *dev,
2465                              struct device_attribute *attr, char *buf)
2466 {
2467         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2468
2469         return sprintf(buf, "%llu\n",
2470                 (unsigned long long) rbd_dev->spec->pool_id);
2471 }
2472
2473 static ssize_t rbd_name_show(struct device *dev,
2474                              struct device_attribute *attr, char *buf)
2475 {
2476         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2477
2478         if (rbd_dev->spec->image_name)
2479                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2480
2481         return sprintf(buf, "(unknown)\n");
2482 }
2483
2484 static ssize_t rbd_image_id_show(struct device *dev,
2485                              struct device_attribute *attr, char *buf)
2486 {
2487         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2488
2489         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2490 }
2491
2492 /*
2493  * Shows the name of the currently-mapped snapshot (or
2494  * RBD_SNAP_HEAD_NAME for the base image).
2495  */
2496 static ssize_t rbd_snap_show(struct device *dev,
2497                              struct device_attribute *attr,
2498                              char *buf)
2499 {
2500         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2501
2502         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2503 }
2504
2505 /*
2506  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2507  * for the parent image.  If there is no parent, simply shows
2508  * "(no parent image)".
2509  */
2510 static ssize_t rbd_parent_show(struct device *dev,
2511                              struct device_attribute *attr,
2512                              char *buf)
2513 {
2514         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2515         struct rbd_spec *spec = rbd_dev->parent_spec;
2516         int count;
2517         char *bufp = buf;
2518
2519         if (!spec)
2520                 return sprintf(buf, "(no parent image)\n");
2521
2522         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2523                         (unsigned long long) spec->pool_id, spec->pool_name);
2524         if (count < 0)
2525                 return count;
2526         bufp += count;
2527
2528         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2529                         spec->image_name ? spec->image_name : "(unknown)");
2530         if (count < 0)
2531                 return count;
2532         bufp += count;
2533
2534         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2535                         (unsigned long long) spec->snap_id, spec->snap_name);
2536         if (count < 0)
2537                 return count;
2538         bufp += count;
2539
2540         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2541         if (count < 0)
2542                 return count;
2543         bufp += count;
2544
2545         return (ssize_t) (bufp - buf);
2546 }
2547
2548 static ssize_t rbd_image_refresh(struct device *dev,
2549                                  struct device_attribute *attr,
2550                                  const char *buf,
2551                                  size_t size)
2552 {
2553         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2554         int ret;
2555
2556         ret = rbd_dev_refresh(rbd_dev, NULL);
2557
2558         return ret < 0 ? ret : size;
2559 }
2560
/* Per-device sysfs attributes; all read-only except "refresh". */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/*
 * Empty release: the rbd_device embedding this struct device is
 * freed elsewhere (rbd_dev_destroy), not via the device refcount.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2606
2607
2608 /*
2609   sysfs - snapshots
2610 */
2611
2612 static ssize_t rbd_snap_size_show(struct device *dev,
2613                                   struct device_attribute *attr,
2614                                   char *buf)
2615 {
2616         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2617
2618         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2619 }
2620
2621 static ssize_t rbd_snap_id_show(struct device *dev,
2622                                 struct device_attribute *attr,
2623                                 char *buf)
2624 {
2625         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2626
2627         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2628 }
2629
2630 static ssize_t rbd_snap_features_show(struct device *dev,
2631                                 struct device_attribute *attr,
2632                                 char *buf)
2633 {
2634         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2635
2636         return sprintf(buf, "0x%016llx\n",
2637                         (unsigned long long) snap->features);
2638 }
2639
/* Per-snapshot sysfs attributes; all read-only. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/*
 * Device-model release callback: the rbd_snap is owned by its
 * embedded struct device, so it (and its name) are freed here when
 * the last reference is dropped.
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2671
2672 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2673 {
2674         kref_get(&spec->kref);
2675
2676         return spec;
2677 }
2678
2679 static void rbd_spec_free(struct kref *kref);
2680 static void rbd_spec_put(struct rbd_spec *spec)
2681 {
2682         if (spec)
2683                 kref_put(&spec->kref, rbd_spec_free);
2684 }
2685
2686 static struct rbd_spec *rbd_spec_alloc(void)
2687 {
2688         struct rbd_spec *spec;
2689
2690         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2691         if (!spec)
2692                 return NULL;
2693         kref_init(&spec->kref);
2694
2695         rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
2696
2697         return spec;
2698 }
2699
/*
 * kref release callback for an rbd_spec: frees all owned name/id
 * strings and the spec itself.  Called only via rbd_spec_put() when
 * the last reference drops.
 */
static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}
2710
/*
 * Allocate and initialize an rbd_device.  Takes ownership of the
 * caller's references on @rbdc and @spec (they are dropped again in
 * rbd_dev_destroy()).  Returns NULL on allocation failure.
 */
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	rbd_dev->flags = 0;
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	/* pool id is narrowed to 32 bits; validated elsewhere before use */
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}
2738
/*
 * Tear down an rbd_device created by rbd_dev_create(): drop the spec
 * references it owns, release the client, and free the structure.
 */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_spec_put(rbd_dev->parent_spec);	/* may be NULL */
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}
2747
/*
 * Report whether @snap's device has been registered with the device
 * model.  The type pointer is set only in rbd_register_snap_dev(),
 * so it must agree with device_is_registered(); the assert enforces
 * that invariant (XOR: both true or both false).
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	rbd_assert(!ret ^ reg);

	return ret;
}
2757
/*
 * Unlink @snap from its rbd_device's snapshot list and, if its device
 * was registered, unregister it (which ultimately frees the snap via
 * rbd_snap_dev_release()).
 */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}
2764
2765 static int rbd_register_snap_dev(struct rbd_snap *snap,
2766                                   struct device *parent)
2767 {
2768         struct device *dev = &snap->dev;
2769         int ret;
2770
2771         dev->type = &rbd_snap_device_type;
2772         dev->parent = parent;
2773         dev->release = rbd_snap_dev_release;
2774         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2775         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2776
2777         ret = device_register(dev);
2778
2779         return ret;
2780 }
2781
2782 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2783                                                 const char *snap_name,
2784                                                 u64 snap_id, u64 snap_size,
2785                                                 u64 snap_features)
2786 {
2787         struct rbd_snap *snap;
2788         int ret;
2789
2790         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2791         if (!snap)
2792                 return ERR_PTR(-ENOMEM);
2793
2794         ret = -ENOMEM;
2795         snap->name = kstrdup(snap_name, GFP_KERNEL);
2796         if (!snap->name)
2797                 goto err;
2798
2799         snap->id = snap_id;
2800         snap->size = snap_size;
2801         snap->features = snap_features;
2802
2803         return snap;
2804
2805 err:
2806         kfree(snap->name);
2807         kfree(snap);
2808
2809         return ERR_PTR(ret);
2810 }
2811
/*
 * Return the name of format-1 snapshot @which, filling in its size
 * and (always-zero) feature mask.  The returned pointer aliases the
 * header's packed snap_names buffer; the caller must not free it.
 */
static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	*snap_size = rbd_dev->header.snap_sizes[which];
	*snap_features = 0;	/* No features for v1 */

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return snap_name;
}
2830
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 *
 * Issues a "get_size" class method call against the image header
 * object and decodes the (order, size) reply.  Returns 0 on success
 * or a negative error from the OSD request.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf), NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}
2863
/* Fetch the base image's object order and size into the header. */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
2870
/*
 * Fetch the format-2 image's object name prefix via the
 * "get_object_prefix" class method and store the decoded string in
 * rbd_dev->header.object_prefix (kmalloc'd; caller's header owns it).
 * Returns 0 on success or a negative error.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		/* don't leave an ERR_PTR where a string is expected */
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}
2906
/*
 * Fetch the feature bits for snapshot @snap_id (or the base image if
 * CEPH_NOSNAP) via the "get_features" class method.  Fails with
 * -ENXIO if the image uses any incompatible feature this driver does
 * not support.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;	/* all features */
		__le64 incompat;	/* features required to use image */
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_SUPPORTED)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}
2940
/* Fetch the base image's feature bits into the header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
2946
/*
 * Query the "get_parent" class method and, if this image is a clone,
 * record its parent spec (pool/image/snap ids) and the overlap size
 * in rbd_dev.  A reply with pool id CEPH_NOPOOL means no parent and
 * is not an error.  Returns 0 on success or a negative error.
 */
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	/* Worst-case encoded reply size */
	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				(char *) &snapid, sizeof (snapid),
				(char *) reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	/* -ERANGE is returned by the _safe decode macros on short data */
	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent?  No problem. */

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
		goto out;

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);	/* no-op if ownership transferred */

	return ret;
}
3015
/*
 * Look up an image's name given its id by calling "dir_get_name" on
 * the pool's rbd directory object.  Returns a kmalloc'd name string
 * (caller frees), or NULL on any failure -- lookup failure here is
 * treated as tolerable by the caller.
 */
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	/* Encode the image id as a ceph string (length-prefixed) */
	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = (char *) image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				(char *) reply_buf, size, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = (char *) reply_buf + size;
	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;	/* failure is tolerated; return NULL */
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}
3064
3065 /*
3066  * When a parent image gets probed, we only have the pool, image,
3067  * and snapshot ids but not the names of any of them.  This call
3068  * is made later to fill in those names.  It has to be done after
3069  * rbd_dev_snaps_update() has completed because some of the
3070  * information (in particular, snapshot name) is not available
3071  * until then.
3072  */
3073 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3074 {
3075         struct ceph_osd_client *osdc;
3076         const char *name;
3077         void *reply_buf = NULL;
3078         int ret;
3079
3080         if (rbd_dev->spec->pool_name)
3081                 return 0;       /* Already have the names */
3082
3083         /* Look up the pool name */
3084
3085         osdc = &rbd_dev->rbd_client->client->osdc;
3086         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3087         if (!name) {
3088                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3089                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3090                 return -EIO;
3091         }
3092
3093         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3094         if (!rbd_dev->spec->pool_name)
3095                 return -ENOMEM;
3096
3097         /* Fetch the image name; tolerate failure here */
3098
3099         name = rbd_dev_image_name(rbd_dev);
3100         if (name)
3101                 rbd_dev->spec->image_name = (char *) name;
3102         else
3103                 rbd_warn(rbd_dev, "unable to get image name");
3104
3105         /* Look up the snapshot name. */
3106
3107         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3108         if (!name) {
3109                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3110                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3111                 ret = -EIO;
3112                 goto out_err;
3113         }
3114         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3115         if(!rbd_dev->spec->snap_name)
3116                 goto out_err;
3117
3118         return 0;
3119 out_err:
3120         kfree(reply_buf);
3121         kfree(rbd_dev->spec->pool_name);
3122         rbd_dev->spec->pool_name = NULL;
3123
3124         return ret;
3125 }
3126
3127 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3128 {
3129         size_t size;
3130         int ret;
3131         void *reply_buf;
3132         void *p;
3133         void *end;
3134         u64 seq;
3135         u32 snap_count;
3136         struct ceph_snap_context *snapc;
3137         u32 i;
3138
3139         /*
3140          * We'll need room for the seq value (maximum snapshot id),
3141          * snapshot count, and array of that many snapshot ids.
3142          * For now we have a fixed upper limit on the number we're
3143          * prepared to receive.
3144          */
3145         size = sizeof (__le64) + sizeof (__le32) +
3146                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3147         reply_buf = kzalloc(size, GFP_KERNEL);
3148         if (!reply_buf)
3149                 return -ENOMEM;
3150
3151         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3152                                 "rbd", "get_snapcontext",
3153                                 NULL, 0,
3154                                 reply_buf, size, ver);
3155         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3156         if (ret < 0)
3157                 goto out;
3158
3159         ret = -ERANGE;
3160         p = reply_buf;
3161         end = (char *) reply_buf + size;
3162         ceph_decode_64_safe(&p, end, seq, out);
3163         ceph_decode_32_safe(&p, end, snap_count, out);
3164
3165         /*
3166          * Make sure the reported number of snapshot ids wouldn't go
3167          * beyond the end of our buffer.  But before checking that,
3168          * make sure the computed size of the snapshot context we
3169          * allocate is representable in a size_t.
3170          */
3171         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3172                                  / sizeof (u64)) {
3173                 ret = -EINVAL;
3174                 goto out;
3175         }
3176         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3177                 goto out;
3178
3179         size = sizeof (struct ceph_snap_context) +
3180                                 snap_count * sizeof (snapc->snaps[0]);
3181         snapc = kmalloc(size, GFP_KERNEL);
3182         if (!snapc) {
3183                 ret = -ENOMEM;
3184                 goto out;
3185         }
3186
3187         atomic_set(&snapc->nref, 1);
3188         snapc->seq = seq;
3189         snapc->num_snaps = snap_count;
3190         for (i = 0; i < snap_count; i++)
3191                 snapc->snaps[i] = ceph_decode_64(&p);
3192
3193         rbd_dev->header.snapc = snapc;
3194
3195         dout("  snap context seq = %llu, snap_count = %u\n",
3196                 (unsigned long long) seq, (unsigned int) snap_count);
3197
3198 out:
3199         kfree(reply_buf);
3200
3201         return 0;
3202 }
3203
/*
 * Fetch the name of snapshot @which (an index into the snapshot
 * context) via the "get_snapshot_name" class method.  Returns a
 * kmalloc'd string (caller frees) or an ERR_PTR on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = (char *) reply_buf + size;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	} else {
		dout("  snap_id 0x%016llx snap_name = %s\n",
			(unsigned long long) le64_to_cpu(snap_id), snap_name);
	}
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}
3246
/*
 * Format-2 counterpart of rbd_dev_v1_snap_info(): fill in the size
 * and features of snapshot @which and return its (kmalloc'd) name,
 * or an ERR_PTR if any of the three lookups fails.
 */
static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	u64 snap_id;
	u8 order;	/* fetched but unused for snapshots */
	int ret;

	snap_id = rbd_dev->header.snapc->snaps[which];
	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
	if (ret)
		return ERR_PTR(ret);
	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
	if (ret)
		return ERR_PTR(ret);

	return rbd_dev_v2_snap_name(rbd_dev, which);
}
3264
3265 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3266                 u64 *snap_size, u64 *snap_features)
3267 {
3268         if (rbd_dev->image_format == 1)
3269                 return rbd_dev_v1_snap_info(rbd_dev, which,
3270                                         snap_size, snap_features);
3271         if (rbd_dev->image_format == 2)
3272                 return rbd_dev_v2_snap_info(rbd_dev, which,
3273                                         snap_size, snap_features);
3274         return ERR_PTR(-EINVAL);
3275 }
3276
3277 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3278 {
3279         int ret;
3280         __u8 obj_order;
3281
3282         down_write(&rbd_dev->header_rwsem);
3283
3284         /* Grab old order first, to see if it changes */
3285
3286         obj_order = rbd_dev->header.obj_order,
3287         ret = rbd_dev_v2_image_size(rbd_dev);
3288         if (ret)
3289                 goto out;
3290         if (rbd_dev->header.obj_order != obj_order) {
3291                 ret = -EIO;
3292                 goto out;
3293         }
3294         rbd_update_mapping_size(rbd_dev);
3295
3296         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3297         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3298         if (ret)
3299                 goto out;
3300         ret = rbd_dev_snaps_update(rbd_dev);
3301         dout("rbd_dev_snaps_update returned %d\n", ret);
3302         if (ret)
3303                 goto out;
3304         ret = rbd_dev_snaps_register(rbd_dev);
3305         dout("rbd_dev_snaps_register returned %d\n", ret);
3306 out:
3307         up_write(&rbd_dev->header_rwsem);
3308
3309         return ret;
3310 }
3311
3312 /*
3313  * Scan the rbd device's current snapshot list and compare it to the
3314  * newly-received snapshot context.  Remove any existing snapshots
3315  * not present in the new snapshot context.  Add a new snapshot for
3316  * any snaphots in the snapshot context not in the current list.
3317  * And verify there are no changes to snapshots we already know
3318  * about.
3319  *
3320  * Assumes the snapshots in the snapshot context are sorted by
3321  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3322  * are also maintained in that order.)
3323  */
3324 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3325 {
3326         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3327         const u32 snap_count = snapc->num_snaps;
3328         struct list_head *head = &rbd_dev->snaps;
3329         struct list_head *links = head->next;
3330         u32 index = 0;
3331
3332         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3333         while (index < snap_count || links != head) {
3334                 u64 snap_id;
3335                 struct rbd_snap *snap;
3336                 char *snap_name;
3337                 u64 snap_size = 0;
3338                 u64 snap_features = 0;
3339
3340                 snap_id = index < snap_count ? snapc->snaps[index]
3341                                              : CEPH_NOSNAP;
3342                 snap = links != head ? list_entry(links, struct rbd_snap, node)
3343                                      : NULL;
3344                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3345
3346                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3347                         struct list_head *next = links->next;
3348
3349                         /*
3350                          * A previously-existing snapshot is not in
3351                          * the new snap context.
3352                          *
3353                          * If the now missing snapshot is the one the
3354                          * image is mapped to, clear its exists flag
3355                          * so we can avoid sending any more requests
3356                          * to it.
3357                          */
3358                         if (rbd_dev->spec->snap_id == snap->id)
3359                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3360                         rbd_remove_snap_dev(snap);
3361                         dout("%ssnap id %llu has been removed\n",
3362                                 rbd_dev->spec->snap_id == snap->id ?
3363                                                         "mapped " : "",
3364                                 (unsigned long long) snap->id);
3365
3366                         /* Done with this list entry; advance */
3367
3368                         links = next;
3369                         continue;
3370                 }
3371
3372                 snap_name = rbd_dev_snap_info(rbd_dev, index,
3373                                         &snap_size, &snap_features);
3374                 if (IS_ERR(snap_name))
3375                         return PTR_ERR(snap_name);
3376
3377                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
3378                         (unsigned long long) snap_id);
3379                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3380                         struct rbd_snap *new_snap;
3381
3382                         /* We haven't seen this snapshot before */
3383
3384                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3385                                         snap_id, snap_size, snap_features);
3386                         if (IS_ERR(new_snap)) {
3387                                 int err = PTR_ERR(new_snap);
3388
3389                                 dout("  failed to add dev, error %d\n", err);
3390
3391                                 return err;
3392                         }
3393
3394                         /* New goes before existing, or at end of list */
3395
3396                         dout("  added dev%s\n", snap ? "" : " at end\n");
3397                         if (snap)
3398                                 list_add_tail(&new_snap->node, &snap->node);
3399                         else
3400                                 list_add_tail(&new_snap->node, head);
3401                 } else {
3402                         /* Already have this one */
3403
3404                         dout("  already present\n");
3405
3406                         rbd_assert(snap->size == snap_size);
3407                         rbd_assert(!strcmp(snap->name, snap_name));
3408                         rbd_assert(snap->features == snap_features);
3409
3410                         /* Done with this list entry; advance */
3411
3412                         links = links->next;
3413                 }
3414
3415                 /* Advance to the next entry in the snapshot context */
3416
3417                 index++;
3418         }
3419         dout("%s: done\n", __func__);
3420
3421         return 0;
3422 }
3423
3424 /*
3425  * Scan the list of snapshots and register the devices for any that
3426  * have not already been registered.
3427  */
3428 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3429 {
3430         struct rbd_snap *snap;
3431         int ret = 0;
3432
3433         dout("%s:\n", __func__);
3434         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3435                 return -EIO;
3436
3437         list_for_each_entry(snap, &rbd_dev->snaps, node) {
3438                 if (!rbd_snap_registered(snap)) {
3439                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3440                         if (ret < 0)
3441                                 break;
3442                 }
3443         }
3444         dout("%s: returning %d\n", __func__, ret);
3445
3446         return ret;
3447 }
3448
/*
 * Fill in the driver-model fields for rbd_dev's embedded struct device
 * and register it, making the device visible in sysfs.  Returns the
 * device_register() result (0 or negative errno).
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
        struct device *dev;
        int ret;

        /* SINGLE_DEPTH_NESTING: caller may already hold ctl_mutex */
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;         /* runs on final put */
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);

        mutex_unlock(&ctl_mutex);

        return ret;
}
3468
/*
 * Unregister the rbd device; when the last reference drops, the
 * rbd_dev_release() callback performs the remaining cleanup.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
3473
/* Highest rbd device id handed out so far; ids start at 1. */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3475
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        /* atomic increment makes the id unique without a lock */
        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
}
3490
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
        struct list_head *tmp;
        int rbd_id = rbd_dev->dev_id;
        int max_id;

        rbd_assert(rbd_id > 0);

        dout("rbd_dev %p released dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);

        /*
         * If the id being "put" is not the current maximum, there
         * is nothing special we need to do.
         */
        if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
                spin_unlock(&rbd_dev_list_lock);
                return;
        }

        /*
         * We need to update the current maximum id.  Search the
         * list to find out what it is.  We're more likely to find
         * the maximum at the end, so search the list backward.
         */
        max_id = 0;
        list_for_each_prev(tmp, &rbd_dev_list) {
                struct rbd_device *rbd_dev;

                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id > max_id)
                        max_id = rbd_dev->dev_id;
        }
        spin_unlock(&rbd_dev_list_lock);

        /*
         * The max id could have been updated by rbd_dev_id_get(), in
         * which case it now accurately reflects the new maximum.
         * Be careful not to overwrite the maximum value in that
         * case.
         */
        atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
        /* NOTE(review): printed even if the cmpxchg did not swap */
        dout("  max dev id has been reset\n");
}
3541
3542 /*
3543  * Skips over white space at *buf, and updates *buf to point to the
3544  * first found non-space character (if any). Returns the length of
3545  * the token (string of non-white space characters) found.  Note
3546  * that *buf must be terminated with '\0'.
3547  */
static inline size_t next_token(const char **buf)
{
        /*
         * These are the characters that produce nonzero for
         * isspace() in the "C" and "POSIX" locales.
         */
        static const char spaces[] = " \f\n\r\t\v";
        const char *start;

        start = *buf + strspn(*buf, spaces);    /* skip leading space */
        *buf = start;

        return strcspn(start, spaces);          /* token length */
}
3560
3561 /*
3562  * Finds the next token in *buf, and if the provided token buffer is
3563  * big enough, copies the found token into it.  The result, if
3564  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3565  * must be terminated with '\0' on entry.
3566  *
3567  * Returns the length of the token found (not including the '\0').
3568  * Return value will be 0 if no token is found, and it will be >=
3569  * token_size if the token would not fit.
3570  *
3571  * The *buf pointer will be updated to point beyond the end of the
3572  * found token.  Note that this occurs even if the token buffer is
3573  * too small to hold it.
3574  */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        /* Whitespace per isspace() in the "C"/"POSIX" locales. */
        static const char spaces[] = " \f\n\r\t\v";
        size_t len;

        /* Find the next token and measure it (was next_token()). */
        *buf += strspn(*buf, spaces);
        len = strcspn(*buf, spaces);

        /* Copy only if it fits; result is always NUL-terminated. */
        if (len < token_size) {
                memcpy(token, *buf, len);
                token[len] = '\0';
        }
        /* Advance past the token even when it didn't fit. */
        *buf += len;

        return len;
}
3590
3591 /*
3592  * Finds the next token in *buf, dynamically allocates a buffer big
3593  * enough to hold a copy of it, and copies the token into the new
3594  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3595  * that a duplicate buffer is created even for a zero-length token.
3596  *
3597  * Returns a pointer to the newly-allocated duplicate, or a null
3598  * pointer if memory for the duplicate was not available.  If
3599  * the lenp argument is a non-null pointer, the length of the token
3600  * (not including the '\0') is returned in *lenp.
3601  *
3602  * If successful, the *buf pointer will be updated to point beyond
3603  * the end of the found token.
3604  *
3605  * Note: uses GFP_KERNEL for allocation.
3606  */
3607 static inline char *dup_token(const char **buf, size_t *lenp)
3608 {
3609         char *dup;
3610         size_t len;
3611
3612         len = next_token(buf);
3613         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3614         if (!dup)
3615                 return NULL;
3616         *(dup + len) = '\0';
3617         *buf += len;
3618
3619         if (lenp)
3620                 *lenp = len;
3621
3622         return dup;
3623 }
3624
3625 /*
3626  * Parse the options provided for an "rbd add" (i.e., rbd image
3627  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3628  * and the data written is passed here via a NUL-terminated buffer.
3629  * Returns 0 if successful or an error code otherwise.
3630  *
3631  * The information extracted from these options is recorded in
3632  * the other parameters which return dynamically-allocated
3633  * structures:
3634  *  ceph_opts
3635  *      The address of a pointer that will refer to a ceph options
3636  *      structure.  Caller must release the returned pointer using
3637  *      ceph_destroy_options() when it is no longer needed.
3638  *  rbd_opts
3639  *      Address of an rbd options pointer.  Fully initialized by
3640  *      this function; caller must release with kfree().
3641  *  spec
3642  *      Address of an rbd image specification pointer.  Fully
3643  *      initialized by this function based on parsed options.
3644  *      Caller must release with rbd_spec_put().
3645  *
3646  * The options passed take this form:
3647  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3648  * where:
3649  *  <mon_addrs>
3650  *      A comma-separated list of one or more monitor addresses.
3651  *      A monitor address is an ip address, optionally followed
3652  *      by a port number (separated by a colon).
3653  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3654  *  <options>
3655  *      A comma-separated list of ceph and/or rbd options.
3656  *  <pool_name>
3657  *      The name of the rados pool containing the rbd image.
3658  *  <image_name>
3659  *      The name of the image in that pool to map.
3660  *  <snap_id>
3661  *      An optional snapshot id.  If provided, the mapping will
3662  *      present data from the image at the time that snapshot was
3663  *      created.  The image head is used if no snapshot id is
3664  *      provided.  Snapshot mappings are always read-only.
3665  */
static int rbd_add_parse_args(const char *buf,
                                struct ceph_options **ceph_opts,
                                struct rbd_options **opts,
                                struct rbd_spec **rbd_spec)
{
        size_t len;
        char *options;
        const char *mon_addrs;
        size_t mon_addrs_size;
        struct rbd_spec *spec = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct ceph_options *copts;
        int ret;

        /* The first four tokens are required */

        len = next_token(&buf);
        if (!len) {
                rbd_warn(NULL, "no monitor address(es) provided");
                return -EINVAL;
        }
        /*
         * The monitor address substring is not copied; its bounds
         * are passed directly to ceph_parse_options() below.  The
         * +1 makes mon_addrs + mon_addrs_size - 1 the end pointer.
         */
        mon_addrs = buf;
        mon_addrs_size = len + 1;
        buf += len;

        ret = -EINVAL;          /* default for the empty-token checks */
        options = dup_token(&buf, NULL);
        if (!options)
                return -ENOMEM;         /* nothing allocated yet */
        if (!*options) {
                rbd_warn(NULL, "no options provided");
                goto out_err;
        }

        spec = rbd_spec_alloc();
        if (!spec)
                goto out_mem;

        spec->pool_name = dup_token(&buf, NULL);
        if (!spec->pool_name)
                goto out_mem;
        if (!*spec->pool_name) {
                rbd_warn(NULL, "no pool name provided");
                goto out_err;
        }

        spec->image_name = dup_token(&buf, NULL);
        if (!spec->image_name)
                goto out_mem;
        if (!*spec->image_name) {
                rbd_warn(NULL, "no image name provided");
                goto out_err;
        }

        /*
         * Snapshot name is optional; default is to use "-"
         * (indicating the head/no snapshot).
         */
        len = next_token(&buf);
        if (!len) {
                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
                ret = -ENAMETOOLONG;
                goto out_err;
        }
        /* +1 so there's room to NUL-terminate the copy below */
        spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
        if (!spec->snap_name)
                goto out_mem;
        *(spec->snap_name + len) = '\0';

        /* Initialize all rbd options to the defaults */

        rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
        if (!rbd_opts)
                goto out_mem;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        /* rbd-specific tokens are handled by parse_rbd_opts_token() */
        copts = ceph_parse_options(options, mon_addrs,
                                        mon_addrs + mon_addrs_size - 1,
                                        parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(copts)) {
                ret = PTR_ERR(copts);
                goto out_err;
        }
        kfree(options);

        /* Success: ownership of all three results passes to the caller */
        *ceph_opts = copts;
        *opts = rbd_opts;
        *rbd_spec = spec;

        return 0;
out_mem:
        ret = -ENOMEM;
out_err:
        kfree(rbd_opts);
        rbd_spec_put(spec);
        kfree(options);

        return ret;
}
3768
3769 /*
3770  * An rbd format 2 image has a unique identifier, distinct from the
3771  * name given to it by the user.  Internally, that identifier is
3772  * what's used to specify the names of objects related to the image.
3773  *
3774  * A special "rbd id" object is used to map an rbd image name to its
3775  * id.  If that object doesn't exist, then there is no v2 rbd image
3776  * with the supplied name.
3777  *
3778  * This function will record the given rbd_dev's image_id field if
3779  * it can be determined, and in that case will return 0.  If any
3780  * errors occur a negative errno will be returned and the rbd_dev's
3781  * image_id field will be unchanged (and should be NULL).
3782  */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        void *p;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.
         */
        if (rbd_dev->spec->image_id)
                return 0;

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        /* sizeof includes the NUL, so this covers prefix + name + NUL */
        size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* Invoke the "get_id" method on the image's id object */
        ret = rbd_obj_method_sync(rbd_dev, object_name,
                                "rbd", "get_id",
                                NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX, NULL);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        /* Decode the length-prefixed string into a fresh allocation */
        p = response;
        rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
                                                p + RBD_IMAGE_ID_LEN_MAX,
                                                NULL, GFP_NOIO);
        if (IS_ERR(rbd_dev->spec->image_id)) {
                ret = PTR_ERR(rbd_dev->spec->image_id);
                rbd_dev->spec->image_id = NULL; /* keep field NULL on error */
        } else {
                dout("image_id is %s\n", rbd_dev->spec->image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}
3843
/*
 * Probe and fill in metadata for a format 1 rbd image.  On success
 * the image_id (empty string for v1), header_name, header contents,
 * and image_format are all set; on failure those fields are reset.
 */
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;

        /* Version 1 images have no id; empty string is used */

        rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
        if (!rbd_dev->spec->image_id)
                return -ENOMEM;

        /* Record the header object name for this rbd image. */

        /* sizeof(RBD_SUFFIX) includes room for the trailing NUL */
        size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name) {
                ret = -ENOMEM;
                goto out_err;
        }
        sprintf(rbd_dev->header_name, "%s%s",
                rbd_dev->spec->image_name, RBD_SUFFIX);

        /* Populate rbd image metadata */

        ret = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (ret < 0)
                goto out_err;

        /* Version 1 images have no parent (no layering) */

        rbd_dev->parent_spec = NULL;
        rbd_dev->parent_overlap = 0;

        rbd_dev->image_format = 1;

        dout("discovered version 1 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;

out_err:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        return ret;
}
3892
/*
 * Probe and fill in metadata for a format 2 rbd image.  The image id
 * must already be recorded in rbd_dev->spec (see rbd_dev_image_id()).
 * On failure, all fields set here are unwound.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
        size_t size;
        int ret;
        u64 ver = 0;

        /*
         * Image id was filled in by the caller.  Record the header
         * object name for this rbd image.
         */
        size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;
        sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

        /* Get the size and object order for the image */

        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get the object prefix (a.k.a. block_name) for the image */

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get and check the features for the image */

        ret = rbd_dev_v2_features(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* If the image supports layering, get the parent info */

        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }

        /* crypto and compression type aren't (yet) supported for v2 images */

        rbd_dev->header.crypt_type = 0;
        rbd_dev->header.comp_type = 0;

        /* Get the snapshot context, plus the header version */

        ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
        if (ret)
                goto out_err;
        rbd_dev->header.obj_version = ver;

        rbd_dev->image_format = 2;

        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;
out_err:
        /* Undo everything set above, in reverse order */
        rbd_dev->parent_overlap = 0;
        rbd_spec_put(rbd_dev->parent_spec);
        rbd_dev->parent_spec = NULL;
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}
3965
/*
 * Complete device setup after the image header has been probed:
 * update snapshots and the mapping, allocate an id and block major,
 * create the disk and sysfs entries, start the header watch, and
 * finally announce the disk.  Unwinds everything on failure.
 */
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
        int ret;

        /* no need to lock here, as rbd_dev is not registered yet */
        ret = rbd_dev_snaps_update(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_dev_probe_update_spec(rbd_dev);
        if (ret)
                goto err_out_snaps;

        ret = rbd_dev_set_mapping(rbd_dev);
        if (ret)
                goto err_out_snaps;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        /* register_blkdev(0, ...) dynamically allocates a major */
        ret = register_blkdev(0, rbd_dev->name);
        if (ret < 0)
                goto err_out_id;
        rbd_dev->major = ret;

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_bus_add_dev(rbd_dev);
        if (ret)
                goto err_out_disk;

        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         */
        down_write(&rbd_dev->header_rwsem);
        ret = rbd_dev_snaps_register(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
        if (ret)
                goto err_out_bus;

        ret = rbd_dev_header_watch_sync(rbd_dev, 1);
        if (ret)
                goto err_out_bus;

        /* Everything's ready.  Announce the disk to the world. */

        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return ret;
err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */

        rbd_bus_del_dev(rbd_dev);

        return ret;
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
err_out_snaps:
        rbd_remove_all_snaps(rbd_dev);

        return ret;
}
4047
4048 /*
4049  * Probe for the existence of the header object for the given rbd
4050  * device.  For format 2 images this includes determining the image
4051  * id.
4052  */
4053 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4054 {
4055         int ret;
4056
4057         /*
4058          * Get the id from the image id object.  If it's not a
4059          * format 2 image, we'll get ENOENT back, and we'll assume
4060          * it's a format 1 image.
4061          */
4062         ret = rbd_dev_image_id(rbd_dev);
4063         if (ret)
4064                 ret = rbd_dev_v1_probe(rbd_dev);
4065         else
4066                 ret = rbd_dev_v2_probe(rbd_dev);
4067         if (ret) {
4068                 dout("probe failed, returning %d\n", ret);
4069
4070                 return ret;
4071         }
4072
4073         ret = rbd_dev_probe_finish(rbd_dev);
4074         if (ret)
4075                 rbd_header_free(&rbd_dev->header);
4076
4077         return ret;
4078 }
4079
4080 static ssize_t rbd_add(struct bus_type *bus,
4081                        const char *buf,
4082                        size_t count)
4083 {
4084         struct rbd_device *rbd_dev = NULL;
4085         struct ceph_options *ceph_opts = NULL;
4086         struct rbd_options *rbd_opts = NULL;
4087         struct rbd_spec *spec = NULL;
4088         struct rbd_client *rbdc;
4089         struct ceph_osd_client *osdc;
4090         int rc = -ENOMEM;
4091
4092         if (!try_module_get(THIS_MODULE))
4093                 return -ENODEV;
4094
4095         /* parse add command */
4096         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4097         if (rc < 0)
4098                 goto err_out_module;
4099
4100         rbdc = rbd_get_client(ceph_opts);
4101         if (IS_ERR(rbdc)) {
4102                 rc = PTR_ERR(rbdc);
4103                 goto err_out_args;
4104         }
4105         ceph_opts = NULL;       /* rbd_dev client now owns this */
4106
4107         /* pick the pool */
4108         osdc = &rbdc->client->osdc;
4109         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4110         if (rc < 0)
4111                 goto err_out_client;
4112         spec->pool_id = (u64) rc;
4113
4114         /* The ceph file layout needs to fit pool id in 32 bits */
4115
4116         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4117                 rc = -EIO;
4118                 goto err_out_client;
4119         }
4120
4121         rbd_dev = rbd_dev_create(rbdc, spec);
4122         if (!rbd_dev)
4123                 goto err_out_client;
4124         rbdc = NULL;            /* rbd_dev now owns this */
4125         spec = NULL;            /* rbd_dev now owns this */
4126
4127         rbd_dev->mapping.read_only = rbd_opts->read_only;
4128         kfree(rbd_opts);
4129         rbd_opts = NULL;        /* done with this */
4130
4131         rc = rbd_dev_probe(rbd_dev);
4132         if (rc < 0)
4133                 goto err_out_rbd_dev;
4134
4135         return count;
4136 err_out_rbd_dev:
4137         rbd_dev_destroy(rbd_dev);
4138 err_out_client:
4139         rbd_put_client(rbdc);
4140 err_out_args:
4141         if (ceph_opts)
4142                 ceph_destroy_options(ceph_opts);
4143         kfree(rbd_opts);
4144         rbd_spec_put(spec);
4145 err_out_module:
4146         module_put(THIS_MODULE);
4147
4148         dout("Error adding device %s\n", buf);
4149
4150         return (ssize_t) rc;
4151 }
4152
4153 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4154 {
4155         struct list_head *tmp;
4156         struct rbd_device *rbd_dev;
4157
4158         spin_lock(&rbd_dev_list_lock);
4159         list_for_each(tmp, &rbd_dev_list) {
4160                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4161                 if (rbd_dev->dev_id == dev_id) {
4162                         spin_unlock(&rbd_dev_list_lock);
4163                         return rbd_dev;
4164                 }
4165         }
4166         spin_unlock(&rbd_dev_list_lock);
4167         return NULL;
4168 }
4169
/*
 * Device release callback, invoked by the driver core when the last
 * reference to the rbd device is dropped (see rbd_bus_del_dev()).
 * Tears down the watch, block device, header data, and the rbd_dev
 * itself, then releases the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        /* second arg 0 tears down the header watch if one is active */
        if (rbd_dev->watch_event)
                rbd_dev_header_watch_sync(rbd_dev, 0);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        rbd_dev_id_put(rbd_dev);
        rbd_assert(rbd_dev->rbd_client != NULL);
        rbd_dev_destroy(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}
4192
/*
 * Handle a write to /sys/bus/rbd/remove: parse the device id, refuse
 * if the device is open, otherwise mark it as being removed and tear
 * it down.  Returns "count" on success or a negative errno.
 */
static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id, rc;
        unsigned long ul;
        int ret = count;

        /* NOTE(review): strict_strtoul is deprecated; kstrtoul is preferred */
        rc = strict_strtoul(buf, 10, &ul);
        if (rc)
                return rc;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        /*
         * Check open_count and set the REMOVING flag atomically, so
         * a concurrent open can't race with the removal decision.
         */
        spin_lock_irq(&rbd_dev->lock);
        if (rbd_dev->open_count)
                ret = -EBUSY;
        else
                set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
        spin_unlock_irq(&rbd_dev->lock);
        if (ret < 0)
                goto done;

        rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);

done:
        mutex_unlock(&ctl_mutex);

        return ret;
}
4236
4237 /*
4238  * create control files in sysfs
4239  * /sys/bus/rbd/...
4240  */
4241 static int rbd_sysfs_init(void)
4242 {
4243         int ret;
4244
4245         ret = device_register(&rbd_root_dev);
4246         if (ret < 0)
4247                 return ret;
4248
4249         ret = bus_register(&rbd_bus_type);
4250         if (ret < 0)
4251                 device_unregister(&rbd_root_dev);
4252
4253         return ret;
4254 }
4255
/* Tear down in reverse order of rbd_sysfs_init(): bus, then root device */
static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
4261
/*
 * Module init: verify libceph compatibility, then create the sysfs
 * control files.  Returns 0 or a negative errno.
 */
static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");

                return -EINVAL;
        }
        rc = rbd_sysfs_init();
        if (rc)
                return rc;
        pr_info("loaded " RBD_DRV_NAME_LONG "\n");
        return 0;
}
4277
/* Module exit: remove the sysfs control files */
static void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}
4282
4283 module_init(rbd_init);
4284 module_exit(rbd_exit);
4285
4286 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4287 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4288 MODULE_DESCRIPTION("rados block device");
4289
4290 /* following authorship retained from original osdblk.c */
4291 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4292
4293 MODULE_LICENSE("GPL");